forked from coracle/flotilla
Re-work storage to optimize file access
This commit is contained in:
@@ -110,7 +110,7 @@ import {
|
|||||||
} from "@app/core/state"
|
} from "@app/core/state"
|
||||||
import {loadAlertStatuses} from "@app/core/requests"
|
import {loadAlertStatuses} from "@app/core/requests"
|
||||||
import {platform, platformName, getPushInfo} from "@app/util/push"
|
import {platform, platformName, getPushInfo} from "@app/util/push"
|
||||||
import {preferencesStorageProvider, collectionStorageProvider} from "@src/lib/storage"
|
import {preferencesStorageProvider, Collection} from "@src/lib/storage"
|
||||||
|
|
||||||
// Utils
|
// Utils
|
||||||
|
|
||||||
@@ -156,7 +156,7 @@ export const logout = async () => {
|
|||||||
localStorage.clear()
|
localStorage.clear()
|
||||||
|
|
||||||
await preferencesStorageProvider.clear()
|
await preferencesStorageProvider.clear()
|
||||||
await collectionStorageProvider.clear()
|
await Collection.clearAll()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Synchronization
|
// Synchronization
|
||||||
|
|||||||
+94
-37
@@ -1,4 +1,16 @@
|
|||||||
import {on, throttle, fromPairs, batch, sortBy, concat} from "@welshman/lib"
|
import {verifiedSymbol} from "nostr-tools/pure"
|
||||||
|
import {
|
||||||
|
always,
|
||||||
|
on,
|
||||||
|
hash,
|
||||||
|
last,
|
||||||
|
groupBy,
|
||||||
|
throttle,
|
||||||
|
fromPairs,
|
||||||
|
batch,
|
||||||
|
sortBy,
|
||||||
|
concat,
|
||||||
|
} from "@welshman/lib"
|
||||||
import {throttled, freshness} from "@welshman/store"
|
import {throttled, freshness} from "@welshman/store"
|
||||||
import {
|
import {
|
||||||
PROFILE,
|
PROFILE,
|
||||||
@@ -33,10 +45,23 @@ import {
|
|||||||
onZapper,
|
onZapper,
|
||||||
onHandle,
|
onHandle,
|
||||||
} from "@welshman/app"
|
} from "@welshman/app"
|
||||||
import {collectionStorageProvider} from "@lib/storage"
|
import {Collection} from "@lib/storage"
|
||||||
|
|
||||||
const syncEvents = async () => {
|
const syncEvents = async () => {
|
||||||
repository.load(await collectionStorageProvider.get<TrustedEvent>("events"))
|
const collection = new Collection<TrustedEvent>({
|
||||||
|
table: "events",
|
||||||
|
shards: Array.from("0123456789abcdef"),
|
||||||
|
getShard: (event: TrustedEvent) => last(event.id),
|
||||||
|
})
|
||||||
|
|
||||||
|
const initialEvents = await collection.get()
|
||||||
|
|
||||||
|
// Mark events verified to avoid re-verification of signatures
|
||||||
|
for (const event of initialEvents) {
|
||||||
|
event[verifiedSymbol] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
repository.load(initialEvents)
|
||||||
|
|
||||||
const rankEvent = (event: TrustedEvent) => {
|
const rankEvent = (event: TrustedEvent) => {
|
||||||
switch (event.kind) {
|
switch (event.kind) {
|
||||||
@@ -102,24 +127,36 @@ const syncEvents = async () => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (added.length > 0) {
|
if (removed.size > 0) {
|
||||||
let events = concat(await collectionStorageProvider.get<TrustedEvent>("events"), added)
|
for (const [shard, chunk] of groupBy(last, Array.from(removed))) {
|
||||||
|
const current = await collection.getShard(shard)
|
||||||
|
const modified = concat(
|
||||||
|
current.filter(e => !chunk.includes(e.id)),
|
||||||
|
added,
|
||||||
|
)
|
||||||
|
const pruned = sortBy(e => -rankEvent(e), modified).slice(0, 10_000)
|
||||||
|
|
||||||
// If we're well above our retention limit, drop lowest-ranked events
|
await collection.setShard(shard, pruned)
|
||||||
if (events.length > 15_000) {
|
|
||||||
events = sortBy(e => -rankEvent(e), events).slice(10_000)
|
|
||||||
}
|
}
|
||||||
|
} else if (added.length > 0) {
|
||||||
await collectionStorageProvider.set("events", events)
|
await collection.add(added)
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TrackerItem = [string, string[]]
|
||||||
|
|
||||||
const syncTracker = async () => {
|
const syncTracker = async () => {
|
||||||
|
const collection = new Collection<TrackerItem>({
|
||||||
|
table: "tracker",
|
||||||
|
shards: Array.from("0123456789abcdef"),
|
||||||
|
getShard: (item: TrackerItem) => last(item[0]),
|
||||||
|
})
|
||||||
|
|
||||||
const relaysById = new Map<string, Set<string>>()
|
const relaysById = new Map<string, Set<string>>()
|
||||||
|
|
||||||
for (const [id, relays] of await collectionStorageProvider.get<[string, string[]]>("tracker")) {
|
for (const [id, relays] of await collection.get()) {
|
||||||
relaysById.set(id, new Set(relays))
|
relaysById.set(id, new Set(relays))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -129,17 +166,13 @@ const syncTracker = async () => {
|
|||||||
|
|
||||||
const updateOne = batch(3000, (ids: string[]) => {
|
const updateOne = batch(3000, (ids: string[]) => {
|
||||||
p = p.then(() => {
|
p = p.then(() => {
|
||||||
collectionStorageProvider.add(
|
collection.add(ids.map(id => [id, Array.from(tracker.getRelays(id))]))
|
||||||
"tracker",
|
|
||||||
ids.map(id => [id, Array.from(tracker.getRelays(id))]),
|
|
||||||
)
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
const updateAll = throttle(3000, () => {
|
const updateAll = throttle(3000, () => {
|
||||||
p = p.then(() => {
|
p = p.then(() => {
|
||||||
collectionStorageProvider.set(
|
collection.set(
|
||||||
"tracker",
|
|
||||||
Array.from(tracker.relaysById.entries()).map(([id, relays]) => [id, Array.from(relays)]),
|
Array.from(tracker.relaysById.entries()).map(([id, relays]) => [id, Array.from(relays)]),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
@@ -159,46 +192,70 @@ const syncTracker = async () => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const syncRelays = async () => {
|
const syncRelays = async () => {
|
||||||
relays.set(await collectionStorageProvider.get<Relay>("relays"))
|
const collection = new Collection<Relay>({
|
||||||
|
table: "relays",
|
||||||
return throttled(3000, relays).subscribe($relays => {
|
shards: Array.from("0123456789"),
|
||||||
collectionStorageProvider.set("relays", $relays)
|
getShard: (item: Relay) => last(hash(item.url)),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
relays.set(await collection.get())
|
||||||
|
|
||||||
|
return throttled(3000, relays).subscribe(collection.set)
|
||||||
}
|
}
|
||||||
|
|
||||||
const syncHandles = async () => {
|
const syncHandles = async () => {
|
||||||
handles.set(await collectionStorageProvider.get<Handle>("handles"))
|
const collection = new Collection<Handle>({
|
||||||
|
table: "handles",
|
||||||
|
shards: Array.from("0123456789"),
|
||||||
|
getShard: (item: Handle) => last(hash(item.nip05)),
|
||||||
|
})
|
||||||
|
|
||||||
return onHandle(
|
handles.set(await collection.get())
|
||||||
batch(3000, async $handles => {
|
|
||||||
await collectionStorageProvider.add("handles", $handles)
|
return onHandle(batch(3000, collection.add))
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const syncZappers = async () => {
|
const syncZappers = async () => {
|
||||||
zappers.set(await collectionStorageProvider.get<Zapper>("zappers"))
|
const collection = new Collection<Zapper>({
|
||||||
|
table: "zappers",
|
||||||
|
shards: Array.from("0123456789"),
|
||||||
|
getShard: (item: Zapper) => last(hash(item.lnurl)),
|
||||||
|
})
|
||||||
|
|
||||||
return onZapper(
|
zappers.set(await collection.get())
|
||||||
batch(3000, async $zappers => {
|
|
||||||
await collectionStorageProvider.add("zappers", $zappers)
|
return onZapper(batch(3000, collection.add))
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type FreshnessItem = [string, number]
|
||||||
|
|
||||||
const syncFreshness = async () => {
|
const syncFreshness = async () => {
|
||||||
freshness.set(fromPairs(await collectionStorageProvider.get<[string, number]>("freshness")))
|
const collection = new Collection<FreshnessItem>({
|
||||||
|
table: "freshness",
|
||||||
|
shards: ["0"],
|
||||||
|
getShard: always("0"),
|
||||||
|
})
|
||||||
|
|
||||||
|
freshness.set(fromPairs(await collection.get()))
|
||||||
|
|
||||||
return throttled(3000, freshness).subscribe($freshness => {
|
return throttled(3000, freshness).subscribe($freshness => {
|
||||||
collectionStorageProvider.set("freshness", Object.entries($freshness))
|
collection.set(Object.entries($freshness))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PlaintextItem = [string, string]
|
||||||
|
|
||||||
const syncPlaintext = async () => {
|
const syncPlaintext = async () => {
|
||||||
plaintext.set(fromPairs(await collectionStorageProvider.get<[string, string]>("plaintext")))
|
const collection = new Collection<PlaintextItem>({
|
||||||
|
table: "plaintext",
|
||||||
|
shards: ["0"],
|
||||||
|
getShard: always("0"),
|
||||||
|
})
|
||||||
|
|
||||||
|
plaintext.set(fromPairs(await collection.get()))
|
||||||
|
|
||||||
return throttled(3000, plaintext).subscribe($plaintext => {
|
return throttled(3000, plaintext).subscribe($plaintext => {
|
||||||
collectionStorageProvider.set("plaintext", Object.entries($plaintext))
|
collection.set(Object.entries($plaintext))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+62
-48
@@ -1,3 +1,4 @@
|
|||||||
|
import {flatten, identity, groupBy} from "@welshman/lib"
|
||||||
import {type StorageProvider} from "@welshman/store"
|
import {type StorageProvider} from "@welshman/store"
|
||||||
import {Preferences} from "@capacitor/preferences"
|
import {Preferences} from "@capacitor/preferences"
|
||||||
import {Encoding, Filesystem, Directory} from "@capacitor/filesystem"
|
import {Encoding, Filesystem, Directory} from "@capacitor/filesystem"
|
||||||
@@ -30,79 +31,92 @@ export class PreferencesStorageProvider implements StorageProvider {
|
|||||||
|
|
||||||
export const preferencesStorageProvider = new PreferencesStorageProvider()
|
export const preferencesStorageProvider = new PreferencesStorageProvider()
|
||||||
|
|
||||||
export class CollectionStorageProvider implements StorageProvider {
|
export type CollectionOptions<T> = {
|
||||||
|
table: string
|
||||||
|
shards: string[]
|
||||||
|
getShard: (item: T) => string
|
||||||
|
}
|
||||||
|
|
||||||
|
export class Collection<T> {
|
||||||
p = Promise.resolve()
|
p = Promise.resolve()
|
||||||
|
|
||||||
get = async <T>(key: string): Promise<T[]> => {
|
constructor(readonly options: CollectionOptions<T>) {}
|
||||||
|
|
||||||
|
static clearAll = async (): Promise<void> => {
|
||||||
|
const res = await Filesystem.readdir({
|
||||||
|
path: "",
|
||||||
|
directory: Directory.Data,
|
||||||
|
})
|
||||||
|
|
||||||
|
await Promise.all(
|
||||||
|
res.files.map(file =>
|
||||||
|
Filesystem.deleteFile({
|
||||||
|
path: file.name,
|
||||||
|
directory: Directory.Data,
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#then = async (f: () => Promise<void>) => {
|
||||||
|
this.p = this.p.then(f).catch(e => {
|
||||||
|
console.error(e)
|
||||||
|
})
|
||||||
|
|
||||||
|
await this.p
|
||||||
|
}
|
||||||
|
|
||||||
|
#path = (shard: string) => `collection_${this.options.table}_${shard}.json`
|
||||||
|
|
||||||
|
getShard = async (shard: string): Promise<T[]> => {
|
||||||
try {
|
try {
|
||||||
const file = await Filesystem.readFile({
|
const file = await Filesystem.readFile({
|
||||||
path: key + ".json",
|
path: this.#path(shard),
|
||||||
directory: Directory.Data,
|
directory: Directory.Data,
|
||||||
encoding: Encoding.UTF8,
|
encoding: Encoding.UTF8,
|
||||||
})
|
})
|
||||||
|
|
||||||
const items: T[] = []
|
// Speed things up by parsing only once
|
||||||
for (const line of file.data.toString().split("\n")) {
|
return JSON.parse("[" + file.data.toString().split("\n").filter(identity).join(",") + "]")
|
||||||
try {
|
|
||||||
items.push(JSON.parse(line))
|
|
||||||
} catch (e) {
|
|
||||||
// pass
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return items
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
// file doesn't exist, or isn't valid json
|
// file doesn't exist, or isn't valid json
|
||||||
return []
|
return []
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
set = async <T>(key: string, value: T[]): Promise<void> => {
|
get = async (): Promise<T[]> => flatten(await Promise.all(this.options.shards.map(this.getShard)))
|
||||||
this.p = this.p.then(async () => {
|
|
||||||
|
setShard = (shard: string, items: T[]) =>
|
||||||
|
this.#then(async () => {
|
||||||
await Filesystem.writeFile({
|
await Filesystem.writeFile({
|
||||||
path: key + ".json",
|
path: this.#path(shard),
|
||||||
directory: Directory.Data,
|
directory: Directory.Data,
|
||||||
encoding: Encoding.UTF8,
|
encoding: Encoding.UTF8,
|
||||||
data: value.map(v => JSON.stringify(v)).join("\n"),
|
data: items.map(v => JSON.stringify(v)).join("\n"),
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
await this.p
|
set = (items: T[]) =>
|
||||||
}
|
Promise.all(
|
||||||
|
Array.from(groupBy(this.options.getShard, items)).map(([shard, chunk]) =>
|
||||||
|
this.setShard(shard, chunk),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
add = async <T>(key: string, value: T[]): Promise<void> => {
|
addToShard = (shard: string, items: T[]) =>
|
||||||
this.p = this.p.then(async () => {
|
this.#then(async () => {
|
||||||
await Filesystem.appendFile({
|
await Filesystem.appendFile({
|
||||||
path: key + ".json",
|
path: this.#path(shard),
|
||||||
directory: Directory.Data,
|
directory: Directory.Data,
|
||||||
encoding: Encoding.UTF8,
|
encoding: Encoding.UTF8,
|
||||||
data: "\n" + value.map(v => JSON.stringify(v)).join("\n"),
|
data: "\n" + items.map(v => JSON.stringify(v)).join("\n"),
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
await this.p
|
add = (items: T[]) =>
|
||||||
}
|
Promise.all(
|
||||||
|
Array.from(groupBy(this.options.getShard, items)).map(([shard, chunk]) =>
|
||||||
clear = async (): Promise<void> => {
|
this.addToShard(shard, chunk),
|
||||||
this.p = this.p.then(async () => {
|
),
|
||||||
try {
|
)
|
||||||
const res = await Filesystem.readdir({path: "./", directory: Directory.Data})
|
|
||||||
|
|
||||||
await Promise.all(
|
|
||||||
res.files.map(file =>
|
|
||||||
Filesystem.deleteFile({
|
|
||||||
path: file.name + ".json",
|
|
||||||
directory: Directory.Data,
|
|
||||||
}),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
} catch (e) {
|
|
||||||
// Directory might not have been created yet
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
await this.p
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export const collectionStorageProvider = new CollectionStorageProvider()
|
|
||||||
|
|||||||
Reference in New Issue
Block a user