diff --git a/.fdignore b/.fdignore index dd87e2d..43d4ba4 100644 --- a/.fdignore +++ b/.fdignore @@ -1,2 +1,3 @@ node_modules build +docs diff --git a/packages/lib/package.json b/packages/lib/package.json index 1e6fd61..dd3f04a 100644 --- a/packages/lib/package.json +++ b/packages/lib/package.json @@ -7,6 +7,9 @@ "publishConfig": { "access": "public" }, + "engines": { + "node": ">=12.0.0" + }, "type": "module", "files": [ "build" diff --git a/packages/lib/src/TaskQueue.ts b/packages/lib/src/TaskQueue.ts new file mode 100644 index 0000000..10d936d --- /dev/null +++ b/packages/lib/src/TaskQueue.ts @@ -0,0 +1,45 @@ +import {yieldThread} from "./Tools.js" + +export type TaskQueueOptions = { + batchSize: number + processItem: (item: Item) => unknown +} + +export class TaskQueue { + items: Item[] = [] + isProcessing = false + + constructor(readonly options: TaskQueueOptions) {} + + push(item: Item) { + this.items.push(item) + + if (!this.isProcessing) { + this.processBatch() + } + } + + async processBatch() { + this.isProcessing = true + + for (const item of this.items.splice(0, this.options.batchSize)) { + try { + await this.options.processItem(item) + } catch (e) { + console.error(e) + } + } + + if (this.items.length > 0) { + await yieldThread() + + this.processBatch() + } else { + this.isProcessing = false + } + } + + clear() { + this.items = [] + } +} diff --git a/packages/lib/src/Tools.ts b/packages/lib/src/Tools.ts index 1e2d8b0..619d94e 100644 --- a/packages/lib/src/Tools.ts +++ b/packages/lib/src/Tools.ts @@ -351,6 +351,24 @@ export const displayDomain = (url: string) => displayUrl(first(url.split(/[\/\?] */ export const sleep = (t: number) => new Promise(resolve => setTimeout(resolve, t)) +/** + * Creates a microtask that yields to other tasks in the event loop + * @returns Promise that resolves after yielding + */ +export const yieldThread = () => { + if ( + typeof window !== "undefined" && + "scheduler" in window && + "yield" in (window as any).scheduler + ) { + return (window as any).scheduler.yield() + } + + return new Promise(resolve => { + setTimeout(resolve, 0) + }) +} + /** * Concatenates multiple arrays, filtering out null/undefined * @param xs - Arrays to concatenate diff --git a/packages/lib/src/index.ts b/packages/lib/src/index.ts index b79a71c..04583fe 100644 --- a/packages/lib/src/index.ts +++ b/packages/lib/src/index.ts @@ -4,6 +4,7 @@ export * from "./Emitter.js" export * from "./LRUCache.js" export * from "./Tools.js" export * from "./Worker.js" +export * from "./TaskQueue.js" export {default as normalizeUrl} from "./normalize-url/index.js" declare module "@welshman/lib" { diff --git a/packages/net2/package.json b/packages/net2/package.json index c8fd468..5afc03f 100644 --- a/packages/net2/package.json +++ b/packages/net2/package.json @@ -33,7 +33,6 @@ "@welshman/lib": "~0.0.40", "@welshman/util": "~0.0.59", "isomorphic-ws": "^5.0.0", - "rxjs": "^7.8.1", "ws": "^8.16.0" } } diff --git a/packages/net2/src/auth.ts b/packages/net2/src/auth.ts index 1775aad..db98012 100644 --- a/packages/net2/src/auth.ts +++ b/packages/net2/src/auth.ts @@ -1,11 +1,9 @@ -import type {WebSocketSubject} from "rxjs/websocket" -import {Subject} from "rxjs" import type {SignedEvent} from "@welshman/util" -import {createEvent, CLIENT_AUTH} from "@welshman/util" -import type {SocketResponse} from "./socket.js" +import {makeEvent, CLIENT_AUTH} from "@welshman/util" +import type {ISocket} from "./socket.js" -export const createAuthEvent = (url: string, challenge: string) => - createEvent(CLIENT_AUTH, { +export const makeAuthEvent = (url: string, challenge: string) => + makeEvent(CLIENT_AUTH, { tags: [ ["relay", url], ["challenge", challenge], @@ -17,22 +15,11 @@ export type AuthResult = { reason?: string } -export const authenticate = (socket: WebSocketSubject, event: SignedEvent) => { - const subject = new Subject() +export const authenticate = (socket: ISocket, event: SignedEvent) => + new Promise(resolve => { + socket.send(["AUTH", event]) - socket.next(["AUTH", event]) - - socket.subscribe(message => { - if (message[0] === "OK") { - const [id, ok = false, reason = ""] = message.slice(1) - - if (id === event.id) { - subject.next({ok, reason}) - } - } + socket.onOk(([id, ok = false, reason = ""]) => { + if (id === event.id) resolve({ok, reason}) + }) }) - - return subject -} - -export const forceAuth = (socket: WebSocketSubject) => {} diff --git a/packages/net2/src/message.ts b/packages/net2/src/message.ts new file mode 100644 index 0000000..edeca36 --- /dev/null +++ b/packages/net2/src/message.ts @@ -0,0 +1,38 @@ +import type {SignedEvent} from "@welshman/util" + +export enum RelayMessageType { + Auth = "AUTH", + Event = "EVENT", + Eose = "EOSE", + Ok = "OK", +} + +export type RelayAuthPayload = [string] + +export type RelayEventPayload = [string, SignedEvent] + +export type RelayEosePayload = [string, SignedEvent] + +export type RelayOkPayload = [string, boolean, string] + +export type RelayAuthMessage = ["AUTH", ...RelayAuthPayload] + +export type RelayEventMessage = ["EVENT", ...RelayEventPayload] + +export type RelayEoseMessage = ["EOSE", ...RelayEosePayload] + +export type RelayOkMessage = ["OK", ...RelayOkPayload] + +export type RelayMessage = any[] + +export const isRelayAuthMessage = (m: RelayMessage): m is RelayAuthMessage => + m[0] === RelayMessageType.Auth + +export const isRelayEventMessage = (m: RelayMessage): m is RelayEventMessage => + m[0] === RelayMessageType.Event + +export const isRelayEoseMessage = (m: RelayMessage): m is RelayEoseMessage => + m[0] === RelayMessageType.Eose + +export const isRelayOkMessage = (m: RelayMessage): m is RelayOkMessage => + m[0] === RelayMessageType.Ok diff --git a/packages/net2/src/socket.ts b/packages/net2/src/socket.ts index 3df927f..2c2520d 100644 --- a/packages/net2/src/socket.ts +++ b/packages/net2/src/socket.ts @@ -1,10 +1,226 @@ -import {webSocket} from "rxjs/websocket" -import type {SignedEvent} from "@welshman/util" +import WebSocket from "isomorphic-ws" +import {remove, TaskQueue} from "@welshman/lib" +import type { + RelayMessage, + RelayAuthPayload, + RelayEosePayload, + RelayEventPayload, + RelayOkPayload, +} from "./message.js" +import { + isRelayAuthMessage, + isRelayEoseMessage, + isRelayEventMessage, + isRelayOkMessage, +} from "./message.js" -export type SocketResponse = - | ["AUTH", string] - | ["EVENT", string, SignedEvent] - | ["EOSE", string, SignedEvent] - | ["OK", string, boolean, string] +export enum SocketStatus { + Open = "socket:status:open", + Opening = "socket:status:opening", + Closing = "socket:status:closing", + Closed = "socket:status:closed", + Error = "socket:status:error", + Invalid = "socket:status:invalid", +} -export const connect = (url: string) => webSocket(url) +export enum SocketEventType { + Error = "socket:event:error", + Status = "socket:event:status", + Message = "socket:event:message", +} + +export type SocketErrorEvent = { + type: SocketEventType.Error + error: string +} + +export type SocketStatusEvent = { + type: SocketEventType.Status + status: SocketStatus +} + +export type SocketMessageEvent = { + type: SocketEventType.Message + message: RelayMessage +} + +export type SocketEvent = SocketErrorEvent | SocketStatusEvent | SocketMessageEvent + +export const makeSocketErrorEvent = (error: string): SocketErrorEvent => ({ + type: SocketEventType.Error, + error, +}) + +export const makeSocketStatusEvent = (status: SocketStatus): SocketStatusEvent => ({ + type: SocketEventType.Status, + status, +}) + +export const makeSocketMessageEvent = (message: RelayMessage): SocketMessageEvent => ({ + type: SocketEventType.Message, + message, +}) + +export const isSocketErrorEvent = (event: SocketEvent): event is SocketErrorEvent => + event.type === SocketEventType.Error + +export const isSocketStatusEvent = (event: SocketEvent): event is SocketStatusEvent => + event.type === SocketEventType.Status + +export const isSocketMessageEvent = (event: SocketEvent): event is SocketMessageEvent => + event.type === SocketEventType.Message + +export type SocketSubscriber = (event: SocketEvent) => void + +export type SocketUnsubscriber = () => void + +export interface ISocket { + open(): void + close(): void + cleanup(): void + send(...message: any[]): void + subscribe(cb: SocketSubscriber): SocketUnsubscriber + onError(cb: (error: string) => void): SocketUnsubscriber + onStatus(cb: (status: SocketStatus) => void): SocketUnsubscriber + onMessage(cb: (message: RelayMessage) => void): SocketUnsubscriber + onAuth(cb: (message: RelayAuthPayload) => void): SocketUnsubscriber + onEose(cb: (message: RelayEosePayload) => void): SocketUnsubscriber + onEvent(cb: (message: RelayEventPayload) => void): SocketUnsubscriber + onOk(cb: (message: RelayOkPayload) => void): SocketUnsubscriber + wrap(overrides: Partial): ISocket +} + +export class Socket implements ISocket { + _ws?: WebSocket + _subs: SocketSubscriber[] = [] + _queue: TaskQueue + + constructor(readonly url: string) { + this._queue = new TaskQueue({ + batchSize: 50, + processItem: (event: SocketEvent) => { + for (const cb of this._subs) { + cb(event) + } + }, + }) + } + + open = () => { + try { + this._ws = new WebSocket(this.url) + this._queue.push(makeSocketStatusEvent(SocketStatus.Opening)) + this._ws.onopen = () => this._queue.push(makeSocketStatusEvent(SocketStatus.Open)) + this._ws.onerror = () => this._queue.push(makeSocketStatusEvent(SocketStatus.Error)) + this._ws.onclose = () => this._queue.push(makeSocketStatusEvent(SocketStatus.Closed)) + this._ws.onmessage = (event: any) => { + const data = event.data as string + + try { + const message = JSON.parse(data) + + if (Array.isArray(message)) { + this._queue.push(makeSocketMessageEvent(message as RelayMessage)) + } else { + this._queue.push(makeSocketErrorEvent("Invalid message received")) + } + } catch (e) { + this._queue.push(makeSocketErrorEvent("Invalid message received")) + } + } + } catch (e) { + this._queue.push(makeSocketStatusEvent(SocketStatus.Invalid)) + } + } + + close = () => { + this._ws?.close() + this._ws = undefined + } + + cleanup = () => { + this.close() + this._subs = [] + this._queue.clear() + } + + send = (...message: any[]) => { + this._ws?.send(JSON.stringify(message)) + } + + subscribe = (cb: SocketSubscriber) => { + this._subs.push(cb) + + return () => { + this._subs = remove(cb, this._subs) + } + } + + onError = (cb: (error: string) => void) => { + return this.subscribe((event: SocketEvent) => { + if (isSocketErrorEvent(event)) { + cb(event.error) + } + }) + } + + onStatus = (cb: (status: SocketStatus) => void) => { + return this.subscribe((event: SocketEvent) => { + if (isSocketStatusEvent(event)) { + cb(event.status) + } + }) + } + + onMessage = (cb: (message: RelayMessage) => void) => { + return this.subscribe((event: SocketEvent) => { + if (isSocketMessageEvent(event)) { + cb(event.message) + } + }) + } + + onAuth = (cb: (message: RelayAuthPayload) => void) => { + return this.onMessage((message: RelayMessage) => { + if (isRelayAuthMessage(message)) { + cb(message.slice(1) as RelayAuthPayload) + } + }) + } + + onEose = (cb: (message: RelayEosePayload) => void) => { + return this.onMessage((message: RelayMessage) => { + if (isRelayEoseMessage(message)) { + cb(message.slice(1) as RelayEosePayload) + } + }) + } + + onEvent = (cb: (message: RelayEventPayload) => void) => { + return this.onMessage((message: RelayMessage) => { + if (isRelayEventMessage(message)) { + cb(message.slice(1) as RelayEventPayload) + } + }) + } + + onOk = (cb: (message: RelayOkPayload) => void) => { + return this.onMessage((message: RelayMessage) => { + if (isRelayOkMessage(message)) { + cb(message.slice(1) as RelayOkPayload) + } + }) + } + + wrap = (overrides: Partial): ISocket => { + return new Proxy(this, { + get: (target, prop: keyof ISocket) => { + if (prop in overrides) { + return overrides[prop] + } + + return target[prop] + }, + }) + } +} diff --git a/packages/util/src/Events.ts b/packages/util/src/Events.ts index 9b16ded..27117b2 100644 --- a/packages/util/src/Events.ts +++ b/packages/util/src/Events.ts @@ -46,17 +46,19 @@ export type TrustedEvent = HashedEvent & { [verifiedSymbol]?: boolean } -export type CreateEventOpts = { +export type MakeEventOpts = { content?: string tags?: string[][] created_at?: number } -export const createEvent = ( +export const makeEvent = ( kind: number, - {content = "", tags = [], created_at = now()}: CreateEventOpts = {}, + {content = "", tags = [], created_at = now()}: MakeEventOpts = {}, ) => ({kind, content, tags, created_at}) +export const createEvent = makeEvent + export const isEventTemplate = (e: EventTemplate): e is EventTemplate => Boolean(typeof e.kind === "number" && Array.isArray(e.tags) && typeof e.content === "string")