deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "bytes"
20 "database/sql"
21 notrand "math/rand"
22 "strings"
23 "sync"
24 "time"
25
26 "humungus.tedunangst.com/r/webs/gate"
27)
28
29type Doover struct {
30 ID int64
31 When time.Time
32 Userid int64
33 Tries int64
34 Rcpt string
35 Msgs [][]byte
36}
37
38func sayitagain(doover Doover) {
39 doover.Tries += 1
40 var drift time.Duration
41 if doover.Tries <= 3 { // 5, 10, 15 minutes
42 drift = time.Duration(doover.Tries*5) * time.Minute
43 } else if doover.Tries <= 6 { // 1, 2, 3 hours
44 drift = time.Duration(doover.Tries-3) * time.Hour
45 } else if doover.Tries <= 9 { // 12, 12, 12 hours
46 drift = time.Duration(12) * time.Hour
47 } else {
48 ilog.Printf("he's dead jim: %s", doover.Rcpt)
49 return
50 }
51 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
52 when := time.Now().Add(drift)
53 data := bytes.Join(doover.Msgs, []byte{0})
54 _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, doover.Rcpt, data)
55 if err != nil {
56 elog.Printf("error saving doover: %s", err)
57 }
58 select {
59 case pokechan <- 0:
60 default:
61 }
62}
63
64func lethaldose(err error) int64 {
65 str := err.Error()
66 if strings.Contains(str, "no such host") {
67 return 8
68 }
69 return 0
70}
71
72var dqmtx sync.Mutex
73
74func delinquent(userid int64, rcpt string, msg []byte) bool {
75 dqmtx.Lock()
76 defer dqmtx.Unlock()
77 row := stmtDeliquentCheck.QueryRow(userid, rcpt)
78 var dooverid int64
79 var data []byte
80 err := row.Scan(&dooverid, &data)
81 if err == sql.ErrNoRows {
82 return false
83 }
84 if err != nil {
85 elog.Printf("error scanning deliquent check: %s", err)
86 return true
87 }
88 data = append(data, 0)
89 data = append(data, msg...)
90 _, err = stmtDeliquentUpdate.Exec(data, dooverid)
91 if err != nil {
92 elog.Printf("error updating deliquent: %s", err)
93 return true
94 }
95 return true
96}
97
98func deliverate(userid int64, rcpt string, msg []byte) {
99 if delinquent(userid, rcpt, msg) {
100 return
101 }
102 var d Doover
103 d.Userid = userid
104 d.Tries = 0
105 d.Rcpt = rcpt
106 d.Msgs = append(d.Msgs, msg)
107 deliveration(d)
108}
109
110var garage = gate.NewLimiter(40)
111
112func deliveration(doover Doover) {
113 garage.Start()
114 defer garage.Finish()
115
116 var ki *KeyInfo
117 ok := ziggies.Get(doover.Userid, &ki)
118 if !ok {
119 elog.Printf("lost key for delivery")
120 return
121 }
122 var inbox string
123 rcpt := doover.Rcpt
124 // already did the box indirection
125 if rcpt[0] == '%' {
126 inbox = rcpt[1:]
127 } else {
128 var box *Box
129 ok := boxofboxes.Get(rcpt, &box)
130 if !ok {
131 ilog.Printf("failed getting inbox for %s", rcpt)
132 sayitagain(doover)
133 return
134 }
135 inbox = box.In
136 }
137 for i, msg := range doover.Msgs {
138 if i > 0 {
139 time.Sleep(2 * time.Second)
140 }
141 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
142 if err != nil {
143 ilog.Printf("failed to post json to %s: %s", inbox, err)
144 if t := lethaldose(err); t > doover.Tries {
145 doover.Tries = t
146 }
147 doover.Msgs = doover.Msgs[i:]
148 sayitagain(doover)
149 return
150 }
151 }
152}
153
154var pokechan = make(chan int, 1)
155
156func getdoovers() []Doover {
157 rows, err := stmtGetDoovers.Query()
158 if err != nil {
159 elog.Printf("wat?")
160 time.Sleep(1 * time.Minute)
161 return nil
162 }
163 defer rows.Close()
164 var doovers []Doover
165 for rows.Next() {
166 var d Doover
167 var dt string
168 err := rows.Scan(&d.ID, &dt)
169 if err != nil {
170 elog.Printf("error scanning dooverid: %s", err)
171 continue
172 }
173 d.When, _ = time.Parse(dbtimeformat, dt)
174 doovers = append(doovers, d)
175 }
176 return doovers
177}
178
179func extractdoover(d *Doover) error {
180 dqmtx.Lock()
181 defer dqmtx.Unlock()
182 row := stmtLoadDoover.QueryRow(d.ID)
183 var data []byte
184 err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data)
185 if err != nil {
186 return err
187 }
188 _, err = stmtZapDoover.Exec(d.ID)
189 if err != nil {
190 return err
191 }
192 d.Msgs = bytes.Split(data, []byte{0})
193 return nil
194}
195
196func redeliverator() {
197 sleeper := time.NewTimer(5 * time.Second)
198 for {
199 select {
200 case <-pokechan:
201 if !sleeper.Stop() {
202 <-sleeper.C
203 }
204 time.Sleep(5 * time.Second)
205 case <-sleeper.C:
206 }
207
208 doovers := getdoovers()
209
210 now := time.Now()
211 nexttime := now.Add(24 * time.Hour)
212 for _, d := range doovers {
213 if d.When.Before(now) {
214 err := extractdoover(&d)
215 if err != nil {
216 elog.Printf("error extracting doover: %s", err)
217 continue
218 }
219 ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries)
220 deliveration(d)
221 } else if d.When.Before(nexttime) {
222 nexttime = d.When
223 }
224 }
225 now = time.Now()
226 dur := 5 * time.Second
227 if now.Before(nexttime) {
228 dur += nexttime.Sub(now).Round(time.Second)
229 }
230 sleeper.Reset(dur)
231 }
232}