Add listeners to FeedController

This commit is contained in:
Jon Staab
2025-06-19 13:58:32 -07:00
parent 24acae704d
commit 1d6bd887ba
2 changed files with 171 additions and 5 deletions
+165 -1
View File
@@ -1,4 +1,15 @@
import {inc, defer, Deferred, memoize, omitVals, max, min, now} from "@welshman/lib" import {
inc,
removeNil,
call,
defer,
Deferred,
memoize,
omitVals,
max,
min,
now,
} from "@welshman/lib"
import {EPOCH, trimFilters, guessFilterDelta, TrustedEvent, Filter} from "@welshman/util" import {EPOCH, trimFilters, guessFilterDelta, TrustedEvent, Filter} from "@welshman/util"
import {Tracker} from "@welshman/net" import {Tracker} from "@welshman/net"
import {Feed, FeedType, RequestItem} from "./core.js" import {Feed, FeedType, RequestItem} from "./core.js"
@@ -48,6 +59,28 @@ export class FeedController {
load = async (limit: number) => (await this.getLoader())(limit) load = async (limit: number) => (await this.getLoader())(limit)
getListener = memoize(async () => {
const [type, ...feed] = this.options.feed
const requestItems = await this.getRequestItems()
if (requestItems) {
return this._getRequestsListener(requestItems)
}
switch (type) {
case FeedType.Difference:
return this._getDifferenceListener(feed as Feed[])
case FeedType.Intersection:
return this._getIntersectionListener(feed as Feed[])
case FeedType.Union:
return this._getUnionListener(feed as Feed[])
default:
throw new Error(`Unable to convert feed of type ${type} to listener`)
}
})
listen = async () => (await this.getListener())()
async _getRequestsLoader(requests: RequestItem[]) { async _getRequestsLoader(requests: RequestItem[]) {
const seen = new Set() const seen = new Set()
const exhausted = new Set() const exhausted = new Set()
@@ -121,6 +154,7 @@ export class FeedController {
await requestPage( await requestPage(
omitVals([undefined], { omitVals([undefined], {
relays, relays,
autoClose: true,
filters: trimFilters(requestFilters), filters: trimFilters(requestFilters),
signal: this.options.signal, signal: this.options.signal,
tracker: this.options.tracker, tracker: this.options.tracker,
@@ -279,4 +313,134 @@ export class FeedController {
} }
} }
} }
async _getRequestsListener(requests: RequestItem[]) {
const seen = new Set()
const listeners = await Promise.all(
requests.map(request =>
this._getRequestListener(request, {
onEvent: e => {
if (!seen.has(e.id)) {
this.options.onEvent?.(e)
seen.add(e.id)
}
},
}),
),
)
return () => {
const unsubscribers = listeners.map(call)
return () => unsubscribers.forEach(call)
}
}
async _getRequestListener(
{relays, filters}: RequestItem,
{onEvent}: Pick<FeedControllerOptions, "onEvent">,
) {
// Make sure we have some kind of filter to send if we've been given an empty one, as happens with relay feeds
if (!filters || filters.length === 0) {
filters = [{}]
}
return () => {
const since = now()
const controller = new AbortController()
const signal = AbortSignal.any(removeNil([controller.signal, this.options.signal]))
const requestFilters = filters!
.filter((filter: Filter) => !filter.until || filter.until <= since)
.map((filter: Filter) => ({...filter, since}))
requestPage(
omitVals([undefined], {
relays,
signal,
onEvent: (event: TrustedEvent) => onEvent?.(event),
filters: trimFilters(requestFilters),
tracker: this.options.tracker,
context: this.options.context,
}),
)
return () => controller.abort()
}
}
_getDifferenceListener(feeds: Feed[]) {
const skip = new Set<string>()
const controllers = feeds.map(
(thisFeed: Feed, i: number) =>
new FeedController({
...this.options,
feed: thisFeed,
onEvent: (event: TrustedEvent) => {
if (i === 0 && !skip.has(event.id)) {
this.options.onEvent?.(event)
} else {
skip.add(event.id)
}
},
}),
)
return () => {
const unsubscribers = controllers.map(controller => controller.listen())
return () => unsubscribers.forEach(async p => call(await p))
}
}
_getIntersectionListener(feeds: Feed[]) {
const counts = new Map<string, number>()
const controllers = feeds.map(
(thisFeed: Feed, i: number) =>
new FeedController({
...this.options,
feed: thisFeed,
onEvent: (event: TrustedEvent) => {
const count = inc(counts.get(event.id))
if (count === feeds.length) {
this.options.onEvent?.(event)
}
counts.set(event.id, count)
},
}),
)
return () => {
const unsubscribers = controllers.map(controller => controller.listen())
return () => unsubscribers.forEach(async p => call(await p))
}
}
_getUnionListener(feeds: Feed[]) {
const seen = new Set()
const controllers = feeds.map(
(thisFeed: Feed, i: number) =>
new FeedController({
...this.options,
feed: thisFeed,
onEvent: (event: TrustedEvent) => {
if (!seen.has(event.id)) {
this.options.onEvent?.(event)
seen.add(event.id)
}
},
}),
)
return () => {
const unsubscribers = controllers.map(controller => controller.listen())
return () => unsubscribers.forEach(async p => call(await p))
}
}
} }
+6 -4
View File
@@ -21,6 +21,7 @@ export type RequestPageOptions = {
tracker?: Tracker tracker?: Tracker
signal?: AbortSignal signal?: AbortSignal
context?: AdapterContext context?: AdapterContext
autoClose?: boolean
} }
export const requestPage = async ({ export const requestPage = async ({
@@ -30,9 +31,10 @@ export const requestPage = async ({
tracker = new Tracker(), tracker = new Tracker(),
signal, signal,
context, context,
autoClose,
}: RequestPageOptions) => { }: RequestPageOptions) => {
if (relays.length > 0) { if (relays.length > 0) {
return request({tracker, signal, context, onEvent, relays, filters, autoClose: true}) return request({tracker, signal, context, onEvent, relays, filters, autoClose})
} }
const promises: Promise<TrustedEvent[]>[] = [] const promises: Promise<TrustedEvent[]>[] = []
@@ -46,7 +48,7 @@ export const requestPage = async ({
context, context,
onEvent, onEvent,
threshold: 0.1, threshold: 0.1,
autoClose: true, autoClose,
filters: withSearch, filters: withSearch,
relays: Router.get().Search().getUrls(), relays: Router.get().Search().getUrls(),
}), }),
@@ -64,7 +66,7 @@ export const requestPage = async ({
relays, relays,
filters, filters,
threshold: 0.8, threshold: 0.8,
autoClose: true, autoClose,
}), }),
), ),
) )
@@ -84,7 +86,7 @@ export const requestPage = async ({
onEvent, onEvent,
filters, filters,
relays: [LOCAL_RELAY_URL], relays: [LOCAL_RELAY_URL],
autoClose: true, autoClose,
}) })
} }