diff --git a/eventstore/mmm/freeranges.go b/eventstore/mmm/freeranges.go index d17133a..c9ad91d 100644 --- a/eventstore/mmm/freeranges.go +++ b/eventstore/mmm/freeranges.go @@ -170,6 +170,17 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) { } func (b *MultiMmapManager) Defragment(n int) error { + for range min(n, len(b.freeRangesAll)-1) { + if err := b.DefragmentOne(); err != nil { + return err + } + } + + return nil +} + +// Defragment a single free range +func (b *MultiMmapManager) DefragmentOne() error { if b.ReadOnly { return ReadOnly } @@ -180,10 +191,7 @@ func (b *MultiMmapManager) Defragment(n int) error { runtime.LockOSThread() defer runtime.UnlockOSThread() - if n > len(b.freeRangesAll)-1 { - n = len(b.freeRangesAll) - 1 - } - if n <= 0 { + if len(b.freeRangesAll) < 2 { return nil } @@ -204,110 +212,119 @@ func (b *MultiMmapManager) Defragment(n int) error { } }() - // iterate only through `n` free ranges, as specified - for i := 0; i < n; i++ { - fr := b.freeRangesAll[i] + // will put stuff into the first free range + fr := b.freeRangesAll[0] - // where the free range ends, the events start (any number of them) - eventsStart := fr.start + uint64(fr.size) - eventsEnd := b.freeRangesAll[i+1].start // and they end when the next free range starts + // where the free range ends, the events start (any number of them) + eventsStart := fr.start + uint64(fr.size) + eventsEnd := b.freeRangesAll[1].start // and they end when the next free range starts - c := uint64(0) // this tracks our relative position inside the events section - for (eventsStart + c) < eventsEnd { - var evt nostr.Event - if err := betterbinary.Unmarshal(b.mmapf[(eventsStart+c):eventsEnd], &evt); err != nil { - id := betterbinary.GetID(b.mmapf[(eventsStart + c):eventsEnd]) - return fmt.Errorf("failed to read event (%x) from mmap: %w", id[:], err) - } + fmt.Println("# defrag", fr, eventsStart, eventsEnd) - // now that we have an event we'll update its pos on the id index and on every layer: - oldVal, err := mmmtxn.Get(b.indexId, evt.ID[0:8]) - if err != nil { - return fmt.Errorf("failed to read val (%x) from index: %w", evt.ID[:], err) - } + c := uint64(0) // this tracks our relative position inside the events section + for (eventsStart + c) < eventsEnd { + var evt nostr.Event + if err := betterbinary.Unmarshal(b.mmapf[(eventsStart+c):eventsEnd], &evt); err != nil { + id := betterbinary.GetID(b.mmapf[(eventsStart + c):eventsEnd]) + return fmt.Errorf("failed to read event (%x) from mmap: %w", id[:], err) + } - // current position - pos := positionFromBytes(oldVal[0:12]) + // now that we have an event we'll update its pos on the id index and on every layer: + oldVal, err := mmmtxn.Get(b.indexId, evt.ID[0:8]) + if err != nil { + return fmt.Errorf("failed to read val (%x) from index: %w", evt.ID[:], err) + } - // new position (from the beginning of the free range before + relative position) - pos.start = fr.start + uint64(c) + // current position + pos := positionFromBytes(oldVal[0:12]) - // update this cursor - c += uint64(pos.size) + // new position (from the beginning of the free range before + relative position) + fmt.Println(" moving event", evt.ID, "from", pos) + pos.start = fr.start + uint64(c) - // prepare and save id index - newVal := make([]byte, len(oldVal)) - writeBytesFromPosition(newVal, pos) - copy(newVal[12:], oldVal[12:]) - if err := mmmtxn.Put(b.indexId, evt.ID[0:8], newVal, 0); err != nil { - return fmt.Errorf("failed to write new pos to id index: %w", err) - } + // update this cursor + c += uint64(pos.size) + fmt.Println(" to", pos, "...", c, "layers:", oldVal[12:]) - for s := 12; s < len(oldVal); s += 2 { - layer := binary.BigEndian.Uint16(oldVal[s : s+2]) - lt, ok := layerTxns[layer] - if !ok { - il := b.layers.ByID(layer) - if il == nil { - fmt.Println(b.layers) - panic(fmt.Errorf("missing layer %d", layer)) - } - txn, err := il.lmdbEnv.BeginTxn(nil, 0) - if err != nil { - return fmt.Errorf("failed to begin layer txn for layer %d: %w", il.id, err) - } - txn.RawRead = true - lt = &layerTxn{il: il, txn: txn} - layerTxns[il.id] = lt + // prepare and save id index + newVal := make([]byte, len(oldVal)) + writeBytesFromPosition(newVal, pos) + copy(newVal[12:], oldVal[12:]) + if err := mmmtxn.Put(b.indexId, evt.ID[0:8], newVal, 0); err != nil { + return fmt.Errorf("failed to write new pos to id index: %w", err) + } + + for s := 12; s < len(oldVal); s += 2 { + layer := binary.BigEndian.Uint16(oldVal[s : s+2]) + lt, ok := layerTxns[layer] + if !ok { + il := b.layers.ByID(layer) + if il == nil { + fmt.Println(b.layers) + panic(fmt.Errorf("missing layer %d", layer)) } + txn, err := il.lmdbEnv.BeginTxn(nil, 0) + if err != nil { + return fmt.Errorf("failed to begin layer txn for layer %d: %w", il.id, err) + } + txn.RawRead = true + lt = &layerTxn{il: il, txn: txn} + layerTxns[il.id] = lt + } - for k := range lt.il.getIndexKeysForEvent(evt) { - if err := lt.txn.Del(k.dbi, k.key, oldVal[0:12]); err != nil { - return fmt.Errorf("failed to delete old index entry for %x: %w", evt.ID[:], err) - } - if err := lt.txn.Put(k.dbi, k.key, newVal[0:12], 0); err != nil { - return fmt.Errorf("failed to insert new index entry for %x: %w", evt.ID[:], err) - } + fmt.Println(" layer", lt.il.id) + + for k := range lt.il.getIndexKeysForEvent(evt) { + fmt.Println(" index", k.dbi, k.key) + if err := lt.txn.Del(k.dbi, k.key, oldVal[0:12]); err != nil { + return fmt.Errorf("failed to delete old index entry for %x: %w", evt.ID[:], err) + } + if err := lt.txn.Put(k.dbi, k.key, newVal[0:12], 0); err != nil { + return fmt.Errorf("failed to insert new index entry for %x: %w", evt.ID[:], err) } } } + } - // now that we have updated all the pointers, just copy all the bytes between the two free ranges - copy(b.mmapf[fr.start:], b.mmapf[fr.start+uint64(fr.size):eventsEnd]) + // now that we have updated all the pointers, just copy all the bytes between the two free ranges + copy(b.mmapf[fr.start:], b.mmapf[fr.start+uint64(fr.size):eventsEnd]) - // delete this free range if it's one of the big ones - if fr.isLarge() { - for l, lfr := range b.freeRangesLarge { - if lfr.start == fr.start { - b.freeRangesLarge[l] = b.freeRangesLarge[len(b.freeRangesLarge)-1] - b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1] - break - } + // delete this free range if it's one of the big ones + if fr.isLarge() { + for l, lfr := range b.freeRangesLarge { + if lfr.start == fr.start { + fmt.Println(" deleting large fr", l, lfr) + b.freeRangesLarge[l] = b.freeRangesLarge[len(b.freeRangesLarge)-1] + b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1] + break } } + } - // now we have some space left at the end of this events section that is a free range - remainingSpaceStart := fr.start + c + // now we have some space left at the end of this events section that is a free range + remainingSpaceStart := fr.start + c - // it must be merged with the next free range - updated := position{ - start: remainingSpaceStart, - size: b.freeRangesAll[i+1].size + uint32(eventsEnd) - uint32(remainingSpaceStart), - } - nextWasLarge := b.freeRangesAll[i+1].isLarge() - b.freeRangesAll[i+1] = updated + // it must be merged with the next free range + updated := position{ + start: remainingSpaceStart, + size: b.freeRangesAll[1].size + uint32(eventsEnd) - uint32(remainingSpaceStart), + } + nextWasLarge := b.freeRangesAll[1].isLarge() + fmt.Println(" updating next", updated) + b.freeRangesAll[1] = updated - if nextWasLarge { - for l, lfr := range b.freeRangesLarge { - if lfr.start == eventsEnd { - b.freeRangesLarge[l] = updated - break - } + if nextWasLarge { + for l, lfr := range b.freeRangesLarge { + if lfr.start == eventsEnd { + fmt.Println("it is large:", l, lfr, "(now", updated, ")") + b.freeRangesLarge[l] = updated + break } - } else if updated.isLarge() { - // if it wasn't large but now is, add it to the list of large free ranges - b.freeRangesLarge = append(b.freeRangesLarge, updated) } + } else if updated.isLarge() { + // if it wasn't large but now is, add it to the list of large free ranges + fmt.Println(" a new large fr was created", updated) + b.freeRangesLarge = append(b.freeRangesLarge, updated) } // msync @@ -327,8 +344,8 @@ func (b *MultiMmapManager) Defragment(n int) error { } } - // delete the free ranges in bulk - b.freeRangesAll = slices.Delete(b.freeRangesAll, 0, n) + // delete the first free range + b.freeRangesAll = slices.Delete(b.freeRangesAll, 0, 1) return nil } diff --git a/eventstore/mmm/save.go b/eventstore/mmm/save.go index b9f3361..b6f8083 100644 --- a/eventstore/mmm/save.go +++ b/eventstore/mmm/save.go @@ -157,7 +157,7 @@ func (b *MultiMmapManager) storeOn( } // write to the mmap - if err := betterbinary.Marshal(evt, b.mmapf[pos.start:]); err != nil { + if err := betterbinary.Marshal(evt, b.mmapf[pos.start:pos.start+uint64(pos.size)]); err != nil { return false, fmt.Errorf("error marshaling to %d: %w", pos.start, err) } diff --git a/eventstore/mmm/testdata/fuzz/FuzzDefragment/2ce076b09281c580 b/eventstore/mmm/testdata/fuzz/FuzzDefragment/2ce076b09281c580 new file mode 100644 index 0000000..ef8c989 --- /dev/null +++ b/eventstore/mmm/testdata/fuzz/FuzzDefragment/2ce076b09281c580 @@ -0,0 +1,2 @@ +go test fuzz v1 +int(-360)