all repos — honk @ b816ce1f91e0633e47ba93cc890a629ce4c2951b

my fork of honk

deliverator.go (view raw)

  1//
  2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
  3//
  4// Permission to use, copy, modify, and distribute this software for any
  5// purpose with or without fee is hereby granted, provided that the above
  6// copyright notice and this permission notice appear in all copies.
  7//
  8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 15
 16package main
 17
 18import (
 19	"bytes"
 20	"database/sql"
 21	notrand "math/rand"
 22	"sync"
 23	"time"
 24
 25	"humungus.tedunangst.com/r/webs/gate"
 26)
 27
 28type Doover struct {
 29	ID     int64
 30	When   time.Time
 31	Userid int64
 32	Tries  int64
 33	Rcpt   string
 34	Msgs   [][]byte
 35}
 36
 37func sayitagain(doover Doover) {
 38	doover.Tries += 1
 39	var drift time.Duration
 40	if doover.Tries <= 3 { // 5, 10, 15 minutes
 41		drift = time.Duration(doover.Tries*5) * time.Minute
 42	} else if doover.Tries <= 6 { // 1, 2, 3 hours
 43		drift = time.Duration(doover.Tries-3) * time.Hour
 44	} else if doover.Tries <= 9 { // 12, 12, 12 hours
 45		drift = time.Duration(12) * time.Hour
 46	} else {
 47		ilog.Printf("he's dead jim: %s", doover.Rcpt)
 48		return
 49	}
 50	drift += time.Duration(notrand.Int63n(int64(drift / 10)))
 51	when := time.Now().Add(drift)
 52	data := bytes.Join(doover.Msgs, []byte{0})
 53	_, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, doover.Rcpt, data)
 54	if err != nil {
 55		elog.Printf("error saving doover: %s", err)
 56	}
 57	select {
 58	case pokechan <- 0:
 59	default:
 60	}
 61}
 62
 63var dqmtx sync.Mutex
 64
 65func delinquent(userid int64, rcpt string, msg []byte) bool {
 66	dqmtx.Lock()
 67	defer dqmtx.Unlock()
 68	row := stmtDeliquentCheck.QueryRow(userid, rcpt)
 69	var dooverid int64
 70	var data []byte
 71	err := row.Scan(&dooverid, &data)
 72	if err == sql.ErrNoRows {
 73		return false
 74	}
 75	if err != nil {
 76		elog.Printf("error scanning deliquent check: %s", err)
 77		return true
 78	}
 79	data = append(data, 0)
 80	data = append(data, msg...)
 81	_, err = stmtDeliquentUpdate.Exec(data, dooverid)
 82	if err != nil {
 83		elog.Printf("error updating deliquent: %s", err)
 84		return true
 85	}
 86	return true
 87}
 88
 89func deliverate(userid int64, rcpt string, msg []byte) {
 90	if delinquent(userid, rcpt, msg) {
 91		return
 92	}
 93	var d Doover
 94	d.Userid = userid
 95	d.Tries = 0
 96	d.Rcpt = rcpt
 97	d.Msgs = append(d.Msgs, msg)
 98	deliveration(d)
 99}
100
101var garage = gate.NewLimiter(40)
102
103func deliveration(doover Doover) {
104	garage.Start()
105	defer garage.Finish()
106
107	var ki *KeyInfo
108	ok := ziggies.Get(doover.Userid, &ki)
109	if !ok {
110		elog.Printf("lost key for delivery")
111		return
112	}
113	var inbox string
114	rcpt := doover.Rcpt
115	// already did the box indirection
116	if rcpt[0] == '%' {
117		inbox = rcpt[1:]
118	} else {
119		var box *Box
120		ok := boxofboxes.Get(rcpt, &box)
121		if !ok {
122			ilog.Printf("failed getting inbox for %s", rcpt)
123			sayitagain(doover)
124			return
125		}
126		inbox = box.In
127	}
128	for i, msg := range doover.Msgs {
129		if i > 0 {
130			time.Sleep(2 * time.Second)
131		}
132		err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
133		if err != nil {
134			ilog.Printf("failed to post json to %s: %s", inbox, err)
135			doover.Msgs = doover.Msgs[i:]
136			sayitagain(doover)
137			return
138		}
139	}
140}
141
142var pokechan = make(chan int, 1)
143
144func getdoovers() []Doover {
145	rows, err := stmtGetDoovers.Query()
146	if err != nil {
147		elog.Printf("wat?")
148		time.Sleep(1 * time.Minute)
149		return nil
150	}
151	defer rows.Close()
152	var doovers []Doover
153	for rows.Next() {
154		var d Doover
155		var dt string
156		err := rows.Scan(&d.ID, &dt)
157		if err != nil {
158			elog.Printf("error scanning dooverid: %s", err)
159			continue
160		}
161		d.When, _ = time.Parse(dbtimeformat, dt)
162		doovers = append(doovers, d)
163	}
164	return doovers
165}
166
167func extractdoover(d *Doover) error {
168	dqmtx.Lock()
169	defer dqmtx.Unlock()
170	row := stmtLoadDoover.QueryRow(d.ID)
171	var data []byte
172	err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data)
173	if err != nil {
174		return err
175	}
176	_, err = stmtZapDoover.Exec(d.ID)
177	if err != nil {
178		return err
179	}
180	d.Msgs = bytes.Split(data, []byte{0})
181	return nil
182}
183
184func redeliverator() {
185	sleeper := time.NewTimer(5 * time.Second)
186	for {
187		select {
188		case <-pokechan:
189			if !sleeper.Stop() {
190				<-sleeper.C
191			}
192			time.Sleep(5 * time.Second)
193		case <-sleeper.C:
194		}
195
196		doovers := getdoovers()
197
198		now := time.Now()
199		nexttime := now.Add(24 * time.Hour)
200		for _, d := range doovers {
201			if d.When.Before(now) {
202				err := extractdoover(&d)
203				if err != nil {
204					elog.Printf("error extracting doover: %s", err)
205					continue
206				}
207				ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries)
208				deliveration(d)
209			} else if d.When.Before(nexttime) {
210				nexttime = d.When
211			}
212		}
213		now = time.Now()
214		dur := 5 * time.Second
215		if now.Before(nexttime) {
216			dur += nexttime.Sub(now).Round(time.Second)
217		}
218		sleeper.Reset(dur)
219	}
220}