From c31b92707bd45af38151b8b6b9a304ae9fe3e5bd Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Thu, 26 Feb 2026 23:01:18 -0300 Subject: [PATCH] trying to prevent leaking subscriptions. --- relay.go | 3 +++ sdk/addressable_loader.go | 11 ++++++----- sdk/dataloader/dataloader.go | 14 +++++++++----- sdk/replaceable_loader.go | 4 ++-- subscription.go | 5 +++++ 5 files changed, 25 insertions(+), 12 deletions(-) diff --git a/relay.go b/relay.go index d9a5b29..9abf1bd 100644 --- a/relay.go +++ b/relay.go @@ -384,10 +384,12 @@ func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionO sub := r.PrepareSubscription(ctx, filter, opts) if r.conn == nil { + sub.unsub(ErrNotConnected) return nil, fmt.Errorf("not connected to %s", r.URL) } if err := sub.Fire(); err != nil { + sub.unsub(ErrFireFailed) return nil, fmt.Errorf("couldn't subscribe to %v at %s: %w", filter, r.URL, err) } @@ -396,6 +398,7 @@ func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionO case <-r.closedNotify: sub.unsub(ErrDisconnected) case <-ctx.Done(): + sub.unsub(nil) } }() diff --git a/sdk/addressable_loader.go b/sdk/addressable_loader.go index 5a5a12a..625e461 100644 --- a/sdk/addressable_loader.go +++ b/sdk/addressable_loader.go @@ -3,6 +3,7 @@ package sdk import ( "context" "sync" + "sync/atomic" "time" "fiatjaf.com/nostr" @@ -55,14 +56,15 @@ func (sys *System) batchLoadAddressableEvents( cm := sync.Mutex{} aggregatedContext, aggregatedCancel := context.WithCancel(context.Background()) - waiting := len(pubkeys) + waiting := atomic.Int32{} + waiting.Add(int32(len(pubkeys))) for i, pubkey := range pubkeys { ctx, cancel := context.WithCancel(ctxs[i]) defer cancel() // build batched queries for the external relays - go func(i int, pubkey nostr.PubKey) { + go func(i int, pubkey nostr.PubKey, ctx context.Context) { // gather relays we'll use for this pubkey relays := sys.determineRelaysToQuery(ctx, pubkey, kind) @@ -92,11 +94,10 @@ func (sys *System) batchLoadAddressableEvents( wg.Done() <-ctx.Done() - waiting-- - if waiting == 0 { + if waiting.Add(-1) == 0 { aggregatedCancel() } - }(i, pubkey) + }(i, pubkey, ctx) } // wait for relay batches to be prepared diff --git a/sdk/dataloader/dataloader.go b/sdk/dataloader/dataloader.go index bc3434e..74d302b 100644 --- a/sdk/dataloader/dataloader.go +++ b/sdk/dataloader/dataloader.go @@ -68,7 +68,7 @@ func NewBatchedLoader[K comparable, V any](batchFn BatchFunc[K, V], opts Options // The first context passed to this function within a given batch window will be provided to // the registered BatchFunc. func (l *Loader[K, V]) Load(ctx context.Context, key K) (value V, err error) { - c := make(chan Result[V]) + c := make(chan Result[V], 1) // this is sent to batch fn. It contains the key and the channel to return // the result on @@ -117,11 +117,15 @@ func (l *Loader[K, V]) Load(ctx context.Context, key K) (value V, err error) { l.batchLock.Unlock() - if v, ok := <-c; ok { - return v.Data, v.Error + select { + case v, ok := <-c: + if ok { + return v.Data, v.Error + } + return value, NoValueError + case <-ctx.Done(): + return value, ctx.Err() } - - return value, NoValueError } type batcher[K comparable, V any] struct { diff --git a/sdk/replaceable_loader.go b/sdk/replaceable_loader.go index ed94fb6..2401ff8 100644 --- a/sdk/replaceable_loader.go +++ b/sdk/replaceable_loader.go @@ -85,7 +85,7 @@ func (sys *System) batchLoadReplaceableEvents( defer cancel() // build batched queries for the external relays - go func(i int, pubkey nostr.PubKey) { + go func(i int, pubkey nostr.PubKey, ctx context.Context) { // gather relays we'll use for this pubkey relays := sys.determineRelaysToQuery(ctx, pubkey, kind) @@ -118,7 +118,7 @@ func (sys *System) batchLoadReplaceableEvents( if waiting.Add(-1) == 0 { aggregatedCancel() } - }(i, pubkey) + }(i, pubkey, ctx) } // query all relays with the prepared filters diff --git a/subscription.go b/subscription.go index 802fe62..7c20f65 100644 --- a/subscription.go +++ b/subscription.go @@ -9,6 +9,11 @@ import ( "time" ) +var ( + ErrNotConnected = errors.New("not connected") + ErrFireFailed = errors.New("failed to fire") +) + // Subscription represents a subscription to a relay. type Subscription struct { counter int64