From 4237b145aeeeb2ca605e465b9b848a8feff4e8bc Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Mon, 31 Mar 2025 09:35:48 -0700 Subject: [PATCH] Use default pool, make adapter context optional --- packages/dvm/src/handler.ts | 16 ++--- packages/dvm/src/request.ts | 13 +--- packages/net/__tests__/Publish.test.ts | 42 ++++++------ packages/net/__tests__/Socket.test.ts | 22 +++---- packages/net/__tests__/adapter.test.ts | 10 +-- packages/net/__tests__/auth.test.ts | 22 +++---- packages/net/__tests__/policy.test.ts | 88 +++++++++++++------------- packages/net/__tests__/request.test.ts | 30 ++++----- packages/net/src/Pool.ts | 10 +++ packages/net/src/Publish.ts | 58 ++++++++--------- packages/net/src/Socket.ts | 34 +++++----- packages/net/src/adapter.ts | 38 ++++++----- packages/net/src/auth.ts | 16 ++--- packages/net/src/diff.ts | 36 +++++------ packages/net/src/policy.ts | 26 ++++---- packages/net/src/request.ts | 82 ++++++++++++------------ packages/signer/src/signers/nip46.ts | 40 ++++++------ 17 files changed, 293 insertions(+), 290 deletions(-) diff --git a/packages/dvm/src/handler.ts b/packages/dvm/src/handler.ts index 106f779..f262d5f 100644 --- a/packages/dvm/src/handler.ts +++ b/packages/dvm/src/handler.ts @@ -2,13 +2,7 @@ import {hexToBytes} from "@noble/hashes/utils" import {getPublicKey, finalizeEvent} from "nostr-tools/pure" import {now} from "@welshman/lib" import {TrustedEvent, StampedEvent, Filter} from "@welshman/util" -import { - multireq, - multicast, - PublishEventType, - RequestEventType, - AdapterContext, -} from "@welshman/net" +import {multireq, multicast, PublishEvent, RequestEvent, AdapterContext} from "@welshman/net" export type DVMHandler = { stop?: () => void @@ -20,10 +14,10 @@ export type CreateDVMHandler = (dvm: DVM) => DVMHandler export type DVMOpts = { sk: string relays: string[] - context: AdapterContext handlers: Record expireAfter?: number requireMention?: boolean + context?: AdapterContext } export class DVM { @@ -55,8 +49,8 @@ export class DVM { const sub = multireq({relays, filter, context}) - sub.on(RequestEventType.Event, (e: TrustedEvent, url: string) => this.onEvent(e)) - sub.on(RequestEventType.Close, () => resolve()) + sub.on(RequestEvent.Event, (e: TrustedEvent, url: string) => this.onEvent(e)) + sub.on(RequestEvent.Close, () => resolve()) }) } } @@ -119,7 +113,7 @@ export class DVM { const event = finalizeEvent(template, hexToBytes(sk)) await new Promise(resolve => { - multicast({event, relays, context}).on(PublishEventType.Complete, resolve) + multicast({event, relays, context}).on(PublishEvent.Complete, resolve) }) } } diff --git a/packages/dvm/src/request.ts b/packages/dvm/src/request.ts index c446a77..64f8501 100644 --- a/packages/dvm/src/request.ts +++ b/packages/dvm/src/request.ts @@ -1,13 +1,6 @@ import {Emitter, now} from "@welshman/lib" import {TrustedEvent, SignedEvent, Filter} from "@welshman/util" -import { - multireq, - multicast, - Multireq, - Multicast, - RequestEventType, - AdapterContext, -} from "@welshman/net" +import {multireq, multicast, Multireq, Multicast, RequestEvent, AdapterContext} from "@welshman/net" export enum DVMEvent { Progress = "progress", @@ -17,10 +10,10 @@ export enum DVMEvent { export type DVMRequestOptions = { event: SignedEvent relays: string[] - context: AdapterContext timeout?: number autoClose?: boolean reportProgress?: boolean + context?: AdapterContext } export type DVMRequest = { @@ -47,7 +40,7 @@ export const makeDvmRequest = (request: DVMRequestOptions) => { const sub = multireq({relays, filter, timeout, context}) const pub = multicast({relays, event, timeout, context}) - sub.on(RequestEventType.Event, (event: TrustedEvent, url: string) => { + sub.on(RequestEvent.Event, (event: TrustedEvent, url: string) => { if (event.kind === 7000) { emitter.emit(DVMEvent.Progress, url, event) } else { diff --git a/packages/net/__tests__/Publish.test.ts b/packages/net/__tests__/Publish.test.ts index 70157ed..2a726c0 100644 --- a/packages/net/__tests__/Publish.test.ts +++ b/packages/net/__tests__/Publish.test.ts @@ -1,7 +1,7 @@ import { describe, expect, it, vi, beforeEach, afterEach } from "vitest" import { EventEmitter } from "events" -import { Unicast, Multicast, PublishEventType, PublishStatus, unicast, multicast } from "../src/publish" -import { AbstractAdapter, AdapterEventType } from "../src/adapter" +import { Unicast, Multicast, PublishEvent, PublishStatus, unicast, multicast } from "../src/publish" +import { AbstractAdapter, AdapterEvent } from "../src/adapter" import { ClientMessageType, RelayMessage } from "../src/message" import { SignedEvent, makeEvent } from "@welshman/util" import { Nip01Signer } from '@welshman/signer' @@ -20,7 +20,7 @@ class MockAdapter extends AbstractAdapter { } receive = (message: RelayMessage) => { - this.emit(AdapterEventType.Receive, message, this.url) + this.emit(AdapterEvent.Receive, message, this.url) } } @@ -49,9 +49,9 @@ describe("Unicast", () => { const failureSpy = vi.fn() const completeSpy = vi.fn() - pub.on(PublishEventType.Success, successSpy) - pub.on(PublishEventType.Failure, failureSpy) - pub.on(PublishEventType.Complete, completeSpy) + pub.on(PublishEvent.Success, successSpy) + pub.on(PublishEvent.Failure, failureSpy) + pub.on(PublishEvent.Complete, completeSpy) await vi.advanceTimersByTimeAsync(200) @@ -82,9 +82,9 @@ describe("Unicast", () => { const failureSpy = vi.fn() const completeSpy = vi.fn() - pub.on(PublishEventType.Success, successSpy) - pub.on(PublishEventType.Failure, failureSpy) - pub.on(PublishEventType.Complete, completeSpy) + pub.on(PublishEvent.Success, successSpy) + pub.on(PublishEvent.Failure, failureSpy) + pub.on(PublishEvent.Complete, completeSpy) await vi.advanceTimersByTimeAsync(200) @@ -116,10 +116,10 @@ describe("Unicast", () => { const completeSpy = vi.fn() const timeoutSpy = vi.fn() - pub.on(PublishEventType.Success, successSpy) - pub.on(PublishEventType.Failure, failureSpy) - pub.on(PublishEventType.Complete, completeSpy) - pub.on(PublishEventType.Timeout, timeoutSpy) + pub.on(PublishEvent.Success, successSpy) + pub.on(PublishEvent.Failure, failureSpy) + pub.on(PublishEvent.Complete, completeSpy) + pub.on(PublishEvent.Timeout, timeoutSpy) await vi.runAllTimers(200) @@ -150,10 +150,10 @@ describe("Unicast", () => { const completeSpy = vi.fn() const abortSpy = vi.fn() - pub.on(PublishEventType.Success, successSpy) - pub.on(PublishEventType.Failure, failureSpy) - pub.on(PublishEventType.Complete, completeSpy) - pub.on(PublishEventType.Timeout, abortSpy) + pub.on(PublishEvent.Success, successSpy) + pub.on(PublishEvent.Failure, failureSpy) + pub.on(PublishEvent.Complete, completeSpy) + pub.on(PublishEvent.Timeout, abortSpy) await vi.runAllTimers(200) @@ -209,10 +209,10 @@ describe("Multicast", () => { const completeSpy = vi.fn() const timeoutSpy = vi.fn() - pub.on(PublishEventType.Success, successSpy) - pub.on(PublishEventType.Failure, failureSpy) - pub.on(PublishEventType.Complete, completeSpy) - pub.on(PublishEventType.Timeout, timeoutSpy) + pub.on(PublishEvent.Success, successSpy) + pub.on(PublishEvent.Failure, failureSpy) + pub.on(PublishEvent.Complete, completeSpy) + pub.on(PublishEvent.Timeout, timeoutSpy) adapter1.receive(["OK", event.id, true, "hi"]) adapter2.receive(["OK", event.id, false, "hi"]) diff --git a/packages/net/__tests__/Socket.test.ts b/packages/net/__tests__/Socket.test.ts index a61ca33..206b634 100644 --- a/packages/net/__tests__/Socket.test.ts +++ b/packages/net/__tests__/Socket.test.ts @@ -1,7 +1,7 @@ import { sleep } from "@welshman/lib" import WebSocket from 'isomorphic-ws' import { afterEach, beforeEach, describe, expect, it, vi } from "vitest" -import { Socket, SocketStatus, SocketEventType } from "../src/socket" +import { Socket, SocketStatus, SocketEvent } from "../src/socket" import { ClientMessage, RelayMessage } from "../src/message" vi.mock('isomorphic-ws', () => { @@ -39,7 +39,7 @@ describe("Socket", () => { describe("open", () => { it("should create websocket and emit opening status", () => { const statusSpy = vi.fn() - socket.on(SocketEventType.Status, statusSpy) + socket.on(SocketEvent.Status, statusSpy) socket.open() @@ -58,7 +58,7 @@ describe("Socket", () => { it("should emit invalid status on invalid URL", () => { const statusSpy = vi.fn() - socket.on(SocketEventType.Status, statusSpy) + socket.on(SocketEvent.Status, statusSpy) vi.mocked(WebSocket).mockImplementationOnce(() => { throw new Error() @@ -73,7 +73,7 @@ describe("Socket", () => { describe("close", () => { it("should close websocket and emit closed status", () => { const statusSpy = vi.fn() - socket.on(SocketEventType.Status, statusSpy) + socket.on(SocketEvent.Status, statusSpy) socket.open() @@ -89,7 +89,7 @@ describe("Socket", () => { describe("send", () => { it("should queue messages and emit enqueue event", () => { const enqueueSpy = vi.fn() - socket.on(SocketEventType.Enqueue, enqueueSpy) + socket.on(SocketEvent.Enqueue, enqueueSpy) const message: ClientMessage = ["EVENT", { id: "123", kind: 1 }] socket.send(message) @@ -99,7 +99,7 @@ describe("Socket", () => { it("should send messages when socket is open", async () => { const sendSpy = vi.fn() - socket.on(SocketEventType.Send, sendSpy) + socket.on(SocketEvent.Send, sendSpy) socket.open() socket._ws.onopen() @@ -117,7 +117,7 @@ describe("Socket", () => { describe("receive", () => { it("should handle valid relay messages", async () => { const receiveSpy = vi.fn() - socket.on(SocketEventType.Receive, receiveSpy) + socket.on(SocketEvent.Receive, receiveSpy) socket.open() const message: RelayMessage = ["EVENT", "123", { id: "123", kind: 1 }] @@ -131,7 +131,7 @@ describe("Socket", () => { it("should emit error on invalid JSON", () => { const errorSpy = vi.fn() - socket.on(SocketEventType.Error, errorSpy) + socket.on(SocketEvent.Error, errorSpy) socket.open() socket._ws.onmessage({ data: "invalid json" }) @@ -141,7 +141,7 @@ describe("Socket", () => { it("should emit error on non-array message", () => { const errorSpy = vi.fn() - socket.on(SocketEventType.Error, errorSpy) + socket.on(SocketEvent.Error, errorSpy) socket.open() socket._ws.onmessage({ data: JSON.stringify({ not: "an array" }) }) @@ -159,14 +159,14 @@ describe("Socket", () => { socket.cleanup() expect(ws.close).toHaveBeenCalled() - expect(socket.listenerCount(SocketEventType.Send)).toBe(0) + expect(socket.listenerCount(SocketEvent.Send)).toBe(0) }) }) describe("error handling", () => { it("should emit error status on websocket error", () => { const statusSpy = vi.fn() - socket.on(SocketEventType.Status, statusSpy) + socket.on(SocketEvent.Status, statusSpy) socket.open() socket._ws.onerror() diff --git a/packages/net/__tests__/adapter.test.ts b/packages/net/__tests__/adapter.test.ts index 712926d..91db9b9 100644 --- a/packages/net/__tests__/adapter.test.ts +++ b/packages/net/__tests__/adapter.test.ts @@ -1,7 +1,7 @@ import { describe, expect, it, vi, beforeEach, afterEach } from "vitest" -import { Socket, SocketEventType } from "../src/socket" +import { Socket, SocketEvent } from "../src/socket" import { Relay, LOCAL_RELAY_URL, isRelayUrl } from "@welshman/util" -import { AdapterEventType, SocketAdapter, LocalAdapter, getAdapter } from "../src/adapter" +import { AdapterEvent, SocketAdapter, LocalAdapter, getAdapter } from "../src/adapter" import { Pool } from "../src/pool" import { ClientMessage, RelayMessage } from "../src/message" import EventEmitter from "events" @@ -45,10 +45,10 @@ describe("SocketAdapter", () => { it("should forward received messages", () => { const receiveSpy = vi.fn() - adapter.on(AdapterEventType.Receive, receiveSpy) + adapter.on(AdapterEvent.Receive, receiveSpy) const message: RelayMessage = ["EVENT", "123", { id: "123", kind: 1 }] - socket.emit(SocketEventType.Receive, message, "wss://test.relay") + socket.emit(SocketEvent.Receive, message, "wss://test.relay") expect(receiveSpy).toHaveBeenCalledWith(message, "wss://test.relay") }) @@ -95,7 +95,7 @@ describe("LocalAdapter", () => { it("should forward received messages", () => { const receiveSpy = vi.fn() - adapter.on(AdapterEventType.Receive, receiveSpy) + adapter.on(AdapterEvent.Receive, receiveSpy) const message: RelayMessage = ["EVENT", "123", { id: "123", kind: 1 }] relay.emit("*", ...message) diff --git a/packages/net/__tests__/auth.test.ts b/packages/net/__tests__/auth.test.ts index b96490c..e5fba3f 100644 --- a/packages/net/__tests__/auth.test.ts +++ b/packages/net/__tests__/auth.test.ts @@ -1,8 +1,8 @@ import { describe, expect, it, vi, beforeEach, afterEach } from "vitest" -import { Socket, SocketStatus, SocketEventType } from "../src/socket" +import { Socket, SocketStatus, SocketEvent } from "../src/socket" import { makeEvent, CLIENT_AUTH } from "@welshman/util" import { Nip01Signer } from "@welshman/signer" -import { AuthState, AuthStatus, AuthStateEventType, AuthManager, makeAuthEvent } from "../src/auth" +import { AuthState, AuthStatus, AuthStateEvent, AuthManager, makeAuthEvent } from "../src/auth" import EventEmitter from "events" import { RelayMessage } from "../src/message" @@ -43,7 +43,7 @@ describe('auth', () => { it("should handle AUTH message from relay", () => { const message: RelayMessage = ["AUTH", "challenge123"] - socket.emit(SocketEventType.Receive, message) + socket.emit(SocketEvent.Receive, message) expect(authManager.state.challenge).toBe("challenge123") expect(authManager.state.status).toBe(AuthStatus.Requested) @@ -52,7 +52,7 @@ describe('auth', () => { it("should handle successful OK message", () => { authManager.state.request = "request123" const message: RelayMessage = ["OK", "request123", true, "success"] - socket.emit(SocketEventType.Receive, message) + socket.emit(SocketEvent.Receive, message) expect(authManager.state.status).toBe(AuthStatus.Ok) expect(authManager.state.details).toBe("success") @@ -61,7 +61,7 @@ describe('auth', () => { it("should handle failed OK message", () => { authManager.state.request = "request123" const message: RelayMessage = ["OK", "request123", false, "forbidden"] - socket.emit(SocketEventType.Receive, message) + socket.emit(SocketEvent.Receive, message) expect(authManager.state.status).toBe(AuthStatus.Forbidden) expect(authManager.state.details).toBe("forbidden") @@ -70,14 +70,14 @@ describe('auth', () => { it("should ignore OK messages for different requests", () => { authManager.state.request = "request123" const message: RelayMessage = ["OK", "different-request", true, "success"] - socket.emit(SocketEventType.Receive, message) + socket.emit(SocketEvent.Receive, message) expect(authManager.state.status).toBe(AuthStatus.None) }) it("should handle client AUTH message", () => { const message: RelayMessage = ["AUTH", { id: "123", kind: CLIENT_AUTH }] - socket.emit(SocketEventType.Enqueue, message) + socket.emit(SocketEvent.Enqueue, message) expect(authManager.state.status).toBe(AuthStatus.PendingResponse) }) @@ -88,7 +88,7 @@ describe('auth', () => { authManager.state.details = "details" authManager.state.status = AuthStatus.PendingResponse - socket.emit(SocketEventType.Status, SocketStatus.Closed) + socket.emit(SocketEvent.Status, SocketStatus.Closed) expect(authManager.state.challenge).toBeUndefined() expect(authManager.state.request).toBeUndefined() @@ -98,7 +98,7 @@ describe('auth', () => { it("should emit status changes", () => { const statusSpy = vi.fn() - authManager.state.on(AuthStateEventType.Status, statusSpy) + authManager.state.on(AuthStateEvent.Status, statusSpy) authManager.state.setStatus(AuthStatus.Requested) @@ -121,14 +121,14 @@ describe('auth', () => { const respondSpy = vi.spyOn(AuthManager.prototype, "respond") const eagerManager = new AuthManager(socket, { sign, eager: true }) - socket.emit(SocketEventType.Receive, ["AUTH", "challenge123"]) + socket.emit(SocketEvent.Receive, ["AUTH", "challenge123"]) expect(respondSpy).toHaveBeenCalled() }) it("should not respond automatically when eager is false", () => { const respondSpy = vi.spyOn(AuthManager.prototype, "respond") - socket.emit(SocketEventType.Receive, ["AUTH", "challenge123"]) + socket.emit(SocketEvent.Receive, ["AUTH", "challenge123"]) expect(respondSpy).not.toHaveBeenCalled() }) diff --git a/packages/net/__tests__/policy.test.ts b/packages/net/__tests__/policy.test.ts index aab5112..1564a29 100644 --- a/packages/net/__tests__/policy.test.ts +++ b/packages/net/__tests__/policy.test.ts @@ -1,7 +1,7 @@ import { AUTH_JOIN } from "@welshman/util" import { describe, expect, it, vi, beforeEach, afterEach } from "vitest" -import { Socket, SocketStatus, SocketEventType } from "../src/socket" -import { AuthStatus, AuthStateEventType } from "../src/auth" +import { Socket, SocketStatus, SocketEvent } from "../src/socket" +import { AuthStatus, AuthStateEvent } from "../src/auth" import { socketPolicySendWhenOpen, socketPolicyDeferOnAuth, @@ -46,7 +46,7 @@ describe('policy', () => { const cleanup = socketPolicyDeferOnAuth(socket) const removeSpy = vi.spyOn(socket._sendQueue, 'remove') - socket.emit(SocketEventType.Receive, ["AUTH", "challenge"]) + socket.emit(SocketEvent.Receive, ["AUTH", "challenge"]) // Regular event should be buffered const event: ClientMessage = ["EVENT", { id: "123"}] @@ -70,7 +70,7 @@ describe('policy', () => { const cleanup = socketPolicyDeferOnAuth(socket) const sendSpy = vi.spyOn(socket, 'send') - socket.emit(SocketEventType.Receive, ["AUTH", "challenge"]) + socket.emit(SocketEvent.Receive, ["AUTH", "challenge"]) // Buffer some messages const event1: ClientMessage = ["EVENT", { id: "123"}] @@ -80,7 +80,7 @@ describe('policy', () => { // Auth succeeds socket.send(["AUTH", { id: "auth" }]) - socket.emit(AuthStateEventType.Status, AuthStatus.Ok) + socket.emit(AuthStateEvent.Status, AuthStatus.Ok) expect(sendSpy).toHaveBeenCalledWith(event1) expect(sendSpy).toHaveBeenCalledWith(event2) @@ -92,7 +92,7 @@ describe('policy', () => { const cleanup = socketPolicyDeferOnAuth(socket) const removeSpy = vi.spyOn(socket._sendQueue, 'remove') - socket.emit(SocketEventType.Receive, ["AUTH", "challenge"]) + socket.emit(SocketEvent.Receive, ["AUTH", "challenge"]) // Buffer a REQ message const req: ClientMessage = ["REQ", "123", { kinds: [1] }] @@ -117,16 +117,16 @@ describe('policy', () => { // Send an event const event: ClientMessage = ["EVENT", { id: "123", kind: 1, content: "", tags: [], pubkey: "", sig: "" }] - socket.emit(SocketEventType.Send, event) + socket.emit(SocketEvent.Send, event) // Receive auth-required rejection - socket.emit(SocketEventType.Receive, ["OK", "123", false, "auth-required: need to auth first"]) + socket.emit(SocketEvent.Receive, ["OK", "123", false, "auth-required: need to auth first"]) // Should retry the event expect(sendSpy).toHaveBeenCalledWith(event) // Receive another auth-required rejection - socket.emit(SocketEventType.Receive, ["OK", "123", false, "auth-required: need to auth first"]) + socket.emit(SocketEvent.Receive, ["OK", "123", false, "auth-required: need to auth first"]) // Should not retry again expect(sendSpy).toHaveBeenCalledTimes(1) @@ -140,16 +140,16 @@ describe('policy', () => { // Send a REQ const req: ClientMessage = ["REQ", "123", { kinds: [1] }] - socket.emit(SocketEventType.Send, req) + socket.emit(SocketEvent.Send, req) // Receive auth-required rejection via CLOSED - socket.emit(SocketEventType.Receive, ["CLOSED", "123", "auth-required: need to auth first"]) + socket.emit(SocketEvent.Receive, ["CLOSED", "123", "auth-required: need to auth first"]) // Should retry the request expect(sendSpy).toHaveBeenCalledWith(req) // Receive another auth-required rejection - socket.emit(SocketEventType.Receive, ["CLOSED", "123", "auth-required: need to auth first"]) + socket.emit(SocketEvent.Receive, ["CLOSED", "123", "auth-required: need to auth first"]) // Should not retry again expect(sendSpy).toHaveBeenCalledTimes(1) @@ -163,10 +163,10 @@ describe('policy', () => { // Send an AUTH_JOIN event const event: ClientMessage = ["EVENT", { id: "123", kind: AUTH_JOIN, content: "", tags: [], pubkey: "", sig: "" }] - socket.emit(SocketEventType.Send, event) + socket.emit(SocketEvent.Send, event) // Receive auth-required rejection - socket.emit(SocketEventType.Receive, ["OK", "123", false, "auth-required: need to auth first"]) + socket.emit(SocketEvent.Receive, ["OK", "123", false, "auth-required: need to auth first"]) // Should not retry AUTH_JOIN events expect(sendSpy).not.toHaveBeenCalled() @@ -180,13 +180,13 @@ describe('policy', () => { // Send an event const event: ClientMessage = ["EVENT", { id: "123", kind: 1, content: "", tags: [], pubkey: "", sig: "" }] - socket.emit(SocketEventType.Send, event) + socket.emit(SocketEvent.Send, event) // Receive successful response - socket.emit(SocketEventType.Receive, ["OK", "123", true, ""]) + socket.emit(SocketEvent.Receive, ["OK", "123", true, ""]) // Receive auth-required rejection (should not trigger retry since message was cleared) - socket.emit(SocketEventType.Receive, ["OK", "123", false, "auth-required: need to auth first"]) + socket.emit(SocketEvent.Receive, ["OK", "123", false, "auth-required: need to auth first"]) // Should not retry expect(sendSpy).not.toHaveBeenCalled() @@ -201,11 +201,11 @@ describe('policy', () => { const openSpy = vi.spyOn(socket, 'open') // Socket starts closed - socket.emit(SocketEventType.Status, SocketStatus.Closed) + socket.emit(SocketEvent.Status, SocketStatus.Closed) // Send a message const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }] - socket.emit(SocketEventType.Enqueue, event) + socket.emit(SocketEvent.Enqueue, event) // Should open the socket expect(openSpy).toHaveBeenCalled() @@ -218,11 +218,11 @@ describe('policy', () => { const openSpy = vi.spyOn(socket, 'open') // Socket is open - socket.emit(SocketEventType.Status, SocketStatus.Open) + socket.emit(SocketEvent.Status, SocketStatus.Open) // Send a message const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }] - socket.emit(SocketEventType.Enqueue, event) + socket.emit(SocketEvent.Enqueue, event) // Should not try to open the socket expect(openSpy).not.toHaveBeenCalled() @@ -235,12 +235,12 @@ describe('policy', () => { const openSpy = vi.spyOn(socket, 'open') // Socket has an error - socket.emit(SocketEventType.Status, SocketStatus.Error) - socket.emit(SocketEventType.Status, SocketStatus.Closed) + socket.emit(SocketEvent.Status, SocketStatus.Error) + socket.emit(SocketEvent.Status, SocketStatus.Closed) // Send a message const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }] - socket.emit(SocketEventType.Enqueue, event) + socket.emit(SocketEvent.Enqueue, event) // Should not try to open the socket due to recent error expect(openSpy).not.toHaveBeenCalled() @@ -249,7 +249,7 @@ describe('policy', () => { vi.advanceTimersByTime(31000) // Send another message - socket.emit(SocketEventType.Enqueue, event) + socket.emit(SocketEvent.Enqueue, event) // Now it should try to open expect(openSpy).toHaveBeenCalled() @@ -264,7 +264,7 @@ describe('policy', () => { const closeSpy = vi.spyOn(socket, 'close') // Set socket as open - socket.emit(SocketEventType.Status, SocketStatus.Open) + socket.emit(SocketEvent.Status, SocketStatus.Open) // Advance time past the timeout await vi.advanceTimersByTimeAsync(35000) @@ -280,13 +280,13 @@ describe('policy', () => { const closeSpy = vi.spyOn(socket, 'close') // Set socket as open - socket.emit(SocketEventType.Status, SocketStatus.Open) + socket.emit(SocketEvent.Status, SocketStatus.Open) // Advance time partially vi.advanceTimersByTime(20000) // Send a message - socket.emit(SocketEventType.Send, ["EVENT", { id: "123" }]) + socket.emit(SocketEvent.Send, ["EVENT", { id: "123" }]) // Advance time partially again vi.advanceTimersByTime(20000) @@ -308,13 +308,13 @@ describe('policy', () => { const closeSpy = vi.spyOn(socket, 'close') // Set socket as open - socket.emit(SocketEventType.Status, SocketStatus.Open) + socket.emit(SocketEvent.Status, SocketStatus.Open) // Advance time partially vi.advanceTimersByTime(20000) // Receive a message - socket.emit(SocketEventType.Receive, ["EVENT", "123", { id: "123" }]) + socket.emit(SocketEvent.Receive, ["EVENT", "123", { id: "123" }]) // Advance time partially again vi.advanceTimersByTime(20000) @@ -336,7 +336,7 @@ describe('policy', () => { const closeSpy = vi.spyOn(socket, 'close') // Set socket as closed - socket.emit(SocketEventType.Status, SocketStatus.Closed) + socket.emit(SocketEvent.Status, SocketStatus.Closed) // Advance time past the timeout vi.advanceTimersByTime(31000) @@ -355,10 +355,10 @@ describe('policy', () => { // Send an event that will be pending const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }] - socket.emit(SocketEventType.Send, event) + socket.emit(SocketEvent.Send, event) // Socket closes - socket.emit(SocketEventType.Status, SocketStatus.Closed) + socket.emit(SocketEvent.Status, SocketStatus.Closed) // Advance past the reopen delay await vi.advanceTimersByTimeAsync(30000) @@ -375,10 +375,10 @@ describe('policy', () => { // Send a request that will be pending const req: ClientMessage = ["REQ", "123", { kinds: [1] }] - socket.emit(SocketEventType.Send, req) + socket.emit(SocketEvent.Send, req) // Socket closes - socket.emit(SocketEventType.Status, SocketStatus.Closed) + socket.emit(SocketEvent.Status, SocketStatus.Closed) // Advance past the reopen delay await vi.advanceTimersByTimeAsync(30000) @@ -395,11 +395,11 @@ describe('policy', () => { // Send an event that will be pending const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }] - socket.emit(SocketEventType.Send, event) + socket.emit(SocketEvent.Send, event) // Socket opens then closes quickly - socket.emit(SocketEventType.Status, SocketStatus.Open) - socket.emit(SocketEventType.Status, SocketStatus.Closed) + socket.emit(SocketEvent.Status, SocketStatus.Open) + socket.emit(SocketEvent.Status, SocketStatus.Closed) // Advance a short time vi.advanceTimersByTime(5000) @@ -422,13 +422,13 @@ describe('policy', () => { // Send an event that will be pending const event: ClientMessage = ["EVENT", { id: "123", kind: 1 }] - socket.emit(SocketEventType.Send, event) + socket.emit(SocketEvent.Send, event) // Event completes successfully - socket.emit(SocketEventType.Receive, ["OK", "123", true]) + socket.emit(SocketEvent.Receive, ["OK", "123", true]) // Socket closes - socket.emit(SocketEventType.Status, SocketStatus.Closed) + socket.emit(SocketEvent.Status, SocketStatus.Closed) // Advance past the reopen delay vi.advanceTimersByTime(30000) @@ -445,14 +445,14 @@ describe('policy', () => { // Send a request that will be pending const req: ClientMessage = ["REQ", "123", { kinds: [1] }] - socket.emit(SocketEventType.Send, req) + socket.emit(SocketEvent.Send, req) // Send close for the request const close: ClientMessage = ["CLOSE", "123"] - socket.emit(SocketEventType.Send, close) + socket.emit(SocketEvent.Send, close) // Socket closes - socket.emit(SocketEventType.Status, SocketStatus.Closed) + socket.emit(SocketEvent.Status, SocketStatus.Closed) // Advance past the reopen delay vi.advanceTimersByTime(30000) diff --git a/packages/net/__tests__/request.test.ts b/packages/net/__tests__/request.test.ts index ffec9f2..f4760ae 100644 --- a/packages/net/__tests__/request.test.ts +++ b/packages/net/__tests__/request.test.ts @@ -2,8 +2,8 @@ import { describe, expect, it, vi, beforeEach, afterEach } from "vitest" import { Nip01Signer } from '@welshman/signer' import { LOCAL_RELAY_URL, makeEvent } from '@welshman/util' import { ClientMessageType, RelayMessage } from "../src/message" -import { AdapterContext, AbstractAdapter, AdapterEventType } from "../src/adapter" -import { unireq, multireq, RequestEventType } from "../src/request" +import { AdapterContext, AbstractAdapter, AdapterEvent } from "../src/adapter" +import { unireq, multireq, RequestEvent } from "../src/request" import { Tracker } from "../src/tracker" class MockAdapter extends AbstractAdapter { @@ -20,7 +20,7 @@ class MockAdapter extends AbstractAdapter { } receive = (message: RelayMessage) => { - this.emit(AdapterEventType.Receive, message, this.url) + this.emit(AdapterEvent.Receive, message, this.url) } } @@ -50,12 +50,12 @@ describe("Unireq", () => { const eoseSpy = vi.fn() const closeSpy = vi.fn() - req.on(RequestEventType.Duplicate, duplicateSpy) - req.on(RequestEventType.Invalid, invalidSpy) - req.on(RequestEventType.Filtered, filteredSpy) - req.on(RequestEventType.Event, eventSpy) - req.on(RequestEventType.Eose, eoseSpy) - req.on(RequestEventType.Close, closeSpy) + req.on(RequestEvent.Duplicate, duplicateSpy) + req.on(RequestEvent.Invalid, invalidSpy) + req.on(RequestEvent.Filtered, filteredSpy) + req.on(RequestEvent.Event, eventSpy) + req.on(RequestEvent.Eose, eoseSpy) + req.on(RequestEvent.Close, closeSpy) await vi.runAllTimers() @@ -116,12 +116,12 @@ describe("Multireq", () => { const eoseSpy = vi.fn() const closeSpy = vi.fn() - req.on(RequestEventType.Duplicate, duplicateSpy) - req.on(RequestEventType.Invalid, invalidSpy) - req.on(RequestEventType.Filtered, filteredSpy) - req.on(RequestEventType.Event, eventSpy) - req.on(RequestEventType.Eose, eoseSpy) - req.on(RequestEventType.Close, closeSpy) + req.on(RequestEvent.Duplicate, duplicateSpy) + req.on(RequestEvent.Invalid, invalidSpy) + req.on(RequestEvent.Filtered, filteredSpy) + req.on(RequestEvent.Event, eventSpy) + req.on(RequestEvent.Eose, eoseSpy) + req.on(RequestEvent.Close, closeSpy) await vi.runAllTimers() diff --git a/packages/net/src/Pool.ts b/packages/net/src/Pool.ts index 190b2de..eff0ea5 100644 --- a/packages/net/src/Pool.ts +++ b/packages/net/src/Pool.ts @@ -19,10 +19,20 @@ export type PoolOptions = { makeSocket?: (url: string) => Socket } +export let poolSingleton: Pool + export class Pool { _data = new Map() _subs: PoolSubscription[] = [] + static getSingleton() { + if (!poolSingleton) { + poolSingleton = new Pool() + } + + return poolSingleton + } + constructor(readonly options: PoolOptions = {}) {} has(url: string) { diff --git a/packages/net/src/Publish.ts b/packages/net/src/Publish.ts index ff52555..8ee0279 100644 --- a/packages/net/src/Publish.ts +++ b/packages/net/src/Publish.ts @@ -2,7 +2,7 @@ import {EventEmitter} from "events" import {on, fromPairs, sleep, yieldThread} from "@welshman/lib" import {SignedEvent} from "@welshman/util" import {RelayMessage, ClientMessageType, isRelayOk} from "./message.js" -import {AbstractAdapter, AdapterEventType, AdapterContext, getAdapter} from "./adapter.js" +import {AbstractAdapter, AdapterEvent, AdapterContext, getAdapter} from "./adapter.js" import {TypedEmitter} from "./util.js" export enum PublishStatus { @@ -13,7 +13,7 @@ export enum PublishStatus { Aborted = "publish:status:aborted", } -export enum PublishEventType { +export enum PublishEvent { Success = "publish:event:success", Failure = "publish:event:failure", Timeout = "publish:event:timeout", @@ -24,17 +24,17 @@ export enum PublishEventType { // Unicast export type UnicastEvents = { - [PublishEventType.Success]: (id: string, detail: string) => void - [PublishEventType.Failure]: (id: string, detail: string) => void - [PublishEventType.Timeout]: () => void - [PublishEventType.Aborted]: () => void - [PublishEventType.Complete]: () => void + [PublishEvent.Success]: (id: string, detail: string) => void + [PublishEvent.Failure]: (id: string, detail: string) => void + [PublishEvent.Timeout]: () => void + [PublishEvent.Aborted]: () => void + [PublishEvent.Complete]: () => void } export type UnicastOptions = { event: SignedEvent relay: string - context: AdapterContext + context?: AdapterContext timeout?: number } @@ -53,7 +53,7 @@ export class Unicast extends (EventEmitter as new () => TypedEmitter { if (isRelayOk(message)) { const [_, id, ok, detail] = message @@ -62,10 +62,10 @@ export class Unicast extends (EventEmitter as new () => TypedEmitter TypedEmitter { if (this.status === PublishStatus.Pending) { this.status = PublishStatus.Timeout - this.emit(PublishEventType.Timeout) + this.emit(PublishEvent.Timeout) } this.cleanup() @@ -92,13 +92,13 @@ export class Unicast extends (EventEmitter as new () => TypedEmitter { if (this.status === PublishStatus.Pending) { this.status = PublishStatus.Aborted - this.emit(PublishEventType.Aborted) + this.emit(PublishEvent.Aborted) this.cleanup() } } cleanup = () => { - this.emit(PublishEventType.Complete) + this.emit(PublishEvent.Complete) this.removeAllListeners() this._adapter.cleanup() this._unsubscriber() @@ -108,11 +108,11 @@ export class Unicast extends (EventEmitter as new () => TypedEmitter void - [PublishEventType.Failure]: (id: string, detail: string, url: string) => void - [PublishEventType.Timeout]: (url: string) => void - [PublishEventType.Aborted]: (url: string) => void - [PublishEventType.Complete]: () => void + [PublishEvent.Success]: (id: string, detail: string, url: string) => void + [PublishEvent.Failure]: (id: string, detail: string, url: string) => void + [PublishEvent.Timeout]: (url: string) => void + [PublishEvent.Aborted]: (url: string) => void + [PublishEvent.Complete]: () => void } export type MulticastOptions = Omit & { @@ -133,32 +133,32 @@ export class Multicast extends (EventEmitter as new () => TypedEmitter { + unicast.on(PublishEvent.Success, (id: string, detail: string) => { this.status[relay] = unicast.status - this.emit(PublishEventType.Success, id, detail, relay) + this.emit(PublishEvent.Success, id, detail, relay) }) - unicast.on(PublishEventType.Failure, (id: string, detail: string) => { + unicast.on(PublishEvent.Failure, (id: string, detail: string) => { this.status[relay] = unicast.status - this.emit(PublishEventType.Failure, id, detail, relay) + this.emit(PublishEvent.Failure, id, detail, relay) }) - unicast.on(PublishEventType.Timeout, () => { + unicast.on(PublishEvent.Timeout, () => { this.status[relay] = unicast.status - this.emit(PublishEventType.Timeout, relay) + this.emit(PublishEvent.Timeout, relay) }) - unicast.on(PublishEventType.Aborted, () => { + unicast.on(PublishEvent.Aborted, () => { this.status[relay] = unicast.status - this.emit(PublishEventType.Aborted, relay) + this.emit(PublishEvent.Aborted, relay) }) - unicast.on(PublishEventType.Complete, () => { + unicast.on(PublishEvent.Complete, () => { this._completed.add(relay) this.status[relay] = unicast.status if (this._completed.size === relays.length) { - this.emit(PublishEventType.Complete) + this.emit(PublishEvent.Complete) this.cleanup() } }) diff --git a/packages/net/src/Socket.ts b/packages/net/src/Socket.ts index 7d80f67..d9a0f61 100644 --- a/packages/net/src/Socket.ts +++ b/packages/net/src/Socket.ts @@ -13,7 +13,7 @@ export enum SocketStatus { Invalid = "socket:status:invalid", } -export enum SocketEventType { +export enum SocketEvent { Error = "socket:event:error", Status = "socket:event:status", Send = "socket:event:send", @@ -22,11 +22,11 @@ export enum SocketEventType { } export type SocketEvents = { - [SocketEventType.Error]: (error: string, url: string) => void - [SocketEventType.Status]: (status: SocketStatus, url: string) => void - [SocketEventType.Send]: (message: ClientMessage, url: string) => void - [SocketEventType.Enqueue]: (message: ClientMessage, url: string) => void - [SocketEventType.Receive]: (message: RelayMessage, url: string) => void + [SocketEvent.Error]: (error: string, url: string) => void + [SocketEvent.Status]: (status: SocketStatus, url: string) => void + [SocketEvent.Send]: (message: ClientMessage, url: string) => void + [SocketEvent.Enqueue]: (message: ClientMessage, url: string) => void + [SocketEvent.Receive]: (message: RelayMessage, url: string) => void } export class Socket extends (EventEmitter as new () => TypedEmitter) { @@ -43,18 +43,18 @@ export class Socket extends (EventEmitter as new () => TypedEmitter { this._ws?.send(JSON.stringify(message)) - this.emit(SocketEventType.Send, message, this.url) + this.emit(SocketEvent.Send, message, this.url) }, }) this._recvQueue = new TaskQueue({ batchSize: 50, processItem: (message: RelayMessage) => { - this.emit(SocketEventType.Receive, message, this.url) + this.emit(SocketEvent.Receive, message, this.url) }, }) - this.on(SocketEventType.Status, (status: SocketStatus) => { + this.on(SocketEvent.Status, (status: SocketStatus) => { this.status = status }) } @@ -66,21 +66,21 @@ export class Socket extends (EventEmitter as new () => TypedEmitter { - this.emit(SocketEventType.Status, SocketStatus.Open, this.url) + this.emit(SocketEvent.Status, SocketStatus.Open, this.url) this._sendQueue.start() } this._ws.onerror = () => { - this.emit(SocketEventType.Status, SocketStatus.Error, this.url) + this.emit(SocketEvent.Status, SocketStatus.Error, this.url) this._sendQueue.stop() this._ws = undefined } this._ws.onclose = () => { - this.emit(SocketEventType.Status, SocketStatus.Closed, this.url) + this.emit(SocketEvent.Status, SocketStatus.Closed, this.url) this._sendQueue.stop() this._ws = undefined } @@ -94,14 +94,14 @@ export class Socket extends (EventEmitter as new () => TypedEmitter TypedEmitter { this._sendQueue.push(message) - this.emit(SocketEventType.Enqueue, message, this.url) + this.emit(SocketEvent.Enqueue, message, this.url) } } diff --git a/packages/net/src/adapter.ts b/packages/net/src/adapter.ts index 1874161..f5ce979 100644 --- a/packages/net/src/adapter.ts +++ b/packages/net/src/adapter.ts @@ -2,16 +2,16 @@ import EventEmitter from "events" import {call, on} from "@welshman/lib" import {Relay, LOCAL_RELAY_URL, isRelayUrl} from "@welshman/util" import {RelayMessage, ClientMessage} from "./message.js" -import {Socket, SocketEventType} from "./socket.js" +import {Socket, SocketEvent} from "./socket.js" import {TypedEmitter, Unsubscriber} from "./util.js" import {Pool} from "./pool.js" -export enum AdapterEventType { +export enum AdapterEvent { Receive = "adapter:event:receive", } export type AdapterEvents = { - [AdapterEventType.Receive]: (message: RelayMessage, url: string) => void + [AdapterEvent.Receive]: (message: RelayMessage, url: string) => void } export abstract class AbstractAdapter extends (EventEmitter as new () => TypedEmitter) { @@ -32,8 +32,8 @@ export class SocketAdapter extends AbstractAdapter { super() this._unsubscribers.push( - on(socket, SocketEventType.Receive, (message: RelayMessage, url: string) => { - this.emit(AdapterEventType.Receive, message, url) + on(socket, SocketEvent.Receive, (message: RelayMessage, url: string) => { + this.emit(AdapterEvent.Receive, message, url) }), ) } @@ -57,7 +57,7 @@ export class LocalAdapter extends AbstractAdapter { this._unsubscribers.push( on(relay, "*", (...message: RelayMessage) => { - this.emit(AdapterEventType.Receive, message, LOCAL_RELAY_URL) + this.emit(AdapterEvent.Receive, message, LOCAL_RELAY_URL) }), ) } @@ -77,13 +77,25 @@ export class LocalAdapter extends AbstractAdapter { } } +export class EmptyAdapter extends AbstractAdapter { + get sockets() { + return [] + } + + get urls() { + return [] + } + + send(message: ClientMessage) {} +} + export type AdapterContext = { pool?: Pool relay?: Relay getAdapter?: (url: string, context: AdapterContext) => AbstractAdapter } -export const getAdapter = (url: string, context: AdapterContext) => { +export const getAdapter = (url: string, context: AdapterContext = {}) => { if (context.getAdapter) { const adapter = context.getAdapter(url, context) @@ -93,19 +105,13 @@ export const getAdapter = (url: string, context: AdapterContext) => { } if (url === LOCAL_RELAY_URL) { - if (!context.relay) { - throw new Error(`Unable to get local relay for ${url}`) - } - - return new LocalAdapter(context.relay) + return context.relay ? new LocalAdapter(context.relay) : new EmptyAdapter() } if (isRelayUrl(url)) { - if (!context.pool) { - throw new Error(`Unable to get socket for ${url}`) - } + const pool = context.pool || Pool.getSingleton() - return new SocketAdapter(context.pool.get(url)) + return new SocketAdapter(pool.get(url)) } throw new Error(`Invalid relay url ${url}`) diff --git a/packages/net/src/auth.ts b/packages/net/src/auth.ts index a1b1424..a91a5a2 100644 --- a/packages/net/src/auth.ts +++ b/packages/net/src/auth.ts @@ -3,7 +3,7 @@ import {on, call, sleep} from "@welshman/lib" import type {SignedEvent, StampedEvent} from "@welshman/util" import {makeEvent, CLIENT_AUTH} from "@welshman/util" import {isRelayAuth, isClientAuth, isRelayOk, RelayMessage} from "./message.js" -import {Socket, SocketStatus, SocketEventType} from "./socket.js" +import {Socket, SocketStatus, SocketEvent} from "./socket.js" import {TypedEmitter, Unsubscriber} from "./util.js" export const makeAuthEvent = (url: string, challenge: string) => @@ -29,12 +29,12 @@ export type AuthResult = { reason?: string } -export enum AuthStateEventType { +export enum AuthStateEvent { Status = "auth:event:status", } export type AuthStateEvents = { - [AuthStateEventType.Status]: (status: AuthStatus) => void + [AuthStateEvent.Status]: (status: AuthStatus) => void } export class AuthState extends (EventEmitter as new () => TypedEmitter) { @@ -48,7 +48,7 @@ export class AuthState extends (EventEmitter as new () => TypedEmitter { + on(socket, SocketEvent.Receive, (message: RelayMessage) => { if (isRelayOk(message)) { const [_, id, ok, details] = message @@ -72,12 +72,12 @@ export class AuthState extends (EventEmitter as new () => TypedEmitter { + on(socket, SocketEvent.Enqueue, (message: RelayMessage) => { if (isClientAuth(message)) { this.setStatus(AuthStatus.PendingResponse) } }), - on(socket, SocketEventType.Status, (status: SocketStatus) => { + on(socket, SocketEvent.Status, (status: SocketStatus) => { if (status === SocketStatus.Closed) { this.challenge = undefined this.request = undefined @@ -90,7 +90,7 @@ export class AuthState extends (EventEmitter as new () => TypedEmitter { + this.state.on(AuthStateEvent.Status, (status: string) => { if (status === AuthStatus.Requested && options.eager) { this.respond() } diff --git a/packages/net/src/diff.ts b/packages/net/src/diff.ts index e3f061c..1af5856 100644 --- a/packages/net/src/diff.ts +++ b/packages/net/src/diff.ts @@ -9,28 +9,28 @@ import { RelayMessageType, ClientMessageType, } from "./message.js" -import {getAdapter, AdapterContext, AbstractAdapter, AdapterEventType} from "./adapter.js" +import {getAdapter, AdapterContext, AbstractAdapter, AdapterEvent} from "./adapter.js" import {Negentropy, NegentropyStorageVector} from "./negentropy.js" -import {unireq, RequestEventType} from "./request.js" -import {multicast, PublishEventType} from "./publish.js" +import {unireq, RequestEvent} from "./request.js" +import {multicast, PublishEvent} from "./publish.js" -export enum DifferenceEventType { +export enum DifferenceEvent { Message = "difference:event:message", Error = "difference:event:error", Close = "difference:event:close", } export type DifferenceEvents = { - [DifferenceEventType.Message]: (payload: {have: string[]; need: string[]}, url: string) => void - [DifferenceEventType.Error]: (error: string, url: string) => void - [DifferenceEventType.Close]: () => void + [DifferenceEvent.Message]: (payload: {have: string[]; need: string[]}, url: string) => void + [DifferenceEvent.Error]: (error: string, url: string) => void + [DifferenceEvent.Close]: () => void } export type DifferenceOptions = { relay: string filter: Filter events: SignedEvent[] - context: AdapterContext + context?: AdapterContext } export class Difference extends (EventEmitter as new () => TypedEmitter) { @@ -61,7 +61,7 @@ export class Difference extends (EventEmitter as new () => TypedEmitter { if (isRelayNegMsg(message)) { const [_, negid, msg] = message @@ -77,7 +77,7 @@ export class Difference extends (EventEmitter as new () => TypedEmitter TypedEmitter TypedEmitter { new Promise((resolve, reject) => { const diff = new Difference({relay, filter, ...options}) - diff.on(DifferenceEventType.Close, () => { + diff.on(DifferenceEvent.Close, () => { resolve({relay, have: diff.have, need: diff.need}) diff.close() }) - diff.on(DifferenceEventType.Error, (url, message) => { + diff.on(DifferenceEvent.Error, (url, message) => { reject(message) diff.close() }) @@ -206,8 +206,8 @@ export const pull = async ({context, ...options}: PullOptions) => { return new Promise(resolve => { const req = unireq({relay, context, filter: {ids}, autoClose: true}) - req.on(RequestEventType.Close, resolve) - req.on(RequestEventType.Event, event => result.push(event)) + req.on(RequestEvent.Close, resolve) + req.on(RequestEvent.Event, event => result.push(event)) }) }), ) @@ -236,7 +236,7 @@ export const push = async ({context, events, ...options}: PushOptions) => { if (relays) { new Promise(resolve => { - multicast({event, relays, context}).on(PublishEventType.Complete, resolve) + multicast({event, relays, context}).on(PublishEvent.Complete, resolve) }) } }), diff --git a/packages/net/src/policy.ts b/packages/net/src/policy.ts index 07bb9c2..d3f78be 100644 --- a/packages/net/src/policy.ts +++ b/packages/net/src/policy.ts @@ -11,8 +11,8 @@ import { isRelayOk, isRelayClosed, } from "./message.js" -import {Socket, SocketStatus, SocketEventType} from "./socket.js" -import {AuthState, AuthStatus, AuthStateEventType} from "./auth.js" +import {Socket, SocketStatus, SocketEvent} from "./socket.js" +import {AuthState, AuthStatus, AuthStateEvent} from "./auth.js" /** * Defers sending messages when a challenge has been presented and not answered yet @@ -26,7 +26,7 @@ export const socketPolicyDeferOnAuth = (socket: Socket) => { const unsubscribers = [ // Pause sending certain messages when we're not authenticated - on(socket, SocketEventType.Enqueue, (message: ClientMessage) => { + on(socket, SocketEvent.Enqueue, (message: ClientMessage) => { // If we're closing a request, but it never got sent, remove both from the queue // Otherwise, always send CLOSE if (isClientClose(message)) { @@ -53,7 +53,7 @@ export const socketPolicyDeferOnAuth = (socket: Socket) => { } }), // Send buffered messages when we get successful auth - on(authState, AuthStateEventType.Status, (status: AuthStatus) => { + on(authState, AuthStateEvent.Status, (status: AuthStatus) => { if (okStatuses.includes(status) && buffer.length > 0) { for (const message of buffer.splice(0)) { socket.send(message) @@ -79,7 +79,7 @@ export const socketPolicyRetryAuthRequired = (socket: Socket) => { const unsubscribers = [ // Watch outgoing events and requests and keep a copy - on(socket, SocketEventType.Send, (message: ClientMessage) => { + on(socket, SocketEvent.Send, (message: ClientMessage) => { if (isClientEvent(message)) { const [_, event] = message @@ -97,7 +97,7 @@ export const socketPolicyRetryAuthRequired = (socket: Socket) => { } }), // If a message is rejected with auth-required, re-enqueue it one time - on(socket, SocketEventType.Receive, (message: RelayMessage) => { + on(socket, SocketEvent.Receive, (message: RelayMessage) => { if (isRelayOk(message)) { const [_, id, ok, detail] = message const pendingMessage = pending.get(id) @@ -137,7 +137,7 @@ export const socketPolicyConnectOnSend = (socket: Socket) => { let currentStatus = SocketStatus.Closed const unsubscribers = [ - on(socket, SocketEventType.Status, (newStatus: SocketStatus) => { + on(socket, SocketEvent.Status, (newStatus: SocketStatus) => { // Keep track of the most recent error if (newStatus === SocketStatus.Error) { lastError = now() @@ -146,7 +146,7 @@ export const socketPolicyConnectOnSend = (socket: Socket) => { // Keep track of the current status currentStatus = newStatus }), - on(socket, SocketEventType.Enqueue, (message: ClientMessage) => { + on(socket, SocketEvent.Enqueue, (message: ClientMessage) => { // When a new message is sent, make sure the socket is open (unless there was a recent error) if (currentStatus === SocketStatus.Closed && lastError < ago(30)) { socket.open() @@ -166,10 +166,10 @@ export const socketPolicyCloseOnTimeout = (socket: Socket) => { let lastActivity = now() const unsubscribers = [ - on(socket, SocketEventType.Send, (message: ClientMessage) => { + on(socket, SocketEvent.Send, (message: ClientMessage) => { lastActivity = now() }), - on(socket, SocketEventType.Receive, (message: RelayMessage) => { + on(socket, SocketEvent.Receive, (message: RelayMessage) => { lastActivity = now() }), ] @@ -197,7 +197,7 @@ export const socketPolicyReopenActive = (socket: Socket) => { let lastOpen = Date.now() const unsubscribers = [ - on(socket, SocketEventType.Status, (newStatus: SocketStatus) => { + on(socket, SocketEvent.Status, (newStatus: SocketStatus) => { // Keep track of the most recent error if (newStatus === SocketStatus.Open) { lastOpen = Date.now() @@ -214,7 +214,7 @@ export const socketPolicyReopenActive = (socket: Socket) => { }) } }), - on(socket, SocketEventType.Send, (message: ClientMessage) => { + on(socket, SocketEvent.Send, (message: ClientMessage) => { if (isClientEvent(message)) { pending.set(message[1].id, message) } @@ -227,7 +227,7 @@ export const socketPolicyReopenActive = (socket: Socket) => { pending.delete(message[1]) } }), - on(socket, SocketEventType.Receive, (message: RelayMessage) => { + on(socket, SocketEvent.Receive, (message: RelayMessage) => { if (isRelayClosed(message) || isRelayOk(message)) { pending.delete(message[1]) } diff --git a/packages/net/src/request.ts b/packages/net/src/request.ts index 0946050..4b9662e 100644 --- a/packages/net/src/request.ts +++ b/packages/net/src/request.ts @@ -3,8 +3,8 @@ import {verifyEvent as nostrToolsVerifyEvent} from "nostr-tools/pure" import {on, call, randomId, yieldThread} from "@welshman/lib" import {Filter, matchFilter, SignedEvent} from "@welshman/util" import {RelayMessage, ClientMessageType, isRelayEvent, isRelayEose} from "./message.js" -import {getAdapter, AdapterContext, AbstractAdapter, AdapterEventType} from "./adapter.js" -import {SocketEventType, SocketStatus} from "./socket.js" +import {getAdapter, AdapterContext, AbstractAdapter, AdapterEvent} from "./adapter.js" +import {SocketEvent, SocketStatus} from "./socket.js" import {TypedEmitter, Unsubscriber} from "./util.js" import {Tracker} from "./tracker.js" @@ -16,7 +16,7 @@ export const defaultVerifyEvent = (event: SignedEvent) => { } } -export enum RequestEventType { +export enum RequestEvent { Close = "request:event:close", Disconnect = "request:event:disconnect", Duplicate = "request:event:duplicate", @@ -29,19 +29,19 @@ export enum RequestEventType { // Unireq export type UnireqEvents = { - [RequestEventType.Event]: (event: SignedEvent) => void - [RequestEventType.Invalid]: (event: SignedEvent) => void - [RequestEventType.Filtered]: (event: SignedEvent) => void - [RequestEventType.Duplicate]: (event: SignedEvent) => void - [RequestEventType.Disconnect]: () => void - [RequestEventType.Close]: () => void - [RequestEventType.Eose]: () => void + [RequestEvent.Event]: (event: SignedEvent) => void + [RequestEvent.Invalid]: (event: SignedEvent) => void + [RequestEvent.Filtered]: (event: SignedEvent) => void + [RequestEvent.Duplicate]: (event: SignedEvent) => void + [RequestEvent.Disconnect]: () => void + [RequestEvent.Close]: () => void + [RequestEvent.Eose]: () => void } export type UnireqOptions = { relay: string filter: Filter - context: AdapterContext + context?: AdapterContext timeout?: number tracker?: Tracker autoClose?: boolean @@ -66,20 +66,20 @@ export class Unireq extends (EventEmitter as new () => TypedEmitter { + on(this._adapter, AdapterEvent.Receive, (message: RelayMessage, url: string) => { if (isRelayEvent(message)) { const [_, id, event] = message if (id !== this._id) return if (tracker.track(event.id, url)) { - this.emit(RequestEventType.Duplicate, event) + this.emit(RequestEvent.Duplicate, event) } else if (verifyEvent?.(event) === false) { - this.emit(RequestEventType.Invalid, event) + this.emit(RequestEvent.Invalid, event) } else if (!matchFilter(this.options.filter, event)) { - this.emit(RequestEventType.Filtered, event) + this.emit(RequestEvent.Filtered, event) } else { - this.emit(RequestEventType.Event, event) + this.emit(RequestEvent.Event, event) } } @@ -87,7 +87,7 @@ export class Unireq extends (EventEmitter as new () => TypedEmitter TypedEmitter { + on(socket, SocketEvent.Status, (status: SocketStatus) => { if (![SocketStatus.Open, SocketStatus.Opening].includes(status)) { - this.emit(RequestEventType.Disconnect) + this.emit(RequestEvent.Disconnect) if (this.options.autoClose) { this.close() @@ -127,7 +127,7 @@ export class Unireq extends (EventEmitter as new () => TypedEmitter TypedEmitter void - [RequestEventType.Invalid]: (event: SignedEvent, url: string) => void - [RequestEventType.Filtered]: (event: SignedEvent, url: string) => void - [RequestEventType.Duplicate]: (event: SignedEvent, url: string) => void - [RequestEventType.Disconnect]: (url: string) => void - [RequestEventType.Eose]: (url: string) => void - [RequestEventType.Close]: () => void + [RequestEvent.Event]: (event: SignedEvent, url: string) => void + [RequestEvent.Invalid]: (event: SignedEvent, url: string) => void + [RequestEvent.Filtered]: (event: SignedEvent, url: string) => void + [RequestEvent.Duplicate]: (event: SignedEvent, url: string) => void + [RequestEvent.Disconnect]: (url: string) => void + [RequestEvent.Eose]: (url: string) => void + [RequestEvent.Close]: () => void } export type MultireqOptions = Omit & { @@ -163,35 +163,35 @@ export class Multireq extends (EventEmitter as new () => TypedEmitter { - this.emit(RequestEventType.Event, event, relay) + req.on(RequestEvent.Event, (event: SignedEvent) => { + this.emit(RequestEvent.Event, event, relay) }) - req.on(RequestEventType.Invalid, (event: SignedEvent) => { - this.emit(RequestEventType.Invalid, event, relay) + req.on(RequestEvent.Invalid, (event: SignedEvent) => { + this.emit(RequestEvent.Invalid, event, relay) }) - req.on(RequestEventType.Filtered, (event: SignedEvent) => { - this.emit(RequestEventType.Filtered, event, relay) + req.on(RequestEvent.Filtered, (event: SignedEvent) => { + this.emit(RequestEvent.Filtered, event, relay) }) - req.on(RequestEventType.Duplicate, (event: SignedEvent) => { - this.emit(RequestEventType.Duplicate, event, relay) + req.on(RequestEvent.Duplicate, (event: SignedEvent) => { + this.emit(RequestEvent.Duplicate, event, relay) }) - req.on(RequestEventType.Disconnect, () => { - this.emit(RequestEventType.Disconnect, relay) + req.on(RequestEvent.Disconnect, () => { + this.emit(RequestEvent.Disconnect, relay) }) - req.on(RequestEventType.Eose, () => { - this.emit(RequestEventType.Eose, relay) + req.on(RequestEvent.Eose, () => { + this.emit(RequestEvent.Eose, relay) }) - req.on(RequestEventType.Close, () => { + req.on(RequestEvent.Close, () => { this._closed.add(relay) if (this._closed.size === relays.length) { - this.emit(RequestEventType.Close) + this.emit(RequestEvent.Close) } }) diff --git a/packages/signer/src/signers/nip46.ts b/packages/signer/src/signers/nip46.ts index b411be8..03c622c 100644 --- a/packages/signer/src/signers/nip46.ts +++ b/packages/signer/src/signers/nip46.ts @@ -15,7 +15,7 @@ import { StampedEvent, NOSTR_CONNECT, } from "@welshman/util" -import {subscribe, publish, Subscription, SubscriptionEvent} from "@welshman/net" +import {multireq, multicast, Multireq, RequestEvent, AdapterContext} from "@welshman/net" import {ISigner, EncryptionImplementation, decrypt, hash, own} from "../util.js" import {Nip01Signer} from "./nip01.js" @@ -32,6 +32,7 @@ export type Nip46BrokerParams = { connectSecret?: string signerPubkey?: string algorithm?: Nip46Algorithm + context?: AdapterContext } export type Nip46Response = { @@ -96,7 +97,7 @@ const popupManager = (() => { })() export class Nip46Receiver extends Emitter { - public sub?: Subscription + public sub?: Multireq constructor( public signer: ISigner, @@ -109,29 +110,28 @@ export class Nip46Receiver extends Emitter { start = async () => { if (this.sub) return + const {relays, context} = this.params const userPubkey = await this.signer.getPubkey() - const filters = [{kinds: [NOSTR_CONNECT], "#p": [userPubkey]}] + const filter = {kinds: [NOSTR_CONNECT], "#p": [userPubkey]} - this.sub = subscribe({relays: this.params.relays, filters}) + this.sub = multireq({relays, filter, context}) - return new Promise(resolve => { - this.sub!.on(SubscriptionEvent.Send, resolve) + this.sub.on(RequestEvent.Send, resolve) - this.sub!.on(SubscriptionEvent.Event, async (url: string, event: TrustedEvent) => { - const json = await decrypt(this.signer, event.pubkey, event.content) - const response = tryCatch(() => JSON.parse(json)) || {} + this.sub.on(RequestEvent.Event, async (event: TrustedEvent, url: string) => { + const json = await decrypt(this.signer, event.pubkey, event.content) + const response = tryCatch(() => JSON.parse(json)) || {} - // Delay errors in case there's a zombie signer out there clogging things up - if (response.error) { - await sleep(3000) - } + // Delay errors in case there's a zombie signer out there clogging things up + if (response.error) { + await sleep(3000) + } - this.emit(Nip46Event.Receive, {...response, url, event} as Nip46Response) - }) + this.emit(Nip46Event.Receive, {...response, url, event} as Nip46Response) + }) - this.sub!.on(SubscriptionEvent.Complete, () => { - this.sub = undefined - }) + this.sub.on(RequestEvent.Close, () => { + this.sub = undefined }) } @@ -154,7 +154,7 @@ export class Nip46Sender extends Emitter { // send a request to the remote signer, emitting the request and the pub public send = async (request: Nip46Request) => { const {id, method, params} = request - const {relays, signerPubkey, algorithm = "nip44"} = this.params + const {relays, signerPubkey, context, algorithm = "nip44"} = this.params if (!signerPubkey) { throw new Error("Unable to send nip46 request without a signer pubkey") @@ -164,7 +164,7 @@ export class Nip46Sender extends Emitter { const content = await this.signer[algorithm].encrypt(signerPubkey, payload) const template = createEvent(NOSTR_CONNECT, {content, tags: [["p", signerPubkey]]}) const event = await this.signer.sign(template) - const pub = publish({relays, event}) + const pub = multicast({relays, event, context}) this.emit(Nip46Event.Send, {...request, pub}) }