all repos — honk @ 62434f814a1649adcbee6960a81d9d9a5962896d

my fork of honk

deliverator.go (view raw)

  1//
  2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
  3//
  4// Permission to use, copy, modify, and distribute this software for any
  5// purpose with or without fee is hereby granted, provided that the above
  6// copyright notice and this permission notice appear in all copies.
  7//
  8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 15
 16package main
 17
 18import (
 19	"bytes"
 20	"database/sql"
 21	notrand "math/rand"
 22	"strings"
 23	"sync"
 24	"time"
 25
 26	"humungus.tedunangst.com/r/webs/gate"
 27)
 28
// Doover is one pending (re)delivery: a batch of messages for a single
// recipient together with its retry bookkeeping.
type Doover struct {
	ID     int64     // row id; filled in by getdoovers and used by extractdoover to load and zap the row
	When   time.Time // when the next attempt is due; redeliverator compares it against time.Now
	Userid UserID    // user id; passed to ziggy to obtain the key info handed to PostMsg
	Tries  int64     // attempts so far; sayitagain increments it and gives up once it exceeds 9
	Rcpt   string    // recipient; a leading '%' means the remainder is used directly as the inbox
	Msgs   [][]byte  // messages to post, in order; stored joined by a single zero byte
}
 37
 38func sayitagain(doover Doover) {
 39	doover.Tries += 1
 40	var drift time.Duration
 41	if doover.Tries <= 3 { // 5, 10, 15 minutes
 42		drift = time.Duration(doover.Tries*5) * time.Minute
 43	} else if doover.Tries <= 6 { // 1, 2, 3 hours
 44		drift = time.Duration(doover.Tries-3) * time.Hour
 45	} else if doover.Tries <= 9 { // 12, 12, 12 hours
 46		drift = time.Duration(12) * time.Hour
 47	} else {
 48		ilog.Printf("he's dead jim: %s", doover.Rcpt)
 49		return
 50	}
 51	drift += time.Duration(notrand.Int63n(int64(drift / 10)))
 52	when := time.Now().Add(drift)
 53	data := bytes.Join(doover.Msgs, []byte{0})
 54	_, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, doover.Rcpt, data)
 55	if err != nil {
 56		elog.Printf("error saving doover: %s", err)
 57	}
 58	select {
 59	case pokechan <- 0:
 60	default:
 61	}
 62}
 63
 64const nearlyDead = 8
 65
 66func lethaldose(err error) int64 {
 67	str := err.Error()
 68	if strings.Contains(str, "no such host") {
 69		return nearlyDead
 70	}
 71	return 0
 72}
 73
 74func letitslide(err error) bool {
 75	str := err.Error()
 76	if strings.Contains(str, "http post status: 400") {
 77		return true
 78	}
 79	if strings.Contains(str, "http post status: 422") {
 80		return true
 81	}
 82	return false
 83}
 84
// dqmtx is held by delinquent and extractdoover for their whole duration, so
// that reading a stored doover row and the statement that follows it are not
// interleaved between the two.
var dqmtx sync.Mutex
 86
 87func delinquent(userid UserID, rcpt string, msg []byte) bool {
 88	dqmtx.Lock()
 89	defer dqmtx.Unlock()
 90	row := stmtDeliquentCheck.QueryRow(userid, rcpt)
 91	var dooverid int64
 92	var data []byte
 93	err := row.Scan(&dooverid, &data)
 94	if err == sql.ErrNoRows {
 95		return false
 96	}
 97	if err != nil {
 98		elog.Printf("error scanning deliquent check: %s", err)
 99		return true
100	}
101	data = append(data, 0)
102	data = append(data, msg...)
103	_, err = stmtDeliquentUpdate.Exec(data, dooverid)
104	if err != nil {
105		elog.Printf("error updating deliquent: %s", err)
106		return true
107	}
108	return true
109}
110
111func deliverate(userid UserID, rcpt string, msg []byte) {
112	if delinquent(userid, rcpt, msg) {
113		return
114	}
115	var d Doover
116	d.Userid = userid
117	d.Tries = 0
118	d.Rcpt = rcpt
119	d.Msgs = append(d.Msgs, msg)
120	deliveration(d)
121}
122
// garage is the limiter that deliveration enters with StartKey(rcpt) and
// leaves with FinishKey(rcpt) around every delivery.
// NOTE(review): presumably this caps concurrent deliveries at 40 and keys
// them by recipient - confirm against the gate package.
var garage = gate.NewLimiter(40)
124
125func deliveration(doover Doover) {
126	requestWG.Add(1)
127	defer requestWG.Done()
128	rcpt := doover.Rcpt
129	garage.StartKey(rcpt)
130	defer garage.FinishKey(rcpt)
131
132	ki := ziggy(doover.Userid)
133	if ki == nil {
134		elog.Printf("lost key for delivery")
135		return
136	}
137	var inbox string
138	// already did the box indirection
139	if rcpt[0] == '%' {
140		inbox = rcpt[1:]
141	} else {
142		box, ok := boxofboxes.Get(rcpt)
143		if !ok {
144			ilog.Printf("failed getting inbox for %s", rcpt)
145			if doover.Tries < nearlyDead {
146				doover.Tries = nearlyDead
147			}
148			sayitagain(doover)
149			return
150		}
151		inbox = box.In
152	}
153	for i, msg := range doover.Msgs {
154		if i > 0 {
155			time.Sleep(2 * time.Second)
156		}
157		err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
158		if err != nil {
159			ilog.Printf("failed to post json to %s: %s", inbox, err)
160			if t := lethaldose(err); t > doover.Tries {
161				doover.Tries = t
162			}
163			if letitslide(err) {
164				dlog.Printf("whatever myever %s", inbox)
165				continue
166			}
167			doover.Msgs = doover.Msgs[i:]
168			sayitagain(doover)
169			return
170		}
171	}
172}
173
// pokechan wakes redeliverator ahead of its timer. It buffers a single poke;
// sayitagain sends on it without blocking after storing a doover.
var pokechan = make(chan int, 1)
175
176func getdoovers() []Doover {
177	rows, err := stmtGetDoovers.Query()
178	if err != nil {
179		elog.Printf("wat?")
180		time.Sleep(1 * time.Minute)
181		return nil
182	}
183	defer rows.Close()
184	var doovers []Doover
185	for rows.Next() {
186		var d Doover
187		var dt string
188		err := rows.Scan(&d.ID, &dt)
189		if err != nil {
190			elog.Printf("error scanning dooverid: %s", err)
191			continue
192		}
193		d.When, _ = time.Parse(dbtimeformat, dt)
194		doovers = append(doovers, d)
195	}
196	return doovers
197}
198
199func extractdoover(d *Doover) error {
200	dqmtx.Lock()
201	defer dqmtx.Unlock()
202	row := stmtLoadDoover.QueryRow(d.ID)
203	var data []byte
204	err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data)
205	if err != nil {
206		return err
207	}
208	_, err = stmtZapDoover.Exec(d.ID)
209	if err != nil {
210		return err
211	}
212	d.Msgs = bytes.Split(data, []byte{0})
213	return nil
214}
215
// redeliverator is the retry loop. Each round it waits for a poke, for its
// timer, or for shutdown; then it lists the stored doovers, redelivers the
// ones that are due, and re-arms the timer for the earliest one still pending.
// It increments workinprogress on entry and, once a receive from
// endoftheworld succeeds, sends true on readyalready and returns.
func redeliverator() {
	workinprogress++
	sleeper := time.NewTimer(5 * time.Second)
	for {
		select {
		case <-pokechan:
			// Poked: cancel the pending timer. If it had already fired, drain
			// its channel so the Reset at the bottom of the loop starts from
			// an empty channel.
			if !sleeper.Stop() {
				<-sleeper.C
			}
			// NOTE(review): presumably a short pause to let further pokes
			// accumulate before scanning - confirm intent.
			time.Sleep(5 * time.Second)
		case <-sleeper.C:
		case <-endoftheworld:
			readyalready <- true
			return
		}

		doovers := getdoovers()

		now := time.Now()
		// Look at most 24 hours ahead when choosing the next wake-up time.
		nexttime := now.Add(24 * time.Hour)
		for _, d := range doovers {
			if d.When.Before(now) {
				// Due: load the full doover (which also zaps its row) and
				// deliver it synchronously.
				err := extractdoover(&d)
				if err != nil {
					elog.Printf("error extracting doover: %s", err)
					continue
				}
				ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries)
				deliveration(d)
			} else if d.When.Before(nexttime) {
				// Not due yet: remember the earliest upcoming due time.
				nexttime = d.When
			}
		}
		// Deliveries may have taken a while, so take a fresh reading before
		// computing how long to sleep: 5 seconds plus the time until nexttime.
		now = time.Now()
		dur := 5 * time.Second
		if now.Before(nexttime) {
			dur += nexttime.Sub(now).Round(time.Second)
		}
		sleeper.Reset(dur)
	}
}