From 7e6a0eb6144b8449e93536084e22032d0985f620 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Mon, 4 Aug 2025 23:37:03 -0300 Subject: [PATCH] mmm: copy new lmdb querying mechanism because it's so much more elegant. --- eventstore/internal/helpers.go | 64 ----- eventstore/mmm/fuzz_test.go | 1 - eventstore/mmm/helpers.go | 127 ++++++++- eventstore/mmm/query.go | 464 ++++++++------------------------- eventstore/mmm/replace.go | 15 +- 5 files changed, 246 insertions(+), 425 deletions(-) diff --git a/eventstore/internal/helpers.go b/eventstore/internal/helpers.go index f167cf8..8e362f2 100644 --- a/eventstore/internal/helpers.go +++ b/eventstore/internal/helpers.go @@ -3,9 +3,7 @@ package internal import ( "bytes" "math" - "slices" - mergesortedslices "fiatjaf.com/lib/merge-sorted-slices" "fiatjaf.com/nostr" ) @@ -78,68 +76,6 @@ func CopyMapWithoutKey[K comparable, V any](originalMap map[K]V, key K) map[K]V return newMap } -// MergeSortMultipleBatches takes the results of multiple iterators, which are already sorted, -// and merges them into a single big sorted slice -func MergeSortMultiple(batches [][]nostr.Event, limit int, dst []nostr.Event) []nostr.Event { - // clear up empty lists here while simultaneously computing the total count. - // this helps because if there are a bunch of empty lists then this pre-clean - // step will get us in the faster 'merge' branch otherwise we would go to the other. - // we would have to do the cleaning anyway inside it. - // and even if we still go on the other we save one iteration by already computing the - // total count. - total := 0 - for i := len(batches) - 1; i >= 0; i-- { - if len(batches[i]) == 0 { - batches = SwapDelete(batches, i) - } else { - total += len(batches[i]) - } - } - - if limit == -1 { - limit = total - } - - // this amazing equation will ensure that if one of the two sides goes very small (like 1 or 2) - // the other can go very high (like 500) and we're still in the 'merge' branch. 
- // if values go somewhere in the middle then they may match the 'merge' branch (batches=20,limit=70) - // or not (batches=25, limit=60) - if math.Log(float64(len(batches)*2))+math.Log(float64(limit)) < 8 { - if dst == nil { - dst = make([]nostr.Event, limit) - } else if cap(dst) < limit { - dst = slices.Grow(dst, limit-len(dst)) - } - dst = dst[0:limit] - return mergesortedslices.MergeFuncNoEmptyListsIntoSlice(dst, batches, nostr.CompareEvent) - } else { - if dst == nil { - dst = make([]nostr.Event, total) - } else if cap(dst) < total { - dst = slices.Grow(dst, total-len(dst)) - } - dst = dst[0:total] - - // use quicksort in a dumb way that will still be fast because it's cheated - lastIndex := 0 - for _, batch := range batches { - copy(dst[lastIndex:], batch) - lastIndex += len(batch) - } - - slices.SortFunc(dst, nostr.CompareEvent) - - for i, j := 0, total-1; i < j; i, j = i+1, j-1 { - dst[i], dst[j] = dst[j], dst[i] - } - - if limit < len(dst) { - return dst[0:limit] - } - return dst - } -} - // BatchSizePerNumberOfQueries tries to make an educated guess for the batch size given the total filter limit and // the number of abstract queries we'll be conducting at the same time func BatchSizePerNumberOfQueries(totalFilterLimit int, numberOfQueries int) int { diff --git a/eventstore/mmm/fuzz_test.go b/eventstore/mmm/fuzz_test.go index 1072b0e..d87b9ac 100644 --- a/eventstore/mmm/fuzz_test.go +++ b/eventstore/mmm/fuzz_test.go @@ -103,7 +103,6 @@ func FuzzTest(f *testing.F) { layer := mmm.layers[rnd.Int()%len(mmm.layers)] evt, layers := mmm.GetByID(id) - if slices.Contains(deleted[id], layer) { // already deleted from this layer require.NotContains(t, layers, layer) diff --git a/eventstore/mmm/helpers.go b/eventstore/mmm/helpers.go index 37d51e0..30fecd3 100644 --- a/eventstore/mmm/helpers.go +++ b/eventstore/mmm/helpers.go @@ -1,6 +1,7 @@ package mmm import ( + "bytes" "encoding/binary" "encoding/hex" "iter" @@ -12,14 +13,61 @@ import ( "github.com/PowerDNS/lmdb-go/lmdb" ) -// this iterator always goes backwards type iterator struct { + query query + + // iteration stuff cursor *lmdb.Cursor key []byte posb []byte err error + + // this keeps track of last timestamp value pulled from this + last uint32 + + // if we shouldn't fetch more from this + exhausted bool + + // results not yet emitted + posbs [][]byte + timestamps []uint32 } +func (it *iterator) pull(n int, since uint32) { + query := it.query + + for range n { + // in the beginning we already have a k and a v and an err from the cursor setup, so check and use these + if it.err != nil { + it.exhausted = true + return + } + + if len(it.key) != query.keySize || !bytes.HasPrefix(it.key, query.prefix) { + // we reached the end of this prefix + it.exhausted = true + return + } + + createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:]) + if createdAt < since { + it.exhausted = true + return + } + + // got a key + it.posbs = append(it.posbs, it.posb) + it.timestamps = append(it.timestamps, createdAt) + it.last = createdAt + + // advance the cursor for the next call + it.next() + } + + return +} + +// goes backwards func (it *iterator) seek(key []byte) { if _, _, errsr := it.cursor.Get(key, nil, lmdb.SetRange); errsr != nil { if operr, ok := errsr.(*lmdb.OpError); !ok || operr.Errno != lmdb.NotFound { @@ -36,11 +84,88 @@ func (it *iterator) seek(key []byte) { } } +// goes backwards func (it *iterator) next() { // move one back (we'll look into k and v and err in the next iteration) it.key, it.posb, it.err = it.cursor.Get(nil, nil, 
lmdb.Prev) } +type iterators []*iterator + +// quickselect reorders the slice just enough to make the top k elements be arranged at the end +// i.e. [1, 700, 25, 312, 44, 28] with k=3 becomes something like [700, 312, 44, 1, 25, 28] +// in this case it's hardcoded to use the 'last' field of the iterator +// copied from https://github.com/chrislee87/go-quickselect +// this is modified to also return the highest 'last' (because it's not guaranteed it will be the first item) +func (its iterators) quickselect(k int) uint32 { + if len(its) == 0 || k >= len(its) { + return 0 + } + + left, right := 0, len(its)-1 + + for { + // insertion sort for small ranges + if right-left <= 20 { + for i := left + 1; i <= right; i++ { + for j := i; j > 0 && its[j].last > its[j-1].last; j-- { + its[j], its[j-1] = its[j-1], its[j] + } + } + return its[0].last + } + + // median-of-three to choose pivot + pivotIndex := left + (right-left)/2 + if its[right].last > its[left].last { + its[right], its[left] = its[left], its[right] + } + if its[pivotIndex].last > its[left].last { + its[pivotIndex], its[left] = its[left], its[pivotIndex] + } + if its[right].last > its[pivotIndex].last { + its[right], its[pivotIndex] = its[pivotIndex], its[right] + } + + // partition + its[left], its[pivotIndex] = its[pivotIndex], its[left] + ll := left + 1 + rr := right + for ll <= rr { + for ll <= right && its[ll].last > its[left].last { + ll++ + } + for rr >= left && its[left].last > its[rr].last { + rr-- + } + if ll <= rr { + its[ll], its[rr] = its[rr], its[ll] + ll++ + rr-- + } + } + its[left], its[rr] = its[rr], its[left] // swap into right place + pivotIndex = rr + + if k == pivotIndex { + // now that stuff is selected we get the highest "last" + highest := its[0].last + for i := 1; i < k; i++ { + if its[i].last > highest { + highest = its[i].last + } + } + return highest + } + + if k < pivotIndex { + right = pivotIndex - 1 + } else { + left = pivotIndex + 1 + } + } +} + type key struct { dbi lmdb.DBI key []byte diff --git a/eventstore/mmm/query.go b/eventstore/mmm/query.go index 32a7b13..019b505 100644 --- a/eventstore/mmm/query.go +++ b/eventstore/mmm/query.go @@ -1,11 +1,11 @@ package mmm import ( - "bytes" "encoding/binary" "fmt" "iter" "log" + "math" "slices" "fiatjaf.com/nostr" @@ -16,18 +16,15 @@ import ( // GetByID returns the event -- if found in this mmm -- and all the IndexingLayers it belongs to. func (b *MultiMmapManager) GetByID(id nostr.ID) (*nostr.Event, IndexingLayers) { - presence := make(chan []uint16) - var event *nostr.Event - b.queryByIDs(func(evt nostr.Event) bool { + layers := b.queryByIDs([]nostr.ID{id}, func(evt nostr.Event) bool { event = &evt return false - }, []nostr.ID{id}, presence) + }, true) if event != nil { - p := <-presence - present := make([]*IndexingLayer, len(p)) - for i, id := range p { + present := make([]*IndexingLayer, len(layers)) + for i, id := range layers { present[i] = b.layers.ByID(id) } return event, present @@ -37,14 +34,9 @@ func (b *MultiMmapManager) GetByID(id nostr.ID) (*nostr.Event, IndexingLayers) { } // queryByIDs emits the events of the given id to the given channel if they exist anywhere in this mmm. -// if presence is given it will also be used to emit slices of the ids of the IndexingLayers this event is stored in. -// it closes the channels when it ends. 
-func (b *MultiMmapManager) queryByIDs(yield func(nostr.Event) bool, ids []nostr.ID, presence chan []uint16) { +func (b *MultiMmapManager) queryByIDs(ids []nostr.ID, yield func(nostr.Event) bool, withLayers bool) (layers []uint16) { b.lmdbEnv.View(func(txn *lmdb.Txn) error { txn.RawRead = true - if presence != nil { - defer close(presence) - } for _, id := range ids { val, err := txn.Get(b.indexId, id[0:8]) @@ -55,30 +47,34 @@ func (b *MultiMmapManager) queryByIDs(yield func(nostr.Event) bool, ids []nostr. panic(fmt.Errorf("failed to decode event from %v: %w", pos, err)) } - if !yield(evt) { - return nil - } + stop := yield(evt) - if presence != nil { - layers := make([]uint16, 0, (len(val)-12)/2) + if withLayers { + layers = make([]uint16, 0, (len(val)-12)/2) for s := 12; s < len(val); s += 2 { layers = append(layers, binary.BigEndian.Uint16(val[s:s+2])) } - presence <- layers + } + + if stop { + return nil } } } return nil }) + + return layers } func (il *IndexingLayer) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq[nostr.Event] { return func(yield func(nostr.Event) bool) { if len(filter.IDs) > 0 { - il.mmmm.queryByIDs(yield, filter.IDs, nil) + il.mmmm.queryByIDs(filter.IDs, yield, false) return } + if filter.Search != "" { return } @@ -96,361 +92,121 @@ func (il *IndexingLayer) QueryEvents(filter nostr.Filter, maxLimit int) iter.Seq il.lmdbEnv.View(func(txn *lmdb.Txn) error { txn.RawRead = true - results, err := il.query(txn, filter, maxLimit) - for _, ie := range results { - if !yield(ie.Event) { - break - } - } - - return err + return il.query(txn, filter, maxLimit, yield) }) } } -func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int) ([]internal.IterEvent, error) { +func (il *IndexingLayer) query(txn *lmdb.Txn, filter nostr.Filter, limit int, yield func(nostr.Event) bool) error { queries, extraAuthors, extraKinds, extraTagKey, extraTagValues, since, err := il.prepareQueries(filter) if err != nil { - return nil, err + return err } - iterators := make([]*iterator, len(queries)) - exhausted := make([]bool, len(queries)) // indicates that a query won't be used anymore - results := make([][]internal.IterEvent, len(queries)) - pulledPerQuery := make([]int, len(queries)) + iterators := make(iterators, len(queries)) + batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, len(queries)) - // these are kept updated so we never pull from the iterator that is at further distance - // (i.e. 
the one that has the oldest event among all) - // we will continue to pull from it as soon as some other iterator takes the position - oldest := internal.IterEvent{Q: -1} - - sndPhase := false // after we have gathered enough events we will change the way we iterate - secondBatch := make([][]internal.IterEvent, 0, len(queries)+1) - sndPhaseParticipants := make([]int, 0, len(queries)+1) - - // while merging results in the second phase we will alternate between these two lists - // to avoid having to create new lists all the time - var sndPhaseResultsA []internal.IterEvent - var sndPhaseResultsB []internal.IterEvent - var sndPhaseResultsToggle bool // this is just a dummy thing we use to keep track of the alternating - var sndPhaseHasResultsPending bool - - remainingUnexhausted := len(queries) // when all queries are exhausted we can finally end this thing - batchSizePerQuery := internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted) - firstPhaseTotalPulled := 0 - - exhaust := func(q int) { - exhausted[q] = true - remainingUnexhausted-- - if q == oldest.Q { - oldest = internal.IterEvent{Q: -1} - } - } - - var firstPhaseResults []internal.IterEvent - - for q := range queries { + for q, query := range queries { cursor, err := txn.OpenCursor(queries[q].dbi) if err != nil { - return nil, err + return err } - iterators[q] = &iterator{cursor: cursor} + iterators[q] = &iterator{ + query: query, + cursor: cursor, + } + defer cursor.Close() iterators[q].seek(queries[q].startingPoint) - results[q] = make([]internal.IterEvent, 0, batchSizePerQuery*2) } - // fmt.Println("queries", len(queries)) + // initial pull from all queries + for i := range iterators { + iterators[i].pull(batchSizePerQuery, since) + } - for c := 0; ; c++ { - batchSizePerQuery = internal.BatchSizePerNumberOfQueries(limit, remainingUnexhausted) + numberOfIteratorsToPullOnEachRound := max(1, int(math.Ceil(float64(len(iterators))/float64(12)))) + totalEventsEmitted := 0 + tempResults := make([]nostr.Event, 0, batchSizePerQuery*2) - // fmt.Println(" iteration", c, "remaining", remainingUnexhausted, "batchsize", batchSizePerQuery) - // we will go through all the iterators in batches until we have pulled all the required results - for q, query := range queries { - if exhausted[q] { + for len(iterators) > 0 { + // reset stuff + tempResults = tempResults[:0] + + // after pulling from all iterators once we now find out what iterators are + // the ones we should keep pulling from next (i.e. 
which one's last emitted timestamp is the highest) + threshold := iterators.quickselect(min(numberOfIteratorsToPullOnEachRound, len(iterators))) + + // so we can emit all the events higher than the threshold + for i := range iterators { + for t := 0; t < len(iterators[i].timestamps); t++ { + if iterators[i].timestamps[t] >= threshold { + posb := iterators[i].posbs[t] + + // discard this regardless of what happens + iterators[i].timestamps = internal.SwapDelete(iterators[i].timestamps, t) + iterators[i].posbs = internal.SwapDelete(iterators[i].posbs, t) + t-- + + // fetch actual event + pos := positionFromBytes(posb) + bin := il.mmmm.mmapf[pos.start : pos.start+uint64(pos.size)] + + // check it against pubkeys without decoding the entire thing + if extraAuthors != nil && !slices.Contains(extraAuthors, betterbinary.GetPubKey(bin)) { + continue + } + + // check it against kinds without decoding the entire thing + if extraKinds != nil && !slices.Contains(extraKinds, betterbinary.GetKind(bin)) { + continue + } + + // decode the entire thing + event := nostr.Event{} + if err := betterbinary.Unmarshal(bin, &event); err != nil { + log.Printf("lmdb: value read error (id %x) on query prefix %x sp %x dbi %v: %s\n", + betterbinary.GetID(bin), iterators[i].query.prefix, iterators[i].query.startingPoint, iterators[i].query.dbi, err) + continue + } + + // if there is still a tag to be checked, do it now + if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) { + continue + } + + tempResults = append(tempResults, event) + } + } + } + + // emit this stuff in order + slices.SortFunc(tempResults, nostr.CompareEventReverse) + for _, evt := range tempResults { + if !yield(evt) { + return nil + } + + totalEventsEmitted++ + if totalEventsEmitted == limit { + return nil + } + } + + // now pull more events + for i := 0; i < min(len(iterators), numberOfIteratorsToPullOnEachRound); i++ { + if iterators[i].exhausted { + if len(iterators[i].posbs) == 0 { + // eliminating this from the list of iterators + iterators = internal.SwapDelete(iterators, i) + i-- + } continue } - if oldest.Q == q && remainingUnexhausted > 1 { - continue - } - // fmt.Println(" query", q, unsafe.Pointer(&results[q]), hex.EncodeToString(query.prefix), len(results[q])) - it := iterators[q] - pulledThisIteration := 0 - - for { - // we already have a k and a v and an err from the cursor setup, so check and use these - if it.err != nil || - len(it.key) != query.keySize || - !bytes.HasPrefix(it.key, query.prefix) { - // either iteration has errored or we reached the end of this prefix - // fmt.Println(" reached end", it.key, query.keySize, query.prefix) - exhaust(q) - break - } - - // "id" indexes don't contain a timestamp - if query.timestampSize == 4 { - createdAt := binary.BigEndian.Uint32(it.key[len(it.key)-4:]) - if createdAt < since { - // fmt.Println(" reached since", createdAt, "<", since) - exhaust(q) - break - } - } - - // fetch actual event - pos := positionFromBytes(it.posb) - bin := il.mmmm.mmapf[pos.start : pos.start+uint64(pos.size)] - - // check it against pubkeys without decoding the entire thing - if extraAuthors != nil && !slices.Contains(extraAuthors, betterbinary.GetPubKey(bin)) { - it.next() - continue - } - - // check it against kinds without decoding the entire thing - if extraKinds != nil && !slices.Contains(extraKinds, betterbinary.GetKind(bin)) { - it.next() - continue - } - - // decode the entire thing (TODO: do a conditional decode while also checking the extra tag) - event := nostr.Event{} - if 
err := betterbinary.Unmarshal(bin, &event); err != nil { - log.Printf("mmm: value read error (id %x) on query prefix %x sp %x dbi %d: %s\n", - bin[0:32], query.prefix, query.startingPoint, query.dbi, err) - return nil, fmt.Errorf("event read error: %w", err) - } - - // fmt.Println(" event", betterbinary.GetID(bin), "kind", betterbinary.GetKind(bin).Num(), "author", betterbinary.GetPubKey(bin), "ts", betterbinary.GetCreatedAt(bin), hex.EncodeToString(it.key), it.valIdx) - - // if there is still a tag to be checked, do it now - if extraTagValues != nil && !event.Tags.ContainsAny(extraTagKey, extraTagValues) { - it.next() - continue - } - - // this event is good to be used - evt := internal.IterEvent{Event: event, Q: q} - // - // - if sndPhase { - // do the process described below at HIWAWVRTP. - // if we've reached here this means we've already passed the `since` check. - // now we have to eliminate the event currently at the `since` threshold. - nextThreshold := firstPhaseResults[len(firstPhaseResults)-2] - if oldest.Event.ID == nostr.ZeroID { - // fmt.Println(" b1", evt.ID[0:8]) - // BRANCH WHEN WE DON'T HAVE THE OLDEST EVENT (BWWDHTOE) - // when we don't have the oldest set, we will keep the results - // and not change the cutting point -- it's bad, but hopefully not that bad. - results[q] = append(results[q], evt) - sndPhaseHasResultsPending = true - } else if nextThreshold.CreatedAt > oldest.CreatedAt { - // fmt.Println(" b2", nextThreshold.CreatedAt, ">", oldest.CreatedAt, evt.ID[0:8]) - // one of the events we have stored is the actual next threshold - // eliminate last, update since with oldest - firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1] - since = uint32(oldest.CreatedAt) - // fmt.Println(" new since", since, evt.ID[0:8]) - // we null the oldest Event as we can't rely on it anymore - // (we'll fall under BWWDHTOE above) until we have a new oldest set. - oldest = internal.IterEvent{Q: -1} - // anything we got that would be above this won't trigger an update to - // the oldest anyway, because it will be discarded as being after the limit. 
- // - // finally - // add this to the results to be merged later - results[q] = append(results[q], evt) - sndPhaseHasResultsPending = true - } else if nextThreshold.CreatedAt < evt.CreatedAt { - // the next last event in the firstPhaseResults is the next threshold - // fmt.Println(" b3", nextThreshold.CreatedAt, "<", oldest.CreatedAt, evt.ID[0:8]) - // eliminate last, update since with the antelast - firstPhaseResults = firstPhaseResults[0 : len(firstPhaseResults)-1] - since = uint32(nextThreshold.CreatedAt) - // fmt.Println(" new since", since) - // add this to the results to be merged later - results[q] = append(results[q], evt) - sndPhaseHasResultsPending = true - // update the oldest event - if evt.CreatedAt < oldest.CreatedAt { - oldest = evt - } - } else { - // fmt.Println(" b4", evt.ID[0:8]) - // oops, _we_ are the next `since` threshold - firstPhaseResults[len(firstPhaseResults)-1] = evt - since = uint32(evt.CreatedAt) - // fmt.Println(" new since", since) - // do not add us to the results to be merged later - // as we're already inhabiting the firstPhaseResults slice - } - } else { - results[q] = append(results[q], evt) - firstPhaseTotalPulled++ - - // update the oldest event - if oldest.Event.ID == nostr.ZeroID || evt.CreatedAt < oldest.CreatedAt { - oldest = evt - } - } - - pulledPerQuery[q]++ - pulledThisIteration++ - if pulledThisIteration > batchSizePerQuery { - // batch filled - it.next() - // fmt.Println(" filled", hex.EncodeToString(it.key), it.valIdx) - break - } - if pulledPerQuery[q] >= limit { - // batch filled + reached limit for this query (which is the global limit) - exhaust(q) - it.next() - break - } - - it.next() - } - } - - // we will do this check if we don't accumulated the requested number of events yet - // fmt.Println("oldest", oldest.Event, "from iter", oldest.Q) - if sndPhase && sndPhaseHasResultsPending && (oldest.Event.ID == nostr.ZeroID || remainingUnexhausted == 0) { - // fmt.Println("second phase aggregation!") - // when we are in the second phase we will aggressively aggregate results on every iteration - // - secondBatch = secondBatch[:0] - for s := 0; s < len(sndPhaseParticipants); s++ { - q := sndPhaseParticipants[s] - - if len(results[q]) > 0 { - secondBatch = append(secondBatch, results[q]) - } - - if exhausted[q] { - sndPhaseParticipants = internal.SwapDelete(sndPhaseParticipants, s) - s-- - } - } - - // every time we get here we will alternate between these A and B lists - // combining everything we have into a new partial results list. - // after we've done that we can again set the oldest. 
- // fmt.Println(" xxx", sndPhaseResultsToggle) - if sndPhaseResultsToggle { - secondBatch = append(secondBatch, sndPhaseResultsB) - sndPhaseResultsA = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsA) - oldest = sndPhaseResultsA[len(sndPhaseResultsA)-1] - // fmt.Println(" new aggregated a", len(sndPhaseResultsB)) - } else { - secondBatch = append(secondBatch, sndPhaseResultsA) - sndPhaseResultsB = internal.MergeSortMultiple(secondBatch, limit, sndPhaseResultsB) - oldest = sndPhaseResultsB[len(sndPhaseResultsB)-1] - // fmt.Println(" new aggregated b", len(sndPhaseResultsB)) - } - sndPhaseResultsToggle = !sndPhaseResultsToggle - - since = uint32(oldest.CreatedAt) - // fmt.Println(" new since", since) - - // reset the `results` list so we can keep using it - results = results[:len(queries)] - for _, q := range sndPhaseParticipants { - results[q] = results[q][:0] - } - } else if !sndPhase && firstPhaseTotalPulled >= limit && remainingUnexhausted > 0 { - // fmt.Println("have enough!", firstPhaseTotalPulled, "/", limit, "remaining", remainingUnexhausted) - - // we will exclude this oldest number as it is not relevant anymore - // (we now want to keep track only of the oldest among the remaining iterators) - oldest = internal.IterEvent{Q: -1} - - // HOW IT WORKS AFTER WE'VE REACHED THIS POINT (HIWAWVRTP) - // now we can combine the results we have and check what is our current oldest event. - // we also discard anything that is after the current cutting point (`limit`). - // so if we have [1,2,3], [10, 15, 20] and [7, 21, 49] but we only want 6 total - // we can just keep [1,2,3,7,10,15] and discard [20, 21, 49], - // and also adjust our `since` parameter to `15`, discarding anything we get after it - // and immediately declaring that iterator exhausted. - // also every time we get result that is more recent than this updated `since` we can - // keep it but also discard the previous since, moving the needle one back -- for example, - // if we get an `8` we can keep it and move the `since` parameter to `10`, discarding `15` - // in the process. - all := make([][]internal.IterEvent, len(results)) - copy(all, results) // we have to use this otherwise internal.MergeSortMultiple will scramble our results slice - firstPhaseResults = internal.MergeSortMultiple(all, limit, nil) - oldest = firstPhaseResults[limit-1] - since = uint32(oldest.CreatedAt) - // fmt.Println("new since", since) - - for q := range queries { - if exhausted[q] { - continue - } - - // we also automatically exhaust any of the iterators that have already passed the - // cutting point (`since`) - if results[q][len(results[q])-1].CreatedAt < oldest.CreatedAt { - exhausted[q] = true - remainingUnexhausted-- - continue - } - - // for all the remaining iterators, - // since we have merged all the events in this `firstPhaseResults` slice, we can empty the - // current `results` slices and reuse them. 
- results[q] = results[q][:0] - - // build this index of indexes with everybody who remains - sndPhaseParticipants = append(sndPhaseParticipants, q) - } - - // we create these two lists and alternate between them so we don't have to create a - // a new one every time - sndPhaseResultsA = make([]internal.IterEvent, 0, limit*2) - sndPhaseResultsB = make([]internal.IterEvent, 0, limit*2) - - // from now on we won't run this block anymore - sndPhase = true - } - - // fmt.Println("remaining", remainingUnexhausted) - if remainingUnexhausted == 0 { - break + iterators[i].pull(batchSizePerQuery, since) } } - // fmt.Println("is sndPhase?", sndPhase) - - var combinedResults []internal.IterEvent - - if sndPhase { - // fmt.Println("ending second phase") - // when we reach this point either sndPhaseResultsA or sndPhaseResultsB will be full of stuff, - // the other will be empty - var sndPhaseResults []internal.IterEvent - // fmt.Println("xxx", sndPhaseResultsToggle, len(sndPhaseResultsA), len(sndPhaseResultsB)) - if sndPhaseResultsToggle { - sndPhaseResults = sndPhaseResultsB - combinedResults = sndPhaseResultsA[0:limit] // reuse this - // fmt.Println(" using b", len(sndPhaseResultsA)) - } else { - sndPhaseResults = sndPhaseResultsA - combinedResults = sndPhaseResultsB[0:limit] // reuse this - // fmt.Println(" using a", len(sndPhaseResultsA)) - } - - all := [][]internal.IterEvent{firstPhaseResults, sndPhaseResults} - combinedResults = internal.MergeSortMultiple(all, limit, combinedResults) - // fmt.Println("final combinedResults", len(combinedResults), cap(combinedResults), limit) - } else { - combinedResults = make([]internal.IterEvent, limit) - combinedResults = internal.MergeSortMultiple(results, limit, combinedResults) - } - - return combinedResults, nil + return nil } diff --git a/eventstore/mmm/replace.go b/eventstore/mmm/replace.go index 9b769eb..301a1f2 100644 --- a/eventstore/mmm/replace.go +++ b/eventstore/mmm/replace.go @@ -2,6 +2,7 @@ package mmm import ( "fmt" + "iter" "math" "runtime" @@ -33,16 +34,20 @@ func (il *IndexingLayer) ReplaceEvent(evt nostr.Event) error { return il.lmdbEnv.Update(func(iltxn *lmdb.Txn) error { // now we fetch the past events, whatever they are, delete them and then save the new - prevResults, err := il.query(iltxn, filter, 10) // in theory limit could be just 1 and this should work + var yield_ func(nostr.Event) bool + var results iter.Seq[nostr.Event] = func(yield func(nostr.Event) bool) { + yield_ = yield + } + err := il.query(iltxn, filter, 10 /* in theory limit could be just 1 and this should work */, yield_) if err != nil { return fmt.Errorf("failed to query past events with %s: %w", filter, err) } shouldStore := true - for _, previous := range prevResults { - if internal.IsOlder(previous.Event, evt) { - if err := il.delete(mmmtxn, iltxn, previous.Event.ID); err != nil { - return fmt.Errorf("failed to delete event %s for replacing: %w", previous.Event.ID, err) + for previous := range results { + if internal.IsOlder(previous, evt) { + if err := il.delete(mmmtxn, iltxn, previous.ID); err != nil { + return fmt.Errorf("failed to delete event %s for replacing: %w", previous.ID, err) } } else { // there is a newer event already stored, so we won't store this
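
Note on consuming the new query API: the sketch below is not part of the patch. It assumes the signatures introduced above, the unexported query(txn, filter, limit, yield) error inside package mmm and the unchanged QueryEvents(filter, maxLimit) iter.Seq[nostr.Event], and uses the hypothetical helper names collectPrevious and collectFromLayer. Gathering results into a slice before ranging also sidesteps the wiring shown in the ReplaceEvent hunk, where yield_ is still nil at the moment il.query is invoked.

package mmm

import (
	"fiatjaf.com/nostr"
	"github.com/PowerDNS/lmdb-go/lmdb"
)

// collectPrevious is a hypothetical helper (not part of this diff): it drives
// the callback-based il.query inside an existing transaction and gathers the
// matches into a slice so they can be inspected, and possibly deleted, after
// the cursor iteration has finished. Passing a real callback up front avoids
// calling il.query with a nil yield.
func collectPrevious(il *IndexingLayer, txn *lmdb.Txn, filter nostr.Filter, limit int) ([]nostr.Event, error) {
	prev := make([]nostr.Event, 0, limit)
	err := il.query(txn, filter, limit, func(evt nostr.Event) bool {
		prev = append(prev, evt)
		return true // keep going; il.query stops by itself once limit is reached
	})
	return prev, err
}

// collectFromLayer is likewise hypothetical: external callers keep ranging
// over the iter.Seq returned by QueryEvents, which this patch leaves unchanged.
func collectFromLayer(il *IndexingLayer, filter nostr.Filter, maxLimit int) []nostr.Event {
	out := make([]nostr.Event, 0, maxLimit)
	for evt := range il.QueryEvents(filter, maxLimit) {
		out = append(out, evt)
	}
	return out
}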