deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "bytes"
20 "database/sql"
21 notrand "math/rand"
22 "strings"
23 "sync"
24 "time"
25
26 "humungus.tedunangst.com/r/webs/gate"
27)
28
29type Doover struct {
30 ID int64
31 When time.Time
32 Userid int64
33 Tries int64
34 Rcpt string
35 Msgs [][]byte
36}
37
38func sayitagain(doover Doover) {
39 doover.Tries += 1
40 var drift time.Duration
41 if doover.Tries <= 3 { // 5, 10, 15 minutes
42 drift = time.Duration(doover.Tries*5) * time.Minute
43 } else if doover.Tries <= 6 { // 1, 2, 3 hours
44 drift = time.Duration(doover.Tries-3) * time.Hour
45 } else if doover.Tries <= 9 { // 12, 12, 12 hours
46 drift = time.Duration(12) * time.Hour
47 } else {
48 ilog.Printf("he's dead jim: %s", doover.Rcpt)
49 return
50 }
51 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
52 when := time.Now().Add(drift)
53 data := bytes.Join(doover.Msgs, []byte{0})
54 _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, doover.Rcpt, data)
55 if err != nil {
56 elog.Printf("error saving doover: %s", err)
57 }
58 select {
59 case pokechan <- 0:
60 default:
61 }
62}
63
64func lethaldose(err error) int64 {
65 str := err.Error()
66 if strings.Contains(str, "no such host") {
67 return 8
68 }
69 return 0
70}
71
72func letitslide(err error) bool {
73 str := err.Error()
74 if strings.Contains(str, "http post status: 400") {
75 return true
76 }
77 return false
78}
79
80var dqmtx sync.Mutex
81
82func delinquent(userid int64, rcpt string, msg []byte) bool {
83 dqmtx.Lock()
84 defer dqmtx.Unlock()
85 row := stmtDeliquentCheck.QueryRow(userid, rcpt)
86 var dooverid int64
87 var data []byte
88 err := row.Scan(&dooverid, &data)
89 if err == sql.ErrNoRows {
90 return false
91 }
92 if err != nil {
93 elog.Printf("error scanning deliquent check: %s", err)
94 return true
95 }
96 data = append(data, 0)
97 data = append(data, msg...)
98 _, err = stmtDeliquentUpdate.Exec(data, dooverid)
99 if err != nil {
100 elog.Printf("error updating deliquent: %s", err)
101 return true
102 }
103 return true
104}
105
106func deliverate(userid int64, rcpt string, msg []byte) {
107 if delinquent(userid, rcpt, msg) {
108 return
109 }
110 var d Doover
111 d.Userid = userid
112 d.Tries = 0
113 d.Rcpt = rcpt
114 d.Msgs = append(d.Msgs, msg)
115 deliveration(d)
116}
117
118var garage = gate.NewLimiter(40)
119
120func deliveration(doover Doover) {
121 rcpt := doover.Rcpt
122 garage.StartKey(rcpt)
123 defer garage.FinishKey(rcpt)
124
125 var ki *KeyInfo
126 ok := ziggies.Get(doover.Userid, &ki)
127 if !ok {
128 elog.Printf("lost key for delivery")
129 return
130 }
131 var inbox string
132 // already did the box indirection
133 if rcpt[0] == '%' {
134 inbox = rcpt[1:]
135 } else {
136 var box *Box
137 ok := boxofboxes.Get(rcpt, &box)
138 if !ok {
139 ilog.Printf("failed getting inbox for %s", rcpt)
140 sayitagain(doover)
141 return
142 }
143 inbox = box.In
144 }
145 for i, msg := range doover.Msgs {
146 if i > 0 {
147 time.Sleep(2 * time.Second)
148 }
149 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
150 if err != nil {
151 ilog.Printf("failed to post json to %s: %s", inbox, err)
152 if t := lethaldose(err); t > doover.Tries {
153 doover.Tries = t
154 }
155 if letitslide(err) {
156 dlog.Printf("whatever myever %s", inbox)
157 continue
158 }
159 doover.Msgs = doover.Msgs[i:]
160 sayitagain(doover)
161 return
162 }
163 }
164}
165
166var pokechan = make(chan int, 1)
167
168func getdoovers() []Doover {
169 rows, err := stmtGetDoovers.Query()
170 if err != nil {
171 elog.Printf("wat?")
172 time.Sleep(1 * time.Minute)
173 return nil
174 }
175 defer rows.Close()
176 var doovers []Doover
177 for rows.Next() {
178 var d Doover
179 var dt string
180 err := rows.Scan(&d.ID, &dt)
181 if err != nil {
182 elog.Printf("error scanning dooverid: %s", err)
183 continue
184 }
185 d.When, _ = time.Parse(dbtimeformat, dt)
186 doovers = append(doovers, d)
187 }
188 return doovers
189}
190
191func extractdoover(d *Doover) error {
192 dqmtx.Lock()
193 defer dqmtx.Unlock()
194 row := stmtLoadDoover.QueryRow(d.ID)
195 var data []byte
196 err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data)
197 if err != nil {
198 return err
199 }
200 _, err = stmtZapDoover.Exec(d.ID)
201 if err != nil {
202 return err
203 }
204 d.Msgs = bytes.Split(data, []byte{0})
205 return nil
206}
207
208func redeliverator() {
209 sleeper := time.NewTimer(5 * time.Second)
210 for {
211 select {
212 case <-pokechan:
213 if !sleeper.Stop() {
214 <-sleeper.C
215 }
216 time.Sleep(5 * time.Second)
217 case <-sleeper.C:
218 }
219
220 doovers := getdoovers()
221
222 now := time.Now()
223 nexttime := now.Add(24 * time.Hour)
224 for _, d := range doovers {
225 if d.When.Before(now) {
226 err := extractdoover(&d)
227 if err != nil {
228 elog.Printf("error extracting doover: %s", err)
229 continue
230 }
231 ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries)
232 deliveration(d)
233 } else if d.When.Before(nexttime) {
234 nexttime = d.When
235 }
236 }
237 now = time.Now()
238 dur := 5 * time.Second
239 if now.Before(nexttime) {
240 dur += nexttime.Sub(now).Round(time.Second)
241 }
242 sleeper.Reset(dur)
243 }
244}