eventstore/mmm: defrag.

This commit is contained in:
fiatjaf
2026-06-15 14:23:54 -03:00
parent 0616b30ab3
commit 0f8843afac
8 changed files with 471 additions and 6 deletions
+170
View File
@@ -2,9 +2,15 @@ package mmm
import (
"cmp"
"encoding/binary"
"fmt"
"runtime"
"slices"
"syscall"
"unsafe"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/codec/betterbinary"
"github.com/PowerDNS/lmdb-go/lmdb"
)
@@ -150,3 +156,167 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) {
}
}
}
func (b *MultiMmapManager) Defragment(n int) error {
if b.ReadOnly {
return ReadOnly
}
b.writeMutex.Lock()
defer b.writeMutex.Unlock()
runtime.LockOSThread()
defer runtime.UnlockOSThread()
if n > len(b.freeRangesAll)-1 {
n = len(b.freeRangesAll) - 1
}
if n <= 0 {
return nil
}
mmmtxn, err := b.lmdbEnv.BeginTxn(nil, 0)
if err != nil {
return fmt.Errorf("failed to begin mmm transaction: %w", err)
}
defer mmmtxn.Abort()
type layerTxn struct {
il *IndexingLayer
txn *lmdb.Txn
}
layerTxns := make(map[uint16]*layerTxn)
defer func() {
for _, lt := range layerTxns {
lt.txn.Abort()
}
}()
// iterate only through `n` free ranges, as specified
for i := 0; i < n; i++ {
fr := b.freeRangesAll[i]
// where the free range ends, the events start (any number of them)
eventsStart := fr.start + uint64(fr.size)
eventsEnd := b.freeRangesAll[i+1].start // and they end when the next free range starts
c := uint64(0) // this tracks our relative position inside the events section
for (eventsStart + c) < eventsEnd {
var evt nostr.Event
if err := betterbinary.Unmarshal(b.mmapf[(eventsStart+c):eventsEnd], &evt); err != nil {
id := betterbinary.GetID(b.mmapf[(eventsStart + c):eventsEnd])
return fmt.Errorf("failed to read event (%x) from mmap: %w", id[:], err)
}
// now that we have an event we'll update its pos on the id index and on every layer:
oldVal, err := mmmtxn.Get(b.indexId, evt.ID[0:8])
if err != nil {
return fmt.Errorf("failed to read val (%x) from index: %w", evt.ID[:], err)
}
// current position
pos := positionFromBytes(oldVal[0:12])
// new position (from the beginning of the free range before + relative position)
pos.start = fr.start + uint64(c)
// update this cursor
c += uint64(pos.size)
// prepare and save id index
newVal := make([]byte, len(oldVal))
writeBytesFromPosition(newVal, pos)
copy(newVal[12:], oldVal[12:])
if err := mmmtxn.Put(b.indexId, evt.ID[0:8], newVal, 0); err != nil {
return fmt.Errorf("failed to write new pos to id index: %w", err)
}
for s := 12; s < len(oldVal); s += 2 {
layer := binary.BigEndian.Uint16(oldVal[s : s+2])
lt, ok := layerTxns[layer]
if !ok {
il := b.layers.ByID(layer)
if il == nil {
fmt.Println(b.layers)
panic(fmt.Errorf("missing layer %d", layer))
}
txn, err := il.lmdbEnv.BeginTxn(nil, 0)
if err != nil {
return fmt.Errorf("failed to begin layer txn for layer %d: %w", il.id, err)
}
txn.RawRead = true
lt = &layerTxn{il: il, txn: txn}
layerTxns[il.id] = lt
}
for k := range lt.il.getIndexKeysForEvent(evt) {
if err := lt.txn.Del(k.dbi, k.key, oldVal[0:12]); err != nil {
return fmt.Errorf("failed to delete old index entry for %x: %w", evt.ID[:], err)
}
if err := lt.txn.Put(k.dbi, k.key, newVal[0:12], 0); err != nil {
return fmt.Errorf("failed to insert new index entry for %x: %w", evt.ID[:], err)
}
}
}
}
// now that we have updated all the pointers, just copy all the bytes between the two free ranges
copy(b.mmapf[fr.start:], b.mmapf[fr.start+uint64(fr.size):eventsEnd])
// delete this free range if it's one of the big ones
if fr.isLarge() {
for l, lfr := range b.freeRangesLarge {
if lfr.start == fr.start {
b.freeRangesLarge[l] = b.freeRangesLarge[len(b.freeRangesLarge)-1]
b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1]
break
}
}
}
// now we have some space left at the end of this events section that is a free range
remainingSpaceStart := fr.start + c
// it must be merged with the next free range
updated := position{
start: remainingSpaceStart,
size: b.freeRangesAll[i+1].size + uint32(eventsEnd) - uint32(remainingSpaceStart),
}
nextWasLarge := b.freeRangesAll[i+1].isLarge()
b.freeRangesAll[i+1] = updated
if nextWasLarge {
for l, lfr := range b.freeRangesLarge {
if lfr.start == eventsEnd {
b.freeRangesLarge[l] = updated
break
}
}
} else if updated.isLarge() {
// if it wasn't large but now is, add it to the list of large free ranges
b.freeRangesLarge = append(b.freeRangesLarge, updated)
}
}
// msync
_, _, errno := syscall.Syscall(syscall.SYS_MSYNC,
uintptr(unsafe.Pointer(&b.mmapf[0])), uintptr(len(b.mmapf)), syscall.MS_SYNC)
if errno != 0 {
return fmt.Errorf("msync failed: %w", syscall.Errno(errno))
}
// commit transactions
if err := mmmtxn.Commit(); err != nil {
return fmt.Errorf("failed to commit mmm transaction: %w", err)
}
for lid, lt := range layerTxns {
if err := lt.txn.Commit(); err != nil {
return fmt.Errorf("failed to commit layer %d transaction: %w", lid, err)
}
}
// delete the free ranges in bulk
b.freeRangesAll = slices.Delete(b.freeRangesAll, 0, n)
return nil
}
+283
View File
@@ -1,8 +1,11 @@
package mmm
import (
"cmp"
"fmt"
"math/rand/v2"
"os"
"slices"
"strings"
"testing"
@@ -125,6 +128,100 @@ func FuzzFreeRanges(f *testing.F) {
})
}
func TestDefragment(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "mmm-defrag-test-*")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)
mmmm := &MultiMmapManager{Dir: tmpDir}
err = mmmm.Init()
require.NoError(t, err)
defer mmmm.Close()
il, err := mmmm.EnsureLayer("a")
require.NoError(t, err)
defer il.Close()
sk := nostr.MustSecretKeyFromHex("945e01e37662430162121b804d3645a86d97df9d256917d86735d0eb219393eb")
const nevents = 30
var stored [nevents]nostr.Event
for i := range nevents {
evt := nostr.Event{
CreatedAt: nostr.Timestamp(i),
Kind: nostr.KindTextNote,
Tags: nostr.Tags{},
Content: fmt.Sprintf("============= event %d ============= "+strings.Repeat("+", 23), i),
}
evt.Sign(sk)
err := il.SaveEvent(evt)
require.NoError(t, err)
stored[i] = evt
}
toDelete := []int{0, 5, 10, 15, 20}
var remaining []nostr.Event
for i, evt := range stored {
if slices.Contains(toDelete, i) {
err := il.DeleteEvent(evt.ID)
require.NoError(t, err)
} else {
remaining = append(remaining, evt)
}
}
require.Len(t, toDelete, len(mmmm.freeRangesAll))
err = mmmm.Defragment(2)
require.NoError(t, err)
require.Len(t, mmmm.freeRangesAll, 3)
require.Len(t, remaining, nevents-len(toDelete))
// all remaining events still accessible with correct content via GetByID
for _, evt := range remaining {
gotEvt, layers := mmmm.GetByID(evt.ID)
require.NotNil(t, gotEvt, "event %s should exist after defrag", evt.ID)
require.NotEmpty(t, layers, "event %s should have layers after defrag", evt.ID)
require.Equal(t, evt.Content, gotEvt.Content, "event %s content should match after defrag", evt.ID)
// also accessible via a query
require.Equal(t, il, layers[0])
}
evts := slices.Collect(il.QueryEvents(nostr.Filter{Kinds: []nostr.Kind{nostr.KindTextNote}}, 100))
require.Len(t, evts, nevents-len(toDelete))
// free range invariants hold after defrag
verifyFreeRangesInvariants(t, mmmm)
// no overlapping positions after defrag
mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error {
cursor, err := txn.OpenCursor(mmmm.indexId)
require.NoError(t, err)
defer cursor.Close()
var allPositions []position
for _, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; _, val, err = cursor.Get(nil, val, lmdb.Next) {
pos := positionFromBytes(val[0:12])
allPositions = append(allPositions, pos)
}
slices.SortFunc(allPositions, func(a, b position) int {
return cmp.Compare(a.start, b.start)
})
var lastEnd uint64
for _, pos := range allPositions {
if pos.start < lastEnd {
t.Fatalf("event overlap after defrag: %d < %d", pos.start, lastEnd)
}
lastEnd = pos.start + uint64(pos.size)
}
return nil
})
}
func countUsableFreeRanges(t *testing.T, mmmm *MultiMmapManager) (count int, space int) {
for _, fr := range mmmm.freeRangesAll {
if fr.size >= LARGE_FREERANGE {
@@ -176,3 +273,189 @@ func verifyFreeRangesInvariants(t *testing.T, mmmm *MultiMmapManager) {
return nil
})
}
func FuzzDefragment(f *testing.F) {
f.Add(0)
f.Fuzz(func(t *testing.T, seed int) {
tmpDir, err := os.MkdirTemp("", "mmm-defrag-fuzz-*")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)
logger := zerolog.Nop()
rnd := rand.New(rand.NewPCG(uint64(seed), 0))
mmmm := &MultiMmapManager{
Dir: tmpDir,
Logger: &logger,
}
err = mmmm.Init()
require.NoError(t, err)
defer mmmm.Close()
layerNames := []string{"a", "b", "c"}
var layers []*IndexingLayer
for _, name := range layerNames {
il, err := mmmm.EnsureLayer(name)
require.NoError(t, err)
defer il.Close()
layers = append(layers, il)
}
type indexedEvent struct {
evt nostr.Event
tag string
}
layerEvents := make([][]indexedEvent, len(layers))
sk := nostr.MustSecretKeyFromHex("945e01e37662430162121b804d3645a86d97df9d256917d86735d0eb219393eb")
pk := sk.Public()
totalEvents := rnd.IntN(500)
tChoices := []string{"foo", "bar", "banana"}
var written int
for written < totalEvents {
n := rnd.IntN(50) + 1
if n > totalEvents-written {
n = totalEvents - written
}
for i := 0; i < n; i++ {
sizeParam := rnd.IntN(2000)
content := strings.Repeat("z", sizeParam)
chosenTag := tChoices[rnd.IntN(3)]
evt := nostr.Event{
CreatedAt: nostr.Timestamp(rnd.Uint32()),
Kind: nostr.KindTextNote,
Content: content,
Tags: nostr.Tags{{"t", chosenTag}},
}
evt.Sign(sk)
nLayers := rnd.IntN(len(layers)) + 1
perm := rnd.Perm(len(layers))
for pi := 0; pi < nLayers; pi++ {
li := perm[pi]
err := layers[li].SaveEvent(evt)
require.NoError(t, err)
layerEvents[li] = append(layerEvents[li], indexedEvent{evt, chosenTag})
}
written++
}
if n > 0 {
totalRemaining := 0
for _, levts := range layerEvents {
totalRemaining += len(levts)
}
if totalRemaining > 0 {
m := rnd.IntN(n)
if m > totalRemaining {
m = totalRemaining
}
for i := 0; i < m; i++ {
var nonEmpty []int
for li, levts := range layerEvents {
if len(levts) > 0 {
nonEmpty = append(nonEmpty, li)
}
}
if len(nonEmpty) == 0 {
break
}
li := nonEmpty[rnd.IntN(len(nonEmpty))]
idx := rnd.IntN(len(layerEvents[li]))
evtInfo := layerEvents[li][idx]
err := layers[li].DeleteEvent(evtInfo.evt.ID)
require.NoError(t, err)
layerEvents[li] = append(layerEvents[li][:idx], layerEvents[li][idx+1:]...)
}
}
}
if n > 0 {
o := rnd.IntN(n)
for i := 0; i < o; i++ {
if len(mmmm.freeRangesAll) > 1 {
param := rnd.IntN(len(mmmm.freeRangesAll))
err := mmmm.Defragment(param)
require.NoError(t, err)
}
}
}
}
// query each layer
for li, il := range layers {
levts := layerEvents[li]
// query by author
evts := slices.Collect(il.QueryEvents(nostr.Filter{Authors: []nostr.PubKey{pk}}, 10000))
require.Equal(t, len(levts), len(evts))
// query by author and kind
evts = slices.Collect(il.QueryEvents(nostr.Filter{Authors: []nostr.PubKey{pk}, Kinds: []nostr.Kind{nostr.KindTextNote}}, 10000))
require.Equal(t, len(levts), len(evts))
// query by "t" tag
for _, tagVal := range tChoices {
expected := 0
for _, ie := range levts {
if ie.tag == tagVal {
expected++
}
}
evts = slices.Collect(il.QueryEvents(nostr.Filter{Tags: nostr.TagMap{"t": []string{tagVal}}}, 10000))
require.Equal(t, expected, len(evts))
}
// query with no parameters
allEvts := slices.Collect(il.QueryEvents(nostr.Filter{}, 10000))
require.Equal(t, len(levts), len(allEvts))
}
// build union of all events across all layers
allEventSet := make(map[string]nostr.Event)
for _, levts := range layerEvents {
for _, ie := range levts {
allEventSet[ie.evt.ID.String()] = ie.evt
}
}
// all events still accessible via GetByID
for _, evt := range allEventSet {
gotEvt, eventLayers := mmmm.GetByID(evt.ID)
require.NotNil(t, gotEvt)
require.NotEmpty(t, eventLayers)
require.Equal(t, evt.Content, gotEvt.Content)
}
verifyFreeRangesInvariants(t, mmmm)
mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error {
cursor, err := txn.OpenCursor(mmmm.indexId)
require.NoError(t, err)
defer cursor.Close()
var allPositions []position
for _, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; _, val, err = cursor.Get(nil, val, lmdb.Next) {
pos := positionFromBytes(val[0:12])
allPositions = append(allPositions, pos)
}
slices.SortFunc(allPositions, func(a, b position) int {
return cmp.Compare(a.start, b.start)
})
var lastEnd uint64
for _, pos := range allPositions {
if pos.start < lastEnd {
t.Fatalf("event overlap after defrag: %d < %d", pos.start, lastEnd)
}
lastEnd = pos.start + uint64(pos.size)
}
return nil
})
})
}
+5
View File
@@ -142,6 +142,11 @@ func FuzzTest(f *testing.F) {
mmmm.Rescan()
}
// perform random defrags -- shouldn't break the database
if rnd.UintN(3) == 1 {
mmmm.Defragment(len(deleted) / 3)
}
for id, deletedlayers := range deleted {
evt, foundlayers := mmmm.GetByID(id)
+1
View File
@@ -341,6 +341,7 @@ func (b *MultiMmapManager) removeAllReferencesFromLayer(txn *lmdb.Txn, layerId u
return nil
}
//go:inline
func (b *MultiMmapManager) loadEvent(pos position, eventReceiver *nostr.Event) error {
return betterbinary.Unmarshal(b.mmapf[pos.start:pos.start+uint64(pos.size)], eventReceiver)
}
+2 -5
View File
@@ -17,11 +17,6 @@ func (poss positions) find(start uint64) (idx int) {
return idx
}
func (poss positions) del(start uint64) positions {
idx := poss.find(start)
return slices.Delete(poss, idx, idx+1)
}
func (poss positions) String() string {
str := strings.Builder{}
str.Grow(10 + 20*len(poss))
@@ -46,6 +41,7 @@ func (pos position) isLarge() bool {
return pos.size >= LARGE_FREERANGE
}
//go:inline
func positionFromBytes(posb []byte) position {
return position{
size: binary.BigEndian.Uint32(posb[0:4]),
@@ -53,6 +49,7 @@ func positionFromBytes(posb []byte) position {
}
}
//go:inline
func writeBytesFromPosition(out []byte, pos position) {
binary.BigEndian.PutUint32(out[0:4], pos.size)
binary.BigEndian.PutUint64(out[4:12], pos.start)
+3 -1
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"runtime"
"slices"
"syscall"
"unsafe"
@@ -119,7 +120,8 @@ func (b *MultiMmapManager) storeOn(
b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1]
// also delete it from b.freeRangesAll
b.freeRangesAll = b.freeRangesAll.del(fr.start)
idx := b.freeRangesAll.find(fr.start)
b.freeRangesAll = slices.Delete(b.freeRangesAll, idx, idx+1)
} else {
// otherwise modify it in place
newFreeRange := position{
@@ -0,0 +1,2 @@
go test fuzz v1
int(-17)
@@ -0,0 +1,5 @@
go test fuzz v1
int(46)
uint(84)
uint(55)
uint(5)