Re-work waiting for auth
This commit is contained in:
@@ -14,7 +14,7 @@ export const feedLoader = new FeedLoader({
|
|||||||
await load({onEvent, filters, relays})
|
await load({onEvent, filters, relays})
|
||||||
} else {
|
} else {
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
Array.from(getFilterSelections(filters))
|
getFilterSelections(filters)
|
||||||
.map(opts => load({onEvent, ...opts}))
|
.map(opts => load({onEvent, ...opts}))
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -312,16 +312,15 @@ export class RouterScenario {
|
|||||||
export const getRelayQuality = (url: string) => {
|
export const getRelayQuality = (url: string) => {
|
||||||
const relay = relaysByUrl.get().get(url)
|
const relay = relaysByUrl.get().get(url)
|
||||||
|
|
||||||
if (!relay?.stats) return 1
|
if (relay?.stats) {
|
||||||
|
if (relay.stats.recent_errors.filter(n => n > ago(5)).length > 0) return 0
|
||||||
|
if (relay.stats.recent_errors.filter(n => n > ago(MINUTE)).length > 1) return 0
|
||||||
|
if (relay.stats.recent_errors.filter(n => n > ago(HOUR)).length > 5) return 0
|
||||||
|
if (relay.stats.recent_errors.filter(n => n > ago(DAY)).length > 20) return 0
|
||||||
|
if (relay.stats.recent_errors.filter(n => n > ago(WEEK)).length > 100) return 0
|
||||||
|
}
|
||||||
|
|
||||||
const {recent_errors} = relay.stats
|
return 1
|
||||||
const last_error = last(recent_errors) || 0
|
|
||||||
|
|
||||||
if (recent_errors.filter(n => n > ago(HOUR)).length > 5) return 0
|
|
||||||
if (recent_errors.filter(n => n > ago(DAY)).length > 20) return 0
|
|
||||||
if (recent_errors.filter(n => n > ago(WEEK)).length > 100) return 0
|
|
||||||
|
|
||||||
return Math.max(0, Math.min(0.5, (ago(MINUTE) - last_error) / HOUR))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export const getPubkeyRelays = (pubkey: string, mode?: string) => {
|
export const getPubkeyRelays = (pubkey: string, mode?: string) => {
|
||||||
|
|||||||
@@ -39,17 +39,13 @@ export class Connection extends Emitter {
|
|||||||
emit = (type: ConnectionEvent, ...args: any[]) => super.emit(type, this, ...args)
|
emit = (type: ConnectionEvent, ...args: any[]) => super.emit(type, this, ...args)
|
||||||
|
|
||||||
send = async (message: Message) => {
|
send = async (message: Message) => {
|
||||||
await this.open()
|
await this.socket.open()
|
||||||
|
|
||||||
if (this.status === Ready) {
|
if (this.status === Ready) {
|
||||||
this.sender.push(message)
|
this.sender.push(message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
open = async () => {
|
|
||||||
await this.socket.open()
|
|
||||||
}
|
|
||||||
|
|
||||||
close = async () => {
|
close = async () => {
|
||||||
this.status = Closing
|
this.status = Closing
|
||||||
|
|
||||||
|
|||||||
@@ -37,9 +37,10 @@ export class ConnectionAuth {
|
|||||||
|
|
||||||
constructor(readonly cxn: Connection) {
|
constructor(readonly cxn: Connection) {
|
||||||
this.cxn.on(ConnectionEvent.Close, this.#onClose)
|
this.cxn.on(ConnectionEvent.Close, this.#onClose)
|
||||||
|
this.cxn.on(ConnectionEvent.Receive, this.#onReceive)
|
||||||
}
|
}
|
||||||
|
|
||||||
#onMessage = (cxn: Connection, [verb, ...extra]: Message) => {
|
#onReceive = (cxn: Connection, [verb, ...extra]: Message) => {
|
||||||
if (verb === 'OK') {
|
if (verb === 'OK') {
|
||||||
const [id, ok, message] = extra
|
const [id, ok, message] = extra
|
||||||
|
|
||||||
@@ -56,7 +57,7 @@ export class ConnectionAuth {
|
|||||||
this.status = Requested
|
this.status = Requested
|
||||||
|
|
||||||
if (ctx.net.authMode === AuthMode.Implicit) {
|
if (ctx.net.authMode === AuthMode.Implicit) {
|
||||||
this.attempt()
|
this.respond()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -68,7 +69,25 @@ export class ConnectionAuth {
|
|||||||
this.status = None
|
this.status = None
|
||||||
}
|
}
|
||||||
|
|
||||||
attempt = async () => {
|
waitFor = async (condition: () => boolean, timeout = 300) => {
|
||||||
|
const start = Date.now()
|
||||||
|
|
||||||
|
while (Date.now() - timeout <= start) {
|
||||||
|
await sleep(100)
|
||||||
|
|
||||||
|
if (condition()) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
waitForChallenge = async (timeout = 300) =>
|
||||||
|
this.waitFor(() => Boolean(this.challenge), timeout)
|
||||||
|
|
||||||
|
waitForResolution = async (timeout = 300) =>
|
||||||
|
this.waitFor(() => [None, DeniedSignature, Forbidden, Ok].includes(this.status), timeout)
|
||||||
|
|
||||||
|
respond = async () => {
|
||||||
if (!this.challenge) {
|
if (!this.challenge) {
|
||||||
throw new Error("Attempted to authenticate with no challenge")
|
throw new Error("Attempted to authenticate with no challenge")
|
||||||
}
|
}
|
||||||
@@ -86,7 +105,10 @@ export class ConnectionAuth {
|
|||||||
],
|
],
|
||||||
})
|
})
|
||||||
|
|
||||||
const [event] = await Promise.all([ctx.net.signEvent(template), this.cxn.open()])
|
const [event] = await Promise.all([
|
||||||
|
ctx.net.signEvent(template),
|
||||||
|
this.cxn.socket.open(),
|
||||||
|
])
|
||||||
|
|
||||||
if (event) {
|
if (event) {
|
||||||
this.request = event.id
|
this.request = event.id
|
||||||
@@ -97,33 +119,14 @@ export class ConnectionAuth {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
attemptIfRequested = async () => {
|
attempt = async (timeout = 300) => {
|
||||||
|
await this.cxn.socket.open()
|
||||||
|
await this.waitForChallenge(timeout)
|
||||||
|
|
||||||
if (this.status === Requested) {
|
if (this.status === Requested) {
|
||||||
await this.attempt()
|
await this.respond()
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
wait = async ({timeout = 3000}: {timeout?: number} = {}) => {
|
await this.waitForResolution(timeout)
|
||||||
const deadline = Date.now() + timeout
|
|
||||||
|
|
||||||
while (Date.now() < deadline) {
|
|
||||||
await sleep(100)
|
|
||||||
|
|
||||||
// State got reset while we were waiting
|
|
||||||
if ([None, Requested].includes(this.status)) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// We've completed the auth flow
|
|
||||||
if ([DeniedSignature, Forbidden, Ok].includes(this.status)) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
waitIfPending = async ({timeout = 3000}: {timeout?: number} = {}) => {
|
|
||||||
while ([PendingSignature, PendingResponse].includes(this.status)) {
|
|
||||||
await this.wait({timeout})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -63,9 +63,16 @@ export class Socket {
|
|||||||
}
|
}
|
||||||
|
|
||||||
close = async () => {
|
close = async () => {
|
||||||
|
if (this.cxn.url === 'wss://filter.nostr.wine/') {
|
||||||
|
console.trace('closing')
|
||||||
|
}
|
||||||
this.worker.pause()
|
this.worker.pause()
|
||||||
this.ws?.close()
|
this.ws?.close()
|
||||||
|
|
||||||
|
// Allow the socket to start closing before waiting
|
||||||
|
await sleep(100)
|
||||||
|
|
||||||
|
// Wait for the socket to fully clos
|
||||||
await this.wait()
|
await this.wait()
|
||||||
|
|
||||||
this.ws = undefined
|
this.ws = undefined
|
||||||
|
|||||||
@@ -300,10 +300,8 @@ const _executeSubscription = (sub: Subscription) => {
|
|||||||
if (filters.length > 0) {
|
if (filters.length > 0) {
|
||||||
Promise.all(
|
Promise.all(
|
||||||
executor.target.connections.map(async (connection: Connection) => {
|
executor.target.connections.map(async (connection: Connection) => {
|
||||||
await connection.open()
|
|
||||||
|
|
||||||
if (authTimeout) {
|
if (authTimeout) {
|
||||||
await connection.auth.waitIfPending({timeout: authTimeout})
|
await connection.auth.attempt(authTimeout)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
).then(() => {
|
).then(() => {
|
||||||
|
|||||||
Reference in New Issue
Block a user