diff --git a/packages/lib/Tools.ts b/packages/lib/Tools.ts index 1413a68..4c6e3ae 100644 --- a/packages/lib/Tools.ts +++ b/packages/lib/Tools.ts @@ -27,12 +27,34 @@ export const sum = (xs: number[]) => xs.reduce((a, b) => a + b, 0) export const avg = (xs: number[]) => sum(xs) / xs.length +export const drop = (n: number, xs: T[]) => xs.slice(n) + +export const take = (n: number, xs: T[]) => xs.slice(0, n) + export const between = (low: number, high: number, n: number) => n > low && n < high export const flatten = (xs: T[]) => xs.flatMap(identity) export const uniq = (xs: T[]) => Array.from(new Set(xs)) +export const uniqBy = (f: (x: T) => any, xs: T[]) => { + const s = new Set() + const r = [] + + for (const x of xs) { + const k = f(x) + + if (s.has(k)) { + continue + } + + s.add(k) + r.push(x) + } + + return r +} + export const choice = (xs: T[]): T => xs[Math.floor(xs.length * Math.random())] export const shuffle = (xs: Iterable): T[] => Array.from(xs).sort(() => Math.random() > 0.5 ? 1 : -1) diff --git a/packages/network/Publish.ts b/packages/network/Publish.ts new file mode 100644 index 0000000..7743171 --- /dev/null +++ b/packages/network/Publish.ts @@ -0,0 +1,91 @@ +import type {Event} from 'nostr-tools' +import {Emitter, randomId, groupBy, batch, defer, uniq, uniqBy} from '@coracle.social/lib' +import type {Deferred} from '@coracle.social/lib' +import {asEvent,} from '@coracle.social/util' +import {Tracker} from "./Tracker" +import {Connection} from './Connection' +import {NetworkContext} from './Context' + +export enum PublishStatus { + Pending = "pending", + Success = "success", + Failure = "failure", + Timeout = "timeout", +} + +export type PublishStatusMap = Map + +export type PublishRequest = { + event: Event + relays: string[] + timeout?: number + verb?: "EVENT" | "AUTH" +} + +export type Publish = { + id: string + emitter: Emitter + request: PublishRequest + status: PublishStatusMap + result: Deferred +} + +export const makePublish = (request: PublishRequest) => { + const id = randomId() + const emitter = new Emitter() + const result: Publish['result'] = defer() + const status: Publish['status'] = new Map() + + return {id, request, emitter, result, status} +} + +export const publish = (request: PublishRequest) => { + const pub = makePublish(request) + const event = asEvent(request.event) + const executor = NetworkContext.getExecutor(request.relays) + + // Listen to updates and keep status up to date. Every time there's an update, check to + // see if we're done. If we are, clear our timeout, executor, etc. + pub.emitter.on("*", (status: PublishStatus, url: string) => { + pub.status.set(url, status) + + if (Array.from(pub.status.values()).every((s: PublishStatus) => s !== PublishStatus.Pending)) { + clearTimeout(timeout) + executorSub.unsubscribe() + executor.target.cleanup() + pub.result.resolve(pub.status) + } + }) + + // Start everything off as pending + for (const relay of request.relays) { + pub.emitter.emit(PublishStatus.Pending, relay) + } + + // Set a timeout + const timeout = setTimeout(() => { + for (const [url, status] of pub.status.entries()) { + if (status === PublishStatus.Pending) { + pub.emitter.emit(PublishStatus.Timeout, url) + } + } + }, request.timeout || 10_000) + + // Delegate to our executor + const executorSub = executor.publish(event, { + verb: request.verb || "EVENT", + onOk: (url: string, eventId: string, ok: boolean) => { + if (ok) { + pub.emitter.emit(PublishStatus.Success, url) + } else { + pub.emitter.emit(PublishStatus.Failure, url) + } + }, + onError: (url: string) => { + pub.emitter.emit(PublishStatus.Failure, url) + }, + }) + + return pub +} + diff --git a/packages/network/Subscribe.ts b/packages/network/Subscribe.ts index 4a8e17b..4f4181e 100644 --- a/packages/network/Subscribe.ts +++ b/packages/network/Subscribe.ts @@ -1,5 +1,5 @@ import type {Event} from 'nostr-tools' -import {Emitter, randomId, groupBy, batch, defer, uniq} from '@coracle.social/lib' +import {Emitter, randomId, groupBy, batch, defer, uniq, uniqBy} from '@coracle.social/lib' import type {Deferred} from '@coracle.social/lib' import {matchFilters, calculateFilterGroup, mergeFilters} from '@coracle.social/util' import type {Filter} from '@coracle.social/util' @@ -85,10 +85,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => { filters: mergeFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)), }) - for (const {id, emitter, tracker} of callerSubs) { - // Propagate links to the caller - tracker.link(mergedSub.tracker) - + for (const {id, emitter} of callerSubs) { // Propagate abort event from the caller to the merged subscription emitter.on(SubscriptionEvent.Abort, () => { abortedSubs.add(id) @@ -99,6 +96,20 @@ export const mergeSubscriptions = (subs: Subscription[]) => { }) } + mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: Event) => { + for (const sub of callerSubs) { + if (sub.tracker.track(event.id, url)) { + continue + } + + if (!matchFilters(sub.request.filters, event)) { + continue + } + + sub.emitter.emit(SubscriptionEvent.Event, url, event) + } + }) + // Pass events back to caller const propagateEvent = (type: SubscriptionEvent, checkFilter: boolean) => mergedSub.emitter.on(type, (url: string, event: Event) => { @@ -109,7 +120,6 @@ export const mergeSubscriptions = (subs: Subscription[]) => { } }) - propagateEvent(SubscriptionEvent.Event, true) propagateEvent(SubscriptionEvent.Duplicate, true) propagateEvent(SubscriptionEvent.DeletedEvent, false) propagateEvent(SubscriptionEvent.FailedFilter, false) @@ -145,6 +155,8 @@ export const mergeSubscriptions = (subs: Subscription[]) => { // Propagate promise resolution mergedSub.result.then((events: Event[]) => { + events = uniqBy((event: Event) => event.id, events) + for (const sub of callerSubs) { sub.result.resolve(events.filter((e: Event) => matchFilters(sub.request.filters, e))) } diff --git a/packages/network/Tracker.ts b/packages/network/Tracker.ts index e28d308..3afbc24 100644 --- a/packages/network/Tracker.ts +++ b/packages/network/Tracker.ts @@ -1,7 +1,6 @@ import {writable} from '@coracle.social/lib' export class Tracker { - links: Tracker[] = [] data = writable(new Map>()) getRelays = (eventId: string) => { @@ -11,12 +10,6 @@ export class Tracker { relays.add(relay) } - for (const link of this.links) { - for (const relay of link.getRelays(eventId)) { - relays.add(relay) - } - } - return relays } @@ -37,15 +30,13 @@ export class Tracker { } track = (eventId: string, relay: string) => { - if (this.hasRelay(eventId, relay)) return true + const seen = this.data.get().has(eventId) this.addRelay(eventId, relay) - return false + return seen } - link = (tracker: Tracker) => this.links.push(tracker) - copy = (eventId1: string, eventId2: string) => { for (const relay of this.getRelays(eventId1)) { this.addRelay(eventId2, relay) diff --git a/packages/network/index.ts b/packages/network/index.ts index 9d21ca4..3c1e39d 100644 --- a/packages/network/index.ts +++ b/packages/network/index.ts @@ -3,6 +3,7 @@ export * from "./ConnectionMeta" export * from "./Context" export * from "./Executor" export * from "./Pool" +export * from "./Publish" export * from "./Socket" export * from "./Subscribe" export * from "./Tracker"