From bc21228f047dc4ba44808c4fe3f6d8a245f0e367 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Tue, 26 Mar 2024 11:35:30 -0700 Subject: [PATCH] replace queue with worker --- README.md | 2 +- packages/lib/Queue.ts | 57 --------------- packages/lib/Worker.ts | 71 +++++++++++++++++++ packages/lib/index.ts | 2 +- packages/network/Connection.ts | 126 ++++++++++++++++----------------- packages/util/Relays.ts | 10 ++- packages/util/Tags.ts | 2 +- 7 files changed, 144 insertions(+), 126 deletions(-) delete mode 100644 packages/lib/Queue.ts create mode 100644 packages/lib/Worker.ts diff --git a/README.md b/README.md index c9ad78c..9423ad9 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Some general-purpose utilities used elsewhere in paravel. - `Emitter` extends EventEmitter to support `emitter.on('*', ...)`. - `Fluent` is a wrapper around arrays with chained methods that modify and copy the underlying array. - `LRUCache` is an implementation of an LRU cache. -- `Queue` is an implementation of an asynchronous queue. +- `Worker` is an implementation of an asynchronous queue. - `Tools` is a collection of general-purpose utility functions. ## @coracle.social/util diff --git a/packages/lib/Queue.ts b/packages/lib/Queue.ts deleted file mode 100644 index 9c2d9d6..0000000 --- a/packages/lib/Queue.ts +++ /dev/null @@ -1,57 +0,0 @@ -export class Queue { - timeout?: number - messages: any[] = [] - - clear() { - this.messages = [] - } - - push(message: any) { - this.messages.push(message) - this.enqueueWork() - } - - handle(message: any) { - throw new Error("Not implemented") - } - - shouldSend(message: any) { - return true - } - - doWork() { - for (let i = 0; i < 10; i++) { - if (this.messages.length === 0) { - break - } - - // Pop the messages one at a time so handle can modify the queue - const [message] = this.messages.splice(0, 1) - - if (this.shouldSend(message)) { - this.handle(message) - } else { - this.messages.push(message) - } - } - - this.timeout = undefined - this.enqueueWork() - } - - enqueueWork() { - if (this.timeout) { - return - } - - if (this.messages.length === 0) { - return - } - - this.timeout = setTimeout(() => this.doWork(), 100) - } - - stop() { - clearTimeout(this.timeout) - } -} diff --git a/packages/lib/Worker.ts b/packages/lib/Worker.ts new file mode 100644 index 0000000..81c705d --- /dev/null +++ b/packages/lib/Worker.ts @@ -0,0 +1,71 @@ +const ANY = Symbol("worker/ANY") + +export type WorkerOpts = { + getKey?: (x: T) => any + shouldDefer?: (x: T) => boolean +} + +export class Worker { + buffer: T[] = [] + handlers: Map void>> = new Map() + timeout: number | undefined + + constructor(readonly opts: WorkerOpts = {}) {} + + #doWork = async () => { + for (let i = 0; i < 50; i++) { + if (this.buffer.length === 0) { + break + } + + // Pop the buffer one at a time so handle can modify the queue + const [message] = this.buffer.splice(0, 1) + + if (this.opts.shouldDefer?.(message)) { + this.buffer.push(message) + } else { + for (const handler of this.handlers.get(ANY) || []) { + await handler(message) + } + + if (this.opts.getKey) { + const k = this.opts.getKey(message) + + for (const handler of this.handlers.get(k) || []) { + await handler(message) + } + } + } + } + + this.timeout = undefined + this.#enqueueWork() + } + + #enqueueWork = () => { + if (!this.timeout && this.buffer.length > 0) { + this.timeout = setTimeout(this.#doWork, 50) + } + } + + push = (message: T) => { + this.buffer.push(message) + this.#enqueueWork() + } + + addHandler = (k: any, handler: (message: T) => void) => { + this.handlers.set(k, (this.handlers.get(k) || []).concat(handler)) + } + + addGlobalHandler = (handler: (message: T) => void) => { + this.addHandler(ANY, handler) + } + + clear() { + this.buffer = [] + } + + stop() { + clearTimeout(this.timeout) + } +} diff --git a/packages/lib/index.ts b/packages/lib/index.ts index c06caad..4e5566a 100644 --- a/packages/lib/index.ts +++ b/packages/lib/index.ts @@ -3,6 +3,6 @@ export * from './Deferred' export * from './Emitter' export * from './Fluent' export * from './LRUCache' -export * from './Queue' +export * from './Worker' export * from './Tools' export {default as normalizeUrl} from './normalize-url' diff --git a/packages/network/Connection.ts b/packages/network/Connection.ts index b4411b5..6471eae 100644 --- a/packages/network/Connection.ts +++ b/packages/network/Connection.ts @@ -1,66 +1,13 @@ -import {Emitter, Queue} from '@coracle.social/lib' +import {Emitter, Worker} from '@coracle.social/lib' import {AuthStatus, ConnectionMeta} from './ConnectionMeta' import {Socket, isMessage, asMessage} from './Socket' import type {SocketMessage} from './Socket' -class SendQueue extends Queue { - constructor(readonly cxn: Connection) { - super() - } - - shouldSend(message: SocketMessage) { - if (!this.cxn.socket.isReady()) { - return false - } - - const [verb, ...extra] = asMessage(message) - - if (['AUTH', 'CLOSE'].includes(verb)) { - return true - } - - // Allow relay requests through - if (verb === 'EVENT' && extra[0].kind === 28934) { - return true - } - - // Only defer for auth if we're not multiplexing - if (isMessage(message) && ![AuthStatus.Ok, AuthStatus.Pending].includes(this.cxn.meta.authStatus)) { - return false - } - - if (verb === 'REQ') { - return this.cxn.meta.pendingRequests.size < 8 - } - - return true - } - - handle(message: SocketMessage) { - // If we ended up handling a CLOSE before we handled the REQ, don't send the REQ - if (message[0] === 'CLOSE') { - this.messages = this.messages.filter(m => !(m[0] === 'REQ' && m[1] === message[1])) - } - - this.cxn.onSend(message) - } -} - -class ReceiveQueue extends Queue { - constructor(readonly cxn: Connection) { - super() - } - - handle(message: SocketMessage) { - this.cxn.onReceive(message) - } -} - export class Connection extends Emitter { url: string socket: Socket - sendQueue: SendQueue - receiveQueue: ReceiveQueue + sender: Worker + receiver: Worker meta: ConnectionMeta constructor(url: string) { @@ -68,13 +15,64 @@ export class Connection extends Emitter { this.url = url this.socket = new Socket(url, this) - this.sendQueue = new SendQueue(this) - this.receiveQueue = new ReceiveQueue(this) + this.sender = this.createSender() + this.receiver = this.createReceiver() this.meta = new ConnectionMeta(this) this.setMaxListeners(100) } - send = (m: SocketMessage) => this.sendQueue.push(m) + createSender = () => { + const worker = new Worker({ + shouldDefer: (message: SocketMessage) => { + if (!this.socket.isReady()) { + return true + } + + const [verb, ...extra] = asMessage(message) + + if (['AUTH', 'CLOSE'].includes(verb)) { + return false + } + + // Allow relay requests through + if (verb === 'EVENT' && extra[0].kind === 28934) { + return false + } + + // Only defer for auth if we're not multiplexing + if (isMessage(message) && ![AuthStatus.Ok, AuthStatus.Pending].includes(this.meta.authStatus)) { + return true + } + + if (verb === 'REQ') { + return this.meta.pendingRequests.size >= 8 + } + + return false + } + }) + + worker.addGlobalHandler((message: SocketMessage) => { + // If we ended up handling a CLOSE before we handled the REQ, don't send the REQ + if (message[0] === 'CLOSE') { + worker.buffer = worker.buffer.filter(m => !(m[0] === 'REQ' && m[1] === message[1])) + } + + this.onSend(message) + }) + + return worker + } + + createReceiver = () => { + const worker = new Worker() + + worker.addGlobalHandler(this.onReceive) + + return worker + } + + send = (m: SocketMessage) => this.sender.push(m) onOpen = () => this.emit('open', this) @@ -82,7 +80,7 @@ export class Connection extends Emitter { onError = () => this.emit('fault', this) - onMessage = (m: SocketMessage) => this.receiveQueue.push(m) + onMessage = (m: SocketMessage) => this.receiver.push(m) onSend = (message: SocketMessage) => { this.emit('send', this, message) @@ -105,15 +103,15 @@ export class Connection extends Emitter { disconnect() { this.socket.disconnect() - this.sendQueue.clear() - this.receiveQueue.clear() + this.sender.clear() + this.receiver.clear() this.meta.clearPending() } destroy() { this.disconnect() this.removeAllListeners() - this.sendQueue.stop() - this.receiveQueue.stop() + this.sender.stop() + this.receiver.stop() } } diff --git a/packages/util/Relays.ts b/packages/util/Relays.ts index 4b072bc..7eee96a 100644 --- a/packages/util/Relays.ts +++ b/packages/util/Relays.ts @@ -17,7 +17,13 @@ export const isShareableRelayUrl = (url: string) => !url.slice(6).match(/\/npub/) ) -export const normalizeRelayUrl = (url: string) => { +type NormalizeRelayUrlOpts = { + allowInsecure?: boolean +} + +export const normalizeRelayUrl = (url: string, {allowInsecure = false}: NormalizeRelayUrlOpts = {}) => { + const prefix = allowInsecure ? url.match(/^wss?:\/\//)?.[0] || "wss://" : "wss://" + // Use our library to normalize url = normalizeUrl(url, {stripHash: true, stripAuthentication: false}) @@ -29,5 +35,5 @@ export const normalizeRelayUrl = (url: string) => { url += "/" } - return "wss://" + url + return prefix + url } diff --git a/packages/util/Tags.ts b/packages/util/Tags.ts index fd24d0b..c8d1f2f 100644 --- a/packages/util/Tags.ts +++ b/packages/util/Tags.ts @@ -78,7 +78,7 @@ export class Tags extends (Fluent as OmitStatics, 'from' entries = () => this.mapTo(t => t.entry()) - relays = () => this.flatMap((t: Tag) => t.valueOf().filter(isShareableRelayUrl).map(normalizeRelayUrl)).uniq() + relays = () => this.flatMap((t: Tag) => t.valueOf().filter(isShareableRelayUrl).map(url => normalizeRelayUrl(url))).uniq() topics = () => this.whereKey("t").values().map((t: string) => t.replace(/^#/, ""))