From 05a9d6461b56e8e753211203924c3ebd711f0e0b Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Tue, 1 Apr 2025 13:15:50 -0700 Subject: [PATCH] Accept multiple filters to request --- package-lock.json | 7 +- packages/app/package.json | 2 +- packages/app/src/core.ts | 4 + packages/app/src/feeds.ts | 15 ++-- packages/app/src/follows.ts | 4 +- packages/app/src/mutes.ts | 4 +- packages/app/src/pins.ts | 4 +- packages/app/src/profiles.ts | 4 +- packages/app/src/relaySelections.ts | 4 +- packages/app/src/search.ts | 2 +- packages/app/src/sync.ts | 14 +-- packages/dvm/src/handler.ts | 2 +- packages/dvm/src/request.ts | 4 +- packages/net/src/diff.ts | 2 +- packages/net/src/request.ts | 122 ++++++++++++++++----------- packages/signer/src/signers/nip46.ts | 4 +- 16 files changed, 105 insertions(+), 93 deletions(-) diff --git a/package-lock.json b/package-lock.json index f8ec749..72aecf4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7681,7 +7681,7 @@ "@welshman/lib": "^0.1.0", "@welshman/net": "^0.0.49", "@welshman/relay": "^0.1.0", - "@welshman/signer": "^0.1.0", + "@welshman/signer": "^0.1.1", "@welshman/store": "^0.1.0", "@welshman/util": "^0.1.0", "fuse.js": "^7.0.0", @@ -7771,8 +7771,8 @@ "@noble/hashes": "^1.6.1", "@welshman/lib": "^0.1.0", "@welshman/net": "^0.0.49", - "@welshman/util": "^0.1.0", - "nostr-tools": "^2.7.2" + "@welshman/signer": "^0.1.1", + "@welshman/util": "^0.1.0" } }, "packages/dvm/node_modules/@noble/hashes": { @@ -7860,7 +7860,6 @@ "@welshman/relay": "^0.1.0", "@welshman/util": "^0.1.0", "isomorphic-ws": "^5.0.0", - "nostr-tools": "^2.11.0", "typed-emitter": "^2.1.0" } }, diff --git a/packages/app/package.json b/packages/app/package.json index a90bdc6..817fd6c 100644 --- a/packages/app/package.json +++ b/packages/app/package.json @@ -32,7 +32,7 @@ "@welshman/lib": "^0.1.0", "@welshman/relay": "^0.1.0", "@welshman/net": "^0.0.49", - "@welshman/signer": "^0.1.0", + "@welshman/signer": "^0.1.1", "@welshman/store": "^0.1.0", "@welshman/util": "^0.1.0", "fuse.js": "^7.0.0", diff --git a/packages/app/src/core.ts b/packages/app/src/core.ts index 58b6cec..0650ab0 100644 --- a/packages/app/src/core.ts +++ b/packages/app/src/core.ts @@ -3,12 +3,16 @@ import {verifyEvent, isEphemeralKind, isDVMKind} from "@welshman/util" import {Repository} from "@welshman/relay" import {Pool, Tracker, SocketEvent, isRelayEvent} from "@welshman/net" import {custom} from "@welshman/store" +import {loadRelay, trackRelayStats} from "./relays.js" export const repository = Repository.getSingleton() export const tracker = new Tracker() Pool.getSingleton().subscribe(socket => { + loadRelay(socket.url) + trackRelayStats(socket) + socket.on(SocketEvent.Receive, message => { if (isRelayEvent(message)) { const event = message[2] diff --git a/packages/app/src/feeds.ts b/packages/app/src/feeds.ts index 8531888..703c2fc 100644 --- a/packages/app/src/feeds.ts +++ b/packages/app/src/feeds.ts @@ -11,17 +11,12 @@ import {wotGraph, maxWot, getFollows, getNetwork, getFollowers} from "./wot.js" export const request = async ({filters = [{}], relays = [], onEvent}: RequestOpts) => { if (relays.length > 0) { - await Promise.all( - filters.map( - filter => - new Promise(resolve => { - const sub = new MultiRequest({filter, relays, timeout: 5000, autoClose: true}) + await new Promise(resolve => { + const sub = new MultiRequest({filters, relays, timeout: 5000, autoClose: true}) - sub.on(RequestEvent.Event, onEvent) - sub.on(RequestEvent.Close, resolve) - }), - ), - ) + sub.on(RequestEvent.Event, onEvent) + sub.on(RequestEvent.Close, resolve) + }) } else { await Promise.all(getFilterSelections(filters).map(opts => request({...opts, onEvent}))) } diff --git a/packages/app/src/follows.ts b/packages/app/src/follows.ts index 9deeead..8876a15 100644 --- a/packages/app/src/follows.ts +++ b/packages/app/src/follows.ts @@ -24,9 +24,9 @@ export const { load: async (pubkey: string, request: Partial = {}) => { await loadRelaySelections(pubkey, request) - const filter = {kinds: [FOLLOWS], authors: [pubkey]} + const filters = [{kinds: [FOLLOWS], authors: [pubkey]}] const relays = Router.get().FromPubkey(pubkey).getUrls() - await load({relays, ...request, filter}) + await load({relays, ...request, filters}) }, }) diff --git a/packages/app/src/mutes.ts b/packages/app/src/mutes.ts index 3b5b2d2..a2b0311 100644 --- a/packages/app/src/mutes.ts +++ b/packages/app/src/mutes.ts @@ -30,9 +30,9 @@ export const { load: async (pubkey: string, request: Partial = {}) => { await loadRelaySelections(pubkey, request) - const filter = {kinds: [MUTES], authors: [pubkey]} + const filters = [{kinds: [MUTES], authors: [pubkey]}] const relays = Router.get().FromPubkey(pubkey).getUrls() - await load({relays, ...request, filter}) + await load({relays, ...request, filters}) }, }) diff --git a/packages/app/src/pins.ts b/packages/app/src/pins.ts index 984a0e6..77b43cb 100644 --- a/packages/app/src/pins.ts +++ b/packages/app/src/pins.ts @@ -24,9 +24,9 @@ export const { load: async (pubkey: string, request: Partial = {}) => { await loadRelaySelections(pubkey, request) - const filter = {kinds: [PINS], authors: [pubkey]} + const filters = [{kinds: [PINS], authors: [pubkey]}] const relays = Router.get().FromPubkey(pubkey).getUrls() - await load({relays, ...request, filter}) + await load({relays, ...request, filters}) }, }) diff --git a/packages/app/src/profiles.ts b/packages/app/src/profiles.ts index a4f5308..0b8d1bf 100644 --- a/packages/app/src/profiles.ts +++ b/packages/app/src/profiles.ts @@ -28,10 +28,10 @@ export const { await loadRelaySelections(pubkey, request) const router = Router.get() - const filter = {kinds: [PROFILE], authors: [pubkey]} + const filters = [{kinds: [PROFILE], authors: [pubkey]}] const relays = router.merge([router.Index(), router.FromPubkey(pubkey)]).getUrls() - await load({relays, ...request, filter}) + await load({relays, ...request, filters}) }, }) diff --git a/packages/app/src/relaySelections.ts b/packages/app/src/relaySelections.ts index b3df256..2e49aa4 100644 --- a/packages/app/src/relaySelections.ts +++ b/packages/app/src/relaySelections.ts @@ -53,7 +53,7 @@ export const { await load({ relays: router.merge([router.Index(), router.FromPubkey(pubkey)]).getUrls(), ...request, - filter: {kinds: [RELAYS], authors: [pubkey]}, + filters: [{kinds: [RELAYS], authors: [pubkey]}], }) }, }) @@ -78,7 +78,7 @@ export const { await load({ relays: router.merge([router.Index(), router.FromPubkey(pubkey)]).getUrls(), ...request, - filter: {kinds: [INBOX_RELAYS], authors: [pubkey]}, + filters: [{kinds: [INBOX_RELAYS], authors: [pubkey]}], }) }, }) diff --git a/packages/app/src/search.ts b/packages/app/src/search.ts index 3778190..8a38246 100644 --- a/packages/app/src/search.ts +++ b/packages/app/src/search.ts @@ -55,7 +55,7 @@ export const createSearch = (options: T[], opts: SearchOptions): Sea export const searchProfiles = debounce(500, (search: string) => { if (search.length > 2) { load({ - filter: {kinds: [PROFILE], search}, + filters: [{kinds: [PROFILE], search}], relays: Router.get().Search().getUrls(), }) } diff --git a/packages/app/src/sync.ts b/packages/app/src/sync.ts index 78b7c69..92f803e 100644 --- a/packages/app/src/sync.ts +++ b/packages/app/src/sync.ts @@ -35,17 +35,9 @@ export const pull = async ({relays, filters}: AppSyncOpts) => { relays.map(async relay => { await (hasNegentropy(relay) ? basePull({filters, events, relays: [relay]}) - : Promise.all( - filters.map( - filter => - new Promise(resolve => { - new SingleRequest({filter, relay, autoClose: true}).on( - RequestEvent.Close, - resolve, - ) - }), - ), - )) + : new Promise(resolve => { + new SingleRequest({filters, relay, autoClose: true}).on(RequestEvent.Close, resolve) + })) }), ) } diff --git a/packages/dvm/src/handler.ts b/packages/dvm/src/handler.ts index 21c3cd7..50c5fc1 100644 --- a/packages/dvm/src/handler.ts +++ b/packages/dvm/src/handler.ts @@ -50,7 +50,7 @@ export class DVM { filter["#p"] = [pubkey] } - const req = new MultiRequest({relays, filter, context}) + const req = new MultiRequest({relays, filters: [filter], context}) req.on(RequestEvent.Event, this.onEvent) req.on(RequestEvent.Close, resolve) diff --git a/packages/dvm/src/request.ts b/packages/dvm/src/request.ts index 5479954..1bb0f16 100644 --- a/packages/dvm/src/request.ts +++ b/packages/dvm/src/request.ts @@ -35,9 +35,9 @@ export const makeDvmRequest = (request: DVMRequestOptions) => { } = request const kind = event.kind + 1000 const kinds = reportProgress ? [kind, 7000] : [kind] - const filter: Filter = {kinds, since: now() - 60, "#e": [event.id]} + const filters: Filter[] = [{kinds, since: now() - 60, "#e": [event.id]}] - const sub = new MultiRequest({relays, filter, timeout, context}) + const sub = new MultiRequest({relays, filters, timeout, context}) const pub = new MultiPublish({relays, event, timeout, context}) sub.on(RequestEvent.Event, (event: TrustedEvent, url: string) => { diff --git a/packages/net/src/diff.ts b/packages/net/src/diff.ts index 6f838a6..6673deb 100644 --- a/packages/net/src/diff.ts +++ b/packages/net/src/diff.ts @@ -204,7 +204,7 @@ export const pull = async ({context, ...options}: PullOptions) => { return Promise.all( chunk(500, allIds).map(ids => { return new Promise(resolve => { - const req = new SingleRequest({relay, context, filter: {ids}, autoClose: true}) + const req = new SingleRequest({relay, context, filters: [{ids}], autoClose: true}) req.on(RequestEvent.Close, resolve) req.on(RequestEvent.Event, event => result.push(event as SignedEvent)) diff --git a/packages/net/src/request.ts b/packages/net/src/request.ts index bb397a1..5b3615e 100644 --- a/packages/net/src/request.ts +++ b/packages/net/src/request.ts @@ -3,7 +3,7 @@ import {on, call, randomId, yieldThread, pushToMapKey, batcher} from "@welshman/ import { Filter, unionFilters, - matchFilter, + matchFilters, TrustedEvent, getFilterResultCardinality, } from "@welshman/util" @@ -40,7 +40,7 @@ export type SingleRequestEvents = { export type SingleRequestOptions = { relay: string - filter: Filter + filters: Filter[] context?: AdapterContext timeout?: number tracker?: Tracker @@ -49,8 +49,12 @@ export type SingleRequestOptions = { isEventDeleted?: (event: TrustedEvent, url: string) => boolean } +// Needed for typescript to infer emitter methods +export interface SingleRequest extends TypedEmitter {} + export class SingleRequest extends (EventEmitter as new () => TypedEmitter) { - _id = `REQ-${randomId().slice(0, 8)}` + _ids = new Set() + _eose = new Set() _unsubscribers: Unsubscriber[] = [] _adapter: AbstractAdapter _closed = false @@ -71,29 +75,33 @@ export class SingleRequest extends (EventEmitter as new () => TypedEmitter TypedEmitter { - this._adapter.send([ClientMessageType.Req, this._id, this.options.filter]) + for (const filter of this.options.filters) { + const id = `REQ-${randomId().slice(0, 8)}` + + this._ids.add(id) + this._adapter.send([ClientMessageType.Req, id, filter]) + } }) } close() { if (this._closed) return - this._adapter.send(["CLOSE", this._id]) + for (const id of this._ids) { + this._adapter.send(["CLOSE", id]) + } + this.emit(RequestEvent.Close) this.removeAllListeners() this._unsubscribers.map(call) @@ -155,6 +171,9 @@ export type MultiRequestOptions = Omit & { relays: string[] } +// Needed for typescript to infer emitter methods +export interface MultiRequest extends TypedEmitter {} + export class MultiRequest extends (EventEmitter as new () => TypedEmitter) { _children: SingleRequest[] = [] _closed = new Set() @@ -214,6 +233,8 @@ export class MultiRequest extends (EventEmitter as new () => TypedEmitter new MultiRequest(options) + /** * A convenience function which returns a promise of events from a request. * It may return early if filter cardinality is known, and it delays requests by @@ -224,9 +245,11 @@ export class MultiRequest extends (EventEmitter as new () => TypedEmitter { const filtersByRelay = new Map() - for (const {filter, relays} of requests) { + for (const {filters, relays} of requests) { for (const relay of relays) { - pushToMapKey(filtersByRelay, relay, filter) + for (const filter of filters) { + pushToMapKey(filtersByRelay, relay, filter) + } } } @@ -234,35 +257,34 @@ export const load = batcher(200, async (requests: MultiRequestOptions[]) => { const events: TrustedEvent[] = [] await Promise.all( - Array.from(filtersByRelay).map(async ([relay, filters]) => { - await Promise.all( - unionFilters(filters).map(filter => { - new Promise(resolve => { - const cardinality = getFilterResultCardinality(filter) - const req = new MultiRequest({ - filter, - tracker, - relays: [relay], - timeout: 5000, - autoClose: true, - }) - - let count = 0 - - req.on(RequestEvent.Event, (event: TrustedEvent) => { - events.push(event) - - if (++count === cardinality) { - resolve() - } - }) - - req.on(RequestEvent.Close, () => resolve()) + Array.from(filtersByRelay).map( + async ([relay, unmergedFilters]) => + new Promise(resolve => { + const filters = unionFilters(unmergedFilters) + const cardinality = + filters.length === 1 ? getFilterResultCardinality(filters[0]) : undefined + const req = new MultiRequest({ + filters, + tracker, + relays: [relay], + timeout: 5000, + autoClose: true, }) + + let count = 0 + + req.on(RequestEvent.Event, (event: TrustedEvent) => { + events.push(event) + + if (++count === cardinality) { + resolve() + } + }) + + req.on(RequestEvent.Close, () => resolve()) }), - ) - }), + ), ) - return requests.map(r => events.filter(event => matchFilter(r.filter, event))) + return requests.map(r => events.filter(event => matchFilters(r.filters, event))) }) diff --git a/packages/signer/src/signers/nip46.ts b/packages/signer/src/signers/nip46.ts index c5c305f..3e90877 100644 --- a/packages/signer/src/signers/nip46.ts +++ b/packages/signer/src/signers/nip46.ts @@ -112,9 +112,9 @@ export class Nip46Receiver extends Emitter { const {relays, context} = this.params const userPubkey = await this.signer.getPubkey() - const filter = {kinds: [NOSTR_CONNECT], "#p": [userPubkey]} + const filters = [{kinds: [NOSTR_CONNECT], "#p": [userPubkey]}] - this.sub = new MultiRequest({relays, filter, context}) + this.sub = new MultiRequest({relays, filters, context}) this.sub.on(RequestEvent.Event, async (event: TrustedEvent, url: string) => { const json = await decrypt(this.signer, event.pubkey, event.content)