pool: open new connections whenever a subscription limit is reached, reuse multiple simultaneous relay connections.
This commit is contained in:
@@ -21,7 +21,7 @@ const (
|
|||||||
|
|
||||||
// Pool manages connections to multiple relays, ensures they are reopened when necessary and not duplicated.
|
// Pool manages connections to multiple relays, ensures they are reopened when necessary and not duplicated.
|
||||||
type Pool struct {
|
type Pool struct {
|
||||||
Relays *xsync.MapOf[string, *Relay]
|
Relays *xsync.MapOf[string, []*Relay]
|
||||||
Context context.Context
|
Context context.Context
|
||||||
|
|
||||||
authRequiredHandler func(context.Context, *Event) error
|
authRequiredHandler func(context.Context, *Event) error
|
||||||
@@ -54,7 +54,7 @@ func NewPool(opts PoolOptions) *Pool {
|
|||||||
ctx, cancel := context.WithCancelCause(context.Background())
|
ctx, cancel := context.WithCancelCause(context.Background())
|
||||||
|
|
||||||
pool := &Pool{
|
pool := &Pool{
|
||||||
Relays: xsync.NewMapOf[string, *Relay](),
|
Relays: xsync.NewMapOf[string, []*Relay](),
|
||||||
|
|
||||||
Context: ctx,
|
Context: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
@@ -100,6 +100,7 @@ type PoolOptions struct {
|
|||||||
|
|
||||||
func (pool *Pool) startPenaltyBox() {
|
func (pool *Pool) startPenaltyBox() {
|
||||||
pool.penaltyBox = make(map[string][2]float64)
|
pool.penaltyBox = make(map[string][2]float64)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
sleep := 30.0
|
sleep := 30.0
|
||||||
for {
|
for {
|
||||||
@@ -134,8 +135,8 @@ func (pool *Pool) EnsureRelay(url string) (*Relay, error) {
|
|||||||
nm := NormalizeURL(url)
|
nm := NormalizeURL(url)
|
||||||
defer namedLock(nm)()
|
defer namedLock(nm)()
|
||||||
|
|
||||||
relay, ok := pool.Relays.Load(nm)
|
relays, ok := pool.Relays.Load(nm)
|
||||||
if ok && relay == nil {
|
if ok && relays == nil {
|
||||||
if pool.penaltyBox != nil {
|
if pool.penaltyBox != nil {
|
||||||
pool.penaltyBoxMu.Lock()
|
pool.penaltyBoxMu.Lock()
|
||||||
defer pool.penaltyBoxMu.Unlock()
|
defer pool.penaltyBoxMu.Unlock()
|
||||||
@@ -143,13 +144,30 @@ func (pool *Pool) EnsureRelay(url string) (*Relay, error) {
|
|||||||
if v[1] > 0 {
|
if v[1] > 0 {
|
||||||
return nil, fmt.Errorf("in penalty box, %fs remaining", v[1])
|
return nil, fmt.Errorf("in penalty box, %fs remaining", v[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// penaltybox ended, will proceed to create a connection
|
||||||
|
}
|
||||||
|
} else if ok {
|
||||||
|
// we have connections -- return that with the least amount of subscriptions
|
||||||
|
lessSubs := math.MaxInt
|
||||||
|
var lessSubsRelay *Relay
|
||||||
|
for _, relay := range relays {
|
||||||
|
if relay.IsConnected() && relay.Subscriptions.Size() < lessSubs {
|
||||||
|
lessSubsRelay = relay
|
||||||
|
lessSubs = relay.Subscriptions.Size()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if lessSubsRelay != nil {
|
||||||
|
return lessSubsRelay, nil
|
||||||
}
|
}
|
||||||
} else if ok && relay.IsConnected() {
|
|
||||||
// already connected, unlock and return
|
|
||||||
return relay, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
relay = NewRelay(pool.Context, url, pool.relayOptions)
|
return pool.ensureNewRelay(nm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pool *Pool) ensureNewRelay(nm string) (*Relay, error) {
|
||||||
|
relay := NewRelay(pool.Context, nm, pool.relayOptions)
|
||||||
// try to connect
|
// try to connect
|
||||||
// we use this ctx here so when the pool dies everything dies
|
// we use this ctx here so when the pool dies everything dies
|
||||||
if err := relay.Connect(pool.Context); err != nil {
|
if err := relay.Connect(pool.Context); err != nil {
|
||||||
@@ -159,17 +177,35 @@ func (pool *Pool) EnsureRelay(url string) (*Relay, error) {
|
|||||||
defer pool.penaltyBoxMu.Unlock()
|
defer pool.penaltyBoxMu.Unlock()
|
||||||
v, _ := pool.penaltyBox[nm]
|
v, _ := pool.penaltyBox[nm]
|
||||||
pool.penaltyBox[nm] = [2]float64{v[0] + 1, 30.0 + math.Pow(2, v[0]+1)}
|
pool.penaltyBox[nm] = [2]float64{v[0] + 1, 30.0 + math.Pow(2, v[0]+1)}
|
||||||
|
pool.Relays.Store(nm, nil) // this is important for penalty box detection on EnsureRelay
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("failed to connect: %w", err)
|
return nil, fmt.Errorf("failed to connect: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pool.Relays.Store(nm, relay)
|
pool.Relays.Compute(nm, func(oldValue []*Relay, loaded bool) (newValue []*Relay, delete bool) {
|
||||||
go func(r *Relay, relayURL string) {
|
return append(oldValue, relay), false
|
||||||
<-r.Context().Done()
|
})
|
||||||
if current, ok := pool.Relays.Load(relayURL); ok && current == r {
|
|
||||||
pool.Relays.Delete(relayURL)
|
// when the connection dies we must ensure this relay is removed from the list
|
||||||
}
|
go func(relay *Relay, nm string) {
|
||||||
|
<-relay.Context().Done()
|
||||||
|
|
||||||
|
pool.Relays.Compute(nm, func(oldValue []*Relay, loaded bool) (newValue []*Relay, delete bool) {
|
||||||
|
newValue = oldValue
|
||||||
|
|
||||||
|
for i := 0; i < len(newValue); i++ {
|
||||||
|
if newValue[i] == relay {
|
||||||
|
// swap-delete
|
||||||
|
newValue[i] = newValue[len(newValue)-1]
|
||||||
|
newValue = newValue[0 : len(newValue)-1]
|
||||||
|
i--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return newValue, len(newValue) == 0
|
||||||
|
})
|
||||||
}(relay, nm)
|
}(relay, nm)
|
||||||
|
|
||||||
return relay, nil
|
return relay, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -383,6 +419,17 @@ func (pool *Pool) FetchManyReplaceable(
|
|||||||
|
|
||||||
subscribe:
|
subscribe:
|
||||||
sub, err := relay.Subscribe(ctx, filter, opts)
|
sub, err := relay.Subscribe(ctx, filter, opts)
|
||||||
|
|
||||||
|
if errors.Is(err, ErrTooManySubscriptions) {
|
||||||
|
newRelay, newErr := pool.ensureNewRelay(relay.URL)
|
||||||
|
if newErr != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit
|
||||||
|
relay = newRelay
|
||||||
|
goto subscribe
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err)
|
debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err)
|
||||||
return
|
return
|
||||||
@@ -522,6 +569,16 @@ func (pool *Pool) subMany(
|
|||||||
|
|
||||||
subscribe:
|
subscribe:
|
||||||
sub, err = relay.Subscribe(ctx, filter, opts)
|
sub, err = relay.Subscribe(ctx, filter, opts)
|
||||||
|
|
||||||
|
if errors.Is(err, ErrTooManySubscriptions) {
|
||||||
|
newRelay, newErr := pool.ensureNewRelay(relay.URL)
|
||||||
|
if newErr == nil {
|
||||||
|
newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit
|
||||||
|
relay = newRelay
|
||||||
|
goto subscribe
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debugLogf("[pool] subscription to %s failed: %s -- will retry\n", nm, err)
|
debugLogf("[pool] subscription to %s failed: %s -- will retry\n", nm, err)
|
||||||
goto reconnect
|
goto reconnect
|
||||||
@@ -666,6 +723,17 @@ func (pool *Pool) subManyEose(
|
|||||||
|
|
||||||
subscribe:
|
subscribe:
|
||||||
sub, err := relay.Subscribe(ctx, filter, opts)
|
sub, err := relay.Subscribe(ctx, filter, opts)
|
||||||
|
|
||||||
|
if errors.Is(err, ErrTooManySubscriptions) {
|
||||||
|
newRelay, newErr := pool.ensureNewRelay(relay.URL)
|
||||||
|
if newErr != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit
|
||||||
|
relay = newRelay
|
||||||
|
goto subscribe
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err)
|
debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err)
|
||||||
return
|
return
|
||||||
@@ -748,13 +816,28 @@ func (pool *Pool) CountMany(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
docount:
|
||||||
ce, err := relay.countInternal(ctx, filter, opts)
|
ce, err := relay.countInternal(ctx, filter, opts)
|
||||||
|
|
||||||
|
if errors.Is(err, ErrTooManySubscriptions) {
|
||||||
|
newRelay, newErr := pool.ensureNewRelay(relay.URL)
|
||||||
|
if newErr != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit
|
||||||
|
relay = newRelay
|
||||||
|
goto docount
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(ce.HyperLogLog) != 256 {
|
if len(ce.HyperLogLog) != 256 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
hll.MergeRegisters(ce.HyperLogLog)
|
hll.MergeRegisters(ce.HyperLogLog)
|
||||||
}(NormalizeURL(url))
|
}(NormalizeURL(url))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user