From c6847fa7c9f88a2c7334fdcea9867890da1fba41 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Tue, 5 May 2026 09:35:11 -0700 Subject: [PATCH] Unapply socket policies on cleanup --- packages/net/src/policy.ts | 12 ++---------- packages/net/src/pool.ts | 12 +----------- packages/net/src/socket.ts | 13 +++++++++++-- 3 files changed, 14 insertions(+), 23 deletions(-) diff --git a/packages/net/src/policy.ts b/packages/net/src/policy.ts index fe1da4d..f032d6d 100644 --- a/packages/net/src/policy.ts +++ b/packages/net/src/policy.ts @@ -14,16 +14,8 @@ import { isRelayClosed, isRelayNegErr, } from "./message.js" -import {Socket, SocketStatus, SocketEvent} from "./socket.js" +import {Socket, SocketStatus, SocketEvent, SocketPolicy} from "./socket.js" import {AuthStatus, AuthStateEvent} from "./auth.js" -import {Unsubscriber} from "./util.js" - -/** - * The contract for socket policies - * @param socket - a Socket object - * @return a cleanup function - */ -export type SocketPolicy = (socket: Socket) => Unsubscriber /** * Sends a ping message every so often to ensure connection health @@ -260,7 +252,7 @@ export const makeSocketPolicyAuth = (options: SocketPolicyAuthOptions) => (socke } } -export const defaultSocketPolicies = [ +export const defaultSocketPolicies: SocketPolicy[] = [ socketPolicyPing, socketPolicyAuthBuffer, socketPolicyConnectOnSend, diff --git a/packages/net/src/pool.ts b/packages/net/src/pool.ts index 4d56448..fcdc913 100644 --- a/packages/net/src/pool.ts +++ b/packages/net/src/pool.ts @@ -3,16 +3,6 @@ import {normalizeRelayUrl} from "@welshman/util" import {Socket} from "./socket.js" import {defaultSocketPolicies} from "./policy.js" -export const makeSocket = (url: string, policies = defaultSocketPolicies) => { - const socket = new Socket(url) - - for (const applyPolicy of policies) { - applyPolicy(socket) - } - - return socket -} - export type PoolSubscription = (socket: Socket) => void export type PoolOptions = { @@ -44,7 +34,7 @@ export class Pool { return this.options.makeSocket(url) } - return makeSocket(url) + return new Socket(url, defaultSocketPolicies) } get(_url: string): Socket { diff --git a/packages/net/src/socket.ts b/packages/net/src/socket.ts index 6b367a0..4107928 100644 --- a/packages/net/src/socket.ts +++ b/packages/net/src/socket.ts @@ -1,8 +1,9 @@ import WebSocket from "isomorphic-ws" import EventEmitter from "events" -import {TaskQueue} from "@welshman/lib" +import {TaskQueue, call} from "@welshman/lib" import {RelayMessage, ClientMessage} from "./message.js" import {AuthState} from "./auth.js" +import {Unsubscriber} from "./util.js" export enum SocketStatus { Open = "open", @@ -30,18 +31,24 @@ export type SocketEvents = { [SocketEvent.Receiving]: (message: RelayMessage, url: string) => void } +export type SocketPolicy = (socket: Socket) => Unsubscriber + export class Socket extends EventEmitter { static batchSize = 20 static batchDelay = 100 auth: AuthState status = SocketStatus.Closed + unsubscribers: Unsubscriber[] _ws?: WebSocket _sendQueue: TaskQueue _recvQueue: TaskQueue - constructor(readonly url: string) { + constructor( + readonly url: string, + readonly policies: SocketPolicy[] = [], + ) { super() this.auth = new AuthState(this) @@ -69,6 +76,7 @@ export class Socket extends EventEmitter { this._sendQueue.stop() this.setMaxListeners(1000) + this.unsubscribers = policies.map(p => p(this)) } open = () => { @@ -135,6 +143,7 @@ export class Socket extends EventEmitter { cleanup = () => { this.close() this.auth.cleanup() + this.unsubscribers.forEach(call) this._recvQueue.clear() this._sendQueue.clear() this.removeAllListeners()