diff --git a/src/app/core/commands.ts b/src/app/core/commands.ts index baff7101..27928f5a 100644 --- a/src/app/core/commands.ts +++ b/src/app/core/commands.ts @@ -110,7 +110,7 @@ import { } from "@app/core/state" import {loadAlertStatuses} from "@app/core/requests" import {platform, platformName, getPushInfo} from "@app/util/push" -import {preferencesStorageProvider, collectionStorageProvider} from "@src/lib/storage" +import {preferencesStorageProvider, Collection} from "@src/lib/storage" // Utils @@ -156,7 +156,7 @@ export const logout = async () => { localStorage.clear() await preferencesStorageProvider.clear() - await collectionStorageProvider.clear() + await Collection.clearAll() } // Synchronization diff --git a/src/app/util/storage.ts b/src/app/util/storage.ts index 1803ff7a..1a0831a3 100644 --- a/src/app/util/storage.ts +++ b/src/app/util/storage.ts @@ -1,4 +1,16 @@ -import {on, throttle, fromPairs, batch, sortBy, concat} from "@welshman/lib" +import {verifiedSymbol} from "nostr-tools/pure" +import { + always, + on, + hash, + last, + groupBy, + throttle, + fromPairs, + batch, + sortBy, + concat, +} from "@welshman/lib" import {throttled, freshness} from "@welshman/store" import { PROFILE, @@ -33,10 +45,23 @@ import { onZapper, onHandle, } from "@welshman/app" -import {collectionStorageProvider} from "@lib/storage" +import {Collection} from "@lib/storage" const syncEvents = async () => { - repository.load(await collectionStorageProvider.get("events")) + const collection = new Collection({ + table: "events", + shards: Array.from("0123456789abcdef"), + getShard: (event: TrustedEvent) => last(event.id), + }) + + const initialEvents = await collection.get() + + // Mark events verified to avoid re-verification of signatures + for (const event of initialEvents) { + event[verifiedSymbol] = true + } + + repository.load(initialEvents) const rankEvent = (event: TrustedEvent) => { switch (event.kind) { @@ -102,24 +127,36 @@ const syncEvents = async () => { } } - if (added.length > 0) { - let events = concat(await collectionStorageProvider.get("events"), added) + if (removed.size > 0) { + for (const [shard, chunk] of groupBy(last, Array.from(removed))) { + const current = await collection.getShard(shard) + const modified = concat( + current.filter(e => !chunk.includes(e.id)), + added, + ) + const pruned = sortBy(e => -rankEvent(e), modified).slice(0, 10_000) - // If we're well above our retention limit, drop lowest-ranked events - if (events.length > 15_000) { - events = sortBy(e => -rankEvent(e), events).slice(10_000) + await collection.setShard(shard, pruned) } - - await collectionStorageProvider.set("events", events) + } else if (added.length > 0) { + await collection.add(added) } }), ) } +type TrackerItem = [string, string[]] + const syncTracker = async () => { + const collection = new Collection({ + table: "tracker", + shards: Array.from("0123456789abcdef"), + getShard: (item: TrackerItem) => last(item[0]), + }) + const relaysById = new Map>() - for (const [id, relays] of await collectionStorageProvider.get<[string, string[]]>("tracker")) { + for (const [id, relays] of await collection.get()) { relaysById.set(id, new Set(relays)) } @@ -129,17 +166,13 @@ const syncTracker = async () => { const updateOne = batch(3000, (ids: string[]) => { p = p.then(() => { - collectionStorageProvider.add( - "tracker", - ids.map(id => [id, Array.from(tracker.getRelays(id))]), - ) + collection.add(ids.map(id => [id, Array.from(tracker.getRelays(id))])) }) }) const updateAll = throttle(3000, () => { p = p.then(() => { - collectionStorageProvider.set( - "tracker", + collection.set( Array.from(tracker.relaysById.entries()).map(([id, relays]) => [id, Array.from(relays)]), ) }) @@ -159,46 +192,70 @@ const syncTracker = async () => { } const syncRelays = async () => { - relays.set(await collectionStorageProvider.get("relays")) - - return throttled(3000, relays).subscribe($relays => { - collectionStorageProvider.set("relays", $relays) + const collection = new Collection({ + table: "relays", + shards: Array.from("0123456789"), + getShard: (item: Relay) => last(hash(item.url)), }) + + relays.set(await collection.get()) + + return throttled(3000, relays).subscribe(collection.set) } const syncHandles = async () => { - handles.set(await collectionStorageProvider.get("handles")) + const collection = new Collection({ + table: "handles", + shards: Array.from("0123456789"), + getShard: (item: Handle) => last(hash(item.nip05)), + }) - return onHandle( - batch(3000, async $handles => { - await collectionStorageProvider.add("handles", $handles) - }), - ) + handles.set(await collection.get()) + + return onHandle(batch(3000, collection.add)) } const syncZappers = async () => { - zappers.set(await collectionStorageProvider.get("zappers")) + const collection = new Collection({ + table: "zappers", + shards: Array.from("0123456789"), + getShard: (item: Zapper) => last(hash(item.lnurl)), + }) - return onZapper( - batch(3000, async $zappers => { - await collectionStorageProvider.add("zappers", $zappers) - }), - ) + zappers.set(await collection.get()) + + return onZapper(batch(3000, collection.add)) } +type FreshnessItem = [string, number] + const syncFreshness = async () => { - freshness.set(fromPairs(await collectionStorageProvider.get<[string, number]>("freshness"))) + const collection = new Collection({ + table: "freshness", + shards: ["0"], + getShard: always("0"), + }) + + freshness.set(fromPairs(await collection.get())) return throttled(3000, freshness).subscribe($freshness => { - collectionStorageProvider.set("freshness", Object.entries($freshness)) + collection.set(Object.entries($freshness)) }) } +type PlaintextItem = [string, string] + const syncPlaintext = async () => { - plaintext.set(fromPairs(await collectionStorageProvider.get<[string, string]>("plaintext"))) + const collection = new Collection({ + table: "plaintext", + shards: ["0"], + getShard: always("0"), + }) + + plaintext.set(fromPairs(await collection.get())) return throttled(3000, plaintext).subscribe($plaintext => { - collectionStorageProvider.set("plaintext", Object.entries($plaintext)) + collection.set(Object.entries($plaintext)) }) } diff --git a/src/lib/storage.ts b/src/lib/storage.ts index d40bf299..3b57e222 100644 --- a/src/lib/storage.ts +++ b/src/lib/storage.ts @@ -1,3 +1,4 @@ +import {flatten, identity, groupBy} from "@welshman/lib" import {type StorageProvider} from "@welshman/store" import {Preferences} from "@capacitor/preferences" import {Encoding, Filesystem, Directory} from "@capacitor/filesystem" @@ -30,79 +31,92 @@ export class PreferencesStorageProvider implements StorageProvider { export const preferencesStorageProvider = new PreferencesStorageProvider() -export class CollectionStorageProvider implements StorageProvider { +export type CollectionOptions = { + table: string + shards: string[] + getShard: (item: T) => string +} + +export class Collection { p = Promise.resolve() - get = async (key: string): Promise => { + constructor(readonly options: CollectionOptions) {} + + static clearAll = async (): Promise => { + const res = await Filesystem.readdir({ + path: "", + directory: Directory.Data, + }) + + await Promise.all( + res.files.map(file => + Filesystem.deleteFile({ + path: file.name, + directory: Directory.Data, + }), + ), + ) + } + + #then = async (f: () => Promise) => { + this.p = this.p.then(f).catch(e => { + console.error(e) + }) + + await this.p + } + + #path = (shard: string) => `collection_${this.options.table}_${shard}.json` + + getShard = async (shard: string): Promise => { try { const file = await Filesystem.readFile({ - path: key + ".json", + path: this.#path(shard), directory: Directory.Data, encoding: Encoding.UTF8, }) - const items: T[] = [] - for (const line of file.data.toString().split("\n")) { - try { - items.push(JSON.parse(line)) - } catch (e) { - // pass - } - } - - return items + // Speed things up by parsing only once + return JSON.parse("[" + file.data.toString().split("\n").filter(identity).join(",") + "]") } catch (err) { // file doesn't exist, or isn't valid json return [] } } - set = async (key: string, value: T[]): Promise => { - this.p = this.p.then(async () => { + get = async (): Promise => flatten(await Promise.all(this.options.shards.map(this.getShard))) + + setShard = (shard: string, items: T[]) => + this.#then(async () => { await Filesystem.writeFile({ - path: key + ".json", + path: this.#path(shard), directory: Directory.Data, encoding: Encoding.UTF8, - data: value.map(v => JSON.stringify(v)).join("\n"), + data: items.map(v => JSON.stringify(v)).join("\n"), }) }) - await this.p - } + set = (items: T[]) => + Promise.all( + Array.from(groupBy(this.options.getShard, items)).map(([shard, chunk]) => + this.setShard(shard, chunk), + ), + ) - add = async (key: string, value: T[]): Promise => { - this.p = this.p.then(async () => { + addToShard = (shard: string, items: T[]) => + this.#then(async () => { await Filesystem.appendFile({ - path: key + ".json", + path: this.#path(shard), directory: Directory.Data, encoding: Encoding.UTF8, - data: "\n" + value.map(v => JSON.stringify(v)).join("\n"), + data: "\n" + items.map(v => JSON.stringify(v)).join("\n"), }) }) - await this.p - } - - clear = async (): Promise => { - this.p = this.p.then(async () => { - try { - const res = await Filesystem.readdir({path: "./", directory: Directory.Data}) - - await Promise.all( - res.files.map(file => - Filesystem.deleteFile({ - path: file.name + ".json", - directory: Directory.Data, - }), - ), - ) - } catch (e) { - // Directory might not have been created yet - } - }) - - await this.p - } + add = (items: T[]) => + Promise.all( + Array.from(groupBy(this.options.getShard, items)).map(([shard, chunk]) => + this.addToShard(shard, chunk), + ), + ) } - -export const collectionStorageProvider = new CollectionStorageProvider()