deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "fmt"
20 notrand "math/rand"
21 "time"
22
23 "humungus.tedunangst.com/r/webs/gate"
24)
25
// Doover is the lightweight handle for one queued redelivery, as returned
// by getdoovers.
type Doover struct {
	// ID is the row identifier used to load and delete the full doover.
	ID int64
	// When is the time at which the redelivery becomes due.
	When time.Time
}
30
31func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
32 var drift time.Duration
33 switch goarounds {
34 case 1:
35 drift = 5 * time.Minute
36 case 2:
37 drift = 1 * time.Hour
38 case 3:
39 drift = 4 * time.Hour
40 case 4:
41 drift = 12 * time.Hour
42 case 5:
43 drift = 24 * time.Hour
44 default:
45 ilog.Printf("he's dead jim: %s", rcpt)
46 clearoutbound(rcpt)
47 return
48 }
49 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
50 when := time.Now().Add(drift)
51 _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, msg)
52 if err != nil {
53 elog.Printf("error saving doover: %s", err)
54 }
55 select {
56 case pokechan <- 0:
57 default:
58 }
59}
60
61func clearoutbound(rcpt string) {
62 hostname := originate(rcpt)
63 if hostname == "" {
64 return
65 }
66 xid := fmt.Sprintf("%%https://%s/%%", hostname)
67 ilog.Printf("clearing outbound for %s", xid)
68 db := opendatabase()
69 db.Exec("delete from doovers where rcpt like ?", xid)
70}
71
72var garage = gate.NewLimiter(40)
73
74func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
75 garage.Start()
76 defer garage.Finish()
77
78 var ki *KeyInfo
79 ok := ziggies.Get(userid, &ki)
80 if !ok {
81 elog.Printf("lost key for delivery")
82 return
83 }
84 var inbox string
85 // already did the box indirection
86 if rcpt[0] == '%' {
87 inbox = rcpt[1:]
88 } else {
89 var box *Box
90 ok := boxofboxes.Get(rcpt, &box)
91 if !ok {
92 ilog.Printf("failed getting inbox for %s", rcpt)
93 sayitagain(goarounds+1, userid, rcpt, msg)
94 return
95 }
96 inbox = box.In
97 }
98 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
99 if err != nil {
100 ilog.Printf("failed to post json to %s: %s", inbox, err)
101 sayitagain(goarounds+1, userid, rcpt, msg)
102 return
103 }
104}
105
106var pokechan = make(chan int, 1)
107
108func getdoovers() []Doover {
109 rows, err := stmtGetDoovers.Query()
110 if err != nil {
111 elog.Printf("wat?")
112 time.Sleep(1 * time.Minute)
113 return nil
114 }
115 defer rows.Close()
116 var doovers []Doover
117 for rows.Next() {
118 var d Doover
119 var dt string
120 err := rows.Scan(&d.ID, &dt)
121 if err != nil {
122 elog.Printf("error scanning dooverid: %s", err)
123 continue
124 }
125 d.When, _ = time.Parse(dbtimeformat, dt)
126 doovers = append(doovers, d)
127 }
128 return doovers
129}
130
131func redeliverator() {
132 sleeper := time.NewTimer(5 * time.Second)
133 for {
134 select {
135 case <-pokechan:
136 if !sleeper.Stop() {
137 <-sleeper.C
138 }
139 time.Sleep(5 * time.Second)
140 case <-sleeper.C:
141 }
142
143 doovers := getdoovers()
144
145 now := time.Now()
146 nexttime := now.Add(24 * time.Hour)
147 for _, d := range doovers {
148 if d.When.Before(now) {
149 var goarounds, userid int64
150 var rcpt string
151 var msg []byte
152 row := stmtLoadDoover.QueryRow(d.ID)
153 err := row.Scan(&goarounds, &userid, &rcpt, &msg)
154 if err != nil {
155 elog.Printf("error scanning doover: %s", err)
156 continue
157 }
158 _, err = stmtZapDoover.Exec(d.ID)
159 if err != nil {
160 elog.Printf("error deleting doover: %s", err)
161 continue
162 }
163 ilog.Printf("redeliverating %s try %d", rcpt, goarounds)
164 deliverate(goarounds, userid, rcpt, msg)
165 } else if d.When.Before(nexttime) {
166 nexttime = d.When
167 }
168 }
169 now = time.Now()
170 dur := 5 * time.Second
171 if now.Before(nexttime) {
172 dur += nexttime.Sub(now).Round(time.Second)
173 }
174 sleeper.Reset(dur)
175 }
176}