diff --git a/packages/net2/src/message.ts b/packages/net2/src/message.ts index 1bdeb6a..c75667f 100644 --- a/packages/net2/src/message.ts +++ b/packages/net2/src/message.ts @@ -4,6 +4,7 @@ import type {SignedEvent, Filter} from "@welshman/util" export enum RelayMessageType { Auth = "AUTH", + Closed = "CLOSED", Eose = "EOSE", Event = "EVENT", NegErr = "NEG-ERR", @@ -15,6 +16,8 @@ export type RelayMessage = any[] export type RelayAuthPayload = [string] +export type RelayClosedPayload = [string, string] + export type RelayEosePayload = [string, SignedEvent] export type RelayEventPayload = [string, SignedEvent] @@ -27,6 +30,8 @@ export type RelayOkPayload = [string, boolean, string] export type RelayAuth = [RelayMessageType.Auth, ...RelayAuthPayload] +export type RelayClosed = [RelayMessageType.Closed, ...RelayClosedPayload] + export type RelayEose = [RelayMessageType.Eose, ...RelayEosePayload] export type RelayEvent = [RelayMessageType.Event, ...RelayEventPayload] @@ -39,6 +44,8 @@ export type RelayOk = [RelayMessageType.Ok, ...RelayOkPayload] export const isRelayAuth = (m: RelayMessage): m is RelayAuth => m[0] === RelayMessageType.Auth +export const isRelayClosed = (m: RelayMessage): m is RelayClosed => m[0] === RelayMessageType.Closed + export const isRelayEose = (m: RelayMessage): m is RelayEose => m[0] === RelayMessageType.Eose export const isRelayEvent = (m: RelayMessage): m is RelayEvent => m[0] === RelayMessageType.Event diff --git a/packages/net2/src/policy.ts b/packages/net2/src/policy.ts index f643c76..65ba2a3 100644 --- a/packages/net2/src/policy.ts +++ b/packages/net2/src/policy.ts @@ -1,11 +1,15 @@ -import {on, spec, ago, now} from "@welshman/lib" +import {on, sleep, spec, ago, now} from "@welshman/lib" import {AUTH_JOIN} from "@welshman/util" import { ClientMessage, isClientAuth, isClientClose, isClientEvent, + isClientReq, ClientMessageType, + RelayMessage, + isRelayOk, + isRelayClosed, } from "./message.js" import {Socket, SocketStatus, SocketEventType} from "./socket.js" import {AuthState, AuthStatus, AuthStateEventType} from "./auth.js" @@ -72,6 +76,62 @@ export const socketPolicyDeferOnAuth = (socket: Socket) => { } } +export const socketPolicyRetryAuthRequired = (socket: Socket) => { + const retried = new Set() + const pending = new Map() + + // Watch outgoing events and requests and keep a copy + const unsubscribeSend = on(socket, SocketEventType.Send, (message: ClientMessage) => { + if (isClientEvent(message)) { + const [_, event] = message + + if (!retried.has(event.id) && event.kind !== AUTH_JOIN) { + pending.set(event.id, message) + } + } + + if (isClientReq(message)) { + const [_, id] = message + + if (!retried.has(id)) { + pending.set(id, message) + } + } + }) + + // If a message is rejected with auth-required, re-enqueue it one time + const unsubscribeReceive = on(socket, SocketEventType.Receive, (message: RelayMessage) => { + if (isRelayOk(message)) { + const [_, id, ok, detail] = message + const pendingMessage = pending.get(id) + + if (pendingMessage && !ok && detail?.startsWith("auth-required:")) { + socket.send(pendingMessage) + retried.add(id) + } + + pending.delete(id) + } + + if (isRelayClosed(message)) { + const [_, id, detail] = message + const pendingMessage = pending.get(id) + + if (pendingMessage && detail?.startsWith("auth-required:")) { + socket.send(pendingMessage) + retried.add(id) + } + + pending.delete(id) + } + }) + + return () => { + unsubscribeSend() + unsubscribeReceive() + } +} + export const socketPolicyConnectOnSend = (socket: Socket) => { let lastError = 0 let currentStatus = SocketStatus.Closed @@ -106,7 +166,7 @@ export const socketPolicyCloseOnTimeout = (socket: Socket) => { lastActivity = now() }) - const unsubscribeReceive = on(socket, SocketEventType.Receive, (message: ClientMessage) => { + const unsubscribeReceive = on(socket, SocketEventType.Receive, (message: RelayMessage) => { lastActivity = now() }) @@ -123,9 +183,59 @@ export const socketPolicyCloseOnTimeout = (socket: Socket) => { } } +export const socketPolicyReopenActive = (socket: Socket) => { + const pending = new Map() + + let lastOpen = 0 + + const unsubscribeStatus = on(socket, SocketEventType.Status, (newStatus: SocketStatus) => { + // Keep track of the most recent error + if (newStatus === SocketStatus.Open) { + lastOpen = Date.now() + } + + // If the socket closed and we have no error, reopen it but don't flap + if (newStatus === SocketStatus.Closed && pending.size) { + sleep(Math.max(0, 30_000 - (Date.now() - lastOpen))).then(() => { + for (const message of pending.values()) { + socket.send(message) + } + }) + } + }) + + const unsubscribeSend = on(socket, SocketEventType.Send, (message: ClientMessage) => { + if (isClientEvent(message)) { + pending.set(message[1].id, message) + } + + if (isClientReq(message)) { + pending.set(message[1], message) + } + + if (isClientClose(message)) { + pending.delete(message[1]) + } + }) + + const unsubscribeReceive = on(socket, SocketEventType.Receive, (message: RelayMessage) => { + if (isRelayClosed(message) || isRelayOk(message)) { + pending.delete(message[1]) + } + }) + + return () => { + unsubscribeStatus() + unsubscribeSend() + unsubscribeReceive() + } +} + export const defaultSocketPolicies = [ socketPolicySendWhenOpen, socketPolicyDeferOnAuth, + socketPolicyRetryAuthRequired, socketPolicyConnectOnSend, socketPolicyCloseOnTimeout, + socketPolicyReopenActive, ]