From 233a4e357636e71bd89805573d1a198904cf0489 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 5 Sep 2024 11:47:38 -0700 Subject: [PATCH] Add shortcut handlers to subscribe --- packages/app/src/collection.ts | 24 +--------------------- packages/app/src/core.ts | 5 ++--- packages/app/src/index.ts | 6 ++++-- packages/net/src/Context.ts | 12 +++++------ packages/net/src/Subscribe.ts | 37 +++++++++++++++++++++++----------- 5 files changed, 38 insertions(+), 46 deletions(-) diff --git a/packages/app/src/collection.ts b/packages/app/src/collection.ts index a54f156..e5039cc 100644 --- a/packages/app/src/collection.ts +++ b/packages/app/src/collection.ts @@ -1,10 +1,6 @@ import {readable, derived, type Readable} from 'svelte/store' -import {type SubscribeRequest} from "@welshman/net" import {indexBy, type Maybe, now} from '@welshman/lib' -import {getIdFilters} from '@welshman/util' -import type {TrustedEvent} from '@welshman/util' -import {withGetter, deriveEvents} from '@welshman/store' -import {repository, load} from './core' +import {withGetter} from '@welshman/store' import {getFreshness, setFreshness} from './freshness' export const collection = ({ @@ -61,21 +57,3 @@ export const collection = ({ return {indexStore, deriveItem, loadItem, getItem} } - -export const deriveEvent = (idOrAddress: string, request: Partial = {}) => { - let attempted = false - - const filters = getIdFilters([idOrAddress]) - - return derived( - deriveEvents(repository, {filters, includeDeleted: true}), - (events: TrustedEvent[]) => { - if (!attempted && events.length === 0) { - load({...request, filters}) - attempted = true - } - - return events[0] - }, - ) -} diff --git a/packages/app/src/core.ts b/packages/app/src/core.ts index 2e97f55..0a5cb4d 100644 --- a/packages/app/src/core.ts +++ b/packages/app/src/core.ts @@ -2,7 +2,7 @@ import {isNil} from "@welshman/lib" import {Repository, Relay, LOCAL_RELAY_URL, getFilterResultCardinality} from "@welshman/util" import type {TrustedEvent, Filter} from "@welshman/util" import {Tracker, subscribe as baseSubscribe} from "@welshman/net" -import type {SubscribeRequest} from "@welshman/net" +import type {SubscribeRequestWithHandlers} from "@welshman/net" import {createEventStore} from "@welshman/store" import type {Router} from './router' @@ -11,7 +11,6 @@ export const AppContext: { requestDelay: number requestTimeout: number dufflepudUrl?: string - splitRequest?: (req: PartialSubscribeRequest) => SubscribeRequest[] } = { router: undefined as unknown as Router, requestDelay: 50, @@ -26,7 +25,7 @@ export const relay = new Relay(repository) export const tracker = new Tracker() -export type PartialSubscribeRequest = Partial & {filters: Filter[]} +export type PartialSubscribeRequest = Partial & {filters: Filter[]} export const subscribe = (request: PartialSubscribeRequest) => { const events: TrustedEvent[] = [] diff --git a/packages/app/src/index.ts b/packages/app/src/index.ts index 36d2f83..148c694 100644 --- a/packages/app/src/index.ts +++ b/packages/app/src/index.ts @@ -18,10 +18,10 @@ export * from './zappers' import {partition} from "@welshman/lib" import {type Subscription, NetworkContext, defaultOptimizeSubscriptions} from "@welshman/net" -import {type TrustedEvent, unionFilters} from "@welshman/util" +import {type TrustedEvent, unionFilters, isSignedEvent, hasValidSignature} from "@welshman/util" import {tracker, repository, AppContext} from './core' import {makeRouter, getFilterSelections} from './router' -import {onAuth} from './session' +import {onAuth, getSession} from './session' export function* optimizeSubscriptions(subs: Subscription[]) { const [withRelays, withoutRelays] = partition(sub => sub.request.relays.length > 0, subs) @@ -38,6 +38,8 @@ Object.assign(NetworkContext, { onAuth, onEvent: (url: string, event: TrustedEvent) => tracker.track(event.id, url), isDeleted: (url: string, event: TrustedEvent) => repository.isDeleted(event), + hasValidSignature: (event: TrustedEvent) => + getSession(event.pubkey) || (isSignedEvent(event) && hasValidSignature(event)), optimizeSubscriptions, }) diff --git a/packages/net/src/Context.ts b/packages/net/src/Context.ts index 51c38da..eb0a860 100644 --- a/packages/net/src/Context.ts +++ b/packages/net/src/Context.ts @@ -1,6 +1,6 @@ import {uniq} from '@welshman/lib' -import {matchFilters, unionFilters, hasValidSignature} from '@welshman/util' -import type {Filter, SignedEvent} from '@welshman/util' +import {matchFilters, unionFilters, isSignedEvent, hasValidSignature} from '@welshman/util' +import type {Filter, TrustedEvent} from '@welshman/util' import {Pool} from "./Pool" import {Executor} from "./Executor" import {Relays} from "./target/Relays" @@ -11,17 +11,17 @@ export const defaultPool = new Pool() export const defaultGetExecutor = (relays: string[]) => new Executor(new Relays(relays.map((relay: string) => NetworkContext.pool.get(relay)))) -const defaultOnEvent = (url: string, event: SignedEvent) => null +const defaultOnEvent = (url: string, event: TrustedEvent) => null const defaultOnAuth = (url: string, challenge: string) => null const defaultOnOk = (url: string, id: string, ok: boolean, message: string) => null -const defaultIsDeleted = (url: string, event: SignedEvent) => false +const defaultIsDeleted = (url: string, event: TrustedEvent) => false -const defaultHasValidSignature = (url: string, event: SignedEvent) => hasValidSignature(event) +const defaultHasValidSignature = (url: string, event: TrustedEvent) => isSignedEvent(event) && hasValidSignature(event) -const defaultMatchFilters = (url: string, filters: Filter[], event: SignedEvent) => matchFilters(filters, event) +const defaultMatchFilters = (url: string, filters: Filter[], event: TrustedEvent) => matchFilters(filters, event) export function* defaultOptimizeSubscriptions(subs: Subscription[]) { for (const relay of uniq(subs.flatMap(sub => sub.request.relays || []))) { diff --git a/packages/net/src/Subscribe.ts b/packages/net/src/Subscribe.ts index e532fb2..ae4b378 100644 --- a/packages/net/src/Subscribe.ts +++ b/packages/net/src/Subscribe.ts @@ -1,5 +1,5 @@ import {Emitter, max, chunk, randomId, once, groupBy, uniq} from '@welshman/lib' -import {matchFilters, unionFilters, SignedEvent} from '@welshman/util' +import {matchFilters, unionFilters, TrustedEvent} from '@welshman/util' import type {Filter} from '@welshman/util' import {Tracker} from "./Tracker" import {Connection} from './Connection' @@ -91,7 +91,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => { for (const sub of subs) { // Propagate events, but avoid duplicates - sub.emitter.on(SubscriptionEvent.Event, (url: string, event: SignedEvent) => { + sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { if (!mergedSub.tracker.track(event.id, url)) { mergedSub.emitter.emit(SubscriptionEvent.Event, url, event) } @@ -157,7 +157,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) => { controller.signal.addEventListener('abort', onAbort) } - mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: SignedEvent) => { + mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { for (const sub of group) { if (!sub.tracker.track(event.id, url) && matchFilters(sub.request.filters, event)) { sub.emitter.emit(SubscriptionEvent.Event, url, event) @@ -167,7 +167,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) => { // Pass events back to caller const propagateEvent = (type: SubscriptionEvent) => - mergedSub.emitter.on(type, (url: string, event: SignedEvent) => { + mergedSub.emitter.on(type, (url: string, event: TrustedEvent) => { for (const sub of group) { if (matchFilters(sub.request.filters, event)) { sub.emitter.emit(type, url, event) @@ -215,11 +215,11 @@ export const executeSubscription = (sub: Subscription) => { const executor = NetworkContext.getExecutor(relays) const subs: {unsubscribe: () => void}[] = [] const completedRelays = new Set() - const events: SignedEvent[] = [] + const events: TrustedEvent[] = [] // Hook up our events - emitter.on(SubscriptionEvent.Event, (url: string, event: SignedEvent) => { + emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => { events.push(event) }) @@ -248,7 +248,7 @@ export const executeSubscription = (sub: Subscription) => { // Functions for emitting events - const onEvent = (url: string, event: SignedEvent) => { + const onEvent = (url: string, event: TrustedEvent) => { if (tracker.track(event.id, url)) { emitter.emit(SubscriptionEvent.Duplicate, url, event) } else if (NetworkContext.isDeleted(url, event)) { @@ -324,14 +324,27 @@ export const executeSubscriptionBatched = (() => { } })() -export const subscribe = (request: SubscribeRequest) => { - const subscription: Subscription = makeSubscription({delay: 50, ...request}) +export type SubscribeRequestWithHandlers = SubscribeRequest & { + onEvent?: (event: TrustedEvent) => void + onEose?: (url: string) => void + onClose?: (url: string) => void + onComplete?: () => void +} + +export const subscribe = ({onEvent, onEose, onClose, onComplete, ...request}: SubscribeRequestWithHandlers) => { + const sub: Subscription = makeSubscription({delay: 50, ...request}) if (request.delay === 0) { - executeSubscription(subscription) + executeSubscription(sub) } else { - executeSubscriptionBatched(subscription) + executeSubscriptionBatched(sub) } - return subscription + // Signature for onEvent is different from emitter signature for historical reasons and convenience + if (onEvent) sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => onEvent(event)) + if (onEose) sub.emitter.on(SubscriptionEvent.Eose, onEose) + if (onClose) sub.emitter.on(SubscriptionEvent.Close, onClose) + if (onComplete) sub.emitter.on(SubscriptionEvent.Complete, onComplete) + + return sub }