From ef18009d3315d4461ea9e8fe4ad5ac0b9a752efd Mon Sep 17 00:00:00 2001 From: Jonathan Staab Date: Thu, 27 Jul 2023 08:40:39 -0700 Subject: [PATCH] Send sockets to listeners rather than urls --- package.json | 2 +- src/Plex.ts | 7 ++++--- src/Pool.ts | 8 +++++--- src/Relay.ts | 9 +++++---- src/Relays.ts | 9 +++++---- src/util/Socket.ts | 29 +++++++++++++++++------------ 6 files changed, 37 insertions(+), 27 deletions(-) diff --git a/package.json b/package.json index 3e36874..ee6b1ee 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "paravel", - "version": "0.1.19", + "version": "0.2.0", "description": "Yet another toolkit for nostr", "repository": { "type": "git", diff --git a/src/Plex.ts b/src/Plex.ts index 954f6c9..f908fa5 100644 --- a/src/Plex.ts +++ b/src/Plex.ts @@ -6,7 +6,7 @@ export class Plex extends EventEmitter { this.urls = urls this.socket = socket - this.socket.on('message', this.onMessage) + this.socket.on('receive', this.onMessage) } get sockets() { return [this.socket] @@ -14,10 +14,11 @@ export class Plex extends EventEmitter { send = (...payload) => { this.socket.send([{relays: this.urls}, payload]) } - onMessage = (websocketUrl, [{relays}, [verb, ...payload]]) => { + onMessage = (socket, [{relays}, [verb, ...payload]]) => { this.emit(verb, relays[0], ...payload) } cleanup = () => { - this.socket.off('message', this.onMessage) + this.removeAllListeners() + this.socket.off('receive', this.onMessage) } } diff --git a/src/Pool.ts b/src/Pool.ts index 3f3cc70..429d5a2 100644 --- a/src/Pool.ts +++ b/src/Pool.ts @@ -16,10 +16,10 @@ export class Pool extends EventEmitter { const socket = new Socket(url) this.data.set(url, socket) - this.emit('init', {url}) + this.emit('init', socket) - socket.on('open', () => this.emit('open', {url})) - socket.on('close', () => this.emit('close', {url})) + socket.on('open', () => this.emit('open', socket)) + socket.on('close', () => this.emit('close', socket)) } return this.data.get(url) @@ -28,7 +28,9 @@ export class Pool extends EventEmitter { const socket = this.data.get(url) if (socket) { + socket.disconnect() socket.removeAllListeners() + this.data.delete(url) } } diff --git a/src/Relay.ts b/src/Relay.ts index a77e48e..c69c95a 100644 --- a/src/Relay.ts +++ b/src/Relay.ts @@ -5,7 +5,7 @@ export class Relay extends EventEmitter { super() this.socket = socket - this.socket.on('message', this.onMessage) + this.socket.on('receive', this.onMessage) } get sockets() { return [this.socket] @@ -13,10 +13,11 @@ export class Relay extends EventEmitter { send(...payload) { this.socket.send(payload) } - onMessage = (url, [verb, ...payload]) => { - this.emit(verb, url, ...payload) + onMessage = (socket, [verb, ...payload]) => { + this.emit(verb, socket.url, ...payload) } cleanup = () => { - this.socket.off('message', this.onMessage) + this.removeAllListeners() + this.socket.off('receive', this.onMessage) } } diff --git a/src/Relays.ts b/src/Relays.ts index c9a8cd1..d0a6dbf 100644 --- a/src/Relays.ts +++ b/src/Relays.ts @@ -6,7 +6,7 @@ export class Relays extends EventEmitter { this.sockets = sockets this.sockets.forEach(socket => { - socket.on('message', this.onMessage) + socket.on('receive', this.onMessage) }) } send = (...payload) => { @@ -14,12 +14,13 @@ export class Relays extends EventEmitter { socket.send(payload) }) } - onMessage = (url, [verb, ...payload]) => { - this.emit(verb, url, ...payload) + onMessage = (socket, [verb, ...payload]) => { + this.emit(verb, socket.url, ...payload) } cleanup = () => { + this.removeAllListeners() this.sockets.forEach(socket => { - socket.off('message', this.onMessage) + socket.off('receive', this.onMessage) }) } } diff --git a/src/util/Socket.ts b/src/util/Socket.ts index 2c3a3f8..537c0e7 100644 --- a/src/util/Socket.ts +++ b/src/util/Socket.ts @@ -9,11 +9,11 @@ export class Socket extends EventEmitter { timeout?: NodeJS.Timeout queue: [string, any][] status: string - error?: Error static STATUS = { NEW: "new", PENDING: "pending", CLOSED: "closed", + ERROR: "error", READY: "ready", } constructor(url: string) { @@ -36,20 +36,24 @@ export class Socket extends EventEmitter { this.enqueueWork() } onOpen = () => { - this.error = undefined this.status = Socket.STATUS.READY this.ready.resolve() - this.emit('open') + this.emit('open', this) } - onError = (error: Error) => { - this.error = error - this.emit('fault', error) - } - onClose = () => { + onError = () => { this.disconnect() this.ready.reject() - this.status = Socket.STATUS.CLOSED - this.emit('close') + this.status = Socket.STATUS.ERROR + this.emit('fault', this) + } + onClose = () => { + if (this.status !== Socket.STATUS.ERROR) { + this.disconnect() + this.ready.reject() + this.status = Socket.STATUS.CLOSED + } + + this.emit('close', this) } connect = () => { const {NEW, CLOSED, PENDING} = Socket.STATUS @@ -60,7 +64,6 @@ export class Socket extends EventEmitter { this.ws = new WebSocket(this.url) this.ws.addEventListener("open", this.onOpen) this.ws.addEventListener("close", this.onClose) - // @ts-ignore this.ws.addEventListener("error", this.onError) // @ts-ignore this.ws.addEventListener("message", this.onMessage) @@ -77,12 +80,14 @@ export class Socket extends EventEmitter { } receiveMessage = (json: string) => { try { - this.emit('message', this.url, JSON.parse(json)) + this.emit('receive', this, JSON.parse(json)) } catch (e) { // pass } } sendMessage = (message: any) => { + this.emit('send', this, message) + // @ts-ignore this.ws.send(JSON.stringify(message)) }