From 9b6a779397702128f565039e4e663772fe53ecfa Mon Sep 17 00:00:00 2001 From: Jonathan Staab Date: Mon, 27 Mar 2023 15:05:34 -0500 Subject: [PATCH] Improve consistency of interface --- README.md | 46 +++++++++++++++++ lib/Executor.ts | 4 +- lib/Plex.ts | 22 ++++++++ lib/Pool.ts | 19 ++----- lib/Relay.ts | 119 ++++--------------------------------------- lib/RelaySet.ts | 24 --------- lib/Relays.ts | 26 ++++++++++ lib/util/EventBus.ts | 22 +++----- lib/util/Socket.ts | 106 ++++++++++++++++++++++++++++++++++++++ 9 files changed, 223 insertions(+), 165 deletions(-) create mode 100644 README.md create mode 100644 lib/Plex.ts delete mode 100644 lib/RelaySet.ts create mode 100644 lib/Relays.ts create mode 100644 lib/util/Socket.ts diff --git a/README.md b/README.md new file mode 100644 index 0000000..72aa8d6 --- /dev/null +++ b/README.md @@ -0,0 +1,46 @@ +# Paravel + +Another nostr toolkit, focused on creating highly a configurable client system. + +# Utilities + +- [Deferred](./lib/Deferred.ts') is just a promise with `resolve` and `reject` methods. +- [EventBus](./lib/EventBus.ts') is an implementation of an event bus. +- [Socket](./lib/Socket.ts') is a wrapper around isomorphic-ws that handles connection status and json parsing/serialization. + +# Components + +- [Pool](./lib/Pool.ts') is a thin wrapper around `Map` for use with `Relay`s. +- [Executor](./lib/Executor.ts') implements common nostr flows on `target` + +# Executables + +Executables have an event `bus` and a `send` method and are passed to an `Executor` for use. + +- [Relay](./lib/Relay.ts') takes a `Socket` and provides listeners for different verbs. +- [Relays](./lib/Relays.ts') takes an array of `Socket`s and provides listeners for different verbs, merging all events into a single stream. +- [Plex](./lib/Plex.ts') takes an array of urls and a `Socket` and sends and receives wrapped nostr messages over that connection. + +# Example + +Functionality is split into small chunks to allow for changing out implementations as needed. This is useful when attempting to support novel use cases. Here's a simple implementation of an agent that can use a multiplexer if enabled, or can fall back to communicating directly with all relays. + +```javascript +class Agent { + constructor(multiplexerUrl) { + this.multiplexerUrl = multiplexerUrl + this.pool = new Pool() + } + getTarget(urls) { + return this.multiplexerUrl + ? new Plex(urls, this.pool.add(this.multiplexerUrl)) + : new Relays(urls.map(url => this.pool.add(url))) + } + } + subscribe(urls, filters, id, {onEvent, onEose}) { + const executor = new Executor(this.getTarget(urls)) + + return executor.subscribe(filters, id, {onEvent, onEose}) + } +} +``` diff --git a/lib/Executor.ts b/lib/Executor.ts index 6d61933..19391ab 100644 --- a/lib/Executor.ts +++ b/lib/Executor.ts @@ -1,12 +1,12 @@ import type {EventBus} from './util/EventBus.ts' -type ExecutorTarget = { +type Executable = { bus: EventBus send: (verb: string, ...args) => void } export class Executor { - target: ExecutorTarget + target: Executable constructor(target) { this.target = target } diff --git a/lib/Plex.ts b/lib/Plex.ts new file mode 100644 index 0000000..4b976e6 --- /dev/null +++ b/lib/Plex.ts @@ -0,0 +1,22 @@ +import {EventBus} from "./util/EventBus" + +export class Plex { + constructor(urls, socket) { + this.urls = urls + this.socket = socket + this.bus = new EventBus() + this.onMessage = this.onMessage.bind(this) + + this.socket.bus.on('message', this.onMessage) + } + async send(...payload) { + await this.socket.connect() + + this.socket.send([{relays: this.urls}, payload]) + } + onMessage(message) { + const [verb, ...payload] = message[1] + + this.bus.handle(verb, ...payload) + } +} diff --git a/lib/Pool.ts b/lib/Pool.ts index 65ab57a..a429973 100644 --- a/lib/Pool.ts +++ b/lib/Pool.ts @@ -1,9 +1,7 @@ -import {Relay} from "./Relay" - -const normalizeUrl = url => url.replace(/\/+$/, "").toLowerCase().trim() +import {Socket} from "./util/Socket" export class Pool { - relays: Map + relays: Map constructor() { this.relays = new Map() this.interval = setInterval(() => { @@ -13,17 +11,13 @@ export class Pool { }, 30_000) } add(url) { - url = normalizeUrl(url) - if (!this.relays.has(url)) { - this.relays.set(url, new Relay(url)) + this.relays.set(url, new Socket(url)) } return this.relays.get(url) } remove(url) { - url = normalizeUrl(url) - this.relays.get(url)?.disconnect() this.relays.delete(url) } @@ -34,11 +28,4 @@ export class Pool { this.remove(url) } } - async waitFor(url) { - const relay = this.add(url) - - await relay.connect() - - return relay.status === Relay.STATUS.READY ? relay : null - } } diff --git a/lib/Relay.ts b/lib/Relay.ts index 0aa1738..226bfd9 100644 --- a/lib/Relay.ts +++ b/lib/Relay.ts @@ -1,118 +1,21 @@ -import WebSocket from "isomorphic-ws" import {EventBus} from "./util/EventBus" -import {Deferred, defer} from "./util/Deferred" export class Relay { - ws?: WebSocket - url: string - ready?: Deferred - queue: string[] - error: string - status: string - timeout?: NodeJS.Timeout - bus: EventBus - static STATUS = { - NEW: "new", - PENDING: "pending", - CLOSED: "closed", - ERROR: "error", - READY: "ready", - } - static ERROR = { - CONNECTION: "connection", - UNAUTHORIZED: "unauthorized", - FORBIDDEN: "forbidden", - } - constructor(url) { - this.ws = null - this.url = url - this.ready = null - this.queue = [] - this.timeout = null + constructor(socket) { + this.socket = socket this.bus = new EventBus() - this.error = null - this.status = Relay.STATUS.NEW + this.onMessage = this.onMessage.bind(this) + + this.socket.bus.on('message', this.onMessage) } - async connect() { - if (this.status === Relay.STATUS.NEW) { - if (this.ws) { - console.error("Attempted to connect when already connected", this) - } + async send(...payload) { + await this.socket.connect() - this.ready = defer() - this.ws = new WebSocket(this.url) - this.status = Relay.STATUS.PENDING - - this.ws.addEventListener("open", () => { - console.log(`Opened connection to ${this.url}`) - - this.status = Relay.STATUS.READY - this.ready.resolve() - }) - - this.ws.addEventListener("message", e => { - this.queue.push(e.data) - - if (!this.timeout) { - this.timeout = setTimeout(() => this.handleMessages(), 10) - } - }) - - this.ws.addEventListener("error", e => { - console.log(`Error on connection to ${this.url}`) - - this.disconnect() - this.ready.reject() - this.error = Relay.ERROR.CONNECTION - this.status = Relay.STATUS.CLOSED - }) - - this.ws.addEventListener("close", () => { - console.log(`Closed connection to ${this.url}`) - - this.disconnect() - this.ready.reject() - this.status = Relay.STATUS.CLOSED - }) - } - - await this.ready.catch(() => null) + this.socket.send(payload) } - reconnect() { - if (this.status === Relay.STATUS.ERROR) { - this.status = Relay.STATUS.NEW - this.connect() - } - } - disconnect() { - if (this.ws) { - console.log(`Disconnecting from ${this.url}`) + onMessage(message) { + const [verb, ...payload] = message - this.ws.close() - this.ws = null - } - } - handleMessages() { - for (const json of this.queue.splice(0, 10)) { - let message - try { - message = JSON.parse(json) - } catch (e) { - continue - } - - const [verb, ...args] = message - - this.bus.handle(verb, ...args) - } - - this.timeout = this.queue.length > 0 ? setTimeout(() => this.handleMessages(), 10) : null - } - send(...payload) { - if (this.ws?.readyState !== 1) { - console.warn("Send attempted before socket was ready", this) - } - - this.ws.send(JSON.stringify(payload)) + this.bus.handle(verb, ...payload) } } diff --git a/lib/RelaySet.ts b/lib/RelaySet.ts deleted file mode 100644 index 7138f30..0000000 --- a/lib/RelaySet.ts +++ /dev/null @@ -1,24 +0,0 @@ -import type {Relay} from './Relay' -import {EventBus} from './util/EventBus' - -export class RelaySet { - relays: Relay[] - bus: EventBus - constructor(relays) { - this.relays = relays - this.bus = new EventBus() - - relays.forEach(relay => { - relay.bus.pipe(EventBus.ANY, this.bus) - }) - } - send(...payload) { - this.relays.forEach(async relay => { - await relay.connect() - - if (relay.status === Relay.STATUS.READY) { - relay.send(...payload) - } - }) - } -} diff --git a/lib/Relays.ts b/lib/Relays.ts new file mode 100644 index 0000000..ed9214a --- /dev/null +++ b/lib/Relays.ts @@ -0,0 +1,26 @@ +import {Socket} from './util/Socket' +import {EventBus} from './util/EventBus' + +export class Relays { + sockets: Socket[] + bus: EventBus + constructor(sockets) { + this.sockets = sockets + this.bus = new EventBus() + this.onMessage = this.onMessage.bind(this) + + sockets.forEach(socket => socket.bus.on('message', this.onMessage)) + } + send(...payload) { + this.sockets.forEach(socket => { + await socket.connect() + + socket.send(...payload) + }) + } + onMessage(message) { + const [verb, ...payload] = message + + this.bus.handle(verb, ...payload) + } +} diff --git a/lib/util/EventBus.ts b/lib/util/EventBus.ts index 83741b0..5d1009b 100644 --- a/lib/util/EventBus.ts +++ b/lib/util/EventBus.ts @@ -1,32 +1,24 @@ export type EventBusHandler = (...args: any[]) => void -export type EventBusListener = { - id: string - handler: EventBusHandler -} export class EventBus { static ANY = Math.random().toString().slice(2) - listeners: Record> = {} + listeners: Record> = {} on(name: string, handler: EventBusHandler) { - const id = Math.random().toString().slice(2) - - this.listeners[name] = this.listeners[name] || ([] as Array) - this.listeners[name].push({id, handler}) - - return id + this.listeners[name] = this.listeners[name] || ([] as Array) + this.listeners[name].push(handler) } - off(name: string, id: string) { - this.listeners[name] = this.listeners[name].filter(l => l.id !== id) + off(name: string, handler: EventBusHandler) { + this.listeners[name] = this.listeners[name].filter(h => h !== handler) } clear() { this.listeners = {} } handle(k: string, ...payload: any) { - for (const {handler} of this.listeners[k] || []) { + for (const handler of this.listeners[k] || []) { handler(...payload) } - for (const {handler} of this.listeners[EventBus.ANY] || []) { + for (const handler of this.listeners[EventBus.ANY] || []) { handler(k, ...payload) } } diff --git a/lib/util/Socket.ts b/lib/util/Socket.ts new file mode 100644 index 0000000..b570b5a --- /dev/null +++ b/lib/util/Socket.ts @@ -0,0 +1,106 @@ +import WebSocket from "isomorphic-ws" +import {EventBus} from "./EventBus" +import {Deferred, defer} from "./Deferred" + +export class Socket { + ws?: WebSocket + url: string + ready?: Deferred + timeout?: NodeJS.Timeout + queue: string[] + bus: EventBus + status: string + static STATUS = { + NEW: "new", + PENDING: "pending", + CLOSED: "closed", + READY: "ready", + } + constructor(url: string) { + this.ws = undefined + this.url = url + this.ready = undefined + this.timeout = undefined + this.queue = [] + this.bus = new EventBus() + this.status = Socket.STATUS.NEW + } + async connect() { + if ([Socket.STATUS.NEW, Socket.STATUS.CLOSED].includes(this.status)) { + if (this.ws) { + console.error("Attempted to connect when already connected", this) + } + + this.ready = defer() + this.ws = new WebSocket(this.url) + this.status = Socket.STATUS.PENDING + + this.ws.addEventListener("open", () => { + console.log(`Opened connection to ${this.url}`) + + this.status = Socket.STATUS.READY + this.ready?.resolve() + }) + + this.ws.addEventListener("message", e => { + this.queue.push(e.data as string) + + if (!this.timeout) { + this.timeout = this.handleMessagesAsync() + } + }) + + this.ws.addEventListener("error", e => { + console.log(`Error on connection to ${this.url}`) + + this.disconnect() + this.ready?.reject() + this.status = Socket.STATUS.CLOSED + }) + + this.ws.addEventListener("close", () => { + console.log(`Closed connection to ${this.url}`) + + this.disconnect() + this.ready?.reject() + this.status = Socket.STATUS.CLOSED + }) + } + + await this.ready?.catch(() => null) + } + disconnect() { + if (this.ws) { + console.log(`Disconnecting from ${this.url}`) + + this.ws.close() + this.ws = undefined + } + } + handleMessages() { + for (const json of this.queue.splice(0, 10)) { + let message + try { + message = JSON.parse(json) + } catch (e) { + continue + } + + this.bus.handle('message', message) + } + + this.timeout = this.queue.length > 0 ? this.handleMessagesAsync() : undefined + } + handleMessagesAsync() { + return setTimeout(() => this.handleMessages(), 10) as NodeJS.Timeout + } + send(message: any) { + if (this.status === Socket.STATUS.READY) { + if (this.ws?.readyState !== 1) { + console.warn("Send attempted before socket was ready", this) + } + + this.ws?.send(JSON.stringify(message)) + } + } +}