upgrade interfaces, add local filtering for subscriptions and some other things I forgot. also a README with examples.
This commit is contained in:
+20
-4
@@ -9,6 +9,7 @@ import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/fiatjaf/bip340"
|
||||
"github.com/fiatjaf/go-nostr/event"
|
||||
"github.com/fiatjaf/go-nostr/filter"
|
||||
nostrutils "github.com/fiatjaf/go-nostr/utils"
|
||||
@@ -121,11 +122,18 @@ func (r *RelayPool) Add(url string, policy *Policy) error {
|
||||
if subscription, ok := r.subscriptions[channel]; ok {
|
||||
var event event.Event
|
||||
json.Unmarshal(jsonMessage[2], &event)
|
||||
|
||||
// check signature of all received events, ignore invalid
|
||||
ok, _ := event.CheckSignature()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// check if the event matches the desired filter, ignore otherwise
|
||||
if !subscription.filters.Match(&event) {
|
||||
continue
|
||||
}
|
||||
|
||||
subscription.Events <- EventMessage{
|
||||
Relay: nm,
|
||||
Event: event,
|
||||
@@ -153,7 +161,7 @@ func (r *RelayPool) Remove(url string) {
|
||||
delete(r.websockets, nm)
|
||||
}
|
||||
|
||||
func (r *RelayPool) Sub(filter filter.EventFilter) *Subscription {
|
||||
func (r *RelayPool) Sub(filters filter.EventFilters) *Subscription {
|
||||
random := make([]byte, 7)
|
||||
rand.Read(random)
|
||||
|
||||
@@ -170,17 +178,25 @@ func (r *RelayPool) Sub(filter filter.EventFilter) *Subscription {
|
||||
subscription.UniqueEvents = make(chan event.Event)
|
||||
r.subscriptions[subscription.channel] = &subscription
|
||||
|
||||
subscription.Sub(&filter)
|
||||
subscription.Sub(filters)
|
||||
return &subscription
|
||||
}
|
||||
|
||||
func (r *RelayPool) PublishEvent(evt *event.Event) (*event.Event, chan PublishStatus, error) {
|
||||
status := make(chan PublishStatus, 1)
|
||||
|
||||
if r.SecretKey == nil && evt.Sig == "" {
|
||||
if r.SecretKey == nil && (evt.PubKey == "" || evt.Sig == "") {
|
||||
return nil, status, errors.New("PublishEvent needs either a signed event to publish or to have been configured with a .SecretKey.")
|
||||
}
|
||||
|
||||
if evt.PubKey == "" {
|
||||
secretKeyN, err := bip340.ParsePrivateKey(*r.SecretKey)
|
||||
if err != nil {
|
||||
return nil, status, fmt.Errorf("The pool's global SecretKey is invalid: %w", err)
|
||||
}
|
||||
evt.PubKey = fmt.Sprintf("%x", bip340.GetPublicKey(secretKeyN))
|
||||
}
|
||||
|
||||
if evt.Sig == "" {
|
||||
err := evt.Sign(*r.SecretKey)
|
||||
if err != nil {
|
||||
@@ -197,7 +213,7 @@ func (r *RelayPool) PublishEvent(evt *event.Event) (*event.Event, chan PublishSt
|
||||
}
|
||||
status <- PublishStatus{relay, PublishStatusSent}
|
||||
|
||||
subscription := r.Sub(filter.EventFilter{ID: evt.ID})
|
||||
subscription := r.Sub(filter.EventFilters{{ID: evt.ID}})
|
||||
for {
|
||||
select {
|
||||
case event := <-subscription.UniqueEvents:
|
||||
|
||||
+18
-13
@@ -10,8 +10,8 @@ type Subscription struct {
|
||||
channel string
|
||||
relays map[string]*websocket.Conn
|
||||
|
||||
filter *filter.EventFilter
|
||||
Events chan EventMessage
|
||||
filters filter.EventFilters
|
||||
Events chan EventMessage
|
||||
|
||||
started bool
|
||||
UniqueEvents chan event.Event
|
||||
@@ -38,17 +38,17 @@ func (subscription Subscription) Unsub() {
|
||||
}
|
||||
}
|
||||
|
||||
func (subscription Subscription) Sub(filter *filter.EventFilter) {
|
||||
if filter != nil {
|
||||
subscription.filter = filter
|
||||
}
|
||||
|
||||
func (subscription Subscription) Sub(filters filter.EventFilters) {
|
||||
for _, ws := range subscription.relays {
|
||||
ws.WriteJSON([]interface{}{
|
||||
message := []interface{}{
|
||||
"REQ",
|
||||
subscription.channel,
|
||||
subscription.filter,
|
||||
})
|
||||
}
|
||||
for _, filter := range subscription.filters {
|
||||
message = append(message, filter)
|
||||
}
|
||||
|
||||
ws.WriteJSON(message)
|
||||
}
|
||||
|
||||
if !subscription.started {
|
||||
@@ -79,9 +79,14 @@ func (subscription Subscription) removeRelay(relay string) {
|
||||
|
||||
func (subscription Subscription) addRelay(relay string, ws *websocket.Conn) {
|
||||
subscription.relays[relay] = ws
|
||||
ws.WriteJSON([]interface{}{
|
||||
|
||||
message := []interface{}{
|
||||
"REQ",
|
||||
subscription.channel,
|
||||
subscription.filter,
|
||||
})
|
||||
}
|
||||
for _, filter := range subscription.filters {
|
||||
message = append(message, filter)
|
||||
}
|
||||
|
||||
ws.WriteJSON(message)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user