Support signals in publish/subscribe fixes #1
This commit is contained in:
+17
-10
@@ -9,6 +9,7 @@ export enum PublishStatus {
|
|||||||
Success = "success",
|
Success = "success",
|
||||||
Failure = "failure",
|
Failure = "failure",
|
||||||
Timeout = "timeout",
|
Timeout = "timeout",
|
||||||
|
Aborted = "aborted",
|
||||||
}
|
}
|
||||||
|
|
||||||
export type PublishStatusMap = Map<string, PublishStatus>
|
export type PublishStatusMap = Map<string, PublishStatus>
|
||||||
@@ -16,6 +17,7 @@ export type PublishStatusMap = Map<string, PublishStatus>
|
|||||||
export type PublishRequest = {
|
export type PublishRequest = {
|
||||||
event: Event
|
event: Event
|
||||||
relays: string[]
|
relays: string[]
|
||||||
|
signal?: AbortSignal
|
||||||
timeout?: number
|
timeout?: number
|
||||||
verb?: "EVENT" | "AUTH"
|
verb?: "EVENT" | "AUTH"
|
||||||
}
|
}
|
||||||
@@ -44,8 +46,16 @@ export const publish = (request: PublishRequest) => {
|
|||||||
const event = asEvent(request.event)
|
const event = asEvent(request.event)
|
||||||
const executor = NetworkContext.getExecutor(request.relays)
|
const executor = NetworkContext.getExecutor(request.relays)
|
||||||
|
|
||||||
|
const abort = (reason: PublishStatus) => () => {
|
||||||
|
for (const [url, status] of pub.status.entries()) {
|
||||||
|
if (status === PublishStatus.Pending) {
|
||||||
|
pub.emitter.emit(reason, url)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Listen to updates and keep status up to date. Every time there's an update, check to
|
// Listen to updates and keep status up to date. Every time there's an update, check to
|
||||||
// see if we're done. If we are, clear our timeout, executor, etc.
|
// see if we're done. If we are, clean everything up
|
||||||
pub.emitter.on("*", (status: PublishStatus, url: string) => {
|
pub.emitter.on("*", (status: PublishStatus, url: string) => {
|
||||||
pub.status.set(url, status)
|
pub.status.set(url, status)
|
||||||
|
|
||||||
@@ -57,21 +67,18 @@ export const publish = (request: PublishRequest) => {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// Start everything off as pending
|
// Start everything off as pending. Do it asynchronously to avoid breaking caller assumptions
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
for (const relay of request.relays) {
|
for (const relay of request.relays) {
|
||||||
pub.emitter.emit(PublishStatus.Pending, relay)
|
pub.emitter.emit(PublishStatus.Pending, relay)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// Set a timeout
|
// Give up after a specified time
|
||||||
const timeout = setTimeout(() => {
|
const timeout = setTimeout(abort(PublishStatus.Timeout), request.timeout || 10_000)
|
||||||
for (const [url, status] of pub.status.entries()) {
|
|
||||||
if (status === PublishStatus.Pending) {
|
// If we have a signal, use it
|
||||||
pub.emitter.emit(PublishStatus.Timeout, url)
|
request.signal?.addEventListener('abort', abort(PublishStatus.Aborted))
|
||||||
}
|
|
||||||
}
|
|
||||||
}, request.timeout || 10_000)
|
|
||||||
|
|
||||||
// Delegate to our executor
|
// Delegate to our executor
|
||||||
const executorSub = executor.publish(event, {
|
const executorSub = executor.publish(event, {
|
||||||
|
|||||||
@@ -25,7 +25,6 @@ export enum SubscriptionEvent {
|
|||||||
Eose = "eose",
|
Eose = "eose",
|
||||||
Close = "close",
|
Close = "close",
|
||||||
Event = "event",
|
Event = "event",
|
||||||
Abort = "abort",
|
|
||||||
Complete = "complete",
|
Complete = "complete",
|
||||||
Duplicate = "duplicate",
|
Duplicate = "duplicate",
|
||||||
DeletedEvent = "deleted-event",
|
DeletedEvent = "deleted-event",
|
||||||
@@ -46,6 +45,7 @@ export type Subscription = {
|
|||||||
id: string
|
id: string
|
||||||
emitter: Emitter
|
emitter: Emitter
|
||||||
tracker: Tracker
|
tracker: Tracker
|
||||||
|
controller: AbortController
|
||||||
result: Deferred<Event[]>
|
result: Deferred<Event[]>
|
||||||
request: SubscribeRequest
|
request: SubscribeRequest
|
||||||
close: () => void
|
close: () => void
|
||||||
@@ -54,13 +54,14 @@ export type Subscription = {
|
|||||||
export const makeSubscription = (request: SubscribeRequest) => {
|
export const makeSubscription = (request: SubscribeRequest) => {
|
||||||
const id = randomId()
|
const id = randomId()
|
||||||
const emitter = new Emitter()
|
const emitter = new Emitter()
|
||||||
|
const controller = new AbortController()
|
||||||
const result = defer<Event[]>()
|
const result = defer<Event[]>()
|
||||||
const tracker = request.tracker || new Tracker()
|
const tracker = request.tracker || new Tracker()
|
||||||
const close = () => emitter.emit('abort')
|
const close = () => controller.abort()
|
||||||
|
|
||||||
emitter.setMaxListeners(100)
|
emitter.setMaxListeners(100)
|
||||||
|
|
||||||
return {id, request, emitter, tracker, result, close}
|
return {id, request, emitter, tracker, controller, result, close}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const calculateSubscriptionGroup = (sub: Subscription) => {
|
export const calculateSubscriptionGroup = (sub: Subscription) => {
|
||||||
@@ -86,9 +87,8 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
|
|||||||
filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
|
filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
|
||||||
})
|
})
|
||||||
|
|
||||||
for (const {id, emitter} of callerSubs) {
|
for (const {id, controller} of callerSubs) {
|
||||||
// Propagate abort event from the caller to the merged subscription
|
controller.signal.addEventListener('abort', () => {
|
||||||
emitter.on(SubscriptionEvent.Abort, () => {
|
|
||||||
abortedSubs.add(id)
|
abortedSubs.add(id)
|
||||||
|
|
||||||
if (abortedSubs.size === callerSubs.length) {
|
if (abortedSubs.size === callerSubs.length) {
|
||||||
@@ -171,7 +171,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export const executeSubscription = (sub: Subscription) => {
|
export const executeSubscription = (sub: Subscription) => {
|
||||||
const {result, request, emitter, tracker} = sub
|
const {result, request, emitter, tracker, controller} = sub
|
||||||
const {timeout, filters, closeOnEose, relays} = request
|
const {timeout, filters, closeOnEose, relays} = request
|
||||||
const executor = NetworkContext.getExecutor(relays)
|
const executor = NetworkContext.getExecutor(relays)
|
||||||
const events: Event[] = []
|
const events: Event[] = []
|
||||||
@@ -234,8 +234,8 @@ export const executeSubscription = (sub: Subscription) => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allow the caller to cancel the subscription
|
// Listen for abort via signal
|
||||||
emitter.on(SubscriptionEvent.Abort, complete)
|
controller.signal.addEventListener('abort', complete)
|
||||||
|
|
||||||
// If we have a timeout, complete the subscription automatically
|
// If we have a timeout, complete the subscription automatically
|
||||||
if (timeout) setTimeout(complete, timeout)
|
if (timeout) setTimeout(complete, timeout)
|
||||||
|
|||||||
Reference in New Issue
Block a user