diff --git a/packages/net/Connection.ts b/packages/net/Connection.ts index b43f022..de6ea90 100644 --- a/packages/net/Connection.ts +++ b/packages/net/Connection.ts @@ -24,7 +24,7 @@ export class Connection extends Emitter { createSender = () => { const worker = new Worker({ shouldDefer: (message: SocketMessage) => { - if (!this.socket.isReady()) { + if (!this.socket.isOpen()) { return true } @@ -91,13 +91,16 @@ export class Connection extends Emitter { this.emit('receive', this, message) } - ensureConnected = ({shouldReconnect = true}) => { - if (shouldReconnect && !this.socket.isHealthy()) { - this.disconnect() + ensureConnected = async ({shouldReconnect = true}) => { + const isUnhealthy = this.socket.isClosing() || this.socket.isClosed() + const noRecentFault = this.meta.lastFault < Date.now() - 60_000 + + if (shouldReconnect && isUnhealthy && noRecentFault) { + await this.disconnect() } - if (this.socket.isPending()) { - this.socket.connect() + if (this.socket.isNew()) { + await this.socket.connect() } } diff --git a/packages/net/ConnectionMeta.ts b/packages/net/ConnectionMeta.ts index 52b98a1..4f4e079 100644 --- a/packages/net/ConnectionMeta.ts +++ b/packages/net/ConnectionMeta.ts @@ -159,10 +159,13 @@ export class ConnectionMeta { getSpeed = () => this.responseCount ? this.responseTimer / this.responseCount : 0 getStatus = () => { + const socket = this.cxn.socket + if (this.authStatus === AuthStatus.Unauthorized) return ConnectionStatus.Unauthorized if (this.authStatus === AuthStatus.Forbidden) return ConnectionStatus.Forbidden + if (socket.isNew()) return ConnectionStatus.Closed if (this.lastFault && this.lastFault > this.lastOpen) return ConnectionStatus.Error - if (this.lastClose > this.lastOpen) return ConnectionStatus.Closed + if (socket.isClosed() || socket.isClosing()) return ConnectionStatus.Closed if (this.getSpeed() > 1000) return ConnectionStatus.Slow return ConnectionStatus.Ok diff --git a/packages/net/Socket.ts b/packages/net/Socket.ts index b7fec13..2651064 100644 --- a/packages/net/Socket.ts +++ b/packages/net/Socket.ts @@ -1,5 +1,5 @@ import WebSocket from "isomorphic-ws" -import {Deferred, defer} from '@welshman/lib' +import {sleep} from '@welshman/lib' export type Message = [string, ...any[]] @@ -19,50 +19,21 @@ export type SocketOpts = { } export class Socket { - url: string - ws?: WebSocket - ready: Deferred - failedToConnect = false + ws?: WebSocket | 'invalid' - constructor(url: string, readonly opts: SocketOpts) { - this.url = url - this.ready = defer() - } + constructor(readonly url: string, readonly opts: SocketOpts) {} - isPending() { - return !this.ws && !this.failedToConnect - } + isNew = () => this.ws === undefined - isConnecting() { - return this.ws?.readyState === WebSocket.CONNECTING - } + isInvalid = () => this.ws === 'invalid' - isReady() { - return this.ws?.readyState === WebSocket.OPEN - } + isConnecting = () => this.ws?.readyState === WebSocket.CONNECTING - isClosing() { - return this.ws?.readyState === WebSocket.CLOSING - } + isOpen = () => this.ws?.readyState === WebSocket.OPEN - isClosed() { - return this.ws?.readyState === WebSocket.CLOSED - } + isClosing = () => this.ws?.readyState === WebSocket.CLOSING - isHealthy() { - return this.isPending() || this.isConnecting() || this.isReady() - } - - onOpen = () => { - this.ready.resolve(true) - this.opts.onOpen() - } - - onError = () => { - this.failedToConnect = true - this.opts.onError() - this.disconnect() - } + isClosed = () => this.ws?.readyState === WebSocket.CLOSED onMessage = (event: {data: string}) => { try { @@ -78,41 +49,42 @@ export class Socket { } } - send = (message: any) => { - if (!this.ws) { - throw new Error('Send attempted before socket was opened') - } + send = (message: any) => this.ws.send(JSON.stringify(message)) - this.ws.send(JSON.stringify(message)) - } - - connect = () => { + connect = async () => { if (this.ws) { throw new Error(`Already attempted connection for ${this.url}`) } try { this.ws = new WebSocket(this.url) - this.ws.onopen = this.onOpen - this.ws.onerror = this.onError + this.ws.onopen = this.opts.onOpen + this.ws.onerror = this.opts.onError + this.ws.onclose = this.opts.onClose this.ws.onmessage = this.onMessage - this.ws.onclose = this.disconnect } catch (e) { - this.failedToConnect = true + this.ws = 'invalid' + this.opts.onError() + } + + while (this.isConnecting()) { + await sleep(100) } } - disconnect = () => { - if (this.ws) { - const currentWs = this.ws - - this.ready.finally(() => currentWs.close()) - this.ready = defer() - this.opts.onClose() - this.ws = undefined - - // Resolve a different instance of the promise - this.ready.resolve(false) + disconnect = async () => { + while (this.isConnecting()) { + await sleep(100) } + + if (this.isOpen()) { + this.ws.close() + } + + while (this.isClosing()) { + await sleep(100) + } + + this.ws = undefined } }