diff --git a/sdk/addressable_loader.go b/sdk/addressable_loader.go new file mode 100644 index 0000000..c0761b3 --- /dev/null +++ b/sdk/addressable_loader.go @@ -0,0 +1,183 @@ +package sdk + +import ( + "context" + "fmt" + "strconv" + "sync" + "time" + + "github.com/graph-gophers/dataloader/v7" + "github.com/nbd-wtf/go-nostr" +) + +// this is similar to replaceable_loader and reuses logic from that. + +type addressableIndex int + +const ( + kind_30000 addressableIndex = 0 + kind_30002 addressableIndex = 1 + kind_30015 addressableIndex = 2 + kind_30030 addressableIndex = 3 +) + +func (sys *System) initializeAddressableDataloaders() { + sys.addressableLoaders[kind_30000] = sys.createAddressableDataloader(30000) + sys.addressableLoaders[kind_30002] = sys.createAddressableDataloader(30002) + sys.addressableLoaders[kind_30015] = sys.createAddressableDataloader(30015) + sys.addressableLoaders[kind_30030] = sys.createAddressableDataloader(30030) +} + +func (sys *System) createAddressableDataloader(kind int) *dataloader.Loader[string, []*nostr.Event] { + return dataloader.NewBatchedLoader( + func(_ context.Context, pubkeys []string) []*dataloader.Result[[]*nostr.Event] { + return sys.batchLoadAddressableEvents(kind, pubkeys) + }, + dataloader.WithBatchCapacity[string, []*nostr.Event](60), + dataloader.WithClearCacheOnBatch[string, []*nostr.Event](), + dataloader.WithWait[string, []*nostr.Event](time.Millisecond*350), + ) +} + +func (sys *System) batchLoadAddressableEvents( + kind int, + pubkeys []string, +) []*dataloader.Result[[]*nostr.Event] { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*4) + defer cancel() + + batchSize := len(pubkeys) + results := make([]*dataloader.Result[[]*nostr.Event], batchSize) + keyPositions := make(map[string]int) // { [pubkey]: slice_index } + relayFilters := make(map[string]nostr.Filter) // { [relayUrl]: filter } + + wg := sync.WaitGroup{} + wg.Add(len(pubkeys)) + cm := sync.Mutex{} + + for i, pubkey := range pubkeys { + // build batched queries for the external relays + keyPositions[pubkey] = i // this is to help us know where to save the result later + + go func(i int, pubkey string) { + defer wg.Done() + + // if we're attempting this query with a short key (last 8 characters), stop here + if len(pubkey) != 64 { + results[i] = &dataloader.Result[[]*nostr.Event]{ + Error: fmt.Errorf("won't proceed to query relays with a shortened key (%d)", kind), + } + return + } + + // save attempts here so we don't try the same failed query over and over + if doItNow := doThisNotMoreThanOnceAnHour("repl:" + strconv.Itoa(kind) + pubkey); !doItNow { + results[i] = &dataloader.Result[[]*nostr.Event]{ + Error: fmt.Errorf("last attempt failed, waiting more to try again"), + } + return + } + + // gather relays we'll use for this pubkey + relays := sys.determineRelaysToQuery(ctx, pubkey, kind) + + // by default we will return an error (this will be overwritten when we find an event) + results[i] = &dataloader.Result[[]*nostr.Event]{ + Error: fmt.Errorf("couldn't find a kind %d event anywhere %v", kind, relays), + } + + cm.Lock() + for _, relay := range relays { + // each relay will have a custom filter + filter, ok := relayFilters[relay] + if !ok { + filter = nostr.Filter{ + Kinds: []int{kind}, + Authors: make([]string, 0, batchSize-i /* this and all pubkeys after this can be added */), + } + } + filter.Authors = append(filter.Authors, pubkey) + relayFilters[relay] = filter + } + cm.Unlock() + }(i, pubkey) + } + + // query all relays with the prepared filters + wg.Wait() + multiSubs := sys.batchAddressableRelayQueries(ctx, relayFilters) +nextEvent: + for { + select { + case evt, more := <-multiSubs: + if !more { + return results + } + + // insert this event at the desired position + pos := keyPositions[evt.PubKey] // @unchecked: it must succeed because it must be a key we passed + + events := results[pos].Data + if events == nil { + // no events found, so just add this and end + results[pos] = &dataloader.Result[[]*nostr.Event]{Data: []*nostr.Event{evt}} + continue nextEvent + } + + // there are events, so look for a match + d := evt.Tags.GetD() + for i, event := range events { + if event.Tags.GetD() == d { + // there is a match + if event.CreatedAt < evt.CreatedAt { + // ...and this one is newer, so replace + events[i] = evt + } else { + // ... but this one is older, so ignore + } + // in any case we end this here + continue nextEvent + } + } + + // there is no match, so add to the end + events = append(events, evt) + results[pos].Data = events + case <-ctx.Done(): + return results + } + } +} + +// batchAddressableRelayQueries is like batchReplaceableRelayQueries, except it doesn't count results to +// try to exit early. +func (sys *System) batchAddressableRelayQueries( + ctx context.Context, + relayFilters map[string]nostr.Filter, +) <-chan *nostr.Event { + all := make(chan *nostr.Event) + + wg := sync.WaitGroup{} + wg.Add(len(relayFilters)) + for url, filter := range relayFilters { + go func(url string, filter nostr.Filter) { + defer wg.Done() + n := len(filter.Authors) + + ctx, cancel := context.WithTimeout(ctx, time.Millisecond*450+time.Millisecond*50*time.Duration(n)) + defer cancel() + + for ie := range sys.Pool.SubManyEose(ctx, []string{url}, nostr.Filters{filter}, nostr.WithLabel("addr")) { + all <- ie.Event + } + }(url, filter) + } + + go func() { + wg.Wait() + close(all) + }() + + return all +} diff --git a/sdk/lists_profile.go b/sdk/lists_profile.go index 253e5c2..60b312e 100644 --- a/sdk/lists_profile.go +++ b/sdk/lists_profile.go @@ -26,6 +26,11 @@ func (sys *System) FetchMuteList(ctx context.Context, pubkey string) GenericList return ml } +func (sys *System) FetchFollowSets(ctx context.Context, pubkey string) GenericSets[ProfileRef] { + ml, _ := fetchGenericSets(sys, ctx, pubkey, 30000, kind_30000, parseProfileRef, sys.FollowSetsCache, false) + return ml +} + func parseProfileRef(tag nostr.Tag) (fw ProfileRef, ok bool) { if len(tag) < 2 { return fw, false diff --git a/sdk/lists_relay.go b/sdk/lists_relay.go index da85b47..bae4799 100644 --- a/sdk/lists_relay.go +++ b/sdk/lists_relay.go @@ -33,6 +33,11 @@ func (sys *System) FetchSearchRelayList(ctx context.Context, pubkey string) Gene return ml } +func (sys *System) FetchRelaySets(ctx context.Context, pubkey string) GenericSets[RelayURL] { + ml, _ := fetchGenericSets(sys, ctx, pubkey, 30002, kind_30002, parseRelayURL, sys.RelaySetsCache, false) + return ml +} + func parseRelayFromKind10002(tag nostr.Tag) (rl Relay, ok bool) { if u := tag.Value(); u != "" && tag[0] == "r" { if !nostr.IsValidRelayURL(u) { diff --git a/sdk/lists_topics.go b/sdk/lists_topics.go new file mode 100644 index 0000000..78d477e --- /dev/null +++ b/sdk/lists_topics.go @@ -0,0 +1,28 @@ +package sdk + +import ( + "context" + + "github.com/nbd-wtf/go-nostr" +) + +type Topic string + +func (r Topic) Value() string { return string(r) } + +func (sys *System) FetchTopicList(ctx context.Context, pubkey string) GenericList[Topic] { + ml, _ := fetchGenericList(sys, ctx, pubkey, 10015, kind_10015, parseTopicString, sys.TopicListCache, false) + return ml +} + +func (sys *System) FetchTopicSets(ctx context.Context, pubkey string) GenericSets[Topic] { + ml, _ := fetchGenericSets(sys, ctx, pubkey, 30015, kind_30015, parseTopicString, sys.TopicSetsCache, false) + return ml +} + +func parseTopicString(tag nostr.Tag) (t Topic, ok bool) { + if t := tag.Value(); t != "" && tag[0] == "t" { + return Topic(t), true + } + return t, false +} diff --git a/sdk/replaceable_loader.go b/sdk/replaceable_loader.go index 694d14e..e5f59ac 100644 --- a/sdk/replaceable_loader.go +++ b/sdk/replaceable_loader.go @@ -31,7 +31,7 @@ const ( type EventResult dataloader.Result[*nostr.Event] -func (sys *System) initializeDataloaders() { +func (sys *System) initializeReplaceableDataloaders() { sys.replaceableLoaders[kind_0] = sys.createReplaceableDataloader(0) sys.replaceableLoaders[kind_3] = sys.createReplaceableDataloader(3) sys.replaceableLoaders[kind_10000] = sys.createReplaceableDataloader(10000) @@ -143,7 +143,7 @@ func (sys *System) batchLoadReplaceableEvents( } func (sys *System) determineRelaysToQuery(ctx context.Context, pubkey string, kind int) []string { - relays := make([]string, 0, 10) + var relays []string // search in specific relays for user if kind == 10002 { diff --git a/sdk/set.go b/sdk/set.go new file mode 100644 index 0000000..2a2aa3a --- /dev/null +++ b/sdk/set.go @@ -0,0 +1,98 @@ +package sdk + +import ( + "context" + "slices" + "time" + + "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/sdk/cache" +) + +// this is similar to list.go and inherits code from that. + +type GenericSets[I TagItemWithValue] struct { + PubKey string `json:"-"` + Events []*nostr.Event `json:"-"` + + Sets map[string][]I +} + +func fetchGenericSets[I TagItemWithValue]( + sys *System, + ctx context.Context, + pubkey string, + actualKind int, + addressableIndex addressableIndex, + parseTag func(nostr.Tag) (I, bool), + cache cache.Cache32[GenericSets[I]], + skipFetch bool, +) (fl GenericSets[I], fromInternal bool) { + // we have 24 mutexes, so we can load up to 24 lists at the same time, but if we do the same exact + // call that will do it only once, the subsequent ones will wait for a result to be cached + // and then return it from cache -- 13 is an arbitrary index for the pubkey + lockIdx := (int(pubkey[13]) + int(addressableIndex)) % 24 + genericListMutexes[lockIdx].Lock() + + if valueWasJustCached[lockIdx] { + // this ensures the cache has had time to commit the values + // so we don't repeat a fetch immediately after the other + valueWasJustCached[lockIdx] = false + time.Sleep(time.Millisecond * 10) + } + + defer genericListMutexes[lockIdx].Unlock() + + if v, ok := cache.Get(pubkey); ok { + return v, true + } + + v := GenericSets[I]{PubKey: pubkey} + + events, _ := sys.StoreRelay.QuerySync(ctx, nostr.Filter{Kinds: []int{actualKind}, Authors: []string{pubkey}}) + if len(events) != 0 { + sets := parseSetsFromEvents(events, parseTag) + v.Events = events + v.Sets = sets + cache.SetWithTTL(pubkey, v, time.Hour*6) + valueWasJustCached[lockIdx] = true + return v, true + } + + if !skipFetch { + thunk := sys.addressableLoaders[addressableIndex].Load(ctx, pubkey) + events, err := thunk() + if err == nil { + sets := parseSetsFromEvents(events, parseTag) + v.Sets = sets + for _, evt := range events { + sys.StoreRelay.Publish(ctx, *evt) + } + } + cache.SetWithTTL(pubkey, v, time.Hour*6) + valueWasJustCached[lockIdx] = true + } + + return v, false +} + +func parseSetsFromEvents[I TagItemWithValue]( + events []*nostr.Event, + parseTag func(nostr.Tag) (I, bool), +) map[string][]I { + sets := make(map[string][]I, len(events)) + for _, evt := range events { + items := make([]I, 0, len(evt.Tags)) + for _, tag := range evt.Tags { + item, ok := parseTag(tag) + if ok { + // check if this already exists before adding + if slices.IndexFunc(items, func(i I) bool { return i.Value() == item.Value() }) == -1 { + items = append(items, item) + } + } + } + sets[evt.Tags.GetD()] = items + } + return sets +} diff --git a/sdk/system.go b/sdk/system.go index 7809fb1..dd9847e 100644 --- a/sdk/system.go +++ b/sdk/system.go @@ -23,6 +23,10 @@ type System struct { PinListCache cache.Cache32[GenericList[EventRef]] BlockedRelayListCache cache.Cache32[GenericList[RelayURL]] SearchRelayListCache cache.Cache32[GenericList[RelayURL]] + TopicListCache cache.Cache32[GenericList[Topic]] + RelaySetsCache cache.Cache32[GenericSets[RelayURL]] + FollowSetsCache cache.Cache32[GenericSets[ProfileRef]] + TopicSetsCache cache.Cache32[GenericSets[Topic]] Hints hints.HintsDB Pool *nostr.SimplePool RelayListRelays *RelayStream @@ -37,6 +41,7 @@ type System struct { StoreRelay nostr.RelayStore replaceableLoaders []*dataloader.Loader[string, *nostr.Event] + addressableLoaders []*dataloader.Loader[string, []*nostr.Event] outboxShortTermCache cache.Cache32[[]string] } @@ -66,6 +71,10 @@ func NewSystem(mods ...SystemModifier) *System { PinListCache: cache_memory.New32[GenericList[EventRef]](1000), BlockedRelayListCache: cache_memory.New32[GenericList[RelayURL]](1000), SearchRelayListCache: cache_memory.New32[GenericList[RelayURL]](1000), + TopicListCache: cache_memory.New32[GenericList[Topic]](1000), + RelaySetsCache: cache_memory.New32[GenericSets[RelayURL]](1000), + FollowSetsCache: cache_memory.New32[GenericSets[ProfileRef]](1000), + TopicSetsCache: cache_memory.New32[GenericSets[Topic]](1000), RelayListRelays: NewRelayStream("wss://purplepag.es", "wss://user.kindpag.es", "wss://relay.nos.social"), FollowListRelays: NewRelayStream("wss://purplepag.es", "wss://user.kindpag.es", "wss://relay.nos.social"), MetadataRelays: NewRelayStream("wss://purplepag.es", "wss://user.kindpag.es", "wss://relay.nos.social"), @@ -112,7 +121,8 @@ func NewSystem(mods ...SystemModifier) *System { } sys.StoreRelay = eventstore.RelayWrapper{Store: sys.Store} - sys.initializeDataloaders() + sys.initializeReplaceableDataloaders() + sys.initializeAddressableDataloaders() return sys }