all repos — honk @ 4dc42a9ed445ac4351aaa15f9742065dacce3558

my fork of honk

deliverator.go (view raw)

  1//
  2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
  3//
  4// Permission to use, copy, modify, and distribute this software for any
  5// purpose with or without fee is hereby granted, provided that the above
  6// copyright notice and this permission notice appear in all copies.
  7//
  8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 15
 16package main
 17
 18import (
 19	"log"
 20	notrand "math/rand"
 21	"time"
 22
 23	"humungus.tedunangst.com/r/webs/gate"
 24)
 25
 26type Doover struct {
 27	ID   int64
 28	When time.Time
 29}
 30
 31func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
 32	var drift time.Duration
 33	switch goarounds {
 34	case 1:
 35		drift = 5 * time.Minute
 36	case 2:
 37		drift = 1 * time.Hour
 38	case 3:
 39		drift = 4 * time.Hour
 40	case 4:
 41		drift = 12 * time.Hour
 42	case 5:
 43		drift = 24 * time.Hour
 44	default:
 45		log.Printf("he's dead jim: %s", rcpt)
 46		return
 47	}
 48	drift += time.Duration(notrand.Int63n(int64(drift / 10)))
 49	when := time.Now().UTC().Add(drift)
 50	_, err := stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, userid, rcpt, msg)
 51	if err != nil {
 52		log.Printf("error saving doover: %s", err)
 53	}
 54	select {
 55	case pokechan <- 0:
 56	default:
 57	}
 58}
 59
 60var garage = gate.NewLimiter(20)
 61
 62func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
 63	garage.Start()
 64	defer garage.Finish()
 65
 66	var ki *KeyInfo
 67	ok := ziggies.Get(userid, &ki)
 68	if !ok {
 69		log.Printf("lost key for delivery")
 70		return
 71	}
 72	var inbox string
 73	// already did the box indirection
 74	if rcpt[0] == '%' {
 75		inbox = rcpt[1:]
 76	} else {
 77		var box *Box
 78		ok := boxofboxes.Get(rcpt, &box)
 79		if !ok {
 80			log.Printf("failed getting inbox for %s", rcpt)
 81			sayitagain(goarounds+1, userid, rcpt, msg)
 82			return
 83		}
 84		inbox = box.In
 85	}
 86	err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
 87	if err != nil {
 88		log.Printf("failed to post json to %s: %s", inbox, err)
 89		sayitagain(goarounds+1, userid, rcpt, msg)
 90		return
 91	}
 92}
 93
 94var pokechan = make(chan int, 1)
 95
 96func getdoovers() []Doover {
 97	rows, err := stmtGetDoovers.Query()
 98	if err != nil {
 99		log.Printf("wat?")
100		time.Sleep(1 * time.Minute)
101		return nil
102	}
103	defer rows.Close()
104	var doovers []Doover
105	for rows.Next() {
106		var d Doover
107		var dt string
108		err := rows.Scan(&d.ID, &dt)
109		if err != nil {
110			log.Printf("error scanning dooverid: %s", err)
111			continue
112		}
113		d.When, _ = time.Parse(dbtimeformat, dt)
114		doovers = append(doovers, d)
115	}
116	return doovers
117}
118
119func redeliverator() {
120	sleeper := time.NewTimer(0)
121	for {
122		select {
123		case <-pokechan:
124			if !sleeper.Stop() {
125				<-sleeper.C
126			}
127			time.Sleep(5 * time.Second)
128		case <-sleeper.C:
129		}
130
131		doovers := getdoovers()
132
133		now := time.Now().UTC()
134		nexttime := now.Add(24 * time.Hour)
135		for _, d := range doovers {
136			if d.When.Before(now) {
137				var goarounds, userid int64
138				var rcpt string
139				var msg []byte
140				row := stmtLoadDoover.QueryRow(d.ID)
141				err := row.Scan(&goarounds, &userid, &rcpt, &msg)
142				if err != nil {
143					log.Printf("error scanning doover: %s", err)
144					continue
145				}
146				_, err = stmtZapDoover.Exec(d.ID)
147				if err != nil {
148					log.Printf("error deleting doover: %s", err)
149					continue
150				}
151				log.Printf("redeliverating %s try %d", rcpt, goarounds)
152				deliverate(goarounds, userid, rcpt, msg)
153			} else if d.When.Before(nexttime) {
154				nexttime = d.When
155			}
156		}
157		dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
158		sleeper.Reset(dur)
159	}
160}