diff --git a/src/Subscription.ts b/src/Subscription.ts new file mode 100644 index 0000000..5be66ad --- /dev/null +++ b/src/Subscription.ts @@ -0,0 +1,89 @@ +import EventEmitter from "events" +import type {Event} from 'nostr-tools' +import type {Executor} from "./Executor" +import type {Filter} from './util/nostr' +import {matchFilters, hasValidSignature} from "./util/nostr" + +export type SubscriptionOpts = { + executor: Executor + filters: Filter[] + timeout?: number + hasSeen?: (e: Event) => boolean +} + +export class Subscription extends EventEmitter { + unsubscribe: () => void + seen = new Set() + opened = Date.now() + closed?: number + + constructor(readonly opts: SubscriptionOpts) { + super() + + const {executor, timeout, filters} = this.opts + + // If we have a timeout, close the subscription automatically + if (timeout) { + setTimeout(this.close, timeout) + } + + // If one of our connections gets closed make sure to kill our sub + executor.target.connections.forEach(con => con.on("close", this.close)) + + // Start our subscription + const sub = executor.subscribe(filters, { + onEvent: this.onEvent, + onEose: this.onEose, + }) + + this.unsubscribe = sub.unsubscribe + } + + hasSeen = (event: Event) => { + if (this.opts.hasSeen) { + return this.opts.hasSeen(event) + } + + if (this.seen.has(event.id)) { + return true + } + + this.seen.add(event.id) + + return false + } + + onEvent = (url: string, event: Event) => { + // If we've seen this event, don't re-validate + // Otherwise, check the signature and filters + if (this.hasSeen(event)) { + this.emit("duplicate", event, url) + } else { + if (!hasValidSignature(event)) { + this.emit("invalid-signature", event, url) + } else if (!matchFilters(this.opts.filters, event)) { + this.emit("failed-filter", event, url) + } else { + this.emit("event", event, url) + } + } + } + + onEose = (url: string) => { + this.emit("eose", url) + } + + close = () => { + if (!this.closed) { + const {target} = this.opts.executor + + this.closed = Date.now() + this.unsubscribe() + this.emit("close") + this.removeAllListeners() + + target.connections.forEach(con => con.off("close", this.close)) + target.cleanup() + } + } +} diff --git a/src/util/LRUCache.ts b/src/util/LRUCache.ts new file mode 100644 index 0000000..7d566d9 --- /dev/null +++ b/src/util/LRUCache.ts @@ -0,0 +1,61 @@ +export class LRUCache { + map = new Map() + keys: T[] = [] + + constructor(readonly maxSize: number) {} + + has(k: T) { + return this.map.has(k) + } + + get(k: T) { + const v = this.map.get(k) + + if (v !== undefined) { + this.keys.push(this.keys.shift() as T) + } + + return v + } + + set(k: T, v: U) { + this.map.set(k, v) + this.keys.push(k) + + if (this.map.size > this.maxSize) { + this.map.delete(this.keys.shift() as T) + } + } +} + +export function cached({ + maxSize, + getKey, + getValue, +}: { + maxSize: number + getKey: (args: any[]) => T + getValue: (args: any[]) => V +}) { + const cache = new LRUCache(maxSize) + + const get = (...args: any[]) => { + const k = getKey(args) + + let v = cache.get(k) + + if (!v) { + v = getValue(args) + + cache.set(k, v) + } + + return v + } + + get.cache = cache + get.getKey = getKey + get.getValue = getValue + + return get +} diff --git a/src/util/nostr.ts b/src/util/nostr.ts index 8f4bd53..3c0ccd2 100644 --- a/src/util/nostr.ts +++ b/src/util/nostr.ts @@ -1,3 +1,10 @@ +import type {Event} from 'nostr-tools' +import {verifySignature, getEventHash, matchFilter as nostrToolsMatchFilter} from 'nostr-tools' +import {cached} from "./LRUCache" + +// =========================================================================== +// Relays + export const stripProto = (url: string) => url.replace(/.*:\/\//, "") export const isShareableRelay = (url: string) => @@ -27,6 +34,71 @@ export const normalizeRelayUrl = (url: string) => { } } +// =========================================================================== +// Nostr URIs + export const fromNostrURI = (s: string) => s.replace(/^[\w+]+:\/?\/?/, "") export const toNostrURI = (s: string) => `nostr:${s}` + +// =========================================================================== +// Events + +export const hasValidSignature = cached({ + maxSize: 10000, + getKey: ([e]: any[]) => [getEventHash(e), e.sig].join(":"), + getValue: ([e]: any[]) => { + try { + verifySignature(e) + } catch (e) { + return false + } + + return true + }, +}) + +// =========================================================================== +// Filters + +export type Filter = { + ids?: string[] + kinds?: number[] + authors?: string[] + since?: number + until?: number + limit?: number + search?: string + [key: `#${string}`]: string[] +} + +export const matchFilter = (filter: Filter, event: Event) => { + if (!nostrToolsMatchFilter(filter, event)) { + return false + } + + if (filter.search) { + const content = event.content.toLowerCase() + const terms = filter.search.toLowerCase().split(/\s+/g) + + for (const term of terms) { + if (content.includes(term)) { + return true + } + + return false + } + } + + return true +} + +export const matchFilters = (filters: Filter[], event: Event) => { + for (const filter of filters) { + if (matchFilter(filter, event)) { + return true + } + } + + return false +} diff --git a/yarn.lock b/yarn.lock index 3ae0cf7..77c306c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1939,7 +1939,7 @@ type-fest@^0.8.1: resolved "https://registry.yarnpkg.com/type-fest/-/type-fest-0.8.1.tgz#09e249ebde851d3b1e48d27c105444667f17b83d" integrity sha512-4dbzIzqvjtgiM5rw1k5rEHtBANKmdudhGyBEajN01fEyhaAIhsoKNy6y7+IN93IfpFtwY9iqi7kD+xwKhQsNJA== -typescript@~5.1: +typescript@~5.1.6: version "5.1.6" resolved "https://registry.yarnpkg.com/typescript/-/typescript-5.1.6.tgz#02f8ac202b6dad2c0dd5e0913745b47a37998274" integrity sha512-zaWCozRZ6DLEWAWFrVDz1H6FVXzUSfTy5FUMWsQlU8Ym5JP9eO4xkTIROFCQvhQf61z6O/G6ugw3SgAnvvm+HA==