diff --git a/packages/net2/src/adapter.ts b/packages/net2/src/adapter.ts index 459232c..c84bd8b 100644 --- a/packages/net2/src/adapter.ts +++ b/packages/net2/src/adapter.ts @@ -3,9 +3,7 @@ import {call, on} from "@welshman/lib" import {Relay, LOCAL_RELAY_URL} from "@welshman/util" import {RelayMessage, ClientMessage} from "./message.js" import {Socket, SocketEventType} from "./socket.js" -import {TypedEmitter} from "./util.js" - -type Unsubscriber = () => void +import {TypedEmitter, Unsubscriber} from "./util.js" export enum AdapterEventType { Receive = "adapter:event:receive", @@ -18,6 +16,7 @@ export type AdapterEvents = { export abstract class AbstractAdapter extends (EventEmitter as new () => TypedEmitter) { _unsubscribers: Unsubscriber[] = [] + abstract urls: string[] abstract sockets: Socket[] abstract send(message: ClientMessage): void @@ -37,6 +36,10 @@ export class SocketsAdapter extends AbstractAdapter { }) } + get urls() { + return this.sockets.map(socket => socket.url) + } + send(message: ClientMessage) { for (const socket of this.sockets) { socket.send(message) @@ -55,6 +58,10 @@ export class LocalAdapter extends AbstractAdapter { ] } + get urls() { + return [LOCAL_RELAY_URL] + } + get sockets() { return [] } @@ -77,6 +84,10 @@ export class MultiAdapter extends AbstractAdapter { }) } + get urls() { + return this.adapters.flatMap(t => t.urls) + } + get sockets() { return this.adapters.flatMap(t => t.sockets) } diff --git a/packages/net2/src/auth.ts b/packages/net2/src/auth.ts index 1739f66..ff94350 100644 --- a/packages/net2/src/auth.ts +++ b/packages/net2/src/auth.ts @@ -3,8 +3,8 @@ import {on, call, sleep} from "@welshman/lib" import type {SignedEvent, StampedEvent} from "@welshman/util" import {makeEvent, CLIENT_AUTH} from "@welshman/util" import {isRelayAuth, isClientAuth, isRelayOk, RelayMessage} from "./message.js" -import {Socket, SocketStatus, SocketEventType, SocketUnsubscriber} from "./socket.js" -import {TypedEmitter} from "./util.js" +import {Socket, SocketStatus, SocketEventType} from "./socket.js" +import {TypedEmitter, Unsubscriber} from "./util.js" export const makeAuthEvent = (url: string, challenge: string) => makeEvent(CLIENT_AUTH, { @@ -42,7 +42,7 @@ export class AuthState extends (EventEmitter as new () => TypedEmitter void } -export type SocketUnsubscriber = () => void - export class Socket extends (EventEmitter as new () => TypedEmitter) { _ws?: WebSocket _sendQueue: TaskQueue diff --git a/packages/net2/src/subscribe.ts b/packages/net2/src/subscribe.ts index 0ec2f43..9f2e685 100644 --- a/packages/net2/src/subscribe.ts +++ b/packages/net2/src/subscribe.ts @@ -1,40 +1,68 @@ import {EventEmitter} from "events" -import {on, randomId} from "@welshman/lib" -import {Filter, SignedEvent} from "@welshman/util" +import {on, call, randomId} from "@welshman/lib" +import {Filter, matchFilter, SignedEvent} from "@welshman/util" import {RelayMessage, isRelayEvent, isRelayEose} from "./message.js" import {AbstractAdapter, AdapterEventType} from "./adapter.js" -import {TypedEmitter} from "./util.js" +import {SocketEventType, SocketStatus} from "./socket.js" +import {TypedEmitter, Unsubscriber} from "./util.js" +import {Tracker} from "./tracker.js" -export enum SubscribeEventType { - Event = "subscribe:event:event", - Eose = "subscribe:event:eose", +export enum SubscriptionEventType { + Close = "subscription:event:close", + Disconnect = "subscription:event:disconnect", + Duplicate = "subscription:event:duplicate", + Eose = "subscription:event:eose", + Event = "subscription:event:event", + Filtered = "subscription:event:filtered", + Invalid = "subscription:event:invalid", } -export type SubscribeEvents = { - [SubscribeEventType.Event]: (event: SignedEvent, url: string) => void - [SubscribeEventType.Eose]: (url: string) => void +export type SubscriptionEvents = { + [SubscriptionEventType.Close]: () => void + [SubscriptionEventType.Disconnect]: (url: string) => void + [SubscriptionEventType.Duplicate]: (event: SignedEvent, url: string) => void + [SubscriptionEventType.Eose]: (url: string) => void + [SubscriptionEventType.Event]: (event: SignedEvent, url: string) => void + [SubscriptionEventType.Filtered]: (event: SignedEvent, url: string) => void + [SubscriptionEventType.Invalid]: (event: SignedEvent, url: string) => void } -export class Subscribe extends (EventEmitter as new () => TypedEmitter) { +export type SubscriptionOptions = { + adapter: AbstractAdapter + autoClose?: boolean + filter: Filter + timeout?: number + tracker?: Tracker + verifyEvent?: (event: SignedEvent) => boolean + events?: Partial +} + +export class Subscription extends (EventEmitter as new () => TypedEmitter) { _id = `REQ-${randomId().slice(0, 8)}` - _unsubscriber: () => void + _unsubscribers: Unsubscriber[] = [] _closed = false - constructor( - readonly adapter: AbstractAdapter, - readonly filter: Filter, - ) { + constructor(readonly options: SubscriptionOptions) { super() - this._unsubscriber = on( - adapter, - AdapterEventType.Receive, - (message: RelayMessage, url: string) => { + const done = new Set() + const urls = new Set(options.adapter.urls) + + this._unsubscribers.push( + on(options.adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => { if (isRelayEvent(message)) { const [_, id, event] = message - if (id === this._id) { - this.emit(SubscribeEventType.Event, event, url) + if (id !== this._id) return + + if (options.tracker?.track(event.id, url)) { + this.emit(SubscriptionEventType.Duplicate, event, url) + } else if (options.verifyEvent?.(event) === false) { + this.emit(SubscriptionEventType.Invalid, event, url) + } else if (!matchFilter(options.filter, event)) { + this.emit(SubscriptionEventType.Filtered, event, url) + } else { + this.emit(SubscriptionEventType.Event, event, url) } } @@ -42,20 +70,54 @@ export class Subscribe extends (EventEmitter as new () => TypedEmitter { + if (![SocketStatus.Open, SocketStatus.Opening].includes(status)) { + this.emit(SubscriptionEventType.Disconnect, socket.url) + + done.add(socket.url) + + if (options.autoClose && done.size === urls.size) { + this.close() + } + } + }), + ) + } + + if (options.timeout) { + setTimeout(() => this.close(), options.timeout) + } + + if (options.events) { + for (const [k, listener] of Object.entries(options.events)) { + this.on(k as keyof SubscriptionEvents, listener) + } + } + + options.adapter.send(["REQ", this._id, options.filter]) } close() { if (this._closed) return - this.adapter.send(["CLOSE", this._id]) - this._unsubscriber() + this.options.adapter.send(["CLOSE", this._id]) + this.emit(SubscriptionEventType.Close) + this.removeAllListeners() + this._unsubscribers.map(call) this._closed = true } } diff --git a/packages/net2/src/tracker.ts b/packages/net2/src/tracker.ts new file mode 100644 index 0000000..81630fe --- /dev/null +++ b/packages/net2/src/tracker.ts @@ -0,0 +1,85 @@ +import {Emitter, addToMapKey} from "@welshman/lib" + +export class Tracker extends Emitter { + relaysById = new Map>() + idsByRelay = new Map>() + + constructor() { + super() + + this.setMaxListeners(100) + } + + getIds = (relay: string) => this.idsByRelay.get(relay) || new Set() + + getRelays = (eventId: string) => this.relaysById.get(eventId) || new Set() + + hasRelay = (eventId: string, relay: string) => this.relaysById.get(eventId)?.has(relay) + + addRelay = (eventId: string, relay: string) => { + let relays = this.relaysById.get(eventId) + let ids = this.idsByRelay.get(relay) + + if (relays?.has(relay) && ids?.has(eventId)) return + + if (!relays) { + relays = new Set() + } + + if (!ids) { + ids = new Set() + } + + relays.add(relay) + ids.add(eventId) + + this.relaysById.set(eventId, relays) + this.idsByRelay.set(relay, ids) + + this.emit("update") + } + + removeRelay = (eventId: string, relay: string) => { + const didDeleteRelay = this.relaysById.get(eventId)?.delete(relay) + const didDeleteId = this.idsByRelay.get(relay)?.delete(eventId) + + if (!didDeleteRelay && !didDeleteId) return + + this.emit("update") + } + + track = (eventId: string, relay: string) => { + const seen = this.relaysById.has(eventId) + + this.addRelay(eventId, relay) + + return seen + } + + copy = (eventId1: string, eventId2: string) => { + for (const relay of this.getRelays(eventId1)) { + this.addRelay(eventId2, relay) + } + } + + load = (relaysById: Tracker["relaysById"]) => { + this.relaysById.clear() + this.idsByRelay.clear() + + for (const [id, relays] of relaysById.entries()) { + for (const relay of relays) { + addToMapKey(this.relaysById, id, relay) + addToMapKey(this.idsByRelay, relay, id) + } + } + + this.emit("update") + } + + clear = () => { + this.relaysById.clear() + this.idsByRelay.clear() + + this.emit("update") + } +} diff --git a/packages/net2/src/util.ts b/packages/net2/src/util.ts index 0b6ced4..c1abb8a 100644 --- a/packages/net2/src/util.ts +++ b/packages/net2/src/util.ts @@ -1,3 +1,5 @@ import TypedEventEmitter, {EventMap} from "typed-emitter" export type TypedEmitter = TypedEventEmitter.default + +export type Unsubscriber = () => void