forked from coracle/flotilla
Push shards into storage lib
This commit is contained in:
@@ -59,7 +59,7 @@
|
|||||||
const profile = deriveProfile(event.pubkey, [url])
|
const profile = deriveProfile(event.pubkey, [url])
|
||||||
const profileDisplay = deriveProfileDisplay(event.pubkey, [url])
|
const profileDisplay = deriveProfileDisplay(event.pubkey, [url])
|
||||||
const thunk = mergeThunks($thunks.filter(t => t.event.id === event.id))
|
const thunk = mergeThunks($thunks.filter(t => t.event.id === event.id))
|
||||||
const [_, colorValue] = colors[parseInt(hash(event.pubkey)) % colors.length]
|
const [_, colorValue] = colors[hash(event.pubkey) % colors.length]
|
||||||
const comments = deriveEventsForUrl(url, [{kinds: [COMMENT], "#e": [event.id]}])
|
const comments = deriveEventsForUrl(url, [{kinds: [COMMENT], "#e": [event.id]}])
|
||||||
|
|
||||||
const reply = () => replyTo!(event)
|
const reply = () => replyTo!(event)
|
||||||
|
|||||||
@@ -40,7 +40,7 @@
|
|||||||
const profile = deriveProfile(event.pubkey)
|
const profile = deriveProfile(event.pubkey)
|
||||||
const profileDisplay = deriveProfileDisplay(event.pubkey)
|
const profileDisplay = deriveProfileDisplay(event.pubkey)
|
||||||
const thunk = mergeThunks($thunks.filter(t => t.event.id === event.id))
|
const thunk = mergeThunks($thunks.filter(t => t.event.id === event.id))
|
||||||
const [_, colorValue] = colors[parseInt(hash(event.pubkey)) % colors.length]
|
const [_, colorValue] = colors[hash(event.pubkey) % colors.length]
|
||||||
|
|
||||||
const reply = () => replyTo(event)
|
const reply = () => replyTo(event)
|
||||||
|
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import {
|
|||||||
uniqBy,
|
uniqBy,
|
||||||
sortBy,
|
sortBy,
|
||||||
sort,
|
sort,
|
||||||
|
prop,
|
||||||
uniq,
|
uniq,
|
||||||
nth,
|
nth,
|
||||||
pushToMapKey,
|
pushToMapKey,
|
||||||
@@ -524,7 +525,7 @@ export const chats = derived(
|
|||||||
c => -c.last_activity,
|
c => -c.last_activity,
|
||||||
Array.from(messagesByChatId.entries()).map(([id, events]): Chat => {
|
Array.from(messagesByChatId.entries()).map(([id, events]): Chat => {
|
||||||
const pubkeys = remove($pubkey!, splitChatId(id))
|
const pubkeys = remove($pubkey!, splitChatId(id))
|
||||||
const messages = sortBy(e => -e.created_at, events)
|
const messages = sortBy(e => -e.created_at, uniqBy(prop("id"), events))
|
||||||
const last_activity = messages[0].created_at
|
const last_activity = messages[0].created_at
|
||||||
const search_text =
|
const search_text =
|
||||||
pubkeys.length === 0
|
pubkeys.length === 0
|
||||||
|
|||||||
+21
-64
@@ -1,16 +1,4 @@
|
|||||||
import {
|
import {prop, first, call, on, groupBy, throttle, fromPairs, batch, sortBy, concat} from "@welshman/lib"
|
||||||
always,
|
|
||||||
call,
|
|
||||||
on,
|
|
||||||
hash,
|
|
||||||
last,
|
|
||||||
groupBy,
|
|
||||||
throttle,
|
|
||||||
fromPairs,
|
|
||||||
batch,
|
|
||||||
sortBy,
|
|
||||||
concat,
|
|
||||||
} from "@welshman/lib"
|
|
||||||
import {throttled, freshness} from "@welshman/store"
|
import {throttled, freshness} from "@welshman/store"
|
||||||
import {
|
import {
|
||||||
PROFILE,
|
PROFILE,
|
||||||
@@ -50,11 +38,7 @@ import {
|
|||||||
import {Collection} from "@lib/storage"
|
import {Collection} from "@lib/storage"
|
||||||
|
|
||||||
const syncEvents = async () => {
|
const syncEvents = async () => {
|
||||||
const collection = new Collection<TrustedEvent>({
|
const collection = new Collection<TrustedEvent>({table: "events", getId: prop("id")})
|
||||||
table: "events",
|
|
||||||
shards: Array.from("0123456789abcdef"),
|
|
||||||
getShard: (event: TrustedEvent) => last(event.id),
|
|
||||||
})
|
|
||||||
|
|
||||||
const initialEvents = await collection.get()
|
const initialEvents = await collection.get()
|
||||||
|
|
||||||
@@ -131,8 +115,8 @@ const syncEvents = async () => {
|
|||||||
if (removed.size > 0) {
|
if (removed.size > 0) {
|
||||||
added = added.filter(e => !removed.has(e.id))
|
added = added.filter(e => !removed.has(e.id))
|
||||||
|
|
||||||
const removedByShard = groupBy(id => last(id), removed)
|
const removedByShard = groupBy(id => collection.getShardId(id), removed)
|
||||||
const addedByShard = groupBy(e => last(e.id), added)
|
const addedByShard = groupBy(collection.getShardIdFromItem, added)
|
||||||
const shards = new Set([...removedByShard.keys(), ...addedByShard.keys()])
|
const shards = new Set([...removedByShard.keys(), ...addedByShard.keys()])
|
||||||
|
|
||||||
for (const shard of shards) {
|
for (const shard of shards) {
|
||||||
@@ -141,7 +125,7 @@ const syncEvents = async () => {
|
|||||||
const current = await collection.getShard(shard)
|
const current = await collection.getShard(shard)
|
||||||
const filtered = current.filter(e => !removedInShard?.includes(e.id))
|
const filtered = current.filter(e => !removedInShard?.includes(e.id))
|
||||||
const sorted = sortBy(e => -rankEvent(e), concat(filtered, addedInShard))
|
const sorted = sortBy(e => -rankEvent(e), concat(filtered, addedInShard))
|
||||||
const pruned = sorted.slice(0, 10_000)
|
const pruned = sorted.slice(0, 1000)
|
||||||
|
|
||||||
await collection.setShard(shard, pruned)
|
await collection.setShard(shard, pruned)
|
||||||
}
|
}
|
||||||
@@ -155,11 +139,7 @@ const syncEvents = async () => {
|
|||||||
type TrackerItem = [string, string[]]
|
type TrackerItem = [string, string[]]
|
||||||
|
|
||||||
const syncTracker = async () => {
|
const syncTracker = async () => {
|
||||||
const collection = new Collection<TrackerItem>({
|
const collection = new Collection<TrackerItem>({table: "tracker", getId: first})
|
||||||
table: "tracker",
|
|
||||||
shards: Array.from("0123456789abcdef"),
|
|
||||||
getShard: (item: TrackerItem) => last(item[0]),
|
|
||||||
})
|
|
||||||
|
|
||||||
const relaysById = new Map<string, Set<string>>()
|
const relaysById = new Map<string, Set<string>>()
|
||||||
|
|
||||||
@@ -193,11 +173,7 @@ const syncTracker = async () => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const syncRelays = async () => {
|
const syncRelays = async () => {
|
||||||
const collection = new Collection<Relay>({
|
const collection = new Collection<Relay>({table: "relays", getId: prop("url")})
|
||||||
table: "relays",
|
|
||||||
shards: Array.from("0123456789"),
|
|
||||||
getShard: (item: Relay) => last(hash(item.url)),
|
|
||||||
})
|
|
||||||
|
|
||||||
relays.set(await collection.get())
|
relays.set(await collection.get())
|
||||||
|
|
||||||
@@ -205,11 +181,7 @@ const syncRelays = async () => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const syncHandles = async () => {
|
const syncHandles = async () => {
|
||||||
const collection = new Collection<Handle>({
|
const collection = new Collection<Handle>({table: "handles", getId: prop("nip05")})
|
||||||
table: "handles",
|
|
||||||
shards: Array.from("0123456789"),
|
|
||||||
getShard: (item: Handle) => last(hash(item.nip05)),
|
|
||||||
})
|
|
||||||
|
|
||||||
handles.set(await collection.get())
|
handles.set(await collection.get())
|
||||||
|
|
||||||
@@ -217,11 +189,7 @@ const syncHandles = async () => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const syncZappers = async () => {
|
const syncZappers = async () => {
|
||||||
const collection = new Collection<Zapper>({
|
const collection = new Collection<Zapper>({table: "zappers", getId: prop("lnurl")})
|
||||||
table: "zappers",
|
|
||||||
shards: Array.from("0123456789"),
|
|
||||||
getShard: (item: Zapper) => last(hash(item.lnurl)),
|
|
||||||
})
|
|
||||||
|
|
||||||
zappers.set(await collection.get())
|
zappers.set(await collection.get())
|
||||||
|
|
||||||
@@ -231,11 +199,7 @@ const syncZappers = async () => {
|
|||||||
type FreshnessItem = [string, number]
|
type FreshnessItem = [string, number]
|
||||||
|
|
||||||
const syncFreshness = async () => {
|
const syncFreshness = async () => {
|
||||||
const collection = new Collection<FreshnessItem>({
|
const collection = new Collection<FreshnessItem>({table: "freshness", getId: first})
|
||||||
table: "freshness",
|
|
||||||
shards: ["0"],
|
|
||||||
getShard: always("0"),
|
|
||||||
})
|
|
||||||
|
|
||||||
freshness.set(fromPairs(await collection.get()))
|
freshness.set(fromPairs(await collection.get()))
|
||||||
|
|
||||||
@@ -247,11 +211,7 @@ const syncFreshness = async () => {
|
|||||||
type PlaintextItem = [string, string]
|
type PlaintextItem = [string, string]
|
||||||
|
|
||||||
const syncPlaintext = async () => {
|
const syncPlaintext = async () => {
|
||||||
const collection = new Collection<PlaintextItem>({
|
const collection = new Collection<PlaintextItem>({table: "plaintext", getId: first})
|
||||||
table: "plaintext",
|
|
||||||
shards: ["0"],
|
|
||||||
getShard: always("0"),
|
|
||||||
})
|
|
||||||
|
|
||||||
plaintext.set(fromPairs(await collection.get()))
|
plaintext.set(fromPairs(await collection.get()))
|
||||||
|
|
||||||
@@ -261,11 +221,7 @@ const syncPlaintext = async () => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const syncWrapManager = async () => {
|
const syncWrapManager = async () => {
|
||||||
const collection = new Collection<WrapItem>({
|
const collection = new Collection<WrapItem>({table: "wraps", getId: prop("id")})
|
||||||
table: "wraps",
|
|
||||||
shards: Array.from("0123456789abcdef"),
|
|
||||||
getShard: (item: WrapItem) => last(hash(item.id)),
|
|
||||||
})
|
|
||||||
|
|
||||||
wrapManager.load(await collection.get())
|
wrapManager.load(await collection.get())
|
||||||
|
|
||||||
@@ -283,15 +239,16 @@ const syncWrapManager = async () => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export const syncDataStores = async () => {
|
export const syncDataStores = async () => {
|
||||||
|
const t = Date.now()
|
||||||
const unsubscribers = await Promise.all([
|
const unsubscribers = await Promise.all([
|
||||||
syncEvents(),
|
syncEvents().then(f => console.log("syncEvents", Date.now() - t) || f),
|
||||||
syncTracker(),
|
syncTracker().then(f => console.log("syncTracker", Date.now() - t) || f),
|
||||||
syncRelays(),
|
syncRelays().then(f => console.log("syncRelays", Date.now() - t) || f),
|
||||||
syncHandles(),
|
syncHandles().then(f => console.log("syncHandles", Date.now() - t) || f),
|
||||||
syncZappers(),
|
syncZappers().then(f => console.log("syncZappers", Date.now() - t) || f),
|
||||||
syncFreshness(),
|
syncFreshness().then(f => console.log("syncFreshness", Date.now() - t) || f),
|
||||||
syncPlaintext(),
|
syncPlaintext().then(f => console.log("syncPlaintext", Date.now() - t) || f),
|
||||||
syncWrapManager(),
|
syncWrapManager().then(f => console.log("syncWrapManager", Date.now() - t) || f),
|
||||||
])
|
])
|
||||||
|
|
||||||
return () => unsubscribers.forEach(call)
|
return () => unsubscribers.forEach(call)
|
||||||
|
|||||||
+46
-42
@@ -1,4 +1,4 @@
|
|||||||
import {flatten, identity, groupBy} from "@welshman/lib"
|
import {hash, range, reject, flatten, identity, groupBy} from "@welshman/lib"
|
||||||
import {type StorageProvider} from "@welshman/store"
|
import {type StorageProvider} from "@welshman/store"
|
||||||
import {Preferences} from "@capacitor/preferences"
|
import {Preferences} from "@capacitor/preferences"
|
||||||
import {Encoding, Filesystem, Directory} from "@capacitor/filesystem"
|
import {Encoding, Filesystem, Directory} from "@capacitor/filesystem"
|
||||||
@@ -33,12 +33,11 @@ export const preferencesStorageProvider = new PreferencesStorageProvider()
|
|||||||
|
|
||||||
export type CollectionOptions<T> = {
|
export type CollectionOptions<T> = {
|
||||||
table: string
|
table: string
|
||||||
shards: string[]
|
getId: (item: T) => string
|
||||||
getShard: (item: T) => string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Collection<T> {
|
export class Collection<T> {
|
||||||
#promises = new Map<string, Promise<any>>()
|
#shardCount = 1000
|
||||||
|
|
||||||
constructor(readonly options: CollectionOptions<T>) {}
|
constructor(readonly options: CollectionOptions<T>) {}
|
||||||
|
|
||||||
@@ -58,67 +57,72 @@ export class Collection<T> {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
#then = <R>(shard: string, f: () => Promise<R>) => {
|
getShardIds = () => Array.from(range(0, this.#shardCount))
|
||||||
const oldPromise = this.#promises.get(shard) || Promise.resolve()
|
|
||||||
const newPromise = oldPromise.then(f)
|
|
||||||
|
|
||||||
this.#promises.set(shard, newPromise)
|
getShardId = (id: string) => String(hash(id) % this.#shardCount)
|
||||||
|
|
||||||
return newPromise
|
getShardIdFromItem = (item: T) => this.getShardId(this.options.getId(item))
|
||||||
}
|
|
||||||
|
|
||||||
#path = (shard: string) => `collection_${this.options.table}_${shard}.json`
|
#path = (shard: string) => `collection_${this.options.table}_${shard}.json`
|
||||||
|
|
||||||
getShard = (shard: string): Promise<T[]> =>
|
getShard = async (shard: string): Promise<T[]> => {
|
||||||
this.#then(shard, async () => {
|
try {
|
||||||
try {
|
const file = await Filesystem.readFile({
|
||||||
const file = await Filesystem.readFile({
|
|
||||||
path: this.#path(shard),
|
|
||||||
directory: Directory.Data,
|
|
||||||
encoding: Encoding.UTF8,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Speed things up by parsing only once
|
|
||||||
return JSON.parse("[" + file.data.toString().split("\n").filter(identity).join(",") + "]")
|
|
||||||
} catch (err) {
|
|
||||||
// file doesn't exist, or isn't valid json
|
|
||||||
return []
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
get = async (): Promise<T[]> => flatten(await Promise.all(this.options.shards.map(this.getShard)))
|
|
||||||
|
|
||||||
setShard = (shard: string, items: T[]) =>
|
|
||||||
this.#then(shard, async () => {
|
|
||||||
await Filesystem.writeFile({
|
|
||||||
path: this.#path(shard),
|
path: this.#path(shard),
|
||||||
directory: Directory.Data,
|
directory: Directory.Data,
|
||||||
encoding: Encoding.UTF8,
|
encoding: Encoding.UTF8,
|
||||||
data: items.map(v => JSON.stringify(v)).join("\n"),
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Speed things up by parsing only once
|
||||||
|
return JSON.parse("[" + file.data.toString().split("\n").filter(identity).join(",") + "]")
|
||||||
|
} catch (err) {
|
||||||
|
// file doesn't exist, or isn't valid json
|
||||||
|
return []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
get = async (): Promise<T[]> => flatten(await Promise.all(this.getShardIds().map(id => this.getShard(id))))
|
||||||
|
|
||||||
|
setShard = async (shard: string, items: T[]) =>
|
||||||
|
Filesystem.writeFile({
|
||||||
|
path: this.#path(shard),
|
||||||
|
directory: Directory.Data,
|
||||||
|
encoding: Encoding.UTF8,
|
||||||
|
data: items.map(v => JSON.stringify(v)).join("\n"),
|
||||||
})
|
})
|
||||||
|
|
||||||
set = (items: T[]) =>
|
set = (items: T[]) =>
|
||||||
Promise.all(
|
Promise.all(
|
||||||
Array.from(groupBy(this.options.getShard, items)).map(([shard, chunk]) =>
|
Array.from(groupBy(this.getShardIdFromItem, items)).map(([shard, chunk]) =>
|
||||||
this.setShard(shard, chunk),
|
this.setShard(shard, chunk),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
addToShard = (shard: string, items: T[]) =>
|
addToShard = (shard: string, items: T[]) =>
|
||||||
this.#then(shard, async () => {
|
Filesystem.appendFile({
|
||||||
await Filesystem.appendFile({
|
path: this.#path(shard),
|
||||||
path: this.#path(shard),
|
directory: Directory.Data,
|
||||||
directory: Directory.Data,
|
encoding: Encoding.UTF8,
|
||||||
encoding: Encoding.UTF8,
|
data: "\n" + items.map(v => JSON.stringify(v)).join("\n"),
|
||||||
data: "\n" + items.map(v => JSON.stringify(v)).join("\n"),
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
add = (items: T[]) =>
|
add = (items: T[]) =>
|
||||||
Promise.all(
|
Promise.all(
|
||||||
Array.from(groupBy(this.options.getShard, items)).map(([shard, chunk]) =>
|
Array.from(groupBy(this.getShardIdFromItem, items)).map(([shard, chunk]) =>
|
||||||
this.addToShard(shard, chunk),
|
this.addToShard(shard, chunk),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
removeFromShard = async (shard: string, ids: Set<string>) =>
|
||||||
|
this.setShard(
|
||||||
|
shard,
|
||||||
|
reject(item => ids.has(this.options.getId(item)), await this.getShard(shard)),
|
||||||
|
)
|
||||||
|
|
||||||
|
remove = (ids: Iterable<string>) =>
|
||||||
|
Promise.all(
|
||||||
|
Array.from(groupBy(this.getShardId, ids)).map(([shard, chunk]) =>
|
||||||
|
this.removeFromShard(shard, new Set(chunk)),
|
||||||
|
),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -147,9 +147,7 @@
|
|||||||
document.documentElement.style["font-size"] = `${$userSettingsValues.font_size}rem`
|
document.documentElement.style["font-size"] = `${$userSettingsValues.font_size}rem`
|
||||||
})
|
})
|
||||||
|
|
||||||
let unsubscribeStorage: () => void
|
const unsubscribeStorage = call(async () => {
|
||||||
|
|
||||||
const ready = call(async () => {
|
|
||||||
// Sync stuff to localstorage
|
// Sync stuff to localstorage
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
sync({
|
sync({
|
||||||
@@ -170,7 +168,7 @@
|
|||||||
])
|
])
|
||||||
|
|
||||||
// Sync stuff to indexeddb
|
// Sync stuff to indexeddb
|
||||||
unsubscribeStorage = await storage.syncDataStores()
|
return await storage.syncDataStores()
|
||||||
})
|
})
|
||||||
|
|
||||||
// Default socket policies
|
// Default socket policies
|
||||||
@@ -190,7 +188,7 @@
|
|||||||
unsubscribeSignerLog()
|
unsubscribeSignerLog()
|
||||||
unsubscribeTheme()
|
unsubscribeTheme()
|
||||||
unsubscribeSettings()
|
unsubscribeSettings()
|
||||||
unsubscribeStorage?.()
|
unsubscribeStorage.then(call)
|
||||||
defaultSocketPolicies.splice(-additionalPolicies.length)
|
defaultSocketPolicies.splice(-additionalPolicies.length)
|
||||||
})
|
})
|
||||||
</script>
|
</script>
|
||||||
@@ -201,8 +199,8 @@
|
|||||||
{/if}
|
{/if}
|
||||||
</svelte:head>
|
</svelte:head>
|
||||||
|
|
||||||
{#await ready}
|
{#await unsubscribeStorage}
|
||||||
<div></div>
|
<!-- pass -->
|
||||||
{:then}
|
{:then}
|
||||||
<div>
|
<div>
|
||||||
<AppContainer>
|
<AppContainer>
|
||||||
|
|||||||
Reference in New Issue
Block a user