import {writable, derived, get} from 'svelte/store' import type {Writable, Readable} from 'svelte/store' import {Worker, identity, uniq, defer, sleep, assoc} from '@welshman/lib' import type {Deferred} from '@welshman/lib' import {stamp, own, hash} from "@welshman/signer" import type {TrustedEvent, HashedEvent, EventTemplate, SignedEvent, StampedEvent, OwnedEvent} from '@welshman/util' import {isStampedEvent, isOwnedEvent, isHashedEvent, isUnwrappedEvent, isSignedEvent} from '@welshman/util' import {publish, PublishStatus} from "@welshman/net" import {repository, tracker} from './core' import {pubkey, getSession, getSigner} from './session' const {Pending, Success, Failure, Timeout, Aborted} = PublishStatus export type ThunkEvent = EventTemplate | StampedEvent | OwnedEvent | TrustedEvent export type ThunkRequest = { event: ThunkEvent relays: string[] delay?: number } export type ThunkStatus = { message: string status: PublishStatus } export type ThunkStatusByUrl = Record export type Thunk = { event: TrustedEvent request: ThunkRequest controller: AbortController result: Deferred status: Writable } export const prepEvent = (event: ThunkEvent) => { if (!isStampedEvent(event as StampedEvent)) { event = stamp(event) } if (!isOwnedEvent(event as OwnedEvent)) { event = own(event as StampedEvent, get(pubkey)!) } if (!isHashedEvent(event as HashedEvent)) { event = hash(event as OwnedEvent) } return event as TrustedEvent } export const makeThunk = (request: ThunkRequest) => { const event = prepEvent(request.event) const controller = new AbortController() const result: Thunk['result'] = defer() const status: Thunk['status'] = writable({}) return {event, request, controller, result, status} } export type MergedThunk = { thunks: Thunk[] controller: AbortController result: Promise status: Readable } export const mergeThunks = (thunks: Thunk[]) => { const controller = new AbortController() controller.signal.addEventListener('abort', () => { for (const thunk of thunks) { thunk.controller.abort() } }) return { thunks, controller, result: Promise.all(thunks.map(thunk => thunk.result)), status: derived( thunks.map(thunk => thunk.status), statuses => { const mergedStatus: ThunkStatusByUrl = {} for (const url of uniq(statuses.flatMap(s => Object.keys(s)))) { const urlStatuses = statuses.map(s => s[url]) const thunkStatus = [Aborted, Failure, Timeout, Pending, Success] .map(status => urlStatuses.find(s => s?.status === status)) .find(identity) if (thunkStatus) { mergedStatus[url] = thunkStatus } } return mergedStatus } ) } } export const publishThunk = (request: ThunkRequest) => { const thunk = makeThunk(request) thunkWorker.push(thunk) repository.publish(thunk.event) thunk.controller.signal.addEventListener('abort', () => { repository.removeEvent(thunk.event.id) }) return thunk } export const thunkWorker = new Worker() thunkWorker.addGlobalHandler((thunk: Thunk) => { let event = thunk.event // Handle abort immediately if possible if (thunk.controller.signal.aborted) return // If we were given a wrapped event, make sure to publish the wrapper, not the rumor if (isUnwrappedEvent(event)) { event = event.wrap } // Avoid making this function async so multiple publishes can run concurrently Promise.resolve().then(async () => { // If the event was already signed, leave it alone. Otherwise, sign it now. This is to // decrease apparent latency in the UI that results from waiting for remote signers if (!isSignedEvent(event)) { const signer = getSigner(getSession(event.pubkey)) if (!signer) { return console.warn(`No signer found for ${event.pubkey}`) } event = await signer.sign(event) } // We're guaranteed to have a signed event at this point const signedEvent = event as SignedEvent // Wait if the thunk is to be delayed if (thunk.request.delay) { await sleep(thunk.request.delay) } // Skip publishing if aborted if (thunk.controller.signal.aborted) { return } // Send it off const pub = publish({event: signedEvent, relays: thunk.request.relays}) // Copy the signature over since we had deferred it const savedEvent = repository.getEvent(signedEvent.id) as SignedEvent // The event may already be replaced or deleted if (savedEvent) { savedEvent.sig = signedEvent.sig } const completed = new Set() pub.emitter.on("*", async (status: PublishStatus, url: string, message = "") => { thunk.status.update(assoc(url, {status, message})) if (status !== PublishStatus.Pending) { completed.add(url) } if (status === PublishStatus.Success) { tracker.track(signedEvent.id, url) } if (completed.size === thunk.request.relays.length) { thunk.result.resolve(get(thunk.status)) } }) }) })