deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "fmt"
20 "log"
21 notrand "math/rand"
22 "time"
23
24 "humungus.tedunangst.com/r/webs/gate"
25)
26
// Doover identifies one queued redelivery attempt.
type Doover struct {
	// ID is the row identifier; it is later used to load the full
	// record and to delete it once a retry is picked up.
	ID int64
	// When is the time at which the retry becomes due.
	When time.Time
}
31
32func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
33 var drift time.Duration
34 switch goarounds {
35 case 1:
36 drift = 5 * time.Minute
37 case 2:
38 drift = 1 * time.Hour
39 case 3:
40 drift = 4 * time.Hour
41 case 4:
42 drift = 12 * time.Hour
43 case 5:
44 drift = 24 * time.Hour
45 default:
46 log.Printf("he's dead jim: %s", rcpt)
47 clearoutbound(rcpt)
48 return
49 }
50 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
51 when := time.Now().UTC().Add(drift)
52 _, err := stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, userid, rcpt, msg)
53 if err != nil {
54 log.Printf("error saving doover: %s", err)
55 }
56 select {
57 case pokechan <- 0:
58 default:
59 }
60}
61
62func clearoutbound(rcpt string) {
63 hostname := originate(rcpt)
64 if hostname == "" {
65 return
66 }
67 xid := fmt.Sprintf("%%https://%s/%%", hostname)
68 log.Printf("clearing outbound for %s", xid)
69 db := opendatabase()
70 db.Exec("delete from doovers where rcpt like ?", xid)
71}
72
73var garage = gate.NewLimiter(40)
74
75func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
76 garage.Start()
77 defer garage.Finish()
78
79 var ki *KeyInfo
80 ok := ziggies.Get(userid, &ki)
81 if !ok {
82 log.Printf("lost key for delivery")
83 return
84 }
85 var inbox string
86 // already did the box indirection
87 if rcpt[0] == '%' {
88 inbox = rcpt[1:]
89 } else {
90 var box *Box
91 ok := boxofboxes.Get(rcpt, &box)
92 if !ok {
93 log.Printf("failed getting inbox for %s", rcpt)
94 sayitagain(goarounds+1, userid, rcpt, msg)
95 return
96 }
97 inbox = box.In
98 }
99 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
100 if err != nil {
101 log.Printf("failed to post json to %s: %s", inbox, err)
102 sayitagain(goarounds+1, userid, rcpt, msg)
103 return
104 }
105}
106
// pokechan wakes the redeliverator loop early. Senders use a
// non-blocking send, so with a buffer of one a single poke is remembered
// while the loop is busy and any further pokes are dropped.
var pokechan = make(chan int, 1)
108
109func getdoovers() []Doover {
110 rows, err := stmtGetDoovers.Query()
111 if err != nil {
112 log.Printf("wat?")
113 time.Sleep(1 * time.Minute)
114 return nil
115 }
116 defer rows.Close()
117 var doovers []Doover
118 for rows.Next() {
119 var d Doover
120 var dt string
121 err := rows.Scan(&d.ID, &dt)
122 if err != nil {
123 log.Printf("error scanning dooverid: %s", err)
124 continue
125 }
126 d.When, _ = time.Parse(dbtimeformat, dt)
127 doovers = append(doovers, d)
128 }
129 return doovers
130}
131
132func redeliverator() {
133 sleeper := time.NewTimer(0)
134 for {
135 select {
136 case <-pokechan:
137 if !sleeper.Stop() {
138 <-sleeper.C
139 }
140 time.Sleep(5 * time.Second)
141 case <-sleeper.C:
142 }
143
144 doovers := getdoovers()
145
146 now := time.Now().UTC()
147 nexttime := now.Add(24 * time.Hour)
148 for _, d := range doovers {
149 if d.When.Before(now) {
150 var goarounds, userid int64
151 var rcpt string
152 var msg []byte
153 row := stmtLoadDoover.QueryRow(d.ID)
154 err := row.Scan(&goarounds, &userid, &rcpt, &msg)
155 if err != nil {
156 log.Printf("error scanning doover: %s", err)
157 continue
158 }
159 _, err = stmtZapDoover.Exec(d.ID)
160 if err != nil {
161 log.Printf("error deleting doover: %s", err)
162 continue
163 }
164 log.Printf("redeliverating %s try %d", rcpt, goarounds)
165 deliverate(goarounds, userid, rcpt, msg)
166 } else if d.When.Before(nexttime) {
167 nexttime = d.When
168 }
169 }
170 dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
171 sleeper.Reset(dur)
172 }
173}