Get rid of Worker, use TaskQueue instead

This commit is contained in:
Jon Staab
2025-03-31 15:55:10 -07:00
parent db588495e3
commit 04816f8377
5 changed files with 82 additions and 433 deletions
+1 -1
View File
@@ -5,7 +5,7 @@ Some general-purpose utilities for use in @welshman apps.
Includes:
- LRU cache implementation
- Worker for throttling work to avoid locking up the UI
- TaskQueue for throttling work to avoid locking up the UI
- URL normalization (taken from normalize-url)
- A global `ctx` variable which can be used for global configuration
- CustomPromise, which provides an error type, and `defer` utility
-208
View File
@@ -1,208 +0,0 @@
import {describe, it, expect, vi, beforeEach, afterEach} from "vitest"
import {Worker} from "../src/Worker"
describe("Worker", () => {
beforeEach(() => {
vi.useFakeTimers()
})
afterEach(() => {
vi.useRealTimers()
})
it("should process messages in batches", async () => {
const handler = vi.fn()
const worker = new Worker<number>()
worker.addGlobalHandler(handler)
// Push messages
worker.push(1)
worker.push(2)
worker.push(3)
// Initially no processing
expect(handler).not.toHaveBeenCalled()
// Advance timer to trigger processing
await vi.advanceTimersByTimeAsync(50)
expect(handler).toHaveBeenCalledTimes(3)
expect(handler).toHaveBeenNthCalledWith(1, 1)
expect(handler).toHaveBeenNthCalledWith(2, 2)
expect(handler).toHaveBeenNthCalledWith(3, 3)
})
it("should respect chunkSize option", async () => {
const handler = vi.fn()
const worker = new Worker<number>({chunkSize: 2})
worker.addGlobalHandler(handler)
// Push more messages than chunkSize
worker.push(1)
worker.push(2)
worker.push(3)
// First batch
await vi.advanceTimersByTimeAsync(50)
expect(handler).toHaveBeenCalledTimes(2)
// Second batch
await vi.advanceTimersByTimeAsync(50)
expect(handler).toHaveBeenCalledTimes(3)
})
it("should handle message routing by key", async () => {
const globalHandler = vi.fn()
const evenHandler = vi.fn()
const oddHandler = vi.fn()
const worker = new Worker<number>({
getKey: x => (x % 2 === 0 ? "even" : "odd"),
})
worker.addGlobalHandler(globalHandler)
worker.addHandler("even", evenHandler)
worker.addHandler("odd", oddHandler)
worker.push(1)
worker.push(2)
await vi.advanceTimersByTimeAsync(50)
expect(globalHandler).toHaveBeenCalledTimes(2)
expect(evenHandler).toHaveBeenCalledWith(2)
expect(oddHandler).toHaveBeenCalledWith(1)
})
it("should handle message deferral", async () => {
const handler = vi.fn()
let shouldDefer = true
const worker = new Worker<number>({
shouldDefer: () => shouldDefer,
})
worker.addGlobalHandler(handler)
worker.push(1)
// Message should be deferred
await vi.advanceTimersByTimeAsync(50)
expect(handler).not.toHaveBeenCalled()
// Allow processing
shouldDefer = false
await vi.advanceTimersByTimeAsync(50)
expect(handler).toHaveBeenCalledWith(1)
})
it("should handle multiple handlers for same key", async () => {
const handler1 = vi.fn()
const handler2 = vi.fn()
const worker = new Worker<number>({
getKey: () => "test",
})
worker.addHandler("test", handler1)
worker.addHandler("test", handler2)
worker.push(1)
await vi.advanceTimersByTimeAsync(50)
expect(handler1).toHaveBeenCalledWith(1)
expect(handler2).toHaveBeenCalledWith(1)
})
it("should handle errors in handlers gracefully", async () => {
const consoleError = vi.spyOn(console, "error")
const errorHandler = vi.fn(() => {
throw new Error("Test error")
})
const nextHandler = vi.fn()
const worker = new Worker<number>()
worker.addGlobalHandler(errorHandler)
worker.addGlobalHandler(nextHandler)
worker.push(1)
await vi.advanceTimersByTimeAsync(50)
expect(consoleError).toHaveBeenCalled()
expect(nextHandler).toHaveBeenCalled()
})
describe("control methods", () => {
it("should clear the buffer", async () => {
const handler = vi.fn()
const worker = new Worker<number>()
worker.addGlobalHandler(handler)
worker.push(1)
worker.push(2)
worker.clear()
await vi.advanceTimersByTimeAsync(50)
expect(handler).not.toHaveBeenCalled()
})
it("should pause and resume processing", async () => {
const handler = vi.fn()
const worker = new Worker<number>()
worker.addGlobalHandler(handler)
worker.push(1)
worker.pause()
await vi.advanceTimersByTimeAsync(50)
expect(handler).not.toHaveBeenCalled()
worker.resume()
await vi.advanceTimersByTimeAsync(50)
expect(handler).toHaveBeenCalled()
})
it("should respect custom delay option", async () => {
const handler = vi.fn()
const worker = new Worker<number>({delay: 100})
worker.addGlobalHandler(handler)
worker.push(1)
await vi.advanceTimersByTimeAsync(50)
expect(handler).not.toHaveBeenCalled()
await vi.advanceTimersByTimeAsync(50) // Total 100ms
expect(handler).toHaveBeenCalled()
})
})
describe("async handlers", () => {
it("should wait for async handlers to complete", async () => {
const results: number[] = []
const asyncHandler = vi.fn(async (x: number) => {
await new Promise(resolve => setTimeout(resolve, 100))
results.push(x)
})
const worker = new Worker<number>()
worker.addGlobalHandler(asyncHandler)
worker.push(1)
worker.push(2)
await vi.advanceTimersByTimeAsync(50) // Trigger processing
await vi.advanceTimersByTimeAsync(100) // Wait for one async handlers
expect(results).toEqual([1])
await vi.advanceTimersByTimeAsync(100) // Wait for another async handlers
expect(results).toEqual([1, 2])
})
})
})
-145
View File
@@ -1,145 +0,0 @@
import {remove} from "./Tools.js"
/** Symbol used to identify global handlers */
const ANY = Symbol("worker/ANY")
/** Configuration options for Worker */
export type WorkerOpts<T> = {
/** Function to get key for message routing */
getKey?: (x: T) => any
/** Function to determine if message processing should be deferred */
shouldDefer?: (x: T) => boolean
/** Maximum number of messages to process in one batch */
chunkSize?: number
/** Milliseconds to wait between processing batches */
delay?: number
}
/**
* Worker for processing messages in batches with throttling
* @template T - Type of messages to process
*/
export class Worker<T> {
buffer: T[] = []
handlers: Map<any, Array<(x: T) => void>> = new Map()
#timeout: number | undefined
#paused = false
constructor(readonly opts: WorkerOpts<T> = {}) {}
#doWork = async () => {
const {chunkSize = 50} = this.opts
for (let i = 0; i < chunkSize; i++) {
if (this.buffer.length === 0) {
break
}
// Pop the buffer one at a time so handle can modify the queue
const [message] = this.buffer.splice(0, 1)
if (this.opts.shouldDefer?.(message)) {
this.buffer.push(message)
} else {
for (const handler of this.handlers.get(ANY) || []) {
try {
await handler(message)
} catch (e) {
console.error(e)
}
}
if (this.opts.getKey) {
const k = this.opts.getKey(message)
for (const handler of this.handlers.get(k) || []) {
try {
await handler(message)
} catch (e) {
console.error(e)
}
}
}
}
}
this.#timeout = undefined
this.#enqueueWork()
}
#enqueueWork = () => {
const {delay = 50} = this.opts
if (!this.#paused && !this.#timeout && this.buffer.length > 0) {
this.#timeout = setTimeout(this.#doWork, delay) as unknown as number
}
}
/**
* Adds a message to the processing queue
* @param message - Message to process
*/
push = (message: T) => {
this.buffer.push(message)
this.#enqueueWork()
}
/**
* Adds a handler for messages with specific key
* @param k - Key to handle
* @param handler - Function to process matching messages
*/
addHandler = (k: any, handler: (message: T) => void) => {
this.handlers.set(k, (this.handlers.get(k) || []).concat(handler))
}
/**
* Removes a handler for messages with specific key
* @param k - Key to handle
* @param handler - Function to process matching messages
*/
removeHandler = (k: any, handler: (message: T) => void) => {
const newHandlers = remove(handler, this.handlers.get(k) || [])
if (newHandlers.length > 0) {
this.handlers.set(k, newHandlers)
} else {
this.handlers.delete(k)
}
}
/**
* Adds a handler for all messages
* @param handler - Function to process all messages
*/
addGlobalHandler = (handler: (message: T) => void) => {
this.addHandler(ANY, handler)
}
/**
* Removes a handler for all messages
* @param handler - Function to process all messages
*/
removeGlobalHandler = (handler: (message: T) => void) => {
this.removeHandler(ANY, handler)
}
/** Removes all pending messages from the queue */
clear() {
this.buffer = []
}
/** Pauses message processing */
pause() {
clearTimeout(this.#timeout)
this.#paused = true
this.#timeout = undefined
}
/** Resumes message processing */
resume() {
this.#paused = false
this.#enqueueWork()
}
}
-1
View File
@@ -2,6 +2,5 @@ export * from "./Deferred.js"
export * from "./Emitter.js"
export * from "./LRUCache.js"
export * from "./Tools.js"
export * from "./Worker.js"
export * from "./TaskQueue.js"
export {default as normalizeUrl} from "./normalize-url/index.js"