Add diff utils, change how adapter is created, scope down subscribe/publish to a single relay for fastest possible completion

This commit is contained in:
Jon Staab
2025-03-24 15:30:49 -07:00
parent d7b74d2c7e
commit 88b44776d8
4 changed files with 276 additions and 112 deletions
+16 -17
View File
@@ -2,7 +2,7 @@ import {EventEmitter} from "events"
import {on, call, randomId, yieldThread} from "@welshman/lib"
import {Filter, matchFilter, SignedEvent} from "@welshman/util"
import {RelayMessage, ClientMessageType, isRelayEvent, isRelayEose} from "./message.js"
import {AbstractAdapter, AdapterEventType} from "./adapter.js"
import {getAdapter, AdapterContext, AbstractAdapter, AdapterEventType} from "./adapter.js"
import {SocketEventType, SocketStatus} from "./socket.js"
import {TypedEmitter, Unsubscriber} from "./util.js"
import {Tracker} from "./tracker.js"
@@ -28,11 +28,12 @@ export type SubscriptionEvents = {
}
export type SubscriptionOptions = {
adapter: AbstractAdapter
autoClose?: boolean
relay: string
filter: Filter
context: AdapterContext
timeout?: number
tracker?: Tracker
autoClose?: boolean
verifyEvent?: (event: SignedEvent) => boolean
on?: Partial<SubscriptionEvents>
}
@@ -40,18 +41,18 @@ export type SubscriptionOptions = {
export class Subscription extends (EventEmitter as new () => TypedEmitter<SubscriptionEvents>) {
_id = `REQ-${randomId().slice(0, 8)}`
_unsubscribers: Unsubscriber[] = []
_done = new Set<string>()
_adapter: AbstractAdapter
_closed = false
constructor(readonly options: SubscriptionOptions) {
super()
// Get our unique urls so we know when we're done
const urls = new Set(this.options.adapter.urls)
// Set up our adapter
this._adapter = getAdapter(this.options.relay, this.options.context)
// Listen for event/eose messages from the adapter
this._unsubscribers.push(
on(this.options.adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => {
on(this._adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => {
if (isRelayEvent(message)) {
const [_, id, event] = message
@@ -74,9 +75,7 @@ export class Subscription extends (EventEmitter as new () => TypedEmitter<Subscr
if (id === this._id) {
this.emit(SubscriptionEventType.Eose, url)
this._done.add(url)
if (this.options.autoClose && this._done.size === urls.size) {
if (this.options.autoClose) {
this.close()
}
}
@@ -85,15 +84,13 @@ export class Subscription extends (EventEmitter as new () => TypedEmitter<Subscr
)
// Listen to disconnects from any sockets
for (const socket of this.options.adapter.sockets) {
for (const socket of this._adapter.sockets) {
this._unsubscribers.push(
on(socket, SocketEventType.Status, (status: SocketStatus) => {
if (![SocketStatus.Open, SocketStatus.Opening].includes(status)) {
this.emit(SubscriptionEventType.Disconnect, socket.url)
this._done.add(socket.url)
if (this.options.autoClose && this._done.size === urls.size) {
if (this.options.autoClose) {
this.close()
}
}
@@ -119,17 +116,19 @@ export class Subscription extends (EventEmitter as new () => TypedEmitter<Subscr
}
// Send our request
this.options.adapter.send([ClientMessageType.Req, this._id, this.options.filter])
this._adapter.send([ClientMessageType.Req, this._id, this.options.filter])
}
close() {
if (this._closed) return
this.options.adapter.send(["CLOSE", this._id])
this._adapter.send(["CLOSE", this._id])
this.emit(SubscriptionEventType.Close)
this.options.adapter.cleanup()
this.removeAllListeners()
this._unsubscribers.map(call)
this._adapter.cleanup()
this._closed = true
}
}
export const subscribe = (options: SubscriptionOptions) => new Subscription(options)