seenAlready must drop older events
This commit is contained in:
committed by
fiatjaf_
parent
8a540998b9
commit
ba25770f65
@@ -9,6 +9,10 @@ import (
|
|||||||
"github.com/puzpuzpuz/xsync/v2"
|
"github.com/puzpuzpuz/xsync/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
seenAlreadyDropTick = time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
type SimplePool struct {
|
type SimplePool struct {
|
||||||
Relays *xsync.MapOf[string, *Relay]
|
Relays *xsync.MapOf[string, *Relay]
|
||||||
Context context.Context
|
Context context.Context
|
||||||
@@ -68,7 +72,8 @@ func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, fil
|
|||||||
|
|
||||||
func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent {
|
func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent {
|
||||||
events := make(chan IncomingEvent)
|
events := make(chan IncomingEvent)
|
||||||
seenAlready := xsync.NewMapOf[bool]()
|
seenAlready := xsync.NewMapOf[Timestamp]()
|
||||||
|
ticker := time.NewTicker(seenAlreadyDropTick)
|
||||||
|
|
||||||
pending := xsync.NewCounter()
|
pending := xsync.NewCounter()
|
||||||
initial := len(urls)
|
initial := len(urls)
|
||||||
@@ -87,12 +92,24 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
|
|||||||
|
|
||||||
for evt := range sub.Events {
|
for evt := range sub.Events {
|
||||||
if unique {
|
if unique {
|
||||||
if _, seen := seenAlready.LoadOrStore(evt.ID, true); seen {
|
if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
del := map[string]struct{}{}
|
||||||
|
old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix())
|
||||||
|
seenAlready.Range(func(key string, value Timestamp) bool {
|
||||||
|
if value < old {
|
||||||
|
del[evt.ID] = struct{}{}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
for k := range del {
|
||||||
|
seenAlready.Delete(k)
|
||||||
|
}
|
||||||
case events <- IncomingEvent{Event: evt, Relay: relay}:
|
case events <- IncomingEvent{Event: evt, Relay: relay}:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user