relay: check for subscription limits and error.
This commit is contained in:
@@ -20,6 +20,7 @@ import (
|
|||||||
|
|
||||||
ws "github.com/coder/websocket"
|
ws "github.com/coder/websocket"
|
||||||
"github.com/puzpuzpuz/xsync/v3"
|
"github.com/puzpuzpuz/xsync/v3"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
)
|
)
|
||||||
|
|
||||||
var subscriptionIDCounter atomic.Int64
|
var subscriptionIDCounter atomic.Int64
|
||||||
@@ -73,6 +74,9 @@ type Relay struct {
|
|||||||
okCallbacksMutex sync.Mutex
|
okCallbacksMutex sync.Mutex
|
||||||
subscriptionChannelCloseQueue chan *Subscription
|
subscriptionChannelCloseQueue chan *Subscription
|
||||||
|
|
||||||
|
subscriptionLimitCheckMutex sync.Mutex
|
||||||
|
realSubscriptionsLimit int
|
||||||
|
|
||||||
// custom things that aren't often used
|
// custom things that aren't often used
|
||||||
//
|
//
|
||||||
AssumeValid bool // this will skip verifying signatures for events received from this relay
|
AssumeValid bool // this will skip verifying signatures for events received from this relay
|
||||||
@@ -577,6 +581,69 @@ func (r *Relay) publish(ctx context.Context, id ID, env Envelope) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ErrTooManySubscriptions = errors.New("subscription limit reached")
|
||||||
|
|
||||||
|
func (r *Relay) checkSubscriptionLimit(ctx context.Context) error {
|
||||||
|
current := r.Subscriptions.Size()
|
||||||
|
tempLimit := 20
|
||||||
|
|
||||||
|
// special case troublesome relays
|
||||||
|
if strings.HasPrefix(r.URL, "wss://relay.bullishbounty.com") {
|
||||||
|
r.realSubscriptionsLimit = 10
|
||||||
|
if current >= r.realSubscriptionsLimit {
|
||||||
|
return ErrTooManySubscriptions
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if current < tempLimit {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
r.subscriptionLimitCheckMutex.Lock()
|
||||||
|
if r.realSubscriptionsLimit == 0 {
|
||||||
|
r.realSubscriptionsLimit = r.fetchSubscriptionLimit(ctx)
|
||||||
|
}
|
||||||
|
r.subscriptionLimitCheckMutex.Unlock()
|
||||||
|
|
||||||
|
if current < r.realSubscriptionsLimit {
|
||||||
|
return ErrTooManySubscriptions
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Relay) fetchSubscriptionLimit(ctx context.Context) int {
|
||||||
|
if _, ok := ctx.Deadline(); !ok {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, 7*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "GET", "http"+r.URL[2:], nil)
|
||||||
|
if err != nil {
|
||||||
|
return 40
|
||||||
|
}
|
||||||
|
req.Header.Add("Accept", "application/nostr+json")
|
||||||
|
req.Header.Add("User-Agent", "nostrlib/go")
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return 40
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return 40
|
||||||
|
}
|
||||||
|
result := gjson.GetBytes(body, "limitations.max_subscriptions")
|
||||||
|
if !result.Exists() {
|
||||||
|
return 40
|
||||||
|
}
|
||||||
|
limit := result.Int()
|
||||||
|
if limit == 0 {
|
||||||
|
return 40
|
||||||
|
}
|
||||||
|
return int(limit)
|
||||||
|
}
|
||||||
|
|
||||||
// Subscribe sends a "REQ" command to the relay r as in NIP-01.
|
// Subscribe sends a "REQ" command to the relay r as in NIP-01.
|
||||||
// Events are returned through the channel sub.Events.
|
// Events are returned through the channel sub.Events.
|
||||||
// The subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01).
|
// The subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01).
|
||||||
@@ -587,6 +654,9 @@ func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionO
|
|||||||
if !r.IsConnected() {
|
if !r.IsConnected() {
|
||||||
return nil, ErrDisconnected
|
return nil, ErrDisconnected
|
||||||
}
|
}
|
||||||
|
if err := r.checkSubscriptionLimit(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
sub := r.PrepareSubscription(ctx, filter, opts)
|
sub := r.PrepareSubscription(ctx, filter, opts)
|
||||||
|
|
||||||
@@ -748,6 +818,9 @@ func (r *Relay) countInternal(ctx context.Context, filter Filter, opts Subscript
|
|||||||
hasAuthed := false
|
hasAuthed := false
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
if err := r.checkSubscriptionLimit(ctx); err != nil {
|
||||||
|
return CountEnvelope{}, err
|
||||||
|
}
|
||||||
sub := r.PrepareSubscription(ctx, filter, opts)
|
sub := r.PrepareSubscription(ctx, filter, opts)
|
||||||
sub.countResult = make(chan CountEnvelope, 1)
|
sub.countResult = make(chan CountEnvelope, 1)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user