Add feed loader

This commit is contained in:
Jon Staab
2024-04-12 14:27:55 -07:00
parent 9530ab4cf4
commit e5519b633d
6 changed files with 479 additions and 214 deletions
+142
View File
@@ -0,0 +1,142 @@
import {inc, now, isNil} from '@coracle.social/lib'
import type {Rumor, Filter} from '@coracle.social/util'
import {Tags, getIdFilters, mergeFilters} from '@coracle.social/util'
import type {DVMRequest, Scope, Feed, DynamicFilter} from './core'
import {FeedType} from './core'
export type ExecuteOpts<E> = {
onEvent: (event: E) => void
}
export type FeedCompilerOpts<E> = {
reqDvm: (request: DVMRequest, opts: ExecuteOpts<E>) => Promise<void>
reqFilters: (filters: Filter[], opts: ExecuteOpts<E>) => Promise<void>
getPubkeysForScope: (scope: Scope) => string[]
getPubkeysForWotRange: (minWot: number, maxWot: number) => string[]
}
export class FeedCompiler<E extends Rumor> {
constructor(readonly opts: FeedCompilerOpts<E>) {}
canCompile([type, ...feed]: Feed): boolean {
switch(type) {
case FeedType.Union:
return (feed as Feed[]).every(this.canCompile)
case FeedType.List:
case FeedType.LOL:
case FeedType.DVM:
case FeedType.Filter:
return true
default:
return false
}
}
async compile([type, ...feed]: Feed) {
switch(type) {
case FeedType.Union:
return await this._compileUnion(feed as Feed[])
case FeedType.List:
return await this._compileLists(feed as string[])
case FeedType.LOL:
return await this._compileLols(feed as string[])
case FeedType.DVM:
return await this._compileDvms(feed as DVMRequest[])
case FeedType.Filter:
return (feed as DynamicFilter[]).map(filter => this._compileFilter(filter))
default:
throw new Error(`Unable to convert feed of type ${type} to filters`)
}
}
async _compileUnion(feeds: Feed[]): Promise<Filter[]> {
const filters: Filter[] = []
await Promise.all(
feeds.map(async feed => {
for (const filter of await this.compile(feed)) {
filters.push(filter)
}
})
)
return mergeFilters(filters)
}
async _compileLists(addresses: string[]): Promise<Filter[]> {
const events: E[] = []
await this.opts.reqFilters(getIdFilters(addresses), {onEvent: events.push})
return this._getFiltersFromTags(Tags.fromEvents(events))
}
async _compileLols(addresses: string[]): Promise<Filter[]> {
const events: E[] = []
await this.opts.reqFilters(getIdFilters(addresses), {onEvent: events.push})
return this._compileLists(Tags.fromEvents(events).values("a").valueOf())
}
async _compileDvms(requests: DVMRequest[]): Promise<Filter[]> {
const events: E[] = []
await Promise.all(requests.map(request => this.opts.reqDvm(request, {onEvent: events.push})))
return this._getFiltersFromTags(Tags.fromEvents(events))
}
// Utilities
_compileFilter({scopes, min_wot, max_wot, until_ago, since_ago, ...filter}: DynamicFilter) {
if (scopes && !filter.authors) {
filter.authors = scopes.flatMap((scope: Scope) => this.opts.getPubkeysForScope(scope))
}
if ((!isNil(min_wot) || !isNil(max_wot))) {
const authors = this.opts.getPubkeysForWotRange(min_wot || 0, max_wot || 1)
if (filter.authors) {
const authorsSet = new Set(authors)
filter.authors = filter.authors.filter(pubkey => authorsSet.has(pubkey))
} else {
filter.authors = authors
}
}
if (!isNil(until_ago)) {
filter.until = now() - until_ago!
}
if (!isNil(since_ago)) {
filter.since = now() - since_ago!
}
return filter as Filter
}
_getFiltersFromTags(tags: Tags) {
const ttags = tags.values("t")
const ptags = tags.values("p")
const eatags = tags.filterByKey(["e", "a"]).values()
const filters: Filter[] = []
if (ttags.exists()) {
filters.push({"#t": ttags.valueOf()})
}
if (ptags.exists()) {
filters.push({authors: ptags.valueOf()})
}
if (eatags.exists()) {
for (const filter of getIdFilters(eatags.valueOf())) {
filters.push(filter)
}
}
return filters
}
}
+75
View File
@@ -0,0 +1,75 @@
import {inc, now, isNil} from '@coracle.social/lib'
import type {Rumor, Filter} from '@coracle.social/util'
import {Tags, getIdFilters, mergeFilters} from '@coracle.social/util'
export enum FeedType {
Difference = "\\",
Intersection = "∩",
SymmetricDifference = "Δ",
Union = "",
Filter = "filter",
Relay = "relay",
List = "list",
LOL = "lol",
DVM = "dvm",
}
export enum Scope {
Self = "self",
Follows = "follows",
Followers = "followers",
}
export type DynamicFilter = Filter & {
scopes?: Scope[]
min_wot?: number
max_wot?: number
until_ago?: number
since_ago?: number
}
export type DVMRequest = {
kind: number
tags?: string[][]
}
export type DifferenceFeed = [FeedType.Difference, ...Feed[]]
export type IntersectionFeed = [FeedType.Intersection, ...Feed[]]
export type SymmetricDifferenceFeed = [FeedType.SymmetricDifference, ...Feed[]]
export type UnionFeed = [FeedType.Union, ...Feed[]]
export type FilterFeed = [FeedType.Filter, ...DynamicFilter[]]
export type RelayFeed = [FeedType.Relay, ...string[]]
export type ListFeed = [FeedType.List, ...string[]]
export type LOLFeed = [FeedType.LOL, ...string[]]
export type DVMFeed = [FeedType.DVM, ...DVMRequest[]]
export type Feed =
DifferenceFeed |
IntersectionFeed |
SymmetricDifferenceFeed |
UnionFeed |
FilterFeed |
RelayFeed |
ListFeed |
LOLFeed |
DVMFeed
export const difference = (...feeds: Feed[]) =>
[FeedType.Difference, ...feeds] as DifferenceFeed
export const intersection = (...feeds: Feed[]) =>
[FeedType.Intersection, ...feeds] as IntersectionFeed
export const symmetricDifference = (...feeds: Feed[]) =>
[FeedType.SymmetricDifference, ...feeds] as SymmetricDifferenceFeed
export const union = (...feeds: Feed[]) =>
[FeedType.Union, ...feeds] as UnionFeed
export const filter = (...filters: DynamicFilter[]) =>
[FeedType.Filter, ...filters] as FilterFeed
export const relay = (...relays: string[]) =>
[FeedType.Relay, ...relays] as RelayFeed
export const list = (...addresses: string[]) =>
[FeedType.List, ...addresses] as ListFeed
export const lol = (...addresses: string[]) =>
[FeedType.LOL, ...addresses] as LOLFeed
export const dvm = (...requests: DVMRequest[]) =>
[FeedType.DVM, ...requests] as DVMFeed
+2 -214
View File
@@ -1,214 +1,2 @@
import {inc, now, isNil} from '@coracle.social/lib'
import type {Rumor, Filter} from '@coracle.social/util'
import {Tags, getIdFilters, mergeFilters} from '@coracle.social/util'
export enum FeedType {
Union = "union",
Filter = "filter",
List = "list",
LOL = "lol",
DVM = "dvm",
}
export enum Scope {
Self = "self",
Follows = "follows",
Followers = "followers",
}
export type DynamicFilter = Filter & {
scopes?: Scope[]
min_wot?: number
max_wot?: number
until_ago?: number
since_ago?: number
}
export type DVMRequest = {
kind: number
tags?: string[][]
}
export type UnionFeed = [FeedType.Union, ...Feed[]]
export type FilterFeed = [FeedType.Filter, ...DynamicFilter[]]
export type ListFeed = [FeedType.List, ...string[]]
export type LOLFeed = [FeedType.LOL, ...string[]]
export type DVMFeed = [FeedType.DVM, ...DVMRequest[]]
export type Feed = UnionFeed | FilterFeed | ListFeed | LOLFeed | DVMFeed
export const union = (...feeds: Feed[]) =>
[FeedType.Union, ...feeds] as UnionFeed
export const filter = (...filters: DynamicFilter[]) =>
[FeedType.Filter, ...filters] as FilterFeed
export const list = (...addresses: string[]) =>
[FeedType.List, ...addresses] as ListFeed
export const lol = (...addresses: string[]) =>
[FeedType.LOL, ...addresses] as LOLFeed
export const dvm = (...requests: DVMRequest[]) =>
[FeedType.DVM, ...requests] as DVMFeed
export type ExecuteOpts = {
onEvent: (event: Rumor) => void
onComplete?: () => void
}
export type FeedCompilerOpts = {
reqDvm: (request: DVMRequest, opts: ExecuteOpts) => void
reqFilters: (filters: Filter[], opts: ExecuteOpts) => void
getPubkeysForScope: (scope: Scope) => string[]
getPubkeysForWotRange: (minWot: number, maxWot: number) => string[]
}
export class FeedCompiler {
constructor(readonly opts: FeedCompilerOpts) {}
// Dispatch to different types of feed
execute(feed: Feed, opts: ExecuteOpts) {
return this.compile(feed).then(filters =>
this.opts.reqFilters(filters, opts)
)
}
async compile([type, ...feed]: Feed) {
switch(type) {
case FeedType.Union:
return await this._compileUnion(feed as Feed[])
case FeedType.List:
return await this._compileLists(feed as string[])
case FeedType.LOL:
return await this._compileLols(feed as string[])
case FeedType.DVM:
return await this._compileDvms(feed as DVMRequest[])
case FeedType.Filter:
return (feed as DynamicFilter[]).map(filter => this._compileFilter(filter))
default:
throw new Error(`Unable to convert feed of type ${type} to filters`)
}
}
// Everything can be compiled to filters
async _compileUnion(feeds: Feed[]): Promise<Filter[]> {
const filters: Filter[] = []
await Promise.all(
feeds.map(async feed => {
for (const filter of await this.compile(feed)) {
filters.push(filter)
}
})
)
return mergeFilters(filters)
}
async _compileLists(addresses: string[]): Promise<Filter[]> {
return new Promise(resolve => {
const events: Rumor[] = []
this.opts.reqFilters(getIdFilters(addresses), {
onEvent: (event: Rumor) =>events.push(event),
onComplete: () => resolve(this._getFiltersFromTags(Tags.fromEvents(events))),
})
})
}
async _compileLols(addresses: string[]): Promise<Filter[]> {
return new Promise(resolve => {
const events: Rumor[] = []
this.opts.reqFilters(getIdFilters(addresses), {
onEvent: (event: Rumor) => events.push(event),
onComplete: () => resolve(this._compileLists(Tags.fromEvents(events).values("a").valueOf())),
})
})
}
async _compileDvms(requests: DVMRequest[]): Promise<Filter[]> {
const events: Rumor[] = []
await Promise.all(
requests.map(request => {
return new Promise<void>(resolve => {
this.opts.reqDvm(request, {
onEvent: (event: Rumor) => events.push(event),
onComplete: resolve,
})
})
})
)
return this._getFiltersFromTags(Tags.fromEvents(events))
}
// Utilities
_getCompletionTracker(onComplete?: () => void) {
let pending = 0
return () => {
pending += 1
return () => {
pending -= 1
if (pending === 0) {
onComplete?.()
}
}
}
}
_compileFilter({scopes, min_wot, max_wot, until_ago, since_ago, ...filter}: DynamicFilter) {
if (scopes && !filter.authors) {
filter.authors = scopes.flatMap(scope => this.opts.getPubkeysForScope(scope))
}
if ((!isNil(min_wot) || !isNil(max_wot))) {
const authors = this.opts.getPubkeysForWotRange(min_wot || 0, max_wot || 1)
if (filter.authors) {
const authorsSet = new Set(authors)
filter.authors = filter.authors.filter(pubkey => authorsSet.has(pubkey))
} else {
filter.authors = authors
}
}
if (!isNil(until_ago)) {
filter.until = now() - until_ago!
}
if (!isNil(since_ago)) {
filter.since = now() - since_ago!
}
return filter as Filter
}
_getFiltersFromTags(tags: Tags) {
const ttags = tags.values("t")
const ptags = tags.values("p")
const eatags = tags.filterByKey(["e", "a"]).values()
const filters: Filter[] = []
if (ttags.exists()) {
filters.push({"#t": ttags.valueOf()})
}
if (ptags.exists()) {
filters.push({authors: ptags.valueOf()})
}
if (eatags.exists()) {
for (const filter of getIdFilters(eatags.valueOf())) {
filters.push(filter)
}
}
return filters
}
}
export * from './core'
export * from './compiler'
+257
View File
@@ -0,0 +1,257 @@
import {inc, max, min, now, isNil} from '@coracle.social/lib'
import type {Rumor, Filter} from '@coracle.social/util'
import {Tags, EPOCH, getIdFilters, guessFilterDelta, mergeFilters} from '@coracle.social/util'
import type {DVMRequest, Scope, Feed, DynamicFilter} from './core'
import {FeedType} from './core'
import type {FeedCompiler} from './compiler'
export type LoaderOpts<E> = {
onEvent: (event: E) => void
onExhausted: () => void
}
export type Loader = (limit: number) => Promise<void>
export class FeedLoader<E extends Rumor> {
_loader: Promise<Loader>
constructor(readonly compiler: FeedCompiler<E>, feed: Feed, opts: LoaderOpts<E>) {
this._loader = this.getLoader(feed, opts)
}
loadMore(limit: number) {
this._loader.then(loader => loader(limit))
}
async getLoader([type, ...feed]: Feed, opts: LoaderOpts<E>) {
if (this.compiler.canCompile([type, ...feed] as Feed)) {
const filters = await this.compiler.compile([type, ...feed] as Feed)
return this._getFilterLoader(filters, opts)
}
switch(type) {
case FeedType.Difference:
return this._getDifferenceLoader(feed as Feed[], opts)
case FeedType.Intersection:
return this._getIntersectionLoader(feed as Feed[], opts)
case FeedType.SymmetricDifference:
return this._getSymmetricDifferenceLoader(feed as Feed[], opts)
case FeedType.Union:
return this._getUnionLoader(feed as Feed[], opts)
default:
throw new Error(`Unable to convert feed of type ${type} to loader`)
}
}
async _getFilterLoader(filters: Filter[], {onEvent, onExhausted}: LoaderOpts<E>) {
const untils = filters.flatMap((filter: Filter) => filter.until ? [filter.until] : [])
const sinces = filters.flatMap((filter: Filter) => filter.since ? [filter.since] : [])
const maxUntil = untils.length === filters.length ? max(untils) : now()
const minSince = sinces.length === filters.length ? min(sinces) : EPOCH
const initialDelta = guessFilterDelta(filters)
let delta = initialDelta
let since = maxUntil - delta
let until = maxUntil
return async (limit: number) => {
const reqFilters = filters
// Remove filters that don't fit our window
.filter((filter: Filter) => {
const filterSince = filter.since || EPOCH
const filterUntil = filter.until || now()
return filterSince < until && filterUntil > since
})
// Modify the filters to define our window
.map((filter: Filter) => ({...filter, until, limit, since}))
let count = 0
await this.compiler.opts.reqFilters(filters, {
onEvent: (event: E) => {
count += 1
since = Math.min(since, event.created_at)
onEvent(event)
},
})
// Relays can't be relied upon to return events in descending order, do exponential
// windowing to ensure we get the most recent stuff on first load, but eventually find it all
if (count === 0) {
delta *= 10
}
since = Math.max(minSince, since - delta)
if (since === minSince) {
onExhausted()
}
}
}
async _getDifferenceLoader(feeds: Feed[], {onEvent, onExhausted}: LoaderOpts<E>) {
const exhausted = new Set<number>()
const skip = new Set<string>()
const events: E[] = []
const seen = new Set()
const loaders = await Promise.all(
feeds.map((feed: Feed, i: number) =>
this.getLoader(feed, {
onExhausted: () => exhausted.add(i),
onEvent: (event: E) => {
if (i === 0) {
events.push(event)
} else {
skip.add(event.id)
}
},
})
)
)
return async (limit: number) => {
await Promise.all(
loaders.map(async (loader: Loader, i: number) => {
if (exhausted.has(i)) {
return
}
await loader(limit)
})
)
for (const event of events.splice(0)) {
if (!skip.has(event.id) && !seen.has(event.id)) {
onEvent(event)
seen.add(event.id)
}
}
if (exhausted.size === loaders.length) {
onExhausted()
}
}
}
async _getIntersectionLoader(feeds: Feed[], {onEvent, onExhausted}: LoaderOpts<E>) {
const exhausted = new Set<number>()
const counts = new Map<string, number>()
const events: E[] = []
const seen = new Set()
const loaders = await Promise.all(
feeds.map((feed: Feed, i: number) =>
this.getLoader(feed, {
onExhausted: () => exhausted.add(i),
onEvent: (event: E) => {
events.push(event)
counts.set(event.id, inc(counts.get(event.id)))
},
})
)
)
return async (limit: number) => {
await Promise.all(
loaders.map(async (loader: Loader, i: number) => {
if (exhausted.has(i)) {
return
}
await loader(limit)
})
)
for (const event of events.splice(0)) {
if (counts.get(event.id) === loaders.length && !seen.has(event.id)) {
onEvent(event)
seen.add(event.id)
}
}
if (exhausted.size === loaders.length) {
onExhausted()
}
}
}
async _getSymmetricDifferenceLoader(feeds: Feed[], {onEvent, onExhausted}: LoaderOpts<E>) {
const exhausted = new Set<number>()
const counts = new Map<string, number>()
const events: E[] = []
const seen = new Set()
const loaders = await Promise.all(
feeds.map((feed: Feed, i: number) =>
this.getLoader(feed, {
onExhausted: () => exhausted.add(i),
onEvent: (event: E) => {
events.push(event)
counts.set(event.id, inc(counts.get(event.id)))
},
})
)
)
return async (limit: number) => {
await Promise.all(
loaders.map(async (loader: Loader, i: number) => {
if (exhausted.has(i)) {
return
}
await loader(limit)
})
)
for (const event of events.values()) {
if (counts.get(event.id) === 1 && !seen.has(event.id)) {
onEvent(event)
seen.add(event.id)
}
}
if (exhausted.size === loaders.length) {
onExhausted()
}
}
}
async _getUnionLoader(feeds: Feed[], {onEvent, onExhausted}: LoaderOpts<E>) {
const exhausted = new Set<number>()
const seen = new Set()
const loaders = await Promise.all(
feeds.map((feed: Feed, i: number) =>
this.getLoader(feed, {
onExhausted: () => exhausted.add(i),
onEvent: (event: E) => {
if (!seen.has(event.id)) {
onEvent(event)
seen.add(event.id)
}
},
})
)
)
return async (limit: number) => {
await Promise.all(
loaders.map(async (loader: Loader, i: number) => {
if (exhausted.has(i)) {
return
}
await loader(limit)
})
)
if (exhausted.size === loaders.length) {
onExhausted()
}
}
}
}
+1
View File
@@ -1,5 +1,6 @@
{
"targets": [
{"extname": ".cjs", "module": "commonjs"},
{"extname": ".mjs", "module": "esnext", "moduleResolution": "node"}
],
"projects": ["tsconfig.json"]
+2
View File
@@ -5,6 +5,8 @@ import type {Rumor} from './Events'
import {decodeAddress, addressFromEvent, encodeAddress} from './Address'
import {isReplaceableKind} from './Kinds'
export const EPOCH = 1609459200
export type Filter = {
ids?: string[]
kinds?: number[]