diff --git a/packages/net/Publish.ts b/packages/net/Publish.ts index 13b18f7..8c86894 100644 --- a/packages/net/Publish.ts +++ b/packages/net/Publish.ts @@ -9,6 +9,7 @@ export enum PublishStatus { Success = "success", Failure = "failure", Timeout = "timeout", + Aborted = "aborted", } export type PublishStatusMap = Map @@ -16,6 +17,7 @@ export type PublishStatusMap = Map export type PublishRequest = { event: Event relays: string[] + signal?: AbortSignal timeout?: number verb?: "EVENT" | "AUTH" } @@ -44,8 +46,16 @@ export const publish = (request: PublishRequest) => { const event = asEvent(request.event) const executor = NetworkContext.getExecutor(request.relays) + const abort = (reason: PublishStatus) => () => { + for (const [url, status] of pub.status.entries()) { + if (status === PublishStatus.Pending) { + pub.emitter.emit(reason, url) + } + } + } + // Listen to updates and keep status up to date. Every time there's an update, check to - // see if we're done. If we are, clear our timeout, executor, etc. + // see if we're done. If we are, clean everything up pub.emitter.on("*", (status: PublishStatus, url: string) => { pub.status.set(url, status) @@ -57,21 +67,18 @@ export const publish = (request: PublishRequest) => { } }) - // Start everything off as pending + // Start everything off as pending. Do it asynchronously to avoid breaking caller assumptions setTimeout(() => { for (const relay of request.relays) { pub.emitter.emit(PublishStatus.Pending, relay) } }) - // Set a timeout - const timeout = setTimeout(() => { - for (const [url, status] of pub.status.entries()) { - if (status === PublishStatus.Pending) { - pub.emitter.emit(PublishStatus.Timeout, url) - } - } - }, request.timeout || 10_000) + // Give up after a specified time + const timeout = setTimeout(abort(PublishStatus.Timeout), request.timeout || 10_000) + + // If we have a signal, use it + request.signal?.addEventListener('abort', abort(PublishStatus.Aborted)) // Delegate to our executor const executorSub = executor.publish(event, { diff --git a/packages/net/Subscribe.ts b/packages/net/Subscribe.ts index 43bca72..71ca5ab 100644 --- a/packages/net/Subscribe.ts +++ b/packages/net/Subscribe.ts @@ -25,7 +25,6 @@ export enum SubscriptionEvent { Eose = "eose", Close = "close", Event = "event", - Abort = "abort", Complete = "complete", Duplicate = "duplicate", DeletedEvent = "deleted-event", @@ -46,6 +45,7 @@ export type Subscription = { id: string emitter: Emitter tracker: Tracker + controller: AbortController result: Deferred request: SubscribeRequest close: () => void @@ -54,13 +54,14 @@ export type Subscription = { export const makeSubscription = (request: SubscribeRequest) => { const id = randomId() const emitter = new Emitter() + const controller = new AbortController() const result = defer() const tracker = request.tracker || new Tracker() - const close = () => emitter.emit('abort') + const close = () => controller.abort() emitter.setMaxListeners(100) - return {id, request, emitter, tracker, result, close} + return {id, request, emitter, tracker, controller, result, close} } export const calculateSubscriptionGroup = (sub: Subscription) => { @@ -86,9 +87,8 @@ export const mergeSubscriptions = (subs: Subscription[]) => { filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)), }) - for (const {id, emitter} of callerSubs) { - // Propagate abort event from the caller to the merged subscription - emitter.on(SubscriptionEvent.Abort, () => { + for (const {id, controller} of callerSubs) { + controller.signal.addEventListener('abort', () => { abortedSubs.add(id) if (abortedSubs.size === callerSubs.length) { @@ -171,7 +171,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => { } export const executeSubscription = (sub: Subscription) => { - const {result, request, emitter, tracker} = sub + const {result, request, emitter, tracker, controller} = sub const {timeout, filters, closeOnEose, relays} = request const executor = NetworkContext.getExecutor(relays) const events: Event[] = [] @@ -234,8 +234,8 @@ export const executeSubscription = (sub: Subscription) => { } } - // Allow the caller to cancel the subscription - emitter.on(SubscriptionEvent.Abort, complete) + // Listen for abort via signal + controller.signal.addEventListener('abort', complete) // If we have a timeout, complete the subscription automatically if (timeout) setTimeout(complete, timeout)