Refactor everything to use EventEmitter

This commit is contained in:
Jonathan Staab
2023-07-07 17:37:35 -07:00
parent 46470aabe4
commit 4d6ea603ff
11 changed files with 157 additions and 189 deletions
+35 -28
View File
@@ -1,62 +1,69 @@
import type {EventBus} from './util/EventBus.ts'
import {EventEmitter} from 'events'
const createSubId = prefix => [prefix, Math.random().toString().slice(2, 10)].join('-')
type Executable = {
bus: EventBus
send: (verb: string, ...args) => void
}
export class Executor {
target: Executable
target: EventEmitter
constructor(target) {
this.target = target
}
subscribe(filters, {onEvent, onEose}) {
const id = createSubId('REQ')
const unsubscribe = this.target.bus.addListeners({
EVENT: (url, subid, e) => subid === id && onEvent?.(url, e),
EOSE: (url, subid) => subid === id && onEose?.(url),
})
const eventListener = (url, subid, e) => subid === id && onEvent?.(url, e)
const eoseListener = (url, subid) => subid === id && onEose?.(url)
this.target.on('EVENT', eventListener)
this.target.on('EOSE', eoseListener)
this.target.send("REQ", id, ...filters)
return {
unsubscribe: () => {
this.target.send("CLOSE", id)
unsubscribe()
this.target.off('EVENT', eventListener)
this.target.off('EOSE', eoseListener)
},
}
}
publish(event, {verb = 'EVENT', onOk, onError}) {
const unsubscribe = this.target.bus.addListeners({
OK: (url, id, ...payload) => id === event.id && onOk(url, id, ...payload),
ERROR: (url, id, ...payload) => id === event.id && onError(url, id, ...payload),
})
const okListener = (url, id, ...payload) => id === event.id && onOk(url, id, ...payload)
const errorListener = (url, id, ...payload) => id === event.id && onError(url, id, ...payload)
this.target.on('OK', okListener)
this.target.on('ERROR', errorListener)
this.target.send(verb, event)
return {unsubscribe}
return {
unsubscribe: () => {
this.target.off('OK', okListener)
this.target.off('ERROR', errorListener)
}
}
}
count(filters, {onCount}) {
const id = createSubId('COUNT')
const unsubscribe = this.target.bus.addListeners({
COUNT: (url, subid, ...payload) => {
if (subid === id) {
onCount(url, ...payload)
unsubscribe()
}
const countListener = (url, subid, ...payload) => {
if (subid === id) {
onCount(url, ...payload)
this.target.off('COUNT', countListener)
}
})
}
this.target.on('COUNT', countListener)
this.target.send("COUNT", id, ...filters)
return {unsubscribe}
return {
unsubscribe: () => this.target.off('COUNT', countListener)
}
}
handleAuth({onAuth, onOk}) {
const unsubscribe = this.target.bus.addListeners({AUTH: onAuth, OK: onOk})
this.target.on('AUTH', onAuth)
this.target.on('OK', onOk)
return {unsubscribe}
return {
unsubscribe: () => {
this.target.off('AUTH', onAuth)
this.target.off('OK', onOk)
}
}
}
}