re-work thunks

This commit is contained in:
Jon Staab
2025-04-14 11:51:09 -07:00
parent b6b54f650b
commit e54cb0a62d
3 changed files with 282 additions and 222 deletions
+224 -194
View File
@@ -4,6 +4,7 @@ import {
Deferred, Deferred,
fromPairs, fromPairs,
TaskQueue, TaskQueue,
ifLet,
dissoc, dissoc,
remove, remove,
identity, identity,
@@ -11,6 +12,9 @@ import {
defer, defer,
sleep, sleep,
assoc, assoc,
spec,
nthEq,
nth,
} from "@welshman/lib" } from "@welshman/lib"
import {stamp, own, hash} from "@welshman/signer" import {stamp, own, hash} from "@welshman/signer"
import { import {
@@ -26,37 +30,12 @@ import {
isUnwrappedEvent, isUnwrappedEvent,
isSignedEvent, isSignedEvent,
} from "@welshman/util" } from "@welshman/util"
import {publish, AdapterContext, PublishStatus} from "@welshman/net" import {publish, AdapterContext, PublishStatus, PublishOptions, PublishStatusByRelay} from "@welshman/net"
import {repository, tracker} from "./core.js" import {repository, tracker} from "./core.js"
import {pubkey, getSession, getSigner} from "./session.js" import {pubkey, getSession, getSigner} from "./session.js"
const {Pending, Success, Failure, Timeout, Aborted} = PublishStatus
export type ThunkEvent = EventTemplate | StampedEvent | OwnedEvent | TrustedEvent export type ThunkEvent = EventTemplate | StampedEvent | OwnedEvent | TrustedEvent
export type ThunkRequest = {
event: ThunkEvent
relays: string[]
delay?: number
timeout?: number
context?: AdapterContext
}
export type ThunkStatus = {
message: string
status: PublishStatus
}
export type ThunkStatusByRelay = Record<string, ThunkStatus>
export type Thunk = {
event: TrustedEvent
request: ThunkRequest
controller: AbortController
result: Deferred<ThunkStatusByRelay>
status: Writable<ThunkStatusByRelay>
}
export const prepEvent = (event: ThunkEvent) => { export const prepEvent = (event: ThunkEvent) => {
if (!isStampedEvent(event as StampedEvent)) { if (!isStampedEvent(event as StampedEvent)) {
event = stamp(event) event = stamp(event)
@@ -73,63 +52,229 @@ export const prepEvent = (event: ThunkEvent) => {
return event as TrustedEvent return event as TrustedEvent
} }
export const makeThunk = (request: ThunkRequest) => { export type ThunkOptions = Omit<PublishOptions, 'event'> & {
const event = prepEvent(request.event) event: ThunkEvent
const controller = new AbortController() delay?: number
const result: Thunk["result"] = defer()
const status: Thunk["status"] = writable({})
return {event, request, controller, result, status}
} }
export type MergedThunk = { export class Thunk {
thunks: Thunk[] _subs: Subscriber<Thunk>[] = []
controller: AbortController
result: Promise<ThunkStatusByRelay[]> event: TrustedEvent
status: Readable<ThunkStatusByRelay> result = defer<PublishStatusByRelay>()
status: PublishStatusByRelay = {}
details: Record<string, string> = {}
controller = new AbortController()
constructor(readonly options: ThunkOptions) {
this.event = prepEvent(options.event)
for (const relay of options.relays) {
this.status[relay] = PublishStatus.Sending
}
} }
export const isMergedThunk = (thunk: Thunk | MergedThunk): thunk is MergedThunk => _notify() {
Boolean((thunk as any).thunks) for (const subscriber of this._subs) {
subscriber(this)
export const mergeThunks = (thunks: Thunk[]) => {
const controller = new AbortController()
controller.signal.addEventListener("abort", () => {
for (const thunk of thunks) {
thunk.controller.abort()
} }
}
_fail(message: string) {
for (const relay of this.options.relays) {
this.status[relay] = PublishStatus.Failure
this.details[relay] = message
}
this._notify()
}
async publish() {
let event = this.event
// Handle abort immediately if possible
if (this.controller.signal.aborted) return
// If we were given a wrapped event, make sure to publish the wrapper, not the rumor
if (isUnwrappedEvent(event)) {
event = event.wrap
}
// If the event was already signed, leave it alone. Otherwise, sign it now. This is to
// decrease apparent latency in the UI that results from waiting for remote signers
if (!isSignedEvent(event)) {
const signer = getSigner(getSession(event.pubkey))
if (!signer) {
return this._fail(`No signer found for ${event.pubkey}`)
}
try {
event = await signer.sign(event)
} catch (e: any) {
return this._fail(String(e.error || e))
}
}
// We're guaranteed to have a signed event at this point
const signedEvent = event as SignedEvent
// Copy the signature over since we had deferred signing
ifLet(repository.getEvent(signedEvent.id), savedEvent => {
savedEvent.sig = signedEvent.sig
}) })
return { // Wait if the thunk is to be delayed
thunks, if (this.options.delay) {
controller, await sleep(this.options.delay)
result: Promise.all(thunks.map(thunk => thunk.result)),
status: derived(
thunks.map(thunk => thunk.status),
statuses => {
const mergedStatus: ThunkStatusByRelay = {}
for (const url of uniq(statuses.flatMap(s => Object.keys(s)))) {
const urlStatuses = statuses.map(s => s[url])
const thunkStatus = [Aborted, Failure, Timeout, Pending, Success]
.map(status => urlStatuses.find(s => s?.status === status))
.find(identity)
if (thunkStatus) {
mergedStatus[url] = thunkStatus
}
} }
return mergedStatus // Skip publishing if aborted
if (this.controller.signal.aborted) {
return
}
// Send it off
this.result.resolve(
await publish({
...this.options,
event: signedEvent,
onSuccess: (message: string, relay: string) => {
tracker.track(signedEvent.id, relay)
this.options.onSuccess?.(message, relay)
this.status[relay] = PublishStatus.Success
this.details[relay] = message
this._notify()
}, },
), onFailure: (message: string, relay: string) => {
this.options.onFailure?.(message, relay)
this.status[relay] = PublishStatus.Failure
this.details[relay] = message
this._notify()
},
onPending: (relay: string) => {
this.options.onPending?.(relay)
this.status[relay] = PublishStatus.Pending
this._notify()
},
onTimeout: (relay: string) => {
this.options.onTimeout?.(relay)
this.status[relay] = PublishStatus.Timeout
this.details[relay] = "Publish timed out"
this._notify()
},
onAborted: (relay: string) => {
this.options.onAborted?.(relay)
this.status[relay] = PublishStatus.Aborted
this.details[relay] = "Publish was aborted"
this._notify()
},
onComplete: () => {
this.options.onComplete?.()
this._subs = []
},
})
)
}
subscribe(subscriber: Subscriber<Thunk>) {
this._subs.push(subscriber)
subscriber(this)
return () => {
this._subs = remove(subscriber, this._subs)
}
} }
} }
export function* walkThunks(thunks: (Thunk | MergedThunk)[]): Iterable<Thunk> { export class MergedThunk {
_subs: Subscriber<MergedThunk>[] = []
controller = new AbortController()
status: PublishStatusByRelay = {}
details: Record<string, string> = {}
constructor(readonly thunks: Thunk[]) {
const {Aborted, Failure, Timeout, Pending, Success} = PublishStatus
const relays = new Set(thunks.flatMap(thunk => Object.keys(thunk.options.relays)))
const statusMaps = thunks.map(thunk => thunk.status)
for (const thunk of thunks) { for (const thunk of thunks) {
if (isMergedThunk(thunk)) { this.controller.signal.addEventListener("abort", () => thunk.controller.abort())
thunk.subscribe($thunk => {
this.status = {}
this.details = {}
for (const relay of relays) {
for (const status of [Aborted, Failure, Timeout, Pending, Success]) {
const thunk = thunks.find(spec({[relay]: status}))
if (thunk) {
this.status[relay] = thunk.status[relay]!
this.details[relay] = thunk.details[relay]!
}
}
}
this._notify()
if (thunks.filter(thunkIsComplete).length === thunks.length) {
this._subs = []
}
})
}
}
_notify() {
for (const subscriber of this._subs) {
subscriber(this)
}
}
subscribe(subscriber: Subscriber<MergedThunk>) {
this._subs.push(subscriber)
subscriber(this)
return () => {
this._subs = remove(subscriber, this._subs)
}
}
}
export type AbstractThunk = Thunk | MergedThunk
export const isThunk = (thunk: AbstractThunk): thunk is Thunk =>
thunk instanceof Thunk
export const isMergedThunk = (thunk: AbstractThunk): thunk is MergedThunk =>
thunk instanceof MergedThunk
export const thunkHasStatus = (thunk: AbstractThunk, status: PublishStatus) =>
Object.entries(thunk.status).some(nthEq(1, status))
export const thunkUrlsWithStatus = (thunk: AbstractThunk, status: PublishStatus) =>
Object.entries(thunk.status).filter(nthEq(1, status)).map(nth(0))
export const thunkCompleteUrls = (thunk: AbstractThunk) => {
const incompleteStatuses = [PublishStatus.Sending, PublishStatus.Pending]
return Object.entries(thunk.status).filter(([_, s]) => !incompleteStatuses.includes(s)).map(nth(1))
}
export const thunkIncompleteUrls = (thunk: AbstractThunk) => {
const incompleteStatuses = [PublishStatus.Sending, PublishStatus.Pending]
return Object.entries(thunk.status).filter(([_, s]) => incompleteStatuses.includes(s)).map(nth(1))
}
export const thunkIsComplete = (thunk: AbstractThunk) => thunkCompleteUrls(thunk).length > 0
export function* walkThunks(thunks: (AbstractThunk)[]): Iterable<Thunk> {
for (const thunk of thunks) {
if (thunk instanceof MergedThunk) {
yield* walkThunks(thunk.thunks) yield* walkThunks(thunk.thunks)
} else { } else {
yield thunk yield thunk
@@ -137,10 +282,17 @@ export function* walkThunks(thunks: (Thunk | MergedThunk)[]): Iterable<Thunk> {
} }
} }
export const thunks = writable<Record<string, Thunk | MergedThunk>>({}) export const thunks = writable<Record<string, AbstractThunk>>({})
export const publishThunk = (request: ThunkRequest) => { export const thunkQueue = new TaskQueue<Thunk>({
const thunk = makeThunk(request) batchSize: 50,
processItem: (thunk: Thunk) => {
thunk.publish()
},
})
export const publishThunk = (options: ThunkOptions) => {
const thunk = new Thunk(options)
thunkQueue.push(thunk) thunkQueue.push(thunk)
@@ -155,130 +307,8 @@ export const publishThunk = (request: ThunkRequest) => {
return thunk return thunk
} }
export const publishThunks = (requests: ThunkRequest[]) => {
const newThunks = requests.map(makeThunk)
const mergedThunk = mergeThunks(newThunks)
for (const thunk of newThunks) {
thunkQueue.push(thunk)
repository.publish(thunk.event)
thunks.update(assoc(thunk.event.id, mergedThunk))
thunk.controller.signal.addEventListener("abort", () => {
repository.removeEvent(thunk.event.id)
})
}
return mergedThunk
}
export const abortThunk = (thunk: Thunk) => { export const abortThunk = (thunk: Thunk) => {
thunk.controller.abort() thunk.controller.abort()
thunks.update(dissoc(thunk.event.id)) thunks.update(dissoc(thunk.event.id))
repository.removeEvent(thunk.event.id) repository.removeEvent(thunk.event.id)
} }
export const thunkQueue = new TaskQueue<Thunk>({
batchSize: 50,
processItem: (thunk: Thunk) => {
let event = thunk.event
// Handle abort immediately if possible
if (thunk.controller.signal.aborted) return
// If we were given a wrapped event, make sure to publish the wrapper, not the rumor
if (isUnwrappedEvent(event)) {
event = event.wrap
}
// Avoid making this function async so multiple publishes can run concurrently
Promise.resolve().then(async () => {
const fail = (message: string) => {
const status: ThunkStatusByRelay = {}
for (const url of thunk.request.relays) {
status[url] = {status: PublishStatus.Failure, message}
}
thunk.status.set(status)
}
// If the event was already signed, leave it alone. Otherwise, sign it now. This is to
// decrease apparent latency in the UI that results from waiting for remote signers
if (!isSignedEvent(event)) {
const signer = getSigner(getSession(event.pubkey))
if (!signer) {
return fail(`No signer found for ${event.pubkey}`)
}
try {
event = await signer.sign(event)
} catch (e: any) {
return fail(String(e.error || e))
}
}
// We're guaranteed to have a signed event at this point
const signedEvent = event as SignedEvent
// Wait if the thunk is to be delayed
if (thunk.request.delay) {
await sleep(thunk.request.delay)
}
// Skip publishing if aborted
if (thunk.controller.signal.aborted) {
return
}
// Update status to pending
thunk.status.set(
fromPairs(
thunk.request.relays.map(url => [
url,
{status: Pending, message: "Sending your message..."},
]),
),
)
// Send it off
publish({
event: signedEvent,
relays: thunk.request.relays,
context: thunk.request.context,
timeout: thunk.request.timeout,
onSuccess: (message: string, url: string) => {
tracker.track(signedEvent.id, url)
thunk.status.update(assoc(url, {status: PublishStatus.Success, message}))
},
onFailure: (message: string, url: string) => {
thunk.status.update(assoc(url, {status: PublishStatus.Failure, message}))
},
onTimeout: (url: string) => {
const message = "Publish timed out"
thunk.status.update(assoc(url, {status: PublishStatus.Timeout, message}))
},
onAborted: (url: string) => {
const message = "Publish was aborted"
thunk.status.update(assoc(url, {status: PublishStatus.Aborted, message}))
},
onComplete: () => {
thunk.result.resolve(get(thunk.status))
},
})
// Copy the signature over since we had deferred it
const savedEvent = repository.getEvent(signedEvent.id) as SignedEvent
// The event may already be replaced or deleted
if (savedEvent) {
savedEvent.sig = signedEvent.sig
}
})
},
})
+8 -2
View File
@@ -1283,13 +1283,13 @@ export const spec =
/** Returns a function that checks equality with value */ /** Returns a function that checks equality with value */
export const eq = export const eq =
<T>(v: T) => <T>(v: T) =>
(x: T) => (x: T, ...args: unknown[]) =>
x === v x === v
/** Returns a function that checks inequality with value */ /** Returns a function that checks inequality with value */
export const ne = export const ne =
<T>(v: T) => <T>(v: T) =>
(x: T) => (x: T, ...args: unknown[]) =>
x !== v x !== v
/** Returns a function that gets property value from object */ /** Returns a function that gets property value from object */
@@ -1310,6 +1310,12 @@ export const dissoc =
(o: T) => (o: T) =>
omit([k], o) omit([k], o)
/** Returns a function that checks whether a value is in the given sequence */
export const member =
<T>(xs: Iterable<T>) =>
(x: T) =>
Array.from(xs).includes(x)
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Sets // Sets
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
+50 -26
View File
@@ -5,6 +5,7 @@ import {RelayMessage, ClientMessageType, isRelayOk} from "./message.js"
import {AbstractAdapter, AdapterEvent, AdapterContext, getAdapter} from "./adapter.js" import {AbstractAdapter, AdapterEvent, AdapterContext, getAdapter} from "./adapter.js"
export enum PublishStatus { export enum PublishStatus {
Sending = "publish:status:sending",
Pending = "publish:status:pending", Pending = "publish:status:pending",
Success = "publish:status:success", Success = "publish:status:success",
Failure = "publish:status:failure", Failure = "publish:status:failure",
@@ -23,11 +24,11 @@ export type PublishOneOptions = {
signal?: AbortSignal signal?: AbortSignal
timeout?: number timeout?: number
context?: AdapterContext context?: AdapterContext
onStatus?: (status: PublishStatus, relay: string) => void onSuccess?: (detail: string) => void
onSuccess?: (detail: string, relay: string) => void onFailure?: (detail: string) => void
onFailure?: (detail: string, relay: string) => void onPending?: () => void
onTimeout?: (relay: string) => void onTimeout?: () => void
onAborted?: (relay: string) => void onAborted?: () => void
onComplete?: () => void onComplete?: () => void
} }
@@ -37,10 +38,7 @@ export const publishOne = (options: PublishOneOptions) =>
let status = PublishStatus.Pending let status = PublishStatus.Pending
const setStatus = (_status: PublishStatus) => { options.onPending?.()
status = _status
options.onStatus?.(status, options.relay)
}
const cleanup = () => { const cleanup = () => {
options.onComplete?.() options.onComplete?.()
@@ -57,11 +55,11 @@ export const publishOne = (options: PublishOneOptions) =>
if (id !== options.event.id) return if (id !== options.event.id) return
if (ok) { if (ok) {
setStatus(PublishStatus.Success) status = PublishStatus.Success
options.onSuccess?.(detail, options.relay) options.onSuccess?.(detail)
} else { } else {
setStatus(PublishStatus.Failure) status = PublishStatus.Failure
options.onFailure?.(detail, options.relay) options.onFailure?.(detail)
} }
cleanup() cleanup()
@@ -71,8 +69,8 @@ export const publishOne = (options: PublishOneOptions) =>
options.signal?.addEventListener('abort', () => { options.signal?.addEventListener('abort', () => {
if (status === PublishStatus.Pending) { if (status === PublishStatus.Pending) {
setStatus(PublishStatus.Aborted) status = PublishStatus.Aborted
options.onAborted?.(options.relay) options.onAborted?.()
} }
cleanup() cleanup()
@@ -80,26 +78,34 @@ export const publishOne = (options: PublishOneOptions) =>
setTimeout(() => { setTimeout(() => {
if (status === PublishStatus.Pending) { if (status === PublishStatus.Pending) {
setStatus(PublishStatus.Timeout) status = PublishStatus.Timeout
options.onTimeout?.(options.relay) options.onTimeout?.()
} }
cleanup() cleanup()
}, options.timeout || 10_000) }, options.timeout || 10_000)
adapter.send([ClientMessageType.Event, options.event]) adapter.send([ClientMessageType.Event, options.event])
setStatus(PublishStatus.Pending)
}) })
export type PublishStatusByRelay = Record<string, PublishStatus> export type PublishStatusByRelay = Record<string, PublishStatus>
export type PublishOptions = Omit<PublishOneOptions, "relay"> & { export type PublishOptions = {
event: SignedEvent
relays: string[] relays: string[]
onUpdate?: (status: PublishStatusByRelay) => void signal?: AbortSignal
timeout?: number
context?: AdapterContext
onSuccess?: (detail: string, relay: string) => void
onFailure?: (detail: string, relay: string) => void
onPending?: (relay: string) => void
onTimeout?: (relay: string) => void
onAborted?: (relay: string) => void
onComplete?: () => void
} }
export const publish = async (options: PublishOptions) => { export const publish = async (options: PublishOptions) => {
const {event, timeout, signal, context} = options
const status: PublishStatusByRelay = {} const status: PublishStatusByRelay = {}
const completed = new Set<string>() const completed = new Set<string>()
const relays = new Set(options.relays) const relays = new Set(options.relays)
@@ -111,12 +117,30 @@ export const publish = async (options: PublishOptions) => {
await Promise.all( await Promise.all(
options.relays.map(relay => options.relays.map(relay =>
publishOne({ publishOne({
event,
relay, relay,
...options, signal,
onStatus: (_status: PublishStatus, relay: string) => { timeout,
status[relay] = _status context,
options.onStatus?.(_status, relay) onSuccess: (detail: string) => {
options.onUpdate?.(status) status[relay] = PublishStatus.Success
options.onSuccess?.(detail, relay)
},
onFailure: (detail: string) => {
status[relay] = PublishStatus.Failure
options.onFailure?.(detail, relay)
},
onPending: () => {
status[relay] = PublishStatus.Pending
options.onPending?.(relay)
},
onTimeout: () => {
status[relay] = PublishStatus.Timeout
options.onTimeout?.(relay)
},
onAborted: () => {
status[relay] = PublishStatus.Aborted
options.onAborted?.(relay)
}, },
onComplete: () => { onComplete: () => {
completed.add(relay) completed.add(relay)