import type {Event} from 'nostr-tools' import {Emitter, randomId, groupBy, batch, defer, uniq, uniqBy} from '@welshman/lib' import type {Deferred} from '@welshman/lib' import {matchFilters, mergeFilters} from '@welshman/util' import type {Filter} from '@welshman/util' import {Tracker} from "./Tracker" import {Connection} from './Connection' import {NetworkContext} from './Context' // `subscribe` is a super function that handles batching subscriptions by merging // them based on parameters (filters and subscribe opts), then splits them by relay. // This results in fewer REQs being opened per connection, fewer duplicate events // being downloaded, and therefore less signature validation. // // Behavior can be further configured using NetworkContext. This can be useful for // adding support for querying a local cache like a relay, tracking deleted events, // and bypassing validation for trusted relays. // // Urls that any given event was seen on are tracked using subscription request's `tracker` // property. These are merged across all subscription requests, so it is possible that an // event may be seen on more relays that were actually requested, in the case of overlapping // subscriptions. export enum SubscriptionEvent { Eose = "eose", Close = "close", Event = "event", Abort = "abort", Complete = "complete", Duplicate = "duplicate", DeletedEvent = "deleted-event", FailedFilter = "failed-filter", InvalidSignature = "invalid-signature", } export type SubscribeRequest = { relays: string[] filters: Filter[] timeout?: number immediate?: boolean closeOnEose?: boolean } export type Subscription = { id: string emitter: Emitter tracker: Tracker result: Deferred request: SubscribeRequest close: () => void } export const makeSubscription = (request: SubscribeRequest) => { const id = randomId() const emitter = new Emitter() const tracker = new Tracker() const result = defer() const close = () => emitter.emit('abort') emitter.setMaxListeners(100) return {id, request, emitter, tracker, result, close} } export const calculateSubscriptionGroup = (sub: Subscription) => { const parts: string[] = [] if (sub.request.timeout) parts.push(`timeout:${sub.request.timeout}`) if (sub.request.closeOnEose) parts.push('closeOnEose') return parts.join('|') } export const mergeSubscriptions = (subs: Subscription[]) => { const completedRelays = new Set() const mergedSubscriptions = [] for (const group of Object.values(groupBy(calculateSubscriptionGroup, subs))) { for (const relay of uniq(group.flatMap((sub: Subscription) => sub.request.relays))) { const abortedSubs = new Set() const callerSubs = group.filter((sub: Subscription) => sub.request.relays.includes(relay)) const mergedSub = makeSubscription({ relays: [relay], timeout: callerSubs[0].request.timeout, filters: mergeFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)), }) for (const {id, emitter} of callerSubs) { // Propagate abort event from the caller to the merged subscription emitter.on(SubscriptionEvent.Abort, () => { abortedSubs.add(id) if (abortedSubs.size === callerSubs.length) { mergedSub.close() } }) } mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: Event) => { for (const sub of callerSubs) { if (sub.tracker.track(event.id, url)) { continue } if (!matchFilters(sub.request.filters, event)) { continue } sub.emitter.emit(SubscriptionEvent.Event, url, event) } }) // Pass events back to caller const propagateEvent = (type: SubscriptionEvent, checkFilter: boolean) => mergedSub.emitter.on(type, (url: string, event: Event) => { for (const sub of callerSubs) { if (!checkFilter || matchFilters(sub.request.filters, event)) { sub.emitter.emit(type, url, event) } } }) propagateEvent(SubscriptionEvent.Duplicate, true) propagateEvent(SubscriptionEvent.DeletedEvent, false) propagateEvent(SubscriptionEvent.FailedFilter, false) propagateEvent(SubscriptionEvent.InvalidSignature, true) // Propagate eose mergedSub.emitter.on(SubscriptionEvent.Eose, (url: string) => { for (const sub of callerSubs) { sub.emitter.emit(SubscriptionEvent.Eose, url) } }) // Propagate close mergedSub.emitter.on(SubscriptionEvent.Close, (url: string) => { for (const sub of callerSubs) { sub.emitter.emit(SubscriptionEvent.Close, url) } }) // Propagate subscription completion. Since we split subs by relay, we need to wait // until all relays are completed before we notify mergedSub.emitter.on(SubscriptionEvent.Complete, () => { completedRelays.add(relay) for (const sub of callerSubs) { if (sub.request.relays.every(url => completedRelays.has(url))) { sub.emitter.emit(SubscriptionEvent.Complete) } } mergedSub.emitter.removeAllListeners() }) // Propagate promise resolution mergedSub.result.then((events: Event[]) => { events = uniqBy((event: Event) => event.id, events) for (const sub of callerSubs) { sub.result.resolve(events.filter((e: Event) => matchFilters(sub.request.filters, e))) } }) mergedSubscriptions.push(mergedSub) } } return mergedSubscriptions } export const executeSubscription = (sub: Subscription) => { const {result, request, emitter, tracker} = sub const {timeout, filters, closeOnEose, relays} = request const executor = NetworkContext.getExecutor(relays) const events: Event[] = [] const completedRelays = new Set() let completed: number const complete = () => { if (completed) return // Mark as cleaned upp, unsubscribe our executor completed = Date.now() executorSub.unsubscribe() // Resolve our promise result.resolve(events) // Notify caller, clean up our event emitter emitter.emit(SubscriptionEvent.Complete) emitter.removeAllListeners() // Remove our onClose handler from connections, since they are shared by many subs executor.target.connections.forEach((c: Connection) => c.off("close", onClose)) executor.target.cleanup() } const onEvent = (url: string, event: Event) => { // Check the signature and filters. If we've seen this event, don't re-validate. if (tracker.track(event.id, url)) { emitter.emit(SubscriptionEvent.Duplicate, url, event) } else if (NetworkContext.isDeleted(url, event)) { emitter.emit(SubscriptionEvent.DeletedEvent, url, event) } else if (!NetworkContext.matchFilters(url, filters, event)) { emitter.emit(SubscriptionEvent.FailedFilter, url, event) } else if (!NetworkContext.hasValidSignature(url, event)) { emitter.emit(SubscriptionEvent.InvalidSignature, url, event) } else { emitter.emit(SubscriptionEvent.Event, url, event) events.push(event) } } const onEose = (url: string) => { completedRelays.add(url) emitter.emit(SubscriptionEvent.Eose, url) if (closeOnEose && completedRelays.size === executor.target.connections.length) { complete() } } const onClose = (connection: Connection) => { completedRelays.add(connection.url) emitter.emit(SubscriptionEvent.Close, connection.url) if (completedRelays.size === executor.target.connections.length) { complete() } } // Allow the caller to cancel the subscription emitter.on(SubscriptionEvent.Abort, complete) // If we have a timeout, complete the subscription automatically if (timeout) setTimeout(complete, timeout) // If one of our connections gets closed make sure to kill our sub executor.target.connections.forEach((c: Connection) => c.on('close', onClose)) // Finally, start our subscription const executorSub = executor.subscribe(filters, {onEvent, onEose}) } export const executeSubscriptions = (subs: Subscription[]) => mergeSubscriptions(subs).forEach(executeSubscription) export const executeSubscriptionBatched = batch(800, executeSubscriptions) export const subscribe = (request: SubscribeRequest) => { const subscription: Subscription = makeSubscription(request) if (request.filters.length === 0) { throw new Error("Zero filters passed to subscribe") } if (request.immediate) { executeSubscription(subscription) } else { executeSubscriptionBatched(subscription) } return subscription }