From 0f8843afac44399b74c2deb65aa9b9c45629e20c Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Mon, 15 Jun 2026 14:23:54 -0300 Subject: [PATCH] eventstore/mmm: defrag. --- eventstore/mmm/freeranges.go | 170 +++++++++++ eventstore/mmm/freeranges_test.go | 283 ++++++++++++++++++ eventstore/mmm/fuzz_test.go | 5 + eventstore/mmm/mmmm.go | 1 + eventstore/mmm/position.go | 7 +- eventstore/mmm/save.go | 4 +- .../fuzz/FuzzDefragment/d5845b29b7a801f7 | 2 + .../testdata/fuzz/FuzzTest/ffd0b6bc2949a534 | 5 + 8 files changed, 471 insertions(+), 6 deletions(-) create mode 100644 eventstore/mmm/testdata/fuzz/FuzzDefragment/d5845b29b7a801f7 create mode 100644 eventstore/mmm/testdata/fuzz/FuzzTest/ffd0b6bc2949a534 diff --git a/eventstore/mmm/freeranges.go b/eventstore/mmm/freeranges.go index c2198e1..2811c19 100644 --- a/eventstore/mmm/freeranges.go +++ b/eventstore/mmm/freeranges.go @@ -2,9 +2,15 @@ package mmm import ( "cmp" + "encoding/binary" "fmt" + "runtime" "slices" + "syscall" + "unsafe" + "fiatjaf.com/nostr" + "fiatjaf.com/nostr/eventstore/codec/betterbinary" "github.com/PowerDNS/lmdb-go/lmdb" ) @@ -150,3 +156,167 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) { } } } + +func (b *MultiMmapManager) Defragment(n int) error { + if b.ReadOnly { + return ReadOnly + } + + b.writeMutex.Lock() + defer b.writeMutex.Unlock() + + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + if n > len(b.freeRangesAll)-1 { + n = len(b.freeRangesAll) - 1 + } + if n <= 0 { + return nil + } + + mmmtxn, err := b.lmdbEnv.BeginTxn(nil, 0) + if err != nil { + return fmt.Errorf("failed to begin mmm transaction: %w", err) + } + defer mmmtxn.Abort() + + type layerTxn struct { + il *IndexingLayer + txn *lmdb.Txn + } + layerTxns := make(map[uint16]*layerTxn) + defer func() { + for _, lt := range layerTxns { + lt.txn.Abort() + } + }() + + // iterate only through `n` free ranges, as specified + for i := 0; i < n; i++ { + fr := b.freeRangesAll[i] + + // where the free range ends, the events start (any number of them) + eventsStart := fr.start + uint64(fr.size) + eventsEnd := b.freeRangesAll[i+1].start // and they end when the next free range starts + + c := uint64(0) // this tracks our relative position inside the events section + for (eventsStart + c) < eventsEnd { + var evt nostr.Event + if err := betterbinary.Unmarshal(b.mmapf[(eventsStart+c):eventsEnd], &evt); err != nil { + id := betterbinary.GetID(b.mmapf[(eventsStart + c):eventsEnd]) + return fmt.Errorf("failed to read event (%x) from mmap: %w", id[:], err) + } + + // now that we have an event we'll update its pos on the id index and on every layer: + oldVal, err := mmmtxn.Get(b.indexId, evt.ID[0:8]) + if err != nil { + return fmt.Errorf("failed to read val (%x) from index: %w", evt.ID[:], err) + } + + // current position + pos := positionFromBytes(oldVal[0:12]) + + // new position (from the beginning of the free range before + relative position) + pos.start = fr.start + uint64(c) + + // update this cursor + c += uint64(pos.size) + + // prepare and save id index + newVal := make([]byte, len(oldVal)) + writeBytesFromPosition(newVal, pos) + copy(newVal[12:], oldVal[12:]) + if err := mmmtxn.Put(b.indexId, evt.ID[0:8], newVal, 0); err != nil { + return fmt.Errorf("failed to write new pos to id index: %w", err) + } + + for s := 12; s < len(oldVal); s += 2 { + layer := binary.BigEndian.Uint16(oldVal[s : s+2]) + lt, ok := layerTxns[layer] + if !ok { + il := b.layers.ByID(layer) + if il == nil { + fmt.Println(b.layers) + panic(fmt.Errorf("missing layer %d", layer)) + } + txn, err := il.lmdbEnv.BeginTxn(nil, 0) + if err != nil { + return fmt.Errorf("failed to begin layer txn for layer %d: %w", il.id, err) + } + txn.RawRead = true + lt = &layerTxn{il: il, txn: txn} + layerTxns[il.id] = lt + } + + for k := range lt.il.getIndexKeysForEvent(evt) { + if err := lt.txn.Del(k.dbi, k.key, oldVal[0:12]); err != nil { + return fmt.Errorf("failed to delete old index entry for %x: %w", evt.ID[:], err) + } + if err := lt.txn.Put(k.dbi, k.key, newVal[0:12], 0); err != nil { + return fmt.Errorf("failed to insert new index entry for %x: %w", evt.ID[:], err) + } + } + } + } + + // now that we have updated all the pointers, just copy all the bytes between the two free ranges + copy(b.mmapf[fr.start:], b.mmapf[fr.start+uint64(fr.size):eventsEnd]) + + // delete this free range if it's one of the big ones + if fr.isLarge() { + for l, lfr := range b.freeRangesLarge { + if lfr.start == fr.start { + b.freeRangesLarge[l] = b.freeRangesLarge[len(b.freeRangesLarge)-1] + b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1] + break + } + } + } + + // now we have some space left at the end of this events section that is a free range + remainingSpaceStart := fr.start + c + + // it must be merged with the next free range + updated := position{ + start: remainingSpaceStart, + size: b.freeRangesAll[i+1].size + uint32(eventsEnd) - uint32(remainingSpaceStart), + } + nextWasLarge := b.freeRangesAll[i+1].isLarge() + b.freeRangesAll[i+1] = updated + + if nextWasLarge { + for l, lfr := range b.freeRangesLarge { + if lfr.start == eventsEnd { + b.freeRangesLarge[l] = updated + break + } + } + } else if updated.isLarge() { + // if it wasn't large but now is, add it to the list of large free ranges + b.freeRangesLarge = append(b.freeRangesLarge, updated) + } + } + + // msync + _, _, errno := syscall.Syscall(syscall.SYS_MSYNC, + uintptr(unsafe.Pointer(&b.mmapf[0])), uintptr(len(b.mmapf)), syscall.MS_SYNC) + if errno != 0 { + return fmt.Errorf("msync failed: %w", syscall.Errno(errno)) + } + + // commit transactions + if err := mmmtxn.Commit(); err != nil { + return fmt.Errorf("failed to commit mmm transaction: %w", err) + } + for lid, lt := range layerTxns { + if err := lt.txn.Commit(); err != nil { + return fmt.Errorf("failed to commit layer %d transaction: %w", lid, err) + } + } + + // delete the free ranges in bulk + b.freeRangesAll = slices.Delete(b.freeRangesAll, 0, n) + + return nil +} diff --git a/eventstore/mmm/freeranges_test.go b/eventstore/mmm/freeranges_test.go index 7b101e6..86f21c1 100644 --- a/eventstore/mmm/freeranges_test.go +++ b/eventstore/mmm/freeranges_test.go @@ -1,8 +1,11 @@ package mmm import ( + "cmp" + "fmt" "math/rand/v2" "os" + "slices" "strings" "testing" @@ -125,6 +128,100 @@ func FuzzFreeRanges(f *testing.F) { }) } +func TestDefragment(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "mmm-defrag-test-*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + mmmm := &MultiMmapManager{Dir: tmpDir} + err = mmmm.Init() + require.NoError(t, err) + defer mmmm.Close() + + il, err := mmmm.EnsureLayer("a") + require.NoError(t, err) + defer il.Close() + + sk := nostr.MustSecretKeyFromHex("945e01e37662430162121b804d3645a86d97df9d256917d86735d0eb219393eb") + + const nevents = 30 + var stored [nevents]nostr.Event + for i := range nevents { + evt := nostr.Event{ + CreatedAt: nostr.Timestamp(i), + Kind: nostr.KindTextNote, + Tags: nostr.Tags{}, + Content: fmt.Sprintf("============= event %d ============= "+strings.Repeat("+", 23), i), + } + evt.Sign(sk) + err := il.SaveEvent(evt) + require.NoError(t, err) + stored[i] = evt + } + + toDelete := []int{0, 5, 10, 15, 20} + var remaining []nostr.Event + for i, evt := range stored { + if slices.Contains(toDelete, i) { + err := il.DeleteEvent(evt.ID) + require.NoError(t, err) + } else { + remaining = append(remaining, evt) + } + } + + require.Len(t, toDelete, len(mmmm.freeRangesAll)) + + err = mmmm.Defragment(2) + require.NoError(t, err) + + require.Len(t, mmmm.freeRangesAll, 3) + require.Len(t, remaining, nevents-len(toDelete)) + + // all remaining events still accessible with correct content via GetByID + for _, evt := range remaining { + gotEvt, layers := mmmm.GetByID(evt.ID) + require.NotNil(t, gotEvt, "event %s should exist after defrag", evt.ID) + require.NotEmpty(t, layers, "event %s should have layers after defrag", evt.ID) + require.Equal(t, evt.Content, gotEvt.Content, "event %s content should match after defrag", evt.ID) + + // also accessible via a query + require.Equal(t, il, layers[0]) + } + + evts := slices.Collect(il.QueryEvents(nostr.Filter{Kinds: []nostr.Kind{nostr.KindTextNote}}, 100)) + require.Len(t, evts, nevents-len(toDelete)) + + // free range invariants hold after defrag + verifyFreeRangesInvariants(t, mmmm) + + // no overlapping positions after defrag + mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error { + cursor, err := txn.OpenCursor(mmmm.indexId) + require.NoError(t, err) + defer cursor.Close() + + var allPositions []position + for _, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; _, val, err = cursor.Get(nil, val, lmdb.Next) { + pos := positionFromBytes(val[0:12]) + allPositions = append(allPositions, pos) + } + + slices.SortFunc(allPositions, func(a, b position) int { + return cmp.Compare(a.start, b.start) + }) + + var lastEnd uint64 + for _, pos := range allPositions { + if pos.start < lastEnd { + t.Fatalf("event overlap after defrag: %d < %d", pos.start, lastEnd) + } + lastEnd = pos.start + uint64(pos.size) + } + return nil + }) +} + func countUsableFreeRanges(t *testing.T, mmmm *MultiMmapManager) (count int, space int) { for _, fr := range mmmm.freeRangesAll { if fr.size >= LARGE_FREERANGE { @@ -176,3 +273,189 @@ func verifyFreeRangesInvariants(t *testing.T, mmmm *MultiMmapManager) { return nil }) } + +func FuzzDefragment(f *testing.F) { + f.Add(0) + f.Fuzz(func(t *testing.T, seed int) { + tmpDir, err := os.MkdirTemp("", "mmm-defrag-fuzz-*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + logger := zerolog.Nop() + rnd := rand.New(rand.NewPCG(uint64(seed), 0)) + + mmmm := &MultiMmapManager{ + Dir: tmpDir, + Logger: &logger, + } + err = mmmm.Init() + require.NoError(t, err) + defer mmmm.Close() + + layerNames := []string{"a", "b", "c"} + var layers []*IndexingLayer + for _, name := range layerNames { + il, err := mmmm.EnsureLayer(name) + require.NoError(t, err) + defer il.Close() + layers = append(layers, il) + } + + type indexedEvent struct { + evt nostr.Event + tag string + } + layerEvents := make([][]indexedEvent, len(layers)) + + sk := nostr.MustSecretKeyFromHex("945e01e37662430162121b804d3645a86d97df9d256917d86735d0eb219393eb") + pk := sk.Public() + + totalEvents := rnd.IntN(500) + tChoices := []string{"foo", "bar", "banana"} + var written int + + for written < totalEvents { + n := rnd.IntN(50) + 1 + if n > totalEvents-written { + n = totalEvents - written + } + + for i := 0; i < n; i++ { + sizeParam := rnd.IntN(2000) + content := strings.Repeat("z", sizeParam) + + chosenTag := tChoices[rnd.IntN(3)] + evt := nostr.Event{ + CreatedAt: nostr.Timestamp(rnd.Uint32()), + Kind: nostr.KindTextNote, + Content: content, + Tags: nostr.Tags{{"t", chosenTag}}, + } + evt.Sign(sk) + + nLayers := rnd.IntN(len(layers)) + 1 + perm := rnd.Perm(len(layers)) + for pi := 0; pi < nLayers; pi++ { + li := perm[pi] + err := layers[li].SaveEvent(evt) + require.NoError(t, err) + layerEvents[li] = append(layerEvents[li], indexedEvent{evt, chosenTag}) + } + written++ + } + + if n > 0 { + totalRemaining := 0 + for _, levts := range layerEvents { + totalRemaining += len(levts) + } + if totalRemaining > 0 { + m := rnd.IntN(n) + if m > totalRemaining { + m = totalRemaining + } + for i := 0; i < m; i++ { + var nonEmpty []int + for li, levts := range layerEvents { + if len(levts) > 0 { + nonEmpty = append(nonEmpty, li) + } + } + if len(nonEmpty) == 0 { + break + } + li := nonEmpty[rnd.IntN(len(nonEmpty))] + idx := rnd.IntN(len(layerEvents[li])) + evtInfo := layerEvents[li][idx] + err := layers[li].DeleteEvent(evtInfo.evt.ID) + require.NoError(t, err) + layerEvents[li] = append(layerEvents[li][:idx], layerEvents[li][idx+1:]...) + } + } + } + + if n > 0 { + o := rnd.IntN(n) + for i := 0; i < o; i++ { + if len(mmmm.freeRangesAll) > 1 { + param := rnd.IntN(len(mmmm.freeRangesAll)) + err := mmmm.Defragment(param) + require.NoError(t, err) + } + } + } + } + + // query each layer + for li, il := range layers { + levts := layerEvents[li] + + // query by author + evts := slices.Collect(il.QueryEvents(nostr.Filter{Authors: []nostr.PubKey{pk}}, 10000)) + require.Equal(t, len(levts), len(evts)) + + // query by author and kind + evts = slices.Collect(il.QueryEvents(nostr.Filter{Authors: []nostr.PubKey{pk}, Kinds: []nostr.Kind{nostr.KindTextNote}}, 10000)) + require.Equal(t, len(levts), len(evts)) + + // query by "t" tag + for _, tagVal := range tChoices { + expected := 0 + for _, ie := range levts { + if ie.tag == tagVal { + expected++ + } + } + evts = slices.Collect(il.QueryEvents(nostr.Filter{Tags: nostr.TagMap{"t": []string{tagVal}}}, 10000)) + require.Equal(t, expected, len(evts)) + } + + // query with no parameters + allEvts := slices.Collect(il.QueryEvents(nostr.Filter{}, 10000)) + require.Equal(t, len(levts), len(allEvts)) + } + + // build union of all events across all layers + allEventSet := make(map[string]nostr.Event) + for _, levts := range layerEvents { + for _, ie := range levts { + allEventSet[ie.evt.ID.String()] = ie.evt + } + } + + // all events still accessible via GetByID + for _, evt := range allEventSet { + gotEvt, eventLayers := mmmm.GetByID(evt.ID) + require.NotNil(t, gotEvt) + require.NotEmpty(t, eventLayers) + require.Equal(t, evt.Content, gotEvt.Content) + } + + verifyFreeRangesInvariants(t, mmmm) + + mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error { + cursor, err := txn.OpenCursor(mmmm.indexId) + require.NoError(t, err) + defer cursor.Close() + + var allPositions []position + for _, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; _, val, err = cursor.Get(nil, val, lmdb.Next) { + pos := positionFromBytes(val[0:12]) + allPositions = append(allPositions, pos) + } + + slices.SortFunc(allPositions, func(a, b position) int { + return cmp.Compare(a.start, b.start) + }) + + var lastEnd uint64 + for _, pos := range allPositions { + if pos.start < lastEnd { + t.Fatalf("event overlap after defrag: %d < %d", pos.start, lastEnd) + } + lastEnd = pos.start + uint64(pos.size) + } + return nil + }) + }) +} diff --git a/eventstore/mmm/fuzz_test.go b/eventstore/mmm/fuzz_test.go index c352e66..f93587c 100644 --- a/eventstore/mmm/fuzz_test.go +++ b/eventstore/mmm/fuzz_test.go @@ -142,6 +142,11 @@ func FuzzTest(f *testing.F) { mmmm.Rescan() } + // perform random defrags -- shouldn't break the database + if rnd.UintN(3) == 1 { + mmmm.Defragment(len(deleted) / 3) + } + for id, deletedlayers := range deleted { evt, foundlayers := mmmm.GetByID(id) diff --git a/eventstore/mmm/mmmm.go b/eventstore/mmm/mmmm.go index 9af9f55..042c664 100644 --- a/eventstore/mmm/mmmm.go +++ b/eventstore/mmm/mmmm.go @@ -341,6 +341,7 @@ func (b *MultiMmapManager) removeAllReferencesFromLayer(txn *lmdb.Txn, layerId u return nil } +//go:inline func (b *MultiMmapManager) loadEvent(pos position, eventReceiver *nostr.Event) error { return betterbinary.Unmarshal(b.mmapf[pos.start:pos.start+uint64(pos.size)], eventReceiver) } diff --git a/eventstore/mmm/position.go b/eventstore/mmm/position.go index 21b11d0..3fb37e6 100644 --- a/eventstore/mmm/position.go +++ b/eventstore/mmm/position.go @@ -17,11 +17,6 @@ func (poss positions) find(start uint64) (idx int) { return idx } -func (poss positions) del(start uint64) positions { - idx := poss.find(start) - return slices.Delete(poss, idx, idx+1) -} - func (poss positions) String() string { str := strings.Builder{} str.Grow(10 + 20*len(poss)) @@ -46,6 +41,7 @@ func (pos position) isLarge() bool { return pos.size >= LARGE_FREERANGE } +//go:inline func positionFromBytes(posb []byte) position { return position{ size: binary.BigEndian.Uint32(posb[0:4]), @@ -53,6 +49,7 @@ func positionFromBytes(posb []byte) position { } } +//go:inline func writeBytesFromPosition(out []byte, pos position) { binary.BigEndian.PutUint32(out[0:4], pos.size) binary.BigEndian.PutUint64(out[4:12], pos.start) diff --git a/eventstore/mmm/save.go b/eventstore/mmm/save.go index 3bf5e9c..b9f3361 100644 --- a/eventstore/mmm/save.go +++ b/eventstore/mmm/save.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "runtime" + "slices" "syscall" "unsafe" @@ -119,7 +120,8 @@ func (b *MultiMmapManager) storeOn( b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1] // also delete it from b.freeRangesAll - b.freeRangesAll = b.freeRangesAll.del(fr.start) + idx := b.freeRangesAll.find(fr.start) + b.freeRangesAll = slices.Delete(b.freeRangesAll, idx, idx+1) } else { // otherwise modify it in place newFreeRange := position{ diff --git a/eventstore/mmm/testdata/fuzz/FuzzDefragment/d5845b29b7a801f7 b/eventstore/mmm/testdata/fuzz/FuzzDefragment/d5845b29b7a801f7 new file mode 100644 index 0000000..7ded844 --- /dev/null +++ b/eventstore/mmm/testdata/fuzz/FuzzDefragment/d5845b29b7a801f7 @@ -0,0 +1,2 @@ +go test fuzz v1 +int(-17) diff --git a/eventstore/mmm/testdata/fuzz/FuzzTest/ffd0b6bc2949a534 b/eventstore/mmm/testdata/fuzz/FuzzTest/ffd0b6bc2949a534 new file mode 100644 index 0000000..433f278 --- /dev/null +++ b/eventstore/mmm/testdata/fuzz/FuzzTest/ffd0b6bc2949a534 @@ -0,0 +1,5 @@ +go test fuzz v1 +int(46) +uint(84) +uint(55) +uint(5)