From d20d38a83817db555ab50f169b7c9081b48f6bfe Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 17 Oct 2024 11:27:10 -0700 Subject: [PATCH] Add MergedThunk --- packages/app/src/thunk.ts | 52 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/packages/app/src/thunk.ts b/packages/app/src/thunk.ts index 908eeaa..921f479 100644 --- a/packages/app/src/thunk.ts +++ b/packages/app/src/thunk.ts @@ -1,6 +1,6 @@ -import {writable, get} from 'svelte/store' -import type {Writable} from 'svelte/store' -import {Worker, defer, sleep, assoc} from '@welshman/lib' +import {writable, derived, get} from 'svelte/store' +import type {Writable, Readable} from 'svelte/store' +import {Worker, identity, uniq, defer, sleep, assoc} from '@welshman/lib' import type {Deferred} from '@welshman/lib' import {stamp, own, hash} from "@welshman/signer" import type {TrustedEvent, HashedEvent, EventTemplate, SignedEvent, StampedEvent, OwnedEvent} from '@welshman/util' @@ -9,6 +9,8 @@ import {publish, PublishStatus} from "@welshman/net" import {repository, tracker} from './core' import {pubkey, getSession, getSigner} from './session' +const {Pending, Success, Failure, Timeout, Aborted} = PublishStatus + export type ThunkEvent = EventTemplate | StampedEvent | OwnedEvent | TrustedEvent export type ThunkRequest = { @@ -57,6 +59,48 @@ export const makeThunk = (request: ThunkRequest) => { return {event, request, controller, result, status} } +export type MergedThunk = { + thunks: Thunk[] + controller: AbortController + result: Promise + status: Readable +} + +export const mergeThunks = (thunks: Thunk[]) => { + const controller = new AbortController() + + controller.signal.addEventListener('abort', () => { + for (const thunk of thunks) { + thunk.controller.abort() + } + }) + + return { + thunks, + controller, + result: Promise.all(thunks.map(thunk => thunk.result)), + status: derived( + thunks.map(thunk => thunk.status), + statuses => { + const mergedStatus: ThunkStatusByUrl = {} + + for (const url of uniq(statuses.flatMap(s => Object.keys(s)))) { + const urlStatuses = statuses.map(s => s[url]) + const thunkStatus = [Aborted, Failure, Timeout, Pending, Success] + .map(status => urlStatuses.find(s => s?.status === status)) + .find(identity) + + if (thunkStatus) { + mergedStatus[url] = thunkStatus + } + } + + return mergedStatus + } + ) + } +} + export const publishThunk = (request: ThunkRequest) => { const thunk = makeThunk(request) @@ -122,7 +166,7 @@ thunkWorker.addGlobalHandler(async (thunk: Thunk) => { const completed = new Set() - pub.emitter.on("*", async (status: PublishStatus, url: string, message: string) => { + pub.emitter.on("*", async (status: PublishStatus, url: string, message = "") => { thunk.status.update(assoc(url, {status, message})) if (status !== PublishStatus.Pending) {