import type {Subscriber} from "svelte/store" import {writable} from "svelte/store" import type {Override} from "@welshman/lib" import {append, TaskQueue, ensurePlural, remove, defer, sleep, nth, without} from "@welshman/lib" import { HashedEvent, EventTemplate, SignedEvent, isSignedEvent, WRAPPED_KINDS, prep, makePow, } from "@welshman/util" import {PublishStatus, PublishResult, PublishOptions, PublishResultsByRelay} from "@welshman/net" import {Nip01Signer, Nip59} from "@welshman/signer" import type {Client} from './client.js' import type {User} from './user.js' export type ThunkOptions = Override< PublishOptions, { user: User client: Client event: EventTemplate recipient?: string delay?: number pow?: number } > export class Thunk { _subs: Subscriber[] = [] event: HashedEvent results: PublishResultsByRelay = {} complete = defer() controller = new AbortController() wrap?: SignedEvent constructor(readonly options: ThunkOptions) { if (!options.recipient && WRAPPED_KINDS.includes(options.event.kind)) { throw new Error(`Attempted to publish a kind ${options.event.kind} without wrapping it`) } this.event = prep(options.event, this.options.user.pubkey) for (const relay of options.relays) { this.results[relay] = { relay, status: PublishStatus.Sending, detail: "sending...", } } this.controller.signal.addEventListener("abort", () => { for (const relay of options.relays) { this._setAborted({ relay, status: PublishStatus.Aborted, detail: "aborted", }) } }) } _notify() { for (const subscriber of this._subs) { subscriber(this) } } _fail(detail: string) { for (const relay of this.options.relays) { this.results[relay] = { relay, status: PublishStatus.Failure, detail: detail, } } this._notify() } _setPending = (result: PublishResult) => { this.options.onPending?.(result) this.results[result.relay] = result this._notify() } _setTimeout = (result: PublishResult) => { this.options.onTimeout?.(result) this.results[result.relay] = result this._notify() } _setAborted = (result: PublishResult) => { this.options.onAborted?.(result) this.results[result.relay] = result this._notify() } async _publish(event: SignedEvent) { // Wait if the thunk is to be delayed if (this.options.delay) { await sleep(this.options.delay) } // Skip publishing if aborted if (this.controller.signal.aborted) { return } // Send it off await this.options.client.publish({ ...this.options, event, onSuccess: (result: PublishResult) => { this.options.onSuccess?.(result) this.results[result.relay] = result this._notify() }, onFailure: (result: PublishResult) => { this.options.onFailure?.(result) this.results[result.relay] = result this._notify() }, onPending: this._setPending, onTimeout: this._setTimeout, onAborted: this._setAborted, onComplete: (result: PublishResult) => { if (result.status !== PublishStatus.Success) { this.options.client.tracker.removeRelay(event.id, result.relay) } this.options.onComplete?.(result) this._subs = [] }, }) // Notify the caller that we're done this.complete.resolve() } async publish() { // Handle abort immediately if possible if (this.controller.signal.aborted) return const {recipient} = this.options // If we're sending it privately, wrap the event using nip 59 if (recipient) { const wrapper = Nip01Signer.ephemeral() const nip59 = new Nip59(this.options.user.signer, wrapper) this.wrap = await nip59.wrap(recipient, this.event) // If we're calculating pow, update the hash and re-sign if (this.options.pow) { this.wrap = await wrapper.sign(await makePow(this.wrap, this.options.pow).result, { signal: AbortSignal.timeout(30_000), }) } this.options.client.wrapManager.add({recipient, wrap: this.wrap, rumor: this.event}) return this._publish(this.wrap) } // If the event has been signed, we're good to go if (isSignedEvent(this.event)) { if (this.options.pow) { console.warn("Event is already signed, skipping proof of work calculation") } return this._publish(this.event) } // Allow for lazily signing/powing events in order to decrease apparent latency in the UI // that results from waiting for remote signers try { if (this.options.pow) { this.event = await makePow(this.event, this.options.pow).result } const signedEvent = await this.options.user.signer.sign(this.event, { signal: AbortSignal.timeout(30_000), }) // Update tracker and repository with the signed event since the id will have changed if (this.options.pow) { for (const url of this.options.relays) { this.options.client.tracker.removeRelay(this.event.id, url) this.options.client.tracker.track(signedEvent.id, url) } } this.options.client.repository.removeEvent(this.event.id) this.options.client.repository.publish(signedEvent) return this._publish(signedEvent) } catch (e: any) { console.error("Failed to sign event", e) return this._fail(String(e || "Failed to sign event")) } } enqueue() { thunkQueue.push(this) for (const url of this.options.relays) { this.options.client.tracker.track(this.event.id, url) } this.options.client.repository.publish(this.event) thunks.update($thunks => append(this, $thunks)) this.controller.signal.addEventListener("abort", () => { if (this.wrap) { this.options.client.wrapManager.remove(this.wrap.id) } else { this.options.client.repository.removeEvent(this.event.id) } thunks.update($thunks => remove(this, $thunks)) }) } subscribe(subscriber: Subscriber) { this._subs.push(subscriber) subscriber(this) return () => { this._subs = remove(subscriber, this._subs) } } } export class MergedThunk { _subs: Subscriber[] = [] results: PublishResultsByRelay = {} constructor(readonly thunks: Thunk[]) { const {Aborted, Failure, Timeout, Pending, Sending, Success} = PublishStatus const relays = new Set(thunks.flatMap(thunk => thunk.options.relays)) for (const thunk of thunks) { thunk.subscribe($thunk => { this.results = {} for (const relay of relays) { for (const status of [Aborted, Failure, Timeout, Pending, Sending, Success]) { const thunk = thunks.find(t => t.results[relay]?.status === status) if (thunk) { this.results[relay] = thunk.results[relay]! } } } this._notify() if (thunks.every(thunkIsComplete)) { this._subs = [] } }) } } _notify() { for (const subscriber of this._subs) { subscriber(this) } } subscribe(subscriber: Subscriber) { this._subs.push(subscriber) subscriber(this) return () => { this._subs = remove(subscriber, this._subs) } } } export type AbstractThunk = Thunk | MergedThunk export const isThunk = (thunk: AbstractThunk): thunk is Thunk => thunk instanceof Thunk export const isMergedThunk = (thunk: AbstractThunk): thunk is MergedThunk => thunk instanceof MergedThunk // Thunk status urls export const getThunkUrlsWithStatus = ( statuses: PublishStatus | PublishStatus[], thunk: AbstractThunk, ) => { statuses = ensurePlural(statuses) return Object.entries(thunk.results) .filter(([_, {status}]) => statuses.includes(status)) .map(nth(0)) as string[] } export const getCompleteThunkUrls = (thunk: AbstractThunk) => getThunkUrlsWithStatus( without([PublishStatus.Sending, PublishStatus.Pending], Object.values(PublishStatus)), thunk, ) export const getIncompleteThunkUrls = (thunk: AbstractThunk) => getThunkUrlsWithStatus([PublishStatus.Sending, PublishStatus.Pending], thunk) export const getFailedThunkUrls = (thunk: AbstractThunk) => getThunkUrlsWithStatus([PublishStatus.Failure, PublishStatus.Timeout], thunk) // Thunk status checks export const thunkHasStatus = (statuses: PublishStatus | PublishStatus[], thunk: AbstractThunk) => getThunkUrlsWithStatus(statuses, thunk).length > 0 export const thunkIsComplete = (thunk: AbstractThunk) => !thunkHasStatus([PublishStatus.Sending, PublishStatus.Pending], thunk) // Thunk errors export const getThunkError = (thunk: Thunk) => { for (const [_, {status, detail}] of Object.entries(thunk.results)) { if (status === PublishStatus.Failure) { return detail } } if (thunkIsComplete(thunk)) { return "" } } // Thunk utilities that return promises export const waitForThunkError = (thunk: Thunk) => new Promise(resolve => { thunk.subscribe($thunk => { const error = getThunkError($thunk) if (error !== undefined) { resolve(error) } }) }) export const waitForThunkCompletion = (thunk: Thunk) => new Promise(resolve => { thunk.subscribe($thunk => { if (thunkIsComplete($thunk)) { resolve() } }) }) // Thunk state export const thunks = writable([]) export const thunkQueue = new TaskQueue({ batchSize: 10, batchDelay: 100, processItem: (thunk: Thunk) => { thunk.publish() }, }) // Other thunk utilities export const mergeThunks = (thunks: AbstractThunk[]) => new MergedThunk(Array.from(flattenThunks(thunks))) export function* flattenThunks(thunks: AbstractThunk[]): Iterable { for (const thunk of thunks) { if (isMergedThunk(thunk)) { yield* flattenThunks(thunk.thunks) } else { yield thunk } } } export const publishThunk = (options: ThunkOptions) => { const thunk = new Thunk(options) thunk.enqueue() return thunk } export const abortThunk = (thunk: AbstractThunk) => { for (const child of flattenThunks([thunk])) { child.controller.abort() } } export const retryThunk = (thunk: AbstractThunk) => isMergedThunk(thunk) ? mergeThunks(thunk.thunks.map(t => publishThunk(t.options))) : publishThunk(thunk.options)