Speed up loading
This commit is contained in:
@@ -3,7 +3,7 @@ import {indexBy, remove, type Maybe, now} from "@welshman/lib"
|
||||
import {withGetter} from "@welshman/store"
|
||||
import {getFreshness, setFreshnessThrottled} from "./freshness.js"
|
||||
|
||||
export const collection = <T, LoadArgs extends any[]>({
|
||||
export const collection = <T>({
|
||||
name,
|
||||
store,
|
||||
getKey,
|
||||
@@ -12,7 +12,7 @@ export const collection = <T, LoadArgs extends any[]>({
|
||||
name: string
|
||||
store: Readable<T[]>
|
||||
getKey: (item: T) => string
|
||||
load?: (key: string, ...args: LoadArgs) => Promise<any>
|
||||
load?: (key: string, relays: string[]) => Promise<any>
|
||||
}) => {
|
||||
const indexStore = withGetter(derived(store, $items => indexBy(getKey, $items)))
|
||||
const pending = new Map<string, Promise<Maybe<T>>>()
|
||||
@@ -20,7 +20,7 @@ export const collection = <T, LoadArgs extends any[]>({
|
||||
|
||||
let subscribers: Subscriber<T>[] = []
|
||||
|
||||
const loadItem = async (key: string, ...args: LoadArgs) => {
|
||||
const loadItem = async (key: string, relays: string[] = []) => {
|
||||
const stale = indexStore.get().get(key)
|
||||
|
||||
// If we have no loader function, nothing we can do
|
||||
@@ -51,7 +51,7 @@ export const collection = <T, LoadArgs extends any[]>({
|
||||
|
||||
setFreshnessThrottled({ns: name, key, ts: now()})
|
||||
|
||||
const promise = load(key, ...args)
|
||||
const promise = load(key, relays)
|
||||
|
||||
pending.set(key, promise)
|
||||
|
||||
@@ -76,14 +76,14 @@ export const collection = <T, LoadArgs extends any[]>({
|
||||
return fresh
|
||||
}
|
||||
|
||||
const deriveItem = (key: Maybe<string>, ...args: LoadArgs) => {
|
||||
const deriveItem = (key: Maybe<string>, relays: string[] = []) => {
|
||||
if (!key) {
|
||||
return readable(undefined)
|
||||
}
|
||||
|
||||
// If we don't yet have the item, or it's stale, trigger a request for it. The derived
|
||||
// store will update when it arrives
|
||||
loadItem(key, ...args)
|
||||
loadItem(key, relays)
|
||||
|
||||
return derived(indexStore, $index => $index.get(key))
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import {deriveEventsMapped} from "@welshman/store"
|
||||
import {repository} from "./core.js"
|
||||
import {Router} from "./router.js"
|
||||
import {collection} from "./collection.js"
|
||||
import {loadRelaySelections} from "./relaySelections.js"
|
||||
import {loadWithAsapMetaRelayUrls} from "./relaySelections.js"
|
||||
|
||||
export const follows = deriveEventsMapped<PublishedList>(repository, {
|
||||
filters: [{kinds: [FOLLOWS]}],
|
||||
@@ -21,12 +21,6 @@ export const {
|
||||
name: "follows",
|
||||
store: follows,
|
||||
getKey: follows => follows.event.pubkey,
|
||||
load: async (pubkey: string, request: Partial<MultiRequestOptions> = {}) => {
|
||||
await loadRelaySelections(pubkey, request)
|
||||
|
||||
const filters = [{kinds: [FOLLOWS], authors: [pubkey]}]
|
||||
const relays = Router.get().FromPubkey(pubkey).getUrls()
|
||||
|
||||
await load({relays, ...request, filters})
|
||||
},
|
||||
load: (pubkey: string, relays: string[]) =>
|
||||
loadWithAsapMetaRelayUrls(pubkey, relays, [{kinds: [FOLLOWS], authors: [pubkey]}]),
|
||||
})
|
||||
|
||||
@@ -105,8 +105,8 @@ export const {
|
||||
}),
|
||||
})
|
||||
|
||||
export const deriveHandleForPubkey = (pubkey: string, request: Partial<MultiRequestOptions> = {}) =>
|
||||
derived([handlesByNip05, deriveProfile(pubkey, request)], ([$handlesByNip05, $profile]) => {
|
||||
export const deriveHandleForPubkey = (pubkey: string, relays: string[] = []) =>
|
||||
derived([handlesByNip05, deriveProfile(pubkey, relays)], ([$handlesByNip05, $profile]) => {
|
||||
if (!$profile?.nip05) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import {repository} from "./core.js"
|
||||
import {Router} from "./router.js"
|
||||
import {collection} from "./collection.js"
|
||||
import {ensurePlaintext} from "./plaintext.js"
|
||||
import {loadRelaySelections} from "./relaySelections.js"
|
||||
import {loadWithAsapMetaRelayUrls} from "./relaySelections.js"
|
||||
|
||||
export const mutes = deriveEventsMapped<PublishedList>(repository, {
|
||||
filters: [{kinds: [MUTES]}],
|
||||
@@ -27,12 +27,6 @@ export const {
|
||||
name: "mutes",
|
||||
store: mutes,
|
||||
getKey: mute => mute.event.pubkey,
|
||||
load: async (pubkey: string, request: Partial<MultiRequestOptions> = {}) => {
|
||||
await loadRelaySelections(pubkey, request)
|
||||
|
||||
const filters = [{kinds: [MUTES], authors: [pubkey]}]
|
||||
const relays = Router.get().FromPubkey(pubkey).getUrls()
|
||||
|
||||
await load({relays, ...request, filters})
|
||||
},
|
||||
load: (pubkey: string, relays: string[]) =>
|
||||
loadWithAsapMetaRelayUrls(pubkey, relays, [{kinds: [MUTES], authors: [pubkey]}]),
|
||||
})
|
||||
|
||||
@@ -5,7 +5,7 @@ import {deriveEventsMapped} from "@welshman/store"
|
||||
import {repository} from "./core.js"
|
||||
import {Router} from "./router.js"
|
||||
import {collection} from "./collection.js"
|
||||
import {loadRelaySelections} from "./relaySelections.js"
|
||||
import {loadWithAsapMetaRelayUrls} from "./relaySelections.js"
|
||||
|
||||
export const pins = deriveEventsMapped<PublishedList>(repository, {
|
||||
filters: [{kinds: [PINS]}],
|
||||
@@ -21,12 +21,6 @@ export const {
|
||||
name: "pins",
|
||||
store: pins,
|
||||
getKey: pins => pins.event.pubkey,
|
||||
load: async (pubkey: string, request: Partial<MultiRequestOptions> = {}) => {
|
||||
await loadRelaySelections(pubkey, request)
|
||||
|
||||
const filters = [{kinds: [PINS], authors: [pubkey]}]
|
||||
const relays = Router.get().FromPubkey(pubkey).getUrls()
|
||||
|
||||
await load({relays, ...request, filters})
|
||||
},
|
||||
load: (pubkey: string, relays: string[]) =>
|
||||
loadWithAsapMetaRelayUrls(pubkey, relays, [{kinds: [PINS], authors: [pubkey]}]),
|
||||
})
|
||||
|
||||
@@ -6,7 +6,7 @@ import {deriveEventsMapped, withGetter} from "@welshman/store"
|
||||
import {repository} from "./core.js"
|
||||
import {Router} from "./router.js"
|
||||
import {collection} from "./collection.js"
|
||||
import {loadRelaySelections} from "./relaySelections.js"
|
||||
import {loadWithAsapMetaRelayUrls} from "./relaySelections.js"
|
||||
|
||||
export const profiles = withGetter(
|
||||
deriveEventsMapped<PublishedProfile>(repository, {
|
||||
@@ -24,15 +24,8 @@ export const {
|
||||
name: "profiles",
|
||||
store: profiles,
|
||||
getKey: profile => profile.event.pubkey,
|
||||
load: async (pubkey: string, request: Partial<MultiRequestOptions> = {}) => {
|
||||
await loadRelaySelections(pubkey, request)
|
||||
|
||||
const router = Router.get()
|
||||
const filters = [{kinds: [PROFILE], authors: [pubkey]}]
|
||||
const relays = router.merge([router.Index(), router.FromPubkey(pubkey)]).getUrls()
|
||||
|
||||
await load({relays, ...request, filters})
|
||||
},
|
||||
load: (pubkey: string, relays: string[]) =>
|
||||
loadWithAsapMetaRelayUrls(pubkey, relays, [{kinds: [PROFILE], authors: [pubkey]}])
|
||||
})
|
||||
|
||||
export const displayProfileByPubkey = (pubkey: string | undefined) =>
|
||||
|
||||
@@ -9,11 +9,11 @@ import {
|
||||
getRelayTags,
|
||||
getRelayTagValues,
|
||||
} from "@welshman/util"
|
||||
import {TrustedEvent, PublishedList, List} from "@welshman/util"
|
||||
import {TrustedEvent, Filter, PublishedList, List} from "@welshman/util"
|
||||
import {load, MultiRequestOptions} from "@welshman/net"
|
||||
import {deriveEventsMapped} from "@welshman/store"
|
||||
import {repository} from "./core.js"
|
||||
import {Router} from "./router.js"
|
||||
import {Router, addNoFallbacks} from "./router.js"
|
||||
import {collection} from "./collection.js"
|
||||
|
||||
export const getRelayUrls = (list?: List): string[] =>
|
||||
@@ -47,17 +47,29 @@ export const {
|
||||
name: "relaySelections",
|
||||
store: relaySelections,
|
||||
getKey: relaySelections => relaySelections.event.pubkey,
|
||||
load: async (pubkey: string, request: Partial<MultiRequestOptions> = {}) => {
|
||||
load: async (pubkey: string, relays: string[]) => {
|
||||
const router = Router.get()
|
||||
|
||||
await load({
|
||||
relays: router.merge([router.Index(), router.FromPubkey(pubkey)]).getUrls(),
|
||||
...request,
|
||||
relays: router.merge([router.Index(), router.FromRelays(relays), router.FromPubkey(pubkey)]).getUrls(),
|
||||
filters: [{kinds: [RELAYS], authors: [pubkey]}],
|
||||
})
|
||||
},
|
||||
})
|
||||
|
||||
export const loadWithAsapMetaRelayUrls = <T>(pubkey: string, relays: string[], filters: Filter[]) => {
|
||||
const router = Router.get()
|
||||
|
||||
return Promise.race([
|
||||
load({filters, relays: router.merge([router.FromRelays(relays), router.Index()]).getUrls()}),
|
||||
loadRelaySelections(pubkey, relays).then(() => {
|
||||
const relays = router.FromPubkey(pubkey).policy(addNoFallbacks).getUrls()
|
||||
|
||||
return load({filters, relays})
|
||||
}),
|
||||
])
|
||||
}
|
||||
|
||||
export const inboxRelaySelections = deriveEventsMapped<PublishedList>(repository, {
|
||||
filters: [{kinds: [INBOX_RELAYS]}],
|
||||
itemToEvent: item => item.event,
|
||||
@@ -72,13 +84,6 @@ export const {
|
||||
name: "inboxRelaySelections",
|
||||
store: inboxRelaySelections,
|
||||
getKey: inboxRelaySelections => inboxRelaySelections.event.pubkey,
|
||||
load: async (pubkey: string, request: Partial<MultiRequestOptions> = {}) => {
|
||||
const router = Router.get()
|
||||
|
||||
await load({
|
||||
relays: router.merge([router.Index(), router.FromPubkey(pubkey)]).getUrls(),
|
||||
...request,
|
||||
filters: [{kinds: [INBOX_RELAYS], authors: [pubkey]}],
|
||||
})
|
||||
},
|
||||
load: (pubkey: string, relays: string[]) =>
|
||||
loadWithAsapMetaRelayUrls(pubkey, relays, [{kinds: [INBOX_RELAYS], authors: [pubkey]}])
|
||||
})
|
||||
|
||||
@@ -81,8 +81,8 @@ export const {
|
||||
}),
|
||||
})
|
||||
|
||||
export const deriveZapperForPubkey = (pubkey: string, request: Partial<MultiRequestOptions> = {}) =>
|
||||
derived([zappersByLnurl, deriveProfile(pubkey, request)], ([$zappersByLnurl, $profile]) => {
|
||||
export const deriveZapperForPubkey = (pubkey: string, relays: string[] = []) =>
|
||||
derived([zappersByLnurl, deriveProfile(pubkey, relays)], ([$zappersByLnurl, $profile]) => {
|
||||
if (!$profile?.lnurl) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
+62
-42
@@ -147,11 +147,11 @@ export class SingleRequest extends EventEmitter {
|
||||
this._adapter.send(["CLOSE", id])
|
||||
}
|
||||
|
||||
this.emit(RequestEvent.Close)
|
||||
this.removeAllListeners()
|
||||
this._unsubscribers.map(call)
|
||||
this._adapter.cleanup()
|
||||
this._closed = true
|
||||
this.emit(RequestEvent.Close)
|
||||
this._adapter.cleanup()
|
||||
this._unsubscribers.map(call)
|
||||
this.removeAllListeners()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,6 +170,7 @@ export type MultiRequestEvents = {
|
||||
|
||||
export type MultiRequestOptions = Omit<SingleRequestOptions, "relay"> & {
|
||||
relays: string[]
|
||||
threshold?: number
|
||||
}
|
||||
|
||||
export class MultiRequest extends EventEmitter {
|
||||
@@ -181,6 +182,7 @@ export class MultiRequest extends EventEmitter {
|
||||
|
||||
const tracker = new Tracker()
|
||||
const relays = new Set(options.relays)
|
||||
const threshold = options.threshold || 1
|
||||
|
||||
if (relays.size !== options.relays.length) {
|
||||
console.warn("Non-unique relays passed to MultiRequest")
|
||||
@@ -220,8 +222,9 @@ export class MultiRequest extends EventEmitter {
|
||||
req.on(RequestEvent.Close, () => {
|
||||
this._closed.add(relay)
|
||||
|
||||
if (this._closed.size === relays.size) {
|
||||
if (this._closed.size >= relays.size * threshold) {
|
||||
this.emit(RequestEvent.Close)
|
||||
this.close()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -238,56 +241,73 @@ export class MultiRequest extends EventEmitter {
|
||||
|
||||
export const request = (options: MultiRequestOptions) => new MultiRequest(options)
|
||||
|
||||
export type LoaderOptions = {
|
||||
delay: number
|
||||
timeout?: number
|
||||
threshold?: number
|
||||
context?: AdapterContext
|
||||
isEventValid?: (event: TrustedEvent, url: string) => boolean
|
||||
isEventDeleted?: (event: TrustedEvent, url: string) => boolean
|
||||
}
|
||||
|
||||
export type LoadOptions = {
|
||||
relays: string[]
|
||||
filters: Filter[]
|
||||
}
|
||||
|
||||
/**
|
||||
* A convenience function which returns a promise of events from a request.
|
||||
* Creates a convenience function which returns a promise of events from a request.
|
||||
* It may return early if filter cardinality is known, and it delays requests by
|
||||
* 200 in order to implement batching
|
||||
* @param options - MultiRequestOptions
|
||||
* @returns - a promise containing an array of TrustedEvents
|
||||
*/
|
||||
export const load = batcher(200, async (requests: MultiRequestOptions[]) => {
|
||||
const filtersByRelay = new Map<string, Filter[]>()
|
||||
export const makeLoader = (options: LoaderOptions) =>
|
||||
batcher(options.delay, async (requests: LoadOptions[]) => {
|
||||
const filtersByRelay = new Map<string, Filter[]>()
|
||||
|
||||
for (const {filters, relays} of requests) {
|
||||
for (const relay of relays) {
|
||||
for (const filter of filters) {
|
||||
pushToMapKey(filtersByRelay, relay, filter)
|
||||
for (const {filters, relays} of requests) {
|
||||
for (const relay of relays) {
|
||||
for (const filter of filters) {
|
||||
pushToMapKey(filtersByRelay, relay, filter)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const tracker = new Tracker()
|
||||
const events: TrustedEvent[] = []
|
||||
const tracker = new Tracker()
|
||||
const events: TrustedEvent[] = []
|
||||
|
||||
await Promise.all(
|
||||
Array.from(filtersByRelay).map(
|
||||
async ([relay, unmergedFilters]) =>
|
||||
new Promise<void>(resolve => {
|
||||
const filters = unionFilters(unmergedFilters)
|
||||
const cardinality =
|
||||
filters.length === 1 ? getFilterResultCardinality(filters[0]) : undefined
|
||||
const req = new MultiRequest({
|
||||
filters,
|
||||
tracker,
|
||||
relays: [relay],
|
||||
timeout: 5000,
|
||||
autoClose: true,
|
||||
})
|
||||
await Promise.all(
|
||||
Array.from(filtersByRelay).map(
|
||||
async ([relay, unmergedFilters]) =>
|
||||
new Promise<void>(resolve => {
|
||||
const filters = unionFilters(unmergedFilters)
|
||||
const cardinality =
|
||||
filters.length === 1 ? getFilterResultCardinality(filters[0]) : undefined
|
||||
const req = new MultiRequest({
|
||||
filters,
|
||||
tracker,
|
||||
relays: [relay],
|
||||
autoClose: true,
|
||||
...options
|
||||
})
|
||||
|
||||
let count = 0
|
||||
let count = 0
|
||||
|
||||
req.on(RequestEvent.Event, (event: TrustedEvent) => {
|
||||
events.push(event)
|
||||
req.on(RequestEvent.Event, (event: TrustedEvent) => {
|
||||
events.push(event)
|
||||
|
||||
if (++count === cardinality) {
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
if (++count === cardinality) {
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
|
||||
req.on(RequestEvent.Close, () => resolve())
|
||||
}),
|
||||
),
|
||||
)
|
||||
req.on(RequestEvent.Close, () => resolve())
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
return requests.map(r => events.filter(event => matchFilters(r.filters, event)))
|
||||
})
|
||||
return requests.map(r => events.filter(event => matchFilters(r.filters, event)))
|
||||
})
|
||||
|
||||
export const load = makeLoader({delay: 200, timeout: 3000, threshold: 0.5})
|
||||
|
||||
Reference in New Issue
Block a user