Use typesafe event emitters

This commit is contained in:
Jon Staab
2025-03-21 12:24:14 -07:00
parent b2126aeb5f
commit 1e681b16e2
3 changed files with 85 additions and 176 deletions
+50 -61
View File
@@ -1,57 +1,61 @@
import {eq, on, call} from "@welshman/lib" import EventEmitter from "events"
import {Relay} from "@welshman/util" import TypedEventEmitter, {EventMap} from "typed-emitter"
import {call, on} from "@welshman/lib"
import {Relay, LOCAL_RELAY_URL} from "@welshman/util"
import {RelayMessage, ClientMessage} from "./message.js" import {RelayMessage, ClientMessage} from "./message.js"
import {Socket} from "./socket.js" import {Socket, SocketEventType} from "./socket.js"
type TypedEmitter<T extends EventMap> = TypedEventEmitter.default<T>
type Unsubscriber = () => void type Unsubscriber = () => void
const trackUnsubscribers = (all: Unsubscriber[], local: Unsubscriber[]) => { export enum AdapterEventType {
all.push(...local) Receive = "adapter:event:receive",
return () => {
local.forEach(call)
for (const f of local) {
all.splice(all.findIndex(eq(f)), 1)
}
}
} }
type RelayMessageSub = (message: RelayMessage) => void export type AdapterEvents = {
[AdapterEventType.Receive]: (message: RelayMessage, url: string) => void
export interface IAdapter {
sockets: Socket[]
send(message: ClientMessage): void
onMessage(cb: RelayMessageSub): Unsubscriber
} }
export class SocketsAdapter implements IAdapter { export abstract class BaseAdapter extends (EventEmitter as new () => TypedEmitter<AdapterEvents>) {
_unsubscribers: Unsubscriber[] = [] _unsubscribers: Unsubscriber[] = []
constructor(readonly sockets: Socket[]) {} abstract sockets: Socket[]
abstract send(message: ClientMessage): void
send(message: ClientMessage) {
for (const socket of this.sockets) {
socket.send(message)
}
}
onMessage(cb: RelayMessageSub) {
return trackUnsubscribers(
this._unsubscribers,
this.sockets.map(s => s.onMessage(cb)),
)
}
cleanup() { cleanup() {
this._unsubscribers.splice(0).forEach(call) this._unsubscribers.splice(0).forEach(call)
} }
} }
export class LocalAdapter { export class SocketsAdapter extends BaseAdapter {
_unsubscribers: Unsubscriber[] = [] constructor(readonly sockets: Socket[]) {
super()
constructor(readonly relay: Relay) {} this._unsubscribers = sockets.map(socket => {
return on(socket, SocketEventType.Receive, (message: RelayMessage, url: string) => {
this.emit(AdapterEventType.Receive, message, url)
})
})
}
send(message: ClientMessage) {
for (const socket of this.sockets) {
socket.send(message)
}
}
}
export class LocalAdapter extends BaseAdapter {
constructor(readonly relay: Relay) {
super()
this._unsubscribers = [
on(relay, "*", (...message: RelayMessage) => {
this.emit(AdapterEventType.Receive, message, LOCAL_RELAY_URL)
}),
]
}
get sockets() { get sockets() {
return [] return []
@@ -62,22 +66,18 @@ export class LocalAdapter {
this.relay.send(type, ...rest) this.relay.send(type, ...rest)
} }
onMessage(cb: RelayMessageSub) {
return trackUnsubscribers(this._unsubscribers, [
on(this.relay, "*", (...args: any[]) => cb(args)),
])
}
cleanup() {
this._unsubscribers.splice(0).forEach(call)
}
} }
export class MultiAdapter { export class MultiAdapter extends BaseAdapter {
_unsubscribers: Unsubscriber[] = [] constructor(readonly adapters: BaseAdapter[]) {
super()
constructor(readonly adapters: IAdapter[]) {} this._unsubscribers = adapters.map(adapter => {
return on(adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => {
this.emit(AdapterEventType.Receive, message, url)
})
})
}
get sockets() { get sockets() {
return this.adapters.flatMap(t => t.sockets) return this.adapters.flatMap(t => t.sockets)
@@ -88,15 +88,4 @@ export class MultiAdapter {
adapter.send(message) adapter.send(message)
} }
} }
onMessage(cb: RelayMessageSub) {
return trackUnsubscribers(
this._unsubscribers,
this.adapters.map(a => a.onMessage(cb)),
)
}
cleanup() {
this._unsubscribers.splice(0).forEach(call)
}
} }
+4 -4
View File
@@ -1,8 +1,8 @@
import {sleep} from "@welshman/lib" import {on, sleep} from "@welshman/lib"
import type {SignedEvent, StampedEvent} from "@welshman/util" import type {SignedEvent, StampedEvent} from "@welshman/util"
import {makeEvent, CLIENT_AUTH} from "@welshman/util" import {makeEvent, CLIENT_AUTH} from "@welshman/util"
import {isRelayAuthMessage, isRelayOkMessage, RelayMessage} from "./message.js" import {isRelayAuthMessage, isRelayOkMessage, RelayMessage} from "./message.js"
import {Socket, SocketStatus, SocketUnsubscriber} from "./socket.js" import {Socket, SocketStatus, SocketEventType, SocketUnsubscriber} from "./socket.js"
export const makeAuthEvent = (url: string, challenge: string) => export const makeAuthEvent = (url: string, challenge: string) =>
makeEvent(CLIENT_AUTH, { makeEvent(CLIENT_AUTH, {
@@ -44,7 +44,7 @@ export class AuthManager {
readonly options: AuthManagerOptions, readonly options: AuthManagerOptions,
) { ) {
this._unsubscribers.push( this._unsubscribers.push(
socket.onMessage((message: RelayMessage) => { on(socket, SocketEventType.Receive, (message: RelayMessage) => {
if (isRelayOkMessage(message)) { if (isRelayOkMessage(message)) {
const [_, id, ok, details] = message const [_, id, ok, details] = message
@@ -75,7 +75,7 @@ export class AuthManager {
) )
this._unsubscribers.push( this._unsubscribers.push(
socket.onStatus(status => { on(socket, SocketEventType.Status, (status: SocketStatus) => {
if (status === SocketStatus.Closed) { if (status === SocketStatus.Closed) {
this.challenge = undefined this.challenge = undefined
this.request = undefined this.request = undefined
+31 -111
View File
@@ -1,7 +1,11 @@
import WebSocket from "isomorphic-ws" import WebSocket from "isomorphic-ws"
import {remove, now, ago, TaskQueue} from "@welshman/lib" import EventEmitter from "events"
import TypedEventEmitter, {EventMap} from "typed-emitter"
import {on, now, ago, TaskQueue} from "@welshman/lib"
import type {RelayMessage, ClientMessage} from "./message.js" import type {RelayMessage, ClientMessage} from "./message.js"
type TypedEmitter<T extends EventMap> = TypedEventEmitter.default<T>
export enum SocketStatus { export enum SocketStatus {
Open = "socket:status:open", Open = "socket:status:open",
Opening = "socket:status:opening", Opening = "socket:status:opening",
@@ -14,81 +18,39 @@ export enum SocketStatus {
export enum SocketEventType { export enum SocketEventType {
Error = "socket:event:error", Error = "socket:event:error",
Status = "socket:event:status", Status = "socket:event:status",
Message = "socket:event:message", Send = "socket:event:send",
Receive = "socket:event:receive",
} }
export type SocketErrorEvent = { export type SocketEvents = {
type: SocketEventType.Error [SocketEventType.Error]: (error: string, url: string) => void
error: string [SocketEventType.Status]: (status: SocketStatus, url: string) => void
[SocketEventType.Send]: (message: ClientMessage, url: string) => void
[SocketEventType.Receive]: (message: RelayMessage, url: string) => void
} }
export type SocketStatusEvent = {
type: SocketEventType.Status
status: SocketStatus
}
export type SocketMessageEvent = {
type: SocketEventType.Message
message: RelayMessage
}
export type SocketEvent = SocketErrorEvent | SocketStatusEvent | SocketMessageEvent
export const makeSocketErrorEvent = (error: string): SocketErrorEvent => ({
type: SocketEventType.Error,
error,
})
export const makeSocketStatusEvent = (status: SocketStatus): SocketStatusEvent => ({
type: SocketEventType.Status,
status,
})
export const makeSocketMessageEvent = (message: RelayMessage): SocketMessageEvent => ({
type: SocketEventType.Message,
message,
})
export const isSocketErrorEvent = (event: SocketEvent): event is SocketErrorEvent =>
event.type === SocketEventType.Error
export const isSocketStatusEvent = (event: SocketEvent): event is SocketStatusEvent =>
event.type === SocketEventType.Status
export const isSocketMessageEvent = (event: SocketEvent): event is SocketMessageEvent =>
event.type === SocketEventType.Message
export type SocketSendSubscriber = (message: ClientMessage) => void
export type SocketRecvSubscriber = (event: SocketEvent) => void
export type SocketUnsubscriber = () => void export type SocketUnsubscriber = () => void
export class Socket { export class Socket extends (EventEmitter as new () => TypedEmitter<SocketEvents>) {
_ws?: WebSocket _ws?: WebSocket
_sendSubs: SocketSendSubscriber[] = []
_recvSubs: SocketRecvSubscriber[] = []
_sendQueue: TaskQueue<ClientMessage> _sendQueue: TaskQueue<ClientMessage>
_recvQueue: TaskQueue<SocketEvent> _recvQueue: TaskQueue<RelayMessage>
constructor(readonly url: string) { constructor(readonly url: string) {
super()
this._sendQueue = new TaskQueue<ClientMessage>({ this._sendQueue = new TaskQueue<ClientMessage>({
batchSize: 50, batchSize: 50,
processItem: (message: ClientMessage) => { processItem: (message: ClientMessage) => {
this._ws?.send(JSON.stringify(message)) this._ws?.send(JSON.stringify(message))
this.emit(SocketEventType.Send, message, this.url)
for (const cb of this._sendSubs) {
cb(message)
}
}, },
}) })
this._recvQueue = new TaskQueue<SocketEvent>({ this._recvQueue = new TaskQueue<RelayMessage>({
batchSize: 50, batchSize: 50,
processItem: (event: SocketEvent) => { processItem: (message: RelayMessage) => {
for (const cb of this._recvSubs) { this.emit(SocketEventType.Receive, message, this.url)
cb(event)
}
}, },
}) })
} }
@@ -100,17 +62,17 @@ export class Socket {
try { try {
this._ws = new WebSocket(this.url) this._ws = new WebSocket(this.url)
this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Opening)) this.emit(SocketEventType.Status, SocketStatus.Opening, this.url)
this._ws.onopen = () => this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Open)) this._ws.onopen = () => this.emit(SocketEventType.Status, SocketStatus.Open, this.url)
this._ws.onerror = () => { this._ws.onerror = () => {
this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Error)) this.emit(SocketEventType.Status, SocketStatus.Error, this.url)
this._ws = undefined this._ws = undefined
} }
this._ws.onclose = () => { this._ws.onclose = () => {
this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Closed)) this.emit(SocketEventType.Status, SocketStatus.Closed, this.url)
this._ws = undefined this._ws = undefined
} }
@@ -121,16 +83,16 @@ export class Socket {
const message = JSON.parse(data) const message = JSON.parse(data)
if (Array.isArray(message)) { if (Array.isArray(message)) {
this._recvQueue.push(makeSocketMessageEvent(message as RelayMessage)) this._recvQueue.push(message as RelayMessage)
} else { } else {
this._recvQueue.push(makeSocketErrorEvent("Invalid message received")) this.emit(SocketEventType.Error, "Invalid message received", this.url)
} }
} catch (e) { } catch (e) {
this._recvQueue.push(makeSocketErrorEvent("Invalid message received")) this.emit(SocketEventType.Error, "Invalid message received", this.url)
} }
} }
} catch (e) { } catch (e) {
this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Invalid)) this.emit(SocketEventType.Status, SocketStatus.Invalid, this.url)
} }
} }
@@ -147,60 +109,18 @@ export class Socket {
cleanup = () => { cleanup = () => {
this.close() this.close()
this._recvSubs = []
this._recvQueue.clear() this._recvQueue.clear()
this._sendSubs = []
this._sendQueue.clear() this._sendQueue.clear()
} }
send = (message: ClientMessage) => { send = (message: ClientMessage) => {
this._sendQueue.push(message) this._sendQueue.push(message)
} }
onSend = (cb: SocketSendSubscriber) => {
this._sendSubs.push(cb)
return () => {
this._sendSubs = remove(cb, this._sendSubs)
}
}
subscribe = (cb: SocketRecvSubscriber) => {
this._recvSubs.push(cb)
return () => {
this._recvSubs = remove(cb, this._recvSubs)
}
}
onError = (cb: (error: string) => void) => {
return this.subscribe((event: SocketEvent) => {
if (isSocketErrorEvent(event)) {
cb(event.error)
}
})
}
onStatus = (cb: (status: SocketStatus) => void) => {
return this.subscribe((event: SocketEvent) => {
if (isSocketStatusEvent(event)) {
cb(event.status)
}
})
}
onMessage = (cb: (message: RelayMessage) => void) => {
return this.subscribe((event: SocketEvent) => {
if (isSocketMessageEvent(event)) {
cb(event.message)
}
})
}
} }
export const socketPolicySendWhenOpen = (socket: Socket) => { export const socketPolicySendWhenOpen = (socket: Socket) => {
// Pause sending messages when the socket isn't open // Pause sending messages when the socket isn't open
const unsubscribe = socket.onStatus(newStatus => { const unsubscribe = on(socket, SocketEventType.Status, newStatus => {
if (newStatus === SocketStatus.Open) { if (newStatus === SocketStatus.Open) {
socket._sendQueue.start() socket._sendQueue.start()
} else { } else {
@@ -215,7 +135,7 @@ export const socketPolicyConnectOnSend = (socket: Socket) => {
let lastError = 0 let lastError = 0
let currentStatus = SocketStatus.Closed let currentStatus = SocketStatus.Closed
const unsubscribeOnStatus = socket.onStatus(newStatus => { const unsubscribeOnStatus = on(socket, SocketEventType.Status, (newStatus: SocketStatus) => {
// Keep track of the most recent error // Keep track of the most recent error
if (newStatus === SocketStatus.Error) { if (newStatus === SocketStatus.Error) {
lastError = now() lastError = now()
@@ -225,7 +145,7 @@ export const socketPolicyConnectOnSend = (socket: Socket) => {
currentStatus = newStatus currentStatus = newStatus
}) })
const unsubscribeOnSend = socket.onSend(message => { const unsubscribeOnSend = on(socket, SocketEventType.Send, (message: ClientMessage) => {
// When a new message is sent, make sure the socket is open (unless there was a recent error) // When a new message is sent, make sure the socket is open (unless there was a recent error)
if (currentStatus === SocketStatus.Closed && now() - lastError < ago(30)) { if (currentStatus === SocketStatus.Closed && now() - lastError < ago(30)) {
socket.open() socket.open()