Rework storage adapters
This commit is contained in:
@@ -1,111 +0,0 @@
|
||||
import {derived} from "svelte/store"
|
||||
import {batch, fromPairs} from "@welshman/lib"
|
||||
import {
|
||||
PROFILE,
|
||||
FOLLOWS,
|
||||
MUTES,
|
||||
RELAYS,
|
||||
INBOX_RELAYS,
|
||||
getPubkeyTagValues,
|
||||
getListTags,
|
||||
} from "@welshman/util"
|
||||
import {throttled, withGetter} from "@welshman/store"
|
||||
import {RepositoryUpdate} from "@welshman/relay"
|
||||
import {getAll, bulkPut, bulkDelete} from "./storage.js"
|
||||
import {relays} from "./relays.js"
|
||||
import {handles, onHandle} from "./handles.js"
|
||||
import {zappers, onZapper} from "./zappers.js"
|
||||
import {plaintext} from "./plaintext.js"
|
||||
import {freshness} from "./freshness.js"
|
||||
import {repository} from "./core.js"
|
||||
import {sessions} from "./session.js"
|
||||
import {userFollows} from "./user.js"
|
||||
|
||||
export const defaultStorageAdapters = {
|
||||
relays: {
|
||||
keyPath: "url",
|
||||
init: async () => relays.set(await getAll("relays")),
|
||||
sync: () => throttled(3000, relays).subscribe($relays => bulkPut("relays", $relays)),
|
||||
},
|
||||
handles: {
|
||||
keyPath: "nip05",
|
||||
init: async () => handles.set(await getAll("handles")),
|
||||
sync: () => onHandle(batch(300, $handles => bulkPut("handles", $handles))),
|
||||
},
|
||||
zappers: {
|
||||
keyPath: "lnurl",
|
||||
init: async () => zappers.set(await getAll("zappers")),
|
||||
sync: () => onZapper(batch(300, $zappers => bulkPut("zappers", $zappers))),
|
||||
},
|
||||
freshness: {
|
||||
keyPath: "key",
|
||||
init: async () => {
|
||||
const items = await getAll("freshness")
|
||||
|
||||
freshness.set(fromPairs(items.map(item => [item.key, item.value])))
|
||||
},
|
||||
sync: () => {
|
||||
const interval = setInterval(() => {
|
||||
bulkPut(
|
||||
"freshness",
|
||||
Object.entries(freshness.get()).map(([key, value]) => ({key, value})),
|
||||
)
|
||||
}, 10_000)
|
||||
|
||||
return () => clearInterval(interval)
|
||||
},
|
||||
},
|
||||
plaintext: {
|
||||
keyPath: "key",
|
||||
init: async () => {
|
||||
const items = await getAll("plaintext")
|
||||
|
||||
plaintext.set(fromPairs(items.map(item => [item.key, item.value])))
|
||||
},
|
||||
sync: () => {
|
||||
const interval = setInterval(() => {
|
||||
bulkPut(
|
||||
"plaintext",
|
||||
Object.entries(plaintext.get()).map(([key, value]) => ({key, value})),
|
||||
)
|
||||
}, 10_000)
|
||||
|
||||
return () => clearInterval(interval)
|
||||
},
|
||||
},
|
||||
events: {
|
||||
keyPath: "id",
|
||||
init: async () => repository.load(await getAll("events")),
|
||||
sync: () => {
|
||||
const userFollowPubkeys = withGetter(
|
||||
derived(userFollows, l => new Set(getPubkeyTagValues(getListTags(l)))),
|
||||
)
|
||||
|
||||
const onUpdate = async ({added, removed}: RepositoryUpdate) => {
|
||||
const sessionKeys = new Set(Object.keys(sessions.get()))
|
||||
const metaKinds = [PROFILE, FOLLOWS, MUTES, RELAYS, INBOX_RELAYS]
|
||||
|
||||
if (removed.size > 0) {
|
||||
await bulkDelete("events", Array.from(removed))
|
||||
}
|
||||
|
||||
if (added.length > 0) {
|
||||
await bulkPut(
|
||||
"events",
|
||||
added.filter(e => {
|
||||
if (sessionKeys.has(e.pubkey)) return true
|
||||
if (e.tags.some(t => sessionKeys.has(t[1]))) return true
|
||||
if (metaKinds.includes(e.kind) && userFollowPubkeys.get()?.has(e.pubkey)) return true
|
||||
|
||||
return false
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
repository.on("update", onUpdate)
|
||||
|
||||
return () => repository.off("update", onUpdate)
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
export * from "./adapters.js"
|
||||
export * from "./context.js"
|
||||
export * from "./core.js"
|
||||
export * from "./collection.js"
|
||||
@@ -17,6 +16,7 @@ export * from "./router.js"
|
||||
export * from "./search.js"
|
||||
export * from "./session.js"
|
||||
export * from "./storage.js"
|
||||
export * from "./storageAdapters.js"
|
||||
export * from "./sync.js"
|
||||
export * from "./tags.js"
|
||||
export * from "./thunk.js"
|
||||
|
||||
@@ -28,7 +28,7 @@ export const {
|
||||
export const displayProfileByPubkey = (pubkey: string | undefined) =>
|
||||
pubkey ? displayProfile(profilesByPubkey.get().get(pubkey), displayPubkey(pubkey)) : ""
|
||||
|
||||
export const deriveProfileDisplay = (pubkey: string | undefined) =>
|
||||
export const deriveProfileDisplay = (pubkey: string | undefined, relays: string[] = []) =>
|
||||
pubkey
|
||||
? derived(deriveProfile(pubkey), $profile => displayProfile($profile, displayPubkey(pubkey)))
|
||||
? derived(deriveProfile(pubkey, relays), $profile => displayProfile($profile, displayPubkey(pubkey)))
|
||||
: readable("")
|
||||
|
||||
@@ -120,3 +120,4 @@ export const clearStorage = async () => {
|
||||
db = undefined // force initStorage to run again in tests
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,253 @@
|
||||
import {derived} from "svelte/store"
|
||||
import {batch, throttle, sortBy, call, fromPairs} from "@welshman/lib"
|
||||
import {
|
||||
PROFILE,
|
||||
FOLLOWS,
|
||||
MUTES,
|
||||
RELAYS,
|
||||
INBOX_RELAYS,
|
||||
getPubkeyTagValues,
|
||||
getListTags,
|
||||
TrustedEvent,
|
||||
} from "@welshman/util"
|
||||
import {throttled, withGetter, WritableWithGetter} from "@welshman/store"
|
||||
import {Tracker} from "@welshman/net"
|
||||
import {Repository, RepositoryUpdate} from "@welshman/relay"
|
||||
import {getAll, bulkPut, bulkDelete} from "./storage.js"
|
||||
import {relays} from "./relays.js"
|
||||
import {handles, onHandle} from "./handles.js"
|
||||
import {zappers, onZapper} from "./zappers.js"
|
||||
import {plaintext} from "./plaintext.js"
|
||||
import {freshness} from "./freshness.js"
|
||||
import {repository, tracker} from "./core.js"
|
||||
import {sessions} from "./session.js"
|
||||
import {userFollows} from "./user.js"
|
||||
|
||||
export type RelaysStorageAdapterOptions = {
|
||||
name: string
|
||||
}
|
||||
|
||||
export class RelaysStorageAdapter {
|
||||
keyPath = "url"
|
||||
|
||||
constructor(readonly options: RelaysStorageAdapterOptions) {}
|
||||
|
||||
async init() {
|
||||
relays.set(await getAll(this.options.name))
|
||||
}
|
||||
|
||||
sync() {
|
||||
return throttled(3000, relays).subscribe($relays => bulkPut(this.options.name, $relays))
|
||||
}
|
||||
}
|
||||
|
||||
export type HandlesStorageAdapterOptions = {
|
||||
name: string
|
||||
}
|
||||
|
||||
export class HandlesStorageAdapter {
|
||||
keyPath = "nip05"
|
||||
|
||||
constructor(readonly options: HandlesStorageAdapterOptions) {}
|
||||
|
||||
async init() {
|
||||
handles.set(await getAll(this.options.name))
|
||||
}
|
||||
|
||||
sync() {
|
||||
return onHandle(batch(300, $handles => bulkPut(this.options.name, $handles)))
|
||||
}
|
||||
}
|
||||
|
||||
export type ZappersStorageAdapterOptions = {
|
||||
name: string
|
||||
}
|
||||
|
||||
export class ZappersStorageAdapter {
|
||||
keyPath = "lnurl"
|
||||
|
||||
constructor(readonly options: ZappersStorageAdapterOptions) {}
|
||||
|
||||
async init() {
|
||||
zappers.set(await getAll(this.options.name))
|
||||
}
|
||||
|
||||
sync() {
|
||||
return onZapper(batch(300, $zappers => bulkPut(this.options.name, $zappers)))
|
||||
}
|
||||
}
|
||||
|
||||
export type FreshnessStorageAdapterOptions = {
|
||||
name: string
|
||||
}
|
||||
|
||||
export class FreshnessStorageAdapter {
|
||||
keyPath = "key"
|
||||
|
||||
constructor(readonly options: FreshnessStorageAdapterOptions) {}
|
||||
|
||||
async init() {
|
||||
const items = await getAll(this.options.name)
|
||||
|
||||
freshness.set(fromPairs(items.map(item => [item.key, item.value])))
|
||||
}
|
||||
|
||||
sync() {
|
||||
const interval = setInterval(() => {
|
||||
bulkPut(
|
||||
this.options.name,
|
||||
Object.entries(freshness.get()).map(([key, value]) => ({key, value})),
|
||||
)
|
||||
}, 10_000)
|
||||
|
||||
return () => clearInterval(interval)
|
||||
}
|
||||
}
|
||||
|
||||
export type PlaintextStorageAdapterOptions = {
|
||||
name: string
|
||||
}
|
||||
|
||||
export class PlaintextStorageAdapter {
|
||||
keyPath = "key"
|
||||
|
||||
constructor(readonly options: PlaintextStorageAdapterOptions) {}
|
||||
|
||||
async init() {
|
||||
const items = await getAll(this.options.name)
|
||||
|
||||
plaintext.set(fromPairs(items.map(item => [item.key, item.value])))
|
||||
}
|
||||
|
||||
sync() {
|
||||
const interval = setInterval(() => {
|
||||
bulkPut(
|
||||
this.options.name,
|
||||
Object.entries(plaintext.get())
|
||||
.map(([key, value]) => ({key, value})),
|
||||
)
|
||||
}, 10_000)
|
||||
|
||||
return () => clearInterval(interval)
|
||||
}
|
||||
}
|
||||
|
||||
export type TrackerStorageAdapterOptions = {
|
||||
name: string
|
||||
tracker: Tracker
|
||||
}
|
||||
|
||||
export class TrackerStorageAdapter {
|
||||
keyPath = "id"
|
||||
|
||||
constructor(readonly options: TrackerStorageAdapterOptions) {}
|
||||
|
||||
async init() {
|
||||
const relaysById = new Map<string, Set<string>>()
|
||||
|
||||
for (const {id, relays} of await getAll(this.options.name)) {
|
||||
relaysById.set(id, new Set(relays))
|
||||
}
|
||||
|
||||
this.options.tracker.load(relaysById)
|
||||
}
|
||||
|
||||
sync() {
|
||||
const onUpdate = throttle(3000, async () => {
|
||||
await bulkPut(
|
||||
this.options.name,
|
||||
Array.from(this.options.tracker.relaysById.entries())
|
||||
.map(([id, relays]) => ({id, relays: Array.from(relays)}))
|
||||
)
|
||||
})
|
||||
|
||||
this.options.tracker.on('update', onUpdate)
|
||||
|
||||
return () => this.options.tracker.off('update', onUpdate)
|
||||
}
|
||||
}
|
||||
|
||||
export type EventsStorageAdapterOptions = {
|
||||
name: string
|
||||
limit: number
|
||||
repository: Repository
|
||||
rankEvent: (event: TrustedEvent) => number
|
||||
}
|
||||
|
||||
export class EventsStorageAdapter {
|
||||
keyPath = "id"
|
||||
eventCount = 0
|
||||
|
||||
constructor(readonly options: EventsStorageAdapterOptions) {}
|
||||
|
||||
async init() {
|
||||
const events = await getAll(this.options.name)
|
||||
|
||||
this.eventCount = events.length
|
||||
|
||||
this.options.repository.load(events)
|
||||
}
|
||||
|
||||
sync() {
|
||||
const {name, limit, rankEvent} = this.options
|
||||
|
||||
const onUpdate = async ({added, removed}: RepositoryUpdate) => {
|
||||
// Only add events we want to keep
|
||||
const keep = added.filter(e => rankEvent(e) > 0)
|
||||
|
||||
// Add new events
|
||||
if (keep.length > 0) {
|
||||
await bulkPut(name, keep)
|
||||
}
|
||||
|
||||
// If we're well above our retention limit, drop lowest-ranked events
|
||||
if (this.eventCount > limit * 1.5) {
|
||||
removed = new Set(removed)
|
||||
|
||||
for (const event of sortBy(e => -rankEvent(e), await getAll(name)).slice(limit)) {
|
||||
removed.add(event.id)
|
||||
}
|
||||
}
|
||||
|
||||
if (removed.size > 0) {
|
||||
await bulkDelete(name, Array.from(removed))
|
||||
}
|
||||
|
||||
// Keep track of our total number of events. This isn't strictly accurate, but it's close enough
|
||||
this.eventCount = this.eventCount + keep.length - removed.size
|
||||
}
|
||||
|
||||
this.options.repository.on("update", onUpdate)
|
||||
|
||||
return () => this.options.repository.off("update", onUpdate)
|
||||
}
|
||||
}
|
||||
|
||||
export const defaultStorageAdapters = {
|
||||
relays: new RelaysStorageAdapter({name: 'relays'}),
|
||||
handles: new HandlesStorageAdapter({name: 'handles'}),
|
||||
zappers: new ZappersStorageAdapter({name: 'zappers'}),
|
||||
freshness: new FreshnessStorageAdapter({name: 'freshness'}),
|
||||
plaintext: new PlaintextStorageAdapter({name: 'plaintext'}),
|
||||
tracker: new TrackerStorageAdapter({name: 'tracker', tracker}),
|
||||
events: new EventsStorageAdapter(call(() => {
|
||||
const userFollowPubkeys = withGetter(
|
||||
derived(userFollows, l => new Set(getPubkeyTagValues(getListTags(l)))),
|
||||
)
|
||||
|
||||
return {
|
||||
repository,
|
||||
name: 'events',
|
||||
limit: 10_000,
|
||||
rankEvent: (e: TrustedEvent) => {
|
||||
const $sessions = sessions.get()
|
||||
const metaKinds = [PROFILE, FOLLOWS, MUTES, RELAYS, INBOX_RELAYS]
|
||||
|
||||
if ($sessions[e.pubkey] || e.tags.some(t => $sessions[t[1]])) return 1
|
||||
if (metaKinds.includes(e.kind) && userFollowPubkeys.get()?.has(e.pubkey)) return 1
|
||||
|
||||
return 0
|
||||
},
|
||||
}
|
||||
})),
|
||||
}
|
||||
@@ -1,7 +1,20 @@
|
||||
import {bech32, utf8} from "@scure/base"
|
||||
|
||||
|
||||
type Obj<T = any> = Record<string, T>
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// Working with nil
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
export type Nil = null | undefined
|
||||
|
||||
export const isNil = <T>(x: T, ...args: unknown[]) => x === undefined || x === null
|
||||
|
||||
export const isNotNil = <T>(x: T, ...args: unknown[]) => x !== undefined || x !== null
|
||||
|
||||
export const assertNotNil = <T>(x: T, ...args: unknown[]) => x!
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// Basic functional programming utilities
|
||||
// ----------------------------------------------------------------------------
|
||||
@@ -610,6 +623,9 @@ export const toIterable = (x: any) => (isIterable(x) ? x : [x])
|
||||
/** Ensures value is array by wrapping if needed */
|
||||
export const ensurePlural = <T>(x: T | T[]) => (x instanceof Array ? x : [x])
|
||||
|
||||
/** Ensures values are not undefined */
|
||||
export const removeNil = <T>(xs: T[]) => xs.filter(isNotNil).map(assertNotNil)
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// Objects
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user