diff --git a/packages/lib/Context.ts b/packages/lib/Context.ts deleted file mode 100644 index daf861c..0000000 --- a/packages/lib/Context.ts +++ /dev/null @@ -1,13 +0,0 @@ -const ctx = new Map() - -export const getContext = (k: string) => ctx.get(k) - -export const setContext = (k: string, d: any) => ctx.set(k, d) - -export const withContext = (k: string, d: any, f: () => void) => { - const o = ctx.get(k) - - ctx.set(k, d) - f() - ctx.set(k, o) -} diff --git a/packages/lib/Tools.ts b/packages/lib/Tools.ts index cbb97c9..81df5dd 100644 --- a/packages/lib/Tools.ts +++ b/packages/lib/Tools.ts @@ -1,3 +1,4 @@ +import {throttle} from 'throttle-debounce' import {bech32, utf8} from "@scure/base" export const now = () => Math.round(Date.now() / 1000) @@ -12,6 +13,14 @@ export const prop = (k: string) => (x: Record) => x[k] export const identity = (x: T) => x +export const max = (xs: number[]) => xs.reduce((a, b) => Math.max(a, b), 0) + +export const min = (xs: number[]) => xs.reduce((a, b) => Math.min(a, b), 0) + +export const sum = (xs: number[]) => xs.reduce((a, b) => a + b, 0) + +export const avg = (xs: number[]) => sum(xs) / xs.length + export const between = (low: number, high: number, n: number) => n > low && n < high export const flatten = (xs: T[]) => xs.flatMap(identity) @@ -62,6 +71,16 @@ export const pushToKey = (m: Record | Map, k: strin return m } +export const batch = (t: number, f: (xs: T[]) => void) => { + const xs: T[] = [] + const cb = throttle(t, () => xs.length > 0 && f(xs.splice(0))) + + return (x: T) => { + xs.push(x) + cb() + } +} + export const hexToBech32 = (prefix: string, url: string) => bech32.encode(prefix, bech32.toWords(utf8.decode(url)), false) diff --git a/packages/lib/index.ts b/packages/lib/index.ts index 9f0a840..667162c 100644 --- a/packages/lib/index.ts +++ b/packages/lib/index.ts @@ -1,4 +1,3 @@ -export * from './Context' export * from './Deferred' export * from './Emitter' export * from './Fluent' diff --git a/packages/network/Context.ts b/packages/network/Context.ts new file mode 100644 index 0000000..acad6de --- /dev/null +++ b/packages/network/Context.ts @@ -0,0 +1,35 @@ +import type {Event} from 'nostr-tools' +import {matchFilters, hasValidSignature} from '@coracle.social/util' +import type {Filter} from '@coracle.social/util' +import {Pool} from "./Pool" +import {Executor} from "./Executor" +import {Relays} from "./target/Relays" +import type {Subscription} from "./Subscribe" + +export const defaultPool = new Pool() + +export const defaultGetExecutor = (relays: string[]) => + new Executor(new Relays(relays.map((relay: string) => NetworkContext.pool.get(relay)))) + +const defaultOnEvent = (url: string, event: Event) => null + +const defaultOnAuth = (url: string, challenge: string) => null + +const defaultOnOk = (url: string, id: string, ok: boolean, message: string) => null + +const defaultIsDeleted = (url: string, event: Event) => false + +const defaultHasValidSignature = (url: string, event: Event) => hasValidSignature(event) + +const defaultMatchFilters = (url: string, filters: Filter[], event: Event) => matchFilters(filters, event) + +export const NetworkContext = { + pool: defaultPool, + getExecutor: defaultGetExecutor, + onEvent: defaultOnEvent, + onAuth: defaultOnAuth, + onOk: defaultOnOk, + isDeleted: defaultIsDeleted, + hasValidSignature: defaultHasValidSignature, + matchFilters: defaultMatchFilters, +} diff --git a/packages/network/Executor.ts b/packages/network/Executor.ts index d4fc15c..c732108 100644 --- a/packages/network/Executor.ts +++ b/packages/network/Executor.ts @@ -2,6 +2,7 @@ import type {Event, Filter} from 'nostr-tools' import type {Emitter} from '@coracle.social/lib' import type {Connection} from './Connection' import type {Message} from './Socket' +import {NetworkContext} from './Context' export type Target = Emitter & { connections: Connection[] @@ -11,25 +12,37 @@ export type Target = Emitter & { type EventCallback = (url: string, event: Event) => void type EoseCallback = (url: string) => void -type AuthCallback = (url: string, challenge: string) => void type OkCallback = (url: string, id: string, ...extra: any[]) => void type ErrorCallback = (url: string, id: string, ...extra: any[]) => void type SubscribeOpts = {onEvent?: EventCallback, onEose?: EoseCallback} type PublishOpts = {verb?: string, onOk?: OkCallback, onError?: ErrorCallback} -type AuthOpts = {onAuth: AuthCallback, onOk: OkCallback} const createSubId = (prefix: string) => [prefix, Math.random().toString().slice(2, 10)].join('-') export class Executor { - constructor(readonly target: Target) {} + constructor(readonly target: Target) { + target.on('AUTH', NetworkContext.onAuth) + target.on('OK', NetworkContext.onOk) + } subscribe(filters: Filter[], {onEvent, onEose}: SubscribeOpts = {}) { let closed = false const id = createSubId('REQ') - const eventListener = (url: string, subid: string, e: Event) => subid === id && onEvent?.(url, e) - const eoseListener = (url: string, subid: string) => subid === id && onEose?.(url) + + const eventListener = (url: string, subid: string, e: Event) => { + if (subid === id) { + NetworkContext.onEvent(url, e) + onEvent?.(url, e) + } + } + + const eoseListener = (url: string, subid: string) => { + if (subid === id) { + onEose?.(url) + } + } this.target.on('EVENT', eventListener) this.target.on('EOSE', eoseListener) @@ -49,8 +62,18 @@ export class Executor { } publish(event: Event, {verb = 'EVENT', onOk, onError}: PublishOpts = {}) { - const okListener = (url: string, id: string, ...payload: any[]) => id === event.id && onOk?.(url, id, ...payload) - const errorListener = (url: string, id: string, ...payload: any[]) => id === event.id && onError?.(url, id, ...payload) + const okListener = (url: string, id: string, ...payload: any[]) => { + if (id === event.id) { + NetworkContext.onEvent(url, event) + onOk?.(url, id, ...payload) + } + } + + const errorListener = (url: string, id: string, ...payload: any[]) => { + if (id === event.id) { + onError?.(url, id, ...payload) + } + } this.target.on('OK', okListener) this.target.on('ERROR', errorListener) @@ -63,17 +86,5 @@ export class Executor { } } } - - handleAuth({onAuth, onOk}: AuthOpts) { - this.target.on('AUTH', onAuth) - this.target.on('OK', onOk) - - return { - unsubscribe: () => { - this.target.off('AUTH', onAuth) - this.target.off('OK', onOk) - } - } - } } diff --git a/packages/network/Subscribe.ts b/packages/network/Subscribe.ts new file mode 100644 index 0000000..d42c39c --- /dev/null +++ b/packages/network/Subscribe.ts @@ -0,0 +1,252 @@ +import type {Event} from 'nostr-tools' +import {Emitter, randomId, groupBy, batch, defer, uniq} from '@coracle.social/lib' +import type {Deferred} from '@coracle.social/lib' +import {matchFilters, calculateFilterGroup, combineFilters} from '@coracle.social/util' +import type {Filter} from '@coracle.social/util' +import {Tracker} from "./Tracker" +import {Connection} from './Connection' +import {NetworkContext} from './Context' + +// `subscribe` is a super function that handles batching subscriptions by merging +// them based on parameters (filters and subscribe opts), then splits them by relay. +// This results in fewer REQs being opened per connection, fewer duplicate events +// being downloaded, and therefore less signature validation. +// +// Behavior can be further configured using NetworkContext. This can be useful for +// adding support for querying a local cache like a relay, tracking deleted events, +// and bypassing validation for trusted relays. +// +// Urls that any given event was seen on are tracked using subscription request's `tracker` +// property. These are merged across all subscription requests, so it is possible that an +// event may be seen on more relays that were actually requested, in the case of overlapping +// subscriptions. + +export enum SubscriptionEvent { + Eose = "eose", + Close = "close", + Event = "event", + Abort = "abort", + Complete = "complete", + Duplicate = "duplicate", + DeletedEvent = "deleted-event", + FailedFilter = "failed-filter", + InvalidSignature = "invalid-signature", +} + +export type SubscribeRequest = { + relays: string[] + filters: Filter[] + timeout?: number + immediate?: boolean + closeOnEose?: boolean +} + +export type Subscription = { + id: string + emitter: Emitter + tracker: Tracker + result: Deferred + request: SubscribeRequest + close: () => void +} + +export const makeSubscription = (request: SubscribeRequest) => { + const id = randomId() + const emitter = new Emitter() + const tracker = new Tracker() + const result = defer() + const close = () => emitter.emit('abort') + + return {id, request, emitter, tracker, result, close} +} + +export const calculateSubscriptionGroup = (sub: Subscription) => { + const parts: string[] = sub.request.filters.map(calculateFilterGroup) + + if (sub.request.timeout) parts.push(`timeout:${sub.request.timeout}`) + if (sub.request.closeOnEose) parts.push('closeOnEose') + + return parts.join('|') +} + +export const mergeSubscriptions = (subs: Subscription[]) => { + const completedRelays = new Set() + const mergedSubscriptions = [] + + for (const group of Object.values(groupBy(calculateSubscriptionGroup, subs))) { + for (const relay of uniq(group.flatMap((sub: Subscription) => sub.request.relays))) { + const abortedSubs = new Set() + const callerSubs = group.filter((sub: Subscription) => sub.request.relays.includes(relay)) + const mergedSub = makeSubscription({ + relays: [relay], + timeout: callerSubs[0].request.timeout, + filters: combineFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)), + }) + + for (const {id, emitter, tracker} of callerSubs) { + // Propagate links to the caller + tracker.link(mergedSub.tracker) + + // Propagate abort event from the caller to the merged subscription + emitter.on(SubscriptionEvent.Abort, () => { + abortedSubs.add(id) + + if (abortedSubs.size === callerSubs.length) { + mergedSub.close() + } + }) + } + + // Pass events back to caller + const propagateEvent = (type: SubscriptionEvent, checkFilter: boolean) => + mergedSub.emitter.on(type, (url: string, event: Event) => { + for (const sub of callerSubs) { + if (!checkFilter || matchFilters(sub.request.filters, event)) { + sub.emitter.emit(type, url, event) + } + } + }) + + propagateEvent(SubscriptionEvent.Event, true) + propagateEvent(SubscriptionEvent.Duplicate, true) + propagateEvent(SubscriptionEvent.DeletedEvent, false) + propagateEvent(SubscriptionEvent.FailedFilter, false) + propagateEvent(SubscriptionEvent.InvalidSignature, true) + + // Propagate eose + mergedSub.emitter.on(SubscriptionEvent.Eose, (url: string) => { + for (const sub of callerSubs) { + sub.emitter.emit(SubscriptionEvent.Eose, url) + } + }) + + // Propagate close + mergedSub.emitter.on(SubscriptionEvent.Close, (url: string) => { + for (const sub of callerSubs) { + sub.emitter.emit(SubscriptionEvent.Close, url) + } + }) + + // Propagate subscription completion. Since we split subs by relay, we need to wait + // until all relays are completed before we notify + mergedSub.emitter.on(SubscriptionEvent.Complete, () => { + completedRelays.add(relay) + + for (const sub of callerSubs) { + if (sub.request.relays.every(url => completedRelays.has(url))) { + sub.emitter.emit(SubscriptionEvent.Complete) + } + } + }) + + // Propagate promise resolution + mergedSub.result.then((events: Event[]) => { + for (const sub of callerSubs) { + sub.result.resolve(events.filter((e: Event) => matchFilters(sub.request.filters, e))) + } + }) + + mergedSubscriptions.push(mergedSub) + } + } + + return mergedSubscriptions +} + +export const executeSubscription = (sub: Subscription) => { + const {result, request, emitter, tracker} = sub + const {timeout, filters, closeOnEose, relays} = request + const executor = NetworkContext.getExecutor(relays) + const events: Event[] = [] + + const completedRelays = new Set() + let completed: number + + const complete = () => { + if (completed) return + + // Mark as cleaned upp, unsubscribe our executor + completed = Date.now() + executorSub.unsubscribe() + + // Resolve our promise + result.resolve(events) + + // Notify caller, clean up our event emitter + emitter.emit(SubscriptionEvent.Complete) + emitter.removeAllListeners() + + // Remove our onClose handler from connections, since they are shared by many subs + executor.target.connections.forEach((c: Connection) => c.off("close", onClose)) + executor.target.cleanup() + } + + const onEvent = (url: string, event: Event) => { + // Check the signature and filters. If we've seen this event, don't re-validate. + if (tracker.track(event.id, url)) { + emitter.emit(SubscriptionEvent.Duplicate, url, event) + } else if (NetworkContext.isDeleted(url, event)) { + emitter.emit(SubscriptionEvent.DeletedEvent, url, event) + } else if (!NetworkContext.matchFilters(url, filters, event)) { + emitter.emit(SubscriptionEvent.FailedFilter, url, event) + } else if (!NetworkContext.hasValidSignature(url, event)) { + emitter.emit(SubscriptionEvent.InvalidSignature, url, event) + } else { + emitter.emit(SubscriptionEvent.Event, url, event) + events.push(event) + } + } + + const onEose = (url: string) => { + completedRelays.add(url) + + emitter.emit(SubscriptionEvent.Eose, url) + + if (closeOnEose && completedRelays.size === executor.target.connections.length) { + complete() + } + } + + const onClose = (connection: Connection) => { + completedRelays.add(connection.url) + + emitter.emit(SubscriptionEvent.Close, connection.url) + + if (completedRelays.size === executor.target.connections.length) { + complete() + } + } + + // Allow the caller to cancel the subscription + emitter.on(SubscriptionEvent.Abort, complete) + + // If we have a timeout, complete the subscription automatically + if (timeout) setTimeout(complete, timeout) + + // If one of our connections gets closed make sure to kill our sub + executor.target.connections.forEach((c: Connection) => c.on('close', onClose)) + + // Finally, start our subscription + const executorSub = executor.subscribe(filters, {onEvent, onEose}) +} + +export const executeSubscriptions = (subs: Subscription[]) => + mergeSubscriptions(subs).forEach(executeSubscription) + +export const executeSubscriptionBatched = batch(500, executeSubscriptions) + +export const subscribe = (request: SubscribeRequest) => { + const subscription: Subscription = makeSubscription(request) + + if (request.filters.length === 0) { + throw new Error("Zero filters passed to subscribe") + } + + if (request.immediate) { + executeSubscription(subscription) + } else { + executeSubscriptionBatched(subscription) + } + + return subscription +} diff --git a/packages/network/Subscription.ts b/packages/network/Subscription.ts deleted file mode 100644 index 0fcc0b6..0000000 --- a/packages/network/Subscription.ts +++ /dev/null @@ -1,123 +0,0 @@ -import {Emitter} from '@coracle.social/lib' -import type {Filter} from '@coracle.social/util' -import {matchFilters, hasValidSignature} from '@coracle.social/util' -import type {Event} from 'nostr-tools' -import type {Executor} from "./Executor" -import type {Connection} from './Connection' - -export type SubscriptionOpts = { - executor: Executor - filters: Filter[] - timeout?: number - closeOnEose?: boolean - hasSeen?: (e: Event, url: string) => boolean - shouldValidate?: (e: Event, url: string) => boolean -} - -export class Subscription extends Emitter { - unsubscribe: () => void - dead = new Set() - seen = new Set() - eose = new Set() - closeHandlers = new Map() - opened = Date.now() - closed?: number - - constructor(readonly opts: SubscriptionOpts) { - super() - - const {executor, timeout, filters} = this.opts - - // If we have a timeout, close the subscription automatically - if (timeout) { - setTimeout(this.close, timeout) - } - - // If one of our connections gets closed make sure to kill our sub - executor.target.connections.forEach(con => { - const handler = () => { - this.dead.add(con.url) - - if (this.dead.size === executor.target.connections.length) { - this.close() - } - } - - this.closeHandlers.set(con.url, handler) - - con.on("close", handler) - }) - - // Start our subscription - const sub = executor.subscribe(filters, { - onEvent: this.onEvent, - onEose: this.onEose, - }) - - this.unsubscribe = sub.unsubscribe - } - - hasSeen = (event: Event, url: string) => { - if (this.opts.hasSeen) { - return this.opts.hasSeen(event, url) - } - - if (this.seen.has(event.id)) { - return true - } - - this.seen.add(event.id) - - return false - } - - hasValidSignature = (event: Event, url: string) => { - if (this.opts.shouldValidate && !this.opts.shouldValidate(event, url)) { - return true - } - - return hasValidSignature(event) - } - - onEvent = (url: string, event: Event) => { - // If we've seen this event, don't re-validate - // Otherwise, check the signature and filters - if (this.hasSeen(event, url)) { - this.emit("duplicate", event, url) - } else { - if (!this.hasValidSignature(event, url)) { - this.emit("invalid-signature", event, url) - } else if (!matchFilters(this.opts.filters, event)) { - this.emit("failed-filter", event, url) - } else { - this.emit("event", event, url) - } - } - } - - onEose = (url: string) => { - const {executor, closeOnEose} = this.opts - - this.emit("eose", url) - - this.eose.add(url) - - if (closeOnEose && this.eose.size >= executor.target.connections.length) { - this.close() - } - } - - close = () => { - if (!this.closed) { - const {target} = this.opts.executor - - this.closed = Date.now() - this.unsubscribe() - this.emit("close") - this.removeAllListeners() - - target.connections.forEach((con: Connection) => con.off("close", this.closeHandlers.get(con.url))) - target.cleanup() - } - } -} diff --git a/packages/network/Tracker.ts b/packages/network/Tracker.ts new file mode 100644 index 0000000..e28d308 --- /dev/null +++ b/packages/network/Tracker.ts @@ -0,0 +1,54 @@ +import {writable} from '@coracle.social/lib' + +export class Tracker { + links: Tracker[] = [] + data = writable(new Map>()) + + getRelays = (eventId: string) => { + const relays = new Set() + + for (const relay of this.data.get().get(eventId) || []) { + relays.add(relay) + } + + for (const link of this.links) { + for (const relay of link.getRelays(eventId)) { + relays.add(relay) + } + } + + return relays + } + + hasRelay = (eventId: string, relay: string) => { + return this.getRelays(eventId).has(relay) + } + + addRelay = (eventId: string, relay: string) => { + const relays = this.data.get().get(eventId) || new Set() + + relays.add(relay) + + this.data.update(m => { + m.set(eventId, relays) + + return m + }) + } + + track = (eventId: string, relay: string) => { + if (this.hasRelay(eventId, relay)) return true + + this.addRelay(eventId, relay) + + return false + } + + link = (tracker: Tracker) => this.links.push(tracker) + + copy = (eventId1: string, eventId2: string) => { + for (const relay of this.getRelays(eventId1)) { + this.addRelay(eventId2, relay) + } + } +} diff --git a/packages/network/index.ts b/packages/network/index.ts index 43f08a1..9d21ca4 100644 --- a/packages/network/index.ts +++ b/packages/network/index.ts @@ -1,9 +1,11 @@ export * from "./Connection" export * from "./ConnectionMeta" +export * from "./Context" export * from "./Executor" export * from "./Pool" export * from "./Socket" -export * from "./Subscription" +export * from "./Subscribe" +export * from "./Tracker" export * from "./target/Multi" export * from "./target/Plex" export * from "./target/Relay"