From 1e681b16e2eeefda750b55ad72806704b632126b Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Fri, 21 Mar 2025 12:24:14 -0700 Subject: [PATCH] Use typesafe event emitters --- packages/net2/src/adapter.ts | 111 ++++++++++++--------------- packages/net2/src/auth.ts | 8 +- packages/net2/src/socket.ts | 142 ++++++++--------------------------- 3 files changed, 85 insertions(+), 176 deletions(-) diff --git a/packages/net2/src/adapter.ts b/packages/net2/src/adapter.ts index 5167e23..9721e17 100644 --- a/packages/net2/src/adapter.ts +++ b/packages/net2/src/adapter.ts @@ -1,57 +1,61 @@ -import {eq, on, call} from "@welshman/lib" -import {Relay} from "@welshman/util" +import EventEmitter from "events" +import TypedEventEmitter, {EventMap} from "typed-emitter" +import {call, on} from "@welshman/lib" +import {Relay, LOCAL_RELAY_URL} from "@welshman/util" import {RelayMessage, ClientMessage} from "./message.js" -import {Socket} from "./socket.js" +import {Socket, SocketEventType} from "./socket.js" + +type TypedEmitter = TypedEventEmitter.default type Unsubscriber = () => void -const trackUnsubscribers = (all: Unsubscriber[], local: Unsubscriber[]) => { - all.push(...local) - - return () => { - local.forEach(call) - - for (const f of local) { - all.splice(all.findIndex(eq(f)), 1) - } - } +export enum AdapterEventType { + Receive = "adapter:event:receive", } -type RelayMessageSub = (message: RelayMessage) => void - -export interface IAdapter { - sockets: Socket[] - send(message: ClientMessage): void - onMessage(cb: RelayMessageSub): Unsubscriber +export type AdapterEvents = { + [AdapterEventType.Receive]: (message: RelayMessage, url: string) => void } -export class SocketsAdapter implements IAdapter { +export abstract class BaseAdapter extends (EventEmitter as new () => TypedEmitter) { _unsubscribers: Unsubscriber[] = [] - constructor(readonly sockets: Socket[]) {} - - send(message: ClientMessage) { - for (const socket of this.sockets) { - socket.send(message) - } - } - - onMessage(cb: RelayMessageSub) { - return trackUnsubscribers( - this._unsubscribers, - this.sockets.map(s => s.onMessage(cb)), - ) - } + abstract sockets: Socket[] + abstract send(message: ClientMessage): void cleanup() { this._unsubscribers.splice(0).forEach(call) } } -export class LocalAdapter { - _unsubscribers: Unsubscriber[] = [] +export class SocketsAdapter extends BaseAdapter { + constructor(readonly sockets: Socket[]) { + super() - constructor(readonly relay: Relay) {} + this._unsubscribers = sockets.map(socket => { + return on(socket, SocketEventType.Receive, (message: RelayMessage, url: string) => { + this.emit(AdapterEventType.Receive, message, url) + }) + }) + } + + send(message: ClientMessage) { + for (const socket of this.sockets) { + socket.send(message) + } + } +} + +export class LocalAdapter extends BaseAdapter { + constructor(readonly relay: Relay) { + super() + + this._unsubscribers = [ + on(relay, "*", (...message: RelayMessage) => { + this.emit(AdapterEventType.Receive, message, LOCAL_RELAY_URL) + }), + ] + } get sockets() { return [] @@ -62,22 +66,18 @@ export class LocalAdapter { this.relay.send(type, ...rest) } - - onMessage(cb: RelayMessageSub) { - return trackUnsubscribers(this._unsubscribers, [ - on(this.relay, "*", (...args: any[]) => cb(args)), - ]) - } - - cleanup() { - this._unsubscribers.splice(0).forEach(call) - } } -export class MultiAdapter { - _unsubscribers: Unsubscriber[] = [] +export class MultiAdapter extends BaseAdapter { + constructor(readonly adapters: BaseAdapter[]) { + super() - constructor(readonly adapters: IAdapter[]) {} + this._unsubscribers = adapters.map(adapter => { + return on(adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => { + this.emit(AdapterEventType.Receive, message, url) + }) + }) + } get sockets() { return this.adapters.flatMap(t => t.sockets) @@ -88,15 +88,4 @@ export class MultiAdapter { adapter.send(message) } } - - onMessage(cb: RelayMessageSub) { - return trackUnsubscribers( - this._unsubscribers, - this.adapters.map(a => a.onMessage(cb)), - ) - } - - cleanup() { - this._unsubscribers.splice(0).forEach(call) - } } diff --git a/packages/net2/src/auth.ts b/packages/net2/src/auth.ts index 718c90a..2269df9 100644 --- a/packages/net2/src/auth.ts +++ b/packages/net2/src/auth.ts @@ -1,8 +1,8 @@ -import {sleep} from "@welshman/lib" +import {on, sleep} from "@welshman/lib" import type {SignedEvent, StampedEvent} from "@welshman/util" import {makeEvent, CLIENT_AUTH} from "@welshman/util" import {isRelayAuthMessage, isRelayOkMessage, RelayMessage} from "./message.js" -import {Socket, SocketStatus, SocketUnsubscriber} from "./socket.js" +import {Socket, SocketStatus, SocketEventType, SocketUnsubscriber} from "./socket.js" export const makeAuthEvent = (url: string, challenge: string) => makeEvent(CLIENT_AUTH, { @@ -44,7 +44,7 @@ export class AuthManager { readonly options: AuthManagerOptions, ) { this._unsubscribers.push( - socket.onMessage((message: RelayMessage) => { + on(socket, SocketEventType.Receive, (message: RelayMessage) => { if (isRelayOkMessage(message)) { const [_, id, ok, details] = message @@ -75,7 +75,7 @@ export class AuthManager { ) this._unsubscribers.push( - socket.onStatus(status => { + on(socket, SocketEventType.Status, (status: SocketStatus) => { if (status === SocketStatus.Closed) { this.challenge = undefined this.request = undefined diff --git a/packages/net2/src/socket.ts b/packages/net2/src/socket.ts index 0f6e1bb..1892c93 100644 --- a/packages/net2/src/socket.ts +++ b/packages/net2/src/socket.ts @@ -1,7 +1,11 @@ import WebSocket from "isomorphic-ws" -import {remove, now, ago, TaskQueue} from "@welshman/lib" +import EventEmitter from "events" +import TypedEventEmitter, {EventMap} from "typed-emitter" +import {on, now, ago, TaskQueue} from "@welshman/lib" import type {RelayMessage, ClientMessage} from "./message.js" +type TypedEmitter = TypedEventEmitter.default + export enum SocketStatus { Open = "socket:status:open", Opening = "socket:status:opening", @@ -14,81 +18,39 @@ export enum SocketStatus { export enum SocketEventType { Error = "socket:event:error", Status = "socket:event:status", - Message = "socket:event:message", + Send = "socket:event:send", + Receive = "socket:event:receive", } -export type SocketErrorEvent = { - type: SocketEventType.Error - error: string +export type SocketEvents = { + [SocketEventType.Error]: (error: string, url: string) => void + [SocketEventType.Status]: (status: SocketStatus, url: string) => void + [SocketEventType.Send]: (message: ClientMessage, url: string) => void + [SocketEventType.Receive]: (message: RelayMessage, url: string) => void } -export type SocketStatusEvent = { - type: SocketEventType.Status - status: SocketStatus -} - -export type SocketMessageEvent = { - type: SocketEventType.Message - message: RelayMessage -} - -export type SocketEvent = SocketErrorEvent | SocketStatusEvent | SocketMessageEvent - -export const makeSocketErrorEvent = (error: string): SocketErrorEvent => ({ - type: SocketEventType.Error, - error, -}) - -export const makeSocketStatusEvent = (status: SocketStatus): SocketStatusEvent => ({ - type: SocketEventType.Status, - status, -}) - -export const makeSocketMessageEvent = (message: RelayMessage): SocketMessageEvent => ({ - type: SocketEventType.Message, - message, -}) - -export const isSocketErrorEvent = (event: SocketEvent): event is SocketErrorEvent => - event.type === SocketEventType.Error - -export const isSocketStatusEvent = (event: SocketEvent): event is SocketStatusEvent => - event.type === SocketEventType.Status - -export const isSocketMessageEvent = (event: SocketEvent): event is SocketMessageEvent => - event.type === SocketEventType.Message - -export type SocketSendSubscriber = (message: ClientMessage) => void - -export type SocketRecvSubscriber = (event: SocketEvent) => void - export type SocketUnsubscriber = () => void -export class Socket { +export class Socket extends (EventEmitter as new () => TypedEmitter) { _ws?: WebSocket - _sendSubs: SocketSendSubscriber[] = [] - _recvSubs: SocketRecvSubscriber[] = [] _sendQueue: TaskQueue - _recvQueue: TaskQueue + _recvQueue: TaskQueue constructor(readonly url: string) { + super() + this._sendQueue = new TaskQueue({ batchSize: 50, processItem: (message: ClientMessage) => { this._ws?.send(JSON.stringify(message)) - - for (const cb of this._sendSubs) { - cb(message) - } + this.emit(SocketEventType.Send, message, this.url) }, }) - this._recvQueue = new TaskQueue({ + this._recvQueue = new TaskQueue({ batchSize: 50, - processItem: (event: SocketEvent) => { - for (const cb of this._recvSubs) { - cb(event) - } + processItem: (message: RelayMessage) => { + this.emit(SocketEventType.Receive, message, this.url) }, }) } @@ -100,17 +62,17 @@ export class Socket { try { this._ws = new WebSocket(this.url) - this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Opening)) + this.emit(SocketEventType.Status, SocketStatus.Opening, this.url) - this._ws.onopen = () => this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Open)) + this._ws.onopen = () => this.emit(SocketEventType.Status, SocketStatus.Open, this.url) this._ws.onerror = () => { - this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Error)) + this.emit(SocketEventType.Status, SocketStatus.Error, this.url) this._ws = undefined } this._ws.onclose = () => { - this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Closed)) + this.emit(SocketEventType.Status, SocketStatus.Closed, this.url) this._ws = undefined } @@ -121,16 +83,16 @@ export class Socket { const message = JSON.parse(data) if (Array.isArray(message)) { - this._recvQueue.push(makeSocketMessageEvent(message as RelayMessage)) + this._recvQueue.push(message as RelayMessage) } else { - this._recvQueue.push(makeSocketErrorEvent("Invalid message received")) + this.emit(SocketEventType.Error, "Invalid message received", this.url) } } catch (e) { - this._recvQueue.push(makeSocketErrorEvent("Invalid message received")) + this.emit(SocketEventType.Error, "Invalid message received", this.url) } } } catch (e) { - this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Invalid)) + this.emit(SocketEventType.Status, SocketStatus.Invalid, this.url) } } @@ -147,60 +109,18 @@ export class Socket { cleanup = () => { this.close() - this._recvSubs = [] this._recvQueue.clear() - this._sendSubs = [] this._sendQueue.clear() } send = (message: ClientMessage) => { this._sendQueue.push(message) } - - onSend = (cb: SocketSendSubscriber) => { - this._sendSubs.push(cb) - - return () => { - this._sendSubs = remove(cb, this._sendSubs) - } - } - - subscribe = (cb: SocketRecvSubscriber) => { - this._recvSubs.push(cb) - - return () => { - this._recvSubs = remove(cb, this._recvSubs) - } - } - - onError = (cb: (error: string) => void) => { - return this.subscribe((event: SocketEvent) => { - if (isSocketErrorEvent(event)) { - cb(event.error) - } - }) - } - - onStatus = (cb: (status: SocketStatus) => void) => { - return this.subscribe((event: SocketEvent) => { - if (isSocketStatusEvent(event)) { - cb(event.status) - } - }) - } - - onMessage = (cb: (message: RelayMessage) => void) => { - return this.subscribe((event: SocketEvent) => { - if (isSocketMessageEvent(event)) { - cb(event.message) - } - }) - } } export const socketPolicySendWhenOpen = (socket: Socket) => { // Pause sending messages when the socket isn't open - const unsubscribe = socket.onStatus(newStatus => { + const unsubscribe = on(socket, SocketEventType.Status, newStatus => { if (newStatus === SocketStatus.Open) { socket._sendQueue.start() } else { @@ -215,7 +135,7 @@ export const socketPolicyConnectOnSend = (socket: Socket) => { let lastError = 0 let currentStatus = SocketStatus.Closed - const unsubscribeOnStatus = socket.onStatus(newStatus => { + const unsubscribeOnStatus = on(socket, SocketEventType.Status, (newStatus: SocketStatus) => { // Keep track of the most recent error if (newStatus === SocketStatus.Error) { lastError = now() @@ -225,7 +145,7 @@ export const socketPolicyConnectOnSend = (socket: Socket) => { currentStatus = newStatus }) - const unsubscribeOnSend = socket.onSend(message => { + const unsubscribeOnSend = on(socket, SocketEventType.Send, (message: ClientMessage) => { // When a new message is sent, make sure the socket is open (unless there was a recent error) if (currentStatus === SocketStatus.Closed && now() - lastError < ago(30)) { socket.open()