deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "bytes"
20 "database/sql"
21 notrand "math/rand"
22 "sync"
23 "time"
24
25 "humungus.tedunangst.com/r/webs/gate"
26)
27
28type Doover struct {
29 ID int64
30 When time.Time
31 Userid int64
32 Tries int64
33 Rcpt string
34 Msgs [][]byte
35}
36
37func sayitagain(doover Doover) {
38 rcpt := doover.Rcpt
39 var drift time.Duration
40 doover.Tries += 1
41 switch doover.Tries {
42 case 1:
43 drift = 5 * time.Minute
44 case 2:
45 drift = 1 * time.Hour
46 case 3:
47 drift = 4 * time.Hour
48 case 4:
49 drift = 12 * time.Hour
50 case 5:
51 drift = 24 * time.Hour
52 default:
53 ilog.Printf("he's dead jim: %s", rcpt)
54 return
55 }
56 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
57 when := time.Now().Add(drift)
58 data := bytes.Join(doover.Msgs, []byte{0})
59 _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, rcpt, data)
60 if err != nil {
61 elog.Printf("error saving doover: %s", err)
62 }
63 select {
64 case pokechan <- 0:
65 default:
66 }
67}
68
69var dqmtx sync.Mutex
70
71func delinquent(userid int64, rcpt string, msg []byte) bool {
72 dqmtx.Lock()
73 defer dqmtx.Unlock()
74 row := stmtDeliquentCheck.QueryRow(userid, rcpt)
75 var dooverid int64
76 var data []byte
77 err := row.Scan(&dooverid, data)
78 if err == sql.ErrNoRows {
79 return false
80 }
81 if err != nil {
82 elog.Printf("error scanning deliquent check: %s", err)
83 return true
84 }
85 data = append(data, 0)
86 data = append(data, msg...)
87 _, err = stmtDeliquentUpdate.Exec(data, dooverid)
88 if err != nil {
89 elog.Printf("error updating deliquent: %s", err)
90 return true
91 }
92 return true
93}
94
95func deliverate(userid int64, rcpt string, msg []byte) {
96 if delinquent(userid, rcpt, msg) {
97 return
98 }
99 var d Doover
100 d.Userid = userid
101 d.Tries = 0
102 d.Rcpt = rcpt
103 d.Msgs = append(d.Msgs, msg)
104 deliveration(d)
105}
106
107var garage = gate.NewLimiter(40)
108
109func deliveration(doover Doover) {
110 garage.Start()
111 defer garage.Finish()
112
113 var ki *KeyInfo
114 ok := ziggies.Get(doover.Userid, &ki)
115 if !ok {
116 elog.Printf("lost key for delivery")
117 return
118 }
119 var inbox string
120 rcpt := doover.Rcpt
121 // already did the box indirection
122 if rcpt[0] == '%' {
123 inbox = rcpt[1:]
124 } else {
125 var box *Box
126 ok := boxofboxes.Get(rcpt, &box)
127 if !ok {
128 ilog.Printf("failed getting inbox for %s", rcpt)
129 sayitagain(doover)
130 return
131 }
132 inbox = box.In
133 }
134 for i, msg := range doover.Msgs {
135 if i > 0 {
136 time.Sleep(2 * time.Second)
137 }
138 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
139 if err != nil {
140 ilog.Printf("failed to post json to %s: %s", inbox, err)
141 doover.Msgs = doover.Msgs[i:]
142 sayitagain(doover)
143 return
144 }
145 }
146}
147
148var pokechan = make(chan int, 1)
149
150func getdoovers() []Doover {
151 rows, err := stmtGetDoovers.Query()
152 if err != nil {
153 elog.Printf("wat?")
154 time.Sleep(1 * time.Minute)
155 return nil
156 }
157 defer rows.Close()
158 var doovers []Doover
159 for rows.Next() {
160 var d Doover
161 var dt string
162 err := rows.Scan(&d.ID, &dt)
163 if err != nil {
164 elog.Printf("error scanning dooverid: %s", err)
165 continue
166 }
167 d.When, _ = time.Parse(dbtimeformat, dt)
168 doovers = append(doovers, d)
169 }
170 return doovers
171}
172
173func extractdoover(d *Doover) error {
174 dqmtx.Lock()
175 defer dqmtx.Unlock()
176 row := stmtLoadDoover.QueryRow(d.ID)
177 var data []byte
178 err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data)
179 if err != nil {
180 return err
181 }
182 _, err = stmtZapDoover.Exec(d.ID)
183 if err != nil {
184 return err
185 }
186 d.Msgs = bytes.Split(data, []byte{0})
187 return nil
188}
189
190func redeliverator() {
191 sleeper := time.NewTimer(5 * time.Second)
192 for {
193 select {
194 case <-pokechan:
195 if !sleeper.Stop() {
196 <-sleeper.C
197 }
198 time.Sleep(5 * time.Second)
199 case <-sleeper.C:
200 }
201
202 doovers := getdoovers()
203
204 now := time.Now()
205 nexttime := now.Add(24 * time.Hour)
206 for _, d := range doovers {
207 if d.When.Before(now) {
208 err := extractdoover(&d)
209 if err != nil {
210 elog.Printf("error extracting doover: %s", err)
211 continue
212 }
213 ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries)
214 deliveration(d)
215 } else if d.When.Before(nexttime) {
216 nexttime = d.When
217 }
218 }
219 now = time.Now()
220 dur := 5 * time.Second
221 if now.Before(nexttime) {
222 dur += nexttime.Sub(now).Round(time.Second)
223 }
224 sleeper.Reset(dur)
225 }
226}