Fix some bugs with message delivery

This commit is contained in:
Jonathan Staab
2023-03-28 14:23:31 -05:00
parent 9b6a779397
commit 5a1a72491e
20 changed files with 275 additions and 224 deletions
+91
View File
@@ -0,0 +1,91 @@
import type {EventBus} from './util/EventBus.ts'
const createFilterId = filters =>
[Math.random().toString().slice(2, 6), filters.map(describeFilter).join(":")].join("-")
const describeFilter = ({kinds = [], ...filter}) => {
const parts = []
parts.push(kinds.join(","))
for (const [key, value] of Object.entries(filter)) {
if (value instanceof Array) {
parts.push(`${key}[${value.length}]`)
} else {
parts.push(key)
}
}
return "(" + parts.join(",") + ")"
}
type Executable = {
bus: EventBus
send: (verb: string, ...args) => void
}
export class Executor {
target: Executable
constructor(target) {
this.target = target
}
subscribe(filters, {onEvent, onEose}) {
const id = createFilterId(filters)
const unsubscribe = this.target.bus.addListeners({
EVENT: (url, subid, e) => subid === id && onEvent?.(url, e),
EOSE: (url, subid) => subid === id && onEose?.(url),
})
this.target.send("REQ", id, ...filters)
return {
unsubscribe: () => {
this.target.send("CLOSE", id)
unsubscribe()
},
}
}
publish(event, {onOk, onError}) {
const unsubscribe = this.target.bus.addListeners({
OK: (url, id, ...payload) => id === event.id && onOk(url, ...payload),
ERROR: (url, id, ...payload) => id === event.id && onError(url, ...payload),
})
this.target.send("EVENT", event)
return {unsubscribe}
}
count(filters, {onCount}) {
const id = createFilterId(filters)
const unsubscribe = this.target.bus.addListeners({
COUNT: (url, subid, ...payload) => {
if (subid === id) {
onCount(url, ...payload)
unsubscribe()
}
}
})
this.target.send("COUNT", id, ...filters)
return {unsubscribe}
}
handleAuth({onAuth, onOk}) {
let event
const unsubscribe = this.target.bus.addListeners({
AUTH: async (url, challenge) => {
event = await onAuth(url, challenge)
},
OK: (url, id, ok, message) => {
if (id === event?.id) {
event = null
onOk(url, id, ok, message)
}
}
})
return {unsubscribe}
}
}
+23
View File
@@ -0,0 +1,23 @@
import {EventBus} from "./util/EventBus"
export class Plex {
constructor(urls, socket) {
this.urls = urls
this.socket = socket
this.bus = new EventBus()
this.listeners = sockets.map(socket => {
return socket.bus.addListener('message', (url, [verb, ...payload]) => {
this.bus.emit(verb, url, ...payload)
})
})
}
async send(...payload) {
await this.socket.connect()
this.socket.send([{relays: this.urls}, payload])
}
cleanup() {
this.bus.clear()
this.listeners.map(unsubscribe => unsubscribe())
}
}
+31
View File
@@ -0,0 +1,31 @@
import {Socket} from "./util/Socket"
export class Pool {
data: Map<string, Socket>
constructor() {
this.data = new Map()
}
has(url) {
return this.data.has(url)
}
get(url) {
if (!this.data.has(url)) {
this.data.set(url, new Socket(url))
}
return this.data.get(url)
}
remove(url) {
const socket = this.data.get(url)
if (socket) {
socket.disconnect()
this.data.delete(url)
}
}
clear() {
for (const url of this.data.keys()) {
this.remove(url)
}
}
}
+22
View File
@@ -0,0 +1,22 @@
import {EventBus} from "./util/EventBus"
export class Relay {
constructor(socket) {
this.socket = socket
this.bus = new EventBus()
this.listeners = sockets.map(socket => {
return socket.bus.addListener('message', (url, [verb, ...payload]) => {
this.bus.emit(verb, url, ...payload)
})
})
}
async send(...payload) {
await this.socket.connect()
this.socket.send(payload)
}
cleanup() {
this.bus.clear()
this.listeners.map(unsubscribe => unsubscribe())
}
}
+27
View File
@@ -0,0 +1,27 @@
import {Socket} from './util/Socket'
import {EventBus} from './util/EventBus'
export class Relays {
sockets: Socket[]
bus: EventBus
constructor(sockets) {
this.sockets = sockets
this.bus = new EventBus()
this.listeners = sockets.map(socket => {
return socket.bus.addListener('message', (url, [verb, ...payload]) => {
this.bus.emit(verb, url, ...payload)
})
})
}
send(...payload) {
this.sockets.forEach(async socket => {
await socket.connect()
socket.send(payload)
})
}
cleanup() {
this.bus.clear()
this.listeners.map(unsubscribe => unsubscribe())
}
}
+8
View File
@@ -0,0 +1,8 @@
export * from "./util/EventBus"
export * from "./util/Deferred"
export * from "./util/Socket"
export * from "./Executor"
export * from "./Plex"
export * from "./Pool"
export * from "./Relay"
export * from "./Relays"
+14
View File
@@ -0,0 +1,14 @@
export type Deferred<T> = Promise<T> & {
resolve: (arg: T) => void
reject: (arg: T) => void
}
export const defer = (): Deferred<any> => {
let resolve, reject
const p = new Promise((resolve_, reject_) => {
resolve = resolve_
reject = reject_
})
return Object.assign(p, {resolve, reject}) as any
}
+35
View File
@@ -0,0 +1,35 @@
export type EventBusHandler = (...args: any[]) => void
export class EventBus {
static ANY = Math.random().toString().slice(2)
listeners: Record<string, Array<EventBusHandler>> = {}
addListener(name: string, handler: EventBusHandler) {
this.listeners[name] = this.listeners[name] || ([] as Array<EventBusHandler>)
this.listeners[name].push(handler)
return () => this.removeListener(name, handler)
}
addListeners(config: Record<string, EventBusHandler>) {
const callbacks = [] as Array<() => void>
for (const [name, handler] of Object.entries(config)) {
callbacks.push(this.addListener(name, handler))
}
return () => callbacks.forEach(unsubscribe => unsubscribe())
}
removeListener(name: string, handler: EventBusHandler) {
this.listeners[name] = (this.listeners[name] || []).filter(h => h !== handler)
}
clear() {
this.listeners = {}
}
emit(k: string, ...payload: any) {
for (const handler of this.listeners[k] || []) {
handler(...payload)
}
for (const handler of this.listeners[EventBus.ANY] || []) {
handler(k, ...payload)
}
}
}
+107
View File
@@ -0,0 +1,107 @@
import WebSocket from "isomorphic-ws"
import {EventBus} from "./EventBus"
import {Deferred, defer} from "./Deferred"
export class Socket {
ws?: WebSocket
url: string
ready?: Deferred<void>
timeout?: NodeJS.Timeout
queue: string[]
bus: EventBus
status: string
_onOpen: (e: any) => void
_onMessage: (e: any) => void
_onClose: (e: any) => void
static STATUS = {
NEW: "new",
PENDING: "pending",
CLOSED: "closed",
READY: "ready",
}
constructor(url: string) {
this.ws = undefined
this.url = url
this.ready = undefined
this.timeout = undefined
this.queue = []
this.bus = new EventBus()
this.status = Socket.STATUS.NEW
this._onOpen = e => {
this.status = Socket.STATUS.READY
this.ready?.resolve()
}
this._onMessage = e => {
this.queue.push(e.data as string)
if (!this.timeout) {
this.handleMessagesAsync()
}
}
this._onClose = e => {
this.disconnect()
this.ready?.reject()
this.status = Socket.STATUS.CLOSED
}
}
async connect() {
if ([Socket.STATUS.NEW, Socket.STATUS.CLOSED].includes(this.status)) {
if (this.ws) {
console.error("Attempted to connect when already connected", this)
}
this.ready = defer()
this.ws = new WebSocket(this.url)
this.status = Socket.STATUS.PENDING
this.ws.addEventListener("open", this._onOpen)
this.ws.addEventListener("message", this._onMessage)
this.ws.addEventListener("close", this._onClose)
}
await this.ready?.catch(() => null)
}
disconnect() {
if (this.ws) {
this.ws.close()
this.ws.removeEventListener("open", this._onOpen)
this.ws.removeEventListener("message", this._onMessage)
this.ws.removeEventListener("error", this._onClose)
this.ws.removeEventListener("close", this._onClose)
this.ws = undefined
}
}
handleMessages() {
for (const json of this.queue.splice(0, 10)) {
let message
try {
message = JSON.parse(json)
} catch (e) {
continue
}
this.bus.emit('message', this.url, message)
}
if (this.queue.length > 0) {
this.handleMessagesAsync()
} else {
this.timeout = undefined
}
}
handleMessagesAsync() {
this.timeout = setTimeout(() => this.handleMessages(), 10) as NodeJS.Timeout
}
send(message: any) {
if (this.status === Socket.STATUS.READY) {
if (this.ws?.readyState !== 1) {
console.warn("Send attempted before socket was ready", this)
}
this.ws?.send(JSON.stringify(message))
}
}
}