From 837fd1ab667c19b5ef181fa556f658fdf457f77c Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Fri, 11 Apr 2025 08:03:44 -0700 Subject: [PATCH] Update publish --- packages/app/src/feeds.ts | 13 +- packages/app/src/relaySelections.ts | 10 +- packages/app/src/sync.ts | 12 +- packages/app/src/thunk.ts | 65 ++++----- packages/dvm/src/handler.ts | 6 +- packages/dvm/src/request.ts | 7 +- packages/net/__tests__/publish.test.ts | 116 +++++++-------- packages/net/__tests__/request.test.ts | 56 ++++--- packages/net/src/diff.ts | 6 +- packages/net/src/publish.ts | 193 ++++++++++--------------- packages/signer/src/signers/nip46.ts | 7 +- packages/util/src/Events.ts | 2 +- 12 files changed, 219 insertions(+), 274 deletions(-) diff --git a/packages/app/src/feeds.ts b/packages/app/src/feeds.ts index a15f940..177b7b5 100644 --- a/packages/app/src/feeds.ts +++ b/packages/app/src/feeds.ts @@ -1,5 +1,5 @@ import {nthEq, partition, race, now} from "@welshman/lib" -import {createEvent, getPubkeyTagValues} from "@welshman/util" +import {createEvent, getPubkeyTagValues, TrustedEvent} from "@welshman/util" import {request, Tracker} from "@welshman/net" import {Scope, FeedController, RequestOpts, FeedOptions, DVMOpts, Feed} from "@welshman/feeds" import {makeDvmRequest, DVMEvent} from "@welshman/dvm" @@ -20,14 +20,9 @@ export const makeFeedRequestHandler = ({signal}: FeedRequestHandlerOptions) => const requestOptions = {} if (relays.length > 0) { - await new Promise(resolve => { - const req = request({tracker, signal, autoClose: true, relays, filters}) - - req.on(RequestEvent.Event, onEvent) - req.on(RequestEvent.Close, resolve) - }) + await request({tracker, signal, relays, filters, onEvent, autoClose: true}) } else { - const promises: Promise[][] = [] + const promises: Promise[] = [] const [withSearch, withoutSearch] = partition(f => Boolean(f.search), filters) if (withSearch.length > 0) { @@ -35,7 +30,7 @@ export const makeFeedRequestHandler = ({signal}: FeedRequestHandlerOptions) => request({ signal, tracker, - onEvent + onEvent, threshold: 0.1, autoClose: true, filters: withSearch, diff --git a/packages/app/src/relaySelections.ts b/packages/app/src/relaySelections.ts index e2654ba..a33d492 100644 --- a/packages/app/src/relaySelections.ts +++ b/packages/app/src/relaySelections.ts @@ -10,7 +10,7 @@ import { getRelayTagValues, } from "@welshman/util" import {TrustedEvent, Filter, PublishedList, List} from "@welshman/util" -import {request, load, RequestEvent} from "@welshman/net" +import {request, load} from "@welshman/net" import {deriveEventsMapped} from "@welshman/store" import {repository} from "./core.js" import {Router} from "./router.js" @@ -52,13 +52,7 @@ export const makeOutboxLoader = (kinds: number[]) => { const filters = [{authors, kinds}] const relays = router.merge(scenarios).getUrls() - - const promise = new Promise(resolve => { - const req = request({filters, relays, autoClose: true}) - - req.on(RequestEvent.Eose, () => resolve()) - req.on(RequestEvent.Close, () => resolve()) - }) + const promise = request({filters, relays, autoClose: true}) return requests.map(always(promise)) }) diff --git a/packages/app/src/sync.ts b/packages/app/src/sync.ts index bdbadea..9753694 100644 --- a/packages/app/src/sync.ts +++ b/packages/app/src/sync.ts @@ -3,8 +3,7 @@ import {isSignedEvent, SignedEvent} from "@welshman/util" import { push as basePush, pull as basePull, - PublishEvent, - SinglePublish, + publishOne, requestOne, } from "@welshman/net" import {repository} from "./core.js" @@ -46,14 +45,7 @@ export const push = async ({relays, filters}: AppSyncOpts) => { relays.map(async relay => { await (hasNegentropy(relay) ? basePush({filters, events, relays: [relay]}) - : Promise.all( - events.map( - (event: SignedEvent) => - new Promise(resolve => { - new SinglePublish({event, relay}).on(PublishEvent.Complete, resolve) - }), - ), - )) + : Promise.all(events.map((event: SignedEvent) => publishOne({event, relay})))) }), ) } diff --git a/packages/app/src/thunk.ts b/packages/app/src/thunk.ts index 3367983..1e4624a 100644 --- a/packages/app/src/thunk.ts +++ b/packages/app/src/thunk.ts @@ -26,7 +26,7 @@ import { isUnwrappedEvent, isSignedEvent, } from "@welshman/util" -import {MultiPublish, AdapterContext, PublishStatus, PublishEvent} from "@welshman/net" +import {publish, AdapterContext, PublishStatus} from "@welshman/net" import {repository, tracker} from "./core.js" import {pubkey, getSession, getSigner} from "./session.js" @@ -47,14 +47,14 @@ export type ThunkStatus = { status: PublishStatus } -export type ThunkStatusByUrl = Record +export type ThunkStatusByRelay = Record export type Thunk = { event: TrustedEvent request: ThunkRequest controller: AbortController - result: Deferred - status: Writable + result: Deferred + status: Writable } export const prepEvent = (event: ThunkEvent) => { @@ -85,8 +85,8 @@ export const makeThunk = (request: ThunkRequest) => { export type MergedThunk = { thunks: Thunk[] controller: AbortController - result: Promise - status: Readable + result: Promise + status: Readable } export const isMergedThunk = (thunk: Thunk | MergedThunk): thunk is MergedThunk => @@ -108,7 +108,7 @@ export const mergeThunks = (thunks: Thunk[]) => { status: derived( thunks.map(thunk => thunk.status), statuses => { - const mergedStatus: ThunkStatusByUrl = {} + const mergedStatus: ThunkStatusByRelay = {} for (const url of uniq(statuses.flatMap(s => Object.keys(s)))) { const urlStatuses = statuses.map(s => s[url]) @@ -196,7 +196,7 @@ export const thunkQueue = new TaskQueue({ // Avoid making this function async so multiple publishes can run concurrently Promise.resolve().then(async () => { const fail = (message: string) => { - const status: ThunkStatusByUrl = {} + const status: ThunkStatusByRelay = {} for (const url of thunk.request.relays) { status[url] = {status: PublishStatus.Failure, message} @@ -239,17 +239,37 @@ export const thunkQueue = new TaskQueue({ fromPairs( thunk.request.relays.map(url => [ url, - {status: PublishStatus.Pending, message: "Sending your message..."}, + {status: Pending, message: "Sending your message..."}, ]), ), ) // Send it off - const pub = new MultiPublish({ + publish({ event: signedEvent, relays: thunk.request.relays, context: thunk.request.context, timeout: thunk.request.timeout, + onSuccess: (message: string, url: string) => { + tracker.track(signedEvent.id, url) + thunk.status.update(assoc(url, {status: PublishStatus.Success, message})) + }, + onFailure: (message: string, url: string) => { + thunk.status.update(assoc(url, {status: PublishStatus.Failure, message})) + }, + onTimeout: (url: string) => { + const message = "Publish timed out" + + thunk.status.update(assoc(url, {status: PublishStatus.Timeout, message})) + }, + onAborted: (url: string) => { + const message = "Publish was aborted" + + thunk.status.update(assoc(url, {status: PublishStatus.Aborted, message})) + }, + onComplete: () => { + thunk.result.resolve(get(thunk.status)) + }, }) // Copy the signature over since we had deferred it @@ -259,31 +279,6 @@ export const thunkQueue = new TaskQueue({ if (savedEvent) { savedEvent.sig = signedEvent.sig } - - pub.on(PublishEvent.Success, (id: string, message: string, url: string) => { - tracker.track(id, url) - thunk.status.update(assoc(url, {status: PublishStatus.Success, message})) - }) - - pub.on(PublishEvent.Failure, (id: string, message: string, url: string) => { - thunk.status.update(assoc(url, {status: PublishStatus.Failure, message})) - }) - - pub.on(PublishEvent.Timeout, (url: string) => { - thunk.status.update( - assoc(url, {status: PublishStatus.Timeout, message: "Publish timed out"}), - ) - }) - - pub.on(PublishEvent.Aborted, (url: string) => { - thunk.status.update( - assoc(url, {status: PublishStatus.Aborted, message: "Publish was aborted"}), - ) - }) - - pub.on(PublishEvent.Complete, () => { - thunk.result.resolve(get(thunk.status)) - }) }) }, }) diff --git a/packages/dvm/src/handler.ts b/packages/dvm/src/handler.ts index 5e67cc8..33f9155 100644 --- a/packages/dvm/src/handler.ts +++ b/packages/dvm/src/handler.ts @@ -1,7 +1,7 @@ import {now} from "@welshman/lib" import {Nip01Signer} from "@welshman/signer" import {TrustedEvent, StampedEvent, Filter} from "@welshman/util" -import {request, MultiPublish, PublishEvent, AdapterContext} from "@welshman/net" +import {request, publish, AdapterContext} from "@welshman/net" export type DVMHandler = { stop?: () => void @@ -118,8 +118,6 @@ export class DVM { const {relays, context} = this.opts const event = await this.signer.sign(template) - await new Promise(resolve => { - new MultiPublish({event, relays, context}).on(PublishEvent.Complete, resolve) - }) + await publish({event, relays, context}) } } diff --git a/packages/dvm/src/request.ts b/packages/dvm/src/request.ts index abc9e30..72e543c 100644 --- a/packages/dvm/src/request.ts +++ b/packages/dvm/src/request.ts @@ -1,6 +1,6 @@ import {Emitter, now} from "@welshman/lib" import {TrustedEvent, SignedEvent, Filter} from "@welshman/util" -import {request, MultiPublish, AdapterContext} from "@welshman/net" +import {request, publish, AdapterContext} from "@welshman/net" export enum DVMEvent { Progress = "progress", @@ -19,7 +19,6 @@ export type DVMRequestOptions = { export type DVMRequest = { options: DVMRequestOptions emitter: Emitter - pub: MultiPublish } export const makeDvmRequest = (options: DVMRequestOptions) => { @@ -56,8 +55,8 @@ export const makeDvmRequest = (options: DVMRequestOptions) => { }, }) - const pub = new MultiPublish({relays, event, timeout, context}) + publish({relays, event, timeout, context}) - return {options, emitter, pub} as DVMRequest + return {options, emitter} as DVMRequest } diff --git a/packages/net/__tests__/publish.test.ts b/packages/net/__tests__/publish.test.ts index e2f4e0e..6ee88ff 100644 --- a/packages/net/__tests__/publish.test.ts +++ b/packages/net/__tests__/publish.test.ts @@ -1,11 +1,11 @@ import {describe, expect, it, vi, beforeEach, afterEach} from "vitest" -import {SinglePublish, MultiPublish, PublishEvent} from "../src/publish" +import {publishOne, publish} from "../src/publish" import {MockAdapter} from "../src/adapter" import {ClientMessageType} from "../src/message" import {makeEvent} from "@welshman/util" import {Nip01Signer} from "@welshman/signer" -describe("SinglePublish", () => { +describe("publishOne", () => { beforeEach(() => { vi.useFakeTimers() }) @@ -19,20 +19,18 @@ describe("SinglePublish", () => { const adapter = new MockAdapter("1", sendSpy) const signer = Nip01Signer.ephemeral() const event = await signer.sign(makeEvent(1)) - - const pub = new SinglePublish({ - relay: "1", - context: {getAdapter: () => adapter}, - event, - }) - const successSpy = vi.fn() const failureSpy = vi.fn() const completeSpy = vi.fn() - pub.on(PublishEvent.Success, successSpy) - pub.on(PublishEvent.Failure, failureSpy) - pub.on(PublishEvent.Complete, completeSpy) + publishOne({ + event, + relay: "1", + context: {getAdapter: () => adapter}, + onSuccess: successSpy, + onFailure: failureSpy, + onComplete: completeSpy, + }) await vi.advanceTimersByTimeAsync(200) @@ -42,7 +40,7 @@ describe("SinglePublish", () => { await vi.runAllTimers() - expect(successSpy).toHaveBeenCalledWith(event.id, "hi") + expect(successSpy).toHaveBeenCalledWith("hi", "1") expect(failureSpy).not.toHaveBeenCalled() expect(completeSpy).toHaveBeenCalled() }) @@ -52,20 +50,18 @@ describe("SinglePublish", () => { const adapter = new MockAdapter("1", sendSpy) const signer = Nip01Signer.ephemeral() const event = await signer.sign(makeEvent(1)) - - const pub = new SinglePublish({ - relay: "1", - context: {getAdapter: () => adapter}, - event, - }) - const successSpy = vi.fn() const failureSpy = vi.fn() const completeSpy = vi.fn() - pub.on(PublishEvent.Success, successSpy) - pub.on(PublishEvent.Failure, failureSpy) - pub.on(PublishEvent.Complete, completeSpy) + publishOne({ + event, + relay: "1", + context: {getAdapter: () => adapter}, + onSuccess: successSpy, + onFailure: failureSpy, + onComplete: completeSpy, + }) await vi.advanceTimersByTimeAsync(200) @@ -76,7 +72,7 @@ describe("SinglePublish", () => { await vi.runAllTimers() expect(successSpy).not.toHaveBeenCalled() - expect(failureSpy).toHaveBeenCalledWith(event.id, "hi") + expect(failureSpy).toHaveBeenCalledWith("hi", "1") expect(completeSpy).toHaveBeenCalled() }) @@ -85,22 +81,20 @@ describe("SinglePublish", () => { const adapter = new MockAdapter("1", sendSpy) const signer = Nip01Signer.ephemeral() const event = await signer.sign(makeEvent(1)) - - const pub = new SinglePublish({ - relay: "1", - context: {getAdapter: () => adapter}, - event, - }) - const successSpy = vi.fn() const failureSpy = vi.fn() const completeSpy = vi.fn() const timeoutSpy = vi.fn() - pub.on(PublishEvent.Success, successSpy) - pub.on(PublishEvent.Failure, failureSpy) - pub.on(PublishEvent.Complete, completeSpy) - pub.on(PublishEvent.Timeout, timeoutSpy) + publishOne({ + event, + relay: "1", + context: {getAdapter: () => adapter}, + onSuccess: successSpy, + onFailure: failureSpy, + onComplete: completeSpy, + onTimeout: timeoutSpy, + }) await vi.runAllTimers() @@ -119,28 +113,28 @@ describe("SinglePublish", () => { const adapter = new MockAdapter("1", sendSpy) const signer = Nip01Signer.ephemeral() const event = await signer.sign(makeEvent(1)) - - const pub = new SinglePublish({ - relay: "1", - context: {getAdapter: () => adapter}, - event, - }) - + const ctrl = new AbortController() const successSpy = vi.fn() const failureSpy = vi.fn() const completeSpy = vi.fn() const abortSpy = vi.fn() - pub.on(PublishEvent.Success, successSpy) - pub.on(PublishEvent.Failure, failureSpy) - pub.on(PublishEvent.Complete, completeSpy) - pub.on(PublishEvent.Timeout, abortSpy) + publishOne({ + event, + relay: "1", + signal: ctrl.signal, + context: {getAdapter: () => adapter}, + onSuccess: successSpy, + onFailure: failureSpy, + onComplete: completeSpy, + onTimeout: abortSpy, + }) await vi.runAllTimers() expect(sendSpy).toHaveBeenCalledWith([ClientMessageType.Event, event]) - pub.abort() + ctrl.abort() await vi.runAllTimers() @@ -151,7 +145,7 @@ describe("SinglePublish", () => { }) }) -describe("MultiPublish", () => { +describe("publish", () => { beforeEach(() => { vi.useFakeTimers() }) @@ -169,8 +163,12 @@ describe("MultiPublish", () => { const adapter3 = new MockAdapter("3", send3Spy) const signer = Nip01Signer.ephemeral() const event = await signer.sign(makeEvent(1)) + const successSpy = vi.fn() + const failureSpy = vi.fn() + const completeSpy = vi.fn() + const timeoutSpy = vi.fn() - const pub = new MultiPublish({ + publish({ event, relays: ["1", "2", "3"], context: { @@ -187,25 +185,19 @@ describe("MultiPublish", () => { } }, }, + onSuccess: successSpy, + onFailure: failureSpy, + onComplete: completeSpy, + onTimeout: timeoutSpy, }) - const successSpy = vi.fn() - const failureSpy = vi.fn() - const completeSpy = vi.fn() - const timeoutSpy = vi.fn() - - pub.on(PublishEvent.Success, successSpy) - pub.on(PublishEvent.Failure, failureSpy) - pub.on(PublishEvent.Complete, completeSpy) - pub.on(PublishEvent.Timeout, timeoutSpy) - adapter1.receive(["OK", event.id, true, "hi"]) adapter2.receive(["OK", event.id, false, "hi"]) - await vi.runAllTimers() + await vi.runAllTimersAsync() - expect(successSpy).toHaveBeenCalledWith(event.id, "hi", "1") - expect(failureSpy).toHaveBeenCalledWith(event.id, "hi", "2") + expect(successSpy).toHaveBeenCalledWith("hi", "1") + expect(failureSpy).toHaveBeenCalledWith("hi", "2") expect(completeSpy).toHaveBeenCalledTimes(1) expect(timeoutSpy).toHaveBeenCalledWith("3") }) diff --git a/packages/net/__tests__/request.test.ts b/packages/net/__tests__/request.test.ts index 52046fc..3bf5a86 100644 --- a/packages/net/__tests__/request.test.ts +++ b/packages/net/__tests__/request.test.ts @@ -15,8 +15,14 @@ describe("requestOne", () => { }) it("everything basically works", async () => { - const sendSpy = vi.fn() + let id + const sendSpy = vi.fn(m => { + if (m[0] === 'REQ') { + id = m[1] + } + }) const adapter = new MockAdapter("1", sendSpy) + const ctrl = new AbortController() const duplicateSpy = vi.fn() const invalidSpy = vi.fn() const filteredSpy = vi.fn() @@ -28,6 +34,7 @@ describe("requestOne", () => { relay: "whatever", filters: [{kinds: [1]}], context: {getAdapter: () => adapter}, + signal: ctrl.signal, onDuplicate: duplicateSpy, onInvalid: invalidSpy, onFiltered: filteredSpy, @@ -38,7 +45,7 @@ describe("requestOne", () => { await vi.runAllTimersAsync() - expect(sendSpy).toHaveBeenCalledWith([ClientMessageType.Req, expect.any(String), {kinds: [1]}]) + expect(sendSpy).toHaveBeenCalledWith([ClientMessageType.Req, id, {kinds: [1]}]) const signer = Nip01Signer.ephemeral() const event1 = await signer.sign(makeEvent(1)) @@ -52,17 +59,17 @@ describe("requestOne", () => { await vi.runAllTimersAsync() - expect(duplicateSpy).toHaveBeenCalledWith(event1) - expect(filteredSpy).toHaveBeenCalledWith(event2) - expect(invalidSpy).toHaveBeenCalledWith(event3) - expect(eventSpy).toHaveBeenCalledWith(event1) + expect(duplicateSpy).toHaveBeenCalledWith(event1, "1") + expect(filteredSpy).toHaveBeenCalledWith(event2, "1") + expect(invalidSpy).toHaveBeenCalledWith(event3, "1") + expect(eventSpy).toHaveBeenCalledWith(event1, "1") expect(eoseSpy).toHaveBeenCalledTimes(0) adapter.receive(["EOSE", id]) expect(eoseSpy).toHaveBeenCalledTimes(1) - req.close() + ctrl.abort() expect(closeSpy).toHaveBeenCalledTimes(1) }) @@ -78,10 +85,20 @@ describe("request", () => { }) it("everything basically works", async () => { - const send1Spy = vi.fn() + let id1, id2 + const send1Spy = vi.fn(m => { + if (m[0] === 'REQ') { + id1 = m[1] + } + }) const adapter1 = new MockAdapter("1", send1Spy) - const send2Spy = vi.fn() + const send2Spy = vi.fn(m => { + if (m[0] === 'REQ') { + id2 = m[1] + } + }) const adapter2 = new MockAdapter("2", send2Spy) + const ctrl = new AbortController() const duplicateSpy = vi.fn() const invalidSpy = vi.fn() const filteredSpy = vi.fn() @@ -92,6 +109,7 @@ describe("request", () => { request({ relays: ["1", "2"], filters: [{kinds: [1]}], + signal: ctrl.signal, context: { getAdapter: (url: string) => (url === "1" ? adapter1 : adapter2), }, @@ -105,8 +123,8 @@ describe("request", () => { await vi.runAllTimersAsync() - expect(send1Spy).toHaveBeenCalledTimes(1) - expect(send2Spy).toHaveBeenCalledTimes(1) + expect(send1Spy).toHaveBeenCalledWith([ClientMessageType.Req, id1, {kinds: [1]}]) + expect(send2Spy).toHaveBeenCalledWith([ClientMessageType.Req, id2, {kinds: [1]}]) const signer = Nip01Signer.ephemeral() const event1 = await signer.sign(makeEvent(1)) @@ -114,11 +132,11 @@ describe("request", () => { const event3 = makeEvent(1) const event4 = await signer.sign(makeEvent(1)) - adapter1.receive(["EVENT", expect.any(String), event1]) - adapter1.receive(["EVENT", expect.any(String), event2]) - adapter1.receive(["EVENT", expect.any(String), event3]) - adapter2.receive(["EVENT", expect.any(String), event1]) - adapter2.receive(["EVENT", expect.any(String), event4]) + adapter1.receive(["EVENT", id1, event1]) + adapter1.receive(["EVENT", id1, event2]) + adapter1.receive(["EVENT", id1, event3]) + adapter2.receive(["EVENT", id2, event1]) + adapter2.receive(["EVENT", id2, event4]) await vi.runAllTimersAsync() @@ -128,12 +146,12 @@ describe("request", () => { expect(eventSpy).toHaveBeenCalledWith(event1, "1") expect(eoseSpy).toHaveBeenCalledTimes(0) - adapter1.receive(["EOSE", expect.any(String)]) - adapter2.receive(["EOSE", expect.any(String)]) + adapter1.receive(["EOSE", id1]) + adapter2.receive(["EOSE", id2]) expect(eoseSpy).toHaveBeenCalledTimes(2) - req.close() + ctrl.abort() expect(closeSpy).toHaveBeenCalledTimes(1) }) diff --git a/packages/net/src/diff.ts b/packages/net/src/diff.ts index c113ad3..a9d1523 100644 --- a/packages/net/src/diff.ts +++ b/packages/net/src/diff.ts @@ -11,7 +11,7 @@ import { import {getAdapter, AdapterContext, AbstractAdapter, AdapterEvent} from "./adapter.js" import {Negentropy, NegentropyStorageVector} from "./negentropy.js" import {requestOne} from "./request.js" -import {MultiPublish, PublishEvent} from "./publish.js" +import {publish} from "./publish.js" export enum DifferenceEvent { Message = "difference:event:message", @@ -237,9 +237,7 @@ export const push = async ({context, events, ...options}: PushOptions) => { const relays = relaysById.get(event.id) if (relays) { - new Promise(resolve => { - new MultiPublish({event, relays, context}).on(PublishEvent.Complete, resolve) - }) + await publish({event, relays, context}) } }), ) diff --git a/packages/net/src/publish.ts b/packages/net/src/publish.ts index db5c10c..5ae003b 100644 --- a/packages/net/src/publish.ts +++ b/packages/net/src/publish.ts @@ -12,159 +12,122 @@ export enum PublishStatus { Aborted = "publish:status:aborted", } -export enum PublishEvent { - Success = "publish:event:success", - Failure = "publish:event:failure", - Timeout = "publish:event:timeout", - Aborted = "publish:event:aborted", - Complete = "publish:event:complete", +export type PublishResult = { + status: PublishStatus + detail: string } -// SinglePublish - -export type SinglePublishOptions = { +export type PublishOneOptions = { event: SignedEvent relay: string - context?: AdapterContext + signal?: AbortSignal timeout?: number + context?: AdapterContext + onStatus?: (status: PublishStatus, relay: string) => void + onSuccess?: (detail: string, relay: string) => void + onFailure?: (detail: string, relay: string) => void + onTimeout?: (relay: string) => void + onAborted?: (relay: string) => void + onComplete?: () => void } -export class SinglePublish extends EventEmitter { - status = PublishStatus.Pending +export const publishOne = (options: PublishOneOptions) => + new Promise(resolve => { + const adapter = getAdapter(options.relay, options.context) - _unsubscriber: () => void - _adapter: AbstractAdapter + let status = PublishStatus.Pending - constructor(readonly options: SinglePublishOptions) { - super() + const setStatus = (_status: PublishStatus) => { + status = _status + options.onStatus?.(status, options.relay) + } - // Set up our adapter - this._adapter = getAdapter(this.options.relay, this.options.context) + const cleanup = () => { + options.onComplete?.() + adapter.cleanup() + resolve(status) + } - // Listen for SinglePublish result - this._unsubscriber = on( - this._adapter, + adapter.on( AdapterEvent.Receive, (message: RelayMessage, url: string) => { if (isRelayOk(message)) { const [_, id, ok, detail] = message - if (id !== this.options.event.id) return + if (id !== options.event.id) return if (ok) { - this.status = PublishStatus.Success - this.emit(PublishEvent.Success, id, detail) + setStatus(PublishStatus.Success) + options.onSuccess?.(detail, options.relay) } else { - this.status = PublishStatus.Failure - this.emit(PublishEvent.Failure, id, detail) + setStatus(PublishStatus.Failure) + options.onFailure?.(detail, options.relay) } - this.cleanup() + cleanup() } }, ) - // Set timeout - sleep(this.options.timeout || 10_000).then(() => { - if (this.status === PublishStatus.Pending) { - this.status = PublishStatus.Timeout - this.emit(PublishEvent.Timeout) + options.signal?.addEventListener('abort', () => { + if (status === PublishStatus.Pending) { + setStatus(PublishStatus.Aborted) + options.onAborted?.(options.relay) } - this.cleanup() + cleanup() }) - // Start asynchronously so the caller can set up listeners - yieldThread().then(() => { - this._adapter.send([ClientMessageType.Event, this.options.event]) - }) - } + setTimeout(() => { + if (status === PublishStatus.Pending) { + setStatus(PublishStatus.Timeout) + options.onTimeout?.(options.relay) + } - abort = () => { - if (this.status === PublishStatus.Pending) { - this.status = PublishStatus.Aborted - this.emit(PublishEvent.Aborted) - this.cleanup() - } - } + cleanup() + }, options.timeout || 10_000) - cleanup = () => { - this.emit(PublishEvent.Complete) - this.removeAllListeners() - this._adapter.cleanup() - this._unsubscriber() - } -} + adapter.send([ClientMessageType.Event, options.event]) -// MultiPublish + setStatus(PublishStatus.Pending) + }) -export type MultiPublishOptions = Omit & { +export type PublishStatusByRelay = Record + +export type PublishOptions = Omit & { relays: string[] + onUpdate?: (status: PublishStatusByRelay) => void } -export class MultiPublish extends EventEmitter { - status: Record +export const publish = async (options: PublishOptions) => { + const status: PublishStatusByRelay = {} + const completed = new Set() + const relays = new Set(options.relays) - _children: SinglePublish[] = [] - _completed = new Set() - - constructor(options: MultiPublishOptions) { - super() - - const relays = new Set(options.relays) - - if (relays.size !== options.relays.length) { - console.warn("Non-unique relays passed to MultiPublish") - } - - this.status = fromPairs(Array.from(relays).map(relay => [relay, PublishStatus.Pending])) - - for (const relay of relays) { - const unicast = new SinglePublish({relay, ...options}) - - unicast.on(PublishEvent.Success, (id: string, detail: string) => { - this.status[relay] = unicast.status - this.emit(PublishEvent.Success, id, detail, relay) - }) - - unicast.on(PublishEvent.Failure, (id: string, detail: string) => { - this.status[relay] = unicast.status - this.emit(PublishEvent.Failure, id, detail, relay) - }) - - unicast.on(PublishEvent.Timeout, () => { - this.status[relay] = unicast.status - this.emit(PublishEvent.Timeout, relay) - }) - - unicast.on(PublishEvent.Aborted, () => { - this.status[relay] = unicast.status - this.emit(PublishEvent.Aborted, relay) - }) - - unicast.on(PublishEvent.Complete, () => { - this._completed.add(relay) - this.status[relay] = unicast.status - - if (this._completed.size === relays.size) { - this.emit(PublishEvent.Complete) - this.cleanup() - } - }) - - this._children.push(unicast) - } + if (relays.size !== options.relays.length) { + console.warn("Non-unique relays passed to publish") } - abort() { - for (const child of this._children) { - child.abort() - } - } + await Promise.all( + options.relays.map(relay => + publishOne({ + relay, + ...options, + onStatus: (_status: PublishStatus, relay: string) => { + status[relay] = _status + options.onStatus?.(_status, relay) + options.onUpdate?.(status) + }, + onComplete: () => { + completed.add(relay) - cleanup() { - this.removeAllListeners() - } + if (completed.size === relays.size) { + options.onComplete?.() + } + }, + }) + ) + ) + + return status } - -export const publish = (options: MultiPublishOptions) => new MultiPublish(options) diff --git a/packages/signer/src/signers/nip46.ts b/packages/signer/src/signers/nip46.ts index 5086450..ff4c185 100644 --- a/packages/signer/src/signers/nip46.ts +++ b/packages/signer/src/signers/nip46.ts @@ -15,7 +15,7 @@ import { StampedEvent, NOSTR_CONNECT, } from "@welshman/util" -import {MultiPublish, request, AdapterContext} from "@welshman/net" +import {publish, request, AdapterContext} from "@welshman/net" import {ISigner, EncryptionImplementation, decrypt, hash, own} from "../util.js" import {Nip01Signer} from "./nip01.js" @@ -167,9 +167,10 @@ export class Nip46Sender extends Emitter { const content = await this.signer[algorithm].encrypt(signerPubkey, payload) const template = createEvent(NOSTR_CONNECT, {content, tags: [["p", signerPubkey]]}) const event = await this.signer.sign(template) - const pub = new MultiPublish({relays, event, context}) - this.emit(Nip46Event.Send, {...request, pub}) + publish({relays, event, context}) + + this.emit(Nip46Event.Send, request) } // process the queue of requests diff --git a/packages/util/src/Events.ts b/packages/util/src/Events.ts index 4a5c0b5..68eb064 100644 --- a/packages/util/src/Events.ts +++ b/packages/util/src/Events.ts @@ -74,7 +74,7 @@ export const verifyEvent = (() => { }) } - return (event: TrustedEvent) => Boolean(event.sig && verify(event as SignedEvent)) + return (event: TrustedEvent) => Boolean(event.sig && (event[verifiedSymbol] || verify(event as SignedEvent))) })() export const isEventTemplate = (e: EventTemplate): e is EventTemplate =>