Compile feeds to filters as far as possible

This commit is contained in:
Jon Staab
2024-04-11 16:44:11 -07:00
parent 0f2605567e
commit d3604ed9f2
+91 -78
View File
@@ -1,6 +1,6 @@
import {inc, now, isNil} from '@coracle.social/lib' import {inc, now, isNil} from '@coracle.social/lib'
import type {Rumor, Filter} from '@coracle.social/util' import type {Rumor, Filter} from '@coracle.social/util'
import {Tags, getIdFilters} from '@coracle.social/util' import {Tags, getIdFilters, mergeFilters} from '@coracle.social/util'
// TODO: // TODO:
// - if one of the feeds in a union is a filter, don't execute it, // - if one of the feeds in a union is a filter, don't execute it,
@@ -83,28 +83,57 @@ export type ExecuteOpts = {
export class Interpreter { export class Interpreter {
constructor(readonly opts: InterpreterOpts) {} constructor(readonly opts: InterpreterOpts) {}
// Dispatch to different types of feed
execute([type, ...feed]: Feed, opts: ExecuteOpts) { execute([type, ...feed]: Feed, opts: ExecuteOpts) {
switch (type) { switch(type) {
case FeedType.Difference: case FeedType.Difference:
return this.executeDifference(feed as Feed[], opts) return this._executeDifference(feed as Feed[], opts)
case FeedType.Intersection: case FeedType.Intersection:
return this.executeIntersection(feed as Feed[], opts) return this._executeIntersection(feed as Feed[], opts)
case FeedType.SymmetricDifference: case FeedType.SymmetricDifference:
return this.executeSymmetricDifference(feed as Feed[], opts) return this._executeSymmetricDifference(feed as Feed[], opts)
case FeedType.Union: default:
return this.executeUnion(feed as Feed[], opts) return this._feedToFilters([type, ...feed] as Feed).then(filters =>
case FeedType.Filter: this.opts.reqFilters(filters.map(this._compileFilter), opts)
return this.executeFilter(feed as DynamicFilter[], opts) )
case FeedType.List:
return this.executeList(feed as string[], opts)
case FeedType.LOL:
return this.executeLOL(feed as string[], opts)
case FeedType.DVM:
return this.executeDVM(feed as DVMRequest[], opts)
} }
} }
executeDifference(feeds: Feed[], {onEvent, onComplete}: ExecuteOpts) { async _feedToFilters([type, ...feed]: Feed) {
switch(type) {
case FeedType.Union:
return await this._unionToFilters(feed as Feed[])
case FeedType.List:
return await this._listsToFilters(feed as string[])
case FeedType.LOL:
return await this._lolsToFilters(feed as string[])
case FeedType.DVM:
return await this._dvmsToFilters(feed as DVMRequest[])
case FeedType.Filter:
return feed as Filter[]
default:
throw new Error(`Unable to convert feed of type ${type} to filters`)
}
}
async _feedsToFilters(feeds: Feed[]) {
const filters: Filter[] = []
await Promise.all(
feeds.map(async feed => {
for (const filter of await this._feedToFilters(feed)) {
filters.push(this._compileFilter(filter))
}
})
)
return filters
}
// Special-case executors for set operations we can't infer filters for
async _executeDifference(feeds: Feed[], {onEvent, onComplete}: ExecuteOpts) {
const skip = new Set<string>() const skip = new Set<string>()
const events: Rumor[] = [] const events: Rumor[] = []
@@ -130,7 +159,7 @@ export class Interpreter {
}) })
} }
executeIntersection(feeds: Feed[], {onEvent, onComplete}: ExecuteOpts) { async _executeIntersection(feeds: Feed[], {onEvent, onComplete}: ExecuteOpts) {
const counts = new Map<string, number>() const counts = new Map<string, number>()
const events = new Map<string, Rumor>() const events = new Map<string, Rumor>()
@@ -153,7 +182,7 @@ export class Interpreter {
}) })
} }
executeSymmetricDifference(feeds: Feed[], {onEvent, onComplete}: ExecuteOpts) { async _executeSymmetricDifference(feeds: Feed[], {onEvent, onComplete}: ExecuteOpts) {
const counts = new Map<string, number>() const counts = new Map<string, number>()
const events = new Map<string, Rumor>() const events = new Map<string, Rumor>()
@@ -176,70 +205,54 @@ export class Interpreter {
}) })
} }
executeUnion(feeds: Feed[], {onEvent, onComplete}: ExecuteOpts) { // Everything else can be compiled to filters
const getOnComplete = this.#getCompletionTracker(onComplete)
for (const subFeed of feeds) { async _unionToFilters(feeds: Feed[]): Promise<Filter[]> {
this.execute(subFeed, {onEvent, onComplete: getOnComplete}) return mergeFilters(await this._feedsToFilters(feeds))
}
} }
executeFilter(filters: DynamicFilter[], opts: ExecuteOpts) { async _listsToFilters(addresses: string[]): Promise<Filter[]> {
this.opts.reqFilters(filters.map(this.#compileFilter), opts) return new Promise(resolve => {
} const events: Rumor[] = []
executeList(addresses: string[], {onEvent, onComplete}: ExecuteOpts) { this.opts.reqFilters(getIdFilters(addresses), {
const getOnComplete = this.#getCompletionTracker(onComplete) onEvent: (event: Rumor) => events.push(event),
onComplete: () => resolve(this._getFiltersFromTags(Tags.fromEvents(events))),
this.opts.reqFilters(getIdFilters(addresses), {
onComplete: getOnComplete(),
onEvent: (event: Rumor) => {
this.opts.reqFilters(this.#getFiltersFromTags(Tags.fromEvent(event)), {
onEvent,
onComplete: getOnComplete(),
})
},
})
}
executeLOL(addresses: string[], {onEvent, onComplete}: ExecuteOpts) {
const getOnComplete = this.#getCompletionTracker(onComplete)
this.opts.reqFilters(getIdFilters(addresses), {
onComplete: getOnComplete(),
onEvent: (event: Rumor) => {
const addresses = Tags.fromEvent(event).values("a").valueOf()
this.opts.reqFilters(getIdFilters(addresses), {
onComplete: getOnComplete(),
onEvent: (event: Rumor) => {
this.opts.reqFilters(this.#getFiltersFromTags(Tags.fromEvent(event)), {
onEvent,
onComplete: getOnComplete(),
})
},
})
},
})
}
executeDVM(requests: DVMRequest[], {onEvent, onComplete}: ExecuteOpts) {
const getOnComplete = this.#getCompletionTracker(onComplete)
for (const request of requests) {
this.opts.reqDvm(request, {
onComplete: getOnComplete(),
onEvent: (event: Rumor) => {
this.opts.reqFilters(this.#getFiltersFromTags(Tags.fromEvent(event)), {
onEvent,
onComplete: getOnComplete(),
})
},
}) })
} })
} }
#getCompletionTracker(onComplete?: () => void) { async _lolsToFilters(addresses: string[]): Promise<Filter[]> {
return new Promise(resolve => {
const events: Rumor[] = []
this.opts.reqFilters(getIdFilters(addresses), {
onEvent: (event: Rumor) => events.push(event),
onComplete: () => resolve(this._listsToFilters(Tags.fromEvents(events).values("a").valueOf())),
})
})
}
async _dvmsToFilters(requests: DVMRequest[]): Promise<Filter[]> {
const events: Rumor[] = []
await Promise.all(
requests.map(request => {
return new Promise<void>(resolve => {
this.opts.reqDvm(request, {
onEvent: (event: Rumor) => events.push(event),
onComplete: resolve,
})
})
})
)
return this._getFiltersFromTags(Tags.fromEvents(events))
}
// Utilities
_getCompletionTracker(onComplete?: () => void) {
let pending = 0 let pending = 0
return () => { return () => {
@@ -255,7 +268,7 @@ export class Interpreter {
} }
} }
#compileFilter({scopes, min_wot, max_wot, until_ago, since_ago, ...filter}: DynamicFilter) { _compileFilter({scopes, min_wot, max_wot, until_ago, since_ago, ...filter}: DynamicFilter) {
if (scopes && !filter.authors) { if (scopes && !filter.authors) {
filter.authors = scopes.flatMap(scope => this.opts.getPubkeysForScope(scope)) filter.authors = scopes.flatMap(scope => this.opts.getPubkeysForScope(scope))
} }
@@ -272,10 +285,10 @@ export class Interpreter {
filter.since = now() - since_ago! filter.since = now() - since_ago!
} }
return filter return filter as Filter
} }
#getFiltersFromTags(tags: Tags) { _getFiltersFromTags(tags: Tags) {
const ttags = tags.values("t") const ttags = tags.values("t")
const ptags = tags.values("p") const ptags = tags.values("p")
const eatags = tags.filterByKey(["e", "a"]).values() const eatags = tags.filterByKey(["e", "a"]).values()