all repos — honk @ a11b8e6d794e6f5738fc4782d87b8234e329a200

my fork of honk

deliverator.go (view raw)

  1//
  2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
  3//
  4// Permission to use, copy, modify, and distribute this software for any
  5// purpose with or without fee is hereby granted, provided that the above
  6// copyright notice and this permission notice appear in all copies.
  7//
  8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 15
 16package main
 17
 18import (
 19	"bytes"
 20	"database/sql"
 21	notrand "math/rand"
 22	"sync"
 23	"time"
 24
 25	"humungus.tedunangst.com/r/webs/gate"
 26)
 27
 28type Doover struct {
 29	ID     int64
 30	When   time.Time
 31	Userid int64
 32	Tries  int64
 33	Rcpt   string
 34	Msgs   [][]byte
 35}
 36
 37func sayitagain(doover Doover) {
 38	rcpt := doover.Rcpt
 39	var drift time.Duration
 40	doover.Tries += 1
 41	switch doover.Tries {
 42	case 1:
 43		drift = 5 * time.Minute
 44	case 2:
 45		drift = 1 * time.Hour
 46	case 3:
 47		drift = 4 * time.Hour
 48	case 4:
 49		drift = 12 * time.Hour
 50	case 5:
 51		drift = 24 * time.Hour
 52	default:
 53		ilog.Printf("he's dead jim: %s", rcpt)
 54		return
 55	}
 56	drift += time.Duration(notrand.Int63n(int64(drift / 10)))
 57	when := time.Now().Add(drift)
 58	data := bytes.Join(doover.Msgs, []byte{0})
 59	_, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, rcpt, data)
 60	if err != nil {
 61		elog.Printf("error saving doover: %s", err)
 62	}
 63	select {
 64	case pokechan <- 0:
 65	default:
 66	}
 67}
 68
  // dqmtx is held by delinquent and extractdoover for the whole of their
  // read-then-write database sequences, so the two cannot interleave.
  69var dqmtx sync.Mutex
 70
 71func delinquent(userid int64, rcpt string, msg []byte) bool {
 72	dqmtx.Lock()
 73	defer dqmtx.Unlock()
 74	row := stmtDeliquentCheck.QueryRow(userid, rcpt)
 75	var dooverid int64
 76	var data []byte
 77	err := row.Scan(&dooverid, &data)
 78	if err == sql.ErrNoRows {
 79		return false
 80	}
 81	if err != nil {
 82		elog.Printf("error scanning deliquent check: %s", err)
 83		return true
 84	}
 85	data = append(data, 0)
 86	data = append(data, msg...)
 87	_, err = stmtDeliquentUpdate.Exec(data, dooverid)
 88	if err != nil {
 89		elog.Printf("error updating deliquent: %s", err)
 90		return true
 91	}
 92	return true
 93}
 94
 95func deliverate(userid int64, rcpt string, msg []byte) {
 96	if delinquent(userid, rcpt, msg) {
 97		return
 98	}
 99	var d Doover
100	d.Userid = userid
101	d.Tries = 0
102	d.Rcpt = rcpt
103	d.Msgs = append(d.Msgs, msg)
104	deliveration(d)
105}
106
 // garage is a gate limiter created with a limit of 40. deliveration calls
 // Start before and Finish after each delivery; presumably this caps the
 // number of deliveries in flight at 40 (confirm against the gate package).
 107var garage = gate.NewLimiter(40)
108
109func deliveration(doover Doover) {
110	garage.Start()
111	defer garage.Finish()
112
113	var ki *KeyInfo
114	ok := ziggies.Get(doover.Userid, &ki)
115	if !ok {
116		elog.Printf("lost key for delivery")
117		return
118	}
119	var inbox string
120	rcpt := doover.Rcpt
121	// already did the box indirection
122	if rcpt[0] == '%' {
123		inbox = rcpt[1:]
124	} else {
125		var box *Box
126		ok := boxofboxes.Get(rcpt, &box)
127		if !ok {
128			ilog.Printf("failed getting inbox for %s", rcpt)
129			sayitagain(doover)
130			return
131		}
132		inbox = box.In
133	}
134	for i, msg := range doover.Msgs {
135		if i > 0 {
136			time.Sleep(2 * time.Second)
137		}
138		err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
139		if err != nil {
140			ilog.Printf("failed to post json to %s: %s", inbox, err)
141			doover.Msgs = doover.Msgs[i:]
142			sayitagain(doover)
143			return
144		}
145	}
146}
147
 // pokechan wakes the redeliverator loop. It has a buffer of one; sayitagain
 // sends on it without blocking, so a poke is dropped when one is already
 // pending.
 148var pokechan = make(chan int, 1)
149
150func getdoovers() []Doover {
151	rows, err := stmtGetDoovers.Query()
152	if err != nil {
153		elog.Printf("wat?")
154		time.Sleep(1 * time.Minute)
155		return nil
156	}
157	defer rows.Close()
158	var doovers []Doover
159	for rows.Next() {
160		var d Doover
161		var dt string
162		err := rows.Scan(&d.ID, &dt)
163		if err != nil {
164			elog.Printf("error scanning dooverid: %s", err)
165			continue
166		}
167		d.When, _ = time.Parse(dbtimeformat, dt)
168		doovers = append(doovers, d)
169	}
170	return doovers
171}
172
173func extractdoover(d *Doover) error {
174	dqmtx.Lock()
175	defer dqmtx.Unlock()
176	row := stmtLoadDoover.QueryRow(d.ID)
177	var data []byte
178	err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data)
179	if err != nil {
180		return err
181	}
182	_, err = stmtZapDoover.Exec(d.ID)
183	if err != nil {
184		return err
185	}
186	d.Msgs = bytes.Split(data, []byte{0})
187	return nil
188}
189
190func redeliverator() {
191	sleeper := time.NewTimer(5 * time.Second)
192	for {
193		select {
194		case <-pokechan:
195			if !sleeper.Stop() {
196				<-sleeper.C
197			}
198			time.Sleep(5 * time.Second)
199		case <-sleeper.C:
200		}
201
202		doovers := getdoovers()
203
204		now := time.Now()
205		nexttime := now.Add(24 * time.Hour)
206		for _, d := range doovers {
207			if d.When.Before(now) {
208				err := extractdoover(&d)
209				if err != nil {
210					elog.Printf("error extracting doover: %s", err)
211					continue
212				}
213				ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries)
214				deliveration(d)
215			} else if d.When.Before(nexttime) {
216				nexttime = d.When
217			}
218		}
219		now = time.Now()
220		dur := 5 * time.Second
221		if now.Before(nexttime) {
222			dur += nexttime.Sub(now).Round(time.Second)
223		}
224		sleeper.Reset(dur)
225	}
226}