Various optimizations
This commit is contained in:
@@ -1,6 +1,13 @@
|
|||||||
import {writable, derived, Subscriber} from "svelte/store"
|
import {writable, Subscriber} from "svelte/store"
|
||||||
import {tryCatch, fetchJson, batcher, postJson, last} from "@welshman/lib"
|
import {tryCatch, fetchJson, batcher, postJson, last} from "@welshman/lib"
|
||||||
import {getter, deriveItems, makeForceLoadItem, makeLoadItem, makeDeriveItem} from "@welshman/store"
|
import {
|
||||||
|
getter,
|
||||||
|
deriveItems,
|
||||||
|
deriveDeduplicated,
|
||||||
|
makeForceLoadItem,
|
||||||
|
makeLoadItem,
|
||||||
|
makeDeriveItem,
|
||||||
|
} from "@welshman/store"
|
||||||
import {deriveProfile, loadProfile} from "./profiles.js"
|
import {deriveProfile, loadProfile} from "./profiles.js"
|
||||||
import {appContext} from "./context.js"
|
import {appContext} from "./context.js"
|
||||||
|
|
||||||
@@ -123,15 +130,18 @@ export const loadHandleForPubkey = async (pubkey: string, relays: string[] = [])
|
|||||||
export const deriveHandleForPubkey = (pubkey: string, relays: string[] = []) => {
|
export const deriveHandleForPubkey = (pubkey: string, relays: string[] = []) => {
|
||||||
loadHandleForPubkey(pubkey, relays)
|
loadHandleForPubkey(pubkey, relays)
|
||||||
|
|
||||||
return derived([handlesByNip05, deriveProfile(pubkey, relays)], ([$handlesByNip05, $profile]) => {
|
return deriveDeduplicated(
|
||||||
if (!$profile?.nip05) return undefined
|
[handlesByNip05, deriveProfile(pubkey, relays)],
|
||||||
|
([$handlesByNip05, $profile]) => {
|
||||||
|
if (!$profile?.nip05) return undefined
|
||||||
|
|
||||||
const handle = $handlesByNip05.get($profile.nip05)
|
const handle = $handlesByNip05.get($profile.nip05)
|
||||||
|
|
||||||
if (handle?.pubkey !== pubkey) return undefined
|
if (handle?.pubkey !== pubkey) return undefined
|
||||||
|
|
||||||
return handle
|
return handle
|
||||||
})
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
export const displayNip05 = (nip05: string) =>
|
export const displayNip05 = (nip05: string) =>
|
||||||
|
|||||||
@@ -109,7 +109,7 @@ export const getRelayQuality = (url: string) => {
|
|||||||
|
|
||||||
type RelayStatsUpdate = [string, (stats: RelayStats) => void]
|
type RelayStatsUpdate = [string, (stats: RelayStats) => void]
|
||||||
|
|
||||||
const updateRelayStats = batch(500, (updates: RelayStatsUpdate[]) => {
|
const updateRelayStats = batch(1000, (updates: RelayStatsUpdate[]) => {
|
||||||
relayStatsByUrl.update($relayStatsByUrl => {
|
relayStatsByUrl.update($relayStatsByUrl => {
|
||||||
for (const [url, items] of groupBy(([url]) => url, updates)) {
|
for (const [url, items] of groupBy(([url]) => url, updates)) {
|
||||||
if (!url || !isRelayUrl(url)) {
|
if (!url || !isRelayUrl(url)) {
|
||||||
|
|||||||
@@ -372,7 +372,8 @@ export const waitForThunkCompletion = (thunk: Thunk) =>
|
|||||||
export const thunks = writable<Thunk[]>([])
|
export const thunks = writable<Thunk[]>([])
|
||||||
|
|
||||||
export const thunkQueue = new TaskQueue<Thunk>({
|
export const thunkQueue = new TaskQueue<Thunk>({
|
||||||
batchSize: 50,
|
batchSize: 10,
|
||||||
|
batchDelay: 100,
|
||||||
processItem: (thunk: Thunk) => {
|
processItem: (thunk: Thunk) => {
|
||||||
thunk.publish()
|
thunk.publish()
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import {derived, Readable} from "svelte/store"
|
import {derived, Readable} from "svelte/store"
|
||||||
import {ItemsByKey} from "@welshman/store"
|
import {ItemsByKey, deriveDeduplicated} from "@welshman/store"
|
||||||
import {pubkey} from "./session.js"
|
import {pubkey} from "./session.js"
|
||||||
import {profilesByPubkey, forceLoadProfile, loadProfile} from "./profiles.js"
|
import {profilesByPubkey, forceLoadProfile, loadProfile} from "./profiles.js"
|
||||||
import {followListsByPubkey, forceLoadFollowList, loadFollowList} from "./follows.js"
|
import {followListsByPubkey, forceLoadFollowList, loadFollowList} from "./follows.js"
|
||||||
@@ -27,7 +27,7 @@ export const makeUserData = <T>(
|
|||||||
itemsByKey: Readable<ItemsByKey<T>>,
|
itemsByKey: Readable<ItemsByKey<T>>,
|
||||||
onDerive?: (key: string, ...args: any[]) => void,
|
onDerive?: (key: string, ...args: any[]) => void,
|
||||||
) =>
|
) =>
|
||||||
derived([itemsByKey, pubkey], ([$itemsByKey, $pubkey]) => {
|
deriveDeduplicated([itemsByKey, pubkey], ([$itemsByKey, $pubkey]) => {
|
||||||
if (!$pubkey) return undefined
|
if (!$pubkey) return undefined
|
||||||
|
|
||||||
onDerive?.($pubkey)
|
onDerive?.($pubkey)
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import {writable, derived, Subscriber} from "svelte/store"
|
import {writable, Subscriber} from "svelte/store"
|
||||||
import {Zapper, TrustedEvent, Zap, getTagValues, getLnUrl, zapFromEvent} from "@welshman/util"
|
import {Zapper, TrustedEvent, Zap, getTagValues, getLnUrl, zapFromEvent} from "@welshman/util"
|
||||||
import {
|
import {
|
||||||
removeUndefined,
|
removeUndefined,
|
||||||
@@ -9,7 +9,14 @@ import {
|
|||||||
batcher,
|
batcher,
|
||||||
postJson,
|
postJson,
|
||||||
} from "@welshman/lib"
|
} from "@welshman/lib"
|
||||||
import {getter, deriveItems, makeForceLoadItem, makeLoadItem, makeDeriveItem} from "@welshman/store"
|
import {
|
||||||
|
getter,
|
||||||
|
deriveItems,
|
||||||
|
deriveDeduplicated,
|
||||||
|
makeForceLoadItem,
|
||||||
|
makeLoadItem,
|
||||||
|
makeDeriveItem,
|
||||||
|
} from "@welshman/store"
|
||||||
import {deriveProfile, loadProfile} from "./profiles.js"
|
import {deriveProfile, loadProfile} from "./profiles.js"
|
||||||
import {appContext} from "./context.js"
|
import {appContext} from "./context.js"
|
||||||
|
|
||||||
@@ -103,9 +110,12 @@ export const loadZapperForPubkey = async (pubkey: string, relays: string[] = [])
|
|||||||
export const deriveZapperForPubkey = (pubkey: string, relays: string[] = []) => {
|
export const deriveZapperForPubkey = (pubkey: string, relays: string[] = []) => {
|
||||||
loadZapperForPubkey(pubkey, relays)
|
loadZapperForPubkey(pubkey, relays)
|
||||||
|
|
||||||
return derived([zappersByLnurl, deriveProfile(pubkey, relays)], ([$zappersByLnurl, $profile]) => {
|
return deriveDeduplicated(
|
||||||
return $profile?.lnurl ? $zappersByLnurl.get($profile.lnurl) : undefined
|
[zappersByLnurl, deriveProfile(pubkey, relays)],
|
||||||
})
|
([$zappersByLnurl, $profile]) => {
|
||||||
|
return $profile?.lnurl ? $zappersByLnurl.get($profile.lnurl) : undefined
|
||||||
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
export const getLnUrlsForEvent = async (event: TrustedEvent) => {
|
export const getLnUrlsForEvent = async (event: TrustedEvent) => {
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
import {remove, yieldThread} from "./Tools.js"
|
import {remove} from "./Tools.js"
|
||||||
|
|
||||||
export type TaskQueueOptions<Item> = {
|
export type TaskQueueOptions<Item> = {
|
||||||
batchSize: number
|
batchSize: number
|
||||||
|
batchDelay: number
|
||||||
processItem: (item: Item) => unknown
|
processItem: (item: Item) => unknown
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -30,32 +31,32 @@ export class TaskQueue<Item> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async process() {
|
process() {
|
||||||
if (this.isProcessing || this.isPaused || this.items.length === 0) {
|
if (this.isProcessing || this.isPaused || this.items.length === 0) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
this.isProcessing = true
|
this.isProcessing = true
|
||||||
|
|
||||||
await yieldThread()
|
setTimeout(async () => {
|
||||||
|
for (const item of this.items.splice(0, this.options.batchSize)) {
|
||||||
|
try {
|
||||||
|
for (const subscriber of this._subs) {
|
||||||
|
subscriber(item)
|
||||||
|
}
|
||||||
|
|
||||||
for (const item of this.items.splice(0, this.options.batchSize)) {
|
await this.options.processItem(item)
|
||||||
try {
|
} catch (e) {
|
||||||
for (const subscriber of this._subs) {
|
console.error(e)
|
||||||
subscriber(item)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.options.processItem(item)
|
|
||||||
} catch (e) {
|
|
||||||
console.error(e)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
this.isProcessing = false
|
this.isProcessing = false
|
||||||
|
|
||||||
if (this.items.length > 0) {
|
if (this.items.length > 0) {
|
||||||
this.process()
|
this.process()
|
||||||
}
|
}
|
||||||
|
}, this.options.batchDelay)
|
||||||
}
|
}
|
||||||
|
|
||||||
stop() {
|
stop() {
|
||||||
|
|||||||
@@ -1257,11 +1257,9 @@ export const batch = <T>(t: number, f: (xs: T[]) => void) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return (x: T) => {
|
return (x: T) => {
|
||||||
const shouldFlush = timeoutId === undefined
|
|
||||||
|
|
||||||
xs.push(x)
|
xs.push(x)
|
||||||
|
|
||||||
if (shouldFlush) {
|
if (!timeoutId) {
|
||||||
f(xs.splice(0))
|
f(xs.splice(0))
|
||||||
timeoutId = setTimeout(later, t)
|
timeoutId = setTimeout(later, t)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ export type SocketEvents = {
|
|||||||
|
|
||||||
export class Socket extends EventEmitter {
|
export class Socket extends EventEmitter {
|
||||||
static batchSize = 10
|
static batchSize = 10
|
||||||
|
static batchDelay = 50
|
||||||
|
|
||||||
auth: AuthState
|
auth: AuthState
|
||||||
status = SocketStatus.Closed
|
status = SocketStatus.Closed
|
||||||
@@ -47,6 +48,7 @@ export class Socket extends EventEmitter {
|
|||||||
|
|
||||||
this._sendQueue = new TaskQueue<ClientMessage>({
|
this._sendQueue = new TaskQueue<ClientMessage>({
|
||||||
batchSize: Socket.batchSize,
|
batchSize: Socket.batchSize,
|
||||||
|
batchDelay: Socket.batchDelay,
|
||||||
processItem: (message: ClientMessage) => {
|
processItem: (message: ClientMessage) => {
|
||||||
this._ws?.send(JSON.stringify(message))
|
this._ws?.send(JSON.stringify(message))
|
||||||
this.emit(SocketEvent.Send, message, this.url)
|
this.emit(SocketEvent.Send, message, this.url)
|
||||||
@@ -55,6 +57,7 @@ export class Socket extends EventEmitter {
|
|||||||
|
|
||||||
this._recvQueue = new TaskQueue<RelayMessage>({
|
this._recvQueue = new TaskQueue<RelayMessage>({
|
||||||
batchSize: Socket.batchSize,
|
batchSize: Socket.batchSize,
|
||||||
|
batchDelay: Socket.batchDelay,
|
||||||
processItem: (message: RelayMessage) => {
|
processItem: (message: RelayMessage) => {
|
||||||
this.emit(SocketEvent.Receive, message, this.url)
|
this.emit(SocketEvent.Receive, message, this.url)
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -455,27 +455,27 @@ export const makeLoadItem = <T>(
|
|||||||
const pending = new Map<string, Promise<Maybe<T>>>()
|
const pending = new Map<string, Promise<Maybe<T>>>()
|
||||||
const attempts = new Map<string, number>()
|
const attempts = new Map<string, number>()
|
||||||
|
|
||||||
return async (key: string, ...args: any[]): Promise<Maybe<T>> => {
|
return (key: string, ...args: any[]): Promise<Maybe<T>> => {
|
||||||
const stale = getItem(key)
|
const stale = getItem(key)
|
||||||
const fetched = getFetched(key)
|
const fetched = getFetched(key)
|
||||||
|
|
||||||
// If we have an item, reload if it's relatively recent
|
// If we have an item, reload if it's relatively recent
|
||||||
if (stale && fetched > now() - timeout) {
|
if (stale && fetched > now() - timeout) {
|
||||||
return stale
|
return Promise.resolve(stale)
|
||||||
}
|
}
|
||||||
|
|
||||||
const pendingItem = pending.get(key)
|
const pendingItem = pending.get(key)
|
||||||
|
|
||||||
// If we already are loading, await and return
|
// If we already are loading, await and return
|
||||||
if (pendingItem) {
|
if (pendingItem) {
|
||||||
return pendingItem
|
return Promise.resolve(pendingItem)
|
||||||
}
|
}
|
||||||
|
|
||||||
const attempt = attempts.get(key) || 0
|
const attempt = attempts.get(key) || 0
|
||||||
|
|
||||||
// Use exponential backoff to throttle attempts
|
// Use exponential backoff to throttle attempts
|
||||||
if (fetched > now() - Math.pow(2, attempt)) {
|
if (fetched > now() - Math.pow(2, attempt)) {
|
||||||
return stale
|
return Promise.resolve(stale)
|
||||||
}
|
}
|
||||||
|
|
||||||
attempts.set(key, attempt + 1)
|
attempts.set(key, attempt + 1)
|
||||||
@@ -486,20 +486,22 @@ export const makeLoadItem = <T>(
|
|||||||
|
|
||||||
pending.set(key, promise)
|
pending.set(key, promise)
|
||||||
|
|
||||||
let item
|
return call(async () => {
|
||||||
try {
|
let item
|
||||||
item = await promise
|
try {
|
||||||
} catch (e) {
|
item = await promise
|
||||||
console.warn(`Failed to load item ${key}`, e)
|
} catch (e) {
|
||||||
} finally {
|
console.warn(`Failed to load item ${key}`, e)
|
||||||
pending.delete(key)
|
} finally {
|
||||||
}
|
pending.delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
if (item) {
|
if (item) {
|
||||||
attempts.delete(key)
|
attempts.delete(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
return item
|
return item
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user