diff --git a/packages/app/src/core.ts b/packages/app/src/core.ts index ed4ceac..820abd3 100644 --- a/packages/app/src/core.ts +++ b/packages/app/src/core.ts @@ -1,7 +1,7 @@ import {isNil} from "@welshman/lib" import {Repository, Relay, LOCAL_RELAY_URL, getFilterResultCardinality} from "@welshman/util" import type {TrustedEvent, Filter} from "@welshman/util" -import {Tracker, subscribe as baseSubscribe, mergeSubscriptions} from "@welshman/net" +import {Tracker, subscribe as baseSubscribe} from "@welshman/net" import type {SubscribeRequest} from "@welshman/net" import {createEventStore} from "@welshman/store" import type {Router} from './router' @@ -52,29 +52,23 @@ export const subscribe = (request: PartialSubscribeRequest) => { } } + // Make sure to query our local relay too const delay = AppContext.requestDelay const timeout = AppContext.requestTimeout + const sub = baseSubscribe({delay, authTimeout: timeout, relays: [], ...request}) - return mergeSubscriptions( - AppContext.splitRequest!(request).map(req => { - // Make sure to query our local relay too - const relays = [...req.relays, LOCAL_RELAY_URL] - const sub = baseSubscribe({delay, authTimeout: timeout, ...req, relays}) + sub.emitter.on("event", (url: string, e: TrustedEvent) => { + repository.publish(e) + }) - sub.emitter.on("event", (url: string, e: TrustedEvent) => { - repository.publish(e) - }) + // Keep cached results async so the caller can set up handlers + setTimeout(() => { + for (const event of events) { + sub.emitter.emit("event", LOCAL_RELAY_URL, event) + } + }) - // Keep cached results async so the caller can set up handlers - setTimeout(() => { - for (const event of events) { - sub.emitter.emit("event", LOCAL_RELAY_URL, event) - } - }) - - return sub - }) - ) + return sub } export const load = (request: PartialSubscribeRequest) => diff --git a/packages/app/src/index.ts b/packages/app/src/index.ts index 9de2712..c1dff61 100644 --- a/packages/app/src/index.ts +++ b/packages/app/src/index.ts @@ -16,19 +16,29 @@ export * from './topics' export * from './util' export * from './zappers' -import {NetworkContext} from "@welshman/net" -import {type TrustedEvent} from "@welshman/util" +import {partition} from "@welshman/lib" +import {type Subscription, NetworkContext, defaultOptimizeSubscriptions} from "@welshman/net" +import {type TrustedEvent, unionFilters} from "@welshman/util" import {tracker, repository, AppContext} from './core' -import {splitRequest, makeRouter} from './router' +import {makeRouter, getFilterSelections} from './router' import {onAuth} from './session' +export function* optimizeSubscriptions(subs: Subscription[]) { + const [withRelays, withoutRelays] = partition(sub => sub.request.relays.length > 0, subs) + + yield* defaultOptimizeSubscriptions(withRelays) + yield* getFilterSelections( + unionFilters(withoutRelays.flatMap(sub => sub.request.filters)) + ) +} + Object.assign(NetworkContext, { onAuth, onEvent: (url: string, event: TrustedEvent) => tracker.track(event.id, url), isDeleted: (url: string, event: TrustedEvent) => repository.isDeleted(event), + optimizeSubscriptions, }) Object.assign(AppContext, { - splitRequest, router: makeRouter(), }) diff --git a/packages/app/src/router.ts b/packages/app/src/router.ts index e8f21e8..6b9c0b2 100644 --- a/packages/app/src/router.ts +++ b/packages/app/src/router.ts @@ -3,7 +3,6 @@ import {Tags, getFilterId, unionFilters, isShareableRelayUrl, isCommunityAddress import type {TrustedEvent, Filter} from '@welshman/util' import {NetworkContext, ConnectionStatus} from '@welshman/net' import {AppContext} from './core' -import type {PartialSubscribeRequest} from './core' import {pubkey} from './session' import {relaySelectionsByPubkey, getReadRelayUrls, getWriteRelayUrls, getRelayUrls} from './relaySelections' import {relays, relaysByUrl} from './relays' @@ -443,7 +442,7 @@ export const makeRouter = (options: Partial = {}) => // Infer relay selections from filters export type RelayFilters = { - relay: string + relays: string[] filters: Filter[] } @@ -561,7 +560,7 @@ export const getFilterSelections = (filters: Filter[]): RelayFilters[] => { .getSelections() .map(({values, relay}) => ({ filters: values.map(id => filtersById.get(id)!), - relay, + relays: [relay], })) // Pubkey-based selections can get really big. Use the most popular relays for the long tail @@ -575,11 +574,3 @@ export const getFilterSelections = (filters: Filter[]): RelayFilters[] => { return keep } - -export const splitRequest = (req: PartialSubscribeRequest) => { - if ((req.relays || []).length > 0) return [req] - - return getFilterSelections(req.filters) - .map(({relay, filters}) => ({...req, filters, relays: [relay]})) -} - diff --git a/packages/lib/src/Tools.ts b/packages/lib/src/Tools.ts index 03d6c5c..f38ca09 100644 --- a/packages/lib/src/Tools.ts +++ b/packages/lib/src/Tools.ts @@ -197,6 +197,19 @@ export const isPojo = (obj: any) => { export const equals = (a: any, b: any) => { if (a === b) return true + if (a instanceof Set && b instanceof Set) { + a = Array.from(a) + b = Array.from(b) + } + + if (a instanceof Set) { + if (!(b instanceof Set) || a.size !== b.size) { + return false + } + + return Array.from(a).every(x => b.has(x)) + } + if (Array.isArray(a)) { if (!Array.isArray(b) || a.length !== b.length) { return false diff --git a/packages/net/src/Context.ts b/packages/net/src/Context.ts index a4124a3..51c38da 100644 --- a/packages/net/src/Context.ts +++ b/packages/net/src/Context.ts @@ -1,8 +1,10 @@ -import {matchFilters, hasValidSignature} from '@welshman/util' +import {uniq} from '@welshman/lib' +import {matchFilters, unionFilters, hasValidSignature} from '@welshman/util' import type {Filter, SignedEvent} from '@welshman/util' import {Pool} from "./Pool" import {Executor} from "./Executor" import {Relays} from "./target/Relays" +import type {Subscription} from "./Subscribe" export const defaultPool = new Pool() @@ -21,6 +23,15 @@ const defaultHasValidSignature = (url: string, event: SignedEvent) => hasValidSi const defaultMatchFilters = (url: string, filters: Filter[], event: SignedEvent) => matchFilters(filters, event) +export function* defaultOptimizeSubscriptions(subs: Subscription[]) { + for (const relay of uniq(subs.flatMap(sub => sub.request.relays || []))) { + const relaySubs = subs.filter(sub => sub.request.relays.includes(relay)) + const filters = unionFilters(relaySubs.flatMap(sub => sub.request.filters)) + + yield {relays: [relay], filters} + } +} + export const NetworkContext = { pool: defaultPool, getExecutor: defaultGetExecutor, @@ -30,4 +41,5 @@ export const NetworkContext = { isDeleted: defaultIsDeleted, hasValidSignature: defaultHasValidSignature, matchFilters: defaultMatchFilters, + optimizeSubscriptions: defaultOptimizeSubscriptions, } diff --git a/packages/net/src/Subscribe.ts b/packages/net/src/Subscribe.ts index 5a0275a..e532fb2 100644 --- a/packages/net/src/Subscribe.ts +++ b/packages/net/src/Subscribe.ts @@ -66,6 +66,7 @@ export const calculateSubscriptionGroup = (sub: Subscription) => { const parts: string[] = [] if (sub.request.timeout) parts.push(`timeout:${sub.request.timeout}`) + if (sub.request.authTimeout) parts.push(`authTimeout:${sub.request.authTimeout}`) if (sub.request.closeOnEose) parts.push('closeOnEose') return parts.join('|') @@ -123,28 +124,31 @@ export const mergeSubscriptions = (subs: Subscription[]) => { return mergedSub } -export const optimizeSubscriptions = (subs: Subscription[]) => - Array.from(groupBy(calculateSubscriptionGroup, subs).values()) +export const optimizeSubscriptions = (subs: Subscription[]) => { + return Array.from(groupBy(calculateSubscriptionGroup, subs).values()) .flatMap(group => { - const completedRelays = new Set() + const timeout = max(group.map(sub => sub.request.timeout || 0)) + const authTimeout = max(group.map(sub => sub.request.authTimeout || 0)) + const closeOnEose = group.every(sub => sub.request.closeOnEose) + const completedSubs = new Set() + const abortedSubs = new Set() + const closedSubs = new Set() + const eosedSubs = new Set() const mergedSubs = [] - for (const relay of uniq(group.flatMap((sub: Subscription) => sub.request.relays))) { - const abortedSubs = new Set() - const callerSubs = group.filter((sub: Subscription) => sub.request.relays.includes(relay)) - const mergedSub = makeSubscription({ - relays: [relay], - closeOnEose: callerSubs.every(sub => sub.request.closeOnEose), - timeout: max(callerSubs.map(sub => sub.request.timeout || 0)), - authTimeout: max(callerSubs.map(sub => sub.request.authTimeout || 0)), - filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)), + for (const {relays, filters} of NetworkContext.optimizeSubscriptions(group)) { + const mergedSub = makeSubscription({filters, + relays, + timeout, + authTimeout, + closeOnEose }) - for (const {id, controller, request} of callerSubs) { + for (const {id, controller, request} of group) { const onAbort = () => { abortedSubs.add(id) - if (abortedSubs.size === callerSubs.length) { + if (abortedSubs.size === group.length) { mergedSub.close() } } @@ -154,7 +158,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) => } mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: SignedEvent) => { - for (const sub of callerSubs) { + for (const sub of group) { if (!sub.tracker.track(event.id, url) && matchFilters(sub.request.filters, event)) { sub.emitter.emit(SubscriptionEvent.Event, url, event) } @@ -162,53 +166,48 @@ export const optimizeSubscriptions = (subs: Subscription[]) => }) // Pass events back to caller - const propagateEvent = (type: SubscriptionEvent, checkFilter: boolean) => + const propagateEvent = (type: SubscriptionEvent) => mergedSub.emitter.on(type, (url: string, event: SignedEvent) => { - for (const sub of callerSubs) { - if (!checkFilter || matchFilters(sub.request.filters, event)) { + for (const sub of group) { + if (matchFilters(sub.request.filters, event)) { sub.emitter.emit(type, url, event) } } }) - propagateEvent(SubscriptionEvent.Duplicate, true) - propagateEvent(SubscriptionEvent.DeletedEvent, false) - propagateEvent(SubscriptionEvent.FailedFilter, false) - propagateEvent(SubscriptionEvent.InvalidSignature, true) + propagateEvent(SubscriptionEvent.Duplicate) + propagateEvent(SubscriptionEvent.DeletedEvent) + propagateEvent(SubscriptionEvent.InvalidSignature) - // Propagate eose - mergedSub.emitter.on(SubscriptionEvent.Eose, (url: string) => { - for (const sub of callerSubs) { - sub.emitter.emit(SubscriptionEvent.Eose, url) - } - }) + const propagateFinality = (type: SubscriptionEvent, subIds: Set) => + mergedSub.emitter.on(type, (...args: any[]) => { + subIds.add(mergedSub.id) - // Propagate close - mergedSub.emitter.on(SubscriptionEvent.Close, (url: string) => { - for (const sub of callerSubs) { - sub.emitter.emit(SubscriptionEvent.Close, url) - } - }) - - // Propagate subscription completion. Since we split subs by relay, we need to wait - // until all relays are completed before we notify - mergedSub.emitter.on(SubscriptionEvent.Complete, () => { - completedRelays.add(relay) - - for (const sub of callerSubs) { - if (sub.request.relays.every(url => completedRelays.has(url))) { - sub.emitter.emit(SubscriptionEvent.Complete) + // Wait for all subscriptions to complete before reporting finality to the caller. + // This is sub-optimal, but because we're outsourcing filter/relay optimization + // we can't make any assumptions about which caller subscriptions have completed + // at any given time. + if (subIds.size === group.length) { + for (const sub of group) { + sub.emitter.emit(type, ...args) + } } - } - mergedSub.emitter.removeAllListeners() - }) + if (type === SubscriptionEvent.Complete) { + mergedSub.emitter.removeAllListeners() + } + }) + + propagateFinality(SubscriptionEvent.Eose, eosedSubs) + propagateFinality(SubscriptionEvent.Close, closedSubs) + propagateFinality(SubscriptionEvent.Complete, completedSubs) mergedSubs.push(mergedSub) } return mergedSubs }) +} export const executeSubscription = (sub: Subscription) => { const {request, emitter, tracker, controller} = sub @@ -328,10 +327,6 @@ export const executeSubscriptionBatched = (() => { export const subscribe = (request: SubscribeRequest) => { const subscription: Subscription = makeSubscription({delay: 50, ...request}) - if (request.relays.length === 0) { - console.warn("Attempted to execute a subscription with zero relays") - } - if (request.delay === 0) { executeSubscription(subscription) } else {