all repos — wsabi @ d351f6e665f889ccc51c7dc808734c44e1f296a3

websocket proxy that sends stats to statsd

Initial implementation of stats

Sends stats every minute to a local statsd server.
Anirudh Oppiliappan x@icyphox.sh
Wed, 29 Jul 2020 18:17:37 +0530
commit

d351f6e665f889ccc51c7dc808734c44e1f296a3

parent

776001e3508e0ffdb0049678c4da33609cbf8943

M .gitignore.gitignore

@@ -1,1 +1,1 @@

-wsabi +bin
M src/config.nimssrc/config.nims

@@ -1,1 +1,1 @@

---d: ssl +--d:ssl
M src/wsabi.nimsrc/wsabi.nim

@@ -1,20 +1,14 @@

import - ws, - asyncdispatch, - asynchttpserver, - strformat, - asyncnet, - wsabipkg/[args,httputils], - httpclient, - strutils, - json, - uri + ws, asyncdispatch, asynchttpserver, strformat, asyncnet, httpclient, + strutils, json, uri +import wsabi/[httputils, args, stats] type Client = object ws: WebSocket remote: WebSocket + connected: bool var server = newAsyncHttpServer()

@@ -24,10 +18,16 @@

proc commRemoteClient(c: Client) {.async.} = ## Fetch from remote and send to client - while c.remote.readyState == Open: - var data = await c.remote.receiveStrPacket() - echo "from remote: ", data - await c.ws.send(data) + try: + while c.remote.readyState == Open and c.connected: + var data = await c.remote.receiveStrPacket() + await parseStanza(data) + echo "from remote: ", data + await c.ws.send(data) + except OSError: + discard + except WebSocketError: + discard proc localServer(req: Request) {.async, gcsafe.} = ## Listen on localhost:PORT/ws

@@ -39,25 +39,29 @@ let bits = parseUri(remoteHost[0])

newreq.headers["host"] = @[fmt"{bits.hostname}:{bits.port}"] newreq.headers["hostname"] = @[bits.hostname] - client.ws = await newWebSocket(newreq, protocol = "xmpp") - echo "connecting to remote host..." - client.remote = await newWebSocket(remoteHost[0], protocol = "xmpp") - let (address, port) = client.remote.tcpSocket.getPeerAddr() - echo fmt"connected to {address}:{port.int}" - clients.add client + try: + client.ws = await newWebSocket(newreq, protocol = "xmpp") + client.connected = true + echo "connecting to remote host..." + client.remote = await newWebSocket(remoteHost[0], protocol = "xmpp") + let (address, port) = client.remote.tcpSocket.getPeerAddr() + echo fmt"connected to {address}:{port.int}" + clients.add client - try: while client.ws.readyState == Open: let clientAddress = client.ws.tcpSocket.getPeerAddr()[0] var packet = await client.ws.receiveStrPacket() repacket = packet.replace(clientAddress, by=bits.hostname) echo "from local: " & repacket + await parseStanza(repacket) await client.remote.send(repacket) echo "sent local packet" asyncCheck commRemoteClient(client) except WebSocketError as e: + client.ws.close() + client.remote.close() echo "client closed socket: ", e.msg ## Health check endpoint
A src/wsabi/stats.nim

@@ -0,0 +1,40 @@

+# parse xml stanzas and send stats to statsd + +import + xmltree, xmlparser, + asyncdispatch, asyncnet, nativesockets, + strformat, + tables + + +var + statsd: AsyncSocket + clientAddr: string + counter = initCountTable[string]() +const prefix = "wsabi.dev." +let tags = @["message", "auth", "success", "failure"] + + +proc connectStatsd() {.async.} = + statsd = await asyncnet.dial("127.0.0.1", 8125.Port, IPPROTO_UDP) + +proc resetCounter(fd: AsyncFD): bool {.gcsafe.} = + for tag in tags: + if counter[tag] != 0: + waitFor statsd.send(&"{prefix}{tag}:{counter[tag]}|c") + counter[tag] = 0 + +proc parseStanza*(stanza: string) {.async.} = + try: + let p = parseXml(stanza) + try: + if p.tag in tags: + counter.inc(p.tag) + addTimer(60_000, false, resetCounter) + except OSError: + echo "unable to reach statsd" + except XmlError: + discard + + +asyncCheck connectStatsd()
M wsabi.nimblewsabi.nimble

@@ -6,6 +6,7 @@ description = "Load balance, monitor and failover Openfire servers"

license = "MIT" srcDir = "src" bin = @["wsabi"] +binDir = "bin" # Dependencies