remove result from subscription, pass closeOnEose through

This commit is contained in:
Jon Staab
2024-06-20 12:11:38 -07:00
parent da6df3fc97
commit fafbb345f1
2 changed files with 5 additions and 18 deletions
+1 -1
View File
@@ -1,6 +1,6 @@
{ {
"name": "@welshman/net", "name": "@welshman/net",
"version": "0.0.13", "version": "0.0.14",
"author": "hodlbod", "author": "hodlbod",
"license": "MIT", "license": "MIT",
"description": "Utilities for connecting with nostr relays.", "description": "Utilities for connecting with nostr relays.",
+4 -17
View File
@@ -1,6 +1,5 @@
import type {Event} from 'nostr-tools' import type {Event} from 'nostr-tools'
import {Emitter, chunk, flatten, randomId, once, groupBy, batch, defer, uniq, uniqBy} from '@welshman/lib' import {Emitter, chunk, randomId, once, groupBy, batch, uniq} from '@welshman/lib'
import type {Deferred} from '@welshman/lib'
import {matchFilters, unionFilters} from '@welshman/util' import {matchFilters, unionFilters} from '@welshman/util'
import type {Filter} from '@welshman/util' import type {Filter} from '@welshman/util'
import {Tracker} from "./Tracker" import {Tracker} from "./Tracker"
@@ -47,7 +46,6 @@ export type Subscription = {
emitter: Emitter emitter: Emitter
tracker: Tracker tracker: Tracker
controller: AbortController controller: AbortController
result: Deferred<Event[]>
request: SubscribeRequest request: SubscribeRequest
close: () => void close: () => void
} }
@@ -56,13 +54,12 @@ export const makeSubscription = (request: SubscribeRequest) => {
const id = randomId() const id = randomId()
const emitter = new Emitter() const emitter = new Emitter()
const controller = new AbortController() const controller = new AbortController()
const result = defer<Event[]>()
const tracker = request.tracker || new Tracker() const tracker = request.tracker || new Tracker()
const close = () => controller.abort() const close = () => controller.abort()
emitter.setMaxListeners(100) emitter.setMaxListeners(100)
return {id, request, emitter, tracker, controller, result, close} return {id, request, emitter, tracker, controller, close}
} }
export const calculateSubscriptionGroup = (sub: Subscription) => { export const calculateSubscriptionGroup = (sub: Subscription) => {
@@ -87,6 +84,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
const mergedSub = makeSubscription({ const mergedSub = makeSubscription({
relays: [relay], relays: [relay],
timeout: callerSubs[0].request.timeout, timeout: callerSubs[0].request.timeout,
closeOnEose: callerSubs[0].request.closeOnEose,
filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)), filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
}) })
@@ -163,23 +161,13 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
mergedSubscriptions.push(mergedSub) mergedSubscriptions.push(mergedSub)
groupSubscriptions.push(mergedSub) groupSubscriptions.push(mergedSub)
} }
// Propagate promise resolution
Promise.all(groupSubscriptions.map(sub => sub.result))
.then((chunks: Event[][]) => {
const events = uniqBy((event: Event) => event.id, flatten(chunks))
for (const sub of group) {
sub.result.resolve(events.filter((e: Event) => matchFilters(sub.request.filters, e)))
}
})
} }
return mergedSubscriptions return mergedSubscriptions
} }
export const executeSubscription = (sub: Subscription) => { export const executeSubscription = (sub: Subscription) => {
const {result, request, emitter, tracker, controller} = sub const {request, emitter, tracker, controller} = sub
const {timeout, filters, closeOnEose, relays, signal} = request const {timeout, filters, closeOnEose, relays, signal} = request
const executor = NetworkContext.getExecutor(relays) const executor = NetworkContext.getExecutor(relays)
const subs: {unsubscribe: () => void}[] = [] const subs: {unsubscribe: () => void}[] = []
@@ -209,7 +197,6 @@ export const executeSubscription = (sub: Subscription) => {
}) })
emitter.on(SubscriptionEvent.Complete, () => { emitter.on(SubscriptionEvent.Complete, () => {
result.resolve(events)
emitter.removeAllListeners() emitter.removeAllListeners()
subs.forEach(sub => sub.unsubscribe()) subs.forEach(sub => sub.unsubscribe())
executor.target.connections.forEach((c: Connection) => c.off("close", onClose)) executor.target.connections.forEach((c: Connection) => c.off("close", onClose))