khatru: we haven't fixed the nil ws bug on dispatcher, but at least now we have more tests and an even more efficient architecture!

This commit is contained in:
fiatjaf
2026-04-22 23:16:43 -03:00
parent 223d95461f
commit e2ad68d050
4 changed files with 101 additions and 57 deletions
+5 -1
View File
@@ -89,6 +89,7 @@ func FuzzDispatcherCandidates(f *testing.F) {
} }
ssid := d.addSubscription(sub) ssid := d.addSubscription(sub)
active[ssid] = sub active[ssid] = sub
activeSSIDs = append(activeSSIDs, ssid) activeSSIDs = append(activeSSIDs, ssid)
} else { } else {
@@ -96,14 +97,17 @@ func FuzzDispatcherCandidates(f *testing.F) {
ssid := activeSSIDs[idx] ssid := activeSSIDs[idx]
d.removeSubscription(ssid) d.removeSubscription(ssid)
delete(active, ssid) delete(active, ssid)
activeSSIDs = append(activeSSIDs[:idx], activeSSIDs[idx+1:]...) activeSSIDs = append(activeSSIDs[:idx], activeSSIDs[idx+1:]...)
} }
for range int(checks%7) + 1 { for range int(checks%7) + 1 {
event := fuzzDispatcherEvent(&state) event := fuzzDispatcherEvent(&state)
expected := expectedDispatcherCandidates(active, event) expected := expectedDispatcherCandidates(active, event)
actual := collectedDispatcherCandidates(&d, event) actual := collectedDispatcherCandidates(&d, event)
require.Equalf(t, expected, actual, "seed=%d advance=%d event=%s active=%v", seed, advance, event.String(), active) require.Equalf(t, expected, actual, "seed=%d advance=%d event=%s active=%v", seed, advance, event.String(), active)
} }
} }
@@ -203,7 +207,7 @@ func fuzzDispatcherTagMap(seed *fuzzState) nostr.TagMap {
count := seed.next(3) count := seed.next(3)
if count == 0 { if count == 0 {
return nostr.TagMap{} return nil
} }
tags := make(nostr.TagMap, count) tags := make(nostr.TagMap, count)
+86 -56
View File
@@ -31,19 +31,21 @@ type subscription struct {
} }
type dispatcher struct { type dispatcher struct {
serial int serial int
subscriptions *xsync.MapOf[int, subscription] subscriptions *xsync.MapOf[int, subscription]
byAuthor *xsync.MapOf[nostr.PubKey, set.Set[int]] byAuthor *xsync.MapOf[nostr.PubKey, set.Set[int]]
byKind *xsync.MapOf[nostr.Kind, set.Set[int]] byKind *xsync.MapOf[nostr.Kind, set.Set[int]]
fallback set.Set[int] fallbackTags set.Set[int]
fallbackNothing set.Set[int]
} }
func newDispatcher() dispatcher { func newDispatcher() dispatcher {
return dispatcher{ return dispatcher{
subscriptions: xsync.NewMapOf[int, subscription](), subscriptions: xsync.NewMapOf[int, subscription](),
byAuthor: xsync.NewMapOf[nostr.PubKey, set.Set[int]](), byAuthor: xsync.NewMapOf[nostr.PubKey, set.Set[int]](),
byKind: xsync.NewMapOf[nostr.Kind, set.Set[int]](), byKind: xsync.NewMapOf[nostr.Kind, set.Set[int]](),
fallback: set.NewSliceSet[int](), fallbackTags: set.NewSliceSet[int](),
fallbackNothing: set.NewSliceSet[int](),
} }
} }
@@ -57,72 +59,80 @@ func (d *dispatcher) addSubscription(sub subscription) int {
if sub.filter.Authors != nil { if sub.filter.Authors != nil {
indexed = true indexed = true
for _, author := range sub.filter.Authors { for _, author := range sub.filter.Authors {
s, ok := d.byAuthor.Load(author) d.byAuthor.Compute(author, func(s set.Set[int], loaded bool) (set.Set[int], bool) {
if !ok { if !loaded {
s = set.NewSliceSet[int]() s = set.NewSliceSet[int]()
d.byAuthor.Store(author, s) }
} s.Add(ssid)
s.Add(ssid) return s, false
})
} }
} }
if sub.filter.Kinds != nil { if sub.filter.Kinds != nil {
indexed = true indexed = true
for _, kind := range sub.filter.Kinds { for _, kind := range sub.filter.Kinds {
s, ok := d.byKind.Load(kind) d.byKind.Compute(kind, func(s set.Set[int], loaded bool) (set.Set[int], bool) {
if !ok { if !loaded {
s = set.NewSliceSet[int]() s = set.NewSliceSet[int]()
d.byKind.Store(kind, s) }
} s.Add(ssid)
s.Add(ssid) return s, false
})
} }
} }
if !indexed { if !indexed {
d.fallback.Add(ssid) if sub.filter.Tags != nil {
d.fallbackTags.Add(ssid)
} else {
d.fallbackNothing.Add(ssid)
}
} }
return ssid return ssid
} }
func (d *dispatcher) removeSubscription(ssid int) { func (d *dispatcher) removeSubscription(ssid int) {
sub, ok := d.subscriptions.LoadAndDelete(ssid) d.subscriptions.Compute(ssid, func(sub subscription, loaded bool) (subscription, bool) {
if !ok { indexed := false
return
}
indexed := false if sub.filter.Authors != nil {
if sub.filter.Authors != nil { indexed = true
indexed = true for _, author := range sub.filter.Authors {
for _, author := range sub.filter.Authors { d.byAuthor.Compute(author, func(s set.Set[int], loaded bool) (set.Set[int], bool) {
s, ok := d.byAuthor.Load(author) if !loaded {
if !ok { return s, true
return }
} s.Remove(ssid)
s.Remove(ssid) return s, s.Len() == 0
if s.Len() == 0 { })
d.byAuthor.Delete(author)
} }
} }
}
if sub.filter.Kinds != nil { if sub.filter.Kinds != nil {
indexed = true indexed = true
for _, kind := range sub.filter.Kinds { for _, kind := range sub.filter.Kinds {
s, ok := d.byKind.Load(kind) d.byKind.Compute(kind, func(s set.Set[int], loaded bool) (set.Set[int], bool) {
if !ok { if !loaded {
return return s, true
} }
s.Remove(ssid) s.Remove(ssid)
if s.Len() == 0 { return s, s.Len() == 0
d.byKind.Delete(kind) })
} }
} }
}
if !indexed { if !indexed {
d.fallback.Remove(ssid) if sub.filter.Tags != nil {
} d.fallbackTags.Remove(ssid)
} else {
d.fallbackNothing.Remove(ssid)
}
}
return sub, true
})
} }
func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] { func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] {
@@ -188,10 +198,21 @@ func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] {
} }
} }
for _, ssid := range d.fallback.Slice() { if len(event.Tags) > 0 {
sub, _ := d.subscriptions.Load(ssid) for _, ssid := range d.fallbackTags.Slice() {
sub, _ := d.subscriptions.Load(ssid)
if filterMatchesTimestampConstraintsAndTags(sub.filter, event) { if filterMatchesTimestampConstraintsAndTags(sub.filter, event) {
if !yield(sub) {
return
}
}
}
}
for _, ssid := range d.fallbackNothing.Slice() {
sub, _ := d.subscriptions.Load(ssid)
if filterMatchesTimestampConstraints(sub.filter, event) {
if !yield(sub) { if !yield(sub) {
return return
} }
@@ -201,7 +222,7 @@ func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] {
} }
//go:inline //go:inline
func filterMatchesTimestampConstraintsAndTags(filter nostr.Filter, event nostr.Event) bool { func filterMatchesTimestampConstraints(filter nostr.Filter, event nostr.Event) bool {
if filter.Since != 0 && event.CreatedAt < filter.Since { if filter.Since != 0 && event.CreatedAt < filter.Since {
return false return false
} }
@@ -210,6 +231,15 @@ func filterMatchesTimestampConstraintsAndTags(filter nostr.Filter, event nostr.E
return false return false
} }
return true
}
//go:inline
func filterMatchesTimestampConstraintsAndTags(filter nostr.Filter, event nostr.Event) bool {
if !filterMatchesTimestampConstraints(filter, event) {
return false
}
for f, v := range filter.Tags { for f, v := range filter.Tags {
if !event.Tags.ContainsAny(f, v) { if !event.Tags.ContainsAny(f, v) {
return false return false
@@ -0,0 +1,5 @@
go test fuzz v1
int(-180)
int(92)
byte('{')
byte('\n')
@@ -0,0 +1,5 @@
go test fuzz v1
int(140)
int(-52)
byte('"')
byte('h')