From 69b8bb3b54e94bb7e14b691ff3e500ef54cf4f79 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Fri, 17 May 2024 13:57:49 -0700 Subject: [PATCH] Clean up subscribe a bit, add once --- packages/lib/Tools.ts | 11 ++++++ packages/net/Subscribe.ts | 81 +++++++++++++++++++-------------------- packages/util/Filters.ts | 11 +++++- 3 files changed, 61 insertions(+), 42 deletions(-) diff --git a/packages/lib/Tools.ts b/packages/lib/Tools.ts index c9ff5fd..95afd8e 100644 --- a/packages/lib/Tools.ts +++ b/packages/lib/Tools.ts @@ -254,6 +254,17 @@ export const chunks = (n: number, xs: T[]) => { return result } +export const once = (f: (...args: any) => void) => { + let called = false + + return (...args: any) => { + if (!called) { + called = true + f(...args) + } + } +} + export const batch = (t: number, f: (xs: T[]) => void) => { const xs: T[] = [] const cb = throttle(t, () => xs.length > 0 && f(xs.splice(0))) diff --git a/packages/net/Subscribe.ts b/packages/net/Subscribe.ts index 98e64c9..b06ebea 100644 --- a/packages/net/Subscribe.ts +++ b/packages/net/Subscribe.ts @@ -1,5 +1,5 @@ import type {Event} from 'nostr-tools' -import {Emitter, randomId, groupBy, batch, defer, uniq, uniqBy} from '@welshman/lib' +import {Emitter, randomId, once, groupBy, batch, defer, uniq, uniqBy} from '@welshman/lib' import type {Deferred} from '@welshman/lib' import {matchFilters, unionFilters} from '@welshman/util' import type {Filter} from '@welshman/util' @@ -178,32 +178,42 @@ export const executeSubscription = (sub: Subscription) => { const {result, request, emitter, tracker, controller} = sub const {timeout, filters, closeOnEose, relays, signal} = request const executor = NetworkContext.getExecutor(relays) + const completedRelays = new Set() const events: Event[] = [] - const completedRelays = new Set() - let completed: number + // Hook up our events - const complete = () => { - if (completed) return + emitter.on(SubscriptionEvent.Event, (url: string, event: Event) => { + events.push(event) + }) - // Mark as cleaned upp, unsubscribe our executor - completed = Date.now() - executorSub.unsubscribe() + emitter.on(SubscriptionEvent.Eose, (url: string) => { + completedRelays.add(url) - // Resolve our promise + if (closeOnEose && completedRelays.size === executor.target.connections.length) { + onComplete() + } + }) + + emitter.on(SubscriptionEvent.Close, (url: string) => { + completedRelays.add(url) + + if (completedRelays.size === executor.target.connections.length) { + onComplete() + } + }) + + emitter.on(SubscriptionEvent.Complete, () => { result.resolve(events) - - // Notify caller, clean up our event emitter - emitter.emit(SubscriptionEvent.Complete) + executorSub?.unsubscribe() emitter.removeAllListeners() - - // Remove our onClose handler from connections, since they are shared by many subs executor.target.connections.forEach((c: Connection) => c.off("close", onClose)) executor.target.cleanup() - } + }) + + // Functions for emitting events const onEvent = (url: string, event: Event) => { - // Check the signature and filters. If we've seen this event, don't re-validate. if (tracker.track(event.id, url)) { emitter.emit(SubscriptionEvent.Duplicate, url, event) } else if (NetworkContext.isDeleted(url, event)) { @@ -214,44 +224,37 @@ export const executeSubscription = (sub: Subscription) => { emitter.emit(SubscriptionEvent.InvalidSignature, url, event) } else { emitter.emit(SubscriptionEvent.Event, url, event) - events.push(event) } } - const onEose = (url: string) => { - completedRelays.add(url) - + const onEose = (url: string) => emitter.emit(SubscriptionEvent.Eose, url) - if (closeOnEose && completedRelays.size === executor.target.connections.length) { - complete() - } - } - - const onClose = (connection: Connection) => { - completedRelays.add(connection.url) - + const onClose = (connection: Connection) => emitter.emit(SubscriptionEvent.Close, connection.url) - if (completedRelays.size === executor.target.connections.length) { - complete() - } - } + const onComplete = once(() => emitter.emit(SubscriptionEvent.Complete)) // Listen for abort via caller signal - signal?.addEventListener('abort', complete) + signal?.addEventListener('abort', onComplete) // Listen for abort via our own internal signal - controller.signal.addEventListener('abort', complete) + controller.signal.addEventListener('abort', onComplete) // If we have a timeout, complete the subscription automatically - if (timeout) setTimeout(complete, timeout) + if (timeout) setTimeout(onComplete, timeout) // If one of our connections gets closed make sure to kill our sub executor.target.connections.forEach((c: Connection) => c.on('close', onClose)) - // Finally, start our subscription - const executorSub = executor.subscribe(filters, {onEvent, onEose}) + // Finally, start our subscription. If we didn't get any filters, don't even send the + // request, just close it. This can be valid when a caller fulfills a request themselves. + let executorSub: {unsubscribe: () => void} + if (filters.length > 0) { + executorSub = executor.subscribe(filters, {onEvent, onEose}) + } else { + onComplete() + } } export const executeSubscriptions = (subs: Subscription[]) => @@ -262,10 +265,6 @@ export const executeSubscriptionBatched = batch(800, executeSubscriptions) export const subscribe = (request: SubscribeRequest) => { const subscription: Subscription = makeSubscription(request) - if (request.filters.length === 0) { - throw new Error("Zero filters passed to subscribe") - } - if (request.immediate) { executeSubscription(subscription) } else { diff --git a/packages/util/Filters.ts b/packages/util/Filters.ts index 9c2cfcc..a709957 100644 --- a/packages/util/Filters.ts +++ b/packages/util/Filters.ts @@ -2,7 +2,7 @@ import {Event} from 'nostr-tools' import {matchFilter as nostrToolsMatchFilter} from 'nostr-tools' import {prop, avg, hash, groupBy, randomId, uniq} from '@welshman/lib' import type {HashedEvent, TrustedEvent} from './Events' -import {isReplaceableKind} from './Kinds' +import {isReplaceableKind, isPlainReplaceableKind} from './Kinds' import {Address, getAddress} from './Address' export const EPOCH = 1609459200 @@ -204,3 +204,12 @@ export const getFilterGenerality = (filter: Filter) => { export const guessFilterDelta = (filters: Filter[], max = 60 * 60 * 24 * 7) => Math.round(max * Math.max(0.005, 1 - avg(filters.map(getFilterGenerality)))) + +// If a filter is specifying ids, we know how many results to expect +export const getFilterResultCardinality = (filter: Filter) => { + if (filter.ids) { + return filter.ids.length + } + + return null +}