Move subscription to a class

This commit is contained in:
Jon Staab
2025-01-24 10:05:04 -08:00
parent 4b0aa94b4b
commit 6b9e5ae77f
7 changed files with 153 additions and 142 deletions
+3 -3
View File
@@ -40,7 +40,7 @@ export const subscribe = (request: PartialSubscribeRequest) => {
// Keep cached results async so the caller can set up handlers // Keep cached results async so the caller can set up handlers
setTimeout(() => { setTimeout(() => {
for (const event of events) { for (const event of events) {
sub.emitter.emit(SubscriptionEvent.Event, LOCAL_RELAY_URL, event) sub.emit(SubscriptionEvent.Event, LOCAL_RELAY_URL, event)
} }
}) })
@@ -52,6 +52,6 @@ export const load = (request: PartialSubscribeRequest) =>
const sub = subscribe({closeOnEose: true, timeout: ctx.app.requestTimeout, ...request}) const sub = subscribe({closeOnEose: true, timeout: ctx.app.requestTimeout, ...request})
const events: TrustedEvent[] = [] const events: TrustedEvent[] = []
sub.emitter.on(SubscriptionEvent.Event, (url: string, e: TrustedEvent) => events.push(e)) sub.on(SubscriptionEvent.Event, (url: string, e: TrustedEvent) => events.push(e))
sub.emitter.on(SubscriptionEvent.Complete, () => resolve(events)) sub.on(SubscriptionEvent.Complete, () => resolve(events))
}) })
+1 -1
View File
@@ -53,7 +53,7 @@ const sub = subscribe({
}) })
// Push event ids to our suggestions // Push event ids to our suggestions
sub.emitter.on('event', (url, e) => tags.push(["e", e.id, url])) sub.on('event', (url, e) => tags.push(["e", e.id, url]))
const dvm = new DVM({ const dvm = new DVM({
// The private key used to sign events // The private key used to sign events
+2 -2
View File
@@ -49,8 +49,8 @@ export class DVM {
const filters = [filter] const filters = [filter]
const sub = subscribe({relays, filters}) const sub = subscribe({relays, filters})
sub.emitter.on("event", (url: string, e: TrustedEvent) => this.onEvent(e)) sub.on("event", (url: string, e: TrustedEvent) => this.onEvent(e))
sub.emitter.on("complete", () => resolve()) sub.on("complete", () => resolve())
}) })
} }
} }
+1 -1
View File
@@ -33,7 +33,7 @@ export const makeDvmRequest = (request: DVMRequestOptions) => {
const sub = subscribe({relays, timeout, filters}) const sub = subscribe({relays, timeout, filters})
const pub = publish({event, relays, timeout}) const pub = publish({event, relays, timeout})
sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { sub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
if (event.kind === 7000) { if (event.kind === 7000) {
emitter.emit(DVMEvent.Progress, url, event) emitter.emit(DVMEvent.Progress, url, event)
} else { } else {
+1 -1
View File
@@ -18,7 +18,7 @@ const sub = subscribe({
timeout: 10000, timeout: 10000,
}) })
sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { sub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
console.log(url, event) console.log(url, event)
sub.close() sub.close()
}) })
+142 -131
View File
@@ -8,6 +8,7 @@ import {
} from "@welshman/util" } from "@welshman/util"
import type {Filter} from "@welshman/util" import type {Filter} from "@welshman/util"
import {Tracker} from "./Tracker.js" import {Tracker} from "./Tracker.js"
import {Executor} from "./Executor.js"
import {Connection} from "./Connection.js" import {Connection} from "./Connection.js"
import {ConnectionEvent} from "./ConnectionEvent.js" import {ConnectionEvent} from "./ConnectionEvent.js"
@@ -51,25 +52,128 @@ export type SubscribeRequest = RelaysAndFilters & {
authTimeout?: number authTimeout?: number
} }
export type Subscription = { export class Subscription extends Emitter {
id: string id = randomId()
emitter: Emitter controller = new AbortController()
tracker: Tracker tracker = new Tracker()
controller: AbortController completed = new Set()
request: SubscribeRequest executorSubs: {unsubscribe: () => void}[] = []
close: () => void executor: Executor
}
export const makeSubscription = (request: SubscribeRequest) => { constructor(readonly request: SubscribeRequest) {
const id = randomId() super()
const emitter = new Emitter()
const controller = new AbortController()
const tracker = request.tracker || new Tracker()
const close = () => controller.abort()
emitter.setMaxListeners(100) if (request.tracker) {
this.tracker = request.tracker
}
return {id, request, emitter, tracker, controller, close} this.setMaxListeners(100)
this.executor = ctx.net.getExecutor(request.relays)
}
onEvent = (url: string, event: TrustedEvent) => {
const {filters} = this.request
if (this.tracker.track(event.id, url)) {
this.emit(SubscriptionEvent.Duplicate, url, event)
} else if (ctx.net.isDeleted(url, event)) {
this.emit(SubscriptionEvent.DeletedEvent, url, event)
} else if (!ctx.net.matchFilters(url, filters, event)) {
this.emit(SubscriptionEvent.FailedFilter, url, event)
} else if (!ctx.net.isValid(url, event)) {
this.emit(SubscriptionEvent.Invalid, url, event)
} else {
this.emit(SubscriptionEvent.Event, url, event)
}
}
onEose = (url: string) => {
const {closeOnEose, relays} = this.request
this.emit(SubscriptionEvent.Eose, url)
this.completed.add(url)
if (closeOnEose && this.completed.size === uniq(relays).length) {
this.onComplete()
}
}
onClose = (connection: Connection) => {
const {relays} = this.request
this.emit(SubscriptionEvent.Close, connection.url)
this.completed.add(connection.url)
if (this.completed.size === uniq(relays).length) {
this.onComplete()
}
}
onComplete = once(() => {
this.emit(SubscriptionEvent.Complete)
this.executorSubs.forEach(sub => sub.unsubscribe())
this.removeAllListeners()
this.executor.target.cleanup()
this.executor.target.connections.forEach((c: Connection) => {
c.off(ConnectionEvent.Close, this.onClose)
})
})
execute = async () => {
const {filters, signal, timeout, authTimeout = 0} = this.request
// If we didn't get any filters, don't even send the request, just close it.
// This can be valid when a caller fulfills a request themselves but still needs a subscription object.
if (filters.length === 0) {
this.emit(SubscriptionEvent.Send)
this.onComplete()
return
}
// Hook up our events
// Listen for abort via caller signal
signal?.addEventListener("abort", this.onComplete)
// Listen for abort via our own internal signal
this.controller.signal.addEventListener("abort", this.onComplete)
// If we have a timeout, complete the subscription automatically
if (timeout) setTimeout(this.onComplete, timeout + authTimeout)
// If one of our connections gets closed make sure to kill our sub
this.executor.target.connections.forEach((c: Connection) =>
c.on(ConnectionEvent.Close, this.onClose),
)
// Wait for auth if needed
await Promise.all(
this.executor.target.connections.map(async (connection: Connection) => {
if (authTimeout) {
await connection.auth.attempt(authTimeout)
}
}),
)
// If we send too many filters in a request relays will refuse to respond. REQs are rate
// limited client-side by Connection, so this will throttle concurrent requests.
for (const filtersChunk of chunk(8, filters)) {
this.executorSubs.push(
this.executor.subscribe(filtersChunk, {
onEvent: this.onEvent,
onEose: this.onEose,
}),
)
}
// Notify that we've sent the subscription
this.emit(SubscriptionEvent.Send)
}
close = () => this.controller.abort()
} }
export const calculateSubscriptionGroup = (sub: Subscription) => { export const calculateSubscriptionGroup = (sub: Subscription) => {
@@ -83,7 +187,7 @@ export const calculateSubscriptionGroup = (sub: Subscription) => {
} }
export const mergeSubscriptions = (subs: Subscription[]) => { export const mergeSubscriptions = (subs: Subscription[]) => {
const mergedSub = makeSubscription({ const mergedSub = new Subscription({
relays: uniq(subs.flatMap(sub => sub.request.relays)), relays: uniq(subs.flatMap(sub => sub.request.relays)),
filters: unionFilters(subs.flatMap(sub => sub.request.filters)), filters: unionFilters(subs.flatMap(sub => sub.request.filters)),
timeout: max(subs.map(sub => sub.request.timeout || 0)), timeout: max(subs.map(sub => sub.request.timeout || 0)),
@@ -101,27 +205,27 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
for (const sub of subs) { for (const sub of subs) {
// Propagate events, but avoid duplicates // Propagate events, but avoid duplicates
sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { sub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
if (!mergedSub.tracker.track(event.id, url)) { if (!mergedSub.tracker.track(event.id, url)) {
mergedSub.emitter.emit(SubscriptionEvent.Event, url, event) mergedSub.emit(SubscriptionEvent.Event, url, event)
} }
}) })
// Propagate subscription completion. Since we split subs by relay, we need to wait // Propagate subscription completion. Since we split subs by relay, we need to wait
// until all relays are completed before we notify // until all relays are completed before we notify
sub.emitter.on(SubscriptionEvent.Complete, () => { sub.on(SubscriptionEvent.Complete, () => {
completedSubs.add(sub.id) completedSubs.add(sub.id)
if (completedSubs.size === subs.length) { if (completedSubs.size === subs.length) {
mergedSub.emitter.emit(SubscriptionEvent.Complete) mergedSub.emit(SubscriptionEvent.Complete)
} }
sub.emitter.removeAllListeners() sub.removeAllListeners()
}) })
// Propagate everything else too // Propagate everything else too
const propagateEvent = (type: SubscriptionEvent) => const propagateEvent = (type: SubscriptionEvent) =>
sub.emitter.on(type, (...args) => mergedSub.emitter.emit(type, ...args)) sub.on(type, (...args) => mergedSub.emit(type, ...args))
propagateEvent(SubscriptionEvent.Duplicate) propagateEvent(SubscriptionEvent.Duplicate)
propagateEvent(SubscriptionEvent.DeletedEvent) propagateEvent(SubscriptionEvent.DeletedEvent)
@@ -149,7 +253,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) => {
for (const {relays, filters} of ctx.net.optimizeSubscriptions(group)) { for (const {relays, filters} of ctx.net.optimizeSubscriptions(group)) {
for (const filter of filters) { for (const filter of filters) {
const mergedSub = makeSubscription({ const mergedSub = new Subscription({
filters: [filter], filters: [filter],
relays, relays,
timeout, timeout,
@@ -170,20 +274,20 @@ export const optimizeSubscriptions = (subs: Subscription[]) => {
controller.signal.addEventListener("abort", onAbort) controller.signal.addEventListener("abort", onAbort)
} }
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { mergedSub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
for (const sub of group) { for (const sub of group) {
if (matchFilters(sub.request.filters, event) && !sub.tracker.track(event.id, url)) { if (matchFilters(sub.request.filters, event) && !sub.tracker.track(event.id, url)) {
sub.emitter.emit(SubscriptionEvent.Event, url, event) sub.emit(SubscriptionEvent.Event, url, event)
} }
} }
}) })
// Pass events back to caller // Pass events back to caller
const propagateEvent = (type: SubscriptionEvent) => const propagateEvent = (type: SubscriptionEvent) =>
mergedSub.emitter.on(type, (url: string, event: TrustedEvent) => { mergedSub.on(type, (url: string, event: TrustedEvent) => {
for (const sub of group) { for (const sub of group) {
if (matchFilters(sub.request.filters, event)) { if (matchFilters(sub.request.filters, event)) {
sub.emitter.emit(type, url, event) sub.emit(type, url, event)
} }
} }
}) })
@@ -193,7 +297,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) => {
propagateEvent(SubscriptionEvent.Invalid) propagateEvent(SubscriptionEvent.Invalid)
const propagateFinality = (type: SubscriptionEvent, subIds: Set<string>) => const propagateFinality = (type: SubscriptionEvent, subIds: Set<string>) =>
mergedSub.emitter.on(type, (...args: any[]) => { mergedSub.on(type, (...args: any[]) => {
subIds.add(mergedSub.id) subIds.add(mergedSub.id)
// Wait for all subscriptions to complete before reporting finality to the caller. // Wait for all subscriptions to complete before reporting finality to the caller.
@@ -202,12 +306,12 @@ export const optimizeSubscriptions = (subs: Subscription[]) => {
// at any given time. // at any given time.
if (subIds.size === mergedSubs.length) { if (subIds.size === mergedSubs.length) {
for (const sub of group) { for (const sub of group) {
sub.emitter.emit(type, ...args) sub.emit(type, ...args)
} }
} }
if (type === SubscriptionEvent.Complete) { if (type === SubscriptionEvent.Complete) {
mergedSub.emitter.removeAllListeners() mergedSub.removeAllListeners()
} }
}) })
@@ -224,103 +328,11 @@ export const optimizeSubscriptions = (subs: Subscription[]) => {
}) })
} }
const _executeSubscription = (sub: Subscription) => {
const {request, emitter, tracker, controller} = sub
const {filters, closeOnEose, relays, signal, timeout, authTimeout = 0} = request
const executor = ctx.net.getExecutor(relays)
const subs: {unsubscribe: () => void}[] = []
const completedRelays = new Set()
// Hook up our events
emitter.on(SubscriptionEvent.Eose, (url: string) => {
completedRelays.add(url)
if (closeOnEose && completedRelays.size === uniq(relays).length) {
onComplete()
}
})
emitter.on(SubscriptionEvent.Close, (url: string) => {
completedRelays.add(url)
if (completedRelays.size === uniq(relays).length) {
onComplete()
}
})
emitter.on(SubscriptionEvent.Complete, () => {
emitter.removeAllListeners()
subs.forEach(sub => sub.unsubscribe())
executor.target.connections.forEach(c => c.off(ConnectionEvent.Close, onClose))
executor.target.cleanup()
})
// Functions for emitting events
const onEvent = (url: string, event: TrustedEvent) => {
if (tracker.track(event.id, url)) {
emitter.emit(SubscriptionEvent.Duplicate, url, event)
} else if (ctx.net.isDeleted(url, event)) {
emitter.emit(SubscriptionEvent.DeletedEvent, url, event)
} else if (!ctx.net.matchFilters(url, filters, event)) {
emitter.emit(SubscriptionEvent.FailedFilter, url, event)
} else if (!ctx.net.isValid(url, event)) {
emitter.emit(SubscriptionEvent.Invalid, url, event)
} else {
emitter.emit(SubscriptionEvent.Event, url, event)
}
}
const onEose = (url: string) => emitter.emit(SubscriptionEvent.Eose, url)
const onClose = (connection: Connection) => emitter.emit(SubscriptionEvent.Close, connection.url)
const onComplete = once(() => emitter.emit(SubscriptionEvent.Complete))
// Listen for abort via caller signal
signal?.addEventListener("abort", onComplete)
// Listen for abort via our own internal signal
controller.signal.addEventListener("abort", onComplete)
// If we have a timeout, complete the subscription automatically
if (timeout) setTimeout(onComplete, timeout + authTimeout)
// If one of our connections gets closed make sure to kill our sub
executor.target.connections.forEach((c: Connection) => {
c.on(ConnectionEvent.Close, onClose)
})
// Finally, start our subscription. If we didn't get any filters, don't even send the
// request, just close it. This can be valid when a caller fulfills a request themselves.
if (filters.length > 0) {
Promise.all(
executor.target.connections.map(async (connection: Connection) => {
if (authTimeout) {
await connection.auth.attempt(authTimeout)
}
}),
).then(() => {
// If we send too many filters in a request relays will refuse to respond. REQs are rate
// limited client-side by Connection, so this will throttle concurrent requests.
for (const filtersChunk of chunk(8, filters)) {
subs.push(executor.subscribe(filtersChunk, {onEvent, onEose}))
}
emitter.emit(SubscriptionEvent.Send)
})
} else {
emitter.emit(SubscriptionEvent.Send)
onComplete()
}
}
export const executeSubscription = (sub: Subscription) => export const executeSubscription = (sub: Subscription) =>
optimizeSubscriptions([sub]).forEach(_executeSubscription) optimizeSubscriptions([sub]).forEach(sub => sub.execute())
export const executeSubscriptions = (subs: Subscription[]) => export const executeSubscriptions = (subs: Subscription[]) =>
optimizeSubscriptions(subs).forEach(_executeSubscription) optimizeSubscriptions(subs).forEach(sub => sub.execute())
export const executeSubscriptionBatched = (() => { export const executeSubscriptionBatched = (() => {
const subs: Subscription[] = [] const subs: Subscription[] = []
@@ -354,7 +366,7 @@ export const subscribe = ({
onComplete, onComplete,
...request ...request
}: SubscribeRequestWithHandlers) => { }: SubscribeRequestWithHandlers) => {
const sub: Subscription = makeSubscription({delay: 50, ...request}) const sub: Subscription = new Subscription({delay: 50, ...request})
for (const relay of request.relays) { for (const relay of request.relays) {
if (relay !== LOCAL_RELAY_URL && relay !== normalizeRelayUrl(relay)) { if (relay !== LOCAL_RELAY_URL && relay !== normalizeRelayUrl(relay)) {
@@ -369,11 +381,10 @@ export const subscribe = ({
} }
// Signature for onEvent is different from emitter signature for historical reasons and convenience // Signature for onEvent is different from emitter signature for historical reasons and convenience
if (onEvent) if (onEvent) sub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => onEvent(event))
sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => onEvent(event)) if (onEose) sub.on(SubscriptionEvent.Eose, onEose)
if (onEose) sub.emitter.on(SubscriptionEvent.Eose, onEose) if (onClose) sub.on(SubscriptionEvent.Close, onClose)
if (onClose) sub.emitter.on(SubscriptionEvent.Close, onClose) if (onComplete) sub.on(SubscriptionEvent.Complete, onComplete)
if (onComplete) sub.emitter.on(SubscriptionEvent.Complete, onComplete)
return sub return sub
} }
+3 -3
View File
@@ -114,9 +114,9 @@ export class Nip46Receiver extends Emitter {
this.sub = subscribe({relays: this.params.relays, filters}) this.sub = subscribe({relays: this.params.relays, filters})
return new Promise<void>(resolve => { return new Promise<void>(resolve => {
this.sub!.emitter.on(SubscriptionEvent.Send, resolve) this.sub!.on(SubscriptionEvent.Send, resolve)
this.sub!.emitter.on(SubscriptionEvent.Event, async (url: string, event: TrustedEvent) => { this.sub!.on(SubscriptionEvent.Event, async (url: string, event: TrustedEvent) => {
const json = await decrypt(this.signer, event.pubkey, event.content) const json = await decrypt(this.signer, event.pubkey, event.content)
const response = tryCatch(() => JSON.parse(json)) || {} const response = tryCatch(() => JSON.parse(json)) || {}
@@ -128,7 +128,7 @@ export class Nip46Receiver extends Emitter {
this.emit(Nip46Event.Receive, {...response, url, event} as Nip46Response) this.emit(Nip46Event.Receive, {...response, url, event} as Nip46Response)
}) })
this.sub!.emitter.on(SubscriptionEvent.Complete, () => { this.sub!.on(SubscriptionEvent.Complete, () => {
this.sub = undefined this.sub = undefined
}) })
}) })