From 6589886df2971f5633873109c99be8de6e94c845 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Mon, 5 May 2025 14:05:34 -0700 Subject: [PATCH] Thread context through feed controller --- packages/feeds/src/compiler.ts | 12 ++++----- packages/feeds/src/controller.ts | 4 ++- packages/feeds/src/request.ts | 37 ++++++++++++++-------------- packages/net/src/request.ts | 20 +++++++++------ packages/net/src/socket.ts | 1 + packages/signer/src/signers/nip46.ts | 1 + 6 files changed, 42 insertions(+), 33 deletions(-) diff --git a/packages/feeds/src/compiler.ts b/packages/feeds/src/compiler.ts index 43dfebb..c00a110 100644 --- a/packages/feeds/src/compiler.ts +++ b/packages/feeds/src/compiler.ts @@ -8,9 +8,8 @@ import { getIdFilters, unionFilters, } from "@welshman/util" -import {Repository} from "@welshman/relay" import {ISigner} from "@welshman/signer" -import {Tracker} from "@welshman/net" +import {AdapterContext} from "@welshman/net" import { CreatedAtItem, RequestItem, @@ -28,8 +27,7 @@ import {requestPage, requestDVM} from "./request.js" export type FeedCompilerOptions = { signer?: ISigner signal?: AbortSignal - tracker?: Tracker - repository?: Repository + context?: AdapterContext getPubkeysForScope: (scope: Scope) => string[] getPubkeysForWOTRange: (minWOT: number, maxWOT: number) => string[] } @@ -160,6 +158,7 @@ export class FeedCompiler { requestDVM({ ...request, signer: this.options.signer, + context: this.options.context, onResult: async (e: TrustedEvent) => { const tags = (await tryCatch(() => JSON.parse(e.content))) || [] @@ -269,8 +268,7 @@ export class FeedCompiler { await requestPage({ signal: this.options.signal, - tracker: this.options.tracker, - repository: this.options.repository, + context: this.options.context, filters: getIdFilters(addresses), onEvent: (e: TrustedEvent) => eventsByAddress.set(getAddress(e), e), }) @@ -304,6 +302,8 @@ export class FeedCompiler { await Promise.all( labelItems.map(({mappings, relays, ...filter}) => requestPage({ + signal: this.options.signal, + context: this.options.context, relays, filters: [{kinds: [1985], ...filter}], onEvent: (e: TrustedEvent) => events.push(e), diff --git a/packages/feeds/src/controller.ts b/packages/feeds/src/controller.ts index d5910a0..32d6497 100644 --- a/packages/feeds/src/controller.ts +++ b/packages/feeds/src/controller.ts @@ -1,11 +1,13 @@ import {inc, defer, Deferred, memoize, omitVals, max, min, now} from "@welshman/lib" import {EPOCH, trimFilters, guessFilterDelta, TrustedEvent, Filter} from "@welshman/util" +import {Tracker} from "@welshman/net" import {Feed, FeedType, RequestItem} from "./core.js" import {FeedCompiler, FeedCompilerOptions} from "./compiler.js" import {requestPage} from "./request.js" export type FeedControllerOptions = FeedCompilerOptions & { feed: Feed + tracker?: Tracker onEvent?: (event: TrustedEvent) => void onExhausted?: () => void useWindowing?: boolean @@ -122,7 +124,7 @@ export class FeedController { filters: trimFilters(requestFilters), signal: this.options.signal, tracker: this.options.tracker, - repository: this.options.repository, + context: this.options.context, onEvent: (event: TrustedEvent) => { count += 1 until = Math.min(until, event.created_at - 1) diff --git a/packages/feeds/src/request.ts b/packages/feeds/src/request.ts index bcb6b46..3cbce38 100644 --- a/packages/feeds/src/request.ts +++ b/packages/feeds/src/request.ts @@ -10,30 +10,30 @@ import { RELAYS, } from "@welshman/util" import {Nip01Signer, ISigner} from "@welshman/signer" -import {Repository} from "@welshman/relay" +import {LOCAL_RELAY_URL} from "@welshman/relay" import {Router, getFilterSelections, addMinimalFallbacks} from "@welshman/router" -import {Tracker, request} from "@welshman/net" +import {Tracker, AdapterContext, request, netContext, RequestOptions} from "@welshman/net" import {makeDvmRequest} from "@welshman/dvm" export type RequestPageOptions = { - filters?: Filter[] + filters: Filter[] + onEvent: (event: TrustedEvent) => void relays?: string[] - signal?: AbortSignal tracker?: Tracker - repository?: Repository - onEvent?: (event: TrustedEvent) => void + signal?: AbortSignal + context?: AdapterContext } export const requestPage = async ({ - filters = [{}], - relays = [], + filters, onEvent, - signal, - repository, + relays = [], tracker = new Tracker(), + signal, + context, }: RequestPageOptions) => { if (relays.length > 0) { - return request({tracker, signal, relays, filters, onEvent, autoClose: true}) + return request({tracker, signal, context, onEvent, relays, filters, autoClose: true}) } const promises: Promise[] = [] @@ -42,8 +42,9 @@ export const requestPage = async ({ if (withSearch.length > 0) { promises.push( request({ - signal, tracker, + signal, + context, onEvent, threshold: 0.1, autoClose: true, @@ -56,7 +57,7 @@ export const requestPage = async ({ if (withoutSearch.length > 0) { promises.push( ...getFilterSelections(filters).flatMap(({relays, filters}) => - request({tracker, signal, onEvent, relays, filters, threshold: 0.8, autoClose: true}), + request({tracker, signal, context, onEvent, relays, filters, threshold: 0.8, autoClose: true}), ), ) } @@ -68,11 +69,7 @@ export const requestPage = async ({ // Wait until after we've queried the network to access our local cache. This results in less // snappy response times, but is necessary to prevent stale stuff that the user has already seen // from showing up at the top of the feed - if (repository) { - for (const event of repository.query(filters)) { - onEvent?.(event) - } - } + await request({tracker, signal, context, onEvent, filters, relays: [LOCAL_RELAY_URL], autoClose: true}) } export type RequestDVMOptions = { @@ -80,6 +77,7 @@ export type RequestDVMOptions = { tags?: string[][] relays?: string[] signer?: ISigner + context?: AdapterContext onResult: (event: TrustedEvent) => void } @@ -89,6 +87,7 @@ export const requestDVM = async ({ tags = [], relays = [], signer = Nip01Signer.ephemeral(), + context, }: RequestDVMOptions) => { if (relays.length === 0) { const events = await request({ @@ -121,5 +120,5 @@ export const requestDVM = async ({ const event = await signer.sign(makeEvent(kind, {tags})) - await makeDvmRequest({relays, event, onResult}) + await makeDvmRequest({relays, event, context, onResult}) } diff --git a/packages/net/src/request.ts b/packages/net/src/request.ts index 9623d99..351d4c2 100644 --- a/packages/net/src/request.ts +++ b/packages/net/src/request.ts @@ -39,9 +39,7 @@ const deduplicateEvents = (events: TrustedEvent[]) => { return Array.from(eventsByAddress.values()) } -export type RequestOneOptions = { - relay: string - filters: Filter[] +export type BaseRequestOptions = { signal?: AbortSignal tracker?: Tracker context?: AdapterContext @@ -58,6 +56,11 @@ export type RequestOneOptions = { onClose?: () => void } +export type RequestOneOptions = BaseRequestOptions & { + relay: string + filters: Filter[] +} + export const requestOne = (options: RequestOneOptions) => { const ids = new Set() const eose = new Set() @@ -158,8 +161,9 @@ export const requestOne = (options: RequestOneOptions) => { return deferred } -export type RequestOptions = Omit & { +export type RequestOptions = BaseRequestOptions & { relays: string[] + filters: Filter[] threshold?: number } @@ -216,6 +220,8 @@ export type LoadOptions = { onClose?: () => void } +export type Loader = (options: LoadOptions) => Promise + /** * Creates a convenience function which returns a promise of events from a request. * It may return early if filter cardinality is known, and it delays requests in order @@ -224,7 +230,7 @@ export type LoadOptions = { * @returns - a load function */ export const makeLoader = (options: LoaderOptions) => - batcher(options.delay, async (allRequests: LoadOptions[]) => { + batcher(options.delay, (allRequests: LoadOptions[]) => { const resultsByRequest = new Map>() const eventsByRequest = new Map() const requestsByRelay = new Map() @@ -276,7 +282,7 @@ export const makeLoader = (options: LoaderOptions) => signalsByRelay.set(relay, AbortSignal.any(signals)) } - Array.from(requestsByRelay).forEach(async ([relay, requests]) => { + Array.from(requestsByRelay).forEach(([relay, requests]) => { // Union all filters for a given request and send them together const filters = unionFilters(requests.flatMap(r => r.filters)) @@ -318,6 +324,6 @@ export const makeLoader = (options: LoaderOptions) => }) return allRequests.map(r => resultsByRequest.get(r) || []) - }) + }) as Loader export const load = makeLoader({delay: 200, timeout: 3000, threshold: 0.5}) diff --git a/packages/net/src/socket.ts b/packages/net/src/socket.ts index fa91f7a..92006e4 100644 --- a/packages/net/src/socket.ts +++ b/packages/net/src/socket.ts @@ -122,6 +122,7 @@ export class Socket extends EventEmitter { close = () => { this._ws?.close() this._ws = undefined + this._sendQueue.clear() } cleanup = () => { diff --git a/packages/signer/src/signers/nip46.ts b/packages/signer/src/signers/nip46.ts index 69e0c0d..a1e17a8 100644 --- a/packages/signer/src/signers/nip46.ts +++ b/packages/signer/src/signers/nip46.ts @@ -105,6 +105,7 @@ export class Nip46Receiver extends Emitter { ) { super() } + // start listening to the remote signer for incoming events // broadcast any event returned by the remote signer start = async () => {