diff --git a/.fdignore b/.fdignore index 3c3629e..f06235c 100644 --- a/.fdignore +++ b/.fdignore @@ -1 +1,2 @@ node_modules +dist diff --git a/lib/Executor.ts b/lib/Executor.ts new file mode 100644 index 0000000..6d61933 --- /dev/null +++ b/lib/Executor.ts @@ -0,0 +1,56 @@ +import type {EventBus} from './util/EventBus.ts' + +type ExecutorTarget = { + bus: EventBus + send: (verb: string, ...args) => void +} + +export class Executor { + target: ExecutorTarget + constructor(target) { + this.target = target + } + subscribe(filters, id, {onEvent, onEose}) { + const [eventChannel, eoseChannel] = [ + this.target.bus.on("EVENT", (subid, e) => subid === id && onEvent?.(e)), + this.target.bus.on("EOSE", subid => subid === id && onEose?.()), + ] + + this.target.send("REQ", id, ...filters) + + return { + unsubscribe: () => { + this.target.send("CLOSE", id) + this.target.bus.off("EVENT", eventChannel) + this.target.bus.off("EOSE", eoseChannel) + }, + } + } + publish(event, {onOk, onError}) { + const withCleanup = cb => (id, ...payload) => { + if (id === event.id) { + cb(id, ...payload) + this.target.bus.off("OK", okChannel) + this.target.bus.off("ERROR", errorChannel) + } + } + + const [okChannel, errorChannel] = [ + this.target.bus.on("OK", withCleanup(onOk)), + this.target.bus.on("ERROR", withCleanup(onError)), + ] + + this.target.send("EVENT", event) + } + count(filter, id, {onCount}) { + const channel = this.target.bus.on("COUNT", (subid, ...payload) => { + if (subid === id) { + onCount(...payload) + + this.target.bus.off("COUNT", channel) + } + }) + + this.target.send("COUNT", id, ...filter) + } +} diff --git a/lib/Pool.ts b/lib/Pool.ts index 03fd92b..65ab57a 100644 --- a/lib/Pool.ts +++ b/lib/Pool.ts @@ -6,6 +6,11 @@ export class Pool { relays: Map constructor() { this.relays = new Map() + this.interval = setInterval(() => { + for (const relay of this.relays) { + relay.reconnect() + } + }, 30_000) } add(url) { url = normalizeUrl(url) @@ -22,6 +27,13 @@ export class Pool { this.relays.get(url)?.disconnect() this.relays.delete(url) } + cleanup() { + this.interval = clearInterval(this.interval) + + for (const url of this.relays.keys()) { + this.remove(url) + } + } async waitFor(url) { const relay = this.add(url) @@ -29,19 +41,4 @@ export class Pool { return relay.status === Relay.STATUS.READY ? relay : null } - async execute(urls, callback) { - const results = await Promise.all([ - urls.map(async url => { - const relay = await this.waitFor(url) - - if (!relay) { - return null - } - - return [relay, callback(relay)] - }), - ]) - - return results.filter(Boolean) - } } diff --git a/lib/Relay.ts b/lib/Relay.ts index 0ed003c..0aa1738 100644 --- a/lib/Relay.ts +++ b/lib/Relay.ts @@ -1,6 +1,6 @@ import WebSocket from "isomorphic-ws" -import {EventBus} from "./EventBus" -import {Deferred, defer} from "./Deferred" +import {EventBus} from "./util/EventBus" +import {Deferred, defer} from "./util/Deferred" export class Relay { ws?: WebSocket @@ -78,6 +78,12 @@ export class Relay { await this.ready.catch(() => null) } + reconnect() { + if (this.status === Relay.STATUS.ERROR) { + this.status = Relay.STATUS.NEW + this.connect() + } + } disconnect() { if (this.ws) { console.log(`Disconnecting from ${this.url}`) @@ -109,51 +115,4 @@ export class Relay { this.ws.send(JSON.stringify(payload)) } - subscribe(filters, id, {onEvent, onEose}) { - const [eventChannel, eoseChannel] = [ - this.bus.on("EVENT", (subid, e) => subid === id && onEvent?.(e)), - this.bus.on("EOSE", subid => subid === id && onEose?.()), - ] - - this.send("REQ", id, ...filters) - - return { - conn: this, - unsub: () => { - if (this.status === Relay.STATUS.READY) { - this.send("CLOSE", id) - } - - this.bus.off("EVENT", eventChannel) - this.bus.off("EOSE", eoseChannel) - }, - } - } - publish(event, {onOk, onError}) { - const withCleanup = cb => k => { - if (k === event.id) { - cb() - this.bus.off("OK", okChannel) - this.bus.off("ERROR", errorChannel) - } - } - - const [okChannel, errorChannel] = [ - this.bus.on("OK", withCleanup(onOk)), - this.bus.on("ERROR", withCleanup(onError)), - ] - - this.send("EVENT", event) - } - count(filter, id, {onCount}) { - const channel = this.bus.on("COUNT", (subid, ...payload) => { - if (subid === id) { - onCount(...payload) - - this.bus.off("COUNT", channel) - } - }) - - this.send("COUNT", id, ...filter) - } } diff --git a/lib/RelaySet.ts b/lib/RelaySet.ts new file mode 100644 index 0000000..7138f30 --- /dev/null +++ b/lib/RelaySet.ts @@ -0,0 +1,24 @@ +import type {Relay} from './Relay' +import {EventBus} from './util/EventBus' + +export class RelaySet { + relays: Relay[] + bus: EventBus + constructor(relays) { + this.relays = relays + this.bus = new EventBus() + + relays.forEach(relay => { + relay.bus.pipe(EventBus.ANY, this.bus) + }) + } + send(...payload) { + this.relays.forEach(async relay => { + await relay.connect() + + if (relay.status === Relay.STATUS.READY) { + relay.send(...payload) + } + }) + } +} diff --git a/lib/main.ts b/lib/main.ts index 50f5c91..88e8d75 100644 --- a/lib/main.ts +++ b/lib/main.ts @@ -1,4 +1,4 @@ -export * from "./EventBus" -export * from "./Deferred" +export * from "./util/EventBus" +export * from "./util/Deferred" export * from "./Relay" export * from "./Pool" diff --git a/lib/Deferred.ts b/lib/util/Deferred.ts similarity index 83% rename from lib/Deferred.ts rename to lib/util/Deferred.ts index 5318319..e8fb93f 100644 --- a/lib/Deferred.ts +++ b/lib/util/Deferred.ts @@ -10,5 +10,5 @@ export const defer = (): Deferred => { reject = reject_ }) - return Object.assign(p, {resolve, reject}) + return Object.assign(p, {resolve, reject}) as any } diff --git a/lib/EventBus.ts b/lib/util/EventBus.ts similarity index 50% rename from lib/EventBus.ts rename to lib/util/EventBus.ts index 66e1889..83741b0 100644 --- a/lib/EventBus.ts +++ b/lib/util/EventBus.ts @@ -1,11 +1,13 @@ +export type EventBusHandler = (...args: any[]) => void export type EventBusListener = { id: string - handler: (...args: any[]) => void + handler: EventBusHandler } export class EventBus { + static ANY = Math.random().toString().slice(2) listeners: Record> = {} - on(name, handler) { + on(name: string, handler: EventBusHandler) { const id = Math.random().toString().slice(2) this.listeners[name] = this.listeners[name] || ([] as Array) @@ -13,12 +15,22 @@ export class EventBus { return id } - off(name, id) { + off(name: string, id: string) { this.listeners[name] = this.listeners[name].filter(l => l.id !== id) } - handle(k, ...payload) { + clear() { + this.listeners = {} + } + handle(k: string, ...payload: any) { for (const {handler} of this.listeners[k] || []) { handler(...payload) } + + for (const {handler} of this.listeners[EventBus.ANY] || []) { + handler(k, ...payload) + } + } + pipe(k: string, bus: EventBus) { + this.on(k, (...payload: any[]) => bus.handle(k, ...payload)) } } diff --git a/package.json b/package.json index 20db854..fb6ff4f 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "scripts": { "build": "node build.js", "pub": "npm i && node build.js && npm publish", - "check:ts": "tsc --noEmit --esModuleInterop lib/*", + "check:ts": "tsc --noEmit --esModuleInterop --strict lib/**/*", "check:es": "eslint lib/*", "check": "run-p check:*" },