From 88b44776d896f6b9b13b227fa52284b9211b3674 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Mon, 24 Mar 2025 15:30:49 -0700 Subject: [PATCH] Add diff utils, change how adapter is created, scope down subscribe/publish to a single relay for fastest possible completion --- packages/net2/src/adapter.ts | 85 ++++++++------ packages/net2/src/diff.ts | 207 ++++++++++++++++++++++++++++----- packages/net2/src/publish.ts | 63 +++++----- packages/net2/src/subscribe.ts | 33 +++--- 4 files changed, 276 insertions(+), 112 deletions(-) diff --git a/packages/net2/src/adapter.ts b/packages/net2/src/adapter.ts index 05502bb..1874161 100644 --- a/packages/net2/src/adapter.ts +++ b/packages/net2/src/adapter.ts @@ -1,9 +1,10 @@ import EventEmitter from "events" import {call, on} from "@welshman/lib" -import {Relay, LOCAL_RELAY_URL} from "@welshman/util" +import {Relay, LOCAL_RELAY_URL, isRelayUrl} from "@welshman/util" import {RelayMessage, ClientMessage} from "./message.js" import {Socket, SocketEventType} from "./socket.js" import {TypedEmitter, Unsubscriber} from "./util.js" +import {Pool} from "./pool.js" export enum AdapterEventType { Receive = "adapter:event:receive", @@ -26,25 +27,27 @@ export abstract class AbstractAdapter extends (EventEmitter as new () => TypedEm } } -export class SocketsAdapter extends AbstractAdapter { - constructor(readonly sockets: Socket[]) { +export class SocketAdapter extends AbstractAdapter { + constructor(readonly socket: Socket) { super() - this._unsubscribers = sockets.map(socket => { - return on(socket, SocketEventType.Receive, (message: RelayMessage, url: string) => { + this._unsubscribers.push( + on(socket, SocketEventType.Receive, (message: RelayMessage, url: string) => { this.emit(AdapterEventType.Receive, message, url) - }) - }) + }), + ) + } + + get sockets() { + return [this.socket] } get urls() { - return this.sockets.map(socket => socket.url) + return [this.socket.url] } send(message: ClientMessage) { - for (const socket of this.sockets) { - socket.send(message) - } + this.socket.send(message) } } @@ -52,21 +55,21 @@ export class LocalAdapter extends AbstractAdapter { constructor(readonly relay: Relay) { super() - this._unsubscribers = [ + this._unsubscribers.push( on(relay, "*", (...message: RelayMessage) => { this.emit(AdapterEventType.Receive, message, LOCAL_RELAY_URL) }), - ] - } - - get urls() { - return [LOCAL_RELAY_URL] + ) } get sockets() { return [] } + get urls() { + return [LOCAL_RELAY_URL] + } + send(message: ClientMessage) { const [type, ...rest] = message @@ -74,28 +77,36 @@ export class LocalAdapter extends AbstractAdapter { } } -export class MultiAdapter extends AbstractAdapter { - constructor(readonly adapters: AbstractAdapter[]) { - super() +export type AdapterContext = { + pool?: Pool + relay?: Relay + getAdapter?: (url: string, context: AdapterContext) => AbstractAdapter +} - this._unsubscribers = adapters.map(adapter => { - return on(adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => { - this.emit(AdapterEventType.Receive, message, url) - }) - }) - } +export const getAdapter = (url: string, context: AdapterContext) => { + if (context.getAdapter) { + const adapter = context.getAdapter(url, context) - get urls() { - return this.adapters.flatMap(t => t.urls) - } - - get sockets() { - return this.adapters.flatMap(t => t.sockets) - } - - send(message: ClientMessage) { - for (const adapter of this.adapters) { - adapter.send(message) + if (adapter) { + return adapter } } + + if (url === LOCAL_RELAY_URL) { + if (!context.relay) { + throw new Error(`Unable to get local relay for ${url}`) + } + + return new LocalAdapter(context.relay) + } + + if (isRelayUrl(url)) { + if (!context.pool) { + throw new Error(`Unable to get socket for ${url}`) + } + + return new SocketAdapter(context.pool.get(url)) + } + + throw new Error(`Invalid relay url ${url}`) } diff --git a/packages/net2/src/diff.ts b/packages/net2/src/diff.ts index 4e821a6..ab0f6d7 100644 --- a/packages/net2/src/diff.ts +++ b/packages/net2/src/diff.ts @@ -1,6 +1,7 @@ import {EventEmitter} from "events" -import {on, randomId} from "@welshman/lib" +import {on, sleep, randomId, groupBy, pushToMapKey, inc, flatten, chunk} from "@welshman/lib" import {SignedEvent, Filter} from "@welshman/util" +import {TypedEmitter} from "./util.js" import { RelayMessage, isRelayNegErr, @@ -8,37 +9,47 @@ import { RelayMessageType, ClientMessageType, } from "./message.js" -import {AbstractAdapter, AdapterEventType} from "./adapter.js" +import {getAdapter, AdapterContext, AbstractAdapter, AdapterEventType} from "./adapter.js" import {Negentropy, NegentropyStorageVector} from "./negentropy.js" -import {TypedEmitter} from "./util.js" +import {subscribe, SubscriptionEventType} from "./subscribe.js" +import {publish, PublishEventType} from "./publish.js" -export enum DiffEventType { - Message = "diff:event:message", - Error = "diff:event:error", - Close = "diff:event:close", +export enum DifferenceEventType { + Message = "difference:event:message", + Error = "difference:event:error", + Close = "difference:event:close", } -export type DiffEvents = { - [DiffEventType.Message]: (payload: {have: string[]; need: string[]}, url: string) => void - [DiffEventType.Error]: (error: string, url: string) => void - [DiffEventType.Close]: () => void +export type DifferenceEvents = { + [DifferenceEventType.Message]: (payload: {have: string[]; need: string[]}, url: string) => void + [DifferenceEventType.Error]: (error: string, url: string) => void + [DifferenceEventType.Close]: () => void } -export type DiffOptions = { +export type DifferenceOptions = { + relay: string filter: Filter events: SignedEvent[] - adapter: AbstractAdapter - on?: Partial + context: AdapterContext + on?: Partial } -export class Diff extends (EventEmitter as new () => TypedEmitter) { +export class Difference extends (EventEmitter as new () => TypedEmitter) { + have = new Set() + need = new Set() + _id = `NEG-${randomId().slice(0, 8)}` _unsubscriber: () => void + _adapter: AbstractAdapter _closed = false - constructor(readonly options: DiffOptions) { + constructor(readonly options: DifferenceOptions) { super() + // Set up our adapter + this._adapter = getAdapter(this.options.relay, this.options.context) + + // Set up negentropy const storage = new NegentropyStorageVector() const neg = new Negentropy(storage, 50_000) @@ -48,8 +59,9 @@ export class Diff extends (EventEmitter as new () => TypedEmitter) { storage.seal() + // Add listeners this._unsubscriber = on( - this.options.adapter, + this._adapter, AdapterEventType.Receive, async (message: RelayMessage, url: string) => { if (isRelayNegMsg(message)) { @@ -58,12 +70,18 @@ export class Diff extends (EventEmitter as new () => TypedEmitter) { if (negid === this._id) { const [newMsg, have, need] = await neg.reconcile(msg) - this.emit(DiffEventType.Message, {have, need}, url) + for (const id of have) { + this.have.add(id) + } + + for (const id of need) { + this.need.add(id) + } + + this.emit(DifferenceEventType.Message, {have, need}, url) if (newMsg) { - this.options.adapter.send([RelayMessageType.NegMsg, this._id, newMsg]) - } else { - this.close() + this._adapter.send([RelayMessageType.NegMsg, this._id, newMsg]) } } } @@ -72,7 +90,7 @@ export class Diff extends (EventEmitter as new () => TypedEmitter) { const [_, negid, msg] = message if (negid === this._id) { - this.emit(DiffEventType.Error, msg, url) + this.emit(DifferenceEventType.Error, msg, url) } } }, @@ -81,22 +99,159 @@ export class Diff extends (EventEmitter as new () => TypedEmitter) { // Register listeners if (this.options.on) { for (const [k, listener] of Object.entries(this.options.on)) { - this.on(k as keyof DiffEvents, listener) + this.on(k as keyof DifferenceEvents, listener) } } neg.initiate().then((msg: string) => { - this.options.adapter.send([ClientMessageType.NegOpen, this._id, this.options.filter, msg]) + this._adapter.send([ClientMessageType.NegOpen, this._id, this.options.filter, msg]) }) } close() { if (this._closed) return - this.options.adapter.send([ClientMessageType.NegClose, this._id]) - this.emit(DiffEventType.Close) + this._adapter.send([ClientMessageType.NegClose, this._id]) + this.emit(DifferenceEventType.Close) this.removeAllListeners() + this._adapter.cleanup() this._unsubscriber() this._closed = true } } + +// diff is a shortcut for diffing multiple filters across multiple relays + +export type DiffOptions = { + relays: string[] + filters: Filter[] + events: SignedEvent[] + context: AdapterContext +} + +export type DiffItem = { + relay: string + have: Set + need: Set +} + +export const diff = async ({relays, filters, ...options}: DiffOptions) => { + const diffs = flatten( + await Promise.all( + relays.flatMap(async relay => { + return await Promise.all( + filters.map( + async filter => + new Promise((resolve, reject) => { + const diff = new Difference({relay, filter, ...options}) + + diff.on(DifferenceEventType.Close, () => { + resolve({relay, have: diff.have, need: diff.need}) + diff.close() + }) + + diff.on(DifferenceEventType.Error, (url, message) => { + reject(message) + diff.close() + }) + + sleep(30_000).then(() => { + reject("timeout") + diff.close() + }) + }), + ), + ) + }), + ), + ) + + return Array.from(groupBy(diff => diff.relay, diffs).entries()).map(([relay, diffs]) => { + const have = new Set() + const need = new Set() + + for (const diff of diffs) { + for (const id of diff.have) { + have.add(id) + } + + for (const id of diff.need) { + need.add(id) + } + } + + return {relay, have: Array.from(have), need: Array.from(need)} + }) +} + +// Pull diffs multiple arrays and fetches missing events + +export type PullOptions = DiffOptions + +export const pull = async ({context, ...options}: PullOptions) => { + const countById = new Map() + const idsByRelay = new Map() + + for (const {relay, need} of await diff({context, ...options})) { + for (const id of need) { + const count = countById.get(id) || 0 + + // Reduce, but don't completely eliminate duplicates, just in case a relay + // won't give us what we ask for. + if (count < 2) { + pushToMapKey(idsByRelay, relay, id) + countById.set(id, inc(count)) + } + } + } + + const result: SignedEvent[] = [] + + await Promise.all( + Array.from(idsByRelay.entries()).map(([relay, allIds]) => { + return Promise.all( + chunk(500, allIds).map(ids => { + return new Promise(resolve => { + const sub = subscribe({relay, filter: {ids}, context, autoClose: true}) + + sub.on(SubscriptionEventType.Close, resolve) + sub.on(SubscriptionEventType.Event, event => result.push(event)) + }) + }), + ) + }), + ) + + return result +} + +// Push diffs multiple relays and publishes missing events + +export type PushOptions = DiffOptions + +export const push = async ({context, events, ...options}: PushOptions) => { + const relaysById = new Map() + + for (const {relay, have} of await diff({context, events, ...options})) { + for (const id of have) { + pushToMapKey(relaysById, id, relay) + } + } + + await Promise.all( + events.map(async event => { + const relays = relaysById.get(event.id) + + if (relays) { + await Promise.all( + relays.map( + relay => + new Promise(resolve => { + publish({event, relay, context}).on(PublishEventType.Complete, resolve) + }), + ), + ) + } + }), + ) +} diff --git a/packages/net2/src/publish.ts b/packages/net2/src/publish.ts index 000ca62..194240e 100644 --- a/packages/net2/src/publish.ts +++ b/packages/net2/src/publish.ts @@ -2,7 +2,7 @@ import {EventEmitter} from "events" import {on, sleep, yieldThread} from "@welshman/lib" import {SignedEvent} from "@welshman/util" import {RelayMessage, ClientMessageType, isRelayOk} from "./message.js" -import {AbstractAdapter, AdapterEventType} from "./adapter.js" +import {AbstractAdapter, AdapterEventType, AdapterContext, getAdapter} from "./adapter.js" import {TypedEmitter} from "./util.js" export enum PublishStatus { @@ -13,33 +13,42 @@ export enum PublishStatus { Aborted = "publish:status:aborted", } +export enum PublishEventType { + Complete = "publish:status:complete", +} + export type PublishEvents = { - [PublishStatus.Pending]: (url: string) => void [PublishStatus.Success]: (id: string, detail: string, url: string) => void [PublishStatus.Failure]: (id: string, detail: string, url: string) => void - [PublishStatus.Timeout]: (url: string) => void - [PublishStatus.Aborted]: (url: string) => void + [PublishStatus.Timeout]: () => void + [PublishStatus.Aborted]: () => void + [PublishEventType.Complete]: () => void } export type PublishOptions = { - adapter: AbstractAdapter + relay: string event: SignedEvent + context: AdapterContext timeout?: number on?: Partial } export class Publish extends (EventEmitter as new () => TypedEmitter) { - status = new Map() + status = PublishStatus.Pending _done = new Set() _unsubscriber: () => void + _adapter: AbstractAdapter constructor(readonly options: PublishOptions) { super() + // Set up our adapter + this._adapter = getAdapter(this.options.relay, this.options.context) + // Listen for publish result this._unsubscriber = on( - this.options.adapter, + this._adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => { if (isRelayOk(message)) { @@ -48,16 +57,14 @@ export class Publish extends (EventEmitter as new () => TypedEmitter TypedEmitter { - // Initialize status - for (const url of this.options.adapter.urls) { - this.status.set(url, PublishStatus.Pending) - this.emit(PublishStatus.Pending, url) - } - // Set timeout sleep(this.options.timeout || 10_000).then(() => { - for (const [url, status] of this.status.entries()) { - if (status === PublishStatus.Pending) { - this.status.set(url, PublishStatus.Timeout) - this.emit(PublishStatus.Timeout, url) - } + if (this.status === PublishStatus.Pending) { + this.status = PublishStatus.Timeout + this.emit(PublishStatus.Timeout) } this.cleanup() }) // Send the publish message - this.options.adapter.send([ClientMessageType.Event, event]) + this._adapter.send([ClientMessageType.Event, event]) } abort = () => { - for (const [url, status] of this.status.entries()) { - if (status === PublishStatus.Pending) { - this.status.set(url, PublishStatus.Aborted) - this.emit(PublishStatus.Aborted, url) - } + if (this.status === PublishStatus.Pending) { + this.status = PublishStatus.Aborted + this.emit(PublishStatus.Aborted) + this.cleanup() } - - this.cleanup() } cleanup = () => { - this.options.adapter.cleanup() + this.emit(PublishEventType.Complete) this.removeAllListeners() + this._adapter.cleanup() this._unsubscriber() } } + +export const publish = (options: PublishOptions) => new Publish(options) diff --git a/packages/net2/src/subscribe.ts b/packages/net2/src/subscribe.ts index 0317bb9..54fe03f 100644 --- a/packages/net2/src/subscribe.ts +++ b/packages/net2/src/subscribe.ts @@ -2,7 +2,7 @@ import {EventEmitter} from "events" import {on, call, randomId, yieldThread} from "@welshman/lib" import {Filter, matchFilter, SignedEvent} from "@welshman/util" import {RelayMessage, ClientMessageType, isRelayEvent, isRelayEose} from "./message.js" -import {AbstractAdapter, AdapterEventType} from "./adapter.js" +import {getAdapter, AdapterContext, AbstractAdapter, AdapterEventType} from "./adapter.js" import {SocketEventType, SocketStatus} from "./socket.js" import {TypedEmitter, Unsubscriber} from "./util.js" import {Tracker} from "./tracker.js" @@ -28,11 +28,12 @@ export type SubscriptionEvents = { } export type SubscriptionOptions = { - adapter: AbstractAdapter - autoClose?: boolean + relay: string filter: Filter + context: AdapterContext timeout?: number tracker?: Tracker + autoClose?: boolean verifyEvent?: (event: SignedEvent) => boolean on?: Partial } @@ -40,18 +41,18 @@ export type SubscriptionOptions = { export class Subscription extends (EventEmitter as new () => TypedEmitter) { _id = `REQ-${randomId().slice(0, 8)}` _unsubscribers: Unsubscriber[] = [] - _done = new Set() + _adapter: AbstractAdapter _closed = false constructor(readonly options: SubscriptionOptions) { super() - // Get our unique urls so we know when we're done - const urls = new Set(this.options.adapter.urls) + // Set up our adapter + this._adapter = getAdapter(this.options.relay, this.options.context) // Listen for event/eose messages from the adapter this._unsubscribers.push( - on(this.options.adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => { + on(this._adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => { if (isRelayEvent(message)) { const [_, id, event] = message @@ -74,9 +75,7 @@ export class Subscription extends (EventEmitter as new () => TypedEmitter TypedEmitter { if (![SocketStatus.Open, SocketStatus.Opening].includes(status)) { this.emit(SubscriptionEventType.Disconnect, socket.url) - this._done.add(socket.url) - - if (this.options.autoClose && this._done.size === urls.size) { + if (this.options.autoClose) { this.close() } } @@ -119,17 +116,19 @@ export class Subscription extends (EventEmitter as new () => TypedEmitter new Subscription(options)