Add custom emitter and wrapper target

This commit is contained in:
Jonathan Staab
2023-09-14 13:07:01 -07:00
parent 3b9b6f9dc0
commit 47ddbf6498
10 changed files with 57 additions and 13 deletions
+2 -2
View File
@@ -1,6 +1,6 @@
import {EventEmitter} from 'events'
import {Socket, isMessage, asMessage} from './util/Socket'
import type {SocketMessage} from './util/Socket'
import {Emitter} from './util/Emitter'
import {Queue} from './util/Queue'
import {AuthStatus, ConnectionMeta} from './ConnectionMeta'
@@ -52,7 +52,7 @@ class ReceiveQueue extends Queue {
}
}
export class Connection extends EventEmitter {
export class Connection extends Emitter {
url: string
socket: Socket
sendQueue: SendQueue
+8 -2
View File
@@ -1,9 +1,12 @@
import {EventEmitter} from 'events'
import type {Event, Filter} from './types'
import type {Connection} from './Connection'
import type {Emitter} from './util/Emitter'
import type {Message} from './util/Socket'
type Target = EventEmitter & {
export type Target = Emitter & {
connections: Connection[]
send: (...args: Message) => void
cleanup: () => void
}
type EventCallback = (url: string, event: Event) => void
@@ -46,6 +49,7 @@ export class Executor {
},
}
}
publish(event: Event, {verb = 'EVENT', onOk, onError}: PublishOpts) {
const okListener = (url: string, id: string, ...payload: any[]) => id === event.id && onOk(url, id, ...payload)
const errorListener = (url: string, id: string, ...payload: any[]) => id === event.id && onError(url, id, ...payload)
@@ -61,6 +65,7 @@ export class Executor {
}
}
}
count(filters: Filter[], {onCount}: CountOpts) {
const id = createSubId('COUNT')
const countListener = (url: string, subid: string, ...payload: any[]) => {
@@ -77,6 +82,7 @@ export class Executor {
unsubscribe: () => this.target.off('COUNT', countListener)
}
}
handleAuth({onAuth, onOk}: AuthOpts) {
this.target.on('AUTH', onAuth)
this.target.on('OK', onOk)
+2 -2
View File
@@ -1,7 +1,7 @@
import {Connection} from "./Connection"
import {EventEmitter} from 'events'
import {Emitter} from './util/Emitter'
export class Pool extends EventEmitter {
export class Pool extends Emitter {
data: Map<string, Connection>
constructor() {
super()
+2
View File
@@ -3,8 +3,10 @@ export * from "./ConnectionMeta"
export * from "./Executor"
export * from "./Pool"
export * from "./util/Deferred"
export * from "./util/Emitter"
export * from "./util/Queue"
export * from "./util/Socket"
export * from "./target/Plex"
export * from "./target/Relay"
export * from "./target/Relays"
export * from "./target/Multi"
+26
View File
@@ -0,0 +1,26 @@
import type {Target} from '../Executor'
import type {Message} from '../util/Socket'
import {Emitter} from '../util/Emitter'
export class Multi extends Emitter {
constructor(readonly targets: Target[]) {
super()
targets.forEach(t => {
t.on('*', (verb, ...args) => this.emit(verb, ...args))
})
}
get connections() {
return this.targets.flatMap(t => t.connections)
}
send(...payload: Message) {
this.targets.forEach(t => t.send(...payload))
}
cleanup = () => {
this.removeAllListeners()
this.targets.forEach(t => t.cleanup())
}
}
+2 -2
View File
@@ -1,8 +1,8 @@
import {EventEmitter} from 'events'
import {Connection} from '../Connection'
import {Emitter} from '../util/Emitter'
import type {PlexMessage, Message} from '../util/Socket'
export class Plex extends EventEmitter {
export class Plex extends Emitter {
constructor(readonly urls: string[], readonly connection: Connection) {
super()
+2 -2
View File
@@ -1,8 +1,8 @@
import {EventEmitter} from 'events'
import type {Connection} from '../Connection'
import type {Message} from '../util/Socket'
import {Emitter} from '../util/Emitter'
export class Relay extends EventEmitter {
export class Relay extends Emitter {
constructor(readonly connection: Connection) {
super()
+2 -2
View File
@@ -1,8 +1,8 @@
import {EventEmitter} from 'events'
import type {Connection} from '../Connection'
import type {Message} from '../util/Socket'
import {Emitter} from '../util/Emitter'
export class Relays extends EventEmitter {
export class Relays extends Emitter {
constructor(readonly connections: Connection[]) {
super()
+10
View File
@@ -0,0 +1,10 @@
import {EventEmitter} from 'events'
export class Emitter extends EventEmitter {
emit(type: string | symbol, ...args: any[]) {
const a = super.emit(type, ...args)
const b = super.emit('*', type, ...args)
return a && b
}
}