From ba25770f6585386900cdad440af24dc9f5e249e2 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Fri, 24 Nov 2023 17:24:36 +0900 Subject: [PATCH] seenAlready must drop older events --- pool.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/pool.go b/pool.go index e79df03..b1b6cf0 100644 --- a/pool.go +++ b/pool.go @@ -9,6 +9,10 @@ import ( "github.com/puzpuzpuz/xsync/v2" ) +const ( + seenAlreadyDropTick = time.Minute +) + type SimplePool struct { Relays *xsync.MapOf[string, *Relay] Context context.Context @@ -68,7 +72,8 @@ func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, fil func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { events := make(chan IncomingEvent) - seenAlready := xsync.NewMapOf[bool]() + seenAlready := xsync.NewMapOf[Timestamp]() + ticker := time.NewTicker(seenAlreadyDropTick) pending := xsync.NewCounter() initial := len(urls) @@ -87,12 +92,24 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt for evt := range sub.Events { if unique { - if _, seen := seenAlready.LoadOrStore(evt.ID, true); seen { + if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen { continue } } select { + case <-ticker.C: + del := map[string]struct{}{} + old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix()) + seenAlready.Range(func(key string, value Timestamp) bool { + if value < old { + del[evt.ID] = struct{}{} + } + return true + }) + for k := range del { + seenAlready.Delete(k) + } case events <- IncomingEvent{Event: evt, Relay: relay}: case <-ctx.Done(): return