Unapply socket policies on cleanup
This commit is contained in:
@@ -14,16 +14,8 @@ import {
|
|||||||
isRelayClosed,
|
isRelayClosed,
|
||||||
isRelayNegErr,
|
isRelayNegErr,
|
||||||
} from "./message.js"
|
} from "./message.js"
|
||||||
import {Socket, SocketStatus, SocketEvent} from "./socket.js"
|
import {Socket, SocketStatus, SocketEvent, SocketPolicy} from "./socket.js"
|
||||||
import {AuthStatus, AuthStateEvent} from "./auth.js"
|
import {AuthStatus, AuthStateEvent} from "./auth.js"
|
||||||
import {Unsubscriber} from "./util.js"
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The contract for socket policies
|
|
||||||
* @param socket - a Socket object
|
|
||||||
* @return a cleanup function
|
|
||||||
*/
|
|
||||||
export type SocketPolicy = (socket: Socket) => Unsubscriber
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a ping message every so often to ensure connection health
|
* Sends a ping message every so often to ensure connection health
|
||||||
@@ -260,7 +252,7 @@ export const makeSocketPolicyAuth = (options: SocketPolicyAuthOptions) => (socke
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const defaultSocketPolicies = [
|
export const defaultSocketPolicies: SocketPolicy[] = [
|
||||||
socketPolicyPing,
|
socketPolicyPing,
|
||||||
socketPolicyAuthBuffer,
|
socketPolicyAuthBuffer,
|
||||||
socketPolicyConnectOnSend,
|
socketPolicyConnectOnSend,
|
||||||
|
|||||||
@@ -3,16 +3,6 @@ import {normalizeRelayUrl} from "@welshman/util"
|
|||||||
import {Socket} from "./socket.js"
|
import {Socket} from "./socket.js"
|
||||||
import {defaultSocketPolicies} from "./policy.js"
|
import {defaultSocketPolicies} from "./policy.js"
|
||||||
|
|
||||||
export const makeSocket = (url: string, policies = defaultSocketPolicies) => {
|
|
||||||
const socket = new Socket(url)
|
|
||||||
|
|
||||||
for (const applyPolicy of policies) {
|
|
||||||
applyPolicy(socket)
|
|
||||||
}
|
|
||||||
|
|
||||||
return socket
|
|
||||||
}
|
|
||||||
|
|
||||||
export type PoolSubscription = (socket: Socket) => void
|
export type PoolSubscription = (socket: Socket) => void
|
||||||
|
|
||||||
export type PoolOptions = {
|
export type PoolOptions = {
|
||||||
@@ -44,7 +34,7 @@ export class Pool {
|
|||||||
return this.options.makeSocket(url)
|
return this.options.makeSocket(url)
|
||||||
}
|
}
|
||||||
|
|
||||||
return makeSocket(url)
|
return new Socket(url, defaultSocketPolicies)
|
||||||
}
|
}
|
||||||
|
|
||||||
get(_url: string): Socket {
|
get(_url: string): Socket {
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
import WebSocket from "isomorphic-ws"
|
import WebSocket from "isomorphic-ws"
|
||||||
import EventEmitter from "events"
|
import EventEmitter from "events"
|
||||||
import {TaskQueue} from "@welshman/lib"
|
import {TaskQueue, call} from "@welshman/lib"
|
||||||
import {RelayMessage, ClientMessage} from "./message.js"
|
import {RelayMessage, ClientMessage} from "./message.js"
|
||||||
import {AuthState} from "./auth.js"
|
import {AuthState} from "./auth.js"
|
||||||
|
import {Unsubscriber} from "./util.js"
|
||||||
|
|
||||||
export enum SocketStatus {
|
export enum SocketStatus {
|
||||||
Open = "open",
|
Open = "open",
|
||||||
@@ -30,18 +31,24 @@ export type SocketEvents = {
|
|||||||
[SocketEvent.Receiving]: (message: RelayMessage, url: string) => void
|
[SocketEvent.Receiving]: (message: RelayMessage, url: string) => void
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type SocketPolicy = (socket: Socket) => Unsubscriber
|
||||||
|
|
||||||
export class Socket extends EventEmitter {
|
export class Socket extends EventEmitter {
|
||||||
static batchSize = 20
|
static batchSize = 20
|
||||||
static batchDelay = 100
|
static batchDelay = 100
|
||||||
|
|
||||||
auth: AuthState
|
auth: AuthState
|
||||||
status = SocketStatus.Closed
|
status = SocketStatus.Closed
|
||||||
|
unsubscribers: Unsubscriber[]
|
||||||
|
|
||||||
_ws?: WebSocket
|
_ws?: WebSocket
|
||||||
_sendQueue: TaskQueue<ClientMessage>
|
_sendQueue: TaskQueue<ClientMessage>
|
||||||
_recvQueue: TaskQueue<RelayMessage>
|
_recvQueue: TaskQueue<RelayMessage>
|
||||||
|
|
||||||
constructor(readonly url: string) {
|
constructor(
|
||||||
|
readonly url: string,
|
||||||
|
readonly policies: SocketPolicy[] = [],
|
||||||
|
) {
|
||||||
super()
|
super()
|
||||||
|
|
||||||
this.auth = new AuthState(this)
|
this.auth = new AuthState(this)
|
||||||
@@ -69,6 +76,7 @@ export class Socket extends EventEmitter {
|
|||||||
|
|
||||||
this._sendQueue.stop()
|
this._sendQueue.stop()
|
||||||
this.setMaxListeners(1000)
|
this.setMaxListeners(1000)
|
||||||
|
this.unsubscribers = policies.map(p => p(this))
|
||||||
}
|
}
|
||||||
|
|
||||||
open = () => {
|
open = () => {
|
||||||
@@ -135,6 +143,7 @@ export class Socket extends EventEmitter {
|
|||||||
cleanup = () => {
|
cleanup = () => {
|
||||||
this.close()
|
this.close()
|
||||||
this.auth.cleanup()
|
this.auth.cleanup()
|
||||||
|
this.unsubscribers.forEach(call)
|
||||||
this._recvQueue.clear()
|
this._recvQueue.clear()
|
||||||
this._sendQueue.clear()
|
this._sendQueue.clear()
|
||||||
this.removeAllListeners()
|
this.removeAllListeners()
|
||||||
|
|||||||
Reference in New Issue
Block a user