From ecb08cace9518162e7cb71f7f46e7538ecd56817 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Tue, 5 Nov 2024 12:05:04 -0800 Subject: [PATCH] Re-work connections and relay stats --- packages/app/src/relays.ts | 164 +++++++++++++++++++------- packages/app/src/router.ts | 70 +++-------- packages/app/src/storage.ts | 4 +- packages/lib/src/Worker.ts | 21 +++- packages/net/src/Connection.ts | 147 ++++++----------------- packages/net/src/ConnectionAuth.ts | 29 ++--- packages/net/src/ConnectionEvent.ts | 11 ++ packages/net/src/ConnectionMeta.ts | 157 ------------------------- packages/net/src/ConnectionSender.ts | 53 +++++++++ packages/net/src/ConnectionState.ts | 91 +++++++++++++++ packages/net/src/ConnectionStats.ts | 97 ++++++++++++++++ packages/net/src/Pool.ts | 29 ++--- packages/net/src/Socket.ts | 168 ++++++++++++++++----------- packages/net/src/Subscribe.ts | 12 +- packages/net/src/index.ts | 6 +- packages/net/src/target/Plex.ts | 28 ----- packages/net/src/target/Relay.ts | 5 +- packages/net/src/target/Relays.ts | 3 +- packages/util/src/Profile.ts | 2 +- 19 files changed, 589 insertions(+), 508 deletions(-) create mode 100644 packages/net/src/ConnectionEvent.ts delete mode 100644 packages/net/src/ConnectionMeta.ts create mode 100644 packages/net/src/ConnectionSender.ts create mode 100644 packages/net/src/ConnectionState.ts create mode 100644 packages/net/src/ConnectionStats.ts delete mode 100644 packages/net/src/target/Plex.ts diff --git a/packages/app/src/relays.ts b/packages/app/src/relays.ts index 38f1ed6..2170f14 100644 --- a/packages/app/src/relays.ts +++ b/packages/app/src/relays.ts @@ -1,29 +1,56 @@ import {writable, derived} from 'svelte/store' import {withGetter} from '@welshman/store' -import {ctx, groupBy, indexBy, batch, now, uniq, batcher, postJson} from '@welshman/lib' +import {ctx, groupBy, indexBy, batch, now, ago, uniq, batcher, postJson} from '@welshman/lib' import type {RelayProfile} from "@welshman/util" import {normalizeRelayUrl, displayRelayUrl, displayRelayProfile} from "@welshman/util" -import {asMessage, type Connection, type SocketMessage} from '@welshman/net' +import {ConnectionEvent} from '@welshman/net' +import type {Connection, Message} from '@welshman/net' import {collection} from './collection' export type RelayStats = { first_seen: number - event_count: number - request_count: number - publish_count: number - connect_count: number recent_errors: number[] + open_count: number + close_count: number + publish_count: number + request_count: number + event_count: number + last_open: number + last_close: number + last_error: number + last_publish: number + last_request: number + last_event: number + last_auth: number + publish_timer: number + publish_success_count: number + publish_failure_count: number + eose_count: number + eose_timer: number + notice_count: number } -// Relays - export const makeRelayStats = (): RelayStats => ({ first_seen: now(), - event_count: 0, - request_count: 0, - publish_count: 0, - connect_count: 0, recent_errors: [], + open_count: 0, + close_count: 0, + publish_count: 0, + request_count: 0, + event_count: 0, + last_open: 0, + last_close: 0, + last_error: 0, + last_publish: 0, + last_request: 0, + last_event: 0, + last_auth: 0, + publish_timer: 0, + publish_success_count: 0, + publish_failure_count: 0, + eose_count: 0, + eose_timer: 0, + notice_count: 0, }) export type Relay = { @@ -106,52 +133,109 @@ const updateRelayStats = batch(500, (updates: RelayStatsUpdate[]) => { if (!$relay.stats) { $relay.stats = makeRelayStats() + } else if ($relay.stats.notice_count === undefined) { + // Migrate from old stats + $relay.stats = {...makeRelayStats(), ...$relay.stats} } for (const [_, update] of items) { update($relay.stats) } - $relaysByUrl.set(url, $relay) + // Copy so the database gets updated, since we're mutating in updates + $relaysByUrl.set(url, {...$relay}) } return Array.from($relaysByUrl.values()) }) }) -const onConnectionSend = ({url}: Connection, socketMessage: SocketMessage) => { - const [verb] = asMessage(socketMessage) - - if (verb === 'REQ') { - updateRelayStats([url, stats => ++stats.request_count]) - } else if (verb === 'EVENT') { - updateRelayStats([url, stats => ++stats.publish_count]) - } -} - -const onConnectionReceive = ({url}: Connection, socketMessage: SocketMessage) => { - const [verb] = asMessage(socketMessage) - - if (verb === 'EVENT') { - updateRelayStats([url, stats => ++stats.event_count]) - } -} - -const onConnectionFault = ({url}: Connection) => +const onConnectionOpen = ({url}: Connection) => updateRelayStats([url, stats => { + stats.last_open = now() + stats.open_count++ + }]) + +const onConnectionClose = ({url}: Connection) => + updateRelayStats([url, stats => { + stats.last_close = now() + stats.close_count++ + }]) + +const onConnectionSend = ({url}: Connection, [verb]: Message) => { + if (verb === 'REQ') { + updateRelayStats([url, stats => { + stats.request_count++ + stats.last_request = now() + }]) + } else if (verb === 'EVENT') { + updateRelayStats([url, stats => { + stats.publish_count++ + stats.last_publish = now() + }]) + } +} + +const onConnectionReceive = ({url, state}: Connection, [verb, ...extra]: Message) => { + if (verb === 'OK') { + const [eventId, ok] = extra + const pub = state.pendingPublishes.get(eventId) + + updateRelayStats([url, stats => { + if (pub) { + stats.publish_timer += ago(pub.sent) + } + + if (ok) { + stats.publish_success_count++ + } else { + stats.publish_failure_count++ + } + }]) + } else if (verb === 'AUTH') { + updateRelayStats([url, stats => { + stats.last_auth = now() + }]) + } else if (verb === 'EVENT') { + updateRelayStats([url, stats => { + stats.event_count++ + stats.last_event = now() + }]) + } else if (verb === 'EOSE') { + const request = state.pendingRequests.get(extra[0]) + + // Only count the first eose + if (request && !request.eose) { + updateRelayStats([url, stats => { + stats.eose_count++ + stats.eose_timer += now() - request.sent + }]) + } + } else if (verb === 'NOTICE') { + updateRelayStats([url, stats => { + stats.notice_count++ + }]) + } +} + +const onConnectionError = ({url}: Connection) => + updateRelayStats([url, stats => { + stats.last_error = now() stats.recent_errors = stats.recent_errors.concat(now()).slice(-10) }]) export const trackRelayStats = (connection: Connection) => { - updateRelayStats([connection.url, stats => ++stats.connect_count]) - - connection.on('send', onConnectionSend) - connection.on('receive', onConnectionReceive) - connection.on('fault', onConnectionFault) + connection.on(ConnectionEvent.Open, onConnectionOpen) + connection.on(ConnectionEvent.Close, onConnectionClose) + connection.on(ConnectionEvent.Send, onConnectionSend) + connection.on(ConnectionEvent.Receive, onConnectionReceive) + connection.on(ConnectionEvent.Error, onConnectionError) return () => { - connection.off('send', onConnectionSend) - connection.off('receive', onConnectionReceive) - connection.off('fault', onConnectionFault) + connection.off(ConnectionEvent.Open, onConnectionOpen) + connection.off(ConnectionEvent.Close, onConnectionClose) + connection.off(ConnectionEvent.Send, onConnectionSend) + connection.off(ConnectionEvent.Receive, onConnectionReceive) + connection.off(ConnectionEvent.Error, onConnectionError) } } diff --git a/packages/app/src/router.ts b/packages/app/src/router.ts index a4607b3..7dfbcae 100644 --- a/packages/app/src/router.ts +++ b/packages/app/src/router.ts @@ -129,7 +129,7 @@ export class Router { getRelaysForUser = (mode?: RelayMode) => { const pubkey = this.options.getUserPubkey?.() - return pubkey ? this.getRelaysForPubkey(pubkey) : [] + return pubkey ? this.getRelaysForPubkey(pubkey, mode) : [] } // Utilities for creating scenarios @@ -274,10 +274,14 @@ export class RouterScenario { } const scoreRelay = (relay: string) => { - const quality = this.router.options.getRelayQuality?.(relay) || 1 + const {getRelayQuality = always(1)} = this.router.options + const quality = getRelayQuality(relay) const weight = relayWeights.get(relay)! - return -(quality * weight) + // Log the weight, since it's a straight count which ends up over-weighting hubs. + // Also add some random noise so that we'll occasionally pick lower quality/less + // popular relays. + return -(quality * inc(Math.log(weight)) * Math.random()) } const relays = take( @@ -306,51 +310,18 @@ export class RouterScenario { // Default router options export const getRelayQuality = (url: string) => { - const oneMinute = 60 * 1000 - const oneHour = 60 * oneMinute - const oneDay = 24 * oneHour - const oneWeek = 7 * oneDay const relay = relaysByUrl.get().get(url) - const connect_count = relay?.stats?.connect_count || 0 - const recent_errors = relay?.stats?.recent_errors || [] - const connection = ctx.net.pool.get(url, {autoConnect: false}) - // If we haven't connected, consult our relay record and see if there has - // been a recent fault. If there has been, penalize the relay. If there have been several, - // don't use the relay. - if (!connection) { - const lastFault = last(recent_errors) || 0 + if (!relay?.stats) return 1 - if (recent_errors.filter(n => n > now() - oneHour).length > 10) { - return 0 - } + const {recent_errors} = relay.stats + const last_error = last(recent_errors) || 0 - if (recent_errors.filter(n => n > now() - oneDay).length > 50) { - return 0 - } + if (recent_errors.filter(n => n > ago(HOUR)).length > 5) return 0 + if (recent_errors.filter(n => n > ago(DAY)).length > 20) return 0 + if (recent_errors.filter(n => n > ago(WEEK)).length > 100) return 0 - if (recent_errors.filter(n => n > now() - oneWeek).length > 100) { - return 0 - } - - return Math.max(0, Math.min(0.5, (now() - oneMinute - lastFault) / oneHour)) - } - - const authScore = switcher(connection.auth.status, { - [AuthStatus.Forbidden]: 0, - [AuthStatus.Ok]: 1, - default: 0.5, - }) - - const connectionScore = switcher(connection.meta.getStatus(), { - [ConnectionStatus.Error]: 0, - [ConnectionStatus.Closed]: 0.6, - [ConnectionStatus.Slow]: 0.5, - [ConnectionStatus.Ok]: 1, - default: clamp([0.5, 1], connect_count / 1000), - }) - - return authScore * connectionScore + return Math.max(0, Math.min(0.5, (ago(MINUTE) - last_error) / HOUR)) } export const getPubkeyRelays = (pubkey: string, mode?: string) => { @@ -402,9 +373,6 @@ type FilterScenario = {filter: Filter, scenario: RouterScenario} type FilterSelectionRule = (filter: Filter) => FilterScenario[] -export const getFilterSelectionsForLocalRelay = (filter: Filter) => - [{filter, scenario: ctx.app.router.FromRelays([LOCAL_RELAY_URL])}] - export const getFilterSelectionsForSearch = (filter: Filter) => { if (!filter.search) return [] @@ -438,7 +406,7 @@ export const getFilterSelectionsForIndexedKinds = (filter: Filter) => { export const getFilterSelectionsForAuthors = (filter: Filter) => { if (!filter.authors) return [] - const chunkCount = clamp([1, 4], Math.round(filter.authors.length / 50)) + const chunkCount = clamp([1, 30], Math.round(filter.authors.length / 30)) return chunks(chunkCount, filter.authors) .map(authors => ({ @@ -448,10 +416,9 @@ export const getFilterSelectionsForAuthors = (filter: Filter) => { } export const getFilterSelectionsForUser = (filter: Filter) => - [{filter, scenario: ctx.app.router.ForUser().weight(0.5)}] + [{filter, scenario: ctx.app.router.ForUser().weight(0.2)}] export const defaultFilterSelectionRules = [ - getFilterSelectionsForLocalRelay, getFilterSelectionsForSearch, getFilterSelectionsForWraps, getFilterSelectionsForIndexedKinds, @@ -461,7 +428,7 @@ export const defaultFilterSelectionRules = [ export const getFilterSelections = ( filters: Filter[], - rules: FilterSelectionRule[] = defaultFilterSelectionRules + rules: FilterSelectionRule[] = defaultFilterSelectionRules ): RelaysAndFilters[] => { const filtersById = new Map() const scenariosById = new Map() @@ -479,8 +446,9 @@ export const getFilterSelections = ( for (const [id, filter] of filtersById.entries()) { const scenario = ctx.app.router.merge(scenariosById.get(id) || []) + const relays = scenario.getUrls().concat(LOCAL_RELAY_URL) - result.push({filters: [filter], relays: scenario.getUrls()}) + result.push({filters: [filter], relays}) } return result diff --git a/packages/app/src/storage.ts b/packages/app/src/storage.ts index 53b7de5..b531dae 100644 --- a/packages/app/src/storage.ts +++ b/packages/app/src/storage.ts @@ -3,7 +3,7 @@ import type {IDBPDatabase} from "idb" import {throttle} from "throttle-debounce" import {writable} from "svelte/store" import type {Unsubscriber, Writable} from "svelte/store" -import {indexBy, equals, fromPairs} from "@welshman/lib" +import {indexBy, fromPairs} from "@welshman/lib" import type {TrustedEvent, Repository} from "@welshman/util" import type {Tracker} from "@welshman/net" import {withGetter, adapter, throttled, custom} from "@welshman/store" @@ -60,7 +60,7 @@ export const initIndexedDbAdapter = async (name: string, adapter: IndexedDbAdapt const removedRecords = prevRecords.filter(r => !currentIds.has(r[adapter.keyPath])) const prevRecordsById = indexBy(item => item[adapter.keyPath], prevRecords) - const updatedRecords = currentRecords.filter(r => !equals(r, prevRecordsById.get(r[adapter.keyPath]))) + const updatedRecords = currentRecords.filter(r => r !== prevRecordsById.get(r[adapter.keyPath])) prevRecords = currentRecords diff --git a/packages/lib/src/Worker.ts b/packages/lib/src/Worker.ts index 0085dc7..7d4e4ca 100644 --- a/packages/lib/src/Worker.ts +++ b/packages/lib/src/Worker.ts @@ -8,7 +8,8 @@ export type WorkerOpts = { export class Worker { buffer: T[] = [] handlers: Map void>> = new Map() - timeout: number | undefined + #timeout: number | undefined + #paused = false constructor(readonly opts: WorkerOpts = {}) {} @@ -46,13 +47,13 @@ export class Worker { } } - this.timeout = undefined + this.#timeout = undefined this.#enqueueWork() } #enqueueWork = () => { - if (!this.timeout && this.buffer.length > 0) { - this.timeout = setTimeout(this.#doWork, 50) as unknown as number + if (!this.#paused && !this.#timeout && this.buffer.length > 0) { + this.#timeout = setTimeout(this.#doWork, 50) as unknown as number } } @@ -73,7 +74,15 @@ export class Worker { this.buffer = [] } - stop() { - clearTimeout(this.timeout) + pause() { + clearTimeout(this.#timeout) + + this.#paused = true + this.#timeout = undefined + } + + resume() { + this.#paused = false + this.#enqueueWork() } } diff --git a/packages/net/src/Connection.ts b/packages/net/src/Connection.ts index 8d7d97c..36ab9a8 100644 --- a/packages/net/src/Connection.ts +++ b/packages/net/src/Connection.ts @@ -1,135 +1,62 @@ -import {Emitter, Worker, sleep} from '@welshman/lib' -import {AUTH_JOIN} from '@welshman/util' -import {ConnectionMeta} from './ConnectionMeta' -import {ConnectionAuth, AuthStatus} from './ConnectionAuth' -import {Socket, isMessage, asMessage} from './Socket' -import type {SocketMessage} from './Socket' +import {Emitter} from '@welshman/lib' +import {Socket} from './Socket' +import type {Message} from './Socket' +import {ConnectionEvent} from './ConnectionEvent' +import {ConnectionState} from './ConnectionState' +import {ConnectionStats} from './ConnectionStats' +import {ConnectionAuth} from './ConnectionAuth' +import {ConnectionSender} from './ConnectionSender' + +export enum ConnectionStatus { + Ready = "ready", + Closed = "Closed", + Closing = "Closing", +} + +const {Ready, Closed, Closing} = ConnectionStatus export class Connection extends Emitter { url: string socket: Socket - sender: Worker - receiver: Worker - meta: ConnectionMeta + sender: ConnectionSender + state: ConnectionState + stats: ConnectionStats auth: ConnectionAuth + status = Ready constructor(url: string) { super() this.url = url - this.socket = new Socket(url, this) - this.sender = this.createSender() - this.receiver = this.createReceiver() - this.meta = new ConnectionMeta(this) + this.socket = new Socket(this) + this.sender = new ConnectionSender(this) + this.state = new ConnectionState(this) + this.stats = new ConnectionStats(this) this.auth = new ConnectionAuth(this) this.setMaxListeners(100) } - createSender = () => { - const worker = new Worker({ - shouldDefer: (message: SocketMessage) => { - if (!this.socket.isOpen()) { - return true - } + emit = (type: ConnectionEvent, ...args: any[]) => super.emit(type, this, ...args) - const [verb, ...extra] = asMessage(message) + send = async (message: Message) => { + await this.open() - if (verb === 'AUTH') { - return false - } - - // Only close reqs that have been sent - if (verb === 'CLOSE') { - return !this.meta.pendingRequests.has(extra[0]) - } - - // Allow relay requests through - if (verb === 'EVENT' && extra[0].kind === AUTH_JOIN) { - return false - } - - // Only defer for auth if we're not multiplexing - if (isMessage(message) && ![AuthStatus.None, AuthStatus.Ok].includes(this.auth.status)) { - return true - } - - if (verb === 'REQ') { - return this.meta.pendingRequests.size >= 8 - } - - return false - } - }) - - worker.addGlobalHandler((message: SocketMessage) => { - // If we ended up handling a CLOSE before we handled the REQ, don't send the REQ - if (message[0] === 'CLOSE') { - worker.buffer = worker.buffer.filter(m => !(m[0] === 'REQ' && m[1] === message[1])) - } - - this.onSend(message) - }) - - return worker - } - - createReceiver = () => { - const worker = new Worker() - - worker.addGlobalHandler(this.onReceive) - - return worker - } - - send = (m: SocketMessage) => this.sender.push(m) - - onOpen = () => this.emit('open', this) - - onClose = () => this.emit('close', this) - - onFault = () => this.emit('fault', this) - - onMessage = (m: SocketMessage) => this.receiver.push(m) - - onSend = (message: SocketMessage) => { - this.emit('send', this, message) - this.socket.send(message) - } - - onReceive = (message: SocketMessage) => { - this.emit('receive', this, message) - } - - ensureConnected = async ({shouldReconnect = true} = {}) => { - const isUnhealthy = this.socket.isClosing() || this.socket.isClosed() - const noRecentFault = this.meta.lastFault < Date.now() - 60_000 - - if (shouldReconnect && isUnhealthy && noRecentFault) { - await this.disconnect() - } - - if (this.socket.isNew()) { - await this.socket.connect() - } - - while (this.socket.isConnecting()) { - await sleep(100) + if (this.status === Ready) { + this.sender.push(message) } } - async disconnect() { - await this.socket.disconnect() - - this.sender.clear() - this.receiver.clear() - this.meta.clearPending() + open = async () => { + await this.socket.open() } - async destroy() { - await this.disconnect() + close = async () => { + this.status = Closing + await this.sender.close() + await this.socket.close() + + this.status = Closed this.removeAllListeners() - this.sender.stop() - this.receiver.stop() } } diff --git a/packages/net/src/ConnectionAuth.ts b/packages/net/src/ConnectionAuth.ts index 3a7568d..45778ad 100644 --- a/packages/net/src/ConnectionAuth.ts +++ b/packages/net/src/ConnectionAuth.ts @@ -1,8 +1,8 @@ import {ctx, sleep} from '@welshman/lib' import {CLIENT_AUTH, createEvent} from '@welshman/util' +import {ConnectionEvent} from './ConnectionEvent' import type {Connection} from './Connection' -import type {SocketMessage} from './Socket' -import {asMessage} from './Socket' +import type {Message} from './Socket' export enum AuthMode { Implicit = 'implicit', @@ -35,14 +35,11 @@ export class ConnectionAuth { message: string | undefined status = None - constructor(readonly connection: Connection) { - this.connection.on('receive', this.#onReceive) - this.connection.on('close', this.#onClose) + constructor(readonly cxn: Connection) { + this.cxn.on(ConnectionEvent.Close, this.#onClose) } - #onReceive = (connection: Connection, message: SocketMessage) => { - const [verb, ...extra] = asMessage(message) - + #onMessage = (cxn: Connection, [verb, ...extra]: Message) => { if (verb === 'OK') { const [id, ok, message] = extra @@ -64,7 +61,7 @@ export class ConnectionAuth { } } - #onClose = (connection: Connection) => { + #onClose = (cxn: Connection) => { this.challenge = undefined this.request = undefined this.message = undefined @@ -84,19 +81,16 @@ export class ConnectionAuth { const template = createEvent(CLIENT_AUTH, { tags: [ - ["relay", this.connection.url], + ["relay", this.cxn.url], ["challenge", this.challenge], ], }) - const [event] = await Promise.all([ - ctx.net.signEvent(template), - this.connection.ensureConnected(), - ]) + const [event] = await Promise.all([ctx.net.signEvent(template), this.cxn.open()]) if (event) { this.request = event.id - this.connection.send(['AUTH', event]) + this.cxn.send(['AUTH', event]) this.status = PendingResponse } else { this.status = DeniedSignature @@ -132,9 +126,4 @@ export class ConnectionAuth { await this.wait({timeout}) } } - - destroy = () => { - this.connection.off('receive', this.#onReceive) - this.connection.off('close', this.#onClose) - } } diff --git a/packages/net/src/ConnectionEvent.ts b/packages/net/src/ConnectionEvent.ts new file mode 100644 index 0000000..7df6e42 --- /dev/null +++ b/packages/net/src/ConnectionEvent.ts @@ -0,0 +1,11 @@ +export enum ConnectionEvent { + InvalidUrl = 'invalid:url', + InvalidMessage = 'invalid:message:receive', + Open = 'socket:open', + Reset = 'socket:reset', + Close = 'socket:close', + Error = 'socket:error', + Receive = 'receive:message', + Notice = 'receive:notice', + Send = 'send:message', +} diff --git a/packages/net/src/ConnectionMeta.ts b/packages/net/src/ConnectionMeta.ts deleted file mode 100644 index ed1447f..0000000 --- a/packages/net/src/ConnectionMeta.ts +++ /dev/null @@ -1,157 +0,0 @@ -import {AUTH_JOIN} from '@welshman/util' -import type {SignedEvent, Filter} from '@welshman/util' -import type {Message} from './Socket' -import type {Connection} from './Connection' - -export type PublishMeta = { - sent: number - event: SignedEvent -} - -export type RequestMeta = { - sent: number - filters: Filter[] - eoseReceived: boolean -} - -export enum ConnectionStatus { - Error = 'error', - Closed = 'closed', - Slow = 'slow', - Ok = 'ok', -} - -export class ConnectionMeta { - pendingPublishes = new Map() - pendingRequests = new Map() - publishCount = 0 - requestCount = 0 - eventCount = 0 - lastOpen = 0 - lastClose = 0 - lastFault = 0 - lastPublish = 0 - lastRequest = 0 - lastEvent = 0 - lastAuth = 0 - responseCount = 0 - responseTimer = 0 - - constructor(readonly cxn: Connection) { - cxn.on('open', () => { - this.lastOpen = Date.now() - }) - - cxn.on('close', () => { - this.lastClose = Date.now() - }) - - cxn.on('fault', () => { - this.lastFault = Date.now() - }) - - cxn.on('send', (cxn: Connection, message: Message) => { - if (message[0] === 'REQ') this.onSendRequest(message) - if (message[0] === 'CLOSE') this.onSendClose(message) - if (message[0] === 'EVENT') this.onSendEvent(message) - }) - - cxn.on('receive', (cxn: Connection, message: Message) => { - if (message[0] === 'OK') this.onReceiveOk(message) - if (message[0] === 'AUTH') this.onReceiveAuth(message) - if (message[0] === 'EVENT') this.onReceiveEvent(message) - if (message[0] === 'EOSE') this.onReceiveEose(message) - if (message[0] === 'CLOSED') this.onReceiveClosed(message) - if (message[0] === 'NOTICE') this.onReceiveNotice(message) - }) - } - - onSendRequest([verb, subId, ...filters]: Message) { - this.requestCount++ - this.lastRequest = Date.now() - this.pendingRequests.set(subId, { - filters, - sent: Date.now(), - eoseReceived: false, - }) - } - - onSendClose([verb, subId]: Message) { - this.pendingRequests.delete(subId) - } - - onSendEvent([verb, event]: Message) { - this.publishCount++ - this.lastPublish = Date.now() - this.pendingPublishes.set(event.id, {sent: Date.now(), event}) - } - - onReceiveOk([verb, eventId, ok, notice]: Message) { - const pub = this.pendingPublishes.get(eventId) - - if (!pub) return - - // Re-enqueue pending events when auth challenge is received - if (notice?.startsWith('auth-required:') && pub.event.kind !== AUTH_JOIN) { - this.cxn.send(['EVENT', pub.event]) - } else { - this.responseCount++ - this.responseTimer += Date.now() - pub.sent - this.pendingPublishes.delete(eventId) - } - } - - onReceiveAuth(message: Message) { - this.lastAuth = Date.now() - } - - onReceiveEvent([verb, event]: Message) { - this.eventCount++ - this.lastEvent = Date.now() - } - - onReceiveEose([verb, subId]: Message) { - const request = this.pendingRequests.get(subId) - - // Only count the first eose - if (request && !request.eoseReceived) { - request.eoseReceived = true - - this.responseCount++ - this.responseTimer += Date.now() - request.sent - } - } - - onReceiveClosed([verb, id, notice]: Message) { - // Re-enqueue pending reqs when auth challenge is received - if (notice?.startsWith('auth-required:')) { - const req = this.pendingRequests.get(id) - - if (req) { - this.cxn.send(['REQ', id, ...req.filters]) - } - } - } - - onReceiveNotice([verb, notice]: Message) { - console.warn('NOTICE', this.cxn.url, notice) - } - - clearPending = () => { - this.pendingPublishes.clear() - this.pendingRequests.clear() - } - - getSpeed = () => this.responseCount ? this.responseTimer / this.responseCount : 0 - - getStatus = () => { - const socket = this.cxn.socket - - if (socket.isNew()) return ConnectionStatus.Closed - if (this.lastFault && this.lastFault > this.lastOpen) return ConnectionStatus.Error - if (socket.isClosed() || socket.isClosing()) return ConnectionStatus.Closed - if (this.getSpeed() > 2000) return ConnectionStatus.Slow - - return ConnectionStatus.Ok - } -} diff --git a/packages/net/src/ConnectionSender.ts b/packages/net/src/ConnectionSender.ts new file mode 100644 index 0000000..abeaf95 --- /dev/null +++ b/packages/net/src/ConnectionSender.ts @@ -0,0 +1,53 @@ +import {Worker} from '@welshman/lib' +import {AUTH_JOIN} from '@welshman/util' +import {SocketStatus} from './Socket' +import type {Message} from './Socket' +import type {Connection} from './Connection' +import {AuthStatus} from './ConnectionAuth' + +export class ConnectionSender { + worker: Worker + + constructor(readonly cxn: Connection) { + this.worker = new Worker({ + shouldDefer: ([verb, ...extra]: Message) => { + // If we're not connected, nothing we can do + if (this.cxn.socket.status !== SocketStatus.Open) return true + + // Always allow sending AUTH + if (verb === 'AUTH') return false + + // Only close reqs that have been sent + if (verb === 'CLOSE') return !this.cxn.state.pendingRequests.has(extra[0]) + + // Always allow sending join requests + if (verb === 'EVENT' && extra[0].kind === AUTH_JOIN) return false + + // Wait for auth + if (![AuthStatus.None, AuthStatus.Ok].includes(this.cxn.auth.status)) return true + + // Limit concurrent requests + if (verb === 'REQ') return this.cxn.state.pendingRequests.size >= 8 + + return false + }, + }) + + this.worker.addGlobalHandler(([verb, ...extra]: Message) => { + // If we ended up handling a CLOSE before we handled the REQ, don't send the REQ + if (verb === 'CLOSE') { + this.worker.buffer = this.worker.buffer.filter(m => !(m[0] === 'REQ' && m[1] === extra[0])) + } + + this.cxn.socket.send([verb, ...extra]) + }) + } + + push = (message: Message) => { + this.worker.push(message) + } + + close = async () => { + this.worker.pause() + } +} diff --git a/packages/net/src/ConnectionState.ts b/packages/net/src/ConnectionState.ts new file mode 100644 index 0000000..3682d67 --- /dev/null +++ b/packages/net/src/ConnectionState.ts @@ -0,0 +1,91 @@ +import {AUTH_JOIN} from '@welshman/util' +import type {SignedEvent, Filter} from '@welshman/util' +import type {Message} from './Socket' +import type {Connection} from './Connection' +import {ConnectionEvent} from './ConnectionEvent' + +export type PublishState = { + sent: number + event: SignedEvent +} + +export type RequestState = { + sent: number + filters: Filter[] + eose?: boolean +} + +export class ConnectionState { + pendingPublishes = new Map() + pendingRequests = new Map() + + constructor(readonly cxn: Connection) { + cxn.on(ConnectionEvent.Send, (cxn: Connection, [verb, ...extra]: Message) => { + if (verb === 'REQ') { + const [reqId, ...filters] = extra + + this.pendingRequests.set(reqId, {filters, sent: Date.now()}) + } + + if (verb === 'CLOSE') { + const [reqId] = extra + + this.pendingRequests.delete(reqId) + } + + if (verb === 'EVENT') { + const [event] = extra + + this.pendingPublishes.set(event.id, {sent: Date.now(), event: event.id}) + } + }) + + cxn.on(ConnectionEvent.Receive, (cxn: Connection, [verb, ...extra]: Message) => { + if (verb === 'OK') { + const [eventId, _ok, notice] = extra + const pub = this.pendingPublishes.get(eventId) + + if (!pub) return + + // Re-enqueue pending events when auth challenge is received + if (notice?.startsWith('auth-required:') && pub.event.kind !== AUTH_JOIN) { + this.cxn.send(['EVENT', pub.event]) + } else { + this.pendingPublishes.delete(eventId) + } + } + + if (verb === 'EOSE') { + const [reqId] = extra + const req = this.pendingRequests.get(reqId) + + if (req) { + req.eose = true + } + } + + if (verb === 'CLOSED') { + const [reqId] = extra + + // Re-enqueue pending reqs when auth challenge is received + if (extra[1]?.startsWith('auth-required:')) { + const req = this.pendingRequests.get(reqId) + + if (req) { + this.cxn.send(['REQ', reqId, ...req.filters]) + } + + if (extra[1]) { + this.cxn.emit(ConnectionEvent.Notice, extra[1]) + } + } + } + + if (verb === 'NOTICE') { + const [notice] = extra + + this.cxn.emit(ConnectionEvent.Notice, notice) + } + }) + } +} diff --git a/packages/net/src/ConnectionStats.ts b/packages/net/src/ConnectionStats.ts new file mode 100644 index 0000000..3ac0fdc --- /dev/null +++ b/packages/net/src/ConnectionStats.ts @@ -0,0 +1,97 @@ +import type {Message} from './Socket' +import type {Connection} from './Connection' +import {ConnectionEvent} from './ConnectionEvent' + +export class ConnectionStats { + openCount = 0 + closeCount = 0 + errorCount = 0 + publishCount = 0 + requestCount = 0 + eventCount = 0 + lastOpen = 0 + lastClose = 0 + lastError = 0 + lastPublish = 0 + lastRequest = 0 + lastEvent = 0 + lastAuth = 0 + publishTimer = 0 + publishSuccessCount = 0 + publishFailureCount = 0 + eoseCount = 0 + eoseTimer = 0 + noticeCount = 0 + + constructor(readonly cxn: Connection) { + cxn.on(ConnectionEvent.Open, (cxn: Connection) => { + this.openCount++ + this.lastOpen = Date.now() + }) + + cxn.on(ConnectionEvent.Close, (cxn: Connection) => { + this.closeCount++ + this.lastClose = Date.now() + }) + + cxn.on(ConnectionEvent.Error, (cxn: Connection) => { + this.errorCount++ + this.lastError = Date.now() + }) + + cxn.on(ConnectionEvent.Send, (cxn: Connection, [verb]: Message) => { + if (verb === 'REQ') { + this.requestCount++ + this.lastRequest = Date.now() + } + + if (verb === 'EVENT') { + this.publishCount++ + this.lastPublish = Date.now() + } + }) + + cxn.on(ConnectionEvent.Receive, (cxn: Connection, [verb, ...extra]: Message) => { + if (verb === 'OK') { + const pub = this.cxn.state.pendingPublishes.get(extra[0]) + + if (pub) { + this.publishTimer += Date.now() - pub.sent + } + + if (extra[1]) { + this.publishSuccessCount++ + } else { + this.publishFailureCount++ + } + } + + if (verb === 'AUTH') { + this.lastAuth = Date.now() + } + + if (verb === 'EVENT') { + this.eventCount++ + this.lastEvent = Date.now() + } + + if (verb === 'EOSE') { + const request = this.cxn.state.pendingRequests.get(extra[0]) + + // Only count the first eose + if (request && !request.eose) { + this.eoseCount++ + this.eoseTimer += Date.now() - request.sent + } + } + + if (verb === 'NOTICE') { + this.noticeCount++ + } + }) + } + + getRequestSpeed = () => this.eoseCount ? this.eoseTimer / this.eoseCount : 0 + + getPublishSpeed = () => this.publishSuccessCount ? this.publishTimer / this.publishSuccessCount : 0 +} diff --git a/packages/net/src/Pool.ts b/packages/net/src/Pool.ts index 5af5035..d198491 100644 --- a/packages/net/src/Pool.ts +++ b/packages/net/src/Pool.ts @@ -11,32 +11,25 @@ export class Pool extends Emitter { has(url: string) { return this.data.has(url) } - get(url: string, {autoConnect = true, reconnectAfter = 3000} = {}): Connection { - let connection = this.data.get(url) + get(url: string): Connection { + const oldConnection = this.data.get(url) - if (autoConnect) { - if (!connection) { - connection = new Connection(url) - - this.data.set(url, connection) - this.emit('init', connection) - - connection.on('open', () => this.emit('open', connection)) - connection.on('close', () => this.emit('close', connection)) - } - - connection.ensureConnected({ - shouldReconnect: connection.meta.lastClose < Date.now() - reconnectAfter, - }) + if (oldConnection) { + return oldConnection } - return connection! + const newConnection = new Connection(url) + + this.data.set(url, newConnection) + this.emit('init', newConnection) + + return newConnection } remove(url: string) { const connection = this.data.get(url) if (connection) { - connection.destroy() + connection.close() this.data.delete(url) } diff --git a/packages/net/src/Socket.ts b/packages/net/src/Socket.ts index 2e8f97f..1382af9 100644 --- a/packages/net/src/Socket.ts +++ b/packages/net/src/Socket.ts @@ -1,90 +1,122 @@ import WebSocket from "isomorphic-ws" -import {sleep} from '@welshman/lib' +import {Worker, sleep} from '@welshman/lib' +import {ConnectionEvent} from './ConnectionEvent' +import type {Connection} from './Connection' export type Message = [string, ...any[]] -export type PlexMessage = [{relays: string[]}, Message] - -export type SocketMessage = Message | PlexMessage - -export const isMessage = (m: SocketMessage): boolean => typeof m[0] === 'string' - -export const asMessage = (m: SocketMessage): Message => isMessage(m) ? m : m[1] - -export type SocketOpts = { - onOpen: () => void - onClose: () => void - onFault: () => void - onMessage: (message: SocketMessage) => void +export enum SocketStatus { + New = 'new', + Open = 'open', + Opening = 'opening', + Closing = 'closing', + Closed = 'closed', + Error = 'error', + Invalid = 'invalid', } +const { + New, + Open, + Opening, + Closing, + Closed, + Error, + Invalid, +} = SocketStatus + export class Socket { - ws?: WebSocket | 'invalid' + status = SocketStatus.New + worker = new Worker() + ws?: WebSocket - constructor(readonly url: string, readonly opts: SocketOpts) {} - - isNew = () => this.ws === undefined - - isInvalid = () => this.ws === 'invalid' - - isConnecting = () => this.ws?.readyState === WebSocket.CONNECTING - - isOpen = () => this.ws?.readyState === WebSocket.OPEN - - isClosing = () => this.ws?.readyState === WebSocket.CLOSING - - isClosed = () => this.ws?.readyState === WebSocket.CLOSED - - onMessage = (event: {data: string}) => { - try { - const message = JSON.parse(event.data as string) - - if (Array.isArray(message)) { - this.opts.onMessage(message as Message) - } else { - console.warn(`Invalid message received on ${this.url}:`, message) - } - } catch (e) { - // pass - } + constructor(readonly cxn: Connection) { + // Use a worker to throttle incoming data + this.worker.addGlobalHandler((message: Message) => { + this.cxn.emit(ConnectionEvent.Receive, message) + }) } - send = (message: any) => this.ws.send(JSON.stringify(message)) - - connect = async () => { - if (this.ws) { - throw new Error(`Already attempted connection for ${this.url}`) - } - - try { - this.ws = new WebSocket(this.url) - this.ws.onopen = this.opts.onOpen - this.ws.onerror = this.opts.onFault - this.ws.onclose = this.opts.onClose - this.ws.onmessage = this.onMessage - } catch (e) { - this.ws = 'invalid' - this.opts.onFault() - } - - while (this.isConnecting()) { + wait = async () => { + while ([Opening, Closing].includes(this.status)) { await sleep(100) } } - disconnect = async () => { - while (this.isConnecting()) { - await sleep(100) + open = async () => { + // If we're in a provisional state, wait + await this.wait() + + // If the socket is closed, reset + if (this.status === Closed) { + this.status = New + this.cxn.emit(ConnectionEvent.Reset) } - if (this.isOpen()) { - this.ws.close() + // If the socket is new, connect + if (this.status === New) { + this.#init() } - while (this.isClosing()) { - await sleep(100) - } + // Wait until we're connected (or fail to connect) + await this.wait() + } + + close = async () => { + this.worker.pause() + this.ws?.close() + + await this.wait() this.ws = undefined } + + send = async (message: Message) => { + await this.open() + + this.cxn.emit(ConnectionEvent.Send, message) + this.ws.send(JSON.stringify(message)) + } + + #init = () => { + try { + this.ws = new WebSocket(this.cxn.url) + this.status = Opening + + this.ws.onopen = () => { + this.status = Open + this.cxn.emit(ConnectionEvent.Open) + } + + this.ws.onerror = () => { + this.status = Error + this.cxn.emit(ConnectionEvent.Error) + } + + this.ws.onclose = () => { + if (this.status !== Error) { + this.status = Closed + } + + this.cxn.emit(ConnectionEvent.Close) + } + + this.ws.onmessage = (event: {data: string}) => { + try { + const message = JSON.parse(event.data as string) + + if (Array.isArray(message)) { + this.worker.push(message as Message) + } else { + this.cxn.emit(ConnectionEvent.InvalidMessage, event.data) + } + } catch (e) { + this.cxn.emit(ConnectionEvent.InvalidMessage, event.data) + } + } + } catch (e) { + this.status = Invalid + this.cxn.emit(ConnectionEvent.InvalidUrl) + } + } } diff --git a/packages/net/src/Subscribe.ts b/packages/net/src/Subscribe.ts index 5493dfc..29cb835 100644 --- a/packages/net/src/Subscribe.ts +++ b/packages/net/src/Subscribe.ts @@ -3,6 +3,7 @@ import {matchFilters, unionFilters, TrustedEvent} from '@welshman/util' import type {Filter} from '@welshman/util' import {Tracker} from "./Tracker" import {Connection} from './Connection' +import {ConnectionEvent} from './ConnectionEvent' // `subscribe` is a super function that handles batching subscriptions by merging // them based on parameters (filters and subscribe opts), then splits them by relay. @@ -249,7 +250,10 @@ const _executeSubscription = (sub: Subscription) => { emitter.on(SubscriptionEvent.Complete, () => { emitter.removeAllListeners() subs.forEach(sub => sub.unsubscribe()) - executor.target.connections.forEach((c: Connection) => c.off("close", onClose)) + executor.target.connections.forEach((c: Connection) => { + c.off(ConnectionEvent.Close, onClose) + }) + executor.target.cleanup() }) @@ -287,13 +291,17 @@ const _executeSubscription = (sub: Subscription) => { if (timeout) setTimeout(onComplete, timeout + authTimeout) // If one of our connections gets closed make sure to kill our sub - executor.target.connections.forEach((c: Connection) => c.on('close', onClose)) + executor.target.connections.forEach((c: Connection) => { + c.on(ConnectionEvent.Close, onClose) + }) // Finally, start our subscription. If we didn't get any filters, don't even send the // request, just close it. This can be valid when a caller fulfills a request themselves. if (filters.length > 0) { Promise.all( executor.target.connections.map(async (connection: Connection) => { + await connection.open() + if (authTimeout) { await connection.auth.waitIfPending({timeout: authTimeout}) } diff --git a/packages/net/src/index.ts b/packages/net/src/index.ts index 0f66ceb..62ed71b 100644 --- a/packages/net/src/index.ts +++ b/packages/net/src/index.ts @@ -1,6 +1,9 @@ export * from "./Connection" export * from "./ConnectionAuth" -export * from "./ConnectionMeta" +export * from "./ConnectionEvent" +export * from "./ConnectionSender" +export * from "./ConnectionState" +export * from "./ConnectionStats" export * from "./Context" export * from "./Executor" export * from "./Pool" @@ -11,7 +14,6 @@ export * from "./Sync" export * from "./Tracker" export * from "./target/Echo" export * from "./target/Multi" -export * from "./target/Plex" export * from "./target/Relay" export * from "./target/Relays" export * from "./target/Local" diff --git a/packages/net/src/target/Plex.ts b/packages/net/src/target/Plex.ts deleted file mode 100644 index 7008643..0000000 --- a/packages/net/src/target/Plex.ts +++ /dev/null @@ -1,28 +0,0 @@ -import {Emitter} from '@welshman/lib' -import type {PlexMessage, Message} from '../Socket' -import type {Connection} from '../Connection' - -export class Plex extends Emitter { - constructor(readonly urls: string[], readonly connection: Connection) { - super() - - this.connection.on('receive', this.onMessage) - } - - get connections() { - return [this.connection] - } - - send = (...payload: Message) => { - this.connection.send([{relays: this.urls}, payload]) - } - - onMessage = (connection: Connection, [{relays}, [verb, ...payload]]: PlexMessage) => { - this.emit(verb, relays[0], ...payload) - } - - cleanup = () => { - this.removeAllListeners() - this.connection.off('receive', this.onMessage) - } -} diff --git a/packages/net/src/target/Relay.ts b/packages/net/src/target/Relay.ts index 60208fe..3751fe2 100644 --- a/packages/net/src/target/Relay.ts +++ b/packages/net/src/target/Relay.ts @@ -1,4 +1,5 @@ import {Emitter} from '@welshman/lib' +import {ConnectionEvent} from '../ConnectionEvent' import type {Message} from '../Socket' import type {Connection} from '../Connection' @@ -6,7 +7,7 @@ export class Relay extends Emitter { constructor(readonly connection: Connection) { super() - this.connection.on('receive', this.onMessage) + this.connection.on(ConnectionEvent.Receive, this.onMessage) } get connections() { @@ -23,6 +24,6 @@ export class Relay extends Emitter { cleanup = () => { this.removeAllListeners() - this.connection.off('receive', this.onMessage) + this.connection.off(ConnectionEvent.Receive, this.onMessage) } } diff --git a/packages/net/src/target/Relays.ts b/packages/net/src/target/Relays.ts index e1c75e5..0baa004 100644 --- a/packages/net/src/target/Relays.ts +++ b/packages/net/src/target/Relays.ts @@ -1,13 +1,14 @@ import {Emitter} from '@welshman/lib' import type {Message} from '../Socket' import type {Connection} from '../Connection' +import {ConnectionEvent} from '../ConnectionEvent' export class Relays extends Emitter { constructor(readonly connections: Connection[]) { super() connections.forEach(connection => { - connection.on('receive', this.onMessage) + connection.on(ConnectionEvent.Receive, this.onMessage) }) } diff --git a/packages/util/src/Profile.ts b/packages/util/src/Profile.ts index 487c648..e803292 100644 --- a/packages/util/src/Profile.ts +++ b/packages/util/src/Profile.ts @@ -27,7 +27,7 @@ export const isPublishedProfile = (profile: Profile): profile is PublishedProfil export const makeProfile = (profile: Partial = {}): Profile => { const address = profile.lud06 || profile.lud16 - const lnurl = address ? getLnUrl(address) : null + const lnurl = typeof address === 'string' ? getLnUrl(address) : null return lnurl ? {lnurl, ...profile} : profile }