Remove executor load/count, add closeOnEose to subscription
This commit is contained in:
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "paravel",
|
"name": "paravel",
|
||||||
"version": "0.3.9",
|
"version": "0.4.0",
|
||||||
"description": "Yet another toolkit for nostr",
|
"description": "Yet another toolkit for nostr",
|
||||||
"author": "hodlbod",
|
"author": "hodlbod",
|
||||||
"license": "MIT",
|
"license": "MIT",
|
||||||
|
|||||||
@@ -11,15 +11,11 @@ export type Target = Emitter & {
|
|||||||
|
|
||||||
type EventCallback = (url: string, event: Event) => void
|
type EventCallback = (url: string, event: Event) => void
|
||||||
type EoseCallback = (url: string) => void
|
type EoseCallback = (url: string) => void
|
||||||
type CloseCallback = () => void
|
|
||||||
type AuthCallback = (url: string, challenge: string) => void
|
type AuthCallback = (url: string, challenge: string) => void
|
||||||
type OkCallback = (url: string, id: string, ...extra: any[]) => void
|
type OkCallback = (url: string, id: string, ...extra: any[]) => void
|
||||||
type ErrorCallback = (url: string, id: string, ...extra: any[]) => void
|
type ErrorCallback = (url: string, id: string, ...extra: any[]) => void
|
||||||
type CountCallback = (url: string, ...extra: any[]) => void
|
|
||||||
type SubscribeOpts = {onEvent?: EventCallback, onEose?: EoseCallback}
|
type SubscribeOpts = {onEvent?: EventCallback, onEose?: EoseCallback}
|
||||||
type LoadOpts = SubscribeOpts & {timeout?: number, onClose?: CloseCallback}
|
|
||||||
type PublishOpts = {verb: string, onOk: OkCallback, onError: ErrorCallback}
|
type PublishOpts = {verb: string, onOk: OkCallback, onError: ErrorCallback}
|
||||||
type CountOpts = {onCount: CountCallback}
|
|
||||||
type AuthOpts = {onAuth: AuthCallback, onOk: OkCallback}
|
type AuthOpts = {onAuth: AuthCallback, onOk: OkCallback}
|
||||||
|
|
||||||
const createSubId = (prefix: string) => [prefix, Math.random().toString().slice(2, 10)].join('-')
|
const createSubId = (prefix: string) => [prefix, Math.random().toString().slice(2, 10)].join('-')
|
||||||
@@ -68,23 +64,6 @@ export class Executor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
count(filters: Filter[], {onCount}: CountOpts) {
|
|
||||||
const id = createSubId('COUNT')
|
|
||||||
const countListener = (url: string, subid: string, ...payload: any[]) => {
|
|
||||||
if (subid === id) {
|
|
||||||
onCount(url, ...payload)
|
|
||||||
this.target.off('COUNT', countListener)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this.target.on('COUNT', countListener)
|
|
||||||
this.target.send("COUNT", id, ...filters)
|
|
||||||
|
|
||||||
return {
|
|
||||||
unsubscribe: () => this.target.off('COUNT', countListener)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
handleAuth({onAuth, onOk}: AuthOpts) {
|
handleAuth({onAuth, onOk}: AuthOpts) {
|
||||||
this.target.on('AUTH', onAuth)
|
this.target.on('AUTH', onAuth)
|
||||||
this.target.on('OK', onOk)
|
this.target.on('OK', onOk)
|
||||||
@@ -96,31 +75,5 @@ export class Executor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
load(filters: Filter[], {timeout = 30_000, onEvent, onEose, onClose}: LoadOpts) {
|
|
||||||
const eose = new Set()
|
|
||||||
|
|
||||||
const unsubscribe = () => {
|
|
||||||
onClose?.()
|
|
||||||
sub.unsubscribe()
|
|
||||||
clearTimeout(handle)
|
|
||||||
}
|
|
||||||
|
|
||||||
const handle = setTimeout(unsubscribe, timeout)
|
|
||||||
|
|
||||||
const sub = this.subscribe(filters, {
|
|
||||||
onEvent,
|
|
||||||
onEose: (url: string) => {
|
|
||||||
onEose?.(url)
|
|
||||||
eose.add(url)
|
|
||||||
|
|
||||||
if (eose.size === this.target.connections.length) {
|
|
||||||
unsubscribe()
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
return {unsubscribe}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+14
-4
@@ -8,12 +8,14 @@ export type SubscriptionOpts = {
|
|||||||
executor: Executor
|
executor: Executor
|
||||||
filters: Filter[]
|
filters: Filter[]
|
||||||
timeout?: number
|
timeout?: number
|
||||||
hasSeen?: (e: Event) => boolean
|
closeOnEose?: boolean
|
||||||
|
hasSeen?: (e: Event, url: string) => boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Subscription extends EventEmitter {
|
export class Subscription extends EventEmitter {
|
||||||
unsubscribe: () => void
|
unsubscribe: () => void
|
||||||
seen = new Set<string>()
|
seen = new Set<string>()
|
||||||
|
eose = new Set<string>()
|
||||||
opened = Date.now()
|
opened = Date.now()
|
||||||
closed?: number
|
closed?: number
|
||||||
|
|
||||||
@@ -39,9 +41,9 @@ export class Subscription extends EventEmitter {
|
|||||||
this.unsubscribe = sub.unsubscribe
|
this.unsubscribe = sub.unsubscribe
|
||||||
}
|
}
|
||||||
|
|
||||||
hasSeen = (event: Event) => {
|
hasSeen = (event: Event, url: string) => {
|
||||||
if (this.opts.hasSeen) {
|
if (this.opts.hasSeen) {
|
||||||
return this.opts.hasSeen(event)
|
return this.opts.hasSeen(event, url)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.seen.has(event.id)) {
|
if (this.seen.has(event.id)) {
|
||||||
@@ -56,7 +58,7 @@ export class Subscription extends EventEmitter {
|
|||||||
onEvent = (url: string, event: Event) => {
|
onEvent = (url: string, event: Event) => {
|
||||||
// If we've seen this event, don't re-validate
|
// If we've seen this event, don't re-validate
|
||||||
// Otherwise, check the signature and filters
|
// Otherwise, check the signature and filters
|
||||||
if (this.hasSeen(event)) {
|
if (this.hasSeen(event, url)) {
|
||||||
this.emit("duplicate", event, url)
|
this.emit("duplicate", event, url)
|
||||||
} else {
|
} else {
|
||||||
if (!hasValidSignature(event)) {
|
if (!hasValidSignature(event)) {
|
||||||
@@ -70,7 +72,15 @@ export class Subscription extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
onEose = (url: string) => {
|
onEose = (url: string) => {
|
||||||
|
const {executor, closeOnEose} = this.opts
|
||||||
|
|
||||||
this.emit("eose", url)
|
this.emit("eose", url)
|
||||||
|
|
||||||
|
this.eose.add(url)
|
||||||
|
|
||||||
|
if (closeOnEose && this.eose.size === executor.target.connections.length) {
|
||||||
|
this.close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
close = () => {
|
close = () => {
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ export * from "./Connection"
|
|||||||
export * from "./ConnectionMeta"
|
export * from "./ConnectionMeta"
|
||||||
export * from "./Executor"
|
export * from "./Executor"
|
||||||
export * from "./Pool"
|
export * from "./Pool"
|
||||||
|
export * from "./Subscription"
|
||||||
export * from "./util/nostr"
|
export * from "./util/nostr"
|
||||||
export * from "./util/Deferred"
|
export * from "./util/Deferred"
|
||||||
export * from "./util/Emitter"
|
export * from "./util/Emitter"
|
||||||
|
|||||||
Reference in New Issue
Block a user