Thread context through feed controller

This commit is contained in:
Jon Staab
2025-05-05 14:05:34 -07:00
parent c538db070a
commit 6589886df2
6 changed files with 42 additions and 33 deletions
+6 -6
View File
@@ -8,9 +8,8 @@ import {
getIdFilters, getIdFilters,
unionFilters, unionFilters,
} from "@welshman/util" } from "@welshman/util"
import {Repository} from "@welshman/relay"
import {ISigner} from "@welshman/signer" import {ISigner} from "@welshman/signer"
import {Tracker} from "@welshman/net" import {AdapterContext} from "@welshman/net"
import { import {
CreatedAtItem, CreatedAtItem,
RequestItem, RequestItem,
@@ -28,8 +27,7 @@ import {requestPage, requestDVM} from "./request.js"
export type FeedCompilerOptions = { export type FeedCompilerOptions = {
signer?: ISigner signer?: ISigner
signal?: AbortSignal signal?: AbortSignal
tracker?: Tracker context?: AdapterContext
repository?: Repository
getPubkeysForScope: (scope: Scope) => string[] getPubkeysForScope: (scope: Scope) => string[]
getPubkeysForWOTRange: (minWOT: number, maxWOT: number) => string[] getPubkeysForWOTRange: (minWOT: number, maxWOT: number) => string[]
} }
@@ -160,6 +158,7 @@ export class FeedCompiler {
requestDVM({ requestDVM({
...request, ...request,
signer: this.options.signer, signer: this.options.signer,
context: this.options.context,
onResult: async (e: TrustedEvent) => { onResult: async (e: TrustedEvent) => {
const tags = (await tryCatch(() => JSON.parse(e.content))) || [] const tags = (await tryCatch(() => JSON.parse(e.content))) || []
@@ -269,8 +268,7 @@ export class FeedCompiler {
await requestPage({ await requestPage({
signal: this.options.signal, signal: this.options.signal,
tracker: this.options.tracker, context: this.options.context,
repository: this.options.repository,
filters: getIdFilters(addresses), filters: getIdFilters(addresses),
onEvent: (e: TrustedEvent) => eventsByAddress.set(getAddress(e), e), onEvent: (e: TrustedEvent) => eventsByAddress.set(getAddress(e), e),
}) })
@@ -304,6 +302,8 @@ export class FeedCompiler {
await Promise.all( await Promise.all(
labelItems.map(({mappings, relays, ...filter}) => labelItems.map(({mappings, relays, ...filter}) =>
requestPage({ requestPage({
signal: this.options.signal,
context: this.options.context,
relays, relays,
filters: [{kinds: [1985], ...filter}], filters: [{kinds: [1985], ...filter}],
onEvent: (e: TrustedEvent) => events.push(e), onEvent: (e: TrustedEvent) => events.push(e),
+3 -1
View File
@@ -1,11 +1,13 @@
import {inc, defer, Deferred, memoize, omitVals, max, min, now} from "@welshman/lib" import {inc, defer, Deferred, memoize, omitVals, max, min, now} from "@welshman/lib"
import {EPOCH, trimFilters, guessFilterDelta, TrustedEvent, Filter} from "@welshman/util" import {EPOCH, trimFilters, guessFilterDelta, TrustedEvent, Filter} from "@welshman/util"
import {Tracker} from "@welshman/net"
import {Feed, FeedType, RequestItem} from "./core.js" import {Feed, FeedType, RequestItem} from "./core.js"
import {FeedCompiler, FeedCompilerOptions} from "./compiler.js" import {FeedCompiler, FeedCompilerOptions} from "./compiler.js"
import {requestPage} from "./request.js" import {requestPage} from "./request.js"
export type FeedControllerOptions = FeedCompilerOptions & { export type FeedControllerOptions = FeedCompilerOptions & {
feed: Feed feed: Feed
tracker?: Tracker
onEvent?: (event: TrustedEvent) => void onEvent?: (event: TrustedEvent) => void
onExhausted?: () => void onExhausted?: () => void
useWindowing?: boolean useWindowing?: boolean
@@ -122,7 +124,7 @@ export class FeedController {
filters: trimFilters(requestFilters), filters: trimFilters(requestFilters),
signal: this.options.signal, signal: this.options.signal,
tracker: this.options.tracker, tracker: this.options.tracker,
repository: this.options.repository, context: this.options.context,
onEvent: (event: TrustedEvent) => { onEvent: (event: TrustedEvent) => {
count += 1 count += 1
until = Math.min(until, event.created_at - 1) until = Math.min(until, event.created_at - 1)
+18 -19
View File
@@ -10,30 +10,30 @@ import {
RELAYS, RELAYS,
} from "@welshman/util" } from "@welshman/util"
import {Nip01Signer, ISigner} from "@welshman/signer" import {Nip01Signer, ISigner} from "@welshman/signer"
import {Repository} from "@welshman/relay" import {LOCAL_RELAY_URL} from "@welshman/relay"
import {Router, getFilterSelections, addMinimalFallbacks} from "@welshman/router" import {Router, getFilterSelections, addMinimalFallbacks} from "@welshman/router"
import {Tracker, request} from "@welshman/net" import {Tracker, AdapterContext, request, netContext, RequestOptions} from "@welshman/net"
import {makeDvmRequest} from "@welshman/dvm" import {makeDvmRequest} from "@welshman/dvm"
export type RequestPageOptions = { export type RequestPageOptions = {
filters?: Filter[] filters: Filter[]
onEvent: (event: TrustedEvent) => void
relays?: string[] relays?: string[]
signal?: AbortSignal
tracker?: Tracker tracker?: Tracker
repository?: Repository signal?: AbortSignal
onEvent?: (event: TrustedEvent) => void context?: AdapterContext
} }
export const requestPage = async ({ export const requestPage = async ({
filters = [{}], filters,
relays = [],
onEvent, onEvent,
signal, relays = [],
repository,
tracker = new Tracker(), tracker = new Tracker(),
signal,
context,
}: RequestPageOptions) => { }: RequestPageOptions) => {
if (relays.length > 0) { if (relays.length > 0) {
return request({tracker, signal, relays, filters, onEvent, autoClose: true}) return request({tracker, signal, context, onEvent, relays, filters, autoClose: true})
} }
const promises: Promise<TrustedEvent[]>[] = [] const promises: Promise<TrustedEvent[]>[] = []
@@ -42,8 +42,9 @@ export const requestPage = async ({
if (withSearch.length > 0) { if (withSearch.length > 0) {
promises.push( promises.push(
request({ request({
signal,
tracker, tracker,
signal,
context,
onEvent, onEvent,
threshold: 0.1, threshold: 0.1,
autoClose: true, autoClose: true,
@@ -56,7 +57,7 @@ export const requestPage = async ({
if (withoutSearch.length > 0) { if (withoutSearch.length > 0) {
promises.push( promises.push(
...getFilterSelections(filters).flatMap(({relays, filters}) => ...getFilterSelections(filters).flatMap(({relays, filters}) =>
request({tracker, signal, onEvent, relays, filters, threshold: 0.8, autoClose: true}), request({tracker, signal, context, onEvent, relays, filters, threshold: 0.8, autoClose: true}),
), ),
) )
} }
@@ -68,11 +69,7 @@ export const requestPage = async ({
// Wait until after we've queried the network to access our local cache. This results in less // Wait until after we've queried the network to access our local cache. This results in less
// snappy response times, but is necessary to prevent stale stuff that the user has already seen // snappy response times, but is necessary to prevent stale stuff that the user has already seen
// from showing up at the top of the feed // from showing up at the top of the feed
if (repository) { await request({tracker, signal, context, onEvent, filters, relays: [LOCAL_RELAY_URL], autoClose: true})
for (const event of repository.query(filters)) {
onEvent?.(event)
}
}
} }
export type RequestDVMOptions = { export type RequestDVMOptions = {
@@ -80,6 +77,7 @@ export type RequestDVMOptions = {
tags?: string[][] tags?: string[][]
relays?: string[] relays?: string[]
signer?: ISigner signer?: ISigner
context?: AdapterContext
onResult: (event: TrustedEvent) => void onResult: (event: TrustedEvent) => void
} }
@@ -89,6 +87,7 @@ export const requestDVM = async ({
tags = [], tags = [],
relays = [], relays = [],
signer = Nip01Signer.ephemeral(), signer = Nip01Signer.ephemeral(),
context,
}: RequestDVMOptions) => { }: RequestDVMOptions) => {
if (relays.length === 0) { if (relays.length === 0) {
const events = await request({ const events = await request({
@@ -121,5 +120,5 @@ export const requestDVM = async ({
const event = await signer.sign(makeEvent(kind, {tags})) const event = await signer.sign(makeEvent(kind, {tags}))
await makeDvmRequest({relays, event, onResult}) await makeDvmRequest({relays, event, context, onResult})
} }
+13 -7
View File
@@ -39,9 +39,7 @@ const deduplicateEvents = (events: TrustedEvent[]) => {
return Array.from(eventsByAddress.values()) return Array.from(eventsByAddress.values())
} }
export type RequestOneOptions = { export type BaseRequestOptions = {
relay: string
filters: Filter[]
signal?: AbortSignal signal?: AbortSignal
tracker?: Tracker tracker?: Tracker
context?: AdapterContext context?: AdapterContext
@@ -58,6 +56,11 @@ export type RequestOneOptions = {
onClose?: () => void onClose?: () => void
} }
export type RequestOneOptions = BaseRequestOptions & {
relay: string
filters: Filter[]
}
export const requestOne = (options: RequestOneOptions) => { export const requestOne = (options: RequestOneOptions) => {
const ids = new Set<string>() const ids = new Set<string>()
const eose = new Set<string>() const eose = new Set<string>()
@@ -158,8 +161,9 @@ export const requestOne = (options: RequestOneOptions) => {
return deferred return deferred
} }
export type RequestOptions = Omit<RequestOneOptions, "relay"> & { export type RequestOptions = BaseRequestOptions & {
relays: string[] relays: string[]
filters: Filter[]
threshold?: number threshold?: number
} }
@@ -216,6 +220,8 @@ export type LoadOptions = {
onClose?: () => void onClose?: () => void
} }
export type Loader = (options: LoadOptions) => Promise<TrustedEvent[]>
/** /**
* Creates a convenience function which returns a promise of events from a request. * Creates a convenience function which returns a promise of events from a request.
* It may return early if filter cardinality is known, and it delays requests in order * It may return early if filter cardinality is known, and it delays requests in order
@@ -224,7 +230,7 @@ export type LoadOptions = {
* @returns - a load function * @returns - a load function
*/ */
export const makeLoader = (options: LoaderOptions) => export const makeLoader = (options: LoaderOptions) =>
batcher(options.delay, async (allRequests: LoadOptions[]) => { batcher(options.delay, (allRequests: LoadOptions[]) => {
const resultsByRequest = new Map<LoadOptions, Deferred<TrustedEvent[]>>() const resultsByRequest = new Map<LoadOptions, Deferred<TrustedEvent[]>>()
const eventsByRequest = new Map<LoadOptions, TrustedEvent[]>() const eventsByRequest = new Map<LoadOptions, TrustedEvent[]>()
const requestsByRelay = new Map<string, LoadOptions[]>() const requestsByRelay = new Map<string, LoadOptions[]>()
@@ -276,7 +282,7 @@ export const makeLoader = (options: LoaderOptions) =>
signalsByRelay.set(relay, AbortSignal.any(signals)) signalsByRelay.set(relay, AbortSignal.any(signals))
} }
Array.from(requestsByRelay).forEach(async ([relay, requests]) => { Array.from(requestsByRelay).forEach(([relay, requests]) => {
// Union all filters for a given request and send them together // Union all filters for a given request and send them together
const filters = unionFilters(requests.flatMap(r => r.filters)) const filters = unionFilters(requests.flatMap(r => r.filters))
@@ -318,6 +324,6 @@ export const makeLoader = (options: LoaderOptions) =>
}) })
return allRequests.map(r => resultsByRequest.get(r) || []) return allRequests.map(r => resultsByRequest.get(r) || [])
}) }) as Loader
export const load = makeLoader({delay: 200, timeout: 3000, threshold: 0.5}) export const load = makeLoader({delay: 200, timeout: 3000, threshold: 0.5})
+1
View File
@@ -122,6 +122,7 @@ export class Socket extends EventEmitter {
close = () => { close = () => {
this._ws?.close() this._ws?.close()
this._ws = undefined this._ws = undefined
this._sendQueue.clear()
} }
cleanup = () => { cleanup = () => {
+1
View File
@@ -105,6 +105,7 @@ export class Nip46Receiver extends Emitter {
) { ) {
super() super()
} }
// start listening to the remote signer for incoming events // start listening to the remote signer for incoming events
// broadcast any event returned by the remote signer // broadcast any event returned by the remote signer
start = async () => { start = async () => {