Rework thunks

This commit is contained in:
Jon Staab
2026-06-16 17:07:20 -07:00
parent abb9f20747
commit bc728c680e
2 changed files with 155 additions and 174 deletions
+6 -3
View File
@@ -13,7 +13,7 @@ import {
} from "@welshman/util" } from "@welshman/util"
import type {TrustedEvent, PublishedList} from "@welshman/util" import type {TrustedEvent, PublishedList} from "@welshman/util"
import {RepositoryCollection} from "./repositoryCollection.js" import {RepositoryCollection} from "./repositoryCollection.js"
import {Router} from "./router.js" import {Router, addMinimalFallbacks} from "./router.js"
import {Network} from "./network.js" import {Network} from "./network.js"
import {User} from "./user.js" import {User} from "./user.js"
import {Thunks} from "./thunk.js" import {Thunks} from "./thunk.js"
@@ -74,8 +74,11 @@ export class RelayLists extends RepositoryCollection<PublishedList> {
const event = {kind: list.kind, content: list.event?.content || "", tags} const event = {kind: list.kind, content: list.event?.content || "", tags}
// Pass the old relay as an extra so it's notified of the removal too // publishToOutbox is outbox-only, so build relays here to also notify the
return this.ctx.use(Thunks).publishToOutbox({event, relays: [url]}) // removed relay of its removal
const relays = [url, ...this.ctx.use(Router).FromUser().policy(addMinimalFallbacks).getUrls()]
return this.ctx.use(Thunks).publish({event, relays})
} }
setRelays = (tags: string[][]) => { setRelays = (tags: string[][]) => {
+149 -171
View File
@@ -15,13 +15,12 @@ import {PublishStatus, PublishResult, PublishOptions, PublishResultsByRelay} fro
import {Nip01Signer, Nip59} from "@welshman/signer" import {Nip01Signer, Nip59} from "@welshman/signer"
import type {IClient} from "./client.js" import type {IClient} from "./client.js"
import {Network} from "./network.js" import {Network} from "./network.js"
import {Router, addMaximalFallbacks} from "./router.js" import {Router, addMinimalFallbacks} from "./router.js"
import {User} from "./user.js" import {User} from "./user.js"
export type ThunkOptions = Override< export type ThunkOptions = Override<
PublishOptions, PublishOptions,
{ {
user: User
client: IClient client: IClient
event: EventTemplate event: EventTemplate
recipient?: string recipient?: string
@@ -30,21 +29,111 @@ export type ThunkOptions = Override<
} }
> >
export class Thunk { /**
_subs: Subscriber<Thunk>[] = [] * Shared base for `Thunk` and `MergedThunk`: a subscribable bag of per-relay
* publish `results`.
event: HashedEvent */
export abstract class BaseThunk {
_subs: Subscriber<any>[] = []
results: PublishResultsByRelay = {} results: PublishResultsByRelay = {}
abstract abort(): void
_notify() {
for (const subscriber of this._subs) {
subscriber(this)
}
}
subscribe(subscriber: Subscriber<this>) {
this._subs.push(subscriber)
subscriber(this)
return () => {
this._subs = remove(subscriber, this._subs)
}
}
getUrlsWithStatus(statuses: PublishStatus | PublishStatus[]) {
const matches = ensurePlural(statuses)
return Object.entries(this.results)
.filter(([_, {status}]) => matches.includes(status))
.map(nth(0)) as string[]
}
getCompleteUrls() {
return this.getUrlsWithStatus(
without([PublishStatus.Sending, PublishStatus.Pending], Object.values(PublishStatus)),
)
}
getIncompleteUrls() {
return this.getUrlsWithStatus([PublishStatus.Sending, PublishStatus.Pending])
}
getFailedUrls() {
return this.getUrlsWithStatus([PublishStatus.Failure, PublishStatus.Timeout])
}
hasStatus(statuses: PublishStatus | PublishStatus[]) {
return this.getUrlsWithStatus(statuses).length > 0
}
isComplete() {
return !this.hasStatus([PublishStatus.Sending, PublishStatus.Pending])
}
getError() {
for (const [_, {status, detail}] of Object.entries(this.results)) {
if (status === PublishStatus.Failure) {
return detail
}
}
if (this.isComplete()) {
return ""
}
}
waitForError() {
return new Promise<string>(resolve => {
this.subscribe(thunk => {
const error = thunk.getError()
if (error !== undefined) {
resolve(error)
}
})
})
}
waitForCompletion() {
return new Promise<void>(resolve => {
this.subscribe(thunk => {
if (thunk.isComplete()) {
resolve()
}
})
})
}
}
export class Thunk extends BaseThunk {
event: HashedEvent
complete = defer<void>() complete = defer<void>()
controller = new AbortController() controller = new AbortController()
wrap?: SignedEvent wrap?: SignedEvent
constructor(readonly options: ThunkOptions) { constructor(readonly options: ThunkOptions) {
super()
if (!options.recipient && WRAPPED_KINDS.includes(options.event.kind)) { if (!options.recipient && WRAPPED_KINDS.includes(options.event.kind)) {
throw new Error(`Attempted to publish a kind ${options.event.kind} without wrapping it`) throw new Error(`Attempted to publish a kind ${options.event.kind} without wrapping it`)
} }
this.event = prep(options.event, this.options.user.pubkey) this.event = prep(options.event, this.user.pubkey)
for (const relay of options.relays) { for (const relay of options.relays) {
this.results[relay] = { this.results[relay] = {
@@ -65,10 +154,8 @@ export class Thunk {
}) })
} }
_notify() { get user() {
for (const subscriber of this._subs) { return User.require(this.options.client)
subscriber(this)
}
} }
_fail(detail: string) { _fail(detail: string) {
@@ -152,7 +239,7 @@ export class Thunk {
// If we're sending it privately, wrap the event using nip 59 // If we're sending it privately, wrap the event using nip 59
if (recipient) { if (recipient) {
const wrapper = Nip01Signer.ephemeral() const wrapper = Nip01Signer.ephemeral()
const nip59 = new Nip59(this.options.user.signer, wrapper) const nip59 = new Nip59(this.user.signer, wrapper)
this.wrap = await nip59.wrap(recipient, this.event) this.wrap = await nip59.wrap(recipient, this.event)
@@ -184,7 +271,7 @@ export class Thunk {
this.event = await makePow(this.event, this.options.pow).result this.event = await makePow(this.event, this.options.pow).result
} }
const signedEvent = await this.options.user.signer.sign(this.event, { const signedEvent = await this.user.signer.sign(this.event, {
signal: AbortSignal.timeout(30_000), signal: AbortSignal.timeout(30_000),
}) })
@@ -206,160 +293,43 @@ export class Thunk {
} }
} }
subscribe(subscriber: Subscriber<Thunk>) { abort() {
this._subs.push(subscriber) this.controller.abort()
subscriber(this)
return () => {
this._subs = remove(subscriber, this._subs)
}
} }
} }
export class MergedThunk { export class MergedThunk extends BaseThunk {
_subs: Subscriber<MergedThunk>[] = []
results: PublishResultsByRelay = {}
constructor(readonly thunks: Thunk[]) { constructor(readonly thunks: Thunk[]) {
super()
const {Aborted, Failure, Timeout, Pending, Sending, Success} = PublishStatus const {Aborted, Failure, Timeout, Pending, Sending, Success} = PublishStatus
const relays = new Set(thunks.flatMap(thunk => thunk.options.relays)) const relays = new Set(thunks.flatMap(thunk => thunk.options.relays))
for (const thunk of thunks) { for (const thunk of thunks) {
thunk.subscribe($thunk => { thunk.subscribe(() => {
this.results = {} this.results = {}
for (const relay of relays) { for (const relay of relays) {
for (const status of [Aborted, Failure, Timeout, Pending, Sending, Success]) { for (const status of [Aborted, Failure, Timeout, Pending, Sending, Success]) {
const thunk = thunks.find(t => t.results[relay]?.status === status) const match = thunks.find(t => t.results[relay]?.status === status)
if (thunk) { if (match) {
this.results[relay] = thunk.results[relay]! this.results[relay] = match.results[relay]!
} }
} }
} }
this._notify() this._notify()
if (thunks.every(thunkIsComplete)) { if (thunks.every(t => t.isComplete())) {
this._subs = [] this._subs = []
} }
}) })
} }
} }
_notify() { abort() {
for (const subscriber of this._subs) { this.thunks.forEach(thunk => thunk.abort())
subscriber(this)
}
}
subscribe(subscriber: Subscriber<MergedThunk>) {
this._subs.push(subscriber)
subscriber(this)
return () => {
this._subs = remove(subscriber, this._subs)
}
}
}
export type AbstractThunk = Thunk | MergedThunk
export const isThunk = (thunk: AbstractThunk): thunk is Thunk => thunk instanceof Thunk
export const isMergedThunk = (thunk: AbstractThunk): thunk is MergedThunk =>
thunk instanceof MergedThunk
// Thunk status urls
export const getThunkUrlsWithStatus = (
statuses: PublishStatus | PublishStatus[],
thunk: AbstractThunk,
) => {
statuses = ensurePlural(statuses)
return Object.entries(thunk.results)
.filter(([_, {status}]) => statuses.includes(status))
.map(nth(0)) as string[]
}
export const getCompleteThunkUrls = (thunk: AbstractThunk) =>
getThunkUrlsWithStatus(
without([PublishStatus.Sending, PublishStatus.Pending], Object.values(PublishStatus)),
thunk,
)
export const getIncompleteThunkUrls = (thunk: AbstractThunk) =>
getThunkUrlsWithStatus([PublishStatus.Sending, PublishStatus.Pending], thunk)
export const getFailedThunkUrls = (thunk: AbstractThunk) =>
getThunkUrlsWithStatus([PublishStatus.Failure, PublishStatus.Timeout], thunk)
// Thunk status checks
export const thunkHasStatus = (statuses: PublishStatus | PublishStatus[], thunk: AbstractThunk) =>
getThunkUrlsWithStatus(statuses, thunk).length > 0
export const thunkIsComplete = (thunk: AbstractThunk) =>
!thunkHasStatus([PublishStatus.Sending, PublishStatus.Pending], thunk)
// Thunk errors
export const getThunkError = (thunk: Thunk) => {
for (const [_, {status, detail}] of Object.entries(thunk.results)) {
if (status === PublishStatus.Failure) {
return detail
}
}
if (thunkIsComplete(thunk)) {
return ""
}
}
// Thunk utilities that return promises
export const waitForThunkError = (thunk: Thunk) =>
new Promise<string>(resolve => {
thunk.subscribe($thunk => {
const error = getThunkError($thunk)
if (error !== undefined) {
resolve(error)
}
})
})
export const waitForThunkCompletion = (thunk: Thunk) =>
new Promise<void>(resolve => {
thunk.subscribe($thunk => {
if (thunkIsComplete($thunk)) {
resolve()
}
})
})
// Other thunk utilities
export const mergeThunks = (thunks: AbstractThunk[]) =>
new MergedThunk(Array.from(flattenThunks(thunks)))
export function* flattenThunks(thunks: AbstractThunk[]): Iterable<Thunk> {
for (const thunk of thunks) {
if (isMergedThunk(thunk)) {
yield* flattenThunks(thunk.thunks)
} else {
yield thunk
}
}
}
export const abortThunk = (thunk: AbstractThunk) => {
for (const child of flattenThunks([thunk])) {
child.controller.abort()
} }
} }
@@ -367,8 +337,8 @@ export const abortThunk = (thunk: AbstractThunk) => {
* Per-client thunk manager — the publish-side counterpart of `Network`. Owns * Per-client thunk manager — the publish-side counterpart of `Network`. Owns
* the client's optimistic-publish `history` store and the `queue` that paces * the client's optimistic-publish `history` store and the `queue` that paces
* publishing. Reach it via `client.use(Thunks)`; `publish` fills in the client * publishing. Reach it via `client.use(Thunks)`; `publish` fills in the client
* and user, enqueues the thunk (optimistically writing it to the repository), * (the acting user is derived from it), enqueues the thunk (optimistically
* and returns it. * writing it to the repository), and returns it.
*/ */
export class Thunks { export class Thunks {
history = writable<Thunk[]>([]) history = writable<Thunk[]>([])
@@ -383,33 +353,7 @@ export class Thunks {
constructor(readonly ctx: IClient) {} constructor(readonly ctx: IClient) {}
publish = (options: Omit<ThunkOptions, "client" | "user">) => { enqueue(thunk: Thunk) {
const thunk = new Thunk({...options, client: this.ctx, user: User.require(this.ctx)})
this.enqueue(thunk)
return thunk
}
// Publish as the user to their outbox (write) relays, plus any extra `relays`.
publishToOutbox = ({
relays = [],
...options
}: Omit<ThunkOptions, "client" | "user" | "relays"> & {relays?: string[]}) =>
this.publish({
...options,
relays: uniq([
...relays,
...this.ctx.use(Router).FromUser().policy(addMaximalFallbacks).getUrls(),
]),
})
retry = (thunk: AbstractThunk) =>
isMergedThunk(thunk)
? mergeThunks(thunk.thunks.map(t => this.publish(t.options)))
: this.publish(thunk.options)
private enqueue(thunk: Thunk) {
this.queue.push(thunk) this.queue.push(thunk)
for (const url of thunk.options.relays) { for (const url of thunk.options.relays) {
@@ -429,4 +373,38 @@ export class Thunks {
this.history.update($history => remove(thunk, $history)) this.history.update($history => remove(thunk, $history))
}) })
} }
publish = (options: Omit<ThunkOptions, "client">) => {
const thunk = new Thunk({...options, client: this.ctx})
this.enqueue(thunk)
return thunk
}
// Publish as the user to their outbox (write) relays
publishToOutbox = (options: Omit<ThunkOptions, "client" | "relays">) =>
this.publish({
...options,
relays: this.ctx.use(Router).FromUser().policy(addMinimalFallbacks).getUrls(),
})
retry = (thunk: BaseThunk) =>
thunk instanceof MergedThunk
? new MergedThunk(thunk.thunks.map(t => this.publish(t.options)))
: this.publish((thunk as Thunk).options)
merge(thunks: BaseThunk[]) {
return new MergedThunk(Array.from(this.flatten(thunks)))
}
*flatten(thunks: BaseThunk[]): Iterable<Thunk> {
for (const thunk of thunks) {
if (thunk instanceof MergedThunk) {
yield* this.flatten(thunk.thunks)
} else if (thunk instanceof Thunk) {
yield thunk
}
}
}
} }