Add Subscription

This commit is contained in:
Jonathan Staab
2023-10-25 11:21:32 -07:00
parent 7f1ac721dc
commit 77dc2144d3
4 changed files with 223 additions and 1 deletions
+89
View File
@@ -0,0 +1,89 @@
import EventEmitter from "events"
import type {Event} from 'nostr-tools'
import type {Executor} from "./Executor"
import type {Filter} from './util/nostr'
import {matchFilters, hasValidSignature} from "./util/nostr"
export type SubscriptionOpts = {
executor: Executor
filters: Filter[]
timeout?: number
hasSeen?: (e: Event) => boolean
}
export class Subscription extends EventEmitter {
unsubscribe: () => void
seen = new Set<string>()
opened = Date.now()
closed?: number
constructor(readonly opts: SubscriptionOpts) {
super()
const {executor, timeout, filters} = this.opts
// If we have a timeout, close the subscription automatically
if (timeout) {
setTimeout(this.close, timeout)
}
// If one of our connections gets closed make sure to kill our sub
executor.target.connections.forEach(con => con.on("close", this.close))
// Start our subscription
const sub = executor.subscribe(filters, {
onEvent: this.onEvent,
onEose: this.onEose,
})
this.unsubscribe = sub.unsubscribe
}
hasSeen = (event: Event) => {
if (this.opts.hasSeen) {
return this.opts.hasSeen(event)
}
if (this.seen.has(event.id)) {
return true
}
this.seen.add(event.id)
return false
}
onEvent = (url: string, event: Event) => {
// If we've seen this event, don't re-validate
// Otherwise, check the signature and filters
if (this.hasSeen(event)) {
this.emit("duplicate", event, url)
} else {
if (!hasValidSignature(event)) {
this.emit("invalid-signature", event, url)
} else if (!matchFilters(this.opts.filters, event)) {
this.emit("failed-filter", event, url)
} else {
this.emit("event", event, url)
}
}
}
onEose = (url: string) => {
this.emit("eose", url)
}
close = () => {
if (!this.closed) {
const {target} = this.opts.executor
this.closed = Date.now()
this.unsubscribe()
this.emit("close")
this.removeAllListeners()
target.connections.forEach(con => con.off("close", this.close))
target.cleanup()
}
}
}
+61
View File
@@ -0,0 +1,61 @@
export class LRUCache<T, U> {
map = new Map<T, U>()
keys: T[] = []
constructor(readonly maxSize: number) {}
has(k: T) {
return this.map.has(k)
}
get(k: T) {
const v = this.map.get(k)
if (v !== undefined) {
this.keys.push(this.keys.shift() as T)
}
return v
}
set(k: T, v: U) {
this.map.set(k, v)
this.keys.push(k)
if (this.map.size > this.maxSize) {
this.map.delete(this.keys.shift() as T)
}
}
}
export function cached<T, V>({
maxSize,
getKey,
getValue,
}: {
maxSize: number
getKey: (args: any[]) => T
getValue: (args: any[]) => V
}) {
const cache = new LRUCache<T, V>(maxSize)
const get = (...args: any[]) => {
const k = getKey(args)
let v = cache.get(k)
if (!v) {
v = getValue(args)
cache.set(k, v)
}
return v
}
get.cache = cache
get.getKey = getKey
get.getValue = getValue
return get
}
+72
View File
@@ -1,3 +1,10 @@
import type {Event} from 'nostr-tools'
import {verifySignature, getEventHash, matchFilter as nostrToolsMatchFilter} from 'nostr-tools'
import {cached} from "./LRUCache"
// ===========================================================================
// Relays
export const stripProto = (url: string) => url.replace(/.*:\/\//, "") export const stripProto = (url: string) => url.replace(/.*:\/\//, "")
export const isShareableRelay = (url: string) => export const isShareableRelay = (url: string) =>
@@ -27,6 +34,71 @@ export const normalizeRelayUrl = (url: string) => {
} }
} }
// ===========================================================================
// Nostr URIs
export const fromNostrURI = (s: string) => s.replace(/^[\w+]+:\/?\/?/, "") export const fromNostrURI = (s: string) => s.replace(/^[\w+]+:\/?\/?/, "")
export const toNostrURI = (s: string) => `nostr:${s}` export const toNostrURI = (s: string) => `nostr:${s}`
// ===========================================================================
// Events
export const hasValidSignature = cached({
maxSize: 10000,
getKey: ([e]: any[]) => [getEventHash(e), e.sig].join(":"),
getValue: ([e]: any[]) => {
try {
verifySignature(e)
} catch (e) {
return false
}
return true
},
})
// ===========================================================================
// Filters
export type Filter = {
ids?: string[]
kinds?: number[]
authors?: string[]
since?: number
until?: number
limit?: number
search?: string
[key: `#${string}`]: string[]
}
export const matchFilter = (filter: Filter, event: Event) => {
if (!nostrToolsMatchFilter(filter, event)) {
return false
}
if (filter.search) {
const content = event.content.toLowerCase()
const terms = filter.search.toLowerCase().split(/\s+/g)
for (const term of terms) {
if (content.includes(term)) {
return true
}
return false
}
}
return true
}
export const matchFilters = (filters: Filter[], event: Event) => {
for (const filter of filters) {
if (matchFilter(filter, event)) {
return true
}
}
return false
}
+1 -1
View File
@@ -1939,7 +1939,7 @@ type-fest@^0.8.1:
resolved "https://registry.yarnpkg.com/type-fest/-/type-fest-0.8.1.tgz#09e249ebde851d3b1e48d27c105444667f17b83d" resolved "https://registry.yarnpkg.com/type-fest/-/type-fest-0.8.1.tgz#09e249ebde851d3b1e48d27c105444667f17b83d"
integrity sha512-4dbzIzqvjtgiM5rw1k5rEHtBANKmdudhGyBEajN01fEyhaAIhsoKNy6y7+IN93IfpFtwY9iqi7kD+xwKhQsNJA== integrity sha512-4dbzIzqvjtgiM5rw1k5rEHtBANKmdudhGyBEajN01fEyhaAIhsoKNy6y7+IN93IfpFtwY9iqi7kD+xwKhQsNJA==
typescript@~5.1: typescript@~5.1.6:
version "5.1.6" version "5.1.6"
resolved "https://registry.yarnpkg.com/typescript/-/typescript-5.1.6.tgz#02f8ac202b6dad2c0dd5e0913745b47a37998274" resolved "https://registry.yarnpkg.com/typescript/-/typescript-5.1.6.tgz#02f8ac202b6dad2c0dd5e0913745b47a37998274"
integrity sha512-zaWCozRZ6DLEWAWFrVDz1H6FVXzUSfTy5FUMWsQlU8Ym5JP9eO4xkTIROFCQvhQf61z6O/G6ugw3SgAnvvm+HA== integrity sha512-zaWCozRZ6DLEWAWFrVDz1H6FVXzUSfTy5FUMWsQlU8Ym5JP9eO4xkTIROFCQvhQf61z6O/G6ugw3SgAnvvm+HA==