Improve feed loader, wait for db before executing reads/updates, make taskQueue subscribable, add race, rename auth methods, fix failed wasm verify, fix localhost urls

This commit is contained in:
Jon Staab
2025-04-09 15:39:07 -07:00
parent 1bcc57d695
commit 859f7fa68f
14 changed files with 155 additions and 32 deletions
+1
View File
@@ -1,3 +1,4 @@
--ignore-dir=docs
--ignore-dir=docs/reference --ignore-dir=docs/reference
--ignore-dir=docs/.vitepress/cache --ignore-dir=docs/.vitepress/cache
--ignore-dir=dist --ignore-dir=dist
+1
View File
@@ -1,4 +1,5 @@
node_modules node_modules
docs
docs/reference docs/reference
docs/.vitepress/cache docs/.vitepress/cache
build build
+66 -15
View File
@@ -1,6 +1,6 @@
import {nthEq, now} from "@welshman/lib" import {nthEq, partition, race, now} from "@welshman/lib"
import {createEvent, getPubkeyTagValues} from "@welshman/util" import {createEvent, getPubkeyTagValues} from "@welshman/util"
import {MultiRequest, RequestEvent} from "@welshman/net" import {MultiRequest, Tracker, RequestEvent, request} from "@welshman/net"
import {Scope, FeedController, RequestOpts, FeedOptions, DVMOpts, Feed} from "@welshman/feeds" import {Scope, FeedController, RequestOpts, FeedOptions, DVMOpts, Feed} from "@welshman/feeds"
import {makeDvmRequest, DVMEvent} from "@welshman/dvm" import {makeDvmRequest, DVMEvent} from "@welshman/dvm"
import {makeSecret, Nip01Signer} from "@welshman/signer" import {makeSecret, Nip01Signer} from "@welshman/signer"
@@ -8,20 +8,68 @@ import {pubkey, signer} from "./session.js"
import {Router, addMinimalFallbacks, getFilterSelections} from "./router.js" import {Router, addMinimalFallbacks, getFilterSelections} from "./router.js"
import {loadRelaySelections} from "./relaySelections.js" import {loadRelaySelections} from "./relaySelections.js"
import {wotGraph, maxWot, getFollows, getNetwork, getFollowers} from "./wot.js" import {wotGraph, maxWot, getFollows, getNetwork, getFollowers} from "./wot.js"
import {repository} from "./core.js"
export const request = async ({filters = [{}], relays = [], onEvent}: RequestOpts) => { export type FeedRequestHandlerOptions = {
if (relays.length > 0) { signal?: AbortSignal
await new Promise<void>(resolve => {
const sub = new MultiRequest({filters, relays, timeout: 5000, autoClose: true})
sub.on(RequestEvent.Event, onEvent)
sub.on(RequestEvent.Close, resolve)
})
} else {
await Promise.all(getFilterSelections(filters).map(opts => request({...opts, onEvent})))
}
} }
export const makeFeedRequestHandler = ({signal}: FeedRequestHandlerOptions) =>
async ({filters = [{}], relays = [], onEvent}: RequestOpts) => {
const tracker = new Tracker()
const requestOptions = {}
if (relays.length > 0) {
await new Promise(resolve => {
const req = request({tracker, signal, autoClose: true, relays, filters})
req.on(RequestEvent.Event, onEvent)
req.on(RequestEvent.Close, resolve)
})
} else {
const requests: MultiRequest[] = []
const [withSearch, withoutSearch] = partition(f => Boolean(f.search), filters)
if (withSearch.length > 0) {
requests.push(
request({
tracker, signal, autoClose: true,
filters: withSearch,
relays: Router.get().Search().getUrls(),
}),
)
}
if (withoutSearch.length > 0) {
requests.push(
...getFilterSelections(filters).flatMap(options =>
request({tracker, signal, autoClose: true, ...options}),
),
)
}
// Break out selections by relay so we can complete early after a certain number
// of requests complete for faster load times
await race(
withSearch.length > 0 ? 0.1 : 0.8,
requests.map(
req =>
new Promise(resolve => {
req.on(RequestEvent.Event, onEvent)
req.on(RequestEvent.Close, resolve)
}),
),
)
// Wait until after we've queried the network to access our local cache. This results in less
// snappy response times, but is necessary to prevent stale stuff that the user has already seen
// from showing up at the top of the feed
for (const event of repository.query(filters)) {
onEvent(event)
}
}
}
export const requestDVM = async ({kind, onEvent, ...request}: DVMOpts) => { export const requestDVM = async ({kind, onEvent, ...request}: DVMOpts) => {
// Make sure we know what relays to use for target dvms // Make sure we know what relays to use for target dvms
if (request.tags && !request.relays) { if (request.tags && !request.relays) {
@@ -101,11 +149,14 @@ export const getPubkeysForWOTRange = (min: number, max: number) => {
type _FeedOptions = Partial<Omit<FeedOptions, "feed">> & {feed: Feed} type _FeedOptions = Partial<Omit<FeedOptions, "feed">> & {feed: Feed}
export const createFeedController = (options: _FeedOptions) => export const createFeedController = (options: _FeedOptions) => {
new FeedController({ const request = makeFeedRequestHandler(options)
return new FeedController({
request, request,
requestDVM, requestDVM,
getPubkeysForScope, getPubkeysForScope,
getPubkeysForWOTRange, getPubkeysForWOTRange,
...options, ...options,
}) })
}
+11 -1
View File
@@ -2,7 +2,7 @@ import {openDB, deleteDB} from "idb"
import {IDBPDatabase} from "idb" import {IDBPDatabase} from "idb"
import {writable} from "svelte/store" import {writable} from "svelte/store"
import {Unsubscriber} from "svelte/store" import {Unsubscriber} from "svelte/store"
import {call} from "@welshman/lib" import {call, defer} from "@welshman/lib"
import {withGetter} from "@welshman/store" import {withGetter} from "@welshman/store"
export type StorageAdapterOptions = { export type StorageAdapterOptions = {
@@ -18,11 +18,15 @@ export type StorageAdapter = {
export let db: IDBPDatabase | undefined export let db: IDBPDatabase | undefined
export const ready = defer<void>()
export const dead = withGetter(writable(false)) export const dead = withGetter(writable(false))
export const subs: Unsubscriber[] = [] export const subs: Unsubscriber[] = []
export const getAll = async (name: string) => { export const getAll = async (name: string) => {
await ready
const tx = db!.transaction(name, "readwrite") const tx = db!.transaction(name, "readwrite")
const store = tx.objectStore(name) const store = tx.objectStore(name)
const result = await store.getAll() const result = await store.getAll()
@@ -33,6 +37,8 @@ export const getAll = async (name: string) => {
} }
export const bulkPut = async (name: string, data: any[]) => { export const bulkPut = async (name: string, data: any[]) => {
await ready
const tx = db!.transaction(name, "readwrite") const tx = db!.transaction(name, "readwrite")
const store = tx.objectStore(name) const store = tx.objectStore(name)
@@ -50,6 +56,8 @@ export const bulkPut = async (name: string, data: any[]) => {
} }
export const bulkDelete = async (name: string, ids: string[]) => { export const bulkDelete = async (name: string, ids: string[]) => {
await ready
const tx = db!.transaction(name, "readwrite") const tx = db!.transaction(name, "readwrite")
const store = tx.objectStore(name) const store = tx.objectStore(name)
@@ -90,6 +98,8 @@ export const initStorage = async (
}, },
}) })
ready.resolve()
await Promise.all(Object.values(adapters).map(adapter => adapter.init())) await Promise.all(Object.values(adapters).map(adapter => adapter.init()))
const unsubscribers = Object.values(adapters).map(adapter => adapter.sync()) const unsubscribers = Object.values(adapters).map(adapter => adapter.sync())
+2
View File
@@ -1,9 +1,11 @@
import type {Subscriber} from "svelte/store"
import {Writable, Readable, writable, derived, get} from "svelte/store" import {Writable, Readable, writable, derived, get} from "svelte/store"
import { import {
Deferred, Deferred,
fromPairs, fromPairs,
TaskQueue, TaskQueue,
dissoc, dissoc,
remove,
identity, identity,
uniq, uniq,
defer, defer,
+1 -1
View File
@@ -132,5 +132,5 @@ export type FeedOptions = {
onEvent?: (event: TrustedEvent) => void onEvent?: (event: TrustedEvent) => void
onExhausted?: () => void onExhausted?: () => void
useWindowing?: boolean useWindowing?: boolean
abortController?: AbortController signal?: AbortSignal
} }
+13
View File
@@ -6,6 +6,7 @@ export type TaskQueueOptions<Item> = {
} }
export class TaskQueue<Item> { export class TaskQueue<Item> {
_subs: ((item: Item) => void)[] = []
items: Item[] = [] items: Item[] = []
isPaused = false isPaused = false
isProcessing = false isProcessing = false
@@ -21,6 +22,14 @@ export class TaskQueue<Item> {
this.items = remove(item, this.items) this.items = remove(item, this.items)
} }
subscribe(subscriber: (item: Item) => void) {
this._subs.push(subscriber)
return () => {
this._subs = remove(subscriber, this._subs)
}
}
async process() { async process() {
if (this.isProcessing || this.isPaused || this.items.length === 0) { if (this.isProcessing || this.isPaused || this.items.length === 0) {
return return
@@ -32,6 +41,10 @@ export class TaskQueue<Item> {
for (const item of this.items.splice(0, this.options.batchSize)) { for (const item of this.items.splice(0, this.options.batchSize)) {
try { try {
for (const subscriber of this._subs) {
subscriber(item)
}
await this.options.processItem(item) await this.options.processItem(item)
} catch (e) { } catch (e) {
console.error(e) console.error(e)
+26
View File
@@ -1038,6 +1038,32 @@ export const batcher = <T, U>(t: number, execute: (request: T[]) => U[] | Promis
}) })
} }
/**
* Returns a promise that resolves after some proportion of promises complete
* @param threshold - number between 0 and 1 for how many promises to wait for
* @param promises - array of promises
* @returns promise
*/
export const race = (threshold: number, promises: Promise<unknown>[]) => {
let count = 0
if (threshold === 0) {
return Promise.resolve()
}
return new Promise<void>((resolve, reject) => {
promises.forEach(p => {
p.then(() => {
count++
if (count >= threshold * promises.length) {
resolve()
}
}).catch(reject)
})
})
}
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// URLs // URLs
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
+5 -5
View File
@@ -107,11 +107,11 @@ describe("auth", () => {
}) })
}) })
describe("authenticate", () => { describe("doAuth", () => {
it("should throw an error when there is no challenge", async () => { it("should throw an error when there is no challenge", async () => {
const sign = vi.fn() const sign = vi.fn()
await expect(socket.auth.authenticate(sign)).rejects.toThrow( await expect(socket.auth.doAuth(sign)).rejects.toThrow(
"Attempted to authenticate with no challenge", "Attempted to authenticate with no challenge",
) )
}) })
@@ -122,7 +122,7 @@ describe("auth", () => {
socket.auth.challenge = "challenge123" socket.auth.challenge = "challenge123"
socket.auth.status = AuthStatus.PendingResponse socket.auth.status = AuthStatus.PendingResponse
await expect(socket.auth.authenticate(sign)).rejects.toThrow( await expect(socket.auth.doAuth(sign)).rejects.toThrow(
"Attempted to authenticate when auth is already auth:status:pending_response", "Attempted to authenticate when auth is already auth:status:pending_response",
) )
}) })
@@ -133,7 +133,7 @@ describe("auth", () => {
socket.auth.challenge = "challenge123" socket.auth.challenge = "challenge123"
socket.auth.status = AuthStatus.Requested socket.auth.status = AuthStatus.Requested
await socket.auth.authenticate(sign) await socket.auth.doAuth(sign)
expect(socket.auth.status).toBe(AuthStatus.DeniedSignature) expect(socket.auth.status).toBe(AuthStatus.DeniedSignature)
}) })
@@ -151,7 +151,7 @@ describe("auth", () => {
return event return event
} }
await socket.auth.authenticate(sign) await socket.auth.doAuth(sign)
expect(socket.auth.request).toStrictEqual(event!.id) expect(socket.auth.request).toStrictEqual(event!.id)
expect(sendSpy).toHaveBeenCalledWith(["AUTH", event]) expect(sendSpy).toHaveBeenCalledWith(["AUTH", event])
+20 -2
View File
@@ -1,5 +1,5 @@
import EventEmitter from "events" import EventEmitter from "events"
import {on, call} from "@welshman/lib" import {on, poll, call} from "@welshman/lib"
import {SignedEvent, StampedEvent} from "@welshman/util" import {SignedEvent, StampedEvent} from "@welshman/util"
import {makeEvent, CLIENT_AUTH} from "@welshman/util" import {makeEvent, CLIENT_AUTH} from "@welshman/util"
import {isRelayAuth, isClientAuth, isRelayOk, RelayMessage} from "./message.js" import {isRelayAuth, isClientAuth, isRelayOk, RelayMessage} from "./message.js"
@@ -97,7 +97,7 @@ export class AuthState extends EventEmitter {
this.emit(AuthStateEvent.Status, status) this.emit(AuthStateEvent.Status, status)
} }
async authenticate(sign: (event: StampedEvent) => Promise<SignedEvent>) { async doAuth(sign: (event: StampedEvent) => Promise<SignedEvent>) {
if (!this.challenge) { if (!this.challenge) {
throw new Error("Attempted to authenticate with no challenge") throw new Error("Attempted to authenticate with no challenge")
} }
@@ -119,6 +119,24 @@ export class AuthState extends EventEmitter {
} }
} }
async attemptAuth(sign: (event: StampedEvent) => Promise<SignedEvent>) {
this.socket.attemptToOpen()
await poll({
signal: AbortSignal.timeout(800),
condition: () => this.status === AuthStatus.Requested,
})
if (this.status === AuthStatus.Requested) {
await this.doAuth(sign)
}
await poll({
signal: AbortSignal.timeout(800),
condition: () => this.status !== AuthStatus.PendingResponse,
})
}
cleanup() { cleanup() {
this.removeAllListeners() this.removeAllListeners()
this._unsubscribers.forEach(call) this._unsubscribers.forEach(call)
+1 -1
View File
@@ -14,6 +14,6 @@ export type NetContext = {
export const netContext: NetContext = { export const netContext: NetContext = {
pool: Pool.getSingleton(), pool: Pool.getSingleton(),
repository: Repository.getSingleton(), repository: Repository.getSingleton(),
isEventValid: (event, url) => Boolean(event.sig && verifyEvent(event as SignedEvent)), isEventValid: (event, url) => verifyEvent(event),
isEventDeleted: (event, url) => netContext.repository.isDeleted(event), isEventDeleted: (event, url) => netContext.repository.isDeleted(event),
} }
+1 -1
View File
@@ -199,7 +199,7 @@ export const makeSocketPolicyAuth = (options: SocketPolicyAuthOptions) => (socke
const unsubscribers = [ const unsubscribers = [
on(socket.auth, AuthStateEvent.Status, (status: AuthStatus) => { on(socket.auth, AuthStateEvent.Status, (status: AuthStatus) => {
if (status === AuthStatus.Requested && shouldAuth(socket)) { if (status === AuthStatus.Requested && shouldAuth(socket)) {
socket.auth.authenticate(options.sign) socket.auth.doAuth(options.sign)
} }
}), }),
] ]
+5 -4
View File
@@ -66,14 +66,15 @@ export const verifyEvent = (() => {
if (typeof WebAssembly === "object") { if (typeof WebAssembly === "object") {
initNostrWasm() initNostrWasm()
.then(setNostrWasm, noop) .then(nostrWasm => {
.then(() => { setNostrWasm(nostrWasm)
verify = verifyEventWasm verify = verifyEventWasm
}, e => {
console.warn(e)
}) })
} }
return (event: TrustedEvent) => return (event: TrustedEvent) => Boolean(event.sig && verify(event as SignedEvent))
event.sig && (event[verifiedSymbol] || verify(event as SignedEvent))
})() })()
export const isEventTemplate = (e: EventTemplate): e is EventTemplate => export const isEventTemplate = (e: EventTemplate): e is EventTemplate =>
+2 -2
View File
@@ -33,8 +33,8 @@ export const isRelayUrl = (url: string) => {
// Skip urls with a slash before the dot // Skip urls with a slash before the dot
if (url.match(/\\.*\./)) return false if (url.match(/\\.*\./)) return false
// Skip urls without a dot // Skip non-localhost urls without a dot
if (!url.match(/\./)) return false if (!url.match(/\./) && !url.includes('localhost')) return false
try { try {
new URL(url) new URL(url)