Re-organize connection management
This commit is contained in:
@@ -0,0 +1,107 @@
|
||||
import {EventEmitter} from 'events'
|
||||
import {Socket, isMessage, asMessage} from './util/Socket'
|
||||
import type {SocketMessage} from './util/Socket'
|
||||
import {Queue} from './util/Queue'
|
||||
import {AuthStatus, ConnectionMeta} from './ConnectionMeta'
|
||||
|
||||
class SendQueue extends Queue {
|
||||
constructor(readonly cxn: Connection) {
|
||||
super()
|
||||
}
|
||||
|
||||
shouldSend(message: SocketMessage) {
|
||||
if (!this.cxn.socket.isReady()) {
|
||||
return false
|
||||
}
|
||||
|
||||
const [verb] = asMessage(message)
|
||||
|
||||
if (['AUTH', 'CLOSE'].includes(verb)) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Only defer for auth if we're not multiplexing
|
||||
if (isMessage(message) && ![AuthStatus.Ok, AuthStatus.Pending].includes(this.cxn.meta.authStatus)) {
|
||||
return false
|
||||
}
|
||||
|
||||
if (verb === 'REQ') {
|
||||
return this.cxn.meta.pendingRequests.size < 10
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
handle(message: SocketMessage) {
|
||||
this.cxn.onSend(message)
|
||||
}
|
||||
}
|
||||
|
||||
class ReceiveQueue extends Queue {
|
||||
constructor(readonly cxn: Connection) {
|
||||
super()
|
||||
}
|
||||
|
||||
handle(message: SocketMessage) {
|
||||
this.cxn.onReceive(message)
|
||||
}
|
||||
}
|
||||
|
||||
export class Connection extends EventEmitter {
|
||||
url: string
|
||||
socket: Socket
|
||||
sendQueue: SendQueue
|
||||
receiveQueue: ReceiveQueue
|
||||
meta: ConnectionMeta
|
||||
|
||||
constructor(url: string) {
|
||||
super()
|
||||
|
||||
this.url = url
|
||||
this.socket = new Socket(url, this)
|
||||
this.sendQueue = new SendQueue(this)
|
||||
this.receiveQueue = new ReceiveQueue(this)
|
||||
this.meta = new ConnectionMeta(this)
|
||||
this.setMaxListeners(100)
|
||||
}
|
||||
|
||||
send = (m: SocketMessage) => this.sendQueue.push(m)
|
||||
|
||||
onOpen = () => this.emit('open', this)
|
||||
|
||||
onClose = () => this.emit('close', this)
|
||||
|
||||
onError = () => this.emit('fault', this)
|
||||
|
||||
onMessage = (m: SocketMessage) => this.receiveQueue.push(m)
|
||||
|
||||
onSend = (message: SocketMessage) => {
|
||||
this.emit('send', this, message)
|
||||
this.socket.send(message)
|
||||
}
|
||||
|
||||
onReceive = (message: SocketMessage) => {
|
||||
this.emit('receive', this, message)
|
||||
}
|
||||
|
||||
ensureConnected = ({shouldReconnect = true}) => {
|
||||
if (shouldReconnect && !this.socket.isHealthy()) {
|
||||
this.reset()
|
||||
}
|
||||
|
||||
if (this.socket.isPending()) {
|
||||
this.socket.connect()
|
||||
}
|
||||
}
|
||||
|
||||
reset() {
|
||||
this.socket.reset()
|
||||
this.sendQueue.clear()
|
||||
this.meta.clearPending()
|
||||
}
|
||||
|
||||
destroy() {
|
||||
this.socket.disconnect()
|
||||
this.removeAllListeners()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,163 @@
|
||||
import type {Event, Filter} from './types'
|
||||
import type {Connection} from './Connection'
|
||||
|
||||
export type PublishMeta = {
|
||||
sent: number
|
||||
event: Event
|
||||
}
|
||||
|
||||
export type RequestMeta = {
|
||||
sent: number
|
||||
filters: Filter[]
|
||||
eoseReceived: boolean
|
||||
}
|
||||
|
||||
export enum AuthStatus {
|
||||
Ok = 'ok',
|
||||
Pending = 'pending',
|
||||
Unauthorized = 'unauthorized',
|
||||
Forbidden = 'forbidden',
|
||||
}
|
||||
|
||||
export enum ConnectionStatus {
|
||||
Unauthorized = 'unauthorized',
|
||||
Forbidden = 'forbidden',
|
||||
Error = 'error',
|
||||
Closed = 'closed',
|
||||
Slow = 'slow',
|
||||
Ok = 'ok',
|
||||
}
|
||||
|
||||
export class ConnectionMeta {
|
||||
authStatus = AuthStatus.Pending
|
||||
pendingPublishes = new Map<string, PublishMeta>()
|
||||
pendingRequests = new Map<string, RequestMeta>()
|
||||
publishCount = 0
|
||||
requestCount = 0
|
||||
eventCount = 0
|
||||
lastOpen = 0
|
||||
lastClose = 0
|
||||
lastFault = 0
|
||||
lastPublish = 0
|
||||
lastRequest = 0
|
||||
lastEvent = 0
|
||||
responseCount = 0
|
||||
responseTimer = 0
|
||||
|
||||
constructor(cxn: Connection) {
|
||||
cxn.on('open', () => {
|
||||
this.lastOpen = Date.now()
|
||||
})
|
||||
|
||||
cxn.on('close', () => {
|
||||
this.lastClose = Date.now()
|
||||
})
|
||||
|
||||
cxn.on('fault', () => {
|
||||
this.lastFault = Date.now()
|
||||
})
|
||||
|
||||
// @ts-ignore
|
||||
cxn.on('send', (cxn, [verb, ...payload]) => {
|
||||
// @ts-ignore
|
||||
if (verb === 'REQ') this.onSendRequest(...payload)
|
||||
// @ts-ignore
|
||||
if (verb === 'CLOSE') this.onSendClose(...payload)
|
||||
// @ts-ignore
|
||||
if (verb === 'EVENT') this.onSendEvent(...payload)
|
||||
})
|
||||
|
||||
// @ts-ignore
|
||||
cxn.on('receive', (cxn, [verb, ...payload]) => {
|
||||
// @ts-ignore
|
||||
if (verb === 'OK') this.onReceiveOk(...payload)
|
||||
// @ts-ignore
|
||||
if (verb === 'AUTH') this.onReceiveAuth(...payload)
|
||||
// @ts-ignore
|
||||
if (verb === 'EVENT') this.onReceiveEvent(...payload)
|
||||
// @ts-ignore
|
||||
if (verb === 'EOSE') this.onReceiveEose(...payload)
|
||||
})
|
||||
}
|
||||
|
||||
onSendRequest(subId: string, ...filters: Filter[]) {
|
||||
this.requestCount++
|
||||
this.lastRequest = Date.now()
|
||||
this.pendingRequests.set(subId, {
|
||||
filters,
|
||||
sent: Date.now(),
|
||||
eoseReceived: false,
|
||||
})
|
||||
}
|
||||
|
||||
onSendClose(subId: string) {
|
||||
this.pendingRequests.delete(subId)
|
||||
}
|
||||
|
||||
onSendEvent(event: Event) {
|
||||
this.publishCount++
|
||||
this.lastPublish = Date.now()
|
||||
this.pendingPublishes.set(event.id, {sent: Date.now(), event})
|
||||
}
|
||||
|
||||
onReceiveOk(eventId: string) {
|
||||
const publish = this.pendingPublishes.get(eventId)
|
||||
|
||||
this.authStatus = AuthStatus.Ok
|
||||
|
||||
if (publish) {
|
||||
this.responseCount++
|
||||
this.responseTimer += Date.now() - publish.sent
|
||||
this.pendingPublishes.delete(eventId)
|
||||
}
|
||||
}
|
||||
|
||||
onReceiveAuth(eventId: string) {
|
||||
this.authStatus = AuthStatus.Unauthorized
|
||||
}
|
||||
|
||||
onReceiveEvent(event: Event) {
|
||||
this.eventCount++
|
||||
this.lastEvent = Date.now()
|
||||
}
|
||||
|
||||
onReceiveEose(subId: string) {
|
||||
const request = this.pendingRequests.get(subId)
|
||||
|
||||
// Only count the first eose
|
||||
if (request && !request.eoseReceived) {
|
||||
request.eoseReceived = true
|
||||
|
||||
this.responseCount++
|
||||
this.responseTimer += Date.now() - request.sent
|
||||
}
|
||||
}
|
||||
|
||||
clearPending = () => {
|
||||
this.pendingPublishes.clear()
|
||||
this.pendingRequests.clear()
|
||||
}
|
||||
|
||||
getSpeed = () => this.responseCount ? this.responseTimer / this.responseCount : 0
|
||||
|
||||
getStatus = () => {
|
||||
if (this.authStatus === AuthStatus.Unauthorized) return ConnectionStatus.Unauthorized
|
||||
if (this.authStatus === AuthStatus.Forbidden) return ConnectionStatus.Forbidden
|
||||
if (this.lastFault > this.lastOpen) return ConnectionStatus.Error
|
||||
if (this.lastClose > this.lastOpen) return ConnectionStatus.Closed
|
||||
if (this.getSpeed() > 500) return ConnectionStatus.Slow
|
||||
|
||||
return ConnectionStatus.Ok
|
||||
}
|
||||
|
||||
getDescription = () => {
|
||||
switch (this.getStatus()) {
|
||||
case ConnectionStatus.Unauthorized: return 'Logging in'
|
||||
case ConnectionStatus.Forbidden: return 'Failed to log in'
|
||||
case ConnectionStatus.Error: return 'Failed to connect'
|
||||
case ConnectionStatus.Closed: return 'Waiting to reconnect'
|
||||
case ConnectionStatus.Slow: return 'Slow Connection'
|
||||
case ConnectionStatus.Ok: return 'Connected'
|
||||
}
|
||||
}
|
||||
}
|
||||
+30
-14
@@ -1,18 +1,34 @@
|
||||
import {EventEmitter} from 'events'
|
||||
import type {Event, Filter} from './types'
|
||||
import type {Message} from './util/Socket'
|
||||
|
||||
const createSubId = prefix => [prefix, Math.random().toString().slice(2, 10)].join('-')
|
||||
type Target = EventEmitter & {
|
||||
send: (...args: Message) => void
|
||||
}
|
||||
|
||||
type EventCallback = (url: string, event: Event) => void
|
||||
type EoseCallback = (url: string) => void
|
||||
type AuthCallback = (url: string, challenge: string) => void
|
||||
type OkCallback = (url: string, id: string, ...extra: any[]) => void
|
||||
type ErrorCallback = (url: string, id: string, ...extra: any[]) => void
|
||||
type CountCallback = (url: string, ...extra: any[]) => void
|
||||
type SubscribeOpts = {onEvent: EventCallback, onEose: EoseCallback}
|
||||
type PublishOpts = {verb: string, onOk: OkCallback, onError: ErrorCallback}
|
||||
type CountOpts = {onCount: CountCallback}
|
||||
type AuthOpts = {onAuth: AuthCallback, onOk: OkCallback}
|
||||
|
||||
const createSubId = (prefix: string) => [prefix, Math.random().toString().slice(2, 10)].join('-')
|
||||
|
||||
export class Executor {
|
||||
target: EventEmitter
|
||||
constructor(target) {
|
||||
this.target = target
|
||||
}
|
||||
subscribe(filters, {onEvent, onEose}) {
|
||||
|
||||
constructor(readonly target: Target) {}
|
||||
|
||||
subscribe(filters: Filter[], {onEvent, onEose}: SubscribeOpts) {
|
||||
let closed = false
|
||||
|
||||
const id = createSubId('REQ')
|
||||
const eventListener = (url, subid, e) => subid === id && onEvent?.(url, e)
|
||||
const eoseListener = (url, subid) => subid === id && onEose?.(url)
|
||||
const eventListener = (url: string, subid: string, e: Event) => subid === id && onEvent?.(url, e)
|
||||
const eoseListener = (url: string, subid: string) => subid === id && onEose?.(url)
|
||||
|
||||
this.target.on('EVENT', eventListener)
|
||||
this.target.on('EOSE', eoseListener)
|
||||
@@ -30,9 +46,9 @@ export class Executor {
|
||||
},
|
||||
}
|
||||
}
|
||||
publish(event, {verb = 'EVENT', onOk, onError} = {}) {
|
||||
const okListener = (url, id, ...payload) => id === event.id && onOk(url, id, ...payload)
|
||||
const errorListener = (url, id, ...payload) => id === event.id && onError(url, id, ...payload)
|
||||
publish(event: Event, {verb = 'EVENT', onOk, onError}: PublishOpts) {
|
||||
const okListener = (url: string, id: string, ...payload: any[]) => id === event.id && onOk(url, id, ...payload)
|
||||
const errorListener = (url: string, id: string, ...payload: any[]) => id === event.id && onError(url, id, ...payload)
|
||||
|
||||
this.target.on('OK', okListener)
|
||||
this.target.on('ERROR', errorListener)
|
||||
@@ -45,9 +61,9 @@ export class Executor {
|
||||
}
|
||||
}
|
||||
}
|
||||
count(filters, {onCount}) {
|
||||
count(filters: Filter[], {onCount}: CountOpts) {
|
||||
const id = createSubId('COUNT')
|
||||
const countListener = (url, subid, ...payload) => {
|
||||
const countListener = (url: string, subid: string, ...payload: any[]) => {
|
||||
if (subid === id) {
|
||||
onCount(url, ...payload)
|
||||
this.target.off('COUNT', countListener)
|
||||
@@ -61,7 +77,7 @@ export class Executor {
|
||||
unsubscribe: () => this.target.off('COUNT', countListener)
|
||||
}
|
||||
}
|
||||
handleAuth({onAuth, onOk}) {
|
||||
handleAuth({onAuth, onOk}: AuthOpts) {
|
||||
this.target.on('AUTH', onAuth)
|
||||
this.target.on('OK', onOk)
|
||||
|
||||
|
||||
-24
@@ -1,24 +0,0 @@
|
||||
import {EventEmitter} from 'events'
|
||||
|
||||
export class Plex extends EventEmitter {
|
||||
constructor(urls, socket) {
|
||||
super()
|
||||
|
||||
this.urls = urls
|
||||
this.socket = socket
|
||||
this.socket.on('receive', this.onMessage)
|
||||
}
|
||||
get sockets() {
|
||||
return [this.socket]
|
||||
}
|
||||
send = (...payload) => {
|
||||
this.socket.send([{relays: this.urls}, payload])
|
||||
}
|
||||
onMessage = (socket, [{relays}, [verb, ...payload]]) => {
|
||||
this.emit(verb, relays[0], ...payload)
|
||||
}
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.socket.off('receive', this.onMessage)
|
||||
}
|
||||
}
|
||||
+23
-16
@@ -1,35 +1,42 @@
|
||||
import {Socket} from "./util/Socket"
|
||||
import {Connection} from "./Connection"
|
||||
import {EventEmitter} from 'events'
|
||||
|
||||
export class Pool extends EventEmitter {
|
||||
data: Map<string, Socket>
|
||||
data: Map<string, Connection>
|
||||
constructor() {
|
||||
super()
|
||||
|
||||
this.data = new Map()
|
||||
}
|
||||
has(url) {
|
||||
has(url: string) {
|
||||
return this.data.has(url)
|
||||
}
|
||||
get(url, {autoConnect = true} = {}) {
|
||||
if (!this.data.has(url) && autoConnect) {
|
||||
const socket = new Socket(url)
|
||||
get(url: string, {autoConnect = true} = {}) {
|
||||
let connection = this.data.get(url)
|
||||
|
||||
this.data.set(url, socket)
|
||||
this.emit('init', socket)
|
||||
if (autoConnect) {
|
||||
if (!connection) {
|
||||
connection = new Connection(url)
|
||||
|
||||
socket.on('open', () => this.emit('open', socket))
|
||||
socket.on('close', () => this.emit('close', socket))
|
||||
this.data.set(url, connection)
|
||||
this.emit('init', connection)
|
||||
|
||||
connection.on('open', () => this.emit('open', connection))
|
||||
connection.on('close', () => this.emit('close', connection))
|
||||
}
|
||||
|
||||
connection.ensureConnected({
|
||||
shouldReconnect: connection.meta.lastClose < Date.now() - 30_000,
|
||||
})
|
||||
}
|
||||
|
||||
return this.data.get(url)
|
||||
return connection
|
||||
}
|
||||
remove(url) {
|
||||
const socket = this.data.get(url)
|
||||
remove(url: string) {
|
||||
const connection = this.data.get(url)
|
||||
|
||||
if (socket) {
|
||||
socket.disconnect()
|
||||
socket.removeAllListeners()
|
||||
if (connection) {
|
||||
connection.destroy()
|
||||
|
||||
this.data.delete(url)
|
||||
}
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
import {EventEmitter} from 'events'
|
||||
|
||||
export class Relay extends EventEmitter {
|
||||
constructor(socket) {
|
||||
super()
|
||||
|
||||
this.socket = socket
|
||||
this.socket.on('receive', this.onMessage)
|
||||
}
|
||||
get sockets() {
|
||||
return [this.socket]
|
||||
}
|
||||
send(...payload) {
|
||||
this.socket.send(payload)
|
||||
}
|
||||
onMessage = (socket, [verb, ...payload]) => {
|
||||
this.emit(verb, socket.url, ...payload)
|
||||
}
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.socket.off('receive', this.onMessage)
|
||||
}
|
||||
}
|
||||
@@ -1,26 +0,0 @@
|
||||
import {EventEmitter} from 'events'
|
||||
|
||||
export class Relays extends EventEmitter {
|
||||
constructor(sockets) {
|
||||
super()
|
||||
|
||||
this.sockets = sockets
|
||||
this.sockets.forEach(socket => {
|
||||
socket.on('receive', this.onMessage)
|
||||
})
|
||||
}
|
||||
send = (...payload) => {
|
||||
this.sockets.forEach(socket => {
|
||||
socket.send(payload)
|
||||
})
|
||||
}
|
||||
onMessage = (socket, [verb, ...payload]) => {
|
||||
this.emit(verb, socket.url, ...payload)
|
||||
}
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.sockets.forEach(socket => {
|
||||
socket.off('receive', this.onMessage)
|
||||
})
|
||||
}
|
||||
}
|
||||
+8
-5
@@ -1,7 +1,10 @@
|
||||
export * from "./util/Deferred"
|
||||
export * from "./util/Socket"
|
||||
export * from "./Connection"
|
||||
export * from "./ConnectionMeta"
|
||||
export * from "./Executor"
|
||||
export * from "./Plex"
|
||||
export * from "./Pool"
|
||||
export * from "./Relay"
|
||||
export * from "./Relays"
|
||||
export * from "./util/Deferred"
|
||||
export * from "./util/Queue"
|
||||
export * from "./util/Socket"
|
||||
export * from "./target/Plex"
|
||||
export * from "./target/Relay"
|
||||
export * from "./target/Relays"
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
import {EventEmitter} from 'events'
|
||||
import {Connection} from '../Connection'
|
||||
import type {PlexMessage, Message} from '../util/Socket'
|
||||
|
||||
export class Plex extends EventEmitter {
|
||||
constructor(readonly urls: string[], readonly connection: Connection) {
|
||||
super()
|
||||
|
||||
this.connection.on('receive', this.onMessage)
|
||||
}
|
||||
|
||||
get connections() {
|
||||
return [this.connection]
|
||||
}
|
||||
|
||||
send = (...payload: Message) => {
|
||||
this.connection.send([{relays: this.urls}, payload])
|
||||
}
|
||||
|
||||
onMessage = (connection: Connection, [{relays}, [verb, ...payload]]: PlexMessage) => {
|
||||
this.emit(verb, relays[0], ...payload)
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.connection.off('receive', this.onMessage)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
import {EventEmitter} from 'events'
|
||||
import type {Connection} from '../Connection'
|
||||
import type {Message} from '../util/Socket'
|
||||
|
||||
export class Relay extends EventEmitter {
|
||||
constructor(readonly connection: Connection) {
|
||||
super()
|
||||
|
||||
this.connection.on('receive', this.onMessage)
|
||||
}
|
||||
|
||||
get connections() {
|
||||
return [this.connection]
|
||||
}
|
||||
|
||||
send(...payload: Message) {
|
||||
this.connection.send(payload)
|
||||
}
|
||||
|
||||
onMessage = (connection: Connection, [verb, ...payload]: Message) => {
|
||||
this.emit(verb, connection.url, ...payload)
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.connection.off('receive', this.onMessage)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
import {EventEmitter} from 'events'
|
||||
import type {Connection} from '../Connection'
|
||||
import type {Message} from '../util/Socket'
|
||||
|
||||
export class Relays extends EventEmitter {
|
||||
constructor(readonly connections: Connection[]) {
|
||||
super()
|
||||
|
||||
connections.forEach(connection => {
|
||||
connection.on('receive', this.onMessage)
|
||||
})
|
||||
}
|
||||
|
||||
send = (...payload: Message) => {
|
||||
this.connections.forEach(connection => {
|
||||
connection.send(payload)
|
||||
})
|
||||
}
|
||||
|
||||
onMessage = (connection: Connection, [verb, ...payload]: Message) => {
|
||||
this.emit(verb, connection.url, ...payload)
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.connections.forEach(connection => {
|
||||
connection.off('receive', this.onMessage)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
export type Event = {
|
||||
id: string
|
||||
}
|
||||
|
||||
export type Filter = Record<string, any>
|
||||
@@ -0,0 +1,46 @@
|
||||
export class Queue {
|
||||
timeout?: NodeJS.Timeout
|
||||
messages: any[] = []
|
||||
|
||||
clear() {
|
||||
this.messages = []
|
||||
}
|
||||
|
||||
push(message: any) {
|
||||
this.messages.push(message)
|
||||
this.enqueueWork()
|
||||
}
|
||||
|
||||
handle(message: any) {
|
||||
throw new Error("Not implemented")
|
||||
}
|
||||
|
||||
shouldSend(message: any) {
|
||||
return true
|
||||
}
|
||||
|
||||
doWork() {
|
||||
for (const message of this.messages.splice(0, 10)) {
|
||||
if (this.shouldSend(message)) {
|
||||
this.handle(message)
|
||||
} else {
|
||||
this.messages.push(message)
|
||||
}
|
||||
}
|
||||
|
||||
this.timeout = undefined
|
||||
this.enqueueWork()
|
||||
}
|
||||
|
||||
enqueueWork() {
|
||||
if (this.timeout) {
|
||||
return
|
||||
}
|
||||
|
||||
if (this.messages.length === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
this.timeout = setTimeout(() => this.doWork(), 100) as NodeJS.Timeout
|
||||
}
|
||||
}
|
||||
+88
-110
@@ -1,144 +1,122 @@
|
||||
import type {MessageEvent} from 'isomorphic-ws'
|
||||
import WebSocket from "isomorphic-ws"
|
||||
import {EventEmitter} from 'events'
|
||||
import {Deferred, defer} from "./Deferred"
|
||||
|
||||
export class Socket extends EventEmitter {
|
||||
ws?: WebSocket
|
||||
url: string
|
||||
ready: Deferred<void>
|
||||
timeout?: NodeJS.Timeout
|
||||
receiveQueue: any[] = []
|
||||
sendQueue: any[] = []
|
||||
status: string
|
||||
static STATUS = {
|
||||
NEW: "new",
|
||||
UNAUTHORIZED: "unauthorized",
|
||||
PENDING: "pending",
|
||||
CLOSED: "closed",
|
||||
ERROR: "error",
|
||||
READY: "ready",
|
||||
}
|
||||
constructor(url: string) {
|
||||
super()
|
||||
export type Message = [string, ...any[]]
|
||||
|
||||
export type PlexMessage = [{relays: string[]}, Message]
|
||||
|
||||
export type SocketMessage = Message | PlexMessage
|
||||
|
||||
export const isMessage = (m: SocketMessage): boolean => typeof m[0] === 'string'
|
||||
|
||||
export const asMessage = (m: SocketMessage): Message => isMessage(m) ? m : m[1]
|
||||
|
||||
export type SocketOpts = {
|
||||
onOpen: () => void
|
||||
onClose: () => void
|
||||
onError: () => void
|
||||
onMessage: (message: SocketMessage) => void
|
||||
}
|
||||
|
||||
export class Socket {
|
||||
url: string
|
||||
ws?: WebSocket
|
||||
ready: Deferred<void>
|
||||
|
||||
constructor(url: string, readonly opts: SocketOpts) {
|
||||
this.url = url
|
||||
this.ready = defer()
|
||||
this.status = Socket.STATUS.NEW
|
||||
}
|
||||
|
||||
this.setMaxListeners(100)
|
||||
_close() {
|
||||
if (!this.ws) {
|
||||
throw new Error('Socket was closed before it was opened')
|
||||
}
|
||||
|
||||
// Avoid "WebSocket was closed before the connection was established"
|
||||
this.ready.then(() => this.ws?.close()).catch(() => null)
|
||||
}
|
||||
send = (message: any) => {
|
||||
this.connect()
|
||||
this.sendQueue.push(message)
|
||||
this.enqueueWork()
|
||||
|
||||
isPending() {
|
||||
return !this.ws
|
||||
}
|
||||
onMessage = (event: {data: string}) => {
|
||||
this.receiveQueue.push(event.data)
|
||||
this.enqueueWork()
|
||||
|
||||
isConnecting() {
|
||||
return this.ws?.readyState === WebSocket.CONNECTING
|
||||
}
|
||||
|
||||
isReady() {
|
||||
return this.ws?.readyState === WebSocket.OPEN
|
||||
}
|
||||
|
||||
isClosing() {
|
||||
return this.ws?.readyState === WebSocket.CLOSING
|
||||
}
|
||||
|
||||
isClosed() {
|
||||
return this.ws?.readyState === WebSocket.CLOSED
|
||||
}
|
||||
|
||||
isHealthy() {
|
||||
return this.isPending() || this.isConnecting() || this.isReady()
|
||||
}
|
||||
|
||||
onOpen = () => {
|
||||
this.status = Socket.STATUS.READY
|
||||
this.ready.resolve()
|
||||
this.emit('open', this)
|
||||
}
|
||||
onError = () => {
|
||||
this.disconnect()
|
||||
this.ready.reject()
|
||||
this.status = Socket.STATUS.ERROR
|
||||
this.emit('fault', this)
|
||||
this.opts.onOpen()
|
||||
}
|
||||
|
||||
onClose = () => {
|
||||
if (this.ws) {
|
||||
const ws = this.ws
|
||||
|
||||
// Avoid "WebSocket was closed before the connection was established"
|
||||
this.ready.then(() => ws.close(), () => null)
|
||||
this.ws.removeEventListener("open", this.onOpen)
|
||||
this.ws.removeEventListener("close", this.onClose)
|
||||
this.ws.removeEventListener("error", this.onError)
|
||||
// @ts-ignore
|
||||
this.ws.removeEventListener("message", this.onMessage)
|
||||
this.ws = undefined
|
||||
}
|
||||
|
||||
if (this.status !== Socket.STATUS.ERROR) {
|
||||
this.status = Socket.STATUS.CLOSED
|
||||
}
|
||||
|
||||
this.ready.reject()
|
||||
this.emit('close', this)
|
||||
this.opts.onClose()
|
||||
this._close()
|
||||
}
|
||||
connect = () => {
|
||||
const {NEW, CLOSED, PENDING} = Socket.STATUS
|
||||
|
||||
if ([NEW, CLOSED].includes(this.status)) {
|
||||
this.ready = defer()
|
||||
this.status = PENDING
|
||||
this.ws = new WebSocket(this.url)
|
||||
this.ws.addEventListener("open", this.onOpen)
|
||||
this.ws.addEventListener("close", this.onClose)
|
||||
this.ws.addEventListener("error", this.onError)
|
||||
// @ts-ignore
|
||||
this.ws.addEventListener("message", this.onMessage)
|
||||
}
|
||||
onError = () => {
|
||||
this.ready.reject()
|
||||
this.opts.onError()
|
||||
this._close()
|
||||
}
|
||||
disconnect = () => {
|
||||
this.onClose()
|
||||
}
|
||||
receiveMessage = (json: string) => {
|
||||
|
||||
onMessage = (event: MessageEvent) => {
|
||||
try {
|
||||
const message = JSON.parse(json)
|
||||
|
||||
if (message?.[0] == 'AUTH') {
|
||||
this.status = Socket.STATUS.UNAUTHORIZED
|
||||
}
|
||||
|
||||
if (message?.[0] == 'OK' && this.status === Socket.STATUS.UNAUTHORIZED) {
|
||||
this.status = Socket.STATUS.READY
|
||||
}
|
||||
|
||||
this.emit('receive', this, message)
|
||||
this.opts.onMessage(JSON.parse(event.data as string))
|
||||
} catch (e) {
|
||||
// pass
|
||||
}
|
||||
}
|
||||
sendMessage = (message: any) => {
|
||||
this.emit('send', this, message)
|
||||
|
||||
// @ts-ignore
|
||||
send = (message: any) => {
|
||||
if (!this.ws) {
|
||||
throw new Error('Send attempted before socket was opened')
|
||||
}
|
||||
|
||||
this.ws.send(JSON.stringify(message))
|
||||
}
|
||||
shouldDefer = (payload: any[]) => {
|
||||
if (this.ws?.readyState !== 1) {
|
||||
return true
|
||||
|
||||
connect = () => {
|
||||
if (this.ws) {
|
||||
throw new Error(`Already attempted connection for ${this.url}`)
|
||||
}
|
||||
|
||||
if (this.status === Socket.STATUS.UNAUTHORIZED) {
|
||||
return payload?.[0] !== 'AUTH'
|
||||
}
|
||||
|
||||
return this.status !== Socket.STATUS.READY
|
||||
this.ws = new WebSocket(this.url)
|
||||
this.ws.onopen = this.onOpen
|
||||
this.ws.onclose = this.onClose
|
||||
this.ws.onerror = this.onError
|
||||
this.ws.onmessage = this.onMessage
|
||||
}
|
||||
doWork = () => {
|
||||
this.timeout = undefined
|
||||
|
||||
for (const payload of this.receiveQueue.splice(0, 10)) {
|
||||
this.receiveMessage(payload)
|
||||
}
|
||||
|
||||
for (const payload of this.sendQueue.splice(0, 10)) {
|
||||
if (this.shouldDefer(payload)) {
|
||||
this.sendQueue.push(payload)
|
||||
} else {
|
||||
this.sendMessage(payload)
|
||||
}
|
||||
}
|
||||
|
||||
this.enqueueWork()
|
||||
disconnect() {
|
||||
this.onClose()
|
||||
}
|
||||
enqueueWork = () => {
|
||||
if (!this.timeout && (this.receiveQueue.length > 0 || this.sendQueue.length > 0)) {
|
||||
this.timeout = setTimeout(() => this.doWork(), 100) as NodeJS.Timeout
|
||||
|
||||
reset() {
|
||||
if (this.ws) {
|
||||
this._close()
|
||||
}
|
||||
|
||||
this.ws = undefined
|
||||
this.ready = defer()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user