prevent halting in some other places.
This commit is contained in:
@@ -223,6 +223,9 @@ func (r *Relay) Connect(ctx context.Context) error {
|
|||||||
writeRequest.answer <- err
|
writeRequest.answer <- err
|
||||||
}
|
}
|
||||||
close(writeRequest.answer)
|
close(writeRequest.answer)
|
||||||
|
case <-r.connectionContext.Done():
|
||||||
|
// stop here
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -286,7 +289,10 @@ func (r *Relay) Connect(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// dispatch this to the internal .events channel of the subscription
|
// dispatch this to the internal .events channel of the subscription
|
||||||
subscription.events <- &env.Event
|
select {
|
||||||
|
case subscription.events <- &env.Event:
|
||||||
|
case <-subscription.Context.Done():
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case *EOSEEnvelope:
|
case *EOSEEnvelope:
|
||||||
if subscription, ok := r.Subscriptions.Load(string(*env)); ok {
|
if subscription, ok := r.Subscriptions.Load(string(*env)); ok {
|
||||||
@@ -313,7 +319,11 @@ func (r *Relay) Connect(ctx context.Context) error {
|
|||||||
// Write queues a message to be sent to the relay.
|
// Write queues a message to be sent to the relay.
|
||||||
func (r *Relay) Write(msg []byte) <-chan error {
|
func (r *Relay) Write(msg []byte) <-chan error {
|
||||||
ch := make(chan error)
|
ch := make(chan error)
|
||||||
r.writeQueue <- writeRequest{msg: msg, answer: ch}
|
select {
|
||||||
|
case r.writeQueue <- writeRequest{msg: msg, answer: ch}:
|
||||||
|
case <-r.connectionContext.Done():
|
||||||
|
go func() { ch <- fmt.Errorf("connection closed") }()
|
||||||
|
}
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user