From f7e570d868ef517c9dd9ffbf62ef6aef21b9a12d Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Wed, 16 Oct 2024 12:07:57 -0700 Subject: [PATCH] Add abort to thunk --- packages/app/src/context.ts | 6 ++---- packages/app/src/thunk.ts | 34 +++++++++++++++++++++++++------ packages/lib/src/Tools.ts | 2 ++ packages/util/src/Kinds.ts | 2 ++ packages/util/src/Repository.ts | 36 +++++++++++++++++++++++++++++---- 5 files changed, 66 insertions(+), 14 deletions(-) diff --git a/packages/app/src/context.ts b/packages/app/src/context.ts index 50a29bf..ca9bd9a 100644 --- a/packages/app/src/context.ts +++ b/packages/app/src/context.ts @@ -1,11 +1,11 @@ import {partition} from "@welshman/lib" import {defaultOptimizeSubscriptions, getDefaultNetContext as originalGetDefaultNetContext} from "@welshman/net" import type {Subscription, RelaysAndFilters, NetContext} from "@welshman/net" -import {WRAP, unionFilters, isSignedEvent, hasValidSignature} from "@welshman/util" +import {WRAP, unionFilters} from "@welshman/util" import type {TrustedEvent, StampedEvent} from "@welshman/util" import {tracker, repository} from './core' import {makeRouter, getFilterSelections} from './router' -import {getSession, signer} from './session' +import {signer} from './session' import type {Router} from './router' import {loadProfile} from './profiles' @@ -31,8 +31,6 @@ export const getDefaultNetContext = (overrides: Partial = {}) => ({ } }, isDeleted: (url: string, event: TrustedEvent) => repository.isDeleted(event), - isValid: (url: string, event: TrustedEvent) => - getSession(event.pubkey) || (isSignedEvent(event) && hasValidSignature(event)), optimizeSubscriptions: (subs: Subscription[]) => { const [withRelays, withoutRelays] = partition(sub => sub.request.relays.length > 0, subs) const filters = unionFilters(withoutRelays.flatMap(sub => sub.request.filters)) diff --git a/packages/app/src/thunk.ts b/packages/app/src/thunk.ts index 6d5fd5a..bfa2a43 100644 --- a/packages/app/src/thunk.ts +++ b/packages/app/src/thunk.ts @@ -1,5 +1,5 @@ import {writable, get} from 'svelte/store' -import {Worker, assoc} from '@welshman/lib' +import {Worker, sleep, assoc} from '@welshman/lib' import {stamp, own, hash} from "@welshman/signer" import type {TrustedEvent, HashedEvent, EventTemplate, SignedEvent, StampedEvent, OwnedEvent} from '@welshman/util' import {isStampedEvent, isOwnedEvent, isHashedEvent, isUnwrappedEvent, isSignedEvent} from '@welshman/util' @@ -24,11 +24,20 @@ export type ThunkWithResolve = { event: TrustedEvent relays: string[] resolve: (data: PublishStatusDataByUrl) => void + delay?: number + signal?: AbortSignal } export const thunkWorker = new Worker() -thunkWorker.addGlobalHandler(async ({event, relays, resolve}: ThunkWithResolve) => { +thunkWorker.addGlobalHandler(async ({event, relays, resolve, delay, signal}: ThunkWithResolve) => { + let aborted = false + + // Handle abort + signal?.addEventListener('abort', () => { + aborted = true + }) + // If we were given a wrapped event, make sure to publish the wrapper, not the rumor if (isUnwrappedEvent(event)) { event = event.wrap @@ -48,23 +57,34 @@ thunkWorker.addGlobalHandler(async ({event, relays, resolve}: ThunkWithResolve) // We're guaranteed to have a signed event at this point const signedEvent = event as SignedEvent - const {id, sig} = signedEvent + + // Wait if the thunk is to be delayed + if (delay) { + await sleep(delay) + } + + // Skip publishing and remove from the repository if aborted + if (aborted) { + return repository.removeEvent(signedEvent.id) + } // Send it off const pub = publish({event: signedEvent, relays}) // Copy the signature over since we had deferred it - const savedEvent = repository.getEvent(id) as SignedEvent + const savedEvent = repository.getEvent(signedEvent.id) as SignedEvent // The event may already be replaced or deleted if (savedEvent) { - savedEvent.sig = sig + savedEvent.sig = signedEvent.sig } // Track publish success const statusByUrl: PublishStatusDataByUrl = {} - pub.emitter.on("*", (status: PublishStatus, url: string, message: string) => { + pub.emitter.on("*", async (status: PublishStatus, url: string, message: string) => { + const {id} = signedEvent + Object.assign(statusByUrl, {[url]: {id, url, status, message}}) publishStatusData.update(assoc(id, statusByUrl)) @@ -103,6 +123,8 @@ export const prepEvent = (event: ThunkEvent) => { export type ThunkParams = { event: ThunkEvent relays: string[] + delay?: number + signal?: AbortSignal } export const publishThunk = (params: ThunkParams) => diff --git a/packages/lib/src/Tools.ts b/packages/lib/src/Tools.ts index 65f764f..fdef67c 100644 --- a/packages/lib/src/Tools.ts +++ b/packages/lib/src/Tools.ts @@ -15,6 +15,8 @@ export const noop = (...args: unknown[]) => undefined export const first = (xs: T[], ...args: unknown[]) => xs[0] +export const ffirst = (xs: T[][], ...args: unknown[]) => xs[0][0] + export const last = (xs: T[], ...args: unknown[]) => xs[xs.length - 1] export const identity = (x: T, ...args: unknown[]) => x diff --git a/packages/util/src/Kinds.ts b/packages/util/src/Kinds.ts index 62ff0ba..910d115 100644 --- a/packages/util/src/Kinds.ts +++ b/packages/util/src/Kinds.ts @@ -116,6 +116,8 @@ export const SEEN_CONTEXT = 10116 export const SEEN_CONVERSATION = 10117 export const LIGHTNING_PUB_RPC = 21000 export const CLIENT_AUTH = 22242 +export const AUTH_JOIN = 28934 +export const AUTH_INVITE = 28935 export const WALLET_INFO = 13194 export const WALLET_REQUEST = 23194 export const WALLET_RESPONSE = 23195 diff --git a/packages/util/src/Repository.ts b/packages/util/src/Repository.ts index 99057d0..5572cd6 100644 --- a/packages/util/src/Repository.ts +++ b/packages/util/src/Repository.ts @@ -80,6 +80,31 @@ export class Repository extends Emitter { return duplicate && duplicate.created_at >= event.created_at } + removeEvent = (idOrAddress: string) => { + const event = this.getEvent(idOrAddress) + + if (event) { + this.eventsById.delete(event.id) + + if (isUnwrappedEvent(event)) { + this.eventsByWrap.delete(event.wrap.id) + } + + this.eventsByAddress.delete(getAddress(event)) + + for (const [k, v] of event.tags) { + if (k.length === 1) { + this._updateIndex(this.eventsByTag, `${k}:${v}`, undefined, event) + } + } + + this._updateIndex(this.eventsByDay, getDay(event.created_at), undefined, event) + this._updateIndex(this.eventsByAuthor, event.pubkey, undefined, event) + + this.emit('update', {added: [], removed: [event.id]}) + } + } + query = (filters: Filter[], {includeDeleted = false} = {}) => { const result: E[][] = [] for (let filter of filters) { @@ -215,14 +240,17 @@ export class Repository extends Emitter { // Utilities - _updateIndex(m: Map, k: K, e: E, duplicate?: E) { + _updateIndex(m: Map, k: K, add?: E, remove?: E) { let a = m.get(k) || [] - if (duplicate) { - a = a.filter((x: E) => x !== duplicate) + if (remove) { + a = a.filter((x: E) => x !== remove) + } + + if (add) { + a.push(add) } - a.push(e) m.set(k, a) } }