Fix Plex data transformation, handle socket errors

This commit is contained in:
Jonathan Staab
2023-03-29 10:06:12 -05:00
parent 557ab542b7
commit da3b176b49
4 changed files with 36 additions and 23 deletions
+5 -5
View File
@@ -5,10 +5,10 @@ export class Plex {
this.urls = urls
this.socket = socket
this.bus = new EventBus()
this.listeners = sockets.map(socket => {
return socket.bus.addListener('message', (url, [verb, ...payload]) => {
this.bus.emit(verb, url, ...payload)
})
this.unsubscribe = socket.bus.addListeners({
message: (websocketUrl, [{relays}, [verb, ...payload]]) => {
this.bus.emit(verb, relays[0], ...payload)
},
})
}
async send(...payload) {
@@ -18,6 +18,6 @@ export class Plex {
}
cleanup() {
this.bus.clear()
this.listeners.map(unsubscribe => unsubscribe())
this.unsubscribe()
}
}
+27 -14
View File
@@ -5,13 +5,14 @@ import {Deferred, defer} from "./Deferred"
export class Socket {
ws?: WebSocket
url: string
ready?: Deferred<void>
ready: Deferred<void>
timeout?: NodeJS.Timeout
queue: string[]
bus: EventBus
status: string
_onOpen: (e: any) => void
_onMessage: (e: any) => void
_onError: (e: any) => void
_onClose: (e: any) => void
static STATUS = {
NEW: "new",
@@ -22,29 +23,33 @@ export class Socket {
constructor(url: string) {
this.ws = undefined
this.url = url
this.ready = undefined
this.ready = defer()
this.timeout = undefined
this.queue = []
this.bus = new EventBus()
this.status = Socket.STATUS.NEW
this._onOpen = e => {
this._onOpen = () => {
this.status = Socket.STATUS.READY
this.ready?.resolve()
this.ready.resolve()
this.bus.emit('open')
}
this._onMessage = e => {
this.queue.push(e.data as string)
this._onMessage = event => {
this.queue.push(event.data as string)
if (!this.timeout) {
this.handleMessagesAsync()
}
}
this._onClose = e => {
this._onError = (err: Error) => {
this.bus.emit('error', err)
}
this._onClose = () => {
this.disconnect()
this.ready?.reject()
this.ready.reject()
this.status = Socket.STATUS.CLOSED
this.bus.emit('close')
}
@@ -61,17 +66,25 @@ export class Socket {
this.ws.addEventListener("open", this._onOpen)
this.ws.addEventListener("message", this._onMessage)
this.ws.addEventListener("error", this._onError)
this.ws.addEventListener("close", this._onClose)
}
await this.ready?.catch(() => null)
await this.ready.catch(() => null)
}
disconnect() {
this.ws?.close()
this.ws?.removeEventListener("open", this._onOpen)
this.ws?.removeEventListener("message", this._onMessage)
this.ws?.removeEventListener("close", this._onClose)
this.ws = undefined
if (this.ws) {
const ws = this.ws
// Avoid "WebSocket was closed before the connection was established"
this.ready.then(() => ws.close())
this.ws.removeEventListener("open", this._onOpen)
this.ws.removeEventListener("message", this._onMessage)
this.ws.removeEventListener("error", this._onError)
this.ws.removeEventListener("close", this._onClose)
this.ws = undefined
}
}
cleanup() {
this.disconnect()