Re-work socket and reconnection/status logic

This commit is contained in:
Jon Staab
2024-05-22 17:33:20 -07:00
parent f8d74f5984
commit c5e3f6d5a4
3 changed files with 46 additions and 68 deletions
+9 -6
View File
@@ -24,7 +24,7 @@ export class Connection extends Emitter {
createSender = () => { createSender = () => {
const worker = new Worker<SocketMessage>({ const worker = new Worker<SocketMessage>({
shouldDefer: (message: SocketMessage) => { shouldDefer: (message: SocketMessage) => {
if (!this.socket.isReady()) { if (!this.socket.isOpen()) {
return true return true
} }
@@ -91,13 +91,16 @@ export class Connection extends Emitter {
this.emit('receive', this, message) this.emit('receive', this, message)
} }
ensureConnected = ({shouldReconnect = true}) => { ensureConnected = async ({shouldReconnect = true}) => {
if (shouldReconnect && !this.socket.isHealthy()) { const isUnhealthy = this.socket.isClosing() || this.socket.isClosed()
this.disconnect() const noRecentFault = this.meta.lastFault < Date.now() - 60_000
if (shouldReconnect && isUnhealthy && noRecentFault) {
await this.disconnect()
} }
if (this.socket.isPending()) { if (this.socket.isNew()) {
this.socket.connect() await this.socket.connect()
} }
} }
+4 -1
View File
@@ -159,10 +159,13 @@ export class ConnectionMeta {
getSpeed = () => this.responseCount ? this.responseTimer / this.responseCount : 0 getSpeed = () => this.responseCount ? this.responseTimer / this.responseCount : 0
getStatus = () => { getStatus = () => {
const socket = this.cxn.socket
if (this.authStatus === AuthStatus.Unauthorized) return ConnectionStatus.Unauthorized if (this.authStatus === AuthStatus.Unauthorized) return ConnectionStatus.Unauthorized
if (this.authStatus === AuthStatus.Forbidden) return ConnectionStatus.Forbidden if (this.authStatus === AuthStatus.Forbidden) return ConnectionStatus.Forbidden
if (socket.isNew()) return ConnectionStatus.Closed
if (this.lastFault && this.lastFault > this.lastOpen) return ConnectionStatus.Error if (this.lastFault && this.lastFault > this.lastOpen) return ConnectionStatus.Error
if (this.lastClose > this.lastOpen) return ConnectionStatus.Closed if (socket.isClosed() || socket.isClosing()) return ConnectionStatus.Closed
if (this.getSpeed() > 1000) return ConnectionStatus.Slow if (this.getSpeed() > 1000) return ConnectionStatus.Slow
return ConnectionStatus.Ok return ConnectionStatus.Ok
+33 -61
View File
@@ -1,5 +1,5 @@
import WebSocket from "isomorphic-ws" import WebSocket from "isomorphic-ws"
import {Deferred, defer} from '@welshman/lib' import {sleep} from '@welshman/lib'
export type Message = [string, ...any[]] export type Message = [string, ...any[]]
@@ -19,50 +19,21 @@ export type SocketOpts = {
} }
export class Socket { export class Socket {
url: string ws?: WebSocket | 'invalid'
ws?: WebSocket
ready: Deferred<boolean>
failedToConnect = false
constructor(url: string, readonly opts: SocketOpts) { constructor(readonly url: string, readonly opts: SocketOpts) {}
this.url = url
this.ready = defer()
}
isPending() { isNew = () => this.ws === undefined
return !this.ws && !this.failedToConnect
}
isConnecting() { isInvalid = () => this.ws === 'invalid'
return this.ws?.readyState === WebSocket.CONNECTING
}
isReady() { isConnecting = () => this.ws?.readyState === WebSocket.CONNECTING
return this.ws?.readyState === WebSocket.OPEN
}
isClosing() { isOpen = () => this.ws?.readyState === WebSocket.OPEN
return this.ws?.readyState === WebSocket.CLOSING
}
isClosed() { isClosing = () => this.ws?.readyState === WebSocket.CLOSING
return this.ws?.readyState === WebSocket.CLOSED
}
isHealthy() { isClosed = () => this.ws?.readyState === WebSocket.CLOSED
return this.isPending() || this.isConnecting() || this.isReady()
}
onOpen = () => {
this.ready.resolve(true)
this.opts.onOpen()
}
onError = () => {
this.failedToConnect = true
this.opts.onError()
this.disconnect()
}
onMessage = (event: {data: string}) => { onMessage = (event: {data: string}) => {
try { try {
@@ -78,41 +49,42 @@ export class Socket {
} }
} }
send = (message: any) => { send = (message: any) => this.ws.send(JSON.stringify(message))
if (!this.ws) {
throw new Error('Send attempted before socket was opened')
}
this.ws.send(JSON.stringify(message)) connect = async () => {
}
connect = () => {
if (this.ws) { if (this.ws) {
throw new Error(`Already attempted connection for ${this.url}`) throw new Error(`Already attempted connection for ${this.url}`)
} }
try { try {
this.ws = new WebSocket(this.url) this.ws = new WebSocket(this.url)
this.ws.onopen = this.onOpen this.ws.onopen = this.opts.onOpen
this.ws.onerror = this.onError this.ws.onerror = this.opts.onError
this.ws.onclose = this.opts.onClose
this.ws.onmessage = this.onMessage this.ws.onmessage = this.onMessage
this.ws.onclose = this.disconnect
} catch (e) { } catch (e) {
this.failedToConnect = true this.ws = 'invalid'
this.opts.onError()
}
while (this.isConnecting()) {
await sleep(100)
} }
} }
disconnect = () => { disconnect = async () => {
if (this.ws) { while (this.isConnecting()) {
const currentWs = this.ws await sleep(100)
this.ready.finally(() => currentWs.close())
this.ready = defer()
this.opts.onClose()
this.ws = undefined
// Resolve a different instance of the promise
this.ready.resolve(false)
} }
if (this.isOpen()) {
this.ws.close()
}
while (this.isClosing()) {
await sleep(100)
}
this.ws = undefined
} }
} }