khatru: handle request synchronously until EOSE, no need for waitgroups.
This commit is contained in:
+2
-9
@@ -295,9 +295,6 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
case *nostr.ReqEnvelope:
|
case *nostr.ReqEnvelope:
|
||||||
rl.removeListenerId(ws, env.SubscriptionID)
|
rl.removeListenerId(ws, env.SubscriptionID)
|
||||||
|
|
||||||
eose := sync.WaitGroup{}
|
|
||||||
eose.Add(len(env.Filters))
|
|
||||||
|
|
||||||
// a context just for the "stored events" request handler
|
// a context just for the "stored events" request handler
|
||||||
reqCtx, cancelReqCtx := context.WithCancelCause(ctx)
|
reqCtx, cancelReqCtx := context.WithCancelCause(ctx)
|
||||||
|
|
||||||
@@ -306,7 +303,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// handle each filter separately -- dispatching events as they're loaded from databases
|
// handle each filter separately -- dispatching events as they're loaded from databases
|
||||||
for _, filter := range env.Filters {
|
for _, filter := range env.Filters {
|
||||||
err := rl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, filter)
|
err := rl.handleRequest(reqCtx, env.SubscriptionID, ws, filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// fail everything if any filter is rejected
|
// fail everything if any filter is rejected
|
||||||
reason := err.Error()
|
reason := err.Error()
|
||||||
@@ -324,11 +321,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))
|
||||||
// when all events have been loaded from databases and dispatched we can fire the EOSE message
|
|
||||||
eose.Wait()
|
|
||||||
ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))
|
|
||||||
}()
|
|
||||||
case *nostr.CloseEnvelope:
|
case *nostr.CloseEnvelope:
|
||||||
id := string(*env)
|
id := string(*env)
|
||||||
rl.removeListenerId(ws, id)
|
rl.removeListenerId(ws, id)
|
||||||
|
|||||||
@@ -3,15 +3,12 @@ package khatru
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"fiatjaf.com/nostr"
|
"fiatjaf.com/nostr"
|
||||||
"fiatjaf.com/nostr/nip45/hyperloglog"
|
"fiatjaf.com/nostr/nip45/hyperloglog"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGroup, ws *WebSocket, filter nostr.Filter) error {
|
func (rl *Relay) handleRequest(ctx context.Context, id string, ws *WebSocket, filter nostr.Filter) error {
|
||||||
defer eose.Done()
|
|
||||||
|
|
||||||
// then check if we'll reject this filter (we apply this after overwriting
|
// then check if we'll reject this filter (we apply this after overwriting
|
||||||
// because we may, for example, remove some things from the incoming filters
|
// because we may, for example, remove some things from the incoming filters
|
||||||
// that we know we don't support, and then if the end result is an empty
|
// that we know we don't support, and then if the end result is an empty
|
||||||
|
|||||||
Reference in New Issue
Block a user