Fix tags again, tweak feeds and stores

This commit is contained in:
Jon Staab
2024-05-09 16:00:09 -07:00
parent 9f70d8643e
commit fddee3c6ee
7 changed files with 66 additions and 46 deletions
+20 -12
View File
@@ -76,15 +76,13 @@ export class FeedCompiler<E extends Rumor> {
_compileCreatedAt(items: CreatedAtItem[]) { _compileCreatedAt(items: CreatedAtItem[]) {
const filters = items const filters = items
.map(({since, until, relative}) => { .map(({since, until, relative = []}) => {
if (relative) { if (since && relative.includes("since")) {
if (typeof since === 'number') { since = now() - since
since = now() - since }
}
if (typeof until === 'number') { if (until && relative.includes("until")) {
until = now() - until until = now() - until
}
} }
if (since && until) return {since, until} if (since && until) return {since, until}
@@ -224,7 +222,7 @@ export class FeedCompiler<E extends Rumor> {
} }
async _compileLists(listItems: ListItem[]): Promise<RequestItem[]> { async _compileLists(listItems: ListItem[]): Promise<RequestItem[]> {
const addresses = uniq(listItems.map(({address}) => address)) const addresses = uniq(listItems.flatMap(({addresses}) => addresses))
const eventsByAddress = new Map<string, E>() const eventsByAddress = new Map<string, E>()
await this.options.request({ await this.options.request({
@@ -234,10 +232,20 @@ export class FeedCompiler<E extends Rumor> {
const feeds = flatten( const feeds = flatten(
await Promise.all( await Promise.all(
listItems.map(({address, mappings}) => { listItems.map(({addresses, mappings}) => {
const event = eventsByAddress.get(address) const feeds: Feed[] = []
return event ? feedsFromTags(Tags.fromEvent(event), mappings) : [] for (const address of addresses) {
const event = eventsByAddress.get(address)
if (event) {
for (const feed of feedsFromTags(Tags.fromEvent(event), mappings)) {
feeds.push(feed)
}
}
}
return feeds
}) })
) )
) )
+3 -3
View File
@@ -44,8 +44,8 @@ export type DVMItem = {
} }
export type ListItem = { export type ListItem = {
address: string, addresses: string,
mappings: TagFeedMapping[], mappings?: TagFeedMapping[],
} }
export type WOTItem = { export type WOTItem = {
@@ -56,7 +56,7 @@ export type WOTItem = {
export type CreatedAtItem = { export type CreatedAtItem = {
since?: number, since?: number,
until?: number, until?: number,
relative?: boolean, relative?: string[],
} }
export type AddressFeed = [type: FeedType.Address, ...addresses: string[]] export type AddressFeed = [type: FeedType.Address, ...addresses: string[]]
+24 -20
View File
@@ -4,19 +4,23 @@ import {ensurePlural, identity} from "./Tools"
export type Invalidator<T> = (value?: T) => void export type Invalidator<T> = (value?: T) => void
export type Subscriber<T> = (value: T) => void export type Subscriber<T> = (value: T) => void
type Derivable = Readable<any> | Readable<any>[] type Derivable = IReadable<any> | IReadable<any>[]
type Unsubscriber = () => void type Unsubscriber = () => void
type R = Record<string, any> type R = Record<string, any>
type M<T> = Map<string, T> type M<T> = Map<string, T>
export interface Readable<T> { export interface IReadable<T> {
get: () => T get: () => T
subscribe(this: void, run: Subscriber<T>, invalidate?: Invalidator<T>): Unsubscriber subscribe(this: void, run: Subscriber<T>, invalidate?: Invalidator<T>): Unsubscriber
derived: <U>(f: (v: T) => U) => Readable<U> derived: <U>(f: (v: T) => U) => IReadable<U>
throttle(t: number): Readable<T> throttle(t: number): IReadable<T>
} }
export class Writable<T> implements Readable<T> { export interface IWritable<T> extends IReadable<T> {
set: (xs: T) => void
}
export class Writable<T> implements IWritable<T> {
value: T value: T
subs: Subscriber<T>[] = [] subs: Subscriber<T>[] = []
@@ -70,7 +74,7 @@ export class Writable<T> implements Readable<T> {
} }
} }
export class Derived<T> implements Readable<T> { export class Derived<T> implements IReadable<T> {
callerSubs: Subscriber<T>[] = [] callerSubs: Subscriber<T>[] = []
mySubs: Unsubscriber[] = [] mySubs: Unsubscriber[] = []
stores: Derivable stores: Derivable
@@ -130,20 +134,20 @@ export class Derived<T> implements Readable<T> {
} }
} }
derived<U>(f: (v: T) => U): Readable<U> { derived<U>(f: (v: T) => U): IReadable<U> {
return new Derived(this, f) as Readable<U> return new Derived(this, f) as IReadable<U>
} }
throttle = (t: number): Readable<T> => { throttle = (t: number): IReadable<T> => {
return new Derived<T>(this, identity, t) return new Derived<T>(this, identity, t)
} }
} }
export class Key<T extends R> implements Readable<T> { export class Key<T extends R> implements IReadable<T> {
readonly pk: string readonly pk: string
readonly key: string readonly key: string
base: Writable<M<T>> base: Writable<M<T>>
store: Readable<T> store: IReadable<T>
constructor(base: Writable<M<T>>, pk: string, key: string) { constructor(base: Writable<M<T>>, pk: string, key: string) {
if (!(base.get() instanceof Map)) { if (!(base.get() instanceof Map)) {
@@ -202,13 +206,13 @@ export class Key<T extends R> implements Readable<T> {
} }
} }
export class DerivedKey<T extends R> implements Readable<T> { export class DerivedKey<T extends R> implements IReadable<T> {
readonly pk: string readonly pk: string
readonly key: string readonly key: string
base: Readable<M<T>> base: IReadable<M<T>>
store: Readable<T> store: IReadable<T>
constructor(base: Readable<M<T>>, pk: string, key: string) { constructor(base: IReadable<M<T>>, pk: string, key: string) {
if (!(base.get() instanceof Map)) { if (!(base.get() instanceof Map)) {
throw new Error("`key` can only be used on map collections") throw new Error("`key` can only be used on map collections")
} }
@@ -230,10 +234,10 @@ export class DerivedKey<T extends R> implements Readable<T> {
exists = () => this.base.get().has(this.key) exists = () => this.base.get().has(this.key)
} }
export class Collection<T extends R> implements Readable<T[]> { export class Collection<T extends R> implements IReadable<T[]> {
readonly pk: string readonly pk: string
readonly mapStore: Writable<M<T>> readonly mapStore: Writable<M<T>>
readonly listStore: Readable<T[]> readonly listStore: IReadable<T[]>
constructor(pk: string, t?: number) { constructor(pk: string, t?: number) {
this.pk = pk this.pk = pk
@@ -284,9 +288,9 @@ export class Collection<T extends R> implements Readable<T[]> {
map = (f: (v: T) => T) => this.update((xs: T[]) => xs.map(f)) map = (f: (v: T) => T) => this.update((xs: T[]) => xs.map(f))
} }
export class DerivedCollection<T extends R> implements Readable<T[]> { export class DerivedCollection<T extends R> implements IReadable<T[]> {
readonly listStore: Derived<T[]> readonly listStore: Derived<T[]>
readonly mapStore: Readable<M<T>> readonly mapStore: IReadable<M<T>>
constructor( constructor(
readonly pk: string, readonly pk: string,
@@ -316,7 +320,7 @@ export const writable = <T>(v: T) => new Writable(v)
export const derived = <T>(stores: Derivable, getValue: (values: any) => T) => export const derived = <T>(stores: Derivable, getValue: (values: any) => T) =>
new Derived(stores, getValue) new Derived(stores, getValue)
export const readable = <T>(v: T) => derived(new Writable(v), identity) as Readable<T> export const readable = <T>(v: T) => derived(new Writable(v), identity) as IReadable<T>
export const derivedCollection = <T extends R>( export const derivedCollection = <T extends R>(
pk: string, pk: string,
+3 -3
View File
@@ -46,7 +46,7 @@ export const publish = (request: PublishRequest) => {
const event = asEvent(request.event) const event = asEvent(request.event)
const executor = NetworkContext.getExecutor(request.relays) const executor = NetworkContext.getExecutor(request.relays)
const abort = (reason: PublishStatus) => () => { const abort = (reason: PublishStatus) => {
for (const [url, status] of pub.status.entries()) { for (const [url, status] of pub.status.entries()) {
if (status === PublishStatus.Pending) { if (status === PublishStatus.Pending) {
pub.emitter.emit(reason, url) pub.emitter.emit(reason, url)
@@ -75,10 +75,10 @@ export const publish = (request: PublishRequest) => {
}) })
// Give up after a specified time // Give up after a specified time
const timeout = setTimeout(abort(PublishStatus.Timeout), request.timeout || 10_000) const timeout = setTimeout(() => abort(PublishStatus.Timeout), request.timeout || 10_000)
// If we have a signal, use it // If we have a signal, use it
request.signal?.addEventListener('abort', abort(PublishStatus.Aborted)) request.signal?.addEventListener('abort', () => abort(PublishStatus.Aborted))
// Delegate to our executor // Delegate to our executor
const executorSub = executor.publish(event, { const executorSub = executor.publish(event, {
+13 -5
View File
@@ -35,6 +35,7 @@ export enum SubscriptionEvent {
export type SubscribeRequest = { export type SubscribeRequest = {
relays: string[] relays: string[]
filters: Filter[] filters: Filter[]
signal?: AbortSignal
timeout?: number timeout?: number
tracker?: Tracker tracker?: Tracker
immediate?: boolean immediate?: boolean
@@ -87,14 +88,17 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)), filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
}) })
for (const {id, controller} of callerSubs) { for (const {id, controller, request} of callerSubs) {
controller.signal.addEventListener('abort', () => { const onAbort = () => {
abortedSubs.add(id) abortedSubs.add(id)
if (abortedSubs.size === callerSubs.length) { if (abortedSubs.size === callerSubs.length) {
mergedSub.close() mergedSub.close()
} }
}) }
request.signal?.addEventListener('abort', onAbort)
controller.signal.addEventListener('abort', onAbort)
} }
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: Event) => { mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: Event) => {
@@ -172,7 +176,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
export const executeSubscription = (sub: Subscription) => { export const executeSubscription = (sub: Subscription) => {
const {result, request, emitter, tracker, controller} = sub const {result, request, emitter, tracker, controller} = sub
const {timeout, filters, closeOnEose, relays} = request const {timeout, filters, closeOnEose, relays, signal} = request
const executor = NetworkContext.getExecutor(relays) const executor = NetworkContext.getExecutor(relays)
const events: Event[] = [] const events: Event[] = []
@@ -234,7 +238,11 @@ export const executeSubscription = (sub: Subscription) => {
} }
} }
// Listen for abort via signal // Listen for abort via caller signal
signal?.addEventListener('abort', complete)
signal?.addEventListener('abort', () => console.log('aborted'))
// Listen for abort via our own internal signal
controller.signal.addEventListener('abort', complete) controller.signal.addEventListener('abort', complete)
// If we have a timeout, complete the subscription automatically // If we have a timeout, complete the subscription automatically
+2 -2
View File
@@ -1,5 +1,5 @@
import {throttle} from 'throttle-debounce' import {throttle} from 'throttle-debounce'
import type {Readable, Subscriber, Invalidator} from '@welshman/lib' import type {IReadable, Subscriber, Invalidator} from '@welshman/lib'
import {Derived, Emitter, writable, first, always, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib' import {Derived, Emitter, writable, first, always, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib'
import {Kind} from './Kinds' import {Kind} from './Kinds'
import {matchFilter, getIdFilters, matchFilters} from './Filters' import {matchFilter, getIdFilters, matchFilters} from './Filters'
@@ -16,7 +16,7 @@ export type RepositoryOptions = {
throttle?: number throttle?: number
} }
export class Repository<E extends Rumor> extends Emitter implements Readable<Repository<E>> { export class Repository<E extends Rumor> extends Emitter implements IReadable<Repository<E>> {
eventsById = new Map<string, E>() eventsById = new Map<string, E>()
eventsByAddress = new Map<string, E>() eventsByAddress = new Map<string, E>()
eventsByTag = new Map<string, E[]>() eventsByTag = new Map<string, E[]>()
+1 -1
View File
@@ -108,7 +108,7 @@ export class Tags extends (Fluent<Tag> as OmitStatics<typeof Fluent<Tag>, 'from'
// Add different types separately so positional logic works // Add different types separately so positional logic works
dispatchTags(tags.whereKey("e")) dispatchTags(tags.whereKey("e"))
dispatchTags(tags.whereKey("a")) dispatchTags(tags.whereKey("a").filter(t => Boolean(t.nth(3))))
mentionTags.forEach((t: Tag) => mentions.push(t.valueOf())) mentionTags.forEach((t: Tag) => mentions.push(t.valueOf()))
return { return {