From e5519b633def6feae595c8b087cb2cdc9313fd71 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Fri, 12 Apr 2024 14:27:55 -0700 Subject: [PATCH] Add feed loader --- packages/custom-feeds/compiler.ts | 142 +++++++++++++++++ packages/custom-feeds/core.ts | 75 +++++++++ packages/custom-feeds/index.ts | 216 +------------------------ packages/custom-feeds/loader.ts | 257 ++++++++++++++++++++++++++++++ packages/lib/tsc-multi.json | 1 + packages/util/Filters.ts | 2 + 6 files changed, 479 insertions(+), 214 deletions(-) create mode 100644 packages/custom-feeds/compiler.ts create mode 100644 packages/custom-feeds/core.ts create mode 100644 packages/custom-feeds/loader.ts diff --git a/packages/custom-feeds/compiler.ts b/packages/custom-feeds/compiler.ts new file mode 100644 index 0000000..7cc0fc6 --- /dev/null +++ b/packages/custom-feeds/compiler.ts @@ -0,0 +1,142 @@ +import {inc, now, isNil} from '@coracle.social/lib' +import type {Rumor, Filter} from '@coracle.social/util' +import {Tags, getIdFilters, mergeFilters} from '@coracle.social/util' +import type {DVMRequest, Scope, Feed, DynamicFilter} from './core' +import {FeedType} from './core' + +export type ExecuteOpts = { + onEvent: (event: E) => void +} + +export type FeedCompilerOpts = { + reqDvm: (request: DVMRequest, opts: ExecuteOpts) => Promise + reqFilters: (filters: Filter[], opts: ExecuteOpts) => Promise + getPubkeysForScope: (scope: Scope) => string[] + getPubkeysForWotRange: (minWot: number, maxWot: number) => string[] +} + +export class FeedCompiler { + constructor(readonly opts: FeedCompilerOpts) {} + + canCompile([type, ...feed]: Feed): boolean { + switch(type) { + case FeedType.Union: + return (feed as Feed[]).every(this.canCompile) + case FeedType.List: + case FeedType.LOL: + case FeedType.DVM: + case FeedType.Filter: + return true + default: + return false + } + } + + async compile([type, ...feed]: Feed) { + switch(type) { + case FeedType.Union: + return await this._compileUnion(feed as Feed[]) + case FeedType.List: + return await this._compileLists(feed as string[]) + case FeedType.LOL: + return await this._compileLols(feed as string[]) + case FeedType.DVM: + return await this._compileDvms(feed as DVMRequest[]) + case FeedType.Filter: + return (feed as DynamicFilter[]).map(filter => this._compileFilter(filter)) + default: + throw new Error(`Unable to convert feed of type ${type} to filters`) + } + } + + async _compileUnion(feeds: Feed[]): Promise { + const filters: Filter[] = [] + + await Promise.all( + feeds.map(async feed => { + for (const filter of await this.compile(feed)) { + filters.push(filter) + } + }) + ) + + return mergeFilters(filters) + } + + async _compileLists(addresses: string[]): Promise { + const events: E[] = [] + + await this.opts.reqFilters(getIdFilters(addresses), {onEvent: events.push}) + + return this._getFiltersFromTags(Tags.fromEvents(events)) + } + + async _compileLols(addresses: string[]): Promise { + const events: E[] = [] + + await this.opts.reqFilters(getIdFilters(addresses), {onEvent: events.push}) + + return this._compileLists(Tags.fromEvents(events).values("a").valueOf()) + } + + async _compileDvms(requests: DVMRequest[]): Promise { + const events: E[] = [] + + await Promise.all(requests.map(request => this.opts.reqDvm(request, {onEvent: events.push}))) + + return this._getFiltersFromTags(Tags.fromEvents(events)) + } + + // Utilities + + _compileFilter({scopes, min_wot, max_wot, until_ago, since_ago, ...filter}: DynamicFilter) { + if (scopes && !filter.authors) { + filter.authors = scopes.flatMap((scope: Scope) => this.opts.getPubkeysForScope(scope)) + } + + if ((!isNil(min_wot) || !isNil(max_wot))) { + const authors = this.opts.getPubkeysForWotRange(min_wot || 0, max_wot || 1) + + if (filter.authors) { + const authorsSet = new Set(authors) + + filter.authors = filter.authors.filter(pubkey => authorsSet.has(pubkey)) + } else { + filter.authors = authors + } + } + + if (!isNil(until_ago)) { + filter.until = now() - until_ago! + } + + if (!isNil(since_ago)) { + filter.since = now() - since_ago! + } + + return filter as Filter + } + + _getFiltersFromTags(tags: Tags) { + const ttags = tags.values("t") + const ptags = tags.values("p") + const eatags = tags.filterByKey(["e", "a"]).values() + const filters: Filter[] = [] + + if (ttags.exists()) { + filters.push({"#t": ttags.valueOf()}) + } + + if (ptags.exists()) { + filters.push({authors: ptags.valueOf()}) + } + + if (eatags.exists()) { + for (const filter of getIdFilters(eatags.valueOf())) { + filters.push(filter) + } + } + + return filters + } +} diff --git a/packages/custom-feeds/core.ts b/packages/custom-feeds/core.ts new file mode 100644 index 0000000..3d9851e --- /dev/null +++ b/packages/custom-feeds/core.ts @@ -0,0 +1,75 @@ + +import {inc, now, isNil} from '@coracle.social/lib' +import type {Rumor, Filter} from '@coracle.social/util' +import {Tags, getIdFilters, mergeFilters} from '@coracle.social/util' + +export enum FeedType { + Difference = "\\", + Intersection = "∩", + SymmetricDifference = "Δ", + Union = "∪", + Filter = "filter", + Relay = "relay", + List = "list", + LOL = "lol", + DVM = "dvm", +} + +export enum Scope { + Self = "self", + Follows = "follows", + Followers = "followers", +} + +export type DynamicFilter = Filter & { + scopes?: Scope[] + min_wot?: number + max_wot?: number + until_ago?: number + since_ago?: number +} + +export type DVMRequest = { + kind: number + tags?: string[][] +} + +export type DifferenceFeed = [FeedType.Difference, ...Feed[]] +export type IntersectionFeed = [FeedType.Intersection, ...Feed[]] +export type SymmetricDifferenceFeed = [FeedType.SymmetricDifference, ...Feed[]] +export type UnionFeed = [FeedType.Union, ...Feed[]] +export type FilterFeed = [FeedType.Filter, ...DynamicFilter[]] +export type RelayFeed = [FeedType.Relay, ...string[]] +export type ListFeed = [FeedType.List, ...string[]] +export type LOLFeed = [FeedType.LOL, ...string[]] +export type DVMFeed = [FeedType.DVM, ...DVMRequest[]] + +export type Feed = + DifferenceFeed | + IntersectionFeed | + SymmetricDifferenceFeed | + UnionFeed | + FilterFeed | + RelayFeed | + ListFeed | + LOLFeed | + DVMFeed + +export const difference = (...feeds: Feed[]) => + [FeedType.Difference, ...feeds] as DifferenceFeed +export const intersection = (...feeds: Feed[]) => + [FeedType.Intersection, ...feeds] as IntersectionFeed +export const symmetricDifference = (...feeds: Feed[]) => + [FeedType.SymmetricDifference, ...feeds] as SymmetricDifferenceFeed +export const union = (...feeds: Feed[]) => + [FeedType.Union, ...feeds] as UnionFeed +export const filter = (...filters: DynamicFilter[]) => + [FeedType.Filter, ...filters] as FilterFeed +export const relay = (...relays: string[]) => + [FeedType.Relay, ...relays] as RelayFeed +export const list = (...addresses: string[]) => + [FeedType.List, ...addresses] as ListFeed +export const lol = (...addresses: string[]) => + [FeedType.LOL, ...addresses] as LOLFeed +export const dvm = (...requests: DVMRequest[]) => + [FeedType.DVM, ...requests] as DVMFeed diff --git a/packages/custom-feeds/index.ts b/packages/custom-feeds/index.ts index 4fa1a4d..8fbf9cd 100644 --- a/packages/custom-feeds/index.ts +++ b/packages/custom-feeds/index.ts @@ -1,214 +1,2 @@ -import {inc, now, isNil} from '@coracle.social/lib' -import type {Rumor, Filter} from '@coracle.social/util' -import {Tags, getIdFilters, mergeFilters} from '@coracle.social/util' - -export enum FeedType { - Union = "union", - Filter = "filter", - List = "list", - LOL = "lol", - DVM = "dvm", -} - -export enum Scope { - Self = "self", - Follows = "follows", - Followers = "followers", -} - -export type DynamicFilter = Filter & { - scopes?: Scope[] - min_wot?: number - max_wot?: number - until_ago?: number - since_ago?: number -} - -export type DVMRequest = { - kind: number - tags?: string[][] -} - -export type UnionFeed = [FeedType.Union, ...Feed[]] -export type FilterFeed = [FeedType.Filter, ...DynamicFilter[]] -export type ListFeed = [FeedType.List, ...string[]] -export type LOLFeed = [FeedType.LOL, ...string[]] -export type DVMFeed = [FeedType.DVM, ...DVMRequest[]] - -export type Feed = UnionFeed | FilterFeed | ListFeed | LOLFeed | DVMFeed - -export const union = (...feeds: Feed[]) => - [FeedType.Union, ...feeds] as UnionFeed -export const filter = (...filters: DynamicFilter[]) => - [FeedType.Filter, ...filters] as FilterFeed -export const list = (...addresses: string[]) => - [FeedType.List, ...addresses] as ListFeed -export const lol = (...addresses: string[]) => - [FeedType.LOL, ...addresses] as LOLFeed -export const dvm = (...requests: DVMRequest[]) => - [FeedType.DVM, ...requests] as DVMFeed - -export type ExecuteOpts = { - onEvent: (event: Rumor) => void - onComplete?: () => void -} - -export type FeedCompilerOpts = { - reqDvm: (request: DVMRequest, opts: ExecuteOpts) => void - reqFilters: (filters: Filter[], opts: ExecuteOpts) => void - getPubkeysForScope: (scope: Scope) => string[] - getPubkeysForWotRange: (minWot: number, maxWot: number) => string[] -} - -export class FeedCompiler { - constructor(readonly opts: FeedCompilerOpts) {} - - // Dispatch to different types of feed - - execute(feed: Feed, opts: ExecuteOpts) { - return this.compile(feed).then(filters => - this.opts.reqFilters(filters, opts) - ) - } - - async compile([type, ...feed]: Feed) { - switch(type) { - case FeedType.Union: - return await this._compileUnion(feed as Feed[]) - case FeedType.List: - return await this._compileLists(feed as string[]) - case FeedType.LOL: - return await this._compileLols(feed as string[]) - case FeedType.DVM: - return await this._compileDvms(feed as DVMRequest[]) - case FeedType.Filter: - return (feed as DynamicFilter[]).map(filter => this._compileFilter(filter)) - default: - throw new Error(`Unable to convert feed of type ${type} to filters`) - } - } - - // Everything can be compiled to filters - - async _compileUnion(feeds: Feed[]): Promise { - const filters: Filter[] = [] - - await Promise.all( - feeds.map(async feed => { - for (const filter of await this.compile(feed)) { - filters.push(filter) - } - }) - ) - - return mergeFilters(filters) - } - - async _compileLists(addresses: string[]): Promise { - return new Promise(resolve => { - const events: Rumor[] = [] - - this.opts.reqFilters(getIdFilters(addresses), { - onEvent: (event: Rumor) =>events.push(event), - onComplete: () => resolve(this._getFiltersFromTags(Tags.fromEvents(events))), - }) - }) - } - - async _compileLols(addresses: string[]): Promise { - return new Promise(resolve => { - const events: Rumor[] = [] - - this.opts.reqFilters(getIdFilters(addresses), { - onEvent: (event: Rumor) => events.push(event), - onComplete: () => resolve(this._compileLists(Tags.fromEvents(events).values("a").valueOf())), - }) - }) - } - - async _compileDvms(requests: DVMRequest[]): Promise { - const events: Rumor[] = [] - - await Promise.all( - requests.map(request => { - return new Promise(resolve => { - this.opts.reqDvm(request, { - onEvent: (event: Rumor) => events.push(event), - onComplete: resolve, - }) - }) - }) - ) - - return this._getFiltersFromTags(Tags.fromEvents(events)) - } - - // Utilities - - _getCompletionTracker(onComplete?: () => void) { - let pending = 0 - - return () => { - pending += 1 - - return () => { - pending -= 1 - - if (pending === 0) { - onComplete?.() - } - } - } - } - - _compileFilter({scopes, min_wot, max_wot, until_ago, since_ago, ...filter}: DynamicFilter) { - if (scopes && !filter.authors) { - filter.authors = scopes.flatMap(scope => this.opts.getPubkeysForScope(scope)) - } - - if ((!isNil(min_wot) || !isNil(max_wot))) { - const authors = this.opts.getPubkeysForWotRange(min_wot || 0, max_wot || 1) - - if (filter.authors) { - const authorsSet = new Set(authors) - - filter.authors = filter.authors.filter(pubkey => authorsSet.has(pubkey)) - } else { - filter.authors = authors - } - } - - if (!isNil(until_ago)) { - filter.until = now() - until_ago! - } - - if (!isNil(since_ago)) { - filter.since = now() - since_ago! - } - - return filter as Filter - } - - _getFiltersFromTags(tags: Tags) { - const ttags = tags.values("t") - const ptags = tags.values("p") - const eatags = tags.filterByKey(["e", "a"]).values() - const filters: Filter[] = [] - - if (ttags.exists()) { - filters.push({"#t": ttags.valueOf()}) - } - - if (ptags.exists()) { - filters.push({authors: ptags.valueOf()}) - } - - if (eatags.exists()) { - for (const filter of getIdFilters(eatags.valueOf())) { - filters.push(filter) - } - } - - return filters - } -} +export * from './core' +export * from './compiler' diff --git a/packages/custom-feeds/loader.ts b/packages/custom-feeds/loader.ts new file mode 100644 index 0000000..496aacf --- /dev/null +++ b/packages/custom-feeds/loader.ts @@ -0,0 +1,257 @@ +import {inc, max, min, now, isNil} from '@coracle.social/lib' +import type {Rumor, Filter} from '@coracle.social/util' +import {Tags, EPOCH, getIdFilters, guessFilterDelta, mergeFilters} from '@coracle.social/util' +import type {DVMRequest, Scope, Feed, DynamicFilter} from './core' +import {FeedType} from './core' +import type {FeedCompiler} from './compiler' + +export type LoaderOpts = { + onEvent: (event: E) => void + onExhausted: () => void +} + +export type Loader = (limit: number) => Promise + +export class FeedLoader { + _loader: Promise + + constructor(readonly compiler: FeedCompiler, feed: Feed, opts: LoaderOpts) { + this._loader = this.getLoader(feed, opts) + } + + loadMore(limit: number) { + this._loader.then(loader => loader(limit)) + } + + async getLoader([type, ...feed]: Feed, opts: LoaderOpts) { + if (this.compiler.canCompile([type, ...feed] as Feed)) { + const filters = await this.compiler.compile([type, ...feed] as Feed) + + return this._getFilterLoader(filters, opts) + } + + switch(type) { + case FeedType.Difference: + return this._getDifferenceLoader(feed as Feed[], opts) + case FeedType.Intersection: + return this._getIntersectionLoader(feed as Feed[], opts) + case FeedType.SymmetricDifference: + return this._getSymmetricDifferenceLoader(feed as Feed[], opts) + case FeedType.Union: + return this._getUnionLoader(feed as Feed[], opts) + default: + throw new Error(`Unable to convert feed of type ${type} to loader`) + } + } + + async _getFilterLoader(filters: Filter[], {onEvent, onExhausted}: LoaderOpts) { + const untils = filters.flatMap((filter: Filter) => filter.until ? [filter.until] : []) + const sinces = filters.flatMap((filter: Filter) => filter.since ? [filter.since] : []) + const maxUntil = untils.length === filters.length ? max(untils) : now() + const minSince = sinces.length === filters.length ? min(sinces) : EPOCH + const initialDelta = guessFilterDelta(filters) + + let delta = initialDelta + let since = maxUntil - delta + let until = maxUntil + + return async (limit: number) => { + const reqFilters = filters + // Remove filters that don't fit our window + .filter((filter: Filter) => { + const filterSince = filter.since || EPOCH + const filterUntil = filter.until || now() + + return filterSince < until && filterUntil > since + }) + // Modify the filters to define our window + .map((filter: Filter) => ({...filter, until, limit, since})) + + let count = 0 + + await this.compiler.opts.reqFilters(filters, { + onEvent: (event: E) => { + count += 1 + since = Math.min(since, event.created_at) + onEvent(event) + }, + }) + + // Relays can't be relied upon to return events in descending order, do exponential + // windowing to ensure we get the most recent stuff on first load, but eventually find it all + if (count === 0) { + delta *= 10 + } + + since = Math.max(minSince, since - delta) + + if (since === minSince) { + onExhausted() + } + } + } + + async _getDifferenceLoader(feeds: Feed[], {onEvent, onExhausted}: LoaderOpts) { + const exhausted = new Set() + const skip = new Set() + const events: E[] = [] + const seen = new Set() + + const loaders = await Promise.all( + feeds.map((feed: Feed, i: number) => + this.getLoader(feed, { + onExhausted: () => exhausted.add(i), + onEvent: (event: E) => { + if (i === 0) { + events.push(event) + } else { + skip.add(event.id) + } + }, + }) + ) + ) + + return async (limit: number) => { + await Promise.all( + loaders.map(async (loader: Loader, i: number) => { + if (exhausted.has(i)) { + return + } + + await loader(limit) + }) + ) + + for (const event of events.splice(0)) { + if (!skip.has(event.id) && !seen.has(event.id)) { + onEvent(event) + seen.add(event.id) + } + } + + if (exhausted.size === loaders.length) { + onExhausted() + } + } + } + + async _getIntersectionLoader(feeds: Feed[], {onEvent, onExhausted}: LoaderOpts) { + const exhausted = new Set() + const counts = new Map() + const events: E[] = [] + const seen = new Set() + + const loaders = await Promise.all( + feeds.map((feed: Feed, i: number) => + this.getLoader(feed, { + onExhausted: () => exhausted.add(i), + onEvent: (event: E) => { + events.push(event) + counts.set(event.id, inc(counts.get(event.id))) + }, + }) + ) + ) + + return async (limit: number) => { + await Promise.all( + loaders.map(async (loader: Loader, i: number) => { + if (exhausted.has(i)) { + return + } + + await loader(limit) + }) + ) + + for (const event of events.splice(0)) { + if (counts.get(event.id) === loaders.length && !seen.has(event.id)) { + onEvent(event) + seen.add(event.id) + } + } + + if (exhausted.size === loaders.length) { + onExhausted() + } + } + } + + async _getSymmetricDifferenceLoader(feeds: Feed[], {onEvent, onExhausted}: LoaderOpts) { + const exhausted = new Set() + const counts = new Map() + const events: E[] = [] + const seen = new Set() + + const loaders = await Promise.all( + feeds.map((feed: Feed, i: number) => + this.getLoader(feed, { + onExhausted: () => exhausted.add(i), + onEvent: (event: E) => { + events.push(event) + counts.set(event.id, inc(counts.get(event.id))) + }, + }) + ) + ) + + return async (limit: number) => { + await Promise.all( + loaders.map(async (loader: Loader, i: number) => { + if (exhausted.has(i)) { + return + } + + await loader(limit) + }) + ) + + for (const event of events.values()) { + if (counts.get(event.id) === 1 && !seen.has(event.id)) { + onEvent(event) + seen.add(event.id) + } + } + + if (exhausted.size === loaders.length) { + onExhausted() + } + } + } + + async _getUnionLoader(feeds: Feed[], {onEvent, onExhausted}: LoaderOpts) { + const exhausted = new Set() + const seen = new Set() + + const loaders = await Promise.all( + feeds.map((feed: Feed, i: number) => + this.getLoader(feed, { + onExhausted: () => exhausted.add(i), + onEvent: (event: E) => { + if (!seen.has(event.id)) { + onEvent(event) + seen.add(event.id) + } + }, + }) + ) + ) + + return async (limit: number) => { + await Promise.all( + loaders.map(async (loader: Loader, i: number) => { + if (exhausted.has(i)) { + return + } + + await loader(limit) + }) + ) + + if (exhausted.size === loaders.length) { + onExhausted() + } + } + } +} diff --git a/packages/lib/tsc-multi.json b/packages/lib/tsc-multi.json index dd7b078..6c37019 100644 --- a/packages/lib/tsc-multi.json +++ b/packages/lib/tsc-multi.json @@ -1,5 +1,6 @@ { "targets": [ + {"extname": ".cjs", "module": "commonjs"}, {"extname": ".mjs", "module": "esnext", "moduleResolution": "node"} ], "projects": ["tsconfig.json"] diff --git a/packages/util/Filters.ts b/packages/util/Filters.ts index 130994b..6adb39f 100644 --- a/packages/util/Filters.ts +++ b/packages/util/Filters.ts @@ -5,6 +5,8 @@ import type {Rumor} from './Events' import {decodeAddress, addressFromEvent, encodeAddress} from './Address' import {isReplaceableKind} from './Kinds' +export const EPOCH = 1609459200 + export type Filter = { ids?: string[] kinds?: number[]