From 3d1a6a106e7cc5570e11e7fc1a754502d8ba3c8a Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 24 Apr 2025 13:49:02 -0700 Subject: [PATCH] Re-work feed requests --- packages/app/src/feeds.ts | 110 ++----------------------------- packages/app/src/sync.ts | 5 +- packages/feeds/package.json | 4 ++ packages/feeds/src/compiler.ts | 43 +++++++++--- packages/feeds/src/controller.ts | 68 ++++++++++--------- packages/feeds/src/core.ts | 28 +------- packages/feeds/src/request.ts | 110 +++++++++++++++++++++++++++++++ packages/util/src/Relay.ts | 1 + pnpm-lock.yaml | 21 +++--- 9 files changed, 207 insertions(+), 183 deletions(-) create mode 100644 packages/feeds/src/request.ts diff --git a/packages/app/src/feeds.ts b/packages/app/src/feeds.ts index f69c062..3ac277f 100644 --- a/packages/app/src/feeds.ts +++ b/packages/app/src/feeds.ts @@ -1,103 +1,8 @@ -import {nthEq, partition, race, now} from "@welshman/lib" -import {createEvent, getPubkeyTagValues, TrustedEvent} from "@welshman/util" -import {request, Tracker} from "@welshman/net" -import {Scope, FeedController, RequestOpts, FeedOptions, DVMOpts, Feed} from "@welshman/feeds" -import {Router, addMinimalFallbacks, getFilterSelections} from "@welshman/router" -import {makeDvmRequest} from "@welshman/dvm" -import {makeSecret, Nip01Signer} from "@welshman/signer" +import {Scope, FeedController, FeedControllerOptions, Feed} from "@welshman/feeds" import {pubkey, signer} from "./session.js" -import {loadRelaySelections} from "./relaySelections.js" import {wotGraph, maxWot, getFollows, getNetwork, getFollowers} from "./wot.js" import {repository} from "./core.js" -export type FeedRequestHandlerOptions = { - signal?: AbortSignal -} - -export const makeFeedRequestHandler = - ({signal}: FeedRequestHandlerOptions) => - async ({filters = [{}], relays = [], onEvent}: RequestOpts) => { - const tracker = new Tracker() - - if (relays.length > 0) { - await request({tracker, signal, relays, filters, onEvent, autoClose: true}) - } else { - const promises: Promise[] = [] - const [withSearch, withoutSearch] = partition(f => Boolean(f.search), filters) - - if (withSearch.length > 0) { - promises.push( - request({ - signal, - tracker, - onEvent, - threshold: 0.1, - autoClose: true, - filters: withSearch, - relays: Router.get().Search().getUrls(), - }), - ) - } - - if (withoutSearch.length > 0) { - promises.push( - ...getFilterSelections(filters).flatMap(({relays, filters}) => - request({tracker, signal, onEvent, relays, filters, threshold: 0.8, autoClose: true}), - ), - ) - } - - // Break out selections by relay so we can complete early after a certain number - // of requests complete for faster load times - await race(withSearch.length > 0 ? 0.1 : 0.8, promises) - - // Wait until after we've queried the network to access our local cache. This results in less - // snappy response times, but is necessary to prevent stale stuff that the user has already seen - // from showing up at the top of the feed - for (const event of repository.query(filters)) { - onEvent(event) - } - } - } - -export const requestDVM = async ({kind, onEvent, ...request}: DVMOpts) => { - // Make sure we know what relays to use for target dvms - if (request.tags && !request.relays) { - for (const pubkey of getPubkeyTagValues(request.tags)) { - await loadRelaySelections(pubkey) - } - } - - const tags = request.tags || [] - const $signer = signer.get() || new Nip01Signer(makeSecret()) - const pubkey = await $signer.getPubkey() - const relays = - request.relays || - Router.get().FromPubkeys(getPubkeyTagValues(tags)).policy(addMinimalFallbacks).getUrls() - - if (!tags.some(nthEq(0, "expiration"))) { - tags.push(["expiration", String(now() + 60)]) - } - - if (!tags.some(nthEq(0, "relays"))) { - tags.push(["relays", ...relays]) - } - - if (!tags.some(nthEq(1, "user"))) { - tags.push(["param", "user", pubkey]) - } - - if (!tags.some(nthEq(1, "max_results"))) { - tags.push(["param", "max_results", "200"]) - } - - await makeDvmRequest({ - relays, - event: await $signer.sign(createEvent(kind, {tags})), - onResult: onEvent, - }) -} - export const getPubkeysForScope = (scope: string) => { const $pubkey = pubkey.get() @@ -133,16 +38,13 @@ export const getPubkeysForWOTRange = (min: number, max: number) => { return pubkeys } -type _FeedOptions = Partial> & {feed: Feed} +type MakeFeedControllerOptions = Partial> & {feed: Feed} -export const createFeedController = (options: _FeedOptions) => { - const request = makeFeedRequestHandler(options) - - return new FeedController({ - request, - requestDVM, +export const makeFeedController = (options: MakeFeedControllerOptions) => + new FeedController({ + repository, getPubkeysForScope, getPubkeysForWOTRange, + signer: signer.get(), ...options, }) -} diff --git a/packages/app/src/sync.ts b/packages/app/src/sync.ts index 2bbc34b..57e2792 100644 --- a/packages/app/src/sync.ts +++ b/packages/app/src/sync.ts @@ -10,8 +10,9 @@ const query = (filters: Filter[]) => export const hasNegentropy = (url: string) => { const p = relaysByUrl.get().get(url)?.profile - if (p?.supported_nips?.includes(77)) return true - if (p?.software?.includes("strfry") && !p?.version?.match(/^0\./)) return true + if (p?.negentropy) return true + if (p?.supported_nips?.includes?.(77)) return true + if (p?.software?.includes?.("strfry") && !p?.version?.match(/^0\./)) return true return false } diff --git a/packages/feeds/package.json b/packages/feeds/package.json index dc70f81..dddc808 100644 --- a/packages/feeds/package.json +++ b/packages/feeds/package.json @@ -20,7 +20,11 @@ }, "dependencies": { "@welshman/lib": "workspace:*", + "@welshman/dvm": "workspace:*", "@welshman/net": "workspace:*", + "@welshman/relay": "workspace:*", + "@welshman/router": "workspace:*", + "@welshman/signer": "workspace:*", "@welshman/util": "workspace:*", "trava": "^1.2.1" }, diff --git a/packages/feeds/src/compiler.ts b/packages/feeds/src/compiler.ts index 9886482..43dfebb 100644 --- a/packages/feeds/src/compiler.ts +++ b/packages/feeds/src/compiler.ts @@ -1,7 +1,17 @@ import {uniq, identity, flatten, pushToMapKey, intersection, tryCatch, now} from "@welshman/lib" -import type {TrustedEvent, Filter} from "@welshman/util" -import {intersectFilters, matchFilter, getAddress, getIdFilters, unionFilters} from "@welshman/util" -import type { +import { + TrustedEvent, + Filter, + intersectFilters, + matchFilter, + getAddress, + getIdFilters, + unionFilters, +} from "@welshman/util" +import {Repository} from "@welshman/relay" +import {ISigner} from "@welshman/signer" +import {Tracker} from "@welshman/net" +import { CreatedAtItem, RequestItem, ListItem, @@ -10,13 +20,22 @@ import type { DVMItem, Scope, Feed, - FeedOptions, + FeedType, } from "./core.js" import {getFeedArgs, feedsFromTags} from "./utils.js" -import {FeedType} from "./core.js" +import {requestPage, requestDVM} from "./request.js" + +export type FeedCompilerOptions = { + signer?: ISigner + signal?: AbortSignal + tracker?: Tracker + repository?: Repository + getPubkeysForScope: (scope: Scope) => string[] + getPubkeysForWOTRange: (minWOT: number, maxWOT: number) => string[] +} export class FeedCompiler { - constructor(readonly options: FeedOptions) {} + constructor(readonly options: FeedCompilerOptions) {} canCompile(feed: Feed): boolean { switch (feed[0]) { @@ -138,9 +157,10 @@ export class FeedCompiler { await Promise.all( items.map(({mappings, ...request}) => - this.options.requestDVM({ + requestDVM({ ...request, - onEvent: async (e: TrustedEvent) => { + signer: this.options.signer, + onResult: async (e: TrustedEvent) => { const tags = (await tryCatch(() => JSON.parse(e.content))) || [] for (const feed of feedsFromTags(tags, mappings)) { @@ -247,7 +267,10 @@ export class FeedCompiler { const addresses = uniq(listItems.flatMap(({addresses}) => addresses)) const eventsByAddress = new Map() - await this.options.request({ + await requestPage({ + signal: this.options.signal, + tracker: this.options.tracker, + repository: this.options.repository, filters: getIdFilters(addresses), onEvent: (e: TrustedEvent) => eventsByAddress.set(getAddress(e), e), }) @@ -280,7 +303,7 @@ export class FeedCompiler { await Promise.all( labelItems.map(({mappings, relays, ...filter}) => - this.options.request({ + requestPage({ relays, filters: [{kinds: [1985], ...filter}], onEvent: (e: TrustedEvent) => events.push(e), diff --git a/packages/feeds/src/controller.ts b/packages/feeds/src/controller.ts index f5de6b4..98b419c 100644 --- a/packages/feeds/src/controller.ts +++ b/packages/feeds/src/controller.ts @@ -1,14 +1,20 @@ import {inc, memoize, omitVals, max, min, now} from "@welshman/lib" -import type {TrustedEvent, Filter} from "@welshman/util" -import {EPOCH, trimFilters, guessFilterDelta} from "@welshman/util" -import type {Feed, RequestItem, FeedOptions} from "./core.js" -import {FeedType} from "./core.js" -import {FeedCompiler} from "./compiler.js" +import {EPOCH, trimFilters, guessFilterDelta, TrustedEvent, Filter} from "@welshman/util" +import {Feed, FeedType, RequestItem} from "./core.js" +import {FeedCompiler, FeedCompilerOptions} from "./compiler.js" +import {requestPage} from "./request.js" + +export type FeedControllerOptions = FeedCompilerOptions & { + feed: Feed + onEvent?: (event: TrustedEvent) => void + onExhausted?: () => void + useWindowing?: boolean +} export class FeedController { compiler: FeedCompiler - constructor(readonly options: FeedOptions) { + constructor(readonly options: FeedControllerOptions) { this.compiler = new FeedCompiler(options) } @@ -40,8 +46,7 @@ export class FeedController { load = async (limit: number) => (await this.getLoader())(limit) - async _getRequestsLoader(requests: RequestItem[], overrides: Partial = {}) { - const {onEvent, onExhausted} = {...this.options, ...overrides} + async _getRequestsLoader(requests: RequestItem[]) { const seen = new Set() const exhausted = new Set() const loaders = await Promise.all( @@ -50,7 +55,7 @@ export class FeedController { onExhausted: () => exhausted.add(request), onEvent: e => { if (!seen.has(e.id)) { - onEvent?.(e) + this.options.onEvent?.(e) seen.add(e.id) } }, @@ -62,14 +67,15 @@ export class FeedController { await Promise.all(loaders.map(loader => loader(limit))) if (exhausted.size === requests.length) { - onExhausted?.() + this.options?.onExhausted?.() } } } - async _getRequestLoader({relays, filters}: RequestItem, overrides: Partial = {}) { - const {useWindowing, onEvent, onExhausted, request} = {...this.options, ...overrides} - + async _getRequestLoader( + {relays, filters}: RequestItem, + {onEvent, onExhausted}: Pick, + ) { // Make sure we have some kind of filter to send if we've been given an empty one, as happens with relay feeds if (!filters || filters.length === 0) { filters = [{}] @@ -83,7 +89,7 @@ export class FeedController { let loading = false let delta = initialDelta - let since = useWindowing ? maxUntil - delta : minSince + let since = this.options.useWindowing ? maxUntil - delta : minSince let until = maxUntil return async (limit: number) => { @@ -110,10 +116,13 @@ export class FeedController { let count = 0 - await request( + await requestPage( omitVals([undefined], { relays, filters: trimFilters(requestFilters), + signal: this.options.signal, + tracker: this.options.tracker, + repository: this.options.repository, onEvent: (event: TrustedEvent) => { count += 1 until = Math.min(until, event.created_at - 1) @@ -122,7 +131,7 @@ export class FeedController { }), ) - if (useWindowing) { + if (this.options.useWindowing) { if (since === minSince) { onExhausted?.() } @@ -143,8 +152,7 @@ export class FeedController { } } - async _getDifferenceLoader(feeds: Feed[], overrides: Partial = {}) { - const {onEvent, onExhausted, ...options} = {...this.options, ...overrides} + async _getDifferenceLoader(feeds: Feed[]) { const exhausted = new Set() const skip = new Set() const events: TrustedEvent[] = [] @@ -154,7 +162,7 @@ export class FeedController { feeds.map( (thisFeed: Feed, i: number) => new FeedController({ - ...options, + ...this.options, feed: thisFeed, onExhausted: () => exhausted.add(i), onEvent: (event: TrustedEvent) => { @@ -181,19 +189,18 @@ export class FeedController { for (const event of events.splice(0)) { if (!skip.has(event.id) && !seen.has(event.id)) { - onEvent?.(event) + this.options.onEvent?.(event) seen.add(event.id) } } if (exhausted.size === controllers.length) { - onExhausted?.() + this.options.onExhausted?.() } } } - async _getIntersectionLoader(feeds: Feed[], overrides: Partial = {}) { - const {onEvent, onExhausted, ...options} = {...this.options, ...overrides} + async _getIntersectionLoader(feeds: Feed[]) { const exhausted = new Set() const counts = new Map() const events: TrustedEvent[] = [] @@ -203,7 +210,7 @@ export class FeedController { feeds.map( (thisFeed: Feed, i: number) => new FeedController({ - ...options, + ...this.options, feed: thisFeed, onExhausted: () => exhausted.add(i), onEvent: (event: TrustedEvent) => { @@ -227,19 +234,18 @@ export class FeedController { for (const event of events.splice(0)) { if (counts.get(event.id) === controllers.length && !seen.has(event.id)) { - onEvent?.(event) + this.options.onEvent?.(event) seen.add(event.id) } } if (exhausted.size === controllers.length) { - onExhausted?.() + this.options.onExhausted?.() } } } - async _getUnionLoader(feeds: Feed[], overrides: Partial = {}) { - const {onEvent, onExhausted, ...options} = {...this.options, ...overrides} + async _getUnionLoader(feeds: Feed[]) { const exhausted = new Set() const seen = new Set() @@ -247,12 +253,12 @@ export class FeedController { feeds.map( (thisFeed: Feed, i: number) => new FeedController({ - ...options, + ...this.options, feed: thisFeed, onExhausted: () => exhausted.add(i), onEvent: (event: TrustedEvent) => { if (!seen.has(event.id)) { - onEvent?.(event) + this.options.onEvent?.(event) seen.add(event.id) } }, @@ -272,7 +278,7 @@ export class FeedController { ) if (exhausted.size === controllers.length) { - onExhausted?.() + this.options.onExhausted?.() } } } diff --git a/packages/feeds/src/core.ts b/packages/feeds/src/core.ts index 84c7c8d..11094bb 100644 --- a/packages/feeds/src/core.ts +++ b/packages/feeds/src/core.ts @@ -1,4 +1,4 @@ -import type {TrustedEvent, Filter} from "@welshman/util" +import {Filter} from "@welshman/util" export enum FeedType { Address = "address", @@ -108,29 +108,3 @@ export type RequestItem = { relays?: string[] filters?: Filter[] } - -export type RequestOpts = RequestItem & { - onEvent: (event: TrustedEvent) => void -} - -export type DVMRequest = { - kind: number - tags?: string[][] - relays?: string[] -} - -export type DVMOpts = DVMRequest & { - onEvent: (event: TrustedEvent) => void -} - -export type FeedOptions = { - feed: Feed - request: (opts: RequestOpts) => Promise - requestDVM: (opts: DVMOpts) => Promise - getPubkeysForScope: (scope: Scope) => string[] - getPubkeysForWOTRange: (minWOT: number, maxWOT: number) => string[] - onEvent?: (event: TrustedEvent) => void - onExhausted?: () => void - useWindowing?: boolean - signal?: AbortSignal -} diff --git a/packages/feeds/src/request.ts b/packages/feeds/src/request.ts new file mode 100644 index 0000000..950f772 --- /dev/null +++ b/packages/feeds/src/request.ts @@ -0,0 +1,110 @@ +import {partition, now, nthEq, race} from "@welshman/lib" +import {makeEvent, Filter, getPubkeyTagValues, TrustedEvent} from "@welshman/util" +import {Nip01Signer, ISigner} from "@welshman/signer" +import {Repository} from "@welshman/relay" +import {Router, getFilterSelections, addMinimalFallbacks} from "@welshman/router" +import {Tracker, request} from "@welshman/net" +import {makeDvmRequest} from "@welshman/dvm" + +export type RequestPageOptions = { + filters?: Filter[] + relays?: string[] + signal?: AbortSignal + tracker?: Tracker + repository?: Repository + onEvent?: (event: TrustedEvent) => void +} + +export const requestPage = async ({ + filters = [{}], + relays = [], + onEvent, + signal, + repository, + tracker = new Tracker(), +}: RequestPageOptions) => { + if (relays.length > 0) { + return request({tracker, signal, relays, filters, onEvent, autoClose: true}) + } + + const promises: Promise[] = [] + const [withSearch, withoutSearch] = partition(f => Boolean(f.search), filters) + + if (withSearch.length > 0) { + promises.push( + request({ + signal, + tracker, + onEvent, + threshold: 0.1, + autoClose: true, + filters: withSearch, + relays: Router.get().Search().getUrls(), + }), + ) + } + + if (withoutSearch.length > 0) { + promises.push( + ...getFilterSelections(filters).flatMap(({relays, filters}) => + request({tracker, signal, onEvent, relays, filters, threshold: 0.8, autoClose: true}), + ), + ) + } + + // Break out selections by relay so we can complete early after a certain number + // of requests complete for faster load times + await race(withSearch.length > 0 ? 0.1 : 0.8, promises) + + // Wait until after we've queried the network to access our local cache. This results in less + // snappy response times, but is necessary to prevent stale stuff that the user has already seen + // from showing up at the top of the feed + if (repository) { + for (const event of repository.query(filters)) { + onEvent?.(event) + } + } +} + +export type RequestDVMOptions = { + kind: number + tags?: string[][] + relays?: string[] + signer?: ISigner + onResult: (event: TrustedEvent) => void +} + +export const requestDVM = async ({ + kind, + onResult, + tags = [], + relays = [], + signer = Nip01Signer.ephemeral(), +}: RequestDVMOptions) => { + if (relays.length === 0) { + relays = Router.get() + .FromPubkeys(getPubkeyTagValues(tags)) + .policy(addMinimalFallbacks) + .getUrls() + } + + if (!tags.some(nthEq(0, "expiration"))) { + tags.push(["expiration", String(now() + 60)]) + } + + if (!tags.some(nthEq(0, "relays"))) { + tags.push(["relays", ...relays]) + } + + if (!tags.some(nthEq(1, "user"))) { + tags.push(["param", "user", await signer.getPubkey()]) + } + + if (!tags.some(nthEq(1, "max_results"))) { + tags.push(["param", "max_results", "200"]) + } + + const event = await signer.sign(makeEvent(kind, {tags})) + + await makeDvmRequest({relays, event, onResult}) +} diff --git a/packages/util/src/Relay.ts b/packages/util/src/Relay.ts index 5ad4772..1c38910 100644 --- a/packages/util/src/Relay.ts +++ b/packages/util/src/Relay.ts @@ -11,6 +11,7 @@ export type RelayProfile = { contact?: string software?: string version?: string + negentropy?: number description?: string supported_nips?: number[] limitation?: { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 827676c..c88b5aa 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -213,12 +213,24 @@ importers: packages/feeds: dependencies: + '@welshman/dvm': + specifier: workspace:* + version: link:../dvm '@welshman/lib': specifier: workspace:* version: link:../lib '@welshman/net': specifier: workspace:* version: link:../net + '@welshman/relay': + specifier: workspace:* + version: link:../relay + '@welshman/router': + specifier: workspace:* + version: link:../router + '@welshman/signer': + specifier: workspace:* + version: link:../signer '@welshman/util': specifier: workspace:* version: link:../util @@ -309,15 +321,6 @@ importers: specifier: ~5.8.0 version: 5.8.2 - packages/schema: - devDependencies: - rimraf: - specifier: ~6.0.0 - version: 6.0.1 - typescript: - specifier: ~5.8.0 - version: 5.8.2 - packages/signer: dependencies: '@noble/curves':