fix potential subscription leaking bug with MaxWaitForEOSE never being effective.
This commit is contained in:
@@ -423,6 +423,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filter Filter, opts Sub
|
|||||||
ClosedReason: make(chan string, 1),
|
ClosedReason: make(chan string, 1),
|
||||||
Filter: filter,
|
Filter: filter,
|
||||||
match: filter.Matches,
|
match: filter.Matches,
|
||||||
|
eoseTimedOut: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
sub.checkDuplicate = opts.CheckDuplicate
|
sub.checkDuplicate = opts.CheckDuplicate
|
||||||
@@ -447,6 +448,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filter Filter, opts Sub
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(opts.MaxWaitForEOSE)
|
time.Sleep(opts.MaxWaitForEOSE)
|
||||||
|
sub.eoseTimedOut <- struct{}{}
|
||||||
sub.dispatchEose()
|
sub.dispatchEose()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -83,7 +83,9 @@ func (l *Loader[K, V]) Load(ctx context.Context, key K) (value V, err error) {
|
|||||||
case <-b.thresholdReached:
|
case <-b.thresholdReached:
|
||||||
case <-time.After(l.wait):
|
case <-time.After(l.wait):
|
||||||
l.batchLock.Lock()
|
l.batchLock.Lock()
|
||||||
l.curBatcher = l.newBatcher()
|
if l.curBatcher == b {
|
||||||
|
l.curBatcher = l.newBatcher()
|
||||||
|
}
|
||||||
l.batchLock.Unlock()
|
l.batchLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+21
-16
@@ -47,10 +47,11 @@ type Subscription struct {
|
|||||||
// if it returns true that event will not be processed further.
|
// if it returns true that event will not be processed further.
|
||||||
checkDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool
|
checkDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool
|
||||||
|
|
||||||
match func(Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints
|
match func(Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints
|
||||||
live atomic.Bool
|
live atomic.Bool
|
||||||
eosed atomic.Bool
|
eosed atomic.Bool
|
||||||
cancel context.CancelCauseFunc
|
eoseTimedOut chan struct{}
|
||||||
|
cancel context.CancelCauseFunc
|
||||||
|
|
||||||
// this keeps track of the events we've received before the EOSE that we must dispatch before
|
// this keeps track of the events we've received before the EOSE that we must dispatch before
|
||||||
// closing the EndOfStoredEvents channel
|
// closing the EndOfStoredEvents channel
|
||||||
@@ -90,25 +91,29 @@ func (sub *Subscription) start() {
|
|||||||
func (sub *Subscription) GetID() string { return sub.id }
|
func (sub *Subscription) GetID() string { return sub.id }
|
||||||
|
|
||||||
func (sub *Subscription) dispatchEvent(evt Event) {
|
func (sub *Subscription) dispatchEvent(evt Event) {
|
||||||
added := false
|
isStored := false
|
||||||
if !sub.eosed.Load() {
|
if !sub.eosed.Load() {
|
||||||
sub.storedwg.Add(1)
|
sub.storedwg.Add(1)
|
||||||
added = true
|
isStored = true
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
sub.mu.Lock()
|
if isStored {
|
||||||
defer sub.mu.Unlock()
|
if sub.live.Load() {
|
||||||
|
select {
|
||||||
if sub.live.Load() {
|
case sub.Events <- evt:
|
||||||
select {
|
case <-sub.Context.Done():
|
||||||
case sub.Events <- evt:
|
case <-sub.eoseTimedOut:
|
||||||
case <-sub.Context.Done():
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if added {
|
|
||||||
sub.storedwg.Done()
|
sub.storedwg.Done()
|
||||||
|
} else {
|
||||||
|
if sub.live.Load() {
|
||||||
|
select {
|
||||||
|
case sub.Events <- evt:
|
||||||
|
case <-sub.Context.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user