mmm: free ranges tracking improved with b.freeRangesLarge and b.freeRangesAll
one is unsorted and fast and we only care about it with picking a new free range. the other is sorted and used when merging a new freed range with existing free ranges. both are computed from the events id index at beginning, then tracked manually on each addition or deletion. this change uncovered some errors so we fixed them and added some more fuzz test invariant checking. code is simplified a little bit. there was another thing I forgot.
This commit is contained in:
@@ -29,8 +29,6 @@ type Store interface {
|
||||
}
|
||||
```
|
||||
|
||||
[](https://pkg.go.dev/fiatjaf.com/nostr/eventstore) [](https://fiatjaf.com/nostr/eventstore/actions/workflows/test.yml)
|
||||
|
||||
## Available Implementations
|
||||
|
||||
- **bleve**: Full-text search and indexing using the Bleve search library
|
||||
|
||||
@@ -116,8 +116,7 @@ func (b *MultiMmapManager) Rescan() error {
|
||||
}
|
||||
}
|
||||
|
||||
b.freeRanges, err = b.gatherFreeRanges(mmmtxn)
|
||||
if err != nil {
|
||||
if err := b.gatherFreeRanges(mmmtxn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -8,10 +8,12 @@ import (
|
||||
"github.com/PowerDNS/lmdb-go/lmdb"
|
||||
)
|
||||
|
||||
func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) (positions, error) {
|
||||
const LARGE_FREERANGE = 142
|
||||
|
||||
func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) error {
|
||||
cursor, err := txn.OpenCursor(b.indexId)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open cursor on indexId: %w", err)
|
||||
return fmt.Errorf("failed to open cursor on indexId: %w", err)
|
||||
}
|
||||
defer cursor.Close()
|
||||
|
||||
@@ -28,31 +30,35 @@ func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) (positions, error) {
|
||||
usedPositions = append(usedPositions, position{start: b.mmapfEnd, size: 0})
|
||||
|
||||
// calculate free ranges as gaps between used positions
|
||||
freeRanges := make(positions, 0, len(usedPositions)/2)
|
||||
b.freeRangesAll = make(positions, 0, len(usedPositions))
|
||||
b.freeRangesLarge = make([]position, 0, len(usedPositions)/10)
|
||||
var currentStart uint64 = 0
|
||||
for _, used := range usedPositions {
|
||||
if used.start > currentStart {
|
||||
// gap from currentStart to pos.start
|
||||
freeSize := used.start - currentStart
|
||||
if freeSize > 0 {
|
||||
freeRanges = append(freeRanges, position{
|
||||
fr := position{
|
||||
start: currentStart,
|
||||
size: uint32(freeSize),
|
||||
})
|
||||
}
|
||||
b.freeRangesAll = append(b.freeRangesAll, fr)
|
||||
if fr.isLarge() {
|
||||
b.freeRangesLarge = append(b.freeRangesLarge, fr)
|
||||
}
|
||||
}
|
||||
}
|
||||
currentStart = used.start + uint64(used.size)
|
||||
}
|
||||
|
||||
return freeRanges, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) {
|
||||
// use binary search to find the insertion point for the new pos
|
||||
idx, exists := slices.BinarySearchFunc(b.freeRanges, newFreeRange.start, func(item position, target uint64) int {
|
||||
idx, exists := slices.BinarySearchFunc(b.freeRangesAll, newFreeRange.start, func(item position, target uint64) int {
|
||||
return cmp.Compare(item.start, target)
|
||||
})
|
||||
|
||||
if exists {
|
||||
panic(fmt.Errorf("can't add free range that already exists: %s", newFreeRange))
|
||||
}
|
||||
@@ -62,7 +68,7 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) {
|
||||
|
||||
// check the range immediately before
|
||||
if idx > 0 {
|
||||
before := b.freeRanges[idx-1]
|
||||
before := b.freeRangesAll[idx-1]
|
||||
if before.start+uint64(before.size) == newFreeRange.start {
|
||||
deleteStart = idx - 1
|
||||
deleting++
|
||||
@@ -72,8 +78,8 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) {
|
||||
}
|
||||
|
||||
// check the range immediately after
|
||||
if idx < len(b.freeRanges) {
|
||||
after := b.freeRanges[idx]
|
||||
if idx < len(b.freeRangesAll) {
|
||||
after := b.freeRangesAll[idx]
|
||||
if newFreeRange.start+uint64(newFreeRange.size) == after.start {
|
||||
if deleteStart == -1 {
|
||||
deleteStart = idx
|
||||
@@ -87,13 +93,60 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) {
|
||||
switch deleting {
|
||||
case 0:
|
||||
// if we are not deleting anything we must insert the new free range
|
||||
b.freeRanges = slices.Insert(b.freeRanges, idx, newFreeRange)
|
||||
b.freeRangesAll = slices.Insert(b.freeRangesAll, idx, newFreeRange)
|
||||
|
||||
// if it's large add it to the list of large free ranges
|
||||
if newFreeRange.isLarge() {
|
||||
b.freeRangesLarge = append(b.freeRangesLarge, newFreeRange)
|
||||
}
|
||||
case 1:
|
||||
deleted := b.freeRangesAll[deleteStart]
|
||||
|
||||
// if we're deleting a single range, don't delete it, modify it in-place instead.
|
||||
b.freeRanges[deleteStart] = newFreeRange
|
||||
b.freeRangesAll[deleteStart] = newFreeRange
|
||||
|
||||
// if the list we're modifying is in the list of large ranges modify it there too
|
||||
if deleted.isLarge() {
|
||||
for i, large := range b.freeRangesLarge {
|
||||
if large.start == deleted.start {
|
||||
b.freeRangesLarge[i] = newFreeRange
|
||||
break
|
||||
}
|
||||
}
|
||||
} else if newFreeRange.isLarge() {
|
||||
// otherwise: if after modification it's big enough we should add it to list of large ranges
|
||||
b.freeRangesLarge = append(b.freeRangesLarge, newFreeRange)
|
||||
}
|
||||
case 2:
|
||||
// now if we're deleting two ranges, delete just one instead and modify the other in place
|
||||
b.freeRanges[deleteStart] = newFreeRange
|
||||
b.freeRanges = slices.Delete(b.freeRanges, deleteStart+1, deleteStart+1+1)
|
||||
// now if we're deleting two ranges, delete the second instead and modify the first in place
|
||||
first := b.freeRangesAll[deleteStart]
|
||||
second := b.freeRangesAll[deleteStart+1]
|
||||
|
||||
b.freeRangesAll = slices.Delete(b.freeRangesAll, deleteStart+1, deleteStart+1+1)
|
||||
b.freeRangesAll[deleteStart] = newFreeRange
|
||||
|
||||
// if the second was in the list of large lists delete it from there too
|
||||
if second.isLarge() {
|
||||
for i, large := range b.freeRangesLarge {
|
||||
if large.start == second.start {
|
||||
b.freeRangesLarge[i] = b.freeRangesLarge[len(b.freeRangesLarge)-1]
|
||||
b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if the list we're modifying (the first) is already in the list of large ranges modify it there too
|
||||
if first.isLarge() {
|
||||
for i, large := range b.freeRangesLarge {
|
||||
if large.start == first.start {
|
||||
b.freeRangesLarge[i] = newFreeRange
|
||||
break
|
||||
}
|
||||
}
|
||||
} else if newFreeRange.isLarge() {
|
||||
// otherwise if after modification has become big enough we should add it to list of large ranges
|
||||
b.freeRangesLarge = append(b.freeRangesLarge, newFreeRange)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ func FuzzFreeRanges(f *testing.F) {
|
||||
|
||||
total := 0
|
||||
for {
|
||||
freeBefore, spaceBefore := countUsableFreeRanges(mmmm)
|
||||
freeBefore, spaceBefore := countUsableFreeRanges(t, mmmm)
|
||||
|
||||
hasAdded := false
|
||||
for i := range rnd.IntN(40) {
|
||||
@@ -69,7 +69,7 @@ func FuzzFreeRanges(f *testing.F) {
|
||||
total++
|
||||
}
|
||||
|
||||
freeAfter, spaceAfter := countUsableFreeRanges(mmmm)
|
||||
freeAfter, spaceAfter := countUsableFreeRanges(t, mmmm)
|
||||
if hasAdded && freeBefore > 0 {
|
||||
require.Lessf(t, spaceAfter, spaceBefore, "must use some of the existing free ranges when inserting new events (before: %d, after: %d)", freeBefore, freeAfter)
|
||||
}
|
||||
@@ -86,9 +86,35 @@ func FuzzFreeRanges(f *testing.F) {
|
||||
}
|
||||
}
|
||||
|
||||
verifyFreeRangesInvariants(t, mmmm)
|
||||
|
||||
// add more events
|
||||
for i := range rnd.IntN(40) {
|
||||
content := "1"
|
||||
if i > 0 {
|
||||
content = strings.Repeat("z", rnd.IntN(1000))
|
||||
}
|
||||
|
||||
evt := nostr.Event{
|
||||
CreatedAt: nostr.Timestamp(rnd.Uint32()),
|
||||
Kind: 1,
|
||||
Content: content,
|
||||
Tags: nostr.Tags{},
|
||||
}
|
||||
evt.Sign(sk)
|
||||
err := il.SaveEvent(evt)
|
||||
require.NoError(t, err)
|
||||
|
||||
total++
|
||||
}
|
||||
|
||||
verifyFreeRangesInvariants(t, mmmm)
|
||||
|
||||
mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error {
|
||||
expectedFreeRanges, _ := mmmm.gatherFreeRanges(txn)
|
||||
require.Equalf(t, expectedFreeRanges, mmmm.freeRanges, "expected %s, got %s", expectedFreeRanges, mmmm.freeRanges)
|
||||
before := mmmm.freeRangesAll
|
||||
err := mmmm.gatherFreeRanges(txn)
|
||||
require.NoError(t, err)
|
||||
require.Equalf(t, mmmm.freeRangesAll, before, "expected %s, got %s", before, mmmm.freeRangesAll)
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -99,12 +125,54 @@ func FuzzFreeRanges(f *testing.F) {
|
||||
})
|
||||
}
|
||||
|
||||
func countUsableFreeRanges(mmmm *MultiMmapManager) (count int, space int) {
|
||||
for _, fr := range mmmm.freeRanges {
|
||||
if fr.size >= 142 {
|
||||
func countUsableFreeRanges(t *testing.T, mmmm *MultiMmapManager) (count int, space int) {
|
||||
for _, fr := range mmmm.freeRangesAll {
|
||||
if fr.size >= LARGE_FREERANGE {
|
||||
count++
|
||||
space += int(fr.size)
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, count, len(mmmm.freeRangesLarge))
|
||||
|
||||
return count, space
|
||||
}
|
||||
|
||||
func verifyFreeRangesInvariants(t *testing.T, mmmm *MultiMmapManager) {
|
||||
all := mmmm.freeRangesAll
|
||||
large := mmmm.freeRangesLarge
|
||||
|
||||
for _, l := range large {
|
||||
found := false
|
||||
for _, a := range all {
|
||||
if l.start == a.start && l.size == a.size {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
require.True(t, found, "large range %v not found in all ranges", l)
|
||||
}
|
||||
|
||||
for i := 1; i < len(all); i++ {
|
||||
require.Greater(t, all[i].start, all[i-1].start, "all ranges should be sorted by start")
|
||||
}
|
||||
|
||||
for i := range all {
|
||||
for j := i + 1; j < len(all); j++ {
|
||||
end1 := all[i].start + uint64(all[i].size)
|
||||
end2 := all[j].start + uint64(all[j].size)
|
||||
require.False(t, (all[i].start >= all[j].start && all[i].start < end2) ||
|
||||
(all[j].start >= all[i].start && all[j].start < end1),
|
||||
"ranges %v and %v overlap", all[i], all[j])
|
||||
}
|
||||
}
|
||||
|
||||
mmmm.lmdbEnv.View(func(txn *lmdb.Txn) error {
|
||||
before := make(positions, len(mmmm.freeRangesAll))
|
||||
copy(before, mmmm.freeRangesAll)
|
||||
err := mmmm.gatherFreeRanges(txn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, before, mmmm.freeRangesAll, "recomputing free ranges should yield the same result")
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -41,7 +41,8 @@ type MultiMmapManager struct {
|
||||
knownLayers lmdb.DBI
|
||||
indexId lmdb.DBI
|
||||
|
||||
freeRanges positions
|
||||
freeRangesAll positions // sorted by position
|
||||
freeRangesLarge []position // unsorted
|
||||
}
|
||||
|
||||
func (b *MultiMmapManager) String() string {
|
||||
@@ -139,13 +140,12 @@ func (b *MultiMmapManager) Init() error {
|
||||
|
||||
if !b.ReadOnly {
|
||||
// scan index table to calculate free ranges from used positions
|
||||
b.freeRanges, err = b.gatherFreeRanges(txn)
|
||||
if err != nil {
|
||||
if err := b.gatherFreeRanges(txn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logOp := b.Logger.Debug()
|
||||
for _, pos := range b.freeRanges {
|
||||
for _, pos := range b.freeRangesLarge {
|
||||
if pos.size > 20 {
|
||||
logOp = logOp.Uint32(fmt.Sprintf("%d", pos.start), pos.size)
|
||||
}
|
||||
|
||||
@@ -1,22 +1,35 @@
|
||||
package mmm
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type positions []position
|
||||
|
||||
func (poss positions) find(start uint64) (idx int) {
|
||||
idx, _ = slices.BinarySearchFunc(poss, start, func(item position, target uint64) int {
|
||||
return cmp.Compare(item.start, target)
|
||||
})
|
||||
return idx
|
||||
}
|
||||
|
||||
func (poss positions) del(start uint64) positions {
|
||||
idx := poss.find(start)
|
||||
return slices.Delete(poss, idx, idx+1)
|
||||
}
|
||||
|
||||
func (poss positions) String() string {
|
||||
str := strings.Builder{}
|
||||
str.Grow(10 + 20*len(poss))
|
||||
str.WriteString("positions:[")
|
||||
for _, pos := range poss {
|
||||
str.WriteByte(' ')
|
||||
str.WriteString(pos.String())
|
||||
}
|
||||
str.WriteString(" ]")
|
||||
str.WriteString("]")
|
||||
return str.String()
|
||||
}
|
||||
|
||||
@@ -29,6 +42,10 @@ func (pos position) String() string {
|
||||
return fmt.Sprintf("<%d|%d|%d>", pos.start, pos.size, pos.start+uint64(pos.size))
|
||||
}
|
||||
|
||||
func (pos position) isLarge() bool {
|
||||
return pos.size >= LARGE_FREERANGE
|
||||
}
|
||||
|
||||
func positionFromBytes(posb []byte) position {
|
||||
return position{
|
||||
size: binary.BigEndian.Uint32(posb[0:4]),
|
||||
|
||||
@@ -180,8 +180,8 @@ func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yi
|
||||
// decode the entire thing
|
||||
event := nostr.Event{}
|
||||
if err := betterbinary.Unmarshal(bin, &event); err != nil {
|
||||
log.Printf("lmdb: value read error (id %x) on query prefix %x sp %x dbi %v: %s\n",
|
||||
betterbinary.GetID(bin), iterators[i].query.prefix, iterators[i].query.startingPoint, iterators[i].query.dbi, err)
|
||||
log.Printf("mmm: value read error (id %s) on query prefix %x sp %x dbi %v: %s\n",
|
||||
betterbinary.GetID(bin).Hex(), iterators[i].query.prefix, iterators[i].query.startingPoint, iterators[i].query.dbi, err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
+22
-7
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"slices"
|
||||
"syscall"
|
||||
"unsafe"
|
||||
|
||||
@@ -104,25 +103,41 @@ func (b *MultiMmapManager) storeOn(
|
||||
return false, fmt.Errorf("event too large to store, max %d, got %d", 1<<16, pos.size)
|
||||
}
|
||||
|
||||
// find a suitable place for this to be stored in
|
||||
// find a suitable place for this to be stored in (search only large free ranges)
|
||||
appendToMmap := true
|
||||
for f, fr := range b.freeRanges {
|
||||
for f, fr := range b.freeRangesLarge {
|
||||
if fr.size >= pos.size {
|
||||
// found the smallest possible place that can fit this event
|
||||
// found a place that can fit this event
|
||||
appendToMmap = false
|
||||
pos.start = fr.start
|
||||
|
||||
// modify the free ranges we're keeping track of
|
||||
// (in case of conflict we lose this free range but it's ok, it will be recovered on the next startup)
|
||||
if pos.size == fr.size {
|
||||
// if we've used it entirely just delete it
|
||||
b.freeRanges = slices.Delete(b.freeRanges, f, f+1)
|
||||
// if we've used it entirely just delete it (swap-delete since it's unsorted)
|
||||
b.freeRangesLarge[f] = b.freeRangesLarge[len(b.freeRangesLarge)-1]
|
||||
b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1]
|
||||
|
||||
// also delete it from b.freeRangesAll
|
||||
b.freeRangesAll = b.freeRangesAll.del(fr.start)
|
||||
} else {
|
||||
// otherwise modify it in place
|
||||
b.freeRanges[f] = position{
|
||||
newFreeRange := position{
|
||||
start: fr.start + uint64(pos.size),
|
||||
size: fr.size - pos.size,
|
||||
}
|
||||
// only keep it in freeRangesLarge if it's still large enough
|
||||
if newFreeRange.size >= LARGE_FREERANGE {
|
||||
b.freeRangesLarge[f] = newFreeRange
|
||||
} else {
|
||||
// remove it from freeRangesLarge if it's no longer large enough
|
||||
b.freeRangesLarge[f] = b.freeRangesLarge[len(b.freeRangesLarge)-1]
|
||||
b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1]
|
||||
}
|
||||
|
||||
// also modify it in b.freeRangesAll
|
||||
idx := b.freeRangesAll.find(fr.start)
|
||||
b.freeRangesAll[idx] = newFreeRange
|
||||
}
|
||||
|
||||
break
|
||||
|
||||
Reference in New Issue
Block a user