the spice must flow. redeliverate with extreme prejudice.
Ted Unangst tedu@tedunangst.com
Sun, 14 Apr 2019 21:35:42 -0400
6 files changed,
90 insertions(+),
8 deletions(-)
M
deliverator.go
→
deliverator.go
@@ -21,6 +21,11 @@ "math/rand"
"time" ) +type Doover struct { + ID int64 + When time.Time +} + func sayitagain(goarounds int, username string, rcpt string, msg []byte) { var drift time.Duration switch goarounds {@@ -36,9 +41,13 @@ default:
log.Printf("he's dead jim: %s", rcpt) return } - drift += time.Duration(rand.Int63n(int64(drift / 16))) + drift += time.Duration(rand.Int63n(int64(drift / 10))) when := time.Now().UTC().Add(drift) - log.Print(when.Format(dbtimeformat), goarounds, username, rcpt, msg) + stmtAddDoover.Exec(when.Format(dbtimeformat), goarounds, username, rcpt, msg) + select { + case pokechan <- 0: + default: + } } func deliverate(goarounds int, username string, rcpt string, msg []byte) {@@ -53,5 +62,56 @@ err = PostMsg(keyname, key, inbox, msg)
if err != nil { log.Printf("failed to post json to %s: %s", inbox, err) sayitagain(goarounds+1, username, rcpt, msg) + return + } +} + +var pokechan = make(chan int) + +func redeliverator() { + sleeper := time.NewTimer(0) + for { + select { + case <-pokechan: + if !sleeper.Stop() { + <-sleeper.C + } + time.Sleep(1 * time.Minute) + case <-sleeper.C: + } + + rows, err := stmtGetDoovers.Query() + if err != nil { + log.Printf("wat?") + time.Sleep(1 * time.Minute) + continue + } + var doovers []Doover + for rows.Next() { + var d Doover + var dt string + rows.Scan(&d.ID, &dt) + d.When, _ = time.Parse(dbtimeformat, dt) + doovers = append(doovers, d) + } + rows.Close() + now := time.Now().UTC() + nexttime := now.Add(24 * time.Hour) + for _, d := range doovers { + if d.When.Before(now) { + var goarounds int + var username, rcpt string + var msg []byte + row := stmtLoadDoover.QueryRow(d.ID) + row.Scan(&goarounds, &username, &rcpt, &msg) + stmtZapDoover.Exec(d.ID) + log.Printf("redeliverating %s try %d", rcpt, goarounds) + deliverate(goarounds, username, rcpt, msg) + } else if d.When.Before(nexttime) { + nexttime = d.When + } + } + dur := nexttime.Sub(now).Round(time.Second) + 1*time.Minute + sleeper.Reset(dur) } }
M
honk.go
→
honk.go
@@ -914,6 +914,8 @@ listener, err := openListener()
if err != nil { log.Fatal(err) } + go redeliverator() + debug := false getconfig("debug", &debug) readviews = ParseTemplates(debug,@@ -975,6 +977,7 @@ var stmtHonkers, stmtDubbers, stmtOneXonk, stmtHonks, stmtUserHonks *sql.Stmt
var stmtHonksForUser, stmtDeleteHonk, stmtSaveDub *sql.Stmt var stmtHonksByHonker, stmtSaveHonk, stmtFileData, stmtWhatAbout *sql.Stmt var stmtFindXonk, stmtSaveDonk, stmtFindFile, stmtSaveFile *sql.Stmt +var stmtAddDoover, stmtGetDoovers, stmtLoadDoover, stmtZapDoover *sql.Stmt func preparetodie(db *sql.DB, s string) *sql.Stmt { stmt, err := db.Prepare(s)@@ -1001,6 +1004,10 @@ stmtFindFile = preparetodie(db, "select fileid from files where url = ?")
stmtSaveFile = preparetodie(db, "insert into files (xid, name, url, media, content) values (?, ?, ?, ?, ?)") stmtWhatAbout = preparetodie(db, "select userid, username, displayname, about, pubkey from users where username = ?") stmtSaveDub = preparetodie(db, "insert into honkers (userid, name, xid, flavor) values (?, ?, ?, ?)") + stmtAddDoover = preparetodie(db, "insert into doovers (dt, tries, username, rcpt, msg) values (?, ?, ?, ?, ?)") + stmtGetDoovers = preparetodie(db, "select dooverid, dt from doovers") + stmtLoadDoover = preparetodie(db, "select tries, username, rcpt, msg from doovers where dooverid = ?") + stmtZapDoover = preparetodie(db, "delete from doovers where dooverid = ?") } func ElaborateUnitTests() {
M
schema.sql
→
schema.sql
@@ -3,6 +3,7 @@ CREATE TABLE honks (honkid integer primary key, userid integer, what text, honker text, xid text, rid text, dt text, url text, audience text, noise text);
CREATE TABLE donks (honkid integer, fileid integer); CREATE TABLE files(fileid integer primary key, xid text, name text, url text, media text, content blob); CREATE TABLE honkers (honkerid integer primary key, userid integer, name text, xid text, flavor text, pubkey text); +create table doovers(dooverid integer primary key, dt text, tries integer, username text, rcpt text, msg blob); create index idx_honksxid on honks(xid); create index idx_honkshonker on honks(honker);
M
upgradedb.go
→
upgradedb.go
@@ -16,24 +16,32 @@
package main import ( + "database/sql" "log" "os" ) +func doordie(db *sql.DB, s string) { + _, err := db.Exec(s) + if err != nil { + log.Fatal(err) + } +} + func upgradedb() { db := opendatabase() dbversion := 0 getconfig("dbversion", &dbversion) - var err error switch dbversion { case 0: - _, err = db.Exec("insert into config (key, value) values ('dbversion', 1)") - if err != nil { - log.Fatal(err) - } + doordie(db, "insert into config (key, value) values ('dbversion', 1)") fallthrough case 1: + doordie(db, "create table doovers(dooverid integer primary key, dt text, tries integer, username text, rcpt text, msg blob)") + doordie(db, "update config set value = 2 where key = 'dbversion'") + fallthrough + case 2: default: log.Fatalf("can't upgrade unknown version %d", dbversion) }