Fix a few bugs, improve storage adapter
This commit is contained in:
@@ -20,7 +20,7 @@ export const collection = <T, LoadArgs extends any[]>({
|
||||
|
||||
const loadItem = async (key: string, ...args: LoadArgs) => {
|
||||
const item = indexStore.get().get(key)
|
||||
const delta = item ? 3600 : 300
|
||||
const delta = item ? 3600 : 30
|
||||
|
||||
if (getFreshness(name, key) > now() - delta) {
|
||||
return item
|
||||
|
||||
@@ -58,8 +58,15 @@ export const {
|
||||
export const deriveHandleForPubkey = (pubkey: string, request: Partial<SubscribeRequest> = {}) =>
|
||||
derived(
|
||||
[handlesByNip05, deriveProfile(pubkey, request)],
|
||||
([$handlesByNip05, $profile]) =>
|
||||
$profile?.nip05 ? $handlesByNip05.get($profile?.nip05) : undefined
|
||||
([$handlesByNip05, $profile]) => {
|
||||
if (!$profile?.nip05) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
loadHandle($profile.nip05)
|
||||
|
||||
return $handlesByNip05.get($profile?.nip05)
|
||||
}
|
||||
)
|
||||
|
||||
export const displayHandle = (handle: Handle) =>
|
||||
|
||||
@@ -25,11 +25,13 @@ import {onAuth} from './session'
|
||||
|
||||
export function* optimizeSubscriptions(subs: Subscription[]) {
|
||||
const [withRelays, withoutRelays] = partition(sub => sub.request.relays.length > 0, subs)
|
||||
const filters = unionFilters(withoutRelays.flatMap(sub => sub.request.filters))
|
||||
|
||||
yield* defaultOptimizeSubscriptions(withRelays)
|
||||
yield* getFilterSelections(
|
||||
unionFilters(withoutRelays.flatMap(sub => sub.request.filters))
|
||||
)
|
||||
|
||||
if (filters.length > 0) {
|
||||
yield* getFilterSelections(filters)
|
||||
}
|
||||
}
|
||||
|
||||
Object.assign(NetworkContext, {
|
||||
|
||||
@@ -99,7 +99,7 @@ export const relaySearch = derived(relays, $relays =>
|
||||
type RelayStatsUpdate = [string, (stats: RelayStats) => void]
|
||||
|
||||
const updateRelayStats = batch(500, (updates: RelayStatsUpdate[]) => {
|
||||
relays.update(($relays: Relay[]) => {
|
||||
relays.update($relays => {
|
||||
const $relaysByUrl = indexBy(r => r.url, $relays)
|
||||
const $itemsByUrl = groupBy(([url]) => url, updates)
|
||||
|
||||
@@ -125,13 +125,9 @@ const onConnectionSend = ({url}: Connection, socketMessage: SocketMessage) => {
|
||||
const [verb] = asMessage(socketMessage)
|
||||
|
||||
if (verb === 'REQ') {
|
||||
updateRelayStats([url, stats => {
|
||||
stats.request_count = stats.request_count + 1
|
||||
}])
|
||||
updateRelayStats([url, stats => ++stats.request_count])
|
||||
} else if (verb === 'EVENT') {
|
||||
updateRelayStats([url, stats => {
|
||||
stats.publish_count = stats.publish_count + 1
|
||||
}])
|
||||
updateRelayStats([url, stats => ++stats.publish_count])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,9 +135,7 @@ const onConnectionReceive = ({url}: Connection, socketMessage: SocketMessage) =>
|
||||
const [verb] = asMessage(socketMessage)
|
||||
|
||||
if (verb === 'EVENT') {
|
||||
updateRelayStats([url, stats => {
|
||||
stats.event_count = stats.event_count + 1
|
||||
}])
|
||||
updateRelayStats([url, stats => ++stats.event_count])
|
||||
} else if (verb === 'OK') {
|
||||
updateRelayStats([url, stats => {
|
||||
stats.last_auth_status = AuthStatus.Ok
|
||||
@@ -159,9 +153,7 @@ const onConnectionFault = ({url}: Connection) =>
|
||||
}])
|
||||
|
||||
export const trackRelayStats = (connection: Connection) => {
|
||||
updateRelayStats([connection.url, stats => {
|
||||
stats.connect_count = stats.connect_count + 1
|
||||
}])
|
||||
updateRelayStats([connection.url, stats => ++stats.connect_count])
|
||||
|
||||
connection.on('send', onConnectionSend)
|
||||
connection.on('receive', onConnectionReceive)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import {
|
||||
intersection, first, switcher, throttleWithValue, clamp, last, splitAt, identity, sortBy, uniq, shuffle,
|
||||
pushToMapKey,
|
||||
pushToMapKey, now,
|
||||
} from '@welshman/lib'
|
||||
import {
|
||||
Tags, getFilterId, unionFilters, isShareableRelayUrl, isCommunityAddress, isGroupAddress, isContextAddress,
|
||||
@@ -401,19 +401,19 @@ export const getRelayQuality = (url: string) => {
|
||||
if (!connection) {
|
||||
const lastFault = last(recent_errors) || 0
|
||||
|
||||
if (recent_errors.filter(n => n > Date.now() - oneHour).length > 10) {
|
||||
if (recent_errors.filter(n => n > now() - oneHour).length > 10) {
|
||||
return 0
|
||||
}
|
||||
|
||||
if (recent_errors.filter(n => n > Date.now() - oneDay).length > 50) {
|
||||
if (recent_errors.filter(n => n > now() - oneDay).length > 50) {
|
||||
return 0
|
||||
}
|
||||
|
||||
if (recent_errors.filter(n => n > Date.now() - oneWeek).length > 100) {
|
||||
if (recent_errors.filter(n => n > now() - oneWeek).length > 100) {
|
||||
return 0
|
||||
}
|
||||
|
||||
return Math.max(0, Math.min(0.5, (Date.now() - oneMinute - lastFault) / oneHour))
|
||||
return Math.max(0, Math.min(0.5, (now() - oneMinute - lastFault) / oneHour))
|
||||
}
|
||||
|
||||
return switcher(connection.meta.getStatus(), {
|
||||
|
||||
+28
-15
@@ -3,8 +3,8 @@ import type {IDBPDatabase} from "idb"
|
||||
import {throttle} from "throttle-debounce"
|
||||
import {writable} from "svelte/store"
|
||||
import type {Unsubscriber, Writable} from "svelte/store"
|
||||
import {randomInt} from "@welshman/lib"
|
||||
import {withGetter} from "@welshman/store"
|
||||
import {randomInt, fromPairs} from "@welshman/lib"
|
||||
import {withGetter, adapter} from "@welshman/store"
|
||||
|
||||
export type Item = Record<string, any>
|
||||
|
||||
@@ -19,8 +19,6 @@ export const dead = withGetter(writable(false))
|
||||
|
||||
export const subs: Unsubscriber[] = []
|
||||
|
||||
export const DB_NAME = "flotilla"
|
||||
|
||||
export const getAll = async (name: string) => {
|
||||
const tx = db.transaction(name, "readwrite")
|
||||
const store = tx.objectStore(name)
|
||||
@@ -48,22 +46,20 @@ export const bulkDelete = async (name: string, ids: string[]) => {
|
||||
}
|
||||
|
||||
export const initIndexedDbAdapter = async (name: string, adapter: IndexedDbAdapter) => {
|
||||
let copy = await getAll(name)
|
||||
let prevRecords = await getAll(name)
|
||||
|
||||
adapter.store.set(copy)
|
||||
adapter.store.set(prevRecords)
|
||||
|
||||
adapter.store.subscribe(
|
||||
throttle(randomInt(3000, 5000), async (data: Item[]) => {
|
||||
throttle(randomInt(3000, 5000), async (newRecords: Item[]) => {
|
||||
if (dead.get()) {
|
||||
return
|
||||
}
|
||||
|
||||
const prevIds = new Set(copy.map(item => item[adapter.keyPath]))
|
||||
const currentIds = new Set(data.map(item => item[adapter.keyPath]))
|
||||
const newRecords = data.filter(r => !prevIds.has(r[adapter.keyPath]))
|
||||
const removedRecords = copy.filter(r => !currentIds.has(r[adapter.keyPath]))
|
||||
const currentIds = new Set(newRecords.map(item => item[adapter.keyPath]))
|
||||
const removedRecords = prevRecords.filter(r => !currentIds.has(r[adapter.keyPath]))
|
||||
|
||||
copy = data
|
||||
prevRecords = newRecords
|
||||
|
||||
if (newRecords.length > 0) {
|
||||
await bulkPut(name, newRecords)
|
||||
@@ -79,12 +75,16 @@ export const initIndexedDbAdapter = async (name: string, adapter: IndexedDbAdapt
|
||||
)
|
||||
}
|
||||
|
||||
export const initStorage = async (version: number, adapters: Record<string, IndexedDbAdapter>) => {
|
||||
export const initStorage = async (name: string, version: number, adapters: Record<string, IndexedDbAdapter>) => {
|
||||
if (!window.indexedDB) return
|
||||
|
||||
window.addEventListener("beforeunload", () => closeStorage())
|
||||
|
||||
db = await openDB(DB_NAME, version, {
|
||||
if (db) {
|
||||
throw new Error("Db initialized multiple times")
|
||||
}
|
||||
|
||||
db = await openDB(name, version, {
|
||||
upgrade(db: IDBPDatabase) {
|
||||
const names = Object.keys(adapters)
|
||||
|
||||
@@ -117,5 +117,18 @@ export const closeStorage = async () => {
|
||||
|
||||
export const clearStorage = async () => {
|
||||
await closeStorage()
|
||||
await deleteDB(DB_NAME)
|
||||
await deleteDB(db.name)
|
||||
}
|
||||
|
||||
export const storageAdapters = {
|
||||
fromObjectStore: <T>(store: Writable<Record<string, T>>) => ({
|
||||
keyPath: "key",
|
||||
store: adapter({
|
||||
store: store,
|
||||
forward: ($data: Record<string, T>) =>
|
||||
Object.entries($data).map(([key, value]) => ({key, value})),
|
||||
backward: (data: {key: string, value: T}[]) =>
|
||||
fromPairs(data.map(({key, value}) => [key, value])),
|
||||
}),
|
||||
}),
|
||||
}
|
||||
|
||||
@@ -54,7 +54,14 @@ export const {
|
||||
export const deriveZapperForPubkey = (pubkey: string, request: Partial<SubscribeRequest> = {}) =>
|
||||
derived(
|
||||
[zappersByLnurl, deriveProfile(pubkey, request)],
|
||||
([$zappersByLnurl, $profile]) =>
|
||||
$profile?.lnurl ? $zappersByLnurl.get($profile?.lnurl) : undefined
|
||||
([$zappersByLnurl, $profile]) => {
|
||||
if (!$profile?.lnurl) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
loadZapper($profile.lnurl)
|
||||
|
||||
return $zappersByLnurl.get($profile?.lnurl)
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -254,6 +254,8 @@ export const nth = (i: number) => <T>(xs: T[], ...args: unknown[]) => xs[i]
|
||||
|
||||
export const nthEq = (i: number, v: any) => (xs: any[], ...args: unknown[]) => xs[i] === v
|
||||
|
||||
export const nthNe = (i: number, v: any) => (xs: any[], ...args: unknown[]) => xs[i] !== v
|
||||
|
||||
export const eq = <T>(v: T) => (x: T) => x === v
|
||||
|
||||
export const ne = <T>(v: T) => (x: T) => x !== v
|
||||
|
||||
Reference in New Issue
Block a user