From 3da44438ab3808d06afca23d10a953f08d458534 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 6 Jun 2024 09:26:06 -0700 Subject: [PATCH] Make repository updates atomic --- packages/util/Repository.ts | 37 ++++++++++++++++--------------------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/packages/util/Repository.ts b/packages/util/Repository.ts index 8358428..d8d12ec 100644 --- a/packages/util/Repository.ts +++ b/packages/util/Repository.ts @@ -1,4 +1,4 @@ -import {flatten, nth, Emitter, sortBy, inc, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib' +import {flatten, Emitter, sortBy, inc, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib' import {DELETE} from './Kinds' import {EPOCH, matchFilter} from './Filters' import {isReplaceable, isTrustedEvent} from './Events' @@ -11,7 +11,6 @@ export const DAY = 86400 const getDay = (ts: number) => Math.floor(ts / DAY) export class Repository extends Emitter { - _shouldNotify = true eventsById = new Map() eventsByWrap = new Map() eventsByAddress = new Map() @@ -27,13 +26,11 @@ export class Repository extends Emitter { } load = async (events: TrustedEvent[], chunkSize = 1000) => { - this._shouldNotify = false - this.clear() for (const eventsChunk of chunk(chunkSize, events)) { for (const event of eventsChunk) { - this.publish(event) + this.publish(event, {shouldNotify: false}) } if (eventsChunk.length === chunkSize) { @@ -41,12 +38,11 @@ export class Repository extends Emitter { } } + this.emit('update', { added: events.filter(e => !this.isDeleted(e)), removed: new Set(), }) - - this._shouldNotify = false } clear = () => { @@ -131,7 +127,7 @@ export class Repository extends Emitter { return uniq(flatten(result)) } - publish = (event: TrustedEvent) => { + publish = (event: TrustedEvent, {shouldNotify = true} = {}) => { if (!isTrustedEvent(event)) { throw new Error("Invalid event published to Repository", event) } @@ -164,6 +160,9 @@ export class Repository extends Emitter { this._updateIndex(this.eventsByDay, getDay(event.created_at), event, duplicate) this._updateIndex(this.eventsByAuthor, event.pubkey, event, duplicate) + // Keep track of deleted events to notify about + const removed = new Set() + // Update our tag indexes for (const tag of event.tags) { if (tag[0].length === 1) { @@ -173,26 +172,22 @@ export class Repository extends Emitter { // deleted so that replaceables can be restored. if (event.kind === DELETE) { this.deletes.set(tag[1], Math.max(event.created_at, this.deletes.get(tag[1]) || 0)) + + const deletedEvent = this.getEvent(tag[1]) + + if (deletedEvent && this.isDeleted(deletedEvent)) { + removed.add(deletedEvent.id) + } } } } - // Notify caller - const added = this.isDeleted(event) ? [] : [event] - const removed = new Set() - if (duplicate) { - removed.add(duplicate!.id) + removed.add(duplicate.id) } - if (event.kind === DELETE) { - for (const value of event.tags.map(nth(1))) { - removed.add(value) - } - } - - if (this._shouldNotify) { - this.emit('update', {added, removed}) + if (shouldNotify) { + this.emit('update', {added: this.isDeleted(event) ? [] : [event], removed}) } }