Add task queue, work on socket

This commit is contained in:
Jon Staab
2025-03-20 14:22:40 -07:00
parent 7706034b99
commit 7d0d303dae
10 changed files with 345 additions and 35 deletions
+3
View File
@@ -7,6 +7,9 @@
"publishConfig": {
"access": "public"
},
"engines": {
"node": ">=12.0.0"
},
"type": "module",
"files": [
"build"
+45
View File
@@ -0,0 +1,45 @@
import {yieldThread} from "./Tools.js"
export type TaskQueueOptions<Item> = {
batchSize: number
processItem: (item: Item) => unknown
}
export class TaskQueue<Item> {
items: Item[] = []
isProcessing = false
constructor(readonly options: TaskQueueOptions<Item>) {}
push(item: Item) {
this.items.push(item)
if (!this.isProcessing) {
this.processBatch()
}
}
async processBatch() {
this.isProcessing = true
for (const item of this.items.splice(0, this.options.batchSize)) {
try {
await this.options.processItem(item)
} catch (e) {
console.error(e)
}
}
if (this.items.length > 0) {
await yieldThread()
this.processBatch()
} else {
this.isProcessing = false
}
}
clear() {
this.items = []
}
}
+18
View File
@@ -351,6 +351,24 @@ export const displayDomain = (url: string) => displayUrl(first(url.split(/[\/\?]
*/
export const sleep = (t: number) => new Promise(resolve => setTimeout(resolve, t))
/**
* Creates a microtask that yields to other tasks in the event loop
* @returns Promise that resolves after yielding
*/
export const yieldThread = () => {
if (
typeof window !== "undefined" &&
"scheduler" in window &&
"yield" in (window as any).scheduler
) {
return (window as any).scheduler.yield()
}
return new Promise<void>(resolve => {
setTimeout(resolve, 0)
})
}
/**
* Concatenates multiple arrays, filtering out null/undefined
* @param xs - Arrays to concatenate
+1
View File
@@ -4,6 +4,7 @@ export * from "./Emitter.js"
export * from "./LRUCache.js"
export * from "./Tools.js"
export * from "./Worker.js"
export * from "./TaskQueue.js"
export {default as normalizeUrl} from "./normalize-url/index.js"
declare module "@welshman/lib" {
-1
View File
@@ -33,7 +33,6 @@
"@welshman/lib": "~0.0.40",
"@welshman/util": "~0.0.59",
"isomorphic-ws": "^5.0.0",
"rxjs": "^7.8.1",
"ws": "^8.16.0"
}
}
+10 -23
View File
@@ -1,11 +1,9 @@
import type {WebSocketSubject} from "rxjs/websocket"
import {Subject} from "rxjs"
import type {SignedEvent} from "@welshman/util"
import {createEvent, CLIENT_AUTH} from "@welshman/util"
import type {SocketResponse} from "./socket.js"
import {makeEvent, CLIENT_AUTH} from "@welshman/util"
import type {ISocket} from "./socket.js"
export const createAuthEvent = (url: string, challenge: string) =>
createEvent(CLIENT_AUTH, {
export const makeAuthEvent = (url: string, challenge: string) =>
makeEvent(CLIENT_AUTH, {
tags: [
["relay", url],
["challenge", challenge],
@@ -17,22 +15,11 @@ export type AuthResult = {
reason?: string
}
export const authenticate = (socket: WebSocketSubject<SocketResponse>, event: SignedEvent) => {
const subject = new Subject<AuthResult>()
export const authenticate = (socket: ISocket, event: SignedEvent) =>
new Promise(resolve => {
socket.send(["AUTH", event])
socket.next(["AUTH", event])
socket.subscribe(message => {
if (message[0] === "OK") {
const [id, ok = false, reason = ""] = message.slice(1)
if (id === event.id) {
subject.next({ok, reason})
}
}
socket.onOk(([id, ok = false, reason = ""]) => {
if (id === event.id) resolve({ok, reason})
})
})
return subject
}
export const forceAuth = <T>(socket: WebSocketSubject<T>) => {}
+38
View File
@@ -0,0 +1,38 @@
import type {SignedEvent} from "@welshman/util"
export enum RelayMessageType {
Auth = "AUTH",
Event = "EVENT",
Eose = "EOSE",
Ok = "OK",
}
export type RelayAuthPayload = [string]
export type RelayEventPayload = [string, SignedEvent]
export type RelayEosePayload = [string, SignedEvent]
export type RelayOkPayload = [string, boolean, string]
export type RelayAuthMessage = ["AUTH", ...RelayAuthPayload]
export type RelayEventMessage = ["EVENT", ...RelayEventPayload]
export type RelayEoseMessage = ["EOSE", ...RelayEosePayload]
export type RelayOkMessage = ["OK", ...RelayOkPayload]
export type RelayMessage = any[]
export const isRelayAuthMessage = (m: RelayMessage): m is RelayAuthMessage =>
m[0] === RelayMessageType.Auth
export const isRelayEventMessage = (m: RelayMessage): m is RelayEventMessage =>
m[0] === RelayMessageType.Event
export const isRelayEoseMessage = (m: RelayMessage): m is RelayEoseMessage =>
m[0] === RelayMessageType.Eose
export const isRelayOkMessage = (m: RelayMessage): m is RelayOkMessage =>
m[0] === RelayMessageType.Ok
+224 -8
View File
@@ -1,10 +1,226 @@
import {webSocket} from "rxjs/websocket"
import type {SignedEvent} from "@welshman/util"
import WebSocket from "isomorphic-ws"
import {remove, TaskQueue} from "@welshman/lib"
import type {
RelayMessage,
RelayAuthPayload,
RelayEosePayload,
RelayEventPayload,
RelayOkPayload,
} from "./message.js"
import {
isRelayAuthMessage,
isRelayEoseMessage,
isRelayEventMessage,
isRelayOkMessage,
} from "./message.js"
export type SocketResponse =
| ["AUTH", string]
| ["EVENT", string, SignedEvent]
| ["EOSE", string, SignedEvent]
| ["OK", string, boolean, string]
export enum SocketStatus {
Open = "socket:status:open",
Opening = "socket:status:opening",
Closing = "socket:status:closing",
Closed = "socket:status:closed",
Error = "socket:status:error",
Invalid = "socket:status:invalid",
}
export const connect = (url: string) => webSocket<SocketResponse>(url)
export enum SocketEventType {
Error = "socket:event:error",
Status = "socket:event:status",
Message = "socket:event:message",
}
export type SocketErrorEvent = {
type: SocketEventType.Error
error: string
}
export type SocketStatusEvent = {
type: SocketEventType.Status
status: SocketStatus
}
export type SocketMessageEvent = {
type: SocketEventType.Message
message: RelayMessage
}
export type SocketEvent = SocketErrorEvent | SocketStatusEvent | SocketMessageEvent
export const makeSocketErrorEvent = (error: string): SocketErrorEvent => ({
type: SocketEventType.Error,
error,
})
export const makeSocketStatusEvent = (status: SocketStatus): SocketStatusEvent => ({
type: SocketEventType.Status,
status,
})
export const makeSocketMessageEvent = (message: RelayMessage): SocketMessageEvent => ({
type: SocketEventType.Message,
message,
})
export const isSocketErrorEvent = (event: SocketEvent): event is SocketErrorEvent =>
event.type === SocketEventType.Error
export const isSocketStatusEvent = (event: SocketEvent): event is SocketStatusEvent =>
event.type === SocketEventType.Status
export const isSocketMessageEvent = (event: SocketEvent): event is SocketMessageEvent =>
event.type === SocketEventType.Message
export type SocketSubscriber = (event: SocketEvent) => void
export type SocketUnsubscriber = () => void
export interface ISocket {
open(): void
close(): void
cleanup(): void
send(...message: any[]): void
subscribe(cb: SocketSubscriber): SocketUnsubscriber
onError(cb: (error: string) => void): SocketUnsubscriber
onStatus(cb: (status: SocketStatus) => void): SocketUnsubscriber
onMessage(cb: (message: RelayMessage) => void): SocketUnsubscriber
onAuth(cb: (message: RelayAuthPayload) => void): SocketUnsubscriber
onEose(cb: (message: RelayEosePayload) => void): SocketUnsubscriber
onEvent(cb: (message: RelayEventPayload) => void): SocketUnsubscriber
onOk(cb: (message: RelayOkPayload) => void): SocketUnsubscriber
wrap(overrides: Partial<ISocket>): ISocket
}
export class Socket implements ISocket {
_ws?: WebSocket
_subs: SocketSubscriber[] = []
_queue: TaskQueue<SocketEvent>
constructor(readonly url: string) {
this._queue = new TaskQueue<SocketEvent>({
batchSize: 50,
processItem: (event: SocketEvent) => {
for (const cb of this._subs) {
cb(event)
}
},
})
}
open = () => {
try {
this._ws = new WebSocket(this.url)
this._queue.push(makeSocketStatusEvent(SocketStatus.Opening))
this._ws.onopen = () => this._queue.push(makeSocketStatusEvent(SocketStatus.Open))
this._ws.onerror = () => this._queue.push(makeSocketStatusEvent(SocketStatus.Error))
this._ws.onclose = () => this._queue.push(makeSocketStatusEvent(SocketStatus.Closed))
this._ws.onmessage = (event: any) => {
const data = event.data as string
try {
const message = JSON.parse(data)
if (Array.isArray(message)) {
this._queue.push(makeSocketMessageEvent(message as RelayMessage))
} else {
this._queue.push(makeSocketErrorEvent("Invalid message received"))
}
} catch (e) {
this._queue.push(makeSocketErrorEvent("Invalid message received"))
}
}
} catch (e) {
this._queue.push(makeSocketStatusEvent(SocketStatus.Invalid))
}
}
close = () => {
this._ws?.close()
this._ws = undefined
}
cleanup = () => {
this.close()
this._subs = []
this._queue.clear()
}
send = (...message: any[]) => {
this._ws?.send(JSON.stringify(message))
}
subscribe = (cb: SocketSubscriber) => {
this._subs.push(cb)
return () => {
this._subs = remove(cb, this._subs)
}
}
onError = (cb: (error: string) => void) => {
return this.subscribe((event: SocketEvent) => {
if (isSocketErrorEvent(event)) {
cb(event.error)
}
})
}
onStatus = (cb: (status: SocketStatus) => void) => {
return this.subscribe((event: SocketEvent) => {
if (isSocketStatusEvent(event)) {
cb(event.status)
}
})
}
onMessage = (cb: (message: RelayMessage) => void) => {
return this.subscribe((event: SocketEvent) => {
if (isSocketMessageEvent(event)) {
cb(event.message)
}
})
}
onAuth = (cb: (message: RelayAuthPayload) => void) => {
return this.onMessage((message: RelayMessage) => {
if (isRelayAuthMessage(message)) {
cb(message.slice(1) as RelayAuthPayload)
}
})
}
onEose = (cb: (message: RelayEosePayload) => void) => {
return this.onMessage((message: RelayMessage) => {
if (isRelayEoseMessage(message)) {
cb(message.slice(1) as RelayEosePayload)
}
})
}
onEvent = (cb: (message: RelayEventPayload) => void) => {
return this.onMessage((message: RelayMessage) => {
if (isRelayEventMessage(message)) {
cb(message.slice(1) as RelayEventPayload)
}
})
}
onOk = (cb: (message: RelayOkPayload) => void) => {
return this.onMessage((message: RelayMessage) => {
if (isRelayOkMessage(message)) {
cb(message.slice(1) as RelayOkPayload)
}
})
}
wrap = (overrides: Partial<ISocket>): ISocket => {
return new Proxy(this, {
get: (target, prop: keyof ISocket) => {
if (prop in overrides) {
return overrides[prop]
}
return target[prop]
},
})
}
}
+5 -3
View File
@@ -46,17 +46,19 @@ export type TrustedEvent = HashedEvent & {
[verifiedSymbol]?: boolean
}
export type CreateEventOpts = {
export type MakeEventOpts = {
content?: string
tags?: string[][]
created_at?: number
}
export const createEvent = (
export const makeEvent = (
kind: number,
{content = "", tags = [], created_at = now()}: CreateEventOpts = {},
{content = "", tags = [], created_at = now()}: MakeEventOpts = {},
) => ({kind, content, tags, created_at})
export const createEvent = makeEvent
export const isEventTemplate = (e: EventTemplate): e is EventTemplate =>
Boolean(typeof e.kind === "number" && Array.isArray(e.tags) && typeof e.content === "string")