deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "fmt"
20 "log"
21 notrand "math/rand"
22 "time"
23
24 "humungus.tedunangst.com/r/webs/gate"
25)
26
// Doover is the lightweight handle for one queued redelivery: just enough
// to decide whether the attempt is due. getdoovers fills these in, and
// redeliverator loads the full record by ID once When has passed.
type Doover struct {
	// ID is the row identifier used to load and delete the queued attempt.
	ID int64
	// When is the earliest time the attempt should be retried.
	When time.Time
}
31
32func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
33 var drift time.Duration
34 switch goarounds {
35 case 1:
36 drift = 5 * time.Minute
37 case 2:
38 drift = 1 * time.Hour
39 case 3:
40 drift = 4 * time.Hour
41 case 4:
42 drift = 12 * time.Hour
43 case 5:
44 drift = 24 * time.Hour
45 default:
46 log.Printf("he's dead jim: %s", rcpt)
47 clearoutbound(rcpt)
48 return
49 }
50 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
51 when := time.Now().UTC().Add(drift)
52 _, err := stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, userid, rcpt, msg)
53 if err != nil {
54 log.Printf("error saving doover: %s", err)
55 }
56 select {
57 case pokechan <- 0:
58 default:
59 }
60}
61
62func clearoutbound(rcpt string) {
63 hostname := originate(rcpt)
64 if hostname == "" {
65 return
66 }
67 xid := fmt.Sprintf("%%https://%s/%%", hostname)
68 log.Printf("clearing outbound for %s", xid)
69 db := opendatabase()
70 db.Exec("delete from doovers where rcpt like ?", xid)
71}
72
73var garage = gate.NewLimiter(40)
74
75func deliverate(goarounds int64, userid int64, rcpt string, msg []byte, prio bool) {
76 garage.Start()
77 defer garage.Finish()
78
79 var ki *KeyInfo
80 ok := ziggies.Get(userid, &ki)
81 if !ok {
82 log.Printf("lost key for delivery")
83 return
84 }
85 var inbox string
86 // already did the box indirection
87 if rcpt[0] == '%' {
88 inbox = rcpt[1:]
89 } else {
90 var box *Box
91 ok := boxofboxes.Get(rcpt, &box)
92 if !ok {
93 log.Printf("failed getting inbox for %s", rcpt)
94 sayitagain(goarounds+1, userid, rcpt, msg)
95 return
96 }
97 inbox = box.In
98 }
99 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
100 if err != nil {
101 log.Printf("failed to post json to %s: %s", inbox, err)
102 if prio {
103 sayitagain(goarounds+1, userid, rcpt, msg)
104 }
105 return
106 }
107}
108
// pokechan wakes redeliverator ahead of its timer. sayitagain sends on it
// without blocking after queueing a doover; the buffer of one means a
// single pending poke is remembered and further pokes are dropped.
var pokechan = make(chan int, 1)
110
111func getdoovers() []Doover {
112 rows, err := stmtGetDoovers.Query()
113 if err != nil {
114 log.Printf("wat?")
115 time.Sleep(1 * time.Minute)
116 return nil
117 }
118 defer rows.Close()
119 var doovers []Doover
120 for rows.Next() {
121 var d Doover
122 var dt string
123 err := rows.Scan(&d.ID, &dt)
124 if err != nil {
125 log.Printf("error scanning dooverid: %s", err)
126 continue
127 }
128 d.When, _ = time.Parse(dbtimeformat, dt)
129 doovers = append(doovers, d)
130 }
131 return doovers
132}
133
134func redeliverator() {
135 sleeper := time.NewTimer(0)
136 for {
137 select {
138 case <-pokechan:
139 if !sleeper.Stop() {
140 <-sleeper.C
141 }
142 time.Sleep(5 * time.Second)
143 case <-sleeper.C:
144 }
145
146 doovers := getdoovers()
147
148 now := time.Now().UTC()
149 nexttime := now.Add(24 * time.Hour)
150 for _, d := range doovers {
151 if d.When.Before(now) {
152 var goarounds, userid int64
153 var rcpt string
154 var msg []byte
155 row := stmtLoadDoover.QueryRow(d.ID)
156 err := row.Scan(&goarounds, &userid, &rcpt, &msg)
157 if err != nil {
158 log.Printf("error scanning doover: %s", err)
159 continue
160 }
161 _, err = stmtZapDoover.Exec(d.ID)
162 if err != nil {
163 log.Printf("error deleting doover: %s", err)
164 continue
165 }
166 log.Printf("redeliverating %s try %d", rcpt, goarounds)
167 deliverate(goarounds, userid, rcpt, msg, true)
168 } else if d.When.Before(nexttime) {
169 nexttime = d.When
170 }
171 }
172 dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
173 sleeper.Reset(dur)
174 }
175}