diff --git a/docs/net/policy.md b/docs/net/policy.md index 6c56650..e68ea85 100644 --- a/docs/net/policy.md +++ b/docs/net/policy.md @@ -21,13 +21,9 @@ Buffers messages during authentication flow and replays them after successful au Auto-connects closed sockets when messages are sent (with error cooldown). -### `socketPolicyCloseOnTimeout` +### `socketPolicyCloseInactive` -Closes sockets after 30 seconds of inactivity. - -### `socketPolicyReopenActive` - -Reopens sockets that have pending requests or publishes. +Closes sockets after 30 seconds of inactivity and reopens sockets that have pending requests or publishes. ## Custom Auth Policy diff --git a/packages/net/__tests__/policy.test.ts b/packages/net/__tests__/policy.test.ts index 45353d3..dc029f8 100644 --- a/packages/net/__tests__/policy.test.ts +++ b/packages/net/__tests__/policy.test.ts @@ -5,8 +5,7 @@ import {AuthStatus, AuthStateEvent} from "../src/auth" import { socketPolicyAuthBuffer, socketPolicyConnectOnSend, - socketPolicyCloseOnTimeout, - socketPolicyReopenActive, + socketPolicyCloseInactive, } from "../src/policy" import {ClientMessage, RelayMessage} from "../src/message" @@ -267,9 +266,9 @@ describe("policy", () => { }) }) - describe("socketPolicyCloseOnTimeout", () => { + describe("socketPolicyCloseInactive", () => { it("should close socket after 30 seconds of inactivity", async () => { - const cleanup = socketPolicyCloseOnTimeout(socket) + const cleanup = socketPolicyCloseInactive(socket) const closeSpy = vi.spyOn(socket, "close") // Set socket as open @@ -285,7 +284,7 @@ describe("policy", () => { }) it("should reset timer on send activity", () => { - const cleanup = socketPolicyCloseOnTimeout(socket) + const cleanup = socketPolicyCloseInactive(socket) const closeSpy = vi.spyOn(socket, "close") // Set socket as open @@ -296,6 +295,7 @@ describe("policy", () => { // Send a message socket.emit(SocketEvent.Send, ["EVENT", {id: "123"}]) + socket.emit(SocketEvent.Receive, ["OK", "123", true, ""]) // Advance time partially again vi.advanceTimersByTime(20000) @@ -313,7 +313,7 @@ describe("policy", () => { }) it("should reset timer on receive activity", () => { - const cleanup = socketPolicyCloseOnTimeout(socket) + const cleanup = socketPolicyCloseInactive(socket) const closeSpy = vi.spyOn(socket, "close") // Set socket as open @@ -341,7 +341,7 @@ describe("policy", () => { }) it("should not close socket if not open", () => { - const cleanup = socketPolicyCloseOnTimeout(socket) + const cleanup = socketPolicyCloseInactive(socket) const closeSpy = vi.spyOn(socket, "close") // Set socket as closed @@ -355,11 +355,9 @@ describe("policy", () => { cleanup() }) - }) - describe("socketPolicyReopenActive", () => { it("should reopen socket when closed with pending messages", async () => { - const cleanup = socketPolicyReopenActive(socket) + const cleanup = socketPolicyCloseInactive(socket) const sendSpy = vi.spyOn(socket, "send") // Send an event that will be pending @@ -379,7 +377,7 @@ describe("policy", () => { }) it("should reopen socket when closed with pending requests", async () => { - const cleanup = socketPolicyReopenActive(socket) + const cleanup = socketPolicyCloseInactive(socket) const sendSpy = vi.spyOn(socket, "send") // Send a request that will be pending @@ -399,7 +397,7 @@ describe("policy", () => { }) it("should not reopen socket immediately after previous open", async () => { - const cleanup = socketPolicyReopenActive(socket) + const cleanup = socketPolicyCloseInactive(socket) const sendSpy = vi.spyOn(socket, "send") // Send an event that will be pending @@ -426,7 +424,7 @@ describe("policy", () => { }) it("should remove pending messages when they complete", () => { - const cleanup = socketPolicyReopenActive(socket) + const cleanup = socketPolicyCloseInactive(socket) const sendSpy = vi.spyOn(socket, "send") // Send an event that will be pending @@ -449,7 +447,7 @@ describe("policy", () => { }) it("should remove pending messages when closed", () => { - const cleanup = socketPolicyReopenActive(socket) + const cleanup = socketPolicyCloseInactive(socket) const sendSpy = vi.spyOn(socket, "send") // Send a request that will be pending diff --git a/packages/net/src/policy.ts b/packages/net/src/policy.ts index 4bf541c..4d38ca9 100644 --- a/packages/net/src/policy.ts +++ b/packages/net/src/policy.ts @@ -1,4 +1,4 @@ -import {on, nthNe, always, call, sleep, ago, now} from "@welshman/lib" +import {on, ms, nthNe, always, call, sleep, ago, now} from "@welshman/lib" import {AUTH_JOIN, StampedEvent, SignedEvent} from "@welshman/util" import { ClientMessage, @@ -116,56 +116,28 @@ export const socketPolicyConnectOnSend = (socket: Socket) => { } /** - * Auto-closes a socket after 30 seconds of inactivity + * Auto-closes inactive sockets, and re-opens sockets with pending messages * @param socket - a Socket object * @return a cleanup function */ -export const socketPolicyCloseOnTimeout = (socket: Socket) => { - let lastActivity = now() - - const unsubscribers = [ - on(socket, SocketEvent.Send, (message: ClientMessage) => { - lastActivity = now() - }), - on(socket, SocketEvent.Receive, (message: RelayMessage) => { - lastActivity = now() - }), - ] - - const interval = setInterval(() => { - if (socket.status === SocketStatus.Open && lastActivity < ago(30)) { - socket.close() - } - }, 3000) - - return () => { - unsubscribers.forEach(call) - clearInterval(interval) - } -} - -/** - * Automatically re-opens a socket if there are active requests or publishes - * @param socket - a Socket object - * @return a cleanup function - */ -export const socketPolicyReopenActive = (socket: Socket) => { +export const socketPolicyCloseInactive = (socket: Socket) => { const pending = new Map() - let lastOpen = Date.now() + let lastOpen = now() + let lastActivity = now() const unsubscribers = [ on(socket, SocketEvent.Status, (newStatus: SocketStatus) => { const isClosed = [SocketStatus.Closed, SocketStatus.Error].includes(socket.status) - // Keep track of the most recent error + // Keep track of the most recent open if (newStatus === SocketStatus.Open) { - lastOpen = Date.now() + lastOpen = now() } // If the socket closed and we have no error, reopen it but don't flap if (isClosed && pending.size) { - sleep(Math.max(0, 5000 - (Date.now() - lastOpen))).then(() => { + sleep(Math.max(0, ms(5 - (now() - lastOpen)))).then(() => { for (const message of pending.values()) { socket.send(message) } @@ -173,6 +145,8 @@ export const socketPolicyReopenActive = (socket: Socket) => { } }), on(socket, SocketEvent.Send, (message: ClientMessage) => { + lastActivity = now() + if (isClientEvent(message)) { pending.set(message[1].id, message) } @@ -186,13 +160,24 @@ export const socketPolicyReopenActive = (socket: Socket) => { } }), on(socket, SocketEvent.Receive, (message: RelayMessage) => { + lastActivity = now() + if (isRelayClosed(message) || isRelayOk(message)) { pending.delete(message[1]) } }), ] - return () => unsubscribers.forEach(call) + const interval = setInterval(() => { + if (socket.status === SocketStatus.Open && lastActivity < ago(30) && pending.size === 0) { + socket.close() + } + }, 3000) + + return () => { + unsubscribers.forEach(call) + clearInterval(interval) + } } export type SocketPolicyAuthOptions = { @@ -224,6 +209,5 @@ export const makeSocketPolicyAuth = (options: SocketPolicyAuthOptions) => (socke export const defaultSocketPolicies = [ socketPolicyAuthBuffer, socketPolicyConnectOnSend, - socketPolicyCloseOnTimeout, - socketPolicyReopenActive, + socketPolicyCloseInactive, ]