package mmm

import (
	"cmp"
	"encoding/binary"
	"fmt"
	"iter"
	"runtime"
	"slices"
	"syscall"
	"unsafe"

	"fiatjaf.com/nostr"
	"fiatjaf.com/nostr/eventstore/codec/betterbinary"
	"github.com/PowerDNS/lmdb-go/lmdb"
)

// LARGE_FREERANGE is a size constant for free ranges.
// NOTE(review): presumably the threshold used by position.isLarge (defined
// elsewhere) to decide membership in freeRangesLarge — confirm.
const LARGE_FREERANGE = 142

// AllFreeRanges returns an iterator of (start_pos, size) of all free ranges, in positional order.
func (b *MultiMmapManager) AllFreeRanges() iter.Seq2[uint64, uint32] {
	return func(yield func(uint64, uint32) bool) {
		for _, pos := range b.freeRangesAll {
			if !yield(pos.start, pos.size) {
				return
			}
		}
	}
}

// gatherFreeRanges rebuilds b.freeRangesAll and b.freeRangesLarge from scratch.
//
// It walks every entry of the id index (b.indexId) using txn, reads the
// position stored in the first 12 bytes of each value, and records every gap
// between consecutive used positions (and between the last used position and
// b.mmapfEnd) as a free range.
//
// It returns an error if the cursor cannot be opened or if iterating the index
// fails for any reason other than reaching the end.
func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) error {
	cursor, err := txn.OpenCursor(b.indexId)
	if err != nil {
		return fmt.Errorf("failed to open cursor on indexId: %w", err)
	}
	defer cursor.Close()

	usedPositions := make(positions, 0, 256)
	key, val, err := cursor.Get(nil, nil, lmdb.First)
	for ; err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) {
		usedPositions = append(usedPositions, positionFromBytes(val[0:12]))
	}
	// the loop ends on the first error: NotFound just means we ran out of
	// entries, anything else is a real failure and would otherwise leave us
	// with a silently incomplete (and therefore wrong) list of free ranges.
	if !lmdb.IsNotFound(err) {
		return fmt.Errorf("failed to iterate over indexId: %w", err)
	}

	// sort used positions by start
	slices.SortFunc(usedPositions, func(a, b position) int { return cmp.Compare(a.start, b.start) })

	// if there is free space at the end this will simulate it
	usedPositions = append(usedPositions, position{start: b.mmapfEnd, size: 0})

	// calculate free ranges as gaps between used positions
	b.freeRangesAll = make(positions, 0, len(usedPositions))
	b.freeRangesLarge = make([]position, 0, len(usedPositions)/10)

	var currentStart uint64 = 0
	for _, used := range usedPositions {
		if used.start > currentStart {
			// gap from currentStart to used.start
			// NOTE(review): position.size is a uint32, so a gap larger than
			// 4GiB would be truncated here — confirm this cannot happen.
			fr := position{
				start: currentStart,
				size:  uint32(used.start - currentStart),
			}

			b.freeRangesAll = append(b.freeRangesAll, fr)
			if fr.isLarge() {
				b.freeRangesLarge = append(b.freeRangesLarge, fr)
			}
		}
		currentStart = used.start + uint64(used.size)
	}

	return nil
}

// mergeNewFreeRange adds newFreeRange to the in-memory free lists, keeping
// b.freeRangesAll sorted by start and coalescing the new range with the free
// range immediately before and/or after it when they are contiguous.
// b.freeRangesLarge is kept in sync with whatever is inserted, grown or removed.
//
// It panics if a free range with the same start already exists.
func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) {
	// use binary search to find the insertion point for the new pos
	idx, exists := slices.BinarySearchFunc(b.freeRangesAll, newFreeRange.start, func(item position, target uint64) int {
		return cmp.Compare(item.start, target)
	})
	if exists {
		panic(fmt.Errorf("can't add free range that already exists: %s", newFreeRange))
	}

	// deleteStart is the index of the first existing range being absorbed
	// (-1 if none), deleting is how many existing ranges get absorbed (0, 1 or 2).
	deleteStart := -1
	deleting := 0

	// check the range immediately before
	if idx > 0 {
		before := b.freeRangesAll[idx-1]
		if before.start+uint64(before.size) == newFreeRange.start {
			deleteStart = idx - 1
			deleting++

			newFreeRange.start = before.start
			newFreeRange.size = before.size + newFreeRange.size
		}
	}

	// check the range immediately after
	if idx < len(b.freeRangesAll) {
		after := b.freeRangesAll[idx]
		if newFreeRange.start+uint64(newFreeRange.size) == after.start {
			if deleteStart == -1 {
				deleteStart = idx
			}
			deleting++

			newFreeRange.size = newFreeRange.size + after.size
		}
	}

	switch deleting {
	case 0:
		// if we are not deleting anything we must insert the new free range
		b.freeRangesAll = slices.Insert(b.freeRangesAll, idx, newFreeRange)

		// if it's large add it to the list of large free ranges
		if newFreeRange.isLarge() {
			b.freeRangesLarge = append(b.freeRangesLarge, newFreeRange)
		}
	case 1:
		deleted := b.freeRangesAll[deleteStart]

		// if we're deleting a single range, don't delete it, modify it in-place instead.
		b.freeRangesAll[deleteStart] = newFreeRange

		// if the range we're modifying is in the list of large ranges modify it there too
		if deleted.isLarge() {
			for i, large := range b.freeRangesLarge {
				if large.start == deleted.start {
					b.freeRangesLarge[i] = newFreeRange
					break
				}
			}
		} else if newFreeRange.isLarge() {
			// otherwise: if after modification it's big enough we should add it to list of large ranges
			b.freeRangesLarge = append(b.freeRangesLarge, newFreeRange)
		}
	case 2:
		// now if we're deleting two ranges, delete the second instead and modify the first in place
		first := b.freeRangesAll[deleteStart]
		second := b.freeRangesAll[deleteStart+1]

		b.freeRangesAll = slices.Delete(b.freeRangesAll, deleteStart+1, deleteStart+1+1)
		b.freeRangesAll[deleteStart] = newFreeRange

		// if the second was in the list of large ranges delete it from there too
		// (swap with the last element: freeRangesLarge is not kept ordered)
		if second.isLarge() {
			for i, large := range b.freeRangesLarge {
				if large.start == second.start {
					b.freeRangesLarge[i] = b.freeRangesLarge[len(b.freeRangesLarge)-1]
					b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1]
					break
				}
			}
		}

		// if the range we're modifying (the first) is already in the list of large ranges modify it there too
		if first.isLarge() {
			for i, large := range b.freeRangesLarge {
				if large.start == first.start {
					b.freeRangesLarge[i] = newFreeRange
					break
				}
			}
		} else if newFreeRange.isLarge() {
			// otherwise if after modification has become big enough we should add it to list of large ranges
			b.freeRangesLarge = append(b.freeRangesLarge, newFreeRange)
		}
	}
}

// Defragment calls DefragmentOne up to n times, but never more times than
// there are gaps between free ranges at the moment of the call
// (len(freeRangesAll)-1). It stops and returns the first error encountered.
func (b *MultiMmapManager) Defragment(n int) error {
	for range min(n, len(b.freeRangesAll)-1) {
		if err := b.DefragmentOne(); err != nil {
			return err
		}
	}
	return nil
}

// DefragmentOne eliminates the first free range by sliding the events that sit
// between it and the second free range down to the start of the first one.
// The space that opens up after the moved events is merged into the second
// free range, so on success the total number of free ranges drops by one.
//
// For every moved event the position stored in the id index and in the
// indexes of every layer that references the event is rewritten. The id index
// is keyed by the first 8 bytes of the event id; its value is a 12-byte
// position followed by a list of 2-byte big-endian layer ids.
//
// It returns ReadOnly if the manager is read-only, and nil without doing
// anything when there are fewer than two free ranges.
//
// NOTE(review): the fmt.Println calls look like leftover debug output; they are
// kept as-is here — consider routing them through a logger.
func (b *MultiMmapManager) DefragmentOne() error {
	if b.ReadOnly {
		return ReadOnly
	}

	b.writeMutex.Lock()
	defer b.writeMutex.Unlock()

	// the LMDB write transactions opened below stay on this OS thread
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()

	if len(b.freeRangesAll) < 2 {
		return nil
	}

	mmmtxn, err := b.lmdbEnv.BeginTxn(nil, 0)
	if err != nil {
		return fmt.Errorf("failed to begin mmm transaction: %w", err)
	}
	defer mmmtxn.Abort()

	// one write transaction per layer touched, opened lazily and aborted on
	// any early return
	type layerTxn struct {
		il  *IndexingLayer
		txn *lmdb.Txn
	}
	layerTxns := make(map[uint16]*layerTxn)
	defer func() {
		for _, lt := range layerTxns {
			lt.txn.Abort()
		}
	}()

	// will put stuff into the first free range
	fr := b.freeRangesAll[0]

	// where the free range ends, the events start (any number of them)
	eventsStart := fr.start + uint64(fr.size)
	eventsEnd := b.freeRangesAll[1].start // and they end when the next free range starts

	fmt.Println("# defrag", fr, eventsStart, eventsEnd)

	c := uint64(0) // this tracks our relative position inside the events section
	for (eventsStart + c) < eventsEnd {
		var evt nostr.Event
		if err := betterbinary.Unmarshal(b.mmapf[(eventsStart+c):eventsEnd], &evt); err != nil {
			id := betterbinary.GetID(b.mmapf[(eventsStart + c):eventsEnd])
			return fmt.Errorf("failed to read event (%x) from mmap: %w", id[:], err)
		}

		// now that we have an event we'll update its pos on the id index and on every layer:
		oldVal, err := mmmtxn.Get(b.indexId, evt.ID[0:8])
		if err != nil {
			return fmt.Errorf("failed to read val (%x) from index: %w", evt.ID[:], err)
		}

		// current position
		pos := positionFromBytes(oldVal[0:12])
		if pos.size == 0 {
			// c advances by pos.size: a zero size would make this loop spin
			// forever while holding the write lock.
			return fmt.Errorf("index entry for event %x has zero size", evt.ID[:])
		}

		// new position (from the beginning of the free range before + relative position)
		fmt.Println(" moving event", evt.ID, "from", pos)
		pos.start = fr.start + c

		// update this cursor
		c += uint64(pos.size)

		fmt.Println(" to", pos, "...", c, "layers:", oldVal[12:])

		// prepare and save id index
		newVal := make([]byte, len(oldVal))
		writeBytesFromPosition(newVal, pos)
		copy(newVal[12:], oldVal[12:])
		if err := mmmtxn.Put(b.indexId, evt.ID[0:8], newVal, 0); err != nil {
			return fmt.Errorf("failed to write new pos to id index: %w", err)
		}

		// everything after the 12-byte position is a list of 2-byte layer ids
		for s := 12; s < len(oldVal); s += 2 {
			layer := binary.BigEndian.Uint16(oldVal[s : s+2])

			lt, ok := layerTxns[layer]
			if !ok {
				il := b.layers.ByID(layer)
				if il == nil {
					fmt.Println(b.layers)
					panic(fmt.Errorf("missing layer %d", layer))
				}

				txn, err := il.lmdbEnv.BeginTxn(nil, 0)
				if err != nil {
					return fmt.Errorf("failed to begin layer txn for layer %d: %w", il.id, err)
				}
				txn.RawRead = true

				lt = &layerTxn{il: il, txn: txn}
				layerTxns[il.id] = lt
			}

			fmt.Println(" layer", lt.il.id)

			// swap the old position for the new one under every index key of this event
			for k := range lt.il.getIndexKeysForEvent(evt) {
				fmt.Println(" index", k.dbi, k.key)
				if err := lt.txn.Del(k.dbi, k.key, oldVal[0:12]); err != nil {
					return fmt.Errorf("failed to delete old index entry for %x: %w", evt.ID[:], err)
				}
				if err := lt.txn.Put(k.dbi, k.key, newVal[0:12], 0); err != nil {
					return fmt.Errorf("failed to insert new index entry for %x: %w", evt.ID[:], err)
				}
			}
		}
	}

	// now that we have updated all the pointers, just copy all the bytes between the two free ranges
	// (source and destination may overlap; the builtin copy handles that)
	copy(b.mmapf[fr.start:], b.mmapf[fr.start+uint64(fr.size):eventsEnd])

	// msync
	_, _, errno := syscall.Syscall(syscall.SYS_MSYNC,
		uintptr(unsafe.Pointer(&b.mmapf[0])), uintptr(len(b.mmapf)), syscall.MS_SYNC)
	if errno != 0 {
		return fmt.Errorf("msync failed: %w", syscall.Errno(errno))
	}

	// commit the id index first: it is what gatherFreeRanges derives the free
	// ranges from.
	if err := mmmtxn.Commit(); err != nil {
		return fmt.Errorf("failed to commit mmm transaction: %w", err)
	}

	// only now that the new positions are durable in the id index do we touch
	// the in-memory free lists. doing it earlier would leave them describing
	// a layout that was never committed if msync or the commit failed.

	// delete this free range if it's one of the big ones
	if fr.isLarge() {
		for l, lfr := range b.freeRangesLarge {
			if lfr.start == fr.start {
				fmt.Println(" deleting large fr", l, lfr)
				b.freeRangesLarge[l] = b.freeRangesLarge[len(b.freeRangesLarge)-1]
				b.freeRangesLarge = b.freeRangesLarge[0 : len(b.freeRangesLarge)-1]
				break
			}
		}
	}

	// now we have some space left at the end of this events section that is a free range
	remainingSpaceStart := fr.start + c

	// it must be merged with the next free range
	updated := position{
		start: remainingSpaceStart,
		size:  b.freeRangesAll[1].size + uint32(eventsEnd) - uint32(remainingSpaceStart),
	}
	nextWasLarge := b.freeRangesAll[1].isLarge()

	fmt.Println(" updating next", updated)
	b.freeRangesAll[1] = updated

	if nextWasLarge {
		// the next free range used to start exactly where the events ended
		for l, lfr := range b.freeRangesLarge {
			if lfr.start == eventsEnd {
				fmt.Println("it is large:", l, lfr, "(now", updated, ")")
				b.freeRangesLarge[l] = updated
				break
			}
		}
	} else if updated.isLarge() {
		// if it wasn't large but now is, add it to the list of large free ranges
		fmt.Println(" a new large fr was created", updated)
		b.freeRangesLarge = append(b.freeRangesLarge, updated)
	}

	// delete the first free range
	b.freeRangesAll = slices.Delete(b.freeRangesAll, 0, 1)

	// finally commit the layer indexes
	for lid, lt := range layerTxns {
		if err := lt.txn.Commit(); err != nil {
			return fmt.Errorf("failed to commit layer %d transaction: %w", lid, err)
		}
	}

	return nil
}