From f200fe1a42a35d5a84c40faf12eaaba0a2fc47dd Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Fri, 11 Apr 2025 11:35:57 -0700 Subject: [PATCH] Fix duplicate events by address in load --- packages/net/src/request.ts | 39 ++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/packages/net/src/request.ts b/packages/net/src/request.ts index 00b8056..2d089cf 100644 --- a/packages/net/src/request.ts +++ b/packages/net/src/request.ts @@ -1,7 +1,8 @@ import {EventEmitter} from "events" -import {on, flatten, addToMapKey, defer, Deferred, call, randomId, yieldThread, pushToMapKey, batcher} from "@welshman/lib" +import {on, uniq, lt, flatten, addToMapKey, defer, Deferred, call, randomId, yieldThread, pushToMapKey, batcher} from "@welshman/lib" import { Filter, + getAddress, unionFilters, matchFilters, TrustedEvent, @@ -14,6 +15,20 @@ import {Unsubscriber} from "./util.js" import {netContext} from "./context.js" import {Tracker} from "./tracker.js" +const deduplicateEvents = (events: TrustedEvent[]) => { + const eventsByAddress = new Map() + + for (const event of events) { + const address = getAddress(event) + + if (lt(eventsByAddress.get(address)?.created_at, event.created_at)) { + eventsByAddress.set(address, event) + } + } + + return Array.from(eventsByAddress.values()) +} + export type RequestOneOptions = { relay: string filters: Filter[] @@ -57,7 +72,7 @@ export const requestOne = (options: RequestOneOptions) => { options.onClose?.() adapter.cleanup() unsubscribers.map(call) - deferred.resolve(events) + deferred.resolve(deduplicateEvents(events)) } const unsubscribers = [ @@ -209,7 +224,7 @@ export const makeLoader = (options: LoaderOptions) => const signalsByRelay = new Map() const closedRequestsByRelay = new Map>() const closedRelaysByRequest = new Map>() - const relays = allRequests.flatMap(r => r.relays) + const relays = uniq(allRequests.flatMap(r => r.relays)) const threshold = options.threshold || 1 const tracker = new Tracker() @@ -218,9 +233,11 @@ export const makeLoader = (options: LoaderOptions) => addToMapKey(closedRelaysByRequest, request, relay) const closedRelays = closedRelaysByRequest.get(request)?.size || 0 - if (closedRelays >= request.relays.length * threshold) { + if (closedRelays >= uniq(request.relays).length * threshold) { + const events = deduplicateEvents(eventsByRequest.get(request) || []) + request.onClose?.() - resultsByRequest.get(request)?.resolve(eventsByRequest.get(request) || []) + resultsByRequest.get(request)?.resolve(events) } if (closedRequestsByRelay.get(relay)?.size === requestsByRelay.get(relay)?.length) { @@ -229,17 +246,13 @@ export const makeLoader = (options: LoaderOptions) => } for (const request of allRequests) { - for (const relay of request.relays) { + for (const relay of uniq(request.relays)) { pushToMapKey(requestsByRelay, relay, request) resultsByRequest.set(request, defer()) - } - // Propagate abort when all requests have been closed for a given relay - request.signal?.addEventListener('abort', () => { - for (const relay of request.relays) { - close(relay, request) - } - }) + // Propagate abort when all requests have been closed for a given relay + request.signal?.addEventListener('abort', () => close(relay, request)) + } } // Create an abort controller for each relay