deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "log"
20 notrand "math/rand"
21 "sync"
22 "time"
23)
24
25func init() {
26 notrand.Seed(time.Now().Unix())
27}
28
29type Doover struct {
30 ID int64
31 When time.Time
32}
33
34func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
35 var drift time.Duration
36 switch goarounds {
37 case 1:
38 drift = 5 * time.Minute
39 case 2:
40 drift = 1 * time.Hour
41 case 3:
42 drift = 4 * time.Hour
43 case 4:
44 drift = 12 * time.Hour
45 case 5:
46 drift = 24 * time.Hour
47 default:
48 log.Printf("he's dead jim: %s", rcpt)
49 return
50 }
51 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
52 when := time.Now().UTC().Add(drift)
53 _, err := stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, userid, rcpt, msg)
54 if err != nil {
55 log.Printf("error saving doover: %s", err)
56 }
57 select {
58 case pokechan <- 0:
59 default:
60 }
61}
62
63var trucksout = 0
64var maxtrucksout = 20
65var garagelock sync.Mutex
66var garagebell = sync.NewCond(&garagelock)
67
68func truckgoesout() {
69 garagelock.Lock()
70 for trucksout >= maxtrucksout {
71 garagebell.Wait()
72 }
73 trucksout++
74 garagelock.Unlock()
75}
76
77func truckcomesin() {
78 garagelock.Lock()
79 trucksout--
80 garagebell.Broadcast()
81 garagelock.Unlock()
82}
83
84func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
85 truckgoesout()
86 defer truckcomesin()
87
88 var ki *KeyInfo
89 ok := ziggies.Get(userid, &ki)
90 if !ok {
91 log.Printf("lost key for delivery")
92 return
93 }
94 var inbox string
95 // already did the box indirection
96 if rcpt[0] == '%' {
97 inbox = rcpt[1:]
98 } else {
99 var box *Box
100 ok := boxofboxes.Get(rcpt, &box)
101 if !ok {
102 log.Printf("failed getting inbox for %s", rcpt)
103 sayitagain(goarounds+1, userid, rcpt, msg)
104 return
105 }
106 inbox = box.In
107 }
108 err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
109 if err != nil {
110 log.Printf("failed to post json to %s: %s", inbox, err)
111 sayitagain(goarounds+1, userid, rcpt, msg)
112 return
113 }
114}
115
116var pokechan = make(chan int, 1)
117
118func getdoovers() []Doover {
119 rows, err := stmtGetDoovers.Query()
120 if err != nil {
121 log.Printf("wat?")
122 time.Sleep(1 * time.Minute)
123 return nil
124 }
125 defer rows.Close()
126 var doovers []Doover
127 for rows.Next() {
128 var d Doover
129 var dt string
130 err := rows.Scan(&d.ID, &dt)
131 if err != nil {
132 log.Printf("error scanning dooverid: %s", err)
133 continue
134 }
135 d.When, _ = time.Parse(dbtimeformat, dt)
136 doovers = append(doovers, d)
137 }
138 return doovers
139}
140
141func redeliverator() {
142 sleeper := time.NewTimer(0)
143 for {
144 select {
145 case <-pokechan:
146 if !sleeper.Stop() {
147 <-sleeper.C
148 }
149 time.Sleep(5 * time.Second)
150 case <-sleeper.C:
151 }
152
153 doovers := getdoovers()
154
155 now := time.Now().UTC()
156 nexttime := now.Add(24 * time.Hour)
157 for _, d := range doovers {
158 if d.When.Before(now) {
159 var goarounds, userid int64
160 var rcpt string
161 var msg []byte
162 row := stmtLoadDoover.QueryRow(d.ID)
163 err := row.Scan(&goarounds, &userid, &rcpt, &msg)
164 if err != nil {
165 log.Printf("error scanning doover: %s", err)
166 continue
167 }
168 _, err = stmtZapDoover.Exec(d.ID)
169 if err != nil {
170 log.Printf("error deleting doover: %s", err)
171 continue
172 }
173 log.Printf("redeliverating %s try %d", rcpt, goarounds)
174 deliverate(goarounds, userid, rcpt, msg)
175 } else if d.When.Before(nexttime) {
176 nexttime = d.When
177 }
178 }
179 dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
180 sleeper.Reset(dur)
181 }
182}