Fix duplicate events by address in load
This commit is contained in:
+24
-11
@@ -1,7 +1,8 @@
|
|||||||
import {EventEmitter} from "events"
|
import {EventEmitter} from "events"
|
||||||
import {on, flatten, addToMapKey, defer, Deferred, call, randomId, yieldThread, pushToMapKey, batcher} from "@welshman/lib"
|
import {on, uniq, lt, flatten, addToMapKey, defer, Deferred, call, randomId, yieldThread, pushToMapKey, batcher} from "@welshman/lib"
|
||||||
import {
|
import {
|
||||||
Filter,
|
Filter,
|
||||||
|
getAddress,
|
||||||
unionFilters,
|
unionFilters,
|
||||||
matchFilters,
|
matchFilters,
|
||||||
TrustedEvent,
|
TrustedEvent,
|
||||||
@@ -14,6 +15,20 @@ import {Unsubscriber} from "./util.js"
|
|||||||
import {netContext} from "./context.js"
|
import {netContext} from "./context.js"
|
||||||
import {Tracker} from "./tracker.js"
|
import {Tracker} from "./tracker.js"
|
||||||
|
|
||||||
|
const deduplicateEvents = (events: TrustedEvent[]) => {
|
||||||
|
const eventsByAddress = new Map<string, TrustedEvent>()
|
||||||
|
|
||||||
|
for (const event of events) {
|
||||||
|
const address = getAddress(event)
|
||||||
|
|
||||||
|
if (lt(eventsByAddress.get(address)?.created_at, event.created_at)) {
|
||||||
|
eventsByAddress.set(address, event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Array.from(eventsByAddress.values())
|
||||||
|
}
|
||||||
|
|
||||||
export type RequestOneOptions = {
|
export type RequestOneOptions = {
|
||||||
relay: string
|
relay: string
|
||||||
filters: Filter[]
|
filters: Filter[]
|
||||||
@@ -57,7 +72,7 @@ export const requestOne = (options: RequestOneOptions) => {
|
|||||||
options.onClose?.()
|
options.onClose?.()
|
||||||
adapter.cleanup()
|
adapter.cleanup()
|
||||||
unsubscribers.map(call)
|
unsubscribers.map(call)
|
||||||
deferred.resolve(events)
|
deferred.resolve(deduplicateEvents(events))
|
||||||
}
|
}
|
||||||
|
|
||||||
const unsubscribers = [
|
const unsubscribers = [
|
||||||
@@ -209,7 +224,7 @@ export const makeLoader = (options: LoaderOptions) =>
|
|||||||
const signalsByRelay = new Map<string, AbortSignal>()
|
const signalsByRelay = new Map<string, AbortSignal>()
|
||||||
const closedRequestsByRelay = new Map<string, Set<LoadOptions>>()
|
const closedRequestsByRelay = new Map<string, Set<LoadOptions>>()
|
||||||
const closedRelaysByRequest = new Map<LoadOptions, Set<string>>()
|
const closedRelaysByRequest = new Map<LoadOptions, Set<string>>()
|
||||||
const relays = allRequests.flatMap(r => r.relays)
|
const relays = uniq(allRequests.flatMap(r => r.relays))
|
||||||
const threshold = options.threshold || 1
|
const threshold = options.threshold || 1
|
||||||
const tracker = new Tracker()
|
const tracker = new Tracker()
|
||||||
|
|
||||||
@@ -218,9 +233,11 @@ export const makeLoader = (options: LoaderOptions) =>
|
|||||||
addToMapKey(closedRelaysByRequest, request, relay)
|
addToMapKey(closedRelaysByRequest, request, relay)
|
||||||
|
|
||||||
const closedRelays = closedRelaysByRequest.get(request)?.size || 0
|
const closedRelays = closedRelaysByRequest.get(request)?.size || 0
|
||||||
if (closedRelays >= request.relays.length * threshold) {
|
if (closedRelays >= uniq(request.relays).length * threshold) {
|
||||||
|
const events = deduplicateEvents(eventsByRequest.get(request) || [])
|
||||||
|
|
||||||
request.onClose?.()
|
request.onClose?.()
|
||||||
resultsByRequest.get(request)?.resolve(eventsByRequest.get(request) || [])
|
resultsByRequest.get(request)?.resolve(events)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (closedRequestsByRelay.get(relay)?.size === requestsByRelay.get(relay)?.length) {
|
if (closedRequestsByRelay.get(relay)?.size === requestsByRelay.get(relay)?.length) {
|
||||||
@@ -229,17 +246,13 @@ export const makeLoader = (options: LoaderOptions) =>
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const request of allRequests) {
|
for (const request of allRequests) {
|
||||||
for (const relay of request.relays) {
|
for (const relay of uniq(request.relays)) {
|
||||||
pushToMapKey(requestsByRelay, relay, request)
|
pushToMapKey(requestsByRelay, relay, request)
|
||||||
resultsByRequest.set(request, defer())
|
resultsByRequest.set(request, defer())
|
||||||
}
|
|
||||||
|
|
||||||
// Propagate abort when all requests have been closed for a given relay
|
// Propagate abort when all requests have been closed for a given relay
|
||||||
request.signal?.addEventListener('abort', () => {
|
request.signal?.addEventListener('abort', () => close(relay, request))
|
||||||
for (const relay of request.relays) {
|
|
||||||
close(relay, request)
|
|
||||||
}
|
}
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create an abort controller for each relay
|
// Create an abort controller for each relay
|
||||||
|
|||||||
Reference in New Issue
Block a user