convert deliverater queue to per rcpt fifo
Ted Unangst tedu@tedunangst.com
Mon, 12 Jun 2023 14:40:28 -0400
2 files changed,
67 insertions(+),
22 deletions(-)
M
database.go
→
database.go
@@ -1106,6 +1106,7 @@ var stmtSaveMeta, stmtDeleteAllMeta, stmtDeleteOneMeta, stmtDeleteSomeMeta, stmtUpdateHonk *sql.Stmt
var stmtHonksISaved, stmtGetFilters, stmtSaveFilter, stmtDeleteFilter *sql.Stmt var stmtGetTracks *sql.Stmt var stmtSaveChonk, stmtLoadChonks, stmtGetChatters *sql.Stmt +var stmtDeliquentCheck, stmtDeliquentUpdate *sql.Stmt func preparetodie(db *sql.DB, s string) *sql.Stmt { stmt, err := db.Prepare(s)@@ -1192,4 +1193,6 @@ stmtGetTracks = preparetodie(db, "select fetches from tracks where xid = ?")
stmtSaveChonk = preparetodie(db, "insert into chonks (userid, xid, who, target, dt, noise, format) values (?, ?, ?, ?, ?, ?, ?)") stmtLoadChonks = preparetodie(db, "select chonkid, userid, xid, who, target, dt, noise, format from chonks where userid = ? and dt > ? order by chonkid asc") stmtGetChatters = preparetodie(db, "select distinct(target) from chonks where userid = ?") + stmtDeliquentCheck = preparetodie(db, "select dooverid, msg from doovers where rcpt = ?") + stmtDeliquentUpdate = preparetodie(db, "update doovers set data = ? where dooverid = ?") }
M
deliverator.go
→
deliverator.go
@@ -16,8 +16,10 @@
package main import ( - "fmt" + "bytes" + "database/sql" notrand "math/rand" + "sync" "time" "humungus.tedunangst.com/r/webs/gate"@@ -26,9 +28,12 @@
type Doover struct { ID int64 When time.Time + Rcpt string + Msgs [][]byte } -func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) { +func sayitagain(goarounds int64, userid int64, doover Doover) { + rcpt := doover.Rcpt var drift time.Duration switch goarounds { case 1:@@ -43,12 +48,12 @@ case 5:
drift = 24 * time.Hour default: ilog.Printf("he's dead jim: %s", rcpt) - clearoutbound(rcpt) return } drift += time.Duration(notrand.Int63n(int64(drift / 10))) when := time.Now().Add(drift) - _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, msg) + data := bytes.Join(doover.Msgs, []byte{0}) + _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, data) if err != nil { elog.Printf("error saving doover: %s", err) }@@ -58,20 +63,45 @@ default:
} } -func clearoutbound(rcpt string) { - hostname := originate(rcpt) - if hostname == "" { +var dqmtx sync.Mutex + +func delinquent(rcpt string, msg []byte) bool { + dqmtx.Lock() + defer dqmtx.Unlock() + row := stmtDeliquentCheck.QueryRow(rcpt) + var dooverid int64 + var data []byte + err := row.Scan(&dooverid, data) + if err == sql.ErrNoRows { + return false + } + if err != nil { + elog.Printf("error scanning deliquent check: %s", err) + return true + } + data = append(data, 0) + data = append(data, msg...) + _, err = stmtDeliquentUpdate.Exec(data, dooverid) + if err != nil { + elog.Printf("error updating deliquent: %s", err) + return true + } + return true +} + +func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) { + if delinquent(rcpt, msg) { return } - xid := fmt.Sprintf("%%https://%s/%%", hostname) - ilog.Printf("clearing outbound for %s", xid) - db := opendatabase() - db.Exec("delete from doovers where rcpt like ?", xid) + var d Doover + d.Rcpt = rcpt + d.Msgs = append(d.Msgs, msg) + deliveration(goarounds, userid, d) } var garage = gate.NewLimiter(40) -func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) { +func deliveration(goarounds int64, userid int64, doover Doover) { garage.Start() defer garage.Finish()@@ -82,6 +112,7 @@ elog.Printf("lost key for delivery")
return } var inbox string + rcpt := doover.Rcpt // already did the box indirection if rcpt[0] == '%' { inbox = rcpt[1:]@@ -90,16 +121,22 @@ var box *Box
ok := boxofboxes.Get(rcpt, &box) if !ok { ilog.Printf("failed getting inbox for %s", rcpt) - sayitagain(goarounds+1, userid, rcpt, msg) + sayitagain(goarounds+1, userid, doover) return } inbox = box.In } - err := PostMsg(ki.keyname, ki.seckey, inbox, msg) - if err != nil { - ilog.Printf("failed to post json to %s: %s", inbox, err) - sayitagain(goarounds+1, userid, rcpt, msg) - return + for i, msg := range doover.Msgs { + if i > 0 { + time.Sleep(2 * time.Second) + } + err := PostMsg(ki.keyname, ki.seckey, inbox, msg) + if err != nil { + ilog.Printf("failed to post json to %s: %s", inbox, err) + doover.Msgs = doover.Msgs[i:] + sayitagain(goarounds+1, userid, doover) + return + } } }@@ -147,21 +184,26 @@ nexttime := now.Add(24 * time.Hour)
for _, d := range doovers { if d.When.Before(now) { var goarounds, userid int64 - var rcpt string - var msg []byte + var data []byte + dqmtx.Lock() row := stmtLoadDoover.QueryRow(d.ID) - err := row.Scan(&goarounds, &userid, &rcpt, &msg) + err := row.Scan(&goarounds, &userid, &d.Rcpt, &data) if err != nil { elog.Printf("error scanning doover: %s", err) + dqmtx.Unlock() // defer continue } _, err = stmtZapDoover.Exec(d.ID) if err != nil { elog.Printf("error deleting doover: %s", err) + dqmtx.Unlock() // defer continue } + dqmtx.Unlock() // defer + d.Msgs = bytes.Split(data, []byte{0}) + rcpt := d.Rcpt ilog.Printf("redeliverating %s try %d", rcpt, goarounds) - deliverate(goarounds, userid, rcpt, msg) + deliveration(goarounds, userid, d) } else if d.When.Before(nexttime) { nexttime = d.When }