diff --git a/packages/feeds/compiler.ts b/packages/feeds/compiler.ts index 5cf23a3..3c94a06 100644 --- a/packages/feeds/compiler.ts +++ b/packages/feeds/compiler.ts @@ -76,15 +76,13 @@ export class FeedCompiler { _compileCreatedAt(items: CreatedAtItem[]) { const filters = items - .map(({since, until, relative}) => { - if (relative) { - if (typeof since === 'number') { - since = now() - since - } + .map(({since, until, relative = []}) => { + if (since && relative.includes("since")) { + since = now() - since + } - if (typeof until === 'number') { - until = now() - until - } + if (until && relative.includes("until")) { + until = now() - until } if (since && until) return {since, until} @@ -224,7 +222,7 @@ export class FeedCompiler { } async _compileLists(listItems: ListItem[]): Promise { - const addresses = uniq(listItems.map(({address}) => address)) + const addresses = uniq(listItems.flatMap(({addresses}) => addresses)) const eventsByAddress = new Map() await this.options.request({ @@ -234,10 +232,20 @@ export class FeedCompiler { const feeds = flatten( await Promise.all( - listItems.map(({address, mappings}) => { - const event = eventsByAddress.get(address) + listItems.map(({addresses, mappings}) => { + const feeds: Feed[] = [] - return event ? feedsFromTags(Tags.fromEvent(event), mappings) : [] + for (const address of addresses) { + const event = eventsByAddress.get(address) + + if (event) { + for (const feed of feedsFromTags(Tags.fromEvent(event), mappings)) { + feeds.push(feed) + } + } + } + + return feeds }) ) ) diff --git a/packages/feeds/core.ts b/packages/feeds/core.ts index b41f6d3..6f63b88 100644 --- a/packages/feeds/core.ts +++ b/packages/feeds/core.ts @@ -44,8 +44,8 @@ export type DVMItem = { } export type ListItem = { - address: string, - mappings: TagFeedMapping[], + addresses: string, + mappings?: TagFeedMapping[], } export type WOTItem = { @@ -56,7 +56,7 @@ export type WOTItem = { export type CreatedAtItem = { since?: number, until?: number, - relative?: boolean, + relative?: string[], } export type AddressFeed = [type: FeedType.Address, ...addresses: string[]] diff --git a/packages/lib/Store.ts b/packages/lib/Store.ts index 66e2d46..2ebfc21 100644 --- a/packages/lib/Store.ts +++ b/packages/lib/Store.ts @@ -4,19 +4,23 @@ import {ensurePlural, identity} from "./Tools" export type Invalidator = (value?: T) => void export type Subscriber = (value: T) => void -type Derivable = Readable | Readable[] +type Derivable = IReadable | IReadable[] type Unsubscriber = () => void type R = Record type M = Map -export interface Readable { +export interface IReadable { get: () => T subscribe(this: void, run: Subscriber, invalidate?: Invalidator): Unsubscriber - derived: (f: (v: T) => U) => Readable - throttle(t: number): Readable + derived: (f: (v: T) => U) => IReadable + throttle(t: number): IReadable } -export class Writable implements Readable { +export interface IWritable extends IReadable { + set: (xs: T) => void +} + +export class Writable implements IWritable { value: T subs: Subscriber[] = [] @@ -70,7 +74,7 @@ export class Writable implements Readable { } } -export class Derived implements Readable { +export class Derived implements IReadable { callerSubs: Subscriber[] = [] mySubs: Unsubscriber[] = [] stores: Derivable @@ -130,20 +134,20 @@ export class Derived implements Readable { } } - derived(f: (v: T) => U): Readable { - return new Derived(this, f) as Readable + derived(f: (v: T) => U): IReadable { + return new Derived(this, f) as IReadable } - throttle = (t: number): Readable => { + throttle = (t: number): IReadable => { return new Derived(this, identity, t) } } -export class Key implements Readable { +export class Key implements IReadable { readonly pk: string readonly key: string base: Writable> - store: Readable + store: IReadable constructor(base: Writable>, pk: string, key: string) { if (!(base.get() instanceof Map)) { @@ -202,13 +206,13 @@ export class Key implements Readable { } } -export class DerivedKey implements Readable { +export class DerivedKey implements IReadable { readonly pk: string readonly key: string - base: Readable> - store: Readable + base: IReadable> + store: IReadable - constructor(base: Readable>, pk: string, key: string) { + constructor(base: IReadable>, pk: string, key: string) { if (!(base.get() instanceof Map)) { throw new Error("`key` can only be used on map collections") } @@ -230,10 +234,10 @@ export class DerivedKey implements Readable { exists = () => this.base.get().has(this.key) } -export class Collection implements Readable { +export class Collection implements IReadable { readonly pk: string readonly mapStore: Writable> - readonly listStore: Readable + readonly listStore: IReadable constructor(pk: string, t?: number) { this.pk = pk @@ -284,9 +288,9 @@ export class Collection implements Readable { map = (f: (v: T) => T) => this.update((xs: T[]) => xs.map(f)) } -export class DerivedCollection implements Readable { +export class DerivedCollection implements IReadable { readonly listStore: Derived - readonly mapStore: Readable> + readonly mapStore: IReadable> constructor( readonly pk: string, @@ -316,7 +320,7 @@ export const writable = (v: T) => new Writable(v) export const derived = (stores: Derivable, getValue: (values: any) => T) => new Derived(stores, getValue) -export const readable = (v: T) => derived(new Writable(v), identity) as Readable +export const readable = (v: T) => derived(new Writable(v), identity) as IReadable export const derivedCollection = ( pk: string, diff --git a/packages/net/Publish.ts b/packages/net/Publish.ts index 8c86894..f5b0844 100644 --- a/packages/net/Publish.ts +++ b/packages/net/Publish.ts @@ -46,7 +46,7 @@ export const publish = (request: PublishRequest) => { const event = asEvent(request.event) const executor = NetworkContext.getExecutor(request.relays) - const abort = (reason: PublishStatus) => () => { + const abort = (reason: PublishStatus) => { for (const [url, status] of pub.status.entries()) { if (status === PublishStatus.Pending) { pub.emitter.emit(reason, url) @@ -75,10 +75,10 @@ export const publish = (request: PublishRequest) => { }) // Give up after a specified time - const timeout = setTimeout(abort(PublishStatus.Timeout), request.timeout || 10_000) + const timeout = setTimeout(() => abort(PublishStatus.Timeout), request.timeout || 10_000) // If we have a signal, use it - request.signal?.addEventListener('abort', abort(PublishStatus.Aborted)) + request.signal?.addEventListener('abort', () => abort(PublishStatus.Aborted)) // Delegate to our executor const executorSub = executor.publish(event, { diff --git a/packages/net/Subscribe.ts b/packages/net/Subscribe.ts index 71ca5ab..0d188ce 100644 --- a/packages/net/Subscribe.ts +++ b/packages/net/Subscribe.ts @@ -35,6 +35,7 @@ export enum SubscriptionEvent { export type SubscribeRequest = { relays: string[] filters: Filter[] + signal?: AbortSignal timeout?: number tracker?: Tracker immediate?: boolean @@ -87,14 +88,17 @@ export const mergeSubscriptions = (subs: Subscription[]) => { filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)), }) - for (const {id, controller} of callerSubs) { - controller.signal.addEventListener('abort', () => { + for (const {id, controller, request} of callerSubs) { + const onAbort = () => { abortedSubs.add(id) if (abortedSubs.size === callerSubs.length) { mergedSub.close() } - }) + } + + request.signal?.addEventListener('abort', onAbort) + controller.signal.addEventListener('abort', onAbort) } mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: Event) => { @@ -172,7 +176,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => { export const executeSubscription = (sub: Subscription) => { const {result, request, emitter, tracker, controller} = sub - const {timeout, filters, closeOnEose, relays} = request + const {timeout, filters, closeOnEose, relays, signal} = request const executor = NetworkContext.getExecutor(relays) const events: Event[] = [] @@ -234,7 +238,11 @@ export const executeSubscription = (sub: Subscription) => { } } - // Listen for abort via signal + // Listen for abort via caller signal + signal?.addEventListener('abort', complete) + signal?.addEventListener('abort', () => console.log('aborted')) + + // Listen for abort via our own internal signal controller.signal.addEventListener('abort', complete) // If we have a timeout, complete the subscription automatically diff --git a/packages/util/Repository.ts b/packages/util/Repository.ts index b8935d2..6388ed3 100644 --- a/packages/util/Repository.ts +++ b/packages/util/Repository.ts @@ -1,5 +1,5 @@ import {throttle} from 'throttle-debounce' -import type {Readable, Subscriber, Invalidator} from '@welshman/lib' +import type {IReadable, Subscriber, Invalidator} from '@welshman/lib' import {Derived, Emitter, writable, first, always, chunk, sleep, uniq, omit, now, range, identity} from '@welshman/lib' import {Kind} from './Kinds' import {matchFilter, getIdFilters, matchFilters} from './Filters' @@ -16,7 +16,7 @@ export type RepositoryOptions = { throttle?: number } -export class Repository extends Emitter implements Readable> { +export class Repository extends Emitter implements IReadable> { eventsById = new Map() eventsByAddress = new Map() eventsByTag = new Map() diff --git a/packages/util/Tags.ts b/packages/util/Tags.ts index 9edabbf..9d3eff0 100644 --- a/packages/util/Tags.ts +++ b/packages/util/Tags.ts @@ -108,7 +108,7 @@ export class Tags extends (Fluent as OmitStatics, 'from' // Add different types separately so positional logic works dispatchTags(tags.whereKey("e")) - dispatchTags(tags.whereKey("a")) + dispatchTags(tags.whereKey("a").filter(t => Boolean(t.nth(3)))) mentionTags.forEach((t: Tag) => mentions.push(t.valueOf())) return {