Add Executor and RelaySet

This commit is contained in:
Jonathan Staab
2023-03-27 13:39:48 -05:00
parent 4b362e3061
commit 853b42c1c9
9 changed files with 121 additions and 72 deletions
+1
View File
@@ -1 +1,2 @@
node_modules node_modules
dist
+56
View File
@@ -0,0 +1,56 @@
import type {EventBus} from './util/EventBus.ts'
type ExecutorTarget = {
bus: EventBus
send: (verb: string, ...args) => void
}
export class Executor {
target: ExecutorTarget
constructor(target) {
this.target = target
}
subscribe(filters, id, {onEvent, onEose}) {
const [eventChannel, eoseChannel] = [
this.target.bus.on("EVENT", (subid, e) => subid === id && onEvent?.(e)),
this.target.bus.on("EOSE", subid => subid === id && onEose?.()),
]
this.target.send("REQ", id, ...filters)
return {
unsubscribe: () => {
this.target.send("CLOSE", id)
this.target.bus.off("EVENT", eventChannel)
this.target.bus.off("EOSE", eoseChannel)
},
}
}
publish(event, {onOk, onError}) {
const withCleanup = cb => (id, ...payload) => {
if (id === event.id) {
cb(id, ...payload)
this.target.bus.off("OK", okChannel)
this.target.bus.off("ERROR", errorChannel)
}
}
const [okChannel, errorChannel] = [
this.target.bus.on("OK", withCleanup(onOk)),
this.target.bus.on("ERROR", withCleanup(onError)),
]
this.target.send("EVENT", event)
}
count(filter, id, {onCount}) {
const channel = this.target.bus.on("COUNT", (subid, ...payload) => {
if (subid === id) {
onCount(...payload)
this.target.bus.off("COUNT", channel)
}
})
this.target.send("COUNT", id, ...filter)
}
}
+12 -15
View File
@@ -6,6 +6,11 @@ export class Pool {
relays: Map<string, Relay> relays: Map<string, Relay>
constructor() { constructor() {
this.relays = new Map() this.relays = new Map()
this.interval = setInterval(() => {
for (const relay of this.relays) {
relay.reconnect()
}
}, 30_000)
} }
add(url) { add(url) {
url = normalizeUrl(url) url = normalizeUrl(url)
@@ -22,6 +27,13 @@ export class Pool {
this.relays.get(url)?.disconnect() this.relays.get(url)?.disconnect()
this.relays.delete(url) this.relays.delete(url)
} }
cleanup() {
this.interval = clearInterval(this.interval)
for (const url of this.relays.keys()) {
this.remove(url)
}
}
async waitFor(url) { async waitFor(url) {
const relay = this.add(url) const relay = this.add(url)
@@ -29,19 +41,4 @@ export class Pool {
return relay.status === Relay.STATUS.READY ? relay : null return relay.status === Relay.STATUS.READY ? relay : null
} }
async execute(urls, callback) {
const results = await Promise.all([
urls.map(async url => {
const relay = await this.waitFor(url)
if (!relay) {
return null
}
return [relay, callback(relay)]
}),
])
return results.filter(Boolean)
}
} }
+8 -49
View File
@@ -1,6 +1,6 @@
import WebSocket from "isomorphic-ws" import WebSocket from "isomorphic-ws"
import {EventBus} from "./EventBus" import {EventBus} from "./util/EventBus"
import {Deferred, defer} from "./Deferred" import {Deferred, defer} from "./util/Deferred"
export class Relay { export class Relay {
ws?: WebSocket ws?: WebSocket
@@ -78,6 +78,12 @@ export class Relay {
await this.ready.catch(() => null) await this.ready.catch(() => null)
} }
reconnect() {
if (this.status === Relay.STATUS.ERROR) {
this.status = Relay.STATUS.NEW
this.connect()
}
}
disconnect() { disconnect() {
if (this.ws) { if (this.ws) {
console.log(`Disconnecting from ${this.url}`) console.log(`Disconnecting from ${this.url}`)
@@ -109,51 +115,4 @@ export class Relay {
this.ws.send(JSON.stringify(payload)) this.ws.send(JSON.stringify(payload))
} }
subscribe(filters, id, {onEvent, onEose}) {
const [eventChannel, eoseChannel] = [
this.bus.on("EVENT", (subid, e) => subid === id && onEvent?.(e)),
this.bus.on("EOSE", subid => subid === id && onEose?.()),
]
this.send("REQ", id, ...filters)
return {
conn: this,
unsub: () => {
if (this.status === Relay.STATUS.READY) {
this.send("CLOSE", id)
}
this.bus.off("EVENT", eventChannel)
this.bus.off("EOSE", eoseChannel)
},
}
}
publish(event, {onOk, onError}) {
const withCleanup = cb => k => {
if (k === event.id) {
cb()
this.bus.off("OK", okChannel)
this.bus.off("ERROR", errorChannel)
}
}
const [okChannel, errorChannel] = [
this.bus.on("OK", withCleanup(onOk)),
this.bus.on("ERROR", withCleanup(onError)),
]
this.send("EVENT", event)
}
count(filter, id, {onCount}) {
const channel = this.bus.on("COUNT", (subid, ...payload) => {
if (subid === id) {
onCount(...payload)
this.bus.off("COUNT", channel)
}
})
this.send("COUNT", id, ...filter)
}
} }
+24
View File
@@ -0,0 +1,24 @@
import type {Relay} from './Relay'
import {EventBus} from './util/EventBus'
export class RelaySet {
relays: Relay[]
bus: EventBus
constructor(relays) {
this.relays = relays
this.bus = new EventBus()
relays.forEach(relay => {
relay.bus.pipe(EventBus.ANY, this.bus)
})
}
send(...payload) {
this.relays.forEach(async relay => {
await relay.connect()
if (relay.status === Relay.STATUS.READY) {
relay.send(...payload)
}
})
}
}
+2 -2
View File
@@ -1,4 +1,4 @@
export * from "./EventBus" export * from "./util/EventBus"
export * from "./Deferred" export * from "./util/Deferred"
export * from "./Relay" export * from "./Relay"
export * from "./Pool" export * from "./Pool"
+1 -1
View File
@@ -10,5 +10,5 @@ export const defer = (): Deferred<any> => {
reject = reject_ reject = reject_
}) })
return Object.assign(p, {resolve, reject}) return Object.assign(p, {resolve, reject}) as any
} }
+16 -4
View File
@@ -1,11 +1,13 @@
export type EventBusHandler = (...args: any[]) => void
export type EventBusListener = { export type EventBusListener = {
id: string id: string
handler: (...args: any[]) => void handler: EventBusHandler
} }
export class EventBus { export class EventBus {
static ANY = Math.random().toString().slice(2)
listeners: Record<string, Array<EventBusListener>> = {} listeners: Record<string, Array<EventBusListener>> = {}
on(name, handler) { on(name: string, handler: EventBusHandler) {
const id = Math.random().toString().slice(2) const id = Math.random().toString().slice(2)
this.listeners[name] = this.listeners[name] || ([] as Array<EventBusListener>) this.listeners[name] = this.listeners[name] || ([] as Array<EventBusListener>)
@@ -13,12 +15,22 @@ export class EventBus {
return id return id
} }
off(name, id) { off(name: string, id: string) {
this.listeners[name] = this.listeners[name].filter(l => l.id !== id) this.listeners[name] = this.listeners[name].filter(l => l.id !== id)
} }
handle(k, ...payload) { clear() {
this.listeners = {}
}
handle(k: string, ...payload: any) {
for (const {handler} of this.listeners[k] || []) { for (const {handler} of this.listeners[k] || []) {
handler(...payload) handler(...payload)
} }
for (const {handler} of this.listeners[EventBus.ANY] || []) {
handler(k, ...payload)
}
}
pipe(k: string, bus: EventBus) {
this.on(k, (...payload: any[]) => bus.handle(k, ...payload))
} }
} }
+1 -1
View File
@@ -15,7 +15,7 @@
"scripts": { "scripts": {
"build": "node build.js", "build": "node build.js",
"pub": "npm i && node build.js && npm publish", "pub": "npm i && node build.js && npm publish",
"check:ts": "tsc --noEmit --esModuleInterop lib/*", "check:ts": "tsc --noEmit --esModuleInterop --strict lib/**/*",
"check:es": "eslint lib/*", "check:es": "eslint lib/*",
"check": "run-p check:*" "check": "run-p check:*"
}, },