all repos — honk @ 5052a3e0a3e58c83e671a67b798fc6563e233b08

my fork of honk

deliverator.go (view raw)

  1//
  2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
  3//
  4// Permission to use, copy, modify, and distribute this software for any
  5// purpose with or without fee is hereby granted, provided that the above
  6// copyright notice and this permission notice appear in all copies.
  7//
  8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 15
 16package main
 17
 18import (
 19	"fmt"
 20	"log"
 21	notrand "math/rand"
 22	"time"
 23
 24	"humungus.tedunangst.com/r/webs/gate"
 25)
 26
 27type Doover struct {
 28	ID   int64
 29	When time.Time
 30}
 31
 32func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
 33	var drift time.Duration
 34	switch goarounds {
 35	case 1:
 36		drift = 5 * time.Minute
 37	case 2:
 38		drift = 1 * time.Hour
 39	case 3:
 40		drift = 4 * time.Hour
 41	case 4:
 42		drift = 12 * time.Hour
 43	case 5:
 44		drift = 24 * time.Hour
 45	default:
 46		log.Printf("he's dead jim: %s", rcpt)
 47		clearoutbound(rcpt)
 48		return
 49	}
 50	drift += time.Duration(notrand.Int63n(int64(drift / 10)))
 51	when := time.Now().UTC().Add(drift)
 52	_, err := stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, userid, rcpt, msg)
 53	if err != nil {
 54		log.Printf("error saving doover: %s", err)
 55	}
 56	select {
 57	case pokechan <- 0:
 58	default:
 59	}
 60}
 61
 62func clearoutbound(rcpt string) {
 63	hostname := originate(rcpt)
 64	if hostname == "" {
 65		return
 66	}
 67	xid := fmt.Sprintf("%%https://%s/%%", hostname)
 68	log.Printf("clearing outbound for %s", xid)
 69	db := opendatabase()
 70	db.Exec("delete from doovers where rcpt like ?", xid)
 71}
 72
 73var garage = gate.NewLimiter(40)
 74
 75func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
 76	garage.Start()
 77	defer garage.Finish()
 78
 79	var ki *KeyInfo
 80	ok := ziggies.Get(userid, &ki)
 81	if !ok {
 82		log.Printf("lost key for delivery")
 83		return
 84	}
 85	var inbox string
 86	// already did the box indirection
 87	if rcpt[0] == '%' {
 88		inbox = rcpt[1:]
 89	} else {
 90		var box *Box
 91		ok := boxofboxes.Get(rcpt, &box)
 92		if !ok {
 93			log.Printf("failed getting inbox for %s", rcpt)
 94			sayitagain(goarounds+1, userid, rcpt, msg)
 95			return
 96		}
 97		inbox = box.In
 98	}
 99	err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
100	if err != nil {
101		log.Printf("failed to post json to %s: %s", inbox, err)
102		sayitagain(goarounds+1, userid, rcpt, msg)
103		return
104	}
105}
106
107var pokechan = make(chan int, 1)
108
109func getdoovers() []Doover {
110	rows, err := stmtGetDoovers.Query()
111	if err != nil {
112		log.Printf("wat?")
113		time.Sleep(1 * time.Minute)
114		return nil
115	}
116	defer rows.Close()
117	var doovers []Doover
118	for rows.Next() {
119		var d Doover
120		var dt string
121		err := rows.Scan(&d.ID, &dt)
122		if err != nil {
123			log.Printf("error scanning dooverid: %s", err)
124			continue
125		}
126		d.When, _ = time.Parse(dbtimeformat, dt)
127		doovers = append(doovers, d)
128	}
129	return doovers
130}
131
132func redeliverator() {
133	sleeper := time.NewTimer(0)
134	for {
135		select {
136		case <-pokechan:
137			if !sleeper.Stop() {
138				<-sleeper.C
139			}
140			time.Sleep(5 * time.Second)
141		case <-sleeper.C:
142		}
143
144		doovers := getdoovers()
145
146		now := time.Now().UTC()
147		nexttime := now.Add(24 * time.Hour)
148		for _, d := range doovers {
149			if d.When.Before(now) {
150				var goarounds, userid int64
151				var rcpt string
152				var msg []byte
153				row := stmtLoadDoover.QueryRow(d.ID)
154				err := row.Scan(&goarounds, &userid, &rcpt, &msg)
155				if err != nil {
156					log.Printf("error scanning doover: %s", err)
157					continue
158				}
159				_, err = stmtZapDoover.Exec(d.ID)
160				if err != nil {
161					log.Printf("error deleting doover: %s", err)
162					continue
163				}
164				log.Printf("redeliverating %s try %d", rcpt, goarounds)
165				deliverate(goarounds, userid, rcpt, msg)
166			} else if d.When.Before(nexttime) {
167				nexttime = d.When
168			}
169		}
170		dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
171		sleeper.Reset(dur)
172	}
173}