From b989b66bb727fc2ce6437c6c4b985587e277107c Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Fri, 10 Apr 2026 11:56:08 -0300 Subject: [PATCH] change ReplaceEvent() interface to return a list of the events deleted. --- eventstore/bleve/lib.go | 6 ++--- eventstore/boltdb/replace.go | 14 +++++----- eventstore/lmdb/replace.go | 19 +++++++------ eventstore/mmm/replace.go | 34 +++++++++++++----------- eventstore/nullstore/lib.go | 4 +-- eventstore/slicestore/lib.go | 9 ++++--- eventstore/store.go | 2 +- eventstore/wrappers/dynamic_publisher.go | 3 ++- eventstore/wrappers/publisher.go | 3 ++- khatru/adding.go | 2 +- khatru/relay.go | 4 +-- 11 files changed, 55 insertions(+), 45 deletions(-) diff --git a/eventstore/bleve/lib.go b/eventstore/bleve/lib.go index 5072417..26b8e36 100644 --- a/eventstore/bleve/lib.go +++ b/eventstore/bleve/lib.go @@ -103,13 +103,13 @@ func (b *BleveBackend) Init() error { if b.RawEventStore == nil { return fmt.Errorf("missing RawEventStore") } + if len(b.Languages) == 0 { + return fmt.Errorf("missing Languages") + } if len(b.IndexableKinds) == 0 { b.IndexableKinds = []nostr.Kind{0, 1, 6, 11, 16, 20, 21, 22, 24, 1111, 9802, 30023, 30818} } - if len(b.Languages) == 0 { - b.Languages = SupportedLanguages - } validLanguages := make([]lingua.Language, 0, len(b.Languages)) b.languageCodes = make([]string, 0, len(b.Languages)) for _, lang := range b.Languages { diff --git a/eventstore/boltdb/replace.go b/eventstore/boltdb/replace.go index fd3b924..22aa1a3 100644 --- a/eventstore/boltdb/replace.go +++ b/eventstore/boltdb/replace.go @@ -8,8 +8,8 @@ import ( "go.etcd.io/bbolt" ) -func (b *BoltBackend) ReplaceEvent(evt nostr.Event) error { - return b.DB.Update(func(txn *bbolt.Tx) error { +func (b *BoltBackend) ReplaceEvent(evt nostr.Event) (deleted []nostr.Event, err error) { + err = b.DB.Update(func(txn *bbolt.Tx) error { rawBucket := txn.Bucket(rawEventStore) // check if we already have this id @@ -25,12 +25,12 @@ func (b *BoltBackend) ReplaceEvent(evt nostr.Event) error { } // now we fetch the past events, whatever they are, delete them and then save the new - var err error + var qerr error var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) { - err = b.query(txn, filter, 10 /* in theory limit could be just 1 and this should work */, yield) + qerr = b.query(txn, filter, 10 /* in theory limit could be just 1 and this should work */, yield) } - if err != nil { - return fmt.Errorf("failed to query past events with %s: %w", filter, err) + if qerr != nil { + return fmt.Errorf("failed to query past events with %s: %w", filter, qerr) } shouldStore := true @@ -39,6 +39,7 @@ func (b *BoltBackend) ReplaceEvent(evt nostr.Event) error { if err := b.delete(txn, previous.ID); err != nil { return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err) } + deleted = append(deleted, previous) } else { // there is a newer event already stored, so we won't store this shouldStore = false @@ -50,4 +51,5 @@ func (b *BoltBackend) ReplaceEvent(evt nostr.Event) error { return nil }) + return deleted, err } diff --git a/eventstore/lmdb/replace.go b/eventstore/lmdb/replace.go index d47ac94..f11a500 100644 --- a/eventstore/lmdb/replace.go +++ b/eventstore/lmdb/replace.go @@ -8,8 +8,8 @@ import ( "github.com/PowerDNS/lmdb-go/lmdb" ) -func (b *LMDBBackend) ReplaceEvent(evt nostr.Event) error { - return b.lmdbEnv.Update(func(txn *lmdb.Txn) error { +func (b *LMDBBackend) ReplaceEvent(evt nostr.Event) (deleted []nostr.Event, err error) { + err = b.lmdbEnv.Update(func(txn *lmdb.Txn) error { // check if we already have this id _, existsErr := txn.Get(b.indexId, evt.ID[0:8]) if existsErr == nil { @@ -26,20 +26,21 @@ func (b *LMDBBackend) ReplaceEvent(evt nostr.Event) error { } // now we fetch the past events, whatever they are, delete them and then save the new - var err error + var qerr error var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) { - err = b.query(txn, filter, 10 /* in theory limit could be just 1 and this should work */, yield) + qerr = b.query(txn, filter, 10 /* in theory limit could be just 1 and this should work */, yield) } - if err != nil { - return fmt.Errorf("failed to query past events with %s: %w", filter, err) + if qerr != nil { + return fmt.Errorf("failed to query past events with %s: %w", filter, qerr) } shouldStore := true for previous := range results { if nostr.IsOlder(previous, evt) { - if err := b.delete(txn, previous.ID); err != nil { - return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err) + if qerr := b.delete(txn, previous.ID); qerr != nil { + return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, qerr) } + deleted = append(deleted, previous) } else { // there is a newer event already stored, so we won't store this shouldStore = false @@ -51,4 +52,6 @@ func (b *LMDBBackend) ReplaceEvent(evt nostr.Event) error { return nil }) + + return deleted, err } diff --git a/eventstore/mmm/replace.go b/eventstore/mmm/replace.go index 8e11ea5..31c25ce 100644 --- a/eventstore/mmm/replace.go +++ b/eventstore/mmm/replace.go @@ -9,9 +9,9 @@ import ( "github.com/PowerDNS/lmdb-go/lmdb" ) -func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error { +func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) (deleted []nostr.Event, err error) { if il.mmmm.ReadOnly { - return ReadOnly + return nil, ReadOnly } il.mmmm.writeMutex.Lock() @@ -29,7 +29,7 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error { // prepare transactions mmmtxn, err := il.mmmm.lmdbEnv.BeginTxn(nil, 0) if err != nil { - return err + return nil, err } defer func() { // defer abort but only if we haven't committed (we'll set it to nil after committing) @@ -41,7 +41,7 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error { iltxn, err := il.lmdbEnv.BeginTxn(nil, 0) if err != nil { - return err + return nil, err } defer func() { // defer abort but only if we haven't committed (we'll set it to nil after committing) @@ -54,33 +54,35 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error { // check if we already have this id _, existsErr := mmmtxn.Get(il.mmmm.indexId, evt.ID[0:8]) if existsErr == nil { - return nil + return nil, nil } if !lmdb.IsNotFound(existsErr) { - return fmt.Errorf("error checking existence: %w", existsErr) + return nil, fmt.Errorf("error checking existence: %w", existsErr) } // now we fetch the past events, whatever they are, delete them and then save the new + var qerr error var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) { - err = il.query(iltxn, filter, 10 /* in theory limit could be just 1 and this should work */, yield) + qerr = il.query(iltxn, filter, 10 /* in theory limit could be just 1 and this should work */, yield) } - if err != nil { - return fmt.Errorf("failed to query past events with %s: %w", filter, err) + if qerr != nil { + return nil, fmt.Errorf("failed to query past events with %s: %w", filter, qerr) } var acquiredFreeRangeFromDelete *position shouldStore := true for previous := range results { if nostr.IsOlder(previous, evt) { - if pos, shouldPurge, err := il.delete(mmmtxn, iltxn, previous.ID); err != nil { - return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err) + if pos, shouldPurge, derr := il.delete(mmmtxn, iltxn, previous.ID); derr != nil { + return nil, fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, derr) } else if shouldPurge { // purge if err := mmmtxn.Del(il.mmmm.indexId, previous.ID[0:8], nil); err != nil { - return err + return nil, err } acquiredFreeRangeFromDelete = &pos } + deleted = append(deleted, previous) } else { // there is a newer event already stored, so we won't store this shouldStore = false @@ -90,17 +92,17 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error { if shouldStore { _, err := il.mmmm.storeOn(mmmtxn, iltxn, il, evt) if err != nil { - return err + return nil, err } } // commit in this order to minimize problematic inconsistencies if err := mmmtxn.Commit(); err != nil { - return fmt.Errorf("can't commit mmmtxn: %w", err) + return nil, fmt.Errorf("can't commit mmmtxn: %w", err) } mmmtxn = nil if err := iltxn.Commit(); err != nil { - return fmt.Errorf("can't commit iltxn: %w", err) + return nil, fmt.Errorf("can't commit iltxn: %w", err) } iltxn = nil @@ -110,5 +112,5 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error { il.mmmm.mergeNewFreeRange(*acquiredFreeRangeFromDelete) } - return nil + return deleted, nil } diff --git a/eventstore/nullstore/lib.go b/eventstore/nullstore/lib.go index 68e660d..6868cd9 100644 --- a/eventstore/nullstore/lib.go +++ b/eventstore/nullstore/lib.go @@ -29,8 +29,8 @@ func (b NullStore) SaveEvent(evt nostr.Event) error { return nil } -func (b NullStore) ReplaceEvent(evt nostr.Event) error { - return nil +func (b NullStore) ReplaceEvent(evt nostr.Event) ([]nostr.Event, error) { + return nil, nil } func (b NullStore) CountEvents(filter nostr.Filter) (uint32, error) { diff --git a/eventstore/slicestore/lib.go b/eventstore/slicestore/lib.go index fd5ff8c..8d63795 100644 --- a/eventstore/slicestore/lib.go +++ b/eventstore/slicestore/lib.go @@ -122,7 +122,7 @@ func (b *SliceStore) delete(id nostr.ID) error { return nil } -func (b *SliceStore) ReplaceEvent(evt nostr.Event) error { +func (b *SliceStore) ReplaceEvent(evt nostr.Event) (deleted []nostr.Event, err error) { b.Lock() defer b.Unlock() @@ -135,8 +135,9 @@ func (b *SliceStore) ReplaceEvent(evt nostr.Event) error { for previous := range b.QueryEvents(filter, 1) { if nostr.IsOlder(previous, evt) { if err := b.delete(previous.ID); err != nil { - return fmt.Errorf("failed to delete event for replacing: %w", err) + return nil, fmt.Errorf("failed to delete event for replacing: %w", err) } + deleted = append(deleted, previous) } else { shouldStore = false } @@ -144,11 +145,11 @@ func (b *SliceStore) ReplaceEvent(evt nostr.Event) error { if shouldStore { if err := b.save(evt); err != nil && err != eventstore.ErrDupEvent { - return fmt.Errorf("failed to save: %w", err) + return nil, fmt.Errorf("failed to save: %w", err) } } - return nil + return deleted, nil } func eventTimestampComparator(e nostr.Event, t nostr.Timestamp) int { diff --git a/eventstore/store.go b/eventstore/store.go index ef8eadc..48b6919 100644 --- a/eventstore/store.go +++ b/eventstore/store.go @@ -26,7 +26,7 @@ type Store interface { // ReplaceEvent atomically replaces a replaceable or addressable event. // Conceptually it is like a Query->Delete->Save, but streamlined. - ReplaceEvent(nostr.Event) error + ReplaceEvent(nostr.Event) (deleted []nostr.Event, err error) // CountEvents counts all events that match a given filter CountEvents(nostr.Filter) (uint32, error) diff --git a/eventstore/wrappers/dynamic_publisher.go b/eventstore/wrappers/dynamic_publisher.go index baae390..d93e6e3 100644 --- a/eventstore/wrappers/dynamic_publisher.go +++ b/eventstore/wrappers/dynamic_publisher.go @@ -36,5 +36,6 @@ func (w DynamicPublisher) Publish(ctx context.Context, evt nostr.Event) error { } } - return w.GetStore().ReplaceEvent(evt) + _, err := w.GetStore().ReplaceEvent(evt) + return err } diff --git a/eventstore/wrappers/publisher.go b/eventstore/wrappers/publisher.go index 6505e99..2052edf 100644 --- a/eventstore/wrappers/publisher.go +++ b/eventstore/wrappers/publisher.go @@ -39,5 +39,6 @@ func (w StorePublisher) Publish(ctx context.Context, evt nostr.Event) error { } // others are replaced - return w.Store.ReplaceEvent(evt) + _, err := w.Store.ReplaceEvent(evt) + return err } diff --git a/khatru/adding.go b/khatru/adding.go index c352cee..3871f0a 100644 --- a/khatru/adding.go +++ b/khatru/adding.go @@ -46,7 +46,7 @@ func (rl *Relay) handleNormal(ctx context.Context, evt nostr.Event) (skipBroadca } else { // otherwise it's a replaceable if nil != rl.ReplaceEvent { - if err := rl.ReplaceEvent(ctx, evt); err != nil { + if _, err := rl.ReplaceEvent(ctx, evt); err != nil { switch err { case eventstore.ErrDupEvent: return true, nil diff --git a/khatru/relay.go b/khatru/relay.go index 7404e40..b919dd8 100644 --- a/khatru/relay.go +++ b/khatru/relay.go @@ -68,7 +68,7 @@ type Relay struct { // hooks that will be called at various times OnEvent func(ctx context.Context, event nostr.Event) (reject bool, msg string) StoreEvent func(ctx context.Context, event nostr.Event) error - ReplaceEvent func(ctx context.Context, event nostr.Event) error + ReplaceEvent func(ctx context.Context, event nostr.Event) ([]nostr.Event, error) DeleteEvent func(ctx context.Context, id nostr.ID) error OnEventSaved func(ctx context.Context, event nostr.Event) OnEventDeleted func(ctx context.Context, deleted nostr.Event) @@ -145,7 +145,7 @@ func (rl *Relay) UseEventstore(store eventstore.Store, maxQueryLimit int) { rl.StoreEvent = func(ctx context.Context, event nostr.Event) error { return store.SaveEvent(event) } - rl.ReplaceEvent = func(ctx context.Context, event nostr.Event) error { + rl.ReplaceEvent = func(ctx context.Context, event nostr.Event) ([]nostr.Event, error) { return store.ReplaceEvent(event) } rl.DeleteEvent = func(ctx context.Context, id nostr.ID) error {