pool: penalty box as a xsync map because mutexes are too hard.

This commit is contained in:
fiatjaf
2026-04-11 01:55:18 -03:00
parent b989b66bb7
commit 12af4717d4
+17 -15
View File
@@ -33,8 +33,7 @@ type Pool struct {
relayOptions RelayOptions relayOptions RelayOptions
// custom things not often used // custom things not often used
penaltyBoxMu sync.Mutex penaltyBox *xsync.MapOf[string, [2]float64]
penaltyBox map[string][2]float64
} }
// DirectedFilter combines a Filter with a specific relay URL. // DirectedFilter combines a Filter with a specific relay URL.
@@ -99,23 +98,22 @@ type PoolOptions struct {
} }
func (pool *Pool) startPenaltyBox() { func (pool *Pool) startPenaltyBox() {
pool.penaltyBox = make(map[string][2]float64) pool.penaltyBox = xsync.NewMapOf[string, [2]float64]()
go func() { go func() {
sleep := 30.0 sleep := 30.0
for { for {
time.Sleep(time.Duration(sleep) * time.Second) time.Sleep(time.Duration(sleep) * time.Second)
pool.penaltyBoxMu.Lock()
nextSleep := 300.0 nextSleep := 300.0
for url, v := range pool.penaltyBox { for url, v := range pool.penaltyBox.Range {
remainingSeconds := v[1] remainingSeconds := v[1]
remainingSeconds -= sleep remainingSeconds -= sleep
if remainingSeconds <= 0 { if remainingSeconds <= 0 {
pool.penaltyBox[url] = [2]float64{v[0], 0} pool.penaltyBox.Store(url, [2]float64{v[0], 0})
continue continue
} else { } else {
pool.penaltyBox[url] = [2]float64{v[0], remainingSeconds} pool.penaltyBox.Store(url, [2]float64{v[0], remainingSeconds})
} }
if remainingSeconds < nextSleep { if remainingSeconds < nextSleep {
@@ -124,7 +122,6 @@ func (pool *Pool) startPenaltyBox() {
} }
sleep = nextSleep sleep = nextSleep
pool.penaltyBoxMu.Unlock()
} }
}() }()
} }
@@ -138,9 +135,7 @@ func (pool *Pool) EnsureRelay(url string) (*Relay, error) {
relays, ok := pool.Relays.Load(nm) relays, ok := pool.Relays.Load(nm)
if ok && relays == nil { if ok && relays == nil {
if pool.penaltyBox != nil { if pool.penaltyBox != nil {
pool.penaltyBoxMu.Lock() v, _ := pool.penaltyBox.Load(nm)
defer pool.penaltyBoxMu.Unlock()
v, _ := pool.penaltyBox[nm]
if v[1] > 0 { if v[1] > 0 {
return nil, fmt.Errorf("in penalty box, %fs remaining", v[1]) return nil, fmt.Errorf("in penalty box, %fs remaining", v[1])
} }
@@ -173,10 +168,9 @@ func (pool *Pool) ensureNewRelay(nm string) (*Relay, error) {
if err := relay.Connect(pool.Context); err != nil { if err := relay.Connect(pool.Context); err != nil {
if pool.penaltyBox != nil { if pool.penaltyBox != nil {
// putting relay in penalty box // putting relay in penalty box
pool.penaltyBoxMu.Lock() pool.penaltyBox.Compute(nm, func(v [2]float64, loaded bool) (newV [2]float64, delete bool) {
defer pool.penaltyBoxMu.Unlock() return [2]float64{v[0] + 1, 30.0 + math.Pow(2, v[0]+1)}, false
v, _ := pool.penaltyBox[nm] })
pool.penaltyBox[nm] = [2]float64{v[0] + 1, 30.0 + math.Pow(2, v[0]+1)}
pool.Relays.Store(nm, nil) // this is important for penalty box detection on EnsureRelay pool.Relays.Store(nm, nil) // this is important for penalty box detection on EnsureRelay
} }
return nil, fmt.Errorf("failed to connect: %w", err) return nil, fmt.Errorf("failed to connect: %w", err)
@@ -421,7 +415,9 @@ func (pool *Pool) FetchManyReplaceable(
sub, err := relay.Subscribe(ctx, filter, opts) sub, err := relay.Subscribe(ctx, filter, opts)
if errors.Is(err, ErrTooManySubscriptions) { if errors.Is(err, ErrTooManySubscriptions) {
unlock := namedLock(relay.URL)
newRelay, newErr := pool.ensureNewRelay(relay.URL) newRelay, newErr := pool.ensureNewRelay(relay.URL)
unlock()
if newErr != nil { if newErr != nil {
return return
} }
@@ -571,7 +567,9 @@ func (pool *Pool) subMany(
sub, err = relay.Subscribe(ctx, filter, opts) sub, err = relay.Subscribe(ctx, filter, opts)
if errors.Is(err, ErrTooManySubscriptions) { if errors.Is(err, ErrTooManySubscriptions) {
unlock := namedLock(relay.URL)
newRelay, newErr := pool.ensureNewRelay(relay.URL) newRelay, newErr := pool.ensureNewRelay(relay.URL)
unlock()
if newErr == nil { if newErr == nil {
newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit newRelay.realSubscriptionsLimit = relay.realSubscriptionsLimit
relay = newRelay relay = newRelay
@@ -725,7 +723,9 @@ func (pool *Pool) subManyEose(
sub, err := relay.Subscribe(ctx, filter, opts) sub, err := relay.Subscribe(ctx, filter, opts)
if errors.Is(err, ErrTooManySubscriptions) { if errors.Is(err, ErrTooManySubscriptions) {
unlock := namedLock(relay.URL)
newRelay, newErr := pool.ensureNewRelay(relay.URL) newRelay, newErr := pool.ensureNewRelay(relay.URL)
unlock()
if newErr != nil { if newErr != nil {
return return
} }
@@ -821,7 +821,9 @@ func (pool *Pool) CountMany(
ce, err := relay.countInternal(ctx, filter, opts) ce, err := relay.countInternal(ctx, filter, opts)
if errors.Is(err, ErrTooManySubscriptions) { if errors.Is(err, ErrTooManySubscriptions) {
unlock := namedLock(relay.URL)
newRelay, newErr := pool.ensureNewRelay(relay.URL) newRelay, newErr := pool.ensureNewRelay(relay.URL)
unlock()
if newErr != nil { if newErr != nil {
return return
} }