Small fixes

This commit is contained in:
Jon Staab
2024-11-13 16:14:36 -08:00
parent b1f92f05bb
commit 8ff22b4fea
4 changed files with 42 additions and 21 deletions
+15 -9
View File
@@ -23,9 +23,11 @@ export const pull = async ({relays, filters}: AppSyncOpts) => {
await Promise.all( await Promise.all(
relays.map(async relay => { relays.map(async relay => {
await hasNegentropy(relay) await (
? basePull({filters, events, relays: [relay]}) hasNegentropy(relay)
: pullWithoutNegentropy({filters, relays: [relay]}) ? basePull({filters, events, relays: [relay]})
: pullWithoutNegentropy({filters, relays: [relay]})
)
}) })
) )
} }
@@ -35,9 +37,11 @@ export const push = async ({relays, filters}: AppSyncOpts) => {
await Promise.all( await Promise.all(
relays.map(async relay => { relays.map(async relay => {
await hasNegentropy(relay) await (
? basePush({filters, events, relays: [relay]}) hasNegentropy(relay)
: pushWithoutNegentropy({events, relays: [relay]}) ? basePush({filters, events, relays: [relay]})
: pushWithoutNegentropy({events, relays: [relay]})
)
}) })
) )
} }
@@ -47,9 +51,11 @@ export const sync = async ({relays, filters}: AppSyncOpts) => {
await Promise.all( await Promise.all(
relays.map(async relay => { relays.map(async relay => {
await hasNegentropy(relay) await (
? baseSync({filters, events, relays: [relay]}) hasNegentropy(relay)
: syncWithoutNegentropy({filters, events, relays: [relay]}) ? baseSync({filters, events, relays: [relay]})
: syncWithoutNegentropy({filters, events, relays: [relay]})
)
}) })
) )
} }
+8 -2
View File
@@ -3,6 +3,8 @@ const ANY = Symbol("worker/ANY")
export type WorkerOpts<T> = { export type WorkerOpts<T> = {
getKey?: (x: T) => any getKey?: (x: T) => any
shouldDefer?: (x: T) => boolean shouldDefer?: (x: T) => boolean
chunkSize?: number
delay?: number
} }
export class Worker<T> { export class Worker<T> {
@@ -14,7 +16,9 @@ export class Worker<T> {
constructor(readonly opts: WorkerOpts<T> = {}) {} constructor(readonly opts: WorkerOpts<T> = {}) {}
#doWork = async () => { #doWork = async () => {
for (let i = 0; i < 50; i++) { const {chunkSize = 50} = this.opts
for (let i = 0; i < chunkSize; i++) {
if (this.buffer.length === 0) { if (this.buffer.length === 0) {
break break
} }
@@ -52,8 +56,10 @@ export class Worker<T> {
} }
#enqueueWork = () => { #enqueueWork = () => {
const {delay = 50} = this.opts
if (!this.#paused && !this.#timeout && this.buffer.length > 0) { if (!this.#paused && !this.#timeout && this.buffer.length > 0) {
this.#timeout = setTimeout(this.#doWork, 50) as unknown as number this.#timeout = setTimeout(this.#doWork, delay) as unknown as number
} }
} }
+17 -8
View File
@@ -1,4 +1,4 @@
import {Emitter, tryCatch, randomId, equals} from "@welshman/lib" import {Emitter, sleep, tryCatch, randomId, equals} from "@welshman/lib"
import {createEvent, TrustedEvent, StampedEvent, NOSTR_CONNECT} from "@welshman/util" import {createEvent, TrustedEvent, StampedEvent, NOSTR_CONNECT} from "@welshman/util"
import {subscribe, publish, Subscription, SubscriptionEvent} from "@welshman/net" import {subscribe, publish, Subscription, SubscriptionEvent} from "@welshman/net"
import {ISigner, decrypt, hash, own, makeSecret, getPubkey} from '../util' import {ISigner, decrypt, hash, own, makeSecret, getPubkey} from '../util'
@@ -169,18 +169,21 @@ export class Nip46Broker extends Emitter {
this.#sub.emitter.on(SubscriptionEvent.Event, async (url: string, event: TrustedEvent) => { this.#sub.emitter.on(SubscriptionEvent.Event, async (url: string, event: TrustedEvent) => {
const json = await decrypt(this.#signer, event.pubkey, event.content) const json = await decrypt(this.#signer, event.pubkey, event.content)
const response = tryCatch(() => JSON.parse(json)) const response = tryCatch(() => JSON.parse(json)) || {}
if (!response.id) { if (!response.id) {
console.error(`Invalid nostr-connect response: ${json}`) console.error(`Invalid nostr-connect response: ${json}`)
} }
console.log('nip46 response:', response) // Delay errors in case there's a zombie signer out there clogging things up
if (response.error) {
await sleep(3000)
}
if (response.result === "auth_url") { if (response.result === "auth_url") {
this.emit(`auth-${response.id}`, {...response, url, event}) this.emit(`auth-${response.id}`, response)
} else { } else {
this.emit(`res-${response.id}`, {...response, url, event}) this.emit(`res-${response.id}`, response)
} }
}) })
@@ -201,9 +204,15 @@ export class Nip46Broker extends Emitter {
while (this.#queue.length > 0) { while (this.#queue.length > 0) {
const [{method, params, resolve}] = this.#queue.splice(0, 1) const [{method, params, resolve}] = this.#queue.splice(0, 1)
this.request(method, params).then(resolve, error => { try {
console.error(`Failed to send nip46 request`, {method, params, error}) const response = await this.request(method, params)
})
console.log('nip46 response:', {method, params, ...response})
resolve(response)
} catch (error: any) {
console.error(`nip46 error:`, {method, params, ...error})
}
} }
} finally { } finally {
this.#processing = false this.#processing = false
+2 -2
View File
@@ -33,8 +33,8 @@ export const nip44 = {
getKey: ([secret, pubkey]) => [secret, pubkey].join(":"), getKey: ([secret, pubkey]) => [secret, pubkey].join(":"),
getValue: ([secret, pubkey]: string[]) => nt44.v2.utils.getConversationKey(hexToBytes(secret), pubkey), getValue: ([secret, pubkey]: string[]) => nt44.v2.utils.getConversationKey(hexToBytes(secret), pubkey),
}), }),
encrypt: (pubkey: string, secret: string, m: string) => nt44.v2.encrypt(m, nip44.getSharedSecret(secret, pubkey)), encrypt: (pubkey: string, secret: string, m: string) => nt44.v2.encrypt(m, nip44.getSharedSecret(secret, pubkey)!),
decrypt: (pubkey: string, secret: string, m: string) => nt44.v2.decrypt(m, nip44.getSharedSecret(secret, pubkey)), decrypt: (pubkey: string, secret: string, m: string) => nt44.v2.decrypt(m, nip44.getSharedSecret(secret, pubkey)!),
} }
export type Sign = (event: StampedEvent) => Promise<SignedEvent> export type Sign = (event: StampedEvent) => Promise<SignedEvent>