trying to prevent leaking subscriptions.
This commit is contained in:
@@ -384,10 +384,12 @@ func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionO
|
|||||||
sub := r.PrepareSubscription(ctx, filter, opts)
|
sub := r.PrepareSubscription(ctx, filter, opts)
|
||||||
|
|
||||||
if r.conn == nil {
|
if r.conn == nil {
|
||||||
|
sub.unsub(ErrNotConnected)
|
||||||
return nil, fmt.Errorf("not connected to %s", r.URL)
|
return nil, fmt.Errorf("not connected to %s", r.URL)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := sub.Fire(); err != nil {
|
if err := sub.Fire(); err != nil {
|
||||||
|
sub.unsub(ErrFireFailed)
|
||||||
return nil, fmt.Errorf("couldn't subscribe to %v at %s: %w", filter, r.URL, err)
|
return nil, fmt.Errorf("couldn't subscribe to %v at %s: %w", filter, r.URL, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -396,6 +398,7 @@ func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionO
|
|||||||
case <-r.closedNotify:
|
case <-r.closedNotify:
|
||||||
sub.unsub(ErrDisconnected)
|
sub.unsub(ErrDisconnected)
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
sub.unsub(nil)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package sdk
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"fiatjaf.com/nostr"
|
"fiatjaf.com/nostr"
|
||||||
@@ -55,14 +56,15 @@ func (sys *System) batchLoadAddressableEvents(
|
|||||||
cm := sync.Mutex{}
|
cm := sync.Mutex{}
|
||||||
|
|
||||||
aggregatedContext, aggregatedCancel := context.WithCancel(context.Background())
|
aggregatedContext, aggregatedCancel := context.WithCancel(context.Background())
|
||||||
waiting := len(pubkeys)
|
waiting := atomic.Int32{}
|
||||||
|
waiting.Add(int32(len(pubkeys)))
|
||||||
|
|
||||||
for i, pubkey := range pubkeys {
|
for i, pubkey := range pubkeys {
|
||||||
ctx, cancel := context.WithCancel(ctxs[i])
|
ctx, cancel := context.WithCancel(ctxs[i])
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// build batched queries for the external relays
|
// build batched queries for the external relays
|
||||||
go func(i int, pubkey nostr.PubKey) {
|
go func(i int, pubkey nostr.PubKey, ctx context.Context) {
|
||||||
// gather relays we'll use for this pubkey
|
// gather relays we'll use for this pubkey
|
||||||
relays := sys.determineRelaysToQuery(ctx, pubkey, kind)
|
relays := sys.determineRelaysToQuery(ctx, pubkey, kind)
|
||||||
|
|
||||||
@@ -92,11 +94,10 @@ func (sys *System) batchLoadAddressableEvents(
|
|||||||
wg.Done()
|
wg.Done()
|
||||||
|
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
waiting--
|
if waiting.Add(-1) == 0 {
|
||||||
if waiting == 0 {
|
|
||||||
aggregatedCancel()
|
aggregatedCancel()
|
||||||
}
|
}
|
||||||
}(i, pubkey)
|
}(i, pubkey, ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for relay batches to be prepared
|
// wait for relay batches to be prepared
|
||||||
|
|||||||
@@ -68,7 +68,7 @@ func NewBatchedLoader[K comparable, V any](batchFn BatchFunc[K, V], opts Options
|
|||||||
// The first context passed to this function within a given batch window will be provided to
|
// The first context passed to this function within a given batch window will be provided to
|
||||||
// the registered BatchFunc.
|
// the registered BatchFunc.
|
||||||
func (l *Loader[K, V]) Load(ctx context.Context, key K) (value V, err error) {
|
func (l *Loader[K, V]) Load(ctx context.Context, key K) (value V, err error) {
|
||||||
c := make(chan Result[V])
|
c := make(chan Result[V], 1)
|
||||||
|
|
||||||
// this is sent to batch fn. It contains the key and the channel to return
|
// this is sent to batch fn. It contains the key and the channel to return
|
||||||
// the result on
|
// the result on
|
||||||
@@ -117,11 +117,15 @@ func (l *Loader[K, V]) Load(ctx context.Context, key K) (value V, err error) {
|
|||||||
|
|
||||||
l.batchLock.Unlock()
|
l.batchLock.Unlock()
|
||||||
|
|
||||||
if v, ok := <-c; ok {
|
select {
|
||||||
return v.Data, v.Error
|
case v, ok := <-c:
|
||||||
|
if ok {
|
||||||
|
return v.Data, v.Error
|
||||||
|
}
|
||||||
|
return value, NoValueError
|
||||||
|
case <-ctx.Done():
|
||||||
|
return value, ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
return value, NoValueError
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type batcher[K comparable, V any] struct {
|
type batcher[K comparable, V any] struct {
|
||||||
|
|||||||
@@ -85,7 +85,7 @@ func (sys *System) batchLoadReplaceableEvents(
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// build batched queries for the external relays
|
// build batched queries for the external relays
|
||||||
go func(i int, pubkey nostr.PubKey) {
|
go func(i int, pubkey nostr.PubKey, ctx context.Context) {
|
||||||
// gather relays we'll use for this pubkey
|
// gather relays we'll use for this pubkey
|
||||||
relays := sys.determineRelaysToQuery(ctx, pubkey, kind)
|
relays := sys.determineRelaysToQuery(ctx, pubkey, kind)
|
||||||
|
|
||||||
@@ -118,7 +118,7 @@ func (sys *System) batchLoadReplaceableEvents(
|
|||||||
if waiting.Add(-1) == 0 {
|
if waiting.Add(-1) == 0 {
|
||||||
aggregatedCancel()
|
aggregatedCancel()
|
||||||
}
|
}
|
||||||
}(i, pubkey)
|
}(i, pubkey, ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// query all relays with the prepared filters
|
// query all relays with the prepared filters
|
||||||
|
|||||||
@@ -9,6 +9,11 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrNotConnected = errors.New("not connected")
|
||||||
|
ErrFireFailed = errors.New("failed to fire")
|
||||||
|
)
|
||||||
|
|
||||||
// Subscription represents a subscription to a relay.
|
// Subscription represents a subscription to a relay.
|
||||||
type Subscription struct {
|
type Subscription struct {
|
||||||
counter int64
|
counter int64
|
||||||
|
|||||||
Reference in New Issue
Block a user