diff --git a/relay.go b/relay.go index df7ffe3..a56e42d 100644 --- a/relay.go +++ b/relay.go @@ -174,7 +174,18 @@ func (r *Relay) String() string { func (r *Relay) Context() context.Context { return r.connectionContext } // IsConnected returns true if the connection to this relay seems to be active. -func (r *Relay) IsConnected() bool { return !r.closed.Load() } +func (r *Relay) IsConnected() bool { + if r.closed.Load() { + return false + } + if r.conn == nil { + return false + } + if r.connectionContext == nil { + return false + } + return r.connectionContext.Err() == nil +} // Connect tries to establish a websocket connection to r.URL. // If the context expires before the connection is complete, an error is returned. @@ -576,7 +587,7 @@ func (r *Relay) publish(ctx context.Context, id ID, env Envelope) error { // Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point. // Failure to do that will result in a huge number of halted goroutines being created. func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionOptions) (*Subscription, error) { - if r.conn == nil || r.closed.Load() { + if !r.IsConnected() { return nil, ErrDisconnected } @@ -651,11 +662,13 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filter Filter, opts Sub go func() { <-sub.Context.Done() - // mark subscription as closed and send a CLOSE to the relay (naïve sync.Once implementation) + // mark subscription as closed and send a CLOSE to the relay (naive sync.Once implementation) if sub.live.CompareAndSwap(true, false) { closeMsg := CloseEnvelope(sub.id) closeb, _ := (&closeMsg).MarshalJSON() - sub.Relay.Write(closeb) + if err := sub.Relay.WriteWithError(closeb); err != nil { + _ = sub.Relay.close(err) + } } // remove subscription from our map