Add filter method to repository

This commit is contained in:
Jon Staab
2024-05-07 14:45:10 -07:00
parent 199dbca32a
commit f75f24c2ec
5 changed files with 81 additions and 21 deletions
+66 -8
View File
@@ -1,6 +1,8 @@
import {throttle} from 'throttle-debounce'
import type {Readable, Subscriber, Invalidator} from '@welshman/lib'
import {Derived, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib'
import {matchFilter} from './Filters'
import {Derived, Emitter, writable, first, always, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib'
import {Kind} from './Kinds'
import {matchFilter, getIdFilters, matchFilters} from './Filters'
import {encodeAddress, addressFromEvent} from './Address'
import {isReplaceable} from './Events'
import type {Filter} from './Filters'
@@ -10,7 +12,11 @@ export const DAY = 86400
const getDay = (ts: number) => Math.floor(ts / DAY)
export class Repository<E extends Rumor> implements Readable<Repository<E>> {
export type RepositoryOptions = {
throttle?: number
}
export class Repository<E extends Rumor> extends Emitter implements Readable<Repository<E>> {
eventsById = new Map<string, E>()
eventsByAddress = new Map<string, E>()
eventsByTag = new Map<string, E[]>()
@@ -19,6 +25,14 @@ export class Repository<E extends Rumor> implements Readable<Repository<E>> {
deletes = new Map<string, number>()
subs: Subscriber<typeof this>[] = []
constructor(private options: RepositoryOptions) {
super()
if (options.throttle) {
this.notify = throttle(options.throttle, this.notify.bind(this))
}
}
// Methods for implementing store interface
get() {
@@ -41,10 +55,45 @@ export class Repository<E extends Rumor> implements Readable<Repository<E>> {
return new Derived<Repository<E>>(this, identity, t)
}
notify() {
filter(getFilters: () => Filter[]) {
const store = writable<E[]>([])
const onNotify = (event?: E) => {
const filters = getFilters()
if (!event || matchFilters(filters, event)) {
store.set(Array.from(this.query(filters)))
}
}
const subscribe = store.subscribe.bind(store)
store.subscribe = (f: Subscriber<E[]>) => {
if (store.subs.length === 0) {
this.on('notify', onNotify)
onNotify()
}
const unsubscribe = subscribe(f)
return () => {
unsubscribe()
if (store.subs.length === 0) {
this.off('notify', onNotify)
}
}
}
return store
}
notify(event?: E) {
for (const sub of this.subs) {
sub(this)
}
this.emit('notify', event)
}
// Load/dump
@@ -75,16 +124,20 @@ export class Repository<E extends Rumor> implements Readable<Repository<E>> {
: this.eventsById.get(idOrAddress)
}
watchEvent(idOrAddress: string) {
return this.filter(always(getIdFilters([idOrAddress]))).derived(first)
}
*query(filters: Filter[]) {
for (let filter of filters) {
let events: Iterable<E> = this.eventsById.values()
if (filter.ids) {
filter = omit(['ids'], filter)
events = filter.ids!.map(id => this.eventsById.get(id)).filter(identity) as E[]
filter = omit(['ids'], filter)
} else if (filter.authors) {
filter = omit(['authors'], filter)
events = uniq(filter.authors!.flatMap(pubkey => this.eventsByAuthor.get(pubkey) || []))
filter = omit(['authors'], filter)
} else if (filter.since || filter.until) {
const sinceDay = getDay(filter.since || 0)
const untilDay = getDay(filter.since || now())
@@ -172,7 +225,7 @@ export class Repository<E extends Rumor> implements Readable<Repository<E>> {
if (tag[0].length === 1) {
this._updateIndex(this.eventsByTag, tag.slice(0, 2).join(':'), event, duplicate)
if (event.kind === 5) {
if (event.kind === Kind.Delete) {
const id = tag[1]
const ts = Math.max(event.created_at, this.deletes.get(tag[1]) || 0)
@@ -182,7 +235,12 @@ export class Repository<E extends Rumor> implements Readable<Repository<E>> {
}
if (!this.isDeleted(event)) {
this.notify()
// Deletes are tricky, re-evaluate all subscriptions if that's what we're dealing with
if (event.kind === Kind.Delete) {
this.notify()
} else {
this.notify(event)
}
}
}