Add pool, flesh out auth

This commit is contained in:
Jon Staab
2025-03-20 16:50:39 -07:00
parent 7d0d303dae
commit f00ffc5c9c
5 changed files with 351 additions and 40 deletions
+19 -8
View File
@@ -7,19 +7,21 @@ export type TaskQueueOptions<Item> = {
export class TaskQueue<Item> { export class TaskQueue<Item> {
items: Item[] = [] items: Item[] = []
isPaused = false
isProcessing = false isProcessing = false
constructor(readonly options: TaskQueueOptions<Item>) {} constructor(readonly options: TaskQueueOptions<Item>) {}
push(item: Item) { push(item: Item) {
this.items.push(item) this.items.push(item)
this.process()
if (!this.isProcessing) {
this.processBatch()
}
} }
async processBatch() { async process() {
if (this.isProcessing || this.isPaused) {
return
}
this.isProcessing = true this.isProcessing = true
for (const item of this.items.splice(0, this.options.batchSize)) { for (const item of this.items.splice(0, this.options.batchSize)) {
@@ -30,15 +32,24 @@ export class TaskQueue<Item> {
} }
} }
this.isProcessing = false
if (this.items.length > 0) { if (this.items.length > 0) {
await yieldThread() await yieldThread()
this.processBatch() this.process()
} else {
this.isProcessing = false
} }
} }
stop() {
this.isPaused = true
}
start() {
this.isPaused = false
this.process()
}
clear() { clear() {
this.items = [] this.items = []
} }
+133 -9
View File
@@ -1,6 +1,7 @@
import type {SignedEvent} from "@welshman/util" import {sleep} from "@welshman/lib"
import type {SignedEvent, StampedEvent} from "@welshman/util"
import {makeEvent, CLIENT_AUTH} from "@welshman/util" import {makeEvent, CLIENT_AUTH} from "@welshman/util"
import type {ISocket} from "./socket.js" import {Socket, SocketStatus, SocketUnsubscriber} from "./socket.js"
export const makeAuthEvent = (url: string, challenge: string) => export const makeAuthEvent = (url: string, challenge: string) =>
makeEvent(CLIENT_AUTH, { makeEvent(CLIENT_AUTH, {
@@ -10,16 +11,139 @@ export const makeAuthEvent = (url: string, challenge: string) =>
], ],
}) })
export enum AuthStatus {
None = "auth:status:none",
Requested = "auth:status:requested",
PendingSignature = "auth:status:pending_signature",
DeniedSignature = "auth:status:denied_signature",
PendingResponse = "auth:status:pending_response",
Forbidden = "auth:status:forbidden",
Ok = "auth:status:ok",
}
export type AuthResult = { export type AuthResult = {
ok: boolean ok: boolean
reason?: string reason?: string
} }
export const authenticate = (socket: ISocket, event: SignedEvent) => export type AuthManagerOptions = {
new Promise(resolve => { sign: (event: StampedEvent) => Promise<SignedEvent>
socket.send(["AUTH", event]) eager?: boolean
}
socket.onOk(([id, ok = false, reason = ""]) => { export class AuthManager {
if (id === event.id) resolve({ok, reason}) challenge: string | undefined
}) request: string | undefined
}) message: string | undefined
status = AuthStatus.None
_unsubscribers: SocketUnsubscriber[] = []
constructor(
readonly socket: Socket,
readonly options: AuthManagerOptions,
) {
this._unsubscribers.push(
socket.onOk(([id, ok, message]) => {
if (id === this.request) {
this.message = message
if (ok) {
this.status = AuthStatus.Ok
} else {
this.status = AuthStatus.Forbidden
}
}
}),
)
this._unsubscribers.push(
socket.onAuth(([challenge]) => {
this.challenge = challenge
this.request = undefined
this.message = undefined
this.status = AuthStatus.Requested
if (this.options.eager) {
this.respond()
}
}),
)
this._unsubscribers.push(
socket.onStatus(status => {
if (status === SocketStatus.Closed) {
this.challenge = undefined
this.request = undefined
this.message = undefined
this.status = AuthStatus.None
}
}),
)
}
async waitFor(condition: () => boolean, timeout = 300) {
const start = Date.now()
while (Date.now() - timeout <= start) {
if (condition()) {
break
}
await sleep(Math.min(100, Math.ceil(timeout / 3)))
}
}
async waitForChallenge(timeout = 300) {
await this.waitFor(() => Boolean(this.challenge), timeout)
}
async waitForResolution(timeout = 300) {
await this.waitFor(
() =>
[AuthStatus.None, AuthStatus.DeniedSignature, AuthStatus.Forbidden, AuthStatus.Ok].includes(
this.status,
),
timeout,
)
}
async attempt(timeout = 300) {
await this.socket.attemptToOpen()
await this.waitForChallenge(Math.ceil(timeout / 2))
if (this.status === AuthStatus.Requested) {
await this.respond()
}
await this.waitForResolution(Math.ceil(timeout / 2))
}
async respond() {
if (!this.challenge) {
throw new Error("Attempted to authenticate with no challenge")
}
if (this.status !== AuthStatus.Requested) {
throw new Error(`Attempted to authenticate when auth is already ${this.status}`)
}
this.status = AuthStatus.PendingSignature
const template = makeAuthEvent(this.socket.url, this.challenge)
const event = await this.options.sign(template)
if (event) {
this.request = event.id
this.socket.send(["AUTH", event])
this.status = AuthStatus.PendingResponse
} else {
this.status = AuthStatus.DeniedSignature
}
}
cleanup() {
for (const cb of this._unsubscribers) {
cb()
}
}
}
+6
View File
@@ -1,5 +1,7 @@
import type {SignedEvent} from "@welshman/util" import type {SignedEvent} from "@welshman/util"
// relay -> client
export enum RelayMessageType { export enum RelayMessageType {
Auth = "AUTH", Auth = "AUTH",
Event = "EVENT", Event = "EVENT",
@@ -36,3 +38,7 @@ export const isRelayEoseMessage = (m: RelayMessage): m is RelayEoseMessage =>
export const isRelayOkMessage = (m: RelayMessage): m is RelayOkMessage => export const isRelayOkMessage = (m: RelayMessage): m is RelayOkMessage =>
m[0] === RelayMessageType.Ok m[0] === RelayMessageType.Ok
// client -> relay
export type ClientMessage = any[]
+71
View File
@@ -0,0 +1,71 @@
import {remove} from "@welshman/lib"
import {normalizeRelayUrl} from "@welshman/util"
import {ISocket, makeSocket} from "./socket.js"
export type PoolSubscription = (socket: ISocket) => void
export type PoolOptions = {
makeSocket?: (url: string) => ISocket
}
export class Pool {
_data = new Map<string, ISocket>()
_subs: PoolSubscription[] = []
constructor(readonly options: PoolOptions) {}
has(url: string) {
return this._data.has(url)
}
makeSocket(url: string) {
if (this.options.makeSocket) {
return this.options.makeSocket(url)
}
return makeSocket(url)
}
get(_url: string): ISocket {
const url = normalizeRelayUrl(_url)
const oldSocket = this._data.get(url)
if (oldSocket) {
return oldSocket
}
const socket = this.makeSocket(url)
this._data.set(url, socket)
for (const cb of this._subs) {
cb(socket)
}
return socket
}
subscribe(cb: PoolSubscription) {
this._subs.push(cb)
return () => {
this._subs = remove(cb, this._subs)
}
}
remove(url: string) {
const socket = this._data.get(url)
if (socket) {
socket.cleanup()
this._data.delete(url)
}
}
clear() {
for (const url of this._data.keys()) {
this.remove(url)
}
}
}
+122 -23
View File
@@ -1,11 +1,12 @@
import WebSocket from "isomorphic-ws" import WebSocket from "isomorphic-ws"
import {remove, TaskQueue} from "@welshman/lib" import {remove, now, ago, TaskQueue} from "@welshman/lib"
import type { import type {
RelayMessage, RelayMessage,
RelayAuthPayload, RelayAuthPayload,
RelayEosePayload, RelayEosePayload,
RelayEventPayload, RelayEventPayload,
RelayOkPayload, RelayOkPayload,
ClientMessage,
} from "./message.js" } from "./message.js"
import { import {
isRelayAuthMessage, isRelayAuthMessage,
@@ -70,7 +71,9 @@ export const isSocketStatusEvent = (event: SocketEvent): event is SocketStatusEv
export const isSocketMessageEvent = (event: SocketEvent): event is SocketMessageEvent => export const isSocketMessageEvent = (event: SocketEvent): event is SocketMessageEvent =>
event.type === SocketEventType.Message event.type === SocketEventType.Message
export type SocketSubscriber = (event: SocketEvent) => void export type SocketSendSubscriber = (message: ClientMessage) => void
export type SocketRecvSubscriber = (event: SocketEvent) => void
export type SocketUnsubscriber = () => void export type SocketUnsubscriber = () => void
@@ -78,8 +81,9 @@ export interface ISocket {
open(): void open(): void
close(): void close(): void
cleanup(): void cleanup(): void
send(...message: any[]): void send(message: ClientMessage): void
subscribe(cb: SocketSubscriber): SocketUnsubscriber onSend(cb: SocketSendSubscriber): SocketUnsubscriber
subscribe(cb: SocketRecvSubscriber): SocketUnsubscriber
onError(cb: (error: string) => void): SocketUnsubscriber onError(cb: (error: string) => void): SocketUnsubscriber
onStatus(cb: (status: SocketStatus) => void): SocketUnsubscriber onStatus(cb: (status: SocketStatus) => void): SocketUnsubscriber
onMessage(cb: (message: RelayMessage) => void): SocketUnsubscriber onMessage(cb: (message: RelayMessage) => void): SocketUnsubscriber
@@ -92,14 +96,27 @@ export interface ISocket {
export class Socket implements ISocket { export class Socket implements ISocket {
_ws?: WebSocket _ws?: WebSocket
_subs: SocketSubscriber[] = [] _sendSubs: SocketSendSubscriber[] = []
_queue: TaskQueue<SocketEvent> _recvSubs: SocketRecvSubscriber[] = []
_sendQueue: TaskQueue<ClientMessage>
_recvQueue: TaskQueue<SocketEvent>
constructor(readonly url: string) { constructor(readonly url: string) {
this._queue = new TaskQueue<SocketEvent>({ this._sendQueue = new TaskQueue<ClientMessage>({
batchSize: 50,
processItem: (message: ClientMessage) => {
this._ws?.send(JSON.stringify(message))
for (const cb of this._sendSubs) {
cb(message)
}
},
})
this._recvQueue = new TaskQueue<SocketEvent>({
batchSize: 50, batchSize: 50,
processItem: (event: SocketEvent) => { processItem: (event: SocketEvent) => {
for (const cb of this._subs) { for (const cb of this._recvSubs) {
cb(event) cb(event)
} }
}, },
@@ -107,12 +124,26 @@ export class Socket implements ISocket {
} }
open = () => { open = () => {
if (this._ws) {
throw new Error("Attempted to open a websocket that has not been closed")
}
try { try {
this._ws = new WebSocket(this.url) this._ws = new WebSocket(this.url)
this._queue.push(makeSocketStatusEvent(SocketStatus.Opening)) this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Opening))
this._ws.onopen = () => this._queue.push(makeSocketStatusEvent(SocketStatus.Open))
this._ws.onerror = () => this._queue.push(makeSocketStatusEvent(SocketStatus.Error)) this._ws.onopen = () => this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Open))
this._ws.onclose = () => this._queue.push(makeSocketStatusEvent(SocketStatus.Closed))
this._ws.onerror = () => {
this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Error))
this._ws = undefined
}
this._ws.onclose = () => {
this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Closed))
this._ws = undefined
}
this._ws.onmessage = (event: any) => { this._ws.onmessage = (event: any) => {
const data = event.data as string const data = event.data as string
@@ -120,16 +151,22 @@ export class Socket implements ISocket {
const message = JSON.parse(data) const message = JSON.parse(data)
if (Array.isArray(message)) { if (Array.isArray(message)) {
this._queue.push(makeSocketMessageEvent(message as RelayMessage)) this._recvQueue.push(makeSocketMessageEvent(message as RelayMessage))
} else { } else {
this._queue.push(makeSocketErrorEvent("Invalid message received")) this._recvQueue.push(makeSocketErrorEvent("Invalid message received"))
} }
} catch (e) { } catch (e) {
this._queue.push(makeSocketErrorEvent("Invalid message received")) this._recvQueue.push(makeSocketErrorEvent("Invalid message received"))
} }
} }
} catch (e) { } catch (e) {
this._queue.push(makeSocketStatusEvent(SocketStatus.Invalid)) this._recvQueue.push(makeSocketStatusEvent(SocketStatus.Invalid))
}
}
attemptToOpen = () => {
if (!this._ws) {
this.open()
} }
} }
@@ -140,19 +177,29 @@ export class Socket implements ISocket {
cleanup = () => { cleanup = () => {
this.close() this.close()
this._subs = [] this._recvSubs = []
this._queue.clear() this._recvQueue.clear()
this._sendSubs = []
this._sendQueue.clear()
} }
send = (...message: any[]) => { send = (message: ClientMessage) => {
this._ws?.send(JSON.stringify(message)) this._sendQueue.push(message)
} }
subscribe = (cb: SocketSubscriber) => { onSend = (cb: SocketSendSubscriber) => {
this._subs.push(cb) this._sendSubs.push(cb)
return () => { return () => {
this._subs = remove(cb, this._subs) this._sendSubs = remove(cb, this._sendSubs)
}
}
subscribe = (cb: SocketRecvSubscriber) => {
this._recvSubs.push(cb)
return () => {
this._recvSubs = remove(cb, this._recvSubs)
} }
} }
@@ -224,3 +271,55 @@ export class Socket implements ISocket {
}) })
} }
} }
export const socketPolicySendWhenOpen = (socket: Socket) => {
// Pause sending messages when the socket isn't open
const unsubscribe = socket.onStatus(newStatus => {
if (newStatus === SocketStatus.Open) {
socket._sendQueue.start()
} else {
socket._sendQueue.stop()
}
})
return unsubscribe
}
export const socketPolicyConnectOnSend = (socket: Socket) => {
let lastError = 0
let currentStatus = SocketStatus.Closed
const unsubscribeOnStatus = socket.onStatus(newStatus => {
// Keep track of the most recent error
if (newStatus === SocketStatus.Error) {
lastError = now()
}
// Keep track of the current status
currentStatus = newStatus
})
const unsubscribeOnSend = socket.onSend(message => {
// When a new message is sent, make sure the socket is open (unless there was a recent error)
if (currentStatus === SocketStatus.Closed && now() - lastError < ago(30)) {
socket.open()
}
})
return () => {
unsubscribeOnStatus()
unsubscribeOnSend()
}
}
export const defaultSocketPolicies = [socketPolicySendWhenOpen, socketPolicyConnectOnSend]
export const makeSocket = (url: string, policies = defaultSocketPolicies) => {
const socket = new Socket(url)
for (const applyPolicy of defaultSocketPolicies) {
applyPolicy(socket)
}
return socket
}