Send sockets to listeners rather than urls

This commit is contained in:
Jonathan Staab
2023-07-27 08:40:39 -07:00
parent e9d7ad166e
commit ef18009d33
6 changed files with 37 additions and 27 deletions
+1 -1
View File
@@ -1,6 +1,6 @@
{ {
"name": "paravel", "name": "paravel",
"version": "0.1.19", "version": "0.2.0",
"description": "Yet another toolkit for nostr", "description": "Yet another toolkit for nostr",
"repository": { "repository": {
"type": "git", "type": "git",
+4 -3
View File
@@ -6,7 +6,7 @@ export class Plex extends EventEmitter {
this.urls = urls this.urls = urls
this.socket = socket this.socket = socket
this.socket.on('message', this.onMessage) this.socket.on('receive', this.onMessage)
} }
get sockets() { get sockets() {
return [this.socket] return [this.socket]
@@ -14,10 +14,11 @@ export class Plex extends EventEmitter {
send = (...payload) => { send = (...payload) => {
this.socket.send([{relays: this.urls}, payload]) this.socket.send([{relays: this.urls}, payload])
} }
onMessage = (websocketUrl, [{relays}, [verb, ...payload]]) => { onMessage = (socket, [{relays}, [verb, ...payload]]) => {
this.emit(verb, relays[0], ...payload) this.emit(verb, relays[0], ...payload)
} }
cleanup = () => { cleanup = () => {
this.socket.off('message', this.onMessage) this.removeAllListeners()
this.socket.off('receive', this.onMessage)
} }
} }
+5 -3
View File
@@ -16,10 +16,10 @@ export class Pool extends EventEmitter {
const socket = new Socket(url) const socket = new Socket(url)
this.data.set(url, socket) this.data.set(url, socket)
this.emit('init', {url}) this.emit('init', socket)
socket.on('open', () => this.emit('open', {url})) socket.on('open', () => this.emit('open', socket))
socket.on('close', () => this.emit('close', {url})) socket.on('close', () => this.emit('close', socket))
} }
return this.data.get(url) return this.data.get(url)
@@ -28,7 +28,9 @@ export class Pool extends EventEmitter {
const socket = this.data.get(url) const socket = this.data.get(url)
if (socket) { if (socket) {
socket.disconnect()
socket.removeAllListeners() socket.removeAllListeners()
this.data.delete(url) this.data.delete(url)
} }
} }
+5 -4
View File
@@ -5,7 +5,7 @@ export class Relay extends EventEmitter {
super() super()
this.socket = socket this.socket = socket
this.socket.on('message', this.onMessage) this.socket.on('receive', this.onMessage)
} }
get sockets() { get sockets() {
return [this.socket] return [this.socket]
@@ -13,10 +13,11 @@ export class Relay extends EventEmitter {
send(...payload) { send(...payload) {
this.socket.send(payload) this.socket.send(payload)
} }
onMessage = (url, [verb, ...payload]) => { onMessage = (socket, [verb, ...payload]) => {
this.emit(verb, url, ...payload) this.emit(verb, socket.url, ...payload)
} }
cleanup = () => { cleanup = () => {
this.socket.off('message', this.onMessage) this.removeAllListeners()
this.socket.off('receive', this.onMessage)
} }
} }
+5 -4
View File
@@ -6,7 +6,7 @@ export class Relays extends EventEmitter {
this.sockets = sockets this.sockets = sockets
this.sockets.forEach(socket => { this.sockets.forEach(socket => {
socket.on('message', this.onMessage) socket.on('receive', this.onMessage)
}) })
} }
send = (...payload) => { send = (...payload) => {
@@ -14,12 +14,13 @@ export class Relays extends EventEmitter {
socket.send(payload) socket.send(payload)
}) })
} }
onMessage = (url, [verb, ...payload]) => { onMessage = (socket, [verb, ...payload]) => {
this.emit(verb, url, ...payload) this.emit(verb, socket.url, ...payload)
} }
cleanup = () => { cleanup = () => {
this.removeAllListeners()
this.sockets.forEach(socket => { this.sockets.forEach(socket => {
socket.off('message', this.onMessage) socket.off('receive', this.onMessage)
}) })
} }
} }
+17 -12
View File
@@ -9,11 +9,11 @@ export class Socket extends EventEmitter {
timeout?: NodeJS.Timeout timeout?: NodeJS.Timeout
queue: [string, any][] queue: [string, any][]
status: string status: string
error?: Error
static STATUS = { static STATUS = {
NEW: "new", NEW: "new",
PENDING: "pending", PENDING: "pending",
CLOSED: "closed", CLOSED: "closed",
ERROR: "error",
READY: "ready", READY: "ready",
} }
constructor(url: string) { constructor(url: string) {
@@ -36,20 +36,24 @@ export class Socket extends EventEmitter {
this.enqueueWork() this.enqueueWork()
} }
onOpen = () => { onOpen = () => {
this.error = undefined
this.status = Socket.STATUS.READY this.status = Socket.STATUS.READY
this.ready.resolve() this.ready.resolve()
this.emit('open') this.emit('open', this)
} }
onError = (error: Error) => { onError = () => {
this.error = error
this.emit('fault', error)
}
onClose = () => {
this.disconnect() this.disconnect()
this.ready.reject() this.ready.reject()
this.status = Socket.STATUS.CLOSED this.status = Socket.STATUS.ERROR
this.emit('close') this.emit('fault', this)
}
onClose = () => {
if (this.status !== Socket.STATUS.ERROR) {
this.disconnect()
this.ready.reject()
this.status = Socket.STATUS.CLOSED
}
this.emit('close', this)
} }
connect = () => { connect = () => {
const {NEW, CLOSED, PENDING} = Socket.STATUS const {NEW, CLOSED, PENDING} = Socket.STATUS
@@ -60,7 +64,6 @@ export class Socket extends EventEmitter {
this.ws = new WebSocket(this.url) this.ws = new WebSocket(this.url)
this.ws.addEventListener("open", this.onOpen) this.ws.addEventListener("open", this.onOpen)
this.ws.addEventListener("close", this.onClose) this.ws.addEventListener("close", this.onClose)
// @ts-ignore
this.ws.addEventListener("error", this.onError) this.ws.addEventListener("error", this.onError)
// @ts-ignore // @ts-ignore
this.ws.addEventListener("message", this.onMessage) this.ws.addEventListener("message", this.onMessage)
@@ -77,12 +80,14 @@ export class Socket extends EventEmitter {
} }
receiveMessage = (json: string) => { receiveMessage = (json: string) => {
try { try {
this.emit('message', this.url, JSON.parse(json)) this.emit('receive', this, JSON.parse(json))
} catch (e) { } catch (e) {
// pass // pass
} }
} }
sendMessage = (message: any) => { sendMessage = (message: any) => {
this.emit('send', this, message)
// @ts-ignore // @ts-ignore
this.ws.send(JSON.stringify(message)) this.ws.send(JSON.stringify(message))
} }