diff --git a/packages/feeds/src/controller.ts b/packages/feeds/src/controller.ts index fea5c3a..56d3149 100644 --- a/packages/feeds/src/controller.ts +++ b/packages/feeds/src/controller.ts @@ -1,4 +1,15 @@ -import {inc, defer, Deferred, memoize, omitVals, max, min, now} from "@welshman/lib" +import { + inc, + removeNil, + call, + defer, + Deferred, + memoize, + omitVals, + max, + min, + now, +} from "@welshman/lib" import {EPOCH, trimFilters, guessFilterDelta, TrustedEvent, Filter} from "@welshman/util" import {Tracker} from "@welshman/net" import {Feed, FeedType, RequestItem} from "./core.js" @@ -48,6 +59,28 @@ export class FeedController { load = async (limit: number) => (await this.getLoader())(limit) + getListener = memoize(async () => { + const [type, ...feed] = this.options.feed + const requestItems = await this.getRequestItems() + + if (requestItems) { + return this._getRequestsListener(requestItems) + } + + switch (type) { + case FeedType.Difference: + return this._getDifferenceListener(feed as Feed[]) + case FeedType.Intersection: + return this._getIntersectionListener(feed as Feed[]) + case FeedType.Union: + return this._getUnionListener(feed as Feed[]) + default: + throw new Error(`Unable to convert feed of type ${type} to listener`) + } + }) + + listen = async () => (await this.getListener())() + async _getRequestsLoader(requests: RequestItem[]) { const seen = new Set() const exhausted = new Set() @@ -121,6 +154,7 @@ export class FeedController { await requestPage( omitVals([undefined], { relays, + autoClose: true, filters: trimFilters(requestFilters), signal: this.options.signal, tracker: this.options.tracker, @@ -279,4 +313,134 @@ export class FeedController { } } } + + async _getRequestsListener(requests: RequestItem[]) { + const seen = new Set() + const listeners = await Promise.all( + requests.map(request => + this._getRequestListener(request, { + onEvent: e => { + if (!seen.has(e.id)) { + this.options.onEvent?.(e) + seen.add(e.id) + } + }, + }), + ), + ) + + return () => { + const unsubscribers = listeners.map(call) + + return () => unsubscribers.forEach(call) + } + } + + async _getRequestListener( + {relays, filters}: RequestItem, + {onEvent}: Pick, + ) { + // Make sure we have some kind of filter to send if we've been given an empty one, as happens with relay feeds + if (!filters || filters.length === 0) { + filters = [{}] + } + + return () => { + const since = now() + const controller = new AbortController() + const signal = AbortSignal.any(removeNil([controller.signal, this.options.signal])) + const requestFilters = filters! + .filter((filter: Filter) => !filter.until || filter.until <= since) + .map((filter: Filter) => ({...filter, since})) + + requestPage( + omitVals([undefined], { + relays, + signal, + onEvent: (event: TrustedEvent) => onEvent?.(event), + filters: trimFilters(requestFilters), + tracker: this.options.tracker, + context: this.options.context, + }), + ) + + return () => controller.abort() + } + } + + _getDifferenceListener(feeds: Feed[]) { + const skip = new Set() + + const controllers = feeds.map( + (thisFeed: Feed, i: number) => + new FeedController({ + ...this.options, + feed: thisFeed, + onEvent: (event: TrustedEvent) => { + if (i === 0 && !skip.has(event.id)) { + this.options.onEvent?.(event) + } else { + skip.add(event.id) + } + }, + }), + ) + + return () => { + const unsubscribers = controllers.map(controller => controller.listen()) + + return () => unsubscribers.forEach(async p => call(await p)) + } + } + + _getIntersectionListener(feeds: Feed[]) { + const counts = new Map() + + const controllers = feeds.map( + (thisFeed: Feed, i: number) => + new FeedController({ + ...this.options, + feed: thisFeed, + onEvent: (event: TrustedEvent) => { + const count = inc(counts.get(event.id)) + + if (count === feeds.length) { + this.options.onEvent?.(event) + } + + counts.set(event.id, count) + }, + }), + ) + + return () => { + const unsubscribers = controllers.map(controller => controller.listen()) + + return () => unsubscribers.forEach(async p => call(await p)) + } + } + + _getUnionListener(feeds: Feed[]) { + const seen = new Set() + + const controllers = feeds.map( + (thisFeed: Feed, i: number) => + new FeedController({ + ...this.options, + feed: thisFeed, + onEvent: (event: TrustedEvent) => { + if (!seen.has(event.id)) { + this.options.onEvent?.(event) + seen.add(event.id) + } + }, + }), + ) + + return () => { + const unsubscribers = controllers.map(controller => controller.listen()) + + return () => unsubscribers.forEach(async p => call(await p)) + } + } } diff --git a/packages/feeds/src/request.ts b/packages/feeds/src/request.ts index 942a42f..87b11b9 100644 --- a/packages/feeds/src/request.ts +++ b/packages/feeds/src/request.ts @@ -21,6 +21,7 @@ export type RequestPageOptions = { tracker?: Tracker signal?: AbortSignal context?: AdapterContext + autoClose?: boolean } export const requestPage = async ({ @@ -30,9 +31,10 @@ export const requestPage = async ({ tracker = new Tracker(), signal, context, + autoClose, }: RequestPageOptions) => { if (relays.length > 0) { - return request({tracker, signal, context, onEvent, relays, filters, autoClose: true}) + return request({tracker, signal, context, onEvent, relays, filters, autoClose}) } const promises: Promise[] = [] @@ -46,7 +48,7 @@ export const requestPage = async ({ context, onEvent, threshold: 0.1, - autoClose: true, + autoClose, filters: withSearch, relays: Router.get().Search().getUrls(), }), @@ -64,7 +66,7 @@ export const requestPage = async ({ relays, filters, threshold: 0.8, - autoClose: true, + autoClose, }), ), ) @@ -84,7 +86,7 @@ export const requestPage = async ({ onEvent, filters, relays: [LOCAL_RELAY_URL], - autoClose: true, + autoClose, }) }