Revert "pool: open new connections whenever a subscription limit is reached, reuse multiple simultaneous relay connections."
This reverts commit 9bf9816c15.
This commit is contained in:
@@ -21,7 +21,7 @@ const (
|
||||
|
||||
// Pool manages connections to multiple relays, ensures they are reopened when necessary and not duplicated.
|
||||
type Pool struct {
|
||||
Relays *xsync.MapOf[string, []*Relay]
|
||||
Relays *xsync.MapOf[string, *Relay]
|
||||
Context context.Context
|
||||
|
||||
authRequiredHandler func(context.Context, *Event) error
|
||||
@@ -53,7 +53,7 @@ func NewPool(opts PoolOptions) *Pool {
|
||||
ctx, cancel := context.WithCancelCause(context.Background())
|
||||
|
||||
pool := &Pool{
|
||||
Relays: xsync.NewMapOf[string, []*Relay](),
|
||||
Relays: xsync.NewMapOf[string, *Relay](),
|
||||
|
||||
Context: ctx,
|
||||
cancel: cancel,
|
||||
@@ -132,37 +132,20 @@ func (pool *Pool) EnsureRelay(url string) (*Relay, error) {
|
||||
nm := NormalizeURL(url)
|
||||
defer namedLock(nm)()
|
||||
|
||||
relays, ok := pool.Relays.Load(nm)
|
||||
if ok && relays == nil {
|
||||
relay, ok := pool.Relays.Load(nm)
|
||||
if ok && relay == nil {
|
||||
if pool.penaltyBox != nil {
|
||||
v, _ := pool.penaltyBox.Load(nm)
|
||||
if v[1] > 0 {
|
||||
return nil, fmt.Errorf("in penalty box, %fs remaining", v[1])
|
||||
}
|
||||
|
||||
// penaltybox ended, will proceed to create a connection
|
||||
}
|
||||
} else if ok {
|
||||
// we have connections -- return that with the least amount of subscriptions
|
||||
lessSubs := math.MaxInt
|
||||
var lessSubsRelay *Relay
|
||||
for _, relay := range relays {
|
||||
if relay.IsConnected() && relay.Subscriptions.Size() < lessSubs {
|
||||
lessSubsRelay = relay
|
||||
lessSubs = relay.Subscriptions.Size()
|
||||
}
|
||||
}
|
||||
|
||||
if lessSubsRelay != nil {
|
||||
return lessSubsRelay, nil
|
||||
}
|
||||
} else if ok && relay.IsConnected() {
|
||||
// already connected, unlock and return
|
||||
return relay, nil
|
||||
}
|
||||
|
||||
return pool.ensureNewRelay(nm)
|
||||
}
|
||||
|
||||
func (pool *Pool) ensureNewRelay(nm string) (*Relay, error) {
|
||||
relay := NewRelay(pool.Context, nm, pool.relayOptions)
|
||||
relay = NewRelay(pool.Context, url, pool.relayOptions)
|
||||
// try to connect
|
||||
// we use this ctx here so when the pool dies everything dies
|
||||
if err := relay.Connect(pool.Context); err != nil {
|
||||
@@ -176,30 +159,13 @@ func (pool *Pool) ensureNewRelay(nm string) (*Relay, error) {
|
||||
return nil, fmt.Errorf("failed to connect: %w", err)
|
||||
}
|
||||
|
||||
pool.Relays.Compute(nm, func(oldValue []*Relay, loaded bool) (newValue []*Relay, delete bool) {
|
||||
return append(oldValue, relay), false
|
||||
})
|
||||
|
||||
// when the connection dies we must ensure this relay is removed from the list
|
||||
go func(relay *Relay, nm string) {
|
||||
<-relay.Context().Done()
|
||||
|
||||
pool.Relays.Compute(nm, func(oldValue []*Relay, loaded bool) (newValue []*Relay, delete bool) {
|
||||
newValue = oldValue
|
||||
|
||||
for i := 0; i < len(newValue); i++ {
|
||||
if newValue[i] == relay {
|
||||
// swap-delete
|
||||
newValue[i] = newValue[len(newValue)-1]
|
||||
newValue = newValue[0 : len(newValue)-1]
|
||||
i--
|
||||
}
|
||||
}
|
||||
|
||||
return newValue, len(newValue) == 0
|
||||
})
|
||||
pool.Relays.Store(nm, relay)
|
||||
go func(r *Relay, relayURL string) {
|
||||
<-r.Context().Done()
|
||||
if current, ok := pool.Relays.Load(relayURL); ok && current == r {
|
||||
pool.Relays.Delete(relayURL)
|
||||
}
|
||||
}(relay, nm)
|
||||
|
||||
return relay, nil
|
||||
}
|
||||
|
||||
@@ -413,19 +379,6 @@ func (pool *Pool) FetchManyReplaceable(
|
||||
|
||||
subscribe:
|
||||
sub, err := relay.Subscribe(ctx, filter, opts)
|
||||
|
||||
if errors.Is(err, ErrTooManySubscriptions) {
|
||||
unlock := namedLock(relay.URL)
|
||||
newRelay, newErr := pool.ensureNewRelay(relay.URL)
|
||||
unlock()
|
||||
if newErr != nil {
|
||||
return
|
||||
}
|
||||
newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit
|
||||
relay = newRelay
|
||||
goto subscribe
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err)
|
||||
return
|
||||
@@ -565,18 +518,6 @@ func (pool *Pool) subMany(
|
||||
|
||||
subscribe:
|
||||
sub, err = relay.Subscribe(ctx, filter, opts)
|
||||
|
||||
if errors.Is(err, ErrTooManySubscriptions) {
|
||||
unlock := namedLock(relay.URL)
|
||||
newRelay, newErr := pool.ensureNewRelay(relay.URL)
|
||||
unlock()
|
||||
if newErr == nil {
|
||||
newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit
|
||||
relay = newRelay
|
||||
goto subscribe
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
debugLogf("[pool] subscription to %s failed: %s -- will retry\n", nm, err)
|
||||
goto reconnect
|
||||
@@ -721,19 +662,6 @@ func (pool *Pool) subManyEose(
|
||||
|
||||
subscribe:
|
||||
sub, err := relay.Subscribe(ctx, filter, opts)
|
||||
|
||||
if errors.Is(err, ErrTooManySubscriptions) {
|
||||
unlock := namedLock(relay.URL)
|
||||
newRelay, newErr := pool.ensureNewRelay(relay.URL)
|
||||
unlock()
|
||||
if newErr != nil {
|
||||
return
|
||||
}
|
||||
newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit
|
||||
relay = newRelay
|
||||
goto subscribe
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
debugLogf("[pool] error subscribing to %s with %v: %s", relay, filter, err)
|
||||
return
|
||||
@@ -816,30 +744,13 @@ func (pool *Pool) CountMany(
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
docount:
|
||||
ce, err := relay.countInternal(ctx, filter, opts)
|
||||
|
||||
if errors.Is(err, ErrTooManySubscriptions) {
|
||||
unlock := namedLock(relay.URL)
|
||||
newRelay, newErr := pool.ensureNewRelay(relay.URL)
|
||||
unlock()
|
||||
if newErr != nil {
|
||||
return
|
||||
}
|
||||
newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit
|
||||
relay = newRelay
|
||||
goto docount
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if len(ce.HyperLogLog) != 256 {
|
||||
return
|
||||
}
|
||||
|
||||
hll.MergeRegisters(ce.HyperLogLog)
|
||||
}(NormalizeURL(url))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user