Add subscribe and publish utils

This commit is contained in:
Jon Staab
2025-03-21 12:47:16 -07:00
parent 1e681b16e2
commit 158f6d50f9
5 changed files with 118 additions and 12 deletions
+6 -8
View File
@@ -1,11 +1,9 @@
import EventEmitter from "events"
import TypedEventEmitter, {EventMap} from "typed-emitter"
import {call, on} from "@welshman/lib"
import {Relay, LOCAL_RELAY_URL} from "@welshman/util"
import {RelayMessage, ClientMessage} from "./message.js"
import {Socket, SocketEventType} from "./socket.js"
type TypedEmitter<T extends EventMap> = TypedEventEmitter.default<T>
import {TypedEmitter} from "./util.js"
type Unsubscriber = () => void
@@ -17,7 +15,7 @@ export type AdapterEvents = {
[AdapterEventType.Receive]: (message: RelayMessage, url: string) => void
}
export abstract class BaseAdapter extends (EventEmitter as new () => TypedEmitter<AdapterEvents>) {
export abstract class AbstractAdapter extends (EventEmitter as new () => TypedEmitter<AdapterEvents>) {
_unsubscribers: Unsubscriber[] = []
abstract sockets: Socket[]
@@ -28,7 +26,7 @@ export abstract class BaseAdapter extends (EventEmitter as new () => TypedEmitte
}
}
export class SocketsAdapter extends BaseAdapter {
export class SocketsAdapter extends AbstractAdapter {
constructor(readonly sockets: Socket[]) {
super()
@@ -46,7 +44,7 @@ export class SocketsAdapter extends BaseAdapter {
}
}
export class LocalAdapter extends BaseAdapter {
export class LocalAdapter extends AbstractAdapter {
constructor(readonly relay: Relay) {
super()
@@ -68,8 +66,8 @@ export class LocalAdapter extends BaseAdapter {
}
}
export class MultiAdapter extends BaseAdapter {
constructor(readonly adapters: BaseAdapter[]) {
export class MultiAdapter extends AbstractAdapter {
constructor(readonly adapters: AbstractAdapter[]) {
super()
this._unsubscribers = adapters.map(adapter => {
+46
View File
@@ -0,0 +1,46 @@
import {EventEmitter} from "events"
import {on} from "@welshman/lib"
import {SignedEvent} from "@welshman/util"
import {RelayMessage, isRelayOkMessage} from "./message.js"
import {AbstractAdapter, AdapterEventType} from "./adapter.js"
import {TypedEmitter} from "./util.js"
export enum PublishEventType {
Ok = "publish:event:ok",
}
export type PublishEvents = {
[PublishEventType.Ok]: (id: string, ok: boolean, detail: string, url: string) => void
}
export class Publish extends (EventEmitter as new () => TypedEmitter<PublishEvents>) {
_unsubscriber: () => void
constructor(
readonly adapter: AbstractAdapter,
readonly event: SignedEvent,
readonly verb = "EVENT",
) {
super()
this._unsubscriber = on(
adapter,
AdapterEventType.Receive,
(message: RelayMessage, url: string) => {
if (isRelayOkMessage(message)) {
const [_, id, ok, detail] = message
if (id === event.id) {
this.emit(PublishEventType.Ok, id, ok, detail, url)
}
}
},
)
adapter.send([verb, event])
}
close() {
this._unsubscriber()
}
}
+2 -4
View File
@@ -1,10 +1,8 @@
import WebSocket from "isomorphic-ws"
import EventEmitter from "events"
import TypedEventEmitter, {EventMap} from "typed-emitter"
import {on, now, ago, TaskQueue} from "@welshman/lib"
import type {RelayMessage, ClientMessage} from "./message.js"
type TypedEmitter<T extends EventMap> = TypedEventEmitter.default<T>
import {RelayMessage, ClientMessage} from "./message.js"
import {TypedEmitter} from "./util.js"
export enum SocketStatus {
Open = "socket:status:open",
+61
View File
@@ -0,0 +1,61 @@
import {EventEmitter} from "events"
import {on, randomId} from "@welshman/lib"
import {Filter, SignedEvent} from "@welshman/util"
import {RelayMessage, isRelayEventMessage, isRelayEoseMessage} from "./message.js"
import {AbstractAdapter, AdapterEventType} from "./adapter.js"
import {TypedEmitter} from "./util.js"
export enum SubscribeEventType {
Event = "subscribe:event:event",
Eose = "subscribe:event:eose",
}
export type SubscribeEvents = {
[SubscribeEventType.Event]: (event: SignedEvent, url: string) => void
[SubscribeEventType.Eose]: (url: string) => void
}
export class Subscribe extends (EventEmitter as new () => TypedEmitter<SubscribeEvents>) {
_id = `REQ-${randomId().slice(0, 8)}`
_unsubscriber: () => void
_closed = false
constructor(
readonly adapter: AbstractAdapter,
readonly filter: Filter,
) {
super()
this._unsubscriber = on(
adapter,
AdapterEventType.Receive,
(message: RelayMessage, url: string) => {
if (isRelayEventMessage(message)) {
const [_, id, event] = message
if (id === this._id) {
this.emit(SubscribeEventType.Event, event, url)
}
}
if (isRelayEoseMessage(message)) {
const [_, id] = message
if (id === this._id) {
this.emit(SubscribeEventType.Eose, url)
}
}
},
)
adapter.send(["REQ", this._id, filter])
}
close() {
if (!this._closed) {
this.adapter.send(["CLOSE", this._id])
this._unsubscriber()
this._closed = true
}
}
}
+3
View File
@@ -0,0 +1,3 @@
import TypedEventEmitter, {EventMap} from "typed-emitter"
export type TypedEmitter<T extends EventMap> = TypedEventEmitter.default<T>