Move router into app, and out of util
This commit is contained in:
+50
-33
@@ -1,17 +1,21 @@
|
||||
import {isNil} from "@welshman/lib"
|
||||
import {Repository, Relay, LOCAL_RELAY_URL, getFilterResultCardinality} from "@welshman/util"
|
||||
import type {TrustedEvent, Filter} from "@welshman/util"
|
||||
import {Tracker, subscribe as baseSubscribe} from "@welshman/net"
|
||||
import {Tracker, subscribe as baseSubscribe, mergeSubscriptions} from "@welshman/net"
|
||||
import type {SubscribeRequest} from "@welshman/net"
|
||||
import {createEventStore} from "@welshman/store"
|
||||
import type {Router} from './router'
|
||||
|
||||
export const AppContext: {
|
||||
BOOTSTRAP_RELAYS: string[]
|
||||
DUFFLEPUD_URL?: string
|
||||
[key: string]: any
|
||||
router: Router,
|
||||
requestDelay: number
|
||||
requestTimeout: number
|
||||
dufflepudUrl?: string
|
||||
splitRequest?: (req: PartialSubscribeRequest) => SubscribeRequest[]
|
||||
} = {
|
||||
BOOTSTRAP_RELAYS: [],
|
||||
DUFFLEPUD_URL: undefined,
|
||||
router: undefined as unknown as Router,
|
||||
requestDelay: 50,
|
||||
requestTimeout: 3000,
|
||||
}
|
||||
|
||||
export const repository = new Repository<TrustedEvent>()
|
||||
@@ -22,56 +26,69 @@ export const relay = new Relay(repository)
|
||||
|
||||
export const tracker = new Tracker()
|
||||
|
||||
export const subscribe = (request: Partial<SubscribeRequest> & {filters: Filter[]}) => {
|
||||
export type PartialSubscribeRequest = Partial<SubscribeRequest> & {filters: Filter[]}
|
||||
|
||||
export const subscribe = (request: PartialSubscribeRequest) => {
|
||||
const events: TrustedEvent[] = []
|
||||
|
||||
// If we already have all results for any filter, don't send the filter to the network
|
||||
for (const filter of request.filters.splice(0)) {
|
||||
const cardinality = getFilterResultCardinality(filter)
|
||||
if (request.closeOnEose) {
|
||||
for (const filter of request.filters.splice(0)) {
|
||||
const cardinality = getFilterResultCardinality(filter)
|
||||
|
||||
if (!isNil(cardinality)) {
|
||||
const results = repository.query([filter])
|
||||
if (!isNil(cardinality)) {
|
||||
const results = repository.query([filter])
|
||||
|
||||
if (results.length === cardinality) {
|
||||
for (const event of results) {
|
||||
events.push(event)
|
||||
if (results.length === cardinality) {
|
||||
for (const event of results) {
|
||||
events.push(event)
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
request.filters.push(filter)
|
||||
request.filters.push(filter)
|
||||
}
|
||||
}
|
||||
|
||||
const sub = baseSubscribe({delay: 50, authTimeout: 3000, relays: [], ...request})
|
||||
const delay = AppContext.requestDelay
|
||||
const timeout = AppContext.requestTimeout
|
||||
|
||||
sub.emitter.on("event", (url: string, e: TrustedEvent) => {
|
||||
repository.publish(e)
|
||||
})
|
||||
return mergeSubscriptions(
|
||||
AppContext.splitRequest!(request).map(req => {
|
||||
// Make sure to query our local relay too
|
||||
const relays = [...req.relays, LOCAL_RELAY_URL]
|
||||
const sub = baseSubscribe({delay, authTimeout: timeout, ...req, relays})
|
||||
|
||||
// Keep cached results async so the caller can set up handlers
|
||||
setTimeout(() => {
|
||||
for (const event of events) {
|
||||
sub.emitter.emit("event", LOCAL_RELAY_URL, event)
|
||||
}
|
||||
})
|
||||
sub.emitter.on("event", (url: string, e: TrustedEvent) => {
|
||||
repository.publish(e)
|
||||
})
|
||||
|
||||
return sub
|
||||
// Keep cached results async so the caller can set up handlers
|
||||
setTimeout(() => {
|
||||
for (const event of events) {
|
||||
sub.emitter.emit("event", LOCAL_RELAY_URL, event)
|
||||
}
|
||||
})
|
||||
|
||||
return sub
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
export const load = (request: Partial<SubscribeRequest> & {filters: Filter[]}) =>
|
||||
export const load = (request: PartialSubscribeRequest) =>
|
||||
new Promise<TrustedEvent[]>(resolve => {
|
||||
const sub = subscribe({closeOnEose: true, timeout: 3000, ...request})
|
||||
const sub = subscribe({closeOnEose: true, timeout: AppContext.requestTimeout, ...request})
|
||||
const events: TrustedEvent[] = []
|
||||
|
||||
sub.emitter.on("event", (url: string, e: TrustedEvent) => events.push(e))
|
||||
sub.emitter.on("complete", () => resolve(events))
|
||||
})
|
||||
|
||||
export const loadOne = (request: Partial<SubscribeRequest> & {filters: Filter[]}) =>
|
||||
export const loadOne = (request: PartialSubscribeRequest) =>
|
||||
new Promise<TrustedEvent | null>(resolve => {
|
||||
const sub = subscribe({closeOnEose: true, timeout: 3000, ...request})
|
||||
const sub = subscribe({closeOnEose: true, timeout: AppContext.requestTimeout, ...request})
|
||||
|
||||
sub.emitter.on("event", (url: string, event: TrustedEvent) => {
|
||||
resolve(event)
|
||||
|
||||
@@ -16,10 +16,10 @@ export type Handle = {
|
||||
export const handles = withGetter(writable<Handle[]>([]))
|
||||
|
||||
export const fetchHandles = (handles: string[]) => {
|
||||
const base = AppContext.DUFFLEPUD_URL!
|
||||
const base = AppContext.dufflepudUrl!
|
||||
|
||||
if (!base) {
|
||||
throw new Error("DUFFLEPUD_URL is required to fetch nip05 info")
|
||||
throw new Error("AppContext.dufflepudUrl is required to fetch nip05 info")
|
||||
}
|
||||
|
||||
const res: any = postJson(`${base}/handle/info`, {handles})
|
||||
|
||||
@@ -8,6 +8,7 @@ export * from './plaintext'
|
||||
export * from './profiles'
|
||||
export * from './relays'
|
||||
export * from './relaySelections'
|
||||
export * from './router'
|
||||
export * from './session'
|
||||
export * from './storage'
|
||||
export * from './thunk'
|
||||
@@ -17,7 +18,8 @@ export * from './zappers'
|
||||
|
||||
import {NetworkContext} from "@welshman/net"
|
||||
import {type TrustedEvent} from "@welshman/util"
|
||||
import {tracker, repository} from './core'
|
||||
import {tracker, repository, AppContext} from './core'
|
||||
import {splitRequest, makeRouter} from './router'
|
||||
import {onAuth} from './session'
|
||||
|
||||
Object.assign(NetworkContext, {
|
||||
@@ -25,3 +27,8 @@ Object.assign(NetworkContext, {
|
||||
onEvent: (url: string, event: TrustedEvent) => tracker.track(event.id, url),
|
||||
isDeleted: (url: string, event: TrustedEvent) => repository.isDeleted(event),
|
||||
})
|
||||
|
||||
Object.assign(AppContext, {
|
||||
splitRequest,
|
||||
router: makeRouter(),
|
||||
})
|
||||
|
||||
@@ -36,8 +36,8 @@ export const profileSearch = derived(profiles, $profiles =>
|
||||
}),
|
||||
)
|
||||
|
||||
export const displayProfileByPubkey = (pubkey: string) =>
|
||||
export const displayProfileByPubkey = (pubkey = "") =>
|
||||
displayProfile(profilesByPubkey.get().get(pubkey), displayPubkey(pubkey))
|
||||
|
||||
export const deriveProfileDisplay = (pubkey: string) =>
|
||||
export const deriveProfileDisplay = (pubkey = "") =>
|
||||
derived(deriveProfile(pubkey), $profile => displayProfile($profile, displayPubkey(pubkey)))
|
||||
|
||||
@@ -45,10 +45,10 @@ export const relaysByPubkey = derived(relays, $relays =>
|
||||
)
|
||||
|
||||
export const fetchRelayProfiles = (urls: string[]) => {
|
||||
const base = AppContext.DUFFLEPUD_URL!
|
||||
const base = AppContext.dufflepudUrl!
|
||||
|
||||
if (!base) {
|
||||
throw new Error("DUFFLEPUD_URL is required to fetch relay metadata")
|
||||
throw new Error("AppContext.dufflepudUrl is required to fetch relay metadata")
|
||||
}
|
||||
|
||||
const res: any = postJson(`${base}/relay/info`, {urls})
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
import {first, splitAt, identity, sortBy, uniq, shuffle, pushToMapKey} from '@welshman/lib'
|
||||
import {Tags} from './Tags'
|
||||
import type {TrustedEvent} from './Events'
|
||||
import {isShareableRelayUrl} from './Relay'
|
||||
import {isCommunityAddress, isGroupAddress} from './Address'
|
||||
import {first, switcher, throttleWithValue, clamp, last, splitAt, identity, sortBy, uniq, shuffle, pushToMapKey} from '@welshman/lib'
|
||||
import {Tags, getFilterId, unionFilters, isShareableRelayUrl, isCommunityAddress, isGroupAddress, isContextAddress} from '@welshman/util'
|
||||
import type {TrustedEvent, Filter} from '@welshman/util'
|
||||
import {NetworkContext, ConnectionStatus} from '@welshman/net'
|
||||
import {AppContext} from './core'
|
||||
import type {PartialSubscribeRequest} from './core'
|
||||
import {pubkey} from './session'
|
||||
import {relaySelectionsByPubkey, getReadRelayUrls, getWriteRelayUrls, getRelayUrls} from './relaySelections'
|
||||
import {relays, relaysByUrl} from './relays'
|
||||
|
||||
export enum RelayMode {
|
||||
Read = "read",
|
||||
@@ -84,8 +88,16 @@ export type ValueRelays = {
|
||||
relays: string[]
|
||||
}
|
||||
|
||||
// Fallback policies
|
||||
|
||||
export type FallbackPolicy = (count: number, limit: number) => number
|
||||
|
||||
export const addNoFallbacks = (count: number, redundancy: number) => 0
|
||||
|
||||
export const addMinimalFallbacks = (count: number, redundancy: number) => count > 0 ? 0 : 1
|
||||
|
||||
export const addMaximalFallbacks = (count: number, redundancy: number) => redundancy - count
|
||||
|
||||
export class Router {
|
||||
constructor(readonly options: RouterOptions) {}
|
||||
|
||||
@@ -170,7 +182,7 @@ export class Router {
|
||||
this.scenario([
|
||||
...this.getUserSelections(RelayMode.Inbox),
|
||||
this.getPubkeySelection(pubkey, RelayMode.Inbox),
|
||||
]).policy(this.addMinimalFallbacks)
|
||||
]).policy(addMinimalFallbacks)
|
||||
|
||||
Event = (event: TrustedEvent) =>
|
||||
this.scenario(this.forceValue(event.id, [
|
||||
@@ -215,7 +227,7 @@ export class Router {
|
||||
if (tags.groups().exists()) {
|
||||
return this
|
||||
.scenario(this.getContextSelections(tags.groups()))
|
||||
.policy(this.addNoFallbacks)
|
||||
.policy(addNoFallbacks)
|
||||
}
|
||||
|
||||
return this.scenario(this.forceValue(event.id, [
|
||||
@@ -234,7 +246,7 @@ export class Router {
|
||||
WithinGroup = (address: string, relays?: string) =>
|
||||
this
|
||||
.scenario(this.getContextSelections(Tags.wrap([["a", address]])))
|
||||
.policy(this.addNoFallbacks)
|
||||
.policy(addNoFallbacks)
|
||||
|
||||
WithinCommunity = (address: string) =>
|
||||
this.scenario(this.getContextSelections(Tags.wrap([["a", address]])))
|
||||
@@ -256,14 +268,6 @@ export class Router {
|
||||
|
||||
Search = (term: string, relays: string[] = []) =>
|
||||
this.product([term], uniq(relays.concat(this.options.getSearchRelays?.() || [])))
|
||||
|
||||
// Fallback policies
|
||||
|
||||
addNoFallbacks = (count: number, redundancy: number) => 0
|
||||
|
||||
addMinimalFallbacks = (count: number, redundancy: number) => count > 0 ? 0 : 1
|
||||
|
||||
addMaximalFallbacks = (count: number, redundancy: number) => redundancy - count
|
||||
}
|
||||
|
||||
// Router Scenario
|
||||
@@ -291,7 +295,7 @@ export class RouterScenario {
|
||||
|
||||
getRedundancy = () => this.options.redundancy || this.router.options.getRedundancy?.() || 3
|
||||
|
||||
getPolicy = () => this.options.policy || this.router.addMaximalFallbacks
|
||||
getPolicy = () => this.options.policy || addMaximalFallbacks
|
||||
|
||||
getLimit = () => this.options.limit || this.router.options.getLimit?.() || 10
|
||||
|
||||
@@ -359,3 +363,223 @@ export class RouterScenario {
|
||||
|
||||
getUrl = () => first(this.getUrls())
|
||||
}
|
||||
|
||||
// Default router options
|
||||
|
||||
export const getRelayQuality = (url: string) => {
|
||||
const oneMinute = 60 * 1000
|
||||
const oneHour = 60 * oneMinute
|
||||
const oneDay = 24 * oneHour
|
||||
const oneWeek = 7 * oneDay
|
||||
const relay = relaysByUrl.get().get(url)
|
||||
const connect_count = relay?.stats?.connect_count || 0
|
||||
const recent_errors = relay?.stats?.recent_errors || []
|
||||
const connection = NetworkContext.pool.get(url, {autoConnect: false})
|
||||
|
||||
// If we haven't connected, consult our relay record and see if there has
|
||||
// been a recent fault. If there has been, penalize the relay. If there have been several,
|
||||
// don't use the relay.
|
||||
if (!connection) {
|
||||
const lastFault = last(recent_errors) || 0
|
||||
|
||||
if (recent_errors.filter(n => n > Date.now() - oneHour).length > 10) {
|
||||
return 0
|
||||
}
|
||||
|
||||
if (recent_errors.filter(n => n > Date.now() - oneDay).length > 50) {
|
||||
return 0
|
||||
}
|
||||
|
||||
if (recent_errors.filter(n => n > Date.now() - oneWeek).length > 100) {
|
||||
return 0
|
||||
}
|
||||
|
||||
return Math.max(0, Math.min(0.5, (Date.now() - oneMinute - lastFault) / oneHour))
|
||||
}
|
||||
|
||||
return switcher(connection.meta.getStatus(), {
|
||||
[ConnectionStatus.Unauthorized]: 0.5,
|
||||
[ConnectionStatus.Forbidden]: 0,
|
||||
[ConnectionStatus.Error]: 0,
|
||||
[ConnectionStatus.Closed]: 0.6,
|
||||
[ConnectionStatus.Slow]: 0.5,
|
||||
[ConnectionStatus.Ok]: 1,
|
||||
default: clamp([0.5, 1], connect_count / 1000),
|
||||
})
|
||||
}
|
||||
|
||||
export const getPubkeyRelays = (pubkey: string, mode?: string) => {
|
||||
const $relaySelections = relaySelectionsByPubkey.get()
|
||||
const $inboxSelections = relaySelectionsByPubkey.get()
|
||||
|
||||
switch (mode) {
|
||||
case RelayMode.Read: return getReadRelayUrls($relaySelections.get(pubkey))
|
||||
case RelayMode.Write: return getWriteRelayUrls($relaySelections.get(pubkey))
|
||||
case RelayMode.Inbox: return getRelayUrls($inboxSelections.get(pubkey))
|
||||
default: return getRelayUrls($relaySelections.get(pubkey))
|
||||
}
|
||||
}
|
||||
|
||||
export const getFallbackRelays = throttleWithValue(300, () =>
|
||||
relays.get().filter(r => getRelayQuality(r.url) >= 0.5).map(r => r.url)
|
||||
)
|
||||
|
||||
export const getSearchRelays = throttleWithValue(300, () =>
|
||||
relays.get().filter(r => getRelayQuality(r.url) >= 0.5 && r.profile?.supported_nips?.includes(50)).map(r => r.url)
|
||||
)
|
||||
|
||||
export const makeRouter = (options: Partial<RouterOptions> = {}) =>
|
||||
new Router({
|
||||
getPubkeyRelays,
|
||||
getFallbackRelays,
|
||||
getSearchRelays,
|
||||
getRelayQuality,
|
||||
getUserPubkey: () => pubkey.get(),
|
||||
getRedundancy: () => 2,
|
||||
getLimit: () => 5,
|
||||
...options,
|
||||
})
|
||||
|
||||
// Infer relay selections from filters
|
||||
|
||||
export type RelayFilters = {
|
||||
relay: string
|
||||
filters: Filter[]
|
||||
}
|
||||
|
||||
export type FilterSelection = {
|
||||
id: string,
|
||||
filter: Filter,
|
||||
scenario: RouterScenario
|
||||
}
|
||||
|
||||
export const makeFilterSelection = (id: string, filter: Filter, scenario: RouterScenario) =>
|
||||
({id, filter, scenario})
|
||||
|
||||
export const getFilterSelectionsForSearch = (filter: Filter) => {
|
||||
const id = getFilterId(filter)
|
||||
const relays = AppContext.router.options.getSearchRelays?.() || []
|
||||
const scenario = AppContext.router.product([id], relays)
|
||||
|
||||
return [makeFilterSelection(id, filter, scenario)]
|
||||
}
|
||||
|
||||
export const getFilterSelectionsForContext = (filter: Filter) => {
|
||||
const filterSelections = []
|
||||
const contexts = filter["#a"].filter(isContextAddress)
|
||||
const scenario = AppContext.router.WithinMultipleContexts(contexts)
|
||||
|
||||
for (const {relay, values} of scenario.getSelections()) {
|
||||
const contextFilter = {...filter, "#a": Array.from(values)}
|
||||
const id = getFilterId(contextFilter)
|
||||
const scenario = AppContext.router.product([id], [relay])
|
||||
|
||||
filterSelections.push(
|
||||
makeFilterSelection(id, contextFilter, scenario)
|
||||
)
|
||||
}
|
||||
|
||||
return filterSelections
|
||||
}
|
||||
|
||||
export const getFilterSelectionsForAuthors = (filter: Filter) => {
|
||||
const filterSelections = []
|
||||
const scenario = AppContext.router.FromPubkeys(filter.authors!)
|
||||
|
||||
for (const {relay, values} of scenario.getSelections()) {
|
||||
const authorsFilter = {...filter, authors: Array.from(values)}
|
||||
const id = getFilterId(authorsFilter)
|
||||
|
||||
filterSelections.push(
|
||||
makeFilterSelection(id, authorsFilter, AppContext.router.product([id], [relay]))
|
||||
)
|
||||
}
|
||||
|
||||
return filterSelections
|
||||
}
|
||||
|
||||
export const getFilterSelectionsForMentions = (filter: Filter) => {
|
||||
const filterSelections = []
|
||||
const scenario = AppContext.router.ForPubkeys(filter['#p']!)
|
||||
|
||||
for (const {relay, values} of scenario.getSelections()) {
|
||||
const mentionsFilter = {...filter, '#p': Array.from(values)}
|
||||
const id = getFilterId(mentionsFilter)
|
||||
|
||||
filterSelections.push(
|
||||
makeFilterSelection(id, mentionsFilter, AppContext.router.product([id], [relay]))
|
||||
)
|
||||
}
|
||||
|
||||
return filterSelections
|
||||
}
|
||||
|
||||
export const getFilterSelectionsForUser = (filter: Filter) => {
|
||||
const id = getFilterId(filter)
|
||||
const scenario = AppContext.router.ReadRelays()
|
||||
|
||||
return [makeFilterSelection(id, filter, AppContext.router.product([id], scenario.getUrls()))]
|
||||
}
|
||||
|
||||
export const getFilterSelections = (filters: Filter[]): RelayFilters[] => {
|
||||
const scenarios: RouterScenario[] = []
|
||||
const filtersById = new Map<string, Filter>()
|
||||
|
||||
const addSelections = (selections: FilterSelection[]) => {
|
||||
for (const {id, filter, scenario} of selections) {
|
||||
filtersById.set(id, filter)
|
||||
scenarios.push(scenario)
|
||||
}
|
||||
}
|
||||
|
||||
for (const filter of filters) {
|
||||
if (filter.search) {
|
||||
addSelections(getFilterSelectionsForSearch(filter))
|
||||
}
|
||||
|
||||
if (filter["#a"]?.some(isContextAddress)) {
|
||||
addSelections(getFilterSelectionsForContext(filter))
|
||||
}
|
||||
|
||||
if (filter.authors) {
|
||||
addSelections(getFilterSelectionsForAuthors(filter))
|
||||
}
|
||||
|
||||
if (filter['#p']) {
|
||||
addSelections(getFilterSelectionsForMentions(filter))
|
||||
}
|
||||
|
||||
if (scenarios.length === 0) {
|
||||
addSelections(getFilterSelectionsForUser(filter))
|
||||
}
|
||||
}
|
||||
|
||||
// Use low redundancy because filters will be very low cardinality
|
||||
const selections = AppContext.router
|
||||
.merge(scenarios)
|
||||
.redundancy(1)
|
||||
.getSelections()
|
||||
.map(({values, relay}) => ({
|
||||
filters: values.map(id => filtersById.get(id)!),
|
||||
relay,
|
||||
}))
|
||||
|
||||
// Pubkey-based selections can get really big. Use the most popular relays for the long tail
|
||||
const limit = AppContext.router.options.getLimit?.() || 8
|
||||
const redundancy = AppContext.router.options.getRedundancy?.() || 3
|
||||
const [keep, discard] = splitAt(limit, selections)
|
||||
|
||||
for (const target of keep.slice(0, redundancy)) {
|
||||
target.filters = unionFilters([...discard, target].flatMap(s => s.filters))
|
||||
}
|
||||
|
||||
return keep
|
||||
}
|
||||
|
||||
export const splitRequest = (req: PartialSubscribeRequest) => {
|
||||
if ((req.relays || []).length > 0) return [req]
|
||||
|
||||
return getFilterSelections(req.filters)
|
||||
.map(({relay, filters}) => ({...req, filters, relays: [relay]}))
|
||||
}
|
||||
|
||||
@@ -10,10 +10,10 @@ import {deriveProfile} from './profiles'
|
||||
export const zappers = withGetter(writable<Zapper[]>([]))
|
||||
|
||||
export const fetchZappers = (lnurls: string[]) => {
|
||||
const base = AppContext.DUFFLEPUD_URL!
|
||||
const base = AppContext.dufflepudUrl!
|
||||
|
||||
if (!base) {
|
||||
throw new Error("DUFFLEPUD_URL is required to fetch zapper info")
|
||||
throw new Error("AppContext.dufflepudUrl is required to fetch zapper info")
|
||||
}
|
||||
|
||||
const zappersByLnurl = new Map<string, Zapper>()
|
||||
|
||||
@@ -430,6 +430,20 @@ export const memoize = <T>(f: (...args: any[]) => T) => {
|
||||
}
|
||||
}
|
||||
|
||||
export const throttleWithValue = <T>(ms: number, f: () => T) => {
|
||||
let value: T
|
||||
|
||||
const update = throttle(ms, () => {
|
||||
value = f()
|
||||
})
|
||||
|
||||
return () => {
|
||||
update()
|
||||
|
||||
return value
|
||||
}
|
||||
}
|
||||
|
||||
export const batch = <T>(t: number, f: (xs: T[]) => void) => {
|
||||
const xs: T[] = []
|
||||
const cb = throttle(t, () => xs.length > 0 && f(xs.splice(0)))
|
||||
|
||||
+123
-76
@@ -1,4 +1,4 @@
|
||||
import {Emitter, identity, max, chunk, randomId, once, groupBy, uniq} from '@welshman/lib'
|
||||
import {Emitter, max, chunk, randomId, once, groupBy, uniq} from '@welshman/lib'
|
||||
import {matchFilters, unionFilters, SignedEvent} from '@welshman/util'
|
||||
import type {Filter} from '@welshman/util'
|
||||
import {Tracker} from "./Tracker"
|
||||
@@ -72,104 +72,147 @@ export const calculateSubscriptionGroup = (sub: Subscription) => {
|
||||
}
|
||||
|
||||
export const mergeSubscriptions = (subs: Subscription[]) => {
|
||||
const completedRelays = new Set()
|
||||
const mergedSubscriptions = []
|
||||
const mergedSub = makeSubscription({
|
||||
relays: uniq(subs.flatMap(sub => sub.request.relays)),
|
||||
filters: unionFilters(subs.flatMap(sub => sub.request.filters)),
|
||||
timeout: max(subs.map(sub => sub.request.timeout || 0)),
|
||||
authTimeout: max(subs.map(sub => sub.request.authTimeout || 0)),
|
||||
closeOnEose: subs.every(sub => sub.request.closeOnEose),
|
||||
})
|
||||
|
||||
for (const group of groupBy(calculateSubscriptionGroup, subs).values()) {
|
||||
const groupSubscriptions = []
|
||||
mergedSub.controller.signal.addEventListener('abort', () => {
|
||||
for (const sub of subs) {
|
||||
sub.close()
|
||||
}
|
||||
})
|
||||
|
||||
for (const relay of uniq(group.flatMap((sub: Subscription) => sub.request.relays))) {
|
||||
const abortedSubs = new Set()
|
||||
const callerSubs = group.filter((sub: Subscription) => sub.request.relays.includes(relay))
|
||||
const mergedSub = makeSubscription({
|
||||
relays: [relay],
|
||||
timeout: callerSubs[0].request.timeout,
|
||||
closeOnEose: callerSubs[0].request.closeOnEose,
|
||||
authTimeout: max(callerSubs.map(r => r.request.authTimeout!).filter(identity)),
|
||||
filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
|
||||
})
|
||||
const completedSubs = new Set()
|
||||
|
||||
for (const {id, controller, request} of callerSubs) {
|
||||
const onAbort = () => {
|
||||
abortedSubs.add(id)
|
||||
for (const sub of subs) {
|
||||
// Propagate events, but avoid duplicates
|
||||
sub.emitter.on(SubscriptionEvent.Event, (url: string, event: SignedEvent) => {
|
||||
if (!mergedSub.tracker.track(event.id, url)) {
|
||||
mergedSub.emitter.emit(SubscriptionEvent.Event, url, event)
|
||||
}
|
||||
})
|
||||
|
||||
if (abortedSubs.size === callerSubs.length) {
|
||||
mergedSub.close()
|
||||
}
|
||||
}
|
||||
// Propagate subscription completion. Since we split subs by relay, we need to wait
|
||||
// until all relays are completed before we notify
|
||||
sub.emitter.on(SubscriptionEvent.Complete, () => {
|
||||
completedSubs.add(sub.id)
|
||||
|
||||
request.signal?.addEventListener('abort', onAbort)
|
||||
controller.signal.addEventListener('abort', onAbort)
|
||||
if (completedSubs.size === subs.length) {
|
||||
mergedSub.emitter.emit(SubscriptionEvent.Complete)
|
||||
}
|
||||
|
||||
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: SignedEvent) => {
|
||||
for (const sub of callerSubs) {
|
||||
if (sub.tracker.track(event.id, url)) {
|
||||
continue
|
||||
sub.emitter.removeAllListeners()
|
||||
})
|
||||
|
||||
// Propagate everything else too
|
||||
const propagateEvent = (type: SubscriptionEvent) =>
|
||||
sub.emitter.on(type, (...args) => mergedSub.emitter.emit(type, ...args))
|
||||
|
||||
propagateEvent(SubscriptionEvent.Duplicate)
|
||||
propagateEvent(SubscriptionEvent.DeletedEvent)
|
||||
propagateEvent(SubscriptionEvent.FailedFilter)
|
||||
propagateEvent(SubscriptionEvent.InvalidSignature)
|
||||
propagateEvent(SubscriptionEvent.Eose)
|
||||
propagateEvent(SubscriptionEvent.Close)
|
||||
}
|
||||
|
||||
return mergedSub
|
||||
}
|
||||
|
||||
export const optimizeSubscriptions = (subs: Subscription[]) =>
|
||||
Array.from(groupBy(calculateSubscriptionGroup, subs).values())
|
||||
.flatMap(group => {
|
||||
const completedRelays = new Set()
|
||||
const mergedSubs = []
|
||||
|
||||
for (const relay of uniq(group.flatMap((sub: Subscription) => sub.request.relays))) {
|
||||
const abortedSubs = new Set()
|
||||
const callerSubs = group.filter((sub: Subscription) => sub.request.relays.includes(relay))
|
||||
const mergedSub = makeSubscription({
|
||||
relays: [relay],
|
||||
closeOnEose: callerSubs.every(sub => sub.request.closeOnEose),
|
||||
timeout: max(callerSubs.map(sub => sub.request.timeout || 0)),
|
||||
authTimeout: max(callerSubs.map(sub => sub.request.authTimeout || 0)),
|
||||
filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
|
||||
})
|
||||
|
||||
for (const {id, controller, request} of callerSubs) {
|
||||
const onAbort = () => {
|
||||
abortedSubs.add(id)
|
||||
|
||||
if (abortedSubs.size === callerSubs.length) {
|
||||
mergedSub.close()
|
||||
}
|
||||
}
|
||||
|
||||
if (!matchFilters(sub.request.filters, event)) {
|
||||
continue
|
||||
}
|
||||
|
||||
sub.emitter.emit(SubscriptionEvent.Event, url, event)
|
||||
request.signal?.addEventListener('abort', onAbort)
|
||||
controller.signal.addEventListener('abort', onAbort)
|
||||
}
|
||||
})
|
||||
|
||||
// Pass events back to caller
|
||||
const propagateEvent = (type: SubscriptionEvent, checkFilter: boolean) =>
|
||||
mergedSub.emitter.on(type, (url: string, event: SignedEvent) => {
|
||||
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: SignedEvent) => {
|
||||
for (const sub of callerSubs) {
|
||||
if (!checkFilter || matchFilters(sub.request.filters, event)) {
|
||||
sub.emitter.emit(type, url, event)
|
||||
if (!sub.tracker.track(event.id, url) && matchFilters(sub.request.filters, event)) {
|
||||
sub.emitter.emit(SubscriptionEvent.Event, url, event)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
propagateEvent(SubscriptionEvent.Duplicate, true)
|
||||
propagateEvent(SubscriptionEvent.DeletedEvent, false)
|
||||
propagateEvent(SubscriptionEvent.FailedFilter, false)
|
||||
propagateEvent(SubscriptionEvent.InvalidSignature, true)
|
||||
// Pass events back to caller
|
||||
const propagateEvent = (type: SubscriptionEvent, checkFilter: boolean) =>
|
||||
mergedSub.emitter.on(type, (url: string, event: SignedEvent) => {
|
||||
for (const sub of callerSubs) {
|
||||
if (!checkFilter || matchFilters(sub.request.filters, event)) {
|
||||
sub.emitter.emit(type, url, event)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// Propagate eose
|
||||
mergedSub.emitter.on(SubscriptionEvent.Eose, (url: string) => {
|
||||
for (const sub of callerSubs) {
|
||||
sub.emitter.emit(SubscriptionEvent.Eose, url)
|
||||
}
|
||||
})
|
||||
propagateEvent(SubscriptionEvent.Duplicate, true)
|
||||
propagateEvent(SubscriptionEvent.DeletedEvent, false)
|
||||
propagateEvent(SubscriptionEvent.FailedFilter, false)
|
||||
propagateEvent(SubscriptionEvent.InvalidSignature, true)
|
||||
|
||||
// Propagate close
|
||||
mergedSub.emitter.on(SubscriptionEvent.Close, (url: string) => {
|
||||
for (const sub of callerSubs) {
|
||||
sub.emitter.emit(SubscriptionEvent.Close, url)
|
||||
}
|
||||
})
|
||||
|
||||
// Propagate subscription completion. Since we split subs by relay, we need to wait
|
||||
// until all relays are completed before we notify
|
||||
mergedSub.emitter.on(SubscriptionEvent.Complete, () => {
|
||||
completedRelays.add(relay)
|
||||
|
||||
for (const sub of callerSubs) {
|
||||
if (sub.request.relays.every(url => completedRelays.has(url))) {
|
||||
sub.emitter.emit(SubscriptionEvent.Complete)
|
||||
// Propagate eose
|
||||
mergedSub.emitter.on(SubscriptionEvent.Eose, (url: string) => {
|
||||
for (const sub of callerSubs) {
|
||||
sub.emitter.emit(SubscriptionEvent.Eose, url)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
mergedSub.emitter.removeAllListeners()
|
||||
})
|
||||
// Propagate close
|
||||
mergedSub.emitter.on(SubscriptionEvent.Close, (url: string) => {
|
||||
for (const sub of callerSubs) {
|
||||
sub.emitter.emit(SubscriptionEvent.Close, url)
|
||||
}
|
||||
})
|
||||
|
||||
mergedSubscriptions.push(mergedSub)
|
||||
groupSubscriptions.push(mergedSub)
|
||||
}
|
||||
}
|
||||
// Propagate subscription completion. Since we split subs by relay, we need to wait
|
||||
// until all relays are completed before we notify
|
||||
mergedSub.emitter.on(SubscriptionEvent.Complete, () => {
|
||||
completedRelays.add(relay)
|
||||
|
||||
return mergedSubscriptions
|
||||
}
|
||||
for (const sub of callerSubs) {
|
||||
if (sub.request.relays.every(url => completedRelays.has(url))) {
|
||||
sub.emitter.emit(SubscriptionEvent.Complete)
|
||||
}
|
||||
}
|
||||
|
||||
mergedSub.emitter.removeAllListeners()
|
||||
})
|
||||
|
||||
mergedSubs.push(mergedSub)
|
||||
}
|
||||
|
||||
return mergedSubs
|
||||
})
|
||||
|
||||
export const executeSubscription = (sub: Subscription) => {
|
||||
const {request, emitter, tracker, controller} = sub
|
||||
const {timeout, filters, closeOnEose, relays, signal, authTimeout = 0} = request
|
||||
const {filters, closeOnEose, relays, signal, timeout, authTimeout = 0} = request
|
||||
const executor = NetworkContext.getExecutor(relays)
|
||||
const subs: {unsubscribe: () => void}[] = []
|
||||
const completedRelays = new Set()
|
||||
@@ -262,7 +305,7 @@ export const executeSubscription = (sub: Subscription) => {
|
||||
}
|
||||
|
||||
export const executeSubscriptions = (subs: Subscription[]) =>
|
||||
mergeSubscriptions(subs).forEach(executeSubscription)
|
||||
optimizeSubscriptions(subs).forEach(executeSubscription)
|
||||
|
||||
export const executeSubscriptionBatched = (() => {
|
||||
const subs: Subscription[] = []
|
||||
@@ -283,7 +326,11 @@ export const executeSubscriptionBatched = (() => {
|
||||
})()
|
||||
|
||||
export const subscribe = (request: SubscribeRequest) => {
|
||||
const subscription: Subscription = makeSubscription({delay: 800, ...request})
|
||||
const subscription: Subscription = makeSubscription({delay: 50, ...request})
|
||||
|
||||
if (request.relays.length === 0) {
|
||||
console.warn("Attempted to execute a subscription with zero relays")
|
||||
}
|
||||
|
||||
if (request.delay === 0) {
|
||||
executeSubscription(subscription)
|
||||
|
||||
@@ -3,6 +3,12 @@ import {Emitter} from '@welshman/lib'
|
||||
export class Tracker extends Emitter {
|
||||
data = new Map<string, Set<string>>()
|
||||
|
||||
constructor() {
|
||||
super()
|
||||
|
||||
this.setMaxListeners(100)
|
||||
}
|
||||
|
||||
getRelays = (eventId: string) => {
|
||||
const relays = new Set<string>()
|
||||
|
||||
|
||||
@@ -19,6 +19,12 @@ export class Repository<E extends HashedEvent = TrustedEvent> extends Emitter {
|
||||
eventsByAuthor = new Map<string, E[]>()
|
||||
deletes = new Map<string, number>()
|
||||
|
||||
constructor() {
|
||||
super()
|
||||
|
||||
this.setMaxListeners(100)
|
||||
}
|
||||
|
||||
// Dump/load/clear
|
||||
|
||||
dump = () => {
|
||||
|
||||
@@ -9,6 +9,5 @@ export * from './List'
|
||||
export * from './Profile'
|
||||
export * from './Relay'
|
||||
export * from './Repository'
|
||||
export * from './Router'
|
||||
export * from './Tags'
|
||||
export * from './Zaps'
|
||||
|
||||
Reference in New Issue
Block a user