diff --git a/packages/lib/Store.ts b/packages/lib/Store.ts index 7a4f9e3..c6fec46 100644 --- a/packages/lib/Store.ts +++ b/packages/lib/Store.ts @@ -1,9 +1,10 @@ import {throttle} from "throttle-debounce" import {ensurePlural, identity} from "./Tools" -type Invalidator = (value?: T) => void +export type Invalidator = (value?: T) => void +export type Subscriber = (value: T) => void + type Derivable = Readable | Readable[] -type Subscriber = (value: T) => void type Unsubscriber = () => void type R = Record type M = Map diff --git a/packages/util/Relay.ts b/packages/util/Relay.ts index 0853094..51e9c69 100644 --- a/packages/util/Relay.ts +++ b/packages/util/Relay.ts @@ -1,4 +1,5 @@ -import {Emitter, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib' +import type {Readable, Subscriber, Invalidator} from '@welshman/lib' +import {Derived, Emitter, randomId, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib' import {matchFilters, matchFilter} from './Filters' import {encodeAddress, addressFromEvent} from './Address' import {isReplaceable} from './Events' @@ -17,8 +18,8 @@ export class Relay extends Emitter { eventsByTag = new Map() eventsByDay = new Map() eventsByAuthor = new Map() - subs = new Map() deletes = new Map() + subs = new Map() dump() { return Array.from(this.eventsById.values()) @@ -38,9 +39,9 @@ export class Relay extends Emitter { send(type: string, ...message: any[]) { switch(type) { - case 'EVENT': return this._onEVENT(message as [string]) - case 'CLOSE': return this._onCLOSE(message as [string]) - case 'REQ': return this._onREQ(message as [string, ...Filter[]]) + case 'EVENT': return this.handleEVENT(message as [string]) + case 'CLOSE': return this.handleCLOSE(message as [string]) + case 'REQ': return this.handleREQ(message as [string, ...Filter[]]) } } @@ -56,7 +57,7 @@ export class Relay extends Emitter { const duplicateById = this.eventsById.get(event.id) if (duplicateById) { - return + return false } const hasAddress = isReplaceable(event) @@ -64,13 +65,19 @@ export class Relay extends Emitter { const duplicateByAddress = hasAddress ? this.eventsByAddress.get(address) : undefined if (duplicateByAddress && duplicateByAddress.created_at >= event.created_at) { - return + return false } this._addEvent(event, duplicateByAddress) + + return true } - _onEVENT([json]: [string]) { + cursor(filters: Filter[]) { + return new RelayCursor(this, filters) + } + + handleEVENT([json]: [string]) { let event: E try { event = JSON.parse(json) @@ -78,10 +85,11 @@ export class Relay extends Emitter { return } - this.put(event) + const added = this.put(event) + this.emit('OK', event.id, true, "") - if (!this._isDeleted(event)) { + if (added && !this._isDeleted(event)) { for (const [subId, filters] of this.subs.entries()) { if (matchFilters(filters, event)) { this.emit('EVENT', subId, json) @@ -90,11 +98,11 @@ export class Relay extends Emitter { } } - _onCLOSE([subId]: [string]) { + handleCLOSE([subId]: [string]) { this.subs.delete(subId) } - _onREQ([subId, ...filters]: [string, ...Filter[]]) { + handleREQ([subId, ...filters]: [string, ...Filter[]]) { this.subs.set(subId, filters) const result = new Set() @@ -197,3 +205,51 @@ export class Relay extends Emitter { m.set(k, a) } } + +type Sub = (events: E[]) => void + +export class RelayCursor implements Readable { + subId = randomId() + subs: Sub[] = [] + events: E[] = [] + + constructor(readonly relay: Relay, readonly filters: Filter[]) { + this.relay.on('EVENT', (e: E) => { + this.events.push(e) + this.notify() + }) + + this.relay.handleREQ([this.subId, ...this.filters]) + } + + get() { + return this.events + } + + subscribe(f: Subscriber, invalidate?: Invalidator) { + this.subs.push(f) + + return () => { + this.subs = this.subs.filter(sub => sub !== f) + } + } + + derived(f: (v: E[]) => U): Derived { + return new Derived(this, f) + } + + throttle(t: number): Derived { + return new Derived(this, identity, t) + } + + notify() { + for (const sub of this.subs) { + sub(this.events) + } + } + + close() { + this.relay.handleCLOSE([this.subId]) + this.subs = [] + } +}