diff --git a/relay.go b/relay.go index 9abf1bd..bdb72e8 100644 --- a/relay.go +++ b/relay.go @@ -423,6 +423,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filter Filter, opts Sub ClosedReason: make(chan string, 1), Filter: filter, match: filter.Matches, + eoseTimedOut: make(chan struct{}), } sub.checkDuplicate = opts.CheckDuplicate @@ -447,6 +448,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filter Filter, opts Sub go func() { time.Sleep(opts.MaxWaitForEOSE) + sub.eoseTimedOut <- struct{}{} sub.dispatchEose() }() } diff --git a/sdk/dataloader/dataloader.go b/sdk/dataloader/dataloader.go index 74d302b..06922b3 100644 --- a/sdk/dataloader/dataloader.go +++ b/sdk/dataloader/dataloader.go @@ -83,7 +83,9 @@ func (l *Loader[K, V]) Load(ctx context.Context, key K) (value V, err error) { case <-b.thresholdReached: case <-time.After(l.wait): l.batchLock.Lock() - l.curBatcher = l.newBatcher() + if l.curBatcher == b { + l.curBatcher = l.newBatcher() + } l.batchLock.Unlock() } diff --git a/subscription.go b/subscription.go index 7c20f65..13c179c 100644 --- a/subscription.go +++ b/subscription.go @@ -47,10 +47,11 @@ type Subscription struct { // if it returns true that event will not be processed further. checkDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool - match func(Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints - live atomic.Bool - eosed atomic.Bool - cancel context.CancelCauseFunc + match func(Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints + live atomic.Bool + eosed atomic.Bool + eoseTimedOut chan struct{} + cancel context.CancelCauseFunc // this keeps track of the events we've received before the EOSE that we must dispatch before // closing the EndOfStoredEvents channel @@ -90,25 +91,29 @@ func (sub *Subscription) start() { func (sub *Subscription) GetID() string { return sub.id } func (sub *Subscription) dispatchEvent(evt Event) { - added := false + isStored := false if !sub.eosed.Load() { sub.storedwg.Add(1) - added = true + isStored = true } go func() { - sub.mu.Lock() - defer sub.mu.Unlock() - - if sub.live.Load() { - select { - case sub.Events <- evt: - case <-sub.Context.Done(): + if isStored { + if sub.live.Load() { + select { + case sub.Events <- evt: + case <-sub.Context.Done(): + case <-sub.eoseTimedOut: + } } - } - - if added { sub.storedwg.Done() + } else { + if sub.live.Load() { + select { + case sub.Events <- evt: + case <-sub.Context.Done(): + } + } } }() }