diff --git a/packages/lib/src/Tools.ts b/packages/lib/src/Tools.ts index 465b21f..0445994 100644 --- a/packages/lib/src/Tools.ts +++ b/packages/lib/src/Tools.ts @@ -586,13 +586,23 @@ export const nthNe = xs[i] !== v /** Returns a function that checks if key/value pairs of x match all pairs in spec */ -export const spec = (values: Obj) => (x: Obj) => { - for (const [k, v] of Object.entries(values)) { - if (x[k] !== v) return false - } +export const spec = + (values: Obj | Array) => + (x: Obj | Array, ...args: unknown[]) => { + if (Array.isArray(values)) { + for (let i = 0; i < values.length; i++) { + if ((x as Array)[i] !== values[i]) { + return false + } + } + } else { + for (const [k, v] of Object.entries(values)) { + if ((x as Obj)[k] !== v) return false + } + } - return true -} + return true + } /** Returns a function that checks equality with value */ export const eq = diff --git a/packages/net/src/ConnectionSender.ts b/packages/net/src/ConnectionSender.ts index 285c366..5b8dce7 100644 --- a/packages/net/src/ConnectionSender.ts +++ b/packages/net/src/ConnectionSender.ts @@ -1,4 +1,4 @@ -import {Worker} from "@welshman/lib" +import {Worker, complement, spec} from "@welshman/lib" import {AUTH_JOIN} from "@welshman/util" import {SocketStatus} from "./Socket.js" import type {Message} from "./Socket.js" @@ -10,8 +10,10 @@ export class ConnectionSender { constructor(readonly cxn: Connection) { this.worker = new Worker({ - shouldDefer: ([verb, ...extra]: Message) => { - // Always send CLOSE to clean up pending requests, even if the connection is closed + shouldDefer: (message: Message) => { + const verb = message[0] + + // Always send CLOSE to clean up pending requests if (verb === "CLOSE") return false // If we're not connected, nothing we can do @@ -21,7 +23,7 @@ export class ConnectionSender { if (verb === "AUTH") return false // Always allow sending join requests - if (verb === "EVENT" && extra[0].kind === AUTH_JOIN) return false + if (verb === "EVENT" && message[1].kind === AUTH_JOIN) return false // Wait for auth if (![AuthStatus.None, AuthStatus.Ok].includes(cxn.auth.status)) return true @@ -33,20 +35,24 @@ export class ConnectionSender { }, }) - this.worker.addGlobalHandler(([verb, ...extra]: Message) => { - // If we ended up handling a CLOSE before we handled the REQ, don't send the REQ - if (verb === "CLOSE") { - this.worker.buffer = this.worker.buffer.filter(m => !(m[0] === "REQ" && m[1] === extra[0])) + this.worker.addGlobalHandler((message: Message) => { + const verb = message[0] + + // If we're closing something that never got sent, skip it + if (verb === "CLOSE" && !cxn.state.pendingRequests.has(message[1])) { + return } - // Re-check socket status since we let CLOSE through - if (cxn.socket.status === SocketStatus.Open) { - cxn.socket.send([verb, ...extra]) - } + cxn.socket.send(message) }) } push = (message: Message) => { + // If we ended up handling a CLOSE before we sent the REQ, don't send the REQ + if (message[0] === "CLOSE") { + this.worker.buffer = this.worker.buffer.filter(complement(spec(["REQ", message[1]]))) + } + this.worker.push(message) } }