diff --git a/eventstore/mmm/delete.go b/eventstore/mmm/delete.go index 7188b21..967f416 100644 --- a/eventstore/mmm/delete.go +++ b/eventstore/mmm/delete.go @@ -3,6 +3,7 @@ package mmm import ( "encoding/binary" "fmt" + "runtime" "slices" "fiatjaf.com/nostr" @@ -13,32 +14,84 @@ func (il *IndexingLayer) DeleteEvent(id nostr.ID) error { il.mmmm.writeMutex.Lock() defer il.mmmm.writeMutex.Unlock() - return il.mmmm.lmdbEnv.Update(func(mmmtxn *lmdb.Txn) error { - return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error { - return il.delete(mmmtxn, iltxn, id) - }) - }) + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + // prepare transactions + mmmtxn, err := il.mmmm.lmdbEnv.BeginTxn(nil, 0) + if err != nil { + return err + } + defer func() { + // defer abort but only if we haven't committed (we'll set it to nil after committing) + if mmmtxn != nil { + mmmtxn.Abort() + } + }() + mmmtxn.RawRead = true + + iltxn, err := il.lmdbEnv.BeginTxn(nil, 0) + if err != nil { + return err + } + defer func() { + // defer abort but only if we haven't committed (we'll set it to nil after committing) + if iltxn != nil { + iltxn.Abort() + } + }() + iltxn.RawRead = true + + var acquiredFreeRangeFromDelete *position + if pos, shouldPurge, err := il.delete(mmmtxn, iltxn, id); err != nil { + return fmt.Errorf("failed to delete event %s: %w", id, err) + } else if shouldPurge { + // purge + if err := mmmtxn.Del(il.mmmm.indexId, id[0:8], nil); err != nil { + return err + } + acquiredFreeRangeFromDelete = &pos + } + + // commit in this order to minimize problematic inconsistencies + if err := mmmtxn.Commit(); err != nil { + return fmt.Errorf("can't commit mmmtxn: %w", err) + } + mmmtxn = nil + if err := iltxn.Commit(); err != nil { + return fmt.Errorf("can't commit iltxn: %w", err) + } + iltxn = nil + + // finally merge in the new free range (in this order it makes more sense, the worst that can + // happen is that we lose this free range but we'll have it again on the next startup) + if acquiredFreeRangeFromDelete != nil { + il.mmmm.mergeNewFreeRange(*acquiredFreeRangeFromDelete) + } + + return nil } -func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, id nostr.ID) error { - zeroRefs := false - b := il.mmmm - - b.Logger.Debug().Str("layer", il.name).Uint16("il", il.id).Msg("deleting") +func (il *IndexingLayer) delete( + mmmtxn *lmdb.Txn, + iltxn *lmdb.Txn, + id nostr.ID, +) (pos position, shouldPurge bool, err error) { + il.mmmm.Logger.Debug().Str("layer", il.name).Uint16("il", il.id).Msg("deleting") // first in the mmmm txn we check if we have the event still - val, err := mmmtxn.Get(b.indexId, id[0:8]) + val, err := mmmtxn.Get(il.mmmm.indexId, id[0:8]) if err != nil { if lmdb.IsNotFound(err) { // we already do not have this anywhere - return nil + return position{}, false, nil } - return fmt.Errorf("failed to check if we have the event %x: %w", id, err) + return position{}, false, fmt.Errorf("failed to check if we have the event %x: %w", id, err) } // we have this, but do we have it in the current layer? // val is [posb][il_idx][il_idx...] 
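 	// (that is: a fixed 12-byte position followed by one big-endian uint16 layer id per layer that references this event)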
- pos := positionFromBytes(val[0:12]) + pos = positionFromBytes(val[0:12]) // check references currentLayer := binary.BigEndian.AppendUint16(nil, il.id) @@ -49,12 +102,12 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, id nostr.ID) copy(nextval, val[0:i]) copy(nextval[i:], val[i+2:]) - if err := mmmtxn.Put(b.indexId, id[0:8], nextval, 0); err != nil { - return fmt.Errorf("failed to update references for %x: %w", id[:], err) + if err := mmmtxn.Put(il.mmmm.indexId, id[0:8], nextval, 0); err != nil { + return pos, false, fmt.Errorf("failed to update references for %x: %w", id[:], err) } // if there are no more layers we will delete everything later - zeroRefs = len(nextval) == 12 + shouldPurge = len(nextval) == 12 break } @@ -63,21 +116,14 @@ func (il *IndexingLayer) delete(mmmtxn *lmdb.Txn, iltxn *lmdb.Txn, id nostr.ID) // load the event so we can compute the indexes var evt nostr.Event if err := il.mmmm.loadEvent(pos, &evt); err != nil { - return fmt.Errorf("failed to load event %x when deleting: %w", id[:], err) + return pos, false, fmt.Errorf("failed to load event %x when deleting: %w", id[:], err) } if err := il.deleteIndexes(iltxn, evt, val[0:12]); err != nil { - return fmt.Errorf("failed to delete indexes for %s=>%v: %w", evt.ID, val[0:12], err) + return pos, false, fmt.Errorf("failed to delete indexes for %s=>%v: %w", evt.ID, val[0:12], err) } - // if there are no more refs we delete the event from the id index and mmap - if zeroRefs { - if err := b.purge(mmmtxn, id[0:8], pos); err != nil { - panic(err) - } - } - - return nil + return pos, shouldPurge, nil } func (il *IndexingLayer) deleteIndexes(iltxn *lmdb.Txn, event nostr.Event, posbytes []byte) error { diff --git a/eventstore/mmm/fix.go b/eventstore/mmm/fix.go index 50f5ad3..32aae2a 100644 --- a/eventstore/mmm/fix.go +++ b/eventstore/mmm/fix.go @@ -3,7 +3,6 @@ package mmm import ( "bytes" "encoding/binary" - "encoding/hex" "slices" "fiatjaf.com/nostr" @@ -21,12 +20,7 @@ func (b *MultiMmapManager) Rescan() error { } defer cursor.Close() - type entry struct { - idPrefix []byte - pos position - } - var toPurge []entry - + var toPurge [][]byte // a list of idPrefix entries for key, val, err := cursor.Get(nil, nil, lmdb.First); err == nil; key, val, err = cursor.Get(key, val, lmdb.Next) { pos := positionFromBytes(val[0:12]) @@ -91,7 +85,7 @@ func (b *MultiMmapManager) Rescan() error { } if borked { - toPurge = append(toPurge, entry{idPrefix: key, pos: pos}) + toPurge = append(toPurge, key) } else if len(layersToRemove) > 0 { for s := 12; s < len(val); { if slices.Contains(layersToRemove, binary.BigEndian.Uint16(val[s:s+2])) { @@ -108,16 +102,16 @@ func (b *MultiMmapManager) Rescan() error { return err } } else { - toPurge = append(toPurge, entry{idPrefix: key, pos: pos}) + toPurge = append(toPurge, key) } } } - for _, entry := range toPurge { + for _, idPrefix := range toPurge { // just delete from the ids index, // no need to deal with the freeranges list as it will be recalculated afterwards. // this also ensures any brokenly overlapping overwritten events don't have to be sacrificed. 
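 		// (gatherFreeRanges rebuilds the free-ranges list from the remaining id-index entries afterwards)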
- if err := mmmtxn.Del(b.indexId, entry.idPrefix, nil); err != nil { + if err := mmmtxn.Del(b.indexId, idPrefix, nil); err != nil { return err } } diff --git a/eventstore/mmm/fix_test.go b/eventstore/mmm/fix_test.go index ad6a4dc..52b932c 100644 --- a/eventstore/mmm/fix_test.go +++ b/eventstore/mmm/fix_test.go @@ -113,7 +113,7 @@ func FuzzBorkedRescan(f *testing.F) { // this won't be erased, just removed from this specific layer layer := layers[rnd.IntN(len(layers))] posb := make([]byte, 12) - bytesFromPosition(posb, pos) + writeBytesFromPosition(posb, pos) if err := layer.lmdbEnv.Update(func(iltxn *lmdb.Txn) error { return layer.deleteIndexes(iltxn, evt, posb) diff --git a/eventstore/mmm/freeranges.go b/eventstore/mmm/freeranges.go index dc93633..985a3ba 100644 --- a/eventstore/mmm/freeranges.go +++ b/eventstore/mmm/freeranges.go @@ -24,7 +24,7 @@ func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) (positions, error) { // sort used positions by start slices.SortFunc(usedPositions, func(a, b position) int { return cmp.Compare(a.start, b.start) }) - // if there is free space at the end (which doesn't happen in normal conditions) do this to simulate it + // if there is free space at the end this will simulate it usedPositions = append(usedPositions, position{start: b.mmapfEnd, size: 0}) // calculate free ranges as gaps between used positions @@ -47,9 +47,7 @@ func (b *MultiMmapManager) gatherFreeRanges(txn *lmdb.Txn) (positions, error) { return freeRanges, nil } -// this injects the new free range into the list, merging it with existing free ranges if necessary. -// it also takes a pointer so it can modify it for the caller to use it in setting up the new mmapf. -func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange *position) (isAtEnd bool) { +func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange position) { // use binary search to find the insertion point for the new pos idx, exists := slices.BinarySearchFunc(b.freeRanges, newFreeRange.start, func(item position, target uint64) int { return cmp.Compare(item.start, target) @@ -86,27 +84,16 @@ func (b *MultiMmapManager) mergeNewFreeRange(newFreeRange *position) (isAtEnd bo } } - // when we're at the end of a file we just delete everything and don't add new free ranges - // the caller will truncate the mmap file and adjust the position accordingly - if newFreeRange.start+uint64(newFreeRange.size) == b.mmapfEnd { - if deleting > 0 { - b.freeRanges = slices.Delete(b.freeRanges, deleteStart, deleteStart+deleting) - } - return true - } - switch deleting { case 0: // if we are not deleting anything we must insert the new free range - b.freeRanges = slices.Insert(b.freeRanges, idx, *newFreeRange) + b.freeRanges = slices.Insert(b.freeRanges, idx, newFreeRange) case 1: // if we're deleting a single range, don't delete it, modify it in-place instead. 
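 		// (replacing in place avoids shifting the tail of the slice)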
- b.freeRanges[deleteStart] = *newFreeRange + b.freeRanges[deleteStart] = newFreeRange case 2: // now if we're deleting two ranges, delete just one instead and modify the other in place - b.freeRanges[deleteStart] = *newFreeRange + b.freeRanges[deleteStart] = newFreeRange b.freeRanges = slices.Delete(b.freeRanges, deleteStart+1, deleteStart+1+1) } - - return false } diff --git a/eventstore/mmm/freeranges_test.go b/eventstore/mmm/freeranges_test.go index 7c856be..db5a08f 100644 --- a/eventstore/mmm/freeranges_test.go +++ b/eventstore/mmm/freeranges_test.go @@ -45,12 +45,15 @@ func FuzzFreeRanges(f *testing.F) { total := 0 for { - freeBefore := countUsableFreeRanges(mmmm) + freeBefore, spaceBefore := countUsableFreeRanges(mmmm) + hasAdded := false for i := range rnd.IntN(40) { + hasAdded = true + content := "1" // ensure at least one event is as small as it can be if i > 0 { - strings.Repeat("z", rnd.IntN(1000)) + content = strings.Repeat("z", rnd.IntN(1000)) } evt := nostr.Event{ @@ -66,9 +69,9 @@ func FuzzFreeRanges(f *testing.F) { total++ } - freeAfter := countUsableFreeRanges(mmmm) - if freeBefore > 0 { - require.Lessf(t, freeAfter, freeBefore, "must use some of the existing free ranges when inserting new events") + freeAfter, spaceAfter := countUsableFreeRanges(mmmm) + if hasAdded && freeBefore > 0 { + require.Lessf(t, spaceAfter, spaceBefore, "must use some of the existing free ranges when inserting new events (before: %d, after: %d)", freeBefore, freeAfter) } // delete some events @@ -96,12 +99,12 @@ func FuzzFreeRanges(f *testing.F) { }) } -func countUsableFreeRanges(mmmm *MultiMmapManager) int { - count := 0 +func countUsableFreeRanges(mmmm *MultiMmapManager) (count int, space int) { for _, fr := range mmmm.freeRanges { - if fr.size > 150 { + if fr.size >= 142 { count++ + space += int(fr.size) } } - return count + return count, space } diff --git a/eventstore/mmm/helpers.go b/eventstore/mmm/helpers.go index d57fa8b..7c24e6c 100644 --- a/eventstore/mmm/helpers.go +++ b/eventstore/mmm/helpers.go @@ -34,8 +34,6 @@ type iterator struct { } func (it *iterator) pull(n int, since uint32) { - query := it.query - for range n { // in the beginning we already have a k and a v and an err from the cursor setup, so check and use these if it.err != nil { @@ -43,7 +41,7 @@ func (it *iterator) pull(n int, since uint32) { return } - if len(it.key) != query.keySize || !bytes.HasPrefix(it.key, query.prefix) { + if len(it.key) != it.query.keySize || !bytes.HasPrefix(it.key, it.query.prefix) { // we reached the end of this prefix it.exhausted = true return diff --git a/eventstore/mmm/mmmm.go b/eventstore/mmm/mmmm.go index 6ae5739..43dc2b5 100644 --- a/eventstore/mmm/mmmm.go +++ b/eventstore/mmm/mmmm.go @@ -276,9 +276,11 @@ func (b *MultiMmapManager) removeAllReferencesFromLayer(txn *lmdb.Txn, layerId u posb := val[0:12] pos := positionFromBytes(posb) - if err := b.purge(txn, idPrefix8, pos); err != nil { + if err := txn.Del(b.indexId, idPrefix8, nil); err != nil { return fmt.Errorf("failed to purge unreferenced event %x: %w", idPrefix8, err) } + + b.mergeNewFreeRange(pos) } else if update { if err := txn.Put(b.indexId, idPrefix8, val, 0); err != nil { return fmt.Errorf("failed to put updated index+refs: %w", err) diff --git a/eventstore/mmm/position.go b/eventstore/mmm/position.go index d692300..c0bb6ec 100644 --- a/eventstore/mmm/position.go +++ b/eventstore/mmm/position.go @@ -36,7 +36,7 @@ func positionFromBytes(posb []byte) position { } } -func bytesFromPosition(out []byte, pos position) { +func 
writeBytesFromPosition(out []byte, pos position) {
 	binary.BigEndian.PutUint32(out[0:4], pos.size)
 	binary.BigEndian.PutUint64(out[4:12], pos.start)
 }
diff --git a/eventstore/mmm/purge.go b/eventstore/mmm/purge.go
deleted file mode 100644
index bb6b293..0000000
--- a/eventstore/mmm/purge.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package mmm
-
-import (
-	"bytes"
-	"fmt"
-	"os"
-
-	"github.com/PowerDNS/lmdb-go/lmdb"
-)
-
-func (b *MultiMmapManager) purge(txn *lmdb.Txn, idPrefix8 []byte, pos position) error {
-	b.Logger.Debug().Hex("event", idPrefix8).Stringer("pos", pos).Msg("purging")
-
-	// delete from index
-	if err := txn.Del(b.indexId, idPrefix8, nil); err != nil {
-		return err
-	}
-
-	// will add the current range to free ranges, which means it is "deleted" (or merge with existing)
-	isAtEnd := b.mergeNewFreeRange(&pos)
-	if isAtEnd {
-		// when at the end, truncate the mmap
-		// [new_pos_to_be_freed][end_of_file] -> shrink file!
-		pos.size = 0 // so we don't try to add this some lines below
-		if err := os.Truncate(b.mmapfPath, int64(pos.start)); err != nil {
-			panic(fmt.Errorf("error decreasing %s: %w", b.mmapfPath, err))
-		}
-		b.mmapfEnd = pos.start
-	} else {
-		// this is for debugging -------------
-		copy(b.mmapf[pos.start:], bytes.Repeat([]byte{'!'}, int(pos.size)))
-	}
-
-	return nil
-}
diff --git a/eventstore/mmm/query_planner.go b/eventstore/mmm/query_planner.go
index e4d457f..831b0a7 100644
--- a/eventstore/mmm/query_planner.go
+++ b/eventstore/mmm/query_planner.go
@@ -14,7 +14,6 @@ type query struct {
 	i             int
 	dbi           lmdb.DBI
 	prefix        []byte
-	results       chan *nostr.Event
 	keySize       int
 	timestampSize int
 	startingPoint []byte
@@ -46,7 +45,6 @@ func (il *IndexingLayer) prepareQueries(filter nostr.Filter) (
 			sp = sp[0:len(q.prefix)]
 			copy(sp, q.prefix)
 			queries[i].startingPoint = binary.BigEndian.AppendUint32(sp, uint32(until))
-			queries[i].results = make(chan *nostr.Event, 12)
 		}
 	}()
diff --git a/eventstore/mmm/replace.go b/eventstore/mmm/replace.go
index 5b9a0d6..079b84e 100644
--- a/eventstore/mmm/replace.go
+++ b/eventstore/mmm/replace.go
@@ -7,7 +7,7 @@ import (
+	"runtime"
 
 	"fiatjaf.com/nostr"
 	"fiatjaf.com/nostr/eventstore/internal"
-	"github.com/PowerDNS/lmdb-go/lmdb"
 )
 
 func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error {
@@ -23,36 +23,85 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error {
 		filter.Tags = nostr.TagMap{"d": []string{evt.Tags.GetD()}}
 	}
 
-	return il.mmmm.lmdbEnv.Update(func(mmmtxn *lmdb.Txn) error {
-		mmmtxn.RawRead = true
-
-		return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error {
-			// now we fetch the past events, whatever they are, delete them and then save the new
-			var err error
-			var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) {
-				err = il.query(iltxn, filter, 10 /* in theory limit could be just 1 and this should work */, yield)
-			}
-			if err != nil {
-				return fmt.Errorf("failed to query past events with %s: %w", filter, err)
-			}
-
-			shouldStore := true
-			for previous := range results {
-				if internal.IsOlder(previous, evt) {
-					if err := il.delete(mmmtxn, iltxn, previous.ID); err != nil {
-						return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err)
-					}
-				} else {
-					// there is a newer event already stored, so we won't store this
-					shouldStore = false
-				}
-			}
-			if shouldStore {
-				_, err := il.mmmm.storeOn(mmmtxn, []*IndexingLayer{il}, []*lmdb.Txn{iltxn}, evt)
-				return err
-			}
-
-			return nil
-		})
-	})
+	// write transactions created with BeginTxn must run on a locked OS thread
+	runtime.LockOSThread()
+	defer runtime.UnlockOSThread()
+
+	// prepare transactions
+	mmmtxn, err := il.mmmm.lmdbEnv.BeginTxn(nil, 0)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// defer abort but only if we haven't committed (we'll set it to nil after committing)
+		if mmmtxn != nil {
+			mmmtxn.Abort()
+		}
+	}()
+	mmmtxn.RawRead = true
+
+	iltxn, err := il.lmdbEnv.BeginTxn(nil, 0)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// defer abort but only if we haven't committed (we'll set it to nil after committing)
+		if iltxn != nil {
+			iltxn.Abort()
+		}
+	}()
+	iltxn.RawRead = true
+
+	// now we fetch the past events, whatever they are, delete them and then save the new
+	var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) {
+		err = il.query(iltxn, filter, 10 /* in theory limit could be just 1 and this should work */, yield)
+	}
+
+	var acquiredFreeRangeFromDelete *position
+	shouldStore := true
+	for previous := range results {
+		if internal.IsOlder(previous, evt) {
+			if pos, shouldPurge, err := il.delete(mmmtxn, iltxn, previous.ID); err != nil {
+				return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err)
+			} else if shouldPurge {
+				// purge
+				if err := mmmtxn.Del(il.mmmm.indexId, previous.ID[0:8], nil); err != nil {
+					return err
+				}
+				acquiredFreeRangeFromDelete = &pos
+			}
+		} else {
+			// there is a newer event already stored, so we won't store this
+			shouldStore = false
+		}
+	}
+	// the query error is only set once the iteration above has finished
+	if err != nil {
+		return fmt.Errorf("failed to query past events with %s: %w", filter, err)
+	}
+
+	if shouldStore {
+		_, err := il.mmmm.storeOn(mmmtxn, iltxn, il, evt)
+		if err != nil {
+			return err
+		}
+	}
+
+	// commit in this order to minimize problematic inconsistencies
+	if err := mmmtxn.Commit(); err != nil {
+		return fmt.Errorf("can't commit mmmtxn: %w", err)
+	}
+	mmmtxn = nil
+	if err := iltxn.Commit(); err != nil {
+		return fmt.Errorf("can't commit iltxn: %w", err)
+	}
+	iltxn = nil
+
+	// finally merge in the new free range (in this order it makes more sense, the worst that can
+	// happen is that we lose this free range but we'll have it again on the next startup)
+	if acquiredFreeRangeFromDelete != nil {
+		il.mmmm.mergeNewFreeRange(*acquiredFreeRangeFromDelete)
+	}
+
+	return nil
 }
diff --git a/eventstore/mmm/save.go b/eventstore/mmm/save.go
index 9f70512..66f29ac 100644
--- a/eventstore/mmm/save.go
+++ b/eventstore/mmm/save.go
@@ -21,150 +21,150 @@ func (il *IndexingLayer) SaveEvent(evt nostr.Event) error {
 	runtime.LockOSThread()
 	defer runtime.UnlockOSThread()
 
-	// do this just so it's cleaner, we're already locking the thread and the mutex anyway
+	// prepare transactions
 	mmmtxn, err := il.mmmm.lmdbEnv.BeginTxn(nil, 0)
 	if err != nil {
-		return fmt.Errorf("failed to begin global transaction: %w", err)
+		return err
 	}
+	defer func() {
+		// defer abort but only if we haven't committed (we'll set it to nil after committing)
+		if mmmtxn != nil {
+			mmmtxn.Abort()
+		}
+	}()
 	mmmtxn.RawRead = true
 
 	iltxn, err := il.lmdbEnv.BeginTxn(nil, 0)
 	if err != nil {
-		mmmtxn.Abort()
-		return fmt.Errorf("failed to start txn on %s: %w", il.name, err)
+		return err
 	}
-
-	if _, err := il.mmmm.storeOn(mmmtxn, []*IndexingLayer{il}, []*lmdb.Txn{iltxn}, evt); err != nil {
-		mmmtxn.Abort()
+	defer func() {
+		// defer abort but only if we haven't committed (we'll set it to nil after committing)
 		if iltxn != nil {
 			iltxn.Abort()
 		}
+	}()
+	iltxn.RawRead = true
+
+	// the actual save operation
+	if _, err := il.mmmm.storeOn(mmmtxn, iltxn, il, evt); err != nil {
 		return err
 	}
 
+	// commit in this order to minimize problematic inconsistencies
 	if err := mmmtxn.Commit(); err != nil {
-		return err
+		return fmt.Errorf("can't commit mmmtxn: %w", err)
	}
-
+	mmmtxn = nil
 	if err := 
iltxn.Commit(); err != nil { - return err + return fmt.Errorf("can't commit iltxn: %w", err) } + iltxn = nil return nil } func (b *MultiMmapManager) storeOn( mmmtxn *lmdb.Txn, - ils []*IndexingLayer, - iltxns []*lmdb.Txn, + iltxn *lmdb.Txn, + il *IndexingLayer, evt nostr.Event, ) (stored bool, err error) { // check if we already have this id + var pos position val, err := mmmtxn.Get(b.indexId, evt.ID[0:8]) if err == nil { - // we found the event, now check if it is already indexed by the layers that want to store it - for i := len(ils) - 1; i >= 0; i-- { - for s := 12; s < len(val); s += 2 { - ilid := binary.BigEndian.Uint16(val[s : s+2]) - if ils[i].id == ilid { - // swap delete this il, but keep the deleted ones at the end - // (so the caller can successfully finalize the transactions) - ils[i], ils[len(ils)-1] = ils[len(ils)-1], ils[i] - ils = ils[0 : len(ils)-1] - iltxns[i], iltxns[len(iltxns)-1] = iltxns[len(iltxns)-1], iltxns[i] - iltxns = iltxns[0 : len(iltxns)-1] - break - } + pos = positionFromBytes(val[0:12]) + // we found the event, now check if it is already indexed by the layer that wants to store it + for s := 12; s < len(val); s += 2 { + ilid := binary.BigEndian.Uint16(val[s : s+2]) + if il.id == ilid { + // already on the specified layer, we can end here + return false, nil } } } else if !lmdb.IsNotFound(err) { - // now if we got an error from lmdb we will only proceed if we get a NotFound -- for anything else we will error + // if we got an error from lmdb we will only proceed if it's NotFound -- for anything else we will error return false, fmt.Errorf("error checking existence: %w", err) } - // if all ils already have this event indexed (or no il was given) we can end here - if len(ils) == 0 { - return false, nil - } - - // get event binary size - pos := position{ - size: uint32(betterbinary.Measure(evt)), - } - if pos.size >= 1<<16 { - return false, fmt.Errorf("event too large to store, max %d, got %d", 1<<16, pos.size) - } - - // find a suitable place for this to be stored in - appendToMmap := true - for f, fr := range b.freeRanges { - if fr.size >= pos.size { - // found the smallest possible place that can fit this event - appendToMmap = false - pos.start = fr.start - - // modify the free ranges we're keeping track of - if pos.size == fr.size { - // if we've used it entirely just delete it - b.freeRanges = slices.Delete(b.freeRanges, f, f+1) - } else { - // otherwise modify it in place - b.freeRanges[f] = position{ - start: fr.start + uint64(pos.size), - size: fr.size - pos.size, - } - } - - break - } - } - - if appendToMmap { - // no free ranges found, so write to the end of the mmap file - pos.start = b.mmapfEnd - mmapfNewSize := int64(b.mmapfEnd) + int64(pos.size) - if err := os.Truncate(b.mmapfPath, mmapfNewSize); err != nil { - return false, fmt.Errorf("error increasing %s: %w", b.mmapfPath, err) - } - b.mmapfEnd = uint64(mmapfNewSize) - } - - // write to the mmap - if err := betterbinary.Marshal(evt, b.mmapf[pos.start:]); err != nil { - return false, fmt.Errorf("error marshaling to %d: %w", pos.start, err) - } - - // prepare value to be saved in the id index (if we didn't have it already) - // val: [posb][layerIdRefs...] 
+ // ok, now we have to write the event to the mmapped file + // unless we already have the event stored, in that case we don't have to write it again, we'll just reuse it if val == nil { - val = make([]byte, 12, 12+2*len(b.layers)) - binary.BigEndian.PutUint32(val[0:4], pos.size) - binary.BigEndian.PutUint64(val[4:12], pos.start) - } + // get event binary size + pos = position{ + size: uint32(betterbinary.Measure(evt)), + } + if pos.size >= 1<<16 { + return false, fmt.Errorf("event too large to store, max %d, got %d", 1<<16, pos.size) + } - // each index that was reserved above for the different layers - for i, il := range ils { - iltxn := iltxns[i] + // find a suitable place for this to be stored in + appendToMmap := true + for f, fr := range b.freeRanges { + if fr.size >= pos.size { + // found the smallest possible place that can fit this event + appendToMmap = false + pos.start = fr.start - for k := range il.getIndexKeysForEvent(evt) { - if err := iltxn.Put(k.dbi, k.key, val[0:12] /* pos */, 0); err != nil { - b.Logger.Warn().Str("name", il.name).Msg("failed to index event on layer") + // modify the free ranges we're keeping track of + // (in case of conflict we lose this free range but it's ok, it will be recovered on the next startup) + if pos.size == fr.size { + // if we've used it entirely just delete it + b.freeRanges = slices.Delete(b.freeRanges, f, f+1) + } else { + // otherwise modify it in place + b.freeRanges[f] = position{ + start: fr.start + uint64(pos.size), + size: fr.size - pos.size, + } + } + + break } } - val = binary.BigEndian.AppendUint16(val, il.id) + if appendToMmap { + // no free ranges found, so write to the end of the mmap file + pos.start = b.mmapfEnd + mmapfNewSize := int64(b.mmapfEnd) + int64(pos.size) + if err := os.Truncate(b.mmapfPath, mmapfNewSize); err != nil { + return false, fmt.Errorf("error increasing %s: %w", b.mmapfPath, err) + } + b.mmapfEnd = uint64(mmapfNewSize) + } + + // write to the mmap + if err := betterbinary.Marshal(evt, b.mmapf[pos.start:]); err != nil { + return false, fmt.Errorf("error marshaling to %d: %w", pos.start, err) + } + + // msync + _, _, errno := syscall.Syscall(syscall.SYS_MSYNC, + uintptr(unsafe.Pointer(&b.mmapf[0])), uintptr(len(b.mmapf)), syscall.MS_SYNC) + if errno != 0 { + panic(fmt.Errorf("msync failed: %w", syscall.Errno(errno))) + } + + // prepare value to be saved in the id index (if we didn't have it already) + // val: [posb][layerIdRefs...] 
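+		// (this branch only runs for events we did not already have, so there are no existing layer refs to carry over)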
+ val = make([]byte, 12, 12+2) // only reserve room for one layer after the position + writeBytesFromPosition(val, pos) } - // store the id index with the layer references + // generate and save indexes + for k := range il.getIndexKeysForEvent(evt) { + if err := iltxn.Put(k.dbi, k.key, val[0:12] /* pos */, 0); err != nil { + b.Logger.Warn().Str("name", il.name).Msg("failed to index event on layer") + } + } + + // add layer to the id index val + val = binary.BigEndian.AppendUint16(val, il.id) + + // store the id index with the new layer reference if err := mmmtxn.Put(b.indexId, evt.ID[0:8], val, 0); err != nil { - panic(fmt.Errorf("failed to store %x by id: %w", evt.ID[:], err)) - } - - // msync - _, _, errno := syscall.Syscall(syscall.SYS_MSYNC, - uintptr(unsafe.Pointer(&b.mmapf[0])), uintptr(len(b.mmapf)), syscall.MS_SYNC) - if errno != 0 { - panic(fmt.Errorf("msync failed: %w", syscall.Errno(errno))) + return false, fmt.Errorf("failed to store %x by id: %w", evt.ID[:], err) } return true, nil diff --git a/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/0188ac89a089c7cd b/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/0188ac89a089c7cd new file mode 100644 index 0000000..d658c9e --- /dev/null +++ b/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/0188ac89a089c7cd @@ -0,0 +1,2 @@ +go test fuzz v1 +int(-367) diff --git a/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/05eed5305d584366 b/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/05eed5305d584366 new file mode 100644 index 0000000..24e7de6 --- /dev/null +++ b/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/05eed5305d584366 @@ -0,0 +1,2 @@ +go test fuzz v1 +int(91) diff --git a/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/17ecf610e929eefe b/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/17ecf610e929eefe new file mode 100644 index 0000000..483fee3 --- /dev/null +++ b/eventstore/mmm/testdata/fuzz/FuzzFreeRanges/17ecf610e929eefe @@ -0,0 +1,2 @@ +go test fuzz v1 +int(-188)
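
The transaction choreography above is the same in DeleteEvent, ReplaceEvent and SaveEvent: lock the OS thread, begin a write transaction on the global mmm environment and on the layer environment, defer aborts guarded by a nil sentinel, commit the global transaction first, and only touch the in-memory free-ranges list after both commits succeed. A minimal sketch of that shared pattern follows, for reference only; the runDual helper, the globalEnv/layerEnv handles and the work callback are illustrative names, not part of this patch:

package mmmsketch

import (
	"fmt"
	"runtime"

	"github.com/PowerDNS/lmdb-go/lmdb"
)

// runDual runs work inside one write transaction on each environment, then
// commits the global (mmm) transaction before the per-layer one -- the
// "commit in this order to minimize problematic inconsistencies" ordering
// used throughout the patch.
func runDual(globalEnv, layerEnv *lmdb.Env, work func(gtxn, ltxn *lmdb.Txn) error) error {
	// write transactions created with BeginTxn must stay on one OS thread
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()

	gtxn, err := globalEnv.BeginTxn(nil, 0)
	if err != nil {
		return err
	}
	defer func() {
		// abort only if we haven't committed (gtxn is set to nil after committing)
		if gtxn != nil {
			gtxn.Abort()
		}
	}()
	gtxn.RawRead = true

	ltxn, err := layerEnv.BeginTxn(nil, 0)
	if err != nil {
		return err
	}
	defer func() {
		if ltxn != nil {
			ltxn.Abort()
		}
	}()
	ltxn.RawRead = true

	if err := work(gtxn, ltxn); err != nil {
		return err
	}

	if err := gtxn.Commit(); err != nil {
		return fmt.Errorf("can't commit global txn: %w", err)
	}
	gtxn = nil
	if err := ltxn.Commit(); err != nil {
		return fmt.Errorf("can't commit layer txn: %w", err)
	}
	ltxn = nil
	return nil
}

Setting each transaction variable to nil right after its successful commit is what lets the single deferred Abort serve as the error path for everything before it, and deferring mergeNewFreeRange until after both commits means the worst possible failure loses track of a free range that the next startup recomputes anyway.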