From 44123426f2f1eae655be0f8e587b721418dab7f8 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Fri, 21 Mar 2025 16:16:40 -0700 Subject: [PATCH] flesh out publish --- packages/net2/src/adapter.ts | 1 + packages/net2/src/diff.ts | 16 ++++-- packages/net2/src/publish.ts | 99 ++++++++++++++++++++++++++++------ packages/net2/src/socket.ts | 1 + packages/net2/src/subscribe.ts | 52 +++++++++++------- 5 files changed, 129 insertions(+), 40 deletions(-) diff --git a/packages/net2/src/adapter.ts b/packages/net2/src/adapter.ts index c84bd8b..05502bb 100644 --- a/packages/net2/src/adapter.ts +++ b/packages/net2/src/adapter.ts @@ -21,6 +21,7 @@ export abstract class AbstractAdapter extends (EventEmitter as new () => TypedEm abstract send(message: ClientMessage): void cleanup() { + this.removeAllListeners() this._unsubscribers.splice(0).forEach(call) } } diff --git a/packages/net2/src/diff.ts b/packages/net2/src/diff.ts index f62a4f4..c000075 100644 --- a/packages/net2/src/diff.ts +++ b/packages/net2/src/diff.ts @@ -1,7 +1,13 @@ import {EventEmitter} from "events" import {on, randomId} from "@welshman/lib" import {SignedEvent, Filter} from "@welshman/util" -import {RelayMessage, isRelayNegErr, isRelayNegMsg} from "./message.js" +import { + RelayMessage, + isRelayNegErr, + isRelayNegMsg, + RelayMessageType, + ClientMessageType, +} from "./message.js" import {AbstractAdapter, AdapterEventType} from "./adapter.js" import {Negentropy, NegentropyStorageVector} from "./negentropy.js" import {TypedEmitter} from "./util.js" @@ -52,7 +58,7 @@ export class Diff extends (EventEmitter as new () => TypedEmitter) { this.emit(DiffEventType.Message, {have, need}, url) if (newMsg) { - adapter.send(["NEG-MSG", this._id, newMsg]) + adapter.send([RelayMessageType.NegMsg, this._id, newMsg]) } else { this.close() } @@ -70,17 +76,17 @@ export class Diff extends (EventEmitter as new () => TypedEmitter) { ) neg.initiate().then((msg: string) => { - adapter.send(["NEG-OPEN", this._id, filter, msg]) + adapter.send([ClientMessageType.NegOpen, this._id, filter, msg]) }) } close() { if (this._closed) return - this.adapter.send(["NEG-CLOSE", this._id]) + this.adapter.send([ClientMessageType.NegClose, this._id]) this.emit(DiffEventType.Close) + this.removeAllListeners() this._unsubscriber() - this._closed = true } } diff --git a/packages/net2/src/publish.ts b/packages/net2/src/publish.ts index 2dd1459..b677c59 100644 --- a/packages/net2/src/publish.ts +++ b/packages/net2/src/publish.ts @@ -1,46 +1,115 @@ import {EventEmitter} from "events" -import {on} from "@welshman/lib" +import {on, sleep, yieldThread} from "@welshman/lib" import {SignedEvent} from "@welshman/util" -import {RelayMessage, isRelayOk} from "./message.js" +import {RelayMessage, ClientMessageType, isRelayOk} from "./message.js" import {AbstractAdapter, AdapterEventType} from "./adapter.js" import {TypedEmitter} from "./util.js" -export enum PublishEventType { - Ok = "publish:event:ok", +export enum PublishStatus { + Pending = "publish:status:pending", + Success = "publish:status:success", + Failure = "publish:status:failure", + Timeout = "publish:status:timeout", + Aborted = "publish:status:aborted", } export type PublishEvents = { - [PublishEventType.Ok]: (id: string, ok: boolean, detail: string, url: string) => void + [PublishStatus.Pending]: (url: string) => void + [PublishStatus.Success]: (id: string, detail: string, url: string) => void + [PublishStatus.Failure]: (id: string, detail: string, url: string) => void + [PublishStatus.Timeout]: (url: string) => void + [PublishStatus.Aborted]: (url: string) => void +} + +export type PublishOptions = { + adapter: AbstractAdapter + event: SignedEvent + timeout?: number + events?: Partial } export class Publish extends (EventEmitter as new () => TypedEmitter) { + status = new Map() + + _done = new Set() _unsubscriber: () => void - constructor( - readonly adapter: AbstractAdapter, - readonly event: SignedEvent, - readonly verb = "EVENT", - ) { + constructor(readonly options: PublishOptions) { super() + // Listen for publish result this._unsubscriber = on( - adapter, + this.options.adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => { if (isRelayOk(message)) { const [_, id, ok, detail] = message - if (id === event.id) { - this.emit(PublishEventType.Ok, id, ok, detail, url) + if (id !== this.options.event.id) return + + if (ok) { + this.status.set(url, PublishStatus.Success) + this.emit(PublishStatus.Success, id, detail, url) + } else { + this.status.set(url, PublishStatus.Failure) + this.emit(PublishStatus.Failure, id, detail, url) + } + + if (!Array.from(this.status.values()).includes(PublishStatus.Pending)) { + this.cleanup() } } }, ) - adapter.send([verb, event]) + // Register handlers + if (this.options.events) { + for (const [k, listener] of Object.entries(this.options.events)) { + this.on(k as keyof PublishEvents, listener) + } + } + + // Autostart asynchronously so the caller can set up listeners + yieldThread().then(this.start) } - close() { + start = () => { + // Initialize status + for (const url of this.options.adapter.urls) { + this.status.set(url, PublishStatus.Pending) + this.emit(PublishStatus.Pending, url) + } + + // Set timeout + sleep(this.options.timeout || 10_000).then(() => { + for (const [url, status] of this.status.entries()) { + if (status === PublishStatus.Pending) { + this.status.set(url, PublishStatus.Timeout) + this.emit(PublishStatus.Timeout, url) + } + } + + this.cleanup() + }) + + // Send the publish message + this.options.adapter.send([ClientMessageType.Event, event]) + } + + abort = () => { + for (const [url, status] of this.status.entries()) { + if (status === PublishStatus.Pending) { + this.status.set(url, PublishStatus.Aborted) + this.emit(PublishStatus.Aborted, url) + } + } + + this.cleanup() + } + + cleanup = () => { + this.options.adapter.cleanup() + this.removeAllListeners() this._unsubscriber() } } diff --git a/packages/net2/src/socket.ts b/packages/net2/src/socket.ts index 98103fc..ad463af 100644 --- a/packages/net2/src/socket.ts +++ b/packages/net2/src/socket.ts @@ -109,6 +109,7 @@ export class Socket extends (EventEmitter as new () => TypedEmitter { diff --git a/packages/net2/src/subscribe.ts b/packages/net2/src/subscribe.ts index 9f2e685..5c04055 100644 --- a/packages/net2/src/subscribe.ts +++ b/packages/net2/src/subscribe.ts @@ -1,7 +1,7 @@ import {EventEmitter} from "events" -import {on, call, randomId} from "@welshman/lib" +import {on, call, randomId, yieldThread} from "@welshman/lib" import {Filter, matchFilter, SignedEvent} from "@welshman/util" -import {RelayMessage, isRelayEvent, isRelayEose} from "./message.js" +import {RelayMessage, ClientMessageType, isRelayEvent, isRelayEose} from "./message.js" import {AbstractAdapter, AdapterEventType} from "./adapter.js" import {SocketEventType, SocketStatus} from "./socket.js" import {TypedEmitter, Unsubscriber} from "./util.js" @@ -40,26 +40,28 @@ export type SubscriptionOptions = { export class Subscription extends (EventEmitter as new () => TypedEmitter) { _id = `REQ-${randomId().slice(0, 8)}` _unsubscribers: Unsubscriber[] = [] + _done = new Set() _closed = false constructor(readonly options: SubscriptionOptions) { super() - const done = new Set() - const urls = new Set(options.adapter.urls) + // Get our unique urls so we know when we're done + const urls = new Set(this.options.adapter.urls) + // Listen for event/eose messages from the adapter this._unsubscribers.push( - on(options.adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => { + on(this.options.adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => { if (isRelayEvent(message)) { const [_, id, event] = message if (id !== this._id) return - if (options.tracker?.track(event.id, url)) { + if (this.options.tracker?.track(event.id, url)) { this.emit(SubscriptionEventType.Duplicate, event, url) - } else if (options.verifyEvent?.(event) === false) { + } else if (this.options.verifyEvent?.(event) === false) { this.emit(SubscriptionEventType.Invalid, event, url) - } else if (!matchFilter(options.filter, event)) { + } else if (!matchFilter(this.options.filter, event)) { this.emit(SubscriptionEventType.Filtered, event, url) } else { this.emit(SubscriptionEventType.Event, event, url) @@ -72,9 +74,9 @@ export class Subscription extends (EventEmitter as new () => TypedEmitter TypedEmitter { if (![SocketStatus.Open, SocketStatus.Opening].includes(status)) { this.emit(SubscriptionEventType.Disconnect, socket.url) - done.add(socket.url) + this._done.add(socket.url) - if (options.autoClose && done.size === urls.size) { + if (this.options.autoClose && this._done.size === urls.size) { this.close() } } @@ -98,17 +101,25 @@ export class Subscription extends (EventEmitter as new () => TypedEmitter this.close(), options.timeout) - } - - if (options.events) { - for (const [k, listener] of Object.entries(options.events)) { + // Register listeners + if (this.options.events) { + for (const [k, listener] of Object.entries(this.options.events)) { this.on(k as keyof SubscriptionEvents, listener) } } - options.adapter.send(["REQ", this._id, options.filter]) + // Autostart asynchronously so the caller can set up listeners + yieldThread().then(this.open) + } + + open = () => { + // Timeout our subscription + if (this.options.timeout) { + setTimeout(() => this.close(), this.options.timeout) + } + + // Send our request + this.options.adapter.send([ClientMessageType.Req, this._id, this.options.filter]) } close() { @@ -116,6 +127,7 @@ export class Subscription extends (EventEmitter as new () => TypedEmitter