Re-work feed requests

This commit is contained in:
Jon Staab
2025-04-24 13:49:02 -07:00
parent 4031564c6c
commit 3d1a6a106e
9 changed files with 207 additions and 183 deletions
+6 -104
View File
@@ -1,103 +1,8 @@
import {nthEq, partition, race, now} from "@welshman/lib" import {Scope, FeedController, FeedControllerOptions, Feed} from "@welshman/feeds"
import {createEvent, getPubkeyTagValues, TrustedEvent} from "@welshman/util"
import {request, Tracker} from "@welshman/net"
import {Scope, FeedController, RequestOpts, FeedOptions, DVMOpts, Feed} from "@welshman/feeds"
import {Router, addMinimalFallbacks, getFilterSelections} from "@welshman/router"
import {makeDvmRequest} from "@welshman/dvm"
import {makeSecret, Nip01Signer} from "@welshman/signer"
import {pubkey, signer} from "./session.js" import {pubkey, signer} from "./session.js"
import {loadRelaySelections} from "./relaySelections.js"
import {wotGraph, maxWot, getFollows, getNetwork, getFollowers} from "./wot.js" import {wotGraph, maxWot, getFollows, getNetwork, getFollowers} from "./wot.js"
import {repository} from "./core.js" import {repository} from "./core.js"
export type FeedRequestHandlerOptions = {
signal?: AbortSignal
}
export const makeFeedRequestHandler =
({signal}: FeedRequestHandlerOptions) =>
async ({filters = [{}], relays = [], onEvent}: RequestOpts) => {
const tracker = new Tracker()
if (relays.length > 0) {
await request({tracker, signal, relays, filters, onEvent, autoClose: true})
} else {
const promises: Promise<TrustedEvent[]>[] = []
const [withSearch, withoutSearch] = partition(f => Boolean(f.search), filters)
if (withSearch.length > 0) {
promises.push(
request({
signal,
tracker,
onEvent,
threshold: 0.1,
autoClose: true,
filters: withSearch,
relays: Router.get().Search().getUrls(),
}),
)
}
if (withoutSearch.length > 0) {
promises.push(
...getFilterSelections(filters).flatMap(({relays, filters}) =>
request({tracker, signal, onEvent, relays, filters, threshold: 0.8, autoClose: true}),
),
)
}
// Break out selections by relay so we can complete early after a certain number
// of requests complete for faster load times
await race(withSearch.length > 0 ? 0.1 : 0.8, promises)
// Wait until after we've queried the network to access our local cache. This results in less
// snappy response times, but is necessary to prevent stale stuff that the user has already seen
// from showing up at the top of the feed
for (const event of repository.query(filters)) {
onEvent(event)
}
}
}
export const requestDVM = async ({kind, onEvent, ...request}: DVMOpts) => {
// Make sure we know what relays to use for target dvms
if (request.tags && !request.relays) {
for (const pubkey of getPubkeyTagValues(request.tags)) {
await loadRelaySelections(pubkey)
}
}
const tags = request.tags || []
const $signer = signer.get() || new Nip01Signer(makeSecret())
const pubkey = await $signer.getPubkey()
const relays =
request.relays ||
Router.get().FromPubkeys(getPubkeyTagValues(tags)).policy(addMinimalFallbacks).getUrls()
if (!tags.some(nthEq(0, "expiration"))) {
tags.push(["expiration", String(now() + 60)])
}
if (!tags.some(nthEq(0, "relays"))) {
tags.push(["relays", ...relays])
}
if (!tags.some(nthEq(1, "user"))) {
tags.push(["param", "user", pubkey])
}
if (!tags.some(nthEq(1, "max_results"))) {
tags.push(["param", "max_results", "200"])
}
await makeDvmRequest({
relays,
event: await $signer.sign(createEvent(kind, {tags})),
onResult: onEvent,
})
}
export const getPubkeysForScope = (scope: string) => { export const getPubkeysForScope = (scope: string) => {
const $pubkey = pubkey.get() const $pubkey = pubkey.get()
@@ -133,16 +38,13 @@ export const getPubkeysForWOTRange = (min: number, max: number) => {
return pubkeys return pubkeys
} }
type _FeedOptions = Partial<Omit<FeedOptions, "feed">> & {feed: Feed} type MakeFeedControllerOptions = Partial<Omit<FeedControllerOptions, "feed">> & {feed: Feed}
export const createFeedController = (options: _FeedOptions) => { export const makeFeedController = (options: MakeFeedControllerOptions) =>
const request = makeFeedRequestHandler(options) new FeedController({
repository,
return new FeedController({
request,
requestDVM,
getPubkeysForScope, getPubkeysForScope,
getPubkeysForWOTRange, getPubkeysForWOTRange,
signer: signer.get(),
...options, ...options,
}) })
}
+3 -2
View File
@@ -10,8 +10,9 @@ const query = (filters: Filter[]) =>
export const hasNegentropy = (url: string) => { export const hasNegentropy = (url: string) => {
const p = relaysByUrl.get().get(url)?.profile const p = relaysByUrl.get().get(url)?.profile
if (p?.supported_nips?.includes(77)) return true if (p?.negentropy) return true
if (p?.software?.includes("strfry") && !p?.version?.match(/^0\./)) return true if (p?.supported_nips?.includes?.(77)) return true
if (p?.software?.includes?.("strfry") && !p?.version?.match(/^0\./)) return true
return false return false
} }
+4
View File
@@ -20,7 +20,11 @@
}, },
"dependencies": { "dependencies": {
"@welshman/lib": "workspace:*", "@welshman/lib": "workspace:*",
"@welshman/dvm": "workspace:*",
"@welshman/net": "workspace:*", "@welshman/net": "workspace:*",
"@welshman/relay": "workspace:*",
"@welshman/router": "workspace:*",
"@welshman/signer": "workspace:*",
"@welshman/util": "workspace:*", "@welshman/util": "workspace:*",
"trava": "^1.2.1" "trava": "^1.2.1"
}, },
+33 -10
View File
@@ -1,7 +1,17 @@
import {uniq, identity, flatten, pushToMapKey, intersection, tryCatch, now} from "@welshman/lib" import {uniq, identity, flatten, pushToMapKey, intersection, tryCatch, now} from "@welshman/lib"
import type {TrustedEvent, Filter} from "@welshman/util" import {
import {intersectFilters, matchFilter, getAddress, getIdFilters, unionFilters} from "@welshman/util" TrustedEvent,
import type { Filter,
intersectFilters,
matchFilter,
getAddress,
getIdFilters,
unionFilters,
} from "@welshman/util"
import {Repository} from "@welshman/relay"
import {ISigner} from "@welshman/signer"
import {Tracker} from "@welshman/net"
import {
CreatedAtItem, CreatedAtItem,
RequestItem, RequestItem,
ListItem, ListItem,
@@ -10,13 +20,22 @@ import type {
DVMItem, DVMItem,
Scope, Scope,
Feed, Feed,
FeedOptions, FeedType,
} from "./core.js" } from "./core.js"
import {getFeedArgs, feedsFromTags} from "./utils.js" import {getFeedArgs, feedsFromTags} from "./utils.js"
import {FeedType} from "./core.js" import {requestPage, requestDVM} from "./request.js"
export type FeedCompilerOptions = {
signer?: ISigner
signal?: AbortSignal
tracker?: Tracker
repository?: Repository
getPubkeysForScope: (scope: Scope) => string[]
getPubkeysForWOTRange: (minWOT: number, maxWOT: number) => string[]
}
export class FeedCompiler { export class FeedCompiler {
constructor(readonly options: FeedOptions) {} constructor(readonly options: FeedCompilerOptions) {}
canCompile(feed: Feed): boolean { canCompile(feed: Feed): boolean {
switch (feed[0]) { switch (feed[0]) {
@@ -138,9 +157,10 @@ export class FeedCompiler {
await Promise.all( await Promise.all(
items.map(({mappings, ...request}) => items.map(({mappings, ...request}) =>
this.options.requestDVM({ requestDVM({
...request, ...request,
onEvent: async (e: TrustedEvent) => { signer: this.options.signer,
onResult: async (e: TrustedEvent) => {
const tags = (await tryCatch(() => JSON.parse(e.content))) || [] const tags = (await tryCatch(() => JSON.parse(e.content))) || []
for (const feed of feedsFromTags(tags, mappings)) { for (const feed of feedsFromTags(tags, mappings)) {
@@ -247,7 +267,10 @@ export class FeedCompiler {
const addresses = uniq(listItems.flatMap(({addresses}) => addresses)) const addresses = uniq(listItems.flatMap(({addresses}) => addresses))
const eventsByAddress = new Map<string, TrustedEvent>() const eventsByAddress = new Map<string, TrustedEvent>()
await this.options.request({ await requestPage({
signal: this.options.signal,
tracker: this.options.tracker,
repository: this.options.repository,
filters: getIdFilters(addresses), filters: getIdFilters(addresses),
onEvent: (e: TrustedEvent) => eventsByAddress.set(getAddress(e), e), onEvent: (e: TrustedEvent) => eventsByAddress.set(getAddress(e), e),
}) })
@@ -280,7 +303,7 @@ export class FeedCompiler {
await Promise.all( await Promise.all(
labelItems.map(({mappings, relays, ...filter}) => labelItems.map(({mappings, relays, ...filter}) =>
this.options.request({ requestPage({
relays, relays,
filters: [{kinds: [1985], ...filter}], filters: [{kinds: [1985], ...filter}],
onEvent: (e: TrustedEvent) => events.push(e), onEvent: (e: TrustedEvent) => events.push(e),
+37 -31
View File
@@ -1,14 +1,20 @@
import {inc, memoize, omitVals, max, min, now} from "@welshman/lib" import {inc, memoize, omitVals, max, min, now} from "@welshman/lib"
import type {TrustedEvent, Filter} from "@welshman/util" import {EPOCH, trimFilters, guessFilterDelta, TrustedEvent, Filter} from "@welshman/util"
import {EPOCH, trimFilters, guessFilterDelta} from "@welshman/util" import {Feed, FeedType, RequestItem} from "./core.js"
import type {Feed, RequestItem, FeedOptions} from "./core.js" import {FeedCompiler, FeedCompilerOptions} from "./compiler.js"
import {FeedType} from "./core.js" import {requestPage} from "./request.js"
import {FeedCompiler} from "./compiler.js"
export type FeedControllerOptions = FeedCompilerOptions & {
feed: Feed
onEvent?: (event: TrustedEvent) => void
onExhausted?: () => void
useWindowing?: boolean
}
export class FeedController { export class FeedController {
compiler: FeedCompiler compiler: FeedCompiler
constructor(readonly options: FeedOptions) { constructor(readonly options: FeedControllerOptions) {
this.compiler = new FeedCompiler(options) this.compiler = new FeedCompiler(options)
} }
@@ -40,8 +46,7 @@ export class FeedController {
load = async (limit: number) => (await this.getLoader())(limit) load = async (limit: number) => (await this.getLoader())(limit)
async _getRequestsLoader(requests: RequestItem[], overrides: Partial<FeedOptions> = {}) { async _getRequestsLoader(requests: RequestItem[]) {
const {onEvent, onExhausted} = {...this.options, ...overrides}
const seen = new Set() const seen = new Set()
const exhausted = new Set() const exhausted = new Set()
const loaders = await Promise.all( const loaders = await Promise.all(
@@ -50,7 +55,7 @@ export class FeedController {
onExhausted: () => exhausted.add(request), onExhausted: () => exhausted.add(request),
onEvent: e => { onEvent: e => {
if (!seen.has(e.id)) { if (!seen.has(e.id)) {
onEvent?.(e) this.options.onEvent?.(e)
seen.add(e.id) seen.add(e.id)
} }
}, },
@@ -62,14 +67,15 @@ export class FeedController {
await Promise.all(loaders.map(loader => loader(limit))) await Promise.all(loaders.map(loader => loader(limit)))
if (exhausted.size === requests.length) { if (exhausted.size === requests.length) {
onExhausted?.() this.options?.onExhausted?.()
} }
} }
} }
async _getRequestLoader({relays, filters}: RequestItem, overrides: Partial<FeedOptions> = {}) { async _getRequestLoader(
const {useWindowing, onEvent, onExhausted, request} = {...this.options, ...overrides} {relays, filters}: RequestItem,
{onEvent, onExhausted}: Pick<FeedControllerOptions, "onEvent" | "onExhausted">,
) {
// Make sure we have some kind of filter to send if we've been given an empty one, as happens with relay feeds // Make sure we have some kind of filter to send if we've been given an empty one, as happens with relay feeds
if (!filters || filters.length === 0) { if (!filters || filters.length === 0) {
filters = [{}] filters = [{}]
@@ -83,7 +89,7 @@ export class FeedController {
let loading = false let loading = false
let delta = initialDelta let delta = initialDelta
let since = useWindowing ? maxUntil - delta : minSince let since = this.options.useWindowing ? maxUntil - delta : minSince
let until = maxUntil let until = maxUntil
return async (limit: number) => { return async (limit: number) => {
@@ -110,10 +116,13 @@ export class FeedController {
let count = 0 let count = 0
await request( await requestPage(
omitVals([undefined], { omitVals([undefined], {
relays, relays,
filters: trimFilters(requestFilters), filters: trimFilters(requestFilters),
signal: this.options.signal,
tracker: this.options.tracker,
repository: this.options.repository,
onEvent: (event: TrustedEvent) => { onEvent: (event: TrustedEvent) => {
count += 1 count += 1
until = Math.min(until, event.created_at - 1) until = Math.min(until, event.created_at - 1)
@@ -122,7 +131,7 @@ export class FeedController {
}), }),
) )
if (useWindowing) { if (this.options.useWindowing) {
if (since === minSince) { if (since === minSince) {
onExhausted?.() onExhausted?.()
} }
@@ -143,8 +152,7 @@ export class FeedController {
} }
} }
async _getDifferenceLoader(feeds: Feed[], overrides: Partial<FeedOptions> = {}) { async _getDifferenceLoader(feeds: Feed[]) {
const {onEvent, onExhausted, ...options} = {...this.options, ...overrides}
const exhausted = new Set<number>() const exhausted = new Set<number>()
const skip = new Set<string>() const skip = new Set<string>()
const events: TrustedEvent[] = [] const events: TrustedEvent[] = []
@@ -154,7 +162,7 @@ export class FeedController {
feeds.map( feeds.map(
(thisFeed: Feed, i: number) => (thisFeed: Feed, i: number) =>
new FeedController({ new FeedController({
...options, ...this.options,
feed: thisFeed, feed: thisFeed,
onExhausted: () => exhausted.add(i), onExhausted: () => exhausted.add(i),
onEvent: (event: TrustedEvent) => { onEvent: (event: TrustedEvent) => {
@@ -181,19 +189,18 @@ export class FeedController {
for (const event of events.splice(0)) { for (const event of events.splice(0)) {
if (!skip.has(event.id) && !seen.has(event.id)) { if (!skip.has(event.id) && !seen.has(event.id)) {
onEvent?.(event) this.options.onEvent?.(event)
seen.add(event.id) seen.add(event.id)
} }
} }
if (exhausted.size === controllers.length) { if (exhausted.size === controllers.length) {
onExhausted?.() this.options.onExhausted?.()
} }
} }
} }
async _getIntersectionLoader(feeds: Feed[], overrides: Partial<FeedOptions> = {}) { async _getIntersectionLoader(feeds: Feed[]) {
const {onEvent, onExhausted, ...options} = {...this.options, ...overrides}
const exhausted = new Set<number>() const exhausted = new Set<number>()
const counts = new Map<string, number>() const counts = new Map<string, number>()
const events: TrustedEvent[] = [] const events: TrustedEvent[] = []
@@ -203,7 +210,7 @@ export class FeedController {
feeds.map( feeds.map(
(thisFeed: Feed, i: number) => (thisFeed: Feed, i: number) =>
new FeedController({ new FeedController({
...options, ...this.options,
feed: thisFeed, feed: thisFeed,
onExhausted: () => exhausted.add(i), onExhausted: () => exhausted.add(i),
onEvent: (event: TrustedEvent) => { onEvent: (event: TrustedEvent) => {
@@ -227,19 +234,18 @@ export class FeedController {
for (const event of events.splice(0)) { for (const event of events.splice(0)) {
if (counts.get(event.id) === controllers.length && !seen.has(event.id)) { if (counts.get(event.id) === controllers.length && !seen.has(event.id)) {
onEvent?.(event) this.options.onEvent?.(event)
seen.add(event.id) seen.add(event.id)
} }
} }
if (exhausted.size === controllers.length) { if (exhausted.size === controllers.length) {
onExhausted?.() this.options.onExhausted?.()
} }
} }
} }
async _getUnionLoader(feeds: Feed[], overrides: Partial<FeedOptions> = {}) { async _getUnionLoader(feeds: Feed[]) {
const {onEvent, onExhausted, ...options} = {...this.options, ...overrides}
const exhausted = new Set<number>() const exhausted = new Set<number>()
const seen = new Set() const seen = new Set()
@@ -247,12 +253,12 @@ export class FeedController {
feeds.map( feeds.map(
(thisFeed: Feed, i: number) => (thisFeed: Feed, i: number) =>
new FeedController({ new FeedController({
...options, ...this.options,
feed: thisFeed, feed: thisFeed,
onExhausted: () => exhausted.add(i), onExhausted: () => exhausted.add(i),
onEvent: (event: TrustedEvent) => { onEvent: (event: TrustedEvent) => {
if (!seen.has(event.id)) { if (!seen.has(event.id)) {
onEvent?.(event) this.options.onEvent?.(event)
seen.add(event.id) seen.add(event.id)
} }
}, },
@@ -272,7 +278,7 @@ export class FeedController {
) )
if (exhausted.size === controllers.length) { if (exhausted.size === controllers.length) {
onExhausted?.() this.options.onExhausted?.()
} }
} }
} }
+1 -27
View File
@@ -1,4 +1,4 @@
import type {TrustedEvent, Filter} from "@welshman/util" import {Filter} from "@welshman/util"
export enum FeedType { export enum FeedType {
Address = "address", Address = "address",
@@ -108,29 +108,3 @@ export type RequestItem = {
relays?: string[] relays?: string[]
filters?: Filter[] filters?: Filter[]
} }
export type RequestOpts = RequestItem & {
onEvent: (event: TrustedEvent) => void
}
export type DVMRequest = {
kind: number
tags?: string[][]
relays?: string[]
}
export type DVMOpts = DVMRequest & {
onEvent: (event: TrustedEvent) => void
}
export type FeedOptions = {
feed: Feed
request: (opts: RequestOpts) => Promise<void>
requestDVM: (opts: DVMOpts) => Promise<void>
getPubkeysForScope: (scope: Scope) => string[]
getPubkeysForWOTRange: (minWOT: number, maxWOT: number) => string[]
onEvent?: (event: TrustedEvent) => void
onExhausted?: () => void
useWindowing?: boolean
signal?: AbortSignal
}
+110
View File
@@ -0,0 +1,110 @@
import {partition, now, nthEq, race} from "@welshman/lib"
import {makeEvent, Filter, getPubkeyTagValues, TrustedEvent} from "@welshman/util"
import {Nip01Signer, ISigner} from "@welshman/signer"
import {Repository} from "@welshman/relay"
import {Router, getFilterSelections, addMinimalFallbacks} from "@welshman/router"
import {Tracker, request} from "@welshman/net"
import {makeDvmRequest} from "@welshman/dvm"
export type RequestPageOptions = {
filters?: Filter[]
relays?: string[]
signal?: AbortSignal
tracker?: Tracker
repository?: Repository
onEvent?: (event: TrustedEvent) => void
}
export const requestPage = async ({
filters = [{}],
relays = [],
onEvent,
signal,
repository,
tracker = new Tracker(),
}: RequestPageOptions) => {
if (relays.length > 0) {
return request({tracker, signal, relays, filters, onEvent, autoClose: true})
}
const promises: Promise<TrustedEvent[]>[] = []
const [withSearch, withoutSearch] = partition(f => Boolean(f.search), filters)
if (withSearch.length > 0) {
promises.push(
request({
signal,
tracker,
onEvent,
threshold: 0.1,
autoClose: true,
filters: withSearch,
relays: Router.get().Search().getUrls(),
}),
)
}
if (withoutSearch.length > 0) {
promises.push(
...getFilterSelections(filters).flatMap(({relays, filters}) =>
request({tracker, signal, onEvent, relays, filters, threshold: 0.8, autoClose: true}),
),
)
}
// Break out selections by relay so we can complete early after a certain number
// of requests complete for faster load times
await race(withSearch.length > 0 ? 0.1 : 0.8, promises)
// Wait until after we've queried the network to access our local cache. This results in less
// snappy response times, but is necessary to prevent stale stuff that the user has already seen
// from showing up at the top of the feed
if (repository) {
for (const event of repository.query(filters)) {
onEvent?.(event)
}
}
}
export type RequestDVMOptions = {
kind: number
tags?: string[][]
relays?: string[]
signer?: ISigner
onResult: (event: TrustedEvent) => void
}
export const requestDVM = async ({
kind,
onResult,
tags = [],
relays = [],
signer = Nip01Signer.ephemeral(),
}: RequestDVMOptions) => {
if (relays.length === 0) {
relays = Router.get()
.FromPubkeys(getPubkeyTagValues(tags))
.policy(addMinimalFallbacks)
.getUrls()
}
if (!tags.some(nthEq(0, "expiration"))) {
tags.push(["expiration", String(now() + 60)])
}
if (!tags.some(nthEq(0, "relays"))) {
tags.push(["relays", ...relays])
}
if (!tags.some(nthEq(1, "user"))) {
tags.push(["param", "user", await signer.getPubkey()])
}
if (!tags.some(nthEq(1, "max_results"))) {
tags.push(["param", "max_results", "200"])
}
const event = await signer.sign(makeEvent(kind, {tags}))
await makeDvmRequest({relays, event, onResult})
}
+1
View File
@@ -11,6 +11,7 @@ export type RelayProfile = {
contact?: string contact?: string
software?: string software?: string
version?: string version?: string
negentropy?: number
description?: string description?: string
supported_nips?: number[] supported_nips?: number[]
limitation?: { limitation?: {
+12 -9
View File
@@ -213,12 +213,24 @@ importers:
packages/feeds: packages/feeds:
dependencies: dependencies:
'@welshman/dvm':
specifier: workspace:*
version: link:../dvm
'@welshman/lib': '@welshman/lib':
specifier: workspace:* specifier: workspace:*
version: link:../lib version: link:../lib
'@welshman/net': '@welshman/net':
specifier: workspace:* specifier: workspace:*
version: link:../net version: link:../net
'@welshman/relay':
specifier: workspace:*
version: link:../relay
'@welshman/router':
specifier: workspace:*
version: link:../router
'@welshman/signer':
specifier: workspace:*
version: link:../signer
'@welshman/util': '@welshman/util':
specifier: workspace:* specifier: workspace:*
version: link:../util version: link:../util
@@ -309,15 +321,6 @@ importers:
specifier: ~5.8.0 specifier: ~5.8.0
version: 5.8.2 version: 5.8.2
packages/schema:
devDependencies:
rimraf:
specifier: ~6.0.0
version: 6.0.1
typescript:
specifier: ~5.8.0
version: 5.8.2
packages/signer: packages/signer:
dependencies: dependencies:
'@noble/curves': '@noble/curves':