all repos — honk @ 7e2ae34eb0d8e9dfbc090ec340c277686800b75c

my fork of honk

deliverator.go (view raw)

  1//
  2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
  3//
  4// Permission to use, copy, modify, and distribute this software for any
  5// purpose with or without fee is hereby granted, provided that the above
  6// copyright notice and this permission notice appear in all copies.
  7//
  8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 15
 16package main
 17
 18import (
 19	"bytes"
 20	"database/sql"
 21	notrand "math/rand"
 22	"sync"
 23	"time"
 24
 25	"humungus.tedunangst.com/r/webs/gate"
 26)
 27
// Doover describes a delivery that is waiting to be tried again.
type Doover struct {
	// ID identifies the stored doover; it is used to load and delete the row.
	ID   int64
	// When is the time at which the retry becomes due.
	When time.Time
	// Rcpt is the recipient. A leading '%' means the rest of the string is
	// already the inbox address and no box lookup is needed.
	Rcpt string
	// Msgs holds the messages still to be posted, in delivery order.
	Msgs [][]byte
}
 34
 35func sayitagain(goarounds int64, userid int64, doover Doover) {
 36	rcpt := doover.Rcpt
 37	var drift time.Duration
 38	switch goarounds {
 39	case 1:
 40		drift = 5 * time.Minute
 41	case 2:
 42		drift = 1 * time.Hour
 43	case 3:
 44		drift = 4 * time.Hour
 45	case 4:
 46		drift = 12 * time.Hour
 47	case 5:
 48		drift = 24 * time.Hour
 49	default:
 50		ilog.Printf("he's dead jim: %s", rcpt)
 51		return
 52	}
 53	drift += time.Duration(notrand.Int63n(int64(drift / 10)))
 54	when := time.Now().Add(drift)
 55	data := bytes.Join(doover.Msgs, []byte{0})
 56	_, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, data)
 57	if err != nil {
 58		elog.Printf("error saving doover: %s", err)
 59	}
 60	select {
 61	case pokechan <- 0:
 62	default:
 63	}
 64}
 65
// dqmtx guards access to stored doovers. delinquent holds it while checking
// for and appending to a pending doover, and redeliverator holds it while
// loading and deleting one.
var dqmtx sync.Mutex
 67
 68func delinquent(rcpt string, msg []byte) bool {
 69	dqmtx.Lock()
 70	defer dqmtx.Unlock()
 71	row := stmtDeliquentCheck.QueryRow(rcpt)
 72	var dooverid int64
 73	var data []byte
 74	err := row.Scan(&dooverid, data)
 75	if err == sql.ErrNoRows {
 76		return false
 77	}
 78	if err != nil {
 79		elog.Printf("error scanning deliquent check: %s", err)
 80		return true
 81	}
 82	data = append(data, 0)
 83	data = append(data, msg...)
 84	_, err = stmtDeliquentUpdate.Exec(data, dooverid)
 85	if err != nil {
 86		elog.Printf("error updating deliquent: %s", err)
 87		return true
 88	}
 89	return true
 90}
 91
 92func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
 93	if delinquent(rcpt, msg) {
 94		return
 95	}
 96	var d Doover
 97	d.Rcpt = rcpt
 98	d.Msgs = append(d.Msgs, msg)
 99	deliveration(goarounds, userid, d)
100}
101
// garage is the limiter that deliveration enters before doing any work and
// leaves when it returns.
// NOTE(review): presumably this caps concurrent deliveries at 40 — confirm
// against gate.NewLimiter.
var garage = gate.NewLimiter(40)
103
104func deliveration(goarounds int64, userid int64, doover Doover) {
105	garage.Start()
106	defer garage.Finish()
107
108	var ki *KeyInfo
109	ok := ziggies.Get(userid, &ki)
110	if !ok {
111		elog.Printf("lost key for delivery")
112		return
113	}
114	var inbox string
115	rcpt := doover.Rcpt
116	// already did the box indirection
117	if rcpt[0] == '%' {
118		inbox = rcpt[1:]
119	} else {
120		var box *Box
121		ok := boxofboxes.Get(rcpt, &box)
122		if !ok {
123			ilog.Printf("failed getting inbox for %s", rcpt)
124			sayitagain(goarounds+1, userid, doover)
125			return
126		}
127		inbox = box.In
128	}
129	for i, msg := range doover.Msgs {
130		if i > 0 {
131			time.Sleep(2 * time.Second)
132		}
133		err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
134		if err != nil {
135			ilog.Printf("failed to post json to %s: %s", inbox, err)
136			doover.Msgs = doover.Msgs[i:]
137			sayitagain(goarounds+1, userid, doover)
138			return
139		}
140	}
141}
142
// pokechan wakes redeliverator after sayitagain has stored a doover. It has a
// buffer of one and sayitagain sends without blocking, so a poke that arrives
// while another is still pending is dropped.
var pokechan = make(chan int, 1)
144
145func getdoovers() []Doover {
146	rows, err := stmtGetDoovers.Query()
147	if err != nil {
148		elog.Printf("wat?")
149		time.Sleep(1 * time.Minute)
150		return nil
151	}
152	defer rows.Close()
153	var doovers []Doover
154	for rows.Next() {
155		var d Doover
156		var dt string
157		err := rows.Scan(&d.ID, &dt)
158		if err != nil {
159			elog.Printf("error scanning dooverid: %s", err)
160			continue
161		}
162		d.When, _ = time.Parse(dbtimeformat, dt)
163		doovers = append(doovers, d)
164	}
165	return doovers
166}
167
168func redeliverator() {
169	sleeper := time.NewTimer(5 * time.Second)
170	for {
171		select {
172		case <-pokechan:
173			if !sleeper.Stop() {
174				<-sleeper.C
175			}
176			time.Sleep(5 * time.Second)
177		case <-sleeper.C:
178		}
179
180		doovers := getdoovers()
181
182		now := time.Now()
183		nexttime := now.Add(24 * time.Hour)
184		for _, d := range doovers {
185			if d.When.Before(now) {
186				var goarounds, userid int64
187				var data []byte
188				dqmtx.Lock()
189				row := stmtLoadDoover.QueryRow(d.ID)
190				err := row.Scan(&goarounds, &userid, &d.Rcpt, &data)
191				if err != nil {
192					elog.Printf("error scanning doover: %s", err)
193					dqmtx.Unlock() // defer
194					continue
195				}
196				_, err = stmtZapDoover.Exec(d.ID)
197				if err != nil {
198					elog.Printf("error deleting doover: %s", err)
199					dqmtx.Unlock() // defer
200					continue
201				}
202				dqmtx.Unlock() // defer
203				d.Msgs = bytes.Split(data, []byte{0})
204				rcpt := d.Rcpt
205				ilog.Printf("redeliverating %s try %d", rcpt, goarounds)
206				deliveration(goarounds, userid, d)
207			} else if d.When.Before(nexttime) {
208				nexttime = d.When
209			}
210		}
211		now = time.Now()
212		dur := 5 * time.Second
213		if now.Before(nexttime) {
214			dur += nexttime.Sub(now).Round(time.Second)
215		}
216		sleeper.Reset(dur)
217	}
218}