diff --git a/packages/lib/Tools.ts b/packages/lib/Tools.ts index 18d2f56..d278fb6 100644 --- a/packages/lib/Tools.ts +++ b/packages/lib/Tools.ts @@ -130,6 +130,18 @@ export const ensurePlural = (x: T | T[]) => (x instanceof Array ? x : [x]) export const ensureNumber = (x: number | string) => parseFloat(x as string) +export const fromPairs = (pairs: [k?: string, v?: T, ...args: unknown[]][]) => { + const r: Record = {} + + for (const [k, v] of pairs) { + if (k && v) { + r[k] = v + } + } + + return r +} + export const flatten = (xs: T[][]) => xs.flatMap(identity) export const partition = (f: (x: T) => boolean, xs: T[]) => { diff --git a/packages/util/Relay.ts b/packages/util/Relay.ts index 51e9c69..2991c5a 100644 --- a/packages/util/Relay.ts +++ b/packages/util/Relay.ts @@ -1,40 +1,14 @@ -import type {Readable, Subscriber, Invalidator} from '@welshman/lib' -import {Derived, Emitter, randomId, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib' -import {matchFilters, matchFilter} from './Filters' -import {encodeAddress, addressFromEvent} from './Address' -import {isReplaceable} from './Events' +import {Emitter} from '@welshman/lib' +import {matchFilters} from './Filters' +import type {Repository} from './Repository' import type {Filter} from './Filters' import type {Rumor} from './Events' -export const DAY = 86400 - -const getDay = (ts: number) => Math.floor(ts / DAY) - -export type Message = [string, ...any[]] - export class Relay extends Emitter { - eventsById = new Map() - eventsByAddress = new Map() - eventsByTag = new Map() - eventsByDay = new Map() - eventsByAuthor = new Map() - deletes = new Map() subs = new Map() - dump() { - return Array.from(this.eventsById.values()) - } - - async load(events: E[], chunkSize = 1000) { - for (const eventsChunk of chunk(chunkSize, events)) { - for (const event of eventsChunk) { - this._addEvent(event) - } - - if (eventsChunk.length === chunkSize) { - await sleep(1) - } - } + constructor(readonly repository: Repository) { + super() } send(type: string, ...message: any[]) { @@ -45,38 +19,6 @@ export class Relay extends Emitter { } } - has(id: string) { - return this.eventsById.has(id) - } - - get(id: string) { - return this.eventsById.get(id) - } - - put(event: E) { - const duplicateById = this.eventsById.get(event.id) - - if (duplicateById) { - return false - } - - const hasAddress = isReplaceable(event) - const address = encodeAddress(addressFromEvent(event)) - const duplicateByAddress = hasAddress ? this.eventsByAddress.get(address) : undefined - - if (duplicateByAddress && duplicateByAddress.created_at >= event.created_at) { - return false - } - - this._addEvent(event, duplicateByAddress) - - return true - } - - cursor(filters: Filter[]) { - return new RelayCursor(this, filters) - } - handleEVENT([json]: [string]) { let event: E try { @@ -85,11 +27,13 @@ export class Relay extends Emitter { return } - const added = this.put(event) + this.repository.publish(event) this.emit('OK', event.id, true, "") - if (added && !this._isDeleted(event)) { + if (!this.repository.isDeleted(event)) { + const json = JSON.stringify(event) + for (const [subId, filters] of this.subs.entries()) { if (matchFilters(filters, event)) { this.emit('EVENT', subId, json) @@ -105,151 +49,10 @@ export class Relay extends Emitter { handleREQ([subId, ...filters]: [string, ...Filter[]]) { this.subs.set(subId, filters) - const result = new Set() - - for (let filter of filters) { - let events: Iterable = this.eventsById.values() - - if (filter.ids) { - filter = omit(['ids'], filter) - events = filter.ids!.map(id => this.eventsById.get(id)).filter(identity) as E[] - } else if (filter.authors) { - filter = omit(['authors'], filter) - events = uniq(filter.authors!.flatMap(pubkey => this.eventsByAuthor.get(pubkey) || [])) - } else if (filter.since || filter.until) { - const sinceDay = getDay(filter.since || 0) - const untilDay = getDay(filter.since || now()) - - filter = omit(['since', 'until'], filter) - events = uniq( - Array.from(range(sinceDay, untilDay)) - .flatMap((day: number) => this.eventsByDay.get(day) || []) - ) - } else { - for (const [k, values] of Object.entries(filter)) { - if (!k.startsWith('#') || k.length !== 2) { - continue - } - - filter = omit([k], filter) - events = uniq( - (values as string[]).flatMap(v => this.eventsByTag.get(`${k[1]}:${v}`) || []) - ) - - break - } - } - - for (const event of events) { - if (!this._isDeleted(event) && matchFilter(filter, event)) { - result.add(event) - } - } - } - - for (const event of result) { + for (const event of this.repository.query(filters)) { this.emit('EVENT', subId, JSON.stringify(event)) } this.emit('EOSE', subId) } - - _isDeleted(event: E) { - const idDeletedAt = this.deletes.get(event.id) || 0 - - if (idDeletedAt > event.created_at) { - return true - } - - if (isReplaceable(event)) { - const address = encodeAddress(addressFromEvent(event)) - const addressDeletedAt = this.deletes.get(address) || 0 - - if (addressDeletedAt > event.created_at) { - return true - } - } - - return false - } - - _addEvent(event: E, duplicate?: E) { - this.eventsById.set(event.id, event) - - if (isReplaceable(event)) { - this.eventsByAddress.set(encodeAddress(addressFromEvent(event)), event) - } - - this._updateIndex(this.eventsByDay, getDay(event.created_at), event, duplicate) - this._updateIndex(this.eventsByAuthor, event.pubkey, event, duplicate) - - for (const tag of event.tags) { - if (tag[0].length === 1) { - this._updateIndex(this.eventsByTag, tag.slice(0, 2).join(':'), event, duplicate) - - if (event.kind === 5) { - this.deletes.set(tag[1], Math.max(event.created_at, this.deletes.get(tag[1]) || 0)) - } - } - } - } - - _updateIndex(m: Map, k: K, e: E, duplicate?: E) { - let a = m.get(k) || [] - - if (duplicate) { - a = a.filter((x: E) => x !== duplicate) - } - - a.push(e) - m.set(k, a) - } -} - -type Sub = (events: E[]) => void - -export class RelayCursor implements Readable { - subId = randomId() - subs: Sub[] = [] - events: E[] = [] - - constructor(readonly relay: Relay, readonly filters: Filter[]) { - this.relay.on('EVENT', (e: E) => { - this.events.push(e) - this.notify() - }) - - this.relay.handleREQ([this.subId, ...this.filters]) - } - - get() { - return this.events - } - - subscribe(f: Subscriber, invalidate?: Invalidator) { - this.subs.push(f) - - return () => { - this.subs = this.subs.filter(sub => sub !== f) - } - } - - derived(f: (v: E[]) => U): Derived { - return new Derived(this, f) - } - - throttle(t: number): Derived { - return new Derived(this, identity, t) - } - - notify() { - for (const sub of this.subs) { - sub(this.events) - } - } - - close() { - this.relay.handleCLOSE([this.subId]) - this.subs = [] - } } diff --git a/packages/util/Repository.ts b/packages/util/Repository.ts new file mode 100644 index 0000000..56893f7 --- /dev/null +++ b/packages/util/Repository.ts @@ -0,0 +1,199 @@ +import type {Readable, Subscriber, Invalidator} from '@welshman/lib' +import {Derived, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib' +import {matchFilter} from './Filters' +import {encodeAddress, addressFromEvent} from './Address' +import {isReplaceable} from './Events' +import type {Filter} from './Filters' +import type {Rumor} from './Events' + +export const DAY = 86400 + +const getDay = (ts: number) => Math.floor(ts / DAY) + +export class Repository implements Readable> { + eventsById = new Map() + eventsByAddress = new Map() + eventsByTag = new Map() + eventsByDay = new Map() + eventsByAuthor = new Map() + deletes = new Map() + subs: Subscriber[] = [] + + // Methods for implementing store interface + + get() { + return this + } + + subscribe(f: Subscriber>, invalidate?: Invalidator>) { + this.subs.push(f) + + return () => { + this.subs = this.subs.filter(sub => sub !== f) + } + } + + derived(f: (v: Repository) => U): Derived { + return new Derived(this, f) + } + + throttle(t: number): Derived> { + return new Derived>(this, identity, t) + } + + notify() { + for (const sub of this.subs) { + sub(this) + } + } + + // Load/dump + + dump() { + return Array.from(this.eventsById.values()) + } + + async load(events: E[], chunkSize = 1000) { + for (const eventsChunk of chunk(chunkSize, events)) { + for (const event of eventsChunk) { + this._addEvent(event) + } + + if (eventsChunk.length === chunkSize) { + await sleep(1) + } + } + + this.notify() + } + + // API + + getEvent(idOrAddress: string) { + return idOrAddress.includes(':') + ? this.eventsByAddress.get(idOrAddress) + : this.eventsById.get(idOrAddress) + } + + *query(filters: Filter[]) { + for (let filter of filters) { + let events: Iterable = this.eventsById.values() + + if (filter.ids) { + filter = omit(['ids'], filter) + events = filter.ids!.map(id => this.eventsById.get(id)).filter(identity) as E[] + } else if (filter.authors) { + filter = omit(['authors'], filter) + events = uniq(filter.authors!.flatMap(pubkey => this.eventsByAuthor.get(pubkey) || [])) + } else if (filter.since || filter.until) { + const sinceDay = getDay(filter.since || 0) + const untilDay = getDay(filter.since || now()) + + filter = omit(['since', 'until'], filter) + events = uniq( + Array.from(range(sinceDay, untilDay)) + .flatMap((day: number) => this.eventsByDay.get(day) || []) + ) + } else { + for (const [k, values] of Object.entries(filter)) { + if (!k.startsWith('#') || k.length !== 2) { + continue + } + + filter = omit([k], filter) + events = uniq( + (values as string[]).flatMap(v => this.eventsByTag.get(`${k[1]}:${v}`) || []) + ) + + break + } + } + + for (const event of events) { + if (!this.isDeleted(event) && matchFilter(filter, event)) { + yield event + } + } + } + } + + publish(event: E) { + const duplicateById = this.eventsById.get(event.id) + + if (duplicateById) { + return false + } + + const hasAddress = isReplaceable(event) + const address = encodeAddress(addressFromEvent(event)) + const duplicateByAddress = hasAddress ? this.eventsByAddress.get(address) : undefined + + if (duplicateByAddress && duplicateByAddress.created_at >= event.created_at) { + return false + } + + this._addEvent(event, duplicateByAddress) + + return true + } + + isDeleted(event: E) { + const idDeletedAt = this.deletes.get(event.id) || 0 + + if (idDeletedAt > event.created_at) { + return true + } + + if (isReplaceable(event)) { + const address = encodeAddress(addressFromEvent(event)) + const addressDeletedAt = this.deletes.get(address) || 0 + + if (addressDeletedAt > event.created_at) { + return true + } + } + + return false + } + + // Implementation + + _addEvent(event: E, duplicate?: E) { + this.eventsById.set(event.id, event) + + if (isReplaceable(event)) { + this.eventsByAddress.set(encodeAddress(addressFromEvent(event)), event) + } + + this._updateIndex(this.eventsByDay, getDay(event.created_at), event, duplicate) + this._updateIndex(this.eventsByAuthor, event.pubkey, event, duplicate) + + for (const tag of event.tags) { + if (tag[0].length === 1) { + this._updateIndex(this.eventsByTag, tag.slice(0, 2).join(':'), event, duplicate) + + if (event.kind === 5) { + const id = tag[1] + const ts = Math.max(event.created_at, this.deletes.get(tag[1]) || 0) + + this.deletes.set(id, ts) + } + } + } + + if (!this.isDeleted(event)) { + this.notify() + } + } + + _updateIndex(m: Map, k: K, e: E, duplicate?: E) { + let a = m.get(k) || [] + + if (duplicate) { + a = a.filter((x: E) => x !== duplicate) + } + + a.push(e) + m.set(k, a) + } +} diff --git a/packages/util/index.ts b/packages/util/index.ts index 759f806..8bb1533 100644 --- a/packages/util/index.ts +++ b/packages/util/index.ts @@ -5,6 +5,7 @@ export * from './Kinds' export * from './Links' export * from './Relay' export * from './Relays' +export * from './Repository' export * from './Router' export * from './Tags' export * from './Zaps'