Clean up subscribe a bit, add once

This commit is contained in:
Jon Staab
2024-05-17 13:57:49 -07:00
parent 42b3b8b5e7
commit 69b8bb3b54
3 changed files with 61 additions and 42 deletions
+11
View File
@@ -254,6 +254,17 @@ export const chunks = <T>(n: number, xs: T[]) => {
return result return result
} }
export const once = (f: (...args: any) => void) => {
let called = false
return (...args: any) => {
if (!called) {
called = true
f(...args)
}
}
}
export const batch = <T>(t: number, f: (xs: T[]) => void) => { export const batch = <T>(t: number, f: (xs: T[]) => void) => {
const xs: T[] = [] const xs: T[] = []
const cb = throttle(t, () => xs.length > 0 && f(xs.splice(0))) const cb = throttle(t, () => xs.length > 0 && f(xs.splice(0)))
+40 -41
View File
@@ -1,5 +1,5 @@
import type {Event} from 'nostr-tools' import type {Event} from 'nostr-tools'
import {Emitter, randomId, groupBy, batch, defer, uniq, uniqBy} from '@welshman/lib' import {Emitter, randomId, once, groupBy, batch, defer, uniq, uniqBy} from '@welshman/lib'
import type {Deferred} from '@welshman/lib' import type {Deferred} from '@welshman/lib'
import {matchFilters, unionFilters} from '@welshman/util' import {matchFilters, unionFilters} from '@welshman/util'
import type {Filter} from '@welshman/util' import type {Filter} from '@welshman/util'
@@ -178,32 +178,42 @@ export const executeSubscription = (sub: Subscription) => {
const {result, request, emitter, tracker, controller} = sub const {result, request, emitter, tracker, controller} = sub
const {timeout, filters, closeOnEose, relays, signal} = request const {timeout, filters, closeOnEose, relays, signal} = request
const executor = NetworkContext.getExecutor(relays) const executor = NetworkContext.getExecutor(relays)
const completedRelays = new Set()
const events: Event[] = [] const events: Event[] = []
const completedRelays = new Set() // Hook up our events
let completed: number
const complete = () => { emitter.on(SubscriptionEvent.Event, (url: string, event: Event) => {
if (completed) return events.push(event)
})
// Mark as cleaned upp, unsubscribe our executor emitter.on(SubscriptionEvent.Eose, (url: string) => {
completed = Date.now() completedRelays.add(url)
executorSub.unsubscribe()
// Resolve our promise if (closeOnEose && completedRelays.size === executor.target.connections.length) {
onComplete()
}
})
emitter.on(SubscriptionEvent.Close, (url: string) => {
completedRelays.add(url)
if (completedRelays.size === executor.target.connections.length) {
onComplete()
}
})
emitter.on(SubscriptionEvent.Complete, () => {
result.resolve(events) result.resolve(events)
executorSub?.unsubscribe()
// Notify caller, clean up our event emitter
emitter.emit(SubscriptionEvent.Complete)
emitter.removeAllListeners() emitter.removeAllListeners()
// Remove our onClose handler from connections, since they are shared by many subs
executor.target.connections.forEach((c: Connection) => c.off("close", onClose)) executor.target.connections.forEach((c: Connection) => c.off("close", onClose))
executor.target.cleanup() executor.target.cleanup()
} })
// Functions for emitting events
const onEvent = (url: string, event: Event) => { const onEvent = (url: string, event: Event) => {
// Check the signature and filters. If we've seen this event, don't re-validate.
if (tracker.track(event.id, url)) { if (tracker.track(event.id, url)) {
emitter.emit(SubscriptionEvent.Duplicate, url, event) emitter.emit(SubscriptionEvent.Duplicate, url, event)
} else if (NetworkContext.isDeleted(url, event)) { } else if (NetworkContext.isDeleted(url, event)) {
@@ -214,44 +224,37 @@ export const executeSubscription = (sub: Subscription) => {
emitter.emit(SubscriptionEvent.InvalidSignature, url, event) emitter.emit(SubscriptionEvent.InvalidSignature, url, event)
} else { } else {
emitter.emit(SubscriptionEvent.Event, url, event) emitter.emit(SubscriptionEvent.Event, url, event)
events.push(event)
} }
} }
const onEose = (url: string) => { const onEose = (url: string) =>
completedRelays.add(url)
emitter.emit(SubscriptionEvent.Eose, url) emitter.emit(SubscriptionEvent.Eose, url)
if (closeOnEose && completedRelays.size === executor.target.connections.length) { const onClose = (connection: Connection) =>
complete()
}
}
const onClose = (connection: Connection) => {
completedRelays.add(connection.url)
emitter.emit(SubscriptionEvent.Close, connection.url) emitter.emit(SubscriptionEvent.Close, connection.url)
if (completedRelays.size === executor.target.connections.length) { const onComplete = once(() => emitter.emit(SubscriptionEvent.Complete))
complete()
}
}
// Listen for abort via caller signal // Listen for abort via caller signal
signal?.addEventListener('abort', complete) signal?.addEventListener('abort', onComplete)
// Listen for abort via our own internal signal // Listen for abort via our own internal signal
controller.signal.addEventListener('abort', complete) controller.signal.addEventListener('abort', onComplete)
// If we have a timeout, complete the subscription automatically // If we have a timeout, complete the subscription automatically
if (timeout) setTimeout(complete, timeout) if (timeout) setTimeout(onComplete, timeout)
// If one of our connections gets closed make sure to kill our sub // If one of our connections gets closed make sure to kill our sub
executor.target.connections.forEach((c: Connection) => c.on('close', onClose)) executor.target.connections.forEach((c: Connection) => c.on('close', onClose))
// Finally, start our subscription // Finally, start our subscription. If we didn't get any filters, don't even send the
const executorSub = executor.subscribe(filters, {onEvent, onEose}) // request, just close it. This can be valid when a caller fulfills a request themselves.
let executorSub: {unsubscribe: () => void}
if (filters.length > 0) {
executorSub = executor.subscribe(filters, {onEvent, onEose})
} else {
onComplete()
}
} }
export const executeSubscriptions = (subs: Subscription[]) => export const executeSubscriptions = (subs: Subscription[]) =>
@@ -262,10 +265,6 @@ export const executeSubscriptionBatched = batch(800, executeSubscriptions)
export const subscribe = (request: SubscribeRequest) => { export const subscribe = (request: SubscribeRequest) => {
const subscription: Subscription = makeSubscription(request) const subscription: Subscription = makeSubscription(request)
if (request.filters.length === 0) {
throw new Error("Zero filters passed to subscribe")
}
if (request.immediate) { if (request.immediate) {
executeSubscription(subscription) executeSubscription(subscription)
} else { } else {
+10 -1
View File
@@ -2,7 +2,7 @@ import {Event} from 'nostr-tools'
import {matchFilter as nostrToolsMatchFilter} from 'nostr-tools' import {matchFilter as nostrToolsMatchFilter} from 'nostr-tools'
import {prop, avg, hash, groupBy, randomId, uniq} from '@welshman/lib' import {prop, avg, hash, groupBy, randomId, uniq} from '@welshman/lib'
import type {HashedEvent, TrustedEvent} from './Events' import type {HashedEvent, TrustedEvent} from './Events'
import {isReplaceableKind} from './Kinds' import {isReplaceableKind, isPlainReplaceableKind} from './Kinds'
import {Address, getAddress} from './Address' import {Address, getAddress} from './Address'
export const EPOCH = 1609459200 export const EPOCH = 1609459200
@@ -204,3 +204,12 @@ export const getFilterGenerality = (filter: Filter) => {
export const guessFilterDelta = (filters: Filter[], max = 60 * 60 * 24 * 7) => export const guessFilterDelta = (filters: Filter[], max = 60 * 60 * 24 * 7) =>
Math.round(max * Math.max(0.005, 1 - avg(filters.map(getFilterGenerality)))) Math.round(max * Math.max(0.005, 1 - avg(filters.map(getFilterGenerality))))
// If a filter is specifying ids, we know how many results to expect
export const getFilterResultCardinality = (filter: Filter) => {
if (filter.ids) {
return filter.ids.length
}
return null
}