From d2ae601ac0cf082414c3e6768d26f0c958eeef8d Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 9 Nov 2023 15:28:10 -0800 Subject: [PATCH] Fix premature closure of connections, add Tags util --- package.json | 3 +- src/Subscription.ts | 18 ++++- src/main.ts | 1 + src/util/nostr.ts | 187 +++++++++++++++++++++++++++++++++++++++----- yarn.lock | 5 ++ 5 files changed, 190 insertions(+), 24 deletions(-) diff --git a/package.json b/package.json index bc264c8..d8e9f62 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "paravel", - "version": "0.4.0", + "version": "0.4.1", "description": "Yet another toolkit for nostr", "author": "hodlbod", "license": "MIT", @@ -35,6 +35,7 @@ }, "dependencies": { "isomorphic-ws": "^5.0.0", + "normalize-url": "^8.0.0", "nostr-tools": "^1.15.0", "ws": "^8.14.2" }, diff --git a/src/Subscription.ts b/src/Subscription.ts index a2dfc6b..1da5024 100644 --- a/src/Subscription.ts +++ b/src/Subscription.ts @@ -14,8 +14,10 @@ export type SubscriptionOpts = { export class Subscription extends EventEmitter { unsubscribe: () => void + dead = new Set() seen = new Set() eose = new Set() + closeHandlers = new Map() opened = Date.now() closed?: number @@ -30,7 +32,19 @@ export class Subscription extends EventEmitter { } // If one of our connections gets closed make sure to kill our sub - executor.target.connections.forEach(con => con.on("close", this.close)) + executor.target.connections.forEach(con => { + const handler = () => { + this.dead.add(con.url) + + if (this.dead.size === executor.target.connections.length) { + this.close() + } + } + + this.closeHandlers.set(con.url, handler) + + con.on("close", handler) + }) // Start our subscription const sub = executor.subscribe(filters, { @@ -92,7 +106,7 @@ export class Subscription extends EventEmitter { this.emit("close") this.removeAllListeners() - target.connections.forEach(con => con.off("close", this.close)) + target.connections.forEach(con => con.off("close", this.closeHandlers.get(con.url))) target.cleanup() } } diff --git a/src/main.ts b/src/main.ts index 659dafe..54ab566 100644 --- a/src/main.ts +++ b/src/main.ts @@ -4,6 +4,7 @@ export * from "./Executor" export * from "./Pool" export * from "./Subscription" export * from "./util/nostr" +export * from "./util/LRUCache" export * from "./util/Deferred" export * from "./util/Emitter" export * from "./util/Queue" diff --git a/src/util/nostr.ts b/src/util/nostr.ts index 56c7053..f47ebb4 100644 --- a/src/util/nostr.ts +++ b/src/util/nostr.ts @@ -1,4 +1,5 @@ import type {Event} from 'nostr-tools' +import normalizeUrl from "normalize-url" import {verifySignature, getEventHash, matchFilter as nostrToolsMatchFilter} from 'nostr-tools' import {cached} from "./LRUCache" @@ -7,38 +8,51 @@ import {cached} from "./LRUCache" export const now = () => Math.round(Date.now() / 1000) +export const last = (xs: T[]) => xs[xs.length - 1] + +export const identity = (x: T) => x + // =========================================================================== // Relays export const stripProto = (url: string) => url.replace(/.*:\/\//, "") export const isShareableRelay = (url: string) => - // Is it actually a websocket url - url.match(/^wss:\/\/.+/) && - // Sometimes bugs cause multiple relays to get concatenated - url.match(/:\/\//g)?.length === 1 && - // It shouldn't have any whitespace - !url.match(/\s/) && - // Don't match stuff with a port number - !url.slice(6).match(/:\d+/) && - // Don't match raw ip addresses - !url.slice(6).match(/\d+\.\d+\.\d+\.\d+/) && - // Skip nostr.wine's virtual relays - !url.slice(6).match(/\/npub/) + Boolean( + // Is it actually a websocket url and has a dot + url.match(/^wss?:\/\/.+\..+/) && + // Sometimes bugs cause multiple relays to get concatenated + url.match(/:\/\//g)?.length === 1 && + // It shouldn't have any whitespace + !url.match(/\s/) && + // It shouldn't have any url-encoded whitespace + !url.match(/%/) && + // Is it secure + url.match(/^wss:\/\/.+/) && + // Don't match stuff with a port number + !url.slice(6).match(/:\d+/) && + // Don't match raw ip addresses + !url.slice(6).match(/\d+\.\d+\.\d+\.\d+/) && + // Skip nostr.wine's virtual relays + !url.slice(6).match(/\/npub/) + ) export const normalizeRelayUrl = (url: string) => { - // If it doesn't start with a compatible protocol, strip the proto and add wss - if (!url.match(/^(wss|local):\/\/.+/)) { - url = "wss://" + stripProto(url) + // Use our library to normalize + url = normalizeUrl(url, {stripHash: true, stripAuthentication: false}) + + // Strip the protocol since only wss works + url = stripProto(url) + + // Urls without pathnames are supposed to have a trailing slash + if (!url.includes("/")) { + url += "/" } - try { - return new URL(url).href.replace(/\/+$/, "").toLowerCase() - } catch (e) { - return null - } + return "wss://" + url } + // =========================================================================== // Nostr URIs @@ -49,7 +63,13 @@ export const toNostrURI = (s: string) => `nostr:${s}` // =========================================================================== // Events -export const createEvent = (kind: number, {content = "", tags = [], created_at = now()}) => +export type CreateEventOpts = { + content?: string + tags?: string[][] + created_at?: number +} + +export const createEvent = (kind: number, {content = "", tags = [], created_at = now()}: CreateEventOpts) => ({kind, content, tags, created_at}) export const hasValidSignature = cached({ @@ -66,6 +86,131 @@ export const hasValidSignature = cached({ }, }) +// ========================================================================== +// Tags + +export class Fluent { + xs: any[] + + constructor(xs: T[]) { + this.xs = xs.filter(identity) + } + + as = (f: (xs: T[]) => U) => f(this.xs) + + all = () => this.xs + + count = () => this.xs.length + + exists = () => this.xs.length > 0 + + first = () => this.xs[0] + + nth = (i: number) => this.xs[i] + + last = () => last(this.xs) + + flat = () => new Fluent(this.xs.flatMap(identity)) + + uniq = () => new Fluent(Array.from(new Set(this.xs))) + + drop = (n: number) => new Fluent(this.xs.map(t => t.slice(n))) + + take = (n: number) => new Fluent(this.xs.map(t => t.slice(0, n))) + + map = (f: (t: T) => U) => new Fluent(this.xs.map(f)) + + pluck = (k: number | string) => new Fluent(this.xs.map(x => x[k])) + + filter = (f: (t: T) => boolean) => new Fluent(this.xs.filter(f)) + + reject = (f: (t: T) => boolean) => new Fluent(this.xs.filter(t => !f(t))) + + any = (f: (t: T) => boolean) => this.filter(f).exists() + + find = (f: (t: T) => boolean) => this.xs.find(f) + + has = (x: any) => this.xs.includes(x) +} + +export class Tags extends Fluent { + static from (e: Event | Event[]) { + const events = Array.isArray(e) ? e : [e] + + return new Tags(events.flatMap(e => e.tags)) + } + + valueEquals = (v: string) => new Tags(this.xs.filter(t => t[1] === v)) + + values = (k?: string) => this.filter(t => !k || t[0] === k).pluck(1) + + type(t: string | string[]) { + const types = Array.isArray(t) ? t : [t] + + return new Tags(this.xs.filter(t => types.includes(t[0]))) + } + + mark(m: string | string[]) { + const marks = Array.isArray(m) ? m : [m] + + return new Tags(this.xs.filter(t => marks.includes(last(t)))) + } + + relays = () => this.flat().filter(isShareableRelay).uniq() + + topics = () => this.type("t").values().map((t: string) => t.replace(/^#/, "")) + + pubkeys = () => this.type("p").values() + + urls = () => this.type("r").values() + + getValue = (k?: string) => this.values(k).first() + + getDict() { + const meta: Record = {} + + for (const [k, v] of this.xs) { + if (!meta[k]) { + meta[k] = v + } + } + + return meta + } + + // Support the deprecated version where tags are not marked as replies + normalize() { + const tags = this.type(["a", "e"]).filter(t => last(t) !== "mention") + const legacy = tags.any(t => !["reply", "root"].includes(last(t))) + + if (!legacy) { + return this + } + + const reply = tags.last() + const root = tags.count() > 1 ? tags.first() : null + const newTags = tags.reject(t => [reply?.[1], root?.[1]].includes(t[1])).all() + + if (reply) { + newTags.push(reply.slice(0, 3).concat("reply")) + } + + if (root) { + newTags.push(root.slice(0, 3).concat("root")) + } + + return new Tags(newTags) + } + + getReply = () => { + const tags = this.normalize() + + return tags.mark('reply').values().first() || tags.mark('root').values().first() + } + + getRoot = () => this.normalize().mark('root').values().first() +} + // =========================================================================== // Filters diff --git a/yarn.lock b/yarn.lock index 448373c..bd7a97c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1365,6 +1365,11 @@ normalize-package-data@^3.0.0: semver "^7.3.4" validate-npm-package-license "^3.0.1" +normalize-url@^8.0.0: + version "8.0.0" + resolved "https://registry.yarnpkg.com/normalize-url/-/normalize-url-8.0.0.tgz#593dbd284f743e8dcf6a5ddf8fadff149c82701a" + integrity sha512-uVFpKhj5MheNBJRTiMZ9pE/7hD1QTeEvugSJW/OmLzAp78PB5O6adfMNTvmfKhXBkvCzC+rqifWcVYpGFwTjnw== + nostr-tools@^1.15.0: version "1.15.0" resolved "https://registry.npmjs.org/nostr-tools/-/nostr-tools-1.15.0.tgz"