From a78e72310d4d2ef40efa9e8e6da264c096c3ed54 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Fri, 2 Feb 2024 16:46:33 -0800 Subject: [PATCH] Add new router helper for relay hints --- src/connect/Connection.ts | 4 +- src/connect/Executor.ts | 4 +- src/connect/Pool.ts | 2 +- src/connect/Socket.ts | 2 +- src/connect/Subscription.ts | 4 +- src/connect/target/Multi.ts | 4 +- src/connect/target/Plex.ts | 6 +- src/connect/target/Relay.ts | 6 +- src/connect/target/Relays.ts | 6 +- src/main.ts | 8 +- src/nostr/Router.ts | 386 ++++++++++++++++++----------------- src/{util => nostr}/Tag.ts | 6 +- src/{util => nostr}/Tags.ts | 12 +- src/{util => nostr}/kinds.ts | 0 src/util/misc.ts | 4 + 15 files changed, 234 insertions(+), 220 deletions(-) rename src/{util => nostr}/Tag.ts (76%) rename src/{util => nostr}/Tags.ts (94%) rename src/{util => nostr}/kinds.ts (100%) diff --git a/src/connect/Connection.ts b/src/connect/Connection.ts index 667668a..4fdd97e 100644 --- a/src/connect/Connection.ts +++ b/src/connect/Connection.ts @@ -1,5 +1,5 @@ -import {Emitter} from './util/Emitter' -import {Queue} from './util/Queue' +import {Emitter} from '../util/Emitter' +import {Queue} from '../util/Queue' import {AuthStatus, ConnectionMeta} from './ConnectionMeta' import {Socket, isMessage, asMessage} from './Socket' import type {SocketMessage} from './Socket' diff --git a/src/connect/Executor.ts b/src/connect/Executor.ts index 862fdf0..04597bd 100644 --- a/src/connect/Executor.ts +++ b/src/connect/Executor.ts @@ -1,7 +1,7 @@ import type {Event, Filter} from 'nostr-tools' +import type {Emitter} from '../util/Emitter' import type {Connection} from './Connection' -import type {Emitter} from './util/Emitter' -import type {Message} from './connect/Socket' +import type {Message} from './Socket' export type Target = Emitter & { connections: Connection[] diff --git a/src/connect/Pool.ts b/src/connect/Pool.ts index 48b74a8..02d1e71 100644 --- a/src/connect/Pool.ts +++ b/src/connect/Pool.ts @@ -1,5 +1,5 @@ import {Connection} from "./Connection" -import {Emitter} from './util/Emitter' +import {Emitter} from '../util/Emitter' export class Pool extends Emitter { data: Map diff --git a/src/connect/Socket.ts b/src/connect/Socket.ts index 517a350..abbde22 100644 --- a/src/connect/Socket.ts +++ b/src/connect/Socket.ts @@ -1,6 +1,6 @@ import type {MessageEvent} from 'isomorphic-ws' import WebSocket from "isomorphic-ws" -import {Deferred, defer} from "./Deferred" +import {Deferred, defer} from "../util/Deferred" export type Message = [string, ...any[]] diff --git a/src/connect/Subscription.ts b/src/connect/Subscription.ts index 92206f5..1d69bce 100644 --- a/src/connect/Subscription.ts +++ b/src/connect/Subscription.ts @@ -1,8 +1,8 @@ import EventEmitter from "events" import type {Event} from 'nostr-tools' import type {Executor} from "./Executor" -import type {Filter} from './util/nostr' -import {matchFilters, hasValidSignature} from "./util/nostr" +import type {Filter} from '../util/nostr' +import {matchFilters, hasValidSignature} from "../util/nostr" export type SubscriptionOpts = { executor: Executor diff --git a/src/connect/target/Multi.ts b/src/connect/target/Multi.ts index d67f552..56bf9d5 100644 --- a/src/connect/target/Multi.ts +++ b/src/connect/target/Multi.ts @@ -1,6 +1,6 @@ +import {Emitter} from '../../util/Emitter' import type {Target} from '../Executor' -import type {Message} from '../connect/Socket' -import {Emitter} from '../util/Emitter' +import type {Message} from '../Socket' export class Multi extends Emitter { constructor(readonly targets: Target[]) { diff --git a/src/connect/target/Plex.ts b/src/connect/target/Plex.ts index 01ff328..7d7fe09 100644 --- a/src/connect/target/Plex.ts +++ b/src/connect/target/Plex.ts @@ -1,6 +1,6 @@ -import {Emitter} from '../util/Emitter' -import type {PlexMessage, Message} from '../connect/Socket' -import {Connection} from '../connect/Connection' +import {Emitter} from '../../util/Emitter' +import type {PlexMessage, Message} from '../Socket' +import type {Connection} from '../Connection' export class Plex extends Emitter { constructor(readonly urls: string[], readonly connection: Connection) { diff --git a/src/connect/target/Relay.ts b/src/connect/target/Relay.ts index d83d1d1..e3b8d7f 100644 --- a/src/connect/target/Relay.ts +++ b/src/connect/target/Relay.ts @@ -1,6 +1,6 @@ -import {Emitter} from '../util/Emitter' -import type {Message} from '../connect/Socket' -import type {Connection} from '../connect/Connection' +import {Emitter} from '../../util/Emitter' +import type {Message} from '../Socket' +import type {Connection} from '../Connection' export class Relay extends Emitter { constructor(readonly connection: Connection) { diff --git a/src/connect/target/Relays.ts b/src/connect/target/Relays.ts index d464bb3..b96471f 100644 --- a/src/connect/target/Relays.ts +++ b/src/connect/target/Relays.ts @@ -1,6 +1,6 @@ -import {Emitter} from '../util/Emitter' -import type {Message} from '../connect/Socket' -import type {Connection} from '../connect/Connection' +import {Emitter} from '../../util/Emitter' +import type {Message} from '../Socket' +import type {Connection} from '../Connection' export class Relays extends Emitter { constructor(readonly connections: Connection[]) { diff --git a/src/main.ts b/src/main.ts index 2164268..f2dfde7 100644 --- a/src/main.ts +++ b/src/main.ts @@ -4,9 +4,9 @@ export * from "./util/LRUCache" export * from "./util/Deferred" export * from "./util/Emitter" export * from "./util/Queue" -export * from "./util/Tag" -export * from "./util/Tags" export * from "./util/Fluent" +export * from "./nostr/Tag" +export * from "./nostr/Tags" export * from "./connect/Socket" export * from "./connect/Connection" export * from "./connect/ConnectionMeta" @@ -17,7 +17,3 @@ export * from "./connect/target/Plex" export * from "./connect/target/Relay" export * from "./connect/target/Relays" export * from "./connect/target/Multi" -export * from "./target/Plex" -export * from "./target/Relay" -export * from "./target/Relays" -export * from "./target/Multi" diff --git a/src/nostr/Router.ts b/src/nostr/Router.ts index b1875e1..599c5fe 100644 --- a/src/nostr/Router.ts +++ b/src/nostr/Router.ts @@ -1,218 +1,230 @@ - -/* - Smart relay selection - - From Mike Dilger: - - 1) Other people's write relays — pull events from people you follow, - including their contact lists - 2) Other people's read relays — push events that tag them (replies or just tagging). - However, these may be authenticated, use with caution - 3) Your write relays —- write events you post to your microblog feed for the - world to see. ALSO write your contact list. ALSO read back your own contact list. - 4) Your read relays —- read events that tag you. ALSO both write and read - client-private data like client configuration events or anything that the world - doesn't need to see. - 5) Advertise relays — write and read back your own relay list -*/ +import type {Event} from 'nostr-tools' +import {Tags, fromTags, getPubkeys, getGroups, getCommunities, getCommunitiesAndGroups, getReplyHints, getRootHints} from './Tags' +import {nth, first} from '../util/misc' export type RouterOptions = { - hintLimit: number - getUserPubkey: () => string[][] + getUserPubkey: () => string | null getGroupRelayTags: (address: string) => string[][] + getCommunityRelayTags: (address: string) => string[][] getPubkeyRelayTags: (pubkey: string) => string[][] - getRelayQuality: (url: string) => number + getFallbackRelayTags: () => string[][] + getRelayQuality?: (url: string) => number } +// - Fetch from and publish to non-shareable relays, but don't use them for hints +// - Test that scoring/sorting makes sense, particularly asc/desc sort + export class Router { constructor(readonly options: RouterOptions) {} - FetchUserDMs = () => new RouterScenario(() => { - const tags = Tags.from(this.options.getPubkeyRelayTags(this.options.getUserPubkey())) + // Utilities derived from options - return tags. + getGroupRelayUrls = (address: string) => + this.options.getGroupRelayTags(address).map(nth(1)) + + getCommunityRelayUrls = (address: string) => + this.options.getCommunityRelayTags(address).map(nth(1)) + + getPubkeyRelayTags = (pubkey: string, mode?: string) => { + const tags = this.options.getPubkeyRelayTags(pubkey) + + return mode ? fromTags(Tags.from(tags).whereMark(mode)) : tags + } + + getPubkeyRelayUrls = (pubkey: string, mode?: string) => + this.getPubkeyRelayTags(pubkey, mode).map(nth(1)) + + getUserRelayTags = (mode?: string) => { + const pubkey = this.options.getUserPubkey() + + return pubkey ? this.getPubkeyRelayTags(pubkey, mode) : [] + } + + getUserRelayUrls = (mode?: string) => { + const pubkey = this.options.getUserPubkey() + + return pubkey ? this.getPubkeyRelayUrls(pubkey, mode) : [] + } + + getEventGroupOrCommunityRelayUrlGroups = (event: Event, otherGroups: string[][]) => { + const groupAddresses = getGroups(event) + + if (groupAddresses.count() > 0) { + return Array.from(groupAddresses.map(this.getGroupRelayUrls)) + } + + return [ + ...getCommunities(event).map(this.getCommunityRelayUrls), + ...otherGroups, + ] + } + + // Utilities for processing hints + + getGroupScores = (groups: string[][]) => { + const scores: RouteScenarioScores = {} + + // TODO: see if weighting earlier groups slightly heavier improves things + for (const urls of groups) { + urls.forEach((url, i) => { + const score = 1 / (i + 1) / urls.length + + if (!scores[url]) { + scores[url] = {score: 0, count: 0} + } + + scores[url].score += score + scores[url].count += 1 + }) + } + + // Use log-sum-exp to get a a weighted sum + for (const [url, score] of Object.entries(scores)) { + const weight = Math.log(groups.length / score.count) + const thisScore = Math.log1p(Math.exp(score.score - score.count)) + const thatScore = this.options.getRelayQuality?.(url) || 1 + + score.score = (weight + thisScore) * thatScore + } + + return scores + } + + urlsFromScores = (limit: number, scores: RouteScenarioScores) => + Object.entries(scores).sort((a, b) => a[1].score > b[1].score ? 1 : -1).map(pair => pair[0] as string).slice(0, limit) + + groupsToUrls = (limit: number, groups: string[][]) => + this.urlsFromScores(limit, this.getGroupScores(groups)) + + // Routing scenarios + + FetchAllDirectMessage = () => new RouterScenario(this, { + fallbackPolicy: useMinimalFallbacks("read"), + getGroups: () => [this.getUserRelayUrls()], + }) + + FetchDirectMessages = (pubkey: string) => new RouterScenario(this, { + fallbackPolicy: useMinimalFallbacks("read"), + getGroups: () => [this.getUserRelayUrls(), this.getPubkeyRelayUrls(pubkey)], + }) + + PublishDirectMessage = (pubkey: string) => new RouterScenario(this, { + fallbackPolicy: useMinimalFallbacks("write"), + getGroups: () => [this.getUserRelayUrls("write"), this.getPubkeyRelayUrls(pubkey, "read")], + }) + + FetchPubkeyEvents = (pubkey: string) => new RouterScenario(this, { + fallbackPolicy: useMaximalFallbacks("read"), + getGroups: () => [this.getPubkeyRelayUrls(pubkey, "write")], + }) + + FetchEvent = (event: Event) => new RouterScenario(this, { + fallbackPolicy: useMaximalFallbacks("read"), + getGroups: () => + this.getEventGroupOrCommunityRelayUrlGroups(event, [ + this.getPubkeyRelayUrls(event.pubkey, "write"), + ]), + }) + + FetchEventChildren = (event: Event) => new RouterScenario(this, { + fallbackPolicy: useMaximalFallbacks("read"), + getGroups: () => + this.getEventGroupOrCommunityRelayUrlGroups(event, [ + this.getPubkeyRelayUrls(event.pubkey, "read"), + ]), + }) + + FetchEventParent = (event: Event) => new RouterScenario(this, { + fallbackPolicy: useMaximalFallbacks("read"), + getGroups: () => + this.getEventGroupOrCommunityRelayUrlGroups(event, [ + getReplyHints(event), + this.getPubkeyRelayUrls(event.pubkey, "read"), + ]), + }) + + FetchEventRoot = (event: Event) => new RouterScenario(this, { + fallbackPolicy: useMaximalFallbacks("read"), + getGroups: () => + this.getEventGroupOrCommunityRelayUrlGroups(event, [ + getRootHints(event), + this.getPubkeyRelayUrls(event.pubkey, "read"), + ]), + }) + + PublishEvent = (event: Event) => new RouterScenario(this, { + fallbackPolicy: useMinimalFallbacks("write"), + getGroups: () => + this.getEventGroupOrCommunityRelayUrlGroups(event, [ + this.getPubkeyRelayUrls(event.pubkey, "write"), + ...getPubkeys(event).map(pubkey => this.getPubkeyRelayUrls(pubkey, "read")), + ]), + }) + + FetchFromGroup = (address: string) => new RouterScenario(this, { + fallbackPolicy: useNoFallbacks(), + getGroups: () => [this.getGroupRelayUrls(address)], + }) + + PublishToGroup = (address: string) => new RouterScenario(this, { + fallbackPolicy: useNoFallbacks(), + getGroups: () => [this.getGroupRelayUrls(address)], + }) + + FetchFromCommunity = (address: string) => new RouterScenario(this, { + fallbackPolicy: useMaximalFallbacks("read"), + getGroups: () => [this.getCommunityRelayUrls(address)], + }) + + PublishToCommunity = (address: string) => new RouterScenario(this, { + fallbackPolicy: useMaximalFallbacks("write"), + getGroups: () => [this.getCommunityRelayUrls(address)], }) } +// Router Scenario + +export type RouterScenarioOptions = { + getGroups: () => string[][] + fallbackPolicy: FallbackPolicy +} + +export type RouteScenarioScores = Record export class RouterScenario { - constructor(readonly getAllHints) {} + constructor(readonly router: Router, readonly options: RouterScenarioOptions) {} - getHints = (limit: number) => this.getAllHints().slice(0, limit) -} + addFallbackUrls = (limit: number, urls: string[]) => { + if (urls.length < limit) { + const {mode, getLimit} = this.options.fallbackPolicy + const fallbackRelayTags = this.router.options.getFallbackRelayTags() + const fallbackUrls = Tags.from(fallbackRelayTags).whereMark(mode).values() + const fallbackLimit = getLimit(limit, urls) -Router.getHints( - -export const selectHints = (hints: Iterable, limit: number = null) => { - const {FORCE_RELAYS} = env.get() - const seen = new Set() - const ok = [] - const bad = [] - - if (!limit) { - limit = getSetting("relay_limit") - } - - for (const url of FORCE_RELAYS.length > 0 ? FORCE_RELAYS : hints) { - if (seen.has(url)) { - continue + return [...urls, ...fallbackUrls.slice(0, fallbackLimit)] } - seen.add(url) - - // Skip relays that just shouldn't ever be published - if (!isShareableRelay(url)) { - continue - } - - // Filter out relays that appear to be broken or slow - if (relayIsLowQuality(url)) { - bad.push(url) - } else { - ok.push(url) - } - - if (ok.length > limit) { - break - } + return urls } - // If we don't have enough hints, use the broken ones - const result = ok.concat(bad).slice(0, limit) + getUrls = (limit: number, extra: string[] = []) => { + const urlGroups = this.options.getGroups().concat([extra]) + const urls = this.router.groupsToUrls(limit, urlGroups) - if (result.length === 0) { - warn("No results returned from selectHints") + return this.addFallbackUrls(limit, urls) } - return result + getUrl = () => first(this.getUrls(1)) } -export const selectHintsWithFallback = (hints: Iterable = null, limit = null) => - selectHints(chain(hints || [], getUserRelayUrls(RelayMode.Read), env.get().DEFAULT_RELAYS), limit) +// Fallback Policy -export class HintSelector { - constructor( - readonly generateHints, - readonly hintsLimit = null, - ) {} - - limit = hintsLimit => new HintSelector(this.generateHints, hintsLimit) - - getHints = (...args) => - selectHints(this.generateHints(...args), this.hintsLimit || getSetting("relay_limit")) +class FallbackPolicy { + constructor(readonly mode: string, readonly getLimit: (limit: number, urls: string[]) => number) {} } -export const hintSelector = (generateHints: (...args: any[]) => Iterable) => { - const selector = new HintSelector(generateHints) - const getHints = selector.getHints +const useNoFallbacks = () => new FallbackPolicy("read", (limit: number, urls: string[]) => 0) - ;(getHints as any).limit = selector.limit +const useMinimalFallbacks = (mode: string) => new FallbackPolicy(mode, (limit: number, urls: string[]) => urls.length === 0 ? 1 : 0) - return getHints as typeof getHints & {limit: typeof selector.limit} -} - -export const getPubkeyHints = hintSelector(function* (pubkey: string, mode: RelayMode) { - yield* getPubkeyRelayUrls(pubkey, mode) -}) - -export const getPubkeyHint = (pubkey: string): string => - first(getPubkeyHints(1, pubkey, "write")) || "" - -export const getUserHints = hintSelector(function* (mode: RelayMode) { - yield* getUserRelayUrls(mode) -}) - -export const getUserHint = (pubkey: string): string => first(getUserHints(1, "write")) || "" - -export const getEventHints = hintSelector(function* (event: Event) { - for (const address of Tags.from(event).circles().all()) { - yield* getGroupHints(address) - } - - yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Write) - yield* event.seen_on.filter(isShareableRelay) -}) - -export const getEventHint = (event: Event) => first(getEventHints.limit(1).getHints(event)) || "" - -// If we're looking for an event's children, the read relays the author has -// advertised would be the most reliable option, since well-behaved clients -// will write replies there. -export const getReplyHints = hintSelector(function* (event) { - for (const address of Tags.from(event).circles().all()) { - yield* getGroupHints(address) - } - - yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Read) -}) - -// If we're looking for an event's parent, tags are the most reliable hint, -// but we can also look at where the author of the note reads from -export const getParentHints = hintSelector(function* (event) { - yield* Tags.from(event).getReplyHints() - yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Read) -}) - -export const getRootHints = hintSelector(function* (event) { - yield* Tags.from(event).getRootHints() - yield* getPubkeyRelayUrls(event.pubkey, RelayMode.Read) -}) - -// If we're replying or reacting to an event, we want the author to know, as well as -// anyone else who is tagged in the original event or the reply. Get everyone's read -// relays. Limit how many per pubkey we publish to though. We also want to advertise -// our content to our followers, so publish to our write relays as well. -export const getPublishHints = hintSelector(function* (event: Event) { - for (const address of Tags.from(event).circles().all()) { - yield* getGroupHints(address) - } - - const pubkeys = Tags.from(event).type("p").values().all() - const hintGroups = pubkeys.map(pubkey => getPubkeyRelayUrls(pubkey, RelayMode.Read)) - const authorRelays = getPubkeyRelayUrls(event.pubkey, RelayMode.Write) - - yield* mergeHints([...hintGroups, authorRelays, getUserHints(RelayMode.Write)]) -}) - -export const getInboxHints = hintSelector(function* (pubkeys: string[]) { - yield* mergeHints(pubkeys.map(pk => getPubkeyHints(pk, "read"))) -}) - -export const getGroupHints = hintSelector(function* (address: string) { - yield* getGroupRelayUrls(address) - yield* getPubkeyHints(Naddr.fromTagValue(address).pubkey) -}) - -export const getGroupPublishHints = (addresses: string[]) => { - const urls = mergeHints(addresses.map(getGroupRelayUrls)) - - return urls.length === 0 ? getUserHints("write") : urls -} - -export const mergeHints = (groups: string[][], limit: number = null) => { - const scores = {} as Record - - for (const hints of groups) { - hints.forEach((hint, i) => { - const score = 1 / (i + 1) / hints.length - - if (!scores[hint]) { - scores[hint] = {score: 0, count: 0} - } - - scores[hint].score += score - scores[hint].count += 1 - }) - } - - // Use the log-sum-exp and a weighted sum - for (const score of Object.values(scores)) { - const weight = Math.log(groups.length / score.count) - - score.score = weight + Math.log1p(Math.exp(score.score - score.count)) - } - - return sortBy(([hint, {score}]) => score, Object.entries(scores)) - .map(nth(0)) - .slice(0, limit || getSetting("relay_limit")) -} +const useMaximalFallbacks = (mode: string) => new FallbackPolicy(mode, (limit: number, urls: string[]) => Math.max(0, limit - urls.length)) diff --git a/src/util/Tag.ts b/src/nostr/Tag.ts similarity index 76% rename from src/util/Tag.ts rename to src/nostr/Tag.ts index 7a18a5a..a813112 100644 --- a/src/util/Tag.ts +++ b/src/nostr/Tag.ts @@ -1,6 +1,6 @@ -import type {OmitStatics} from './misc' -import {last} from './misc' -import {Fluent} from './Fluent' +import type {OmitStatics} from '../util/misc' +import {last} from '../util/misc' +import {Fluent} from '../util/Fluent' export class Tag extends (Fluent as OmitStatics, 'from'>) { static from(parts: Iterable) { diff --git a/src/util/Tags.ts b/src/nostr/Tags.ts similarity index 94% rename from src/util/Tags.ts rename to src/nostr/Tags.ts index ed2283e..cd54513 100644 --- a/src/util/Tags.ts +++ b/src/nostr/Tags.ts @@ -1,10 +1,10 @@ import type {Event} from 'nostr-tools' -import {Tag} from './Tag' -import {Fluent} from './Fluent' -import type {OmitStatics} from './misc' -import {isIterable, uniq} from './misc' -import {isShareableRelay} from './nostr' +import {Fluent} from '../util/Fluent' +import type {OmitStatics} from '../util/misc' +import {isIterable, uniq} from '../util/misc' +import {isShareableRelay} from '../util/nostr' import {isCommunityAddress, isGroupAddress, isCommunityOrGroupAddress} from './kinds' +import {Tag} from './Tag' export class Tags extends (Fluent as OmitStatics, 'from'>) { static from(p: Iterable) { @@ -62,6 +62,8 @@ export const coerceToTags = (x: CoercibleToTags) => { throw new Error('Received invalid value to coerceToTags: ${x}') } +export const fromTags = (tags: Tags) => Array.from(tags).map(tag => Array.from(tag)) + export const getRelays = (x: CoercibleToTags) => uniq(Array.from(coerceToTags(x)).flatMap((t: Tag) => Array.from(t)).filter(isShareableRelay)) diff --git a/src/util/kinds.ts b/src/nostr/kinds.ts similarity index 100% rename from src/util/kinds.ts rename to src/nostr/kinds.ts diff --git a/src/util/misc.ts b/src/util/misc.ts index 98b08c1..93aefa2 100644 --- a/src/util/misc.ts +++ b/src/util/misc.ts @@ -1,5 +1,9 @@ export const now = () => Math.round(Date.now() / 1000) +export const nth = (i: number) => (xs: T[]) => xs[i] + +export const first = (xs: T[]) => xs[0] + export const last = (xs: T[]) => xs[xs.length - 1] export const identity = (x: T) => x