deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "bytes"
20 "database/sql"
21 notrand "math/rand"
22 "strings"
23 "sync"
24 "time"
25
26 "humungus.tedunangst.com/r/webs/gate"
27)
28
29type Doover struct {
30 ID int64
31 When time.Time
32 Userid int64
33 Tries int64
34 Rcpt string
35 Msgs [][]byte
36}
37
38func sayitagain(doover Doover) {
39 doover.Tries += 1
40 var drift time.Duration
41 if doover.Tries <= 3 { // 5, 10, 15 minutes
42 drift = time.Duration(doover.Tries*5) * time.Minute
43 } else if doover.Tries <= 6 { // 1, 2, 3 hours
44 drift = time.Duration(doover.Tries-3) * time.Hour
45 } else if doover.Tries <= 9 { // 12, 12, 12 hours
46 drift = time.Duration(12) * time.Hour
47 } else {
48 ilog.Printf("he's dead jim: %s", doover.Rcpt)
49 return
50 }
51 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
52 when := time.Now().Add(drift)
53 data := bytes.Join(doover.Msgs, []byte{0})
54 _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, doover.Rcpt, data)
55 if err != nil {
56 elog.Printf("error saving doover: %s", err)
57 }
58 select {
59 case pokechan <- 0:
60 default:
61 }
62}
63
64func lethaldose(err error) int64 {
65 str := err.Error()
66 if strings.Contains(str, "no such host") {
67 return 8
68 }
69 return 0
70}
71
72func letitslide(err error) bool {
73 str := err.Error()
74 if strings.Contains(str, "http post status: 400") {
75 return true
76 }
77 if strings.Contains(str, "http post status: 422") {
78 return true
79 }
80 return false
81}
82
83var dqmtx sync.Mutex
84
85func delinquent(userid int64, rcpt string, msg []byte) bool {
86 dqmtx.Lock()
87 defer dqmtx.Unlock()
88 row := stmtDeliquentCheck.QueryRow(userid, rcpt)
89 var dooverid int64
90 var data []byte
91 err := row.Scan(&dooverid, &data)
92 if err == sql.ErrNoRows {
93 return false
94 }
95 if err != nil {
96 elog.Printf("error scanning deliquent check: %s", err)
97 return true
98 }
99 data = append(data, 0)
100 data = append(data, msg...)
101 _, err = stmtDeliquentUpdate.Exec(data, dooverid)
102 if err != nil {
103 elog.Printf("error updating deliquent: %s", err)
104 return true
105 }
106 return true
107}
108
109func deliverate(userid int64, rcpt string, msg []byte) {
110 if delinquent(userid, rcpt, msg) {
111 return
112 }
113 var d Doover
114 d.Userid = userid
115 d.Tries = 0
116 d.Rcpt = rcpt
117 d.Msgs = append(d.Msgs, msg)
118 deliveration(d)
119}
120
121var garage = gate.NewLimiter(40)
122
123func deliveration(doover Doover) {
124 rcpt := doover.Rcpt
125 garage.StartKey(rcpt)
126 defer garage.FinishKey(rcpt)
127
128 ki := ziggy(doover.Userid)
129 if ki == nil {
130 elog.Printf("lost key for delivery")
131 return
132 }
133 var inbox string
134 // already did the box indirection
135 if rcpt[0] == '%' {
136 inbox = rcpt[1:]
137 } else {
138 box, ok := boxofboxes.Get(rcpt)
139 if !ok {
140 ilog.Printf("failed getting inbox for %s", rcpt)
141 sayitagain(doover)
142 return
143 }
144 inbox = box.In
145 }
146 for i, msg := range doover.Msgs {
147 if i > 0 {
148 time.Sleep(2 * time.Second)
149 }
150 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
151 if err != nil {
152 ilog.Printf("failed to post json to %s: %s", inbox, err)
153 if t := lethaldose(err); t > doover.Tries {
154 doover.Tries = t
155 }
156 if letitslide(err) {
157 dlog.Printf("whatever myever %s", inbox)
158 continue
159 }
160 doover.Msgs = doover.Msgs[i:]
161 sayitagain(doover)
162 return
163 }
164 }
165}
166
167var pokechan = make(chan int, 1)
168
169func getdoovers() []Doover {
170 rows, err := stmtGetDoovers.Query()
171 if err != nil {
172 elog.Printf("wat?")
173 time.Sleep(1 * time.Minute)
174 return nil
175 }
176 defer rows.Close()
177 var doovers []Doover
178 for rows.Next() {
179 var d Doover
180 var dt string
181 err := rows.Scan(&d.ID, &dt)
182 if err != nil {
183 elog.Printf("error scanning dooverid: %s", err)
184 continue
185 }
186 d.When, _ = time.Parse(dbtimeformat, dt)
187 doovers = append(doovers, d)
188 }
189 return doovers
190}
191
192func extractdoover(d *Doover) error {
193 dqmtx.Lock()
194 defer dqmtx.Unlock()
195 row := stmtLoadDoover.QueryRow(d.ID)
196 var data []byte
197 err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data)
198 if err != nil {
199 return err
200 }
201 _, err = stmtZapDoover.Exec(d.ID)
202 if err != nil {
203 return err
204 }
205 d.Msgs = bytes.Split(data, []byte{0})
206 return nil
207}
208
209func redeliverator() {
210 sleeper := time.NewTimer(5 * time.Second)
211 for {
212 select {
213 case <-pokechan:
214 if !sleeper.Stop() {
215 <-sleeper.C
216 }
217 time.Sleep(5 * time.Second)
218 case <-sleeper.C:
219 }
220
221 doovers := getdoovers()
222
223 now := time.Now()
224 nexttime := now.Add(24 * time.Hour)
225 for _, d := range doovers {
226 if d.When.Before(now) {
227 err := extractdoover(&d)
228 if err != nil {
229 elog.Printf("error extracting doover: %s", err)
230 continue
231 }
232 ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries)
233 deliveration(d)
234 } else if d.When.Before(nexttime) {
235 nexttime = d.When
236 }
237 }
238 now = time.Now()
239 dur := 5 * time.Second
240 if now.Before(nexttime) {
241 dur += nexttime.Sub(now).Round(time.Second)
242 }
243 sleeper.Reset(dur)
244 }
245}