Add event bus to pool for socket open/close

This commit is contained in:
Jonathan Staab
2023-03-28 15:07:06 -05:00
parent 5131a89c29
commit 557ab542b7
4 changed files with 25 additions and 13 deletions
+2 -2
View File
@@ -1,12 +1,12 @@
{ {
"name": "paravel", "name": "paravel",
"version": "0.1.5", "version": "0.1.7",
"lockfileVersion": 2, "lockfileVersion": 2,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "paravel", "name": "paravel",
"version": "0.1.5", "version": "0.1.7",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"husky": "^8.0.3", "husky": "^8.0.3",
+1 -1
View File
@@ -1,6 +1,6 @@
{ {
"name": "paravel", "name": "paravel",
"version": "0.1.6", "version": "0.1.7",
"description": "Yet another toolkit for nostr", "description": "Yet another toolkit for nostr",
"repository": { "repository": {
"type": "git", "type": "git",
+11 -2
View File
@@ -1,16 +1,25 @@
import {Socket} from "./util/Socket" import {Socket} from "./util/Socket"
import {EventBus} from "./util/EventBus"
export class Pool { export class Pool {
data: Map<string, Socket> data: Map<string, Socket>
constructor() { constructor() {
this.data = new Map() this.data = new Map()
this.bus = new EventBus()
} }
has(url) { has(url) {
return this.data.has(url) return this.data.has(url)
} }
get(url) { get(url) {
if (!this.data.has(url)) { if (!this.data.has(url)) {
this.data.set(url, new Socket(url)) const socket = new Socket(url)
this.data.set(url, socket)
socket.bus.addListeners({
open: () => this.bus.emit('open', {url}),
close: () => this.bus.emit('close', {url}),
})
} }
return this.data.get(url) return this.data.get(url)
@@ -19,7 +28,7 @@ export class Pool {
const socket = this.data.get(url) const socket = this.data.get(url)
if (socket) { if (socket) {
socket.disconnect() socket.cleanup()
this.data.delete(url) this.data.delete(url)
} }
} }
+11 -8
View File
@@ -31,6 +31,7 @@ export class Socket {
this._onOpen = e => { this._onOpen = e => {
this.status = Socket.STATUS.READY this.status = Socket.STATUS.READY
this.ready?.resolve() this.ready?.resolve()
this.bus.emit('open')
} }
this._onMessage = e => { this._onMessage = e => {
@@ -45,6 +46,7 @@ export class Socket {
this.disconnect() this.disconnect()
this.ready?.reject() this.ready?.reject()
this.status = Socket.STATUS.CLOSED this.status = Socket.STATUS.CLOSED
this.bus.emit('close')
} }
} }
async connect() { async connect() {
@@ -65,14 +67,15 @@ export class Socket {
await this.ready?.catch(() => null) await this.ready?.catch(() => null)
} }
disconnect() { disconnect() {
if (this.ws) { this.ws?.close()
this.ws.close() this.ws?.removeEventListener("open", this._onOpen)
this.ws.removeEventListener("open", this._onOpen) this.ws?.removeEventListener("message", this._onMessage)
this.ws.removeEventListener("message", this._onMessage) this.ws?.removeEventListener("close", this._onClose)
this.ws.removeEventListener("error", this._onClose) this.ws = undefined
this.ws.removeEventListener("close", this._onClose) }
this.ws = undefined cleanup() {
} this.disconnect()
this.bus.clear()
} }
handleMessages() { handleMessages() {
for (const json of this.queue.splice(0, 10)) { for (const json of this.queue.splice(0, 10)) {