khatru: use a channelmutex so we can fail to lock on addListener() if there's a disconnect.
This commit is contained in:
@@ -40,6 +40,7 @@ require (
|
|||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
fiatjaf.com/lib v0.3.6
|
||||||
github.com/dgraph-io/ristretto/v2 v2.3.0
|
github.com/dgraph-io/ristretto/v2 v2.3.0
|
||||||
github.com/go-git/go-git/v5 v5.16.3
|
github.com/go-git/go-git/v5 v5.16.3
|
||||||
github.com/sivukhin/godjot v1.0.6
|
github.com/sivukhin/godjot v1.0.6
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
fiatjaf.com/lib v0.3.6 h1:GRZNSxHI2EWdjSKVuzaT+c0aifLDtS16SzkeJaHyJfY=
|
||||||
|
fiatjaf.com/lib v0.3.6/go.mod h1:UlHaZvPHj25PtKLh9GjZkUHRmQ2xZ8Jkoa4VRaLeeQ8=
|
||||||
github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e h1:ahyvB3q25YnZWly5Gq1ekg6jcmWaGj/vG/MhF4aisoc=
|
github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e h1:ahyvB3q25YnZWly5Gq1ekg6jcmWaGj/vG/MhF4aisoc=
|
||||||
github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e/go.mod h1:kGUqhHd//musdITWjFvNTHn90WG9bMLBEPQZ17Cmlpw=
|
github.com/FactomProject/basen v0.0.0-20150613233007-fe3947df716e/go.mod h1:kGUqhHd//musdITWjFvNTHn90WG9bMLBEPQZ17Cmlpw=
|
||||||
github.com/FactomProject/btcutilecc v0.0.0-20130527213604-d3a63a5752ec h1:1Qb69mGp/UtRPn422BH4/Y4Q3SLUrD9KHuDkm8iodFc=
|
github.com/FactomProject/btcutilecc v0.0.0-20130527213604-d3a63a5752ec h1:1Qb69mGp/UtRPn422BH4/Y4Q3SLUrD9KHuDkm8iodFc=
|
||||||
|
|||||||
+6
-2
@@ -40,8 +40,12 @@ func (rl *Relay) addListener(
|
|||||||
filter nostr.Filter,
|
filter nostr.Filter,
|
||||||
cancel context.CancelCauseFunc,
|
cancel context.CancelCauseFunc,
|
||||||
) {
|
) {
|
||||||
rl.clientsMutex.Lock()
|
select {
|
||||||
defer rl.clientsMutex.Unlock()
|
case <-rl.clientsMutex.C():
|
||||||
|
defer rl.clientsMutex.Unlock()
|
||||||
|
case <-ws.Context.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if specs, ok := rl.clients[ws]; ok /* this will always be true unless client has disconnected very rapidly */ {
|
if specs, ok := rl.clients[ws]; ok /* this will always be true unless client has disconnected very rapidly */ {
|
||||||
idx := len(subrelay.listeners)
|
idx := len(subrelay.listeners)
|
||||||
|
|||||||
+5
-3
@@ -8,9 +8,9 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"fiatjaf.com/lib/channelmutex"
|
||||||
"fiatjaf.com/nostr"
|
"fiatjaf.com/nostr"
|
||||||
"fiatjaf.com/nostr/eventstore"
|
"fiatjaf.com/nostr/eventstore"
|
||||||
"fiatjaf.com/nostr/nip11"
|
"fiatjaf.com/nostr/nip11"
|
||||||
@@ -39,7 +39,9 @@ func NewRelay() *Relay {
|
|||||||
CheckOrigin: func(r *http.Request) bool { return true },
|
CheckOrigin: func(r *http.Request) bool { return true },
|
||||||
},
|
},
|
||||||
|
|
||||||
clients: make(map[*WebSocket][]listenerSpec, 100),
|
clients: make(map[*WebSocket][]listenerSpec, 100),
|
||||||
|
clientsMutex: channelmutex.New(),
|
||||||
|
|
||||||
listeners: make([]listener, 0, 100),
|
listeners: make([]listener, 0, 100),
|
||||||
|
|
||||||
serveMux: &http.ServeMux{},
|
serveMux: &http.ServeMux{},
|
||||||
@@ -107,7 +109,7 @@ type Relay struct {
|
|||||||
// also used for keeping track of who is listening to what
|
// also used for keeping track of who is listening to what
|
||||||
clients map[*WebSocket][]listenerSpec
|
clients map[*WebSocket][]listenerSpec
|
||||||
listeners []listener
|
listeners []listener
|
||||||
clientsMutex sync.Mutex
|
clientsMutex *channelmutex.Mutex
|
||||||
|
|
||||||
// set this to true to support negentropy
|
// set this to true to support negentropy
|
||||||
Negentropy bool
|
Negentropy bool
|
||||||
|
|||||||
Reference in New Issue
Block a user