khatru: listener needed a xsync.Map instead of a map, because of concurrent access.

This commit is contained in:
fiatjaf
2026-04-03 08:26:56 -03:00
parent b9a3e78752
commit 2735abe060
+14 -11
View File
@@ -7,6 +7,7 @@ import (
"fiatjaf.com/lib/set" "fiatjaf.com/lib/set"
"fiatjaf.com/nostr" "fiatjaf.com/nostr"
"github.com/puzpuzpuz/xsync/v3"
) )
var ErrSubscriptionClosedByClient = errors.New("subscription closed by client") var ErrSubscriptionClosedByClient = errors.New("subscription closed by client")
@@ -31,7 +32,7 @@ type subscription struct {
type dispatcher struct { type dispatcher struct {
serial int serial int
subscriptions map[int]subscription subscriptions *xsync.MapOf[int, subscription]
byAuthor map[nostr.PubKey]set.Set[int] byAuthor map[nostr.PubKey]set.Set[int]
byKind map[nostr.Kind]set.Set[int] byKind map[nostr.Kind]set.Set[int]
fallback set.Set[int] fallback set.Set[int]
@@ -39,7 +40,7 @@ type dispatcher struct {
func newDispatcher() dispatcher { func newDispatcher() dispatcher {
return dispatcher{ return dispatcher{
subscriptions: make(map[int]subscription, 100), subscriptions: xsync.NewMapOf[int, subscription](),
byAuthor: make(map[nostr.PubKey]set.Set[int]), byAuthor: make(map[nostr.PubKey]set.Set[int]),
byKind: make(map[nostr.Kind]set.Set[int]), byKind: make(map[nostr.Kind]set.Set[int]),
fallback: set.NewSliceSet[int](), fallback: set.NewSliceSet[int](),
@@ -50,7 +51,7 @@ func (d *dispatcher) addSubscription(sub subscription) int {
d.serial++ d.serial++
ssid := d.serial ssid := d.serial
d.subscriptions[ssid] = sub d.subscriptions.Store(ssid, sub)
indexed := false indexed := false
if sub.filter.Authors != nil { if sub.filter.Authors != nil {
@@ -85,11 +86,10 @@ func (d *dispatcher) addSubscription(sub subscription) int {
} }
func (d *dispatcher) removeSubscription(ssid int) { func (d *dispatcher) removeSubscription(ssid int) {
sub, ok := d.subscriptions[ssid] sub, ok := d.subscriptions.LoadAndDelete(ssid)
if !ok { if !ok {
return return
} }
delete(d.subscriptions, ssid)
indexed := false indexed := false
if sub.filter.Authors != nil { if sub.filter.Authors != nil {
@@ -132,7 +132,7 @@ func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] {
if hasAuthorSubs && hasKindSubs { if hasAuthorSubs && hasKindSubs {
for _, ssid := range authorSubs.Slice() { for _, ssid := range authorSubs.Slice() {
sub, _ := d.subscriptions[ssid] sub, _ := d.subscriptions.Load(ssid)
if kindSubs.Has(ssid) { if kindSubs.Has(ssid) {
if filterMatchesTimestampConstraintsAndTags(sub.filter, event) { if filterMatchesTimestampConstraintsAndTags(sub.filter, event) {
@@ -147,7 +147,8 @@ func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] {
} }
} else if hasAuthorSubs { } else if hasAuthorSubs {
for _, ssid := range authorSubs.Slice() { for _, ssid := range authorSubs.Slice() {
sub, _ := d.subscriptions[ssid] sub, _ := d.subscriptions.Load(ssid)
if sub.filter.Kinds != nil { if sub.filter.Kinds != nil {
// if there are any kinds in the filter we already know this doesn't qualify // if there are any kinds in the filter we already know this doesn't qualify
continue continue
@@ -161,7 +162,8 @@ func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] {
} }
} else if hasKindSubs { } else if hasKindSubs {
for _, ssid := range kindSubs.Slice() { for _, ssid := range kindSubs.Slice() {
sub, _ := d.subscriptions[ssid] sub, _ := d.subscriptions.Load(ssid)
if sub.filter.Authors != nil { if sub.filter.Authors != nil {
// if there are any authors in the filter we already know this doesn't qualify // if there are any authors in the filter we already know this doesn't qualify
continue continue
@@ -176,7 +178,8 @@ func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] {
} }
for _, ssid := range d.fallback.Slice() { for _, ssid := range d.fallback.Slice() {
sub, _ := d.subscriptions[ssid] sub, _ := d.subscriptions.Load(ssid)
if filterMatchesTimestampConstraintsAndTags(sub.filter, event) { if filterMatchesTimestampConstraintsAndTags(sub.filter, event) {
if !yield(sub) { if !yield(sub) {
return return
@@ -211,8 +214,8 @@ func tagKeyValueKey(tagKey, tagValue string) string {
} }
func (rl *Relay) GetListeningFilters() []nostr.Filter { func (rl *Relay) GetListeningFilters() []nostr.Filter {
respfilters := make([]nostr.Filter, 0, len(rl.dispatcher.subscriptions)) respfilters := make([]nostr.Filter, 0, rl.dispatcher.subscriptions.Size())
for _, sub := range rl.dispatcher.subscriptions { for _, sub := range rl.dispatcher.subscriptions.Range {
respfilters = append(respfilters, sub.filter) respfilters = append(respfilters, sub.filter)
} }
return respfilters return respfilters