diff --git a/packages/app/src/subscribe.ts b/packages/app/src/subscribe.ts index a66b8e9..64764d2 100644 --- a/packages/app/src/subscribe.ts +++ b/packages/app/src/subscribe.ts @@ -40,7 +40,7 @@ export const subscribe = (request: PartialSubscribeRequest) => { // Keep cached results async so the caller can set up handlers setTimeout(() => { for (const event of events) { - sub.emitter.emit(SubscriptionEvent.Event, LOCAL_RELAY_URL, event) + sub.emit(SubscriptionEvent.Event, LOCAL_RELAY_URL, event) } }) @@ -52,6 +52,6 @@ export const load = (request: PartialSubscribeRequest) => const sub = subscribe({closeOnEose: true, timeout: ctx.app.requestTimeout, ...request}) const events: TrustedEvent[] = [] - sub.emitter.on(SubscriptionEvent.Event, (url: string, e: TrustedEvent) => events.push(e)) - sub.emitter.on(SubscriptionEvent.Complete, () => resolve(events)) + sub.on(SubscriptionEvent.Event, (url: string, e: TrustedEvent) => events.push(e)) + sub.on(SubscriptionEvent.Complete, () => resolve(events)) }) diff --git a/packages/dvm/README.md b/packages/dvm/README.md index 3311367..301cf00 100644 --- a/packages/dvm/README.md +++ b/packages/dvm/README.md @@ -53,7 +53,7 @@ const sub = subscribe({ }) // Push event ids to our suggestions -sub.emitter.on('event', (url, e) => tags.push(["e", e.id, url])) +sub.on('event', (url, e) => tags.push(["e", e.id, url])) const dvm = new DVM({ // The private key used to sign events diff --git a/packages/dvm/src/handler.ts b/packages/dvm/src/handler.ts index ed8706b..47b9187 100644 --- a/packages/dvm/src/handler.ts +++ b/packages/dvm/src/handler.ts @@ -49,8 +49,8 @@ export class DVM { const filters = [filter] const sub = subscribe({relays, filters}) - sub.emitter.on("event", (url: string, e: TrustedEvent) => this.onEvent(e)) - sub.emitter.on("complete", () => resolve()) + sub.on("event", (url: string, e: TrustedEvent) => this.onEvent(e)) + sub.on("complete", () => resolve()) }) } } diff --git a/packages/dvm/src/request.ts b/packages/dvm/src/request.ts index 7847467..ebc21ea 100644 --- a/packages/dvm/src/request.ts +++ b/packages/dvm/src/request.ts @@ -33,7 +33,7 @@ export const makeDvmRequest = (request: DVMRequestOptions) => { const sub = subscribe({relays, timeout, filters}) const pub = publish({event, relays, timeout}) - sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { + sub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { if (event.kind === 7000) { emitter.emit(DVMEvent.Progress, url, event) } else { diff --git a/packages/net/README.md b/packages/net/README.md index b66b2d2..b543670 100644 --- a/packages/net/README.md +++ b/packages/net/README.md @@ -18,7 +18,7 @@ const sub = subscribe({ timeout: 10000, }) -sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { +sub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { console.log(url, event) sub.close() }) diff --git a/packages/net/src/Subscribe.ts b/packages/net/src/Subscribe.ts index 11ff60a..8fceb18 100644 --- a/packages/net/src/Subscribe.ts +++ b/packages/net/src/Subscribe.ts @@ -8,6 +8,7 @@ import { } from "@welshman/util" import type {Filter} from "@welshman/util" import {Tracker} from "./Tracker.js" +import {Executor} from "./Executor.js" import {Connection} from "./Connection.js" import {ConnectionEvent} from "./ConnectionEvent.js" @@ -51,25 +52,128 @@ export type SubscribeRequest = RelaysAndFilters & { authTimeout?: number } -export type Subscription = { - id: string - emitter: Emitter - tracker: Tracker - controller: AbortController - request: SubscribeRequest - close: () => void -} +export class Subscription extends Emitter { + id = randomId() + controller = new AbortController() + tracker = new Tracker() + completed = new Set() + executorSubs: {unsubscribe: () => void}[] = [] + executor: Executor -export const makeSubscription = (request: SubscribeRequest) => { - const id = randomId() - const emitter = new Emitter() - const controller = new AbortController() - const tracker = request.tracker || new Tracker() - const close = () => controller.abort() + constructor(readonly request: SubscribeRequest) { + super() - emitter.setMaxListeners(100) + if (request.tracker) { + this.tracker = request.tracker + } - return {id, request, emitter, tracker, controller, close} + this.setMaxListeners(100) + this.executor = ctx.net.getExecutor(request.relays) + } + + onEvent = (url: string, event: TrustedEvent) => { + const {filters} = this.request + + if (this.tracker.track(event.id, url)) { + this.emit(SubscriptionEvent.Duplicate, url, event) + } else if (ctx.net.isDeleted(url, event)) { + this.emit(SubscriptionEvent.DeletedEvent, url, event) + } else if (!ctx.net.matchFilters(url, filters, event)) { + this.emit(SubscriptionEvent.FailedFilter, url, event) + } else if (!ctx.net.isValid(url, event)) { + this.emit(SubscriptionEvent.Invalid, url, event) + } else { + this.emit(SubscriptionEvent.Event, url, event) + } + } + + onEose = (url: string) => { + const {closeOnEose, relays} = this.request + + this.emit(SubscriptionEvent.Eose, url) + + this.completed.add(url) + + if (closeOnEose && this.completed.size === uniq(relays).length) { + this.onComplete() + } + } + + onClose = (connection: Connection) => { + const {relays} = this.request + + this.emit(SubscriptionEvent.Close, connection.url) + + this.completed.add(connection.url) + + if (this.completed.size === uniq(relays).length) { + this.onComplete() + } + } + + onComplete = once(() => { + this.emit(SubscriptionEvent.Complete) + this.executorSubs.forEach(sub => sub.unsubscribe()) + this.removeAllListeners() + this.executor.target.cleanup() + this.executor.target.connections.forEach((c: Connection) => { + c.off(ConnectionEvent.Close, this.onClose) + }) + }) + + execute = async () => { + const {filters, signal, timeout, authTimeout = 0} = this.request + + // If we didn't get any filters, don't even send the request, just close it. + // This can be valid when a caller fulfills a request themselves but still needs a subscription object. + if (filters.length === 0) { + this.emit(SubscriptionEvent.Send) + this.onComplete() + + return + } + + // Hook up our events + + // Listen for abort via caller signal + signal?.addEventListener("abort", this.onComplete) + + // Listen for abort via our own internal signal + this.controller.signal.addEventListener("abort", this.onComplete) + + // If we have a timeout, complete the subscription automatically + if (timeout) setTimeout(this.onComplete, timeout + authTimeout) + + // If one of our connections gets closed make sure to kill our sub + this.executor.target.connections.forEach((c: Connection) => + c.on(ConnectionEvent.Close, this.onClose), + ) + + // Wait for auth if needed + await Promise.all( + this.executor.target.connections.map(async (connection: Connection) => { + if (authTimeout) { + await connection.auth.attempt(authTimeout) + } + }), + ) + + // If we send too many filters in a request relays will refuse to respond. REQs are rate + // limited client-side by Connection, so this will throttle concurrent requests. + for (const filtersChunk of chunk(8, filters)) { + this.executorSubs.push( + this.executor.subscribe(filtersChunk, { + onEvent: this.onEvent, + onEose: this.onEose, + }), + ) + } + + // Notify that we've sent the subscription + this.emit(SubscriptionEvent.Send) + } + + close = () => this.controller.abort() } export const calculateSubscriptionGroup = (sub: Subscription) => { @@ -83,7 +187,7 @@ export const calculateSubscriptionGroup = (sub: Subscription) => { } export const mergeSubscriptions = (subs: Subscription[]) => { - const mergedSub = makeSubscription({ + const mergedSub = new Subscription({ relays: uniq(subs.flatMap(sub => sub.request.relays)), filters: unionFilters(subs.flatMap(sub => sub.request.filters)), timeout: max(subs.map(sub => sub.request.timeout || 0)), @@ -101,27 +205,27 @@ export const mergeSubscriptions = (subs: Subscription[]) => { for (const sub of subs) { // Propagate events, but avoid duplicates - sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { + sub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { if (!mergedSub.tracker.track(event.id, url)) { - mergedSub.emitter.emit(SubscriptionEvent.Event, url, event) + mergedSub.emit(SubscriptionEvent.Event, url, event) } }) // Propagate subscription completion. Since we split subs by relay, we need to wait // until all relays are completed before we notify - sub.emitter.on(SubscriptionEvent.Complete, () => { + sub.on(SubscriptionEvent.Complete, () => { completedSubs.add(sub.id) if (completedSubs.size === subs.length) { - mergedSub.emitter.emit(SubscriptionEvent.Complete) + mergedSub.emit(SubscriptionEvent.Complete) } - sub.emitter.removeAllListeners() + sub.removeAllListeners() }) // Propagate everything else too const propagateEvent = (type: SubscriptionEvent) => - sub.emitter.on(type, (...args) => mergedSub.emitter.emit(type, ...args)) + sub.on(type, (...args) => mergedSub.emit(type, ...args)) propagateEvent(SubscriptionEvent.Duplicate) propagateEvent(SubscriptionEvent.DeletedEvent) @@ -149,7 +253,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) => { for (const {relays, filters} of ctx.net.optimizeSubscriptions(group)) { for (const filter of filters) { - const mergedSub = makeSubscription({ + const mergedSub = new Subscription({ filters: [filter], relays, timeout, @@ -170,20 +274,20 @@ export const optimizeSubscriptions = (subs: Subscription[]) => { controller.signal.addEventListener("abort", onAbort) } - mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { + mergedSub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { for (const sub of group) { if (matchFilters(sub.request.filters, event) && !sub.tracker.track(event.id, url)) { - sub.emitter.emit(SubscriptionEvent.Event, url, event) + sub.emit(SubscriptionEvent.Event, url, event) } } }) // Pass events back to caller const propagateEvent = (type: SubscriptionEvent) => - mergedSub.emitter.on(type, (url: string, event: TrustedEvent) => { + mergedSub.on(type, (url: string, event: TrustedEvent) => { for (const sub of group) { if (matchFilters(sub.request.filters, event)) { - sub.emitter.emit(type, url, event) + sub.emit(type, url, event) } } }) @@ -193,7 +297,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) => { propagateEvent(SubscriptionEvent.Invalid) const propagateFinality = (type: SubscriptionEvent, subIds: Set) => - mergedSub.emitter.on(type, (...args: any[]) => { + mergedSub.on(type, (...args: any[]) => { subIds.add(mergedSub.id) // Wait for all subscriptions to complete before reporting finality to the caller. @@ -202,12 +306,12 @@ export const optimizeSubscriptions = (subs: Subscription[]) => { // at any given time. if (subIds.size === mergedSubs.length) { for (const sub of group) { - sub.emitter.emit(type, ...args) + sub.emit(type, ...args) } } if (type === SubscriptionEvent.Complete) { - mergedSub.emitter.removeAllListeners() + mergedSub.removeAllListeners() } }) @@ -224,103 +328,11 @@ export const optimizeSubscriptions = (subs: Subscription[]) => { }) } -const _executeSubscription = (sub: Subscription) => { - const {request, emitter, tracker, controller} = sub - const {filters, closeOnEose, relays, signal, timeout, authTimeout = 0} = request - const executor = ctx.net.getExecutor(relays) - const subs: {unsubscribe: () => void}[] = [] - const completedRelays = new Set() - - // Hook up our events - - emitter.on(SubscriptionEvent.Eose, (url: string) => { - completedRelays.add(url) - - if (closeOnEose && completedRelays.size === uniq(relays).length) { - onComplete() - } - }) - - emitter.on(SubscriptionEvent.Close, (url: string) => { - completedRelays.add(url) - - if (completedRelays.size === uniq(relays).length) { - onComplete() - } - }) - - emitter.on(SubscriptionEvent.Complete, () => { - emitter.removeAllListeners() - subs.forEach(sub => sub.unsubscribe()) - executor.target.connections.forEach(c => c.off(ConnectionEvent.Close, onClose)) - executor.target.cleanup() - }) - - // Functions for emitting events - - const onEvent = (url: string, event: TrustedEvent) => { - if (tracker.track(event.id, url)) { - emitter.emit(SubscriptionEvent.Duplicate, url, event) - } else if (ctx.net.isDeleted(url, event)) { - emitter.emit(SubscriptionEvent.DeletedEvent, url, event) - } else if (!ctx.net.matchFilters(url, filters, event)) { - emitter.emit(SubscriptionEvent.FailedFilter, url, event) - } else if (!ctx.net.isValid(url, event)) { - emitter.emit(SubscriptionEvent.Invalid, url, event) - } else { - emitter.emit(SubscriptionEvent.Event, url, event) - } - } - - const onEose = (url: string) => emitter.emit(SubscriptionEvent.Eose, url) - - const onClose = (connection: Connection) => emitter.emit(SubscriptionEvent.Close, connection.url) - - const onComplete = once(() => emitter.emit(SubscriptionEvent.Complete)) - - // Listen for abort via caller signal - signal?.addEventListener("abort", onComplete) - - // Listen for abort via our own internal signal - controller.signal.addEventListener("abort", onComplete) - - // If we have a timeout, complete the subscription automatically - if (timeout) setTimeout(onComplete, timeout + authTimeout) - - // If one of our connections gets closed make sure to kill our sub - executor.target.connections.forEach((c: Connection) => { - c.on(ConnectionEvent.Close, onClose) - }) - - // Finally, start our subscription. If we didn't get any filters, don't even send the - // request, just close it. This can be valid when a caller fulfills a request themselves. - if (filters.length > 0) { - Promise.all( - executor.target.connections.map(async (connection: Connection) => { - if (authTimeout) { - await connection.auth.attempt(authTimeout) - } - }), - ).then(() => { - // If we send too many filters in a request relays will refuse to respond. REQs are rate - // limited client-side by Connection, so this will throttle concurrent requests. - for (const filtersChunk of chunk(8, filters)) { - subs.push(executor.subscribe(filtersChunk, {onEvent, onEose})) - } - - emitter.emit(SubscriptionEvent.Send) - }) - } else { - emitter.emit(SubscriptionEvent.Send) - onComplete() - } -} - export const executeSubscription = (sub: Subscription) => - optimizeSubscriptions([sub]).forEach(_executeSubscription) + optimizeSubscriptions([sub]).forEach(sub => sub.execute()) export const executeSubscriptions = (subs: Subscription[]) => - optimizeSubscriptions(subs).forEach(_executeSubscription) + optimizeSubscriptions(subs).forEach(sub => sub.execute()) export const executeSubscriptionBatched = (() => { const subs: Subscription[] = [] @@ -354,7 +366,7 @@ export const subscribe = ({ onComplete, ...request }: SubscribeRequestWithHandlers) => { - const sub: Subscription = makeSubscription({delay: 50, ...request}) + const sub: Subscription = new Subscription({delay: 50, ...request}) for (const relay of request.relays) { if (relay !== LOCAL_RELAY_URL && relay !== normalizeRelayUrl(relay)) { @@ -369,11 +381,10 @@ export const subscribe = ({ } // Signature for onEvent is different from emitter signature for historical reasons and convenience - if (onEvent) - sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => onEvent(event)) - if (onEose) sub.emitter.on(SubscriptionEvent.Eose, onEose) - if (onClose) sub.emitter.on(SubscriptionEvent.Close, onClose) - if (onComplete) sub.emitter.on(SubscriptionEvent.Complete, onComplete) + if (onEvent) sub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => onEvent(event)) + if (onEose) sub.on(SubscriptionEvent.Eose, onEose) + if (onClose) sub.on(SubscriptionEvent.Close, onClose) + if (onComplete) sub.on(SubscriptionEvent.Complete, onComplete) return sub } diff --git a/packages/signer/src/signers/nip46.ts b/packages/signer/src/signers/nip46.ts index 66ab5da..e756a99 100644 --- a/packages/signer/src/signers/nip46.ts +++ b/packages/signer/src/signers/nip46.ts @@ -114,9 +114,9 @@ export class Nip46Receiver extends Emitter { this.sub = subscribe({relays: this.params.relays, filters}) return new Promise(resolve => { - this.sub!.emitter.on(SubscriptionEvent.Send, resolve) + this.sub!.on(SubscriptionEvent.Send, resolve) - this.sub!.emitter.on(SubscriptionEvent.Event, async (url: string, event: TrustedEvent) => { + this.sub!.on(SubscriptionEvent.Event, async (url: string, event: TrustedEvent) => { const json = await decrypt(this.signer, event.pubkey, event.content) const response = tryCatch(() => JSON.parse(json)) || {} @@ -128,7 +128,7 @@ export class Nip46Receiver extends Emitter { this.emit(Nip46Event.Receive, {...response, url, event} as Nip46Response) }) - this.sub!.emitter.on(SubscriptionEvent.Complete, () => { + this.sub!.on(SubscriptionEvent.Complete, () => { this.sub = undefined }) })