diff --git a/packages/net/__tests__/policy.test.ts b/packages/net/__tests__/policy.test.ts index 3049a94..5f9b0f1 100644 --- a/packages/net/__tests__/policy.test.ts +++ b/packages/net/__tests__/policy.test.ts @@ -3,9 +3,7 @@ import { describe, expect, it, vi, beforeEach, afterEach } from "vitest" import { Socket, SocketStatus, SocketEvent } from "../src/socket" import { AuthStatus, AuthStateEvent } from "../src/auth" import { - socketPolicySendWhenOpen, - socketPolicyDeferOnAuth, - socketPolicyRetryAuthRequired, + socketPolicyAuthBuffer, socketPolicyConnectOnSend, socketPolicyCloseOnTimeout, socketPolicyReopenActive @@ -41,9 +39,9 @@ describe('policy', () => { vi.clearAllMocks() }) - describe("socketPolicyDeferOnAuth", () => { + describe("socketPolicyAuthBuffer", () => { it("should buffer messages when not authenticated", () => { - const cleanup = socketPolicyDeferOnAuth(socket) + const cleanup = socketPolicyAuthBuffer(socket) const removeSpy = vi.spyOn(socket._sendQueue, 'remove') socket.emit(SocketEvent.Receive, ["AUTH", "challenge"]) @@ -67,7 +65,7 @@ describe('policy', () => { }) it("should send buffered messages when auth succeeds", () => { - const cleanup = socketPolicyDeferOnAuth(socket) + const cleanup = socketPolicyAuthBuffer(socket) const sendSpy = vi.spyOn(socket, 'send') socket.emit(SocketEvent.Receive, ["AUTH", "challenge"]) @@ -89,7 +87,7 @@ describe('policy', () => { }) it("should handle CLOSE messages properly", () => { - const cleanup = socketPolicyDeferOnAuth(socket) + const cleanup = socketPolicyAuthBuffer(socket) const removeSpy = vi.spyOn(socket._sendQueue, 'remove') socket.emit(SocketEvent.Receive, ["AUTH", "challenge"]) @@ -108,11 +106,9 @@ describe('policy', () => { cleanup() }) - }) - describe("socketPolicyRetryAuthRequired", () => { it("should retry events once when auth-required", () => { - const cleanup = socketPolicyRetryAuthRequired(socket) + const cleanup = socketPolicyAuthBuffer(socket) const sendSpy = vi.spyOn(socket, 'send') // Send an event @@ -135,7 +131,7 @@ describe('policy', () => { }) it("should retry REQ once when auth-required", () => { - const cleanup = socketPolicyRetryAuthRequired(socket) + const cleanup = socketPolicyAuthBuffer(socket) const sendSpy = vi.spyOn(socket, 'send') // Send a REQ @@ -158,7 +154,7 @@ describe('policy', () => { }) it("should not retry AUTH_JOIN events", () => { - const cleanup = socketPolicyRetryAuthRequired(socket) + const cleanup = socketPolicyAuthBuffer(socket) const sendSpy = vi.spyOn(socket, 'send') // Send an AUTH_JOIN event @@ -175,7 +171,7 @@ describe('policy', () => { }) it("should clear pending messages on successful response", () => { - const cleanup = socketPolicyRetryAuthRequired(socket) + const cleanup = socketPolicyAuthBuffer(socket) const sendSpy = vi.spyOn(socket, 'send') // Send an event diff --git a/packages/net/src/auth.ts b/packages/net/src/auth.ts index 1d8b815..6b0e394 100644 --- a/packages/net/src/auth.ts +++ b/packages/net/src/auth.ts @@ -66,10 +66,14 @@ export class AuthState extends EventEmitter { if (isRelayAuth(message)) { const [_, challenge] = message - this.challenge = challenge - this.request = undefined - this.details = undefined - this.setStatus(AuthStatus.Requested) + // Sometimes relays send the same challenge multiple times, no need to + // respond to it twice + if (challenge !== this.challenge) { + this.challenge = challenge + this.request = undefined + this.details = undefined + this.setStatus(AuthStatus.Requested) + } } }), on(socket, SocketEvent.Sending, (message: RelayMessage) => { diff --git a/packages/net/src/policy.ts b/packages/net/src/policy.ts index 889d859..48ff41b 100644 --- a/packages/net/src/policy.ts +++ b/packages/net/src/policy.ts @@ -1,4 +1,4 @@ -import {on, always, call, sleep, spec, ago, now} from "@welshman/lib" +import {on, nthEq, always, call, sleep, spec, ago, now} from "@welshman/lib" import {AUTH_JOIN, StampedEvent, SignedEvent} from "@welshman/util" import { ClientMessage, @@ -6,118 +6,73 @@ import { isClientClose, isClientEvent, isClientReq, + isClientNegClose, ClientMessageType, RelayMessage, isRelayOk, + isRelayEose, isRelayClosed, } from "./message.js" import {Socket, SocketStatus, SocketEvent} from "./socket.js" import {AuthStatus, AuthStateEvent} from "./auth.js" /** - * Defers sending messages when a challenge has been presented and not answered yet + * Handles auth-related message management: + * - Defers sending messages when a challenge is pending + * - Re-enqueues event/req messages once if rejected due to auth-required * @param socket - a Socket object * @return a cleanup function */ -export const socketPolicyDeferOnAuth = (socket: Socket) => { - const buffer: ClientMessage[] = [] - const okStatuses = [AuthStatus.None, AuthStatus.Ok] +export const socketPolicyAuthBuffer = (socket: Socket) => { + const {None, Ok, DeniedSignature, Forbidden} = AuthStatus + const terminalStatuses = [Ok, DeniedSignature, Forbidden] + + let buffer: ClientMessage[] = [] const unsubscribers = [ - // Pause sending certain messages when we're not authenticated on(socket, SocketEvent.Sending, (message: ClientMessage) => { - // If we're closing a request, but it never got sent, remove both from the queue - // Otherwise, always send CLOSE - if (isClientClose(message)) { - const req = buffer.find(spec([ClientMessageType.Req, message[1]])) - - if (req) { - socket._sendQueue.remove(req) - socket._sendQueue.remove(message) - } - - return - } - // Always allow sending auth if (isClientAuth(message)) return // Always allow sending join requests if (isClientEvent(message) && message[1].kind === AUTH_JOIN) return - // If we're not ok, remove the message and save it for later - if (!okStatuses.includes(socket.auth.status)) { - buffer.push(message) - socket._sendQueue.remove(message) + // If the auth flow is complete, no need to buffer anymore + if (terminalStatuses.includes(socket.auth.status)) return + + // If the client is closing a req, remove both from our buffer + // Otherwise, if auth isn't done, hang on to recent messages in case we need to replay them + if (isClientClose(message) || isClientNegClose(message)) { + buffer = buffer.filter(nthEq(1, message[1])) + } else { + buffer = buffer.slice(-50).concat([message]) + } + }), + on(socket, SocketEvent.Receiving, (message: RelayMessage) => { + // If the client is closing a request during auth, don't tell the caller, we'll retry it + if (isRelayClosed(message) && message[2]?.startsWith('auth-required:')) { + socket._recvQueue.remove(message) + } + + // If we get an eose but we're in the middle of authenticating, wait + if (isRelayEose(message) && ![None, Ok].includes(socket.auth.status)) { + socket._recvQueue.remove(message) + } + + // If the client is rejecting an event during auth, don't tell the caller, we'll retry it + if (isRelayOk(message) && !message[2] && message[3]?.startsWith('auth-required:')) { + socket._recvQueue.remove(message) } }), - // Send buffered messages when we get successful auth on(socket.auth, AuthStateEvent.Status, (status: AuthStatus) => { - if (okStatuses.includes(status) && buffer.length > 0) { + // Send buffered messages when we get successful auth. In any case, clear them out + // if the auth flow is complete + if (status === Ok) { for (const message of buffer.splice(0)) { socket.send(message) } - } - }), - ] - - return () => { - unsubscribers.forEach(call) - } -} - -/** - * Re-enqueues event/req messages once if rejected due to auth-required - * @param socket - a Socket object - * @return a cleanup function - */ -export const socketPolicyRetryAuthRequired = (socket: Socket) => { - const retried = new Set() - const pending = new Map() - - const unsubscribers = [ - // Watch outgoing events and requests and keep a copy - on(socket, SocketEvent.Send, (message: ClientMessage) => { - if (isClientEvent(message)) { - const [_, event] = message - - if (!retried.has(event.id) && event.kind !== AUTH_JOIN) { - pending.set(event.id, message) - } - } - - if (isClientReq(message)) { - const [_, id] = message - - if (!retried.has(id)) { - pending.set(id, message) - } - } - }), - // If a message is rejected with auth-required, re-enqueue it one time - on(socket, SocketEvent.Receive, (message: RelayMessage) => { - if (isRelayOk(message)) { - const [_, id, ok, detail] = message - const pendingMessage = pending.get(id) - - if (pendingMessage && !ok && detail?.startsWith("auth-required:")) { - socket.send(pendingMessage) - retried.add(id) - } - - pending.delete(id) - } - - if (isRelayClosed(message)) { - const [_, id, detail] = message - const pendingMessage = pending.get(id) - - if (pendingMessage && detail?.startsWith("auth-required:")) { - socket.send(pendingMessage) - retried.add(id) - } - - pending.delete(id) + } else if (terminalStatuses.includes(socket.auth.status)) { + buffer = [] } }), ] @@ -256,8 +211,7 @@ export const makeSocketPolicyAuth = (options: SocketPolicyAuthOptions) => (socke } export const defaultSocketPolicies = [ - socketPolicyDeferOnAuth, - socketPolicyRetryAuthRequired, + socketPolicyAuthBuffer, socketPolicyConnectOnSend, socketPolicyCloseOnTimeout, socketPolicyReopenActive,