From 6cbe984e168b7523545390b08d425df1eecd8fff Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sat, 14 Mar 2026 05:26:33 -0300 Subject: [PATCH] eliminate closedMutex and closeNotify because they are useless (apparently). --- relay.go | 53 +++++------------------------------------------------ 1 file changed, 5 insertions(+), 48 deletions(-) diff --git a/relay.go b/relay.go index 06fda9e..64e266a 100644 --- a/relay.go +++ b/relay.go @@ -17,7 +17,6 @@ import ( "sync/atomic" "time" - "fiatjaf.com/lib/channelmutex" ws "github.com/coder/websocket" "github.com/puzpuzpuz/xsync/v3" ) @@ -48,16 +47,13 @@ func (c closeCause) Error() string { // Relay represents a connection to a Nostr relay. type Relay struct { - closeMutex *channelmutex.Mutex - URL string requestHeader http.Header // e.g. for origin header // websocket connection - conn *ws.Conn - writeQueue chan writeRequest - closed *atomic.Bool - closedNotify chan struct{} + conn *ws.Conn + writeQueue chan writeRequest + closed *atomic.Bool Subscriptions *xsync.MapOf[int64, *Subscription] @@ -92,9 +88,7 @@ func NewRelay(ctx context.Context, url string, opts RelayOptions) *Relay { customHandler: opts.CustomHandler, noticeHandler: opts.NoticeHandler, authHandler: opts.AuthHandler, - closeMutex: channelmutex.New(), closed: &atomic.Bool{}, - closedNotify: make(chan struct{}), } go func() { @@ -104,8 +98,6 @@ func NewRelay(ctx context.Context, url string, opts RelayOptions) *Relay { return } - r.closeMutex.Invalidate() - if r.conn != nil { cause := context.Cause(ctx) code := ws.StatusNormalClosure @@ -120,14 +112,6 @@ func NewRelay(ctx context.Context, url string, opts RelayOptions) *Relay { _ = r.conn.Close(code, reason) } - if r.closeMutex != nil { - r.closeMutex.Lock() - if r.closedNotify != nil { - close(r.closedNotify) - } - r.conn = nil - r.closeMutex.Unlock() - } }() return r @@ -259,7 +243,6 @@ func (r *Relay) newConnection(ctx context.Context, httpClient *http.Client) erro r.conn = c r.writeQueue = make(chan writeRequest, 64 /* idem */) r.closed = &atomic.Bool{} - r.closedNotify = make(chan struct{}) connCtx := r.connectionContext go func() { @@ -269,8 +252,6 @@ func (r *Relay) newConnection(ctx context.Context, httpClient *http.Client) erro select { case <-connCtx.Done(): return - case <-r.closedNotify: - return case <-ticker.C: debugLogf("{%s} pinging\n", r.URL) pingCtx, cancel := context.WithTimeoutCause(connCtx, time.Millisecond*800, errors.New("ping took too long")) @@ -441,16 +422,6 @@ func (r *Relay) handleMessage(message string) { // Write queues an arbitrary message to be sent to the relay. func (r *Relay) Write(msg []byte) { - select { - case <-r.closeMutex.C(): // this locks the mutex - case <-r.closedNotify: - return - case <-r.connectionContext.Done(): - return - } - - defer r.closeMutex.Unlock() - select { case <-r.connectionContext.Done(): case r.writeQueue <- writeRequest{msg: msg, answer: nil}: @@ -461,16 +432,6 @@ func (r *Relay) Write(msg []byte) { func (r *Relay) WriteWithError(msg []byte) error { ch := make(chan error) - select { - case <-r.closeMutex.C(): // this locks the channel/mutex - case <-r.closedNotify: - return fmt.Errorf("failed to write to %s: ", r.URL) - case <-r.connectionContext.Done(): - return fmt.Errorf("failed to write to %s: ", r.URL) - } - - defer r.closeMutex.Unlock() - if r.writeQueue == nil { return nil } @@ -600,12 +561,8 @@ func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionO } go func() { - select { - case <-r.closedNotify: - sub.cancel(ErrDisconnected) - case <-ctx.Done(): - sub.cancel(nil) - } + <-ctx.Done() + sub.cancel(nil) }() return sub, nil