deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "bytes"
20 "database/sql"
21 notrand "math/rand"
22 "strings"
23 "sync"
24 "time"
25
26 "humungus.tedunangst.com/r/webs/gate"
27)
28
29type Doover struct {
30 ID int64
31 When time.Time
32 Userid UserID
33 Tries int64
34 Rcpt string
35 Msgs [][]byte
36}
37
38func sayitagain(doover Doover) {
39 doover.Tries += 1
40 var drift time.Duration
41 if doover.Tries <= 3 { // 5, 10, 15 minutes
42 drift = time.Duration(doover.Tries*5) * time.Minute
43 } else if doover.Tries <= 6 { // 1, 2, 3 hours
44 drift = time.Duration(doover.Tries-3) * time.Hour
45 } else if doover.Tries <= 9 { // 12, 12, 12 hours
46 drift = time.Duration(12) * time.Hour
47 } else {
48 ilog.Printf("he's dead jim: %s", doover.Rcpt)
49 return
50 }
51 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
52 when := time.Now().Add(drift)
53 data := bytes.Join(doover.Msgs, []byte{0})
54 _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, doover.Rcpt, data)
55 if err != nil {
56 elog.Printf("error saving doover: %s", err)
57 }
58 select {
59 case pokechan <- 0:
60 default:
61 }
62}
63
64const nearlyDead = 8
65
66func lethaldose(err error) int64 {
67 str := err.Error()
68 if strings.Contains(str, "no such host") {
69 return nearlyDead
70 }
71 return 0
72}
73
74func letitslide(err error) bool {
75 str := err.Error()
76 if strings.Contains(str, "http post status: 400") {
77 return true
78 }
79 if strings.Contains(str, "http post status: 422") {
80 return true
81 }
82 return false
83}
84
85var dqmtx sync.Mutex
86
87func delinquent(userid UserID, rcpt string, msg []byte) bool {
88 dqmtx.Lock()
89 defer dqmtx.Unlock()
90 row := stmtDeliquentCheck.QueryRow(userid, rcpt)
91 var dooverid int64
92 var data []byte
93 err := row.Scan(&dooverid, &data)
94 if err == sql.ErrNoRows {
95 return false
96 }
97 if err != nil {
98 elog.Printf("error scanning deliquent check: %s", err)
99 return true
100 }
101 data = append(data, 0)
102 data = append(data, msg...)
103 _, err = stmtDeliquentUpdate.Exec(data, dooverid)
104 if err != nil {
105 elog.Printf("error updating deliquent: %s", err)
106 return true
107 }
108 return true
109}
110
111func deliverate(userid UserID, rcpt string, msg []byte) {
112 if delinquent(userid, rcpt, msg) {
113 return
114 }
115 var d Doover
116 d.Userid = userid
117 d.Tries = 0
118 d.Rcpt = rcpt
119 d.Msgs = append(d.Msgs, msg)
120 deliveration(d)
121}
122
123var garage = gate.NewLimiter(40)
124
125func deliveration(doover Doover) {
126 requestWG.Add(1)
127 defer requestWG.Done()
128 rcpt := doover.Rcpt
129 garage.StartKey(rcpt)
130 defer garage.FinishKey(rcpt)
131
132 ki := ziggy(doover.Userid)
133 if ki == nil {
134 elog.Printf("lost key for delivery")
135 return
136 }
137 var inbox string
138 // already did the box indirection
139 if rcpt[0] == '%' {
140 inbox = rcpt[1:]
141 } else {
142 box, ok := boxofboxes.Get(rcpt)
143 if !ok {
144 ilog.Printf("failed getting inbox for %s", rcpt)
145 if doover.Tries < nearlyDead {
146 doover.Tries = nearlyDead
147 }
148 sayitagain(doover)
149 return
150 }
151 inbox = box.In
152 }
153 for i, msg := range doover.Msgs {
154 if i > 0 {
155 time.Sleep(2 * time.Second)
156 }
157 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
158 if err != nil {
159 ilog.Printf("failed to post json to %s: %s", inbox, err)
160 if t := lethaldose(err); t > doover.Tries {
161 doover.Tries = t
162 }
163 if letitslide(err) {
164 dlog.Printf("whatever myever %s", inbox)
165 continue
166 }
167 doover.Msgs = doover.Msgs[i:]
168 sayitagain(doover)
169 return
170 }
171 }
172}
173
174var pokechan = make(chan int, 1)
175
176func getdoovers() []Doover {
177 rows, err := stmtGetDoovers.Query()
178 if err != nil {
179 elog.Printf("wat?")
180 time.Sleep(1 * time.Minute)
181 return nil
182 }
183 defer rows.Close()
184 var doovers []Doover
185 for rows.Next() {
186 var d Doover
187 var dt string
188 err := rows.Scan(&d.ID, &dt)
189 if err != nil {
190 elog.Printf("error scanning dooverid: %s", err)
191 continue
192 }
193 d.When, _ = time.Parse(dbtimeformat, dt)
194 doovers = append(doovers, d)
195 }
196 return doovers
197}
198
199func extractdoover(d *Doover) error {
200 dqmtx.Lock()
201 defer dqmtx.Unlock()
202 row := stmtLoadDoover.QueryRow(d.ID)
203 var data []byte
204 err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data)
205 if err != nil {
206 return err
207 }
208 _, err = stmtZapDoover.Exec(d.ID)
209 if err != nil {
210 return err
211 }
212 d.Msgs = bytes.Split(data, []byte{0})
213 return nil
214}
215
216func redeliverator() {
217 workinprogress++
218 sleeper := time.NewTimer(5 * time.Second)
219 for {
220 select {
221 case <-pokechan:
222 if !sleeper.Stop() {
223 <-sleeper.C
224 }
225 time.Sleep(5 * time.Second)
226 case <-sleeper.C:
227 case <-endoftheworld:
228 readyalready <- true
229 return
230 }
231
232 doovers := getdoovers()
233
234 now := time.Now()
235 nexttime := now.Add(24 * time.Hour)
236 for _, d := range doovers {
237 if d.When.Before(now) {
238 err := extractdoover(&d)
239 if err != nil {
240 elog.Printf("error extracting doover: %s", err)
241 continue
242 }
243 ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries)
244 deliveration(d)
245 } else if d.When.Before(nexttime) {
246 nexttime = d.When
247 }
248 }
249 now = time.Now()
250 dur := 5 * time.Second
251 if now.Before(nexttime) {
252 dur += nexttime.Sub(now).Round(time.Second)
253 }
254 sleeper.Reset(dur)
255 }
256}