khatru: get rid of subrelays + segregated indexed listeners.

This commit is contained in:
fiatjaf
2026-03-29 11:14:22 -03:00
parent 56610a32e6
commit ac2d4579f1
12 changed files with 425 additions and 641 deletions
+1
View File
@@ -44,6 +44,7 @@ func (ef Filter) Matches(event Event) bool {
return true
}
//go:inline
func (ef Filter) MatchesIgnoringTimestampConstraints(event Event) bool {
if ef.IDs != nil && !slices.Contains(ef.IDs, event.ID) {
return false
-58
View File
@@ -1,58 +0,0 @@
---
outline: deep
---
# Request Routing
If you have one (or more) set of policies that have to be executed in sequence (for example, first you check for the presence of a tag, then later in the next policies you use that tag without checking) and they only apply to some class of events, but you still want your relay to deal with other classes of events that can lead to cumbersome sets of rules, always having to check if an event meets the requirements and so on. There is where routing can help you.
```go
sk := os.Getenv("RELAY_SECRET_KEY")
// a relay for NIP-29 groups
groupsStore := boltdb.BoltBackend{}
groupsStore.Init()
groupsRelay, _ := khatru29.Init(relay29.Options{Domain: "example.com", DB: groupsStore, SecretKey: sk})
// ...
// a relay for everything else
publicStore := slicestore.SliceStore{}
publicStore.Init()
publicRelay := khatru.NewRelay()
publicRelay.UseEventStore(publicStore, 1000)
// ...
// a higher-level relay that just routes between the two above
router := khatru.NewRouter()
// route requests and events to the groups relay
router.Route().
Req(func (filter nostr.Filter) bool {
_, hasHTag := filter.Tags["h"]
if hasHTag {
return true
}
return slices.Contains(filter.Kinds, func (k int) bool { return k == 39000 || k == 39001 || k == 39002 })
}).
Event(func (event *nostr.Event) bool {
switch {
case event.Kind <= 9021 && event.Kind >= 9000:
return true
case event.Kind <= 39010 && event.Kind >= 39000:
return true
case event.Kind <= 12 && event.Kind >= 9:
return true
case event.Tags.Find("h") != nil:
return true
default:
return false
}
}).
Relay(groupsRelay)
// route requests and events to the other
router.Route().
Req(func (filter nostr.Filter) bool { return true }).
Event(func (event *nostr.Event) bool { return true }).
Relay(publicRelay)
```
-61
View File
@@ -1,61 +0,0 @@
package main
import (
"fmt"
"net/http"
"slices"
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/lmdb"
"fiatjaf.com/nostr/eventstore/slicestore"
"fiatjaf.com/nostr/khatru"
)
func main() {
db1 := &slicestore.SliceStore{}
db1.Init()
r1 := khatru.NewRelay()
r1.UseEventstore(db1, 400)
db2 := &lmdb.LMDBBackend{Path: "/tmp/t"}
db2.Init()
r2 := khatru.NewRelay()
r2.UseEventstore(db2, 400)
db3 := &slicestore.SliceStore{}
db3.Init()
r3 := khatru.NewRelay()
r3.UseEventstore(db3, 400)
router := khatru.NewRouter()
router.Route().
Req(func(filter nostr.Filter) bool {
return slices.Contains(filter.Kinds, 30023)
}).
Event(func(event *nostr.Event) bool {
return event.Kind == 30023
}).
Relay(r1)
router.Route().
Req(func(filter nostr.Filter) bool {
return slices.Contains(filter.Kinds, 1) && slices.Contains(filter.Tags["t"], "spam")
}).
Event(func(event *nostr.Event) bool {
return event.Kind == 1 && event.Tags.FindWithValue("t", "spam") != nil
}).
Relay(r2)
router.Route().
Req(func(filter nostr.Filter) bool {
return slices.Contains(filter.Kinds, 1)
}).
Event(func(event *nostr.Event) bool {
return event.Kind == 1
}).
Relay(r3)
fmt.Println("running on :3334")
http.ListenAndServe(":3334", router)
}
+1 -1
View File
@@ -61,5 +61,5 @@ func (rl *Relay) Shutdown(ctx context.Context) {
ws.conn.Close()
}
clear(rl.clients)
rl.listeners = rl.listeners[:0]
rl.dispatcher = newDispatcher()
}
+17 -32
View File
@@ -217,35 +217,30 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
return
}
srl := rl
if rl.getSubRelayFromEvent != nil {
srl = rl.getSubRelayFromEvent(&env.Event)
}
var ok bool
var writeErr error
var skipBroadcast bool
if env.Event.Kind == nostr.KindDeletion {
// store the delete event first
skipBroadcast, writeErr = srl.handleNormal(ctx, env.Event)
skipBroadcast, writeErr = rl.handleNormal(ctx, env.Event)
if writeErr == nil {
// this always returns "blocked: " whenever it returns an error
writeErr = srl.handleDeleteRequest(ctx, env.Event)
writeErr = rl.handleDeleteRequest(ctx, env.Event)
}
} else if env.Event.Kind.IsEphemeral() {
// this will also always return a prefixed reason
writeErr = srl.handleEphemeral(ctx, env.Event)
writeErr = rl.handleEphemeral(ctx, env.Event)
} else {
// this will also always return a prefixed reason
skipBroadcast, writeErr = srl.handleNormal(ctx, env.Event)
skipBroadcast, writeErr = rl.handleNormal(ctx, env.Event)
}
var reason string
if writeErr == nil {
ok = true
if !skipBroadcast {
n := srl.notifyListeners(env.Event, false)
n := rl.notifyListeners(env.Event, false)
// the number of notified listeners matters in ephemeral events
if env.Event.Kind.IsEphemeral() {
@@ -278,15 +273,10 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
var total uint32
var hll *hyperloglog.HyperLogLog
srl := rl
if rl.getSubRelayFromFilter != nil {
srl = rl.getSubRelayFromFilter(env.Filter)
}
if offset := nip45.HyperLogLogEventPubkeyOffsetForFilter(env.Filter); offset != -1 {
total, hll = srl.handleCountRequestWithHLL(ctx, ws, env.Filter, offset)
total, hll = rl.handleCountRequestWithHLL(ctx, ws, env.Filter, offset)
} else {
total = srl.handleCountRequest(ctx, ws, env.Filter)
total = rl.handleCountRequest(ctx, ws, env.Filter)
}
resp := nostr.CountEnvelope{
@@ -311,11 +301,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
// handle each filter separately -- dispatching events as they're loaded from databases
for _, filter := range env.Filters {
srl := rl
if rl.getSubRelayFromFilter != nil {
srl = rl.getSubRelayFromFilter(filter)
}
err := srl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter)
err := rl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter)
if err != nil {
// fail everything if any filter is rejected
reason := err.Error()
@@ -325,8 +311,11 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: reason})
cancelReqCtx(errors.New("filter rejected"))
return
} else {
rl.addListener(ws, env.SubscriptionID, srl, filter, cancelReqCtx)
} else if filter.IDs == nil {
// a query that is just a bunch of "ids": [...] will not add listeners.
// is this a bug? maybe, but I don't think anyone is listening for an ID
// that hasn't been published yet anywhere -- if yes we can change later
rl.addListener(ws, env.SubscriptionID, filter, cancelReqCtx)
}
}
@@ -363,15 +352,11 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: false, Reason: "error: failed to authenticate: " + err.Error()})
}
case *nip77.OpenEnvelope:
srl := rl
if rl.getSubRelayFromFilter != nil {
srl = rl.getSubRelayFromFilter(env.Filter)
if !srl.Negentropy {
// ignore
return
}
if !rl.Negentropy {
// ignore
return
}
vec, err := srl.startNegentropySession(ctx, env.Filter)
vec, err := rl.startNegentropySession(ctx, env.Filter)
if err != nil {
// fail everything if any filter is rejected
reason := err.Error()
+216 -75
View File
@@ -3,18 +3,18 @@ package khatru
import (
"context"
"errors"
"slices"
"iter"
"fiatjaf.com/lib/set"
"fiatjaf.com/nostr"
)
var ErrSubscriptionClosedByClient = errors.New("subscription closed by client")
type listenerSpec struct {
id string // kept here so we can easily match against it removeListenerId
cancel context.CancelCauseFunc
index int
subrelay *Relay // this is important when we're dealing with routing, otherwise it will be always the same
ssid int // internal numeric id for a listener
sid string // client-provided subscription id
cancel context.CancelCauseFunc
}
type listener struct {
@@ -23,10 +23,197 @@ type listener struct {
ws *WebSocket
}
type subscription struct {
id string
filter nostr.Filter
ws *WebSocket
}
type dispatcher struct {
serial int
subscriptions map[int]subscription
byAuthor map[nostr.PubKey]set.Set[int]
byKind map[nostr.Kind]set.Set[int]
fallback set.Set[int]
}
func newDispatcher() dispatcher {
return dispatcher{
subscriptions: make(map[int]subscription, 100),
byAuthor: make(map[nostr.PubKey]set.Set[int]),
byKind: make(map[nostr.Kind]set.Set[int]),
fallback: set.NewSliceSet[int](),
}
}
func (d *dispatcher) addSubscription(sub subscription) int {
d.serial++
ssid := d.serial
d.subscriptions[ssid] = sub
indexed := false
if sub.filter.Authors != nil {
indexed = true
for _, author := range sub.filter.Authors {
s, ok := d.byAuthor[author]
if !ok {
s = set.NewSliceSet[int]()
d.byAuthor[author] = s
}
s.Add(ssid)
}
}
if sub.filter.Kinds != nil {
indexed = true
for _, kind := range sub.filter.Kinds {
s, ok := d.byKind[kind]
if !ok {
s = set.NewSliceSet[int]()
d.byKind[kind] = s
}
s.Add(ssid)
}
}
if !indexed {
d.fallback.Add(ssid)
}
return ssid
}
func (d *dispatcher) removeSubscription(ssid int) {
sub, ok := d.subscriptions[ssid]
if !ok {
return
}
delete(d.subscriptions, ssid)
indexed := false
if sub.filter.Authors != nil {
indexed = true
for _, author := range sub.filter.Authors {
s, ok := d.byAuthor[author]
if !ok {
return
}
s.Remove(ssid)
if s.Len() == 0 {
delete(d.byAuthor, author)
}
}
}
if sub.filter.Kinds != nil {
indexed = true
for _, kind := range sub.filter.Kinds {
s, ok := d.byKind[kind]
if !ok {
return
}
s.Remove(ssid)
if s.Len() == 0 {
delete(d.byKind, kind)
}
}
}
if !indexed {
d.fallback.Remove(ssid)
}
}
func (d *dispatcher) candidates(event nostr.Event) iter.Seq[subscription] {
return func(yield func(subscription) bool) {
authorSubs, hasAuthorSubs := d.byAuthor[event.PubKey]
kindSubs, hasKindSubs := d.byKind[event.Kind]
if hasAuthorSubs && hasKindSubs {
for _, ssid := range authorSubs.Slice() {
sub, _ := d.subscriptions[ssid]
if kindSubs.Has(ssid) {
if filterMatchesTimestampConstraintsAndTags(sub.filter, event) {
if !yield(sub) {
return
}
}
} else {
// matched author but not tags, so this event doesn't qualify for any filter
continue
}
}
} else if hasAuthorSubs {
for _, ssid := range authorSubs.Slice() {
sub, _ := d.subscriptions[ssid]
if sub.filter.Kinds != nil {
// if there are any kinds in the filter we already know this doesn't qualify
continue
}
if filterMatchesTimestampConstraintsAndTags(sub.filter, event) {
if !yield(sub) {
return
}
}
}
} else if hasKindSubs {
for _, ssid := range kindSubs.Slice() {
sub, _ := d.subscriptions[ssid]
if sub.filter.Authors != nil {
// if there are any authors in the filter we already know this doesn't qualify
continue
}
if filterMatchesTimestampConstraintsAndTags(sub.filter, event) {
if !yield(sub) {
return
}
}
}
}
for _, ssid := range d.fallback.Slice() {
sub, _ := d.subscriptions[ssid]
if filterMatchesTimestampConstraintsAndTags(sub.filter, event) {
if !yield(sub) {
return
}
}
}
}
}
//go:inline
func filterMatchesTimestampConstraintsAndTags(filter nostr.Filter, event nostr.Event) bool {
if filter.Since != 0 && event.CreatedAt < filter.Since {
return false
}
if filter.Until != 0 && event.CreatedAt > filter.Until {
return false
}
for f, v := range filter.Tags {
if !event.Tags.ContainsAny(f, v) {
return false
}
}
return true
}
//go:inline
func tagKeyValueKey(tagKey, tagValue string) string {
return tagKey + "\x00" + tagValue
}
func (rl *Relay) GetListeningFilters() []nostr.Filter {
respfilters := make([]nostr.Filter, len(rl.listeners))
for i, l := range rl.listeners {
respfilters[i] = l.filter
respfilters := make([]nostr.Filter, 0, len(rl.dispatcher.subscriptions))
for _, sub := range rl.dispatcher.subscriptions {
respfilters = append(respfilters, sub.filter)
}
return respfilters
}
@@ -36,7 +223,6 @@ func (rl *Relay) GetListeningFilters() []nostr.Filter {
func (rl *Relay) addListener(
ws *WebSocket,
id string,
subrelay *Relay,
filter nostr.Filter,
cancel context.CancelCauseFunc,
) {
@@ -48,18 +234,16 @@ func (rl *Relay) addListener(
}
if specs, ok := rl.clients[ws]; ok /* this will always be true unless client has disconnected very rapidly */ {
idx := len(subrelay.listeners)
rl.clients[ws] = append(specs, listenerSpec{
id: id,
cancel: cancel,
subrelay: subrelay,
index: idx,
})
subrelay.listeners = append(subrelay.listeners, listener{
ssid := rl.dispatcher.addSubscription(subscription{
ws: ws,
id: id,
filter: filter,
})
rl.clients[ws] = append(specs, listenerSpec{
ssid: ssid,
cancel: cancel,
sid: id,
})
}
}
@@ -70,35 +254,16 @@ func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// swap delete specs that match this id
for s := len(specs) - 1; s >= 0; s-- {
spec := specs[s]
if spec.id == id {
kept := specs[:0]
for _, spec := range specs {
if spec.sid == id {
spec.cancel(ErrSubscriptionClosedByClient)
specs[s] = specs[len(specs)-1]
specs = specs[0 : len(specs)-1]
rl.clients[ws] = specs
// swap delete listeners one at a time, as they may be each in a different subrelay
srl := spec.subrelay // == rl in normal cases, but different when this came from a route
if spec.index != len(srl.listeners)-1 {
movedFromIndex := len(srl.listeners) - 1
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
srl.listeners[spec.index] = moved
// now we must update the the listener we just moved
// so its .index reflects its new position on srl.listeners
movedSpecs := rl.clients[moved.ws]
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
return ls.index == movedFromIndex && ls.subrelay == srl
})
movedSpecs[idx].index = spec.index
rl.clients[moved.ws] = movedSpecs
}
srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length
rl.dispatcher.removeSubscription(spec.ssid)
continue
}
kept = append(kept, spec)
}
rl.clients[ws] = kept
}
}
@@ -106,31 +271,9 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
rl.clientsMutex.Lock()
defer rl.clientsMutex.Unlock()
if specs, ok := rl.clients[ws]; ok {
// swap delete listeners and delete client (all specs will be deleted)
for s, spec := range specs {
for _, spec := range specs {
// no need to cancel contexts since they inherit from the main connection context
// just delete the listeners (swap-delete)
srl := spec.subrelay
if spec.index != len(srl.listeners)-1 {
movedFromIndex := len(srl.listeners) - 1
moved := srl.listeners[movedFromIndex] // this wasn't removed, but will be moved
srl.listeners[spec.index] = moved
// temporarily update the spec of the listener being removed to have index == -1
// (since it was removed) so it doesn't match in the search below
rl.clients[ws][s].index = -1
// now we must update the the listener we just moved
// so its .index reflects its new position on srl.listeners
movedSpecs := rl.clients[moved.ws]
idx := slices.IndexFunc(movedSpecs, func(ls listenerSpec) bool {
return ls.index == movedFromIndex && ls.subrelay == srl
})
movedSpecs[idx].index = spec.index
rl.clients[moved.ws] = movedSpecs
}
srl.listeners = srl.listeners[0 : len(srl.listeners)-1] // finally reduce the slice length
rl.dispatcher.removeSubscription(spec.ssid)
}
}
delete(rl.clients, ws)
@@ -140,16 +283,14 @@ func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
func (rl *Relay) notifyListeners(event nostr.Event, skipPrevent bool) int {
count := 0
listenersloop:
for _, listener := range rl.listeners {
if listener.filter.Matches(event) {
if !skipPrevent && nil != rl.PreventBroadcast {
if rl.PreventBroadcast(listener.ws, listener.filter, event) {
continue listenersloop
}
for sub := range rl.dispatcher.candidates(event) {
if !skipPrevent && nil != rl.PreventBroadcast {
if rl.PreventBroadcast(sub.ws, sub.filter, event) {
continue listenersloop
}
listener.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &listener.id, Event: event})
count++
}
sub.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &sub.id, Event: event})
count++
}
return count
}
+33 -28
View File
@@ -25,7 +25,7 @@ func FuzzRandomListenerClientRemoving(f *testing.F) {
l := 0
for i := 0; i < totalWebsockets; i++ {
ws := &WebSocket{}
ws := &WebSocket{Context: rl.ctx}
websockets = append(websockets, ws)
rl.clients[ws] = nil
}
@@ -38,7 +38,7 @@ func FuzzRandomListenerClientRemoving(f *testing.F) {
if s%addListenerFreq == 0 {
l++
rl.addListener(ws, w+":"+idFromSeqLower(j), rl, f, cancel)
rl.addListener(ws, w+":"+idFromSeqLower(j), f, cancel)
}
s++
@@ -46,14 +46,22 @@ func FuzzRandomListenerClientRemoving(f *testing.F) {
}
require.Len(t, rl.clients, totalWebsockets)
require.Len(t, rl.listeners, l)
ssidCount := 0
for _, specs := range rl.clients {
ssidCount += len(specs)
}
require.Equal(t, l, ssidCount)
for ws := range rl.clients {
rl.removeClientAndListeners(ws)
}
require.Len(t, rl.clients, 0)
require.Len(t, rl.listeners, 0)
ssidCount = 0
for _, specs := range rl.clients {
ssidCount += len(specs)
}
require.Equal(t, 0, ssidCount)
})
}
@@ -84,7 +92,7 @@ func FuzzRandomListenerIdRemoving(f *testing.F) {
extra := 0
for i := 0; i < totalWebsockets; i++ {
ws := &WebSocket{}
ws := &WebSocket{Context: rl.ctx}
websockets = append(websockets, ws)
rl.clients[ws] = nil
}
@@ -97,11 +105,11 @@ func FuzzRandomListenerIdRemoving(f *testing.F) {
if s%addListenerFreq == 0 {
id := w + ":" + idFromSeqLower(j)
rl.addListener(ws, id, rl, f, cancel)
rl.addListener(ws, id, f, cancel)
subs = append(subs, wsid{ws, id})
if s%addExtraListenerFreq == 0 {
rl.addListener(ws, id, rl, f, cancel)
rl.addListener(ws, id, f, cancel)
extra++
}
}
@@ -111,7 +119,11 @@ func FuzzRandomListenerIdRemoving(f *testing.F) {
}
require.Len(t, rl.clients, totalWebsockets)
require.Len(t, rl.listeners, len(subs)+extra)
ssidCount := 0
for _, specs := range rl.clients {
ssidCount += len(specs)
}
require.Equal(t, len(subs)+extra, ssidCount)
rand.Shuffle(len(subs), func(i, j int) {
subs[i], subs[j] = subs[j], subs[i]
@@ -120,7 +132,11 @@ func FuzzRandomListenerIdRemoving(f *testing.F) {
rl.removeListenerId(wsidToRemove.ws, wsidToRemove.id)
}
require.Len(t, rl.listeners, 0)
ssidCount = 0
for _, specs := range rl.clients {
ssidCount += len(specs)
}
require.Equal(t, 0, ssidCount)
require.Len(t, rl.clients, totalWebsockets)
for _, specs := range rl.clients {
require.Len(t, specs, 0)
@@ -129,23 +145,17 @@ func FuzzRandomListenerIdRemoving(f *testing.F) {
}
func FuzzRouterListenersPabloCrash(f *testing.F) {
f.Add(uint(3), uint(6), uint(2), uint(20))
f.Fuzz(func(t *testing.T, totalRelays uint, totalConns uint, subFreq uint, subIterations uint) {
totalRelays++
f.Add(uint(6), uint(2), uint(20))
f.Fuzz(func(t *testing.T, totalConns uint, subFreq uint, subIterations uint) {
totalConns++
subFreq++
subIterations++
rl := NewRelay()
relays := make([]*Relay, int(totalRelays))
for i := 0; i < int(totalRelays); i++ {
relays[i] = NewRelay()
}
conns := make([]*WebSocket, int(totalConns))
for i := 0; i < int(totalConns); i++ {
ws := &WebSocket{}
ws := &WebSocket{Context: rl.ctx}
conns[i] = ws
rl.clients[ws] = make([]listenerSpec, 0, subIterations)
}
@@ -159,18 +169,16 @@ func FuzzRouterListenersPabloCrash(f *testing.F) {
}
s := 0
subs := make([]wsid, 0, subIterations*totalConns*totalRelays)
subs := make([]wsid, 0, subIterations*totalConns)
for i, conn := range conns {
w := idFromSeqUpper(i)
for j := 0; j < int(subIterations); j++ {
id := w + ":" + idFromSeqLower(j)
for _, rlt := range relays {
if s%int(subFreq) == 0 {
rl.addListener(conn, id, rlt, f, cancel)
subs = append(subs, wsid{conn, id})
}
s++
if s%int(subFreq) == 0 {
rl.addListener(conn, id, f, cancel)
subs = append(subs, wsid{conn, id})
}
s++
}
}
@@ -181,8 +189,5 @@ func FuzzRouterListenersPabloCrash(f *testing.F) {
for _, wsid := range subs {
require.Len(t, rl.clients[wsid.ws], 0)
}
for _, rlt := range relays {
require.Len(t, rlt.listeners, 0)
}
})
}
+117 -225
View File
@@ -26,8 +26,8 @@ func idFromSeq(seq int, min, max int) string {
func TestListenerSetupAndRemoveOnce(t *testing.T) {
rl := NewRelay()
ws1 := &WebSocket{}
ws2 := &WebSocket{}
ws1 := &WebSocket{Context: rl.ctx}
ws2 := &WebSocket{Context: rl.ctx}
f1 := nostr.Filter{Kinds: []nostr.Kind{1}}
f2 := nostr.Filter{Kinds: []nostr.Kind{2}}
@@ -39,28 +39,21 @@ func TestListenerSetupAndRemoveOnce(t *testing.T) {
var cancel func(cause error) = nil
t.Run("adding listeners", func(t *testing.T) {
rl.addListener(ws1, "1a", rl, f1, cancel)
rl.addListener(ws1, "1b", rl, f2, cancel)
rl.addListener(ws2, "2a", rl, f3, cancel)
rl.addListener(ws1, "1c", rl, f3, cancel)
rl.addListener(ws1, "1a", f1, cancel)
rl.addListener(ws1, "1b", f2, cancel)
rl.addListener(ws2, "2a", f3, cancel)
rl.addListener(ws1, "1c", f3, cancel)
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"1a", cancel, 0, rl},
{"1b", cancel, 1, rl},
{"1c", cancel, 3, rl},
{1, "1a", cancel},
{2, "1b", cancel},
{4, "1c", cancel},
},
ws2: {
{"2a", cancel, 2, rl},
{3, "2a", cancel},
},
}, rl.clients)
require.Equal(t, []listener{
{"1a", f1, ws1},
{"1b", f2, ws1},
{"2a", f3, ws2},
{"1c", f3, ws1},
}, rl.listeners)
})
t.Run("removing a client", func(t *testing.T) {
@@ -68,23 +61,19 @@ func TestListenerSetupAndRemoveOnce(t *testing.T) {
require.Equal(t, map[*WebSocket][]listenerSpec{
ws2: {
{"2a", cancel, 0, rl},
{3, "2a", cancel},
},
}, rl.clients)
require.Equal(t, []listener{
{"2a", f3, ws2},
}, rl.listeners)
})
}
func TestListenerMoreConvolutedCase(t *testing.T) {
rl := NewRelay()
ws1 := &WebSocket{}
ws2 := &WebSocket{}
ws3 := &WebSocket{}
ws4 := &WebSocket{}
ws1 := &WebSocket{Context: rl.ctx}
ws2 := &WebSocket{Context: rl.ctx}
ws3 := &WebSocket{Context: rl.ctx}
ws4 := &WebSocket{Context: rl.ctx}
f1 := nostr.Filter{Kinds: []nostr.Kind{1}}
f2 := nostr.Filter{Kinds: []nostr.Kind{2}}
@@ -98,35 +87,27 @@ func TestListenerMoreConvolutedCase(t *testing.T) {
var cancel func(cause error) = nil
t.Run("adding listeners", func(t *testing.T) {
rl.addListener(ws1, "c", rl, f1, cancel)
rl.addListener(ws2, "b", rl, f2, cancel)
rl.addListener(ws3, "a", rl, f3, cancel)
rl.addListener(ws4, "d", rl, f3, cancel)
rl.addListener(ws2, "b", rl, f1, cancel)
rl.addListener(ws1, "c", f1, cancel)
rl.addListener(ws2, "b", f2, cancel)
rl.addListener(ws3, "a", f3, cancel)
rl.addListener(ws4, "d", f3, cancel)
rl.addListener(ws2, "b", f1, cancel)
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rl},
{1, "c", cancel},
},
ws2: {
{"b", cancel, 1, rl},
{"b", cancel, 4, rl},
{2, "b", cancel},
{5, "b", cancel},
},
ws3: {
{"a", cancel, 2, rl},
{3, "a", cancel},
},
ws4: {
{"d", cancel, 3, rl},
{4, "d", cancel},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"b", f2, ws2},
{"a", f3, ws3},
{"d", f3, ws4},
{"b", f1, ws2},
}, rl.listeners)
})
t.Run("removing a client", func(t *testing.T) {
@@ -134,85 +115,62 @@ func TestListenerMoreConvolutedCase(t *testing.T) {
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rl},
{1, "c", cancel},
},
ws3: {
{"a", cancel, 2, rl},
{3, "a", cancel},
},
ws4: {
{"d", cancel, 1, rl},
{4, "d", cancel},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"d", f3, ws4},
{"a", f3, ws3},
}, rl.listeners)
})
t.Run("reorganize the first case differently and then remove again", func(t *testing.T) {
rl.clients = map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 1, rl},
{2, "c", cancel},
},
ws2: {
{"b", cancel, 2, rl},
{"b", cancel, 4, rl},
{3, "b", cancel},
{5, "b", cancel},
},
ws3: {
{"a", cancel, 0, rl},
{1, "a", cancel},
},
ws4: {
{"d", cancel, 3, rl},
{4, "d", cancel},
},
}
rl.listeners = []listener{
{"a", f3, ws3},
{"c", f1, ws1},
{"b", f2, ws2},
{"d", f3, ws4},
{"b", f1, ws2},
}
rl.removeClientAndListeners(ws2)
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 1, rl},
{2, "c", cancel},
},
ws3: {
{"a", cancel, 0, rl},
{1, "a", cancel},
},
ws4: {
{"d", cancel, 2, rl},
{4, "d", cancel},
},
}, rl.clients)
require.Equal(t, []listener{
{"a", f3, ws3},
{"c", f1, ws1},
{"d", f3, ws4},
}, rl.listeners)
})
}
func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
rl := NewRelay()
ws1 := &WebSocket{}
ws2 := &WebSocket{}
ws3 := &WebSocket{}
ws4 := &WebSocket{}
ws1 := &WebSocket{Context: rl.ctx}
ws2 := &WebSocket{Context: rl.ctx}
ws3 := &WebSocket{Context: rl.ctx}
ws4 := &WebSocket{Context: rl.ctx}
f1 := nostr.Filter{Kinds: []nostr.Kind{1}}
f2 := nostr.Filter{Kinds: []nostr.Kind{2}}
f3 := nostr.Filter{Kinds: []nostr.Kind{3}}
rlx := NewRelay()
rly := NewRelay()
rlz := NewRelay()
rl.clients[ws1] = nil
rl.clients[ws2] = nil
rl.clients[ws3] = nil
@@ -221,56 +179,37 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
var cancel func(cause error) = nil
t.Run("adding listeners", func(t *testing.T) {
rl.addListener(ws1, "c", rlx, f1, cancel)
rl.addListener(ws2, "b", rly, f2, cancel)
rl.addListener(ws3, "a", rlz, f3, cancel)
rl.addListener(ws4, "d", rlx, f3, cancel)
rl.addListener(ws4, "e", rlx, f3, cancel)
rl.addListener(ws3, "a", rlx, f3, cancel)
rl.addListener(ws4, "e", rly, f3, cancel)
rl.addListener(ws3, "f", rly, f3, cancel)
rl.addListener(ws1, "g", rlz, f1, cancel)
rl.addListener(ws2, "g", rlz, f2, cancel)
rl.addListener(ws1, "c", f1, cancel)
rl.addListener(ws2, "b", f2, cancel)
rl.addListener(ws3, "a", f3, cancel)
rl.addListener(ws4, "d", f3, cancel)
rl.addListener(ws4, "e", f3, cancel)
rl.addListener(ws3, "a", f3, cancel)
rl.addListener(ws4, "e", f3, cancel)
rl.addListener(ws3, "f", f3, cancel)
rl.addListener(ws1, "g", f1, cancel)
rl.addListener(ws2, "g", f2, cancel)
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 1, rlz},
{1, "c", cancel},
{9, "g", cancel},
},
ws2: {
{"b", cancel, 0, rly},
{"g", cancel, 2, rlz},
{2, "b", cancel},
{10, "g", cancel},
},
ws3: {
{"a", cancel, 0, rlz},
{"a", cancel, 3, rlx},
{"f", cancel, 2, rly},
{3, "a", cancel},
{6, "a", cancel},
{8, "f", cancel},
},
ws4: {
{"d", cancel, 1, rlx},
{"e", cancel, 2, rlx},
{"e", cancel, 1, rly},
{4, "d", cancel},
{5, "e", cancel},
{7, "e", cancel},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"d", f3, ws4},
{"e", f3, ws4},
{"a", f3, ws3},
}, rlx.listeners)
require.Equal(t, []listener{
{"b", f2, ws2},
{"e", f3, ws4},
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"a", f3, ws3},
{"g", f1, ws1},
{"g", f2, ws2},
}, rlz.listeners)
})
t.Run("removing a subscription id", func(t *testing.T) {
@@ -280,41 +219,23 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 1, rlz},
{1, "c", cancel},
{9, "g", cancel},
},
ws2: {
{"b", cancel, 0, rly},
{"g", cancel, 2, rlz},
{2, "b", cancel},
{10, "g", cancel},
},
ws3: {
{"a", cancel, 0, rlz},
{"a", cancel, 1, rlx},
{"f", cancel, 2, rly},
{3, "a", cancel},
{6, "a", cancel},
{8, "f", cancel},
},
ws4: {
{"e", cancel, 1, rly},
{"e", cancel, 2, rlx},
{5, "e", cancel},
{7, "e", cancel},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"a", f3, ws3},
{"e", f3, ws4},
}, rlx.listeners)
require.Equal(t, []listener{
{"b", f2, ws2},
{"e", f3, ws4},
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"a", f3, ws3},
{"g", f1, ws1},
{"g", f2, ws2},
}, rlz.listeners)
})
t.Run("removing another subscription id", func(t *testing.T) {
@@ -325,37 +246,21 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 1, rlz},
{1, "c", cancel},
{9, "g", cancel},
},
ws2: {
{"b", cancel, 0, rly},
{"g", cancel, 0, rlz},
{2, "b", cancel},
{10, "g", cancel},
},
ws3: {
{"f", cancel, 2, rly},
{8, "f", cancel},
},
ws4: {
{"e", cancel, 1, rly},
{"e", cancel, 1, rlx},
{5, "e", cancel},
{7, "e", cancel},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"e", f3, ws4},
}, rlx.listeners)
require.Equal(t, []listener{
{"b", f2, ws2},
{"e", f3, ws4},
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"g", f2, ws2},
{"g", f1, ws1},
}, rlz.listeners)
})
t.Run("removing a connection", func(t *testing.T) {
@@ -363,31 +268,17 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 0, rlz},
{1, "c", cancel},
{9, "g", cancel},
},
ws3: {
{"f", cancel, 0, rly},
{8, "f", cancel},
},
ws4: {
{"e", cancel, 1, rly},
{"e", cancel, 1, rlx},
{5, "e", cancel},
{7, "e", cancel},
},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
{"e", f3, ws4},
}, rlx.listeners)
require.Equal(t, []listener{
{"f", f3, ws3},
{"e", f3, ws4},
}, rly.listeners)
require.Equal(t, []listener{
{"g", f1, ws1},
}, rlz.listeners)
})
t.Run("removing another subscription id", func(t *testing.T) {
@@ -398,26 +289,14 @@ func TestListenerMoreStuffWithMultipleRelays(t *testing.T) {
require.Equal(t, map[*WebSocket][]listenerSpec{
ws1: {
{"c", cancel, 0, rlx},
{"g", cancel, 0, rlz},
{1, "c", cancel},
{9, "g", cancel},
},
ws3: {
{"f", cancel, 0, rly},
{8, "f", cancel},
},
ws4: {},
}, rl.clients)
require.Equal(t, []listener{
{"c", f1, ws1},
}, rlx.listeners)
require.Equal(t, []listener{
{"f", f3, ws3},
}, rly.listeners)
require.Equal(t, []listener{
{"g", f1, ws1},
}, rlz.listeners)
})
}
@@ -432,7 +311,7 @@ func TestRandomListenerClientRemoving(t *testing.T) {
l := 0
for i := 0; i < 20; i++ {
ws := &WebSocket{}
ws := &WebSocket{Context: rl.ctx}
websockets = append(websockets, ws)
rl.clients[ws] = nil
}
@@ -444,20 +323,28 @@ func TestRandomListenerClientRemoving(t *testing.T) {
if rand.Intn(2) < 1 {
l++
rl.addListener(ws, w+":"+idFromSeqLower(j), rl, f, cancel)
rl.addListener(ws, w+":"+idFromSeqLower(j), f, cancel)
}
}
}
require.Len(t, rl.clients, 20)
require.Len(t, rl.listeners, l)
ssidCount := 0
for _, specs := range rl.clients {
ssidCount += len(specs)
}
require.Equal(t, l, ssidCount)
for ws := range rl.clients {
rl.removeClientAndListeners(ws)
}
require.Len(t, rl.clients, 0)
require.Len(t, rl.listeners, 0)
ssidCount = 0
for _, specs := range rl.clients {
ssidCount += len(specs)
}
require.Equal(t, 0, ssidCount)
}
func TestRandomListenerIdRemoving(t *testing.T) {
@@ -477,7 +364,7 @@ func TestRandomListenerIdRemoving(t *testing.T) {
extra := 0
for i := 0; i < 20; i++ {
ws := &WebSocket{}
ws := &WebSocket{Context: rl.ctx}
websockets = append(websockets, ws)
rl.clients[ws] = nil
}
@@ -489,11 +376,11 @@ func TestRandomListenerIdRemoving(t *testing.T) {
if rand.Intn(2) < 1 {
id := w + ":" + idFromSeqLower(j)
rl.addListener(ws, id, rl, f, cancel)
rl.addListener(ws, id, f, cancel)
subs = append(subs, wsid{ws, id})
if rand.Intn(5) < 1 {
rl.addListener(ws, id, rl, f, cancel)
rl.addListener(ws, id, f, cancel)
extra++
}
}
@@ -501,7 +388,11 @@ func TestRandomListenerIdRemoving(t *testing.T) {
}
require.Len(t, rl.clients, 20)
require.Len(t, rl.listeners, len(subs)+extra)
ssidCount := 0
for _, specs := range rl.clients {
ssidCount += len(specs)
}
require.Equal(t, len(subs)+extra, ssidCount)
rand.Shuffle(len(subs), func(i, j int) {
subs[i], subs[j] = subs[j], subs[i]
@@ -510,7 +401,11 @@ func TestRandomListenerIdRemoving(t *testing.T) {
rl.removeListenerId(wsidToRemove.ws, wsidToRemove.id)
}
require.Len(t, rl.listeners, 0)
ssidCount = 0
for _, specs := range rl.clients {
ssidCount += len(specs)
}
require.Equal(t, 0, ssidCount)
require.Len(t, rl.clients, 20)
for _, specs := range rl.clients {
require.Len(t, specs, 0)
@@ -520,12 +415,9 @@ func TestRandomListenerIdRemoving(t *testing.T) {
func TestRouterListenersPabloCrash(t *testing.T) {
rl := NewRelay()
rla := NewRelay()
rlb := NewRelay()
ws1 := &WebSocket{}
ws2 := &WebSocket{}
ws3 := &WebSocket{}
ws1 := &WebSocket{Context: rl.ctx}
ws2 := &WebSocket{Context: rl.ctx}
ws3 := &WebSocket{Context: rl.ctx}
rl.clients[ws1] = nil
rl.clients[ws2] = nil
@@ -534,11 +426,11 @@ func TestRouterListenersPabloCrash(t *testing.T) {
f := nostr.Filter{Kinds: []nostr.Kind{1}}
cancel := func(cause error) {}
rl.addListener(ws1, ":1", rla, f, cancel)
rl.addListener(ws2, ":1", rlb, f, cancel)
rl.addListener(ws3, "a", rlb, f, cancel)
rl.addListener(ws3, "b", rla, f, cancel)
rl.addListener(ws3, "c", rlb, f, cancel)
rl.addListener(ws1, ":1", f, cancel)
rl.addListener(ws2, ":1", f, cancel)
rl.addListener(ws3, "a", f, cancel)
rl.addListener(ws3, "b", f, cancel)
rl.addListener(ws3, "c", f, cancel)
rl.removeClientAndListeners(ws1)
rl.removeClientAndListeners(ws3)
+3
View File
@@ -124,6 +124,9 @@ var nostrReferencesPrefix = regexp.MustCompile(`\b(nevent1|npub1|nprofile1|note1
func RejectUnprefixedNostrReferences(ctx context.Context, event nostr.Event) (bool, string) {
content := sdk.GetMainContent(event)
if content == "" {
content = event.Content
}
// only do it for stuff that wasn't parsed as blocks already
// (since those are already good references or URLs)
+2 -7
View File
@@ -42,7 +42,7 @@ func NewRelay() *Relay {
clients: make(map[*WebSocket][]listenerSpec, 100),
clientsMutex: channelmutex.New(),
listeners: make([]listener, 0, 100),
dispatcher: newDispatcher(),
serveMux: &http.ServeMux{},
@@ -87,11 +87,6 @@ type Relay struct {
// this can be ignored unless you know what you're doing
ChallengePrefix string
// these are used when this relays acts as a router
routes []Route
getSubRelayFromEvent func(*nostr.Event) *Relay // used for handling EVENTs
getSubRelayFromFilter func(nostr.Filter) *Relay // used for handling REQs
// setting up handlers here will enable these methods
ManagementAPI RelayManagementAPI
@@ -108,7 +103,7 @@ type Relay struct {
// keep a connection reference to all connected clients for Server.Shutdown
// also used for keeping track of who is listening to what
clients map[*WebSocket][]listenerSpec
listeners []listener
dispatcher dispatcher
clientsMutex *channelmutex.Mutex
// set this to true to support negentropy
+35 -77
View File
@@ -9,6 +9,7 @@ import (
"fiatjaf.com/nostr"
"fiatjaf.com/nostr/eventstore/slicestore"
"github.com/stretchr/testify/require"
)
func TestBasicRelayFunctionality(t *testing.T) {
@@ -46,15 +47,11 @@ func TestBasicRelayFunctionality(t *testing.T) {
// connect two test clients
url := "ws" + server.URL[4:]
client1, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{})
if err != nil {
t.Fatalf("failed to connect client1: %v", err)
}
require.NoError(t, err, "failed to connect client1")
defer client1.Close()
client2, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{})
if err != nil {
t.Fatalf("failed to connect client2: %v", err)
}
require.NoError(t, err, "failed to connect client2")
defer client2.Close()
// test 1: store and query events
@@ -64,18 +61,14 @@ func TestBasicRelayFunctionality(t *testing.T) {
evt1 := createEvent(sk1, 1, "hello world", nil)
err := client1.Publish(ctx, evt1)
if err != nil {
t.Fatalf("failed to publish event: %v", err)
}
require.NoError(t, err, "failed to publish event")
// Query the event back
sub, err := client2.Subscribe(ctx, nostr.Filter{
Authors: []nostr.PubKey{pk1},
Kinds: []nostr.Kind{1},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
require.NoError(t, err, "failed to subscribe")
defer sub.Unsub()
// Wait for event
@@ -85,7 +78,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
t.Errorf("got wrong event: %v", env.ID)
}
case <-ctx.Done():
t.Fatal("timeout waiting for event")
require.FailNow(t, "timeout waiting for event")
}
})
@@ -99,17 +92,13 @@ func TestBasicRelayFunctionality(t *testing.T) {
Authors: []nostr.PubKey{pk2},
Kinds: []nostr.Kind{1},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
require.NoError(t, err, "failed to subscribe")
defer sub.Unsub()
// Publish event from client2
evt2 := createEvent(sk2, 1, "testing live events", nil)
err = client2.Publish(ctx, evt2)
if err != nil {
t.Fatalf("failed to publish event: %v", err)
}
require.NoError(t, err, "failed to publish event")
// Wait for event on subscription
select {
@@ -118,7 +107,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
t.Errorf("got wrong event: %v", env.ID)
}
case <-ctx.Done():
t.Fatal("timeout waiting for live event")
require.FailNow(t, "timeout waiting for live event")
}
})
@@ -130,24 +119,18 @@ func TestBasicRelayFunctionality(t *testing.T) {
// Create an event to be deleted
evt3 := createEvent(sk1, 1, "delete me", nil)
err = client1.Publish(ctx, evt3)
if err != nil {
t.Fatalf("failed to publish event: %v", err)
}
require.NoError(t, err, "failed to publish event")
// Create deletion event
delEvent := createEvent(sk1, 5, "deleting", nostr.Tags{{"e", evt3.ID.Hex()}})
err = client1.Publish(ctx, delEvent)
if err != nil {
t.Fatalf("failed to publish deletion event: %v", err)
}
require.NoError(t, err, "failed to publish deletion event")
// Try to query the deleted event
sub, err := client2.Subscribe(ctx, nostr.Filter{
IDs: []nostr.ID{evt3.ID},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
require.NoError(t, err, "failed to subscribe")
defer sub.Unsub()
// Should get EOSE without receiving the deleted event
@@ -162,7 +145,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
}
goto checkDeleteStored
case <-ctx.Done():
t.Fatal("timeout waiting for EOSE")
require.FailNow(t, "timeout waiting for EOSE")
}
}
@@ -171,9 +154,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
subDelete, err := client2.Subscribe(ctx, nostr.Filter{
IDs: []nostr.ID{delEvent.ID},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe to delete event: %v", err)
}
require.NoError(t, err, "failed to subscribe to delete event")
defer subDelete.Unsub()
gotDeleteEvent := false
@@ -189,7 +170,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
}
return
case <-ctx.Done():
t.Fatal("timeout waiting for EOSE on delete event")
require.FailNow(t, "timeout waiting for EOSE on delete event")
}
}
})
@@ -204,36 +185,28 @@ func TestBasicRelayFunctionality(t *testing.T) {
evt1.CreatedAt = 1000 // Set specific timestamp for testing
evt1.Sign(sk1)
err = client1.Publish(ctx, evt1)
if err != nil {
t.Fatalf("failed to publish initial event: %v", err)
}
require.NoError(t, err, "failed to publish initial event")
// create newer event that should replace the first
evt2 := createEvent(sk1, 0, `{"name":"newer"}`, nil)
evt2.CreatedAt = 2004 // Newer timestamp
evt2.Sign(sk1)
err = client1.Publish(ctx, evt2)
if err != nil {
t.Fatalf("failed to publish newer event: %v", err)
}
require.NoError(t, err, "failed to publish newer event")
// create older event that should not replace the current one
evt3 := createEvent(sk1, 0, `{"name":"older"}`, nil)
evt3.CreatedAt = 1500 // Older than evt2
evt3.Sign(sk1)
err = client1.Publish(ctx, evt3)
if err != nil {
t.Fatalf("failed to publish older event: %v", err)
}
require.NoError(t, err, "failed to publish older event")
// query to verify only the newest event exists
sub, err := client2.Subscribe(ctx, nostr.Filter{
Authors: []nostr.PubKey{pk1},
Kinds: []nostr.Kind{0},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
require.NoError(t, err, "failed to subscribe")
defer sub.Unsub()
// should only get one event back (the newest one)
@@ -251,7 +224,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
}
return
case <-ctx.Done():
t.Fatal("timeout waiting for events")
require.FailNow(t, "timeout waiting for events")
}
}
})
@@ -281,26 +254,20 @@ func TestBasicRelayFunctionality(t *testing.T) {
// connect test client
url := "ws" + server.URL[4:]
client, err := nostr.RelayConnect(t.Context(), url, nostr.RelayOptions{})
if err != nil {
t.Fatalf("failed to connect client: %v", err)
}
require.NoError(t, err, "failed to connect client")
defer client.Close()
// create event that expires in 2 seconds
expiration := strconv.FormatInt(int64(nostr.Now()+2), 10)
evt := createEvent(sk1, 1, "i will expire soon", nostr.Tags{{"expiration", expiration}})
err = client.Publish(ctx, evt)
if err != nil {
t.Fatalf("failed to publish event: %v", err)
}
require.NoError(t, err, "failed to publish event")
// verify event exists initially
sub, err := client.Subscribe(ctx, nostr.Filter{
IDs: []nostr.ID{evt.ID},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
require.NoError(t, err, "failed to subscribe")
// should get the event
select {
@@ -309,7 +276,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
t.Error("got wrong event")
}
case <-ctx.Done():
t.Fatal("timeout waiting for event")
require.FailNow(t, "timeout waiting for event")
}
sub.Unsub()
@@ -320,9 +287,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
sub, err = client.Subscribe(ctx, nostr.Filter{
IDs: []nostr.ID{evt.ID},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
require.NoError(t, err, "failed to subscribe")
defer sub.Unsub()
// should get EOSE without receiving the expired event
@@ -337,7 +302,7 @@ func TestBasicRelayFunctionality(t *testing.T) {
}
return
case <-ctx.Done():
t.Fatal("timeout waiting for EOSE")
require.FailNow(t, "timeout waiting for EOSE")
}
}
})
@@ -350,33 +315,26 @@ func TestBasicRelayFunctionality(t *testing.T) {
// create an event from client1
evt4 := createEvent(sk1, 1, "try to delete me", nil)
err = client1.Publish(ctx, evt4)
if err != nil {
t.Fatalf("failed to publish event: %v", err)
}
require.NoError(t, err)
// Try to delete it with client2
// try to delete it with client2
delEvent := createEvent(sk2, 5, "trying to delete", nostr.Tags{{"e", evt4.ID.Hex()}})
err = client2.Publish(ctx, delEvent)
if err == nil {
t.Fatalf("should have failed to publish deletion event: %v", err)
}
require.Error(t, err)
// Verify event still exists
// verify event still exists
sub, err := client1.Subscribe(ctx, nostr.Filter{
IDs: []nostr.ID{evt4.ID},
}, nostr.SubscriptionOptions{})
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
require.NoError(t, err)
defer sub.Unsub()
select {
case env := <-sub.Events:
if env.ID != evt4.ID {
t.Error("got wrong event")
}
case env, more := <-sub.Events:
require.True(t, more, "should get an event, got nothing")
require.Equal(t, env.ID, evt4.ID, "got wrong event")
case <-ctx.Done():
t.Fatal("event should still exist")
require.FailNow(t, "event should still exist")
}
})
}
-77
View File
@@ -1,77 +0,0 @@
package khatru
import (
"fiatjaf.com/nostr"
)
type Router struct{ *Relay }
type Route struct {
eventMatcher func(*nostr.Event) bool
filterMatcher func(nostr.Filter) bool
relay *Relay
}
type routeBuilder struct {
router *Router
eventMatcher func(*nostr.Event) bool
filterMatcher func(nostr.Filter) bool
}
func NewRouter() *Router {
rr := &Router{Relay: NewRelay()}
rr.routes = make([]Route, 0, 3)
rr.getSubRelayFromFilter = func(f nostr.Filter) *Relay {
for _, route := range rr.routes {
if route.filterMatcher == nil || route.filterMatcher(f) {
return route.relay
}
}
return rr.Relay
}
rr.getSubRelayFromEvent = func(e *nostr.Event) *Relay {
for _, route := range rr.routes {
if route.eventMatcher == nil || route.eventMatcher(e) {
return route.relay
}
}
return rr.Relay
}
return rr
}
func (rr *Router) Route() routeBuilder {
return routeBuilder{
router: rr,
filterMatcher: func(f nostr.Filter) bool { return false },
eventMatcher: func(e *nostr.Event) bool { return false },
}
}
func (rb routeBuilder) Req(fn func(nostr.Filter) bool) routeBuilder {
rb.filterMatcher = fn
return rb
}
func (rb routeBuilder) AnyReq() routeBuilder {
rb.filterMatcher = nil
return rb
}
func (rb routeBuilder) Event(fn func(*nostr.Event) bool) routeBuilder {
rb.eventMatcher = fn
return rb
}
func (rb routeBuilder) AnyEvent() routeBuilder {
rb.eventMatcher = nil
return rb
}
func (rb routeBuilder) Relay(relay *Relay) {
rb.router.routes = append(rb.router.routes, Route{
filterMatcher: rb.filterMatcher,
eventMatcher: rb.eventMatcher,
relay: relay,
})
}