deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "log"
20 notrand "math/rand"
21 "time"
22
23 "humungus.tedunangst.com/r/webs/gate"
24)
25
// Doover describes one pending redelivery: the identifier of its stored row
// and the time at which the next attempt is due.
type Doover struct {
	// ID is the row identifier; getdoovers scans it and redeliverator uses
	// it to load and delete the row.
	ID int64
	// When is the scheduled retry time, parsed from the stored string with
	// dbtimeformat.
	When time.Time
}
30
31func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
32 var drift time.Duration
33 switch goarounds {
34 case 1:
35 drift = 5 * time.Minute
36 case 2:
37 drift = 1 * time.Hour
38 case 3:
39 drift = 4 * time.Hour
40 case 4:
41 drift = 12 * time.Hour
42 case 5:
43 drift = 24 * time.Hour
44 default:
45 log.Printf("he's dead jim: %s", rcpt)
46 return
47 }
48 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
49 when := time.Now().UTC().Add(drift)
50 _, err := stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, userid, rcpt, msg)
51 if err != nil {
52 log.Printf("error saving doover: %s", err)
53 }
54 select {
55 case pokechan <- 0:
56 default:
57 }
58}
59
// garage is the limiter that deliverate enters with Start and leaves with
// Finish around every delivery attempt.
// NOTE(review): presumably this caps concurrent deliveries at 40 — confirm
// against gate.NewLimiter.
var garage = gate.NewLimiter(40)
61
62func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
63 garage.Start()
64 defer garage.Finish()
65
66 var ki *KeyInfo
67 ok := ziggies.Get(userid, &ki)
68 if !ok {
69 log.Printf("lost key for delivery")
70 return
71 }
72 var inbox string
73 // already did the box indirection
74 if rcpt[0] == '%' {
75 inbox = rcpt[1:]
76 } else {
77 var box *Box
78 ok := boxofboxes.Get(rcpt, &box)
79 if !ok {
80 log.Printf("failed getting inbox for %s", rcpt)
81 sayitagain(goarounds+1, userid, rcpt, msg)
82 return
83 }
84 inbox = box.In
85 }
86 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
87 if err != nil {
88 log.Printf("failed to post json to %s: %s", inbox, err)
89 sayitagain(goarounds+1, userid, rcpt, msg)
90 return
91 }
92}
93
// pokechan wakes redeliverator ahead of its timer. sayitagain sends on it
// without blocking; with a buffer of one, a single poke is kept while the
// loop is busy and any further pokes are dropped.
var pokechan = make(chan int, 1)
95
96func getdoovers() []Doover {
97 rows, err := stmtGetDoovers.Query()
98 if err != nil {
99 log.Printf("wat?")
100 time.Sleep(1 * time.Minute)
101 return nil
102 }
103 defer rows.Close()
104 var doovers []Doover
105 for rows.Next() {
106 var d Doover
107 var dt string
108 err := rows.Scan(&d.ID, &dt)
109 if err != nil {
110 log.Printf("error scanning dooverid: %s", err)
111 continue
112 }
113 d.When, _ = time.Parse(dbtimeformat, dt)
114 doovers = append(doovers, d)
115 }
116 return doovers
117}
118
119func redeliverator() {
120 sleeper := time.NewTimer(0)
121 for {
122 select {
123 case <-pokechan:
124 if !sleeper.Stop() {
125 <-sleeper.C
126 }
127 time.Sleep(5 * time.Second)
128 case <-sleeper.C:
129 }
130
131 doovers := getdoovers()
132
133 now := time.Now().UTC()
134 nexttime := now.Add(24 * time.Hour)
135 for _, d := range doovers {
136 if d.When.Before(now) {
137 var goarounds, userid int64
138 var rcpt string
139 var msg []byte
140 row := stmtLoadDoover.QueryRow(d.ID)
141 err := row.Scan(&goarounds, &userid, &rcpt, &msg)
142 if err != nil {
143 log.Printf("error scanning doover: %s", err)
144 continue
145 }
146 _, err = stmtZapDoover.Exec(d.ID)
147 if err != nil {
148 log.Printf("error deleting doover: %s", err)
149 continue
150 }
151 log.Printf("redeliverating %s try %d", rcpt, goarounds)
152 deliverate(goarounds, userid, rcpt, msg)
153 } else if d.When.Before(nexttime) {
154 nexttime = d.When
155 }
156 }
157 dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
158 sleeper.Reset(dur)
159 }
160}