deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "bytes"
20 "database/sql"
21 notrand "math/rand"
22 "sync"
23 "time"
24
25 "humungus.tedunangst.com/r/webs/gate"
26)
27
28type Doover struct {
29 ID int64
30 When time.Time
31 Rcpt string
32 Msgs [][]byte
33}
34
35func sayitagain(goarounds int64, userid int64, doover Doover) {
36 rcpt := doover.Rcpt
37 var drift time.Duration
38 switch goarounds {
39 case 1:
40 drift = 5 * time.Minute
41 case 2:
42 drift = 1 * time.Hour
43 case 3:
44 drift = 4 * time.Hour
45 case 4:
46 drift = 12 * time.Hour
47 case 5:
48 drift = 24 * time.Hour
49 default:
50 ilog.Printf("he's dead jim: %s", rcpt)
51 return
52 }
53 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
54 when := time.Now().Add(drift)
55 data := bytes.Join(doover.Msgs, []byte{0})
56 _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, data)
57 if err != nil {
58 elog.Printf("error saving doover: %s", err)
59 }
60 select {
61 case pokechan <- 0:
62 default:
63 }
64}
65
66var dqmtx sync.Mutex
67
68func delinquent(rcpt string, msg []byte) bool {
69 dqmtx.Lock()
70 defer dqmtx.Unlock()
71 row := stmtDeliquentCheck.QueryRow(rcpt)
72 var dooverid int64
73 var data []byte
74 err := row.Scan(&dooverid, data)
75 if err == sql.ErrNoRows {
76 return false
77 }
78 if err != nil {
79 elog.Printf("error scanning deliquent check: %s", err)
80 return true
81 }
82 data = append(data, 0)
83 data = append(data, msg...)
84 _, err = stmtDeliquentUpdate.Exec(data, dooverid)
85 if err != nil {
86 elog.Printf("error updating deliquent: %s", err)
87 return true
88 }
89 return true
90}
91
92func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
93 if delinquent(rcpt, msg) {
94 return
95 }
96 var d Doover
97 d.Rcpt = rcpt
98 d.Msgs = append(d.Msgs, msg)
99 deliveration(goarounds, userid, d)
100}
101
102var garage = gate.NewLimiter(40)
103
104func deliveration(goarounds int64, userid int64, doover Doover) {
105 garage.Start()
106 defer garage.Finish()
107
108 var ki *KeyInfo
109 ok := ziggies.Get(userid, &ki)
110 if !ok {
111 elog.Printf("lost key for delivery")
112 return
113 }
114 var inbox string
115 rcpt := doover.Rcpt
116 // already did the box indirection
117 if rcpt[0] == '%' {
118 inbox = rcpt[1:]
119 } else {
120 var box *Box
121 ok := boxofboxes.Get(rcpt, &box)
122 if !ok {
123 ilog.Printf("failed getting inbox for %s", rcpt)
124 sayitagain(goarounds+1, userid, doover)
125 return
126 }
127 inbox = box.In
128 }
129 for i, msg := range doover.Msgs {
130 if i > 0 {
131 time.Sleep(2 * time.Second)
132 }
133 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
134 if err != nil {
135 ilog.Printf("failed to post json to %s: %s", inbox, err)
136 doover.Msgs = doover.Msgs[i:]
137 sayitagain(goarounds+1, userid, doover)
138 return
139 }
140 }
141}
142
143var pokechan = make(chan int, 1)
144
145func getdoovers() []Doover {
146 rows, err := stmtGetDoovers.Query()
147 if err != nil {
148 elog.Printf("wat?")
149 time.Sleep(1 * time.Minute)
150 return nil
151 }
152 defer rows.Close()
153 var doovers []Doover
154 for rows.Next() {
155 var d Doover
156 var dt string
157 err := rows.Scan(&d.ID, &dt)
158 if err != nil {
159 elog.Printf("error scanning dooverid: %s", err)
160 continue
161 }
162 d.When, _ = time.Parse(dbtimeformat, dt)
163 doovers = append(doovers, d)
164 }
165 return doovers
166}
167
168func redeliverator() {
169 sleeper := time.NewTimer(5 * time.Second)
170 for {
171 select {
172 case <-pokechan:
173 if !sleeper.Stop() {
174 <-sleeper.C
175 }
176 time.Sleep(5 * time.Second)
177 case <-sleeper.C:
178 }
179
180 doovers := getdoovers()
181
182 now := time.Now()
183 nexttime := now.Add(24 * time.Hour)
184 for _, d := range doovers {
185 if d.When.Before(now) {
186 var goarounds, userid int64
187 var data []byte
188 dqmtx.Lock()
189 row := stmtLoadDoover.QueryRow(d.ID)
190 err := row.Scan(&goarounds, &userid, &d.Rcpt, &data)
191 if err != nil {
192 elog.Printf("error scanning doover: %s", err)
193 dqmtx.Unlock() // defer
194 continue
195 }
196 _, err = stmtZapDoover.Exec(d.ID)
197 if err != nil {
198 elog.Printf("error deleting doover: %s", err)
199 dqmtx.Unlock() // defer
200 continue
201 }
202 dqmtx.Unlock() // defer
203 d.Msgs = bytes.Split(data, []byte{0})
204 rcpt := d.Rcpt
205 ilog.Printf("redeliverating %s try %d", rcpt, goarounds)
206 deliveration(goarounds, userid, d)
207 } else if d.When.Before(nexttime) {
208 nexttime = d.When
209 }
210 }
211 now = time.Now()
212 dur := 5 * time.Second
213 if now.Before(nexttime) {
214 dur += nexttime.Sub(now).Round(time.Second)
215 }
216 sleeper.Reset(dur)
217 }
218}