Re-work thunks

This commit is contained in:
Jon Staab
2024-10-17 08:38:53 -07:00
parent f7e570d868
commit 0523951e5a
2 changed files with 78 additions and 74 deletions
+77 -73
View File
@@ -1,5 +1,7 @@
import {writable, get} from 'svelte/store'
import {Worker, sleep, assoc} from '@welshman/lib'
import type {Writable} from 'svelte/store'
import {Worker, defer, sleep, assoc} from '@welshman/lib'
import type {Deferred} from '@welshman/lib'
import {stamp, own, hash} from "@welshman/signer"
import type {TrustedEvent, HashedEvent, EventTemplate, SignedEvent, StampedEvent, OwnedEvent} from '@welshman/util'
import {isStampedEvent, isOwnedEvent, isHashedEvent, isUnwrappedEvent, isSignedEvent} from '@welshman/util'
@@ -7,37 +9,76 @@ import {publish, PublishStatus} from "@welshman/net"
import {repository, tracker} from './core'
import {pubkey, getSession, getSigner} from './session'
export type PublishStatusData = {
id: string
url: string
export type ThunkEvent = EventTemplate | StampedEvent | OwnedEvent | TrustedEvent
export type ThunkRequest = {
event: ThunkEvent
relays: string[]
delay?: number
}
export type ThunkStatus = {
message: string
status: PublishStatus
}
export type PublishStatusDataByUrl = Record<string, PublishStatusData>
export type ThunkStatusByUrl = Record<string, ThunkStatus>
export type PublishStatusDataByUrlById = Record<string, PublishStatusDataByUrl>
export const publishStatusData = writable<PublishStatusDataByUrlById>({})
export type ThunkWithResolve = {
export type Thunk = {
event: TrustedEvent
relays: string[]
resolve: (data: PublishStatusDataByUrl) => void
delay?: number
signal?: AbortSignal
request: ThunkRequest
controller: AbortController
result: Deferred<ThunkStatusByUrl>
status: Writable<ThunkStatusByUrl>
}
export const thunkWorker = new Worker<ThunkWithResolve>()
export const prepEvent = (event: ThunkEvent) => {
if (!isStampedEvent(event as StampedEvent)) {
event = stamp(event)
}
thunkWorker.addGlobalHandler(async ({event, relays, resolve, delay, signal}: ThunkWithResolve) => {
let aborted = false
if (!isOwnedEvent(event as OwnedEvent)) {
event = own(event as StampedEvent, get(pubkey)!)
}
// Handle abort
signal?.addEventListener('abort', () => {
aborted = true
if (!isHashedEvent(event as HashedEvent)) {
event = hash(event as OwnedEvent)
}
return event as TrustedEvent
}
export const makeThunk = (request: ThunkRequest) => {
const event = prepEvent(request.event)
const controller = new AbortController()
const result: Thunk['result'] = defer()
const status: Thunk['status'] = writable({})
return {event, request, controller, result, status}
}
export const publishThunk = (request: ThunkRequest) => {
const thunk = makeThunk(request)
thunkWorker.push(thunk)
repository.publish(thunk.event)
thunk.controller.signal.addEventListener('abort', () => {
repository.removeEvent(thunk.event.id)
})
return thunk
}
export const thunkWorker = new Worker<Thunk>()
thunkWorker.addGlobalHandler(async (thunk: Thunk) => {
let event = thunk.event
// Handle abort immediately if possible
if (thunk.controller.signal.aborted) return
// If we were given a wrapped event, make sure to publish the wrapper, not the rumor
if (isUnwrappedEvent(event)) {
event = event.wrap
@@ -59,17 +100,17 @@ thunkWorker.addGlobalHandler(async ({event, relays, resolve, delay, signal}: Thu
const signedEvent = event as SignedEvent
// Wait if the thunk is to be delayed
if (delay) {
await sleep(delay)
if (thunk.request.delay) {
await sleep(thunk.request.delay)
}
// Skip publishing and remove from the repository if aborted
if (aborted) {
return repository.removeEvent(signedEvent.id)
// Skip publishing if aborted
if (thunk.controller.signal.aborted) {
return
}
// Send it off
const pub = publish({event: signedEvent, relays})
const pub = publish({event: signedEvent, relays: thunk.request.relays})
// Copy the signature over since we had deferred it
const savedEvent = repository.getEvent(signedEvent.id) as SignedEvent
@@ -79,58 +120,21 @@ thunkWorker.addGlobalHandler(async ({event, relays, resolve, delay, signal}: Thu
savedEvent.sig = signedEvent.sig
}
// Track publish success
const statusByUrl: PublishStatusDataByUrl = {}
const completed = new Set()
pub.emitter.on("*", async (status: PublishStatus, url: string, message: string) => {
const {id} = signedEvent
thunk.status.update(assoc(url, {status, message}))
Object.assign(statusByUrl, {[url]: {id, url, status, message}})
publishStatusData.update(assoc(id, statusByUrl))
if (status === PublishStatus.Success) {
tracker.track(id, url)
if (status !== PublishStatus.Pending) {
completed.add(url)
}
if (
Object.values(statusByUrl).filter(s => s.status !== PublishStatus.Pending).length ===
relays.length
) {
resolve(statusByUrl)
if (status === PublishStatus.Success) {
tracker.track(signedEvent.id, url)
}
if (completed.size === thunk.request.relays.length) {
thunk.result.resolve(get(thunk.status))
}
})
})
export type ThunkEvent = EventTemplate | StampedEvent | OwnedEvent | TrustedEvent
export const prepEvent = (event: ThunkEvent) => {
if (!isStampedEvent(event as StampedEvent)) {
event = stamp(event)
}
if (!isOwnedEvent(event as OwnedEvent)) {
event = own(event as StampedEvent, get(pubkey)!)
}
if (!isHashedEvent(event as HashedEvent)) {
event = hash(event as OwnedEvent)
}
return event as TrustedEvent
}
export type ThunkParams = {
event: ThunkEvent
relays: string[]
delay?: number
signal?: AbortSignal
}
export const publishThunk = (params: ThunkParams) =>
new Promise<PublishStatusDataByUrl>(resolve => {
const event = prepEvent(params.event)
thunkWorker.push({...params, event, resolve})
repository.publish(event)
})