Improve connection management, re-send stuff after a connection gets closed

This commit is contained in:
Jon Staab
2024-12-16 16:09:58 -08:00
parent 887fbfc25d
commit d8005b7c10
5 changed files with 52 additions and 29 deletions
+16 -12
View File
@@ -9,12 +9,11 @@ import {ConnectionAuth} from './ConnectionAuth'
import {ConnectionSender} from './ConnectionSender' import {ConnectionSender} from './ConnectionSender'
export enum ConnectionStatus { export enum ConnectionStatus {
Ready = "ready", Open = "open",
Closed = "Closed", Closed = "Closed",
Closing = "Closing",
} }
const {Ready, Closed, Closing} = ConnectionStatus const {Open, Closed} = ConnectionStatus
export class Connection extends Emitter { export class Connection extends Emitter {
url: string url: string
@@ -23,7 +22,7 @@ export class Connection extends Emitter {
state: ConnectionState state: ConnectionState
stats: ConnectionStats stats: ConnectionStats
auth: ConnectionAuth auth: ConnectionAuth
status = Ready status = Open
constructor(url: string) { constructor(url: string) {
super() super()
@@ -44,22 +43,27 @@ export class Connection extends Emitter {
emit = (type: ConnectionEvent, ...args: any[]) => super.emit(type, this, ...args) emit = (type: ConnectionEvent, ...args: any[]) => super.emit(type, this, ...args)
send = async (message: Message) => { send = async (message: Message) => {
if (this.status !== Ready) { if (this.status !== Open) {
throw new Error(`Attempted to send message on ${this.status} connection`) throw new Error(`Attempted to send message on ${this.status} connection`)
} }
await this.socket.open()
this.sender.push(message) this.sender.push(message)
} }
close = async () => { open = () => {
this.status = Closing this.status = Open
this.socket.open()
await this.sender.close() this.sender.worker.resume()
await this.socket.close() }
close = () => {
this.status = Closed this.status = Closed
this.socket.close()
this.sender.worker.pause()
}
cleanup = () => {
this.close()
this.removeAllListeners() this.removeAllListeners()
} }
} }
+10 -11
View File
@@ -11,23 +11,23 @@ export class ConnectionSender {
constructor(readonly cxn: Connection) { constructor(readonly cxn: Connection) {
this.worker = new Worker({ this.worker = new Worker({
shouldDefer: ([verb, ...extra]: Message) => { shouldDefer: ([verb, ...extra]: Message) => {
// Always send CLOSE to clean up pending requests, even if the connection is closed
if (verb === 'CLOSE') return false
// If we're not connected, nothing we can do // If we're not connected, nothing we can do
if (this.cxn.socket.status !== SocketStatus.Open) return true if (cxn.socket.status !== SocketStatus.Open) return true
// Always allow sending AUTH // Always allow sending AUTH
if (verb === 'AUTH') return false if (verb === 'AUTH') return false
// Only close reqs that have been sent
if (verb === 'CLOSE') return !this.cxn.state.pendingRequests.has(extra[0])
// Always allow sending join requests // Always allow sending join requests
if (verb === 'EVENT' && extra[0].kind === AUTH_JOIN) return false if (verb === 'EVENT' && extra[0].kind === AUTH_JOIN) return false
// Wait for auth // Wait for auth
if (![AuthStatus.None, AuthStatus.Ok].includes(this.cxn.auth.status)) return true if (![AuthStatus.None, AuthStatus.Ok].includes(cxn.auth.status)) return true
// Limit concurrent requests // Limit concurrent requests
if (verb === 'REQ') return this.cxn.state.pendingRequests.size >= 8 if (verb === 'REQ') return cxn.state.pendingRequests.size >= 8
return false return false
}, },
@@ -39,15 +39,14 @@ export class ConnectionSender {
this.worker.buffer = this.worker.buffer.filter(m => !(m[0] === 'REQ' && m[1] === extra[0])) this.worker.buffer = this.worker.buffer.filter(m => !(m[0] === 'REQ' && m[1] === extra[0]))
} }
this.cxn.socket.send([verb, ...extra]) // Re-check socket status since we let CLOSE through
if (cxn.socket.status === SocketStatus.Open) {
cxn.socket.send([verb, ...extra])
}
}) })
} }
push = (message: Message) => { push = (message: Message) => {
this.worker.push(message) this.worker.push(message)
} }
close = async () => {
this.worker.pause()
}
} }
+24 -3
View File
@@ -1,3 +1,4 @@
import {sleep} from '@welshman/lib'
import {AUTH_JOIN} from '@welshman/util' import {AUTH_JOIN} from '@welshman/util'
import type {SignedEvent, Filter} from '@welshman/util' import type {SignedEvent, Filter} from '@welshman/util'
import type {Message} from './Socket' import type {Message} from './Socket'
@@ -20,7 +21,7 @@ export class ConnectionState {
pendingRequests = new Map<string, RequestState>() pendingRequests = new Map<string, RequestState>()
constructor(readonly cxn: Connection) { constructor(readonly cxn: Connection) {
cxn.on(ConnectionEvent.Send, (cxn: Connection, [verb, ...extra]: Message) => { cxn.sender.worker.addGlobalHandler(([verb, ...extra]: Message) => {
if (verb === 'REQ') { if (verb === 'REQ') {
const [reqId, ...filters] = extra const [reqId, ...filters] = extra
@@ -36,11 +37,11 @@ export class ConnectionState {
if (verb === 'EVENT') { if (verb === 'EVENT') {
const [event] = extra const [event] = extra
this.pendingPublishes.set(event.id, {sent: Date.now(), event: event.id}) this.pendingPublishes.set(event.id, {sent: Date.now(), event})
} }
}) })
cxn.on(ConnectionEvent.Receive, (cxn: Connection, [verb, ...extra]: Message) => { cxn.socket.worker.addGlobalHandler(([verb, ...extra]: Message) => {
if (verb === 'OK') { if (verb === 'OK') {
const [eventId, _ok, notice] = extra const [eventId, _ok, notice] = extra
const pub = this.pendingPublishes.get(eventId) const pub = this.pendingPublishes.get(eventId)
@@ -79,6 +80,8 @@ export class ConnectionState {
this.cxn.emit(ConnectionEvent.Notice, extra[1]) this.cxn.emit(ConnectionEvent.Notice, extra[1])
} }
} }
this.pendingRequests.delete(reqId)
} }
if (verb === 'NOTICE') { if (verb === 'NOTICE') {
@@ -87,5 +90,23 @@ export class ConnectionState {
this.cxn.emit(ConnectionEvent.Notice, notice) this.cxn.emit(ConnectionEvent.Notice, notice)
} }
}) })
// Whenever we reconnect, re-enqueue pending stuff. Delay this so that if a connection
// is flapping we're not sending too much noise.
cxn.on(ConnectionEvent.Close, async (cxn: Connection) => {
await sleep(10_000)
if (this.pendingRequests.size > 0 || this.pendingPublishes.size > 0) {
this.cxn.open()
}
for (const [reqId, req] of this.pendingRequests.entries()) {
this.cxn.send(['REQ', reqId, ...req.filters])
}
for (const [_, pub] of this.pendingPublishes.entries()) {
this.cxn.send(['EVENT', pub.event])
}
})
} }
} }
+1 -1
View File
@@ -33,7 +33,7 @@ export class Pool extends Emitter {
const connection = this.data.get(url) const connection = this.data.get(url)
if (connection) { if (connection) {
connection.close() connection.cleanup()
this.data.delete(url) this.data.delete(url)
} }
+1 -2
View File
@@ -72,14 +72,13 @@ export class Socket {
close = async () => { close = async () => {
this.worker.pause() this.worker.pause()
this.ws?.close() this.ws?.close()
this.ws = undefined
// Allow the socket to start closing before waiting // Allow the socket to start closing before waiting
await sleep(100) await sleep(100)
// Wait for the socket to fully close // Wait for the socket to fully close
await this.wait() await this.wait()
this.ws = undefined
} }
send = async (message: Message) => { send = async (message: Message) => {