From 2a2b6b8fecf45bd6b8056d00e34aa4e0b522cd12 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Wed, 15 May 2024 09:15:46 -0700 Subject: [PATCH] Work some bugs out of relays/loaders/repository --- packages/feeds/loader.ts | 31 +++++----- packages/net/Subscribe.ts | 1 - packages/net/target/Local.ts | 8 +-- packages/util/Relay.ts | 28 +++++---- packages/util/Repository.ts | 110 +++++++++++++++++++++-------------- packages/util/Router.ts | 4 +- 6 files changed, 106 insertions(+), 76 deletions(-) diff --git a/packages/feeds/loader.ts b/packages/feeds/loader.ts index a33085e..f7cdfee 100644 --- a/packages/feeds/loader.ts +++ b/packages/feeds/loader.ts @@ -89,35 +89,38 @@ export class FeedLoader { const requestFilters = filters! // Remove filters that don't fit our window .filter((filter: Filter) => { - const filterSince = filter.since || EPOCH - const filterUntil = filter.until || now() + const filterSince = filter.since || minSince + const filterUntil = filter.until || maxUntil return filterSince < until && filterUntil > since }) // Modify the filters to define our window .map((filter: Filter) => ({...filter, until, limit, since})) + if (requestFilters.length === 0) { + return onExhausted?.() + } + let count = 0 - if (requestFilters.length > 0) { - await this.options.request({ - relays, - filters: requestFilters, - onEvent: (event: E) => { - count += 1 - until = Math.min(until, event.created_at) - onEvent?.(event) - }, - }) - } + await this.options.request({ + relays, + filters: requestFilters, + onEvent: (event: E) => { + count += 1 + until = Math.min(until, event.created_at) + onEvent?.(event) + }, + }) // Relays can't be relied upon to return events in descending order, do exponential // windowing to ensure we get the most recent stuff on first load, but eventually find it all if (count === 0) { delta *= 10 + until = since } - since = Math.max(minSince, since - delta) + since = Math.max(minSince, until - delta) if (since === minSince) { onExhausted?.() diff --git a/packages/net/Subscribe.ts b/packages/net/Subscribe.ts index 0d188ce..01c1df8 100644 --- a/packages/net/Subscribe.ts +++ b/packages/net/Subscribe.ts @@ -240,7 +240,6 @@ export const executeSubscription = (sub: Subscription) => { // Listen for abort via caller signal signal?.addEventListener('abort', complete) - signal?.addEventListener('abort', () => console.log('aborted')) // Listen for abort via our own internal signal controller.signal.addEventListener('abort', complete) diff --git a/packages/net/target/Local.ts b/packages/net/target/Local.ts index cb583b1..9dfb4fa 100644 --- a/packages/net/target/Local.ts +++ b/packages/net/target/Local.ts @@ -1,14 +1,12 @@ import {Emitter} from '@welshman/lib' -import {Repository, LOCAL_RELAY_URL, Relay} from '@welshman/util' +import {Relay, LOCAL_RELAY_URL} from '@welshman/util' import type {Message} from '../Socket' export class Local extends Emitter { - relay: Relay - constructor(readonly repository: Repository) { + constructor(readonly relay: Relay) { super() - this.relay = new Relay(repository) - this.relay.on('*', this.onMessage) + relay.on('*', this.onMessage) } get connections() { diff --git a/packages/util/Relay.ts b/packages/util/Relay.ts index 04b94ea..2d03d0e 100644 --- a/packages/util/Relay.ts +++ b/packages/util/Relay.ts @@ -1,4 +1,4 @@ -import {Emitter, normalizeUrl, stripProtocol} from '@welshman/lib' +import {Emitter, normalizeUrl, sleep, stripProtocol} from '@welshman/lib' import {matchFilters} from './Filters' import type {Repository} from './Repository' import type {Filter} from './Filters' @@ -64,15 +64,18 @@ export class Relay extends Emitter { handleEVENT([event]: [TrustedEvent]) { this.repository.publish(event) - this.emit('OK', event.id, true, "") + // Callers generally expect async relays + sleep(1).then(() => { + this.emit('OK', event.id, true, "") - if (!this.repository.isDeleted(event)) { - for (const [subId, filters] of this.subs.entries()) { - if (matchFilters(filters, event)) { - this.emit('EVENT', subId, event) + if (!this.repository.isDeleted(event)) { + for (const [subId, filters] of this.subs.entries()) { + if (matchFilters(filters, event)) { + this.emit('EVENT', subId, event) + } } } - } + }) } handleCLOSE([subId]: [string]) { @@ -82,10 +85,13 @@ export class Relay extends Emitter { handleREQ([subId, ...filters]: [string, ...Filter[]]) { this.subs.set(subId, filters) - for (const event of this.repository.query(filters)) { - this.emit('EVENT', subId, event) - } + // Callers generally expect async relays + sleep(1).then(() => { + for (const event of this.repository.query(filters)) { + this.emit('EVENT', subId, event) + } - this.emit('EOSE', subId) + this.emit('EOSE', subId) + }) } } diff --git a/packages/util/Repository.ts b/packages/util/Repository.ts index e872c9e..49c4223 100644 --- a/packages/util/Repository.ts +++ b/packages/util/Repository.ts @@ -2,7 +2,7 @@ import {throttle} from 'throttle-debounce' import type {IReadable, Subscriber, Invalidator} from '@welshman/lib' import {Derived, Emitter, sortBy, customStore, inc, first, always, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib' import {DELETE} from './Kinds' -import {matchFilter, getIdFilters, matchFilters} from './Filters' +import {EPOCH, matchFilter, getIdFilters, matchFilters} from './Filters' import {isReplaceable, isTrustedEvent, getAddress} from './Events' import type {Filter} from './Filters' import type {TrustedEvent} from './Events' @@ -11,6 +11,8 @@ export const DAY = 86400 const getDay = (ts: number) => Math.floor(ts / DAY) +const maybeThrottle = (t: number | undefined, f: () => void) => t ? throttle(t, f) : f + export type RepositoryOptions = { throttle?: number } @@ -26,10 +28,6 @@ export class Repository extends Emitter implements IReadable { constructor(private options: RepositoryOptions) { super() - - if (options.throttle) { - this.notify = throttle(options.throttle, this.notify.bind(this)) - } } // Methods for implementing store interface @@ -41,15 +39,13 @@ export class Repository extends Emitter implements IReadable { async set(events: TrustedEvent[], chunkSize = 1000) { for (const eventsChunk of chunk(chunkSize, events)) { for (const event of eventsChunk) { - this.publish(event, {notify: false}) + this.publish(event) } if (eventsChunk.length === chunkSize) { await sleep(1) } } - - this.notify() } @@ -75,27 +71,43 @@ export class Repository extends Emitter implements IReadable { return customStore({ get: getValue, start: setValue => { - const onNotify = (event?: TrustedEvent) => { - if (!event || matchFilters(getFilters(), event)) { + const onEvent = (event: TrustedEvent) => { + if (matchFilters(getFilters(), event)) { setValue(getValue()) } } - this.on('notify', onNotify) + const onDelete = () => { + setValue(getValue()) + } - return () => this.off('notify', onNotify) + this.on('event', onEvent) + this.on('delete', onDelete) + + return () => { + this.off('event', onEvent) + this.off('delete', onDelete) + } }, }) } - notify(event?: TrustedEvent) { + notifyUpdate = maybeThrottle(this.options.throttle, () => { const events = this.get() for (const sub of this.subs) { sub(events) } - this.emit('notify', event) + this.emit('update') + }) + + notifyEvent = (event: TrustedEvent) => { + this.emit('event', event) + } + + notifyDelete = (event: TrustedEvent) => { + this.emit('delete', event) } // API @@ -106,6 +118,15 @@ export class Repository extends Emitter implements IReadable { : this.eventsById.get(idOrAddress) } + hasEvent(event: TrustedEvent) { + const duplicate = ( + this.eventsById.get(event.id) || + this.eventsByAddress.get(getAddress(event)) + ) + + return duplicate && duplicate.created_at >= event.created_at + } + watchEvent(idOrAddress: string) { return this.filter(always(getIdFilters([idOrAddress]))).derived(first) } @@ -121,7 +142,7 @@ export class Repository extends Emitter implements IReadable { events = uniq(filter.authors!.flatMap(pubkey => this.eventsByAuthor.get(pubkey) || [])) filter = omit(['authors'], filter) } else if (filter.since || filter.until) { - const sinceDay = getDay(filter.since || 0) + const sinceDay = getDay(filter.since || EPOCH) const untilDay = getDay(filter.until || now()) events = uniq( @@ -158,7 +179,7 @@ export class Repository extends Emitter implements IReadable { } } - publish(event: TrustedEvent, {notify = false} = {}) { + publish(event: TrustedEvent) { if (!isTrustedEvent(event)) { throw new Error("Invalid event published to Repository", event) } @@ -170,45 +191,48 @@ export class Repository extends Emitter implements IReadable { ) // If our duplicate is newer than the event we're adding, we're done - if (!duplicate || duplicate.created_at < event.created_at) { - this.eventsById.set(event.id, event) + if (duplicate && duplicate.created_at >= event.created_at) { + return + } - if (isReplaceable(event)) { - this.eventsByAddress.set(address, event) + this.eventsById.set(event.id, event) + + if (isReplaceable(event)) { + this.eventsByAddress.set(address, event) + } + + if (duplicate) { + this.eventsById.delete(duplicate.id) + + if (isReplaceable(duplicate)) { + this.eventsByAddress.delete(address) } + } - if (duplicate) { - this.eventsById.delete(duplicate.id) + this._updateIndex(this.eventsByDay, getDay(event.created_at), event, duplicate) + this._updateIndex(this.eventsByAuthor, event.pubkey, event, duplicate) - if (isReplaceable(duplicate)) { - this.eventsByAddress.delete(address) - } - } + // Store our event by tags + for (const tag of event.tags) { + if (tag[0].length === 1) { + this._updateIndex(this.eventsByTag, tag.slice(0, 2).join(':'), event, duplicate) - this._updateIndex(this.eventsByDay, getDay(event.created_at), event, duplicate) - this._updateIndex(this.eventsByAuthor, event.pubkey, event, duplicate) + if (event.kind === DELETE) { + const id = tag[1] + const ts = Math.max(event.created_at, this.deletes.get(tag[1]) || 0) - // Store our event by tags - for (const tag of event.tags) { - if (tag[0].length === 1) { - this._updateIndex(this.eventsByTag, tag.slice(0, 2).join(':'), event, duplicate) - - if (event.kind === DELETE) { - const id = tag[1] - const ts = Math.max(event.created_at, this.deletes.get(tag[1]) || 0) - - this.deletes.set(id, ts) - } + this.deletes.set(id, ts) } } } - if (notify && !this.isDeleted(event)) { + if (!this.isDeleted(event)) { + this.notifyUpdate() + this.notifyEvent(event) + // Deletes are tricky, re-evaluate all subscriptions if that's what we're dealing with if (event.kind === DELETE) { - this.notify() - } else { - this.notify(event) + this.notifyDelete(event) } } } diff --git a/packages/util/Router.ts b/packages/util/Router.ts index ff7f92f..eef4213 100644 --- a/packages/util/Router.ts +++ b/packages/util/Router.ts @@ -268,9 +268,9 @@ export class Router { // Fallback policies - addNoFallbacks = (count: number, redundancy: number) => count + addNoFallbacks = (count: number, redundancy: number) => 0 - addMinimalFallbacks = (count: number, redundancy: number) => Math.max(count, 1) + addMinimalFallbacks = (count: number, redundancy: number) => count > 0 ? 0 : 1 addMaximalFallbacks = (count: number, redundancy: number) => redundancy - count