From 2735abe0608c7e7afe58888a8b46b6fd7982095f Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Fri, 3 Apr 2026 08:26:56 -0300 Subject: [PATCH] khatru: listener needed a xsync.Map instead of a map, because of concurrent access. --- khatru/listener.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/khatru/listener.go b/khatru/listener.go index 4a4f5a6..48a6e3e 100644 --- a/khatru/listener.go +++ b/khatru/listener.go @@ -7,6 +7,7 @@ import ( "fiatjaf.com/lib/set" "fiatjaf.com/nostr" + "github.com/puzpuzpuz/xsync/v3" ) var ErrSubscriptionClosedByClient = errors.New("subscription closed by client") @@ -31,7 +32,7 @@ type subscription struct { type dispatcher struct { serial int - subscriptions map[int]subscription + subscriptions *xsync.MapOf[int, subscription] byAuthor map[nostr.PubKey]set.Set[int] byKind map[nostr.Kind]set.Set[int] fallback set.Set[int] @@ -39,7 +40,7 @@ type dispatcher struct { func newDispatcher() dispatcher { return dispatcher{ - subscriptions: make(map[int]subscription, 100), + subscriptions: xsync.NewMapOf[int, subscription](), byAuthor: make(map[nostr.PubKey]set.Set[int]), byKind: make(map[nostr.Kind]set.Set[int]), fallback: set.NewSliceSet[int](), @@ -50,7 +51,7 @@ func (d *dispatcher) addSubscription(sub subscription) int { d.serial++ ssid := d.serial - d.subscriptions[ssid] = sub + d.subscriptions.Store(ssid, sub) indexed := false if sub.filter.Authors != nil { @@ -85,11 +86,10 @@ func (d *dispatcher) addSubscription(sub subscription) int { } func (d *dispatcher) removeSubscription(ssid int) { - sub, ok := d.subscriptions[ssid] + sub, ok := d.subscriptions.LoadAndDelete(ssid) if !ok { return } - delete(d.subscriptions, ssid) indexed := false if sub.filter.Authors != nil { @@ -132,7 +132,7 @@ func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] { if hasAuthorSubs && hasKindSubs { for _, ssid := range authorSubs.Slice() { - sub, _ := d.subscriptions[ssid] + sub, _ := d.subscriptions.Load(ssid) if kindSubs.Has(ssid) { if filterMatchesTimestampConstraintsAndTags(sub.filter, event) { @@ -147,7 +147,8 @@ func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] { } } else if hasAuthorSubs { for _, ssid := range authorSubs.Slice() { - sub, _ := d.subscriptions[ssid] + sub, _ := d.subscriptions.Load(ssid) + if sub.filter.Kinds != nil { // if there are any kinds in the filter we already know this doesn't qualify continue @@ -161,7 +162,8 @@ func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] { } } else if hasKindSubs { for _, ssid := range kindSubs.Slice() { - sub, _ := d.subscriptions[ssid] + sub, _ := d.subscriptions.Load(ssid) + if sub.filter.Authors != nil { // if there are any authors in the filter we already know this doesn't qualify continue @@ -176,7 +178,8 @@ func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] { } for _, ssid := range d.fallback.Slice() { - sub, _ := d.subscriptions[ssid] + sub, _ := d.subscriptions.Load(ssid) + if filterMatchesTimestampConstraintsAndTags(sub.filter, event) { if !yield(sub) { return @@ -211,8 +214,8 @@ func tagKeyValueKey(tagKey, tagValue string) string { } func (rl *Relay) GetListeningFilters() []nostr.Filter { - respfilters := make([]nostr.Filter, 0, len(rl.dispatcher.subscriptions)) - for _, sub := range rl.dispatcher.subscriptions { + respfilters := make([]nostr.Filter, 0, rl.dispatcher.subscriptions.Size()) + for _, sub := range rl.dispatcher.subscriptions.Range { respfilters = append(respfilters, sub.filter) } return respfilters