Improve spec, clean up connection sender a bit
This commit is contained in:
@@ -586,13 +586,23 @@ export const nthNe =
|
|||||||
xs[i] !== v
|
xs[i] !== v
|
||||||
|
|
||||||
/** Returns a function that checks if key/value pairs of x match all pairs in spec */
|
/** Returns a function that checks if key/value pairs of x match all pairs in spec */
|
||||||
export const spec = (values: Obj) => (x: Obj) => {
|
export const spec =
|
||||||
for (const [k, v] of Object.entries(values)) {
|
(values: Obj | Array<any>) =>
|
||||||
if (x[k] !== v) return false
|
(x: Obj | Array<any>, ...args: unknown[]) => {
|
||||||
}
|
if (Array.isArray(values)) {
|
||||||
|
for (let i = 0; i < values.length; i++) {
|
||||||
|
if ((x as Array<any>)[i] !== values[i]) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (const [k, v] of Object.entries(values)) {
|
||||||
|
if ((x as Obj)[k] !== v) return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Returns a function that checks equality with value */
|
/** Returns a function that checks equality with value */
|
||||||
export const eq =
|
export const eq =
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import {Worker} from "@welshman/lib"
|
import {Worker, complement, spec} from "@welshman/lib"
|
||||||
import {AUTH_JOIN} from "@welshman/util"
|
import {AUTH_JOIN} from "@welshman/util"
|
||||||
import {SocketStatus} from "./Socket.js"
|
import {SocketStatus} from "./Socket.js"
|
||||||
import type {Message} from "./Socket.js"
|
import type {Message} from "./Socket.js"
|
||||||
@@ -10,8 +10,10 @@ export class ConnectionSender {
|
|||||||
|
|
||||||
constructor(readonly cxn: Connection) {
|
constructor(readonly cxn: Connection) {
|
||||||
this.worker = new Worker({
|
this.worker = new Worker({
|
||||||
shouldDefer: ([verb, ...extra]: Message) => {
|
shouldDefer: (message: Message) => {
|
||||||
// Always send CLOSE to clean up pending requests, even if the connection is closed
|
const verb = message[0]
|
||||||
|
|
||||||
|
// Always send CLOSE to clean up pending requests
|
||||||
if (verb === "CLOSE") return false
|
if (verb === "CLOSE") return false
|
||||||
|
|
||||||
// If we're not connected, nothing we can do
|
// If we're not connected, nothing we can do
|
||||||
@@ -21,7 +23,7 @@ export class ConnectionSender {
|
|||||||
if (verb === "AUTH") return false
|
if (verb === "AUTH") return false
|
||||||
|
|
||||||
// Always allow sending join requests
|
// Always allow sending join requests
|
||||||
if (verb === "EVENT" && extra[0].kind === AUTH_JOIN) return false
|
if (verb === "EVENT" && message[1].kind === AUTH_JOIN) return false
|
||||||
|
|
||||||
// Wait for auth
|
// Wait for auth
|
||||||
if (![AuthStatus.None, AuthStatus.Ok].includes(cxn.auth.status)) return true
|
if (![AuthStatus.None, AuthStatus.Ok].includes(cxn.auth.status)) return true
|
||||||
@@ -33,20 +35,24 @@ export class ConnectionSender {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
this.worker.addGlobalHandler(([verb, ...extra]: Message) => {
|
this.worker.addGlobalHandler((message: Message) => {
|
||||||
// If we ended up handling a CLOSE before we handled the REQ, don't send the REQ
|
const verb = message[0]
|
||||||
if (verb === "CLOSE") {
|
|
||||||
this.worker.buffer = this.worker.buffer.filter(m => !(m[0] === "REQ" && m[1] === extra[0]))
|
// If we're closing something that never got sent, skip it
|
||||||
|
if (verb === "CLOSE" && !cxn.state.pendingRequests.has(message[1])) {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-check socket status since we let CLOSE through
|
cxn.socket.send(message)
|
||||||
if (cxn.socket.status === SocketStatus.Open) {
|
|
||||||
cxn.socket.send([verb, ...extra])
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
push = (message: Message) => {
|
push = (message: Message) => {
|
||||||
|
// If we ended up handling a CLOSE before we sent the REQ, don't send the REQ
|
||||||
|
if (message[0] === "CLOSE") {
|
||||||
|
this.worker.buffer = this.worker.buffer.filter(complement(spec(["REQ", message[1]])))
|
||||||
|
}
|
||||||
|
|
||||||
this.worker.push(message)
|
this.worker.push(message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user