From ac2d4579f182b7c8d2138d39335eb20ec778b86d Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sun, 29 Mar 2026 11:14:22 -0300 Subject: [PATCH] khatru: get rid of subrelays + segregated indexed listeners. --- filter.go | 1 + khatru/docs/core/routing.md | 58 ------ khatru/examples/routing/main.go | 61 ------ khatru/get-started.go | 2 +- khatru/handlers.go | 49 ++--- khatru/listener.go | 291 ++++++++++++++++++++------- khatru/listener_fuzz_test.go | 61 +++--- khatru/listener_test.go | 342 +++++++++++--------------------- khatru/policies/events.go | 3 + khatru/relay.go | 9 +- khatru/relay_test.go | 112 ++++------- khatru/router.go | 77 ------- 12 files changed, 425 insertions(+), 641 deletions(-) delete mode 100644 khatru/docs/core/routing.md delete mode 100644 khatru/examples/routing/main.go delete mode 100644 khatru/router.go diff --git a/filter.go b/filter.go index c5e5311..76d8d4f 100644 --- a/filter.go +++ b/filter.go @@ -44,6 +44,7 @@ func (ef Filter) Matches(event Event) bool { return true } +//go:inline func (ef Filter) MatchesIgnoringTimestampConstraints(event Event) bool { if ef.IDs != nil && !slices.Contains(ef.IDs, event.ID) { return false diff --git a/khatru/docs/core/routing.md b/khatru/docs/core/routing.md deleted file mode 100644 index 4c406d7..0000000 --- a/khatru/docs/core/routing.md +++ /dev/null @@ -1,58 +0,0 @@ ---- -outline: deep ---- - -# Request Routing - -If you have one (or more) set of policies that have to be executed in sequence (for example, first you check for the presence of a tag, then later in the next policies you use that tag without checking) and they only apply to some class of events, but you still want your relay to deal with other classes of events that can lead to cumbersome sets of rules, always having to check if an event meets the requirements and so on. There is where routing can help you. - -```go -sk := os.Getenv("RELAY_SECRET_KEY") - -// a relay for NIP-29 groups -groupsStore := boltdb.BoltBackend{} -groupsStore.Init() -groupsRelay, _ := khatru29.Init(relay29.Options{Domain: "example.com", DB: groupsStore, SecretKey: sk}) -// ... - -// a relay for everything else -publicStore := slicestore.SliceStore{} -publicStore.Init() -publicRelay := khatru.NewRelay() -publicRelay.UseEventStore(publicStore, 1000) -// ... - -// a higher-level relay that just routes between the two above -router := khatru.NewRouter() - -// route requests and events to the groups relay -router.Route(). - Req(func (filter nostr.Filter) bool { - _, hasHTag := filter.Tags["h"] - if hasHTag { - return true - } - return slices.Contains(filter.Kinds, func (k int) bool { return k == 39000 || k == 39001 || k == 39002 }) - }). - Event(func (event *nostr.Event) bool { - switch { - case event.Kind <= 9021 && event.Kind >= 9000: - return true - case event.Kind <= 39010 && event.Kind >= 39000: - return true - case event.Kind <= 12 && event.Kind >= 9: - return true - case event.Tags.Find("h") != nil: - return true - default: - return false - } - }). - Relay(groupsRelay) - -// route requests and events to the other -router.Route(). - Req(func (filter nostr.Filter) bool { return true }). - Event(func (event *nostr.Event) bool { return true }). - Relay(publicRelay) -``` diff --git a/khatru/examples/routing/main.go b/khatru/examples/routing/main.go deleted file mode 100644 index c20a146..0000000 --- a/khatru/examples/routing/main.go +++ /dev/null @@ -1,61 +0,0 @@ -package main - -import ( - "fmt" - "net/http" - "slices" - - "fiatjaf.com/nostr" - "fiatjaf.com/nostr/eventstore/lmdb" - "fiatjaf.com/nostr/eventstore/slicestore" - "fiatjaf.com/nostr/khatru" -) - -func main() { - db1 := &slicestore.SliceStore{} - db1.Init() - r1 := khatru.NewRelay() - r1.UseEventstore(db1, 400) - - db2 := &lmdb.LMDBBackend{Path: "/tmp/t"} - db2.Init() - r2 := khatru.NewRelay() - r2.UseEventstore(db2, 400) - - db3 := &slicestore.SliceStore{} - db3.Init() - r3 := khatru.NewRelay() - r3.UseEventstore(db3, 400) - - router := khatru.NewRouter() - - router.Route(). - Req(func(filter nostr.Filter) bool { - return slices.Contains(filter.Kinds, 30023) - }). - Event(func(event *nostr.Event) bool { - return event.Kind == 30023 - }). - Relay(r1) - - router.Route(). - Req(func(filter nostr.Filter) bool { - return slices.Contains(filter.Kinds, 1) && slices.Contains(filter.Tags["t"], "spam") - }). - Event(func(event *nostr.Event) bool { - return event.Kind == 1 && event.Tags.FindWithValue("t", "spam") != nil - }). - Relay(r2) - - router.Route(). - Req(func(filter nostr.Filter) bool { - return slices.Contains(filter.Kinds, 1) - }). - Event(func(event *nostr.Event) bool { - return event.Kind == 1 - }). - Relay(r3) - - fmt.Println("running on :3334") - http.ListenAndServe(":3334", router) -} diff --git a/khatru/get-started.go b/khatru/get-started.go index aca4018..f1e522c 100644 --- a/khatru/get-started.go +++ b/khatru/get-started.go @@ -61,5 +61,5 @@ func (rl *Relay) Shutdown(ctx context.Context) { ws.conn.Close() } clear(rl.clients) - rl.listeners = rl.listeners[:0] + rl.dispatcher = newDispatcher() } diff --git a/khatru/handlers.go b/khatru/handlers.go index 2333252..df093eb 100644 --- a/khatru/handlers.go +++ b/khatru/handlers.go @@ -217,35 +217,30 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { return } - srl := rl - if rl.getSubRelayFromEvent != nil { - srl = rl.getSubRelayFromEvent(&env.Event) - } - var ok bool var writeErr error var skipBroadcast bool if env.Event.Kind == nostr.KindDeletion { // store the delete event first - skipBroadcast, writeErr = srl.handleNormal(ctx, env.Event) + skipBroadcast, writeErr = rl.handleNormal(ctx, env.Event) if writeErr == nil { // this always returns "blocked: " whenever it returns an error - writeErr = srl.handleDeleteRequest(ctx, env.Event) + writeErr = rl.handleDeleteRequest(ctx, env.Event) } } else if env.Event.Kind.IsEphemeral() { // this will also always return a prefixed reason - writeErr = srl.handleEphemeral(ctx, env.Event) + writeErr = rl.handleEphemeral(ctx, env.Event) } else { // this will also always return a prefixed reason - skipBroadcast, writeErr = srl.handleNormal(ctx, env.Event) + skipBroadcast, writeErr = rl.handleNormal(ctx, env.Event) } var reason string if writeErr == nil { ok = true if !skipBroadcast { - n := srl.notifyListeners(env.Event, false) + n := rl.notifyListeners(env.Event, false) // the number of notified listeners matters in ephemeral events if env.Event.Kind.IsEphemeral() { @@ -278,15 +273,10 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { var total uint32 var hll *hyperloglog.HyperLogLog - srl := rl - if rl.getSubRelayFromFilter != nil { - srl = rl.getSubRelayFromFilter(env.Filter) - } - if offset := nip45.HyperLogLogEventPubkeyOffsetForFilter(env.Filter); offset != -1 { - total, hll = srl.handleCountRequestWithHLL(ctx, ws, env.Filter, offset) + total, hll = rl.handleCountRequestWithHLL(ctx, ws, env.Filter, offset) } else { - total = srl.handleCountRequest(ctx, ws, env.Filter) + total = rl.handleCountRequest(ctx, ws, env.Filter) } resp := nostr.CountEnvelope{ @@ -311,11 +301,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { // handle each filter separately -- dispatching events as they're loaded from databases for _, filter := range env.Filters { - srl := rl - if rl.getSubRelayFromFilter != nil { - srl = rl.getSubRelayFromFilter(filter) - } - err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter) + err := rl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter) if err != nil { // fail everything if any filter is rejected reason := err.Error() @@ -325,8 +311,11 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason}) cancelReqCtx(errors.New("filter rejected")) return - } else { - rl.addListener(ws, env.SubscriptionID, srl, filter, cancelReqCtx) + } else if filter.IDs == nil { + // a query that is just a bunch of "ids": [...] will not add listeners. + // is this a bug? maybe, but I don't think anyone is listening for an ID + // that hasn't been published yet anywhere -- if yes we can change later + rl.addListener(ws, env.SubscriptionID, filter, cancelReqCtx) } } @@ -363,15 +352,11 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: false, Reason: "error: failed to authenticate: " + err.Error()}) } case *nip77.OpenEnvelope: - srl := rl - if rl.getSubRelayFromFilter != nil { - srl = rl.getSubRelayFromFilter(env.Filter) - if !srl.Negentropy { - // ignore - return - } + if !rl.Negentropy { + // ignore + return } - vec, err := srl.startNegentropySession(ctx, env.Filter) + vec, err := rl.startNegentropySession(ctx, env.Filter) if err != nil { // fail everything if any filter is rejected reason := err.Error() diff --git a/khatru/listener.go b/khatru/listener.go index 7a35ce1..4a4f5a6 100644 --- a/khatru/listener.go +++ b/khatru/listener.go @@ -3,18 +3,18 @@ package khatru import ( "context" "errors" - "slices" + "iter" + "fiatjaf.com/lib/set" "fiatjaf.com/nostr" ) var ErrSubscriptionClosedByClient = errors.New("subscription closed by client") type listenerSpec struct { - id string // kept here so we can easily match against it removeListenerId - cancel context.CancelCauseFunc - index int - subrelay *Relay // this is important when we're dealing with routing, otherwise it will be always the same + ssid int // internal numeric id for a listener + sid string // client-provided subscription id + cancel context.CancelCauseFunc } type listener struct { @@ -23,10 +23,197 @@ type listener struct { ws *WebSocket } +type subscription struct { + id string + filter nostr.Filter + ws *WebSocket +} + +type dispatcher struct { + serial int + subscriptions map[int]subscription + byAuthor map[nostr.PubKey]set.Set[int] + byKind map[nostr.Kind]set.Set[int] + fallback set.Set[int] +} + +func newDispatcher() dispatcher { + return dispatcher{ + subscriptions: make(map[int]subscription, 100), + byAuthor: make(map[nostr.PubKey]set.Set[int]), + byKind: make(map[nostr.Kind]set.Set[int]), + fallback: set.NewSliceSet[int](), + } +} + +func (d *dispatcher) addSubscription(sub subscription) int { + d.serial++ + ssid := d.serial + + d.subscriptions[ssid] = sub + + indexed := false + if sub.filter.Authors != nil { + indexed = true + for _, author := range sub.filter.Authors { + s, ok := d.byAuthor[author] + if !ok { + s = set.NewSliceSet[int]() + d.byAuthor[author] = s + } + s.Add(ssid) + } + } + + if sub.filter.Kinds != nil { + indexed = true + for _, kind := range sub.filter.Kinds { + s, ok := d.byKind[kind] + if !ok { + s = set.NewSliceSet[int]() + d.byKind[kind] = s + } + s.Add(ssid) + } + } + + if !indexed { + d.fallback.Add(ssid) + } + + return ssid +} + +func (d *dispatcher) removeSubscription(ssid int) { + sub, ok := d.subscriptions[ssid] + if !ok { + return + } + delete(d.subscriptions, ssid) + + indexed := false + if sub.filter.Authors != nil { + indexed = true + for _, author := range sub.filter.Authors { + s, ok := d.byAuthor[author] + if !ok { + return + } + s.Remove(ssid) + if s.Len() == 0 { + delete(d.byAuthor, author) + } + } + } + + if sub.filter.Kinds != nil { + indexed = true + for _, kind := range sub.filter.Kinds { + s, ok := d.byKind[kind] + if !ok { + return + } + s.Remove(ssid) + if s.Len() == 0 { + delete(d.byKind, kind) + } + } + } + + if !indexed { + d.fallback.Remove(ssid) + } +} + +func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] { + return func(yield func(subscription) bool) { + authorSubs, hasAuthorSubs := d.byAuthor[event.PubKey] + kindSubs, hasKindSubs := d.byKind[event.Kind] + + if hasAuthorSubs && hasKindSubs { + for _, ssid := range authorSubs.Slice() { + sub, _ := d.subscriptions[ssid] + + if kindSubs.Has(ssid) { + if filterMatchesTimestampConstraintsAndTags(sub.filter, event) { + if !yield(sub) { + return + } + } + } else { + // matched author but not tags, so this event doesn't qualify for any filter + continue + } + } + } else if hasAuthorSubs { + for _, ssid := range authorSubs.Slice() { + sub, _ := d.subscriptions[ssid] + if sub.filter.Kinds != nil { + // if there are any kinds in the filter we already know this doesn't qualify + continue + } + + if filterMatchesTimestampConstraintsAndTags(sub.filter, event) { + if !yield(sub) { + return + } + } + } + } else if hasKindSubs { + for _, ssid := range kindSubs.Slice() { + sub, _ := d.subscriptions[ssid] + if sub.filter.Authors != nil { + // if there are any authors in the filter we already know this doesn't qualify + continue + } + + if filterMatchesTimestampConstraintsAndTags(sub.filter, event) { + if !yield(sub) { + return + } + } + } + } + + for _, ssid := range d.fallback.Slice() { + sub, _ := d.subscriptions[ssid] + if filterMatchesTimestampConstraintsAndTags(sub.filter, event) { + if !yield(sub) { + return + } + } + } + } +} + +//go:inline +func filterMatchesTimestampConstraintsAndTags(filter nostr.Filter, event nostr.Event) bool { + if filter.Since != 0 && event.CreatedAt < filter.Since { + return false + } + + if filter.Until != 0 && event.CreatedAt > filter.Until { + return false + } + + for f, v := range filter.Tags { + if !event.Tags.ContainsAny(f, v) { + return false + } + } + + return true +} + +//go:inline +func tagKeyValueKey(tagKey, tagValue string) string { + return tagKey + "\x00" + tagValue +} + func (rl *Relay) GetListeningFilters() []nostr.Filter { - respfilters := make([]nostr.Filter, len(rl.listeners)) - for i, l := range rl.listeners { - respfilters[i] = l.filter + respfilters := make([]nostr.Filter, 0, len(rl.dispatcher.subscriptions)) + for _, sub := range rl.dispatcher.subscriptions { + respfilters = append(respfilters, sub.filter) } return respfilters } @@ -36,7 +223,6 @@ func (rl *Relay) GetListeningFilters() []nostr.Filter { func (rl *Relay) addListener( ws *WebSocket, id string, - subrelay *Relay, filter nostr.Filter, cancel context.CancelCauseFunc, ) { @@ -48,18 +234,16 @@ func (rl *Relay) addListener( } if specs, ok := rl.clients[ws]; ok /* this will always be true unless client has disconnected very rapidly */ { - idx := len(subrelay.listeners) - rl.clients[ws] = append(specs, listenerSpec{ - id: id, - cancel: cancel, - subrelay: subrelay, - index: idx, - }) - subrelay.listeners = append(subrelay.listeners, listener{ + ssid := rl.dispatcher.addSubscription(subscription{ ws: ws, id: id, filter: filter, }) + rl.clients[ws] = append(specs, listenerSpec{ + ssid: ssid, + cancel: cancel, + sid: id, + }) } } @@ -70,35 +254,16 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) { defer rl.clientsMutex.Unlock() if specs, ok := rl.clients[ws]; ok { - // swap delete specs that match this id - for s := len(specs) - 1; s >= 0; s-- { - spec := specs[s] - if spec.id == id { + kept := specs[:0] + for _, spec := range specs { + if spec.sid == id { spec.cancel(ErrSubscriptionClosedByClient) - specs[s] = specs[len(specs)-1] - specs = specs[0 : len(specs)-1] - rl.clients[ws] = specs - - // swap delete listeners one at a time, as they may be each in a different subrelay - srl := spec.subrelay // == rl in normal cases, but different when this came from a route - - if spec.index != len(srl.listeners)-1 { - movedFromIndex := len(srl.listeners) - 1 - moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved - srl.listeners[spec.index] = moved - - // now we must update the the listener we just moved - // so its .index reflects its new position on srl.listeners - movedSpecs := rl.clients[moved.ws] - idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool { - return ls.index == movedFromIndex && ls.subrelay == srl - }) - movedSpecs[idx].index = spec.index - rl.clients[moved.ws] = movedSpecs - } - srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length + rl.dispatcher.removeSubscription(spec.ssid) + continue } + kept = append(kept, spec) } + rl.clients[ws] = kept } } @@ -106,31 +271,9 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) { rl.clientsMutex.Lock() defer rl.clientsMutex.Unlock() if specs, ok := rl.clients[ws]; ok { - // swap delete listeners and delete client (all specs will be deleted) - for s, spec := range specs { + for _, spec := range specs { // no need to cancel contexts since they inherit from the main connection context - // just delete the listeners (swap-delete) - srl := spec.subrelay - - if spec.index != len(srl.listeners)-1 { - movedFromIndex := len(srl.listeners) - 1 - moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved - srl.listeners[spec.index] = moved - - // temporarily update the spec of the listener being removed to have index == -1 - // (since it was removed) so it doesn't match in the search below - rl.clients[ws][s].index = -1 - - // now we must update the the listener we just moved - // so its .index reflects its new position on srl.listeners - movedSpecs := rl.clients[moved.ws] - idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool { - return ls.index == movedFromIndex && ls.subrelay == srl - }) - movedSpecs[idx].index = spec.index - rl.clients[moved.ws] = movedSpecs - } - srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length + rl.dispatcher.removeSubscription(spec.ssid) } } delete(rl.clients, ws) @@ -140,16 +283,14 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) { func (rl *Relay) notifyListeners(event nostr.Event, skipPrevent bool) int { count := 0 listenersloop: - for _, listener := range rl.listeners { - if listener.filter.Matches(event) { - if !skipPrevent && nil != rl.PreventBroadcast { - if rl.PreventBroadcast(listener.ws, listener.filter, event) { - continue listenersloop - } + for sub := range rl.dispatcher.candidates(event) { + if !skipPrevent && nil != rl.PreventBroadcast { + if rl.PreventBroadcast(sub.ws, sub.filter, event) { + continue listenersloop } - listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.id, Event: event}) - count++ } + sub.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &sub.id, Event: event}) + count++ } return count } diff --git a/khatru/listener_fuzz_test.go b/khatru/listener_fuzz_test.go index 0d7eb6c..b6ff57d 100644 --- a/khatru/listener_fuzz_test.go +++ b/khatru/listener_fuzz_test.go @@ -25,7 +25,7 @@ func FuzzRandomListenerClientRemoving(f *testing.F) { l := 0 for i := 0; i < totalWebsockets; i++ { - ws := &WebSocket{} + ws := &WebSocket{Context: rl.ctx} websockets = append(websockets, ws) rl.clients[ws] = nil } @@ -38,7 +38,7 @@ func FuzzRandomListenerClientRemoving(f *testing.F) { if s%addListenerFreq == 0 { l++ - rl.addListener(ws, w+":"+idFromSeqLower(j), rl, f, cancel) + rl.addListener(ws, w+":"+idFromSeqLower(j), f, cancel) } s++ @@ -46,14 +46,22 @@ func FuzzRandomListenerClientRemoving(f *testing.F) { } require.Len(t, rl.clients, totalWebsockets) - require.Len(t, rl.listeners, l) + ssidCount := 0 + for _, specs := range rl.clients { + ssidCount += len(specs) + } + require.Equal(t, l, ssidCount) for ws := range rl.clients { rl.removeClientAndListeners(ws) } require.Len(t, rl.clients, 0) - require.Len(t, rl.listeners, 0) + ssidCount = 0 + for _, specs := range rl.clients { + ssidCount += len(specs) + } + require.Equal(t, 0, ssidCount) }) } @@ -84,7 +92,7 @@ func FuzzRandomListenerIdRemoving(f *testing.F) { extra := 0 for i := 0; i < totalWebsockets; i++ { - ws := &WebSocket{} + ws := &WebSocket{Context: rl.ctx} websockets = append(websockets, ws) rl.clients[ws] = nil } @@ -97,11 +105,11 @@ func FuzzRandomListenerIdRemoving(f *testing.F) { if s%addListenerFreq == 0 { id := w + ":" + idFromSeqLower(j) - rl.addListener(ws, id, rl, f, cancel) + rl.addListener(ws, id, f, cancel) subs = append(subs, wsid{ws, id}) if s%addExtraListenerFreq == 0 { - rl.addListener(ws, id, rl, f, cancel) + rl.addListener(ws, id, f, cancel) extra++ } } @@ -111,7 +119,11 @@ func FuzzRandomListenerIdRemoving(f *testing.F) { } require.Len(t, rl.clients, totalWebsockets) - require.Len(t, rl.listeners, len(subs)+extra) + ssidCount := 0 + for _, specs := range rl.clients { + ssidCount += len(specs) + } + require.Equal(t, len(subs)+extra, ssidCount) rand.Shuffle(len(subs), func(i, j int) { subs[i], subs[j] = subs[j], subs[i] @@ -120,7 +132,11 @@ func FuzzRandomListenerIdRemoving(f *testing.F) { rl.removeListenerId(wsidToRemove.ws, wsidToRemove.id) } - require.Len(t, rl.listeners, 0) + ssidCount = 0 + for _, specs := range rl.clients { + ssidCount += len(specs) + } + require.Equal(t, 0, ssidCount) require.Len(t, rl.clients, totalWebsockets) for _, specs := range rl.clients { require.Len(t, specs, 0) @@ -129,23 +145,17 @@ func FuzzRandomListenerIdRemoving(f *testing.F) { } func FuzzRouterListenersPabloCrash(f *testing.F) { - f.Add(uint(3), uint(6), uint(2), uint(20)) - f.Fuzz(func(t *testing.T, totalRelays uint, totalConns uint, subFreq uint, subIterations uint) { - totalRelays++ + f.Add(uint(6), uint(2), uint(20)) + f.Fuzz(func(t *testing.T, totalConns uint, subFreq uint, subIterations uint) { totalConns++ subFreq++ subIterations++ rl := NewRelay() - relays := make([]*Relay, int(totalRelays)) - for i := 0; i < int(totalRelays); i++ { - relays[i] = NewRelay() - } - conns := make([]*WebSocket, int(totalConns)) for i := 0; i < int(totalConns); i++ { - ws := &WebSocket{} + ws := &WebSocket{Context: rl.ctx} conns[i] = ws rl.clients[ws] = make([]listenerSpec, 0, subIterations) } @@ -159,18 +169,16 @@ func FuzzRouterListenersPabloCrash(f *testing.F) { } s := 0 - subs := make([]wsid, 0, subIterations*totalConns*totalRelays) + subs := make([]wsid, 0, subIterations*totalConns) for i, conn := range conns { w := idFromSeqUpper(i) for j := 0; j < int(subIterations); j++ { id := w + ":" + idFromSeqLower(j) - for _, rlt := range relays { - if s%int(subFreq) == 0 { - rl.addListener(conn, id, rlt, f, cancel) - subs = append(subs, wsid{conn, id}) - } - s++ + if s%int(subFreq) == 0 { + rl.addListener(conn, id, f, cancel) + subs = append(subs, wsid{conn, id}) } + s++ } } @@ -181,8 +189,5 @@ func FuzzRouterListenersPabloCrash(f *testing.F) { for _, wsid := range subs { require.Len(t, rl.clients[wsid.ws], 0) } - for _, rlt := range relays { - require.Len(t, rlt.listeners, 0) - } }) } diff --git a/khatru/listener_test.go b/khatru/listener_test.go index 5dcb5b4..758ba27 100644 --- a/khatru/listener_test.go +++ b/khatru/listener_test.go @@ -26,8 +26,8 @@ func idFromSeq(seq int, min, max int) string { func TestListenerSetupAndRemoveOnce(t *testing.T) { rl := NewRelay() - ws1 := &WebSocket{} - ws2 := &WebSocket{} + ws1 := &WebSocket{Context: rl.ctx} + ws2 := &WebSocket{Context: rl.ctx} f1 := nostr.Filter{Kinds: []nostr.Kind{1}} f2 := nostr.Filter{Kinds: []nostr.Kind{2}} @@ -39,28 +39,21 @@ func TestListenerSetupAndRemoveOnce(t *testing.T) { var cancel func(cause error) = nil t.Run("adding listeners", func(t *testing.T) { - rl.addListener(ws1, "1a", rl, f1, cancel) - rl.addListener(ws1, "1b", rl, f2, cancel) - rl.addListener(ws2, "2a", rl, f3, cancel) - rl.addListener(ws1, "1c", rl, f3, cancel) + rl.addListener(ws1, "1a", f1, cancel) + rl.addListener(ws1, "1b", f2, cancel) + rl.addListener(ws2, "2a", f3, cancel) + rl.addListener(ws1, "1c", f3, cancel) require.Equal(t, map[*WebSocket][]listenerSpec{ ws1: { - {"1a", cancel, 0, rl}, - {"1b", cancel, 1, rl}, - {"1c", cancel, 3, rl}, + {1, "1a", cancel}, + {2, "1b", cancel}, + {4, "1c", cancel}, }, ws2: { - {"2a", cancel, 2, rl}, + {3, "2a", cancel}, }, }, rl.clients) - - require.Equal(t, []listener{ - {"1a", f1, ws1}, - {"1b", f2, ws1}, - {"2a", f3, ws2}, - {"1c", f3, ws1}, - }, rl.listeners) }) t.Run("removing a client", func(t *testing.T) { @@ -68,23 +61,19 @@ func TestListenerSetupAndRemoveOnce(t *testing.T) { require.Equal(t, map[*WebSocket][]listenerSpec{ ws2: { - {"2a", cancel, 0, rl}, + {3, "2a", cancel}, }, }, rl.clients) - - require.Equal(t, []listener{ - {"2a", f3, ws2}, - }, rl.listeners) }) } func TestListenerMoreConvolutedCase(t *testing.T) { rl := NewRelay() - ws1 := &WebSocket{} - ws2 := &WebSocket{} - ws3 := &WebSocket{} - ws4 := &WebSocket{} + ws1 := &WebSocket{Context: rl.ctx} + ws2 := &WebSocket{Context: rl.ctx} + ws3 := &WebSocket{Context: rl.ctx} + ws4 := &WebSocket{Context: rl.ctx} f1 := nostr.Filter{Kinds: []nostr.Kind{1}} f2 := nostr.Filter{Kinds: []nostr.Kind{2}} @@ -98,35 +87,27 @@ func TestListenerMoreConvolutedCase(t *testing.T) { var cancel func(cause error) = nil t.Run("adding listeners", func(t *testing.T) { - rl.addListener(ws1, "c", rl, f1, cancel) - rl.addListener(ws2, "b", rl, f2, cancel) - rl.addListener(ws3, "a", rl, f3, cancel) - rl.addListener(ws4, "d", rl, f3, cancel) - rl.addListener(ws2, "b", rl, f1, cancel) + rl.addListener(ws1, "c", f1, cancel) + rl.addListener(ws2, "b", f2, cancel) + rl.addListener(ws3, "a", f3, cancel) + rl.addListener(ws4, "d", f3, cancel) + rl.addListener(ws2, "b", f1, cancel) require.Equal(t, map[*WebSocket][]listenerSpec{ ws1: { - {"c", cancel, 0, rl}, + {1, "c", cancel}, }, ws2: { - {"b", cancel, 1, rl}, - {"b", cancel, 4, rl}, + {2, "b", cancel}, + {5, "b", cancel}, }, ws3: { - {"a", cancel, 2, rl}, + {3, "a", cancel}, }, ws4: { - {"d", cancel, 3, rl}, + {4, "d", cancel}, }, }, rl.clients) - - require.Equal(t, []listener{ - {"c", f1, ws1}, - {"b", f2, ws2}, - {"a", f3, ws3}, - {"d", f3, ws4}, - {"b", f1, ws2}, - }, rl.listeners) }) t.Run("removing a client", func(t *testing.T) { @@ -134,85 +115,62 @@ func TestListenerMoreConvolutedCase(t *testing.T) { require.Equal(t, map[*WebSocket][]listenerSpec{ ws1: { - {"c", cancel, 0, rl}, + {1, "c", cancel}, }, ws3: { - {"a", cancel, 2, rl}, + {3, "a", cancel}, }, ws4: { - {"d", cancel, 1, rl}, + {4, "d", cancel}, }, }, rl.clients) - - require.Equal(t, []listener{ - {"c", f1, ws1}, - {"d", f3, ws4}, - {"a", f3, ws3}, - }, rl.listeners) }) t.Run("reorganize the first case differently and then remove again", func(t *testing.T) { rl.clients = map[*WebSocket][]listenerSpec{ ws1: { - {"c", cancel, 1, rl}, + {2, "c", cancel}, }, ws2: { - {"b", cancel, 2, rl}, - {"b", cancel, 4, rl}, + {3, "b", cancel}, + {5, "b", cancel}, }, ws3: { - {"a", cancel, 0, rl}, + {1, "a", cancel}, }, ws4: { - {"d", cancel, 3, rl}, + {4, "d", cancel}, }, } - rl.listeners = []listener{ - {"a", f3, ws3}, - {"c", f1, ws1}, - {"b", f2, ws2}, - {"d", f3, ws4}, - {"b", f1, ws2}, - } rl.removeClientAndListeners(ws2) require.Equal(t, map[*WebSocket][]listenerSpec{ ws1: { - {"c", cancel, 1, rl}, + {2, "c", cancel}, }, ws3: { - {"a", cancel, 0, rl}, + {1, "a", cancel}, }, ws4: { - {"d", cancel, 2, rl}, + {4, "d", cancel}, }, }, rl.clients) - - require.Equal(t, []listener{ - {"a", f3, ws3}, - {"c", f1, ws1}, - {"d", f3, ws4}, - }, rl.listeners) }) } func TestListenerMoreStuffWithMultipleRelays(t *testing.T) { rl := NewRelay() - ws1 := &WebSocket{} - ws2 := &WebSocket{} - ws3 := &WebSocket{} - ws4 := &WebSocket{} + ws1 := &WebSocket{Context: rl.ctx} + ws2 := &WebSocket{Context: rl.ctx} + ws3 := &WebSocket{Context: rl.ctx} + ws4 := &WebSocket{Context: rl.ctx} f1 := nostr.Filter{Kinds: []nostr.Kind{1}} f2 := nostr.Filter{Kinds: []nostr.Kind{2}} f3 := nostr.Filter{Kinds: []nostr.Kind{3}} - rlx := NewRelay() - rly := NewRelay() - rlz := NewRelay() - rl.clients[ws1] = nil rl.clients[ws2] = nil rl.clients[ws3] = nil @@ -221,56 +179,37 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) { var cancel func(cause error) = nil t.Run("adding listeners", func(t *testing.T) { - rl.addListener(ws1, "c", rlx, f1, cancel) - rl.addListener(ws2, "b", rly, f2, cancel) - rl.addListener(ws3, "a", rlz, f3, cancel) - rl.addListener(ws4, "d", rlx, f3, cancel) - rl.addListener(ws4, "e", rlx, f3, cancel) - rl.addListener(ws3, "a", rlx, f3, cancel) - rl.addListener(ws4, "e", rly, f3, cancel) - rl.addListener(ws3, "f", rly, f3, cancel) - rl.addListener(ws1, "g", rlz, f1, cancel) - rl.addListener(ws2, "g", rlz, f2, cancel) + rl.addListener(ws1, "c", f1, cancel) + rl.addListener(ws2, "b", f2, cancel) + rl.addListener(ws3, "a", f3, cancel) + rl.addListener(ws4, "d", f3, cancel) + rl.addListener(ws4, "e", f3, cancel) + rl.addListener(ws3, "a", f3, cancel) + rl.addListener(ws4, "e", f3, cancel) + rl.addListener(ws3, "f", f3, cancel) + rl.addListener(ws1, "g", f1, cancel) + rl.addListener(ws2, "g", f2, cancel) require.Equal(t, map[*WebSocket][]listenerSpec{ ws1: { - {"c", cancel, 0, rlx}, - {"g", cancel, 1, rlz}, + {1, "c", cancel}, + {9, "g", cancel}, }, ws2: { - {"b", cancel, 0, rly}, - {"g", cancel, 2, rlz}, + {2, "b", cancel}, + {10, "g", cancel}, }, ws3: { - {"a", cancel, 0, rlz}, - {"a", cancel, 3, rlx}, - {"f", cancel, 2, rly}, + {3, "a", cancel}, + {6, "a", cancel}, + {8, "f", cancel}, }, ws4: { - {"d", cancel, 1, rlx}, - {"e", cancel, 2, rlx}, - {"e", cancel, 1, rly}, + {4, "d", cancel}, + {5, "e", cancel}, + {7, "e", cancel}, }, }, rl.clients) - - require.Equal(t, []listener{ - {"c", f1, ws1}, - {"d", f3, ws4}, - {"e", f3, ws4}, - {"a", f3, ws3}, - }, rlx.listeners) - - require.Equal(t, []listener{ - {"b", f2, ws2}, - {"e", f3, ws4}, - {"f", f3, ws3}, - }, rly.listeners) - - require.Equal(t, []listener{ - {"a", f3, ws3}, - {"g", f1, ws1}, - {"g", f2, ws2}, - }, rlz.listeners) }) t.Run("removing a subscription id", func(t *testing.T) { @@ -280,41 +219,23 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) { require.Equal(t, map[*WebSocket][]listenerSpec{ ws1: { - {"c", cancel, 0, rlx}, - {"g", cancel, 1, rlz}, + {1, "c", cancel}, + {9, "g", cancel}, }, ws2: { - {"b", cancel, 0, rly}, - {"g", cancel, 2, rlz}, + {2, "b", cancel}, + {10, "g", cancel}, }, ws3: { - {"a", cancel, 0, rlz}, - {"a", cancel, 1, rlx}, - {"f", cancel, 2, rly}, + {3, "a", cancel}, + {6, "a", cancel}, + {8, "f", cancel}, }, ws4: { - {"e", cancel, 1, rly}, - {"e", cancel, 2, rlx}, + {5, "e", cancel}, + {7, "e", cancel}, }, }, rl.clients) - - require.Equal(t, []listener{ - {"c", f1, ws1}, - {"a", f3, ws3}, - {"e", f3, ws4}, - }, rlx.listeners) - - require.Equal(t, []listener{ - {"b", f2, ws2}, - {"e", f3, ws4}, - {"f", f3, ws3}, - }, rly.listeners) - - require.Equal(t, []listener{ - {"a", f3, ws3}, - {"g", f1, ws1}, - {"g", f2, ws2}, - }, rlz.listeners) }) t.Run("removing another subscription id", func(t *testing.T) { @@ -325,37 +246,21 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) { require.Equal(t, map[*WebSocket][]listenerSpec{ ws1: { - {"c", cancel, 0, rlx}, - {"g", cancel, 1, rlz}, + {1, "c", cancel}, + {9, "g", cancel}, }, ws2: { - {"b", cancel, 0, rly}, - {"g", cancel, 0, rlz}, + {2, "b", cancel}, + {10, "g", cancel}, }, ws3: { - {"f", cancel, 2, rly}, + {8, "f", cancel}, }, ws4: { - {"e", cancel, 1, rly}, - {"e", cancel, 1, rlx}, + {5, "e", cancel}, + {7, "e", cancel}, }, }, rl.clients) - - require.Equal(t, []listener{ - {"c", f1, ws1}, - {"e", f3, ws4}, - }, rlx.listeners) - - require.Equal(t, []listener{ - {"b", f2, ws2}, - {"e", f3, ws4}, - {"f", f3, ws3}, - }, rly.listeners) - - require.Equal(t, []listener{ - {"g", f2, ws2}, - {"g", f1, ws1}, - }, rlz.listeners) }) t.Run("removing a connection", func(t *testing.T) { @@ -363,31 +268,17 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) { require.Equal(t, map[*WebSocket][]listenerSpec{ ws1: { - {"c", cancel, 0, rlx}, - {"g", cancel, 0, rlz}, + {1, "c", cancel}, + {9, "g", cancel}, }, ws3: { - {"f", cancel, 0, rly}, + {8, "f", cancel}, }, ws4: { - {"e", cancel, 1, rly}, - {"e", cancel, 1, rlx}, + {5, "e", cancel}, + {7, "e", cancel}, }, }, rl.clients) - - require.Equal(t, []listener{ - {"c", f1, ws1}, - {"e", f3, ws4}, - }, rlx.listeners) - - require.Equal(t, []listener{ - {"f", f3, ws3}, - {"e", f3, ws4}, - }, rly.listeners) - - require.Equal(t, []listener{ - {"g", f1, ws1}, - }, rlz.listeners) }) t.Run("removing another subscription id", func(t *testing.T) { @@ -398,26 +289,14 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) { require.Equal(t, map[*WebSocket][]listenerSpec{ ws1: { - {"c", cancel, 0, rlx}, - {"g", cancel, 0, rlz}, + {1, "c", cancel}, + {9, "g", cancel}, }, ws3: { - {"f", cancel, 0, rly}, + {8, "f", cancel}, }, ws4: {}, }, rl.clients) - - require.Equal(t, []listener{ - {"c", f1, ws1}, - }, rlx.listeners) - - require.Equal(t, []listener{ - {"f", f3, ws3}, - }, rly.listeners) - - require.Equal(t, []listener{ - {"g", f1, ws1}, - }, rlz.listeners) }) } @@ -432,7 +311,7 @@ func TestRandomListenerClientRemoving(t *testing.T) { l := 0 for i := 0; i < 20; i++ { - ws := &WebSocket{} + ws := &WebSocket{Context: rl.ctx} websockets = append(websockets, ws) rl.clients[ws] = nil } @@ -444,20 +323,28 @@ func TestRandomListenerClientRemoving(t *testing.T) { if rand.Intn(2) < 1 { l++ - rl.addListener(ws, w+":"+idFromSeqLower(j), rl, f, cancel) + rl.addListener(ws, w+":"+idFromSeqLower(j), f, cancel) } } } require.Len(t, rl.clients, 20) - require.Len(t, rl.listeners, l) + ssidCount := 0 + for _, specs := range rl.clients { + ssidCount += len(specs) + } + require.Equal(t, l, ssidCount) for ws := range rl.clients { rl.removeClientAndListeners(ws) } require.Len(t, rl.clients, 0) - require.Len(t, rl.listeners, 0) + ssidCount = 0 + for _, specs := range rl.clients { + ssidCount += len(specs) + } + require.Equal(t, 0, ssidCount) } func TestRandomListenerIdRemoving(t *testing.T) { @@ -477,7 +364,7 @@ func TestRandomListenerIdRemoving(t *testing.T) { extra := 0 for i := 0; i < 20; i++ { - ws := &WebSocket{} + ws := &WebSocket{Context: rl.ctx} websockets = append(websockets, ws) rl.clients[ws] = nil } @@ -489,11 +376,11 @@ func TestRandomListenerIdRemoving(t *testing.T) { if rand.Intn(2) < 1 { id := w + ":" + idFromSeqLower(j) - rl.addListener(ws, id, rl, f, cancel) + rl.addListener(ws, id, f, cancel) subs = append(subs, wsid{ws, id}) if rand.Intn(5) < 1 { - rl.addListener(ws, id, rl, f, cancel) + rl.addListener(ws, id, f, cancel) extra++ } } @@ -501,7 +388,11 @@ func TestRandomListenerIdRemoving(t *testing.T) { } require.Len(t, rl.clients, 20) - require.Len(t, rl.listeners, len(subs)+extra) + ssidCount := 0 + for _, specs := range rl.clients { + ssidCount += len(specs) + } + require.Equal(t, len(subs)+extra, ssidCount) rand.Shuffle(len(subs), func(i, j int) { subs[i], subs[j] = subs[j], subs[i] @@ -510,7 +401,11 @@ func TestRandomListenerIdRemoving(t *testing.T) { rl.removeListenerId(wsidToRemove.ws, wsidToRemove.id) } - require.Len(t, rl.listeners, 0) + ssidCount = 0 + for _, specs := range rl.clients { + ssidCount += len(specs) + } + require.Equal(t, 0, ssidCount) require.Len(t, rl.clients, 20) for _, specs := range rl.clients { require.Len(t, specs, 0) @@ -520,12 +415,9 @@ func TestRandomListenerIdRemoving(t *testing.T) { func TestRouterListenersPabloCrash(t *testing.T) { rl := NewRelay() - rla := NewRelay() - rlb := NewRelay() - - ws1 := &WebSocket{} - ws2 := &WebSocket{} - ws3 := &WebSocket{} + ws1 := &WebSocket{Context: rl.ctx} + ws2 := &WebSocket{Context: rl.ctx} + ws3 := &WebSocket{Context: rl.ctx} rl.clients[ws1] = nil rl.clients[ws2] = nil @@ -534,11 +426,11 @@ func TestRouterListenersPabloCrash(t *testing.T) { f := nostr.Filter{Kinds: []nostr.Kind{1}} cancel := func(cause error) {} - rl.addListener(ws1, ":1", rla, f, cancel) - rl.addListener(ws2, ":1", rlb, f, cancel) - rl.addListener(ws3, "a", rlb, f, cancel) - rl.addListener(ws3, "b", rla, f, cancel) - rl.addListener(ws3, "c", rlb, f, cancel) + rl.addListener(ws1, ":1", f, cancel) + rl.addListener(ws2, ":1", f, cancel) + rl.addListener(ws3, "a", f, cancel) + rl.addListener(ws3, "b", f, cancel) + rl.addListener(ws3, "c", f, cancel) rl.removeClientAndListeners(ws1) rl.removeClientAndListeners(ws3) diff --git a/khatru/policies/events.go b/khatru/policies/events.go index 61acbfb..15f56fa 100644 --- a/khatru/policies/events.go +++ b/khatru/policies/events.go @@ -124,6 +124,9 @@ var nostrReferencesPrefix = regexp.MustCompile(`\b(nevent1|npub1|nprofile1|note1 func RejectUnprefixedNostrReferences(ctx context.Context, event nostr.Event) (bool, string) { content := sdk.GetMainContent(event) + if content == "" { + content = event.Content + } // only do it for stuff that wasn't parsed as blocks already // (since those are already good references or URLs) diff --git a/khatru/relay.go b/khatru/relay.go index 0cc150e..e09226f 100644 --- a/khatru/relay.go +++ b/khatru/relay.go @@ -42,7 +42,7 @@ func NewRelay() *Relay { clients: make(map[*WebSocket][]listenerSpec, 100), clientsMutex: channelmutex.New(), - listeners: make([]listener, 0, 100), + dispatcher: newDispatcher(), serveMux: &http.ServeMux{}, @@ -87,11 +87,6 @@ type Relay struct { // this can be ignored unless you know what you're doing ChallengePrefix string - // these are used when this relays acts as a router - routes []Route - getSubRelayFromEvent func(*nostr.Event) *Relay // used for handling EVENTs - getSubRelayFromFilter func(nostr.Filter) *Relay // used for handling REQs - // setting up handlers here will enable these methods ManagementAPI RelayManagementAPI @@ -108,7 +103,7 @@ type Relay struct { // keep a connection reference to all connected clients for Server.Shutdown // also used for keeping track of who is listening to what clients map[*WebSocket][]listenerSpec - listeners []listener + dispatcher dispatcher clientsMutex *channelmutex.Mutex // set this to true to support negentropy diff --git a/khatru/relay_test.go b/khatru/relay_test.go index 7693d8a..baceb4b 100644 --- a/khatru/relay_test.go +++ b/khatru/relay_test.go @@ -9,6 +9,7 @@ import ( "fiatjaf.com/nostr" "fiatjaf.com/nostr/eventstore/slicestore" + "github.com/stretchr/testify/require" ) func TestBasicRelayFunctionality(t *testing.T) { @@ -46,15 +47,11 @@ func TestBasicRelayFunctionality(t *testing.T) { // connect two test clients url := "ws" + server.URL[4:] client1, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{}) - if err != nil { - t.Fatalf("failed to connect client1: %v", err) - } + require.NoError(t, err, "failed to connect client1") defer client1.Close() client2, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{}) - if err != nil { - t.Fatalf("failed to connect client2: %v", err) - } + require.NoError(t, err, "failed to connect client2") defer client2.Close() // test 1: store and query events @@ -64,18 +61,14 @@ func TestBasicRelayFunctionality(t *testing.T) { evt1 := createEvent(sk1, 1, "hello world", nil) err := client1.Publish(ctx, evt1) - if err != nil { - t.Fatalf("failed to publish event: %v", err) - } + require.NoError(t, err, "failed to publish event") // Query the event back sub, err := client2.Subscribe(ctx, nostr.Filter{ Authors: []nostr.PubKey{pk1}, Kinds: []nostr.Kind{1}, }, nostr.SubscriptionOptions{}) - if err != nil { - t.Fatalf("failed to subscribe: %v", err) - } + require.NoError(t, err, "failed to subscribe") defer sub.Unsub() // Wait for event @@ -85,7 +78,7 @@ func TestBasicRelayFunctionality(t *testing.T) { t.Errorf("got wrong event: %v", env.ID) } case <-ctx.Done(): - t.Fatal("timeout waiting for event") + require.FailNow(t, "timeout waiting for event") } }) @@ -99,17 +92,13 @@ func TestBasicRelayFunctionality(t *testing.T) { Authors: []nostr.PubKey{pk2}, Kinds: []nostr.Kind{1}, }, nostr.SubscriptionOptions{}) - if err != nil { - t.Fatalf("failed to subscribe: %v", err) - } + require.NoError(t, err, "failed to subscribe") defer sub.Unsub() // Publish event from client2 evt2 := createEvent(sk2, 1, "testing live events", nil) err = client2.Publish(ctx, evt2) - if err != nil { - t.Fatalf("failed to publish event: %v", err) - } + require.NoError(t, err, "failed to publish event") // Wait for event on subscription select { @@ -118,7 +107,7 @@ func TestBasicRelayFunctionality(t *testing.T) { t.Errorf("got wrong event: %v", env.ID) } case <-ctx.Done(): - t.Fatal("timeout waiting for live event") + require.FailNow(t, "timeout waiting for live event") } }) @@ -130,24 +119,18 @@ func TestBasicRelayFunctionality(t *testing.T) { // Create an event to be deleted evt3 := createEvent(sk1, 1, "delete me", nil) err = client1.Publish(ctx, evt3) - if err != nil { - t.Fatalf("failed to publish event: %v", err) - } + require.NoError(t, err, "failed to publish event") // Create deletion event delEvent := createEvent(sk1, 5, "deleting", nostr.Tags{{"e", evt3.ID.Hex()}}) err = client1.Publish(ctx, delEvent) - if err != nil { - t.Fatalf("failed to publish deletion event: %v", err) - } + require.NoError(t, err, "failed to publish deletion event") // Try to query the deleted event sub, err := client2.Subscribe(ctx, nostr.Filter{ IDs: []nostr.ID{evt3.ID}, }, nostr.SubscriptionOptions{}) - if err != nil { - t.Fatalf("failed to subscribe: %v", err) - } + require.NoError(t, err, "failed to subscribe") defer sub.Unsub() // Should get EOSE without receiving the deleted event @@ -162,7 +145,7 @@ func TestBasicRelayFunctionality(t *testing.T) { } goto checkDeleteStored case <-ctx.Done(): - t.Fatal("timeout waiting for EOSE") + require.FailNow(t, "timeout waiting for EOSE") } } @@ -171,9 +154,7 @@ func TestBasicRelayFunctionality(t *testing.T) { subDelete, err := client2.Subscribe(ctx, nostr.Filter{ IDs: []nostr.ID{delEvent.ID}, }, nostr.SubscriptionOptions{}) - if err != nil { - t.Fatalf("failed to subscribe to delete event: %v", err) - } + require.NoError(t, err, "failed to subscribe to delete event") defer subDelete.Unsub() gotDeleteEvent := false @@ -189,7 +170,7 @@ func TestBasicRelayFunctionality(t *testing.T) { } return case <-ctx.Done(): - t.Fatal("timeout waiting for EOSE on delete event") + require.FailNow(t, "timeout waiting for EOSE on delete event") } } }) @@ -204,36 +185,28 @@ func TestBasicRelayFunctionality(t *testing.T) { evt1.CreatedAt = 1000 // Set specific timestamp for testing evt1.Sign(sk1) err = client1.Publish(ctx, evt1) - if err != nil { - t.Fatalf("failed to publish initial event: %v", err) - } + require.NoError(t, err, "failed to publish initial event") // create newer event that should replace the first evt2 := createEvent(sk1, 0, `{"name":"newer"}`, nil) evt2.CreatedAt = 2004 // Newer timestamp evt2.Sign(sk1) err = client1.Publish(ctx, evt2) - if err != nil { - t.Fatalf("failed to publish newer event: %v", err) - } + require.NoError(t, err, "failed to publish newer event") // create older event that should not replace the current one evt3 := createEvent(sk1, 0, `{"name":"older"}`, nil) evt3.CreatedAt = 1500 // Older than evt2 evt3.Sign(sk1) err = client1.Publish(ctx, evt3) - if err != nil { - t.Fatalf("failed to publish older event: %v", err) - } + require.NoError(t, err, "failed to publish older event") // query to verify only the newest event exists sub, err := client2.Subscribe(ctx, nostr.Filter{ Authors: []nostr.PubKey{pk1}, Kinds: []nostr.Kind{0}, }, nostr.SubscriptionOptions{}) - if err != nil { - t.Fatalf("failed to subscribe: %v", err) - } + require.NoError(t, err, "failed to subscribe") defer sub.Unsub() // should only get one event back (the newest one) @@ -251,7 +224,7 @@ func TestBasicRelayFunctionality(t *testing.T) { } return case <-ctx.Done(): - t.Fatal("timeout waiting for events") + require.FailNow(t, "timeout waiting for events") } } }) @@ -281,26 +254,20 @@ func TestBasicRelayFunctionality(t *testing.T) { // connect test client url := "ws" + server.URL[4:] client, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{}) - if err != nil { - t.Fatalf("failed to connect client: %v", err) - } + require.NoError(t, err, "failed to connect client") defer client.Close() // create event that expires in 2 seconds expiration := strconv.FormatInt(int64(nostr.Now()+2), 10) evt := createEvent(sk1, 1, "i will expire soon", nostr.Tags{{"expiration", expiration}}) err = client.Publish(ctx, evt) - if err != nil { - t.Fatalf("failed to publish event: %v", err) - } + require.NoError(t, err, "failed to publish event") // verify event exists initially sub, err := client.Subscribe(ctx, nostr.Filter{ IDs: []nostr.ID{evt.ID}, }, nostr.SubscriptionOptions{}) - if err != nil { - t.Fatalf("failed to subscribe: %v", err) - } + require.NoError(t, err, "failed to subscribe") // should get the event select { @@ -309,7 +276,7 @@ func TestBasicRelayFunctionality(t *testing.T) { t.Error("got wrong event") } case <-ctx.Done(): - t.Fatal("timeout waiting for event") + require.FailNow(t, "timeout waiting for event") } sub.Unsub() @@ -320,9 +287,7 @@ func TestBasicRelayFunctionality(t *testing.T) { sub, err = client.Subscribe(ctx, nostr.Filter{ IDs: []nostr.ID{evt.ID}, }, nostr.SubscriptionOptions{}) - if err != nil { - t.Fatalf("failed to subscribe: %v", err) - } + require.NoError(t, err, "failed to subscribe") defer sub.Unsub() // should get EOSE without receiving the expired event @@ -337,7 +302,7 @@ func TestBasicRelayFunctionality(t *testing.T) { } return case <-ctx.Done(): - t.Fatal("timeout waiting for EOSE") + require.FailNow(t, "timeout waiting for EOSE") } } }) @@ -350,33 +315,26 @@ func TestBasicRelayFunctionality(t *testing.T) { // create an event from client1 evt4 := createEvent(sk1, 1, "try to delete me", nil) err = client1.Publish(ctx, evt4) - if err != nil { - t.Fatalf("failed to publish event: %v", err) - } + require.NoError(t, err) - // Try to delete it with client2 + // try to delete it with client2 delEvent := createEvent(sk2, 5, "trying to delete", nostr.Tags{{"e", evt4.ID.Hex()}}) err = client2.Publish(ctx, delEvent) - if err == nil { - t.Fatalf("should have failed to publish deletion event: %v", err) - } + require.Error(t, err) - // Verify event still exists + // verify event still exists sub, err := client1.Subscribe(ctx, nostr.Filter{ IDs: []nostr.ID{evt4.ID}, }, nostr.SubscriptionOptions{}) - if err != nil { - t.Fatalf("failed to subscribe: %v", err) - } + require.NoError(t, err) defer sub.Unsub() select { - case env := <-sub.Events: - if env.ID != evt4.ID { - t.Error("got wrong event") - } + case env, more := <-sub.Events: + require.True(t, more, "should get an event, got nothing") + require.Equal(t, env.ID, evt4.ID, "got wrong event") case <-ctx.Done(): - t.Fatal("event should still exist") + require.FailNow(t, "event should still exist") } }) } diff --git a/khatru/router.go b/khatru/router.go deleted file mode 100644 index ccb52c5..0000000 --- a/khatru/router.go +++ /dev/null @@ -1,77 +0,0 @@ -package khatru - -import ( - "fiatjaf.com/nostr" -) - -type Router struct{ *Relay } - -type Route struct { - eventMatcher func(*nostr.Event) bool - filterMatcher func(nostr.Filter) bool - relay *Relay -} - -type routeBuilder struct { - router *Router - eventMatcher func(*nostr.Event) bool - filterMatcher func(nostr.Filter) bool -} - -func NewRouter() *Router { - rr := &Router{Relay: NewRelay()} - rr.routes = make([]Route, 0, 3) - rr.getSubRelayFromFilter = func(f nostr.Filter) *Relay { - for _, route := range rr.routes { - if route.filterMatcher == nil || route.filterMatcher(f) { - return route.relay - } - } - return rr.Relay - } - rr.getSubRelayFromEvent = func(e *nostr.Event) *Relay { - for _, route := range rr.routes { - if route.eventMatcher == nil || route.eventMatcher(e) { - return route.relay - } - } - return rr.Relay - } - return rr -} - -func (rr *Router) Route() routeBuilder { - return routeBuilder{ - router: rr, - filterMatcher: func(f nostr.Filter) bool { return false }, - eventMatcher: func(e *nostr.Event) bool { return false }, - } -} - -func (rb routeBuilder) Req(fn func(nostr.Filter) bool) routeBuilder { - rb.filterMatcher = fn - return rb -} - -func (rb routeBuilder) AnyReq() routeBuilder { - rb.filterMatcher = nil - return rb -} - -func (rb routeBuilder) Event(fn func(*nostr.Event) bool) routeBuilder { - rb.eventMatcher = fn - return rb -} - -func (rb routeBuilder) AnyEvent() routeBuilder { - rb.eventMatcher = nil - return rb -} - -func (rb routeBuilder) Relay(relay *Relay) { - rb.router.routes = append(rb.router.routes, Route{ - filterMatcher: rb.filterMatcher, - eventMatcher: rb.eventMatcher, - relay: relay, - }) -}