Inflate redundancy when few values are being requested
This commit is contained in:
@@ -47,6 +47,8 @@ export const stripProtocol = (url: string) => url.replace(/.*:\/\//, "")
|
|||||||
|
|
||||||
export const ensurePlural = <T>(x: T | T[]) => (x instanceof Array ? x : [x])
|
export const ensurePlural = <T>(x: T | T[]) => (x instanceof Array ? x : [x])
|
||||||
|
|
||||||
|
export const splitAt = <T>(n: number, xs: T[]) => [xs.slice(0, n), xs.slice(n)]
|
||||||
|
|
||||||
export const hash = (s: string) =>
|
export const hash = (s: string) =>
|
||||||
Math.abs(s.split("").reduce((a, b) => ((a << 5) - a + b.charCodeAt(0)) | 0, 0)).toString()
|
Math.abs(s.split("").reduce((a, b) => ((a << 5) - a + b.charCodeAt(0)) | 0, 0)).toString()
|
||||||
|
|
||||||
@@ -90,11 +92,11 @@ export const initArray = <T>(n: number, f: () => T) => {
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
export const chunk = <T>(chunkLength: number, coll: T[]) => {
|
export const chunk = <T>(chunkLength: number, xs: T[]) => {
|
||||||
const result: T[][] = []
|
const result: T[][] = []
|
||||||
const current: T[] = []
|
const current: T[] = []
|
||||||
|
|
||||||
for (const item of coll) {
|
for (const item of xs) {
|
||||||
if (current.length < chunkLength) {
|
if (current.length < chunkLength) {
|
||||||
current.push(item)
|
current.push(item)
|
||||||
} else {
|
} else {
|
||||||
@@ -109,11 +111,11 @@ export const chunk = <T>(chunkLength: number, coll: T[]) => {
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
export const chunks = <T>(n: number, coll: T[]) => {
|
export const chunks = <T>(n: number, xs: T[]) => {
|
||||||
const result: T[][] = initArray(n, () => [])
|
const result: T[][] = initArray(n, () => [])
|
||||||
|
|
||||||
for (let i = 0; i < coll.length; i++) {
|
for (let i = 0; i < xs.length; i++) {
|
||||||
result[i % n].push(coll[i])
|
result[i % n].push(xs[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import type {Event} from 'nostr-tools'
|
import type {Event} from 'nostr-tools'
|
||||||
import {Emitter, randomId, groupBy, batch, defer, uniq} from '@coracle.social/lib'
|
import {Emitter, randomId, groupBy, batch, defer, uniq} from '@coracle.social/lib'
|
||||||
import type {Deferred} from '@coracle.social/lib'
|
import type {Deferred} from '@coracle.social/lib'
|
||||||
import {matchFilters, calculateFilterGroup, combineFilters} from '@coracle.social/util'
|
import {matchFilters, calculateFilterGroup, mergeFilters} from '@coracle.social/util'
|
||||||
import type {Filter} from '@coracle.social/util'
|
import type {Filter} from '@coracle.social/util'
|
||||||
import {Tracker} from "./Tracker"
|
import {Tracker} from "./Tracker"
|
||||||
import {Connection} from './Connection'
|
import {Connection} from './Connection'
|
||||||
@@ -82,7 +82,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
|
|||||||
const mergedSub = makeSubscription({
|
const mergedSub = makeSubscription({
|
||||||
relays: [relay],
|
relays: [relay],
|
||||||
timeout: callerSubs[0].request.timeout,
|
timeout: callerSubs[0].request.timeout,
|
||||||
filters: combineFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
|
filters: mergeFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
|
||||||
})
|
})
|
||||||
|
|
||||||
for (const {id, emitter, tracker} of callerSubs) {
|
for (const {id, emitter, tracker} of callerSubs) {
|
||||||
@@ -154,6 +154,12 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// console.log(
|
||||||
|
// `Starting ${mergedSubscriptions.length} subscriptions on ${uniq(mergedSubscriptions.flatMap(s => s.request.relays)).length} relays`,
|
||||||
|
// uniq(mergedSubscriptions.flatMap(s => s.request.relays)),
|
||||||
|
// ...mergeFilters(mergedSubscriptions.flatMap(s => s.request.filters)),
|
||||||
|
// )
|
||||||
|
|
||||||
return mergedSubscriptions
|
return mergedSubscriptions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ export const calculateFilterGroup = ({since, until, limit, search, ...filter}: F
|
|||||||
return group.sort().join("-")
|
return group.sort().join("-")
|
||||||
}
|
}
|
||||||
|
|
||||||
export const combineFilters = (filters: Filter[]) => {
|
export const mergeFilters = (filters: Filter[]) => {
|
||||||
const result = []
|
const result = []
|
||||||
|
|
||||||
for (const group of Object.values(groupBy(calculateFilterGroup, filters))) {
|
for (const group of Object.values(groupBy(calculateFilterGroup, filters))) {
|
||||||
@@ -112,7 +112,7 @@ export const getIdFilters = (idsOrAddresses: Iterable<string>) => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const filters = combineFilters(aFilters)
|
const filters = mergeFilters(aFilters)
|
||||||
|
|
||||||
if (ids.length > 0) {
|
if (ids.length > 0) {
|
||||||
filters.push({ids})
|
filters.push({ids})
|
||||||
|
|||||||
+24
-10
@@ -22,6 +22,7 @@ export type RouterOptions = {
|
|||||||
getSearchRelays: () => string[]
|
getSearchRelays: () => string[]
|
||||||
getRelayQuality: (url: string) => number
|
getRelayQuality: (url: string) => number
|
||||||
getRedundancy: () => number
|
getRedundancy: () => number
|
||||||
|
getLimit: () => number
|
||||||
}
|
}
|
||||||
|
|
||||||
export type ValuesByRelay = Map<string, string[]>
|
export type ValuesByRelay = Map<string, string[]>
|
||||||
@@ -78,7 +79,11 @@ export class Router {
|
|||||||
// Utilities for processing hints
|
// Utilities for processing hints
|
||||||
|
|
||||||
relaySelectionsFromMap = (valuesByRelay: ValuesByRelay) =>
|
relaySelectionsFromMap = (valuesByRelay: ValuesByRelay) =>
|
||||||
Array.from(valuesByRelay).map(([relay, values]: [string, string[]]) => ({relay, values: uniq(values)}))
|
sortBy(
|
||||||
|
({values}) => -values.length,
|
||||||
|
Array.from(valuesByRelay)
|
||||||
|
.map(([relay, values]: [string, string[]]) => ({relay, values: uniq(values)}))
|
||||||
|
)
|
||||||
|
|
||||||
scoreRelaySelection = ({values, relay}: RelayValues) =>
|
scoreRelaySelection = ({values, relay}: RelayValues) =>
|
||||||
values.length * this.options.getRelayQuality(relay)
|
values.length * this.options.getRelayQuality(relay)
|
||||||
@@ -208,8 +213,11 @@ export class Router {
|
|||||||
WithinMultipleContexts = (addresses: string[]) =>
|
WithinMultipleContexts = (addresses: string[]) =>
|
||||||
this.merge(addresses.map(this.WithinContext))
|
this.merge(addresses.map(this.WithinContext))
|
||||||
|
|
||||||
Search = (term: string) =>
|
Search = (term: string, relays: string[] = []) =>
|
||||||
this.product([term], this.options.getSearchRelays())
|
this.product([term], uniq(this.options.getSearchRelays().concat(relays)))
|
||||||
|
|
||||||
|
Indexers = (relays: string[] = []) =>
|
||||||
|
this.fromRelays(uniq(this.options.getIndexerRelays().concat(relays)))
|
||||||
|
|
||||||
// Fallback policies
|
// Fallback policies
|
||||||
|
|
||||||
@@ -271,17 +279,25 @@ export class RouterScenario {
|
|||||||
|
|
||||||
getPolicy = () => this.options.policy || this.router.addMaximalFallbacks
|
getPolicy = () => this.options.policy || this.router.addMaximalFallbacks
|
||||||
|
|
||||||
getLimit = () => this.options.limit || Infinity
|
getLimit = () => this.options.limit || this.router.options.getLimit()
|
||||||
|
|
||||||
getSelections = () => {
|
getSelections = () => {
|
||||||
|
const allValues = new Set()
|
||||||
const valuesByRelay: ValuesByRelay = new Map()
|
const valuesByRelay: ValuesByRelay = new Map()
|
||||||
for (const {value, relays} of this.selections) {
|
for (const {value, relays} of this.selections) {
|
||||||
|
allValues.add(value)
|
||||||
|
|
||||||
for (const relay of relays) {
|
for (const relay of relays) {
|
||||||
pushToMapKey(valuesByRelay, relay, value)
|
pushToMapKey(valuesByRelay, relay, value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Adjust redundancy by limit, since if we're looking for very specific values odds
|
||||||
|
// are wee're less tolerant of failure. Add more redundancy to fill our relay limit.
|
||||||
|
const limit = this.getLimit()
|
||||||
const redundancy = this.getRedundancy()
|
const redundancy = this.getRedundancy()
|
||||||
|
const adjustedRedundancy = redundancy * (limit / (allValues.size * redundancy))
|
||||||
|
|
||||||
const seen = new Map<string, number>()
|
const seen = new Map<string, number>()
|
||||||
const result: ValuesByRelay = new Map()
|
const result: ValuesByRelay = new Map()
|
||||||
const relaySelections = this.router.relaySelectionsFromMap(valuesByRelay)
|
const relaySelections = this.router.relaySelectionsFromMap(valuesByRelay)
|
||||||
@@ -290,7 +306,7 @@ export class RouterScenario {
|
|||||||
for (const value of valuesByRelay.get(relay) || []) {
|
for (const value of valuesByRelay.get(relay) || []) {
|
||||||
const timesSeen = seen.get(value) || 0
|
const timesSeen = seen.get(value) || 0
|
||||||
|
|
||||||
if (timesSeen < redundancy) {
|
if (timesSeen < adjustedRedundancy) {
|
||||||
seen.set(value, timesSeen + 1)
|
seen.set(value, timesSeen + 1)
|
||||||
values.add(value)
|
values.add(value)
|
||||||
}
|
}
|
||||||
@@ -305,7 +321,7 @@ export class RouterScenario {
|
|||||||
const fallbackPolicy = this.getPolicy()
|
const fallbackPolicy = this.getPolicy()
|
||||||
for (const {value} of this.selections) {
|
for (const {value} of this.selections) {
|
||||||
const timesSeen = seen.get(value) || 0
|
const timesSeen = seen.get(value) || 0
|
||||||
const fallbacksNeeded = fallbackPolicy(timesSeen, redundancy)
|
const fallbacksNeeded = fallbackPolicy(timesSeen, adjustedRedundancy)
|
||||||
|
|
||||||
if (fallbacksNeeded > 0) {
|
if (fallbacksNeeded > 0) {
|
||||||
for (const relay of fallbacks.slice(0, fallbacksNeeded)) {
|
for (const relay of fallbacks.slice(0, fallbacksNeeded)) {
|
||||||
@@ -314,11 +330,9 @@ export class RouterScenario {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const limit = this.getLimit()
|
const selections = this.router.relaySelectionsFromMap(result)
|
||||||
|
|
||||||
return limit
|
return limit ? selections.slice(0, limit) : selections
|
||||||
? this.router.relaySelectionsFromMap(result).slice(0, limit)
|
|
||||||
: this.router.relaySelectionsFromMap(result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
getUrls = () => this.getSelections().map((selection: RelayValues) => selection.relay)
|
getUrls = () => this.getSelections().map((selection: RelayValues) => selection.relay)
|
||||||
|
|||||||
Reference in New Issue
Block a user