Add abort to thunk

This commit is contained in:
Jon Staab
2024-10-16 12:07:57 -07:00
parent ac3c2e7d0b
commit f7e570d868
5 changed files with 66 additions and 14 deletions
+2 -4
View File
@@ -1,11 +1,11 @@
import {partition} from "@welshman/lib"
import {defaultOptimizeSubscriptions, getDefaultNetContext as originalGetDefaultNetContext} from "@welshman/net"
import type {Subscription, RelaysAndFilters, NetContext} from "@welshman/net"
import {WRAP, unionFilters, isSignedEvent, hasValidSignature} from "@welshman/util"
import {WRAP, unionFilters} from "@welshman/util"
import type {TrustedEvent, StampedEvent} from "@welshman/util"
import {tracker, repository} from './core'
import {makeRouter, getFilterSelections} from './router'
import {getSession, signer} from './session'
import {signer} from './session'
import type {Router} from './router'
import {loadProfile} from './profiles'
@@ -31,8 +31,6 @@ export const getDefaultNetContext = (overrides: Partial<NetContext> = {}) => ({
}
},
isDeleted: (url: string, event: TrustedEvent) => repository.isDeleted(event),
isValid: (url: string, event: TrustedEvent) =>
getSession(event.pubkey) || (isSignedEvent(event) && hasValidSignature(event)),
optimizeSubscriptions: (subs: Subscription[]) => {
const [withRelays, withoutRelays] = partition(sub => sub.request.relays.length > 0, subs)
const filters = unionFilters(withoutRelays.flatMap(sub => sub.request.filters))
+28 -6
View File
@@ -1,5 +1,5 @@
import {writable, get} from 'svelte/store'
import {Worker, assoc} from '@welshman/lib'
import {Worker, sleep, assoc} from '@welshman/lib'
import {stamp, own, hash} from "@welshman/signer"
import type {TrustedEvent, HashedEvent, EventTemplate, SignedEvent, StampedEvent, OwnedEvent} from '@welshman/util'
import {isStampedEvent, isOwnedEvent, isHashedEvent, isUnwrappedEvent, isSignedEvent} from '@welshman/util'
@@ -24,11 +24,20 @@ export type ThunkWithResolve = {
event: TrustedEvent
relays: string[]
resolve: (data: PublishStatusDataByUrl) => void
delay?: number
signal?: AbortSignal
}
export const thunkWorker = new Worker<ThunkWithResolve>()
thunkWorker.addGlobalHandler(async ({event, relays, resolve}: ThunkWithResolve) => {
thunkWorker.addGlobalHandler(async ({event, relays, resolve, delay, signal}: ThunkWithResolve) => {
let aborted = false
// Handle abort
signal?.addEventListener('abort', () => {
aborted = true
})
// If we were given a wrapped event, make sure to publish the wrapper, not the rumor
if (isUnwrappedEvent(event)) {
event = event.wrap
@@ -48,23 +57,34 @@ thunkWorker.addGlobalHandler(async ({event, relays, resolve}: ThunkWithResolve)
// We're guaranteed to have a signed event at this point
const signedEvent = event as SignedEvent
const {id, sig} = signedEvent
// Wait if the thunk is to be delayed
if (delay) {
await sleep(delay)
}
// Skip publishing and remove from the repository if aborted
if (aborted) {
return repository.removeEvent(signedEvent.id)
}
// Send it off
const pub = publish({event: signedEvent, relays})
// Copy the signature over since we had deferred it
const savedEvent = repository.getEvent(id) as SignedEvent
const savedEvent = repository.getEvent(signedEvent.id) as SignedEvent
// The event may already be replaced or deleted
if (savedEvent) {
savedEvent.sig = sig
savedEvent.sig = signedEvent.sig
}
// Track publish success
const statusByUrl: PublishStatusDataByUrl = {}
pub.emitter.on("*", (status: PublishStatus, url: string, message: string) => {
pub.emitter.on("*", async (status: PublishStatus, url: string, message: string) => {
const {id} = signedEvent
Object.assign(statusByUrl, {[url]: {id, url, status, message}})
publishStatusData.update(assoc(id, statusByUrl))
@@ -103,6 +123,8 @@ export const prepEvent = (event: ThunkEvent) => {
export type ThunkParams = {
event: ThunkEvent
relays: string[]
delay?: number
signal?: AbortSignal
}
export const publishThunk = (params: ThunkParams) =>
+2
View File
@@ -15,6 +15,8 @@ export const noop = (...args: unknown[]) => undefined
export const first = <T>(xs: T[], ...args: unknown[]) => xs[0]
export const ffirst = <T>(xs: T[][], ...args: unknown[]) => xs[0][0]
export const last = <T>(xs: T[], ...args: unknown[]) => xs[xs.length - 1]
export const identity = <T>(x: T, ...args: unknown[]) => x
+2
View File
@@ -116,6 +116,8 @@ export const SEEN_CONTEXT = 10116
export const SEEN_CONVERSATION = 10117
export const LIGHTNING_PUB_RPC = 21000
export const CLIENT_AUTH = 22242
export const AUTH_JOIN = 28934
export const AUTH_INVITE = 28935
export const WALLET_INFO = 13194
export const WALLET_REQUEST = 23194
export const WALLET_RESPONSE = 23195
+32 -4
View File
@@ -80,6 +80,31 @@ export class Repository<E extends HashedEvent = TrustedEvent> extends Emitter {
return duplicate && duplicate.created_at >= event.created_at
}
removeEvent = (idOrAddress: string) => {
const event = this.getEvent(idOrAddress)
if (event) {
this.eventsById.delete(event.id)
if (isUnwrappedEvent(event)) {
this.eventsByWrap.delete(event.wrap.id)
}
this.eventsByAddress.delete(getAddress(event))
for (const [k, v] of event.tags) {
if (k.length === 1) {
this._updateIndex(this.eventsByTag, `${k}:${v}`, undefined, event)
}
}
this._updateIndex(this.eventsByDay, getDay(event.created_at), undefined, event)
this._updateIndex(this.eventsByAuthor, event.pubkey, undefined, event)
this.emit('update', {added: [], removed: [event.id]})
}
}
query = (filters: Filter[], {includeDeleted = false} = {}) => {
const result: E[][] = []
for (let filter of filters) {
@@ -215,14 +240,17 @@ export class Repository<E extends HashedEvent = TrustedEvent> extends Emitter {
// Utilities
_updateIndex<K>(m: Map<K, E[]>, k: K, e: E, duplicate?: E) {
_updateIndex<K>(m: Map<K, E[]>, k: K, add?: E, remove?: E) {
let a = m.get(k) || []
if (duplicate) {
a = a.filter((x: E) => x !== duplicate)
if (remove) {
a = a.filter((x: E) => x !== remove)
}
if (add) {
a.push(add)
}
a.push(e)
m.set(k, a)
}
}