diff --git a/packages/net/src/Connection.ts b/packages/net/src/Connection.ts index d52a7c8..044e0c5 100644 --- a/packages/net/src/Connection.ts +++ b/packages/net/src/Connection.ts @@ -9,12 +9,11 @@ import {ConnectionAuth} from './ConnectionAuth' import {ConnectionSender} from './ConnectionSender' export enum ConnectionStatus { - Ready = "ready", + Open = "open", Closed = "Closed", - Closing = "Closing", } -const {Ready, Closed, Closing} = ConnectionStatus +const {Open, Closed} = ConnectionStatus export class Connection extends Emitter { url: string @@ -23,7 +22,7 @@ export class Connection extends Emitter { state: ConnectionState stats: ConnectionStats auth: ConnectionAuth - status = Ready + status = Open constructor(url: string) { super() @@ -44,22 +43,27 @@ export class Connection extends Emitter { emit = (type: ConnectionEvent, ...args: any[]) => super.emit(type, this, ...args) send = async (message: Message) => { - if (this.status !== Ready) { + if (this.status !== Open) { throw new Error(`Attempted to send message on ${this.status} connection`) } - await this.socket.open() - this.sender.push(message) } - close = async () => { - this.status = Closing - - await this.sender.close() - await this.socket.close() + open = () => { + this.status = Open + this.socket.open() + this.sender.worker.resume() + } + close = () => { this.status = Closed + this.socket.close() + this.sender.worker.pause() + } + + cleanup = () => { + this.close() this.removeAllListeners() } } diff --git a/packages/net/src/ConnectionSender.ts b/packages/net/src/ConnectionSender.ts index abeaf95..2bf7f56 100644 --- a/packages/net/src/ConnectionSender.ts +++ b/packages/net/src/ConnectionSender.ts @@ -11,23 +11,23 @@ export class ConnectionSender { constructor(readonly cxn: Connection) { this.worker = new Worker({ shouldDefer: ([verb, ...extra]: Message) => { + // Always send CLOSE to clean up pending requests, even if the connection is closed + if (verb === 'CLOSE') return false + // If we're not connected, nothing we can do - if (this.cxn.socket.status !== SocketStatus.Open) return true + if (cxn.socket.status !== SocketStatus.Open) return true // Always allow sending AUTH if (verb === 'AUTH') return false - // Only close reqs that have been sent - if (verb === 'CLOSE') return !this.cxn.state.pendingRequests.has(extra[0]) - // Always allow sending join requests if (verb === 'EVENT' && extra[0].kind === AUTH_JOIN) return false // Wait for auth - if (![AuthStatus.None, AuthStatus.Ok].includes(this.cxn.auth.status)) return true + if (![AuthStatus.None, AuthStatus.Ok].includes(cxn.auth.status)) return true // Limit concurrent requests - if (verb === 'REQ') return this.cxn.state.pendingRequests.size >= 8 + if (verb === 'REQ') return cxn.state.pendingRequests.size >= 8 return false }, @@ -39,15 +39,14 @@ export class ConnectionSender { this.worker.buffer = this.worker.buffer.filter(m => !(m[0] === 'REQ' && m[1] === extra[0])) } - this.cxn.socket.send([verb, ...extra]) + // Re-check socket status since we let CLOSE through + if (cxn.socket.status === SocketStatus.Open) { + cxn.socket.send([verb, ...extra]) + } }) } push = (message: Message) => { this.worker.push(message) } - - close = async () => { - this.worker.pause() - } } diff --git a/packages/net/src/ConnectionState.ts b/packages/net/src/ConnectionState.ts index 3682d67..458bc70 100644 --- a/packages/net/src/ConnectionState.ts +++ b/packages/net/src/ConnectionState.ts @@ -1,3 +1,4 @@ +import {sleep} from '@welshman/lib' import {AUTH_JOIN} from '@welshman/util' import type {SignedEvent, Filter} from '@welshman/util' import type {Message} from './Socket' @@ -20,7 +21,7 @@ export class ConnectionState { pendingRequests = new Map() constructor(readonly cxn: Connection) { - cxn.on(ConnectionEvent.Send, (cxn: Connection, [verb, ...extra]: Message) => { + cxn.sender.worker.addGlobalHandler(([verb, ...extra]: Message) => { if (verb === 'REQ') { const [reqId, ...filters] = extra @@ -36,11 +37,11 @@ export class ConnectionState { if (verb === 'EVENT') { const [event] = extra - this.pendingPublishes.set(event.id, {sent: Date.now(), event: event.id}) + this.pendingPublishes.set(event.id, {sent: Date.now(), event}) } }) - cxn.on(ConnectionEvent.Receive, (cxn: Connection, [verb, ...extra]: Message) => { + cxn.socket.worker.addGlobalHandler(([verb, ...extra]: Message) => { if (verb === 'OK') { const [eventId, _ok, notice] = extra const pub = this.pendingPublishes.get(eventId) @@ -79,6 +80,8 @@ export class ConnectionState { this.cxn.emit(ConnectionEvent.Notice, extra[1]) } } + + this.pendingRequests.delete(reqId) } if (verb === 'NOTICE') { @@ -87,5 +90,23 @@ export class ConnectionState { this.cxn.emit(ConnectionEvent.Notice, notice) } }) + + // Whenever we reconnect, re-enqueue pending stuff. Delay this so that if a connection + // is flapping we're not sending too much noise. + cxn.on(ConnectionEvent.Close, async (cxn: Connection) => { + await sleep(10_000) + + if (this.pendingRequests.size > 0 || this.pendingPublishes.size > 0) { + this.cxn.open() + } + + for (const [reqId, req] of this.pendingRequests.entries()) { + this.cxn.send(['REQ', reqId, ...req.filters]) + } + + for (const [_, pub] of this.pendingPublishes.entries()) { + this.cxn.send(['EVENT', pub.event]) + } + }) } } diff --git a/packages/net/src/Pool.ts b/packages/net/src/Pool.ts index 581f165..ffa84ef 100644 --- a/packages/net/src/Pool.ts +++ b/packages/net/src/Pool.ts @@ -33,7 +33,7 @@ export class Pool extends Emitter { const connection = this.data.get(url) if (connection) { - connection.close() + connection.cleanup() this.data.delete(url) } diff --git a/packages/net/src/Socket.ts b/packages/net/src/Socket.ts index 12d3313..ce22503 100644 --- a/packages/net/src/Socket.ts +++ b/packages/net/src/Socket.ts @@ -72,14 +72,13 @@ export class Socket { close = async () => { this.worker.pause() this.ws?.close() + this.ws = undefined // Allow the socket to start closing before waiting await sleep(100) // Wait for the socket to fully close await this.wait() - - this.ws = undefined } send = async (message: Message) => {