From 989fc74374dae2cf8684b22fce2841b0c9bc87a7 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 10 Apr 2025 10:38:47 -0700 Subject: [PATCH] Change requests from classes to functions --- packages/app/src/feeds.ts | 29 +- packages/app/src/sync.ts | 7 +- packages/dvm/src/handler.ts | 13 +- packages/dvm/src/request.ts | 41 ++- packages/net/__tests__/request.test.ts | 81 +++-- packages/net/src/diff.ts | 22 +- packages/net/src/request.ts | 444 ++++++++++++------------- packages/signer/src/signers/nip46.ts | 41 ++- 8 files changed, 336 insertions(+), 342 deletions(-) diff --git a/packages/app/src/feeds.ts b/packages/app/src/feeds.ts index b06df22..a15f940 100644 --- a/packages/app/src/feeds.ts +++ b/packages/app/src/feeds.ts @@ -1,6 +1,6 @@ import {nthEq, partition, race, now} from "@welshman/lib" import {createEvent, getPubkeyTagValues} from "@welshman/util" -import {MultiRequest, Tracker, RequestEvent, request} from "@welshman/net" +import {request, Tracker} from "@welshman/net" import {Scope, FeedController, RequestOpts, FeedOptions, DVMOpts, Feed} from "@welshman/feeds" import {makeDvmRequest, DVMEvent} from "@welshman/dvm" import {makeSecret, Nip01Signer} from "@welshman/signer" @@ -27,13 +27,17 @@ export const makeFeedRequestHandler = ({signal}: FeedRequestHandlerOptions) => req.on(RequestEvent.Close, resolve) }) } else { - const requests: MultiRequest[] = [] + const promises: Promise[][] = [] const [withSearch, withoutSearch] = partition(f => Boolean(f.search), filters) if (withSearch.length > 0) { - requests.push( + promises.push( request({ - tracker, signal, autoClose: true, + signal, + tracker, + onEvent + threshold: 0.1, + autoClose: true, filters: withSearch, relays: Router.get().Search().getUrls(), }), @@ -41,25 +45,16 @@ export const makeFeedRequestHandler = ({signal}: FeedRequestHandlerOptions) => } if (withoutSearch.length > 0) { - requests.push( - ...getFilterSelections(filters).flatMap(options => - request({tracker, signal, autoClose: true, ...options}), + promises.push( + ...getFilterSelections(filters).flatMap(({relays, filters}) => + request({tracker, signal, onEvent, relays, filters, threshold: 0.8, autoClose: true}), ), ) } // Break out selections by relay so we can complete early after a certain number // of requests complete for faster load times - await race( - withSearch.length > 0 ? 0.1 : 0.8, - requests.map( - req => - new Promise(resolve => { - req.on(RequestEvent.Event, onEvent) - req.on(RequestEvent.Close, resolve) - }), - ), - ) + await race(withSearch.length > 0 ? 0.1 : 0.8, promises) // Wait until after we've queried the network to access our local cache. This results in less // snappy response times, but is necessary to prevent stale stuff that the user has already seen diff --git a/packages/app/src/sync.ts b/packages/app/src/sync.ts index 92f803e..bdbadea 100644 --- a/packages/app/src/sync.ts +++ b/packages/app/src/sync.ts @@ -4,9 +4,8 @@ import { push as basePush, pull as basePull, PublishEvent, - RequestEvent, SinglePublish, - SingleRequest, + requestOne, } from "@welshman/net" import {repository} from "./core.js" import {relaysByUrl} from "./relays.js" @@ -35,9 +34,7 @@ export const pull = async ({relays, filters}: AppSyncOpts) => { relays.map(async relay => { await (hasNegentropy(relay) ? basePull({filters, events, relays: [relay]}) - : new Promise(resolve => { - new SingleRequest({filters, relay, autoClose: true}).on(RequestEvent.Close, resolve) - })) + : requestOne({filters, relay, autoClose: true})) }), ) } diff --git a/packages/dvm/src/handler.ts b/packages/dvm/src/handler.ts index 50c5fc1..5e67cc8 100644 --- a/packages/dvm/src/handler.ts +++ b/packages/dvm/src/handler.ts @@ -1,7 +1,7 @@ import {now} from "@welshman/lib" import {Nip01Signer} from "@welshman/signer" import {TrustedEvent, StampedEvent, Filter} from "@welshman/util" -import {MultiRequest, MultiPublish, PublishEvent, RequestEvent, AdapterContext} from "@welshman/net" +import {request, MultiPublish, PublishEvent, AdapterContext} from "@welshman/net" export type DVMHandler = { stop?: () => void @@ -50,10 +50,13 @@ export class DVM { filter["#p"] = [pubkey] } - const req = new MultiRequest({relays, filters: [filter], context}) - - req.on(RequestEvent.Event, this.onEvent) - req.on(RequestEvent.Close, resolve) + request({ + relays, + filters: [filter], + context, + onClose: resolve, + onEvent: this.onEvent, + }) }) } } diff --git a/packages/dvm/src/request.ts b/packages/dvm/src/request.ts index 1bb0f16..abc9e30 100644 --- a/packages/dvm/src/request.ts +++ b/packages/dvm/src/request.ts @@ -1,6 +1,6 @@ import {Emitter, now} from "@welshman/lib" import {TrustedEvent, SignedEvent, Filter} from "@welshman/util" -import {MultiRequest, MultiPublish, RequestEvent, AdapterContext} from "@welshman/net" +import {request, MultiPublish, AdapterContext} from "@welshman/net" export enum DVMEvent { Progress = "progress", @@ -17,13 +17,12 @@ export type DVMRequestOptions = { } export type DVMRequest = { - request: DVMRequestOptions + options: DVMRequestOptions emitter: Emitter - sub: MultiRequest pub: MultiPublish } -export const makeDvmRequest = (request: DVMRequestOptions) => { +export const makeDvmRequest = (options: DVMRequestOptions) => { const emitter = new Emitter() const { event, @@ -32,25 +31,33 @@ export const makeDvmRequest = (request: DVMRequestOptions) => { timeout = 30_000, autoClose = true, reportProgress = true, - } = request + } = options const kind = event.kind + 1000 const kinds = reportProgress ? [kind, 7000] : [kind] const filters: Filter[] = [{kinds, since: now() - 60, "#e": [event.id]}] + const abortController = new AbortController() + const signal = AbortSignal.any([abortController.signal, AbortSignal.timeout(timeout)]) - const sub = new MultiRequest({relays, filters, timeout, context}) - const pub = new MultiPublish({relays, event, timeout, context}) + request({ + signal, + relays, + filters, + context, + onEvent: (event: TrustedEvent, url: string) => { + if (event.kind === 7000) { + emitter.emit(DVMEvent.Progress, url, event) + } else { + emitter.emit(DVMEvent.Result, url, event) - sub.on(RequestEvent.Event, (event: TrustedEvent, url: string) => { - if (event.kind === 7000) { - emitter.emit(DVMEvent.Progress, url, event) - } else { - emitter.emit(DVMEvent.Result, url, event) - - if (autoClose) { - sub.close() + if (autoClose) { + abortController.abort() + } } - } + }, }) - return {request, emitter, sub, pub} as DVMRequest + const pub = new MultiPublish({relays, event, timeout, context}) + + + return {options, emitter, pub} as DVMRequest } diff --git a/packages/net/__tests__/request.test.ts b/packages/net/__tests__/request.test.ts index 20eb722..52046fc 100644 --- a/packages/net/__tests__/request.test.ts +++ b/packages/net/__tests__/request.test.ts @@ -3,9 +3,9 @@ import {Nip01Signer} from "@welshman/signer" import {makeEvent} from "@welshman/util" import {ClientMessageType} from "../src/message" import {MockAdapter} from "../src/adapter" -import {SingleRequest, MultiRequest, RequestEvent} from "../src/request" +import {requestOne, request} from "../src/request" -describe("SingleRequest", () => { +describe("requestOne", () => { beforeEach(() => { vi.useFakeTimers() }) @@ -17,12 +17,6 @@ describe("SingleRequest", () => { it("everything basically works", async () => { const sendSpy = vi.fn() const adapter = new MockAdapter("1", sendSpy) - const req = new SingleRequest({ - relay: "whatever", - filters: [{kinds: [1]}], - context: {getAdapter: () => adapter}, - }) - const duplicateSpy = vi.fn() const invalidSpy = vi.fn() const filteredSpy = vi.fn() @@ -30,18 +24,21 @@ describe("SingleRequest", () => { const eoseSpy = vi.fn() const closeSpy = vi.fn() - req.on(RequestEvent.Duplicate, duplicateSpy) - req.on(RequestEvent.Invalid, invalidSpy) - req.on(RequestEvent.Filtered, filteredSpy) - req.on(RequestEvent.Event, eventSpy) - req.on(RequestEvent.Eose, eoseSpy) - req.on(RequestEvent.Close, closeSpy) + requestOne({ + relay: "whatever", + filters: [{kinds: [1]}], + context: {getAdapter: () => adapter}, + onDuplicate: duplicateSpy, + onInvalid: invalidSpy, + onFiltered: filteredSpy, + onEvent: eventSpy, + onEose: eoseSpy, + onClose: closeSpy, + }) await vi.runAllTimersAsync() - const id = Array.from(req._ids)[0] - - expect(sendSpy).toHaveBeenCalledWith([ClientMessageType.Req, id, {kinds: [1]}]) + expect(sendSpy).toHaveBeenCalledWith([ClientMessageType.Req, expect.any(String), {kinds: [1]}]) const signer = Nip01Signer.ephemeral() const event1 = await signer.sign(makeEvent(1)) @@ -71,7 +68,7 @@ describe("SingleRequest", () => { }) }) -describe("MultiRequest", () => { +describe("request", () => { beforeEach(() => { vi.useFakeTimers() }) @@ -85,14 +82,6 @@ describe("MultiRequest", () => { const adapter1 = new MockAdapter("1", send1Spy) const send2Spy = vi.fn() const adapter2 = new MockAdapter("2", send2Spy) - const req = new MultiRequest({ - relays: ["1", "2"], - filters: [{kinds: [1]}], - context: { - getAdapter: (url: string) => (url === "1" ? adapter1 : adapter2), - }, - }) - const duplicateSpy = vi.fn() const invalidSpy = vi.fn() const filteredSpy = vi.fn() @@ -100,20 +89,24 @@ describe("MultiRequest", () => { const eoseSpy = vi.fn() const closeSpy = vi.fn() - req.on(RequestEvent.Duplicate, duplicateSpy) - req.on(RequestEvent.Invalid, invalidSpy) - req.on(RequestEvent.Filtered, filteredSpy) - req.on(RequestEvent.Event, eventSpy) - req.on(RequestEvent.Eose, eoseSpy) - req.on(RequestEvent.Close, closeSpy) + request({ + relays: ["1", "2"], + filters: [{kinds: [1]}], + context: { + getAdapter: (url: string) => (url === "1" ? adapter1 : adapter2), + }, + onDuplicate: duplicateSpy, + onInvalid: invalidSpy, + onFiltered: filteredSpy, + onEvent: eventSpy, + onEose: eoseSpy, + onClose: closeSpy, + }) await vi.runAllTimersAsync() - const id1 = Array.from(req._children[0]._ids)[0] - const id2 = Array.from(req._children[1]._ids)[0] - - expect(send1Spy).toHaveBeenCalledWith([ClientMessageType.Req, id1, {kinds: [1]}]) - expect(send2Spy).toHaveBeenCalledWith([ClientMessageType.Req, id2, {kinds: [1]}]) + expect(send1Spy).toHaveBeenCalledTimes(1) + expect(send2Spy).toHaveBeenCalledTimes(1) const signer = Nip01Signer.ephemeral() const event1 = await signer.sign(makeEvent(1)) @@ -121,11 +114,11 @@ describe("MultiRequest", () => { const event3 = makeEvent(1) const event4 = await signer.sign(makeEvent(1)) - adapter1.receive(["EVENT", id1, event1]) - adapter1.receive(["EVENT", id1, event2]) - adapter1.receive(["EVENT", id1, event3]) - adapter2.receive(["EVENT", id2, event1]) - adapter2.receive(["EVENT", id2, event4]) + adapter1.receive(["EVENT", expect.any(String), event1]) + adapter1.receive(["EVENT", expect.any(String), event2]) + adapter1.receive(["EVENT", expect.any(String), event3]) + adapter2.receive(["EVENT", expect.any(String), event1]) + adapter2.receive(["EVENT", expect.any(String), event4]) await vi.runAllTimersAsync() @@ -135,8 +128,8 @@ describe("MultiRequest", () => { expect(eventSpy).toHaveBeenCalledWith(event1, "1") expect(eoseSpy).toHaveBeenCalledTimes(0) - adapter1.receive(["EOSE", id1]) - adapter2.receive(["EOSE", id2]) + adapter1.receive(["EOSE", expect.any(String)]) + adapter2.receive(["EOSE", expect.any(String)]) expect(eoseSpy).toHaveBeenCalledTimes(2) diff --git a/packages/net/src/diff.ts b/packages/net/src/diff.ts index 168d1ba..c113ad3 100644 --- a/packages/net/src/diff.ts +++ b/packages/net/src/diff.ts @@ -10,7 +10,7 @@ import { } from "./message.js" import {getAdapter, AdapterContext, AbstractAdapter, AdapterEvent} from "./adapter.js" import {Negentropy, NegentropyStorageVector} from "./negentropy.js" -import {SingleRequest, RequestEvent} from "./request.js" +import {requestOne} from "./request.js" import {MultiPublish, PublishEvent} from "./publish.js" export enum DifferenceEvent { @@ -200,14 +200,18 @@ export const pull = async ({context, ...options}: PullOptions) => { await Promise.all( Array.from(idsByRelay.entries()).map(([relay, allIds]) => { return Promise.all( - chunk(500, allIds).map(ids => { - return new Promise(resolve => { - const req = new SingleRequest({relay, context, filters: [{ids}], autoClose: true}) - - req.on(RequestEvent.Close, resolve) - req.on(RequestEvent.Event, event => result.push(event as SignedEvent)) - }) - }), + chunk(500, allIds).map(ids => + new Promise(resolve => + requestOne({ + relay, + context, + filters: [{ids}], + autoClose: true, + onClose: resolve, + onEvent: event => result.push(event as SignedEvent), + }) + ) + ), ) }), ) diff --git a/packages/net/src/request.ts b/packages/net/src/request.ts index c41d2dd..88cf2f6 100644 --- a/packages/net/src/request.ts +++ b/packages/net/src/request.ts @@ -1,5 +1,5 @@ import {EventEmitter} from "events" -import {on, call, randomId, yieldThread, pushToMapKey, batcher} from "@welshman/lib" +import {on, flatten, addToMapKey, defer, Deferred, call, randomId, yieldThread, pushToMapKey, batcher} from "@welshman/lib" import { Filter, unionFilters, @@ -14,232 +14,165 @@ import {Unsubscriber} from "./util.js" import {netContext} from "./context.js" import {Tracker} from "./tracker.js" -export enum RequestEvent { - Close = "request:event:close", - Disconnect = "request:event:disconnect", - Duplicate = "request:event:duplicate", - Eose = "request:event:eose", - Event = "request:event:event", - Filtered = "request:event:filtered", - Deleted = "request:event:deleted", - Invalid = "request:event:invalid", -} - -// SingleRequest - -export type SingleRequestEvents = { - [RequestEvent.Event]: (event: TrustedEvent) => void - [RequestEvent.Deleted]: (event: any) => void - [RequestEvent.Invalid]: (event: any) => void - [RequestEvent.Filtered]: (event: TrustedEvent) => void - [RequestEvent.Duplicate]: (event: TrustedEvent) => void - [RequestEvent.Disconnect]: () => void - [RequestEvent.Close]: () => void - [RequestEvent.Eose]: () => void -} - -export type SingleRequestOptions = { +export type RequestOneOptions = { relay: string filters: Filter[] signal?: AbortSignal - context?: AdapterContext - timeout?: number tracker?: Tracker + context?: AdapterContext autoClose?: boolean isEventValid?: (event: TrustedEvent, url: string) => boolean isEventDeleted?: (event: TrustedEvent, url: string) => boolean + onEvent?: (event: TrustedEvent, url: string) => void + onDeleted?: (event: unknown, url: string) => void + onInvalid?: (event: unknown, url: string) => void + onFiltered?: (event: TrustedEvent, url: string) => void + onDuplicate?: (event: TrustedEvent, url: string) => void + onDisconnect?: (url: string) => void + onEose?: (url: string) => void + onClose?: () => void } -export class SingleRequest extends EventEmitter { - _ids = new Set() - _eose = new Set() - _unsubscribers: Unsubscriber[] = [] - _adapter: AbstractAdapter - _closed = false +export const requestOne = (options: RequestOneOptions) => { + const ids = new Set() + const eose = new Set() + const events: TrustedEvent[] = [] + const deferred = defer() + const tracker = options.tracker || new Tracker() + const adapter = getAdapter(options.relay, options.context) + const isEventValid = options.isEventValid || netContext.isEventValid + const isEventDeleted = options.isEventDeleted || netContext.isEventDeleted - constructor(readonly options: SingleRequestOptions) { - super() + let closed = false - const tracker = options.tracker || new Tracker() - const isEventValid = options.isEventValid || netContext.isEventValid - const isEventDeleted = options.isEventDeleted || netContext.isEventDeleted + const close = () => { + if (closed) return - // Set up our adapter - this._adapter = getAdapter(this.options.relay, this.options.context) + closed = true - // Listen for event/eose messages from the adapter - this._unsubscribers.push( - on(this._adapter, AdapterEvent.Receive, (message: RelayMessage, url: string) => { - if (isRelayEvent(message)) { - const [_, id, event] = message + for (const id of ids) { + adapter.send(["CLOSE", id]) + } - if (this._ids.has(id)) { - if (tracker.track(event.id, url)) { - this.emit(RequestEvent.Duplicate, event) - } else if (isEventDeleted(event, url)) { - this.emit(RequestEvent.Deleted, event) - } else if (!isEventValid(event, url)) { - this.emit(RequestEvent.Invalid, event) - } else if (!matchFilters(this.options.filters, event)) { - this.emit(RequestEvent.Filtered, event) - } else { - this.emit(RequestEvent.Event, event) + options.onClose?.() + adapter.cleanup() + unsubscribers.map(call) + deferred.resolve(events) + } + + const unsubscribers = [ + on(adapter, AdapterEvent.Receive, (message: RelayMessage, url: string) => { + if (isRelayEvent(message)) { + const [_, id, event] = message + + if (ids.has(id)) { + if (tracker.track(event.id, url)) { + options.onDuplicate?.(event, url) + } else if (isEventDeleted(event, url)) { + options.onDeleted?.(event, url) + } else if (!isEventValid(event, url)) { + options.onInvalid?.(event, url) + } else if (!matchFilters(options.filters, event)) { + options.onFiltered?.(event, url) + } else { + options.onEvent?.(event, url) + events.push(event) + } + } + } + + if (isRelayEose(message)) { + const [_, id] = message + + if (ids.has(id)) { + eose.add(id) + + if (eose.size === ids.size) { + options.onEose?.(url) + + if (options.autoClose) { + close() } } } + } + }), + ] - if (isRelayEose(message)) { - const [_, id] = message + // Listen to disconnects from any sockets + for (const socket of adapter.sockets) { + unsubscribers.push( + on(socket, SocketEvent.Status, (status: SocketStatus) => { + if (![SocketStatus.Open, SocketStatus.Opening].includes(status)) { + options.onDisconnect?.(socket.url) - if (this._ids.has(id)) { - this._eose.add(id) - - if (this._eose.size === this._ids.size) { - this.emit(RequestEvent.Eose) - - if (this.options.autoClose) { - this.close() - } - } + if (options.autoClose) { + close() } } }), ) - - // Listen to disconnects from any sockets - for (const socket of this._adapter.sockets) { - this._unsubscribers.push( - on(socket, SocketEvent.Status, (status: SocketStatus) => { - if (![SocketStatus.Open, SocketStatus.Opening].includes(status)) { - this.emit(RequestEvent.Disconnect) - - if (this.options.autoClose) { - this.close() - } - } - }), - ) - } - - // Timeout our subscription - if (this.options.timeout || this.options.autoClose) { - setTimeout(() => this.close(), this.options.timeout || 10000) - } - - // Handle abort signal - this.options.signal?.addEventListener("abort", () => this.close()) - - // Start asynchronously so the caller can set up listeners - yieldThread().then(() => { - for (const filter of this.options.filters) { - const id = `REQ-${randomId().slice(0, 8)}` - - this._ids.add(id) - this._adapter.send([ClientMessageType.Req, id, filter]) - } - }) } - close() { - if (this._closed) return + // Handle abort signal + options.signal?.addEventListener("abort", close) - for (const id of this._ids) { - this._adapter.send(["CLOSE", id]) - } - - this._closed = true - this.emit(RequestEvent.Close) - this._adapter.cleanup() - this._unsubscribers.map(call) - this.removeAllListeners() + // If we're auto-closing, make sure it happens even if the relay doesn't send an eose + // and the caller doesn't provide a signal, in order to avoid memory leaks + if (options.autoClose && !options.signal) { + setTimeout(close, 30_000) } + + for (const filter of options.filters) { + const id = `REQ-${randomId().slice(0, 8)}` + + ids.add(id) + adapter.send([ClientMessageType.Req, id, filter]) + } + + return deferred } -// MultiRequest - -export type MultiRequestEvents = { - [RequestEvent.Event]: (event: TrustedEvent, url: string) => void - [RequestEvent.Deleted]: (event: TrustedEvent, url: string) => void - [RequestEvent.Invalid]: (event: TrustedEvent, url: string) => void - [RequestEvent.Filtered]: (event: TrustedEvent, url: string) => void - [RequestEvent.Duplicate]: (event: TrustedEvent, url: string) => void - [RequestEvent.Disconnect]: (url: string) => void - [RequestEvent.Eose]: (url: string) => void - [RequestEvent.Close]: () => void -} - -export type MultiRequestOptions = Omit & { +export type RequestOptions = Omit & { relays: string[] threshold?: number } -export class MultiRequest extends EventEmitter { - _children: SingleRequest[] = [] - _closed = new Set() +export const request = async (options: RequestOptions) => { + const closed = new Set() + const tracker = new Tracker() + const relays = new Set(options.relays) + const ctrl = new AbortController() + const signal = options.signal ? AbortSignal.any([options.signal, ctrl.signal]) : ctrl.signal + const threshold = options.threshold || 1 + const promises: Promise[] = [] - constructor(options: MultiRequestOptions) { - super() - - const tracker = new Tracker() - const relays = new Set(options.relays) - const threshold = options.threshold || 1 - - if (relays.size !== options.relays.length) { - console.warn("Non-unique relays passed to MultiRequest") - } - - for (const relay of relays) { - const req = new SingleRequest({relay, tracker, ...options}) - - req.on(RequestEvent.Event, (event: TrustedEvent) => { - this.emit(RequestEvent.Event, event, relay) - }) - - req.on(RequestEvent.Deleted, (event: TrustedEvent) => { - this.emit(RequestEvent.Deleted, event, relay) - }) - - req.on(RequestEvent.Invalid, (event: TrustedEvent) => { - this.emit(RequestEvent.Invalid, event, relay) - }) - - req.on(RequestEvent.Filtered, (event: TrustedEvent) => { - this.emit(RequestEvent.Filtered, event, relay) - }) - - req.on(RequestEvent.Duplicate, (event: TrustedEvent) => { - this.emit(RequestEvent.Duplicate, event, relay) - }) - - req.on(RequestEvent.Disconnect, () => { - this.emit(RequestEvent.Disconnect, relay) - }) - - req.on(RequestEvent.Eose, () => { - this.emit(RequestEvent.Eose, relay) - }) - - req.on(RequestEvent.Close, () => { - this._closed.add(relay) - - if (this._closed.size >= relays.size * threshold) { - this.emit(RequestEvent.Close) - this.close() - } - }) - - this._children.push(req) - } + if (relays.size !== options.relays.length) { + console.warn("Non-unique relays passed to request") } - close() { - for (const child of this._children) { - child.close() - } - } + return flatten( + await Promise.all( + Array.from(relays).map(relay => + requestOne({ + ...options, + tracker, + signal, + relay, + onClose: () => { + closed.add(relay) + + if (closed.size >= relays.size * threshold) { + options.onClose?.() + ctrl.abort() + } + } + }) + ) + ) + ) } -export const request = (options: MultiRequestOptions) => new MultiRequest(options) export type LoaderOptions = { delay: number @@ -253,61 +186,118 @@ export type LoaderOptions = { export type LoadOptions = { relays: string[] filters: Filter[] + signal?: AbortSignal + onEvent?: (event: TrustedEvent, url: string) => void + onDisconnect?: (url: string) => void + onEose?: (url: string) => void + onClose?: () => void } /** * Creates a convenience function which returns a promise of events from a request. - * It may return early if filter cardinality is known, and it delays requests by - * 200 in order to implement batching - * @param options - MultiRequestOptions - * @returns - a promise containing an array of TrustedEvents + * It may return early if filter cardinality is known, and it delays requests in order + * to implement batching + * @param options - LoaderOptions + * @returns - a load function */ export const makeLoader = (options: LoaderOptions) => - batcher(options.delay, async (requests: LoadOptions[]) => { - const filtersByRelay = new Map() + batcher(options.delay, async (allRequests: LoadOptions[]) => { + const resultsByRequest = new Map>() + const eventsByRequest = new Map() + const requestsByRelay = new Map() + const controllersByRelay = new Map() + const signalsByRelay = new Map() + const closedRequestsByRelay = new Map>() + const closedRelaysByRequest = new Map>() + const relays = allRequests.flatMap(r => r.relays) + const threshold = options.threshold || 1 + const tracker = new Tracker() - for (const {filters, relays} of requests) { - for (const relay of relays) { - for (const filter of filters) { - pushToMapKey(filtersByRelay, relay, filter) - } + const close = (relay: string, request: LoadOptions) => { + addToMapKey(closedRequestsByRelay, relay, request) + addToMapKey(closedRelaysByRequest, request, relay) + + const closedRelays = closedRelaysByRequest.get(request)?.size || 0 + if (closedRelays >= request.relays.length * threshold) { + request.onClose?.() + resultsByRequest.get(request)?.resolve(eventsByRequest.get(request) || []) + } + + if (closedRequestsByRelay.get(relay)?.size === requestsByRelay.get(relay)?.length) { + controllersByRelay.get(relay)?.abort() } } - const tracker = new Tracker() - const events: TrustedEvent[] = [] + for (const request of allRequests) { + for (const relay of request.relays) { + pushToMapKey(requestsByRelay, relay, request) + resultsByRequest.set(request, defer()) + } - await Promise.all( - Array.from(filtersByRelay).map( - async ([relay, unmergedFilters]) => - new Promise(resolve => { - const filters = unionFilters(unmergedFilters) - const cardinality = - filters.length === 1 ? getFilterResultCardinality(filters[0]) : undefined - const req = new MultiRequest({ - filters, - tracker, - relays: [relay], - autoClose: true, - ...options, - }) + // Propagate abort when all requests have been closed for a given relay + request.signal?.addEventListener('abort', () => { + for (const relay of request.relays) { + close(relay, request) + } + }) + } - let count = 0 + // Create an abort controller for each relay + for (const relay of relays) { + const controller = new AbortController() + const signals = [controller.signal] - req.on(RequestEvent.Event, (event: TrustedEvent) => { - events.push(event) + if (options.timeout) { + signals.push(AbortSignal.timeout(options.timeout)) + } - if (++count === cardinality) { - resolve() + controllersByRelay.set(relay, controller) + signalsByRelay.set(relay, AbortSignal.any(signals)) + } + + Array.from(requestsByRelay).forEach( + async ([relay, requests]) => { + // Union all filters for a given request and send them together + const filters = unionFilters(requests.flatMap(r => r.filters)) + + // Propagate events to caller, but only for requests that have not been aborted + const getOpenRequests = () => + requests.filter(request => !closedRequestsByRelay.get(relay)?.has(request)) + + requestOne({ + relay, + filters, + tracker, + autoClose: true, + signal: signalsByRelay.get(relay), + context: options.context, + isEventValid: options.isEventValid, + isEventDeleted: options.isEventDeleted, + onEvent: (event: TrustedEvent, url: string) => { + for (const request of getOpenRequests()) { + if (matchFilters(request.filters, event)) { + pushToMapKey(eventsByRequest, request, event) + request.onEvent?.(event, url) + + // Calculate cardinality for unioned filters so that we can return early + if (request.filters.length === 1) { + const cardinality = getFilterResultCardinality(request.filters[0]) + + if (eventsByRequest.get(request)?.length === cardinality) { + close(relay, request) + } + } } - }) - - req.on(RequestEvent.Close, () => resolve()) - }), - ), + } + }, + onDisconnect: (url: string) => getOpenRequests().forEach(request => request.onDisconnect?.(url)), + onEose: (url: string) => getOpenRequests().forEach(request => request.onEose?.(url)), + onClose: () => requests.forEach(request => close(relay, request)), + }) + } ) - return requests.map(r => events.filter(event => matchFilters(r.filters, event))) + return allRequests.map(r => resultsByRequest.get(r)) }) export const load = makeLoader({delay: 200, timeout: 3000, threshold: 0.5}) diff --git a/packages/signer/src/signers/nip46.ts b/packages/signer/src/signers/nip46.ts index 3e90877..5086450 100644 --- a/packages/signer/src/signers/nip46.ts +++ b/packages/signer/src/signers/nip46.ts @@ -15,7 +15,7 @@ import { StampedEvent, NOSTR_CONNECT, } from "@welshman/util" -import {MultiRequest, MultiPublish, RequestEvent, AdapterContext} from "@welshman/net" +import {MultiPublish, request, AdapterContext} from "@welshman/net" import {ISigner, EncryptionImplementation, decrypt, hash, own} from "../util.js" import {Nip01Signer} from "./nip01.js" @@ -97,7 +97,7 @@ const popupManager = (() => { })() export class Nip46Receiver extends Emitter { - public sub?: MultiRequest + public abortController?: AbortController constructor( public signer: ISigner, @@ -108,33 +108,38 @@ export class Nip46Receiver extends Emitter { // start listening to the remote signer for incoming events // broadcast any event returned by the remote signer start = async () => { - if (this.sub) return + if (this.abortController) return + + this.abortController = new AbortController() const {relays, context} = this.params const userPubkey = await this.signer.getPubkey() const filters = [{kinds: [NOSTR_CONNECT], "#p": [userPubkey]}] - this.sub = new MultiRequest({relays, filters, context}) + request({ + relays, + filters, + context, + signal: this.abortController.signal, + onEvent: async (event: TrustedEvent, url: string) => { + const json = await decrypt(this.signer, event.pubkey, event.content) + const response = tryCatch(() => JSON.parse(json)) || {} - this.sub.on(RequestEvent.Event, async (event: TrustedEvent, url: string) => { - const json = await decrypt(this.signer, event.pubkey, event.content) - const response = tryCatch(() => JSON.parse(json)) || {} + // Delay errors in case there's a zombie signer out there clogging things up + if (response.error) { + await sleep(3000) + } - // Delay errors in case there's a zombie signer out there clogging things up - if (response.error) { - await sleep(3000) - } - - this.emit(Nip46Event.Receive, {...response, url, event} as Nip46Response) - }) - - this.sub.on(RequestEvent.Close, () => { - this.sub = undefined + this.emit(Nip46Event.Receive, {...response, url, event} as Nip46Response) + }, + onClose: () => { + this.abortController = undefined + }, }) } stop = () => { - this.sub?.close() + this.abortController?.abort() this.removeAllListeners() } }