diff --git a/packages/feeds/compiler.ts b/packages/feeds/compiler.ts index 3c94a06..087721e 100644 --- a/packages/feeds/compiler.ts +++ b/packages/feeds/compiler.ts @@ -1,11 +1,11 @@ import {uniq, identity, flatten, pushToMapKey, intersection, tryCatch, now} from '@welshman/lib' -import type {Rumor, Filter} from '@welshman/util' +import type {TrustedEvent, Filter} from '@welshman/util' import {Tags, intersectFilters, getAddress, getIdFilters, unionFilters} from '@welshman/util' import type {CreatedAtItem, RequestItem, ListItem, WOTItem, DVMItem, Scope, Feed, FeedOptions} from './core' import {hasSubFeeds, getFeedArgs, feedsFromTags} from './utils' import {FeedType} from './core' -export class FeedCompiler { +export class FeedCompiler { constructor(readonly options: FeedOptions) {} walk(feed: Feed, visit: (feed: Feed) => void) { diff --git a/packages/feeds/loader.ts b/packages/feeds/loader.ts index 0354b24..a33085e 100644 --- a/packages/feeds/loader.ts +++ b/packages/feeds/loader.ts @@ -1,5 +1,5 @@ import {inc, max, min, now} from '@welshman/lib' -import type {Rumor, Filter} from '@welshman/util' +import type {TrustedEvent, Filter} from '@welshman/util' import {EPOCH, guessFilterDelta} from '@welshman/util' import type {Feed, RequestItem, FeedOptions} from './core' import {FeedType} from './core' @@ -12,7 +12,7 @@ export type LoadOpts = { export type Loader = (limit: number) => Promise -export class FeedLoader { +export class FeedLoader { compiler: FeedCompiler constructor(readonly options: FeedOptions) { diff --git a/packages/lib/Store.ts b/packages/lib/Store.ts index 2ebfc21..da1d471 100644 --- a/packages/lib/Store.ts +++ b/packages/lib/Store.ts @@ -4,14 +4,17 @@ import {ensurePlural, identity} from "./Tools" export type Invalidator = (value?: T) => void export type Subscriber = (value: T) => void -type Derivable = IReadable | IReadable[] +type Derivable = IDerivable | IDerivable[] type Unsubscriber = () => void type R = Record type M = Map -export interface IReadable { +export interface IDerivable { get: () => T subscribe(this: void, run: Subscriber, invalidate?: Invalidator): Unsubscriber +} + +export interface IReadable extends IDerivable { derived: (f: (v: T) => U) => IReadable throttle(t: number): IReadable } @@ -51,10 +54,6 @@ export class Writable implements IWritable { this.set(f(this.value)) } - async updateAsync(f: (v: T) => Promise) { - this.set(await f(this.value)) - } - subscribe(f: Subscriber) { this.subs.push(f) @@ -332,3 +331,48 @@ export const key = (base: Writable>, pk: string, key: string) new Key(base, pk, key) export const collection = (pk: string) => new Collection(pk) + +export const asReadable = (store: IDerivable) => { + return { + ...store, + derived: (f: (v: T) => U) => new Derived(store, f), + throttle: (t: number) => new Derived(store, identity, t), + } +} + +export type ICustomStore = { + get: () => T + start: (set: (x: T) => void) => () => void +} + +export const customStore = ({get, start}: ICustomStore) => { + const subs: Subscriber[] = [] + + const set = (newValue: T) => { + value = newValue + } + + let stop: () => void + let value = get() + + return asReadable({ + get: () => subs.length === 0 ? get() : value, + subscribe: (subscriber: Subscriber) => { + if (subs.length === 0) { + stop = start(set) + value = get() + } + + subs.push(subscriber) + + return () => { + subs.splice(subs.findIndex(sub => sub === subscriber), 1) + + if (subs.length === 0) { + stop() + } + } + }, + }) +} + diff --git a/packages/net/index.ts b/packages/net/index.ts index 3c1e39d..a605350 100644 --- a/packages/net/index.ts +++ b/packages/net/index.ts @@ -11,3 +11,4 @@ export * from "./target/Multi" export * from "./target/Plex" export * from "./target/Relay" export * from "./target/Relays" +export * from "./target/Local" diff --git a/packages/net/target/Local.ts b/packages/net/target/Local.ts new file mode 100644 index 0000000..cb583b1 --- /dev/null +++ b/packages/net/target/Local.ts @@ -0,0 +1,32 @@ +import {Emitter} from '@welshman/lib' +import {Repository, LOCAL_RELAY_URL, Relay} from '@welshman/util' +import type {Message} from '../Socket' + +export class Local extends Emitter { + relay: Relay + constructor(readonly repository: Repository) { + super() + + this.relay = new Relay(repository) + this.relay.on('*', this.onMessage) + } + + get connections() { + return [] + } + + send(...payload: Message) { + this.relay.send(...payload) + } + + onMessage = (...message: Message) => { + const [verb, ...payload] = message + + this.emit(verb, LOCAL_RELAY_URL, ...payload) + } + + cleanup = () => { + this.removeAllListeners() + this.relay.off('*', this.onMessage) + } +} diff --git a/packages/util/Events.ts b/packages/util/Events.ts index 7bf3b41..2167e05 100644 --- a/packages/util/Events.ts +++ b/packages/util/Events.ts @@ -45,7 +45,7 @@ export const createEvent = (kind: number, {content = "", tags = [], created_at = ({kind, content, tags, created_at}) export const isEventTemplate = (e: EventTemplate): e is EventTemplate => - Boolean(e.kind && e.tags && e.content && e.created_at) + Boolean(typeof e.kind === "number" && e.tags && typeof e.content === "string" && e.created_at) export const isOwnedEvent = (e: OwnedEvent): e is OwnedEvent => Boolean(isEventTemplate(e) && e.pubkey) diff --git a/packages/util/Links.ts b/packages/util/Links.ts index 966a305..fe0ff02 100644 --- a/packages/util/Links.ts +++ b/packages/util/Links.ts @@ -1,3 +1,3 @@ -export const fromNostrURI = (s: string) => s.replace(/^[\w+]+:\/?\/?/, "") +export const fromNostrURI = (s: string) => s.replace(/^nostr:\/?\/?/, "") export const toNostrURI = (s: string) => `nostr:${s}` diff --git a/packages/util/Relay.ts b/packages/util/Relay.ts index d7ee63b..04b94ea 100644 --- a/packages/util/Relay.ts +++ b/packages/util/Relay.ts @@ -1,42 +1,75 @@ -import {Emitter} from '@welshman/lib' +import {Emitter, normalizeUrl, stripProtocol} from '@welshman/lib' import {matchFilters} from './Filters' import type {Repository} from './Repository' import type {Filter} from './Filters' import type {TrustedEvent} from './Events' -export class Relay extends Emitter { +export const LOCAL_RELAY_URL = "local://welshman.relay" + +export const BOGUS_RELAY_URL = "bogus://welshman.relay" + +export const isShareableRelayUrl = (url: string) => + Boolean( + typeof url === 'string' && + // Is it actually a websocket url and has a dot + url.match(/^wss:\/\/.+\..+/) && + // Sometimes bugs cause multiple relays to get concatenated + url.match(/:\/\//g)?.length === 1 && + // It shouldn't have any whitespace, url-encoded or otherwise + !url.match(/\s|%/) && + // Don't match stuff with a port number + !url.slice(6).match(/:\d+/) && + // Don't match raw ip addresses + !url.slice(6).match(/\d+\.\d+\.\d+\.\d+/) && + // Skip nostr.wine's virtual relays + !url.slice(6).match(/\/npub/) + ) + +type NormalizeRelayUrlOpts = { + allowInsecure?: boolean +} + +export const normalizeRelayUrl = (url: string, {allowInsecure = false}: NormalizeRelayUrlOpts = {}) => { + const prefix = allowInsecure ? url.match(/^wss?:\/\//)?.[0] || "wss://" : "wss://" + + // Use our library to normalize + url = normalizeUrl(url, {stripHash: true, stripAuthentication: false}) + + // Strip the protocol since only wss works, lowercase + url = stripProtocol(url).toLowerCase() + + // Urls without pathnames are supposed to have a trailing slash + if (!url.includes("/")) { + url += "/" + } + + return prefix + url +} + +export class Relay extends Emitter { subs = new Map() - constructor(readonly repository: Repository) { + constructor(readonly repository: Repository) { super() } send(type: string, ...message: any[]) { switch(type) { - case 'EVENT': return this.handleEVENT(message as [string]) + case 'EVENT': return this.handleEVENT(message as [TrustedEvent]) case 'CLOSE': return this.handleCLOSE(message as [string]) case 'REQ': return this.handleREQ(message as [string, ...Filter[]]) } } - handleEVENT([json]: [string]) { - let event: E - try { - event = JSON.parse(json) - } catch (e) { - return - } - + handleEVENT([event]: [TrustedEvent]) { this.repository.publish(event) this.emit('OK', event.id, true, "") if (!this.repository.isDeleted(event)) { - const json = JSON.stringify(event) - for (const [subId, filters] of this.subs.entries()) { if (matchFilters(filters, event)) { - this.emit('EVENT', subId, json) + this.emit('EVENT', subId, event) } } } @@ -50,7 +83,7 @@ export class Relay extends Emitter { this.subs.set(subId, filters) for (const event of this.repository.query(filters)) { - this.emit('EVENT', subId, JSON.stringify(event)) + this.emit('EVENT', subId, event) } this.emit('EOSE', subId) diff --git a/packages/util/Relays.ts b/packages/util/Relays.ts deleted file mode 100644 index dcf3e72..0000000 --- a/packages/util/Relays.ts +++ /dev/null @@ -1,43 +0,0 @@ -import {normalizeUrl, stripProtocol} from '@welshman/lib' - -export const LOCAL_RELAY_URL = "local://welshman.relay" - -export const BOGUS_RELAY_URL = "bogus://welshman.relay" - -export const isShareableRelayUrl = (url: string) => - Boolean( - typeof url === 'string' && - // Is it actually a websocket url and has a dot - url.match(/^wss:\/\/.+\..+/) && - // Sometimes bugs cause multiple relays to get concatenated - url.match(/:\/\//g)?.length === 1 && - // It shouldn't have any whitespace, url-encoded or otherwise - !url.match(/\s|%/) && - // Don't match stuff with a port number - !url.slice(6).match(/:\d+/) && - // Don't match raw ip addresses - !url.slice(6).match(/\d+\.\d+\.\d+\.\d+/) && - // Skip nostr.wine's virtual relays - !url.slice(6).match(/\/npub/) - ) - -type NormalizeRelayUrlOpts = { - allowInsecure?: boolean -} - -export const normalizeRelayUrl = (url: string, {allowInsecure = false}: NormalizeRelayUrlOpts = {}) => { - const prefix = allowInsecure ? url.match(/^wss?:\/\//)?.[0] || "wss://" : "wss://" - - // Use our library to normalize - url = normalizeUrl(url, {stripHash: true, stripAuthentication: false}) - - // Strip the protocol since only wss works, lowercase - url = stripProtocol(url).toLowerCase() - - // Urls without pathnames are supposed to have a trailing slash - if (!url.includes("/")) { - url += "/" - } - - return prefix + url -} diff --git a/packages/util/Repository.ts b/packages/util/Repository.ts index efb93d8..e872c9e 100644 --- a/packages/util/Repository.ts +++ b/packages/util/Repository.ts @@ -1,10 +1,9 @@ import {throttle} from 'throttle-debounce' import type {IReadable, Subscriber, Invalidator} from '@welshman/lib' -import {Derived, Emitter, writable, first, always, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib' +import {Derived, Emitter, sortBy, customStore, inc, first, always, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib' import {DELETE} from './Kinds' import {matchFilter, getIdFilters, matchFilters} from './Filters' -import {encodeAddress, addressFromEvent} from './Address' -import {isReplaceable} from './Events' +import {isReplaceable, isTrustedEvent, getAddress} from './Events' import type {Filter} from './Filters' import type {TrustedEvent} from './Events' @@ -16,14 +15,14 @@ export type RepositoryOptions = { throttle?: number } -export class Repository extends Emitter implements IReadable> { - eventsById = new Map() - eventsByAddress = new Map() - eventsByTag = new Map() - eventsByDay = new Map() - eventsByAuthor = new Map() +export class Repository extends Emitter implements IReadable { + eventsById = new Map() + eventsByAddress = new Map() + eventsByTag = new Map() + eventsByDay = new Map() + eventsByAuthor = new Map() deletes = new Map() - subs: Subscriber[] = [] + subs: Subscriber[] = [] constructor(private options: RepositoryOptions) { super() @@ -36,76 +35,13 @@ export class Repository extends Emitter implements IRead // Methods for implementing store interface get() { - return this - } - - subscribe(f: Subscriber>, invalidate?: Invalidator>) { - this.subs.push(f) - - return () => { - this.subs = this.subs.filter(sub => sub !== f) - } - } - - derived(f: (v: Repository) => U): Derived { - return new Derived(this, f) - } - - throttle(t: number): Derived> { - return new Derived>(this, identity, t) - } - - filter(getFilters: () => Filter[]) { - const store = writable([]) - - const onNotify = (event?: E) => { - const filters = getFilters() - - if (!event || matchFilters(filters, event)) { - store.set(Array.from(this.query(filters))) - } - } - - const subscribe = store.subscribe.bind(store) - - store.subscribe = (f: Subscriber) => { - if (store.subs.length === 0) { - this.on('notify', onNotify) - onNotify() - } - - const unsubscribe = subscribe(f) - - return () => { - unsubscribe() - - if (store.subs.length === 0) { - this.off('notify', onNotify) - } - } - } - - return store - } - - notify(event?: E) { - for (const sub of this.subs) { - sub(this) - } - - this.emit('notify', event) - } - - // Load/dump - - dump() { return Array.from(this.eventsById.values()) } - async load(events: E[], chunkSize = 1000) { + async set(events: TrustedEvent[], chunkSize = 1000) { for (const eventsChunk of chunk(chunkSize, events)) { for (const event of eventsChunk) { - this._addEvent(event) + this.publish(event, {notify: false}) } if (eventsChunk.length === chunkSize) { @@ -116,6 +52,52 @@ export class Repository extends Emitter implements IRead this.notify() } + + subscribe(f: Subscriber, invalidate?: Invalidator) { + this.subs.push(f) + + return () => { + this.subs = this.subs.filter(sub => sub !== f) + } + } + + derived(f: (v: TrustedEvent[]) => U): Derived { + return new Derived(this, f) + } + + throttle(t: number): Derived { + return new Derived(this, identity, t) + } + + filter(getFilters: () => Filter[]) { + const getValue = () => Array.from(this.query(getFilters())) + + return customStore({ + get: getValue, + start: setValue => { + const onNotify = (event?: TrustedEvent) => { + if (!event || matchFilters(getFilters(), event)) { + setValue(getValue()) + } + } + + this.on('notify', onNotify) + + return () => this.off('notify', onNotify) + }, + }) + } + + notify(event?: TrustedEvent) { + const events = this.get() + + for (const sub of this.subs) { + sub(events) + } + + this.emit('notify', event) + } + // API getEvent(idOrAddress: string) { @@ -130,21 +112,20 @@ export class Repository extends Emitter implements IRead *query(filters: Filter[]) { for (let filter of filters) { - let events: Iterable = this.eventsById.values() + let events: TrustedEvent[] = Array.from(this.eventsById.values()) if (filter.ids) { - events = filter.ids!.map(id => this.eventsById.get(id)).filter(identity) as E[] + events = filter.ids!.map(id => this.eventsById.get(id)).filter(identity) as TrustedEvent[] filter = omit(['ids'], filter) } else if (filter.authors) { events = uniq(filter.authors!.flatMap(pubkey => this.eventsByAuthor.get(pubkey) || [])) filter = omit(['authors'], filter) } else if (filter.since || filter.until) { const sinceDay = getDay(filter.since || 0) - const untilDay = getDay(filter.since || now()) + const untilDay = getDay(filter.until || now()) - filter = omit(['since', 'until'], filter) events = uniq( - Array.from(range(sinceDay, untilDay)) + Array.from(range(sinceDay, inc(untilDay))) .flatMap((day: number) => this.eventsByDay.get(day) || []) ) } else { @@ -162,79 +143,67 @@ export class Repository extends Emitter implements IRead } } - for (const event of events) { + let i = 0 + + for (const event of sortBy((e: TrustedEvent) => -e.created_at, events)) { + if (filter.limit && i > filter.limit) { + break + } + if (!this.isDeleted(event) && matchFilter(filter, event)) { yield event + i += 1 } } } } - publish(event: E) { - const duplicateById = this.eventsById.get(event.id) - - if (duplicateById) { - return false + publish(event: TrustedEvent, {notify = false} = {}) { + if (!isTrustedEvent(event)) { + throw new Error("Invalid event published to Repository", event) } - const hasAddress = isReplaceable(event) - const address = encodeAddress(addressFromEvent(event)) - const duplicateByAddress = hasAddress ? this.eventsByAddress.get(address) : undefined + const address = getAddress(event) + const duplicate = ( + this.eventsById.get(event.id) || + this.eventsByAddress.get(address) + ) - if (duplicateByAddress && duplicateByAddress.created_at >= event.created_at) { - return false - } + // If our duplicate is newer than the event we're adding, we're done + if (!duplicate || duplicate.created_at < event.created_at) { + this.eventsById.set(event.id, event) - this._addEvent(event, duplicateByAddress) - - return true - } - - isDeleted(event: E) { - const idDeletedAt = this.deletes.get(event.id) || 0 - - if (idDeletedAt > event.created_at) { - return true - } - - if (isReplaceable(event)) { - const address = encodeAddress(addressFromEvent(event)) - const addressDeletedAt = this.deletes.get(address) || 0 - - if (addressDeletedAt > event.created_at) { - return true + if (isReplaceable(event)) { + this.eventsByAddress.set(address, event) } - } - return false - } + if (duplicate) { + this.eventsById.delete(duplicate.id) - // Implementation + if (isReplaceable(duplicate)) { + this.eventsByAddress.delete(address) + } + } - _addEvent(event: E, duplicate?: E) { - this.eventsById.set(event.id, event) + this._updateIndex(this.eventsByDay, getDay(event.created_at), event, duplicate) + this._updateIndex(this.eventsByAuthor, event.pubkey, event, duplicate) - if (isReplaceable(event)) { - this.eventsByAddress.set(encodeAddress(addressFromEvent(event)), event) - } + // Store our event by tags + for (const tag of event.tags) { + if (tag[0].length === 1) { + this._updateIndex(this.eventsByTag, tag.slice(0, 2).join(':'), event, duplicate) - this._updateIndex(this.eventsByDay, getDay(event.created_at), event, duplicate) - this._updateIndex(this.eventsByAuthor, event.pubkey, event, duplicate) + if (event.kind === DELETE) { + const id = tag[1] + const ts = Math.max(event.created_at, this.deletes.get(tag[1]) || 0) - for (const tag of event.tags) { - if (tag[0].length === 1) { - this._updateIndex(this.eventsByTag, tag.slice(0, 2).join(':'), event, duplicate) - - if (event.kind === DELETE) { - const id = tag[1] - const ts = Math.max(event.created_at, this.deletes.get(tag[1]) || 0) - - this.deletes.set(id, ts) + this.deletes.set(id, ts) + } } } } - if (!this.isDeleted(event)) { + if (notify && !this.isDeleted(event)) { // Deletes are tricky, re-evaluate all subscriptions if that's what we're dealing with if (event.kind === DELETE) { this.notify() @@ -244,11 +213,31 @@ export class Repository extends Emitter implements IRead } } - _updateIndex(m: Map, k: K, e: E, duplicate?: E) { + isDeleted(event: TrustedEvent) { + const idDeletedAt = this.deletes.get(event.id) || 0 + + if (idDeletedAt > event.created_at) { + return true + } + + if (isReplaceable(event)) { + const addressDeletedAt = this.deletes.get(getAddress(event)) || 0 + + if (addressDeletedAt > event.created_at) { + return true + } + } + + return false + } + + // Utilities + + _updateIndex(m: Map, k: K, e: TrustedEvent, duplicate?: TrustedEvent) { let a = m.get(k) || [] if (duplicate) { - a = a.filter((x: E) => x !== duplicate) + a = a.filter((x: TrustedEvent) => x !== duplicate) } a.push(e) diff --git a/packages/util/Router.ts b/packages/util/Router.ts index 4fb876e..ff7f92f 100644 --- a/packages/util/Router.ts +++ b/packages/util/Router.ts @@ -2,7 +2,7 @@ import {first, splitAt, identity, sortBy, uniq, shuffle, pushToMapKey} from '@we import {Tags, Tag} from '@welshman/util' import type {TrustedEvent} from './Events' import {getAddress, isReplaceable} from './Events' -import {isShareableRelayUrl} from './Relays' +import {isShareableRelayUrl} from './Relay' import {addressFromEvent, decodeAddress, isCommunityAddress, isGroupAddress} from './Address' export enum RelayMode { diff --git a/packages/util/Tags.ts b/packages/util/Tags.ts index ffa2497..78ab87d 100644 --- a/packages/util/Tags.ts +++ b/packages/util/Tags.ts @@ -1,7 +1,7 @@ import {EventTemplate} from 'nostr-tools' import type {OmitStatics} from '@welshman/lib' import {Fluent, ensurePlural} from '@welshman/lib' -import {isShareableRelayUrl, normalizeRelayUrl} from './Relays' +import {isShareableRelayUrl, normalizeRelayUrl} from './Relay' import type {Address} from './Address' import {encodeAddress, decodeAddress} from './Address' import {GROUP, COMMUNITY} from './Kinds' diff --git a/packages/util/index.ts b/packages/util/index.ts index 8bb1533..232e42a 100644 --- a/packages/util/index.ts +++ b/packages/util/index.ts @@ -4,7 +4,6 @@ export * from './Filters' export * from './Kinds' export * from './Links' export * from './Relay' -export * from './Relays' export * from './Repository' export * from './Router' export * from './Tags' diff --git a/versions.sh b/versions.sh new file mode 100755 index 0000000..06d6a1c --- /dev/null +++ b/versions.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +for upstream in $(ls packages); do + version=$(sed -nr 's/ +"version": "(.+)",/\1/p' packages/$upstream/package.json) + + echo $upstream $version +done