Add shortcut handlers to subscribe

This commit is contained in:
Jon Staab
2024-09-05 11:47:38 -07:00
parent 25845dc837
commit 233a4e3576
5 changed files with 38 additions and 46 deletions
+1 -23
View File
@@ -1,10 +1,6 @@
import {readable, derived, type Readable} from 'svelte/store'
import {type SubscribeRequest} from "@welshman/net"
import {indexBy, type Maybe, now} from '@welshman/lib'
import {getIdFilters} from '@welshman/util'
import type {TrustedEvent} from '@welshman/util'
import {withGetter, deriveEvents} from '@welshman/store'
import {repository, load} from './core'
import {withGetter} from '@welshman/store'
import {getFreshness, setFreshness} from './freshness'
export const collection = <T, LoadArgs extends any[]>({
@@ -61,21 +57,3 @@ export const collection = <T, LoadArgs extends any[]>({
return {indexStore, deriveItem, loadItem, getItem}
}
export const deriveEvent = (idOrAddress: string, request: Partial<SubscribeRequest> = {}) => {
let attempted = false
const filters = getIdFilters([idOrAddress])
return derived(
deriveEvents(repository, {filters, includeDeleted: true}),
(events: TrustedEvent[]) => {
if (!attempted && events.length === 0) {
load({...request, filters})
attempted = true
}
return events[0]
},
)
}
+2 -3
View File
@@ -2,7 +2,7 @@ import {isNil} from "@welshman/lib"
import {Repository, Relay, LOCAL_RELAY_URL, getFilterResultCardinality} from "@welshman/util"
import type {TrustedEvent, Filter} from "@welshman/util"
import {Tracker, subscribe as baseSubscribe} from "@welshman/net"
import type {SubscribeRequest} from "@welshman/net"
import type {SubscribeRequestWithHandlers} from "@welshman/net"
import {createEventStore} from "@welshman/store"
import type {Router} from './router'
@@ -11,7 +11,6 @@ export const AppContext: {
requestDelay: number
requestTimeout: number
dufflepudUrl?: string
splitRequest?: (req: PartialSubscribeRequest) => SubscribeRequest[]
} = {
router: undefined as unknown as Router,
requestDelay: 50,
@@ -26,7 +25,7 @@ export const relay = new Relay(repository)
export const tracker = new Tracker()
export type PartialSubscribeRequest = Partial<SubscribeRequest> & {filters: Filter[]}
export type PartialSubscribeRequest = Partial<SubscribeRequestWithHandlers> & {filters: Filter[]}
export const subscribe = (request: PartialSubscribeRequest) => {
const events: TrustedEvent[] = []
+4 -2
View File
@@ -18,10 +18,10 @@ export * from './zappers'
import {partition} from "@welshman/lib"
import {type Subscription, NetworkContext, defaultOptimizeSubscriptions} from "@welshman/net"
import {type TrustedEvent, unionFilters} from "@welshman/util"
import {type TrustedEvent, unionFilters, isSignedEvent, hasValidSignature} from "@welshman/util"
import {tracker, repository, AppContext} from './core'
import {makeRouter, getFilterSelections} from './router'
import {onAuth} from './session'
import {onAuth, getSession} from './session'
export function* optimizeSubscriptions(subs: Subscription[]) {
const [withRelays, withoutRelays] = partition(sub => sub.request.relays.length > 0, subs)
@@ -38,6 +38,8 @@ Object.assign(NetworkContext, {
onAuth,
onEvent: (url: string, event: TrustedEvent) => tracker.track(event.id, url),
isDeleted: (url: string, event: TrustedEvent) => repository.isDeleted(event),
hasValidSignature: (event: TrustedEvent) =>
getSession(event.pubkey) || (isSignedEvent(event) && hasValidSignature(event)),
optimizeSubscriptions,
})
+6 -6
View File
@@ -1,6 +1,6 @@
import {uniq} from '@welshman/lib'
import {matchFilters, unionFilters, hasValidSignature} from '@welshman/util'
import type {Filter, SignedEvent} from '@welshman/util'
import {matchFilters, unionFilters, isSignedEvent, hasValidSignature} from '@welshman/util'
import type {Filter, TrustedEvent} from '@welshman/util'
import {Pool} from "./Pool"
import {Executor} from "./Executor"
import {Relays} from "./target/Relays"
@@ -11,17 +11,17 @@ export const defaultPool = new Pool()
export const defaultGetExecutor = (relays: string[]) =>
new Executor(new Relays(relays.map((relay: string) => NetworkContext.pool.get(relay))))
const defaultOnEvent = (url: string, event: SignedEvent) => null
const defaultOnEvent = (url: string, event: TrustedEvent) => null
const defaultOnAuth = (url: string, challenge: string) => null
const defaultOnOk = (url: string, id: string, ok: boolean, message: string) => null
const defaultIsDeleted = (url: string, event: SignedEvent) => false
const defaultIsDeleted = (url: string, event: TrustedEvent) => false
const defaultHasValidSignature = (url: string, event: SignedEvent) => hasValidSignature(event)
const defaultHasValidSignature = (url: string, event: TrustedEvent) => isSignedEvent(event) && hasValidSignature(event)
const defaultMatchFilters = (url: string, filters: Filter[], event: SignedEvent) => matchFilters(filters, event)
const defaultMatchFilters = (url: string, filters: Filter[], event: TrustedEvent) => matchFilters(filters, event)
export function* defaultOptimizeSubscriptions(subs: Subscription[]) {
for (const relay of uniq(subs.flatMap(sub => sub.request.relays || []))) {
+25 -12
View File
@@ -1,5 +1,5 @@
import {Emitter, max, chunk, randomId, once, groupBy, uniq} from '@welshman/lib'
import {matchFilters, unionFilters, SignedEvent} from '@welshman/util'
import {matchFilters, unionFilters, TrustedEvent} from '@welshman/util'
import type {Filter} from '@welshman/util'
import {Tracker} from "./Tracker"
import {Connection} from './Connection'
@@ -91,7 +91,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
for (const sub of subs) {
// Propagate events, but avoid duplicates
sub.emitter.on(SubscriptionEvent.Event, (url: string, event: SignedEvent) => {
sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
if (!mergedSub.tracker.track(event.id, url)) {
mergedSub.emitter.emit(SubscriptionEvent.Event, url, event)
}
@@ -157,7 +157,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) => {
controller.signal.addEventListener('abort', onAbort)
}
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: SignedEvent) => {
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
for (const sub of group) {
if (!sub.tracker.track(event.id, url) && matchFilters(sub.request.filters, event)) {
sub.emitter.emit(SubscriptionEvent.Event, url, event)
@@ -167,7 +167,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) => {
// Pass events back to caller
const propagateEvent = (type: SubscriptionEvent) =>
mergedSub.emitter.on(type, (url: string, event: SignedEvent) => {
mergedSub.emitter.on(type, (url: string, event: TrustedEvent) => {
for (const sub of group) {
if (matchFilters(sub.request.filters, event)) {
sub.emitter.emit(type, url, event)
@@ -215,11 +215,11 @@ export const executeSubscription = (sub: Subscription) => {
const executor = NetworkContext.getExecutor(relays)
const subs: {unsubscribe: () => void}[] = []
const completedRelays = new Set()
const events: SignedEvent[] = []
const events: TrustedEvent[] = []
// Hook up our events
emitter.on(SubscriptionEvent.Event, (url: string, event: SignedEvent) => {
emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
events.push(event)
})
@@ -248,7 +248,7 @@ export const executeSubscription = (sub: Subscription) => {
// Functions for emitting events
const onEvent = (url: string, event: SignedEvent) => {
const onEvent = (url: string, event: TrustedEvent) => {
if (tracker.track(event.id, url)) {
emitter.emit(SubscriptionEvent.Duplicate, url, event)
} else if (NetworkContext.isDeleted(url, event)) {
@@ -324,14 +324,27 @@ export const executeSubscriptionBatched = (() => {
}
})()
export const subscribe = (request: SubscribeRequest) => {
const subscription: Subscription = makeSubscription({delay: 50, ...request})
export type SubscribeRequestWithHandlers = SubscribeRequest & {
onEvent?: (event: TrustedEvent) => void
onEose?: (url: string) => void
onClose?: (url: string) => void
onComplete?: () => void
}
export const subscribe = ({onEvent, onEose, onClose, onComplete, ...request}: SubscribeRequestWithHandlers) => {
const sub: Subscription = makeSubscription({delay: 50, ...request})
if (request.delay === 0) {
executeSubscription(subscription)
executeSubscription(sub)
} else {
executeSubscriptionBatched(subscription)
executeSubscriptionBatched(sub)
}
return subscription
// Signature for onEvent is different from emitter signature for historical reasons and convenience
if (onEvent) sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => onEvent(event))
if (onEose) sub.emitter.on(SubscriptionEvent.Eose, onEose)
if (onClose) sub.emitter.on(SubscriptionEvent.Close, onClose)
if (onComplete) sub.emitter.on(SubscriptionEvent.Complete, onComplete)
return sub
}