all repos — honk @ 45968526cda8d13cb2a87990d6b57efa64fa55cf

my fork of honk

deliverator.go (view raw)

  1//
  2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
  3//
  4// Permission to use, copy, modify, and distribute this software for any
  5// purpose with or without fee is hereby granted, provided that the above
  6// copyright notice and this permission notice appear in all copies.
  7//
  8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 15
 16package main
 17
 18import (
 19	"log"
 20	notrand "math/rand"
 21	"time"
 22
 23	"humungus.tedunangst.com/r/webs/gate"
 24)
 25
 26func init() {
 27	notrand.Seed(time.Now().Unix())
 28}
 29
// Doover identifies one queued redelivery: the ID of its stored row and
// the time at which the retry becomes due. getdoovers produces these and
// redeliverator consumes them.
type Doover struct {
	ID   int64     // row identifier, used to load and then delete the stored doover
	When time.Time // scheduled retry time, parsed from the stored timestamp string
}
 34
 35func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
 36	var drift time.Duration
 37	switch goarounds {
 38	case 1:
 39		drift = 5 * time.Minute
 40	case 2:
 41		drift = 1 * time.Hour
 42	case 3:
 43		drift = 4 * time.Hour
 44	case 4:
 45		drift = 12 * time.Hour
 46	case 5:
 47		drift = 24 * time.Hour
 48	default:
 49		log.Printf("he's dead jim: %s", rcpt)
 50		return
 51	}
 52	drift += time.Duration(notrand.Int63n(int64(drift / 10)))
 53	when := time.Now().UTC().Add(drift)
 54	_, err := stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, userid, rcpt, msg)
 55	if err != nil {
 56		log.Printf("error saving doover: %s", err)
 57	}
 58	select {
 59	case pokechan <- 0:
 60	default:
 61	}
 62}
 63
// garage is the gate limiter that deliverate enters (Start) before each
// delivery and leaves (Finish) afterwards. It is created with a limit of
// 20 — presumably the maximum number of deliveries allowed to run at the
// same time; confirm against the gate package.
var garage = gate.NewLimiter(20)
 65
 66func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
 67	garage.Start()
 68	defer garage.Finish()
 69
 70	var ki *KeyInfo
 71	ok := ziggies.Get(userid, &ki)
 72	if !ok {
 73		log.Printf("lost key for delivery")
 74		return
 75	}
 76	var inbox string
 77	// already did the box indirection
 78	if rcpt[0] == '%' {
 79		inbox = rcpt[1:]
 80	} else {
 81		var box *Box
 82		ok := boxofboxes.Get(rcpt, &box)
 83		if !ok {
 84			log.Printf("failed getting inbox for %s", rcpt)
 85			sayitagain(goarounds+1, userid, rcpt, msg)
 86			return
 87		}
 88		inbox = box.In
 89	}
 90	err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
 91	if err != nil {
 92		log.Printf("failed to post json to %s: %s", inbox, err)
 93		sayitagain(goarounds+1, userid, rcpt, msg)
 94		return
 95	}
 96}
 97
// pokechan is how sayitagain tells redeliverator that a doover has been
// queued. It has a buffer of one and sayitagain sends without blocking, so
// pokes that arrive while one is already pending are dropped.
var pokechan = make(chan int, 1)
 99
100func getdoovers() []Doover {
101	rows, err := stmtGetDoovers.Query()
102	if err != nil {
103		log.Printf("wat?")
104		time.Sleep(1 * time.Minute)
105		return nil
106	}
107	defer rows.Close()
108	var doovers []Doover
109	for rows.Next() {
110		var d Doover
111		var dt string
112		err := rows.Scan(&d.ID, &dt)
113		if err != nil {
114			log.Printf("error scanning dooverid: %s", err)
115			continue
116		}
117		d.When, _ = time.Parse(dbtimeformat, dt)
118		doovers = append(doovers, d)
119	}
120	return doovers
121}
122
123func redeliverator() {
124	sleeper := time.NewTimer(0)
125	for {
126		select {
127		case <-pokechan:
128			if !sleeper.Stop() {
129				<-sleeper.C
130			}
131			time.Sleep(5 * time.Second)
132		case <-sleeper.C:
133		}
134
135		doovers := getdoovers()
136
137		now := time.Now().UTC()
138		nexttime := now.Add(24 * time.Hour)
139		for _, d := range doovers {
140			if d.When.Before(now) {
141				var goarounds, userid int64
142				var rcpt string
143				var msg []byte
144				row := stmtLoadDoover.QueryRow(d.ID)
145				err := row.Scan(&goarounds, &userid, &rcpt, &msg)
146				if err != nil {
147					log.Printf("error scanning doover: %s", err)
148					continue
149				}
150				_, err = stmtZapDoover.Exec(d.ID)
151				if err != nil {
152					log.Printf("error deleting doover: %s", err)
153					continue
154				}
155				log.Printf("redeliverating %s try %d", rcpt, goarounds)
156				deliverate(goarounds, userid, rcpt, msg)
157			} else if d.When.Before(nexttime) {
158				nexttime = d.When
159			}
160		}
161		dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
162		sleeper.Reset(dur)
163	}
164}