deliverator.go (view raw)
1//
2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
3//
4// Permission to use, copy, modify, and distribute this software for any
5// purpose with or without fee is hereby granted, provided that the above
6// copyright notice and this permission notice appear in all copies.
7//
8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16package main
17
18import (
19 "log"
20 notrand "math/rand"
21 "sync"
22 "time"
23)
24
25func init() {
26 notrand.Seed(time.Now().Unix())
27}
28
29type Doover struct {
30 ID int64
31 When time.Time
32}
33
34func sayitagain(goarounds int, username string, rcpt string, msg []byte) {
35 var drift time.Duration
36 switch goarounds {
37 case 1:
38 drift = 5 * time.Minute
39 case 2:
40 drift = 1 * time.Hour
41 case 3:
42 drift = 12 * time.Hour
43 case 4:
44 drift = 24 * time.Hour
45 default:
46 log.Printf("he's dead jim: %s", rcpt)
47 return
48 }
49 drift += time.Duration(notrand.Int63n(int64(drift / 10)))
50 when := time.Now().UTC().Add(drift)
51 stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, username, rcpt, msg)
52 select {
53 case pokechan <- 0:
54 default:
55 }
56}
57
58var trucksout = 0
59var maxtrucksout = 10
60var garagelock sync.Mutex
61var garagebell = sync.NewCond(&garagelock)
62
63func truckgoesout() {
64 garagelock.Lock()
65 for trucksout >= maxtrucksout {
66 garagebell.Wait()
67 }
68 trucksout++
69 garagelock.Unlock()
70}
71
72func truckcomesin() {
73 garagelock.Lock()
74 trucksout--
75 garagebell.Broadcast()
76 garagelock.Unlock()
77}
78
79func deliverate(goarounds int, username string, rcpt string, msg []byte) {
80 truckgoesout()
81 defer truckcomesin()
82
83 keyname, key := ziggy(username)
84 var inbox string
85 // already did the box indirection
86 if rcpt[0] == '%' {
87 inbox = rcpt[1:]
88 } else {
89 box, err := getboxes(rcpt)
90 if err != nil {
91 log.Printf("error getting inbox %s: %s", rcpt, err)
92 sayitagain(goarounds+1, username, rcpt, msg)
93 return
94 }
95 inbox = box.In
96 if box.Shared != "" {
97 inbox = box.Shared
98 }
99 }
100 err := PostMsg(keyname, key, inbox, msg)
101 if err != nil {
102 log.Printf("failed to post json to %s: %s", inbox, err)
103 sayitagain(goarounds+1, username, rcpt, msg)
104 return
105 }
106}
107
108var pokechan = make(chan int)
109
110func redeliverator() {
111 sleeper := time.NewTimer(0)
112 for {
113 select {
114 case <-pokechan:
115 if !sleeper.Stop() {
116 <-sleeper.C
117 }
118 time.Sleep(5 * time.Second)
119 case <-sleeper.C:
120 }
121
122 rows, err := stmtGetDoovers.Query()
123 if err != nil {
124 log.Printf("wat?")
125 time.Sleep(1 * time.Minute)
126 continue
127 }
128 var doovers []Doover
129 for rows.Next() {
130 var d Doover
131 var dt string
132 rows.Scan(&d.ID, &dt)
133 d.When, _ = time.Parse(dbtimeformat, dt)
134 doovers = append(doovers, d)
135 }
136 rows.Close()
137 now := time.Now().UTC()
138 nexttime := now.Add(24 * time.Hour)
139 for _, d := range doovers {
140 if d.When.Before(now) {
141 var goarounds int
142 var username, rcpt string
143 var msg []byte
144 row := stmtLoadDoover.QueryRow(d.ID)
145 row.Scan(&goarounds, &username, &rcpt, &msg)
146 stmtZapDoover.Exec(d.ID)
147 log.Printf("redeliverating %s try %d", rcpt, goarounds)
148 deliverate(goarounds, username, rcpt, msg)
149 } else if d.When.Before(nexttime) {
150 nexttime = d.When
151 }
152 }
153 dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
154 sleeper.Reset(dur)
155 }
156}