Re-work connections and relay stats

This commit is contained in:
Jon Staab
2024-11-05 12:05:04 -08:00
parent 770ce1a617
commit ecb08cace9
19 changed files with 589 additions and 508 deletions
+124 -40
View File
@@ -1,29 +1,56 @@
import {writable, derived} from 'svelte/store'
import {withGetter} from '@welshman/store'
import {ctx, groupBy, indexBy, batch, now, uniq, batcher, postJson} from '@welshman/lib'
import {ctx, groupBy, indexBy, batch, now, ago, uniq, batcher, postJson} from '@welshman/lib'
import type {RelayProfile} from "@welshman/util"
import {normalizeRelayUrl, displayRelayUrl, displayRelayProfile} from "@welshman/util"
import {asMessage, type Connection, type SocketMessage} from '@welshman/net'
import {ConnectionEvent} from '@welshman/net'
import type {Connection, Message} from '@welshman/net'
import {collection} from './collection'
export type RelayStats = {
first_seen: number
event_count: number
request_count: number
publish_count: number
connect_count: number
recent_errors: number[]
open_count: number
close_count: number
publish_count: number
request_count: number
event_count: number
last_open: number
last_close: number
last_error: number
last_publish: number
last_request: number
last_event: number
last_auth: number
publish_timer: number
publish_success_count: number
publish_failure_count: number
eose_count: number
eose_timer: number
notice_count: number
}
// Relays
export const makeRelayStats = (): RelayStats => ({
first_seen: now(),
event_count: 0,
request_count: 0,
publish_count: 0,
connect_count: 0,
recent_errors: [],
open_count: 0,
close_count: 0,
publish_count: 0,
request_count: 0,
event_count: 0,
last_open: 0,
last_close: 0,
last_error: 0,
last_publish: 0,
last_request: 0,
last_event: 0,
last_auth: 0,
publish_timer: 0,
publish_success_count: 0,
publish_failure_count: 0,
eose_count: 0,
eose_timer: 0,
notice_count: 0,
})
export type Relay = {
@@ -106,52 +133,109 @@ const updateRelayStats = batch(500, (updates: RelayStatsUpdate[]) => {
if (!$relay.stats) {
$relay.stats = makeRelayStats()
} else if ($relay.stats.notice_count === undefined) {
// Migrate from old stats
$relay.stats = {...makeRelayStats(), ...$relay.stats}
}
for (const [_, update] of items) {
update($relay.stats)
}
$relaysByUrl.set(url, $relay)
// Copy so the database gets updated, since we're mutating in updates
$relaysByUrl.set(url, {...$relay})
}
return Array.from($relaysByUrl.values())
})
})
const onConnectionSend = ({url}: Connection, socketMessage: SocketMessage) => {
const [verb] = asMessage(socketMessage)
if (verb === 'REQ') {
updateRelayStats([url, stats => ++stats.request_count])
} else if (verb === 'EVENT') {
updateRelayStats([url, stats => ++stats.publish_count])
}
}
const onConnectionReceive = ({url}: Connection, socketMessage: SocketMessage) => {
const [verb] = asMessage(socketMessage)
if (verb === 'EVENT') {
updateRelayStats([url, stats => ++stats.event_count])
}
}
const onConnectionFault = ({url}: Connection) =>
const onConnectionOpen = ({url}: Connection) =>
updateRelayStats([url, stats => {
stats.last_open = now()
stats.open_count++
}])
const onConnectionClose = ({url}: Connection) =>
updateRelayStats([url, stats => {
stats.last_close = now()
stats.close_count++
}])
const onConnectionSend = ({url}: Connection, [verb]: Message) => {
if (verb === 'REQ') {
updateRelayStats([url, stats => {
stats.request_count++
stats.last_request = now()
}])
} else if (verb === 'EVENT') {
updateRelayStats([url, stats => {
stats.publish_count++
stats.last_publish = now()
}])
}
}
const onConnectionReceive = ({url, state}: Connection, [verb, ...extra]: Message) => {
if (verb === 'OK') {
const [eventId, ok] = extra
const pub = state.pendingPublishes.get(eventId)
updateRelayStats([url, stats => {
if (pub) {
stats.publish_timer += ago(pub.sent)
}
if (ok) {
stats.publish_success_count++
} else {
stats.publish_failure_count++
}
}])
} else if (verb === 'AUTH') {
updateRelayStats([url, stats => {
stats.last_auth = now()
}])
} else if (verb === 'EVENT') {
updateRelayStats([url, stats => {
stats.event_count++
stats.last_event = now()
}])
} else if (verb === 'EOSE') {
const request = state.pendingRequests.get(extra[0])
// Only count the first eose
if (request && !request.eose) {
updateRelayStats([url, stats => {
stats.eose_count++
stats.eose_timer += now() - request.sent
}])
}
} else if (verb === 'NOTICE') {
updateRelayStats([url, stats => {
stats.notice_count++
}])
}
}
const onConnectionError = ({url}: Connection) =>
updateRelayStats([url, stats => {
stats.last_error = now()
stats.recent_errors = stats.recent_errors.concat(now()).slice(-10)
}])
export const trackRelayStats = (connection: Connection) => {
updateRelayStats([connection.url, stats => ++stats.connect_count])
connection.on('send', onConnectionSend)
connection.on('receive', onConnectionReceive)
connection.on('fault', onConnectionFault)
connection.on(ConnectionEvent.Open, onConnectionOpen)
connection.on(ConnectionEvent.Close, onConnectionClose)
connection.on(ConnectionEvent.Send, onConnectionSend)
connection.on(ConnectionEvent.Receive, onConnectionReceive)
connection.on(ConnectionEvent.Error, onConnectionError)
return () => {
connection.off('send', onConnectionSend)
connection.off('receive', onConnectionReceive)
connection.off('fault', onConnectionFault)
connection.off(ConnectionEvent.Open, onConnectionOpen)
connection.off(ConnectionEvent.Close, onConnectionClose)
connection.off(ConnectionEvent.Send, onConnectionSend)
connection.off(ConnectionEvent.Receive, onConnectionReceive)
connection.off(ConnectionEvent.Error, onConnectionError)
}
}
+19 -51
View File
@@ -129,7 +129,7 @@ export class Router {
getRelaysForUser = (mode?: RelayMode) => {
const pubkey = this.options.getUserPubkey?.()
return pubkey ? this.getRelaysForPubkey(pubkey) : []
return pubkey ? this.getRelaysForPubkey(pubkey, mode) : []
}
// Utilities for creating scenarios
@@ -274,10 +274,14 @@ export class RouterScenario {
}
const scoreRelay = (relay: string) => {
const quality = this.router.options.getRelayQuality?.(relay) || 1
const {getRelayQuality = always(1)} = this.router.options
const quality = getRelayQuality(relay)
const weight = relayWeights.get(relay)!
return -(quality * weight)
// Log the weight, since it's a straight count which ends up over-weighting hubs.
// Also add some random noise so that we'll occasionally pick lower quality/less
// popular relays.
return -(quality * inc(Math.log(weight)) * Math.random())
}
const relays = take(
@@ -306,51 +310,18 @@ export class RouterScenario {
// Default router options
export const getRelayQuality = (url: string) => {
const oneMinute = 60 * 1000
const oneHour = 60 * oneMinute
const oneDay = 24 * oneHour
const oneWeek = 7 * oneDay
const relay = relaysByUrl.get().get(url)
const connect_count = relay?.stats?.connect_count || 0
const recent_errors = relay?.stats?.recent_errors || []
const connection = ctx.net.pool.get(url, {autoConnect: false})
// If we haven't connected, consult our relay record and see if there has
// been a recent fault. If there has been, penalize the relay. If there have been several,
// don't use the relay.
if (!connection) {
const lastFault = last(recent_errors) || 0
if (!relay?.stats) return 1
if (recent_errors.filter(n => n > now() - oneHour).length > 10) {
return 0
}
const {recent_errors} = relay.stats
const last_error = last(recent_errors) || 0
if (recent_errors.filter(n => n > now() - oneDay).length > 50) {
return 0
}
if (recent_errors.filter(n => n > ago(HOUR)).length > 5) return 0
if (recent_errors.filter(n => n > ago(DAY)).length > 20) return 0
if (recent_errors.filter(n => n > ago(WEEK)).length > 100) return 0
if (recent_errors.filter(n => n > now() - oneWeek).length > 100) {
return 0
}
return Math.max(0, Math.min(0.5, (now() - oneMinute - lastFault) / oneHour))
}
const authScore = switcher(connection.auth.status, {
[AuthStatus.Forbidden]: 0,
[AuthStatus.Ok]: 1,
default: 0.5,
})
const connectionScore = switcher(connection.meta.getStatus(), {
[ConnectionStatus.Error]: 0,
[ConnectionStatus.Closed]: 0.6,
[ConnectionStatus.Slow]: 0.5,
[ConnectionStatus.Ok]: 1,
default: clamp([0.5, 1], connect_count / 1000),
})
return authScore * connectionScore
return Math.max(0, Math.min(0.5, (ago(MINUTE) - last_error) / HOUR))
}
export const getPubkeyRelays = (pubkey: string, mode?: string) => {
@@ -402,9 +373,6 @@ type FilterScenario = {filter: Filter, scenario: RouterScenario}
type FilterSelectionRule = (filter: Filter) => FilterScenario[]
export const getFilterSelectionsForLocalRelay = (filter: Filter) =>
[{filter, scenario: ctx.app.router.FromRelays([LOCAL_RELAY_URL])}]
export const getFilterSelectionsForSearch = (filter: Filter) => {
if (!filter.search) return []
@@ -438,7 +406,7 @@ export const getFilterSelectionsForIndexedKinds = (filter: Filter) => {
export const getFilterSelectionsForAuthors = (filter: Filter) => {
if (!filter.authors) return []
const chunkCount = clamp([1, 4], Math.round(filter.authors.length / 50))
const chunkCount = clamp([1, 30], Math.round(filter.authors.length / 30))
return chunks(chunkCount, filter.authors)
.map(authors => ({
@@ -448,10 +416,9 @@ export const getFilterSelectionsForAuthors = (filter: Filter) => {
}
export const getFilterSelectionsForUser = (filter: Filter) =>
[{filter, scenario: ctx.app.router.ForUser().weight(0.5)}]
[{filter, scenario: ctx.app.router.ForUser().weight(0.2)}]
export const defaultFilterSelectionRules = [
getFilterSelectionsForLocalRelay,
getFilterSelectionsForSearch,
getFilterSelectionsForWraps,
getFilterSelectionsForIndexedKinds,
@@ -461,7 +428,7 @@ export const defaultFilterSelectionRules = [
export const getFilterSelections = (
filters: Filter[],
rules: FilterSelectionRule[] = defaultFilterSelectionRules
rules: FilterSelectionRule[] = defaultFilterSelectionRules
): RelaysAndFilters[] => {
const filtersById = new Map<string, Filter>()
const scenariosById = new Map<string, RouterScenario[]>()
@@ -479,8 +446,9 @@ export const getFilterSelections = (
for (const [id, filter] of filtersById.entries()) {
const scenario = ctx.app.router.merge(scenariosById.get(id) || [])
const relays = scenario.getUrls().concat(LOCAL_RELAY_URL)
result.push({filters: [filter], relays: scenario.getUrls()})
result.push({filters: [filter], relays})
}
return result
+2 -2
View File
@@ -3,7 +3,7 @@ import type {IDBPDatabase} from "idb"
import {throttle} from "throttle-debounce"
import {writable} from "svelte/store"
import type {Unsubscriber, Writable} from "svelte/store"
import {indexBy, equals, fromPairs} from "@welshman/lib"
import {indexBy, fromPairs} from "@welshman/lib"
import type {TrustedEvent, Repository} from "@welshman/util"
import type {Tracker} from "@welshman/net"
import {withGetter, adapter, throttled, custom} from "@welshman/store"
@@ -60,7 +60,7 @@ export const initIndexedDbAdapter = async (name: string, adapter: IndexedDbAdapt
const removedRecords = prevRecords.filter(r => !currentIds.has(r[adapter.keyPath]))
const prevRecordsById = indexBy(item => item[adapter.keyPath], prevRecords)
const updatedRecords = currentRecords.filter(r => !equals(r, prevRecordsById.get(r[adapter.keyPath])))
const updatedRecords = currentRecords.filter(r => r !== prevRecordsById.get(r[adapter.keyPath]))
prevRecords = currentRecords
+15 -6
View File
@@ -8,7 +8,8 @@ export type WorkerOpts<T> = {
export class Worker<T> {
buffer: T[] = []
handlers: Map<any, Array<(x: T) => void>> = new Map()
timeout: number | undefined
#timeout: number | undefined
#paused = false
constructor(readonly opts: WorkerOpts<T> = {}) {}
@@ -46,13 +47,13 @@ export class Worker<T> {
}
}
this.timeout = undefined
this.#timeout = undefined
this.#enqueueWork()
}
#enqueueWork = () => {
if (!this.timeout && this.buffer.length > 0) {
this.timeout = setTimeout(this.#doWork, 50) as unknown as number
if (!this.#paused && !this.#timeout && this.buffer.length > 0) {
this.#timeout = setTimeout(this.#doWork, 50) as unknown as number
}
}
@@ -73,7 +74,15 @@ export class Worker<T> {
this.buffer = []
}
stop() {
clearTimeout(this.timeout)
pause() {
clearTimeout(this.#timeout)
this.#paused = true
this.#timeout = undefined
}
resume() {
this.#paused = false
this.#enqueueWork()
}
}
+37 -110
View File
@@ -1,135 +1,62 @@
import {Emitter, Worker, sleep} from '@welshman/lib'
import {AUTH_JOIN} from '@welshman/util'
import {ConnectionMeta} from './ConnectionMeta'
import {ConnectionAuth, AuthStatus} from './ConnectionAuth'
import {Socket, isMessage, asMessage} from './Socket'
import type {SocketMessage} from './Socket'
import {Emitter} from '@welshman/lib'
import {Socket} from './Socket'
import type {Message} from './Socket'
import {ConnectionEvent} from './ConnectionEvent'
import {ConnectionState} from './ConnectionState'
import {ConnectionStats} from './ConnectionStats'
import {ConnectionAuth} from './ConnectionAuth'
import {ConnectionSender} from './ConnectionSender'
export enum ConnectionStatus {
Ready = "ready",
Closed = "Closed",
Closing = "Closing",
}
const {Ready, Closed, Closing} = ConnectionStatus
export class Connection extends Emitter {
url: string
socket: Socket
sender: Worker<SocketMessage>
receiver: Worker<SocketMessage>
meta: ConnectionMeta
sender: ConnectionSender
state: ConnectionState
stats: ConnectionStats
auth: ConnectionAuth
status = Ready
constructor(url: string) {
super()
this.url = url
this.socket = new Socket(url, this)
this.sender = this.createSender()
this.receiver = this.createReceiver()
this.meta = new ConnectionMeta(this)
this.socket = new Socket(this)
this.sender = new ConnectionSender(this)
this.state = new ConnectionState(this)
this.stats = new ConnectionStats(this)
this.auth = new ConnectionAuth(this)
this.setMaxListeners(100)
}
createSender = () => {
const worker = new Worker<SocketMessage>({
shouldDefer: (message: SocketMessage) => {
if (!this.socket.isOpen()) {
return true
}
emit = (type: ConnectionEvent, ...args: any[]) => super.emit(type, this, ...args)
const [verb, ...extra] = asMessage(message)
send = async (message: Message) => {
await this.open()
if (verb === 'AUTH') {
return false
}
// Only close reqs that have been sent
if (verb === 'CLOSE') {
return !this.meta.pendingRequests.has(extra[0])
}
// Allow relay requests through
if (verb === 'EVENT' && extra[0].kind === AUTH_JOIN) {
return false
}
// Only defer for auth if we're not multiplexing
if (isMessage(message) && ![AuthStatus.None, AuthStatus.Ok].includes(this.auth.status)) {
return true
}
if (verb === 'REQ') {
return this.meta.pendingRequests.size >= 8
}
return false
}
})
worker.addGlobalHandler((message: SocketMessage) => {
// If we ended up handling a CLOSE before we handled the REQ, don't send the REQ
if (message[0] === 'CLOSE') {
worker.buffer = worker.buffer.filter(m => !(m[0] === 'REQ' && m[1] === message[1]))
}
this.onSend(message)
})
return worker
}
createReceiver = () => {
const worker = new Worker<SocketMessage>()
worker.addGlobalHandler(this.onReceive)
return worker
}
send = (m: SocketMessage) => this.sender.push(m)
onOpen = () => this.emit('open', this)
onClose = () => this.emit('close', this)
onFault = () => this.emit('fault', this)
onMessage = (m: SocketMessage) => this.receiver.push(m)
onSend = (message: SocketMessage) => {
this.emit('send', this, message)
this.socket.send(message)
}
onReceive = (message: SocketMessage) => {
this.emit('receive', this, message)
}
ensureConnected = async ({shouldReconnect = true} = {}) => {
const isUnhealthy = this.socket.isClosing() || this.socket.isClosed()
const noRecentFault = this.meta.lastFault < Date.now() - 60_000
if (shouldReconnect && isUnhealthy && noRecentFault) {
await this.disconnect()
}
if (this.socket.isNew()) {
await this.socket.connect()
}
while (this.socket.isConnecting()) {
await sleep(100)
if (this.status === Ready) {
this.sender.push(message)
}
}
async disconnect() {
await this.socket.disconnect()
this.sender.clear()
this.receiver.clear()
this.meta.clearPending()
open = async () => {
await this.socket.open()
}
async destroy() {
await this.disconnect()
close = async () => {
this.status = Closing
await this.sender.close()
await this.socket.close()
this.status = Closed
this.removeAllListeners()
this.sender.stop()
this.receiver.stop()
}
}
+9 -20
View File
@@ -1,8 +1,8 @@
import {ctx, sleep} from '@welshman/lib'
import {CLIENT_AUTH, createEvent} from '@welshman/util'
import {ConnectionEvent} from './ConnectionEvent'
import type {Connection} from './Connection'
import type {SocketMessage} from './Socket'
import {asMessage} from './Socket'
import type {Message} from './Socket'
export enum AuthMode {
Implicit = 'implicit',
@@ -35,14 +35,11 @@ export class ConnectionAuth {
message: string | undefined
status = None
constructor(readonly connection: Connection) {
this.connection.on('receive', this.#onReceive)
this.connection.on('close', this.#onClose)
constructor(readonly cxn: Connection) {
this.cxn.on(ConnectionEvent.Close, this.#onClose)
}
#onReceive = (connection: Connection, message: SocketMessage) => {
const [verb, ...extra] = asMessage(message)
#onMessage = (cxn: Connection, [verb, ...extra]: Message) => {
if (verb === 'OK') {
const [id, ok, message] = extra
@@ -64,7 +61,7 @@ export class ConnectionAuth {
}
}
#onClose = (connection: Connection) => {
#onClose = (cxn: Connection) => {
this.challenge = undefined
this.request = undefined
this.message = undefined
@@ -84,19 +81,16 @@ export class ConnectionAuth {
const template = createEvent(CLIENT_AUTH, {
tags: [
["relay", this.connection.url],
["relay", this.cxn.url],
["challenge", this.challenge],
],
})
const [event] = await Promise.all([
ctx.net.signEvent(template),
this.connection.ensureConnected(),
])
const [event] = await Promise.all([ctx.net.signEvent(template), this.cxn.open()])
if (event) {
this.request = event.id
this.connection.send(['AUTH', event])
this.cxn.send(['AUTH', event])
this.status = PendingResponse
} else {
this.status = DeniedSignature
@@ -132,9 +126,4 @@ export class ConnectionAuth {
await this.wait({timeout})
}
}
destroy = () => {
this.connection.off('receive', this.#onReceive)
this.connection.off('close', this.#onClose)
}
}
+11
View File
@@ -0,0 +1,11 @@
export enum ConnectionEvent {
InvalidUrl = 'invalid:url',
InvalidMessage = 'invalid:message:receive',
Open = 'socket:open',
Reset = 'socket:reset',
Close = 'socket:close',
Error = 'socket:error',
Receive = 'receive:message',
Notice = 'receive:notice',
Send = 'send:message',
}
-157
View File
@@ -1,157 +0,0 @@
import {AUTH_JOIN} from '@welshman/util'
import type {SignedEvent, Filter} from '@welshman/util'
import type {Message} from './Socket'
import type {Connection} from './Connection'
export type PublishMeta = {
sent: number
event: SignedEvent
}
export type RequestMeta = {
sent: number
filters: Filter[]
eoseReceived: boolean
}
export enum ConnectionStatus {
Error = 'error',
Closed = 'closed',
Slow = 'slow',
Ok = 'ok',
}
export class ConnectionMeta {
pendingPublishes = new Map<string, PublishMeta>()
pendingRequests = new Map<string, RequestMeta>()
publishCount = 0
requestCount = 0
eventCount = 0
lastOpen = 0
lastClose = 0
lastFault = 0
lastPublish = 0
lastRequest = 0
lastEvent = 0
lastAuth = 0
responseCount = 0
responseTimer = 0
constructor(readonly cxn: Connection) {
cxn.on('open', () => {
this.lastOpen = Date.now()
})
cxn.on('close', () => {
this.lastClose = Date.now()
})
cxn.on('fault', () => {
this.lastFault = Date.now()
})
cxn.on('send', (cxn: Connection, message: Message) => {
if (message[0] === 'REQ') this.onSendRequest(message)
if (message[0] === 'CLOSE') this.onSendClose(message)
if (message[0] === 'EVENT') this.onSendEvent(message)
})
cxn.on('receive', (cxn: Connection, message: Message) => {
if (message[0] === 'OK') this.onReceiveOk(message)
if (message[0] === 'AUTH') this.onReceiveAuth(message)
if (message[0] === 'EVENT') this.onReceiveEvent(message)
if (message[0] === 'EOSE') this.onReceiveEose(message)
if (message[0] === 'CLOSED') this.onReceiveClosed(message)
if (message[0] === 'NOTICE') this.onReceiveNotice(message)
})
}
onSendRequest([verb, subId, ...filters]: Message) {
this.requestCount++
this.lastRequest = Date.now()
this.pendingRequests.set(subId, {
filters,
sent: Date.now(),
eoseReceived: false,
})
}
onSendClose([verb, subId]: Message) {
this.pendingRequests.delete(subId)
}
onSendEvent([verb, event]: Message) {
this.publishCount++
this.lastPublish = Date.now()
this.pendingPublishes.set(event.id, {sent: Date.now(), event})
}
onReceiveOk([verb, eventId, ok, notice]: Message) {
const pub = this.pendingPublishes.get(eventId)
if (!pub) return
// Re-enqueue pending events when auth challenge is received
if (notice?.startsWith('auth-required:') && pub.event.kind !== AUTH_JOIN) {
this.cxn.send(['EVENT', pub.event])
} else {
this.responseCount++
this.responseTimer += Date.now() - pub.sent
this.pendingPublishes.delete(eventId)
}
}
onReceiveAuth(message: Message) {
this.lastAuth = Date.now()
}
onReceiveEvent([verb, event]: Message) {
this.eventCount++
this.lastEvent = Date.now()
}
onReceiveEose([verb, subId]: Message) {
const request = this.pendingRequests.get(subId)
// Only count the first eose
if (request && !request.eoseReceived) {
request.eoseReceived = true
this.responseCount++
this.responseTimer += Date.now() - request.sent
}
}
onReceiveClosed([verb, id, notice]: Message) {
// Re-enqueue pending reqs when auth challenge is received
if (notice?.startsWith('auth-required:')) {
const req = this.pendingRequests.get(id)
if (req) {
this.cxn.send(['REQ', id, ...req.filters])
}
}
}
onReceiveNotice([verb, notice]: Message) {
console.warn('NOTICE', this.cxn.url, notice)
}
clearPending = () => {
this.pendingPublishes.clear()
this.pendingRequests.clear()
}
getSpeed = () => this.responseCount ? this.responseTimer / this.responseCount : 0
getStatus = () => {
const socket = this.cxn.socket
if (socket.isNew()) return ConnectionStatus.Closed
if (this.lastFault && this.lastFault > this.lastOpen) return ConnectionStatus.Error
if (socket.isClosed() || socket.isClosing()) return ConnectionStatus.Closed
if (this.getSpeed() > 2000) return ConnectionStatus.Slow
return ConnectionStatus.Ok
}
}
+53
View File
@@ -0,0 +1,53 @@
import {Worker} from '@welshman/lib'
import {AUTH_JOIN} from '@welshman/util'
import {SocketStatus} from './Socket'
import type {Message} from './Socket'
import type {Connection} from './Connection'
import {AuthStatus} from './ConnectionAuth'
export class ConnectionSender {
worker: Worker<Message>
constructor(readonly cxn: Connection) {
this.worker = new Worker({
shouldDefer: ([verb, ...extra]: Message) => {
// If we're not connected, nothing we can do
if (this.cxn.socket.status !== SocketStatus.Open) return true
// Always allow sending AUTH
if (verb === 'AUTH') return false
// Only close reqs that have been sent
if (verb === 'CLOSE') return !this.cxn.state.pendingRequests.has(extra[0])
// Always allow sending join requests
if (verb === 'EVENT' && extra[0].kind === AUTH_JOIN) return false
// Wait for auth
if (![AuthStatus.None, AuthStatus.Ok].includes(this.cxn.auth.status)) return true
// Limit concurrent requests
if (verb === 'REQ') return this.cxn.state.pendingRequests.size >= 8
return false
},
})
this.worker.addGlobalHandler(([verb, ...extra]: Message) => {
// If we ended up handling a CLOSE before we handled the REQ, don't send the REQ
if (verb === 'CLOSE') {
this.worker.buffer = this.worker.buffer.filter(m => !(m[0] === 'REQ' && m[1] === extra[0]))
}
this.cxn.socket.send([verb, ...extra])
})
}
push = (message: Message) => {
this.worker.push(message)
}
close = async () => {
this.worker.pause()
}
}
+91
View File
@@ -0,0 +1,91 @@
import {AUTH_JOIN} from '@welshman/util'
import type {SignedEvent, Filter} from '@welshman/util'
import type {Message} from './Socket'
import type {Connection} from './Connection'
import {ConnectionEvent} from './ConnectionEvent'
export type PublishState = {
sent: number
event: SignedEvent
}
export type RequestState = {
sent: number
filters: Filter[]
eose?: boolean
}
export class ConnectionState {
pendingPublishes = new Map<string, PublishState>()
pendingRequests = new Map<string, RequestState>()
constructor(readonly cxn: Connection) {
cxn.on(ConnectionEvent.Send, (cxn: Connection, [verb, ...extra]: Message) => {
if (verb === 'REQ') {
const [reqId, ...filters] = extra
this.pendingRequests.set(reqId, {filters, sent: Date.now()})
}
if (verb === 'CLOSE') {
const [reqId] = extra
this.pendingRequests.delete(reqId)
}
if (verb === 'EVENT') {
const [event] = extra
this.pendingPublishes.set(event.id, {sent: Date.now(), event: event.id})
}
})
cxn.on(ConnectionEvent.Receive, (cxn: Connection, [verb, ...extra]: Message) => {
if (verb === 'OK') {
const [eventId, _ok, notice] = extra
const pub = this.pendingPublishes.get(eventId)
if (!pub) return
// Re-enqueue pending events when auth challenge is received
if (notice?.startsWith('auth-required:') && pub.event.kind !== AUTH_JOIN) {
this.cxn.send(['EVENT', pub.event])
} else {
this.pendingPublishes.delete(eventId)
}
}
if (verb === 'EOSE') {
const [reqId] = extra
const req = this.pendingRequests.get(reqId)
if (req) {
req.eose = true
}
}
if (verb === 'CLOSED') {
const [reqId] = extra
// Re-enqueue pending reqs when auth challenge is received
if (extra[1]?.startsWith('auth-required:')) {
const req = this.pendingRequests.get(reqId)
if (req) {
this.cxn.send(['REQ', reqId, ...req.filters])
}
if (extra[1]) {
this.cxn.emit(ConnectionEvent.Notice, extra[1])
}
}
}
if (verb === 'NOTICE') {
const [notice] = extra
this.cxn.emit(ConnectionEvent.Notice, notice)
}
})
}
}
+97
View File
@@ -0,0 +1,97 @@
import type {Message} from './Socket'
import type {Connection} from './Connection'
import {ConnectionEvent} from './ConnectionEvent'
export class ConnectionStats {
openCount = 0
closeCount = 0
errorCount = 0
publishCount = 0
requestCount = 0
eventCount = 0
lastOpen = 0
lastClose = 0
lastError = 0
lastPublish = 0
lastRequest = 0
lastEvent = 0
lastAuth = 0
publishTimer = 0
publishSuccessCount = 0
publishFailureCount = 0
eoseCount = 0
eoseTimer = 0
noticeCount = 0
constructor(readonly cxn: Connection) {
cxn.on(ConnectionEvent.Open, (cxn: Connection) => {
this.openCount++
this.lastOpen = Date.now()
})
cxn.on(ConnectionEvent.Close, (cxn: Connection) => {
this.closeCount++
this.lastClose = Date.now()
})
cxn.on(ConnectionEvent.Error, (cxn: Connection) => {
this.errorCount++
this.lastError = Date.now()
})
cxn.on(ConnectionEvent.Send, (cxn: Connection, [verb]: Message) => {
if (verb === 'REQ') {
this.requestCount++
this.lastRequest = Date.now()
}
if (verb === 'EVENT') {
this.publishCount++
this.lastPublish = Date.now()
}
})
cxn.on(ConnectionEvent.Receive, (cxn: Connection, [verb, ...extra]: Message) => {
if (verb === 'OK') {
const pub = this.cxn.state.pendingPublishes.get(extra[0])
if (pub) {
this.publishTimer += Date.now() - pub.sent
}
if (extra[1]) {
this.publishSuccessCount++
} else {
this.publishFailureCount++
}
}
if (verb === 'AUTH') {
this.lastAuth = Date.now()
}
if (verb === 'EVENT') {
this.eventCount++
this.lastEvent = Date.now()
}
if (verb === 'EOSE') {
const request = this.cxn.state.pendingRequests.get(extra[0])
// Only count the first eose
if (request && !request.eose) {
this.eoseCount++
this.eoseTimer += Date.now() - request.sent
}
}
if (verb === 'NOTICE') {
this.noticeCount++
}
})
}
getRequestSpeed = () => this.eoseCount ? this.eoseTimer / this.eoseCount : 0
getPublishSpeed = () => this.publishSuccessCount ? this.publishTimer / this.publishSuccessCount : 0
}
+11 -18
View File
@@ -11,32 +11,25 @@ export class Pool extends Emitter {
has(url: string) {
return this.data.has(url)
}
get(url: string, {autoConnect = true, reconnectAfter = 3000} = {}): Connection {
let connection = this.data.get(url)
get(url: string): Connection {
const oldConnection = this.data.get(url)
if (autoConnect) {
if (!connection) {
connection = new Connection(url)
this.data.set(url, connection)
this.emit('init', connection)
connection.on('open', () => this.emit('open', connection))
connection.on('close', () => this.emit('close', connection))
}
connection.ensureConnected({
shouldReconnect: connection.meta.lastClose < Date.now() - reconnectAfter,
})
if (oldConnection) {
return oldConnection
}
return connection!
const newConnection = new Connection(url)
this.data.set(url, newConnection)
this.emit('init', newConnection)
return newConnection
}
remove(url: string) {
const connection = this.data.get(url)
if (connection) {
connection.destroy()
connection.close()
this.data.delete(url)
}
+100 -68
View File
@@ -1,90 +1,122 @@
import WebSocket from "isomorphic-ws"
import {sleep} from '@welshman/lib'
import {Worker, sleep} from '@welshman/lib'
import {ConnectionEvent} from './ConnectionEvent'
import type {Connection} from './Connection'
export type Message = [string, ...any[]]
export type PlexMessage = [{relays: string[]}, Message]
export type SocketMessage = Message | PlexMessage
export const isMessage = (m: SocketMessage): boolean => typeof m[0] === 'string'
export const asMessage = (m: SocketMessage): Message => isMessage(m) ? m : m[1]
export type SocketOpts = {
onOpen: () => void
onClose: () => void
onFault: () => void
onMessage: (message: SocketMessage) => void
export enum SocketStatus {
New = 'new',
Open = 'open',
Opening = 'opening',
Closing = 'closing',
Closed = 'closed',
Error = 'error',
Invalid = 'invalid',
}
const {
New,
Open,
Opening,
Closing,
Closed,
Error,
Invalid,
} = SocketStatus
export class Socket {
ws?: WebSocket | 'invalid'
status = SocketStatus.New
worker = new Worker<Message>()
ws?: WebSocket
constructor(readonly url: string, readonly opts: SocketOpts) {}
isNew = () => this.ws === undefined
isInvalid = () => this.ws === 'invalid'
isConnecting = () => this.ws?.readyState === WebSocket.CONNECTING
isOpen = () => this.ws?.readyState === WebSocket.OPEN
isClosing = () => this.ws?.readyState === WebSocket.CLOSING
isClosed = () => this.ws?.readyState === WebSocket.CLOSED
onMessage = (event: {data: string}) => {
try {
const message = JSON.parse(event.data as string)
if (Array.isArray(message)) {
this.opts.onMessage(message as Message)
} else {
console.warn(`Invalid message received on ${this.url}:`, message)
}
} catch (e) {
// pass
}
constructor(readonly cxn: Connection) {
// Use a worker to throttle incoming data
this.worker.addGlobalHandler((message: Message) => {
this.cxn.emit(ConnectionEvent.Receive, message)
})
}
send = (message: any) => this.ws.send(JSON.stringify(message))
connect = async () => {
if (this.ws) {
throw new Error(`Already attempted connection for ${this.url}`)
}
try {
this.ws = new WebSocket(this.url)
this.ws.onopen = this.opts.onOpen
this.ws.onerror = this.opts.onFault
this.ws.onclose = this.opts.onClose
this.ws.onmessage = this.onMessage
} catch (e) {
this.ws = 'invalid'
this.opts.onFault()
}
while (this.isConnecting()) {
wait = async () => {
while ([Opening, Closing].includes(this.status)) {
await sleep(100)
}
}
disconnect = async () => {
while (this.isConnecting()) {
await sleep(100)
open = async () => {
// If we're in a provisional state, wait
await this.wait()
// If the socket is closed, reset
if (this.status === Closed) {
this.status = New
this.cxn.emit(ConnectionEvent.Reset)
}
if (this.isOpen()) {
this.ws.close()
// If the socket is new, connect
if (this.status === New) {
this.#init()
}
while (this.isClosing()) {
await sleep(100)
}
// Wait until we're connected (or fail to connect)
await this.wait()
}
close = async () => {
this.worker.pause()
this.ws?.close()
await this.wait()
this.ws = undefined
}
send = async (message: Message) => {
await this.open()
this.cxn.emit(ConnectionEvent.Send, message)
this.ws.send(JSON.stringify(message))
}
#init = () => {
try {
this.ws = new WebSocket(this.cxn.url)
this.status = Opening
this.ws.onopen = () => {
this.status = Open
this.cxn.emit(ConnectionEvent.Open)
}
this.ws.onerror = () => {
this.status = Error
this.cxn.emit(ConnectionEvent.Error)
}
this.ws.onclose = () => {
if (this.status !== Error) {
this.status = Closed
}
this.cxn.emit(ConnectionEvent.Close)
}
this.ws.onmessage = (event: {data: string}) => {
try {
const message = JSON.parse(event.data as string)
if (Array.isArray(message)) {
this.worker.push(message as Message)
} else {
this.cxn.emit(ConnectionEvent.InvalidMessage, event.data)
}
} catch (e) {
this.cxn.emit(ConnectionEvent.InvalidMessage, event.data)
}
}
} catch (e) {
this.status = Invalid
this.cxn.emit(ConnectionEvent.InvalidUrl)
}
}
}
+10 -2
View File
@@ -3,6 +3,7 @@ import {matchFilters, unionFilters, TrustedEvent} from '@welshman/util'
import type {Filter} from '@welshman/util'
import {Tracker} from "./Tracker"
import {Connection} from './Connection'
import {ConnectionEvent} from './ConnectionEvent'
// `subscribe` is a super function that handles batching subscriptions by merging
// them based on parameters (filters and subscribe opts), then splits them by relay.
@@ -249,7 +250,10 @@ const _executeSubscription = (sub: Subscription) => {
emitter.on(SubscriptionEvent.Complete, () => {
emitter.removeAllListeners()
subs.forEach(sub => sub.unsubscribe())
executor.target.connections.forEach((c: Connection) => c.off("close", onClose))
executor.target.connections.forEach((c: Connection) => {
c.off(ConnectionEvent.Close, onClose)
})
executor.target.cleanup()
})
@@ -287,13 +291,17 @@ const _executeSubscription = (sub: Subscription) => {
if (timeout) setTimeout(onComplete, timeout + authTimeout)
// If one of our connections gets closed make sure to kill our sub
executor.target.connections.forEach((c: Connection) => c.on('close', onClose))
executor.target.connections.forEach((c: Connection) => {
c.on(ConnectionEvent.Close, onClose)
})
// Finally, start our subscription. If we didn't get any filters, don't even send the
// request, just close it. This can be valid when a caller fulfills a request themselves.
if (filters.length > 0) {
Promise.all(
executor.target.connections.map(async (connection: Connection) => {
await connection.open()
if (authTimeout) {
await connection.auth.waitIfPending({timeout: authTimeout})
}
+4 -2
View File
@@ -1,6 +1,9 @@
export * from "./Connection"
export * from "./ConnectionAuth"
export * from "./ConnectionMeta"
export * from "./ConnectionEvent"
export * from "./ConnectionSender"
export * from "./ConnectionState"
export * from "./ConnectionStats"
export * from "./Context"
export * from "./Executor"
export * from "./Pool"
@@ -11,7 +14,6 @@ export * from "./Sync"
export * from "./Tracker"
export * from "./target/Echo"
export * from "./target/Multi"
export * from "./target/Plex"
export * from "./target/Relay"
export * from "./target/Relays"
export * from "./target/Local"
-28
View File
@@ -1,28 +0,0 @@
import {Emitter} from '@welshman/lib'
import type {PlexMessage, Message} from '../Socket'
import type {Connection} from '../Connection'
export class Plex extends Emitter {
constructor(readonly urls: string[], readonly connection: Connection) {
super()
this.connection.on('receive', this.onMessage)
}
get connections() {
return [this.connection]
}
send = (...payload: Message) => {
this.connection.send([{relays: this.urls}, payload])
}
onMessage = (connection: Connection, [{relays}, [verb, ...payload]]: PlexMessage) => {
this.emit(verb, relays[0], ...payload)
}
cleanup = () => {
this.removeAllListeners()
this.connection.off('receive', this.onMessage)
}
}
+3 -2
View File
@@ -1,4 +1,5 @@
import {Emitter} from '@welshman/lib'
import {ConnectionEvent} from '../ConnectionEvent'
import type {Message} from '../Socket'
import type {Connection} from '../Connection'
@@ -6,7 +7,7 @@ export class Relay extends Emitter {
constructor(readonly connection: Connection) {
super()
this.connection.on('receive', this.onMessage)
this.connection.on(ConnectionEvent.Receive, this.onMessage)
}
get connections() {
@@ -23,6 +24,6 @@ export class Relay extends Emitter {
cleanup = () => {
this.removeAllListeners()
this.connection.off('receive', this.onMessage)
this.connection.off(ConnectionEvent.Receive, this.onMessage)
}
}
+2 -1
View File
@@ -1,13 +1,14 @@
import {Emitter} from '@welshman/lib'
import type {Message} from '../Socket'
import type {Connection} from '../Connection'
import {ConnectionEvent} from '../ConnectionEvent'
export class Relays extends Emitter {
constructor(readonly connections: Connection[]) {
super()
connections.forEach(connection => {
connection.on('receive', this.onMessage)
connection.on(ConnectionEvent.Receive, this.onMessage)
})
}
+1 -1
View File
@@ -27,7 +27,7 @@ export const isPublishedProfile = (profile: Profile): profile is PublishedProfil
export const makeProfile = (profile: Partial<Profile> = {}): Profile => {
const address = profile.lud06 || profile.lud16
const lnurl = address ? getLnUrl(address) : null
const lnurl = typeof address === 'string' ? getLnUrl(address) : null
return lnurl ? {lnurl, ...profile} : profile
}