Re-work feeds

This commit is contained in:
Jon Staab
2024-04-30 08:39:23 -07:00
parent 00b142243b
commit ad7a67efbb
7 changed files with 379 additions and 204 deletions
+29 -3
View File
@@ -21,7 +21,7 @@ export class FeedLoader<E extends Rumor> {
async getLoader([type, ...feed]: Feed, loadOpts: LoadOpts<E>) {
if (this.compiler.canCompile([type, ...feed] as Feed)) {
return this._getRequestLoader(await this.compiler.compile([type, ...feed] as Feed), loadOpts)
return this._getRequestsLoader(await this.compiler.compile([type, ...feed] as Feed), loadOpts)
}
switch(type) {
@@ -38,9 +38,35 @@ export class FeedLoader<E extends Rumor> {
}
}
async _getRequestsLoader(requests: RequestItem[], {onEvent, onExhausted}: LoadOpts<E>) {
const seen = new Set()
const exhausted = new Set()
const loaders = await Promise.all(
requests.map(
request => this._getRequestLoader(request, {
onExhausted: () => exhausted.add(request),
onEvent: e => {
if (!seen.has(e.id)) {
onEvent?.(e)
seen.add(e.id)
}
},
})
)
)
return async (limit: number) => {
await Promise.all(loaders.map(loader => loader(limit)))
if (exhausted.size === requests.length) {
onExhausted?.()
}
}
}
async _getRequestLoader({relays, filters}: RequestItem, {onEvent, onExhausted}: LoadOpts<E>) {
// Make sure we have some kind of filter to send if we've been given an empty one, as happens with relay feeds
if (filters.length === 0) {
if (!filters || filters.length === 0) {
filters = [{}]
}
@@ -55,7 +81,7 @@ export class FeedLoader<E extends Rumor> {
let until = maxUntil
return async (limit: number) => {
const requestFilters = filters
const requestFilters = filters!
// Remove filters that don't fit our window
.filter((filter: Filter) => {
const filterSince = filter.since || EPOCH