Remove collection, make loader standalone
This commit is contained in:
@@ -1,142 +0,0 @@
|
|||||||
import {readable, derived, writable, Readable, Subscriber} from "svelte/store"
|
|
||||||
import {batch, indexBy, remove, assoc, now} from "@welshman/lib"
|
|
||||||
import {withGetter, ReadableWithGetter} from "./getter.js"
|
|
||||||
import {memoized} from "./memoize.js"
|
|
||||||
|
|
||||||
// Collection utility
|
|
||||||
|
|
||||||
export type FreshnessUpdate = {
|
|
||||||
ns: string
|
|
||||||
key: string
|
|
||||||
ts: number
|
|
||||||
}
|
|
||||||
|
|
||||||
export const freshness = withGetter(writable<Record<string, number>>({}))
|
|
||||||
|
|
||||||
export const getFreshnessKey = (ns: string, key: string) => `${ns}:${key}`
|
|
||||||
|
|
||||||
export const getFreshness = (ns: string, key: string) =>
|
|
||||||
freshness.get()[getFreshnessKey(ns, key)] || 0
|
|
||||||
|
|
||||||
export const setFreshnessImmediate = ({ns, key, ts}: FreshnessUpdate) =>
|
|
||||||
freshness.update(assoc(getFreshnessKey(ns, key), ts))
|
|
||||||
|
|
||||||
export const setFreshnessThrottled = batch(100, (updates: FreshnessUpdate[]) =>
|
|
||||||
freshness.update($freshness => {
|
|
||||||
for (const {ns, key, ts} of updates) {
|
|
||||||
$freshness[getFreshnessKey(ns, key)] = ts
|
|
||||||
}
|
|
||||||
|
|
||||||
return $freshness
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
export type CachedLoaderOptions<T> = {
|
|
||||||
name: string
|
|
||||||
indexStore: ReadableWithGetter<Map<string, T>>
|
|
||||||
load: (key: string, relays: string[]) => Promise<any>
|
|
||||||
subscribers?: Subscriber<T>[]
|
|
||||||
}
|
|
||||||
|
|
||||||
export const makeCachedLoader = <T>({
|
|
||||||
name,
|
|
||||||
load,
|
|
||||||
indexStore,
|
|
||||||
subscribers = [],
|
|
||||||
}: CachedLoaderOptions<T>) => {
|
|
||||||
const pending = new Map<string, Promise<T | void>>()
|
|
||||||
const loadAttempts = new Map<string, number>()
|
|
||||||
|
|
||||||
return async (key: string, relays: string[] = [], force = false) => {
|
|
||||||
const stale = indexStore.get().get(key)
|
|
||||||
|
|
||||||
// If we have no loader function, nothing we can do
|
|
||||||
if (!load) {
|
|
||||||
return stale
|
|
||||||
}
|
|
||||||
|
|
||||||
const freshness = getFreshness(name, key)
|
|
||||||
|
|
||||||
// If we have an item, reload if it's stale
|
|
||||||
if (stale && freshness > now() - 3600 && !force) {
|
|
||||||
return stale
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we already are loading, await and return
|
|
||||||
if (pending.has(key)) {
|
|
||||||
return pending.get(key)!.then(() => indexStore.get().get(key))
|
|
||||||
}
|
|
||||||
|
|
||||||
const attempt = loadAttempts.get(key) || 0
|
|
||||||
|
|
||||||
// Use exponential backoff to throttle attempts
|
|
||||||
if (freshness > now() - Math.pow(2, attempt) && !force) {
|
|
||||||
return stale
|
|
||||||
}
|
|
||||||
|
|
||||||
loadAttempts.set(key, attempt + 1)
|
|
||||||
|
|
||||||
setFreshnessThrottled({ns: name, key, ts: now()})
|
|
||||||
|
|
||||||
const promise = load(key, relays)
|
|
||||||
|
|
||||||
pending.set(key, promise)
|
|
||||||
|
|
||||||
try {
|
|
||||||
await promise
|
|
||||||
} catch (e) {
|
|
||||||
console.warn(`Failed to load ${name} item ${key}`, e)
|
|
||||||
} finally {
|
|
||||||
pending.delete(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
const fresh = indexStore.get().get(key)
|
|
||||||
|
|
||||||
if (fresh) {
|
|
||||||
loadAttempts.delete(key)
|
|
||||||
|
|
||||||
for (const subscriber of subscribers) {
|
|
||||||
subscriber(fresh)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return fresh
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export type CollectionOptions<T> = {
|
|
||||||
name: string
|
|
||||||
store: Readable<T[]>
|
|
||||||
getKey: (item: T) => string
|
|
||||||
load: (key: string, relays: string[]) => Promise<any>
|
|
||||||
}
|
|
||||||
|
|
||||||
export const collection = <T>({name, store, getKey, load}: CollectionOptions<T>) => {
|
|
||||||
const indexStore = withGetter(derived(store, $items => indexBy(getKey, $items)))
|
|
||||||
|
|
||||||
let subscribers: Subscriber<T>[] = []
|
|
||||||
|
|
||||||
const loadItem = makeCachedLoader({name, load, indexStore, subscribers})
|
|
||||||
|
|
||||||
const deriveItem = (key: string | undefined, relays: string[] = []) => {
|
|
||||||
if (!key) {
|
|
||||||
return readable(undefined)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we don't yet have the item, or it's stale, trigger a request for it. The derived
|
|
||||||
// store will update when it arrives
|
|
||||||
loadItem(key, relays)
|
|
||||||
|
|
||||||
return memoized<T | undefined>(derived(indexStore, $index => $index.get(key)))
|
|
||||||
}
|
|
||||||
|
|
||||||
const onItem = (cb: Subscriber<T>) => {
|
|
||||||
subscribers.push(cb)
|
|
||||||
|
|
||||||
return () => {
|
|
||||||
subscribers = remove(cb, subscribers)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return {indexStore, deriveItem, loadItem, onItem}
|
|
||||||
}
|
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
export * from "./synced.js"
|
export * from "./synced.js"
|
||||||
export * from "./getter.js"
|
export * from "./getter.js"
|
||||||
|
export * from "./loader.js"
|
||||||
export * from "./throttle.js"
|
export * from "./throttle.js"
|
||||||
export * from "./memoize.js"
|
export * from "./memoize.js"
|
||||||
export * from "./repository.js"
|
export * from "./repository.js"
|
||||||
export * from "./collection.js"
|
|
||||||
|
|||||||
@@ -0,0 +1,71 @@
|
|||||||
|
import {readable, derived, writable, Readable, Subscriber} from "svelte/store"
|
||||||
|
import {Maybe, batch, indexBy, remove, assoc, now} from "@welshman/lib"
|
||||||
|
import {withGetter, ReadableWithGetter} from "./getter.js"
|
||||||
|
import {memoized} from "./memoize.js"
|
||||||
|
|
||||||
|
export type LoaderOptions<T> = {
|
||||||
|
getItem: (key: string) => T
|
||||||
|
getLastFetched: (key: string) => number
|
||||||
|
setLastFetched: (key: string, ts: number) => void
|
||||||
|
load: (key: string, ...args: any[]) => Promise<unknown>
|
||||||
|
timeout?: number
|
||||||
|
}
|
||||||
|
|
||||||
|
export const makeLoader = <T>(options: LoaderOptions<T>) => {
|
||||||
|
const timeout = options.timeout || 3600
|
||||||
|
const pending = new Map<string, Promise<Maybe<T>>>()
|
||||||
|
const attempts = new Map<string, number>()
|
||||||
|
|
||||||
|
const baseLoad = async (key: string, force: boolean, ...args: any[]): Promise<Maybe<T>> => {
|
||||||
|
const stale = options.getItem(key)
|
||||||
|
const lastFetched = options.getLastFetched(key)
|
||||||
|
|
||||||
|
// If we have an item, reload if it's stale
|
||||||
|
if (stale && lastFetched > now() - timeout && !force) {
|
||||||
|
return stale
|
||||||
|
}
|
||||||
|
|
||||||
|
const pendingItem = pending.get(key)
|
||||||
|
|
||||||
|
// If we already are loading, await and return
|
||||||
|
if (pendingItem) {
|
||||||
|
return pendingItem
|
||||||
|
}
|
||||||
|
|
||||||
|
const attempt = attempts.get(key) || 0
|
||||||
|
|
||||||
|
// Use exponential backoff to throttle attempts
|
||||||
|
if (lastFetched > now() - Math.pow(2, attempt) && !force) {
|
||||||
|
return stale
|
||||||
|
}
|
||||||
|
|
||||||
|
attempts.set(key, attempt + 1)
|
||||||
|
|
||||||
|
options.setLastFetched(key, now())
|
||||||
|
|
||||||
|
const promise = options.load(key, ...args).then(() => options.getItem(key))
|
||||||
|
|
||||||
|
pending.set(key, promise)
|
||||||
|
|
||||||
|
let item
|
||||||
|
try {
|
||||||
|
item = await promise
|
||||||
|
} catch (e) {
|
||||||
|
console.warn(`Failed to load ${name} item ${key}`, e)
|
||||||
|
} finally {
|
||||||
|
pending.delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (item) {
|
||||||
|
attempts.delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
return item
|
||||||
|
}
|
||||||
|
|
||||||
|
const load = (key: string, ...args: any[]) => baseLoad(key, false, ...args)
|
||||||
|
|
||||||
|
const forceLoad = (key: string, ...args: any[]) => baseLoad(key, true, ...args)
|
||||||
|
|
||||||
|
return {load, forceLoad}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user