Improve auth buffering policy
This commit is contained in:
@@ -3,9 +3,7 @@ import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"
|
||||
import { Socket, SocketStatus, SocketEvent } from "../src/socket"
|
||||
import { AuthStatus, AuthStateEvent } from "../src/auth"
|
||||
import {
|
||||
socketPolicySendWhenOpen,
|
||||
socketPolicyDeferOnAuth,
|
||||
socketPolicyRetryAuthRequired,
|
||||
socketPolicyAuthBuffer,
|
||||
socketPolicyConnectOnSend,
|
||||
socketPolicyCloseOnTimeout,
|
||||
socketPolicyReopenActive
|
||||
@@ -41,9 +39,9 @@ describe('policy', () => {
|
||||
vi.clearAllMocks()
|
||||
})
|
||||
|
||||
describe("socketPolicyDeferOnAuth", () => {
|
||||
describe("socketPolicyAuthBuffer", () => {
|
||||
it("should buffer messages when not authenticated", () => {
|
||||
const cleanup = socketPolicyDeferOnAuth(socket)
|
||||
const cleanup = socketPolicyAuthBuffer(socket)
|
||||
const removeSpy = vi.spyOn(socket._sendQueue, 'remove')
|
||||
|
||||
socket.emit(SocketEvent.Receive, ["AUTH", "challenge"])
|
||||
@@ -67,7 +65,7 @@ describe('policy', () => {
|
||||
})
|
||||
|
||||
it("should send buffered messages when auth succeeds", () => {
|
||||
const cleanup = socketPolicyDeferOnAuth(socket)
|
||||
const cleanup = socketPolicyAuthBuffer(socket)
|
||||
const sendSpy = vi.spyOn(socket, 'send')
|
||||
|
||||
socket.emit(SocketEvent.Receive, ["AUTH", "challenge"])
|
||||
@@ -89,7 +87,7 @@ describe('policy', () => {
|
||||
})
|
||||
|
||||
it("should handle CLOSE messages properly", () => {
|
||||
const cleanup = socketPolicyDeferOnAuth(socket)
|
||||
const cleanup = socketPolicyAuthBuffer(socket)
|
||||
const removeSpy = vi.spyOn(socket._sendQueue, 'remove')
|
||||
|
||||
socket.emit(SocketEvent.Receive, ["AUTH", "challenge"])
|
||||
@@ -108,11 +106,9 @@ describe('policy', () => {
|
||||
|
||||
cleanup()
|
||||
})
|
||||
})
|
||||
|
||||
describe("socketPolicyRetryAuthRequired", () => {
|
||||
it("should retry events once when auth-required", () => {
|
||||
const cleanup = socketPolicyRetryAuthRequired(socket)
|
||||
const cleanup = socketPolicyAuthBuffer(socket)
|
||||
const sendSpy = vi.spyOn(socket, 'send')
|
||||
|
||||
// Send an event
|
||||
@@ -135,7 +131,7 @@ describe('policy', () => {
|
||||
})
|
||||
|
||||
it("should retry REQ once when auth-required", () => {
|
||||
const cleanup = socketPolicyRetryAuthRequired(socket)
|
||||
const cleanup = socketPolicyAuthBuffer(socket)
|
||||
const sendSpy = vi.spyOn(socket, 'send')
|
||||
|
||||
// Send a REQ
|
||||
@@ -158,7 +154,7 @@ describe('policy', () => {
|
||||
})
|
||||
|
||||
it("should not retry AUTH_JOIN events", () => {
|
||||
const cleanup = socketPolicyRetryAuthRequired(socket)
|
||||
const cleanup = socketPolicyAuthBuffer(socket)
|
||||
const sendSpy = vi.spyOn(socket, 'send')
|
||||
|
||||
// Send an AUTH_JOIN event
|
||||
@@ -175,7 +171,7 @@ describe('policy', () => {
|
||||
})
|
||||
|
||||
it("should clear pending messages on successful response", () => {
|
||||
const cleanup = socketPolicyRetryAuthRequired(socket)
|
||||
const cleanup = socketPolicyAuthBuffer(socket)
|
||||
const sendSpy = vi.spyOn(socket, 'send')
|
||||
|
||||
// Send an event
|
||||
|
||||
@@ -66,10 +66,14 @@ export class AuthState extends EventEmitter {
|
||||
if (isRelayAuth(message)) {
|
||||
const [_, challenge] = message
|
||||
|
||||
this.challenge = challenge
|
||||
this.request = undefined
|
||||
this.details = undefined
|
||||
this.setStatus(AuthStatus.Requested)
|
||||
// Sometimes relays send the same challenge multiple times, no need to
|
||||
// respond to it twice
|
||||
if (challenge !== this.challenge) {
|
||||
this.challenge = challenge
|
||||
this.request = undefined
|
||||
this.details = undefined
|
||||
this.setStatus(AuthStatus.Requested)
|
||||
}
|
||||
}
|
||||
}),
|
||||
on(socket, SocketEvent.Sending, (message: RelayMessage) => {
|
||||
|
||||
+42
-88
@@ -1,4 +1,4 @@
|
||||
import {on, always, call, sleep, spec, ago, now} from "@welshman/lib"
|
||||
import {on, nthEq, always, call, sleep, spec, ago, now} from "@welshman/lib"
|
||||
import {AUTH_JOIN, StampedEvent, SignedEvent} from "@welshman/util"
|
||||
import {
|
||||
ClientMessage,
|
||||
@@ -6,118 +6,73 @@ import {
|
||||
isClientClose,
|
||||
isClientEvent,
|
||||
isClientReq,
|
||||
isClientNegClose,
|
||||
ClientMessageType,
|
||||
RelayMessage,
|
||||
isRelayOk,
|
||||
isRelayEose,
|
||||
isRelayClosed,
|
||||
} from "./message.js"
|
||||
import {Socket, SocketStatus, SocketEvent} from "./socket.js"
|
||||
import {AuthStatus, AuthStateEvent} from "./auth.js"
|
||||
|
||||
/**
|
||||
* Defers sending messages when a challenge has been presented and not answered yet
|
||||
* Handles auth-related message management:
|
||||
* - Defers sending messages when a challenge is pending
|
||||
* - Re-enqueues event/req messages once if rejected due to auth-required
|
||||
* @param socket - a Socket object
|
||||
* @return a cleanup function
|
||||
*/
|
||||
export const socketPolicyDeferOnAuth = (socket: Socket) => {
|
||||
const buffer: ClientMessage[] = []
|
||||
const okStatuses = [AuthStatus.None, AuthStatus.Ok]
|
||||
export const socketPolicyAuthBuffer = (socket: Socket) => {
|
||||
const {None, Ok, DeniedSignature, Forbidden} = AuthStatus
|
||||
const terminalStatuses = [Ok, DeniedSignature, Forbidden]
|
||||
|
||||
let buffer: ClientMessage[] = []
|
||||
|
||||
const unsubscribers = [
|
||||
// Pause sending certain messages when we're not authenticated
|
||||
on(socket, SocketEvent.Sending, (message: ClientMessage) => {
|
||||
// If we're closing a request, but it never got sent, remove both from the queue
|
||||
// Otherwise, always send CLOSE
|
||||
if (isClientClose(message)) {
|
||||
const req = buffer.find(spec([ClientMessageType.Req, message[1]]))
|
||||
|
||||
if (req) {
|
||||
socket._sendQueue.remove(req)
|
||||
socket._sendQueue.remove(message)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Always allow sending auth
|
||||
if (isClientAuth(message)) return
|
||||
|
||||
// Always allow sending join requests
|
||||
if (isClientEvent(message) && message[1].kind === AUTH_JOIN) return
|
||||
|
||||
// If we're not ok, remove the message and save it for later
|
||||
if (!okStatuses.includes(socket.auth.status)) {
|
||||
buffer.push(message)
|
||||
socket._sendQueue.remove(message)
|
||||
// If the auth flow is complete, no need to buffer anymore
|
||||
if (terminalStatuses.includes(socket.auth.status)) return
|
||||
|
||||
// If the client is closing a req, remove both from our buffer
|
||||
// Otherwise, if auth isn't done, hang on to recent messages in case we need to replay them
|
||||
if (isClientClose(message) || isClientNegClose(message)) {
|
||||
buffer = buffer.filter(nthEq(1, message[1]))
|
||||
} else {
|
||||
buffer = buffer.slice(-50).concat([message])
|
||||
}
|
||||
}),
|
||||
on(socket, SocketEvent.Receiving, (message: RelayMessage) => {
|
||||
// If the client is closing a request during auth, don't tell the caller, we'll retry it
|
||||
if (isRelayClosed(message) && message[2]?.startsWith('auth-required:')) {
|
||||
socket._recvQueue.remove(message)
|
||||
}
|
||||
|
||||
// If we get an eose but we're in the middle of authenticating, wait
|
||||
if (isRelayEose(message) && ![None, Ok].includes(socket.auth.status)) {
|
||||
socket._recvQueue.remove(message)
|
||||
}
|
||||
|
||||
// If the client is rejecting an event during auth, don't tell the caller, we'll retry it
|
||||
if (isRelayOk(message) && !message[2] && message[3]?.startsWith('auth-required:')) {
|
||||
socket._recvQueue.remove(message)
|
||||
}
|
||||
}),
|
||||
// Send buffered messages when we get successful auth
|
||||
on(socket.auth, AuthStateEvent.Status, (status: AuthStatus) => {
|
||||
if (okStatuses.includes(status) && buffer.length > 0) {
|
||||
// Send buffered messages when we get successful auth. In any case, clear them out
|
||||
// if the auth flow is complete
|
||||
if (status === Ok) {
|
||||
for (const message of buffer.splice(0)) {
|
||||
socket.send(message)
|
||||
}
|
||||
}
|
||||
}),
|
||||
]
|
||||
|
||||
return () => {
|
||||
unsubscribers.forEach(call)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-enqueues event/req messages once if rejected due to auth-required
|
||||
* @param socket - a Socket object
|
||||
* @return a cleanup function
|
||||
*/
|
||||
export const socketPolicyRetryAuthRequired = (socket: Socket) => {
|
||||
const retried = new Set<string>()
|
||||
const pending = new Map<string, ClientMessage>()
|
||||
|
||||
const unsubscribers = [
|
||||
// Watch outgoing events and requests and keep a copy
|
||||
on(socket, SocketEvent.Send, (message: ClientMessage) => {
|
||||
if (isClientEvent(message)) {
|
||||
const [_, event] = message
|
||||
|
||||
if (!retried.has(event.id) && event.kind !== AUTH_JOIN) {
|
||||
pending.set(event.id, message)
|
||||
}
|
||||
}
|
||||
|
||||
if (isClientReq(message)) {
|
||||
const [_, id] = message
|
||||
|
||||
if (!retried.has(id)) {
|
||||
pending.set(id, message)
|
||||
}
|
||||
}
|
||||
}),
|
||||
// If a message is rejected with auth-required, re-enqueue it one time
|
||||
on(socket, SocketEvent.Receive, (message: RelayMessage) => {
|
||||
if (isRelayOk(message)) {
|
||||
const [_, id, ok, detail] = message
|
||||
const pendingMessage = pending.get(id)
|
||||
|
||||
if (pendingMessage && !ok && detail?.startsWith("auth-required:")) {
|
||||
socket.send(pendingMessage)
|
||||
retried.add(id)
|
||||
}
|
||||
|
||||
pending.delete(id)
|
||||
}
|
||||
|
||||
if (isRelayClosed(message)) {
|
||||
const [_, id, detail] = message
|
||||
const pendingMessage = pending.get(id)
|
||||
|
||||
if (pendingMessage && detail?.startsWith("auth-required:")) {
|
||||
socket.send(pendingMessage)
|
||||
retried.add(id)
|
||||
}
|
||||
|
||||
pending.delete(id)
|
||||
} else if (terminalStatuses.includes(socket.auth.status)) {
|
||||
buffer = []
|
||||
}
|
||||
}),
|
||||
]
|
||||
@@ -256,8 +211,7 @@ export const makeSocketPolicyAuth = (options: SocketPolicyAuthOptions) => (socke
|
||||
}
|
||||
|
||||
export const defaultSocketPolicies = [
|
||||
socketPolicyDeferOnAuth,
|
||||
socketPolicyRetryAuthRequired,
|
||||
socketPolicyAuthBuffer,
|
||||
socketPolicyConnectOnSend,
|
||||
socketPolicyCloseOnTimeout,
|
||||
socketPolicyReopenActive,
|
||||
|
||||
Reference in New Issue
Block a user