diff --git a/relay.go b/relay.go index b3cf377..0c8776a 100644 --- a/relay.go +++ b/relay.go @@ -585,12 +585,12 @@ func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionO sub := r.PrepareSubscription(ctx, filter, opts) if r.conn == nil { - sub.unsub(ErrNotConnected) + sub.cancel(ErrNotConnected) return nil, fmt.Errorf("not connected to %s", r.URL) } if err := sub.Fire(); err != nil { - sub.unsub(ErrFireFailed) + sub.cancel(ErrFireFailed) return nil, fmt.Errorf("couldn't subscribe to %v at %s: %w", filter, r.URL, err) } @@ -655,7 +655,24 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filter Filter, opts Sub } // start handling events, eose, unsub etc: - go sub.start() + go func() { + <-sub.Context.Done() + + // mark subscription as closed and send a CLOSE to the relay (naïve sync.Once implementation) + if sub.live.CompareAndSwap(true, false) { + closeMsg := CloseEnvelope(sub.id) + closeb, _ := (&closeMsg).MarshalJSON() + sub.Relay.Write(closeb) + } + + // remove subscription from our map + sub.Relay.Subscriptions.Delete(sub.counter) + + // do this so we don't have the possibility of closing the Events channel and then trying to send to it + sub.mu.Lock() + close(sub.Events) + sub.mu.Unlock() + }() return sub } @@ -711,7 +728,7 @@ func (r *Relay) countInternal(ctx context.Context, filter Filter, opts Subscript return CountEnvelope{}, err } - defer sub.unsub(errors.New("countInternal() ended")) + defer sub.cancel(errors.New("countInternal() ended")) if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds diff --git a/subscription.go b/subscription.go index 13c179c..768d5c9 100644 --- a/subscription.go +++ b/subscription.go @@ -75,18 +75,6 @@ type SubscriptionOptions struct { MaxWaitForEOSE time.Duration } -func (sub *Subscription) start() { - <-sub.Context.Done() - - // the subscription ends once the context is canceled (if not already) - sub.unsub(errors.New("context done on start()")) // this will set sub.live to false - - // do this so we don't have the possibility of closing the Events channel and then trying to send to it - sub.mu.Lock() - close(sub.Events) - sub.mu.Unlock() -} - // GetID returns the subscription ID. func (sub *Subscription) GetID() string { return sub.id } @@ -133,37 +121,14 @@ func (sub *Subscription) handleClosed(reason string) { go func() { sub.ClosedReason <- reason sub.live.Store(false) // set this so we don't send an unnecessary CLOSE to the relay - sub.unsub(fmt.Errorf("CLOSED received: %s", reason)) + sub.cancel(fmt.Errorf("CLOSED received: %s", reason)) }() } // Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01. // Unsub() also closes the channel sub.Events and makes a new one. func (sub *Subscription) Unsub() { - sub.unsub(errors.New("Unsub() called")) -} - -// unsub is the internal implementation of Unsub. -func (sub *Subscription) unsub(err error) { - // cancel the context (if it's not canceled already) - sub.cancel(err) - - // mark subscription as closed and send a CLOSE to the relay (naïve sync.Once implementation) - if sub.live.CompareAndSwap(true, false) { - sub.Close() - } - - // remove subscription from our map - sub.Relay.Subscriptions.Delete(sub.counter) -} - -// Close just sends a CLOSE message. You probably want Unsub() instead. -func (sub *Subscription) Close() { - if sub.Relay.IsConnected() { - closeMsg := CloseEnvelope(sub.id) - closeb, _ := (&closeMsg).MarshalJSON() - sub.Relay.Write(closeb) - } + sub.cancel(errors.New("Unsub() called")) } // Sub sets sub.Filters and then calls sub.Fire(ctx).