From c5ea7edf6bbd1a76e3d93c6ea864a6b1f4a9b99c Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Wed, 16 Apr 2025 12:53:49 -0700 Subject: [PATCH] Rework storage adapters --- packages/app/src/adapters.ts | 111 ------------ packages/app/src/index.ts | 2 +- packages/app/src/profiles.ts | 4 +- packages/app/src/storage.ts | 1 + packages/app/src/storageAdapters.ts | 253 ++++++++++++++++++++++++++++ packages/lib/src/Tools.ts | 16 ++ 6 files changed, 273 insertions(+), 114 deletions(-) delete mode 100644 packages/app/src/adapters.ts create mode 100644 packages/app/src/storageAdapters.ts diff --git a/packages/app/src/adapters.ts b/packages/app/src/adapters.ts deleted file mode 100644 index a01ffb8..0000000 --- a/packages/app/src/adapters.ts +++ /dev/null @@ -1,111 +0,0 @@ -import {derived} from "svelte/store" -import {batch, fromPairs} from "@welshman/lib" -import { - PROFILE, - FOLLOWS, - MUTES, - RELAYS, - INBOX_RELAYS, - getPubkeyTagValues, - getListTags, -} from "@welshman/util" -import {throttled, withGetter} from "@welshman/store" -import {RepositoryUpdate} from "@welshman/relay" -import {getAll, bulkPut, bulkDelete} from "./storage.js" -import {relays} from "./relays.js" -import {handles, onHandle} from "./handles.js" -import {zappers, onZapper} from "./zappers.js" -import {plaintext} from "./plaintext.js" -import {freshness} from "./freshness.js" -import {repository} from "./core.js" -import {sessions} from "./session.js" -import {userFollows} from "./user.js" - -export const defaultStorageAdapters = { - relays: { - keyPath: "url", - init: async () => relays.set(await getAll("relays")), - sync: () => throttled(3000, relays).subscribe($relays => bulkPut("relays", $relays)), - }, - handles: { - keyPath: "nip05", - init: async () => handles.set(await getAll("handles")), - sync: () => onHandle(batch(300, $handles => bulkPut("handles", $handles))), - }, - zappers: { - keyPath: "lnurl", - init: async () => zappers.set(await getAll("zappers")), - sync: () => onZapper(batch(300, $zappers => bulkPut("zappers", $zappers))), - }, - freshness: { - keyPath: "key", - init: async () => { - const items = await getAll("freshness") - - freshness.set(fromPairs(items.map(item => [item.key, item.value]))) - }, - sync: () => { - const interval = setInterval(() => { - bulkPut( - "freshness", - Object.entries(freshness.get()).map(([key, value]) => ({key, value})), - ) - }, 10_000) - - return () => clearInterval(interval) - }, - }, - plaintext: { - keyPath: "key", - init: async () => { - const items = await getAll("plaintext") - - plaintext.set(fromPairs(items.map(item => [item.key, item.value]))) - }, - sync: () => { - const interval = setInterval(() => { - bulkPut( - "plaintext", - Object.entries(plaintext.get()).map(([key, value]) => ({key, value})), - ) - }, 10_000) - - return () => clearInterval(interval) - }, - }, - events: { - keyPath: "id", - init: async () => repository.load(await getAll("events")), - sync: () => { - const userFollowPubkeys = withGetter( - derived(userFollows, l => new Set(getPubkeyTagValues(getListTags(l)))), - ) - - const onUpdate = async ({added, removed}: RepositoryUpdate) => { - const sessionKeys = new Set(Object.keys(sessions.get())) - const metaKinds = [PROFILE, FOLLOWS, MUTES, RELAYS, INBOX_RELAYS] - - if (removed.size > 0) { - await bulkDelete("events", Array.from(removed)) - } - - if (added.length > 0) { - await bulkPut( - "events", - added.filter(e => { - if (sessionKeys.has(e.pubkey)) return true - if (e.tags.some(t => sessionKeys.has(t[1]))) return true - if (metaKinds.includes(e.kind) && userFollowPubkeys.get()?.has(e.pubkey)) return true - - return false - }), - ) - } - } - - repository.on("update", onUpdate) - - return () => repository.off("update", onUpdate) - }, - }, -} diff --git a/packages/app/src/index.ts b/packages/app/src/index.ts index 4200350..f902da1 100644 --- a/packages/app/src/index.ts +++ b/packages/app/src/index.ts @@ -1,4 +1,3 @@ -export * from "./adapters.js" export * from "./context.js" export * from "./core.js" export * from "./collection.js" @@ -17,6 +16,7 @@ export * from "./router.js" export * from "./search.js" export * from "./session.js" export * from "./storage.js" +export * from "./storageAdapters.js" export * from "./sync.js" export * from "./tags.js" export * from "./thunk.js" diff --git a/packages/app/src/profiles.ts b/packages/app/src/profiles.ts index aa0aed2..128c084 100644 --- a/packages/app/src/profiles.ts +++ b/packages/app/src/profiles.ts @@ -28,7 +28,7 @@ export const { export const displayProfileByPubkey = (pubkey: string | undefined) => pubkey ? displayProfile(profilesByPubkey.get().get(pubkey), displayPubkey(pubkey)) : "" -export const deriveProfileDisplay = (pubkey: string | undefined) => +export const deriveProfileDisplay = (pubkey: string | undefined, relays: string[] = []) => pubkey - ? derived(deriveProfile(pubkey), $profile => displayProfile($profile, displayPubkey(pubkey))) + ? derived(deriveProfile(pubkey, relays), $profile => displayProfile($profile, displayPubkey(pubkey))) : readable("") diff --git a/packages/app/src/storage.ts b/packages/app/src/storage.ts index ba21060..855f8a8 100644 --- a/packages/app/src/storage.ts +++ b/packages/app/src/storage.ts @@ -120,3 +120,4 @@ export const clearStorage = async () => { db = undefined // force initStorage to run again in tests } } + diff --git a/packages/app/src/storageAdapters.ts b/packages/app/src/storageAdapters.ts new file mode 100644 index 0000000..504eda7 --- /dev/null +++ b/packages/app/src/storageAdapters.ts @@ -0,0 +1,253 @@ +import {derived} from "svelte/store" +import {batch, throttle, sortBy, call, fromPairs} from "@welshman/lib" +import { + PROFILE, + FOLLOWS, + MUTES, + RELAYS, + INBOX_RELAYS, + getPubkeyTagValues, + getListTags, + TrustedEvent, +} from "@welshman/util" +import {throttled, withGetter, WritableWithGetter} from "@welshman/store" +import {Tracker} from "@welshman/net" +import {Repository, RepositoryUpdate} from "@welshman/relay" +import {getAll, bulkPut, bulkDelete} from "./storage.js" +import {relays} from "./relays.js" +import {handles, onHandle} from "./handles.js" +import {zappers, onZapper} from "./zappers.js" +import {plaintext} from "./plaintext.js" +import {freshness} from "./freshness.js" +import {repository, tracker} from "./core.js" +import {sessions} from "./session.js" +import {userFollows} from "./user.js" + +export type RelaysStorageAdapterOptions = { + name: string +} + +export class RelaysStorageAdapter { + keyPath = "url" + + constructor(readonly options: RelaysStorageAdapterOptions) {} + + async init() { + relays.set(await getAll(this.options.name)) + } + + sync() { + return throttled(3000, relays).subscribe($relays => bulkPut(this.options.name, $relays)) + } +} + +export type HandlesStorageAdapterOptions = { + name: string +} + +export class HandlesStorageAdapter { + keyPath = "nip05" + + constructor(readonly options: HandlesStorageAdapterOptions) {} + + async init() { + handles.set(await getAll(this.options.name)) + } + + sync() { + return onHandle(batch(300, $handles => bulkPut(this.options.name, $handles))) + } +} + +export type ZappersStorageAdapterOptions = { + name: string +} + +export class ZappersStorageAdapter { + keyPath = "lnurl" + + constructor(readonly options: ZappersStorageAdapterOptions) {} + + async init() { + zappers.set(await getAll(this.options.name)) + } + + sync() { + return onZapper(batch(300, $zappers => bulkPut(this.options.name, $zappers))) + } +} + +export type FreshnessStorageAdapterOptions = { + name: string +} + +export class FreshnessStorageAdapter { + keyPath = "key" + + constructor(readonly options: FreshnessStorageAdapterOptions) {} + + async init() { + const items = await getAll(this.options.name) + + freshness.set(fromPairs(items.map(item => [item.key, item.value]))) + } + + sync() { + const interval = setInterval(() => { + bulkPut( + this.options.name, + Object.entries(freshness.get()).map(([key, value]) => ({key, value})), + ) + }, 10_000) + + return () => clearInterval(interval) + } +} + +export type PlaintextStorageAdapterOptions = { + name: string +} + +export class PlaintextStorageAdapter { + keyPath = "key" + + constructor(readonly options: PlaintextStorageAdapterOptions) {} + + async init() { + const items = await getAll(this.options.name) + + plaintext.set(fromPairs(items.map(item => [item.key, item.value]))) + } + + sync() { + const interval = setInterval(() => { + bulkPut( + this.options.name, + Object.entries(plaintext.get()) + .map(([key, value]) => ({key, value})), + ) + }, 10_000) + + return () => clearInterval(interval) + } +} + +export type TrackerStorageAdapterOptions = { + name: string + tracker: Tracker +} + +export class TrackerStorageAdapter { + keyPath = "id" + + constructor(readonly options: TrackerStorageAdapterOptions) {} + + async init() { + const relaysById = new Map>() + + for (const {id, relays} of await getAll(this.options.name)) { + relaysById.set(id, new Set(relays)) + } + + this.options.tracker.load(relaysById) + } + + sync() { + const onUpdate = throttle(3000, async () => { + await bulkPut( + this.options.name, + Array.from(this.options.tracker.relaysById.entries()) + .map(([id, relays]) => ({id, relays: Array.from(relays)})) + ) + }) + + this.options.tracker.on('update', onUpdate) + + return () => this.options.tracker.off('update', onUpdate) + } +} + +export type EventsStorageAdapterOptions = { + name: string + limit: number + repository: Repository + rankEvent: (event: TrustedEvent) => number +} + +export class EventsStorageAdapter { + keyPath = "id" + eventCount = 0 + + constructor(readonly options: EventsStorageAdapterOptions) {} + + async init() { + const events = await getAll(this.options.name) + + this.eventCount = events.length + + this.options.repository.load(events) + } + + sync() { + const {name, limit, rankEvent} = this.options + + const onUpdate = async ({added, removed}: RepositoryUpdate) => { + // Only add events we want to keep + const keep = added.filter(e => rankEvent(e) > 0) + + // Add new events + if (keep.length > 0) { + await bulkPut(name, keep) + } + + // If we're well above our retention limit, drop lowest-ranked events + if (this.eventCount > limit * 1.5) { + removed = new Set(removed) + + for (const event of sortBy(e => -rankEvent(e), await getAll(name)).slice(limit)) { + removed.add(event.id) + } + } + + if (removed.size > 0) { + await bulkDelete(name, Array.from(removed)) + } + + // Keep track of our total number of events. This isn't strictly accurate, but it's close enough + this.eventCount = this.eventCount + keep.length - removed.size + } + + this.options.repository.on("update", onUpdate) + + return () => this.options.repository.off("update", onUpdate) + } +} + +export const defaultStorageAdapters = { + relays: new RelaysStorageAdapter({name: 'relays'}), + handles: new HandlesStorageAdapter({name: 'handles'}), + zappers: new ZappersStorageAdapter({name: 'zappers'}), + freshness: new FreshnessStorageAdapter({name: 'freshness'}), + plaintext: new PlaintextStorageAdapter({name: 'plaintext'}), + tracker: new TrackerStorageAdapter({name: 'tracker', tracker}), + events: new EventsStorageAdapter(call(() => { + const userFollowPubkeys = withGetter( + derived(userFollows, l => new Set(getPubkeyTagValues(getListTags(l)))), + ) + + return { + repository, + name: 'events', + limit: 10_000, + rankEvent: (e: TrustedEvent) => { + const $sessions = sessions.get() + const metaKinds = [PROFILE, FOLLOWS, MUTES, RELAYS, INBOX_RELAYS] + + if ($sessions[e.pubkey] || e.tags.some(t => $sessions[t[1]])) return 1 + if (metaKinds.includes(e.kind) && userFollowPubkeys.get()?.has(e.pubkey)) return 1 + + return 0 + }, + } + })), +} diff --git a/packages/lib/src/Tools.ts b/packages/lib/src/Tools.ts index a6685c5..dceeed6 100644 --- a/packages/lib/src/Tools.ts +++ b/packages/lib/src/Tools.ts @@ -1,7 +1,20 @@ import {bech32, utf8} from "@scure/base" + type Obj = Record +// ---------------------------------------------------------------------------- +// Working with nil +// ---------------------------------------------------------------------------- + +export type Nil = null | undefined + +export const isNil = (x: T, ...args: unknown[]) => x === undefined || x === null + +export const isNotNil = (x: T, ...args: unknown[]) => x !== undefined || x !== null + +export const assertNotNil = (x: T, ...args: unknown[]) => x! + // ---------------------------------------------------------------------------- // Basic functional programming utilities // ---------------------------------------------------------------------------- @@ -610,6 +623,9 @@ export const toIterable = (x: any) => (isIterable(x) ? x : [x]) /** Ensures value is array by wrapping if needed */ export const ensurePlural = (x: T | T[]) => (x instanceof Array ? x : [x]) +/** Ensures values are not undefined */ +export const removeNil = (xs: T[]) => xs.filter(isNotNil).map(assertNotNil) + // ---------------------------------------------------------------------------- // Objects // ----------------------------------------------------------------------------