From 9bf9816c15a3550cabc7c6812d21ef3f5b534719 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 7 Apr 2026 18:13:05 -0300 Subject: [PATCH] pool: open new connections whenever a subscription limit is reached, reuse multiple simultaneous relay connections. --- pool.go | 111 +++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 97 insertions(+), 14 deletions(-) diff --git a/pool.go b/pool.go index b73e4bc..356d071 100644 --- a/pool.go +++ b/pool.go @@ -21,7 +21,7 @@ const ( // Pool manages connections to multiple relays, ensures they are reopened when necessary and not duplicated. type Pool struct { - Relays *xsync.MapOf[string, *Relay] + Relays *xsync.MapOf[string, []*Relay] Context context.Context authRequiredHandler func(context.Context, *Event) error @@ -54,7 +54,7 @@ func NewPool(opts PoolOptions) *Pool { ctx, cancel := context.WithCancelCause(context.Background()) pool := &Pool{ - Relays: xsync.NewMapOf[string, *Relay](), + Relays: xsync.NewMapOf[string, []*Relay](), Context: ctx, cancel: cancel, @@ -100,6 +100,7 @@ type PoolOptions struct { func (pool *Pool) startPenaltyBox() { pool.penaltyBox = make(map[string][2]float64) + go func() { sleep := 30.0 for { @@ -134,8 +135,8 @@ func (pool *Pool) EnsureRelay(url string) (*Relay, error) { nm := NormalizeURL(url) defer namedLock(nm)() - relay, ok := pool.Relays.Load(nm) - if ok && relay == nil { + relays, ok := pool.Relays.Load(nm) + if ok && relays == nil { if pool.penaltyBox != nil { pool.penaltyBoxMu.Lock() defer pool.penaltyBoxMu.Unlock() @@ -143,13 +144,30 @@ func (pool *Pool) EnsureRelay(url string) (*Relay, error) { if v[1] > 0 { return nil, fmt.Errorf("in penalty box, %fs remaining", v[1]) } + + // penaltybox ended, will proceed to create a connection + } + } else if ok { + // we have connections -- return that with the least amount of subscriptions + lessSubs := math.MaxInt + var lessSubsRelay *Relay + for _, relay := range relays { + if relay.IsConnected() && relay.Subscriptions.Size() < lessSubs { + lessSubsRelay = relay + lessSubs = relay.Subscriptions.Size() + } + } + + if lessSubsRelay != nil { + return lessSubsRelay, nil } - } else if ok && relay.IsConnected() { - // already connected, unlock and return - return relay, nil } - relay = NewRelay(pool.Context, url, pool.relayOptions) + return pool.ensureNewRelay(nm) +} + +func (pool *Pool) ensureNewRelay(nm string) (*Relay, error) { + relay := NewRelay(pool.Context, nm, pool.relayOptions) // try to connect // we use this ctx here so when the pool dies everything dies if err := relay.Connect(pool.Context); err != nil { @@ -159,17 +177,35 @@ func (pool *Pool) EnsureRelay(url string) (*Relay, error) { defer pool.penaltyBoxMu.Unlock() v, _ := pool.penaltyBox[nm] pool.penaltyBox[nm] = [2]float64{v[0] + 1, 30.0 + math.Pow(2, v[0]+1)} + pool.Relays.Store(nm, nil) // this is important for penalty box detection on EnsureRelay } return nil, fmt.Errorf("failed to connect: %w", err) } - pool.Relays.Store(nm, relay) - go func(r *Relay, relayURL string) { - <-r.Context().Done() - if current, ok := pool.Relays.Load(relayURL); ok && current == r { - pool.Relays.Delete(relayURL) - } + pool.Relays.Compute(nm, func(oldValue []*Relay, loaded bool) (newValue []*Relay, delete bool) { + return append(oldValue, relay), false + }) + + // when the connection dies we must ensure this relay is removed from the list + go func(relay *Relay, nm string) { + <-relay.Context().Done() + + pool.Relays.Compute(nm, func(oldValue []*Relay, loaded bool) (newValue []*Relay, delete bool) { + newValue = oldValue + + for i := 0; i < len(newValue); i++ { + if newValue[i] == relay { + // swap-delete + newValue[i] = newValue[len(newValue)-1] + newValue = newValue[0 : len(newValue)-1] + i-- + } + } + + return newValue, len(newValue) == 0 + }) }(relay, nm) + return relay, nil } @@ -383,6 +419,17 @@ func (pool *Pool) FetchManyReplaceable( subscribe: sub, err := relay.Subscribe(ctx, filter, opts) + + if errors.Is(err, ErrTooManySubscriptions) { + newRelay, newErr := pool.ensureNewRelay(relay.URL) + if newErr != nil { + return + } + newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit + relay = newRelay + goto subscribe + } + if err != nil { debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err) return @@ -522,6 +569,16 @@ func (pool *Pool) subMany( subscribe: sub, err = relay.Subscribe(ctx, filter, opts) + + if errors.Is(err, ErrTooManySubscriptions) { + newRelay, newErr := pool.ensureNewRelay(relay.URL) + if newErr == nil { + newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit + relay = newRelay + goto subscribe + } + } + if err != nil { debugLogf("[pool] subscription to %s failed: %s -- will retry\n", nm, err) goto reconnect @@ -666,6 +723,17 @@ func (pool *Pool) subManyEose( subscribe: sub, err := relay.Subscribe(ctx, filter, opts) + + if errors.Is(err, ErrTooManySubscriptions) { + newRelay, newErr := pool.ensureNewRelay(relay.URL) + if newErr != nil { + return + } + newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit + relay = newRelay + goto subscribe + } + if err != nil { debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err) return @@ -748,13 +816,28 @@ func (pool *Pool) CountMany( if err != nil { return } + + docount: ce, err := relay.countInternal(ctx, filter, opts) + + if errors.Is(err, ErrTooManySubscriptions) { + newRelay, newErr := pool.ensureNewRelay(relay.URL) + if newErr != nil { + return + } + newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit + relay = newRelay + goto docount + } + if err != nil { return } + if len(ce.HyperLogLog) != 256 { return } + hll.MergeRegisters(ce.HyperLogLog) }(NormalizeURL(url)) }