deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "bytes"
20 "database/sql"
21 notrand "math/rand"
22 "sync"
23 "time"
24
25 "humungus.tedunangst.com/r/webs/gate"
26)
27
// Doover is one queued (re)delivery job.
//
// ID and When are filled in by getdoovers; Tries, Userid, Rcpt and Msgs are
// filled in by extractdoover or by the code that first creates the job.
// Rcpt may start with '%', in which case the remainder is used directly as
// the inbox (see deliveration). Msgs holds the individual message bodies;
// they are stored joined by a single zero byte.
type Doover struct {
	ID            int64
	When          time.Time
	Userid, Tries int64
	Rcpt          string
	Msgs          [][]byte
}
36
37func sayitagain(doover Doover) {
38 doover.Tries += 1
39 var drift time.Duration
40 if doover.Tries <= 3 { // 5, 10, 15 minutes
41 drift = time.Duration(doover.Tries*5) * time.Minute
42 } else if doover.Tries <= 6 { // 1, 2, 3 hours
43 drift = time.Duration(doover.Tries-3) * time.Hour
44 } else if doover.Tries <= 9 { // 12, 12, 12 hours
45 drift = time.Duration(12) * time.Hour
46 } else {
47 ilog.Printf("he's dead jim: %s", doover.Rcpt)
48 return
49 }
50 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
51 when := time.Now().Add(drift)
52 data := bytes.Join(doover.Msgs, []byte{0})
53 _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, doover.Rcpt, data)
54 if err != nil {
55 elog.Printf("error saving doover: %s", err)
56 }
57 select {
58 case pokechan <- 0:
59 default:
60 }
61}
62
63var dqmtx sync.Mutex
64
65func delinquent(userid int64, rcpt string, msg []byte) bool {
66 dqmtx.Lock()
67 defer dqmtx.Unlock()
68 row := stmtDeliquentCheck.QueryRow(userid, rcpt)
69 var dooverid int64
70 var data []byte
71 err := row.Scan(&dooverid, &data)
72 if err == sql.ErrNoRows {
73 return false
74 }
75 if err != nil {
76 elog.Printf("error scanning deliquent check: %s", err)
77 return true
78 }
79 data = append(data, 0)
80 data = append(data, msg...)
81 _, err = stmtDeliquentUpdate.Exec(data, dooverid)
82 if err != nil {
83 elog.Printf("error updating deliquent: %s", err)
84 return true
85 }
86 return true
87}
88
89func deliverate(userid int64, rcpt string, msg []byte) {
90 if delinquent(userid, rcpt, msg) {
91 return
92 }
93 var d Doover
94 d.Userid = userid
95 d.Tries = 0
96 d.Rcpt = rcpt
97 d.Msgs = append(d.Msgs, msg)
98 deliveration(d)
99}
100
101var garage = gate.NewLimiter(40)
102
103func deliveration(doover Doover) {
104 garage.Start()
105 defer garage.Finish()
106
107 var ki *KeyInfo
108 ok := ziggies.Get(doover.Userid, &ki)
109 if !ok {
110 elog.Printf("lost key for delivery")
111 return
112 }
113 var inbox string
114 rcpt := doover.Rcpt
115 // already did the box indirection
116 if rcpt[0] == '%' {
117 inbox = rcpt[1:]
118 } else {
119 var box *Box
120 ok := boxofboxes.Get(rcpt, &box)
121 if !ok {
122 ilog.Printf("failed getting inbox for %s", rcpt)
123 sayitagain(doover)
124 return
125 }
126 inbox = box.In
127 }
128 for i, msg := range doover.Msgs {
129 if i > 0 {
130 time.Sleep(2 * time.Second)
131 }
132 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
133 if err != nil {
134 ilog.Printf("failed to post json to %s: %s", inbox, err)
135 doover.Msgs = doover.Msgs[i:]
136 sayitagain(doover)
137 return
138 }
139 }
140}
141
142var pokechan = make(chan int, 1)
143
144func getdoovers() []Doover {
145 rows, err := stmtGetDoovers.Query()
146 if err != nil {
147 elog.Printf("wat?")
148 time.Sleep(1 * time.Minute)
149 return nil
150 }
151 defer rows.Close()
152 var doovers []Doover
153 for rows.Next() {
154 var d Doover
155 var dt string
156 err := rows.Scan(&d.ID, &dt)
157 if err != nil {
158 elog.Printf("error scanning dooverid: %s", err)
159 continue
160 }
161 d.When, _ = time.Parse(dbtimeformat, dt)
162 doovers = append(doovers, d)
163 }
164 return doovers
165}
166
167func extractdoover(d *Doover) error {
168 dqmtx.Lock()
169 defer dqmtx.Unlock()
170 row := stmtLoadDoover.QueryRow(d.ID)
171 var data []byte
172 err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data)
173 if err != nil {
174 return err
175 }
176 _, err = stmtZapDoover.Exec(d.ID)
177 if err != nil {
178 return err
179 }
180 d.Msgs = bytes.Split(data, []byte{0})
181 return nil
182}
183
184func redeliverator() {
185 sleeper := time.NewTimer(5 * time.Second)
186 for {
187 select {
188 case <-pokechan:
189 if !sleeper.Stop() {
190 <-sleeper.C
191 }
192 time.Sleep(5 * time.Second)
193 case <-sleeper.C:
194 }
195
196 doovers := getdoovers()
197
198 now := time.Now()
199 nexttime := now.Add(24 * time.Hour)
200 for _, d := range doovers {
201 if d.When.Before(now) {
202 err := extractdoover(&d)
203 if err != nil {
204 elog.Printf("error extracting doover: %s", err)
205 continue
206 }
207 ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries)
208 deliveration(d)
209 } else if d.When.Before(nexttime) {
210 nexttime = d.When
211 }
212 }
213 now = time.Now()
214 dur := 5 * time.Second
215 if now.Before(nexttime) {
216 dur += nexttime.Sub(now).Round(time.Second)
217 }
218 sleeper.Reset(dur)
219 }
220}