diff --git a/eventstore/README.md b/eventstore/README.md index 5c11f9a..6875022 100644 --- a/eventstore/README.md +++ b/eventstore/README.md @@ -29,8 +29,6 @@ type Store interface { } ``` -[![Go Reference](https://pkg.go.dev/badge/fiatjaf.com/nostr/eventstore.svg)](https://pkg.go.dev/fiatjaf.com/nostr/eventstore) [![Run Tests](https://fiatjaf.com/nostr/eventstore/actions/workflows/test.yml/badge.svg)](https://fiatjaf.com/nostr/eventstore/actions/workflows/test.yml) - ## Available Implementations - **bleve**: Full-text search and indexing using the Bleve search library diff --git a/eventstore/mmm/fix.go b/eventstore/mmm/fix.go index 32aae2a..dba4f59 100644 --- a/eventstore/mmm/fix.go +++ b/eventstore/mmm/fix.go @@ -116,8 +116,7 @@ func (b *MultiMmapManager) Rescan() error { } } - b.freeRanges, err = b.gatherFreeRanges(mmmtxn) - if err != nil { + if err := b.gatherFreeRanges(mmmtxn); err != nil { return err } diff --git a/eventstore/mmm/freeranges.go b/eventstore/mmm/freeranges.go index 985a3ba..c2198e1 100644 --- a/eventstore/mmm/freeranges.go +++ b/eventstore/mmm/freeranges.go @@ -8,10 +8,12 @@ import ( "github.com/PowerDNS/lmdb-go/lmdb" ) -func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) (positions, error) { +const LARGE_FREERANGE = 142 + +func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) error { cursor, err := txn.OpenCursor(b.indexId) if err != nil { - return nil, fmt.Errorf("failed to open cursor on indexId: %w", err) + return fmt.Errorf("failed to open cursor on indexId: %w", err) } defer cursor.Close() @@ -28,31 +30,35 @@ func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) (positions, error) { usedPositions = append(usedPositions, position{start: b.mmapfEnd, size: 0}) // calculate free ranges as gaps between used positions - freeRanges := make(positions, 0, len(usedPositions)/2) + b.freeRangesAll = make(positions, 0, len(usedPositions)) + b.freeRangesLarge = make([]position, 0, len(usedPositions)/10) var currentStart uint64 = 0 for _, used := range usedPositions { if used.start > currentStart { // gap from currentStart to pos.start freeSize := used.start - currentStart if freeSize > 0 { - freeRanges = append(freeRanges, position{ + fr := position{ start: currentStart, size: uint32(freeSize), - }) + } + b.freeRangesAll = append(b.freeRangesAll, fr) + if fr.isLarge() { + b.freeRangesLarge = append(b.freeRangesLarge, fr) + } } } currentStart = used.start + uint64(used.size) } - return freeRanges, nil + return nil } func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) { // use binary search to find the insertion point for the new pos - idx, exists := slices.BinarySearchFunc(b.freeRanges, newFreeRange.start, func(item position, target uint64) int { + idx, exists := slices.BinarySearchFunc(b.freeRangesAll, newFreeRange.start, func(item position, target uint64) int { return cmp.Compare(item.start, target) }) - if exists { panic(fmt.Errorf("can't add free range that already exists: %s", newFreeRange)) } @@ -62,7 +68,7 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) { // check the range immediately before if idx > 0 { - before := b.freeRanges[idx-1] + before := b.freeRangesAll[idx-1] if before.start+uint64(before.size) == newFreeRange.start { deleteStart = idx - 1 deleting++ @@ -72,8 +78,8 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) { } // check the range immediately after - if idx < len(b.freeRanges) { - after := b.freeRanges[idx] + if idx < len(b.freeRangesAll) { + after := b.freeRangesAll[idx] if newFreeRange.start+uint64(newFreeRange.size) == after.start { if deleteStart == -1 { deleteStart = idx @@ -87,13 +93,60 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) { switch deleting { case 0: // if we are not deleting anything we must insert the new free range - b.freeRanges = slices.Insert(b.freeRanges, idx, newFreeRange) + b.freeRangesAll = slices.Insert(b.freeRangesAll, idx, newFreeRange) + + // if it's large add it to the list of large free ranges + if newFreeRange.isLarge() { + b.freeRangesLarge = append(b.freeRangesLarge, newFreeRange) + } case 1: + deleted := b.freeRangesAll[deleteStart] + // if we're deleting a single range, don't delete it, modify it in-place instead. - b.freeRanges[deleteStart] = newFreeRange + b.freeRangesAll[deleteStart] = newFreeRange + + // if the list we're modifying is in the list of large ranges modify it there too + if deleted.isLarge() { + for i, large := range b.freeRangesLarge { + if large.start == deleted.start { + b.freeRangesLarge[i] = newFreeRange + break + } + } + } else if newFreeRange.isLarge() { + // otherwise: if after modification it's big enough we should add it to list of large ranges + b.freeRangesLarge = append(b.freeRangesLarge, newFreeRange) + } case 2: - // now if we're deleting two ranges, delete just one instead and modify the other in place - b.freeRanges[deleteStart] = newFreeRange - b.freeRanges = slices.Delete(b.freeRanges, deleteStart+1, deleteStart+1+1) + // now if we're deleting two ranges, delete the second instead and modify the first in place + first := b.freeRangesAll[deleteStart] + second := b.freeRangesAll[deleteStart+1] + + b.freeRangesAll = slices.Delete(b.freeRangesAll, deleteStart+1, deleteStart+1+1) + b.freeRangesAll[deleteStart] = newFreeRange + + // if the second was in the list of large lists delete it from there too + if second.isLarge() { + for i, large := range b.freeRangesLarge { + if large.start == second.start { + b.freeRangesLarge[i] = b.freeRangesLarge[len(b.freeRangesLarge)-1] + b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1] + break + } + } + } + + // if the list we're modifying (the first) is already in the list of large ranges modify it there too + if first.isLarge() { + for i, large := range b.freeRangesLarge { + if large.start == first.start { + b.freeRangesLarge[i] = newFreeRange + break + } + } + } else if newFreeRange.isLarge() { + // otherwise if after modification has become big enough we should add it to list of large ranges + b.freeRangesLarge = append(b.freeRangesLarge, newFreeRange) + } } } diff --git a/eventstore/mmm/freeranges_test.go b/eventstore/mmm/freeranges_test.go index db5a08f..7b101e6 100644 --- a/eventstore/mmm/freeranges_test.go +++ b/eventstore/mmm/freeranges_test.go @@ -45,7 +45,7 @@ func FuzzFreeRanges(f *testing.F) { total := 0 for { - freeBefore, spaceBefore := countUsableFreeRanges(mmmm) + freeBefore, spaceBefore := countUsableFreeRanges(t, mmmm) hasAdded := false for i := range rnd.IntN(40) { @@ -69,7 +69,7 @@ func FuzzFreeRanges(f *testing.F) { total++ } - freeAfter, spaceAfter := countUsableFreeRanges(mmmm) + freeAfter, spaceAfter := countUsableFreeRanges(t, mmmm) if hasAdded && freeBefore > 0 { require.Lessf(t, spaceAfter, spaceBefore, "must use some of the existing free ranges when inserting new events (before: %d, after: %d)", freeBefore, freeAfter) } @@ -86,9 +86,35 @@ func FuzzFreeRanges(f *testing.F) { } } + verifyFreeRangesInvariants(t, mmmm) + + // add more events + for i := range rnd.IntN(40) { + content := "1" + if i > 0 { + content = strings.Repeat("z", rnd.IntN(1000)) + } + + evt := nostr.Event{ + CreatedAt: nostr.Timestamp(rnd.Uint32()), + Kind: 1, + Content: content, + Tags: nostr.Tags{}, + } + evt.Sign(sk) + err := il.SaveEvent(evt) + require.NoError(t, err) + + total++ + } + + verifyFreeRangesInvariants(t, mmmm) + mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error { - expectedFreeRanges, _ := mmmm.gatherFreeRanges(txn) - require.Equalf(t, expectedFreeRanges, mmmm.freeRanges, "expected %s, got %s", expectedFreeRanges, mmmm.freeRanges) + before := mmmm.freeRangesAll + err := mmmm.gatherFreeRanges(txn) + require.NoError(t, err) + require.Equalf(t, mmmm.freeRangesAll, before, "expected %s, got %s", before, mmmm.freeRangesAll) return nil }) @@ -99,12 +125,54 @@ func FuzzFreeRanges(f *testing.F) { }) } -func countUsableFreeRanges(mmmm *MultiMmapManager) (count int, space int) { - for _, fr := range mmmm.freeRanges { - if fr.size >= 142 { +func countUsableFreeRanges(t *testing.T, mmmm *MultiMmapManager) (count int, space int) { + for _, fr := range mmmm.freeRangesAll { + if fr.size >= LARGE_FREERANGE { count++ space += int(fr.size) } } + + require.Equal(t, count, len(mmmm.freeRangesLarge)) + return count, space } + +func verifyFreeRangesInvariants(t *testing.T, mmmm *MultiMmapManager) { + all := mmmm.freeRangesAll + large := mmmm.freeRangesLarge + + for _, l := range large { + found := false + for _, a := range all { + if l.start == a.start && l.size == a.size { + found = true + break + } + } + require.True(t, found, "large range %v not found in all ranges", l) + } + + for i := 1; i < len(all); i++ { + require.Greater(t, all[i].start, all[i-1].start, "all ranges should be sorted by start") + } + + for i := range all { + for j := i + 1; j < len(all); j++ { + end1 := all[i].start + uint64(all[i].size) + end2 := all[j].start + uint64(all[j].size) + require.False(t, (all[i].start >= all[j].start && all[i].start < end2) || + (all[j].start >= all[i].start && all[j].start < end1), + "ranges %v and %v overlap", all[i], all[j]) + } + } + + mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error { + before := make(positions, len(mmmm.freeRangesAll)) + copy(before, mmmm.freeRangesAll) + err := mmmm.gatherFreeRanges(txn) + require.NoError(t, err) + require.Equal(t, before, mmmm.freeRangesAll, "recomputing free ranges should yield the same result") + return nil + }) +} diff --git a/eventstore/mmm/mmmm.go b/eventstore/mmm/mmmm.go index 269c60f..1da424d 100644 --- a/eventstore/mmm/mmmm.go +++ b/eventstore/mmm/mmmm.go @@ -41,7 +41,8 @@ type MultiMmapManager struct { knownLayers lmdb.DBI indexId lmdb.DBI - freeRanges positions + freeRangesAll positions // sorted by position + freeRangesLarge []position // unsorted } func (b *MultiMmapManager) String() string { @@ -139,13 +140,12 @@ func (b *MultiMmapManager) Init() error { if !b.ReadOnly { // scan index table to calculate free ranges from used positions - b.freeRanges, err = b.gatherFreeRanges(txn) - if err != nil { + if err := b.gatherFreeRanges(txn); err != nil { return err } logOp := b.Logger.Debug() - for _, pos := range b.freeRanges { + for _, pos := range b.freeRangesLarge { if pos.size > 20 { logOp = logOp.Uint32(fmt.Sprintf("%d", pos.start), pos.size) } diff --git a/eventstore/mmm/position.go b/eventstore/mmm/position.go index c0bb6ec..21b11d0 100644 --- a/eventstore/mmm/position.go +++ b/eventstore/mmm/position.go @@ -1,22 +1,35 @@ package mmm import ( + "cmp" "encoding/binary" "fmt" + "slices" "strings" ) type positions []position +func (poss positions) find(start uint64) (idx int) { + idx, _ = slices.BinarySearchFunc(poss, start, func(item position, target uint64) int { + return cmp.Compare(item.start, target) + }) + return idx +} + +func (poss positions) del(start uint64) positions { + idx := poss.find(start) + return slices.Delete(poss, idx, idx+1) +} + func (poss positions) String() string { str := strings.Builder{} str.Grow(10 + 20*len(poss)) str.WriteString("positions:[") for _, pos := range poss { - str.WriteByte(' ') str.WriteString(pos.String()) } - str.WriteString(" ]") + str.WriteString("]") return str.String() } @@ -29,6 +42,10 @@ func (pos position) String() string { return fmt.Sprintf("<%d|%d|%d>", pos.start, pos.size, pos.start+uint64(pos.size)) } +func (pos position) isLarge() bool { + return pos.size >= LARGE_FREERANGE +} + func positionFromBytes(posb []byte) position { return position{ size: binary.BigEndian.Uint32(posb[0:4]), diff --git a/eventstore/mmm/query.go b/eventstore/mmm/query.go index 26729f0..4d5c837 100644 --- a/eventstore/mmm/query.go +++ b/eventstore/mmm/query.go @@ -180,8 +180,8 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yi // decode the entire thing event := nostr.Event{} if err := betterbinary.Unmarshal(bin, &event); err != nil { - log.Printf("lmdb: value read error (id %x) on query prefix %x sp %x dbi %v: %s\n", - betterbinary.GetID(bin), iterators[i].query.prefix, iterators[i].query.startingPoint, iterators[i].query.dbi, err) + log.Printf("mmm: value read error (id %s) on query prefix %x sp %x dbi %v: %s\n", + betterbinary.GetID(bin).Hex(), iterators[i].query.prefix, iterators[i].query.startingPoint, iterators[i].query.dbi, err) continue } diff --git a/eventstore/mmm/save.go b/eventstore/mmm/save.go index 9fa55b7..3bf5e9c 100644 --- a/eventstore/mmm/save.go +++ b/eventstore/mmm/save.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "runtime" - "slices" "syscall" "unsafe" @@ -104,25 +103,41 @@ func (b *MultiMmapManager) storeOn( return false, fmt.Errorf("event too large to store, max %d, got %d", 1<<16, pos.size) } - // find a suitable place for this to be stored in + // find a suitable place for this to be stored in (search only large free ranges) appendToMmap := true - for f, fr := range b.freeRanges { + for f, fr := range b.freeRangesLarge { if fr.size >= pos.size { - // found the smallest possible place that can fit this event + // found a place that can fit this event appendToMmap = false pos.start = fr.start // modify the free ranges we're keeping track of // (in case of conflict we lose this free range but it's ok, it will be recovered on the next startup) if pos.size == fr.size { - // if we've used it entirely just delete it - b.freeRanges = slices.Delete(b.freeRanges, f, f+1) + // if we've used it entirely just delete it (swap-delete since it's unsorted) + b.freeRangesLarge[f] = b.freeRangesLarge[len(b.freeRangesLarge)-1] + b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1] + + // also delete it from b.freeRangesAll + b.freeRangesAll = b.freeRangesAll.del(fr.start) } else { // otherwise modify it in place - b.freeRanges[f] = position{ + newFreeRange := position{ start: fr.start + uint64(pos.size), size: fr.size - pos.size, } + // only keep it in freeRangesLarge if it's still large enough + if newFreeRange.size >= LARGE_FREERANGE { + b.freeRangesLarge[f] = newFreeRange + } else { + // remove it from freeRangesLarge if it's no longer large enough + b.freeRangesLarge[f] = b.freeRangesLarge[len(b.freeRangesLarge)-1] + b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1] + } + + // also modify it in b.freeRangesAll + idx := b.freeRangesAll.find(fr.start) + b.freeRangesAll[idx] = newFreeRange } break