deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "fmt"
20 notrand "math/rand"
21 "time"
22
23 "humungus.tedunangst.com/r/webs/gate"
24)
25
// Doover identifies one queued redelivery and the time it is scheduled for.
// getdoovers builds these from the rows returned by stmtGetDoovers.
type Doover struct {
	// ID is the row identifier; redeliverator passes it to the load and zap statements.
	ID int64
	// When is the scheduled retry time, parsed with dbtimeformat in getdoovers.
	When time.Time
}
30
31func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
32 var drift time.Duration
33 switch goarounds {
34 case 1:
35 drift = 5 * time.Minute
36 case 2:
37 drift = 1 * time.Hour
38 case 3:
39 drift = 4 * time.Hour
40 case 4:
41 drift = 12 * time.Hour
42 case 5:
43 drift = 24 * time.Hour
44 default:
45 ilog.Printf("he's dead jim: %s", rcpt)
46 clearoutbound(rcpt)
47 return
48 }
49 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
50 when := time.Now().UTC().Add(drift)
51 _, err := stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, userid, rcpt, msg)
52 if err != nil {
53 elog.Printf("error saving doover: %s", err)
54 }
55 select {
56 case pokechan <- 0:
57 default:
58 }
59}
60
61func clearoutbound(rcpt string) {
62 hostname := originate(rcpt)
63 if hostname == "" {
64 return
65 }
66 xid := fmt.Sprintf("%%https://%s/%%", hostname)
67 ilog.Printf("clearing outbound for %s", xid)
68 db := opendatabase()
69 db.Exec("delete from doovers where rcpt like ?", xid)
70}
71
// garage is a gate limiter created with an argument of 40. deliverate calls
// Start on entry and Finish on return, so presumably at most 40 deliveries
// run at once — confirm against the gate package.
var garage = gate.NewLimiter(40)
73
74func deliverate(goarounds int64, userid int64, rcpt string, msg []byte, prio bool) {
75 garage.Start()
76 defer garage.Finish()
77
78 var ki *KeyInfo
79 ok := ziggies.Get(userid, &ki)
80 if !ok {
81 elog.Printf("lost key for delivery")
82 return
83 }
84 var inbox string
85 // already did the box indirection
86 if rcpt[0] == '%' {
87 inbox = rcpt[1:]
88 } else {
89 var box *Box
90 ok := boxofboxes.Get(rcpt, &box)
91 if !ok {
92 ilog.Printf("failed getting inbox for %s", rcpt)
93 sayitagain(goarounds+1, userid, rcpt, msg)
94 return
95 }
96 inbox = box.In
97 }
98 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
99 if err != nil {
100 ilog.Printf("failed to post json to %s: %s", inbox, err)
101 if prio {
102 sayitagain(goarounds+1, userid, rcpt, msg)
103 }
104 return
105 }
106}
107
// pokechan wakes redeliverator ahead of its sleep timer. It has a buffer of
// one; sayitagain does a non-blocking send on it after queueing a doover.
var pokechan = make(chan int, 1)
109
110func getdoovers() []Doover {
111 rows, err := stmtGetDoovers.Query()
112 if err != nil {
113 elog.Printf("wat?")
114 time.Sleep(1 * time.Minute)
115 return nil
116 }
117 defer rows.Close()
118 var doovers []Doover
119 for rows.Next() {
120 var d Doover
121 var dt string
122 err := rows.Scan(&d.ID, &dt)
123 if err != nil {
124 elog.Printf("error scanning dooverid: %s", err)
125 continue
126 }
127 d.When, _ = time.Parse(dbtimeformat, dt)
128 doovers = append(doovers, d)
129 }
130 return doovers
131}
132
133func redeliverator() {
134 sleeper := time.NewTimer(0)
135 for {
136 select {
137 case <-pokechan:
138 if !sleeper.Stop() {
139 <-sleeper.C
140 }
141 time.Sleep(5 * time.Second)
142 case <-sleeper.C:
143 }
144
145 doovers := getdoovers()
146
147 now := time.Now().UTC()
148 nexttime := now.Add(24 * time.Hour)
149 for _, d := range doovers {
150 if d.When.Before(now) {
151 var goarounds, userid int64
152 var rcpt string
153 var msg []byte
154 row := stmtLoadDoover.QueryRow(d.ID)
155 err := row.Scan(&goarounds, &userid, &rcpt, &msg)
156 if err != nil {
157 elog.Printf("error scanning doover: %s", err)
158 continue
159 }
160 _, err = stmtZapDoover.Exec(d.ID)
161 if err != nil {
162 elog.Printf("error deleting doover: %s", err)
163 continue
164 }
165 ilog.Printf("redeliverating %s try %d", rcpt, goarounds)
166 deliverate(goarounds, userid, rcpt, msg, true)
167 } else if d.When.Before(nexttime) {
168 nexttime = d.When
169 }
170 }
171 dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
172 sleeper.Reset(dur)
173 }
174}