Approach request optimization differently
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
import {matchFilters, hasValidSignature} from '@welshman/util'
|
||||
import {uniq} from '@welshman/lib'
|
||||
import {matchFilters, unionFilters, hasValidSignature} from '@welshman/util'
|
||||
import type {Filter, SignedEvent} from '@welshman/util'
|
||||
import {Pool} from "./Pool"
|
||||
import {Executor} from "./Executor"
|
||||
import {Relays} from "./target/Relays"
|
||||
import type {Subscription} from "./Subscribe"
|
||||
|
||||
export const defaultPool = new Pool()
|
||||
|
||||
@@ -21,6 +23,15 @@ const defaultHasValidSignature = (url: string, event: SignedEvent) => hasValidSi
|
||||
|
||||
const defaultMatchFilters = (url: string, filters: Filter[], event: SignedEvent) => matchFilters(filters, event)
|
||||
|
||||
export function* defaultOptimizeSubscriptions(subs: Subscription[]) {
|
||||
for (const relay of uniq(subs.flatMap(sub => sub.request.relays || []))) {
|
||||
const relaySubs = subs.filter(sub => sub.request.relays.includes(relay))
|
||||
const filters = unionFilters(relaySubs.flatMap(sub => sub.request.filters))
|
||||
|
||||
yield {relays: [relay], filters}
|
||||
}
|
||||
}
|
||||
|
||||
export const NetworkContext = {
|
||||
pool: defaultPool,
|
||||
getExecutor: defaultGetExecutor,
|
||||
@@ -30,4 +41,5 @@ export const NetworkContext = {
|
||||
isDeleted: defaultIsDeleted,
|
||||
hasValidSignature: defaultHasValidSignature,
|
||||
matchFilters: defaultMatchFilters,
|
||||
optimizeSubscriptions: defaultOptimizeSubscriptions,
|
||||
}
|
||||
|
||||
@@ -66,6 +66,7 @@ export const calculateSubscriptionGroup = (sub: Subscription) => {
|
||||
const parts: string[] = []
|
||||
|
||||
if (sub.request.timeout) parts.push(`timeout:${sub.request.timeout}`)
|
||||
if (sub.request.authTimeout) parts.push(`authTimeout:${sub.request.authTimeout}`)
|
||||
if (sub.request.closeOnEose) parts.push('closeOnEose')
|
||||
|
||||
return parts.join('|')
|
||||
@@ -123,28 +124,31 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
|
||||
return mergedSub
|
||||
}
|
||||
|
||||
export const optimizeSubscriptions = (subs: Subscription[]) =>
|
||||
Array.from(groupBy(calculateSubscriptionGroup, subs).values())
|
||||
export const optimizeSubscriptions = (subs: Subscription[]) => {
|
||||
return Array.from(groupBy(calculateSubscriptionGroup, subs).values())
|
||||
.flatMap(group => {
|
||||
const completedRelays = new Set()
|
||||
const timeout = max(group.map(sub => sub.request.timeout || 0))
|
||||
const authTimeout = max(group.map(sub => sub.request.authTimeout || 0))
|
||||
const closeOnEose = group.every(sub => sub.request.closeOnEose)
|
||||
const completedSubs = new Set<string>()
|
||||
const abortedSubs = new Set<string>()
|
||||
const closedSubs = new Set<string>()
|
||||
const eosedSubs = new Set<string>()
|
||||
const mergedSubs = []
|
||||
|
||||
for (const relay of uniq(group.flatMap((sub: Subscription) => sub.request.relays))) {
|
||||
const abortedSubs = new Set()
|
||||
const callerSubs = group.filter((sub: Subscription) => sub.request.relays.includes(relay))
|
||||
const mergedSub = makeSubscription({
|
||||
relays: [relay],
|
||||
closeOnEose: callerSubs.every(sub => sub.request.closeOnEose),
|
||||
timeout: max(callerSubs.map(sub => sub.request.timeout || 0)),
|
||||
authTimeout: max(callerSubs.map(sub => sub.request.authTimeout || 0)),
|
||||
filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
|
||||
for (const {relays, filters} of NetworkContext.optimizeSubscriptions(group)) {
|
||||
const mergedSub = makeSubscription({filters,
|
||||
relays,
|
||||
timeout,
|
||||
authTimeout,
|
||||
closeOnEose
|
||||
})
|
||||
|
||||
for (const {id, controller, request} of callerSubs) {
|
||||
for (const {id, controller, request} of group) {
|
||||
const onAbort = () => {
|
||||
abortedSubs.add(id)
|
||||
|
||||
if (abortedSubs.size === callerSubs.length) {
|
||||
if (abortedSubs.size === group.length) {
|
||||
mergedSub.close()
|
||||
}
|
||||
}
|
||||
@@ -154,7 +158,7 @@ export const optimizeSubscriptions = (subs: Subscription[]) =>
|
||||
}
|
||||
|
||||
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: SignedEvent) => {
|
||||
for (const sub of callerSubs) {
|
||||
for (const sub of group) {
|
||||
if (!sub.tracker.track(event.id, url) && matchFilters(sub.request.filters, event)) {
|
||||
sub.emitter.emit(SubscriptionEvent.Event, url, event)
|
||||
}
|
||||
@@ -162,53 +166,48 @@ export const optimizeSubscriptions = (subs: Subscription[]) =>
|
||||
})
|
||||
|
||||
// Pass events back to caller
|
||||
const propagateEvent = (type: SubscriptionEvent, checkFilter: boolean) =>
|
||||
const propagateEvent = (type: SubscriptionEvent) =>
|
||||
mergedSub.emitter.on(type, (url: string, event: SignedEvent) => {
|
||||
for (const sub of callerSubs) {
|
||||
if (!checkFilter || matchFilters(sub.request.filters, event)) {
|
||||
for (const sub of group) {
|
||||
if (matchFilters(sub.request.filters, event)) {
|
||||
sub.emitter.emit(type, url, event)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
propagateEvent(SubscriptionEvent.Duplicate, true)
|
||||
propagateEvent(SubscriptionEvent.DeletedEvent, false)
|
||||
propagateEvent(SubscriptionEvent.FailedFilter, false)
|
||||
propagateEvent(SubscriptionEvent.InvalidSignature, true)
|
||||
propagateEvent(SubscriptionEvent.Duplicate)
|
||||
propagateEvent(SubscriptionEvent.DeletedEvent)
|
||||
propagateEvent(SubscriptionEvent.InvalidSignature)
|
||||
|
||||
// Propagate eose
|
||||
mergedSub.emitter.on(SubscriptionEvent.Eose, (url: string) => {
|
||||
for (const sub of callerSubs) {
|
||||
sub.emitter.emit(SubscriptionEvent.Eose, url)
|
||||
}
|
||||
})
|
||||
const propagateFinality = (type: SubscriptionEvent, subIds: Set<string>) =>
|
||||
mergedSub.emitter.on(type, (...args: any[]) => {
|
||||
subIds.add(mergedSub.id)
|
||||
|
||||
// Propagate close
|
||||
mergedSub.emitter.on(SubscriptionEvent.Close, (url: string) => {
|
||||
for (const sub of callerSubs) {
|
||||
sub.emitter.emit(SubscriptionEvent.Close, url)
|
||||
}
|
||||
})
|
||||
|
||||
// Propagate subscription completion. Since we split subs by relay, we need to wait
|
||||
// until all relays are completed before we notify
|
||||
mergedSub.emitter.on(SubscriptionEvent.Complete, () => {
|
||||
completedRelays.add(relay)
|
||||
|
||||
for (const sub of callerSubs) {
|
||||
if (sub.request.relays.every(url => completedRelays.has(url))) {
|
||||
sub.emitter.emit(SubscriptionEvent.Complete)
|
||||
// Wait for all subscriptions to complete before reporting finality to the caller.
|
||||
// This is sub-optimal, but because we're outsourcing filter/relay optimization
|
||||
// we can't make any assumptions about which caller subscriptions have completed
|
||||
// at any given time.
|
||||
if (subIds.size === group.length) {
|
||||
for (const sub of group) {
|
||||
sub.emitter.emit(type, ...args)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mergedSub.emitter.removeAllListeners()
|
||||
})
|
||||
if (type === SubscriptionEvent.Complete) {
|
||||
mergedSub.emitter.removeAllListeners()
|
||||
}
|
||||
})
|
||||
|
||||
propagateFinality(SubscriptionEvent.Eose, eosedSubs)
|
||||
propagateFinality(SubscriptionEvent.Close, closedSubs)
|
||||
propagateFinality(SubscriptionEvent.Complete, completedSubs)
|
||||
|
||||
mergedSubs.push(mergedSub)
|
||||
}
|
||||
|
||||
return mergedSubs
|
||||
})
|
||||
}
|
||||
|
||||
export const executeSubscription = (sub: Subscription) => {
|
||||
const {request, emitter, tracker, controller} = sub
|
||||
@@ -328,10 +327,6 @@ export const executeSubscriptionBatched = (() => {
|
||||
export const subscribe = (request: SubscribeRequest) => {
|
||||
const subscription: Subscription = makeSubscription({delay: 50, ...request})
|
||||
|
||||
if (request.relays.length === 0) {
|
||||
console.warn("Attempted to execute a subscription with zero relays")
|
||||
}
|
||||
|
||||
if (request.delay === 0) {
|
||||
executeSubscription(subscription)
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user