From 3081d1e6b453479b93e4ad95d9bbdce05be973e3 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 27 Mar 2025 18:05:12 -0700 Subject: [PATCH] Add policy tests --- packages/net2/__tests__/policy.test.ts | 203 +++++++++++++++++++++++++ packages/net2/src/policy.ts | 4 +- packages/net2/src/socket.ts | 6 + 3 files changed, 212 insertions(+), 1 deletion(-) diff --git a/packages/net2/__tests__/policy.test.ts b/packages/net2/__tests__/policy.test.ts index 26dec62..aab5112 100644 --- a/packages/net2/__tests__/policy.test.ts +++ b/packages/net2/__tests__/policy.test.ts @@ -259,5 +259,208 @@ describe('policy', () => { }) describe("socketPolicyCloseOnTimeout", () => { + it("should close socket after 30 seconds of inactivity", async () => { + const cleanup = socketPolicyCloseOnTimeout(socket) + const closeSpy = vi.spyOn(socket, 'close') + + // Set socket as open + socket.emit(SocketEventType.Status, SocketStatus.Open) + + // Advance time past the timeout + await vi.advanceTimersByTimeAsync(35000) + + // Socket should be closed + expect(closeSpy).toHaveBeenCalled() + + cleanup() + }) + + it("should reset timer on send activity", () => { + const cleanup = socketPolicyCloseOnTimeout(socket) + const closeSpy = vi.spyOn(socket, 'close') + + // Set socket as open + socket.emit(SocketEventType.Status, SocketStatus.Open) + + // Advance time partially + vi.advanceTimersByTime(20000) + + // Send a message + socket.emit(SocketEventType.Send, ["EVENT", { id: "123" }]) + + // Advance time partially again + vi.advanceTimersByTime(20000) + + // Socket should not be closed yet + expect(closeSpy).not.toHaveBeenCalled() + + // Advance remaining time + vi.advanceTimersByTime(11000) + + // Now socket should be closed + expect(closeSpy).toHaveBeenCalled() + + cleanup() + }) + + it("should reset timer on receive activity", () => { + const cleanup = socketPolicyCloseOnTimeout(socket) + const closeSpy = vi.spyOn(socket, 'close') + + // Set socket as open + socket.emit(SocketEventType.Status, SocketStatus.Open) + + // Advance time partially + vi.advanceTimersByTime(20000) + + // Receive a message + socket.emit(SocketEventType.Receive, ["EVENT", "123", { id: "123" }]) + + // Advance time partially again + vi.advanceTimersByTime(20000) + + // Socket should not be closed yet + expect(closeSpy).not.toHaveBeenCalled() + + // Advance remaining time + vi.advanceTimersByTime(11000) + + // Now socket should be closed + expect(closeSpy).toHaveBeenCalled() + + cleanup() + }) + + it("should not close socket if not open", () => { + const cleanup = socketPolicyCloseOnTimeout(socket) + const closeSpy = vi.spyOn(socket, 'close') + + // Set socket as closed + socket.emit(SocketEventType.Status, SocketStatus.Closed) + + // Advance time past the timeout + vi.advanceTimersByTime(31000) + + // Socket should not be closed + expect(closeSpy).not.toHaveBeenCalled() + + cleanup() + }) + }) + + describe("socketPolicyReopenActive", () => { + it("should reopen socket when closed with pending messages", async () => { + const cleanup = socketPolicyReopenActive(socket) + const sendSpy = vi.spyOn(socket, 'send') + + // Send an event that will be pending + const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }] + socket.emit(SocketEventType.Send, event) + + // Socket closes + socket.emit(SocketEventType.Status, SocketStatus.Closed) + + // Advance past the reopen delay + await vi.advanceTimersByTimeAsync(30000) + + // Should resend the pending event + expect(sendSpy).toHaveBeenCalledWith(event) + + cleanup() + }) + + it("should reopen socket when closed with pending requests", async () => { + const cleanup = socketPolicyReopenActive(socket) + const sendSpy = vi.spyOn(socket, 'send') + + // Send a request that will be pending + const req: ClientMessage = ["REQ", "123", { kinds: [1] }] + socket.emit(SocketEventType.Send, req) + + // Socket closes + socket.emit(SocketEventType.Status, SocketStatus.Closed) + + // Advance past the reopen delay + await vi.advanceTimersByTimeAsync(30000) + + // Should resend the pending request + expect(sendSpy).toHaveBeenCalledWith(req) + + cleanup() + }) + + it("should not reopen socket immediately after previous open", async () => { + const cleanup = socketPolicyReopenActive(socket) + const sendSpy = vi.spyOn(socket, 'send') + + // Send an event that will be pending + const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }] + socket.emit(SocketEventType.Send, event) + + // Socket opens then closes quickly + socket.emit(SocketEventType.Status, SocketStatus.Open) + socket.emit(SocketEventType.Status, SocketStatus.Closed) + + // Advance a short time + vi.advanceTimersByTime(5000) + + // Should not resend yet to prevent flapping + expect(sendSpy).not.toHaveBeenCalled() + + // Advance remaining time + await vi.advanceTimersByTimeAsync(25000) + + // Now should resend + expect(sendSpy).toHaveBeenCalledWith(event) + + cleanup() + }) + + it("should remove pending messages when they complete", () => { + const cleanup = socketPolicyReopenActive(socket) + const sendSpy = vi.spyOn(socket, 'send') + + // Send an event that will be pending + const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }] + socket.emit(SocketEventType.Send, event) + + // Event completes successfully + socket.emit(SocketEventType.Receive, ["OK", "123", true]) + + // Socket closes + socket.emit(SocketEventType.Status, SocketStatus.Closed) + + // Advance past the reopen delay + vi.advanceTimersByTime(30000) + + // Should not resend since event was completed + expect(sendSpy).not.toHaveBeenCalled() + + cleanup() + }) + + it("should remove pending messages when closed", () => { + const cleanup = socketPolicyReopenActive(socket) + const sendSpy = vi.spyOn(socket, 'send') + + // Send a request that will be pending + const req: ClientMessage = ["REQ", "123", { kinds: [1] }] + socket.emit(SocketEventType.Send, req) + + // Send close for the request + const close: ClientMessage = ["CLOSE", "123"] + socket.emit(SocketEventType.Send, close) + + // Socket closes + socket.emit(SocketEventType.Status, SocketStatus.Closed) + + // Advance past the reopen delay + vi.advanceTimersByTime(30000) + + // Should not resend since request was closed + expect(sendSpy).not.toHaveBeenCalled() + + cleanup() + }) }) }) diff --git a/packages/net2/src/policy.ts b/packages/net2/src/policy.ts index a2e9ffd..0f81dbb 100644 --- a/packages/net2/src/policy.ts +++ b/packages/net2/src/policy.ts @@ -194,7 +194,7 @@ export const socketPolicyCloseOnTimeout = (socket: Socket) => { export const socketPolicyReopenActive = (socket: Socket) => { const pending = new Map() - let lastOpen = 0 + let lastOpen = Date.now() const unsubscribers = [ on(socket, SocketEventType.Status, (newStatus: SocketStatus) => { @@ -205,7 +205,9 @@ export const socketPolicyReopenActive = (socket: Socket) => { // If the socket closed and we have no error, reopen it but don't flap if (newStatus === SocketStatus.Closed && pending.size) { + console.log('1') sleep(Math.max(0, 30_000 - (Date.now() - lastOpen))).then(() => { + console.log('2') for (const message of pending.values()) { socket.send(message) } diff --git a/packages/net2/src/socket.ts b/packages/net2/src/socket.ts index 30545a8..203bb4a 100644 --- a/packages/net2/src/socket.ts +++ b/packages/net2/src/socket.ts @@ -30,6 +30,8 @@ export type SocketEvents = { } export class Socket extends (EventEmitter as new () => TypedEmitter) { + readonly status = SocketStatus.Closed + _ws?: WebSocket _sendQueue: TaskQueue _recvQueue: TaskQueue @@ -51,6 +53,10 @@ export class Socket extends (EventEmitter as new () => TypedEmitter { + this.status = status + }) } open = () => {