fix subscription grouping to not include filter keys
This commit is contained in:
+39
-23
@@ -1,18 +1,18 @@
|
|||||||
import {throttle} from 'throttle-debounce'
|
import {throttle} from 'throttle-debounce'
|
||||||
import {bech32, utf8} from "@scure/base"
|
import {bech32, utf8} from "@scure/base"
|
||||||
|
|
||||||
|
// Data types
|
||||||
|
|
||||||
export type Nil = null | undefined
|
export type Nil = null | undefined
|
||||||
|
|
||||||
export const now = () => Math.round(Date.now() / 1000)
|
// Regular old utils
|
||||||
|
|
||||||
export const nth = (i: number) => <T>(xs: T[]) => xs[i]
|
export const now = () => Math.round(Date.now() / 1000)
|
||||||
|
|
||||||
export const first = <T>(xs: T[], ...args: unknown[]) => xs[0]
|
export const first = <T>(xs: T[], ...args: unknown[]) => xs[0]
|
||||||
|
|
||||||
export const last = <T>(xs: T[], ...args: unknown[]) => xs[xs.length - 1]
|
export const last = <T>(xs: T[], ...args: unknown[]) => xs[xs.length - 1]
|
||||||
|
|
||||||
export const prop = (k: string) => <T>(x: Record<string, T>) => x[k]
|
|
||||||
|
|
||||||
export const identity = <T>(x: T) => x
|
export const identity = <T>(x: T) => x
|
||||||
|
|
||||||
export const inc = (x: number | Nil) => (x || 0) + 1
|
export const inc = (x: number | Nil) => (x || 0) + 1
|
||||||
@@ -33,6 +33,39 @@ export const take = <T>(n: number, xs: T[]) => xs.slice(0, n)
|
|||||||
|
|
||||||
export const between = (low: number, high: number, n: number) => n > low && n < high
|
export const between = (low: number, high: number, n: number) => n > low && n < high
|
||||||
|
|
||||||
|
export const randomId = (): string => Math.random().toString().slice(2)
|
||||||
|
|
||||||
|
export const stripProtocol = (url: string) => url.replace(/.*:\/\//, "")
|
||||||
|
|
||||||
|
// Curried utils
|
||||||
|
|
||||||
|
export const nth = (i: number) => <T>(xs: T[], ...args: unknown[]) => xs[i]
|
||||||
|
|
||||||
|
export const nthEq = (i: number, v: any) => (xs: any[], ...args: unknown[]) => xs[i] === v
|
||||||
|
|
||||||
|
export const eq = <T>(v: T) => (x: T) => x === v
|
||||||
|
|
||||||
|
export const ne = <T>(v: T) => (x: T) => x !== v
|
||||||
|
|
||||||
|
export const prop = (k: string) => <T>(x: Record<string, T>) => x[k]
|
||||||
|
|
||||||
|
export const hash = (s: string) =>
|
||||||
|
Math.abs(s.split("").reduce((a, b) => ((a << 5) - a + b.charCodeAt(0)) | 0, 0)).toString()
|
||||||
|
|
||||||
|
// Collections
|
||||||
|
|
||||||
|
export const splitAt = <T>(n: number, xs: T[]) => [xs.slice(0, n), xs.slice(n)]
|
||||||
|
|
||||||
|
export const choice = <T>(xs: T[]): T => xs[Math.floor(xs.length * Math.random())]
|
||||||
|
|
||||||
|
export const shuffle = <T>(xs: Iterable<T>): T[] => Array.from(xs).sort(() => Math.random() > 0.5 ? 1 : -1)
|
||||||
|
|
||||||
|
export const isIterable = (x: any) => Symbol.iterator in Object(x)
|
||||||
|
|
||||||
|
export const toIterable = (x: any) => isIterable(x) ? x : [x]
|
||||||
|
|
||||||
|
export const ensurePlural = <T>(x: T | T[]) => (x instanceof Array ? x : [x])
|
||||||
|
|
||||||
export const flatten = <T>(xs: T[]) => xs.flatMap(identity)
|
export const flatten = <T>(xs: T[]) => xs.flatMap(identity)
|
||||||
|
|
||||||
export const uniq = <T>(xs: T[]) => Array.from(new Set(xs))
|
export const uniq = <T>(xs: T[]) => Array.from(new Set(xs))
|
||||||
@@ -55,25 +88,6 @@ export const uniqBy = <T>(f: (x: T) => any, xs: T[]) => {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
export const choice = <T>(xs: T[]): T => xs[Math.floor(xs.length * Math.random())]
|
|
||||||
|
|
||||||
export const shuffle = <T>(xs: Iterable<T>): T[] => Array.from(xs).sort(() => Math.random() > 0.5 ? 1 : -1)
|
|
||||||
|
|
||||||
export const randomId = (): string => Math.random().toString().slice(2)
|
|
||||||
|
|
||||||
export const isIterable = (x: any) => Symbol.iterator in Object(x)
|
|
||||||
|
|
||||||
export const toIterable = (x: any) => isIterable(x) ? x : [x]
|
|
||||||
|
|
||||||
export const stripProtocol = (url: string) => url.replace(/.*:\/\//, "")
|
|
||||||
|
|
||||||
export const ensurePlural = <T>(x: T | T[]) => (x instanceof Array ? x : [x])
|
|
||||||
|
|
||||||
export const splitAt = <T>(n: number, xs: T[]) => [xs.slice(0, n), xs.slice(n)]
|
|
||||||
|
|
||||||
export const hash = (s: string) =>
|
|
||||||
Math.abs(s.split("").reduce((a, b) => ((a << 5) - a + b.charCodeAt(0)) | 0, 0)).toString()
|
|
||||||
|
|
||||||
export const sortBy = <T>(f: (x: T) => number, xs: T[]) =>
|
export const sortBy = <T>(f: (x: T) => number, xs: T[]) =>
|
||||||
xs.sort((a: T, b: T) => f(a) - f(b))
|
xs.sort((a: T, b: T) => f(a) - f(b))
|
||||||
|
|
||||||
@@ -167,6 +181,8 @@ export const pushToMapKey = <T>(m: Map<string, T[]>, k: string, v: T) => {
|
|||||||
m.set(k, a)
|
m.set(k, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Random obscure stuff
|
||||||
|
|
||||||
export const hexToBech32 = (prefix: string, url: string) =>
|
export const hexToBech32 = (prefix: string, url: string) =>
|
||||||
bech32.encode(prefix, bech32.toWords(utf8.decode(url)), false)
|
bech32.encode(prefix, bech32.toWords(utf8.decode(url)), false)
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import type {Event} from 'nostr-tools'
|
import type {Event} from 'nostr-tools'
|
||||||
import {Emitter, randomId, groupBy, batch, defer, uniq, uniqBy} from '@coracle.social/lib'
|
import {Emitter, now, randomId, groupBy, batch, defer, uniq, uniqBy} from '@coracle.social/lib'
|
||||||
import type {Deferred} from '@coracle.social/lib'
|
import type {Deferred} from '@coracle.social/lib'
|
||||||
import {asEvent,} from '@coracle.social/util'
|
import {asEvent,} from '@coracle.social/util'
|
||||||
import {Tracker} from "./Tracker"
|
import {Tracker} from "./Tracker"
|
||||||
@@ -24,6 +24,7 @@ export type PublishRequest = {
|
|||||||
|
|
||||||
export type Publish = {
|
export type Publish = {
|
||||||
id: string
|
id: string
|
||||||
|
created_at: number
|
||||||
emitter: Emitter
|
emitter: Emitter
|
||||||
request: PublishRequest
|
request: PublishRequest
|
||||||
status: PublishStatusMap
|
status: PublishStatusMap
|
||||||
@@ -32,11 +33,12 @@ export type Publish = {
|
|||||||
|
|
||||||
export const makePublish = (request: PublishRequest) => {
|
export const makePublish = (request: PublishRequest) => {
|
||||||
const id = randomId()
|
const id = randomId()
|
||||||
|
const created_at = now()
|
||||||
const emitter = new Emitter()
|
const emitter = new Emitter()
|
||||||
const result: Publish['result'] = defer()
|
const result: Publish['result'] = defer()
|
||||||
const status: Publish['status'] = new Map()
|
const status: Publish['status'] = new Map()
|
||||||
|
|
||||||
return {id, request, emitter, result, status}
|
return {id, created_at, request, emitter, result, status}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const publish = (request: PublishRequest) => {
|
export const publish = (request: PublishRequest) => {
|
||||||
@@ -58,9 +60,11 @@ export const publish = (request: PublishRequest) => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Start everything off as pending
|
// Start everything off as pending
|
||||||
for (const relay of request.relays) {
|
requestAnimationFrame(() => {
|
||||||
pub.emitter.emit(PublishStatus.Pending, relay)
|
for (const relay of request.relays) {
|
||||||
}
|
pub.emitter.emit(PublishStatus.Pending, relay)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
// Set a timeout
|
// Set a timeout
|
||||||
const timeout = setTimeout(() => {
|
const timeout = setTimeout(() => {
|
||||||
|
|||||||
@@ -63,7 +63,7 @@ export const makeSubscription = (request: SubscribeRequest) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export const calculateSubscriptionGroup = (sub: Subscription) => {
|
export const calculateSubscriptionGroup = (sub: Subscription) => {
|
||||||
const parts: string[] = sub.request.filters.map(calculateFilterGroup)
|
const parts: string[] = []
|
||||||
|
|
||||||
if (sub.request.timeout) parts.push(`timeout:${sub.request.timeout}`)
|
if (sub.request.timeout) parts.push(`timeout:${sub.request.timeout}`)
|
||||||
if (sub.request.closeOnEose) parts.push('closeOnEose')
|
if (sub.request.closeOnEose) parts.push('closeOnEose')
|
||||||
@@ -166,11 +166,11 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
console.log(
|
// console.log(
|
||||||
`Starting ${mergedSubscriptions.length} subscriptions on ${uniq(mergedSubscriptions.flatMap(s => s.request.relays)).length} relays`,
|
// `Starting ${mergedSubscriptions.length} subscriptions on ${uniq(mergedSubscriptions.flatMap(s => s.request.relays)).length} relays`,
|
||||||
uniq(mergedSubscriptions.flatMap(s => s.request.relays)),
|
// uniq(mergedSubscriptions.flatMap(s => s.request.relays)),
|
||||||
...mergeFilters(mergedSubscriptions.flatMap(s => s.request.filters)),
|
// ...mergeFilters(mergedSubscriptions.flatMap(s => s.request.filters)),
|
||||||
)
|
// )
|
||||||
|
|
||||||
return mergedSubscriptions
|
return mergedSubscriptions
|
||||||
}
|
}
|
||||||
@@ -255,7 +255,7 @@ export const executeSubscription = (sub: Subscription) => {
|
|||||||
export const executeSubscriptions = (subs: Subscription[]) =>
|
export const executeSubscriptions = (subs: Subscription[]) =>
|
||||||
mergeSubscriptions(subs).forEach(executeSubscription)
|
mergeSubscriptions(subs).forEach(executeSubscription)
|
||||||
|
|
||||||
export const executeSubscriptionBatched = batch(300, executeSubscriptions)
|
export const executeSubscriptionBatched = batch(800, executeSubscriptions)
|
||||||
|
|
||||||
export const subscribe = (request: SubscribeRequest) => {
|
export const subscribe = (request: SubscribeRequest) => {
|
||||||
const subscription: Subscription = makeSubscription(request)
|
const subscription: Subscription = makeSubscription(request)
|
||||||
@@ -265,11 +265,11 @@ export const subscribe = (request: SubscribeRequest) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (request.immediate) {
|
if (request.immediate) {
|
||||||
console.log(
|
// console.log(
|
||||||
`Starting 1 subscriptions on ${request.relays.length} relays`,
|
// `Starting 1 subscriptions on ${request.relays.length} relays`,
|
||||||
request.relays,
|
// request.relays,
|
||||||
...mergeFilters(request.filters)
|
// ...mergeFilters(request.filters)
|
||||||
)
|
// )
|
||||||
|
|
||||||
executeSubscription(subscription)
|
executeSubscription(subscription)
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user