Add new publish function

This commit is contained in:
Jon Staab
2024-04-04 16:58:04 -07:00
parent babbb897bf
commit 98ef2c3e3e
5 changed files with 134 additions and 17 deletions
+91
View File
@@ -0,0 +1,91 @@
import type {Event} from 'nostr-tools'
import {Emitter, randomId, groupBy, batch, defer, uniq, uniqBy} from '@coracle.social/lib'
import type {Deferred} from '@coracle.social/lib'
import {asEvent,} from '@coracle.social/util'
import {Tracker} from "./Tracker"
import {Connection} from './Connection'
import {NetworkContext} from './Context'
export enum PublishStatus {
Pending = "pending",
Success = "success",
Failure = "failure",
Timeout = "timeout",
}
export type PublishStatusMap = Map<string, PublishStatus>
export type PublishRequest = {
event: Event
relays: string[]
timeout?: number
verb?: "EVENT" | "AUTH"
}
export type Publish = {
id: string
emitter: Emitter
request: PublishRequest
status: PublishStatusMap
result: Deferred<PublishStatusMap>
}
export const makePublish = (request: PublishRequest) => {
const id = randomId()
const emitter = new Emitter()
const result: Publish['result'] = defer()
const status: Publish['status'] = new Map()
return {id, request, emitter, result, status}
}
export const publish = (request: PublishRequest) => {
const pub = makePublish(request)
const event = asEvent(request.event)
const executor = NetworkContext.getExecutor(request.relays)
// Listen to updates and keep status up to date. Every time there's an update, check to
// see if we're done. If we are, clear our timeout, executor, etc.
pub.emitter.on("*", (status: PublishStatus, url: string) => {
pub.status.set(url, status)
if (Array.from(pub.status.values()).every((s: PublishStatus) => s !== PublishStatus.Pending)) {
clearTimeout(timeout)
executorSub.unsubscribe()
executor.target.cleanup()
pub.result.resolve(pub.status)
}
})
// Start everything off as pending
for (const relay of request.relays) {
pub.emitter.emit(PublishStatus.Pending, relay)
}
// Set a timeout
const timeout = setTimeout(() => {
for (const [url, status] of pub.status.entries()) {
if (status === PublishStatus.Pending) {
pub.emitter.emit(PublishStatus.Timeout, url)
}
}
}, request.timeout || 10_000)
// Delegate to our executor
const executorSub = executor.publish(event, {
verb: request.verb || "EVENT",
onOk: (url: string, eventId: string, ok: boolean) => {
if (ok) {
pub.emitter.emit(PublishStatus.Success, url)
} else {
pub.emitter.emit(PublishStatus.Failure, url)
}
},
onError: (url: string) => {
pub.emitter.emit(PublishStatus.Failure, url)
},
})
return pub
}
+18 -6
View File
@@ -1,5 +1,5 @@
import type {Event} from 'nostr-tools'
import {Emitter, randomId, groupBy, batch, defer, uniq} from '@coracle.social/lib'
import {Emitter, randomId, groupBy, batch, defer, uniq, uniqBy} from '@coracle.social/lib'
import type {Deferred} from '@coracle.social/lib'
import {matchFilters, calculateFilterGroup, mergeFilters} from '@coracle.social/util'
import type {Filter} from '@coracle.social/util'
@@ -85,10 +85,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
filters: mergeFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
})
for (const {id, emitter, tracker} of callerSubs) {
// Propagate links to the caller
tracker.link(mergedSub.tracker)
for (const {id, emitter} of callerSubs) {
// Propagate abort event from the caller to the merged subscription
emitter.on(SubscriptionEvent.Abort, () => {
abortedSubs.add(id)
@@ -99,6 +96,20 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
})
}
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: Event) => {
for (const sub of callerSubs) {
if (sub.tracker.track(event.id, url)) {
continue
}
if (!matchFilters(sub.request.filters, event)) {
continue
}
sub.emitter.emit(SubscriptionEvent.Event, url, event)
}
})
// Pass events back to caller
const propagateEvent = (type: SubscriptionEvent, checkFilter: boolean) =>
mergedSub.emitter.on(type, (url: string, event: Event) => {
@@ -109,7 +120,6 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
}
})
propagateEvent(SubscriptionEvent.Event, true)
propagateEvent(SubscriptionEvent.Duplicate, true)
propagateEvent(SubscriptionEvent.DeletedEvent, false)
propagateEvent(SubscriptionEvent.FailedFilter, false)
@@ -145,6 +155,8 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
// Propagate promise resolution
mergedSub.result.then((events: Event[]) => {
events = uniqBy((event: Event) => event.id, events)
for (const sub of callerSubs) {
sub.result.resolve(events.filter((e: Event) => matchFilters(sub.request.filters, e)))
}
+2 -11
View File
@@ -1,7 +1,6 @@
import {writable} from '@coracle.social/lib'
export class Tracker {
links: Tracker[] = []
data = writable(new Map<string, Set<string>>())
getRelays = (eventId: string) => {
@@ -11,12 +10,6 @@ export class Tracker {
relays.add(relay)
}
for (const link of this.links) {
for (const relay of link.getRelays(eventId)) {
relays.add(relay)
}
}
return relays
}
@@ -37,15 +30,13 @@ export class Tracker {
}
track = (eventId: string, relay: string) => {
if (this.hasRelay(eventId, relay)) return true
const seen = this.data.get().has(eventId)
this.addRelay(eventId, relay)
return false
return seen
}
link = (tracker: Tracker) => this.links.push(tracker)
copy = (eventId1: string, eventId2: string) => {
for (const relay of this.getRelays(eventId1)) {
this.addRelay(eventId2, relay)
+1
View File
@@ -3,6 +3,7 @@ export * from "./ConnectionMeta"
export * from "./Context"
export * from "./Executor"
export * from "./Pool"
export * from "./Publish"
export * from "./Socket"
export * from "./Subscribe"
export * from "./Tracker"