diff --git a/relay.go b/relay.go index 4539538..3393e8c 100644 --- a/relay.go +++ b/relay.go @@ -20,6 +20,7 @@ import ( ws "github.com/coder/websocket" "github.com/puzpuzpuz/xsync/v3" + "github.com/tidwall/gjson" ) var subscriptionIDCounter atomic.Int64 @@ -73,6 +74,9 @@ type Relay struct { okCallbacksMutex sync.Mutex subscriptionChannelCloseQueue chan *Subscription + subscriptionLimitCheckMutex sync.Mutex + realSubscriptionsLimit int + // custom things that aren't often used // AssumeValid bool // this will skip verifying signatures for events received from this relay @@ -577,6 +581,69 @@ func (r *Relay) publish(ctx context.Context, id ID, env Envelope) error { } } +var ErrTooManySubscriptions = errors.New("subscription limit reached") + +func (r *Relay) checkSubscriptionLimit(ctx context.Context) error { + current := r.Subscriptions.Size() + tempLimit := 20 + + // special case troublesome relays + if strings.HasPrefix(r.URL, "wss://relay.bullishbounty.com") { + r.realSubscriptionsLimit = 10 + if current >= r.realSubscriptionsLimit { + return ErrTooManySubscriptions + } + } + + if current < tempLimit { + return nil + } + + r.subscriptionLimitCheckMutex.Lock() + if r.realSubscriptionsLimit == 0 { + r.realSubscriptionsLimit = r.fetchSubscriptionLimit(ctx) + } + r.subscriptionLimitCheckMutex.Unlock() + + if current < r.realSubscriptionsLimit { + return ErrTooManySubscriptions + } + + return nil +} + +func (r *Relay) fetchSubscriptionLimit(ctx context.Context) int { + if _, ok := ctx.Deadline(); !ok { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + defer cancel() + } + req, err := http.NewRequestWithContext(ctx, "GET", "http"+r.URL[2:], nil) + if err != nil { + return 40 + } + req.Header.Add("Accept", "application/nostr+json") + req.Header.Add("User-Agent", "nostrlib/go") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return 40 + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return 40 + } + result := gjson.GetBytes(body, "limitations.max_subscriptions") + if !result.Exists() { + return 40 + } + limit := result.Int() + if limit == 0 { + return 40 + } + return int(limit) +} + // Subscribe sends a "REQ" command to the relay r as in NIP-01. // Events are returned through the channel sub.Events. // The subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01). @@ -587,6 +654,9 @@ func (r *Relay) Subscribe(ctx context.Context, filter Filter, opts SubscriptionO if !r.IsConnected() { return nil, ErrDisconnected } + if err := r.checkSubscriptionLimit(ctx); err != nil { + return nil, err + } sub := r.PrepareSubscription(ctx, filter, opts) @@ -748,6 +818,9 @@ func (r *Relay) countInternal(ctx context.Context, filter Filter, opts Subscript hasAuthed := false for { + if err := r.checkSubscriptionLimit(ctx); err != nil { + return CountEnvelope{}, err + } sub := r.PrepareSubscription(ctx, filter, opts) sub.countResult = make(chan CountEnvelope, 1)