Batch load requests
This commit is contained in:
@@ -14,7 +14,7 @@ import {
|
|||||||
isUnwrappedEvent,
|
isUnwrappedEvent,
|
||||||
isSignedEvent,
|
isSignedEvent,
|
||||||
} from "@welshman/util"
|
} from "@welshman/util"
|
||||||
import {MultiPublish, PublishStatus, PublishEvent} from "@welshman/net"
|
import {MultiPublish, AdapterContext, PublishStatus, PublishEvent} from "@welshman/net"
|
||||||
import {repository, tracker} from "./core.js"
|
import {repository, tracker} from "./core.js"
|
||||||
import {pubkey, getSession, getSigner} from "./session.js"
|
import {pubkey, getSession, getSigner} from "./session.js"
|
||||||
|
|
||||||
@@ -26,7 +26,7 @@ export type ThunkRequest = {
|
|||||||
event: ThunkEvent
|
event: ThunkEvent
|
||||||
relays: string[]
|
relays: string[]
|
||||||
delay?: number
|
delay?: number
|
||||||
context?: AdapterContext,
|
context?: AdapterContext
|
||||||
}
|
}
|
||||||
|
|
||||||
export type ThunkStatus = {
|
export type ThunkStatus = {
|
||||||
|
|||||||
+52
-16
@@ -1,7 +1,13 @@
|
|||||||
import {EventEmitter} from "events"
|
import {EventEmitter} from "events"
|
||||||
import {verifyEvent as nostrToolsVerifyEvent} from "nostr-tools/pure"
|
import {verifyEvent as nostrToolsVerifyEvent} from "nostr-tools/pure"
|
||||||
import {on, call, randomId, yieldThread} from "@welshman/lib"
|
import {on, call, randomId, yieldThread, pushToMapKey, batcher} from "@welshman/lib"
|
||||||
import {Filter, matchFilter, TrustedEvent, getFilterResultCardinality} from "@welshman/util"
|
import {
|
||||||
|
Filter,
|
||||||
|
unionFilters,
|
||||||
|
matchFilter,
|
||||||
|
TrustedEvent,
|
||||||
|
getFilterResultCardinality,
|
||||||
|
} from "@welshman/util"
|
||||||
import {RelayMessage, ClientMessageType, isRelayEvent, isRelayEose} from "./message.js"
|
import {RelayMessage, ClientMessageType, isRelayEvent, isRelayEose} from "./message.js"
|
||||||
import {getAdapter, AdapterContext, AbstractAdapter, AdapterEvent} from "./adapter.js"
|
import {getAdapter, AdapterContext, AbstractAdapter, AdapterEvent} from "./adapter.js"
|
||||||
import {SocketEvent, SocketStatus} from "./socket.js"
|
import {SocketEvent, SocketStatus} from "./socket.js"
|
||||||
@@ -208,23 +214,53 @@ export class MultiRequest extends (EventEmitter as new () => TypedEmitter<MultiR
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* A convenience function which returns a promise of events from a request.
|
* A convenience function which returns a promise of events from a request.
|
||||||
* It may return early if filter cardinality is known.
|
* It may return early if filter cardinality is known, and it delays requests by
|
||||||
|
* 200 in order to implement batching
|
||||||
* @param options - MultiRequestOptions
|
* @param options - MultiRequestOptions
|
||||||
* @returns - a promise containing an array of TrustedEvents
|
* @returns - a promise containing an array of TrustedEvents
|
||||||
*/
|
*/
|
||||||
export const load = (options: MultiRequestOptions) =>
|
export const load = batcher(200, async (requests: MultiRequestOptions[]) => {
|
||||||
new Promise(resolve => {
|
const filtersByRelay = new Map<string, Filter[]>()
|
||||||
const cardinality = getFilterResultCardinality(options.filter)
|
|
||||||
const req = new MultiRequest({timeout: 5000, ...options, autoClose: true})
|
|
||||||
const events: TrustedEvent[] = []
|
|
||||||
|
|
||||||
req.on(RequestEvent.Event, (event: TrustedEvent) => {
|
for (const {filter, relays} of requests) {
|
||||||
events.push(event)
|
for (const relay of relays) {
|
||||||
|
pushToMapKey(filtersByRelay, relay, filter)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (events.length === cardinality) {
|
const tracker = new Tracker()
|
||||||
resolve(events)
|
const events: TrustedEvent[] = []
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
req.on(RequestEvent.Close, () => resolve(events))
|
await Promise.all(
|
||||||
})
|
Array.from(filtersByRelay).map(async ([relay, filters]) => {
|
||||||
|
await Promise.all(
|
||||||
|
unionFilters(filters).map(filter => {
|
||||||
|
new Promise<void>(resolve => {
|
||||||
|
const cardinality = getFilterResultCardinality(filter)
|
||||||
|
const req = new MultiRequest({
|
||||||
|
filter,
|
||||||
|
tracker,
|
||||||
|
relays: [relay],
|
||||||
|
timeout: 5000,
|
||||||
|
autoClose: true,
|
||||||
|
})
|
||||||
|
|
||||||
|
let count = 0
|
||||||
|
|
||||||
|
req.on(RequestEvent.Event, (event: TrustedEvent) => {
|
||||||
|
events.push(event)
|
||||||
|
|
||||||
|
if (++count === cardinality) {
|
||||||
|
resolve()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
req.on(RequestEvent.Close, () => resolve())
|
||||||
|
})
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
|
||||||
|
return requests.map(r => events.filter(event => matchFilter(r.filter, event)))
|
||||||
|
})
|
||||||
|
|||||||
@@ -245,7 +245,7 @@ export const getFilterResultCardinality = (filter: Filter) => {
|
|||||||
return filter.ids.length
|
return filter.ids.length
|
||||||
}
|
}
|
||||||
|
|
||||||
return null
|
return undefined
|
||||||
}
|
}
|
||||||
|
|
||||||
export const trimFilter = (filter: Filter): Filter =>
|
export const trimFilter = (filter: Filter): Filter =>
|
||||||
|
|||||||
Reference in New Issue
Block a user