Add policy tests
This commit is contained in:
@@ -259,5 +259,208 @@ describe('policy', () => {
|
||||
})
|
||||
|
||||
describe("socketPolicyCloseOnTimeout", () => {
|
||||
it("should close socket after 30 seconds of inactivity", async () => {
|
||||
const cleanup = socketPolicyCloseOnTimeout(socket)
|
||||
const closeSpy = vi.spyOn(socket, 'close')
|
||||
|
||||
// Set socket as open
|
||||
socket.emit(SocketEventType.Status, SocketStatus.Open)
|
||||
|
||||
// Advance time past the timeout
|
||||
await vi.advanceTimersByTimeAsync(35000)
|
||||
|
||||
// Socket should be closed
|
||||
expect(closeSpy).toHaveBeenCalled()
|
||||
|
||||
cleanup()
|
||||
})
|
||||
|
||||
it("should reset timer on send activity", () => {
|
||||
const cleanup = socketPolicyCloseOnTimeout(socket)
|
||||
const closeSpy = vi.spyOn(socket, 'close')
|
||||
|
||||
// Set socket as open
|
||||
socket.emit(SocketEventType.Status, SocketStatus.Open)
|
||||
|
||||
// Advance time partially
|
||||
vi.advanceTimersByTime(20000)
|
||||
|
||||
// Send a message
|
||||
socket.emit(SocketEventType.Send, ["EVENT", { id: "123" }])
|
||||
|
||||
// Advance time partially again
|
||||
vi.advanceTimersByTime(20000)
|
||||
|
||||
// Socket should not be closed yet
|
||||
expect(closeSpy).not.toHaveBeenCalled()
|
||||
|
||||
// Advance remaining time
|
||||
vi.advanceTimersByTime(11000)
|
||||
|
||||
// Now socket should be closed
|
||||
expect(closeSpy).toHaveBeenCalled()
|
||||
|
||||
cleanup()
|
||||
})
|
||||
|
||||
it("should reset timer on receive activity", () => {
|
||||
const cleanup = socketPolicyCloseOnTimeout(socket)
|
||||
const closeSpy = vi.spyOn(socket, 'close')
|
||||
|
||||
// Set socket as open
|
||||
socket.emit(SocketEventType.Status, SocketStatus.Open)
|
||||
|
||||
// Advance time partially
|
||||
vi.advanceTimersByTime(20000)
|
||||
|
||||
// Receive a message
|
||||
socket.emit(SocketEventType.Receive, ["EVENT", "123", { id: "123" }])
|
||||
|
||||
// Advance time partially again
|
||||
vi.advanceTimersByTime(20000)
|
||||
|
||||
// Socket should not be closed yet
|
||||
expect(closeSpy).not.toHaveBeenCalled()
|
||||
|
||||
// Advance remaining time
|
||||
vi.advanceTimersByTime(11000)
|
||||
|
||||
// Now socket should be closed
|
||||
expect(closeSpy).toHaveBeenCalled()
|
||||
|
||||
cleanup()
|
||||
})
|
||||
|
||||
it("should not close socket if not open", () => {
|
||||
const cleanup = socketPolicyCloseOnTimeout(socket)
|
||||
const closeSpy = vi.spyOn(socket, 'close')
|
||||
|
||||
// Set socket as closed
|
||||
socket.emit(SocketEventType.Status, SocketStatus.Closed)
|
||||
|
||||
// Advance time past the timeout
|
||||
vi.advanceTimersByTime(31000)
|
||||
|
||||
// Socket should not be closed
|
||||
expect(closeSpy).not.toHaveBeenCalled()
|
||||
|
||||
cleanup()
|
||||
})
|
||||
})
|
||||
|
||||
describe("socketPolicyReopenActive", () => {
|
||||
it("should reopen socket when closed with pending messages", async () => {
|
||||
const cleanup = socketPolicyReopenActive(socket)
|
||||
const sendSpy = vi.spyOn(socket, 'send')
|
||||
|
||||
// Send an event that will be pending
|
||||
const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }]
|
||||
socket.emit(SocketEventType.Send, event)
|
||||
|
||||
// Socket closes
|
||||
socket.emit(SocketEventType.Status, SocketStatus.Closed)
|
||||
|
||||
// Advance past the reopen delay
|
||||
await vi.advanceTimersByTimeAsync(30000)
|
||||
|
||||
// Should resend the pending event
|
||||
expect(sendSpy).toHaveBeenCalledWith(event)
|
||||
|
||||
cleanup()
|
||||
})
|
||||
|
||||
it("should reopen socket when closed with pending requests", async () => {
|
||||
const cleanup = socketPolicyReopenActive(socket)
|
||||
const sendSpy = vi.spyOn(socket, 'send')
|
||||
|
||||
// Send a request that will be pending
|
||||
const req: ClientMessage = ["REQ", "123", { kinds: [1] }]
|
||||
socket.emit(SocketEventType.Send, req)
|
||||
|
||||
// Socket closes
|
||||
socket.emit(SocketEventType.Status, SocketStatus.Closed)
|
||||
|
||||
// Advance past the reopen delay
|
||||
await vi.advanceTimersByTimeAsync(30000)
|
||||
|
||||
// Should resend the pending request
|
||||
expect(sendSpy).toHaveBeenCalledWith(req)
|
||||
|
||||
cleanup()
|
||||
})
|
||||
|
||||
it("should not reopen socket immediately after previous open", async () => {
|
||||
const cleanup = socketPolicyReopenActive(socket)
|
||||
const sendSpy = vi.spyOn(socket, 'send')
|
||||
|
||||
// Send an event that will be pending
|
||||
const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }]
|
||||
socket.emit(SocketEventType.Send, event)
|
||||
|
||||
// Socket opens then closes quickly
|
||||
socket.emit(SocketEventType.Status, SocketStatus.Open)
|
||||
socket.emit(SocketEventType.Status, SocketStatus.Closed)
|
||||
|
||||
// Advance a short time
|
||||
vi.advanceTimersByTime(5000)
|
||||
|
||||
// Should not resend yet to prevent flapping
|
||||
expect(sendSpy).not.toHaveBeenCalled()
|
||||
|
||||
// Advance remaining time
|
||||
await vi.advanceTimersByTimeAsync(25000)
|
||||
|
||||
// Now should resend
|
||||
expect(sendSpy).toHaveBeenCalledWith(event)
|
||||
|
||||
cleanup()
|
||||
})
|
||||
|
||||
it("should remove pending messages when they complete", () => {
|
||||
const cleanup = socketPolicyReopenActive(socket)
|
||||
const sendSpy = vi.spyOn(socket, 'send')
|
||||
|
||||
// Send an event that will be pending
|
||||
const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }]
|
||||
socket.emit(SocketEventType.Send, event)
|
||||
|
||||
// Event completes successfully
|
||||
socket.emit(SocketEventType.Receive, ["OK", "123", true])
|
||||
|
||||
// Socket closes
|
||||
socket.emit(SocketEventType.Status, SocketStatus.Closed)
|
||||
|
||||
// Advance past the reopen delay
|
||||
vi.advanceTimersByTime(30000)
|
||||
|
||||
// Should not resend since event was completed
|
||||
expect(sendSpy).not.toHaveBeenCalled()
|
||||
|
||||
cleanup()
|
||||
})
|
||||
|
||||
it("should remove pending messages when closed", () => {
|
||||
const cleanup = socketPolicyReopenActive(socket)
|
||||
const sendSpy = vi.spyOn(socket, 'send')
|
||||
|
||||
// Send a request that will be pending
|
||||
const req: ClientMessage = ["REQ", "123", { kinds: [1] }]
|
||||
socket.emit(SocketEventType.Send, req)
|
||||
|
||||
// Send close for the request
|
||||
const close: ClientMessage = ["CLOSE", "123"]
|
||||
socket.emit(SocketEventType.Send, close)
|
||||
|
||||
// Socket closes
|
||||
socket.emit(SocketEventType.Status, SocketStatus.Closed)
|
||||
|
||||
// Advance past the reopen delay
|
||||
vi.advanceTimersByTime(30000)
|
||||
|
||||
// Should not resend since request was closed
|
||||
expect(sendSpy).not.toHaveBeenCalled()
|
||||
|
||||
cleanup()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -194,7 +194,7 @@ export const socketPolicyCloseOnTimeout = (socket: Socket) => {
|
||||
export const socketPolicyReopenActive = (socket: Socket) => {
|
||||
const pending = new Map<string, ClientMessage>()
|
||||
|
||||
let lastOpen = 0
|
||||
let lastOpen = Date.now()
|
||||
|
||||
const unsubscribers = [
|
||||
on(socket, SocketEventType.Status, (newStatus: SocketStatus) => {
|
||||
@@ -205,7 +205,9 @@ export const socketPolicyReopenActive = (socket: Socket) => {
|
||||
|
||||
// If the socket closed and we have no error, reopen it but don't flap
|
||||
if (newStatus === SocketStatus.Closed && pending.size) {
|
||||
console.log('1')
|
||||
sleep(Math.max(0, 30_000 - (Date.now() - lastOpen))).then(() => {
|
||||
console.log('2')
|
||||
for (const message of pending.values()) {
|
||||
socket.send(message)
|
||||
}
|
||||
|
||||
@@ -30,6 +30,8 @@ export type SocketEvents = {
|
||||
}
|
||||
|
||||
export class Socket extends (EventEmitter as new () => TypedEmitter<SocketEvents>) {
|
||||
readonly status = SocketStatus.Closed
|
||||
|
||||
_ws?: WebSocket
|
||||
_sendQueue: TaskQueue<ClientMessage>
|
||||
_recvQueue: TaskQueue<RelayMessage>
|
||||
@@ -51,6 +53,10 @@ export class Socket extends (EventEmitter as new () => TypedEmitter<SocketEvents
|
||||
this.emit(SocketEventType.Receive, message, this.url)
|
||||
},
|
||||
})
|
||||
|
||||
this.on(SocketEventType.Status, (status: SocketStatus) => {
|
||||
this.status = status
|
||||
})
|
||||
}
|
||||
|
||||
open = () => {
|
||||
|
||||
Reference in New Issue
Block a user