Add more policy
This commit is contained in:
@@ -4,6 +4,7 @@ import type {SignedEvent, Filter} from "@welshman/util"
|
||||
|
||||
export enum RelayMessageType {
|
||||
Auth = "AUTH",
|
||||
Closed = "CLOSED",
|
||||
Eose = "EOSE",
|
||||
Event = "EVENT",
|
||||
NegErr = "NEG-ERR",
|
||||
@@ -15,6 +16,8 @@ export type RelayMessage = any[]
|
||||
|
||||
export type RelayAuthPayload = [string]
|
||||
|
||||
export type RelayClosedPayload = [string, string]
|
||||
|
||||
export type RelayEosePayload = [string, SignedEvent]
|
||||
|
||||
export type RelayEventPayload = [string, SignedEvent]
|
||||
@@ -27,6 +30,8 @@ export type RelayOkPayload = [string, boolean, string]
|
||||
|
||||
export type RelayAuth = [RelayMessageType.Auth, ...RelayAuthPayload]
|
||||
|
||||
export type RelayClosed = [RelayMessageType.Closed, ...RelayClosedPayload]
|
||||
|
||||
export type RelayEose = [RelayMessageType.Eose, ...RelayEosePayload]
|
||||
|
||||
export type RelayEvent = [RelayMessageType.Event, ...RelayEventPayload]
|
||||
@@ -39,6 +44,8 @@ export type RelayOk = [RelayMessageType.Ok, ...RelayOkPayload]
|
||||
|
||||
export const isRelayAuth = (m: RelayMessage): m is RelayAuth => m[0] === RelayMessageType.Auth
|
||||
|
||||
export const isRelayClosed = (m: RelayMessage): m is RelayClosed => m[0] === RelayMessageType.Closed
|
||||
|
||||
export const isRelayEose = (m: RelayMessage): m is RelayEose => m[0] === RelayMessageType.Eose
|
||||
|
||||
export const isRelayEvent = (m: RelayMessage): m is RelayEvent => m[0] === RelayMessageType.Event
|
||||
|
||||
+112
-2
@@ -1,11 +1,15 @@
|
||||
import {on, spec, ago, now} from "@welshman/lib"
|
||||
import {on, sleep, spec, ago, now} from "@welshman/lib"
|
||||
import {AUTH_JOIN} from "@welshman/util"
|
||||
import {
|
||||
ClientMessage,
|
||||
isClientAuth,
|
||||
isClientClose,
|
||||
isClientEvent,
|
||||
isClientReq,
|
||||
ClientMessageType,
|
||||
RelayMessage,
|
||||
isRelayOk,
|
||||
isRelayClosed,
|
||||
} from "./message.js"
|
||||
import {Socket, SocketStatus, SocketEventType} from "./socket.js"
|
||||
import {AuthState, AuthStatus, AuthStateEventType} from "./auth.js"
|
||||
@@ -72,6 +76,62 @@ export const socketPolicyDeferOnAuth = (socket: Socket) => {
|
||||
}
|
||||
}
|
||||
|
||||
export const socketPolicyRetryAuthRequired = (socket: Socket) => {
|
||||
const retried = new Set<string>()
|
||||
const pending = new Map<string, ClientMessage>()
|
||||
|
||||
// Watch outgoing events and requests and keep a copy
|
||||
const unsubscribeSend = on(socket, SocketEventType.Send, (message: ClientMessage) => {
|
||||
if (isClientEvent(message)) {
|
||||
const [_, event] = message
|
||||
|
||||
if (!retried.has(event.id) && event.kind !== AUTH_JOIN) {
|
||||
pending.set(event.id, message)
|
||||
}
|
||||
}
|
||||
|
||||
if (isClientReq(message)) {
|
||||
const [_, id] = message
|
||||
|
||||
if (!retried.has(id)) {
|
||||
pending.set(id, message)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// If a message is rejected with auth-required, re-enqueue it one time
|
||||
const unsubscribeReceive = on(socket, SocketEventType.Receive, (message: RelayMessage) => {
|
||||
if (isRelayOk(message)) {
|
||||
const [_, id, ok, detail] = message
|
||||
const pendingMessage = pending.get(id)
|
||||
|
||||
if (pendingMessage && !ok && detail?.startsWith("auth-required:")) {
|
||||
socket.send(pendingMessage)
|
||||
retried.add(id)
|
||||
}
|
||||
|
||||
pending.delete(id)
|
||||
}
|
||||
|
||||
if (isRelayClosed(message)) {
|
||||
const [_, id, detail] = message
|
||||
const pendingMessage = pending.get(id)
|
||||
|
||||
if (pendingMessage && detail?.startsWith("auth-required:")) {
|
||||
socket.send(pendingMessage)
|
||||
retried.add(id)
|
||||
}
|
||||
|
||||
pending.delete(id)
|
||||
}
|
||||
})
|
||||
|
||||
return () => {
|
||||
unsubscribeSend()
|
||||
unsubscribeReceive()
|
||||
}
|
||||
}
|
||||
|
||||
export const socketPolicyConnectOnSend = (socket: Socket) => {
|
||||
let lastError = 0
|
||||
let currentStatus = SocketStatus.Closed
|
||||
@@ -106,7 +166,7 @@ export const socketPolicyCloseOnTimeout = (socket: Socket) => {
|
||||
lastActivity = now()
|
||||
})
|
||||
|
||||
const unsubscribeReceive = on(socket, SocketEventType.Receive, (message: ClientMessage) => {
|
||||
const unsubscribeReceive = on(socket, SocketEventType.Receive, (message: RelayMessage) => {
|
||||
lastActivity = now()
|
||||
})
|
||||
|
||||
@@ -123,9 +183,59 @@ export const socketPolicyCloseOnTimeout = (socket: Socket) => {
|
||||
}
|
||||
}
|
||||
|
||||
export const socketPolicyReopenActive = (socket: Socket) => {
|
||||
const pending = new Map<string, ClientMessage>()
|
||||
|
||||
let lastOpen = 0
|
||||
|
||||
const unsubscribeStatus = on(socket, SocketEventType.Status, (newStatus: SocketStatus) => {
|
||||
// Keep track of the most recent error
|
||||
if (newStatus === SocketStatus.Open) {
|
||||
lastOpen = Date.now()
|
||||
}
|
||||
|
||||
// If the socket closed and we have no error, reopen it but don't flap
|
||||
if (newStatus === SocketStatus.Closed && pending.size) {
|
||||
sleep(Math.max(0, 30_000 - (Date.now() - lastOpen))).then(() => {
|
||||
for (const message of pending.values()) {
|
||||
socket.send(message)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
const unsubscribeSend = on(socket, SocketEventType.Send, (message: ClientMessage) => {
|
||||
if (isClientEvent(message)) {
|
||||
pending.set(message[1].id, message)
|
||||
}
|
||||
|
||||
if (isClientReq(message)) {
|
||||
pending.set(message[1], message)
|
||||
}
|
||||
|
||||
if (isClientClose(message)) {
|
||||
pending.delete(message[1])
|
||||
}
|
||||
})
|
||||
|
||||
const unsubscribeReceive = on(socket, SocketEventType.Receive, (message: RelayMessage) => {
|
||||
if (isRelayClosed(message) || isRelayOk(message)) {
|
||||
pending.delete(message[1])
|
||||
}
|
||||
})
|
||||
|
||||
return () => {
|
||||
unsubscribeStatus()
|
||||
unsubscribeSend()
|
||||
unsubscribeReceive()
|
||||
}
|
||||
}
|
||||
|
||||
export const defaultSocketPolicies = [
|
||||
socketPolicySendWhenOpen,
|
||||
socketPolicyDeferOnAuth,
|
||||
socketPolicyRetryAuthRequired,
|
||||
socketPolicyConnectOnSend,
|
||||
socketPolicyCloseOnTimeout,
|
||||
socketPolicyReopenActive,
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user