Add RelayCursor

This commit is contained in:
Jon Staab
2024-05-03 14:58:23 -07:00
parent 0dc2690e86
commit 4a1d61dd99
2 changed files with 71 additions and 14 deletions
+68 -12
View File
@@ -1,4 +1,5 @@
import {Emitter, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib'
import type {Readable, Subscriber, Invalidator} from '@welshman/lib'
import {Derived, Emitter, randomId, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib'
import {matchFilters, matchFilter} from './Filters'
import {encodeAddress, addressFromEvent} from './Address'
import {isReplaceable} from './Events'
@@ -17,8 +18,8 @@ export class Relay<E extends Rumor> extends Emitter {
eventsByTag = new Map<string, E[]>()
eventsByDay = new Map<number, E[]>()
eventsByAuthor = new Map<string, E[]>()
subs = new Map<string, Filter[]>()
deletes = new Map<string, number>()
subs = new Map<string, Filter[]>()
dump() {
return Array.from(this.eventsById.values())
@@ -38,9 +39,9 @@ export class Relay<E extends Rumor> extends Emitter {
send(type: string, ...message: any[]) {
switch(type) {
case 'EVENT': return this._onEVENT(message as [string])
case 'CLOSE': return this._onCLOSE(message as [string])
case 'REQ': return this._onREQ(message as [string, ...Filter[]])
case 'EVENT': return this.handleEVENT(message as [string])
case 'CLOSE': return this.handleCLOSE(message as [string])
case 'REQ': return this.handleREQ(message as [string, ...Filter[]])
}
}
@@ -56,7 +57,7 @@ export class Relay<E extends Rumor> extends Emitter {
const duplicateById = this.eventsById.get(event.id)
if (duplicateById) {
return
return false
}
const hasAddress = isReplaceable(event)
@@ -64,13 +65,19 @@ export class Relay<E extends Rumor> extends Emitter {
const duplicateByAddress = hasAddress ? this.eventsByAddress.get(address) : undefined
if (duplicateByAddress && duplicateByAddress.created_at >= event.created_at) {
return
return false
}
this._addEvent(event, duplicateByAddress)
return true
}
_onEVENT([json]: [string]) {
cursor(filters: Filter[]) {
return new RelayCursor(this, filters)
}
handleEVENT([json]: [string]) {
let event: E
try {
event = JSON.parse(json)
@@ -78,10 +85,11 @@ export class Relay<E extends Rumor> extends Emitter {
return
}
this.put(event)
const added = this.put(event)
this.emit('OK', event.id, true, "")
if (!this._isDeleted(event)) {
if (added && !this._isDeleted(event)) {
for (const [subId, filters] of this.subs.entries()) {
if (matchFilters(filters, event)) {
this.emit('EVENT', subId, json)
@@ -90,11 +98,11 @@ export class Relay<E extends Rumor> extends Emitter {
}
}
_onCLOSE([subId]: [string]) {
handleCLOSE([subId]: [string]) {
this.subs.delete(subId)
}
_onREQ([subId, ...filters]: [string, ...Filter[]]) {
handleREQ([subId, ...filters]: [string, ...Filter[]]) {
this.subs.set(subId, filters)
const result = new Set()
@@ -197,3 +205,51 @@ export class Relay<E extends Rumor> extends Emitter {
m.set(k, a)
}
}
type Sub<E> = (events: E[]) => void
export class RelayCursor<E extends Rumor> implements Readable<E[]> {
subId = randomId()
subs: Sub<E>[] = []
events: E[] = []
constructor(readonly relay: Relay<E>, readonly filters: Filter[]) {
this.relay.on('EVENT', (e: E) => {
this.events.push(e)
this.notify()
})
this.relay.handleREQ([this.subId, ...this.filters])
}
get() {
return this.events
}
subscribe(f: Subscriber<E[]>, invalidate?: Invalidator<E[]>) {
this.subs.push(f)
return () => {
this.subs = this.subs.filter(sub => sub !== f)
}
}
derived<U>(f: (v: E[]) => U): Derived<U> {
return new Derived<U>(this, f)
}
throttle(t: number): Derived<E[]> {
return new Derived<E[]>(this, identity, t)
}
notify() {
for (const sub of this.subs) {
sub(this.events)
}
}
close() {
this.relay.handleCLOSE([this.subId])
this.subs = []
}
}