Handle AUTH at the socket level
This commit is contained in:
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "paravel",
|
||||
"version": "0.2.1",
|
||||
"version": "0.2.2",
|
||||
"description": "Yet another toolkit for nostr",
|
||||
"repository": {
|
||||
"type": "git",
|
||||
|
||||
+40
-20
@@ -7,10 +7,12 @@ export class Socket extends EventEmitter {
|
||||
url: string
|
||||
ready: Deferred<void>
|
||||
timeout?: NodeJS.Timeout
|
||||
queue: [string, any][]
|
||||
receiveQueue: any[] = []
|
||||
sendQueue: any[] = []
|
||||
status: string
|
||||
static STATUS = {
|
||||
NEW: "new",
|
||||
UNAUTHORIZED: "unauthorized",
|
||||
PENDING: "pending",
|
||||
CLOSED: "closed",
|
||||
ERROR: "error",
|
||||
@@ -21,18 +23,17 @@ export class Socket extends EventEmitter {
|
||||
|
||||
this.url = url
|
||||
this.ready = defer()
|
||||
this.queue = []
|
||||
this.status = Socket.STATUS.NEW
|
||||
|
||||
this.setMaxListeners(100)
|
||||
}
|
||||
send = (message: any) => {
|
||||
this.connect()
|
||||
this.queue.push(['send', message])
|
||||
this.sendQueue.push(message)
|
||||
this.enqueueWork()
|
||||
}
|
||||
onMessage = (event: {data: string}) => {
|
||||
this.queue.push(['receive', event.data])
|
||||
this.receiveQueue.push(event.data)
|
||||
this.enqueueWork()
|
||||
}
|
||||
onOpen = () => {
|
||||
@@ -75,13 +76,27 @@ export class Socket extends EventEmitter {
|
||||
|
||||
// Avoid "WebSocket was closed before the connection was established"
|
||||
this.ready.then(() => ws.close(), () => null)
|
||||
this.ws.removeAllListeners()
|
||||
this.ws.removeEventListener("open", this.onOpen)
|
||||
this.ws.removeEventListener("close", this.onClose)
|
||||
this.ws.removeEventListener("error", this.onError)
|
||||
// @ts-ignore
|
||||
this.ws.removeEventListener("message", this.onMessage)
|
||||
this.ws = undefined
|
||||
}
|
||||
}
|
||||
receiveMessage = (json: string) => {
|
||||
try {
|
||||
this.emit('receive', this, JSON.parse(json))
|
||||
const message = JSON.parse(json)
|
||||
|
||||
if (message?.[0] == 'AUTH') {
|
||||
this.status = Socket.STATUS.UNAUTHORIZED
|
||||
}
|
||||
|
||||
if (message?.[0] == 'OK' && this.status === Socket.STATUS.UNAUTHORIZED) {
|
||||
this.status = Socket.STATUS.READY
|
||||
}
|
||||
|
||||
this.emit('receive', this, message)
|
||||
} catch (e) {
|
||||
// pass
|
||||
}
|
||||
@@ -92,31 +107,36 @@ export class Socket extends EventEmitter {
|
||||
// @ts-ignore
|
||||
this.ws.send(JSON.stringify(message))
|
||||
}
|
||||
shouldDeferWork = () => {
|
||||
// These sometimes get out of sync
|
||||
return this.status !== Socket.STATUS.READY || this.ws?.readyState !== 1
|
||||
shouldDefer = (payload: any[]) => {
|
||||
if (this.ws?.readyState !== 1) {
|
||||
return true
|
||||
}
|
||||
|
||||
if (this.status === Socket.STATUS.UNAUTHORIZED) {
|
||||
return payload?.[0] !== 'AUTH'
|
||||
}
|
||||
|
||||
return this.status !== Socket.STATUS.READY
|
||||
}
|
||||
doWork = () => {
|
||||
this.timeout = undefined
|
||||
|
||||
for (const [action, payload] of this.queue.splice(0, 10)) {
|
||||
if (action === 'receive') {
|
||||
this.receiveMessage(payload)
|
||||
}
|
||||
for (const payload of this.receiveQueue.splice(0, 10)) {
|
||||
this.receiveMessage(payload)
|
||||
}
|
||||
|
||||
if (action === 'send') {
|
||||
if (this.shouldDeferWork()) {
|
||||
this.queue.push(['send', payload])
|
||||
} else {
|
||||
this.sendMessage(payload)
|
||||
}
|
||||
for (const payload of this.sendQueue.splice(0, 10)) {
|
||||
if (this.shouldDefer(payload)) {
|
||||
this.sendQueue.push(payload)
|
||||
} else {
|
||||
this.sendMessage(payload)
|
||||
}
|
||||
}
|
||||
|
||||
this.enqueueWork()
|
||||
}
|
||||
enqueueWork = () => {
|
||||
if (!this.timeout && this.queue.length > 0) {
|
||||
if (!this.timeout && (this.receiveQueue.length > 0 || this.sendQueue.length > 0)) {
|
||||
this.timeout = setTimeout(() => this.doWork(), 100) as NodeJS.Timeout
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user