Combine auto open and auto close policies
This commit is contained in:
+2
-6
@@ -21,13 +21,9 @@ Buffers messages during authentication flow and replays them after successful au
|
|||||||
|
|
||||||
Auto-connects closed sockets when messages are sent (with error cooldown).
|
Auto-connects closed sockets when messages are sent (with error cooldown).
|
||||||
|
|
||||||
### `socketPolicyCloseOnTimeout`
|
### `socketPolicyCloseInactive`
|
||||||
|
|
||||||
Closes sockets after 30 seconds of inactivity.
|
Closes sockets after 30 seconds of inactivity and reopens sockets that have pending requests or publishes.
|
||||||
|
|
||||||
### `socketPolicyReopenActive`
|
|
||||||
|
|
||||||
Reopens sockets that have pending requests or publishes.
|
|
||||||
|
|
||||||
## Custom Auth Policy
|
## Custom Auth Policy
|
||||||
|
|
||||||
|
|||||||
@@ -5,8 +5,7 @@ import {AuthStatus, AuthStateEvent} from "../src/auth"
|
|||||||
import {
|
import {
|
||||||
socketPolicyAuthBuffer,
|
socketPolicyAuthBuffer,
|
||||||
socketPolicyConnectOnSend,
|
socketPolicyConnectOnSend,
|
||||||
socketPolicyCloseOnTimeout,
|
socketPolicyCloseInactive,
|
||||||
socketPolicyReopenActive,
|
|
||||||
} from "../src/policy"
|
} from "../src/policy"
|
||||||
import {ClientMessage, RelayMessage} from "../src/message"
|
import {ClientMessage, RelayMessage} from "../src/message"
|
||||||
|
|
||||||
@@ -267,9 +266,9 @@ describe("policy", () => {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
describe("socketPolicyCloseOnTimeout", () => {
|
describe("socketPolicyCloseInactive", () => {
|
||||||
it("should close socket after 30 seconds of inactivity", async () => {
|
it("should close socket after 30 seconds of inactivity", async () => {
|
||||||
const cleanup = socketPolicyCloseOnTimeout(socket)
|
const cleanup = socketPolicyCloseInactive(socket)
|
||||||
const closeSpy = vi.spyOn(socket, "close")
|
const closeSpy = vi.spyOn(socket, "close")
|
||||||
|
|
||||||
// Set socket as open
|
// Set socket as open
|
||||||
@@ -285,7 +284,7 @@ describe("policy", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it("should reset timer on send activity", () => {
|
it("should reset timer on send activity", () => {
|
||||||
const cleanup = socketPolicyCloseOnTimeout(socket)
|
const cleanup = socketPolicyCloseInactive(socket)
|
||||||
const closeSpy = vi.spyOn(socket, "close")
|
const closeSpy = vi.spyOn(socket, "close")
|
||||||
|
|
||||||
// Set socket as open
|
// Set socket as open
|
||||||
@@ -296,6 +295,7 @@ describe("policy", () => {
|
|||||||
|
|
||||||
// Send a message
|
// Send a message
|
||||||
socket.emit(SocketEvent.Send, ["EVENT", {id: "123"}])
|
socket.emit(SocketEvent.Send, ["EVENT", {id: "123"}])
|
||||||
|
socket.emit(SocketEvent.Receive, ["OK", "123", true, ""])
|
||||||
|
|
||||||
// Advance time partially again
|
// Advance time partially again
|
||||||
vi.advanceTimersByTime(20000)
|
vi.advanceTimersByTime(20000)
|
||||||
@@ -313,7 +313,7 @@ describe("policy", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it("should reset timer on receive activity", () => {
|
it("should reset timer on receive activity", () => {
|
||||||
const cleanup = socketPolicyCloseOnTimeout(socket)
|
const cleanup = socketPolicyCloseInactive(socket)
|
||||||
const closeSpy = vi.spyOn(socket, "close")
|
const closeSpy = vi.spyOn(socket, "close")
|
||||||
|
|
||||||
// Set socket as open
|
// Set socket as open
|
||||||
@@ -341,7 +341,7 @@ describe("policy", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it("should not close socket if not open", () => {
|
it("should not close socket if not open", () => {
|
||||||
const cleanup = socketPolicyCloseOnTimeout(socket)
|
const cleanup = socketPolicyCloseInactive(socket)
|
||||||
const closeSpy = vi.spyOn(socket, "close")
|
const closeSpy = vi.spyOn(socket, "close")
|
||||||
|
|
||||||
// Set socket as closed
|
// Set socket as closed
|
||||||
@@ -355,11 +355,9 @@ describe("policy", () => {
|
|||||||
|
|
||||||
cleanup()
|
cleanup()
|
||||||
})
|
})
|
||||||
})
|
|
||||||
|
|
||||||
describe("socketPolicyReopenActive", () => {
|
|
||||||
it("should reopen socket when closed with pending messages", async () => {
|
it("should reopen socket when closed with pending messages", async () => {
|
||||||
const cleanup = socketPolicyReopenActive(socket)
|
const cleanup = socketPolicyCloseInactive(socket)
|
||||||
const sendSpy = vi.spyOn(socket, "send")
|
const sendSpy = vi.spyOn(socket, "send")
|
||||||
|
|
||||||
// Send an event that will be pending
|
// Send an event that will be pending
|
||||||
@@ -379,7 +377,7 @@ describe("policy", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it("should reopen socket when closed with pending requests", async () => {
|
it("should reopen socket when closed with pending requests", async () => {
|
||||||
const cleanup = socketPolicyReopenActive(socket)
|
const cleanup = socketPolicyCloseInactive(socket)
|
||||||
const sendSpy = vi.spyOn(socket, "send")
|
const sendSpy = vi.spyOn(socket, "send")
|
||||||
|
|
||||||
// Send a request that will be pending
|
// Send a request that will be pending
|
||||||
@@ -399,7 +397,7 @@ describe("policy", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it("should not reopen socket immediately after previous open", async () => {
|
it("should not reopen socket immediately after previous open", async () => {
|
||||||
const cleanup = socketPolicyReopenActive(socket)
|
const cleanup = socketPolicyCloseInactive(socket)
|
||||||
const sendSpy = vi.spyOn(socket, "send")
|
const sendSpy = vi.spyOn(socket, "send")
|
||||||
|
|
||||||
// Send an event that will be pending
|
// Send an event that will be pending
|
||||||
@@ -426,7 +424,7 @@ describe("policy", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it("should remove pending messages when they complete", () => {
|
it("should remove pending messages when they complete", () => {
|
||||||
const cleanup = socketPolicyReopenActive(socket)
|
const cleanup = socketPolicyCloseInactive(socket)
|
||||||
const sendSpy = vi.spyOn(socket, "send")
|
const sendSpy = vi.spyOn(socket, "send")
|
||||||
|
|
||||||
// Send an event that will be pending
|
// Send an event that will be pending
|
||||||
@@ -449,7 +447,7 @@ describe("policy", () => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
it("should remove pending messages when closed", () => {
|
it("should remove pending messages when closed", () => {
|
||||||
const cleanup = socketPolicyReopenActive(socket)
|
const cleanup = socketPolicyCloseInactive(socket)
|
||||||
const sendSpy = vi.spyOn(socket, "send")
|
const sendSpy = vi.spyOn(socket, "send")
|
||||||
|
|
||||||
// Send a request that will be pending
|
// Send a request that will be pending
|
||||||
|
|||||||
+23
-39
@@ -1,4 +1,4 @@
|
|||||||
import {on, nthNe, always, call, sleep, ago, now} from "@welshman/lib"
|
import {on, ms, nthNe, always, call, sleep, ago, now} from "@welshman/lib"
|
||||||
import {AUTH_JOIN, StampedEvent, SignedEvent} from "@welshman/util"
|
import {AUTH_JOIN, StampedEvent, SignedEvent} from "@welshman/util"
|
||||||
import {
|
import {
|
||||||
ClientMessage,
|
ClientMessage,
|
||||||
@@ -116,56 +116,28 @@ export const socketPolicyConnectOnSend = (socket: Socket) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Auto-closes a socket after 30 seconds of inactivity
|
* Auto-closes inactive sockets, and re-opens sockets with pending messages
|
||||||
* @param socket - a Socket object
|
* @param socket - a Socket object
|
||||||
* @return a cleanup function
|
* @return a cleanup function
|
||||||
*/
|
*/
|
||||||
export const socketPolicyCloseOnTimeout = (socket: Socket) => {
|
export const socketPolicyCloseInactive = (socket: Socket) => {
|
||||||
let lastActivity = now()
|
|
||||||
|
|
||||||
const unsubscribers = [
|
|
||||||
on(socket, SocketEvent.Send, (message: ClientMessage) => {
|
|
||||||
lastActivity = now()
|
|
||||||
}),
|
|
||||||
on(socket, SocketEvent.Receive, (message: RelayMessage) => {
|
|
||||||
lastActivity = now()
|
|
||||||
}),
|
|
||||||
]
|
|
||||||
|
|
||||||
const interval = setInterval(() => {
|
|
||||||
if (socket.status === SocketStatus.Open && lastActivity < ago(30)) {
|
|
||||||
socket.close()
|
|
||||||
}
|
|
||||||
}, 3000)
|
|
||||||
|
|
||||||
return () => {
|
|
||||||
unsubscribers.forEach(call)
|
|
||||||
clearInterval(interval)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Automatically re-opens a socket if there are active requests or publishes
|
|
||||||
* @param socket - a Socket object
|
|
||||||
* @return a cleanup function
|
|
||||||
*/
|
|
||||||
export const socketPolicyReopenActive = (socket: Socket) => {
|
|
||||||
const pending = new Map<string, ClientMessage>()
|
const pending = new Map<string, ClientMessage>()
|
||||||
|
|
||||||
let lastOpen = Date.now()
|
let lastOpen = now()
|
||||||
|
let lastActivity = now()
|
||||||
|
|
||||||
const unsubscribers = [
|
const unsubscribers = [
|
||||||
on(socket, SocketEvent.Status, (newStatus: SocketStatus) => {
|
on(socket, SocketEvent.Status, (newStatus: SocketStatus) => {
|
||||||
const isClosed = [SocketStatus.Closed, SocketStatus.Error].includes(socket.status)
|
const isClosed = [SocketStatus.Closed, SocketStatus.Error].includes(socket.status)
|
||||||
|
|
||||||
// Keep track of the most recent error
|
// Keep track of the most recent open
|
||||||
if (newStatus === SocketStatus.Open) {
|
if (newStatus === SocketStatus.Open) {
|
||||||
lastOpen = Date.now()
|
lastOpen = now()
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the socket closed and we have no error, reopen it but don't flap
|
// If the socket closed and we have no error, reopen it but don't flap
|
||||||
if (isClosed && pending.size) {
|
if (isClosed && pending.size) {
|
||||||
sleep(Math.max(0, 5000 - (Date.now() - lastOpen))).then(() => {
|
sleep(Math.max(0, ms(5 - (now() - lastOpen)))).then(() => {
|
||||||
for (const message of pending.values()) {
|
for (const message of pending.values()) {
|
||||||
socket.send(message)
|
socket.send(message)
|
||||||
}
|
}
|
||||||
@@ -173,6 +145,8 @@ export const socketPolicyReopenActive = (socket: Socket) => {
|
|||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
on(socket, SocketEvent.Send, (message: ClientMessage) => {
|
on(socket, SocketEvent.Send, (message: ClientMessage) => {
|
||||||
|
lastActivity = now()
|
||||||
|
|
||||||
if (isClientEvent(message)) {
|
if (isClientEvent(message)) {
|
||||||
pending.set(message[1].id, message)
|
pending.set(message[1].id, message)
|
||||||
}
|
}
|
||||||
@@ -186,13 +160,24 @@ export const socketPolicyReopenActive = (socket: Socket) => {
|
|||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
on(socket, SocketEvent.Receive, (message: RelayMessage) => {
|
on(socket, SocketEvent.Receive, (message: RelayMessage) => {
|
||||||
|
lastActivity = now()
|
||||||
|
|
||||||
if (isRelayClosed(message) || isRelayOk(message)) {
|
if (isRelayClosed(message) || isRelayOk(message)) {
|
||||||
pending.delete(message[1])
|
pending.delete(message[1])
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
]
|
]
|
||||||
|
|
||||||
return () => unsubscribers.forEach(call)
|
const interval = setInterval(() => {
|
||||||
|
if (socket.status === SocketStatus.Open && lastActivity < ago(30) && pending.size === 0) {
|
||||||
|
socket.close()
|
||||||
|
}
|
||||||
|
}, 3000)
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
unsubscribers.forEach(call)
|
||||||
|
clearInterval(interval)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export type SocketPolicyAuthOptions = {
|
export type SocketPolicyAuthOptions = {
|
||||||
@@ -224,6 +209,5 @@ export const makeSocketPolicyAuth = (options: SocketPolicyAuthOptions) => (socke
|
|||||||
export const defaultSocketPolicies = [
|
export const defaultSocketPolicies = [
|
||||||
socketPolicyAuthBuffer,
|
socketPolicyAuthBuffer,
|
||||||
socketPolicyConnectOnSend,
|
socketPolicyConnectOnSend,
|
||||||
socketPolicyCloseOnTimeout,
|
socketPolicyCloseInactive,
|
||||||
socketPolicyReopenActive,
|
|
||||||
]
|
]
|
||||||
|
|||||||
Reference in New Issue
Block a user