Clean up policy a tad

This commit is contained in:
Jon Staab
2025-03-21 16:55:54 -07:00
parent f215354891
commit b985003333
+144 -150
View File
@@ -1,4 +1,4 @@
import {on, sleep, spec, ago, now} from "@welshman/lib" import {on, call, sleep, spec, ago, now} from "@welshman/lib"
import {AUTH_JOIN} from "@welshman/util" import {AUTH_JOIN} from "@welshman/util"
import { import {
ClientMessage, ClientMessage,
@@ -16,15 +16,17 @@ import {AuthState, AuthStatus, AuthStateEventType} from "./auth.js"
// Pause sending messages when the socket isn't open // Pause sending messages when the socket isn't open
export const socketPolicySendWhenOpen = (socket: Socket) => { export const socketPolicySendWhenOpen = (socket: Socket) => {
const unsubscribe = on(socket, SocketEventType.Status, (newStatus: SocketStatus) => { const unsubscribers = [
if (newStatus === SocketStatus.Open) { on(socket, SocketEventType.Status, (newStatus: SocketStatus) => {
socket._sendQueue.start() if (newStatus === SocketStatus.Open) {
} else { socket._sendQueue.start()
socket._sendQueue.stop() } else {
} socket._sendQueue.stop()
}) }
}),
]
return unsubscribe return () => unsubscribers.forEach(call)
} }
export const socketPolicyDeferOnAuth = (socket: Socket) => { export const socketPolicyDeferOnAuth = (socket: Socket) => {
@@ -32,46 +34,46 @@ export const socketPolicyDeferOnAuth = (socket: Socket) => {
const authState = new AuthState(socket) const authState = new AuthState(socket)
const okStatuses = [AuthStatus.None, AuthStatus.Ok] const okStatuses = [AuthStatus.None, AuthStatus.Ok]
// Pause sending certain messages when we're not authenticated const unsubscribers = [
const unsubscribeEnqueue = on(socket, SocketEventType.Enqueue, (message: ClientMessage) => { // Pause sending certain messages when we're not authenticated
// If we're closing a request, but it never got sent, remove both from the queue on(socket, SocketEventType.Enqueue, (message: ClientMessage) => {
// Otherwise, always send CLOSE // If we're closing a request, but it never got sent, remove both from the queue
if (isClientClose(message)) { // Otherwise, always send CLOSE
const req = buffer.find(spec([ClientMessageType.Req, message[1]])) if (isClientClose(message)) {
const req = buffer.find(spec([ClientMessageType.Req, message[1]]))
if (req) { if (req) {
socket._sendQueue.remove(req) socket._sendQueue.remove(req)
socket._sendQueue.remove(message)
}
return
}
// Always allow sending auth
if (isClientAuth(message)) return
// Always allow sending join requests
if (isClientEvent(message) && message[1].kind === AUTH_JOIN) return
// If we're not ok, remove the message and save it for later
if (!okStatuses.includes(authState.status)) {
buffer.push(message)
socket._sendQueue.remove(message) socket._sendQueue.remove(message)
} }
}),
return // Send buffered messages when we get successful auth
} on(authState, AuthStateEventType.Status, (status: AuthStatus) => {
if (okStatuses.includes(status) && buffer.length > 0) {
// Always allow sending auth for (const message of buffer.splice(0)) {
if (isClientAuth(message)) return socket.send(message)
}
// Always allow sending join requests
if (isClientEvent(message) && message[1].kind === AUTH_JOIN) return
// If we're not ok, remove the message and save it for later
if (!okStatuses.includes(authState.status)) {
buffer.push(message)
socket._sendQueue.remove(message)
}
})
// Send buffered messages when we get successful auth
const unsubscribeAuthStatus = on(authState, AuthStateEventType.Status, (status: AuthStatus) => {
if (okStatuses.includes(status) && buffer.length > 0) {
for (const message of buffer.splice(0)) {
socket.send(message)
} }
} }),
}) ]
return () => { return () => {
unsubscribeAuthStatus() unsubscribers.forEach(call)
unsubscribeEnqueue()
authState.cleanup() authState.cleanup()
} }
} }
@@ -80,95 +82,92 @@ export const socketPolicyRetryAuthRequired = (socket: Socket) => {
const retried = new Set<string>() const retried = new Set<string>()
const pending = new Map<string, ClientMessage>() const pending = new Map<string, ClientMessage>()
// Watch outgoing events and requests and keep a copy const unsubscribers = [
const unsubscribeSend = on(socket, SocketEventType.Send, (message: ClientMessage) => { // Watch outgoing events and requests and keep a copy
if (isClientEvent(message)) { on(socket, SocketEventType.Send, (message: ClientMessage) => {
const [_, event] = message if (isClientEvent(message)) {
const [_, event] = message
if (!retried.has(event.id) && event.kind !== AUTH_JOIN) { if (!retried.has(event.id) && event.kind !== AUTH_JOIN) {
pending.set(event.id, message) pending.set(event.id, message)
} }
}
if (isClientReq(message)) {
const [_, id] = message
if (!retried.has(id)) {
pending.set(id, message)
}
}
})
// If a message is rejected with auth-required, re-enqueue it one time
const unsubscribeReceive = on(socket, SocketEventType.Receive, (message: RelayMessage) => {
if (isRelayOk(message)) {
const [_, id, ok, detail] = message
const pendingMessage = pending.get(id)
if (pendingMessage && !ok && detail?.startsWith("auth-required:")) {
socket.send(pendingMessage)
retried.add(id)
} }
pending.delete(id) if (isClientReq(message)) {
} const [_, id] = message
if (isRelayClosed(message)) { if (!retried.has(id)) {
const [_, id, detail] = message pending.set(id, message)
const pendingMessage = pending.get(id) }
}
}),
// If a message is rejected with auth-required, re-enqueue it one time
on(socket, SocketEventType.Receive, (message: RelayMessage) => {
if (isRelayOk(message)) {
const [_, id, ok, detail] = message
const pendingMessage = pending.get(id)
if (pendingMessage && detail?.startsWith("auth-required:")) { if (pendingMessage && !ok && detail?.startsWith("auth-required:")) {
socket.send(pendingMessage) socket.send(pendingMessage)
retried.add(id) retried.add(id)
}
pending.delete(id)
} }
pending.delete(id) if (isRelayClosed(message)) {
} const [_, id, detail] = message
}) const pendingMessage = pending.get(id)
return () => { if (pendingMessage && detail?.startsWith("auth-required:")) {
unsubscribeSend() socket.send(pendingMessage)
unsubscribeReceive() retried.add(id)
} }
pending.delete(id)
}
}),
]
return () => unsubscribers.forEach(call)
} }
export const socketPolicyConnectOnSend = (socket: Socket) => { export const socketPolicyConnectOnSend = (socket: Socket) => {
let lastError = 0 let lastError = 0
let currentStatus = SocketStatus.Closed let currentStatus = SocketStatus.Closed
const unsubscribeStatus = on(socket, SocketEventType.Status, (newStatus: SocketStatus) => { const unsubscribers = [
// Keep track of the most recent error on(socket, SocketEventType.Status, (newStatus: SocketStatus) => {
if (newStatus === SocketStatus.Error) { // Keep track of the most recent error
lastError = now() if (newStatus === SocketStatus.Error) {
} lastError = now()
}
// Keep track of the current status // Keep track of the current status
currentStatus = newStatus currentStatus = newStatus
}) }),
on(socket, SocketEventType.Send, (message: ClientMessage) => {
// When a new message is sent, make sure the socket is open (unless there was a recent error)
if (currentStatus === SocketStatus.Closed && now() - lastError < ago(30)) {
socket.open()
}
}),
]
const unsubscribeSend = on(socket, SocketEventType.Send, (message: ClientMessage) => { return () => unsubscribers.forEach(call)
// When a new message is sent, make sure the socket is open (unless there was a recent error)
if (currentStatus === SocketStatus.Closed && now() - lastError < ago(30)) {
socket.open()
}
})
return () => {
unsubscribeStatus()
unsubscribeSend()
}
} }
export const socketPolicyCloseOnTimeout = (socket: Socket) => { export const socketPolicyCloseOnTimeout = (socket: Socket) => {
let lastActivity = 0 let lastActivity = 0
const unsubscribeSend = on(socket, SocketEventType.Send, (message: ClientMessage) => { const unsubscribers = [
lastActivity = now() on(socket, SocketEventType.Send, (message: ClientMessage) => {
}) lastActivity = now()
}),
const unsubscribeReceive = on(socket, SocketEventType.Receive, (message: RelayMessage) => { on(socket, SocketEventType.Receive, (message: RelayMessage) => {
lastActivity = now() lastActivity = now()
}) }),
]
const interval = setInterval(() => { const interval = setInterval(() => {
if (lastActivity < ago(30)) { if (lastActivity < ago(30)) {
@@ -177,8 +176,7 @@ export const socketPolicyCloseOnTimeout = (socket: Socket) => {
}, 3000) }, 3000)
return () => { return () => {
unsubscribeSend() unsubscribers.forEach(call)
unsubscribeReceive()
clearInterval(interval) clearInterval(interval)
} }
} }
@@ -188,47 +186,43 @@ export const socketPolicyReopenActive = (socket: Socket) => {
let lastOpen = 0 let lastOpen = 0
const unsubscribeStatus = on(socket, SocketEventType.Status, (newStatus: SocketStatus) => { const unsubscribers = [
// Keep track of the most recent error on(socket, SocketEventType.Status, (newStatus: SocketStatus) => {
if (newStatus === SocketStatus.Open) { // Keep track of the most recent error
lastOpen = Date.now() if (newStatus === SocketStatus.Open) {
} lastOpen = Date.now()
}
// If the socket closed and we have no error, reopen it but don't flap // If the socket closed and we have no error, reopen it but don't flap
if (newStatus === SocketStatus.Closed && pending.size) { if (newStatus === SocketStatus.Closed && pending.size) {
sleep(Math.max(0, 30_000 - (Date.now() - lastOpen))).then(() => { sleep(Math.max(0, 30_000 - (Date.now() - lastOpen))).then(() => {
for (const message of pending.values()) { for (const message of pending.values()) {
socket.send(message) socket.send(message)
} }
}) })
} }
}) }),
on(socket, SocketEventType.Send, (message: ClientMessage) => {
if (isClientEvent(message)) {
pending.set(message[1].id, message)
}
const unsubscribeSend = on(socket, SocketEventType.Send, (message: ClientMessage) => { if (isClientReq(message)) {
if (isClientEvent(message)) { pending.set(message[1], message)
pending.set(message[1].id, message) }
}
if (isClientReq(message)) { if (isClientClose(message)) {
pending.set(message[1], message) pending.delete(message[1])
} }
}),
on(socket, SocketEventType.Receive, (message: RelayMessage) => {
if (isRelayClosed(message) || isRelayOk(message)) {
pending.delete(message[1])
}
}),
]
if (isClientClose(message)) { return () => unsubscribers.forEach(call)
pending.delete(message[1])
}
})
const unsubscribeReceive = on(socket, SocketEventType.Receive, (message: RelayMessage) => {
if (isRelayClosed(message) || isRelayOk(message)) {
pending.delete(message[1])
}
})
return () => {
unsubscribeStatus()
unsubscribeSend()
unsubscribeReceive()
}
} }
export const defaultSocketPolicies = [ export const defaultSocketPolicies = [