Fix task queue

This commit is contained in:
Jon Staab
2026-02-09 10:02:02 -08:00
parent 764241fc21
commit 119cb6248b
2 changed files with 19 additions and 22 deletions
+3 -10
View File
@@ -6,7 +6,6 @@ import {
Wallet, Wallet,
WRAP, WRAP,
getPubkeyTagValues, getPubkeyTagValues,
HashedEvent,
StampedEvent, StampedEvent,
SignedEvent, SignedEvent,
getPubkey, getPubkey,
@@ -323,8 +322,6 @@ export const unwrapAndStore = async (wrap: SignedEvent) => {
return cached return cached
} }
let result: {rumor: HashedEvent; recipient: string} | undefined
// Next, try to decrypt as the recipient // Next, try to decrypt as the recipient
for (const recipient of getPubkeyTagValues(wrap.tags)) { for (const recipient of getPubkeyTagValues(wrap.tags)) {
const signer = getSignerFromPubkey(recipient) const signer = getSignerFromPubkey(recipient)
@@ -333,16 +330,12 @@ export const unwrapAndStore = async (wrap: SignedEvent) => {
try { try {
const rumor = await Nip59.fromSigner(signer).unwrap(wrap) const rumor = await Nip59.fromSigner(signer).unwrap(wrap)
result = {rumor, recipient} wrapManager.add({wrap, rumor, recipient})
return rumor
} catch (e) { } catch (e) {
failedUnwraps.add(wrap.id) failedUnwraps.add(wrap.id)
} }
} }
} }
if (result) {
wrapManager.add({wrap, ...result})
return result.rumor
}
} }
+16 -12
View File
@@ -39,22 +39,26 @@ export class TaskQueue<Item> {
this.isProcessing = true this.isProcessing = true
setTimeout(async () => { setTimeout(async () => {
for (const item of this.items.splice(0, this.options.batchSize)) { if (this.isPaused) {
try { this.isProcessing = false
for (const subscriber of this._subs) { } else {
subscriber(item) for (const item of this.items.splice(0, this.options.batchSize)) {
try {
for (const subscriber of this._subs) {
subscriber(item)
}
await this.options.processItem(item)
} catch (e) {
console.error(e)
} }
await this.options.processItem(item)
} catch (e) {
console.error(e)
} }
}
this.isProcessing = false this.isProcessing = false
if (this.items.length > 0) { if (this.items.length > 0) {
this.process() this.process()
}
} }
}, this.options.batchDelay) }, this.options.batchDelay)
} }