all repos — honk @ de456f02f96590b1431fc37a99cd84f7c2e3ccd3

my fork of honk

deliverator.go (view raw)

  1//
  2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
  3//
  4// Permission to use, copy, modify, and distribute this software for any
  5// purpose with or without fee is hereby granted, provided that the above
  6// copyright notice and this permission notice appear in all copies.
  7//
  8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 15
 16package main
 17
 18import (
 19	"fmt"
 20	notrand "math/rand"
 21	"time"
 22
 23	"humungus.tedunangst.com/r/webs/gate"
 24)
 25
 26type Doover struct {
 27	ID   int64
 28	When time.Time
 29}
 30
 31func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
 32	var drift time.Duration
 33	switch goarounds {
 34	case 1:
 35		drift = 5 * time.Minute
 36	case 2:
 37		drift = 1 * time.Hour
 38	case 3:
 39		drift = 4 * time.Hour
 40	case 4:
 41		drift = 12 * time.Hour
 42	case 5:
 43		drift = 24 * time.Hour
 44	default:
 45		ilog.Printf("he's dead jim: %s", rcpt)
 46		clearoutbound(rcpt)
 47		return
 48	}
 49	drift += time.Duration(notrand.Int63n(int64(drift / 10)))
 50	when := time.Now().Add(drift)
 51	_, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, msg)
 52	if err != nil {
 53		elog.Printf("error saving doover: %s", err)
 54	}
 55	select {
 56	case pokechan <- 0:
 57	default:
 58	}
 59}
 60
 61func clearoutbound(rcpt string) {
 62	hostname := originate(rcpt)
 63	if hostname == "" {
 64		return
 65	}
 66	xid := fmt.Sprintf("%%https://%s/%%", hostname)
 67	ilog.Printf("clearing outbound for %s", xid)
 68	db := opendatabase()
 69	db.Exec("delete from doovers where rcpt like ?", xid)
 70}
 71
 72var garage = gate.NewLimiter(40)
 73
 74func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
 75	garage.Start()
 76	defer garage.Finish()
 77
 78	var ki *KeyInfo
 79	ok := ziggies.Get(userid, &ki)
 80	if !ok {
 81		elog.Printf("lost key for delivery")
 82		return
 83	}
 84	var inbox string
 85	// already did the box indirection
 86	if rcpt[0] == '%' {
 87		inbox = rcpt[1:]
 88	} else {
 89		var box *Box
 90		ok := boxofboxes.Get(rcpt, &box)
 91		if !ok {
 92			ilog.Printf("failed getting inbox for %s", rcpt)
 93			sayitagain(goarounds+1, userid, rcpt, msg)
 94			return
 95		}
 96		inbox = box.In
 97	}
 98	err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
 99	if err != nil {
100		ilog.Printf("failed to post json to %s: %s", inbox, err)
101		sayitagain(goarounds+1, userid, rcpt, msg)
102		return
103	}
104}
105
106var pokechan = make(chan int, 1)
107
108func getdoovers() []Doover {
109	rows, err := stmtGetDoovers.Query()
110	if err != nil {
111		elog.Printf("wat?")
112		time.Sleep(1 * time.Minute)
113		return nil
114	}
115	defer rows.Close()
116	var doovers []Doover
117	for rows.Next() {
118		var d Doover
119		var dt string
120		err := rows.Scan(&d.ID, &dt)
121		if err != nil {
122			elog.Printf("error scanning dooverid: %s", err)
123			continue
124		}
125		d.When, _ = time.Parse(dbtimeformat, dt)
126		doovers = append(doovers, d)
127	}
128	return doovers
129}
130
131func redeliverator() {
132	sleeper := time.NewTimer(5 * time.Second)
133	for {
134		select {
135		case <-pokechan:
136			if !sleeper.Stop() {
137				<-sleeper.C
138			}
139			time.Sleep(5 * time.Second)
140		case <-sleeper.C:
141		}
142
143		doovers := getdoovers()
144
145		now := time.Now()
146		nexttime := now.Add(24 * time.Hour)
147		for _, d := range doovers {
148			if d.When.Before(now) {
149				var goarounds, userid int64
150				var rcpt string
151				var msg []byte
152				row := stmtLoadDoover.QueryRow(d.ID)
153				err := row.Scan(&goarounds, &userid, &rcpt, &msg)
154				if err != nil {
155					elog.Printf("error scanning doover: %s", err)
156					continue
157				}
158				_, err = stmtZapDoover.Exec(d.ID)
159				if err != nil {
160					elog.Printf("error deleting doover: %s", err)
161					continue
162				}
163				ilog.Printf("redeliverating %s try %d", rcpt, goarounds)
164				deliverate(goarounds, userid, rcpt, msg)
165			} else if d.When.Before(nexttime) {
166				nexttime = d.When
167			}
168		}
169		now = time.Now()
170		dur := 5 * time.Second
171		if now.Before(nexttime) {
172			dur += nexttime.Sub(now).Round(time.Second)
173		}
174		sleeper.Reset(dur)
175	}
176}