rework context
This commit is contained in:
+31
-36
@@ -1,45 +1,40 @@
|
||||
import {uniq} from '@welshman/lib'
|
||||
import {ctx, uniq, noop, always} from '@welshman/lib'
|
||||
import {matchFilters, unionFilters, isSignedEvent, hasValidSignature} from '@welshman/util'
|
||||
import type {Filter, TrustedEvent} from '@welshman/util'
|
||||
import {Pool} from "./Pool"
|
||||
import {Executor} from "./Executor"
|
||||
import {Relays} from "./target/Relays"
|
||||
import type {Subscription} from "./Subscribe"
|
||||
import type {Subscription, RelaysAndFilters} from "./Subscribe"
|
||||
|
||||
export const defaultPool = new Pool()
|
||||
|
||||
export const defaultGetExecutor = (relays: string[]) =>
|
||||
new Executor(new Relays(relays.map((relay: string) => NetworkContext.pool.get(relay))))
|
||||
|
||||
const defaultOnEvent = (url: string, event: TrustedEvent) => null
|
||||
|
||||
const defaultOnAuth = (url: string, challenge: string) => null
|
||||
|
||||
const defaultOnOk = (url: string, id: string, ok: boolean, message: string) => null
|
||||
|
||||
const defaultIsDeleted = (url: string, event: TrustedEvent) => false
|
||||
|
||||
const defaultHasValidSignature = (url: string, event: TrustedEvent) => isSignedEvent(event) && hasValidSignature(event)
|
||||
|
||||
const defaultMatchFilters = (url: string, filters: Filter[], event: TrustedEvent) => matchFilters(filters, event)
|
||||
|
||||
export function* defaultOptimizeSubscriptions(subs: Subscription[]) {
|
||||
for (const relay of uniq(subs.flatMap(sub => sub.request.relays || []))) {
|
||||
const relaySubs = subs.filter(sub => sub.request.relays.includes(relay))
|
||||
const filters = unionFilters(relaySubs.flatMap(sub => sub.request.filters))
|
||||
|
||||
yield {relays: [relay], filters}
|
||||
}
|
||||
export type NetContext = {
|
||||
pool: Pool
|
||||
getExecutor: (relays: string[]) => Executor
|
||||
onEvent: (url: string, event: TrustedEvent) => void
|
||||
onAuth: (url: string, challenge: string) => void
|
||||
onOk: (url: string, id: string, ok: boolean, message: string) => void
|
||||
isDeleted: (url: string, event: TrustedEvent) => boolean
|
||||
hasValidSignature: (url: string, event: TrustedEvent) => boolean
|
||||
matchFilters: (url: string, filters: Filter[], event: TrustedEvent) => boolean
|
||||
optimizeSubscriptions: (subs: Subscription[]) => RelaysAndFilters[]
|
||||
}
|
||||
|
||||
export const NetworkContext = {
|
||||
pool: defaultPool,
|
||||
getExecutor: defaultGetExecutor,
|
||||
onEvent: defaultOnEvent,
|
||||
onAuth: defaultOnAuth,
|
||||
onOk: defaultOnOk,
|
||||
isDeleted: defaultIsDeleted,
|
||||
hasValidSignature: defaultHasValidSignature,
|
||||
matchFilters: defaultMatchFilters,
|
||||
export const defaultOptimizeSubscriptions = (subs: Subscription[]) =>
|
||||
uniq(subs.flatMap(sub => sub.request.relays || []))
|
||||
.map(relay => {
|
||||
const relaySubs = subs.filter(sub => sub.request.relays.includes(relay))
|
||||
const filters = unionFilters(relaySubs.flatMap(sub => sub.request.filters))
|
||||
|
||||
return {relays: [relay], filters}
|
||||
})
|
||||
|
||||
export const getDefaultNetContext = () => ({
|
||||
onOk: noop,
|
||||
onAuth: noop,
|
||||
onEvent: noop,
|
||||
pool: new Pool(),
|
||||
isDeleted: always(false),
|
||||
getExecutor: (relays: string[]) => new Executor(new Relays(relays.map((relay: string) => ctx.net.pool.get(relay)))),
|
||||
hasValidSignature: (url: string, event: TrustedEvent) => isSignedEvent(event) && hasValidSignature(event),
|
||||
matchFilters: (url: string, filters: Filter[], event: TrustedEvent) => matchFilters(filters, event),
|
||||
optimizeSubscriptions: defaultOptimizeSubscriptions,
|
||||
}
|
||||
})
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import {ctx} from '@welshman/lib'
|
||||
import type {Emitter} from '@welshman/lib'
|
||||
import type {SignedEvent, Filter} from '@welshman/util'
|
||||
import type {Message} from './Socket'
|
||||
import type {Connection} from './Connection'
|
||||
import {NetworkContext} from './Context'
|
||||
|
||||
export type Target = Emitter & {
|
||||
connections: Connection[]
|
||||
@@ -22,8 +22,8 @@ const createSubId = (prefix: string) => [prefix, Math.random().toString().slice(
|
||||
export class Executor {
|
||||
|
||||
constructor(readonly target: Target) {
|
||||
target.on('AUTH', NetworkContext.onAuth)
|
||||
target.on('OK', NetworkContext.onOk)
|
||||
target.on('AUTH', ctx.net.onAuth)
|
||||
target.on('OK', ctx.net.onOk)
|
||||
}
|
||||
|
||||
subscribe(filters: Filter[], {onEvent, onEose}: SubscribeOpts = {}) {
|
||||
@@ -33,7 +33,7 @@ export class Executor {
|
||||
|
||||
const eventListener = (url: string, subid: string, e: SignedEvent) => {
|
||||
if (subid === id) {
|
||||
NetworkContext.onEvent(url, e)
|
||||
ctx.net.onEvent(url, e)
|
||||
onEvent?.(url, e)
|
||||
}
|
||||
}
|
||||
@@ -64,7 +64,7 @@ export class Executor {
|
||||
publish(event: SignedEvent, {verb = 'EVENT', onOk, onError}: PublishOpts = {}) {
|
||||
const okListener = (url: string, id: string, ...payload: any[]) => {
|
||||
if (id === event.id) {
|
||||
NetworkContext.onEvent(url, event)
|
||||
ctx.net.onEvent(url, event)
|
||||
onOk?.(url, id, ...payload)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import {Emitter, now, randomId, defer} from '@welshman/lib'
|
||||
import {ctx, Emitter, now, randomId, defer} from '@welshman/lib'
|
||||
import type {Deferred} from '@welshman/lib'
|
||||
import {asSignedEvent} from '@welshman/util'
|
||||
import type {SignedEvent} from '@welshman/util'
|
||||
import {NetworkContext} from './Context'
|
||||
|
||||
export enum PublishStatus {
|
||||
Pending = "pending",
|
||||
@@ -44,7 +43,7 @@ export const makePublish = (request: PublishRequest) => {
|
||||
export const publish = (request: PublishRequest) => {
|
||||
const pub = makePublish(request)
|
||||
const event = asSignedEvent(request.event)
|
||||
const executor = NetworkContext.getExecutor(request.relays)
|
||||
const executor = ctx.net.getExecutor(request.relays)
|
||||
|
||||
const abort = (reason: PublishStatus) => {
|
||||
for (const [url, status] of pub.status.entries()) {
|
||||
|
||||
@@ -1,16 +1,15 @@
|
||||
import {Emitter, max, chunk, randomId, once, groupBy, uniq} from '@welshman/lib'
|
||||
import {ctx, Emitter, max, chunk, randomId, once, groupBy, uniq} from '@welshman/lib'
|
||||
import {matchFilters, unionFilters, TrustedEvent} from '@welshman/util'
|
||||
import type {Filter} from '@welshman/util'
|
||||
import {Tracker} from "./Tracker"
|
||||
import {Connection} from './Connection'
|
||||
import {NetworkContext} from './Context'
|
||||
|
||||
// `subscribe` is a super function that handles batching subscriptions by merging
|
||||
// them based on parameters (filters and subscribe opts), then splits them by relay.
|
||||
// This results in fewer REQs being opened per connection, fewer duplicate events
|
||||
// being downloaded, and therefore less signature validation.
|
||||
//
|
||||
// Behavior can be further configured using NetworkContext. This can be useful for
|
||||
// Behavior can be further configured using ctx.net. This can be useful for
|
||||
// adding support for querying a local cache like a relay, tracking deleted events,
|
||||
// and bypassing validation for trusted relays.
|
||||
//
|
||||
@@ -30,9 +29,12 @@ export enum SubscriptionEvent {
|
||||
InvalidSignature = "invalid-signature",
|
||||
}
|
||||
|
||||
export type SubscribeRequest = {
|
||||
export type RelaysAndFilters = {
|
||||
relays: string[]
|
||||
filters: Filter[]
|
||||
}
|
||||
|
||||
export type SubscribeRequest = RelaysAndFilters & {
|
||||
delay?: number
|
||||
signal?: AbortSignal
|
||||
timeout?: number
|
||||
@@ -136,7 +138,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) => {
|
||||
const eosedSubs = new Set<string>()
|
||||
const mergedSubs = []
|
||||
|
||||
for (const {relays, filters} of NetworkContext.optimizeSubscriptions(group)) {
|
||||
for (const {relays, filters} of ctx.net.optimizeSubscriptions(group)) {
|
||||
const mergedSub = makeSubscription({filters,
|
||||
relays,
|
||||
timeout,
|
||||
@@ -212,7 +214,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) => {
|
||||
export const executeSubscription = (sub: Subscription) => {
|
||||
const {request, emitter, tracker, controller} = sub
|
||||
const {filters, closeOnEose, relays, signal, timeout, authTimeout = 0} = request
|
||||
const executor = NetworkContext.getExecutor(relays)
|
||||
const executor = ctx.net.getExecutor(relays)
|
||||
const subs: {unsubscribe: () => void}[] = []
|
||||
const completedRelays = new Set()
|
||||
const events: TrustedEvent[] = []
|
||||
@@ -251,11 +253,11 @@ export const executeSubscription = (sub: Subscription) => {
|
||||
const onEvent = (url: string, event: TrustedEvent) => {
|
||||
if (tracker.track(event.id, url)) {
|
||||
emitter.emit(SubscriptionEvent.Duplicate, url, event)
|
||||
} else if (NetworkContext.isDeleted(url, event)) {
|
||||
} else if (ctx.net.isDeleted(url, event)) {
|
||||
emitter.emit(SubscriptionEvent.DeletedEvent, url, event)
|
||||
} else if (!NetworkContext.matchFilters(url, filters, event)) {
|
||||
} else if (!ctx.net.matchFilters(url, filters, event)) {
|
||||
emitter.emit(SubscriptionEvent.FailedFilter, url, event)
|
||||
} else if (!NetworkContext.hasValidSignature(url, event)) {
|
||||
} else if (!ctx.net.hasValidSignature(url, event)) {
|
||||
emitter.emit(SubscriptionEvent.InvalidSignature, url, event)
|
||||
} else {
|
||||
emitter.emit(SubscriptionEvent.Event, url, event)
|
||||
|
||||
Vendored
+8
@@ -0,0 +1,8 @@
|
||||
import type {NetContext} from './Context'
|
||||
|
||||
|
||||
declare module "@welshman/lib" {
|
||||
interface Context {
|
||||
net: NetContext
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user