all repos — honk @ 813c3cddc806197336b16e3b93a382dc82accb6a

my fork of honk

deliverator.go (view raw)

  1//
  2// Copyright (c) 2019 Ted Unangst <tedu@tedunangst.com>
  3//
  4// Permission to use, copy, modify, and distribute this software for any
  5// purpose with or without fee is hereby granted, provided that the above
  6// copyright notice and this permission notice appear in all copies.
  7//
  8// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 10// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 11// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 12// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 13// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 14// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 15
 16package main
 17
 18import (
 19	"log"
 20	notrand "math/rand"
 21	"sync"
 22	"time"
 23)
 24
 25func init() {
 26	notrand.Seed(time.Now().Unix())
 27}
 28
 29type Doover struct {
 30	ID   int64
 31	When time.Time
 32}
 33
 34func sayitagain(goarounds int, username string, rcpt string, msg []byte) {
 35	var drift time.Duration
 36	switch goarounds {
 37	case 1:
 38		drift = 5 * time.Minute
 39	case 2:
 40		drift = 1 * time.Hour
 41	case 3:
 42		drift = 12 * time.Hour
 43	case 4:
 44		drift = 24 * time.Hour
 45	default:
 46		log.Printf("he's dead jim: %s", rcpt)
 47		return
 48	}
 49	drift += time.Duration(notrand.Int63n(int64(drift / 10)))
 50	when := time.Now().UTC().Add(drift)
 51	stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, username, rcpt, msg)
 52	select {
 53	case pokechan <- 0:
 54	default:
 55	}
 56}
 57
 58var trucksout = 0
 59var maxtrucksout = 10
 60var garagelock sync.Mutex
 61var garagebell = sync.NewCond(&garagelock)
 62
 63func truckgoesout() {
 64	garagelock.Lock()
 65	for trucksout >= maxtrucksout {
 66		garagebell.Wait()
 67	}
 68	trucksout++
 69	garagelock.Unlock()
 70}
 71
 72func truckcomesin() {
 73	garagelock.Lock()
 74	trucksout--
 75	garagebell.Broadcast()
 76	garagelock.Unlock()
 77}
 78
 79func deliverate(goarounds int, username string, rcpt string, msg []byte) {
 80	truckgoesout()
 81	defer truckcomesin()
 82
 83	keyname, key := ziggy(username)
 84	var inbox string
 85	// already did the box indirection
 86	if rcpt[0] == '%' {
 87		inbox = rcpt[1:]
 88	} else {
 89		box, err := getboxes(rcpt)
 90		if err != nil {
 91			log.Printf("error getting inbox %s: %s", rcpt, err)
 92			sayitagain(goarounds+1, username, rcpt, msg)
 93			return
 94		}
 95		inbox = box.In
 96		if box.Shared != "" {
 97			inbox = box.Shared
 98		}
 99	}
100	err := PostMsg(keyname, key, inbox, msg)
101	if err != nil {
102		log.Printf("failed to post json to %s: %s", inbox, err)
103		sayitagain(goarounds+1, username, rcpt, msg)
104		return
105	}
106}
107
// pokechan wakes redeliverator ahead of its timer. It is unbuffered, and
// sayitagain sends on it with a non-blocking select, so a poke is dropped
// when redeliverator is not waiting. The value sent is ignored.
var pokechan = make(chan int)
109
110func redeliverator() {
111	sleeper := time.NewTimer(0)
112	for {
113		select {
114		case <-pokechan:
115			if !sleeper.Stop() {
116				<-sleeper.C
117			}
118			time.Sleep(5 * time.Second)
119		case <-sleeper.C:
120		}
121
122		rows, err := stmtGetDoovers.Query()
123		if err != nil {
124			log.Printf("wat?")
125			time.Sleep(1 * time.Minute)
126			continue
127		}
128		var doovers []Doover
129		for rows.Next() {
130			var d Doover
131			var dt string
132			rows.Scan(&d.ID, &dt)
133			d.When, _ = time.Parse(dbtimeformat, dt)
134			doovers = append(doovers, d)
135		}
136		rows.Close()
137		now := time.Now().UTC()
138		nexttime := now.Add(24 * time.Hour)
139		for _, d := range doovers {
140			if d.When.Before(now) {
141				var goarounds int
142				var username, rcpt string
143				var msg []byte
144				row := stmtLoadDoover.QueryRow(d.ID)
145				row.Scan(&goarounds, &username, &rcpt, &msg)
146				stmtZapDoover.Exec(d.ID)
147				log.Printf("redeliverating %s try %d", rcpt, goarounds)
148				deliverate(goarounds, username, rcpt, msg)
149			} else if d.When.Before(nexttime) {
150				nexttime = d.When
151			}
152		}
153		dur := nexttime.Sub(now).Round(time.Second) + 5*time.Second
154		sleeper.Reset(dur)
155	}
156}