Avoid infinite loop on failed claim; allow concurrent thunks
This commit is contained in:
+47
-44
@@ -117,7 +117,7 @@ export const publishThunk = (request: ThunkRequest) => {
|
|||||||
|
|
||||||
export const thunkWorker = new Worker<Thunk>()
|
export const thunkWorker = new Worker<Thunk>()
|
||||||
|
|
||||||
thunkWorker.addGlobalHandler(async (thunk: Thunk) => {
|
thunkWorker.addGlobalHandler((thunk: Thunk) => {
|
||||||
let event = thunk.event
|
let event = thunk.event
|
||||||
|
|
||||||
// Handle abort immediately if possible
|
// Handle abort immediately if possible
|
||||||
@@ -128,57 +128,60 @@ thunkWorker.addGlobalHandler(async (thunk: Thunk) => {
|
|||||||
event = event.wrap
|
event = event.wrap
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the event was already signed, leave it alone. Otherwise, sign it now. This is to
|
// Avoid making this function async so multiple publishes can run concurrently
|
||||||
// decrease apparent latency in the UI that results from waiting for remote signers
|
Promise.resolve().then(async () => {
|
||||||
if (!isSignedEvent(event)) {
|
// If the event was already signed, leave it alone. Otherwise, sign it now. This is to
|
||||||
const signer = getSigner(getSession(event.pubkey))
|
// decrease apparent latency in the UI that results from waiting for remote signers
|
||||||
|
if (!isSignedEvent(event)) {
|
||||||
|
const signer = getSigner(getSession(event.pubkey))
|
||||||
|
|
||||||
if (!signer) {
|
if (!signer) {
|
||||||
return console.warn(`No signer found for ${event.pubkey}`)
|
return console.warn(`No signer found for ${event.pubkey}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
event = await signer.sign(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
event = await signer.sign(event)
|
// We're guaranteed to have a signed event at this point
|
||||||
}
|
const signedEvent = event as SignedEvent
|
||||||
|
|
||||||
// We're guaranteed to have a signed event at this point
|
// Wait if the thunk is to be delayed
|
||||||
const signedEvent = event as SignedEvent
|
if (thunk.request.delay) {
|
||||||
|
await sleep(thunk.request.delay)
|
||||||
// Wait if the thunk is to be delayed
|
|
||||||
if (thunk.request.delay) {
|
|
||||||
await sleep(thunk.request.delay)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip publishing if aborted
|
|
||||||
if (thunk.controller.signal.aborted) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send it off
|
|
||||||
const pub = publish({event: signedEvent, relays: thunk.request.relays})
|
|
||||||
|
|
||||||
// Copy the signature over since we had deferred it
|
|
||||||
const savedEvent = repository.getEvent(signedEvent.id) as SignedEvent
|
|
||||||
|
|
||||||
// The event may already be replaced or deleted
|
|
||||||
if (savedEvent) {
|
|
||||||
savedEvent.sig = signedEvent.sig
|
|
||||||
}
|
|
||||||
|
|
||||||
const completed = new Set()
|
|
||||||
|
|
||||||
pub.emitter.on("*", async (status: PublishStatus, url: string, message = "") => {
|
|
||||||
thunk.status.update(assoc(url, {status, message}))
|
|
||||||
|
|
||||||
if (status !== PublishStatus.Pending) {
|
|
||||||
completed.add(url)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status === PublishStatus.Success) {
|
// Skip publishing if aborted
|
||||||
tracker.track(signedEvent.id, url)
|
if (thunk.controller.signal.aborted) {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if (completed.size === thunk.request.relays.length) {
|
// Send it off
|
||||||
thunk.result.resolve(get(thunk.status))
|
const pub = publish({event: signedEvent, relays: thunk.request.relays})
|
||||||
|
|
||||||
|
// Copy the signature over since we had deferred it
|
||||||
|
const savedEvent = repository.getEvent(signedEvent.id) as SignedEvent
|
||||||
|
|
||||||
|
// The event may already be replaced or deleted
|
||||||
|
if (savedEvent) {
|
||||||
|
savedEvent.sig = signedEvent.sig
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const completed = new Set()
|
||||||
|
|
||||||
|
pub.emitter.on("*", async (status: PublishStatus, url: string, message = "") => {
|
||||||
|
thunk.status.update(assoc(url, {status, message}))
|
||||||
|
|
||||||
|
if (status !== PublishStatus.Pending) {
|
||||||
|
completed.add(url)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (status === PublishStatus.Success) {
|
||||||
|
tracker.track(signedEvent.id, url)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (completed.size === thunk.request.relays.length) {
|
||||||
|
thunk.result.resolve(get(thunk.status))
|
||||||
|
}
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import {Emitter, Worker, sleep} from '@welshman/lib'
|
import {Emitter, Worker, sleep} from '@welshman/lib'
|
||||||
|
import {AUTH_JOIN} from '@welshman/util'
|
||||||
import {ConnectionMeta} from './ConnectionMeta'
|
import {ConnectionMeta} from './ConnectionMeta'
|
||||||
import {ConnectionAuth, AuthStatus} from './ConnectionAuth'
|
import {ConnectionAuth, AuthStatus} from './ConnectionAuth'
|
||||||
import {Socket, isMessage, asMessage} from './Socket'
|
import {Socket, isMessage, asMessage} from './Socket'
|
||||||
@@ -43,7 +44,7 @@ export class Connection extends Emitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Allow relay requests through
|
// Allow relay requests through
|
||||||
if (verb === 'EVENT' && extra[0].kind === 28934) {
|
if (verb === 'EVENT' && extra[0].kind === AUTH_JOIN) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -46,8 +46,6 @@ export class ConnectionAuth {
|
|||||||
const [id, ok, message] = extra
|
const [id, ok, message] = extra
|
||||||
|
|
||||||
if (id === this.request) {
|
if (id === this.request) {
|
||||||
this.challenge = undefined
|
|
||||||
this.request = undefined
|
|
||||||
this.message = message
|
this.message = message
|
||||||
this.status = ok ? Ok : Forbidden
|
this.status = ok ? Ok : Forbidden
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import {AUTH_JOIN} from '@welshman/util'
|
||||||
import type {SignedEvent, Filter} from '@welshman/util'
|
import type {SignedEvent, Filter} from '@welshman/util'
|
||||||
import type {Message} from './Socket'
|
import type {Message} from './Socket'
|
||||||
import type {Connection} from './Connection'
|
import type {Connection} from './Connection'
|
||||||
@@ -86,20 +87,16 @@ export class ConnectionMeta {
|
|||||||
}
|
}
|
||||||
|
|
||||||
onReceiveOk([verb, eventId, ok, notice]: Message) {
|
onReceiveOk([verb, eventId, ok, notice]: Message) {
|
||||||
|
const pub = this.pendingPublishes.get(eventId)
|
||||||
|
|
||||||
|
if (!pub) return
|
||||||
|
|
||||||
// Re-enqueue pending events when auth challenge is received
|
// Re-enqueue pending events when auth challenge is received
|
||||||
if (notice?.startsWith('auth-required:')) {
|
if (notice?.startsWith('auth-required:') && pub.event.kind !== AUTH_JOIN) {
|
||||||
const pub = this.pendingPublishes.get(eventId)
|
this.cxn.send(['EVENT', pub.event])
|
||||||
|
} else {
|
||||||
if (pub) {
|
|
||||||
this.cxn.send(['EVENT', pub.event])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const publish = this.pendingPublishes.get(eventId)
|
|
||||||
|
|
||||||
if (publish) {
|
|
||||||
this.responseCount++
|
this.responseCount++
|
||||||
this.responseTimer += Date.now() - publish.sent
|
this.responseTimer += Date.now() - pub.sent
|
||||||
this.pendingPublishes.delete(eventId)
|
this.pendingPublishes.delete(eventId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user