deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "bytes"
20 "database/sql"
21 notrand "math/rand"
22 "strings"
23 "sync"
24 "time"
25
26 "humungus.tedunangst.com/r/webs/gate"
27)
28
// Doover is a delivery that failed (or has not yet succeeded) and is queued
// for another attempt.
type Doover struct {
	// ID identifies the stored doover; getdoovers scans it from the queue
	// and extractdoover uses it to load and delete the stored row.
	ID int64
	// When is the time the next attempt is due, parsed with dbtimeformat.
	When time.Time
	// Userid selects the signing key (looked up in ziggies) for the delivery.
	Userid int64
	// Tries counts the attempts made so far; sayitagain increments it.
	Tries int64
	// Rcpt is the recipient. A leading '%' means the rest of the string is
	// already the inbox; otherwise the inbox is looked up via boxofboxes.
	Rcpt string
	// Msgs are the message bodies to post, in order. They are stored joined
	// by a single NUL byte.
	Msgs [][]byte
}
37
38func sayitagain(doover Doover) {
39 doover.Tries += 1
40 var drift time.Duration
41 if doover.Tries <= 3 { // 5, 10, 15 minutes
42 drift = time.Duration(doover.Tries*5) * time.Minute
43 } else if doover.Tries <= 6 { // 1, 2, 3 hours
44 drift = time.Duration(doover.Tries-3) * time.Hour
45 } else if doover.Tries <= 9 { // 12, 12, 12 hours
46 drift = time.Duration(12) * time.Hour
47 } else {
48 ilog.Printf("he's dead jim: %s", doover.Rcpt)
49 return
50 }
51 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
52 when := time.Now().Add(drift)
53 data := bytes.Join(doover.Msgs, []byte{0})
54 _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, doover.Rcpt, data)
55 if err != nil {
56 elog.Printf("error saving doover: %s", err)
57 }
58 select {
59 case pokechan <- 0:
60 default:
61 }
62}
63
// lethaldose maps a delivery error to a minimum try count. It returns 8 when
// the error text contains "no such host" and 0 for every other error.
// deliveration raises a doover's Tries to this value when it is larger.
func lethaldose(err error) int64 {
	if !strings.Contains(err.Error(), "no such host") {
		return 0
	}
	return 8
}
71
// letitslide reports whether a delivery error should be ignored rather than
// retried: true when the error text contains "http post status: 400" or
// "http post status: 422", false otherwise.
func letitslide(err error) bool {
	text := err.Error()
	return strings.Contains(text, "http post status: 400") ||
		strings.Contains(text, "http post status: 422")
}
82
// dqmtx is held by delinquent while it checks for and appends to a stored
// doover, and by extractdoover while it loads and deletes one, so the two
// cannot interleave.
var dqmtx sync.Mutex
84
85func delinquent(userid int64, rcpt string, msg []byte) bool {
86 dqmtx.Lock()
87 defer dqmtx.Unlock()
88 row := stmtDeliquentCheck.QueryRow(userid, rcpt)
89 var dooverid int64
90 var data []byte
91 err := row.Scan(&dooverid, &data)
92 if err == sql.ErrNoRows {
93 return false
94 }
95 if err != nil {
96 elog.Printf("error scanning deliquent check: %s", err)
97 return true
98 }
99 data = append(data, 0)
100 data = append(data, msg...)
101 _, err = stmtDeliquentUpdate.Exec(data, dooverid)
102 if err != nil {
103 elog.Printf("error updating deliquent: %s", err)
104 return true
105 }
106 return true
107}
108
109func deliverate(userid int64, rcpt string, msg []byte) {
110 if delinquent(userid, rcpt, msg) {
111 return
112 }
113 var d Doover
114 d.Userid = userid
115 d.Tries = 0
116 d.Rcpt = rcpt
117 d.Msgs = append(d.Msgs, msg)
118 deliveration(d)
119}
120
// garage is the limiter deliveration brackets every delivery with, calling
// StartKey and FinishKey on the recipient.
// NOTE(review): 40 is presumably the maximum number of concurrent
// deliveries — confirm against the gate package.
var garage = gate.NewLimiter(40)
122
123func deliveration(doover Doover) {
124 rcpt := doover.Rcpt
125 garage.StartKey(rcpt)
126 defer garage.FinishKey(rcpt)
127
128 var ki *KeyInfo
129 ok := ziggies.Get(doover.Userid, &ki)
130 if !ok {
131 elog.Printf("lost key for delivery")
132 return
133 }
134 var inbox string
135 // already did the box indirection
136 if rcpt[0] == '%' {
137 inbox = rcpt[1:]
138 } else {
139 var box *Box
140 ok := boxofboxes.Get(rcpt, &box)
141 if !ok {
142 ilog.Printf("failed getting inbox for %s", rcpt)
143 sayitagain(doover)
144 return
145 }
146 inbox = box.In
147 }
148 for i, msg := range doover.Msgs {
149 if i > 0 {
150 time.Sleep(2 * time.Second)
151 }
152 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
153 if err != nil {
154 ilog.Printf("failed to post json to %s: %s", inbox, err)
155 if t := lethaldose(err); t > doover.Tries {
156 doover.Tries = t
157 }
158 if letitslide(err) {
159 dlog.Printf("whatever myever %s", inbox)
160 continue
161 }
162 doover.Msgs = doover.Msgs[i:]
163 sayitagain(doover)
164 return
165 }
166 }
167}
168
// pokechan wakes redeliverator early. sayitagain sends on it without
// blocking; the buffer of one means a single pending poke is remembered and
// further pokes are dropped.
var pokechan = make(chan int, 1)
170
171func getdoovers() []Doover {
172 rows, err := stmtGetDoovers.Query()
173 if err != nil {
174 elog.Printf("wat?")
175 time.Sleep(1 * time.Minute)
176 return nil
177 }
178 defer rows.Close()
179 var doovers []Doover
180 for rows.Next() {
181 var d Doover
182 var dt string
183 err := rows.Scan(&d.ID, &dt)
184 if err != nil {
185 elog.Printf("error scanning dooverid: %s", err)
186 continue
187 }
188 d.When, _ = time.Parse(dbtimeformat, dt)
189 doovers = append(doovers, d)
190 }
191 return doovers
192}
193
194func extractdoover(d *Doover) error {
195 dqmtx.Lock()
196 defer dqmtx.Unlock()
197 row := stmtLoadDoover.QueryRow(d.ID)
198 var data []byte
199 err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data)
200 if err != nil {
201 return err
202 }
203 _, err = stmtZapDoover.Exec(d.ID)
204 if err != nil {
205 return err
206 }
207 d.Msgs = bytes.Split(data, []byte{0})
208 return nil
209}
210
211func redeliverator() {
212 sleeper := time.NewTimer(5 * time.Second)
213 for {
214 select {
215 case <-pokechan:
216 if !sleeper.Stop() {
217 <-sleeper.C
218 }
219 time.Sleep(5 * time.Second)
220 case <-sleeper.C:
221 }
222
223 doovers := getdoovers()
224
225 now := time.Now()
226 nexttime := now.Add(24 * time.Hour)
227 for _, d := range doovers {
228 if d.When.Before(now) {
229 err := extractdoover(&d)
230 if err != nil {
231 elog.Printf("error extracting doover: %s", err)
232 continue
233 }
234 ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries)
235 deliveration(d)
236 } else if d.When.Before(nexttime) {
237 nexttime = d.When
238 }
239 }
240 now = time.Now()
241 dur := 5 * time.Second
242 if now.Before(nexttime) {
243 dur += nexttime.Sub(now).Round(time.Second)
244 }
245 sleeper.Reset(dur)
246 }
247}