all repos — honk @ ddc61615a1aafa81de8da6e6c55697e11e134bd9

my fork of honk

deliverator.go (view raw)

  1//
  2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
  3//
  4// Permission to use, copy, modify, and distribute this software for any
  5// purpose with or without fee is hereby granted, provided that the above
  6// copyright notice and this permission notice appear in all copies.
  7//
  8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 15
 16package main
 17
 18import (
 19	"log"
 20	notrand "math/rand"
 21	"sync"
 22	"time"
 23)
 24
 25func init() {
 26	notrand.Seed(time.Now().Unix())
 27}
 28
// Doover is one pending redelivery as listed by getdoovers: which saved
// delivery it is and when it should next be attempted.
type Doover struct {
	// ID is the identifier scanned from the doover listing; redeliverator
	// uses it to load and then delete the saved delivery.
	ID   int64
	// When is the scheduled retry time, parsed with dbtimeformat.
	When time.Time
}
 33
 34func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
 35	var drift time.Duration
 36	switch goarounds {
 37	case 1:
 38		drift = 5 * time.Minute
 39	case 2:
 40		drift = 1 * time.Hour
 41	case 3:
 42		drift = 4 * time.Hour
 43	case 4:
 44		drift = 12 * time.Hour
 45	case 5:
 46		drift = 24 * time.Hour
 47	default:
 48		log.Printf("he's dead jim: %s", rcpt)
 49		return
 50	}
 51	drift += time.Duration(notrand.Int63n(int64(drift / 10)))
 52	when := time.Now().UTC().Add(drift)
 53	_, err := stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, userid, rcpt, msg)
 54	if err != nil {
 55		log.Printf("error saving doover: %s", err)
 56	}
 57	select {
 58	case pokechan <- 0:
 59	default:
 60	}
 61}
 62
 63var trucksout = 0
 64var maxtrucksout = 20
 65var garagelock sync.Mutex
 66var garagebell = sync.NewCond(&garagelock)
 67
 68func truckgoesout() {
 69	garagelock.Lock()
 70	for trucksout >= maxtrucksout {
 71		garagebell.Wait()
 72	}
 73	trucksout++
 74	garagelock.Unlock()
 75}
 76
 77func truckcomesin() {
 78	garagelock.Lock()
 79	trucksout--
 80	garagebell.Broadcast()
 81	garagelock.Unlock()
 82}
 83
 84func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
 85	truckgoesout()
 86	defer truckcomesin()
 87
 88	var ki *KeyInfo
 89	ok := ziggies.Get(userid, &ki)
 90	if !ok {
 91		log.Printf("lost key for delivery")
 92		return
 93	}
 94	var inbox string
 95	// already did the box indirection
 96	if rcpt[0] == '%' {
 97		inbox = rcpt[1:]
 98	} else {
 99		var box *Box
100		ok := boxofboxes.Get(rcpt, &box)
101		if !ok {
102			log.Printf("failed getting inbox for %s", rcpt)
103			sayitagain(goarounds+1, userid, rcpt, msg)
104			return
105		}
106		inbox = box.In
107	}
108	err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
109	if err != nil {
110		log.Printf("failed to post json to %s: %s", inbox, err)
111		sayitagain(goarounds+1, userid, rcpt, msg)
112		return
113	}
114}
115
// pokechan is how sayitagain nudges redeliverator after saving a doover.
// It is buffered with capacity one and sayitagain sends without blocking, so
// a poke sent while one is already pending is discarded.
var pokechan = make(chan int, 1)
117
118func getdoovers() []Doover {
119	rows, err := stmtGetDoovers.Query()
120	if err != nil {
121		log.Printf("wat?")
122		time.Sleep(1 * time.Minute)
123		return nil
124	}
125	defer rows.Close()
126	var doovers []Doover
127	for rows.Next() {
128		var d Doover
129		var dt string
130		err := rows.Scan(&d.ID, &dt)
131		if err != nil {
132			log.Printf("error scanning dooverid: %s", err)
133			continue
134		}
135		d.When, _ = time.Parse(dbtimeformat, dt)
136		doovers = append(doovers, d)
137	}
138	return doovers
139}
140
141func redeliverator() {
142	sleeper := time.NewTimer(0)
143	for {
144		select {
145		case <-pokechan:
146			if !sleeper.Stop() {
147				<-sleeper.C
148			}
149			time.Sleep(5 * time.Second)
150		case <-sleeper.C:
151		}
152
153		doovers := getdoovers()
154
155		now := time.Now().UTC()
156		nexttime := now.Add(24 * time.Hour)
157		for _, d := range doovers {
158			if d.When.Before(now) {
159				var goarounds, userid int64
160				var rcpt string
161				var msg []byte
162				row := stmtLoadDoover.QueryRow(d.ID)
163				err := row.Scan(&goarounds, &userid, &rcpt, &msg)
164				if err != nil {
165					log.Printf("error scanning doover: %s", err)
166					continue
167				}
168				_, err = stmtZapDoover.Exec(d.ID)
169				if err != nil {
170					log.Printf("error deleting doover: %s", err)
171					continue
172				}
173				log.Printf("redeliverating %s try %d", rcpt, goarounds)
174				deliverate(goarounds, userid, rcpt, msg)
175			} else if d.When.Before(nexttime) {
176				nexttime = d.When
177			}
178		}
179		dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
180		sleeper.Reset(dur)
181	}
182}