Add anonymous session, re-work thunk utilities

This commit is contained in:
Jon Staab
2025-08-19 13:24:15 -07:00
parent d0d13433c3
commit 00c0497892
4 changed files with 123 additions and 91 deletions
+5 -5
View File
@@ -12,7 +12,7 @@ import {
prepEvent, prepEvent,
publishThunk, publishThunk,
thunkQueue, thunkQueue,
walkThunks, flattenThunks,
} from "../src/thunk" } from "../src/thunk"
const secret = makeSecret() const secret = makeSecret()
@@ -46,19 +46,19 @@ describe("thunk", () => {
const thunk2 = new Thunk(mockRequest) const thunk2 = new Thunk(mockRequest)
const merged = new MergedThunk([thunk1, thunk2]) const merged = new MergedThunk([thunk1, thunk2])
merged.controller.abort() abortThunk(merged)
expect(thunk1.controller.signal.aborted).toBe(true) expect(thunk1.controller.signal.aborted).toBe(true)
expect(thunk2.controller.signal.aborted).toBe(true) expect(thunk2.controller.signal.aborted).toBe(true)
}) })
}) })
describe("walkThunks", () => { describe("flattenThunks", () => {
it("should iterate through nested thunks", () => { it("should iterate through nested thunks", () => {
const thunk1 = new Thunk(mockRequest) const thunk1 = new Thunk(mockRequest)
const thunk2 = new Thunk(mockRequest) const thunk2 = new Thunk(mockRequest)
const merged = new MergedThunk([thunk1, thunk2]) const merged = new MergedThunk([thunk1, thunk2])
const thunks = Array.from(walkThunks([merged, thunk1])) const thunks = Array.from(flattenThunks([merged, thunk1]))
expect(thunks).toHaveLength(3) expect(thunks).toHaveLength(3)
}) })
@@ -78,7 +78,7 @@ describe("thunk", () => {
const removeEventSpy = vi.spyOn(repository, "removeEvent") const removeEventSpy = vi.spyOn(repository, "removeEvent")
const thunk = publishThunk(mockRequest) const thunk = publishThunk(mockRequest)
thunk.controller.abort() abortThunk(thunk)
expect(removeEventSpy).toHaveBeenCalledWith(thunk.event.id) expect(removeEventSpy).toHaveBeenCalledWith(thunk.event.id)
}) })
+6
View File
@@ -19,6 +19,7 @@ export enum SessionMethod {
Nip46 = "nip46", Nip46 = "nip46",
Nip55 = "nip55", Nip55 = "nip55",
Pubkey = "pubkey", Pubkey = "pubkey",
Anonymous = "anonymous",
} }
export type SessionNip01 = { export type SessionNip01 = {
@@ -53,12 +54,17 @@ export type SessionPubkey = {
pubkey: string pubkey: string
} }
export type SessionAnonymous = {
method: SessionMethod.Anonymous
}
export type SessionAnyMethod = export type SessionAnyMethod =
| SessionNip01 | SessionNip01
| SessionNip07 | SessionNip07
| SessionNip46 | SessionNip46
| SessionNip55 | SessionNip55
| SessionPubkey | SessionPubkey
| SessionAnonymous
export type Session = SessionAnyMethod & {wallet?: Wallet} & Record<string, any> export type Session = SessionAnyMethod & {wallet?: Wallet} & Record<string, any>
+107 -86
View File
@@ -3,13 +3,12 @@ import {writable, get} from "svelte/store"
import { import {
TaskQueue, TaskQueue,
ifLet, ifLet,
ensurePlural,
dissoc, dissoc,
remove, remove,
defer, defer,
sleep, sleep,
assoc, assoc,
spec,
nthEq,
nth, nth,
} from "@welshman/lib" } from "@welshman/lib"
import {stamp, own, hash} from "@welshman/signer" import {stamp, own, hash} from "@welshman/signer"
@@ -68,6 +67,13 @@ export class Thunk {
for (const relay of options.relays) { for (const relay of options.relays) {
this.status[relay] = PublishStatus.Sending this.status[relay] = PublishStatus.Sending
} }
this.controller.signal.addEventListener("abort", () => {
console.log("abort")
for (const relay of options.relays) {
this._setAborted(relay)
}
})
} }
_notify() { _notify() {
@@ -85,6 +91,26 @@ export class Thunk {
this._notify() this._notify()
} }
_setPending(relay: string) {
this.options.onPending?.(relay)
this.status[relay] = PublishStatus.Pending
this._notify()
}
_setTimeout(relay: string) {
this.options.onTimeout?.(relay)
this.status[relay] = PublishStatus.Timeout
this.details[relay] = "Publish timed out"
this._notify()
}
_setAborted(relay: string) {
this.options.onAborted?.(relay)
this.status[relay] = PublishStatus.Aborted
this.details[relay] = "Publish was aborted"
this._notify()
}
async publish() { async publish() {
let event = this.event let event = this.event
@@ -149,21 +175,13 @@ export class Thunk {
this._notify() this._notify()
}, },
onPending: (relay: string) => { onPending: (relay: string) => {
this.options.onPending?.(relay) this._setPending(relay)
this.status[relay] = PublishStatus.Pending
this._notify()
}, },
onTimeout: (relay: string) => { onTimeout: (relay: string) => {
this.options.onTimeout?.(relay) this._setTimeout(relay)
this.status[relay] = PublishStatus.Timeout
this.details[relay] = "Publish timed out"
this._notify()
}, },
onAborted: (relay: string) => { onAborted: (relay: string) => {
this.options.onAborted?.(relay) this._setAborted(relay)
this.status[relay] = PublishStatus.Aborted
this.details[relay] = "Publish was aborted"
this._notify()
}, },
onComplete: () => { onComplete: () => {
this.options.onComplete?.() this.options.onComplete?.()
@@ -187,24 +205,21 @@ export class Thunk {
export class MergedThunk { export class MergedThunk {
_subs: Subscriber<MergedThunk>[] = [] _subs: Subscriber<MergedThunk>[] = []
controller = new AbortController()
status: PublishStatusByRelay = {} status: PublishStatusByRelay = {}
details: Record<string, string> = {} details: Record<string, string> = {}
constructor(readonly thunks: Thunk[]) { constructor(readonly thunks: Thunk[]) {
const {Aborted, Failure, Timeout, Pending, Success} = PublishStatus const {Aborted, Failure, Timeout, Pending, Sending, Success} = PublishStatus
const relays = new Set(thunks.flatMap(thunk => Object.keys(thunk.options.relays))) const relays = new Set(thunks.flatMap(thunk => thunk.options.relays))
for (const thunk of thunks) { for (const thunk of thunks) {
this.controller.signal.addEventListener("abort", () => thunk.controller.abort())
thunk.subscribe($thunk => { thunk.subscribe($thunk => {
this.status = {} this.status = {}
this.details = {} this.details = {}
for (const relay of relays) { for (const relay of relays) {
for (const status of [Aborted, Failure, Timeout, Pending, Success]) { for (const status of [Aborted, Failure, Timeout, Pending, Sending, Success]) {
const thunk = thunks.find(spec({[relay]: status})) const thunk = thunks.find(t => t.status[relay] === status)
if (thunk) { if (thunk) {
this.status[relay] = thunk.status[relay]! this.status[relay] = thunk.status[relay]!
@@ -213,9 +228,11 @@ export class MergedThunk {
} }
} }
console.log(this.status)
this._notify() this._notify()
if (thunks.filter(thunkIsComplete).length === thunks.length) { if (thunks.every(thunkIsComplete)) {
this._subs = [] this._subs = []
} }
}) })
@@ -246,79 +263,65 @@ export const isThunk = (thunk: AbstractThunk): thunk is Thunk => thunk instanceo
export const isMergedThunk = (thunk: AbstractThunk): thunk is MergedThunk => export const isMergedThunk = (thunk: AbstractThunk): thunk is MergedThunk =>
thunk instanceof MergedThunk thunk instanceof MergedThunk
export const thunkHasStatus = (thunk: AbstractThunk, status: PublishStatus) => // Thunk status urls
Object.entries(thunk.status).some(nthEq(1, status))
export const thunkUrlsWithStatus = (thunk: AbstractThunk, status: PublishStatus) => export const getThunkUrlsWithStatus = (
Object.entries(thunk.status).filter(nthEq(1, status)).map(nth(0)) statuses: PublishStatus | PublishStatus[],
thunk: AbstractThunk,
export const thunkCompleteUrls = (thunk: AbstractThunk) => { ) => {
const incompleteStatuses = [PublishStatus.Sending, PublishStatus.Pending] statuses = ensurePlural(statuses)
return Object.entries(thunk.status) return Object.entries(thunk.status)
.filter(([_, s]) => !incompleteStatuses.includes(s)) .filter(([_, status]) => statuses.includes(status))
.map(nth(1)) .map(nth(0))
} }
export const thunkIncompleteUrls = (thunk: AbstractThunk) => { export const getCompleteThunkUrls = (thunk: AbstractThunk) =>
const incompleteStatuses = [PublishStatus.Sending, PublishStatus.Pending] getThunkUrlsWithStatus([PublishStatus.Sending, PublishStatus.Pending], thunk)
return Object.entries(thunk.status) export const getIncompleteThunkUrls = (thunk: AbstractThunk) =>
.filter(([_, s]) => incompleteStatuses.includes(s)) getThunkUrlsWithStatus([PublishStatus.Sending, PublishStatus.Pending], thunk)
.map(nth(1))
}
export const thunkIsComplete = (thunk: AbstractThunk) => thunkCompleteUrls(thunk).length > 0 export const getFailedThunkUrls = (thunk: AbstractThunk) =>
getThunkUrlsWithStatus([PublishStatus.Failure, PublishStatus.Timeout], thunk)
export const getThunkError = (thunk: Thunk) => // Thunk status checks
new Promise<string>(resolve => {
thunk.subscribe($thunk => {
for (const [relay, status] of Object.entries($thunk.status)) {
if (status === PublishStatus.Failure) {
resolve($thunk.details[relay])
}
}
if (thunkIsComplete($thunk)) { export const thunkHasStatus = (statuses: PublishStatus | PublishStatus[], thunk: AbstractThunk) =>
resolve("") getThunkUrlsWithStatus(statuses, thunk).length > 0
}
})
})
export const waitForThunkStatus = (thunk: Thunk, status: PublishStatus) => export const thunkIsComplete = (thunk: AbstractThunk) =>
new Promise<boolean>(resolve => { !thunkHasStatus([PublishStatus.Sending, PublishStatus.Pending], thunk)
thunk.subscribe($thunk => {
for (const [_, s] of Object.entries($thunk.status)) {
if (s === status) {
resolve(true)
}
}
if (thunkIsComplete($thunk)) { // Thunk errors
resolve(false)
}
})
})
export const waitForThunkCompletion = (thunk: Thunk) => export const getThunkError = (thunk: Thunk) => {
new Promise<void>(resolve => { for (const [relay, status] of Object.entries(thunk.status)) {
thunk.subscribe($thunk => { if (status === PublishStatus.Failure) {
if (thunkIsComplete($thunk)) { return thunk.details[relay]
resolve()
}
})
})
export function* walkThunks(thunks: AbstractThunk[]): Iterable<Thunk> {
for (const thunk of thunks) {
if (thunk instanceof MergedThunk) {
yield* walkThunks(thunk.thunks)
} else {
yield thunk
} }
} }
if (thunkIsComplete(thunk)) {
return ""
}
} }
// Thunk utilities that return promises
export const waitForThunkError = (thunk: Thunk) =>
new Promise<string>(resolve => {
thunk.subscribe($thunk => {
const error = getThunkError($thunk)
if (error !== undefined) {
resolve(error)
}
})
})
// Thunk state
export const thunks = writable<Record<string, AbstractThunk>>({}) export const thunks = writable<Record<string, AbstractThunk>>({})
export const thunkQueue = new TaskQueue<Thunk>({ export const thunkQueue = new TaskQueue<Thunk>({
@@ -328,6 +331,21 @@ export const thunkQueue = new TaskQueue<Thunk>({
}, },
}) })
// Other thunk utilities
export const mergeThunks = (thunks: AbstractThunk[]) =>
new MergedThunk(Array.from(flattenThunks(thunks)))
export function* flattenThunks(thunks: AbstractThunk[]): Iterable<Thunk> {
for (const thunk of thunks) {
if (isMergedThunk(thunk)) {
yield* flattenThunks(thunk.thunks)
} else {
yield thunk
}
}
}
export const publishThunk = (options: ThunkOptions) => { export const publishThunk = (options: ThunkOptions) => {
const thunk = new Thunk(options) const thunk = new Thunk(options)
@@ -337,15 +355,18 @@ export const publishThunk = (options: ThunkOptions) => {
thunks.update(assoc(thunk.event.id, thunk)) thunks.update(assoc(thunk.event.id, thunk))
thunk.controller.signal.addEventListener("abort", () => {
repository.removeEvent(thunk.event.id)
})
return thunk return thunk
} }
export const abortThunk = (thunk: Thunk) => { export const abortThunk = (thunk: AbstractThunk) => {
thunk.controller.abort() for (const child of flattenThunks([thunk])) {
thunks.update(dissoc(thunk.event.id)) child.controller.abort()
repository.removeEvent(thunk.event.id) thunks.update(dissoc(child.event.id))
repository.removeEvent(child.event.id)
}
} }
export const retryThunk = (thunk: AbstractThunk) =>
isMergedThunk(thunk)
? mergeThunks(thunk.thunks.map(t => publishThunk(t.options)))
: publishThunk(thunk.options)
+5
View File
@@ -1062,6 +1062,11 @@ export const poll = ({interval = 300, condition, signal}: PollOptions) =>
} }
}, interval) }, interval)
if (condition()) {
resolve()
clearInterval(int)
}
signal.addEventListener("abort", () => { signal.addEventListener("abort", () => {
resolve() resolve()
clearInterval(int) clearInterval(int)