From 8515153df256f334a0ce65d1c59585cfde7ff159 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sat, 11 Apr 2026 19:25:39 -0300 Subject: [PATCH] Revert "pool: open new connections whenever a subscription limit is reached, reuse multiple simultaneous relay connections." This reverts commit 9bf9816c15a3550cabc7c6812d21ef3f5b534719. --- pool.go | 117 +++++++------------------------------------------------- 1 file changed, 14 insertions(+), 103 deletions(-) diff --git a/pool.go b/pool.go index bf39ad9..c44252c 100644 --- a/pool.go +++ b/pool.go @@ -21,7 +21,7 @@ const ( // Pool manages connections to multiple relays, ensures they are reopened when necessary and not duplicated. type Pool struct { - Relays *xsync.MapOf[string, []*Relay] + Relays *xsync.MapOf[string, *Relay] Context context.Context authRequiredHandler func(context.Context, *Event) error @@ -53,7 +53,7 @@ func NewPool(opts PoolOptions) *Pool { ctx, cancel := context.WithCancelCause(context.Background()) pool := &Pool{ - Relays: xsync.NewMapOf[string, []*Relay](), + Relays: xsync.NewMapOf[string, *Relay](), Context: ctx, cancel: cancel, @@ -132,37 +132,20 @@ func (pool *Pool) EnsureRelay(url string) (*Relay, error) { nm := NormalizeURL(url) defer namedLock(nm)() - relays, ok := pool.Relays.Load(nm) - if ok && relays == nil { + relay, ok := pool.Relays.Load(nm) + if ok && relay == nil { if pool.penaltyBox != nil { v, _ := pool.penaltyBox.Load(nm) if v[1] > 0 { return nil, fmt.Errorf("in penalty box, %fs remaining", v[1]) } - - // penaltybox ended, will proceed to create a connection - } - } else if ok { - // we have connections -- return that with the least amount of subscriptions - lessSubs := math.MaxInt - var lessSubsRelay *Relay - for _, relay := range relays { - if relay.IsConnected() && relay.Subscriptions.Size() < lessSubs { - lessSubsRelay = relay - lessSubs = relay.Subscriptions.Size() - } - } - - if lessSubsRelay != nil { - return lessSubsRelay, nil } + } else if ok && relay.IsConnected() { + // already connected, unlock and return + return relay, nil } - return pool.ensureNewRelay(nm) -} - -func (pool *Pool) ensureNewRelay(nm string) (*Relay, error) { - relay := NewRelay(pool.Context, nm, pool.relayOptions) + relay = NewRelay(pool.Context, url, pool.relayOptions) // try to connect // we use this ctx here so when the pool dies everything dies if err := relay.Connect(pool.Context); err != nil { @@ -176,30 +159,13 @@ func (pool *Pool) ensureNewRelay(nm string) (*Relay, error) { return nil, fmt.Errorf("failed to connect: %w", err) } - pool.Relays.Compute(nm, func(oldValue []*Relay, loaded bool) (newValue []*Relay, delete bool) { - return append(oldValue, relay), false - }) - - // when the connection dies we must ensure this relay is removed from the list - go func(relay *Relay, nm string) { - <-relay.Context().Done() - - pool.Relays.Compute(nm, func(oldValue []*Relay, loaded bool) (newValue []*Relay, delete bool) { - newValue = oldValue - - for i := 0; i < len(newValue); i++ { - if newValue[i] == relay { - // swap-delete - newValue[i] = newValue[len(newValue)-1] - newValue = newValue[0 : len(newValue)-1] - i-- - } - } - - return newValue, len(newValue) == 0 - }) + pool.Relays.Store(nm, relay) + go func(r *Relay, relayURL string) { + <-r.Context().Done() + if current, ok := pool.Relays.Load(relayURL); ok && current == r { + pool.Relays.Delete(relayURL) + } }(relay, nm) - return relay, nil } @@ -413,19 +379,6 @@ func (pool *Pool) FetchManyReplaceable( subscribe: sub, err := relay.Subscribe(ctx, filter, opts) - - if errors.Is(err, ErrTooManySubscriptions) { - unlock := namedLock(relay.URL) - newRelay, newErr := pool.ensureNewRelay(relay.URL) - unlock() - if newErr != nil { - return - } - newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit - relay = newRelay - goto subscribe - } - if err != nil { debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err) return @@ -565,18 +518,6 @@ func (pool *Pool) subMany( subscribe: sub, err = relay.Subscribe(ctx, filter, opts) - - if errors.Is(err, ErrTooManySubscriptions) { - unlock := namedLock(relay.URL) - newRelay, newErr := pool.ensureNewRelay(relay.URL) - unlock() - if newErr == nil { - newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit - relay = newRelay - goto subscribe - } - } - if err != nil { debugLogf("[pool] subscription to %s failed: %s -- will retry\n", nm, err) goto reconnect @@ -721,19 +662,6 @@ func (pool *Pool) subManyEose( subscribe: sub, err := relay.Subscribe(ctx, filter, opts) - - if errors.Is(err, ErrTooManySubscriptions) { - unlock := namedLock(relay.URL) - newRelay, newErr := pool.ensureNewRelay(relay.URL) - unlock() - if newErr != nil { - return - } - newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit - relay = newRelay - goto subscribe - } - if err != nil { debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err) return @@ -816,30 +744,13 @@ func (pool *Pool) CountMany( if err != nil { return } - - docount: ce, err := relay.countInternal(ctx, filter, opts) - - if errors.Is(err, ErrTooManySubscriptions) { - unlock := namedLock(relay.URL) - newRelay, newErr := pool.ensureNewRelay(relay.URL) - unlock() - if newErr != nil { - return - } - newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit - relay = newRelay - goto docount - } - if err != nil { return } - if len(ce.HyperLogLog) != 256 { return } - hll.MergeRegisters(ce.HyperLogLog) }(NormalizeURL(url)) }