diff --git a/relay.go b/relay.go index bfcab14..c0764c8 100644 --- a/relay.go +++ b/relay.go @@ -223,6 +223,9 @@ func (r *Relay) Connect(ctx context.Context) error { writeRequest.answer <- err } close(writeRequest.answer) + case <-r.connectionContext.Done(): + // stop here + return } } }() @@ -286,7 +289,10 @@ func (r *Relay) Connect(ctx context.Context) error { } // dispatch this to the internal .events channel of the subscription - subscription.events <- &env.Event + select { + case subscription.events <- &env.Event: + case <-subscription.Context.Done(): + } } case *EOSEEnvelope: if subscription, ok := r.Subscriptions.Load(string(*env)); ok { @@ -313,7 +319,11 @@ func (r *Relay) Connect(ctx context.Context) error { // Write queues a message to be sent to the relay. func (r *Relay) Write(msg []byte) <-chan error { ch := make(chan error) - r.writeQueue <- writeRequest{msg: msg, answer: ch} + select { + case r.writeQueue <- writeRequest{msg: msg, answer: ch}: + case <-r.connectionContext.Done(): + go func() { ch <- fmt.Errorf("connection closed") }() + } return ch }