Re-work connection auth
This commit is contained in:
@@ -1,7 +1,8 @@
|
||||
import {ctx, Emitter, Worker, sleep} from '@welshman/lib'
|
||||
import {AuthStatus, ConnectionMeta} from './ConnectionMeta'
|
||||
import {Emitter, Worker, sleep} from '@welshman/lib'
|
||||
import {ConnectionMeta} from './ConnectionMeta'
|
||||
import {ConnectionAuth, AuthStatus} from './ConnectionAuth'
|
||||
import {Socket, isMessage, asMessage} from './Socket'
|
||||
import type {SocketMessage, Message} from './Socket'
|
||||
import type {SocketMessage} from './Socket'
|
||||
|
||||
export class Connection extends Emitter {
|
||||
url: string
|
||||
@@ -9,6 +10,7 @@ export class Connection extends Emitter {
|
||||
sender: Worker<SocketMessage>
|
||||
receiver: Worker<SocketMessage>
|
||||
meta: ConnectionMeta
|
||||
auth: ConnectionAuth
|
||||
|
||||
constructor(url: string) {
|
||||
super()
|
||||
@@ -18,6 +20,7 @@ export class Connection extends Emitter {
|
||||
this.sender = this.createSender()
|
||||
this.receiver = this.createReceiver()
|
||||
this.meta = new ConnectionMeta(this)
|
||||
this.auth = new ConnectionAuth(this)
|
||||
this.setMaxListeners(100)
|
||||
}
|
||||
|
||||
@@ -45,7 +48,7 @@ export class Connection extends Emitter {
|
||||
}
|
||||
|
||||
// Only defer for auth if we're not multiplexing
|
||||
if (isMessage(message) && ![AuthStatus.Ok, AuthStatus.Pending].includes(this.meta.authStatus)) {
|
||||
if (isMessage(message) && ![AuthStatus.None, AuthStatus.Ok].includes(this.auth.status)) {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -93,14 +96,6 @@ export class Connection extends Emitter {
|
||||
}
|
||||
|
||||
onReceive = (message: SocketMessage) => {
|
||||
const [verb, ...extra] = asMessage(message)
|
||||
|
||||
if (verb === 'AUTH') {
|
||||
const [challenge] = extra
|
||||
|
||||
ctx.net.onAuth(this.url, challenge)
|
||||
}
|
||||
|
||||
this.emit('receive', this, message)
|
||||
}
|
||||
|
||||
@@ -121,26 +116,6 @@ export class Connection extends Emitter {
|
||||
}
|
||||
}
|
||||
|
||||
ensureAuth = async ({timeout = 3000} = {}) => {
|
||||
await this.ensureConnected()
|
||||
|
||||
if ([AuthStatus.Unauthorized, AuthStatus.Pending].includes(this.meta.authStatus)) {
|
||||
await Promise.race([
|
||||
sleep(timeout),
|
||||
new Promise<void>(resolve => {
|
||||
const onReceive = (cxn: Connection, message: Message) => {
|
||||
if (message[0] === 'OK' && message[2]) {
|
||||
this.off('receive', onReceive)
|
||||
resolve()
|
||||
}
|
||||
}
|
||||
|
||||
this.on('receive', onReceive)
|
||||
})
|
||||
])
|
||||
}
|
||||
}
|
||||
|
||||
disconnect() {
|
||||
this.socket.disconnect()
|
||||
this.sender.clear()
|
||||
|
||||
Reference in New Issue
Block a user