From 0523951e5a66941d7838edd52910461e92a68a20 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 17 Oct 2024 08:38:53 -0700 Subject: [PATCH] Re-work thunks --- packages/app/src/thunk.ts | 150 +++++++++++++++-------------- packages/net/src/ConnectionAuth.ts | 2 +- 2 files changed, 78 insertions(+), 74 deletions(-) diff --git a/packages/app/src/thunk.ts b/packages/app/src/thunk.ts index bfa2a43..908eeaa 100644 --- a/packages/app/src/thunk.ts +++ b/packages/app/src/thunk.ts @@ -1,5 +1,7 @@ import {writable, get} from 'svelte/store' -import {Worker, sleep, assoc} from '@welshman/lib' +import type {Writable} from 'svelte/store' +import {Worker, defer, sleep, assoc} from '@welshman/lib' +import type {Deferred} from '@welshman/lib' import {stamp, own, hash} from "@welshman/signer" import type {TrustedEvent, HashedEvent, EventTemplate, SignedEvent, StampedEvent, OwnedEvent} from '@welshman/util' import {isStampedEvent, isOwnedEvent, isHashedEvent, isUnwrappedEvent, isSignedEvent} from '@welshman/util' @@ -7,37 +9,76 @@ import {publish, PublishStatus} from "@welshman/net" import {repository, tracker} from './core' import {pubkey, getSession, getSigner} from './session' -export type PublishStatusData = { - id: string - url: string +export type ThunkEvent = EventTemplate | StampedEvent | OwnedEvent | TrustedEvent + +export type ThunkRequest = { + event: ThunkEvent + relays: string[] + delay?: number +} + +export type ThunkStatus = { message: string status: PublishStatus } -export type PublishStatusDataByUrl = Record +export type ThunkStatusByUrl = Record -export type PublishStatusDataByUrlById = Record - -export const publishStatusData = writable({}) - -export type ThunkWithResolve = { +export type Thunk = { event: TrustedEvent - relays: string[] - resolve: (data: PublishStatusDataByUrl) => void - delay?: number - signal?: AbortSignal + request: ThunkRequest + controller: AbortController + result: Deferred + status: Writable } -export const thunkWorker = new Worker() +export const prepEvent = (event: ThunkEvent) => { + if (!isStampedEvent(event as StampedEvent)) { + event = stamp(event) + } -thunkWorker.addGlobalHandler(async ({event, relays, resolve, delay, signal}: ThunkWithResolve) => { - let aborted = false + if (!isOwnedEvent(event as OwnedEvent)) { + event = own(event as StampedEvent, get(pubkey)!) + } - // Handle abort - signal?.addEventListener('abort', () => { - aborted = true + if (!isHashedEvent(event as HashedEvent)) { + event = hash(event as OwnedEvent) + } + + return event as TrustedEvent +} + +export const makeThunk = (request: ThunkRequest) => { + const event = prepEvent(request.event) + const controller = new AbortController() + const result: Thunk['result'] = defer() + const status: Thunk['status'] = writable({}) + + return {event, request, controller, result, status} +} + +export const publishThunk = (request: ThunkRequest) => { + const thunk = makeThunk(request) + + thunkWorker.push(thunk) + + repository.publish(thunk.event) + + thunk.controller.signal.addEventListener('abort', () => { + repository.removeEvent(thunk.event.id) }) + return thunk +} + +export const thunkWorker = new Worker() + +thunkWorker.addGlobalHandler(async (thunk: Thunk) => { + let event = thunk.event + + // Handle abort immediately if possible + if (thunk.controller.signal.aborted) return + // If we were given a wrapped event, make sure to publish the wrapper, not the rumor if (isUnwrappedEvent(event)) { event = event.wrap @@ -59,17 +100,17 @@ thunkWorker.addGlobalHandler(async ({event, relays, resolve, delay, signal}: Thu const signedEvent = event as SignedEvent // Wait if the thunk is to be delayed - if (delay) { - await sleep(delay) + if (thunk.request.delay) { + await sleep(thunk.request.delay) } - // Skip publishing and remove from the repository if aborted - if (aborted) { - return repository.removeEvent(signedEvent.id) + // Skip publishing if aborted + if (thunk.controller.signal.aborted) { + return } // Send it off - const pub = publish({event: signedEvent, relays}) + const pub = publish({event: signedEvent, relays: thunk.request.relays}) // Copy the signature over since we had deferred it const savedEvent = repository.getEvent(signedEvent.id) as SignedEvent @@ -79,58 +120,21 @@ thunkWorker.addGlobalHandler(async ({event, relays, resolve, delay, signal}: Thu savedEvent.sig = signedEvent.sig } - // Track publish success - const statusByUrl: PublishStatusDataByUrl = {} + const completed = new Set() pub.emitter.on("*", async (status: PublishStatus, url: string, message: string) => { - const {id} = signedEvent + thunk.status.update(assoc(url, {status, message})) - Object.assign(statusByUrl, {[url]: {id, url, status, message}}) - - publishStatusData.update(assoc(id, statusByUrl)) - - if (status === PublishStatus.Success) { - tracker.track(id, url) + if (status !== PublishStatus.Pending) { + completed.add(url) } - if ( - Object.values(statusByUrl).filter(s => s.status !== PublishStatus.Pending).length === - relays.length - ) { - resolve(statusByUrl) + if (status === PublishStatus.Success) { + tracker.track(signedEvent.id, url) + } + + if (completed.size === thunk.request.relays.length) { + thunk.result.resolve(get(thunk.status)) } }) }) - -export type ThunkEvent = EventTemplate | StampedEvent | OwnedEvent | TrustedEvent - -export const prepEvent = (event: ThunkEvent) => { - if (!isStampedEvent(event as StampedEvent)) { - event = stamp(event) - } - - if (!isOwnedEvent(event as OwnedEvent)) { - event = own(event as StampedEvent, get(pubkey)!) - } - - if (!isHashedEvent(event as HashedEvent)) { - event = hash(event as OwnedEvent) - } - - return event as TrustedEvent -} - -export type ThunkParams = { - event: ThunkEvent - relays: string[] - delay?: number - signal?: AbortSignal -} - -export const publishThunk = (params: ThunkParams) => - new Promise(resolve => { - const event = prepEvent(params.event) - - thunkWorker.push({...params, event, resolve}) - repository.publish(event) - }) diff --git a/packages/net/src/ConnectionAuth.ts b/packages/net/src/ConnectionAuth.ts index 89a69b8..d84a4ef 100644 --- a/packages/net/src/ConnectionAuth.ts +++ b/packages/net/src/ConnectionAuth.ts @@ -120,7 +120,7 @@ export class ConnectionAuth { } waitIfPending = async ({timeout = 3000}: {timeout?: number} = {}) => { - if ([PendingSignature, PendingResponse].includes(this.status)) { + while ([PendingSignature, PendingResponse].includes(this.status)) { await this.wait({timeout}) } }