all repos — honk @ 4a94132ee1da9c769ac5ec462c7866efa94b9fe7

my fork of honk

deliverator.go (view raw)

  1//
  2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
  3//
  4// Permission to use, copy, modify, and distribute this software for any
  5// purpose with or without fee is hereby granted, provided that the above
  6// copyright notice and this permission notice appear in all copies.
  7//
  8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 15
 16package main
 17
 18import (
 19	"fmt"
 20	"log"
 21	notrand "math/rand"
 22	"time"
 23
 24	"humungus.tedunangst.com/r/webs/gate"
 25)
 26
 27type Doover struct {
 28	ID   int64
 29	When time.Time
 30}
 31
 32func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
 33	var drift time.Duration
 34	switch goarounds {
 35	case 1:
 36		drift = 5 * time.Minute
 37	case 2:
 38		drift = 1 * time.Hour
 39	case 3:
 40		drift = 4 * time.Hour
 41	case 4:
 42		drift = 12 * time.Hour
 43	case 5:
 44		drift = 24 * time.Hour
 45	default:
 46		log.Printf("he's dead jim: %s", rcpt)
 47		clearoutbound(rcpt)
 48		return
 49	}
 50	drift += time.Duration(notrand.Int63n(int64(drift / 10)))
 51	when := time.Now().UTC().Add(drift)
 52	_, err := stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, userid, rcpt, msg)
 53	if err != nil {
 54		log.Printf("error saving doover: %s", err)
 55	}
 56	select {
 57	case pokechan <- 0:
 58	default:
 59	}
 60}
 61
 62func clearoutbound(rcpt string) {
 63	hostname := originate(rcpt)
 64	if hostname == "" {
 65		return
 66	}
 67	xid := fmt.Sprintf("%%https://%s/%%", hostname)
 68	log.Printf("clearing outbound for %s", xid)
 69	db := opendatabase()
 70	db.Exec("delete from doovers where rcpt like ?", xid)
 71}
 72
 73var garage = gate.NewLimiter(40)
 74
 75func deliverate(goarounds int64, userid int64, rcpt string, msg []byte, prio bool) {
 76	garage.Start()
 77	defer garage.Finish()
 78
 79	var ki *KeyInfo
 80	ok := ziggies.Get(userid, &ki)
 81	if !ok {
 82		log.Printf("lost key for delivery")
 83		return
 84	}
 85	var inbox string
 86	// already did the box indirection
 87	if rcpt[0] == '%' {
 88		inbox = rcpt[1:]
 89	} else {
 90		var box *Box
 91		ok := boxofboxes.Get(rcpt, &box)
 92		if !ok {
 93			log.Printf("failed getting inbox for %s", rcpt)
 94			sayitagain(goarounds+1, userid, rcpt, msg)
 95			return
 96		}
 97		inbox = box.In
 98	}
 99	err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
100	if err != nil {
101		log.Printf("failed to post json to %s: %s", inbox, err)
102		if prio {
103			sayitagain(goarounds+1, userid, rcpt, msg)
104		}
105		return
106	}
107}
108
// pokechan wakes the redeliverator early. It is buffered with capacity one;
// sayitagain sends to it without blocking, so pokes that arrive while one is
// already pending are dropped.
var pokechan = make(chan int, 1)
110
111func getdoovers() []Doover {
112	rows, err := stmtGetDoovers.Query()
113	if err != nil {
114		log.Printf("wat?")
115		time.Sleep(1 * time.Minute)
116		return nil
117	}
118	defer rows.Close()
119	var doovers []Doover
120	for rows.Next() {
121		var d Doover
122		var dt string
123		err := rows.Scan(&d.ID, &dt)
124		if err != nil {
125			log.Printf("error scanning dooverid: %s", err)
126			continue
127		}
128		d.When, _ = time.Parse(dbtimeformat, dt)
129		doovers = append(doovers, d)
130	}
131	return doovers
132}
133
134func redeliverator() {
135	sleeper := time.NewTimer(0)
136	for {
137		select {
138		case <-pokechan:
139			if !sleeper.Stop() {
140				<-sleeper.C
141			}
142			time.Sleep(5 * time.Second)
143		case <-sleeper.C:
144		}
145
146		doovers := getdoovers()
147
148		now := time.Now().UTC()
149		nexttime := now.Add(24 * time.Hour)
150		for _, d := range doovers {
151			if d.When.Before(now) {
152				var goarounds, userid int64
153				var rcpt string
154				var msg []byte
155				row := stmtLoadDoover.QueryRow(d.ID)
156				err := row.Scan(&goarounds, &userid, &rcpt, &msg)
157				if err != nil {
158					log.Printf("error scanning doover: %s", err)
159					continue
160				}
161				_, err = stmtZapDoover.Exec(d.ID)
162				if err != nil {
163					log.Printf("error deleting doover: %s", err)
164					continue
165				}
166				log.Printf("redeliverating %s try %d", rcpt, goarounds)
167				deliverate(goarounds, userid, rcpt, msg, true)
168			} else if d.When.Before(nexttime) {
169				nexttime = d.When
170			}
171		}
172		dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
173		sleeper.Reset(dur)
174	}
175}