Add ensureAuth/authTimeout

This commit is contained in:
Jon Staab
2024-08-19 10:20:33 -07:00
parent bacc86823f
commit b89ec16997
2 changed files with 37 additions and 10 deletions
+19 -2
View File
@@ -1,7 +1,7 @@
import {Emitter, Worker} from '@welshman/lib' import {Emitter, Worker, sleep} from '@welshman/lib'
import {AuthStatus, ConnectionMeta} from './ConnectionMeta' import {AuthStatus, ConnectionMeta} from './ConnectionMeta'
import {Socket, isMessage, asMessage} from './Socket' import {Socket, isMessage, asMessage} from './Socket'
import type {SocketMessage} from './Socket' import type {SocketMessage, Message} from './Socket'
export class Connection extends Emitter { export class Connection extends Emitter {
url: string url: string
@@ -109,6 +109,23 @@ export class Connection extends Emitter {
} }
} }
ensureAuth = async ({timeout = 3000}) => {
await this.ensureConnected({shouldReconnect: true})
if (this.meta.authStatus === AuthStatus.Pending) {
await Promise.race([
sleep(timeout),
new Promise<void>(resolve => {
this.on('receive', (cxn: Connection, message: Message) => {
if (message[0] === 'OK' && message[2]) {
resolve()
}
})
})
])
}
}
disconnect() { disconnect() {
this.socket.disconnect() this.socket.disconnect()
this.sender.clear() this.sender.clear()
+18 -8
View File
@@ -1,4 +1,4 @@
import {Emitter, chunk, randomId, once, groupBy, uniq} from '@welshman/lib' import {Emitter, identity, max, chunk, randomId, once, groupBy, uniq} from '@welshman/lib'
import {matchFilters, unionFilters, SignedEvent} from '@welshman/util' import {matchFilters, unionFilters, SignedEvent} from '@welshman/util'
import type {Filter} from '@welshman/util' import type {Filter} from '@welshman/util'
import {Tracker} from "./Tracker" import {Tracker} from "./Tracker"
@@ -38,6 +38,7 @@ export type SubscribeRequest = {
timeout?: number timeout?: number
tracker?: Tracker tracker?: Tracker
closeOnEose?: boolean closeOnEose?: boolean
authTimeout?: number
} }
export type Subscription = { export type Subscription = {
@@ -84,6 +85,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
relays: [relay], relays: [relay],
timeout: callerSubs[0].request.timeout, timeout: callerSubs[0].request.timeout,
closeOnEose: callerSubs[0].request.closeOnEose, closeOnEose: callerSubs[0].request.closeOnEose,
authTimeout: max(callerSubs.map(r => r.request.authTimeout!).filter(identity)),
filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)), filters: unionFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
}) })
@@ -167,7 +169,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
export const executeSubscription = (sub: Subscription) => { export const executeSubscription = (sub: Subscription) => {
const {request, emitter, tracker, controller} = sub const {request, emitter, tracker, controller} = sub
const {timeout, filters, closeOnEose, relays, signal} = request const {timeout, filters, closeOnEose, relays, signal, authTimeout = 0} = request
const executor = NetworkContext.getExecutor(relays) const executor = NetworkContext.getExecutor(relays)
const subs: {unsubscribe: () => void}[] = [] const subs: {unsubscribe: () => void}[] = []
const completedRelays = new Set() const completedRelays = new Set()
@@ -233,7 +235,7 @@ export const executeSubscription = (sub: Subscription) => {
controller.signal.addEventListener('abort', onComplete) controller.signal.addEventListener('abort', onComplete)
// If we have a timeout, complete the subscription automatically // If we have a timeout, complete the subscription automatically
if (timeout) setTimeout(onComplete, timeout) if (timeout) setTimeout(onComplete, timeout + authTimeout)
// If one of our connections gets closed make sure to kill our sub // If one of our connections gets closed make sure to kill our sub
executor.target.connections.forEach((c: Connection) => c.on('close', onClose)) executor.target.connections.forEach((c: Connection) => c.on('close', onClose))
@@ -241,11 +243,19 @@ export const executeSubscription = (sub: Subscription) => {
// Finally, start our subscription. If we didn't get any filters, don't even send the // Finally, start our subscription. If we didn't get any filters, don't even send the
// request, just close it. This can be valid when a caller fulfills a request themselves. // request, just close it. This can be valid when a caller fulfills a request themselves.
if (filters.length > 0) { if (filters.length > 0) {
// If we send too many filters in a request relays will refuse to respond. REQs are rate Promise.all(
// limited client-side by Connection, so this will throttle concurrent requests. executor.target.connections.map(async (connection: Connection) => {
for (const filtersChunk of chunk(8, filters)) { if (authTimeout) {
subs.push(executor.subscribe(filtersChunk, {onEvent, onEose})) await connection.ensureAuth({timeout: authTimeout})
} }
})
).then(() => {
// If we send too many filters in a request relays will refuse to respond. REQs are rate
// limited client-side by Connection, so this will throttle concurrent requests.
for (const filtersChunk of chunk(8, filters)) {
subs.push(executor.subscribe(filtersChunk, {onEvent, onEose}))
}
})
} else { } else {
onComplete() onComplete()
} }