Add negentropy support to executor
This commit is contained in:
@@ -3,6 +3,7 @@ import type {Emitter} from '@welshman/lib'
|
||||
import type {SignedEvent, Filter} from '@welshman/util'
|
||||
import type {Message} from './Socket'
|
||||
import type {Connection} from './Connection'
|
||||
import {Negentropy, NegentropyStorageVector} from './Negentropy'
|
||||
|
||||
export type Target = Emitter & {
|
||||
connections: Connection[]
|
||||
@@ -12,10 +13,13 @@ export type Target = Emitter & {
|
||||
|
||||
type EventCallback = (url: string, event: SignedEvent) => void
|
||||
type EoseCallback = (url: string) => void
|
||||
type CloseCallback = () => void
|
||||
type OkCallback = (url: string, id: string, ...extra: any[]) => void
|
||||
type ErrorCallback = (url: string, id: string, ...extra: any[]) => void
|
||||
type DiffMessageCallback = (url: string, {have, need}: {have: string[], need: string[]}) => void
|
||||
type SubscribeOpts = {onEvent?: EventCallback, onEose?: EoseCallback}
|
||||
type PublishOpts = {verb?: string, onOk?: OkCallback, onError?: ErrorCallback}
|
||||
type DiffOpts = {onError?: ErrorCallback, onMessage?: DiffMessageCallback, onClose?: CloseCallback}
|
||||
|
||||
const createSubId = (prefix: string) => [prefix, Math.random().toString().slice(2, 10)].join('-')
|
||||
|
||||
@@ -50,11 +54,11 @@ export class Executor {
|
||||
|
||||
return {
|
||||
unsubscribe: () => {
|
||||
if (!closed) {
|
||||
this.target.send("CLOSE", id)
|
||||
this.target.off('EVENT', eventListener)
|
||||
this.target.off('EOSE', eoseListener)
|
||||
}
|
||||
if (closed) return
|
||||
|
||||
this.target.send("CLOSE", id)
|
||||
this.target.off('EVENT', eventListener)
|
||||
this.target.off('EOSE', eoseListener)
|
||||
|
||||
closed = true
|
||||
},
|
||||
@@ -86,5 +90,60 @@ export class Executor {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
diff(filter: Filter, events: SignedEvent[], {onMessage, onError, onClose}: DiffOpts = {}) {
|
||||
let closed = false
|
||||
|
||||
const id = createSubId('NEG')
|
||||
const storage = new NegentropyStorageVector()
|
||||
const neg = new Negentropy(storage, 50_000)
|
||||
|
||||
for (const event of events) {
|
||||
storage.insert(event.created_at, event.id)
|
||||
}
|
||||
|
||||
storage.seal()
|
||||
|
||||
const msgListener = async (url: string, negid: string, msg: string) => {
|
||||
if (negid === id) {
|
||||
const [newMsg, have, need] = await neg.reconcile(msg)
|
||||
|
||||
onMessage?.(url, {have, need})
|
||||
|
||||
if (newMsg) {
|
||||
this.target.send('NEG-MSG', id, newMsg)
|
||||
} else {
|
||||
close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const errListener = (url: string, negid: string, msg: string) => {
|
||||
if (negid === id) {
|
||||
onError?.(url, msg)
|
||||
}
|
||||
}
|
||||
|
||||
const close = () => {
|
||||
if (closed) return
|
||||
|
||||
this.target.send('NEG-CLOSE', id)
|
||||
this.target.off('NEG-MSG', msgListener)
|
||||
this.target.off('NEG-ERR', errListener)
|
||||
|
||||
closed = true
|
||||
onClose?.()
|
||||
}
|
||||
|
||||
this.target.on('NEG-MSG', msgListener)
|
||||
this.target.on('NEG-ERR', errListener)
|
||||
|
||||
neg.initiate().then((msg: string) => {
|
||||
this.target.send("NEG-OPEN", id, filter, msg)
|
||||
})
|
||||
|
||||
return {
|
||||
unsubscribe: close,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -326,7 +326,9 @@ export const executeSubscriptionBatched = (() => {
|
||||
|
||||
return (sub: Subscription) => {
|
||||
subs.push(sub)
|
||||
timeouts.push(setTimeout(executeAll, Math.max(16, sub.request.delay!)))
|
||||
timeouts.push(
|
||||
setTimeout(executeAll, Math.max(16, sub.request.delay!)) as unknown as number
|
||||
)
|
||||
}
|
||||
})()
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ export * from "./Publish"
|
||||
export * from "./Socket"
|
||||
export * from "./Subscribe"
|
||||
export * from "./Tracker"
|
||||
export * from "./target/Echo"
|
||||
export * from "./target/Multi"
|
||||
export * from "./target/Plex"
|
||||
export * from "./target/Relay"
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
import {Emitter} from '@welshman/lib'
|
||||
import type {Message} from '../Socket'
|
||||
|
||||
export class Echo extends Emitter {
|
||||
get connections() {
|
||||
return []
|
||||
}
|
||||
|
||||
send(...payload: Message) {
|
||||
this.emit(...payload)
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user