mmm: DefragmentOne() is probably better.
This commit is contained in:
+105
-88
@@ -170,6 +170,17 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) {
|
||||
}
|
||||
|
||||
func (b *MultiMmapManager) Defragment(n int) error {
|
||||
for range min(n, len(b.freeRangesAll)-1) {
|
||||
if err := b.DefragmentOne(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Defragment a single free range
|
||||
func (b *MultiMmapManager) DefragmentOne() error {
|
||||
if b.ReadOnly {
|
||||
return ReadOnly
|
||||
}
|
||||
@@ -180,10 +191,7 @@ func (b *MultiMmapManager) Defragment(n int) error {
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
if n > len(b.freeRangesAll)-1 {
|
||||
n = len(b.freeRangesAll) - 1
|
||||
}
|
||||
if n <= 0 {
|
||||
if len(b.freeRangesAll) < 2 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -204,110 +212,119 @@ func (b *MultiMmapManager) Defragment(n int) error {
|
||||
}
|
||||
}()
|
||||
|
||||
// iterate only through `n` free ranges, as specified
|
||||
for i := 0; i < n; i++ {
|
||||
fr := b.freeRangesAll[i]
|
||||
// will put stuff into the first free range
|
||||
fr := b.freeRangesAll[0]
|
||||
|
||||
// where the free range ends, the events start (any number of them)
|
||||
eventsStart := fr.start + uint64(fr.size)
|
||||
eventsEnd := b.freeRangesAll[i+1].start // and they end when the next free range starts
|
||||
// where the free range ends, the events start (any number of them)
|
||||
eventsStart := fr.start + uint64(fr.size)
|
||||
eventsEnd := b.freeRangesAll[1].start // and they end when the next free range starts
|
||||
|
||||
c := uint64(0) // this tracks our relative position inside the events section
|
||||
for (eventsStart + c) < eventsEnd {
|
||||
var evt nostr.Event
|
||||
if err := betterbinary.Unmarshal(b.mmapf[(eventsStart+c):eventsEnd], &evt); err != nil {
|
||||
id := betterbinary.GetID(b.mmapf[(eventsStart + c):eventsEnd])
|
||||
return fmt.Errorf("failed to read event (%x) from mmap: %w", id[:], err)
|
||||
}
|
||||
fmt.Println("# defrag", fr, eventsStart, eventsEnd)
|
||||
|
||||
// now that we have an event we'll update its pos on the id index and on every layer:
|
||||
oldVal, err := mmmtxn.Get(b.indexId, evt.ID[0:8])
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read val (%x) from index: %w", evt.ID[:], err)
|
||||
}
|
||||
c := uint64(0) // this tracks our relative position inside the events section
|
||||
for (eventsStart + c) < eventsEnd {
|
||||
var evt nostr.Event
|
||||
if err := betterbinary.Unmarshal(b.mmapf[(eventsStart+c):eventsEnd], &evt); err != nil {
|
||||
id := betterbinary.GetID(b.mmapf[(eventsStart + c):eventsEnd])
|
||||
return fmt.Errorf("failed to read event (%x) from mmap: %w", id[:], err)
|
||||
}
|
||||
|
||||
// current position
|
||||
pos := positionFromBytes(oldVal[0:12])
|
||||
// now that we have an event we'll update its pos on the id index and on every layer:
|
||||
oldVal, err := mmmtxn.Get(b.indexId, evt.ID[0:8])
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read val (%x) from index: %w", evt.ID[:], err)
|
||||
}
|
||||
|
||||
// new position (from the beginning of the free range before + relative position)
|
||||
pos.start = fr.start + uint64(c)
|
||||
// current position
|
||||
pos := positionFromBytes(oldVal[0:12])
|
||||
|
||||
// update this cursor
|
||||
c += uint64(pos.size)
|
||||
// new position (from the beginning of the free range before + relative position)
|
||||
fmt.Println(" moving event", evt.ID, "from", pos)
|
||||
pos.start = fr.start + uint64(c)
|
||||
|
||||
// prepare and save id index
|
||||
newVal := make([]byte, len(oldVal))
|
||||
writeBytesFromPosition(newVal, pos)
|
||||
copy(newVal[12:], oldVal[12:])
|
||||
if err := mmmtxn.Put(b.indexId, evt.ID[0:8], newVal, 0); err != nil {
|
||||
return fmt.Errorf("failed to write new pos to id index: %w", err)
|
||||
}
|
||||
// update this cursor
|
||||
c += uint64(pos.size)
|
||||
fmt.Println(" to", pos, "...", c, "layers:", oldVal[12:])
|
||||
|
||||
for s := 12; s < len(oldVal); s += 2 {
|
||||
layer := binary.BigEndian.Uint16(oldVal[s : s+2])
|
||||
lt, ok := layerTxns[layer]
|
||||
if !ok {
|
||||
il := b.layers.ByID(layer)
|
||||
if il == nil {
|
||||
fmt.Println(b.layers)
|
||||
panic(fmt.Errorf("missing layer %d", layer))
|
||||
}
|
||||
txn, err := il.lmdbEnv.BeginTxn(nil, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin layer txn for layer %d: %w", il.id, err)
|
||||
}
|
||||
txn.RawRead = true
|
||||
lt = &layerTxn{il: il, txn: txn}
|
||||
layerTxns[il.id] = lt
|
||||
// prepare and save id index
|
||||
newVal := make([]byte, len(oldVal))
|
||||
writeBytesFromPosition(newVal, pos)
|
||||
copy(newVal[12:], oldVal[12:])
|
||||
if err := mmmtxn.Put(b.indexId, evt.ID[0:8], newVal, 0); err != nil {
|
||||
return fmt.Errorf("failed to write new pos to id index: %w", err)
|
||||
}
|
||||
|
||||
for s := 12; s < len(oldVal); s += 2 {
|
||||
layer := binary.BigEndian.Uint16(oldVal[s : s+2])
|
||||
lt, ok := layerTxns[layer]
|
||||
if !ok {
|
||||
il := b.layers.ByID(layer)
|
||||
if il == nil {
|
||||
fmt.Println(b.layers)
|
||||
panic(fmt.Errorf("missing layer %d", layer))
|
||||
}
|
||||
txn, err := il.lmdbEnv.BeginTxn(nil, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin layer txn for layer %d: %w", il.id, err)
|
||||
}
|
||||
txn.RawRead = true
|
||||
lt = &layerTxn{il: il, txn: txn}
|
||||
layerTxns[il.id] = lt
|
||||
}
|
||||
|
||||
for k := range lt.il.getIndexKeysForEvent(evt) {
|
||||
if err := lt.txn.Del(k.dbi, k.key, oldVal[0:12]); err != nil {
|
||||
return fmt.Errorf("failed to delete old index entry for %x: %w", evt.ID[:], err)
|
||||
}
|
||||
if err := lt.txn.Put(k.dbi, k.key, newVal[0:12], 0); err != nil {
|
||||
return fmt.Errorf("failed to insert new index entry for %x: %w", evt.ID[:], err)
|
||||
}
|
||||
fmt.Println(" layer", lt.il.id)
|
||||
|
||||
for k := range lt.il.getIndexKeysForEvent(evt) {
|
||||
fmt.Println(" index", k.dbi, k.key)
|
||||
if err := lt.txn.Del(k.dbi, k.key, oldVal[0:12]); err != nil {
|
||||
return fmt.Errorf("failed to delete old index entry for %x: %w", evt.ID[:], err)
|
||||
}
|
||||
if err := lt.txn.Put(k.dbi, k.key, newVal[0:12], 0); err != nil {
|
||||
return fmt.Errorf("failed to insert new index entry for %x: %w", evt.ID[:], err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// now that we have updated all the pointers, just copy all the bytes between the two free ranges
|
||||
copy(b.mmapf[fr.start:], b.mmapf[fr.start+uint64(fr.size):eventsEnd])
|
||||
// now that we have updated all the pointers, just copy all the bytes between the two free ranges
|
||||
copy(b.mmapf[fr.start:], b.mmapf[fr.start+uint64(fr.size):eventsEnd])
|
||||
|
||||
// delete this free range if it's one of the big ones
|
||||
if fr.isLarge() {
|
||||
for l, lfr := range b.freeRangesLarge {
|
||||
if lfr.start == fr.start {
|
||||
b.freeRangesLarge[l] = b.freeRangesLarge[len(b.freeRangesLarge)-1]
|
||||
b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1]
|
||||
break
|
||||
}
|
||||
// delete this free range if it's one of the big ones
|
||||
if fr.isLarge() {
|
||||
for l, lfr := range b.freeRangesLarge {
|
||||
if lfr.start == fr.start {
|
||||
fmt.Println(" deleting large fr", l, lfr)
|
||||
b.freeRangesLarge[l] = b.freeRangesLarge[len(b.freeRangesLarge)-1]
|
||||
b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// now we have some space left at the end of this events section that is a free range
|
||||
remainingSpaceStart := fr.start + c
|
||||
// now we have some space left at the end of this events section that is a free range
|
||||
remainingSpaceStart := fr.start + c
|
||||
|
||||
// it must be merged with the next free range
|
||||
updated := position{
|
||||
start: remainingSpaceStart,
|
||||
size: b.freeRangesAll[i+1].size + uint32(eventsEnd) - uint32(remainingSpaceStart),
|
||||
}
|
||||
nextWasLarge := b.freeRangesAll[i+1].isLarge()
|
||||
b.freeRangesAll[i+1] = updated
|
||||
// it must be merged with the next free range
|
||||
updated := position{
|
||||
start: remainingSpaceStart,
|
||||
size: b.freeRangesAll[1].size + uint32(eventsEnd) - uint32(remainingSpaceStart),
|
||||
}
|
||||
nextWasLarge := b.freeRangesAll[1].isLarge()
|
||||
fmt.Println(" updating next", updated)
|
||||
b.freeRangesAll[1] = updated
|
||||
|
||||
if nextWasLarge {
|
||||
for l, lfr := range b.freeRangesLarge {
|
||||
if lfr.start == eventsEnd {
|
||||
b.freeRangesLarge[l] = updated
|
||||
break
|
||||
}
|
||||
if nextWasLarge {
|
||||
for l, lfr := range b.freeRangesLarge {
|
||||
if lfr.start == eventsEnd {
|
||||
fmt.Println("it is large:", l, lfr, "(now", updated, ")")
|
||||
b.freeRangesLarge[l] = updated
|
||||
break
|
||||
}
|
||||
} else if updated.isLarge() {
|
||||
// if it wasn't large but now is, add it to the list of large free ranges
|
||||
b.freeRangesLarge = append(b.freeRangesLarge, updated)
|
||||
}
|
||||
} else if updated.isLarge() {
|
||||
// if it wasn't large but now is, add it to the list of large free ranges
|
||||
fmt.Println(" a new large fr was created", updated)
|
||||
b.freeRangesLarge = append(b.freeRangesLarge, updated)
|
||||
}
|
||||
|
||||
// msync
|
||||
@@ -327,8 +344,8 @@ func (b *MultiMmapManager) Defragment(n int) error {
|
||||
}
|
||||
}
|
||||
|
||||
// delete the free ranges in bulk
|
||||
b.freeRangesAll = slices.Delete(b.freeRangesAll, 0, n)
|
||||
// delete the first free range
|
||||
b.freeRangesAll = slices.Delete(b.freeRangesAll, 0, 1)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -157,7 +157,7 @@ func (b *MultiMmapManager) storeOn(
|
||||
}
|
||||
|
||||
// write to the mmap
|
||||
if err := betterbinary.Marshal(evt, b.mmapf[pos.start:]); err != nil {
|
||||
if err := betterbinary.Marshal(evt, b.mmapf[pos.start:pos.start+uint64(pos.size)]); err != nil {
|
||||
return false, fmt.Errorf("error marshaling to %d: %w", pos.start, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
go test fuzz v1
|
||||
int(-360)
|
||||
Reference in New Issue
Block a user