eliminate closedMutex and closeNotify because they are useless (apparently).
This commit is contained in:
@@ -17,7 +17,6 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"fiatjaf.com/lib/channelmutex"
|
|
||||||
ws "github.com/coder/websocket"
|
ws "github.com/coder/websocket"
|
||||||
"github.com/puzpuzpuz/xsync/v3"
|
"github.com/puzpuzpuz/xsync/v3"
|
||||||
)
|
)
|
||||||
@@ -48,16 +47,13 @@ func (c closeCause) Error() string {
|
|||||||
|
|
||||||
// Relay represents a connection to a Nostr relay.
|
// Relay represents a connection to a Nostr relay.
|
||||||
type Relay struct {
|
type Relay struct {
|
||||||
closeMutex *channelmutex.Mutex
|
|
||||||
|
|
||||||
URL string
|
URL string
|
||||||
requestHeader http.Header // e.g. for origin header
|
requestHeader http.Header // e.g. for origin header
|
||||||
|
|
||||||
// websocket connection
|
// websocket connection
|
||||||
conn *ws.Conn
|
conn *ws.Conn
|
||||||
writeQueue chan writeRequest
|
writeQueue chan writeRequest
|
||||||
closed *atomic.Bool
|
closed *atomic.Bool
|
||||||
closedNotify chan struct{}
|
|
||||||
|
|
||||||
Subscriptions *xsync.MapOf[int64, *Subscription]
|
Subscriptions *xsync.MapOf[int64, *Subscription]
|
||||||
|
|
||||||
@@ -92,9 +88,7 @@ func NewRelay(ctx context.Context, url string, opts RelayOptions) *Relay {
|
|||||||
customHandler: opts.CustomHandler,
|
customHandler: opts.CustomHandler,
|
||||||
noticeHandler: opts.NoticeHandler,
|
noticeHandler: opts.NoticeHandler,
|
||||||
authHandler: opts.AuthHandler,
|
authHandler: opts.AuthHandler,
|
||||||
closeMutex: channelmutex.New(),
|
|
||||||
closed: &atomic.Bool{},
|
closed: &atomic.Bool{},
|
||||||
closedNotify: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@@ -104,8 +98,6 @@ func NewRelay(ctx context.Context, url string, opts RelayOptions) *Relay {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
r.closeMutex.Invalidate()
|
|
||||||
|
|
||||||
if r.conn != nil {
|
if r.conn != nil {
|
||||||
cause := context.Cause(ctx)
|
cause := context.Cause(ctx)
|
||||||
code := ws.StatusNormalClosure
|
code := ws.StatusNormalClosure
|
||||||
@@ -120,14 +112,6 @@ func NewRelay(ctx context.Context, url string, opts RelayOptions) *Relay {
|
|||||||
|
|
||||||
_ = r.conn.Close(code, reason)
|
_ = r.conn.Close(code, reason)
|
||||||
}
|
}
|
||||||
if r.closeMutex != nil {
|
|
||||||
r.closeMutex.Lock()
|
|
||||||
if r.closedNotify != nil {
|
|
||||||
close(r.closedNotify)
|
|
||||||
}
|
|
||||||
r.conn = nil
|
|
||||||
r.closeMutex.Unlock()
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return r
|
return r
|
||||||
@@ -259,7 +243,6 @@ func (r *Relay) newConnection(ctx context.Context, httpClient *http.Client) erro
|
|||||||
r.conn = c
|
r.conn = c
|
||||||
r.writeQueue = make(chan writeRequest, 64 /* idem */)
|
r.writeQueue = make(chan writeRequest, 64 /* idem */)
|
||||||
r.closed = &atomic.Bool{}
|
r.closed = &atomic.Bool{}
|
||||||
r.closedNotify = make(chan struct{})
|
|
||||||
|
|
||||||
connCtx := r.connectionContext
|
connCtx := r.connectionContext
|
||||||
go func() {
|
go func() {
|
||||||
@@ -269,8 +252,6 @@ func (r *Relay) newConnection(ctx context.Context, httpClient *http.Client) erro
|
|||||||
select {
|
select {
|
||||||
case <-connCtx.Done():
|
case <-connCtx.Done():
|
||||||
return
|
return
|
||||||
case <-r.closedNotify:
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
debugLogf("{%s} pinging\n", r.URL)
|
debugLogf("{%s} pinging\n", r.URL)
|
||||||
pingCtx, cancel := context.WithTimeoutCause(connCtx, time.Millisecond*800, errors.New("ping took too long"))
|
pingCtx, cancel := context.WithTimeoutCause(connCtx, time.Millisecond*800, errors.New("ping took too long"))
|
||||||
@@ -441,16 +422,6 @@ func (r *Relay) handleMessage(message string) {
|
|||||||
|
|
||||||
// Write queues an arbitrary message to be sent to the relay.
|
// Write queues an arbitrary message to be sent to the relay.
|
||||||
func (r *Relay) Write(msg []byte) {
|
func (r *Relay) Write(msg []byte) {
|
||||||
select {
|
|
||||||
case <-r.closeMutex.C(): // this locks the mutex
|
|
||||||
case <-r.closedNotify:
|
|
||||||
return
|
|
||||||
case <-r.connectionContext.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
defer r.closeMutex.Unlock()
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-r.connectionContext.Done():
|
case <-r.connectionContext.Done():
|
||||||
case r.writeQueue <- writeRequest{msg: msg, answer: nil}:
|
case r.writeQueue <- writeRequest{msg: msg, answer: nil}:
|
||||||
@@ -461,16 +432,6 @@ func (r *Relay) Write(msg []byte) {
|
|||||||
func (r *Relay) WriteWithError(msg []byte) error {
|
func (r *Relay) WriteWithError(msg []byte) error {
|
||||||
ch := make(chan error)
|
ch := make(chan error)
|
||||||
|
|
||||||
select {
|
|
||||||
case <-r.closeMutex.C(): // this locks the channel/mutex
|
|
||||||
case <-r.closedNotify:
|
|
||||||
return fmt.Errorf("failed to write to %s: <closed>", r.URL)
|
|
||||||
case <-r.connectionContext.Done():
|
|
||||||
return fmt.Errorf("failed to write to %s: <closed>", r.URL)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer r.closeMutex.Unlock()
|
|
||||||
|
|
||||||
if r.writeQueue == nil {
|
if r.writeQueue == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -600,12 +561,8 @@ func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionO
|
|||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
<-ctx.Done()
|
||||||
case <-r.closedNotify:
|
sub.cancel(nil)
|
||||||
sub.cancel(ErrDisconnected)
|
|
||||||
case <-ctx.Done():
|
|
||||||
sub.cancel(nil)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return sub, nil
|
return sub, nil
|
||||||
|
|||||||
Reference in New Issue
Block a user