diff --git a/packages/app/src/thunk.ts b/packages/app/src/thunk.ts index 78dfd74..c67746b 100644 --- a/packages/app/src/thunk.ts +++ b/packages/app/src/thunk.ts @@ -1,5 +1,5 @@ import {Writable, Readable, writable, derived, get} from "svelte/store" -import {Deferred, Worker, dissoc, identity, uniq, defer, sleep, assoc} from "@welshman/lib" +import {Deferred, TaskQueue, dissoc, identity, uniq, defer, sleep, assoc} from "@welshman/lib" import {stamp, own, hash} from "@welshman/signer" import { TrustedEvent, @@ -129,7 +129,7 @@ export const thunks = writable>({}) export const publishThunk = (request: ThunkRequest) => { const thunk = makeThunk(request) - thunkWorker.push(thunk) + thunkQueue.push(thunk) repository.publish(thunk.event) @@ -147,7 +147,7 @@ export const publishThunks = (requests: ThunkRequest[]) => { const mergedThunk = mergeThunks(newThunks) for (const thunk of newThunks) { - thunkWorker.push(thunk) + thunkQueue.push(thunk) repository.publish(thunk.event) @@ -167,96 +167,99 @@ export const abortThunk = (thunk: Thunk) => { repository.removeEvent(thunk.event.id) } -export const thunkWorker = new Worker() +export const thunkQueue = new TaskQueue({ + batchSize: 50, + processItem: (thunk: Thunk) => { + let event = thunk.event -thunkWorker.addGlobalHandler((thunk: Thunk) => { - let event = thunk.event + // Handle abort immediately if possible + if (thunk.controller.signal.aborted) return - // Handle abort immediately if possible - if (thunk.controller.signal.aborted) return + // If we were given a wrapped event, make sure to publish the wrapper, not the rumor + if (isUnwrappedEvent(event)) { + event = event.wrap + } - // If we were given a wrapped event, make sure to publish the wrapper, not the rumor - if (isUnwrappedEvent(event)) { - event = event.wrap - } + // Avoid making this function async so multiple publishes can run concurrently + Promise.resolve().then(async () => { + const fail = (message: string) => { + const status: ThunkStatusByUrl = {} - // Avoid making this function async so multiple publishes can run concurrently - Promise.resolve().then(async () => { - const fail = (message: string) => { - const status: ThunkStatusByUrl = {} + for (const url of thunk.request.relays) { + status[url] = {status: PublishStatus.Failure, message} + } - for (const url of thunk.request.relays) { - status[url] = {status: PublishStatus.Failure, message} + thunk.status.set(status) } - thunk.status.set(status) - } + // If the event was already signed, leave it alone. Otherwise, sign it now. This is to + // decrease apparent latency in the UI that results from waiting for remote signers + if (!isSignedEvent(event)) { + const signer = getSigner(getSession(event.pubkey)) - // If the event was already signed, leave it alone. Otherwise, sign it now. This is to - // decrease apparent latency in the UI that results from waiting for remote signers - if (!isSignedEvent(event)) { - const signer = getSigner(getSession(event.pubkey)) + if (!signer) { + return fail(`No signer found for ${event.pubkey}`) + } - if (!signer) { - return fail(`No signer found for ${event.pubkey}`) + try { + event = await signer.sign(event) + } catch (e: any) { + return fail(String(e.error || e)) + } } - try { - event = await signer.sign(event) - } catch (e: any) { - return fail(String(e.error || e)) + // We're guaranteed to have a signed event at this point + const signedEvent = event as SignedEvent + + // Wait if the thunk is to be delayed + if (thunk.request.delay) { + await sleep(thunk.request.delay) } - } - // We're guaranteed to have a signed event at this point - const signedEvent = event as SignedEvent + // Skip publishing if aborted + if (thunk.controller.signal.aborted) { + return + } - // Wait if the thunk is to be delayed - if (thunk.request.delay) { - await sleep(thunk.request.delay) - } + // Send it off + const pub = new MultiPublish({ + event: signedEvent, + relays: thunk.request.relays, + context: thunk.request.context, + }) - // Skip publishing if aborted - if (thunk.controller.signal.aborted) { - return - } + // Copy the signature over since we had deferred it + const savedEvent = repository.getEvent(signedEvent.id) as SignedEvent - // Send it off - const pub = new MultiPublish({ - event: signedEvent, - relays: thunk.request.relays, - context: thunk.request.context, + // The event may already be replaced or deleted + if (savedEvent) { + savedEvent.sig = signedEvent.sig + } + + pub.on(PublishEvent.Success, (id: string, message: string, url: string) => { + tracker.track(id, url) + thunk.status.update(assoc(url, {status: PublishStatus.Success, message})) + }) + + pub.on(PublishEvent.Failure, (id: string, message: string, url: string) => { + thunk.status.update(assoc(url, {status: PublishStatus.Failure, message})) + }) + + pub.on(PublishEvent.Timeout, (url: string) => { + thunk.status.update( + assoc(url, {status: PublishStatus.Timeout, message: "Publish timed out"}), + ) + }) + + pub.on(PublishEvent.Aborted, (url: string) => { + thunk.status.update( + assoc(url, {status: PublishStatus.Aborted, message: "Publish was aborted"}), + ) + }) + + pub.on(PublishEvent.Complete, () => { + thunk.result.resolve(get(thunk.status)) + }) }) - - // Copy the signature over since we had deferred it - const savedEvent = repository.getEvent(signedEvent.id) as SignedEvent - - // The event may already be replaced or deleted - if (savedEvent) { - savedEvent.sig = signedEvent.sig - } - - pub.on(PublishEvent.Success, (id: string, message: string, url: string) => { - tracker.track(id, url) - thunk.status.update(assoc(url, {status: PublishStatus.Success, message})) - }) - - pub.on(PublishEvent.Failure, (id: string, message: string, url: string) => { - thunk.status.update(assoc(url, {status: PublishStatus.Failure, message})) - }) - - pub.on(PublishEvent.Timeout, (url: string) => { - thunk.status.update(assoc(url, {status: PublishStatus.Timeout, message: "Publish timed out"})) - }) - - pub.on(PublishEvent.Aborted, (url: string) => { - thunk.status.update( - assoc(url, {status: PublishStatus.Aborted, message: "Publish was aborted"}), - ) - }) - - pub.on(PublishEvent.Complete, () => { - thunk.result.resolve(get(thunk.status)) - }) - }) + }, }) diff --git a/packages/lib/README.md b/packages/lib/README.md index b6ee590..2c54b85 100644 --- a/packages/lib/README.md +++ b/packages/lib/README.md @@ -5,7 +5,7 @@ Some general-purpose utilities for use in @welshman apps. Includes: - LRU cache implementation -- Worker for throttling work to avoid locking up the UI +- TaskQueue for throttling work to avoid locking up the UI - URL normalization (taken from normalize-url) - A global `ctx` variable which can be used for global configuration - CustomPromise, which provides an error type, and `defer` utility diff --git a/packages/lib/__tests__/Worker.test.ts b/packages/lib/__tests__/Worker.test.ts deleted file mode 100644 index bb6a76e..0000000 --- a/packages/lib/__tests__/Worker.test.ts +++ /dev/null @@ -1,208 +0,0 @@ -import {describe, it, expect, vi, beforeEach, afterEach} from "vitest" -import {Worker} from "../src/Worker" - -describe("Worker", () => { - beforeEach(() => { - vi.useFakeTimers() - }) - - afterEach(() => { - vi.useRealTimers() - }) - - it("should process messages in batches", async () => { - const handler = vi.fn() - const worker = new Worker() - - worker.addGlobalHandler(handler) - - // Push messages - worker.push(1) - worker.push(2) - worker.push(3) - - // Initially no processing - expect(handler).not.toHaveBeenCalled() - - // Advance timer to trigger processing - await vi.advanceTimersByTimeAsync(50) - - expect(handler).toHaveBeenCalledTimes(3) - expect(handler).toHaveBeenNthCalledWith(1, 1) - expect(handler).toHaveBeenNthCalledWith(2, 2) - expect(handler).toHaveBeenNthCalledWith(3, 3) - }) - - it("should respect chunkSize option", async () => { - const handler = vi.fn() - const worker = new Worker({chunkSize: 2}) - - worker.addGlobalHandler(handler) - - // Push more messages than chunkSize - worker.push(1) - worker.push(2) - worker.push(3) - - // First batch - await vi.advanceTimersByTimeAsync(50) - expect(handler).toHaveBeenCalledTimes(2) - - // Second batch - await vi.advanceTimersByTimeAsync(50) - expect(handler).toHaveBeenCalledTimes(3) - }) - - it("should handle message routing by key", async () => { - const globalHandler = vi.fn() - const evenHandler = vi.fn() - const oddHandler = vi.fn() - - const worker = new Worker({ - getKey: x => (x % 2 === 0 ? "even" : "odd"), - }) - - worker.addGlobalHandler(globalHandler) - worker.addHandler("even", evenHandler) - worker.addHandler("odd", oddHandler) - - worker.push(1) - worker.push(2) - - await vi.advanceTimersByTimeAsync(50) - - expect(globalHandler).toHaveBeenCalledTimes(2) - expect(evenHandler).toHaveBeenCalledWith(2) - expect(oddHandler).toHaveBeenCalledWith(1) - }) - - it("should handle message deferral", async () => { - const handler = vi.fn() - let shouldDefer = true - - const worker = new Worker({ - shouldDefer: () => shouldDefer, - }) - - worker.addGlobalHandler(handler) - worker.push(1) - - // Message should be deferred - await vi.advanceTimersByTimeAsync(50) - expect(handler).not.toHaveBeenCalled() - - // Allow processing - shouldDefer = false - await vi.advanceTimersByTimeAsync(50) - expect(handler).toHaveBeenCalledWith(1) - }) - - it("should handle multiple handlers for same key", async () => { - const handler1 = vi.fn() - const handler2 = vi.fn() - - const worker = new Worker({ - getKey: () => "test", - }) - - worker.addHandler("test", handler1) - worker.addHandler("test", handler2) - - worker.push(1) - - await vi.advanceTimersByTimeAsync(50) - - expect(handler1).toHaveBeenCalledWith(1) - expect(handler2).toHaveBeenCalledWith(1) - }) - - it("should handle errors in handlers gracefully", async () => { - const consoleError = vi.spyOn(console, "error") - const errorHandler = vi.fn(() => { - throw new Error("Test error") - }) - const nextHandler = vi.fn() - - const worker = new Worker() - worker.addGlobalHandler(errorHandler) - worker.addGlobalHandler(nextHandler) - - worker.push(1) - - await vi.advanceTimersByTimeAsync(50) - - expect(consoleError).toHaveBeenCalled() - expect(nextHandler).toHaveBeenCalled() - }) - - describe("control methods", () => { - it("should clear the buffer", async () => { - const handler = vi.fn() - const worker = new Worker() - - worker.addGlobalHandler(handler) - worker.push(1) - worker.push(2) - - worker.clear() - - await vi.advanceTimersByTimeAsync(50) - expect(handler).not.toHaveBeenCalled() - }) - - it("should pause and resume processing", async () => { - const handler = vi.fn() - const worker = new Worker() - - worker.addGlobalHandler(handler) - worker.push(1) - - worker.pause() - await vi.advanceTimersByTimeAsync(50) - expect(handler).not.toHaveBeenCalled() - - worker.resume() - await vi.advanceTimersByTimeAsync(50) - expect(handler).toHaveBeenCalled() - }) - - it("should respect custom delay option", async () => { - const handler = vi.fn() - const worker = new Worker({delay: 100}) - - worker.addGlobalHandler(handler) - worker.push(1) - - await vi.advanceTimersByTimeAsync(50) - expect(handler).not.toHaveBeenCalled() - - await vi.advanceTimersByTimeAsync(50) // Total 100ms - expect(handler).toHaveBeenCalled() - }) - }) - - describe("async handlers", () => { - it("should wait for async handlers to complete", async () => { - const results: number[] = [] - const asyncHandler = vi.fn(async (x: number) => { - await new Promise(resolve => setTimeout(resolve, 100)) - results.push(x) - }) - - const worker = new Worker() - worker.addGlobalHandler(asyncHandler) - - worker.push(1) - worker.push(2) - - await vi.advanceTimersByTimeAsync(50) // Trigger processing - await vi.advanceTimersByTimeAsync(100) // Wait for one async handlers - - expect(results).toEqual([1]) - - await vi.advanceTimersByTimeAsync(100) // Wait for another async handlers - - expect(results).toEqual([1, 2]) - }) - }) -}) diff --git a/packages/lib/src/Worker.ts b/packages/lib/src/Worker.ts deleted file mode 100644 index e7df108..0000000 --- a/packages/lib/src/Worker.ts +++ /dev/null @@ -1,145 +0,0 @@ -import {remove} from "./Tools.js" - -/** Symbol used to identify global handlers */ -const ANY = Symbol("worker/ANY") - -/** Configuration options for Worker */ -export type WorkerOpts = { - /** Function to get key for message routing */ - getKey?: (x: T) => any - /** Function to determine if message processing should be deferred */ - shouldDefer?: (x: T) => boolean - /** Maximum number of messages to process in one batch */ - chunkSize?: number - /** Milliseconds to wait between processing batches */ - delay?: number -} - -/** - * Worker for processing messages in batches with throttling - * @template T - Type of messages to process - */ -export class Worker { - buffer: T[] = [] - handlers: Map void>> = new Map() - #timeout: number | undefined - #paused = false - - constructor(readonly opts: WorkerOpts = {}) {} - - #doWork = async () => { - const {chunkSize = 50} = this.opts - - for (let i = 0; i < chunkSize; i++) { - if (this.buffer.length === 0) { - break - } - - // Pop the buffer one at a time so handle can modify the queue - const [message] = this.buffer.splice(0, 1) - - if (this.opts.shouldDefer?.(message)) { - this.buffer.push(message) - } else { - for (const handler of this.handlers.get(ANY) || []) { - try { - await handler(message) - } catch (e) { - console.error(e) - } - } - - if (this.opts.getKey) { - const k = this.opts.getKey(message) - - for (const handler of this.handlers.get(k) || []) { - try { - await handler(message) - } catch (e) { - console.error(e) - } - } - } - } - } - - this.#timeout = undefined - this.#enqueueWork() - } - - #enqueueWork = () => { - const {delay = 50} = this.opts - - if (!this.#paused && !this.#timeout && this.buffer.length > 0) { - this.#timeout = setTimeout(this.#doWork, delay) as unknown as number - } - } - - /** - * Adds a message to the processing queue - * @param message - Message to process - */ - push = (message: T) => { - this.buffer.push(message) - this.#enqueueWork() - } - - /** - * Adds a handler for messages with specific key - * @param k - Key to handle - * @param handler - Function to process matching messages - */ - addHandler = (k: any, handler: (message: T) => void) => { - this.handlers.set(k, (this.handlers.get(k) || []).concat(handler)) - } - - /** - * Removes a handler for messages with specific key - * @param k - Key to handle - * @param handler - Function to process matching messages - */ - removeHandler = (k: any, handler: (message: T) => void) => { - const newHandlers = remove(handler, this.handlers.get(k) || []) - - if (newHandlers.length > 0) { - this.handlers.set(k, newHandlers) - } else { - this.handlers.delete(k) - } - } - - /** - * Adds a handler for all messages - * @param handler - Function to process all messages - */ - addGlobalHandler = (handler: (message: T) => void) => { - this.addHandler(ANY, handler) - } - - /** - * Removes a handler for all messages - * @param handler - Function to process all messages - */ - removeGlobalHandler = (handler: (message: T) => void) => { - this.removeHandler(ANY, handler) - } - - /** Removes all pending messages from the queue */ - clear() { - this.buffer = [] - } - - /** Pauses message processing */ - pause() { - clearTimeout(this.#timeout) - - this.#paused = true - this.#timeout = undefined - } - - /** Resumes message processing */ - resume() { - this.#paused = false - this.#enqueueWork() - } -} diff --git a/packages/lib/src/index.ts b/packages/lib/src/index.ts index ccf200e..a1f86b3 100644 --- a/packages/lib/src/index.ts +++ b/packages/lib/src/index.ts @@ -2,6 +2,5 @@ export * from "./Deferred.js" export * from "./Emitter.js" export * from "./LRUCache.js" export * from "./Tools.js" -export * from "./Worker.js" export * from "./TaskQueue.js" export {default as normalizeUrl} from "./normalize-url/index.js"