Remove tsc-multi, re-install gts, apply autoformatting and linting

This commit is contained in:
Jon Staab
2024-12-17 10:59:27 -08:00
parent 0b86613161
commit f33e03740e
122 changed files with 2243 additions and 2178 deletions
+9 -9
View File
@@ -1,12 +1,12 @@
import {Emitter} from '@welshman/lib'
import {normalizeRelayUrl} from '@welshman/util'
import {Socket} from './Socket'
import type {Message} from './Socket'
import {ConnectionEvent} from './ConnectionEvent'
import {ConnectionState} from './ConnectionState'
import {ConnectionStats} from './ConnectionStats'
import {ConnectionAuth} from './ConnectionAuth'
import {ConnectionSender} from './ConnectionSender'
import {Emitter} from "@welshman/lib"
import {normalizeRelayUrl} from "@welshman/util"
import {Socket} from "./Socket.js"
import type {Message} from "./Socket.js"
import {ConnectionEvent} from "./ConnectionEvent.js"
import {ConnectionState} from "./ConnectionState.js"
import {ConnectionStats} from "./ConnectionStats.js"
import {ConnectionAuth} from "./ConnectionAuth.js"
import {ConnectionSender} from "./ConnectionSender.js"
export enum ConnectionStatus {
Open = "open",
+21 -32
View File
@@ -1,33 +1,26 @@
import {ctx, sleep} from '@welshman/lib'
import {CLIENT_AUTH, createEvent} from '@welshman/util'
import {ConnectionEvent} from './ConnectionEvent'
import type {Connection} from './Connection'
import type {Message} from './Socket'
import {ctx, sleep} from "@welshman/lib"
import {CLIENT_AUTH, createEvent} from "@welshman/util"
import {ConnectionEvent} from "./ConnectionEvent.js"
import type {Connection} from "./Connection.js"
import type {Message} from "./Socket.js"
export enum AuthMode {
Implicit = 'implicit',
Explicit = 'explicit',
Implicit = "implicit",
Explicit = "explicit",
}
export enum AuthStatus {
None = 'none',
Requested = 'requested',
PendingSignature = 'pending_signature',
DeniedSignature = 'denied_signature',
PendingResponse = 'pending_response',
Forbidden = 'forbidden',
Ok = 'ok',
None = "none",
Requested = "requested",
PendingSignature = "pending_signature",
DeniedSignature = "denied_signature",
PendingResponse = "pending_response",
Forbidden = "forbidden",
Ok = "ok",
}
const {
None,
Requested,
PendingSignature,
DeniedSignature,
PendingResponse,
Forbidden,
Ok,
} = AuthStatus
const {None, Requested, PendingSignature, DeniedSignature, PendingResponse, Forbidden, Ok} =
AuthStatus
export class ConnectionAuth {
challenge: string | undefined
@@ -41,7 +34,7 @@ export class ConnectionAuth {
}
#onReceive = (cxn: Connection, [verb, ...extra]: Message) => {
if (verb === 'OK') {
if (verb === "OK") {
const [id, ok, message] = extra
if (id === this.request) {
@@ -50,7 +43,7 @@ export class ConnectionAuth {
}
}
if (verb === 'AUTH' && extra[0] !== this.challenge) {
if (verb === "AUTH" && extra[0] !== this.challenge) {
this.challenge = extra[0]
this.request = undefined
this.message = undefined
@@ -81,8 +74,7 @@ export class ConnectionAuth {
}
}
waitForChallenge = async (timeout = 300) =>
this.waitFor(() => Boolean(this.challenge), timeout)
waitForChallenge = async (timeout = 300) => this.waitFor(() => Boolean(this.challenge), timeout)
waitForResolution = async (timeout = 300) =>
this.waitFor(() => [None, DeniedSignature, Forbidden, Ok].includes(this.status), timeout)
@@ -105,14 +97,11 @@ export class ConnectionAuth {
],
})
const [event] = await Promise.all([
ctx.net.signEvent(template),
this.cxn.socket.open(),
])
const [event] = await Promise.all([ctx.net.signEvent(template), this.cxn.socket.open()])
if (event) {
this.request = event.id
this.cxn.send(['AUTH', event])
this.cxn.send(["AUTH", event])
this.status = PendingResponse
} else {
this.status = DeniedSignature
+9 -9
View File
@@ -1,11 +1,11 @@
export enum ConnectionEvent {
InvalidUrl = 'invalid:url',
InvalidMessage = 'invalid:message:receive',
Open = 'socket:open',
Reset = 'socket:reset',
Close = 'socket:close',
Error = 'socket:error',
Receive = 'receive:message',
Notice = 'receive:notice',
Send = 'send:message',
InvalidUrl = "invalid:url",
InvalidMessage = "invalid:message:receive",
Open = "socket:open",
Reset = "socket:reset",
Close = "socket:close",
Error = "socket:error",
Receive = "receive:message",
Notice = "receive:notice",
Send = "send:message",
}
+12 -12
View File
@@ -1,9 +1,9 @@
import {Worker} from '@welshman/lib'
import {AUTH_JOIN} from '@welshman/util'
import {SocketStatus} from './Socket'
import type {Message} from './Socket'
import type {Connection} from './Connection'
import {AuthStatus} from './ConnectionAuth'
import {Worker} from "@welshman/lib"
import {AUTH_JOIN} from "@welshman/util"
import {SocketStatus} from "./Socket.js"
import type {Message} from "./Socket.js"
import type {Connection} from "./Connection.js"
import {AuthStatus} from "./ConnectionAuth.js"
export class ConnectionSender {
worker: Worker<Message>
@@ -12,22 +12,22 @@ export class ConnectionSender {
this.worker = new Worker({
shouldDefer: ([verb, ...extra]: Message) => {
// Always send CLOSE to clean up pending requests, even if the connection is closed
if (verb === 'CLOSE') return false
if (verb === "CLOSE") return false
// If we're not connected, nothing we can do
if (cxn.socket.status !== SocketStatus.Open) return true
// Always allow sending AUTH
if (verb === 'AUTH') return false
if (verb === "AUTH") return false
// Always allow sending join requests
if (verb === 'EVENT' && extra[0].kind === AUTH_JOIN) return false
if (verb === "EVENT" && extra[0].kind === AUTH_JOIN) return false
// Wait for auth
if (![AuthStatus.None, AuthStatus.Ok].includes(cxn.auth.status)) return true
// Limit concurrent requests
if (verb === 'REQ') return cxn.state.pendingRequests.size >= 8
if (verb === "REQ") return cxn.state.pendingRequests.size >= 8
return false
},
@@ -35,8 +35,8 @@ export class ConnectionSender {
this.worker.addGlobalHandler(([verb, ...extra]: Message) => {
// If we ended up handling a CLOSE before we handled the REQ, don't send the REQ
if (verb === 'CLOSE') {
this.worker.buffer = this.worker.buffer.filter(m => !(m[0] === 'REQ' && m[1] === extra[0]))
if (verb === "CLOSE") {
this.worker.buffer = this.worker.buffer.filter(m => !(m[0] === "REQ" && m[1] === extra[0]))
}
// Re-check socket status since we let CLOSE through
+19 -19
View File
@@ -1,9 +1,9 @@
import {sleep} from '@welshman/lib'
import {AUTH_JOIN} from '@welshman/util'
import type {SignedEvent, Filter} from '@welshman/util'
import type {Message} from './Socket'
import type {Connection} from './Connection'
import {ConnectionEvent} from './ConnectionEvent'
import {sleep} from "@welshman/lib"
import {AUTH_JOIN} from "@welshman/util"
import type {SignedEvent, Filter} from "@welshman/util"
import type {Message} from "./Socket.js"
import type {Connection} from "./Connection.js"
import {ConnectionEvent} from "./ConnectionEvent.js"
export type PublishState = {
sent: number
@@ -22,19 +22,19 @@ export class ConnectionState {
constructor(readonly cxn: Connection) {
cxn.sender.worker.addGlobalHandler(([verb, ...extra]: Message) => {
if (verb === 'REQ') {
if (verb === "REQ") {
const [reqId, ...filters] = extra
this.pendingRequests.set(reqId, {filters, sent: Date.now()})
}
if (verb === 'CLOSE') {
if (verb === "CLOSE") {
const [reqId] = extra
this.pendingRequests.delete(reqId)
}
if (verb === 'EVENT') {
if (verb === "EVENT") {
const [event] = extra
this.pendingPublishes.set(event.id, {sent: Date.now(), event})
@@ -42,21 +42,21 @@ export class ConnectionState {
})
cxn.socket.worker.addGlobalHandler(([verb, ...extra]: Message) => {
if (verb === 'OK') {
if (verb === "OK") {
const [eventId, _ok, notice] = extra
const pub = this.pendingPublishes.get(eventId)
if (!pub) return
// Re-enqueue pending events when auth challenge is received
if (notice?.startsWith('auth-required:') && pub.event.kind !== AUTH_JOIN) {
this.cxn.send(['EVENT', pub.event])
if (notice?.startsWith("auth-required:") && pub.event.kind !== AUTH_JOIN) {
this.cxn.send(["EVENT", pub.event])
} else {
this.pendingPublishes.delete(eventId)
}
}
if (verb === 'EOSE') {
if (verb === "EOSE") {
const [reqId] = extra
const req = this.pendingRequests.get(reqId)
@@ -65,15 +65,15 @@ export class ConnectionState {
}
}
if (verb === 'CLOSED') {
if (verb === "CLOSED") {
const [reqId] = extra
// Re-enqueue pending reqs when auth challenge is received
if (extra[1]?.startsWith('auth-required:')) {
if (extra[1]?.startsWith("auth-required:")) {
const req = this.pendingRequests.get(reqId)
if (req) {
this.cxn.send(['REQ', reqId, ...req.filters])
this.cxn.send(["REQ", reqId, ...req.filters])
}
if (extra[1]) {
@@ -84,7 +84,7 @@ export class ConnectionState {
this.pendingRequests.delete(reqId)
}
if (verb === 'NOTICE') {
if (verb === "NOTICE") {
const [notice] = extra
this.cxn.emit(ConnectionEvent.Notice, notice)
@@ -101,11 +101,11 @@ export class ConnectionState {
}
for (const [reqId, req] of this.pendingRequests.entries()) {
this.cxn.send(['REQ', reqId, ...req.filters])
this.cxn.send(["REQ", reqId, ...req.filters])
}
for (const [_, pub] of this.pendingPublishes.entries()) {
this.cxn.send(['EVENT', pub.event])
this.cxn.send(["EVENT", pub.event])
}
})
}
+13 -12
View File
@@ -1,6 +1,6 @@
import type {Message} from './Socket'
import type {Connection} from './Connection'
import {ConnectionEvent} from './ConnectionEvent'
import type {Message} from "./Socket.js"
import type {Connection} from "./Connection.js"
import {ConnectionEvent} from "./ConnectionEvent.js"
export class ConnectionStats {
openCount = 0
@@ -40,19 +40,19 @@ export class ConnectionStats {
})
cxn.on(ConnectionEvent.Send, (cxn: Connection, [verb]: Message) => {
if (verb === 'REQ') {
if (verb === "REQ") {
this.requestCount++
this.lastRequest = Date.now()
}
if (verb === 'EVENT') {
if (verb === "EVENT") {
this.publishCount++
this.lastPublish = Date.now()
}
})
cxn.on(ConnectionEvent.Receive, (cxn: Connection, [verb, ...extra]: Message) => {
if (verb === 'OK') {
if (verb === "OK") {
const pub = this.cxn.state.pendingPublishes.get(extra[0])
if (pub) {
@@ -66,16 +66,16 @@ export class ConnectionStats {
}
}
if (verb === 'AUTH') {
if (verb === "AUTH") {
this.lastAuth = Date.now()
}
if (verb === 'EVENT') {
if (verb === "EVENT") {
this.eventCount++
this.lastEvent = Date.now()
}
if (verb === 'EOSE') {
if (verb === "EOSE") {
const request = this.cxn.state.pendingRequests.get(extra[0])
// Only count the first eose
@@ -85,13 +85,14 @@ export class ConnectionStats {
}
}
if (verb === 'NOTICE') {
if (verb === "NOTICE") {
this.noticeCount++
}
})
}
getRequestSpeed = () => this.eoseCount ? this.eoseTimer / this.eoseCount : 0
getRequestSpeed = () => (this.eoseCount ? this.eoseTimer / this.eoseCount : 0)
getPublishSpeed = () => this.publishSuccessCount ? this.publishTimer / this.publishSuccessCount : 0
getPublishSpeed = () =>
this.publishSuccessCount ? this.publishTimer / this.publishSuccessCount : 0
}
+24 -17
View File
@@ -1,15 +1,21 @@
import {ctx, randomInt, uniq, noop, always} from '@welshman/lib'
import {LOCAL_RELAY_URL, matchFilters, unionFilters, isSignedEvent, hasValidSignature} from '@welshman/util'
import type {StampedEvent, SignedEvent, Filter, TrustedEvent} from '@welshman/util'
import {Pool} from "./Pool"
import {Executor} from "./Executor"
import {AuthMode} from "./ConnectionAuth"
import {Relays} from "./target/Relays"
import type {Subscription, RelaysAndFilters} from "./Subscribe"
import {ctx, randomInt, uniq, noop, always} from "@welshman/lib"
import {
LOCAL_RELAY_URL,
matchFilters,
unionFilters,
isSignedEvent,
hasValidSignature,
} from "@welshman/util"
import type {StampedEvent, SignedEvent, Filter, TrustedEvent} from "@welshman/util"
import {Pool} from "./Pool.js"
import {Executor} from "./Executor.js"
import {AuthMode} from "./ConnectionAuth.js"
import {Relays} from "./target/Relays.js"
import type {Subscription, RelaysAndFilters} from "./Subscribe.js"
export type NetContext = {
pool: Pool
authMode: AuthMode,
authMode: AuthMode
onEvent: (url: string, event: TrustedEvent) => void
signEvent: (event: StampedEvent) => Promise<SignedEvent | undefined>
getExecutor: (relays: string[]) => Executor
@@ -20,13 +26,12 @@ export type NetContext = {
}
export const defaultOptimizeSubscriptions = (subs: Subscription[]) =>
uniq(subs.flatMap(sub => sub.request.relays || []))
.map(relay => {
const relaySubs = subs.filter(sub => sub.request.relays.includes(relay))
const filters = unionFilters(relaySubs.flatMap(sub => sub.request.filters))
uniq(subs.flatMap(sub => sub.request.relays || [])).map(relay => {
const relaySubs = subs.filter(sub => sub.request.relays.includes(relay))
const filters = unionFilters(relaySubs.flatMap(sub => sub.request.filters))
return {relays: [relay], filters}
})
return {relays: [relay], filters}
})
export const eventValidationScores = new Map<string, number>()
@@ -56,8 +61,10 @@ export const getDefaultNetContext = (overrides: Partial<NetContext> = {}) => ({
signEvent: noop,
isDeleted: always(false),
isValid: isEventValid,
getExecutor: (relays: string[]) => new Executor(new Relays(relays.map((relay: string) => ctx.net.pool.get(relay)))),
matchFilters: (url: string, filters: Filter[], event: TrustedEvent) => matchFilters(filters, event),
getExecutor: (relays: string[]) =>
new Executor(new Relays(relays.map((relay: string) => ctx.net.pool.get(relay)))),
matchFilters: (url: string, filters: Filter[], event: TrustedEvent) =>
matchFilters(filters, event),
optimizeSubscriptions: defaultOptimizeSubscriptions,
...overrides,
})
+30 -30
View File
@@ -1,9 +1,9 @@
import {ctx, noop} from '@welshman/lib'
import type {Emitter} from '@welshman/lib'
import type {SignedEvent, TrustedEvent, Filter} from '@welshman/util'
import type {Message} from './Socket'
import type {Connection} from './Connection'
import {Negentropy, NegentropyStorageVector} from './Negentropy'
import {ctx, noop} from "@welshman/lib"
import type {Emitter} from "@welshman/lib"
import type {SignedEvent, TrustedEvent, Filter} from "@welshman/util"
import type {Message} from "./Socket.js"
import type {Connection} from "./Connection.js"
import {Negentropy, NegentropyStorageVector} from "./Negentropy.js"
export type Target = Emitter & {
connections: Connection[]
@@ -21,21 +21,21 @@ type EoseCallback = (url: string) => void
type CloseCallback = () => void
type OkCallback = (url: string, id: string, ...extra: any[]) => void
type ErrorCallback = (url: string, id: string, ...extra: any[]) => void
type DiffMessageCallback = (url: string, {have, need}: {have: string[], need: string[]}) => void
type SubscribeOpts = {onEvent?: EventCallback, onEose?: EoseCallback}
type PublishOpts = {verb?: string, onOk?: OkCallback, onError?: ErrorCallback}
type DiffOpts = {onError?: ErrorCallback, onMessage?: DiffMessageCallback, onClose?: CloseCallback}
type DiffMessage = {have: string[]; need: string[]}
type DiffMessageCallback = (url: string, {have, need}: DiffMessage) => void
type SubscribeOpts = {onEvent?: EventCallback; onEose?: EoseCallback}
type PublishOpts = {verb?: string; onOk?: OkCallback; onError?: ErrorCallback}
type DiffOpts = {onError?: ErrorCallback; onMessage?: DiffMessageCallback; onClose?: CloseCallback}
const createSubId = (prefix: string) => [prefix, Math.random().toString().slice(2, 10)].join('-')
const createSubId = (prefix: string) => [prefix, Math.random().toString().slice(2, 10)].join("-")
export class Executor {
constructor(readonly target: Target) {}
subscribe(filters: Filter[], {onEvent, onEose}: SubscribeOpts = {}) {
let closed = false
const id = createSubId('REQ')
const id = createSubId("REQ")
const eventListener = (url: string, subid: string, e: TrustedEvent) => {
if (subid === id) {
@@ -50,8 +50,8 @@ export class Executor {
}
}
this.target.on('EVENT', eventListener)
this.target.on('EOSE', eoseListener)
this.target.on("EVENT", eventListener)
this.target.on("EOSE", eoseListener)
this.target.send("REQ", id, ...filters)
return {
@@ -59,15 +59,15 @@ export class Executor {
if (closed) return
this.target.send("CLOSE", id).catch(noop)
this.target.off('EVENT', eventListener)
this.target.off('EOSE', eoseListener)
this.target.off("EVENT", eventListener)
this.target.off("EOSE", eoseListener)
closed = true
},
}
}
publish(event: SignedEvent, {verb = 'EVENT', onOk, onError}: PublishOpts = {}) {
publish(event: SignedEvent, {verb = "EVENT", onOk, onError}: PublishOpts = {}) {
const okListener = (url: string, id: string, ok: boolean, message: string) => {
if (id === event.id) {
if (ok) {
@@ -84,22 +84,22 @@ export class Executor {
}
}
this.target.on('OK', okListener)
this.target.on('ERROR', errorListener)
this.target.on("OK", okListener)
this.target.on("ERROR", errorListener)
this.target.send(verb, event)
return {
unsubscribe: () => {
this.target.off('OK', okListener)
this.target.off('ERROR', errorListener)
}
this.target.off("OK", okListener)
this.target.off("ERROR", errorListener)
},
}
}
diff(filter: Filter, events: TrustedEvent[], {onMessage, onError, onClose}: DiffOpts = {}) {
let closed = false
const id = createSubId('NEG')
const id = createSubId("NEG")
const storage = new NegentropyStorageVector()
const neg = new Negentropy(storage, 50_000)
@@ -116,7 +116,7 @@ export class Executor {
onMessage?.(url, {have, need})
if (newMsg) {
this.target.send('NEG-MSG', id, newMsg)
this.target.send("NEG-MSG", id, newMsg)
} else {
close()
}
@@ -132,16 +132,16 @@ export class Executor {
const close = () => {
if (closed) return
this.target.send('NEG-CLOSE', id).catch(noop)
this.target.off('NEG-MSG', msgListener)
this.target.off('NEG-ERR', errListener)
this.target.send("NEG-CLOSE", id).catch(noop)
this.target.off("NEG-MSG", msgListener)
this.target.off("NEG-ERR", errListener)
closed = true
onClose?.()
}
this.target.on('NEG-MSG', msgListener)
this.target.on('NEG-ERR', errListener)
this.target.on("NEG-MSG", msgListener)
this.target.on("NEG-ERR", errListener)
neg.initiate().then((msg: string) => {
this.target.send("NEG-OPEN", id, filter, msg)
+3 -3
View File
@@ -1,5 +1,5 @@
import {Emitter} from '@welshman/lib'
import {Connection} from "./Connection"
import {Emitter} from "@welshman/lib"
import {Connection} from "./Connection.js"
export class Pool extends Emitter {
data: Map<string, Connection>
@@ -24,7 +24,7 @@ export class Pool extends Emitter {
const newConnection = new Connection(url)
this.data.set(url, newConnection)
this.emit('init', newConnection)
this.emit("init", newConnection)
return newConnection
}
+7 -8
View File
@@ -1,7 +1,7 @@
import {ctx, Emitter, now, randomId, defer} from '@welshman/lib'
import type {Deferred} from '@welshman/lib'
import {asSignedEvent} from '@welshman/util'
import type {SignedEvent} from '@welshman/util'
import {ctx, Emitter, now, randomId, defer} from "@welshman/lib"
import type {Deferred} from "@welshman/lib"
import {asSignedEvent} from "@welshman/util"
import type {SignedEvent} from "@welshman/util"
export enum PublishStatus {
Pending = "pending",
@@ -34,8 +34,8 @@ export const makePublish = (request: PublishRequest) => {
const id = randomId()
const created_at = now()
const emitter = new Emitter()
const result: Publish['result'] = defer()
const status: Publish['status'] = new Map()
const result: Publish["result"] = defer()
const status: Publish["status"] = new Map()
return {id, created_at, request, emitter, result, status}
}
@@ -77,7 +77,7 @@ export const publish = (request: PublishRequest) => {
const timeout = setTimeout(() => abort(PublishStatus.Timeout), request.timeout || 10_000)
// If we have a signal, use it
request.signal?.addEventListener('abort', () => abort(PublishStatus.Aborted))
request.signal?.addEventListener("abort", () => abort(PublishStatus.Aborted))
// Delegate to our executor
const executorSub = executor.publish(event, {
@@ -96,4 +96,3 @@ export const publish = (request: PublishRequest) => {
return pub
}
+32 -36
View File
@@ -1,30 +1,20 @@
import WebSocket from "isomorphic-ws"
import {Worker, sleep} from '@welshman/lib'
import {ConnectionEvent} from './ConnectionEvent'
import type {Connection} from './Connection'
import {Worker, sleep} from "@welshman/lib"
import {ConnectionEvent} from "./ConnectionEvent.js"
import type {Connection} from "./Connection.js"
export type Message = [string, ...any[]]
export enum SocketStatus {
New = 'new',
Open = 'open',
Opening = 'opening',
Closing = 'closing',
Closed = 'closed',
Error = 'error',
Invalid = 'invalid',
New = "new",
Open = "open",
Opening = "opening",
Closing = "closing",
Closed = "closed",
Error = "error",
Invalid = "invalid",
}
const {
New,
Open,
Opening,
Closing,
Closed,
Error,
Invalid,
} = SocketStatus
export class Socket {
lastError = 0
status = SocketStatus.New
@@ -39,7 +29,7 @@ export class Socket {
}
wait = async () => {
while ([Opening, Closing].includes(this.status)) {
while ([SocketStatus.Opening, SocketStatus.Closing].includes(this.status)) {
await sleep(100)
}
}
@@ -49,19 +39,19 @@ export class Socket {
await this.wait()
// If the socket is closed, reset
if (this.status === Closed) {
this.status = New
if (this.status === SocketStatus.Closed) {
this.status = SocketStatus.New
this.cxn.emit(ConnectionEvent.Reset)
}
// If we're closed due to an error retry after a delay
if (this.status === Error && Date.now() - this.lastError > 15_000) {
this.status = New
if (this.status === SocketStatus.Error && Date.now() - this.lastError > 15_000) {
this.status = SocketStatus.New
this.cxn.emit(ConnectionEvent.Reset)
}
// If the socket is new, connect
if (this.status === New) {
if (this.status === SocketStatus.New) {
this.#init()
}
@@ -84,6 +74,10 @@ export class Socket {
send = async (message: Message) => {
await this.open()
if (!this.ws) {
throw new Error(`No websocket available when sending to ${this.cxn.url}`)
}
this.cxn.emit(ConnectionEvent.Send, message)
this.ws.send(JSON.stringify(message))
}
@@ -91,42 +85,44 @@ export class Socket {
#init = () => {
try {
this.ws = new WebSocket(this.cxn.url)
this.status = Opening
this.status = SocketStatus.Opening
this.ws.onopen = () => {
this.status = Open
this.status = SocketStatus.Open
this.cxn.emit(ConnectionEvent.Open)
}
this.ws.onerror = () => {
this.status = Error
this.status = SocketStatus.Error
this.lastError = Date.now()
this.cxn.emit(ConnectionEvent.Error)
}
this.ws.onclose = () => {
if (this.status !== Error) {
this.status = Closed
if (this.status !== SocketStatus.Error) {
this.status = SocketStatus.Closed
}
this.cxn.emit(ConnectionEvent.Close)
}
this.ws.onmessage = (event: {data: string}) => {
this.ws.onmessage = (event: any) => {
const data = event.data as string
try {
const message = JSON.parse(event.data as string)
const message = JSON.parse(data)
if (Array.isArray(message)) {
this.worker.push(message as Message)
} else {
this.cxn.emit(ConnectionEvent.InvalidMessage, event.data)
this.cxn.emit(ConnectionEvent.InvalidMessage, data)
}
} catch (e) {
this.cxn.emit(ConnectionEvent.InvalidMessage, event.data)
this.cxn.emit(ConnectionEvent.InvalidMessage, data)
}
}
} catch (e) {
this.status = Invalid
this.status = SocketStatus.Invalid
this.cxn.emit(ConnectionEvent.InvalidUrl)
}
}
+98 -90
View File
@@ -1,9 +1,15 @@
import {ctx, Emitter, max, chunk, randomId, once, groupBy, uniq} from '@welshman/lib'
import {LOCAL_RELAY_URL, matchFilters, normalizeRelayUrl, unionFilters, TrustedEvent} from '@welshman/util'
import type {Filter} from '@welshman/util'
import {Tracker} from "./Tracker"
import {Connection} from './Connection'
import {ConnectionEvent} from './ConnectionEvent'
import {ctx, Emitter, max, chunk, randomId, once, groupBy, uniq} from "@welshman/lib"
import {
LOCAL_RELAY_URL,
matchFilters,
normalizeRelayUrl,
unionFilters,
TrustedEvent,
} from "@welshman/util"
import type {Filter} from "@welshman/util"
import {Tracker} from "./Tracker.js"
import {Connection} from "./Connection.js"
import {ConnectionEvent} from "./ConnectionEvent.js"
// `subscribe` is a super function that handles batching subscriptions by merging
// them based on parameters (filters and subscribe opts), then splits them by relay.
@@ -71,9 +77,9 @@ export const calculateSubscriptionGroup = (sub: Subscription) => {
if (sub.request.timeout) parts.push(`timeout:${sub.request.timeout}`)
if (sub.request.authTimeout) parts.push(`authTimeout:${sub.request.authTimeout}`)
if (sub.request.closeOnEose) parts.push('closeOnEose')
if (sub.request.closeOnEose) parts.push("closeOnEose")
return parts.join('|')
return parts.join("|")
}
export const mergeSubscriptions = (subs: Subscription[]) => {
@@ -85,7 +91,7 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
closeOnEose: subs.every(sub => sub.request.closeOnEose),
})
mergedSub.controller.signal.addEventListener('abort', () => {
mergedSub.controller.signal.addEventListener("abort", () => {
for (const sub of subs) {
sub.close()
}
@@ -130,93 +136,92 @@ export const mergeSubscriptions = (subs: Subscription[]) => {
}
export const optimizeSubscriptions = (subs: Subscription[]) => {
return Array.from(groupBy(calculateSubscriptionGroup, subs).values())
.flatMap(group => {
const timeout = max(group.map(sub => sub.request.timeout || 0))
const authTimeout = max(group.map(sub => sub.request.authTimeout || 0))
const closeOnEose = group.every(sub => sub.request.closeOnEose)
const completedSubs = new Set<string>()
const abortedSubs = new Set<string>()
const closedSubs = new Set<string>()
const eosedSubs = new Set<string>()
const sentSubs = new Set<string>()
const mergedSubs: Subscription[] = []
return Array.from(groupBy(calculateSubscriptionGroup, subs).values()).flatMap(group => {
const timeout = max(group.map(sub => sub.request.timeout || 0))
const authTimeout = max(group.map(sub => sub.request.authTimeout || 0))
const closeOnEose = group.every(sub => sub.request.closeOnEose)
const completedSubs = new Set<string>()
const abortedSubs = new Set<string>()
const closedSubs = new Set<string>()
const eosedSubs = new Set<string>()
const sentSubs = new Set<string>()
const mergedSubs: Subscription[] = []
for (const {relays, filters} of ctx.net.optimizeSubscriptions(group)) {
for (const filter of filters) {
const mergedSub = makeSubscription({
filters: [filter],
relays,
timeout,
authTimeout,
closeOnEose
})
for (const {relays, filters} of ctx.net.optimizeSubscriptions(group)) {
for (const filter of filters) {
const mergedSub = makeSubscription({
filters: [filter],
relays,
timeout,
authTimeout,
closeOnEose,
})
for (const {id, controller, request} of group) {
const onAbort = () => {
abortedSubs.add(id)
for (const {id, controller, request} of group) {
const onAbort = () => {
abortedSubs.add(id)
if (abortedSubs.size === group.length) {
mergedSub.close()
}
if (abortedSubs.size === group.length) {
mergedSub.close()
}
request.signal?.addEventListener('abort', onAbort)
controller.signal.addEventListener('abort', onAbort)
}
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
request.signal?.addEventListener("abort", onAbort)
controller.signal.addEventListener("abort", onAbort)
}
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
for (const sub of group) {
if (matchFilters(sub.request.filters, event) && !sub.tracker.track(event.id, url)) {
sub.emitter.emit(SubscriptionEvent.Event, url, event)
}
}
})
// Pass events back to caller
const propagateEvent = (type: SubscriptionEvent) =>
mergedSub.emitter.on(type, (url: string, event: TrustedEvent) => {
for (const sub of group) {
if (matchFilters(sub.request.filters, event) && !sub.tracker.track(event.id, url)) {
sub.emitter.emit(SubscriptionEvent.Event, url, event)
if (matchFilters(sub.request.filters, event)) {
sub.emitter.emit(type, url, event)
}
}
})
// Pass events back to caller
const propagateEvent = (type: SubscriptionEvent) =>
mergedSub.emitter.on(type, (url: string, event: TrustedEvent) => {
propagateEvent(SubscriptionEvent.Duplicate)
propagateEvent(SubscriptionEvent.DeletedEvent)
propagateEvent(SubscriptionEvent.Invalid)
const propagateFinality = (type: SubscriptionEvent, subIds: Set<string>) =>
mergedSub.emitter.on(type, (...args: any[]) => {
subIds.add(mergedSub.id)
// Wait for all subscriptions to complete before reporting finality to the caller.
// This is sub-optimal, but because we're outsourcing filter/relay optimization
// we can't make any assumptions about which caller subscriptions have completed
// at any given time.
if (subIds.size === mergedSubs.length) {
for (const sub of group) {
if (matchFilters(sub.request.filters, event)) {
sub.emitter.emit(type, url, event)
}
sub.emitter.emit(type, ...args)
}
})
}
propagateEvent(SubscriptionEvent.Duplicate)
propagateEvent(SubscriptionEvent.DeletedEvent)
propagateEvent(SubscriptionEvent.Invalid)
if (type === SubscriptionEvent.Complete) {
mergedSub.emitter.removeAllListeners()
}
})
const propagateFinality = (type: SubscriptionEvent, subIds: Set<string>) =>
mergedSub.emitter.on(type, (...args: any[]) => {
subIds.add(mergedSub.id)
propagateFinality(SubscriptionEvent.Send, sentSubs)
propagateFinality(SubscriptionEvent.Eose, eosedSubs)
propagateFinality(SubscriptionEvent.Close, closedSubs)
propagateFinality(SubscriptionEvent.Complete, completedSubs)
// Wait for all subscriptions to complete before reporting finality to the caller.
// This is sub-optimal, but because we're outsourcing filter/relay optimization
// we can't make any assumptions about which caller subscriptions have completed
// at any given time.
if (subIds.size === mergedSubs.length) {
for (const sub of group) {
sub.emitter.emit(type, ...args)
}
}
if (type === SubscriptionEvent.Complete) {
mergedSub.emitter.removeAllListeners()
}
})
propagateFinality(SubscriptionEvent.Send, sentSubs)
propagateFinality(SubscriptionEvent.Eose, eosedSubs)
propagateFinality(SubscriptionEvent.Close, closedSubs)
propagateFinality(SubscriptionEvent.Complete, completedSubs)
mergedSubs.push(mergedSub)
}
mergedSubs.push(mergedSub)
}
}
return mergedSubs
})
return mergedSubs
})
}
const _executeSubscription = (sub: Subscription) => {
@@ -267,19 +272,17 @@ const _executeSubscription = (sub: Subscription) => {
}
}
const onEose = (url: string) =>
emitter.emit(SubscriptionEvent.Eose, url)
const onEose = (url: string) => emitter.emit(SubscriptionEvent.Eose, url)
const onClose = (connection: Connection) =>
emitter.emit(SubscriptionEvent.Close, connection.url)
const onClose = (connection: Connection) => emitter.emit(SubscriptionEvent.Close, connection.url)
const onComplete = once(() => emitter.emit(SubscriptionEvent.Complete))
// Listen for abort via caller signal
signal?.addEventListener('abort', onComplete)
signal?.addEventListener("abort", onComplete)
// Listen for abort via our own internal signal
controller.signal.addEventListener('abort', onComplete)
controller.signal.addEventListener("abort", onComplete)
// If we have a timeout, complete the subscription automatically
if (timeout) setTimeout(onComplete, timeout + authTimeout)
@@ -297,7 +300,7 @@ const _executeSubscription = (sub: Subscription) => {
if (authTimeout) {
await connection.auth.attempt(authTimeout)
}
})
}),
).then(() => {
// If we send too many filters in a request relays will refuse to respond. REQs are rate
// limited client-side by Connection, so this will throttle concurrent requests.
@@ -333,9 +336,7 @@ export const executeSubscriptionBatched = (() => {
return (sub: Subscription) => {
subs.push(sub)
timeouts.push(
setTimeout(executeAll, Math.max(16, sub.request.delay!)) as unknown as number
)
timeouts.push(setTimeout(executeAll, Math.max(16, sub.request.delay!)) as unknown as number)
}
})()
@@ -346,7 +347,13 @@ export type SubscribeRequestWithHandlers = SubscribeRequest & {
onComplete?: () => void
}
export const subscribe = ({onEvent, onEose, onClose, onComplete, ...request}: SubscribeRequestWithHandlers) => {
export const subscribe = ({
onEvent,
onEose,
onClose,
onComplete,
...request
}: SubscribeRequestWithHandlers) => {
const sub: Subscription = makeSubscription({delay: 50, ...request})
for (const relay of request.relays) {
@@ -362,7 +369,8 @@ export const subscribe = ({onEvent, onEose, onClose, onComplete, ...request}: Su
}
// Signature for onEvent is different from emitter signature for historical reasons and convenience
if (onEvent) sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => onEvent(event))
if (onEvent)
sub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => onEvent(event))
if (onEose) sub.emitter.on(SubscriptionEvent.Eose, onEose)
if (onClose) sub.emitter.on(SubscriptionEvent.Close, onClose)
if (onComplete) sub.emitter.on(SubscriptionEvent.Complete, onComplete)
+33 -30
View File
@@ -1,7 +1,7 @@
import {ctx, assoc, lt, groupBy, now, pushToMapKey, inc, flatten, chunk} from '@welshman/lib'
import type {SignedEvent, TrustedEvent, Filter} from '@welshman/util'
import {subscribe} from './Subscribe'
import {publish} from './Publish'
import {ctx, assoc, lt, groupBy, now, pushToMapKey, inc, flatten, chunk} from "@welshman/lib"
import type {SignedEvent, TrustedEvent, Filter} from "@welshman/util"
import {subscribe} from "./Subscribe.js"
import {publish} from "./Publish.js"
export type DiffOpts = {
relays: string[]
@@ -19,11 +19,11 @@ export const diff = async ({relays, filters, events}: DiffOpts) => {
const have = new Set<string>()
const need = new Set<string>()
await new Promise<void>((resolve, reject) => {
await new Promise<void>((resolve, reject) => {
executor.diff(filter, events, {
onClose: resolve,
onError: (_, message) => reject(message),
onMessage: (_, message) => {
onError: (url, message) => reject(message),
onMessage: (url, message) => {
for (const id of message.have) {
have.add(id)
}
@@ -36,29 +36,28 @@ export const diff = async ({relays, filters, events}: DiffOpts) => {
})
return {relay, have, need}
})
}),
)
})
)
}),
),
)
return Array.from(groupBy(diff => diff.relay, diffs).entries())
.map(([relay, diffs]) => {
const have = new Set<string>()
const need = new Set<string>()
return Array.from(groupBy(diff => diff.relay, diffs).entries()).map(([relay, diffs]) => {
const have = new Set<string>()
const need = new Set<string>()
for (const diff of diffs) {
for (const id of diff.have) {
have.add(id)
}
for (const id of diff.need) {
need.add(id)
}
for (const diff of diffs) {
for (const id of diff.have) {
have.add(id)
}
return {relay, have: Array.from(have), need: Array.from(need)}
})
for (const id of diff.need) {
need.add(id)
}
}
return {relay, have: Array.from(have), need: Array.from(need)}
})
}
export type PullOpts = {
@@ -103,9 +102,9 @@ export const pull = async ({relays, filters, events, onEvent}: PullOpts) => {
},
})
})
})
}),
)
})
}),
)
return result
@@ -133,7 +132,7 @@ export const push = async ({relays, filters, events}: PushOpts) => {
if (relays) {
await publish({event, relays}).result
}
})
}),
)
}
@@ -156,7 +155,11 @@ export type PullWithoutNegentropyOpts = {
onEvent?: (event: TrustedEvent) => void
}
export const pullWithoutNegentropy = async ({relays, filters, onEvent}: PullWithoutNegentropyOpts) => {
export const pullWithoutNegentropy = async ({
relays,
filters,
onEvent,
}: PullWithoutNegentropyOpts) => {
let done = false
let until = now() + 30
@@ -168,7 +171,7 @@ export const pullWithoutNegentropy = async ({relays, filters, onEvent}: PullWith
await new Promise<void>(resolve => {
subscribe({
relays,
filters: filters.filter(f => lt(f.since, until)).map(assoc('until', until)),
filters: filters.filter(f => lt(f.since, until)).map(assoc("until", until)),
closeOnEose: true,
onComplete: () => {
done = !anyResults
@@ -196,7 +199,7 @@ export const pushWithoutNegentropy = ({relays, events}: PushWithoutNegentropyOpt
Promise.all(
events.map(async event => {
await publish({event, relays}).result
})
}),
)
export const syncWithoutNegentropy = async (opts: SyncOpts) => {
+6 -6
View File
@@ -1,4 +1,4 @@
import {Emitter, addToMapKey} from '@welshman/lib'
import {Emitter, addToMapKey} from "@welshman/lib"
export class Tracker extends Emitter {
relaysById = new Map<string, Set<string>>()
@@ -36,7 +36,7 @@ export class Tracker extends Emitter {
this.relaysById.set(eventId, relays)
this.idsByRelay.set(eventId, relays)
this.emit('update')
this.emit("update")
}
removeRelay = (eventId: string, relay: string) => {
@@ -45,7 +45,7 @@ export class Tracker extends Emitter {
if (!didDeleteRelay && !didDeleteId) return
this.emit('update')
this.emit("update")
}
track = (eventId: string, relay: string) => {
@@ -62,7 +62,7 @@ export class Tracker extends Emitter {
}
}
load = (relaysById: Tracker['relaysById']) => {
load = (relaysById: Tracker["relaysById"]) => {
this.relaysById.clear()
this.idsByRelay.clear()
@@ -73,13 +73,13 @@ export class Tracker extends Emitter {
}
}
this.emit('update')
this.emit("update")
}
clear = () => {
this.relaysById.clear()
this.idsByRelay.clear()
this.emit('update')
this.emit("update")
}
}
+27 -19
View File
@@ -1,19 +1,27 @@
export * from "./Connection"
export * from "./ConnectionAuth"
export * from "./ConnectionEvent"
export * from "./ConnectionSender"
export * from "./ConnectionState"
export * from "./ConnectionStats"
export * from "./Context"
export * from "./Executor"
export * from "./Pool"
export * from "./Publish"
export * from "./Socket"
export * from "./Subscribe"
export * from "./Sync"
export * from "./Tracker"
export * from "./target/Echo"
export * from "./target/Multi"
export * from "./target/Relay"
export * from "./target/Relays"
export * from "./target/Local"
export * from "./Connection.js"
export * from "./ConnectionAuth.js"
export * from "./ConnectionEvent.js"
export * from "./ConnectionSender.js"
export * from "./ConnectionState.js"
export * from "./ConnectionStats.js"
export * from "./Context.js"
export * from "./Executor.js"
export * from "./Pool.js"
export * from "./Publish.js"
export * from "./Socket.js"
export * from "./Subscribe.js"
export * from "./Sync.js"
export * from "./Tracker.js"
export * from "./target/Echo.js"
export * from "./target/Multi.js"
export * from "./target/Relay.js"
export * from "./target/Relays.js"
export * from "./target/Local.js"
import type {NetContext} from "./Context.js"
declare module "@welshman/lib" {
interface Context {
net: NetContext
}
}
+2 -2
View File
@@ -1,5 +1,5 @@
import {Emitter} from '@welshman/lib'
import type {Message} from '../Socket'
import {Emitter} from "@welshman/lib"
import type {Message} from "../Socket.js"
export class Echo extends Emitter {
get connections() {
+5 -5
View File
@@ -1,12 +1,12 @@
import {Emitter} from '@welshman/lib'
import {Relay, LOCAL_RELAY_URL} from '@welshman/util'
import type {Message} from '../Socket'
import {Emitter} from "@welshman/lib"
import {Relay, LOCAL_RELAY_URL} from "@welshman/util"
import type {Message} from "../Socket.js"
export class Local extends Emitter {
constructor(readonly relay: Relay) {
super()
relay.on('*', this.onMessage)
relay.on("*", this.onMessage)
}
get connections() {
@@ -25,6 +25,6 @@ export class Local extends Emitter {
cleanup = () => {
this.removeAllListeners()
this.relay.off('*', this.onMessage)
this.relay.off("*", this.onMessage)
}
}
+4 -4
View File
@@ -1,13 +1,13 @@
import {Emitter} from '@welshman/lib'
import type {Message} from '../Socket'
import type {Target} from '../Executor'
import {Emitter} from "@welshman/lib"
import type {Message} from "../Socket.js"
import type {Target} from "../Executor.js"
export class Multi extends Emitter {
constructor(readonly targets: Target[]) {
super()
targets.forEach(t => {
t.on('*', (verb, ...args) => this.emit(verb, ...args))
t.on("*", (verb, ...args) => this.emit(verb, ...args))
})
}
+4 -4
View File
@@ -1,7 +1,7 @@
import {Emitter} from '@welshman/lib'
import {ConnectionEvent} from '../ConnectionEvent'
import type {Message} from '../Socket'
import type {Connection} from '../Connection'
import {Emitter} from "@welshman/lib"
import {ConnectionEvent} from "../ConnectionEvent.js"
import type {Message} from "../Socket.js"
import type {Connection} from "../Connection.js"
export class Relay extends Emitter {
constructor(readonly connection: Connection) {
+5 -5
View File
@@ -1,7 +1,7 @@
import {Emitter} from '@welshman/lib'
import type {Message} from '../Socket'
import type {Connection} from '../Connection'
import {ConnectionEvent} from '../ConnectionEvent'
import {Emitter} from "@welshman/lib"
import type {Message} from "../Socket.js"
import type {Connection} from "../Connection.js"
import {ConnectionEvent} from "../ConnectionEvent.js"
export class Relays extends Emitter {
constructor(readonly connections: Connection[]) {
@@ -23,7 +23,7 @@ export class Relays extends Emitter {
cleanup = () => {
this.removeAllListeners()
this.connections.forEach(connection => {
connection.off('receive', this.onMessage)
connection.off("receive", this.onMessage)
})
}
}
-8
View File
@@ -1,8 +0,0 @@
import type {NetContext} from './Context'
declare module "@welshman/lib" {
interface Context {
net: NetContext
}
}