change ReplaceEvent() interface to return a list of the events deleted.

This commit is contained in:
fiatjaf
2026-04-10 11:56:08 -03:00
parent 4261bc88f8
commit b989b66bb7
11 changed files with 55 additions and 45 deletions
+3 -3
View File
@@ -103,13 +103,13 @@ func (b *BleveBackend) Init() error {
if b.RawEventStore == nil { if b.RawEventStore == nil {
return fmt.Errorf("missing RawEventStore") return fmt.Errorf("missing RawEventStore")
} }
if len(b.Languages) == 0 {
return fmt.Errorf("missing Languages")
}
if len(b.IndexableKinds) == 0 { if len(b.IndexableKinds) == 0 {
b.IndexableKinds = []nostr.Kind{0, 1, 6, 11, 16, 20, 21, 22, 24, 1111, 9802, 30023, 30818} b.IndexableKinds = []nostr.Kind{0, 1, 6, 11, 16, 20, 21, 22, 24, 1111, 9802, 30023, 30818}
} }
if len(b.Languages) == 0 {
b.Languages = SupportedLanguages
}
validLanguages := make([]lingua.Language, 0, len(b.Languages)) validLanguages := make([]lingua.Language, 0, len(b.Languages))
b.languageCodes = make([]string, 0, len(b.Languages)) b.languageCodes = make([]string, 0, len(b.Languages))
for _, lang := range b.Languages { for _, lang := range b.Languages {
+8 -6
View File
@@ -8,8 +8,8 @@ import (
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
) )
func (b *BoltBackend) ReplaceEvent(evt nostr.Event) error { func (b *BoltBackend) ReplaceEvent(evt nostr.Event) (deleted []nostr.Event, err error) {
return b.DB.Update(func(txn *bbolt.Tx) error { err = b.DB.Update(func(txn *bbolt.Tx) error {
rawBucket := txn.Bucket(rawEventStore) rawBucket := txn.Bucket(rawEventStore)
// check if we already have this id // check if we already have this id
@@ -25,12 +25,12 @@ func (b *BoltBackend) ReplaceEvent(evt nostr.Event) error {
} }
// now we fetch the past events, whatever they are, delete them and then save the new // now we fetch the past events, whatever they are, delete them and then save the new
var err error var qerr error
var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) { var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) {
err = b.query(txn, filter, 10 /* in theory limit could be just 1 and this should work */, yield) qerr = b.query(txn, filter, 10 /* in theory limit could be just 1 and this should work */, yield)
} }
if err != nil { if qerr != nil {
return fmt.Errorf("failed to query past events with %s: %w", filter, err) return fmt.Errorf("failed to query past events with %s: %w", filter, qerr)
} }
shouldStore := true shouldStore := true
@@ -39,6 +39,7 @@ func (b *BoltBackend) ReplaceEvent(evt nostr.Event) error {
if err := b.delete(txn, previous.ID); err != nil { if err := b.delete(txn, previous.ID); err != nil {
return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err) return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err)
} }
deleted = append(deleted, previous)
} else { } else {
// there is a newer event already stored, so we won't store this // there is a newer event already stored, so we won't store this
shouldStore = false shouldStore = false
@@ -50,4 +51,5 @@ func (b *BoltBackend) ReplaceEvent(evt nostr.Event) error {
return nil return nil
}) })
return deleted, err
} }
+11 -8
View File
@@ -8,8 +8,8 @@ import (
"github.com/PowerDNS/lmdb-go/lmdb" "github.com/PowerDNS/lmdb-go/lmdb"
) )
func (b *LMDBBackend) ReplaceEvent(evt nostr.Event) error { func (b *LMDBBackend) ReplaceEvent(evt nostr.Event) (deleted []nostr.Event, err error) {
return b.lmdbEnv.Update(func(txn *lmdb.Txn) error { err = b.lmdbEnv.Update(func(txn *lmdb.Txn) error {
// check if we already have this id // check if we already have this id
_, existsErr := txn.Get(b.indexId, evt.ID[0:8]) _, existsErr := txn.Get(b.indexId, evt.ID[0:8])
if existsErr == nil { if existsErr == nil {
@@ -26,20 +26,21 @@ func (b *LMDBBackend) ReplaceEvent(evt nostr.Event) error {
} }
// now we fetch the past events, whatever they are, delete them and then save the new // now we fetch the past events, whatever they are, delete them and then save the new
var err error var qerr error
var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) { var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) {
err = b.query(txn, filter, 10 /* in theory limit could be just 1 and this should work */, yield) qerr = b.query(txn, filter, 10 /* in theory limit could be just 1 and this should work */, yield)
} }
if err != nil { if qerr != nil {
return fmt.Errorf("failed to query past events with %s: %w", filter, err) return fmt.Errorf("failed to query past events with %s: %w", filter, qerr)
} }
shouldStore := true shouldStore := true
for previous := range results { for previous := range results {
if nostr.IsOlder(previous, evt) { if nostr.IsOlder(previous, evt) {
if err := b.delete(txn, previous.ID); err != nil { if qerr := b.delete(txn, previous.ID); qerr != nil {
return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err) return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, qerr)
} }
deleted = append(deleted, previous)
} else { } else {
// there is a newer event already stored, so we won't store this // there is a newer event already stored, so we won't store this
shouldStore = false shouldStore = false
@@ -51,4 +52,6 @@ func (b *LMDBBackend) ReplaceEvent(evt nostr.Event) error {
return nil return nil
}) })
return deleted, err
} }
+18 -16
View File
@@ -9,9 +9,9 @@ import (
"github.com/PowerDNS/lmdb-go/lmdb" "github.com/PowerDNS/lmdb-go/lmdb"
) )
func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error { func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) (deleted []nostr.Event, err error) {
if il.mmmm.ReadOnly { if il.mmmm.ReadOnly {
return ReadOnly return nil, ReadOnly
} }
il.mmmm.writeMutex.Lock() il.mmmm.writeMutex.Lock()
@@ -29,7 +29,7 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error {
// prepare transactions // prepare transactions
mmmtxn, err := il.mmmm.lmdbEnv.BeginTxn(nil, 0) mmmtxn, err := il.mmmm.lmdbEnv.BeginTxn(nil, 0)
if err != nil { if err != nil {
return err return nil, err
} }
defer func() { defer func() {
// defer abort but only if we haven't committed (we'll set it to nil after committing) // defer abort but only if we haven't committed (we'll set it to nil after committing)
@@ -41,7 +41,7 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error {
iltxn, err := il.lmdbEnv.BeginTxn(nil, 0) iltxn, err := il.lmdbEnv.BeginTxn(nil, 0)
if err != nil { if err != nil {
return err return nil, err
} }
defer func() { defer func() {
// defer abort but only if we haven't committed (we'll set it to nil after committing) // defer abort but only if we haven't committed (we'll set it to nil after committing)
@@ -54,33 +54,35 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error {
// check if we already have this id // check if we already have this id
_, existsErr := mmmtxn.Get(il.mmmm.indexId, evt.ID[0:8]) _, existsErr := mmmtxn.Get(il.mmmm.indexId, evt.ID[0:8])
if existsErr == nil { if existsErr == nil {
return nil return nil, nil
} }
if !lmdb.IsNotFound(existsErr) { if !lmdb.IsNotFound(existsErr) {
return fmt.Errorf("error checking existence: %w", existsErr) return nil, fmt.Errorf("error checking existence: %w", existsErr)
} }
// now we fetch the past events, whatever they are, delete them and then save the new // now we fetch the past events, whatever they are, delete them and then save the new
var qerr error
var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) { var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) {
err = il.query(iltxn, filter, 10 /* in theory limit could be just 1 and this should work */, yield) qerr = il.query(iltxn, filter, 10 /* in theory limit could be just 1 and this should work */, yield)
} }
if err != nil { if qerr != nil {
return fmt.Errorf("failed to query past events with %s: %w", filter, err) return nil, fmt.Errorf("failed to query past events with %s: %w", filter, qerr)
} }
var acquiredFreeRangeFromDelete *position var acquiredFreeRangeFromDelete *position
shouldStore := true shouldStore := true
for previous := range results { for previous := range results {
if nostr.IsOlder(previous, evt) { if nostr.IsOlder(previous, evt) {
if pos, shouldPurge, err := il.delete(mmmtxn, iltxn, previous.ID); err != nil { if pos, shouldPurge, derr := il.delete(mmmtxn, iltxn, previous.ID); derr != nil {
return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err) return nil, fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, derr)
} else if shouldPurge { } else if shouldPurge {
// purge // purge
if err := mmmtxn.Del(il.mmmm.indexId, previous.ID[0:8], nil); err != nil { if err := mmmtxn.Del(il.mmmm.indexId, previous.ID[0:8], nil); err != nil {
return err return nil, err
} }
acquiredFreeRangeFromDelete = &pos acquiredFreeRangeFromDelete = &pos
} }
deleted = append(deleted, previous)
} else { } else {
// there is a newer event already stored, so we won't store this // there is a newer event already stored, so we won't store this
shouldStore = false shouldStore = false
@@ -90,17 +92,17 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error {
if shouldStore { if shouldStore {
_, err := il.mmmm.storeOn(mmmtxn, iltxn, il, evt) _, err := il.mmmm.storeOn(mmmtxn, iltxn, il, evt)
if err != nil { if err != nil {
return err return nil, err
} }
} }
// commit in this order to minimize problematic inconsistencies // commit in this order to minimize problematic inconsistencies
if err := mmmtxn.Commit(); err != nil { if err := mmmtxn.Commit(); err != nil {
return fmt.Errorf("can't commit mmmtxn: %w", err) return nil, fmt.Errorf("can't commit mmmtxn: %w", err)
} }
mmmtxn = nil mmmtxn = nil
if err := iltxn.Commit(); err != nil { if err := iltxn.Commit(); err != nil {
return fmt.Errorf("can't commit iltxn: %w", err) return nil, fmt.Errorf("can't commit iltxn: %w", err)
} }
iltxn = nil iltxn = nil
@@ -110,5 +112,5 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error {
il.mmmm.mergeNewFreeRange(*acquiredFreeRangeFromDelete) il.mmmm.mergeNewFreeRange(*acquiredFreeRangeFromDelete)
} }
return nil return deleted, nil
} }
+2 -2
View File
@@ -29,8 +29,8 @@ func (b NullStore) SaveEvent(evt nostr.Event) error {
return nil return nil
} }
func (b NullStore) ReplaceEvent(evt nostr.Event) error { func (b NullStore) ReplaceEvent(evt nostr.Event) ([]nostr.Event, error) {
return nil return nil, nil
} }
func (b NullStore) CountEvents(filter nostr.Filter) (uint32, error) { func (b NullStore) CountEvents(filter nostr.Filter) (uint32, error) {
+5 -4
View File
@@ -122,7 +122,7 @@ func (b *SliceStore) delete(id nostr.ID) error {
return nil return nil
} }
func (b *SliceStore) ReplaceEvent(evt nostr.Event) error { func (b *SliceStore) ReplaceEvent(evt nostr.Event) (deleted []nostr.Event, err error) {
b.Lock() b.Lock()
defer b.Unlock() defer b.Unlock()
@@ -135,8 +135,9 @@ func (b *SliceStore) ReplaceEvent(evt nostr.Event) error {
for previous := range b.QueryEvents(filter, 1) { for previous := range b.QueryEvents(filter, 1) {
if nostr.IsOlder(previous, evt) { if nostr.IsOlder(previous, evt) {
if err := b.delete(previous.ID); err != nil { if err := b.delete(previous.ID); err != nil {
return fmt.Errorf("failed to delete event for replacing: %w", err) return nil, fmt.Errorf("failed to delete event for replacing: %w", err)
} }
deleted = append(deleted, previous)
} else { } else {
shouldStore = false shouldStore = false
} }
@@ -144,11 +145,11 @@ func (b *SliceStore) ReplaceEvent(evt nostr.Event) error {
if shouldStore { if shouldStore {
if err := b.save(evt); err != nil && err != eventstore.ErrDupEvent { if err := b.save(evt); err != nil && err != eventstore.ErrDupEvent {
return fmt.Errorf("failed to save: %w", err) return nil, fmt.Errorf("failed to save: %w", err)
} }
} }
return nil return deleted, nil
} }
func eventTimestampComparator(e nostr.Event, t nostr.Timestamp) int { func eventTimestampComparator(e nostr.Event, t nostr.Timestamp) int {
+1 -1
View File
@@ -26,7 +26,7 @@ type Store interface {
// ReplaceEvent atomically replaces a replaceable or addressable event. // ReplaceEvent atomically replaces a replaceable or addressable event.
// Conceptually it is like a Query->Delete->Save, but streamlined. // Conceptually it is like a Query->Delete->Save, but streamlined.
ReplaceEvent(nostr.Event) error ReplaceEvent(nostr.Event) (deleted []nostr.Event, err error)
// CountEvents counts all events that match a given filter // CountEvents counts all events that match a given filter
CountEvents(nostr.Filter) (uint32, error) CountEvents(nostr.Filter) (uint32, error)
+2 -1
View File
@@ -36,5 +36,6 @@ func (w DynamicPublisher) Publish(ctx context.Context, evt nostr.Event) error {
} }
} }
return w.GetStore().ReplaceEvent(evt) _, err := w.GetStore().ReplaceEvent(evt)
return err
} }
+2 -1
View File
@@ -39,5 +39,6 @@ func (w StorePublisher) Publish(ctx context.Context, evt nostr.Event) error {
} }
// others are replaced // others are replaced
return w.Store.ReplaceEvent(evt) _, err := w.Store.ReplaceEvent(evt)
return err
} }
+1 -1
View File
@@ -46,7 +46,7 @@ func (rl *Relay) handleNormal(ctx context.Context, evt nostr.Event) (skipBroadca
} else { } else {
// otherwise it's a replaceable // otherwise it's a replaceable
if nil != rl.ReplaceEvent { if nil != rl.ReplaceEvent {
if err := rl.ReplaceEvent(ctx, evt); err != nil { if _, err := rl.ReplaceEvent(ctx, evt); err != nil {
switch err { switch err {
case eventstore.ErrDupEvent: case eventstore.ErrDupEvent:
return true, nil return true, nil
+2 -2
View File
@@ -68,7 +68,7 @@ type Relay struct {
// hooks that will be called at various times // hooks that will be called at various times
OnEvent func(ctx context.Context, event nostr.Event) (reject bool, msg string) OnEvent func(ctx context.Context, event nostr.Event) (reject bool, msg string)
StoreEvent func(ctx context.Context, event nostr.Event) error StoreEvent func(ctx context.Context, event nostr.Event) error
ReplaceEvent func(ctx context.Context, event nostr.Event) error ReplaceEvent func(ctx context.Context, event nostr.Event) ([]nostr.Event, error)
DeleteEvent func(ctx context.Context, id nostr.ID) error DeleteEvent func(ctx context.Context, id nostr.ID) error
OnEventSaved func(ctx context.Context, event nostr.Event) OnEventSaved func(ctx context.Context, event nostr.Event)
OnEventDeleted func(ctx context.Context, deleted nostr.Event) OnEventDeleted func(ctx context.Context, deleted nostr.Event)
@@ -145,7 +145,7 @@ func (rl *Relay) UseEventstore(store eventstore.Store, maxQueryLimit int) {
rl.StoreEvent = func(ctx context.Context, event nostr.Event) error { rl.StoreEvent = func(ctx context.Context, event nostr.Event) error {
return store.SaveEvent(event) return store.SaveEvent(event)
} }
rl.ReplaceEvent = func(ctx context.Context, event nostr.Event) error { rl.ReplaceEvent = func(ctx context.Context, event nostr.Event) ([]nostr.Event, error) {
return store.ReplaceEvent(event) return store.ReplaceEvent(event)
} }
rl.DeleteEvent = func(ctx context.Context, id nostr.ID) error { rl.DeleteEvent = func(ctx context.Context, id nostr.ID) error {