Change requests from classes to functions
This commit is contained in:
+217
-227
@@ -1,5 +1,5 @@
|
||||
import {EventEmitter} from "events"
|
||||
import {on, call, randomId, yieldThread, pushToMapKey, batcher} from "@welshman/lib"
|
||||
import {on, flatten, addToMapKey, defer, Deferred, call, randomId, yieldThread, pushToMapKey, batcher} from "@welshman/lib"
|
||||
import {
|
||||
Filter,
|
||||
unionFilters,
|
||||
@@ -14,232 +14,165 @@ import {Unsubscriber} from "./util.js"
|
||||
import {netContext} from "./context.js"
|
||||
import {Tracker} from "./tracker.js"
|
||||
|
||||
export enum RequestEvent {
|
||||
Close = "request:event:close",
|
||||
Disconnect = "request:event:disconnect",
|
||||
Duplicate = "request:event:duplicate",
|
||||
Eose = "request:event:eose",
|
||||
Event = "request:event:event",
|
||||
Filtered = "request:event:filtered",
|
||||
Deleted = "request:event:deleted",
|
||||
Invalid = "request:event:invalid",
|
||||
}
|
||||
|
||||
// SingleRequest
|
||||
|
||||
export type SingleRequestEvents = {
|
||||
[RequestEvent.Event]: (event: TrustedEvent) => void
|
||||
[RequestEvent.Deleted]: (event: any) => void
|
||||
[RequestEvent.Invalid]: (event: any) => void
|
||||
[RequestEvent.Filtered]: (event: TrustedEvent) => void
|
||||
[RequestEvent.Duplicate]: (event: TrustedEvent) => void
|
||||
[RequestEvent.Disconnect]: () => void
|
||||
[RequestEvent.Close]: () => void
|
||||
[RequestEvent.Eose]: () => void
|
||||
}
|
||||
|
||||
export type SingleRequestOptions = {
|
||||
export type RequestOneOptions = {
|
||||
relay: string
|
||||
filters: Filter[]
|
||||
signal?: AbortSignal
|
||||
context?: AdapterContext
|
||||
timeout?: number
|
||||
tracker?: Tracker
|
||||
context?: AdapterContext
|
||||
autoClose?: boolean
|
||||
isEventValid?: (event: TrustedEvent, url: string) => boolean
|
||||
isEventDeleted?: (event: TrustedEvent, url: string) => boolean
|
||||
onEvent?: (event: TrustedEvent, url: string) => void
|
||||
onDeleted?: (event: unknown, url: string) => void
|
||||
onInvalid?: (event: unknown, url: string) => void
|
||||
onFiltered?: (event: TrustedEvent, url: string) => void
|
||||
onDuplicate?: (event: TrustedEvent, url: string) => void
|
||||
onDisconnect?: (url: string) => void
|
||||
onEose?: (url: string) => void
|
||||
onClose?: () => void
|
||||
}
|
||||
|
||||
export class SingleRequest extends EventEmitter {
|
||||
_ids = new Set<string>()
|
||||
_eose = new Set<string>()
|
||||
_unsubscribers: Unsubscriber[] = []
|
||||
_adapter: AbstractAdapter
|
||||
_closed = false
|
||||
export const requestOne = (options: RequestOneOptions) => {
|
||||
const ids = new Set<string>()
|
||||
const eose = new Set<string>()
|
||||
const events: TrustedEvent[] = []
|
||||
const deferred = defer<TrustedEvent[]>()
|
||||
const tracker = options.tracker || new Tracker()
|
||||
const adapter = getAdapter(options.relay, options.context)
|
||||
const isEventValid = options.isEventValid || netContext.isEventValid
|
||||
const isEventDeleted = options.isEventDeleted || netContext.isEventDeleted
|
||||
|
||||
constructor(readonly options: SingleRequestOptions) {
|
||||
super()
|
||||
let closed = false
|
||||
|
||||
const tracker = options.tracker || new Tracker()
|
||||
const isEventValid = options.isEventValid || netContext.isEventValid
|
||||
const isEventDeleted = options.isEventDeleted || netContext.isEventDeleted
|
||||
const close = () => {
|
||||
if (closed) return
|
||||
|
||||
// Set up our adapter
|
||||
this._adapter = getAdapter(this.options.relay, this.options.context)
|
||||
closed = true
|
||||
|
||||
// Listen for event/eose messages from the adapter
|
||||
this._unsubscribers.push(
|
||||
on(this._adapter, AdapterEvent.Receive, (message: RelayMessage, url: string) => {
|
||||
if (isRelayEvent(message)) {
|
||||
const [_, id, event] = message
|
||||
for (const id of ids) {
|
||||
adapter.send(["CLOSE", id])
|
||||
}
|
||||
|
||||
if (this._ids.has(id)) {
|
||||
if (tracker.track(event.id, url)) {
|
||||
this.emit(RequestEvent.Duplicate, event)
|
||||
} else if (isEventDeleted(event, url)) {
|
||||
this.emit(RequestEvent.Deleted, event)
|
||||
} else if (!isEventValid(event, url)) {
|
||||
this.emit(RequestEvent.Invalid, event)
|
||||
} else if (!matchFilters(this.options.filters, event)) {
|
||||
this.emit(RequestEvent.Filtered, event)
|
||||
} else {
|
||||
this.emit(RequestEvent.Event, event)
|
||||
options.onClose?.()
|
||||
adapter.cleanup()
|
||||
unsubscribers.map(call)
|
||||
deferred.resolve(events)
|
||||
}
|
||||
|
||||
const unsubscribers = [
|
||||
on(adapter, AdapterEvent.Receive, (message: RelayMessage, url: string) => {
|
||||
if (isRelayEvent(message)) {
|
||||
const [_, id, event] = message
|
||||
|
||||
if (ids.has(id)) {
|
||||
if (tracker.track(event.id, url)) {
|
||||
options.onDuplicate?.(event, url)
|
||||
} else if (isEventDeleted(event, url)) {
|
||||
options.onDeleted?.(event, url)
|
||||
} else if (!isEventValid(event, url)) {
|
||||
options.onInvalid?.(event, url)
|
||||
} else if (!matchFilters(options.filters, event)) {
|
||||
options.onFiltered?.(event, url)
|
||||
} else {
|
||||
options.onEvent?.(event, url)
|
||||
events.push(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isRelayEose(message)) {
|
||||
const [_, id] = message
|
||||
|
||||
if (ids.has(id)) {
|
||||
eose.add(id)
|
||||
|
||||
if (eose.size === ids.size) {
|
||||
options.onEose?.(url)
|
||||
|
||||
if (options.autoClose) {
|
||||
close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
]
|
||||
|
||||
if (isRelayEose(message)) {
|
||||
const [_, id] = message
|
||||
// Listen to disconnects from any sockets
|
||||
for (const socket of adapter.sockets) {
|
||||
unsubscribers.push(
|
||||
on(socket, SocketEvent.Status, (status: SocketStatus) => {
|
||||
if (![SocketStatus.Open, SocketStatus.Opening].includes(status)) {
|
||||
options.onDisconnect?.(socket.url)
|
||||
|
||||
if (this._ids.has(id)) {
|
||||
this._eose.add(id)
|
||||
|
||||
if (this._eose.size === this._ids.size) {
|
||||
this.emit(RequestEvent.Eose)
|
||||
|
||||
if (this.options.autoClose) {
|
||||
this.close()
|
||||
}
|
||||
}
|
||||
if (options.autoClose) {
|
||||
close()
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
// Listen to disconnects from any sockets
|
||||
for (const socket of this._adapter.sockets) {
|
||||
this._unsubscribers.push(
|
||||
on(socket, SocketEvent.Status, (status: SocketStatus) => {
|
||||
if (![SocketStatus.Open, SocketStatus.Opening].includes(status)) {
|
||||
this.emit(RequestEvent.Disconnect)
|
||||
|
||||
if (this.options.autoClose) {
|
||||
this.close()
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
// Timeout our subscription
|
||||
if (this.options.timeout || this.options.autoClose) {
|
||||
setTimeout(() => this.close(), this.options.timeout || 10000)
|
||||
}
|
||||
|
||||
// Handle abort signal
|
||||
this.options.signal?.addEventListener("abort", () => this.close())
|
||||
|
||||
// Start asynchronously so the caller can set up listeners
|
||||
yieldThread().then(() => {
|
||||
for (const filter of this.options.filters) {
|
||||
const id = `REQ-${randomId().slice(0, 8)}`
|
||||
|
||||
this._ids.add(id)
|
||||
this._adapter.send([ClientMessageType.Req, id, filter])
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
close() {
|
||||
if (this._closed) return
|
||||
// Handle abort signal
|
||||
options.signal?.addEventListener("abort", close)
|
||||
|
||||
for (const id of this._ids) {
|
||||
this._adapter.send(["CLOSE", id])
|
||||
}
|
||||
|
||||
this._closed = true
|
||||
this.emit(RequestEvent.Close)
|
||||
this._adapter.cleanup()
|
||||
this._unsubscribers.map(call)
|
||||
this.removeAllListeners()
|
||||
// If we're auto-closing, make sure it happens even if the relay doesn't send an eose
|
||||
// and the caller doesn't provide a signal, in order to avoid memory leaks
|
||||
if (options.autoClose && !options.signal) {
|
||||
setTimeout(close, 30_000)
|
||||
}
|
||||
|
||||
for (const filter of options.filters) {
|
||||
const id = `REQ-${randomId().slice(0, 8)}`
|
||||
|
||||
ids.add(id)
|
||||
adapter.send([ClientMessageType.Req, id, filter])
|
||||
}
|
||||
|
||||
return deferred
|
||||
}
|
||||
|
||||
// MultiRequest
|
||||
|
||||
export type MultiRequestEvents = {
|
||||
[RequestEvent.Event]: (event: TrustedEvent, url: string) => void
|
||||
[RequestEvent.Deleted]: (event: TrustedEvent, url: string) => void
|
||||
[RequestEvent.Invalid]: (event: TrustedEvent, url: string) => void
|
||||
[RequestEvent.Filtered]: (event: TrustedEvent, url: string) => void
|
||||
[RequestEvent.Duplicate]: (event: TrustedEvent, url: string) => void
|
||||
[RequestEvent.Disconnect]: (url: string) => void
|
||||
[RequestEvent.Eose]: (url: string) => void
|
||||
[RequestEvent.Close]: () => void
|
||||
}
|
||||
|
||||
export type MultiRequestOptions = Omit<SingleRequestOptions, "relay"> & {
|
||||
export type RequestOptions = Omit<RequestOneOptions, "relay"> & {
|
||||
relays: string[]
|
||||
threshold?: number
|
||||
}
|
||||
|
||||
export class MultiRequest extends EventEmitter {
|
||||
_children: SingleRequest[] = []
|
||||
_closed = new Set<string>()
|
||||
export const request = async (options: RequestOptions) => {
|
||||
const closed = new Set<string>()
|
||||
const tracker = new Tracker()
|
||||
const relays = new Set(options.relays)
|
||||
const ctrl = new AbortController()
|
||||
const signal = options.signal ? AbortSignal.any([options.signal, ctrl.signal]) : ctrl.signal
|
||||
const threshold = options.threshold || 1
|
||||
const promises: Promise<TrustedEvent[]>[] = []
|
||||
|
||||
constructor(options: MultiRequestOptions) {
|
||||
super()
|
||||
|
||||
const tracker = new Tracker()
|
||||
const relays = new Set(options.relays)
|
||||
const threshold = options.threshold || 1
|
||||
|
||||
if (relays.size !== options.relays.length) {
|
||||
console.warn("Non-unique relays passed to MultiRequest")
|
||||
}
|
||||
|
||||
for (const relay of relays) {
|
||||
const req = new SingleRequest({relay, tracker, ...options})
|
||||
|
||||
req.on(RequestEvent.Event, (event: TrustedEvent) => {
|
||||
this.emit(RequestEvent.Event, event, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEvent.Deleted, (event: TrustedEvent) => {
|
||||
this.emit(RequestEvent.Deleted, event, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEvent.Invalid, (event: TrustedEvent) => {
|
||||
this.emit(RequestEvent.Invalid, event, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEvent.Filtered, (event: TrustedEvent) => {
|
||||
this.emit(RequestEvent.Filtered, event, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEvent.Duplicate, (event: TrustedEvent) => {
|
||||
this.emit(RequestEvent.Duplicate, event, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEvent.Disconnect, () => {
|
||||
this.emit(RequestEvent.Disconnect, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEvent.Eose, () => {
|
||||
this.emit(RequestEvent.Eose, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEvent.Close, () => {
|
||||
this._closed.add(relay)
|
||||
|
||||
if (this._closed.size >= relays.size * threshold) {
|
||||
this.emit(RequestEvent.Close)
|
||||
this.close()
|
||||
}
|
||||
})
|
||||
|
||||
this._children.push(req)
|
||||
}
|
||||
if (relays.size !== options.relays.length) {
|
||||
console.warn("Non-unique relays passed to request")
|
||||
}
|
||||
|
||||
close() {
|
||||
for (const child of this._children) {
|
||||
child.close()
|
||||
}
|
||||
}
|
||||
return flatten(
|
||||
await Promise.all(
|
||||
Array.from(relays).map(relay =>
|
||||
requestOne({
|
||||
...options,
|
||||
tracker,
|
||||
signal,
|
||||
relay,
|
||||
onClose: () => {
|
||||
closed.add(relay)
|
||||
|
||||
if (closed.size >= relays.size * threshold) {
|
||||
options.onClose?.()
|
||||
ctrl.abort()
|
||||
}
|
||||
}
|
||||
})
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
export const request = (options: MultiRequestOptions) => new MultiRequest(options)
|
||||
|
||||
export type LoaderOptions = {
|
||||
delay: number
|
||||
@@ -253,61 +186,118 @@ export type LoaderOptions = {
|
||||
export type LoadOptions = {
|
||||
relays: string[]
|
||||
filters: Filter[]
|
||||
signal?: AbortSignal
|
||||
onEvent?: (event: TrustedEvent, url: string) => void
|
||||
onDisconnect?: (url: string) => void
|
||||
onEose?: (url: string) => void
|
||||
onClose?: () => void
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a convenience function which returns a promise of events from a request.
|
||||
* It may return early if filter cardinality is known, and it delays requests by
|
||||
* 200 in order to implement batching
|
||||
* @param options - MultiRequestOptions
|
||||
* @returns - a promise containing an array of TrustedEvents
|
||||
* It may return early if filter cardinality is known, and it delays requests in order
|
||||
* to implement batching
|
||||
* @param options - LoaderOptions
|
||||
* @returns - a load function
|
||||
*/
|
||||
export const makeLoader = (options: LoaderOptions) =>
|
||||
batcher(options.delay, async (requests: LoadOptions[]) => {
|
||||
const filtersByRelay = new Map<string, Filter[]>()
|
||||
batcher(options.delay, async (allRequests: LoadOptions[]) => {
|
||||
const resultsByRequest = new Map<LoadOptions, Deferred<TrustedEvent[]>>()
|
||||
const eventsByRequest = new Map<LoadOptions, TrustedEvent[]>()
|
||||
const requestsByRelay = new Map<string, LoadOptions[]>()
|
||||
const controllersByRelay = new Map<string, AbortController>()
|
||||
const signalsByRelay = new Map<string, AbortSignal>()
|
||||
const closedRequestsByRelay = new Map<string, Set<LoadOptions>>()
|
||||
const closedRelaysByRequest = new Map<LoadOptions, Set<string>>()
|
||||
const relays = allRequests.flatMap(r => r.relays)
|
||||
const threshold = options.threshold || 1
|
||||
const tracker = new Tracker()
|
||||
|
||||
for (const {filters, relays} of requests) {
|
||||
for (const relay of relays) {
|
||||
for (const filter of filters) {
|
||||
pushToMapKey(filtersByRelay, relay, filter)
|
||||
}
|
||||
const close = (relay: string, request: LoadOptions) => {
|
||||
addToMapKey(closedRequestsByRelay, relay, request)
|
||||
addToMapKey(closedRelaysByRequest, request, relay)
|
||||
|
||||
const closedRelays = closedRelaysByRequest.get(request)?.size || 0
|
||||
if (closedRelays >= request.relays.length * threshold) {
|
||||
request.onClose?.()
|
||||
resultsByRequest.get(request)?.resolve(eventsByRequest.get(request) || [])
|
||||
}
|
||||
|
||||
if (closedRequestsByRelay.get(relay)?.size === requestsByRelay.get(relay)?.length) {
|
||||
controllersByRelay.get(relay)?.abort()
|
||||
}
|
||||
}
|
||||
|
||||
const tracker = new Tracker()
|
||||
const events: TrustedEvent[] = []
|
||||
for (const request of allRequests) {
|
||||
for (const relay of request.relays) {
|
||||
pushToMapKey(requestsByRelay, relay, request)
|
||||
resultsByRequest.set(request, defer())
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
Array.from(filtersByRelay).map(
|
||||
async ([relay, unmergedFilters]) =>
|
||||
new Promise<void>(resolve => {
|
||||
const filters = unionFilters(unmergedFilters)
|
||||
const cardinality =
|
||||
filters.length === 1 ? getFilterResultCardinality(filters[0]) : undefined
|
||||
const req = new MultiRequest({
|
||||
filters,
|
||||
tracker,
|
||||
relays: [relay],
|
||||
autoClose: true,
|
||||
...options,
|
||||
})
|
||||
// Propagate abort when all requests have been closed for a given relay
|
||||
request.signal?.addEventListener('abort', () => {
|
||||
for (const relay of request.relays) {
|
||||
close(relay, request)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
let count = 0
|
||||
// Create an abort controller for each relay
|
||||
for (const relay of relays) {
|
||||
const controller = new AbortController()
|
||||
const signals = [controller.signal]
|
||||
|
||||
req.on(RequestEvent.Event, (event: TrustedEvent) => {
|
||||
events.push(event)
|
||||
if (options.timeout) {
|
||||
signals.push(AbortSignal.timeout(options.timeout))
|
||||
}
|
||||
|
||||
if (++count === cardinality) {
|
||||
resolve()
|
||||
controllersByRelay.set(relay, controller)
|
||||
signalsByRelay.set(relay, AbortSignal.any(signals))
|
||||
}
|
||||
|
||||
Array.from(requestsByRelay).forEach(
|
||||
async ([relay, requests]) => {
|
||||
// Union all filters for a given request and send them together
|
||||
const filters = unionFilters(requests.flatMap(r => r.filters))
|
||||
|
||||
// Propagate events to caller, but only for requests that have not been aborted
|
||||
const getOpenRequests = () =>
|
||||
requests.filter(request => !closedRequestsByRelay.get(relay)?.has(request))
|
||||
|
||||
requestOne({
|
||||
relay,
|
||||
filters,
|
||||
tracker,
|
||||
autoClose: true,
|
||||
signal: signalsByRelay.get(relay),
|
||||
context: options.context,
|
||||
isEventValid: options.isEventValid,
|
||||
isEventDeleted: options.isEventDeleted,
|
||||
onEvent: (event: TrustedEvent, url: string) => {
|
||||
for (const request of getOpenRequests()) {
|
||||
if (matchFilters(request.filters, event)) {
|
||||
pushToMapKey(eventsByRequest, request, event)
|
||||
request.onEvent?.(event, url)
|
||||
|
||||
// Calculate cardinality for unioned filters so that we can return early
|
||||
if (request.filters.length === 1) {
|
||||
const cardinality = getFilterResultCardinality(request.filters[0])
|
||||
|
||||
if (eventsByRequest.get(request)?.length === cardinality) {
|
||||
close(relay, request)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
req.on(RequestEvent.Close, () => resolve())
|
||||
}),
|
||||
),
|
||||
}
|
||||
},
|
||||
onDisconnect: (url: string) => getOpenRequests().forEach(request => request.onDisconnect?.(url)),
|
||||
onEose: (url: string) => getOpenRequests().forEach(request => request.onEose?.(url)),
|
||||
onClose: () => requests.forEach(request => close(relay, request)),
|
||||
})
|
||||
}
|
||||
)
|
||||
|
||||
return requests.map(r => events.filter(event => matchFilters(r.filters, event)))
|
||||
return allRequests.map(r => resultsByRequest.get(r))
|
||||
})
|
||||
|
||||
export const load = makeLoader({delay: 200, timeout: 3000, threshold: 0.5})
|
||||
|
||||
Reference in New Issue
Block a user