Move net2 to net, update dvm
This commit is contained in:
@@ -1,70 +0,0 @@
|
||||
import {Emitter} from "@welshman/lib"
|
||||
import {normalizeRelayUrl} from "@welshman/util"
|
||||
import {Socket} from "./Socket.js"
|
||||
import type {Message} from "./Socket.js"
|
||||
import {ConnectionEvent} from "./ConnectionEvent.js"
|
||||
import {ConnectionState} from "./ConnectionState.js"
|
||||
import {ConnectionStats} from "./ConnectionStats.js"
|
||||
import {ConnectionAuth} from "./ConnectionAuth.js"
|
||||
import {ConnectionSender} from "./ConnectionSender.js"
|
||||
|
||||
export enum ConnectionStatus {
|
||||
Open = "open",
|
||||
Closed = "Closed",
|
||||
}
|
||||
|
||||
const {Open, Closed} = ConnectionStatus
|
||||
|
||||
export class Connection extends Emitter {
|
||||
url: string
|
||||
socket: Socket
|
||||
sender: ConnectionSender
|
||||
state: ConnectionState
|
||||
stats: ConnectionStats
|
||||
auth: ConnectionAuth
|
||||
status = Open
|
||||
|
||||
constructor(url: string) {
|
||||
super()
|
||||
|
||||
if (url !== normalizeRelayUrl(url)) {
|
||||
console.warn(`Attempted to open connection to non-normalized url ${url}`)
|
||||
}
|
||||
|
||||
this.url = url
|
||||
this.socket = new Socket(this)
|
||||
this.sender = new ConnectionSender(this)
|
||||
this.state = new ConnectionState(this)
|
||||
this.stats = new ConnectionStats(this)
|
||||
this.auth = new ConnectionAuth(this)
|
||||
this.setMaxListeners(100)
|
||||
}
|
||||
|
||||
emit = (type: ConnectionEvent, ...args: any[]) => super.emit(type, this, ...args)
|
||||
|
||||
send = async (message: Message) => {
|
||||
if (this.status !== Open) {
|
||||
throw new Error(`Attempted to send message on ${this.status} connection`)
|
||||
}
|
||||
|
||||
this.socket.open()
|
||||
this.sender.push(message)
|
||||
}
|
||||
|
||||
open = () => {
|
||||
this.status = Open
|
||||
this.socket.open()
|
||||
this.sender.worker.resume()
|
||||
}
|
||||
|
||||
close = () => {
|
||||
this.status = Closed
|
||||
this.socket.close()
|
||||
this.sender.worker.pause()
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.close()
|
||||
this.removeAllListeners()
|
||||
}
|
||||
}
|
||||
@@ -1,120 +0,0 @@
|
||||
import {ctx, sleep} from "@welshman/lib"
|
||||
import {CLIENT_AUTH, createEvent} from "@welshman/util"
|
||||
import {ConnectionEvent} from "./ConnectionEvent.js"
|
||||
import type {Connection} from "./Connection.js"
|
||||
import type {Message} from "./Socket.js"
|
||||
|
||||
export enum AuthMode {
|
||||
Implicit = "implicit",
|
||||
Explicit = "explicit",
|
||||
}
|
||||
|
||||
export enum AuthStatus {
|
||||
None = "none",
|
||||
Requested = "requested",
|
||||
PendingSignature = "pending_signature",
|
||||
DeniedSignature = "denied_signature",
|
||||
PendingResponse = "pending_response",
|
||||
Forbidden = "forbidden",
|
||||
Ok = "ok",
|
||||
}
|
||||
|
||||
const {None, Requested, PendingSignature, DeniedSignature, PendingResponse, Forbidden, Ok} =
|
||||
AuthStatus
|
||||
|
||||
export class ConnectionAuth {
|
||||
challenge: string | undefined
|
||||
request: string | undefined
|
||||
message: string | undefined
|
||||
status = None
|
||||
|
||||
constructor(readonly cxn: Connection) {
|
||||
this.cxn.on(ConnectionEvent.Close, this.#onClose)
|
||||
this.cxn.on(ConnectionEvent.Receive, this.#onReceive)
|
||||
}
|
||||
|
||||
#onReceive = (cxn: Connection, [verb, ...extra]: Message) => {
|
||||
if (verb === "OK") {
|
||||
const [id, ok, message] = extra
|
||||
|
||||
if (id === this.request) {
|
||||
this.message = message
|
||||
this.status = ok ? Ok : Forbidden
|
||||
}
|
||||
}
|
||||
|
||||
if (verb === "AUTH" && extra[0] !== this.challenge) {
|
||||
this.challenge = extra[0]
|
||||
this.request = undefined
|
||||
this.message = undefined
|
||||
this.status = Requested
|
||||
|
||||
if (ctx.net.authMode === AuthMode.Implicit) {
|
||||
this.respond()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#onClose = (cxn: Connection) => {
|
||||
this.challenge = undefined
|
||||
this.request = undefined
|
||||
this.message = undefined
|
||||
this.status = None
|
||||
}
|
||||
|
||||
waitFor = async (condition: () => boolean, timeout = 300) => {
|
||||
const start = Date.now()
|
||||
|
||||
while (Date.now() - timeout <= start) {
|
||||
if (condition()) {
|
||||
break
|
||||
}
|
||||
await sleep(Math.min(100, Math.ceil(timeout / 3)))
|
||||
}
|
||||
}
|
||||
|
||||
waitForChallenge = async (timeout = 300) => this.waitFor(() => Boolean(this.challenge), timeout)
|
||||
|
||||
waitForResolution = async (timeout = 300) =>
|
||||
this.waitFor(() => [None, DeniedSignature, Forbidden, Ok].includes(this.status), timeout)
|
||||
|
||||
respond = async () => {
|
||||
if (!this.challenge) {
|
||||
throw new Error("Attempted to authenticate with no challenge")
|
||||
}
|
||||
|
||||
if (this.status !== Requested) {
|
||||
throw new Error(`Attempted to authenticate when auth is already ${this.status}`)
|
||||
}
|
||||
|
||||
this.status = PendingSignature
|
||||
|
||||
const template = createEvent(CLIENT_AUTH, {
|
||||
tags: [
|
||||
["relay", this.cxn.url],
|
||||
["challenge", this.challenge],
|
||||
],
|
||||
})
|
||||
|
||||
const [event] = await Promise.all([ctx.net.signEvent(template), this.cxn.socket.open()])
|
||||
|
||||
if (event) {
|
||||
this.request = event.id
|
||||
this.cxn.send(["AUTH", event])
|
||||
this.status = PendingResponse
|
||||
} else {
|
||||
this.status = DeniedSignature
|
||||
}
|
||||
}
|
||||
|
||||
attempt = async (timeout = 300) => {
|
||||
await this.cxn.socket.open()
|
||||
await this.waitForChallenge(Math.ceil(timeout / 2))
|
||||
|
||||
if (this.status === Requested) {
|
||||
await this.respond()
|
||||
}
|
||||
|
||||
await this.waitForResolution(Math.ceil(timeout / 2))
|
||||
}
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
export enum ConnectionEvent {
|
||||
InvalidUrl = "invalid:url",
|
||||
InvalidMessage = "invalid:message:receive",
|
||||
Open = "socket:open",
|
||||
Reset = "socket:reset",
|
||||
Close = "socket:close",
|
||||
Error = "socket:error",
|
||||
Receive = "receive:message",
|
||||
Notice = "receive:notice",
|
||||
Send = "send:message",
|
||||
}
|
||||
@@ -1,57 +0,0 @@
|
||||
import {Worker, complement, spec} from "@welshman/lib"
|
||||
import {AUTH_JOIN} from "@welshman/util"
|
||||
import {SocketStatus} from "./Socket.js"
|
||||
import type {Message} from "./Socket.js"
|
||||
import type {Connection} from "./Connection.js"
|
||||
import {AuthStatus} from "./ConnectionAuth.js"
|
||||
|
||||
export class ConnectionSender {
|
||||
worker: Worker<Message>
|
||||
|
||||
constructor(readonly cxn: Connection) {
|
||||
this.worker = new Worker({
|
||||
shouldDefer: (message: Message) => {
|
||||
const verb = message[0]
|
||||
|
||||
// Always send CLOSE to clean up pending requests
|
||||
if (verb === "CLOSE") return false
|
||||
|
||||
// If we're not connected, nothing we can do
|
||||
if (cxn.socket.status !== SocketStatus.Open) return true
|
||||
|
||||
// Always allow sending AUTH
|
||||
if (verb === "AUTH") return false
|
||||
|
||||
// Always allow sending join requests
|
||||
if (verb === "EVENT" && message[1].kind === AUTH_JOIN) return false
|
||||
|
||||
// Wait for auth
|
||||
if (![AuthStatus.None, AuthStatus.Ok].includes(cxn.auth.status)) return true
|
||||
|
||||
// Limit concurrent requests
|
||||
if (verb === "REQ") return cxn.state.pendingRequests.size >= 50
|
||||
|
||||
return false
|
||||
},
|
||||
})
|
||||
|
||||
this.worker.addGlobalHandler((message: Message) => {
|
||||
const verb = message[0]
|
||||
|
||||
// If we're closing something that never got sent, skip it
|
||||
if (verb === "CLOSE" && !cxn.state.pendingRequests.has(message[1])) {
|
||||
return
|
||||
}
|
||||
cxn.socket.send(message)
|
||||
})
|
||||
}
|
||||
|
||||
push = (message: Message) => {
|
||||
// If we ended up handling a CLOSE before we sent the REQ, don't send the REQ
|
||||
if (message[0] === "CLOSE") {
|
||||
this.worker.buffer = this.worker.buffer.filter(complement(spec(["REQ", message[1]])))
|
||||
}
|
||||
|
||||
this.worker.push(message)
|
||||
}
|
||||
}
|
||||
@@ -1,112 +0,0 @@
|
||||
import {sleep} from "@welshman/lib"
|
||||
import {AUTH_JOIN} from "@welshman/util"
|
||||
import type {SignedEvent, Filter} from "@welshman/util"
|
||||
import type {Message} from "./Socket.js"
|
||||
import type {Connection} from "./Connection.js"
|
||||
import {ConnectionEvent} from "./ConnectionEvent.js"
|
||||
|
||||
export type PublishState = {
|
||||
sent: number
|
||||
event: SignedEvent
|
||||
}
|
||||
|
||||
export type RequestState = {
|
||||
sent: number
|
||||
filters: Filter[]
|
||||
eose?: boolean
|
||||
}
|
||||
|
||||
export class ConnectionState {
|
||||
pendingPublishes = new Map<string, PublishState>()
|
||||
pendingRequests = new Map<string, RequestState>()
|
||||
|
||||
constructor(readonly cxn: Connection) {
|
||||
cxn.sender.worker.addGlobalHandler(([verb, ...extra]: Message) => {
|
||||
if (verb === "REQ") {
|
||||
const [reqId, ...filters] = extra
|
||||
|
||||
this.pendingRequests.set(reqId, {filters, sent: Date.now()})
|
||||
}
|
||||
|
||||
if (verb === "CLOSE") {
|
||||
const [reqId] = extra
|
||||
|
||||
this.pendingRequests.delete(reqId)
|
||||
}
|
||||
|
||||
if (verb === "EVENT") {
|
||||
const [event] = extra
|
||||
|
||||
this.pendingPublishes.set(event.id, {sent: Date.now(), event})
|
||||
}
|
||||
})
|
||||
|
||||
cxn.socket.worker.addGlobalHandler(([verb, ...extra]: Message) => {
|
||||
if (verb === "OK") {
|
||||
const [eventId, _ok, notice] = extra
|
||||
const pub = this.pendingPublishes.get(eventId)
|
||||
|
||||
if (!pub) return
|
||||
|
||||
// Re-enqueue pending events when auth challenge is received
|
||||
if (notice?.startsWith("auth-required:") && pub.event.kind !== AUTH_JOIN) {
|
||||
this.cxn.send(["EVENT", pub.event])
|
||||
} else {
|
||||
this.pendingPublishes.delete(eventId)
|
||||
}
|
||||
}
|
||||
|
||||
if (verb === "EOSE") {
|
||||
const [reqId] = extra
|
||||
const req = this.pendingRequests.get(reqId)
|
||||
|
||||
if (req) {
|
||||
req.eose = true
|
||||
}
|
||||
}
|
||||
|
||||
if (verb === "CLOSED") {
|
||||
const [reqId] = extra
|
||||
|
||||
// Re-enqueue pending reqs when auth challenge is received
|
||||
if (extra[1]?.startsWith("auth-required:")) {
|
||||
const req = this.pendingRequests.get(reqId)
|
||||
|
||||
if (req) {
|
||||
this.cxn.send(["REQ", reqId, ...req.filters])
|
||||
}
|
||||
|
||||
if (extra[1]) {
|
||||
this.cxn.emit(ConnectionEvent.Notice, extra[1])
|
||||
}
|
||||
}
|
||||
|
||||
this.pendingRequests.delete(reqId)
|
||||
}
|
||||
|
||||
if (verb === "NOTICE") {
|
||||
const [notice] = extra
|
||||
|
||||
this.cxn.emit(ConnectionEvent.Notice, notice)
|
||||
}
|
||||
})
|
||||
|
||||
// Whenever we reconnect, re-enqueue pending stuff. Delay this so that if a connection
|
||||
// is flapping we're not sending too much noise.
|
||||
cxn.on(ConnectionEvent.Close, async (cxn: Connection) => {
|
||||
await sleep(10_000)
|
||||
|
||||
if (this.pendingRequests.size > 0 || this.pendingPublishes.size > 0) {
|
||||
this.cxn.open()
|
||||
}
|
||||
|
||||
for (const [reqId, req] of this.pendingRequests.entries()) {
|
||||
this.cxn.send(["REQ", reqId, ...req.filters])
|
||||
}
|
||||
|
||||
for (const [_, pub] of this.pendingPublishes.entries()) {
|
||||
this.cxn.send(["EVENT", pub.event])
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,98 +0,0 @@
|
||||
import type {Message} from "./Socket.js"
|
||||
import type {Connection} from "./Connection.js"
|
||||
import {ConnectionEvent} from "./ConnectionEvent.js"
|
||||
|
||||
export class ConnectionStats {
|
||||
openCount = 0
|
||||
closeCount = 0
|
||||
errorCount = 0
|
||||
publishCount = 0
|
||||
requestCount = 0
|
||||
eventCount = 0
|
||||
lastOpen = 0
|
||||
lastClose = 0
|
||||
lastError = 0
|
||||
lastPublish = 0
|
||||
lastRequest = 0
|
||||
lastEvent = 0
|
||||
lastAuth = 0
|
||||
publishTimer = 0
|
||||
publishSuccessCount = 0
|
||||
publishFailureCount = 0
|
||||
eoseCount = 0
|
||||
eoseTimer = 0
|
||||
noticeCount = 0
|
||||
|
||||
constructor(readonly cxn: Connection) {
|
||||
cxn.on(ConnectionEvent.Open, (cxn: Connection) => {
|
||||
this.openCount++
|
||||
this.lastOpen = Date.now()
|
||||
})
|
||||
|
||||
cxn.on(ConnectionEvent.Close, (cxn: Connection) => {
|
||||
this.closeCount++
|
||||
this.lastClose = Date.now()
|
||||
})
|
||||
|
||||
cxn.on(ConnectionEvent.Error, (cxn: Connection) => {
|
||||
this.errorCount++
|
||||
this.lastError = Date.now()
|
||||
})
|
||||
|
||||
cxn.on(ConnectionEvent.Send, (cxn: Connection, [verb]: Message) => {
|
||||
if (verb === "REQ") {
|
||||
this.requestCount++
|
||||
this.lastRequest = Date.now()
|
||||
}
|
||||
|
||||
if (verb === "EVENT") {
|
||||
this.publishCount++
|
||||
this.lastPublish = Date.now()
|
||||
}
|
||||
})
|
||||
|
||||
cxn.on(ConnectionEvent.Receive, (cxn: Connection, [verb, ...extra]: Message) => {
|
||||
if (verb === "OK") {
|
||||
const pub = this.cxn.state.pendingPublishes.get(extra[0])
|
||||
|
||||
if (pub) {
|
||||
this.publishTimer += Date.now() - pub.sent
|
||||
}
|
||||
|
||||
if (extra[1]) {
|
||||
this.publishSuccessCount++
|
||||
} else {
|
||||
this.publishFailureCount++
|
||||
}
|
||||
}
|
||||
|
||||
if (verb === "AUTH") {
|
||||
this.lastAuth = Date.now()
|
||||
}
|
||||
|
||||
if (verb === "EVENT") {
|
||||
this.eventCount++
|
||||
this.lastEvent = Date.now()
|
||||
}
|
||||
|
||||
if (verb === "EOSE") {
|
||||
const request = this.cxn.state.pendingRequests.get(extra[0])
|
||||
|
||||
// Only count the first eose
|
||||
if (request && !request.eose) {
|
||||
this.eoseCount++
|
||||
this.eoseTimer += Date.now() - request.sent
|
||||
}
|
||||
}
|
||||
|
||||
if (verb === "NOTICE") {
|
||||
this.noticeCount++
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
getRequestSpeed = () => (this.eoseCount ? this.eoseTimer / this.eoseCount : 0)
|
||||
|
||||
getPublishSpeed = () =>
|
||||
this.publishSuccessCount ? this.publishTimer / this.publishSuccessCount : 0
|
||||
}
|
||||
@@ -1,70 +0,0 @@
|
||||
import {ctx, randomInt, uniq, noop, always} from "@welshman/lib"
|
||||
import {
|
||||
LOCAL_RELAY_URL,
|
||||
matchFilters,
|
||||
unionFilters,
|
||||
isSignedEvent,
|
||||
hasValidSignature,
|
||||
} from "@welshman/util"
|
||||
import type {StampedEvent, SignedEvent, Filter, TrustedEvent} from "@welshman/util"
|
||||
import {Pool} from "./Pool.js"
|
||||
import {Executor} from "./Executor.js"
|
||||
import {AuthMode} from "./ConnectionAuth.js"
|
||||
import {Relays} from "./target/Relays.js"
|
||||
import type {Subscription, RelaysAndFilters} from "./Subscribe.js"
|
||||
|
||||
export type NetContext = {
|
||||
pool: Pool
|
||||
authMode: AuthMode
|
||||
onEvent: (url: string, event: TrustedEvent) => void
|
||||
signEvent: (event: StampedEvent) => Promise<SignedEvent | undefined>
|
||||
getExecutor: (relays: string[]) => Executor
|
||||
isDeleted: (url: string, event: TrustedEvent) => boolean
|
||||
isValid: (url: string, event: TrustedEvent) => boolean
|
||||
matchFilters: (url: string, filters: Filter[], event: TrustedEvent) => boolean
|
||||
optimizeSubscriptions: (subs: Subscription[]) => RelaysAndFilters[]
|
||||
}
|
||||
|
||||
export const defaultOptimizeSubscriptions = (subs: Subscription[]) =>
|
||||
uniq(subs.flatMap(sub => sub.request.relays || [])).map(relay => {
|
||||
const relaySubs = subs.filter(sub => sub.request.relays.includes(relay))
|
||||
const filters = unionFilters(relaySubs.flatMap(sub => sub.request.filters))
|
||||
|
||||
return {relays: [relay], filters}
|
||||
})
|
||||
|
||||
export const eventValidationScores = new Map<string, number>()
|
||||
|
||||
export const isEventValid = (url: string, event: TrustedEvent) => {
|
||||
if (url === LOCAL_RELAY_URL) return true
|
||||
|
||||
const validCount = eventValidationScores.get(url) || 0
|
||||
|
||||
// The more events we've actually validated from this relay, the more we can trust it.
|
||||
if (validCount > randomInt(100, 1000)) return true
|
||||
|
||||
const isValid = isSignedEvent(event) && hasValidSignature(event)
|
||||
|
||||
// If the event was valid, increase the relay's score. If not, reset it
|
||||
// Never validate less than 10% to make sure we're never totally checking out
|
||||
if (!isValid || validCount < 900) {
|
||||
eventValidationScores.set(url, isValid ? validCount + 1 : 0)
|
||||
}
|
||||
|
||||
return isValid
|
||||
}
|
||||
|
||||
export const getDefaultNetContext = (overrides: Partial<NetContext> = {}) => ({
|
||||
pool: new Pool(),
|
||||
authMode: AuthMode.Implicit,
|
||||
onEvent: noop,
|
||||
signEvent: noop,
|
||||
isDeleted: always(false),
|
||||
isValid: isEventValid,
|
||||
getExecutor: (relays: string[]) =>
|
||||
new Executor(new Relays(relays.map((relay: string) => ctx.net.pool.get(relay)))),
|
||||
matchFilters: (url: string, filters: Filter[], event: TrustedEvent) =>
|
||||
matchFilters(filters, event),
|
||||
optimizeSubscriptions: defaultOptimizeSubscriptions,
|
||||
...overrides,
|
||||
})
|
||||
@@ -1,154 +0,0 @@
|
||||
import {ctx, noop} from "@welshman/lib"
|
||||
import type {Emitter} from "@welshman/lib"
|
||||
import type {SignedEvent, TrustedEvent, Filter} from "@welshman/util"
|
||||
import type {Message} from "./Socket.js"
|
||||
import type {Connection} from "./Connection.js"
|
||||
import {Negentropy, NegentropyStorageVector} from "./Negentropy.js"
|
||||
|
||||
export type Target = Emitter & {
|
||||
connections: Connection[]
|
||||
send: (...args: Message) => Promise<void>
|
||||
cleanup: () => void
|
||||
}
|
||||
|
||||
export type NegentropyMessage = {
|
||||
have: string[]
|
||||
need: string[]
|
||||
}
|
||||
|
||||
type EventCallback = (url: string, event: TrustedEvent) => void
|
||||
type EoseCallback = (url: string) => void
|
||||
type CloseCallback = () => void
|
||||
type OkCallback = (url: string, id: string, ...extra: any[]) => void
|
||||
type ErrorCallback = (url: string, id: string, ...extra: any[]) => void
|
||||
type DiffMessage = {have: string[]; need: string[]}
|
||||
type DiffMessageCallback = (url: string, {have, need}: DiffMessage) => void
|
||||
type SubscribeOpts = {onEvent?: EventCallback; onEose?: EoseCallback}
|
||||
type PublishOpts = {verb?: string; onOk?: OkCallback; onError?: ErrorCallback}
|
||||
type DiffOpts = {onError?: ErrorCallback; onMessage?: DiffMessageCallback; onClose?: CloseCallback}
|
||||
|
||||
const createSubId = (prefix: string) => `${prefix}-${Math.random().toString().slice(2, 10)}`
|
||||
|
||||
export class Executor {
|
||||
constructor(readonly target: Target) {}
|
||||
|
||||
subscribe(filters: Filter[], {onEvent, onEose}: SubscribeOpts = {}) {
|
||||
let closed = false
|
||||
|
||||
const id = createSubId("REQ")
|
||||
|
||||
const eventListener = (url: string, subid: string, e: TrustedEvent) => {
|
||||
if (subid === id) {
|
||||
ctx.net.onEvent(url, e)
|
||||
onEvent?.(url, e)
|
||||
}
|
||||
}
|
||||
|
||||
const eoseListener = (url: string, subid: string) => {
|
||||
if (subid === id) {
|
||||
onEose?.(url)
|
||||
}
|
||||
}
|
||||
|
||||
this.target.on("EVENT", eventListener)
|
||||
this.target.on("EOSE", eoseListener)
|
||||
this.target.send("REQ", id, ...filters)
|
||||
|
||||
return {
|
||||
unsubscribe: () => {
|
||||
if (closed) return
|
||||
|
||||
this.target.send("CLOSE", id).catch(noop)
|
||||
this.target.off("EVENT", eventListener)
|
||||
this.target.off("EOSE", eoseListener)
|
||||
|
||||
closed = true
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
publish(event: SignedEvent, {verb = "EVENT", onOk, onError}: PublishOpts = {}) {
|
||||
const okListener = (url: string, id: string, ok: boolean, message: string) => {
|
||||
if (id === event.id) {
|
||||
if (ok) {
|
||||
ctx.net.onEvent(url, event)
|
||||
}
|
||||
|
||||
onOk?.(url, id, ok, message)
|
||||
}
|
||||
}
|
||||
|
||||
const errorListener = (url: string, id: string, ...payload: any[]) => {
|
||||
if (id === event.id) {
|
||||
onError?.(url, id, ...payload)
|
||||
}
|
||||
}
|
||||
|
||||
this.target.on("OK", okListener)
|
||||
this.target.on("ERROR", errorListener)
|
||||
this.target.send(verb, event)
|
||||
|
||||
return {
|
||||
unsubscribe: () => {
|
||||
this.target.off("OK", okListener)
|
||||
this.target.off("ERROR", errorListener)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
diff(filter: Filter, events: TrustedEvent[], {onMessage, onError, onClose}: DiffOpts = {}) {
|
||||
let closed = false
|
||||
|
||||
const id = createSubId("NEG")
|
||||
const storage = new NegentropyStorageVector()
|
||||
const neg = new Negentropy(storage, 50_000)
|
||||
|
||||
for (const event of events) {
|
||||
storage.insert(event.created_at, event.id)
|
||||
}
|
||||
|
||||
storage.seal()
|
||||
|
||||
const msgListener = async (url: string, negid: string, msg: string) => {
|
||||
if (negid === id) {
|
||||
const [newMsg, have, need] = await neg.reconcile(msg)
|
||||
|
||||
onMessage?.(url, {have, need})
|
||||
|
||||
if (newMsg) {
|
||||
this.target.send("NEG-MSG", id, newMsg)
|
||||
} else {
|
||||
close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const errListener = (url: string, negid: string, msg: string) => {
|
||||
if (negid === id) {
|
||||
onError?.(url, msg)
|
||||
}
|
||||
}
|
||||
|
||||
const close = () => {
|
||||
if (closed) return
|
||||
|
||||
this.target.send("NEG-CLOSE", id).catch(noop)
|
||||
this.target.off("NEG-MSG", msgListener)
|
||||
this.target.off("NEG-ERR", errListener)
|
||||
|
||||
closed = true
|
||||
onClose?.()
|
||||
}
|
||||
|
||||
this.target.on("NEG-MSG", msgListener)
|
||||
this.target.on("NEG-ERR", errListener)
|
||||
|
||||
neg.initiate().then((msg: string) => {
|
||||
this.target.send("NEG-OPEN", id, filter, msg)
|
||||
})
|
||||
|
||||
return {
|
||||
unsubscribe: close,
|
||||
}
|
||||
}
|
||||
}
|
||||
+58
-23
@@ -1,46 +1,81 @@
|
||||
import {Emitter} from "@welshman/lib"
|
||||
import {Connection} from "./Connection.js"
|
||||
import {remove} from "@welshman/lib"
|
||||
import {normalizeRelayUrl} from "@welshman/util"
|
||||
import {Socket} from "./socket.js"
|
||||
import {defaultSocketPolicies} from "./policy.js"
|
||||
|
||||
export class Pool extends Emitter {
|
||||
data: Map<string, Connection>
|
||||
export const makeSocket = (url: string, policies = defaultSocketPolicies) => {
|
||||
const socket = new Socket(url)
|
||||
|
||||
constructor() {
|
||||
super()
|
||||
|
||||
this.data = new Map()
|
||||
for (const applyPolicy of policies) {
|
||||
applyPolicy(socket)
|
||||
}
|
||||
|
||||
return socket
|
||||
}
|
||||
|
||||
export type PoolSubscription = (socket: Socket) => void
|
||||
|
||||
export type PoolOptions = {
|
||||
makeSocket?: (url: string) => Socket
|
||||
}
|
||||
|
||||
export class Pool {
|
||||
_data = new Map<string, Socket>()
|
||||
_subs: PoolSubscription[] = []
|
||||
|
||||
constructor(readonly options: PoolOptions = {}) {}
|
||||
|
||||
has(url: string) {
|
||||
return this.data.has(url)
|
||||
return this._data.has(normalizeRelayUrl(url))
|
||||
}
|
||||
|
||||
get(url: string): Connection {
|
||||
const oldConnection = this.data.get(url)
|
||||
|
||||
if (oldConnection) {
|
||||
return oldConnection
|
||||
makeSocket(url: string) {
|
||||
if (this.options.makeSocket) {
|
||||
return this.options.makeSocket(url)
|
||||
}
|
||||
|
||||
const newConnection = new Connection(url)
|
||||
return makeSocket(url)
|
||||
}
|
||||
|
||||
this.data.set(url, newConnection)
|
||||
this.emit("init", newConnection)
|
||||
get(_url: string): Socket {
|
||||
const url = normalizeRelayUrl(_url)
|
||||
const oldSocket = this._data.get(url)
|
||||
|
||||
return newConnection
|
||||
if (oldSocket) {
|
||||
return oldSocket
|
||||
}
|
||||
|
||||
const socket = this.makeSocket(url)
|
||||
|
||||
this._data.set(url, socket)
|
||||
|
||||
for (const cb of this._subs) {
|
||||
cb(socket)
|
||||
}
|
||||
|
||||
return socket
|
||||
}
|
||||
|
||||
subscribe(cb: PoolSubscription) {
|
||||
this._subs.push(cb)
|
||||
|
||||
return () => {
|
||||
this._subs = remove(cb, this._subs)
|
||||
}
|
||||
}
|
||||
|
||||
remove(url: string) {
|
||||
const connection = this.data.get(url)
|
||||
const socket = this._data.get(url)
|
||||
|
||||
if (connection) {
|
||||
connection.cleanup()
|
||||
if (socket) {
|
||||
socket.cleanup()
|
||||
|
||||
this.data.delete(url)
|
||||
this._data.delete(url)
|
||||
}
|
||||
}
|
||||
|
||||
clear() {
|
||||
for (const url of this.data.keys()) {
|
||||
for (const url of this._data.keys()) {
|
||||
this.remove(url)
|
||||
}
|
||||
}
|
||||
|
||||
+167
-80
@@ -1,98 +1,185 @@
|
||||
import {ctx, Emitter, now, randomId, defer} from "@welshman/lib"
|
||||
import type {Deferred} from "@welshman/lib"
|
||||
import {asSignedEvent} from "@welshman/util"
|
||||
import type {SignedEvent} from "@welshman/util"
|
||||
import {EventEmitter} from "events"
|
||||
import {on, fromPairs, sleep, yieldThread} from "@welshman/lib"
|
||||
import {SignedEvent} from "@welshman/util"
|
||||
import {RelayMessage, ClientMessageType, isRelayOk} from "./message.js"
|
||||
import {AbstractAdapter, AdapterEventType, AdapterContext, getAdapter} from "./adapter.js"
|
||||
import {TypedEmitter} from "./util.js"
|
||||
|
||||
export enum PublishStatus {
|
||||
Pending = "pending",
|
||||
Success = "success",
|
||||
Failure = "failure",
|
||||
Timeout = "timeout",
|
||||
Aborted = "aborted",
|
||||
Pending = "publish:status:pending",
|
||||
Success = "publish:status:success",
|
||||
Failure = "publish:status:failure",
|
||||
Timeout = "publish:status:timeout",
|
||||
Aborted = "publish:status:aborted",
|
||||
}
|
||||
|
||||
export type PublishStatusMap = Map<string, PublishStatus>
|
||||
export enum PublishEventType {
|
||||
Success = "publish:event:success",
|
||||
Failure = "publish:event:failure",
|
||||
Timeout = "publish:event:timeout",
|
||||
Aborted = "publish:event:aborted",
|
||||
Complete = "publish:event:complete",
|
||||
}
|
||||
|
||||
export type PublishRequest = {
|
||||
// Unicast
|
||||
|
||||
export type UnicastEvents = {
|
||||
[PublishEventType.Success]: (id: string, detail: string) => void
|
||||
[PublishEventType.Failure]: (id: string, detail: string) => void
|
||||
[PublishEventType.Timeout]: () => void
|
||||
[PublishEventType.Aborted]: () => void
|
||||
[PublishEventType.Complete]: () => void
|
||||
}
|
||||
|
||||
export type UnicastOptions = {
|
||||
event: SignedEvent
|
||||
relays: string[]
|
||||
signal?: AbortSignal
|
||||
relay: string
|
||||
context: AdapterContext
|
||||
timeout?: number
|
||||
verb?: "EVENT" | "AUTH"
|
||||
}
|
||||
|
||||
export type Publish = {
|
||||
id: string
|
||||
created_at: number
|
||||
emitter: Emitter
|
||||
request: PublishRequest
|
||||
status: PublishStatusMap
|
||||
result: Deferred<PublishStatusMap>
|
||||
}
|
||||
export class Unicast extends (EventEmitter as new () => TypedEmitter<UnicastEvents>) {
|
||||
status = PublishStatus.Pending
|
||||
|
||||
export const makePublish = (request: PublishRequest) => {
|
||||
const id = randomId()
|
||||
const created_at = now()
|
||||
const emitter = new Emitter()
|
||||
const result: Publish["result"] = defer()
|
||||
const status: Publish["status"] = new Map()
|
||||
_unsubscriber: () => void
|
||||
_adapter: AbstractAdapter
|
||||
|
||||
return {id, created_at, request, emitter, result, status}
|
||||
}
|
||||
constructor(readonly options: UnicastOptions) {
|
||||
super()
|
||||
|
||||
export const publish = (request: PublishRequest) => {
|
||||
const pub = makePublish(request)
|
||||
const event = asSignedEvent(request.event)
|
||||
const executor = ctx.net.getExecutor(request.relays)
|
||||
// Set up our adapter
|
||||
this._adapter = getAdapter(this.options.relay, this.options.context)
|
||||
|
||||
const abort = (reason: PublishStatus) => {
|
||||
for (const [url, status] of pub.status.entries()) {
|
||||
if (status === PublishStatus.Pending) {
|
||||
pub.emitter.emit(reason, url)
|
||||
// Listen for Unicast result
|
||||
this._unsubscriber = on(
|
||||
this._adapter,
|
||||
AdapterEventType.Receive,
|
||||
(message: RelayMessage, url: string) => {
|
||||
if (isRelayOk(message)) {
|
||||
const [_, id, ok, detail] = message
|
||||
|
||||
if (id !== this.options.event.id) return
|
||||
|
||||
if (ok) {
|
||||
this.status = PublishStatus.Success
|
||||
this.emit(PublishEventType.Success, id, detail)
|
||||
} else {
|
||||
this.status = PublishStatus.Failure
|
||||
this.emit(PublishEventType.Failure, id, detail)
|
||||
}
|
||||
|
||||
this.cleanup()
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
// Set timeout
|
||||
sleep(this.options.timeout || 10_000).then(() => {
|
||||
if (this.status === PublishStatus.Pending) {
|
||||
this.status = PublishStatus.Timeout
|
||||
this.emit(PublishEventType.Timeout)
|
||||
}
|
||||
|
||||
this.cleanup()
|
||||
})
|
||||
|
||||
// Start asynchronously so the caller can set up listeners
|
||||
yieldThread().then(() => {
|
||||
this._adapter.send([ClientMessageType.Event, this.options.event])
|
||||
})
|
||||
}
|
||||
|
||||
abort = () => {
|
||||
if (this.status === PublishStatus.Pending) {
|
||||
this.status = PublishStatus.Aborted
|
||||
this.emit(PublishEventType.Aborted)
|
||||
this.cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
// Listen to updates and keep status up to date. Every time there's an update, check to
|
||||
// see if we're done. If we are, clean everything up
|
||||
pub.emitter.on("*", (status: PublishStatus, url: string) => {
|
||||
pub.status.set(url, status)
|
||||
|
||||
if (Array.from(pub.status.values()).every((s: PublishStatus) => s !== PublishStatus.Pending)) {
|
||||
clearTimeout(timeout)
|
||||
executorSub.unsubscribe()
|
||||
executor.target.cleanup()
|
||||
pub.result.resolve(pub.status)
|
||||
}
|
||||
})
|
||||
|
||||
// Start everything off as pending. Do it asynchronously to avoid breaking caller assumptions
|
||||
setTimeout(() => {
|
||||
for (const relay of request.relays) {
|
||||
pub.emitter.emit(PublishStatus.Pending, relay)
|
||||
}
|
||||
})
|
||||
|
||||
// Give up after a specified time
|
||||
const timeout = setTimeout(() => abort(PublishStatus.Timeout), request.timeout || 10_000)
|
||||
|
||||
// If we have a signal, use it
|
||||
request.signal?.addEventListener("abort", () => abort(PublishStatus.Aborted))
|
||||
|
||||
// Delegate to our executor
|
||||
const executorSub = executor.publish(event, {
|
||||
verb: request.verb || "EVENT",
|
||||
onOk: (url: string, eventId: string, ok: boolean, message: string) => {
|
||||
if (ok) {
|
||||
pub.emitter.emit(PublishStatus.Success, url, message)
|
||||
} else {
|
||||
pub.emitter.emit(PublishStatus.Failure, url, message)
|
||||
}
|
||||
},
|
||||
onError: (url: string) => {
|
||||
pub.emitter.emit(PublishStatus.Failure, url)
|
||||
},
|
||||
})
|
||||
|
||||
return pub
|
||||
cleanup = () => {
|
||||
this.emit(PublishEventType.Complete)
|
||||
this.removeAllListeners()
|
||||
this._adapter.cleanup()
|
||||
this._unsubscriber()
|
||||
}
|
||||
}
|
||||
|
||||
// Multicast
|
||||
|
||||
export type MulticastEvents = {
|
||||
[PublishEventType.Success]: (id: string, detail: string, url: string) => void
|
||||
[PublishEventType.Failure]: (id: string, detail: string, url: string) => void
|
||||
[PublishEventType.Timeout]: (url: string) => void
|
||||
[PublishEventType.Aborted]: (url: string) => void
|
||||
[PublishEventType.Complete]: () => void
|
||||
}
|
||||
|
||||
export type MulticastOptions = Omit<UnicastOptions, "relay"> & {
|
||||
relays: string[]
|
||||
}
|
||||
|
||||
export class Multicast extends (EventEmitter as new () => TypedEmitter<MulticastEvents>) {
|
||||
status: Record<string, PublishStatus>
|
||||
|
||||
_children: Unicast[] = []
|
||||
_completed = new Set<string>()
|
||||
|
||||
constructor({relays, ...options}: MulticastOptions) {
|
||||
super()
|
||||
|
||||
this.status = fromPairs(relays.map(relay => [relay, PublishStatus.Pending]))
|
||||
|
||||
for (const relay of relays) {
|
||||
const unicast = new Unicast({relay, ...options})
|
||||
|
||||
unicast.on(PublishEventType.Success, (id: string, detail: string) => {
|
||||
this.status[relay] = unicast.status
|
||||
this.emit(PublishEventType.Success, id, detail, relay)
|
||||
})
|
||||
|
||||
unicast.on(PublishEventType.Failure, (id: string, detail: string) => {
|
||||
this.status[relay] = unicast.status
|
||||
this.emit(PublishEventType.Failure, id, detail, relay)
|
||||
})
|
||||
|
||||
unicast.on(PublishEventType.Timeout, () => {
|
||||
this.status[relay] = unicast.status
|
||||
this.emit(PublishEventType.Timeout, relay)
|
||||
})
|
||||
|
||||
unicast.on(PublishEventType.Aborted, () => {
|
||||
this.status[relay] = unicast.status
|
||||
this.emit(PublishEventType.Aborted, relay)
|
||||
})
|
||||
|
||||
unicast.on(PublishEventType.Complete, () => {
|
||||
this._completed.add(relay)
|
||||
this.status[relay] = unicast.status
|
||||
|
||||
if (this._completed.size === relays.length) {
|
||||
this.emit(PublishEventType.Complete)
|
||||
this.cleanup()
|
||||
}
|
||||
})
|
||||
|
||||
this._children.push(unicast)
|
||||
}
|
||||
}
|
||||
|
||||
abort() {
|
||||
for (const child of this._children) {
|
||||
child.abort()
|
||||
}
|
||||
}
|
||||
|
||||
cleanup() {
|
||||
this.removeAllListeners()
|
||||
}
|
||||
}
|
||||
|
||||
// Convenience functions
|
||||
|
||||
export const unicast = (options: UnicastOptions) => new Unicast(options)
|
||||
|
||||
export const multicast = (options: MulticastOptions) => new Multicast(options)
|
||||
|
||||
+96
-100
@@ -1,134 +1,130 @@
|
||||
import WebSocket from "isomorphic-ws"
|
||||
import {Worker, sleep} from "@welshman/lib"
|
||||
import {ConnectionEvent} from "./ConnectionEvent.js"
|
||||
import type {Connection} from "./Connection.js"
|
||||
|
||||
export type Message = [string, ...any[]]
|
||||
import EventEmitter from "events"
|
||||
import {TaskQueue} from "@welshman/lib"
|
||||
import {RelayMessage, ClientMessage} from "./message.js"
|
||||
import {TypedEmitter} from "./util.js"
|
||||
|
||||
export enum SocketStatus {
|
||||
New = "new",
|
||||
Open = "open",
|
||||
Opening = "opening",
|
||||
Closing = "closing",
|
||||
Closed = "closed",
|
||||
Error = "error",
|
||||
Invalid = "invalid",
|
||||
Open = "socket:status:open",
|
||||
Opening = "socket:status:opening",
|
||||
Closing = "socket:status:closing",
|
||||
Closed = "socket:status:closed",
|
||||
Error = "socket:status:error",
|
||||
Invalid = "socket:status:invalid",
|
||||
}
|
||||
|
||||
export class Socket {
|
||||
lastError = 0
|
||||
status = SocketStatus.New
|
||||
worker = new Worker<Message>()
|
||||
ws?: WebSocket
|
||||
export enum SocketEventType {
|
||||
Error = "socket:event:error",
|
||||
Status = "socket:event:status",
|
||||
Send = "socket:event:send",
|
||||
Enqueue = "socket:event:enqueue",
|
||||
Receive = "socket:event:receive",
|
||||
}
|
||||
|
||||
constructor(readonly cxn: Connection) {
|
||||
// Use a worker to throttle incoming data
|
||||
this.worker.addGlobalHandler((message: Message) => {
|
||||
this.cxn.emit(ConnectionEvent.Receive, message)
|
||||
export type SocketEvents = {
|
||||
[SocketEventType.Error]: (error: string, url: string) => void
|
||||
[SocketEventType.Status]: (status: SocketStatus, url: string) => void
|
||||
[SocketEventType.Send]: (message: ClientMessage, url: string) => void
|
||||
[SocketEventType.Enqueue]: (message: ClientMessage, url: string) => void
|
||||
[SocketEventType.Receive]: (message: RelayMessage, url: string) => void
|
||||
}
|
||||
|
||||
export class Socket extends (EventEmitter as new () => TypedEmitter<SocketEvents>) {
|
||||
status = SocketStatus.Closed
|
||||
|
||||
_ws?: WebSocket
|
||||
_sendQueue: TaskQueue<ClientMessage>
|
||||
_recvQueue: TaskQueue<RelayMessage>
|
||||
|
||||
constructor(readonly url: string) {
|
||||
super()
|
||||
|
||||
this._sendQueue = new TaskQueue<ClientMessage>({
|
||||
batchSize: 50,
|
||||
processItem: (message: ClientMessage) => {
|
||||
this._ws?.send(JSON.stringify(message))
|
||||
this.emit(SocketEventType.Send, message, this.url)
|
||||
},
|
||||
})
|
||||
|
||||
this._recvQueue = new TaskQueue<RelayMessage>({
|
||||
batchSize: 50,
|
||||
processItem: (message: RelayMessage) => {
|
||||
this.emit(SocketEventType.Receive, message, this.url)
|
||||
},
|
||||
})
|
||||
|
||||
this.on(SocketEventType.Status, (status: SocketStatus) => {
|
||||
this.status = status
|
||||
})
|
||||
}
|
||||
|
||||
wait = async (timeout = 300) => {
|
||||
const start = Date.now()
|
||||
while (
|
||||
Date.now() - timeout <= start &&
|
||||
[SocketStatus.Opening, SocketStatus.Closing].includes(this.status)
|
||||
) {
|
||||
await sleep(100)
|
||||
}
|
||||
}
|
||||
|
||||
open = async () => {
|
||||
// If we're in a provisional state, wait
|
||||
await this.wait()
|
||||
|
||||
// If the socket is closed, reset
|
||||
if (this.status === SocketStatus.Closed) {
|
||||
this.status = SocketStatus.New
|
||||
this.cxn.emit(ConnectionEvent.Reset)
|
||||
open = () => {
|
||||
if (this._ws) {
|
||||
throw new Error("Attempted to open a websocket that has not been closed")
|
||||
}
|
||||
|
||||
// If we're closed due to an error retry after a delay
|
||||
if (this.status === SocketStatus.Error && Date.now() - this.lastError > 15_000) {
|
||||
this.status = SocketStatus.New
|
||||
this.cxn.emit(ConnectionEvent.Reset)
|
||||
}
|
||||
|
||||
// If the socket is new, connect
|
||||
if (this.status === SocketStatus.New) {
|
||||
this.#init()
|
||||
}
|
||||
|
||||
// Wait until we're connected (or fail to connect)
|
||||
await this.wait()
|
||||
}
|
||||
|
||||
close = async () => {
|
||||
this.worker.pause()
|
||||
this.ws?.close()
|
||||
this.ws = undefined
|
||||
|
||||
// Allow the socket to start closing before waiting
|
||||
await sleep(100)
|
||||
|
||||
// Wait for the socket to fully close
|
||||
await this.wait()
|
||||
}
|
||||
|
||||
send = async (message: Message) => {
|
||||
await this.open()
|
||||
|
||||
if (!this.ws) {
|
||||
throw new Error(`No websocket available when sending to ${this.cxn.url}`)
|
||||
}
|
||||
|
||||
this.cxn.emit(ConnectionEvent.Send, message)
|
||||
this.ws.send(JSON.stringify(message))
|
||||
}
|
||||
|
||||
#init = () => {
|
||||
try {
|
||||
this.ws = new WebSocket(this.cxn.url)
|
||||
this.status = SocketStatus.Opening
|
||||
this._ws = new WebSocket(this.url)
|
||||
this.emit(SocketEventType.Status, SocketStatus.Opening, this.url)
|
||||
|
||||
this.ws.onopen = () => {
|
||||
this.status = SocketStatus.Open
|
||||
this.cxn.emit(ConnectionEvent.Open)
|
||||
this._ws.onopen = () => {
|
||||
this.emit(SocketEventType.Status, SocketStatus.Open, this.url)
|
||||
this._sendQueue.start()
|
||||
}
|
||||
|
||||
this.ws.onerror = () => {
|
||||
this.status = SocketStatus.Error
|
||||
this.lastError = Date.now()
|
||||
this.cxn.emit(ConnectionEvent.Error)
|
||||
this._ws.onerror = () => {
|
||||
this.emit(SocketEventType.Status, SocketStatus.Error, this.url)
|
||||
this._sendQueue.stop()
|
||||
this._ws = undefined
|
||||
}
|
||||
|
||||
this.ws.onclose = () => {
|
||||
if (this.status !== SocketStatus.Error) {
|
||||
this.status = SocketStatus.Closed
|
||||
}
|
||||
|
||||
this.cxn.emit(ConnectionEvent.Close)
|
||||
this._ws.onclose = () => {
|
||||
this.emit(SocketEventType.Status, SocketStatus.Closed, this.url)
|
||||
this._sendQueue.stop()
|
||||
this._ws = undefined
|
||||
}
|
||||
|
||||
this.ws.onmessage = (event: any) => {
|
||||
this._ws.onmessage = (event: any) => {
|
||||
const data = event.data as string
|
||||
|
||||
try {
|
||||
const message = JSON.parse(data)
|
||||
|
||||
if (Array.isArray(message)) {
|
||||
this.worker.push(message as Message)
|
||||
this._recvQueue.push(message as RelayMessage)
|
||||
} else {
|
||||
this.cxn.emit(ConnectionEvent.InvalidMessage, data)
|
||||
this.emit(SocketEventType.Error, "Invalid message received", this.url)
|
||||
}
|
||||
} catch (e) {
|
||||
this.cxn.emit(ConnectionEvent.InvalidMessage, data)
|
||||
this.emit(SocketEventType.Error, "Invalid message received", this.url)
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
this.lastError = Date.now()
|
||||
this.status = SocketStatus.Invalid
|
||||
this.cxn.emit(ConnectionEvent.InvalidUrl)
|
||||
this.emit(SocketEventType.Status, SocketStatus.Invalid, this.url)
|
||||
}
|
||||
}
|
||||
|
||||
attemptToOpen = () => {
|
||||
if (!this._ws) {
|
||||
this.open()
|
||||
}
|
||||
}
|
||||
|
||||
close = () => {
|
||||
this._ws?.close()
|
||||
this._ws = undefined
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.close()
|
||||
this._recvQueue.clear()
|
||||
this._sendQueue.clear()
|
||||
this.removeAllListeners()
|
||||
}
|
||||
|
||||
send = (message: ClientMessage) => {
|
||||
this._sendQueue.push(message)
|
||||
this.emit(SocketEventType.Enqueue, message, this.url)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,390 +0,0 @@
|
||||
import {ctx, Emitter, max, chunk, randomId, once, groupBy, uniq} from "@welshman/lib"
|
||||
import {
|
||||
LOCAL_RELAY_URL,
|
||||
matchFilters,
|
||||
normalizeRelayUrl,
|
||||
unionFilters,
|
||||
TrustedEvent,
|
||||
} from "@welshman/util"
|
||||
import type {Filter} from "@welshman/util"
|
||||
import {Tracker} from "./Tracker.js"
|
||||
import {Executor} from "./Executor.js"
|
||||
import {Connection} from "./Connection.js"
|
||||
import {ConnectionEvent} from "./ConnectionEvent.js"
|
||||
|
||||
// `subscribe` is a super function that handles batching subscriptions by merging
|
||||
// them based on parameters (filters and subscribe opts), then splits them by relay.
|
||||
// This results in fewer REQs being opened per connection, fewer duplicate events
|
||||
// being downloaded, and therefore less signature validation.
|
||||
//
|
||||
// Behavior can be further configured using ctx.net. This can be useful for
|
||||
// adding support for querying a local cache like a relay, tracking deleted events,
|
||||
// and bypassing validation for trusted relays.
|
||||
//
|
||||
// Urls that any given event was seen on are tracked using subscription request's `tracker`
|
||||
// property. These are merged across all subscription requests, so it is possible that an
|
||||
// event may be seen on more relays that were actually requested, in the case of overlapping
|
||||
// subscriptions.
|
||||
|
||||
export enum SubscriptionEvent {
|
||||
Eose = "eose",
|
||||
Send = "send",
|
||||
Close = "close",
|
||||
Event = "event",
|
||||
Complete = "complete",
|
||||
Duplicate = "duplicate",
|
||||
DeletedEvent = "deleted-event",
|
||||
FailedFilter = "failed-filter",
|
||||
Invalid = "invalid",
|
||||
}
|
||||
|
||||
export type RelaysAndFilters = {
|
||||
relays: string[]
|
||||
filters: Filter[]
|
||||
}
|
||||
|
||||
export type SubscribeRequest = RelaysAndFilters & {
|
||||
delay?: number
|
||||
signal?: AbortSignal
|
||||
timeout?: number
|
||||
tracker?: Tracker
|
||||
closeOnEose?: boolean
|
||||
authTimeout?: number
|
||||
}
|
||||
|
||||
export class Subscription extends Emitter {
|
||||
id = randomId()
|
||||
controller = new AbortController()
|
||||
tracker = new Tracker()
|
||||
completed = new Set()
|
||||
executorSubs: {unsubscribe: () => void}[] = []
|
||||
executor: Executor
|
||||
|
||||
constructor(readonly request: SubscribeRequest) {
|
||||
super()
|
||||
|
||||
if (request.tracker) {
|
||||
this.tracker = request.tracker
|
||||
}
|
||||
|
||||
this.setMaxListeners(100)
|
||||
this.executor = ctx.net.getExecutor(request.relays)
|
||||
}
|
||||
|
||||
onEvent = (url: string, event: TrustedEvent) => {
|
||||
const {filters} = this.request
|
||||
|
||||
if (this.tracker.track(event.id, url)) {
|
||||
this.emit(SubscriptionEvent.Duplicate, url, event)
|
||||
} else if (ctx.net.isDeleted(url, event)) {
|
||||
this.emit(SubscriptionEvent.DeletedEvent, url, event)
|
||||
} else if (!ctx.net.matchFilters(url, filters, event)) {
|
||||
this.emit(SubscriptionEvent.FailedFilter, url, event)
|
||||
} else if (!ctx.net.isValid(url, event)) {
|
||||
this.emit(SubscriptionEvent.Invalid, url, event)
|
||||
} else {
|
||||
this.emit(SubscriptionEvent.Event, url, event)
|
||||
}
|
||||
}
|
||||
|
||||
onEose = (url: string) => {
|
||||
const {closeOnEose, relays} = this.request
|
||||
|
||||
this.emit(SubscriptionEvent.Eose, url)
|
||||
|
||||
this.completed.add(url)
|
||||
|
||||
if (closeOnEose && this.completed.size === uniq(relays).length) {
|
||||
this.onComplete()
|
||||
}
|
||||
}
|
||||
|
||||
onClose = (connection: Connection) => {
|
||||
const {relays} = this.request
|
||||
|
||||
this.emit(SubscriptionEvent.Close, connection.url)
|
||||
|
||||
this.completed.add(connection.url)
|
||||
|
||||
if (this.completed.size === uniq(relays).length) {
|
||||
this.onComplete()
|
||||
}
|
||||
}
|
||||
|
||||
onComplete = once(() => {
|
||||
this.emit(SubscriptionEvent.Complete)
|
||||
this.executorSubs.forEach(sub => sub.unsubscribe())
|
||||
this.removeAllListeners()
|
||||
this.executor.target.cleanup()
|
||||
this.executor.target.connections.forEach((c: Connection) => {
|
||||
c.off(ConnectionEvent.Close, this.onClose)
|
||||
})
|
||||
})
|
||||
|
||||
execute = async () => {
|
||||
const {filters, signal, timeout, authTimeout = 0} = this.request
|
||||
|
||||
// If we didn't get any filters, don't even send the request, just close it.
|
||||
// This can be valid when a caller fulfills a request themselves but still needs a subscription object.
|
||||
if (filters.length === 0) {
|
||||
this.emit(SubscriptionEvent.Send)
|
||||
this.onComplete()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Hook up our events
|
||||
|
||||
// Listen for abort via caller signal
|
||||
signal?.addEventListener("abort", this.onComplete)
|
||||
|
||||
// Listen for abort via our own internal signal
|
||||
this.controller.signal.addEventListener("abort", this.onComplete)
|
||||
|
||||
// If we have a timeout, complete the subscription automatically
|
||||
if (timeout) setTimeout(this.onComplete, timeout + authTimeout)
|
||||
|
||||
// If one of our connections gets closed make sure to kill our sub
|
||||
this.executor.target.connections.forEach((c: Connection) =>
|
||||
c.on(ConnectionEvent.Close, this.onClose),
|
||||
)
|
||||
|
||||
// Wait for auth if needed
|
||||
await Promise.all(
|
||||
this.executor.target.connections.map(async (connection: Connection) => {
|
||||
if (authTimeout) {
|
||||
await connection.auth.attempt(authTimeout)
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
// If we send too many filters in a request relays will refuse to respond. REQs are rate
|
||||
// limited client-side by Connection, so this will throttle concurrent requests.
|
||||
for (const filtersChunk of chunk(8, filters)) {
|
||||
this.executorSubs.push(
|
||||
this.executor.subscribe(filtersChunk, {
|
||||
onEvent: this.onEvent,
|
||||
onEose: this.onEose,
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
// Notify that we've sent the subscription
|
||||
this.emit(SubscriptionEvent.Send)
|
||||
}
|
||||
|
||||
close = () => this.controller.abort()
|
||||
}
|
||||
|
||||
export const calculateSubscriptionGroup = (sub: Subscription) => {
|
||||
const parts: string[] = []
|
||||
|
||||
if (sub.request.timeout) parts.push(`timeout:${sub.request.timeout}`)
|
||||
if (sub.request.authTimeout) parts.push(`authTimeout:${sub.request.authTimeout}`)
|
||||
if (sub.request.closeOnEose) parts.push("closeOnEose")
|
||||
|
||||
return parts.join("|")
|
||||
}
|
||||
|
||||
export const mergeSubscriptions = (subs: Subscription[]) => {
|
||||
const mergedSub = new Subscription({
|
||||
relays: uniq(subs.flatMap(sub => sub.request.relays)),
|
||||
filters: unionFilters(subs.flatMap(sub => sub.request.filters)),
|
||||
timeout: max(subs.map(sub => sub.request.timeout || 0)),
|
||||
authTimeout: max(subs.map(sub => sub.request.authTimeout || 0)),
|
||||
closeOnEose: subs.every(sub => sub.request.closeOnEose),
|
||||
})
|
||||
|
||||
mergedSub.controller.signal.addEventListener("abort", () => {
|
||||
for (const sub of subs) {
|
||||
sub.close()
|
||||
}
|
||||
})
|
||||
|
||||
const completedSubs = new Set()
|
||||
|
||||
for (const sub of subs) {
|
||||
// Propagate events, but avoid duplicates
|
||||
sub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
|
||||
if (!mergedSub.tracker.track(event.id, url)) {
|
||||
mergedSub.emit(SubscriptionEvent.Event, url, event)
|
||||
}
|
||||
})
|
||||
|
||||
// Propagate subscription completion. Since we split subs by relay, we need to wait
|
||||
// until all relays are completed before we notify
|
||||
sub.on(SubscriptionEvent.Complete, () => {
|
||||
completedSubs.add(sub.id)
|
||||
|
||||
if (completedSubs.size === subs.length) {
|
||||
mergedSub.emit(SubscriptionEvent.Complete)
|
||||
}
|
||||
|
||||
sub.removeAllListeners()
|
||||
})
|
||||
|
||||
// Propagate everything else too
|
||||
const propagateEvent = (type: SubscriptionEvent) =>
|
||||
sub.on(type, (...args) => mergedSub.emit(type, ...args))
|
||||
|
||||
propagateEvent(SubscriptionEvent.Duplicate)
|
||||
propagateEvent(SubscriptionEvent.DeletedEvent)
|
||||
propagateEvent(SubscriptionEvent.FailedFilter)
|
||||
propagateEvent(SubscriptionEvent.Invalid)
|
||||
propagateEvent(SubscriptionEvent.Eose)
|
||||
propagateEvent(SubscriptionEvent.Send)
|
||||
propagateEvent(SubscriptionEvent.Close)
|
||||
}
|
||||
|
||||
return mergedSub
|
||||
}
|
||||
|
||||
export const optimizeSubscriptions = (subs: Subscription[]) => {
|
||||
return Array.from(groupBy(calculateSubscriptionGroup, subs).values()).flatMap(group => {
|
||||
const timeout = max(group.map(sub => sub.request.timeout || 0))
|
||||
const authTimeout = max(group.map(sub => sub.request.authTimeout || 0))
|
||||
const closeOnEose = group.every(sub => sub.request.closeOnEose)
|
||||
const completedSubs = new Set<string>()
|
||||
const abortedSubs = new Set<string>()
|
||||
const closedSubs = new Set<string>()
|
||||
const eosedSubs = new Set<string>()
|
||||
const sentSubs = new Set<string>()
|
||||
const mergedSubs: Subscription[] = []
|
||||
|
||||
for (const {relays, filters} of ctx.net.optimizeSubscriptions(group)) {
|
||||
for (const filter of filters) {
|
||||
const mergedSub = new Subscription({
|
||||
filters: [filter],
|
||||
relays,
|
||||
timeout,
|
||||
authTimeout,
|
||||
closeOnEose,
|
||||
})
|
||||
|
||||
for (const {id, controller, request} of group) {
|
||||
const onAbort = () => {
|
||||
abortedSubs.add(id)
|
||||
|
||||
if (abortedSubs.size === group.length) {
|
||||
mergedSub.close()
|
||||
}
|
||||
}
|
||||
|
||||
request.signal?.addEventListener("abort", onAbort)
|
||||
controller.signal.addEventListener("abort", onAbort)
|
||||
}
|
||||
|
||||
mergedSub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
|
||||
for (const sub of group) {
|
||||
if (matchFilters(sub.request.filters, event) && !sub.tracker.track(event.id, url)) {
|
||||
sub.emit(SubscriptionEvent.Event, url, event)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// Pass events back to caller
|
||||
const propagateEvent = (type: SubscriptionEvent) =>
|
||||
mergedSub.on(type, (url: string, event: TrustedEvent) => {
|
||||
for (const sub of group) {
|
||||
if (matchFilters(sub.request.filters, event)) {
|
||||
sub.emit(type, url, event)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
propagateEvent(SubscriptionEvent.Duplicate)
|
||||
propagateEvent(SubscriptionEvent.DeletedEvent)
|
||||
propagateEvent(SubscriptionEvent.Invalid)
|
||||
|
||||
const propagateFinality = (type: SubscriptionEvent, subIds: Set<string>) =>
|
||||
mergedSub.on(type, (...args: any[]) => {
|
||||
subIds.add(mergedSub.id)
|
||||
|
||||
// Wait for all subscriptions to complete before reporting finality to the caller.
|
||||
// This is sub-optimal, but because we're outsourcing filter/relay optimization
|
||||
// we can't make any assumptions about which caller subscriptions have completed
|
||||
// at any given time.
|
||||
if (subIds.size === mergedSubs.length) {
|
||||
for (const sub of group) {
|
||||
sub.emit(type, ...args)
|
||||
}
|
||||
}
|
||||
|
||||
if (type === SubscriptionEvent.Complete) {
|
||||
mergedSub.removeAllListeners()
|
||||
}
|
||||
})
|
||||
|
||||
propagateFinality(SubscriptionEvent.Send, sentSubs)
|
||||
propagateFinality(SubscriptionEvent.Eose, eosedSubs)
|
||||
propagateFinality(SubscriptionEvent.Close, closedSubs)
|
||||
propagateFinality(SubscriptionEvent.Complete, completedSubs)
|
||||
|
||||
mergedSubs.push(mergedSub)
|
||||
}
|
||||
}
|
||||
|
||||
return mergedSubs
|
||||
})
|
||||
}
|
||||
|
||||
export const executeSubscription = (sub: Subscription) =>
|
||||
optimizeSubscriptions([sub]).forEach(sub => sub.execute())
|
||||
|
||||
export const executeSubscriptions = (subs: Subscription[]) =>
|
||||
optimizeSubscriptions(subs).forEach(sub => sub.execute())
|
||||
|
||||
export const executeSubscriptionBatched = (() => {
|
||||
const subs: Subscription[] = []
|
||||
const timeouts: number[] = []
|
||||
|
||||
const executeAll = () => {
|
||||
executeSubscriptions(subs.splice(0))
|
||||
|
||||
for (const timeout of timeouts.splice(0)) {
|
||||
clearTimeout(timeout)
|
||||
}
|
||||
}
|
||||
|
||||
return (sub: Subscription) => {
|
||||
subs.push(sub)
|
||||
timeouts.push(setTimeout(executeAll, Math.max(16, sub.request.delay!)) as unknown as number)
|
||||
}
|
||||
})()
|
||||
|
||||
export type SubscribeRequestWithHandlers = SubscribeRequest & {
|
||||
onEvent?: (event: TrustedEvent) => void
|
||||
onEose?: (url: string) => void
|
||||
onClose?: (url: string) => void
|
||||
onComplete?: () => void
|
||||
}
|
||||
|
||||
export const subscribe = ({
|
||||
onEvent,
|
||||
onEose,
|
||||
onClose,
|
||||
onComplete,
|
||||
...request
|
||||
}: SubscribeRequestWithHandlers) => {
|
||||
const sub: Subscription = new Subscription({delay: 50, ...request})
|
||||
|
||||
for (const relay of request.relays) {
|
||||
if (relay !== LOCAL_RELAY_URL && relay !== normalizeRelayUrl(relay)) {
|
||||
console.warn(`Attempted to open subscription to non-normalized url ${relay}`)
|
||||
}
|
||||
}
|
||||
|
||||
if (request.delay === 0) {
|
||||
executeSubscription(sub)
|
||||
} else {
|
||||
executeSubscriptionBatched(sub)
|
||||
}
|
||||
|
||||
// Signature for onEvent is different from emitter signature for historical reasons and convenience
|
||||
if (onEvent) sub.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => onEvent(event))
|
||||
if (onEose) sub.on(SubscriptionEvent.Eose, onEose)
|
||||
if (onClose) sub.on(SubscriptionEvent.Close, onClose)
|
||||
if (onComplete) sub.on(SubscriptionEvent.Complete, onComplete)
|
||||
|
||||
return sub
|
||||
}
|
||||
@@ -1,208 +0,0 @@
|
||||
import {ctx, assoc, lt, groupBy, now, pushToMapKey, inc, flatten, chunk} from "@welshman/lib"
|
||||
import type {SignedEvent, TrustedEvent, Filter} from "@welshman/util"
|
||||
import {subscribe} from "./Subscribe.js"
|
||||
import {publish} from "./Publish.js"
|
||||
|
||||
export type DiffOpts = {
|
||||
relays: string[]
|
||||
filters: Filter[]
|
||||
events: TrustedEvent[]
|
||||
}
|
||||
|
||||
export const diff = async ({relays, filters, events}: DiffOpts) => {
|
||||
const diffs = flatten(
|
||||
await Promise.all(
|
||||
relays.flatMap(async relay => {
|
||||
return await Promise.all(
|
||||
filters.map(async filter => {
|
||||
const executor = ctx.net.getExecutor([relay])
|
||||
const have = new Set<string>()
|
||||
const need = new Set<string>()
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
executor.diff(filter, events, {
|
||||
onClose: resolve,
|
||||
onError: (url, message) => reject(message),
|
||||
onMessage: (url, message) => {
|
||||
for (const id of message.have) {
|
||||
have.add(id)
|
||||
}
|
||||
|
||||
for (const id of message.need) {
|
||||
need.add(id)
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
return {relay, have, need}
|
||||
}),
|
||||
)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
return Array.from(groupBy(diff => diff.relay, diffs).entries()).map(([relay, diffs]) => {
|
||||
const have = new Set<string>()
|
||||
const need = new Set<string>()
|
||||
|
||||
for (const diff of diffs) {
|
||||
for (const id of diff.have) {
|
||||
have.add(id)
|
||||
}
|
||||
|
||||
for (const id of diff.need) {
|
||||
need.add(id)
|
||||
}
|
||||
}
|
||||
|
||||
return {relay, have: Array.from(have), need: Array.from(need)}
|
||||
})
|
||||
}
|
||||
|
||||
export type PullOpts = {
|
||||
relays: string[]
|
||||
filters: Filter[]
|
||||
events: TrustedEvent[]
|
||||
onEvent?: (event: TrustedEvent) => void
|
||||
}
|
||||
|
||||
export const pull = async ({relays, filters, events, onEvent}: PullOpts) => {
|
||||
const countById = new Map<string, number>()
|
||||
const idsByRelay = new Map<string, string[]>()
|
||||
|
||||
for (const {relay, need} of await diff({relays, filters, events})) {
|
||||
for (const id of need) {
|
||||
const count = countById.get(id) || 0
|
||||
|
||||
// Reduce, but don't completely eliminate duplicates, just in case a relay
|
||||
// won't give us what we ask for.
|
||||
if (count < 2) {
|
||||
pushToMapKey(idsByRelay, relay, id)
|
||||
countById.set(id, inc(count))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const result: TrustedEvent[] = []
|
||||
|
||||
await Promise.all(
|
||||
Array.from(idsByRelay.entries()).map(([relay, allIds]) => {
|
||||
return Promise.all(
|
||||
chunk(1024, allIds).map(ids => {
|
||||
return new Promise(resolve => {
|
||||
subscribe({
|
||||
relays: [relay],
|
||||
filters: [{ids}],
|
||||
closeOnEose: true,
|
||||
onClose: resolve,
|
||||
onEvent: event => {
|
||||
result.push(event)
|
||||
onEvent?.(event)
|
||||
},
|
||||
})
|
||||
})
|
||||
}),
|
||||
)
|
||||
}),
|
||||
)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
export type PushOpts = {
|
||||
relays: string[]
|
||||
filters: Filter[]
|
||||
events: SignedEvent[]
|
||||
}
|
||||
|
||||
export const push = async ({relays, filters, events}: PushOpts) => {
|
||||
const relaysById = new Map<string, string[]>()
|
||||
|
||||
for (const {relay, have} of await diff({relays, filters, events})) {
|
||||
for (const id of have) {
|
||||
pushToMapKey(relaysById, id, relay)
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
events.map(async event => {
|
||||
const relays = relaysById.get(event.id)
|
||||
|
||||
if (relays) {
|
||||
await publish({event, relays}).result
|
||||
}
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
export type SyncOpts = {
|
||||
relays: string[]
|
||||
filters: Filter[]
|
||||
events: SignedEvent[]
|
||||
}
|
||||
|
||||
export const sync = async (opts: SyncOpts) => {
|
||||
await pull(opts)
|
||||
await push(opts)
|
||||
}
|
||||
|
||||
// Legacy alternatives for use with relays that don't support negentropy
|
||||
|
||||
export type PullWithoutNegentropyOpts = {
|
||||
relays: string[]
|
||||
filters: Filter[]
|
||||
onEvent?: (event: TrustedEvent) => void
|
||||
}
|
||||
|
||||
export const pullWithoutNegentropy = async ({
|
||||
relays,
|
||||
filters,
|
||||
onEvent,
|
||||
}: PullWithoutNegentropyOpts) => {
|
||||
let done = false
|
||||
let until = now() + 30
|
||||
|
||||
const result: TrustedEvent[] = []
|
||||
|
||||
while (!done) {
|
||||
let anyResults = false
|
||||
|
||||
await new Promise<void>(resolve => {
|
||||
subscribe({
|
||||
relays,
|
||||
filters: filters.filter(f => lt(f.since, until)).map(assoc("until", until)),
|
||||
closeOnEose: true,
|
||||
onComplete: () => {
|
||||
done = !anyResults
|
||||
resolve()
|
||||
},
|
||||
onEvent: event => {
|
||||
anyResults = true
|
||||
until = Math.min(until, event.created_at - 1)
|
||||
result.push(event)
|
||||
onEvent?.(event)
|
||||
},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
export type PushWithoutNegentropyOpts = {
|
||||
relays: string[]
|
||||
events: SignedEvent[]
|
||||
}
|
||||
|
||||
export const pushWithoutNegentropy = ({relays, events}: PushWithoutNegentropyOpts) =>
|
||||
Promise.all(
|
||||
events.map(async event => {
|
||||
await publish({event, relays}).result
|
||||
}),
|
||||
)
|
||||
|
||||
export const syncWithoutNegentropy = async (opts: SyncOpts) => {
|
||||
await pullWithoutNegentropy(opts)
|
||||
await pushWithoutNegentropy(opts)
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
import EventEmitter from "events"
|
||||
import {call, on} from "@welshman/lib"
|
||||
import {Relay, LOCAL_RELAY_URL, isRelayUrl} from "@welshman/util"
|
||||
import {RelayMessage, ClientMessage} from "./message.js"
|
||||
import {Socket, SocketEventType} from "./socket.js"
|
||||
import {TypedEmitter, Unsubscriber} from "./util.js"
|
||||
import {Pool} from "./pool.js"
|
||||
|
||||
export enum AdapterEventType {
|
||||
Receive = "adapter:event:receive",
|
||||
}
|
||||
|
||||
export type AdapterEvents = {
|
||||
[AdapterEventType.Receive]: (message: RelayMessage, url: string) => void
|
||||
}
|
||||
|
||||
export abstract class AbstractAdapter extends (EventEmitter as new () => TypedEmitter<AdapterEvents>) {
|
||||
_unsubscribers: Unsubscriber[] = []
|
||||
|
||||
abstract urls: string[]
|
||||
abstract sockets: Socket[]
|
||||
abstract send(message: ClientMessage): void
|
||||
|
||||
cleanup() {
|
||||
this.removeAllListeners()
|
||||
this._unsubscribers.splice(0).forEach(call)
|
||||
}
|
||||
}
|
||||
|
||||
export class SocketAdapter extends AbstractAdapter {
|
||||
constructor(readonly socket: Socket) {
|
||||
super()
|
||||
|
||||
this._unsubscribers.push(
|
||||
on(socket, SocketEventType.Receive, (message: RelayMessage, url: string) => {
|
||||
this.emit(AdapterEventType.Receive, message, url)
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
get sockets() {
|
||||
return [this.socket]
|
||||
}
|
||||
|
||||
get urls() {
|
||||
return [this.socket.url]
|
||||
}
|
||||
|
||||
send(message: ClientMessage) {
|
||||
this.socket.send(message)
|
||||
}
|
||||
}
|
||||
|
||||
export class LocalAdapter extends AbstractAdapter {
|
||||
constructor(readonly relay: Relay) {
|
||||
super()
|
||||
|
||||
this._unsubscribers.push(
|
||||
on(relay, "*", (...message: RelayMessage) => {
|
||||
this.emit(AdapterEventType.Receive, message, LOCAL_RELAY_URL)
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
get sockets() {
|
||||
return []
|
||||
}
|
||||
|
||||
get urls() {
|
||||
return [LOCAL_RELAY_URL]
|
||||
}
|
||||
|
||||
send(message: ClientMessage) {
|
||||
const [type, ...rest] = message
|
||||
|
||||
this.relay.send(type, ...rest)
|
||||
}
|
||||
}
|
||||
|
||||
export type AdapterContext = {
|
||||
pool?: Pool
|
||||
relay?: Relay
|
||||
getAdapter?: (url: string, context: AdapterContext) => AbstractAdapter
|
||||
}
|
||||
|
||||
export const getAdapter = (url: string, context: AdapterContext) => {
|
||||
if (context.getAdapter) {
|
||||
const adapter = context.getAdapter(url, context)
|
||||
|
||||
if (adapter) {
|
||||
return adapter
|
||||
}
|
||||
}
|
||||
|
||||
if (url === LOCAL_RELAY_URL) {
|
||||
if (!context.relay) {
|
||||
throw new Error(`Unable to get local relay for ${url}`)
|
||||
}
|
||||
|
||||
return new LocalAdapter(context.relay)
|
||||
}
|
||||
|
||||
if (isRelayUrl(url)) {
|
||||
if (!context.pool) {
|
||||
throw new Error(`Unable to get socket for ${url}`)
|
||||
}
|
||||
|
||||
return new SocketAdapter(context.pool.get(url))
|
||||
}
|
||||
|
||||
throw new Error(`Invalid relay url ${url}`)
|
||||
}
|
||||
@@ -0,0 +1,184 @@
|
||||
import EventEmitter from "events"
|
||||
import {on, call, sleep} from "@welshman/lib"
|
||||
import type {SignedEvent, StampedEvent} from "@welshman/util"
|
||||
import {makeEvent, CLIENT_AUTH} from "@welshman/util"
|
||||
import {isRelayAuth, isClientAuth, isRelayOk, RelayMessage} from "./message.js"
|
||||
import {Socket, SocketStatus, SocketEventType} from "./socket.js"
|
||||
import {TypedEmitter, Unsubscriber} from "./util.js"
|
||||
|
||||
export const makeAuthEvent = (url: string, challenge: string) =>
|
||||
makeEvent(CLIENT_AUTH, {
|
||||
tags: [
|
||||
["relay", url],
|
||||
["challenge", challenge],
|
||||
],
|
||||
})
|
||||
|
||||
export enum AuthStatus {
|
||||
None = "auth:status:none",
|
||||
Requested = "auth:status:requested",
|
||||
PendingSignature = "auth:status:pending_signature",
|
||||
DeniedSignature = "auth:status:denied_signature",
|
||||
PendingResponse = "auth:status:pending_response",
|
||||
Forbidden = "auth:status:forbidden",
|
||||
Ok = "auth:status:ok",
|
||||
}
|
||||
|
||||
export type AuthResult = {
|
||||
ok: boolean
|
||||
reason?: string
|
||||
}
|
||||
|
||||
export enum AuthStateEventType {
|
||||
Status = "auth:event:status",
|
||||
}
|
||||
|
||||
export type AuthStateEvents = {
|
||||
[AuthStateEventType.Status]: (status: AuthStatus) => void
|
||||
}
|
||||
|
||||
export class AuthState extends (EventEmitter as new () => TypedEmitter<AuthStateEvents>) {
|
||||
challenge: string | undefined
|
||||
request: string | undefined
|
||||
details: string | undefined
|
||||
status = AuthStatus.None
|
||||
_unsubscribers: Unsubscriber[] = []
|
||||
|
||||
constructor(readonly socket: Socket) {
|
||||
super()
|
||||
|
||||
this._unsubscribers.push(
|
||||
on(socket, SocketEventType.Receive, (message: RelayMessage) => {
|
||||
if (isRelayOk(message)) {
|
||||
const [_, id, ok, details] = message
|
||||
|
||||
if (id === this.request) {
|
||||
this.details = details
|
||||
|
||||
if (ok) {
|
||||
this.setStatus(AuthStatus.Ok)
|
||||
} else {
|
||||
this.setStatus(AuthStatus.Forbidden)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isRelayAuth(message)) {
|
||||
const [_, challenge] = message
|
||||
|
||||
this.challenge = challenge
|
||||
this.request = undefined
|
||||
this.details = undefined
|
||||
this.setStatus(AuthStatus.Requested)
|
||||
}
|
||||
}),
|
||||
on(socket, SocketEventType.Enqueue, (message: RelayMessage) => {
|
||||
if (isClientAuth(message)) {
|
||||
this.setStatus(AuthStatus.PendingResponse)
|
||||
}
|
||||
}),
|
||||
on(socket, SocketEventType.Status, (status: SocketStatus) => {
|
||||
if (status === SocketStatus.Closed) {
|
||||
this.challenge = undefined
|
||||
this.request = undefined
|
||||
this.details = undefined
|
||||
this.setStatus(AuthStatus.None)
|
||||
}
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
setStatus(status: AuthStatus) {
|
||||
this.status = status
|
||||
this.emit(AuthStateEventType.Status, status)
|
||||
}
|
||||
|
||||
cleanup() {
|
||||
this.removeAllListeners()
|
||||
this._unsubscribers.forEach(call)
|
||||
}
|
||||
}
|
||||
|
||||
export type AuthManagerOptions = {
|
||||
sign: (event: StampedEvent) => Promise<SignedEvent>
|
||||
eager?: boolean
|
||||
}
|
||||
|
||||
export class AuthManager {
|
||||
state: AuthState
|
||||
|
||||
constructor(
|
||||
readonly socket: Socket,
|
||||
readonly options: AuthManagerOptions,
|
||||
) {
|
||||
this.state = new AuthState(socket)
|
||||
this.state.on(AuthStateEventType.Status, (status: string) => {
|
||||
if (status === AuthStatus.Requested && options.eager) {
|
||||
this.respond()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async waitFor(condition: () => boolean, timeout = 300) {
|
||||
const start = Date.now()
|
||||
|
||||
while (Date.now() - timeout <= start) {
|
||||
if (condition()) {
|
||||
break
|
||||
}
|
||||
|
||||
await sleep(Math.min(100, Math.ceil(timeout / 3)))
|
||||
}
|
||||
}
|
||||
|
||||
async waitForChallenge(timeout = 300) {
|
||||
await this.waitFor(() => Boolean(this.state.challenge), timeout)
|
||||
}
|
||||
|
||||
async waitForResolution(timeout = 300) {
|
||||
await this.waitFor(
|
||||
() =>
|
||||
[AuthStatus.None, AuthStatus.DeniedSignature, AuthStatus.Forbidden, AuthStatus.Ok].includes(
|
||||
this.state.status,
|
||||
),
|
||||
timeout,
|
||||
)
|
||||
}
|
||||
|
||||
async attempt(timeout = 300) {
|
||||
await this.socket.attemptToOpen()
|
||||
await this.waitForChallenge(Math.ceil(timeout / 2))
|
||||
|
||||
if (this.state.status === AuthStatus.Requested) {
|
||||
await this.respond()
|
||||
}
|
||||
|
||||
await this.waitForResolution(Math.ceil(timeout / 2))
|
||||
}
|
||||
|
||||
async respond() {
|
||||
if (!this.state.challenge) {
|
||||
throw new Error("Attempted to authenticate with no challenge")
|
||||
}
|
||||
|
||||
if (this.state.status !== AuthStatus.Requested) {
|
||||
throw new Error(`Attempted to authenticate when auth is already ${this.state.status}`)
|
||||
}
|
||||
|
||||
this.state.setStatus(AuthStatus.PendingSignature)
|
||||
|
||||
const template = makeAuthEvent(this.socket.url, this.state.challenge)
|
||||
const event = await this.options.sign(template)
|
||||
|
||||
if (event) {
|
||||
this.state.request = event.id
|
||||
this.socket.send(["AUTH", event])
|
||||
} else {
|
||||
this.state.setStatus(AuthStatus.DeniedSignature)
|
||||
}
|
||||
}
|
||||
|
||||
cleanup() {
|
||||
this.state.cleanup()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,244 @@
|
||||
import {EventEmitter} from "events"
|
||||
import {on, sleep, randomId, groupBy, pushToMapKey, inc, flatten, chunk} from "@welshman/lib"
|
||||
import {SignedEvent, Filter} from "@welshman/util"
|
||||
import {TypedEmitter} from "./util.js"
|
||||
import {
|
||||
RelayMessage,
|
||||
isRelayNegErr,
|
||||
isRelayNegMsg,
|
||||
RelayMessageType,
|
||||
ClientMessageType,
|
||||
} from "./message.js"
|
||||
import {getAdapter, AdapterContext, AbstractAdapter, AdapterEventType} from "./adapter.js"
|
||||
import {Negentropy, NegentropyStorageVector} from "./negentropy.js"
|
||||
import {unireq, RequestEventType} from "./request.js"
|
||||
import {multicast, PublishEventType} from "./publish.js"
|
||||
|
||||
export enum DifferenceEventType {
|
||||
Message = "difference:event:message",
|
||||
Error = "difference:event:error",
|
||||
Close = "difference:event:close",
|
||||
}
|
||||
|
||||
export type DifferenceEvents = {
|
||||
[DifferenceEventType.Message]: (payload: {have: string[]; need: string[]}, url: string) => void
|
||||
[DifferenceEventType.Error]: (error: string, url: string) => void
|
||||
[DifferenceEventType.Close]: () => void
|
||||
}
|
||||
|
||||
export type DifferenceOptions = {
|
||||
relay: string
|
||||
filter: Filter
|
||||
events: SignedEvent[]
|
||||
context: AdapterContext
|
||||
}
|
||||
|
||||
export class Difference extends (EventEmitter as new () => TypedEmitter<DifferenceEvents>) {
|
||||
have = new Set<string>()
|
||||
need = new Set<string>()
|
||||
|
||||
_id = `NEG-${randomId().slice(0, 8)}`
|
||||
_unsubscriber: () => void
|
||||
_adapter: AbstractAdapter
|
||||
_closed = false
|
||||
|
||||
constructor(readonly options: DifferenceOptions) {
|
||||
super()
|
||||
|
||||
// Set up our adapter
|
||||
this._adapter = getAdapter(this.options.relay, this.options.context)
|
||||
|
||||
// Set up negentropy
|
||||
const storage = new NegentropyStorageVector()
|
||||
const neg = new Negentropy(storage, 50_000)
|
||||
|
||||
for (const event of this.options.events) {
|
||||
storage.insert(event.created_at, event.id)
|
||||
}
|
||||
|
||||
storage.seal()
|
||||
|
||||
// Add listeners
|
||||
this._unsubscriber = on(
|
||||
this._adapter,
|
||||
AdapterEventType.Receive,
|
||||
async (message: RelayMessage, url: string) => {
|
||||
if (isRelayNegMsg(message)) {
|
||||
const [_, negid, msg] = message
|
||||
|
||||
if (negid === this._id) {
|
||||
const [newMsg, have, need] = await neg.reconcile(msg)
|
||||
|
||||
for (const id of have) {
|
||||
this.have.add(id)
|
||||
}
|
||||
|
||||
for (const id of need) {
|
||||
this.need.add(id)
|
||||
}
|
||||
|
||||
this.emit(DifferenceEventType.Message, {have, need}, url)
|
||||
|
||||
if (newMsg) {
|
||||
this._adapter.send([RelayMessageType.NegMsg, this._id, newMsg])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isRelayNegErr(message)) {
|
||||
const [_, negid, msg] = message
|
||||
|
||||
if (negid === this._id) {
|
||||
this.emit(DifferenceEventType.Error, msg, url)
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
neg.initiate().then((msg: string) => {
|
||||
this._adapter.send([ClientMessageType.NegOpen, this._id, this.options.filter, msg])
|
||||
})
|
||||
}
|
||||
|
||||
close() {
|
||||
if (this._closed) return
|
||||
|
||||
this._adapter.send([ClientMessageType.NegClose, this._id])
|
||||
this.emit(DifferenceEventType.Close)
|
||||
this.removeAllListeners()
|
||||
this._adapter.cleanup()
|
||||
this._unsubscriber()
|
||||
this._closed = true
|
||||
}
|
||||
}
|
||||
|
||||
// diff is a shortcut for diffing multiple filters across multiple relays
|
||||
|
||||
export type DiffOptions = {
|
||||
relays: string[]
|
||||
filters: Filter[]
|
||||
events: SignedEvent[]
|
||||
context: AdapterContext
|
||||
}
|
||||
|
||||
export type DiffItem = {
|
||||
relay: string
|
||||
have: Set<string>
|
||||
need: Set<string>
|
||||
}
|
||||
|
||||
export const diff = async ({relays, filters, ...options}: DiffOptions) => {
|
||||
const diffs = flatten(
|
||||
await Promise.all(
|
||||
relays.flatMap(async relay => {
|
||||
return await Promise.all(
|
||||
filters.map(
|
||||
async filter =>
|
||||
new Promise<DiffItem>((resolve, reject) => {
|
||||
const diff = new Difference({relay, filter, ...options})
|
||||
|
||||
diff.on(DifferenceEventType.Close, () => {
|
||||
resolve({relay, have: diff.have, need: diff.need})
|
||||
diff.close()
|
||||
})
|
||||
|
||||
diff.on(DifferenceEventType.Error, (url, message) => {
|
||||
reject(message)
|
||||
diff.close()
|
||||
})
|
||||
|
||||
sleep(30_000).then(() => {
|
||||
reject("timeout")
|
||||
diff.close()
|
||||
})
|
||||
}),
|
||||
),
|
||||
)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
return Array.from(groupBy(diff => diff.relay, diffs).entries()).map(([relay, diffs]) => {
|
||||
const have = new Set<string>()
|
||||
const need = new Set<string>()
|
||||
|
||||
for (const diff of diffs) {
|
||||
for (const id of diff.have) {
|
||||
have.add(id)
|
||||
}
|
||||
|
||||
for (const id of diff.need) {
|
||||
need.add(id)
|
||||
}
|
||||
}
|
||||
|
||||
return {relay, have: Array.from(have), need: Array.from(need)}
|
||||
})
|
||||
}
|
||||
|
||||
// Pull diffs multiple arrays and fetches missing events
|
||||
|
||||
export type PullOptions = DiffOptions
|
||||
|
||||
export const pull = async ({context, ...options}: PullOptions) => {
|
||||
const countById = new Map<string, number>()
|
||||
const idsByRelay = new Map<string, string[]>()
|
||||
|
||||
for (const {relay, need} of await diff({context, ...options})) {
|
||||
for (const id of need) {
|
||||
const count = countById.get(id) || 0
|
||||
|
||||
// Reduce, but don't completely eliminate duplicates, just in case a relay
|
||||
// won't give us what we ask for.
|
||||
if (count < 2) {
|
||||
pushToMapKey(idsByRelay, relay, id)
|
||||
countById.set(id, inc(count))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const result: SignedEvent[] = []
|
||||
|
||||
await Promise.all(
|
||||
Array.from(idsByRelay.entries()).map(([relay, allIds]) => {
|
||||
return Promise.all(
|
||||
chunk(500, allIds).map(ids => {
|
||||
return new Promise<void>(resolve => {
|
||||
const req = unireq({relay, context, filter: {ids}, autoClose: true})
|
||||
|
||||
req.on(RequestEventType.Close, resolve)
|
||||
req.on(RequestEventType.Event, event => result.push(event))
|
||||
})
|
||||
}),
|
||||
)
|
||||
}),
|
||||
)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// Push diffs multiple relays and publishes missing events
|
||||
|
||||
export type PushOptions = DiffOptions
|
||||
|
||||
export const push = async ({context, events, ...options}: PushOptions) => {
|
||||
const relaysById = new Map<string, string[]>()
|
||||
|
||||
for (const {relay, have} of await diff({context, events, ...options})) {
|
||||
for (const id of have) {
|
||||
pushToMapKey(relaysById, id, relay)
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
events.map(async event => {
|
||||
const relays = relaysById.get(event.id)
|
||||
|
||||
if (relays) {
|
||||
new Promise<void>(resolve => {
|
||||
multicast({event, relays, context}).on(PublishEventType.Complete, resolve)
|
||||
})
|
||||
}
|
||||
}),
|
||||
)
|
||||
}
|
||||
+11
-27
@@ -1,27 +1,11 @@
|
||||
export * from "./Connection.js"
|
||||
export * from "./ConnectionAuth.js"
|
||||
export * from "./ConnectionEvent.js"
|
||||
export * from "./ConnectionSender.js"
|
||||
export * from "./ConnectionState.js"
|
||||
export * from "./ConnectionStats.js"
|
||||
export * from "./Context.js"
|
||||
export * from "./Executor.js"
|
||||
export * from "./Pool.js"
|
||||
export * from "./Publish.js"
|
||||
export * from "./Socket.js"
|
||||
export * from "./Subscribe.js"
|
||||
export * from "./Sync.js"
|
||||
export * from "./Tracker.js"
|
||||
export * from "./target/Echo.js"
|
||||
export * from "./target/Multi.js"
|
||||
export * from "./target/Relay.js"
|
||||
export * from "./target/Relays.js"
|
||||
export * from "./target/Local.js"
|
||||
|
||||
import type {NetContext} from "./Context.js"
|
||||
|
||||
declare module "@welshman/lib" {
|
||||
interface Context {
|
||||
net: NetContext
|
||||
}
|
||||
}
|
||||
export * from "./adapter.js"
|
||||
export * from "./auth.js"
|
||||
export * from "./diff.js"
|
||||
export * from "./message.js"
|
||||
export * from "./negentropy.js"
|
||||
export * from "./policy.js"
|
||||
export * from "./pool.js"
|
||||
export * from "./publish.js"
|
||||
export * from "./socket.js"
|
||||
export * from "./request.js"
|
||||
export * from "./tracker.js"
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
import type {SignedEvent, Filter} from "@welshman/util"
|
||||
|
||||
// relay -> client
|
||||
|
||||
export enum RelayMessageType {
|
||||
Auth = "AUTH",
|
||||
Closed = "CLOSED",
|
||||
Eose = "EOSE",
|
||||
Event = "EVENT",
|
||||
NegErr = "NEG-ERR",
|
||||
NegMsg = "NEG-MSG",
|
||||
Ok = "OK",
|
||||
}
|
||||
|
||||
export type RelayMessage = any[]
|
||||
|
||||
export type RelayAuthPayload = [string]
|
||||
|
||||
export type RelayClosedPayload = [string, string]
|
||||
|
||||
export type RelayEosePayload = [string, SignedEvent]
|
||||
|
||||
export type RelayEventPayload = [string, SignedEvent]
|
||||
|
||||
export type RelayNegErrPayload = [string, string]
|
||||
|
||||
export type RelayNegMsgPayload = [string, string]
|
||||
|
||||
export type RelayOkPayload = [string, boolean, string]
|
||||
|
||||
export type RelayAuth = [RelayMessageType.Auth, ...RelayAuthPayload]
|
||||
|
||||
export type RelayClosed = [RelayMessageType.Closed, ...RelayClosedPayload]
|
||||
|
||||
export type RelayEose = [RelayMessageType.Eose, ...RelayEosePayload]
|
||||
|
||||
export type RelayEvent = [RelayMessageType.Event, ...RelayEventPayload]
|
||||
|
||||
export type RelayNegErr = [RelayMessageType.NegErr, ...RelayNegErrPayload]
|
||||
|
||||
export type RelayNegMsg = [RelayMessageType.NegMsg, ...RelayNegMsgPayload]
|
||||
|
||||
export type RelayOk = [RelayMessageType.Ok, ...RelayOkPayload]
|
||||
|
||||
export const isRelayAuth = (m: RelayMessage): m is RelayAuth => m[0] === RelayMessageType.Auth
|
||||
|
||||
export const isRelayClosed = (m: RelayMessage): m is RelayClosed => m[0] === RelayMessageType.Closed
|
||||
|
||||
export const isRelayEose = (m: RelayMessage): m is RelayEose => m[0] === RelayMessageType.Eose
|
||||
|
||||
export const isRelayEvent = (m: RelayMessage): m is RelayEvent => m[0] === RelayMessageType.Event
|
||||
|
||||
export const isRelayNegErr = (m: RelayMessage): m is RelayNegErr => m[0] === RelayMessageType.NegErr
|
||||
|
||||
export const isRelayNegMsg = (m: RelayMessage): m is RelayNegMsg => m[0] === RelayMessageType.NegMsg
|
||||
|
||||
export const isRelayOk = (m: RelayMessage): m is RelayOk => m[0] === RelayMessageType.Ok
|
||||
|
||||
// client -> relay
|
||||
|
||||
export enum ClientMessageType {
|
||||
Auth = "AUTH",
|
||||
Close = "CLOSE",
|
||||
Event = "EVENT",
|
||||
NegClose = "NEG-CLOSE",
|
||||
NegOpen = "NEG-OPEN",
|
||||
Req = "REQ",
|
||||
}
|
||||
|
||||
export type ClientMessage = any[]
|
||||
|
||||
export type ClientAuthPayload = [string]
|
||||
|
||||
export type ClientClosePayload = [string]
|
||||
|
||||
export type ClientEventPayload = [SignedEvent]
|
||||
|
||||
export type ClientNegClosePayload = [string]
|
||||
|
||||
export type ClientNegOpenPayload = [string, Filter, string]
|
||||
|
||||
export type ClientReqPayload = [string, Filter]
|
||||
|
||||
export type ClientAuth = [ClientMessageType.Auth, ...ClientAuthPayload]
|
||||
|
||||
export type ClientClose = [ClientMessageType.Close, ...ClientClosePayload]
|
||||
|
||||
export type ClientEvent = [ClientMessageType.Event, ...ClientEventPayload]
|
||||
|
||||
export type ClientNegClose = [ClientMessageType.NegClose, ...ClientNegClosePayload]
|
||||
|
||||
export type ClientNegOpen = [ClientMessageType.NegOpen, ...ClientNegOpenPayload]
|
||||
|
||||
export type ClientReq = [ClientMessageType.Req, ...ClientReqPayload]
|
||||
|
||||
export const isClientAuth = (m: ClientMessage): m is ClientAuth => m[0] === ClientMessageType.Auth
|
||||
|
||||
export const isClientClose = (m: ClientMessage): m is ClientClose =>
|
||||
m[0] === ClientMessageType.Close
|
||||
|
||||
export const isClientEvent = (m: ClientMessage): m is ClientEvent =>
|
||||
m[0] === ClientMessageType.Event
|
||||
|
||||
export const isClientNegClose = (m: ClientMessage): m is ClientNegClose =>
|
||||
m[0] === ClientMessageType.NegClose
|
||||
|
||||
export const isClientNegOpen = (m: ClientMessage): m is ClientNegOpen =>
|
||||
m[0] === ClientMessageType.NegOpen
|
||||
|
||||
export const isClientReq = (m: ClientMessage): m is ClientReq => m[0] === ClientMessageType.Req
|
||||
@@ -0,0 +1,246 @@
|
||||
import {on, call, sleep, spec, ago, now} from "@welshman/lib"
|
||||
import {AUTH_JOIN} from "@welshman/util"
|
||||
import {
|
||||
ClientMessage,
|
||||
isClientAuth,
|
||||
isClientClose,
|
||||
isClientEvent,
|
||||
isClientReq,
|
||||
ClientMessageType,
|
||||
RelayMessage,
|
||||
isRelayOk,
|
||||
isRelayClosed,
|
||||
} from "./message.js"
|
||||
import {Socket, SocketStatus, SocketEventType} from "./socket.js"
|
||||
import {AuthState, AuthStatus, AuthStateEventType} from "./auth.js"
|
||||
|
||||
/**
|
||||
* Defers sending messages when a challenge has been presented and not answered yet
|
||||
* @param socket - a Socket object
|
||||
* @return a cleanup function
|
||||
*/
|
||||
export const socketPolicyDeferOnAuth = (socket: Socket) => {
|
||||
const buffer: ClientMessage[] = []
|
||||
const authState = new AuthState(socket)
|
||||
const okStatuses = [AuthStatus.None, AuthStatus.Ok]
|
||||
|
||||
const unsubscribers = [
|
||||
// Pause sending certain messages when we're not authenticated
|
||||
on(socket, SocketEventType.Enqueue, (message: ClientMessage) => {
|
||||
// If we're closing a request, but it never got sent, remove both from the queue
|
||||
// Otherwise, always send CLOSE
|
||||
if (isClientClose(message)) {
|
||||
const req = buffer.find(spec([ClientMessageType.Req, message[1]]))
|
||||
|
||||
if (req) {
|
||||
socket._sendQueue.remove(req)
|
||||
socket._sendQueue.remove(message)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Always allow sending auth
|
||||
if (isClientAuth(message)) return
|
||||
|
||||
// Always allow sending join requests
|
||||
if (isClientEvent(message) && message[1].kind === AUTH_JOIN) return
|
||||
|
||||
// If we're not ok, remove the message and save it for later
|
||||
if (!okStatuses.includes(authState.status)) {
|
||||
buffer.push(message)
|
||||
socket._sendQueue.remove(message)
|
||||
}
|
||||
}),
|
||||
// Send buffered messages when we get successful auth
|
||||
on(authState, AuthStateEventType.Status, (status: AuthStatus) => {
|
||||
if (okStatuses.includes(status) && buffer.length > 0) {
|
||||
for (const message of buffer.splice(0)) {
|
||||
socket.send(message)
|
||||
}
|
||||
}
|
||||
}),
|
||||
]
|
||||
|
||||
return () => {
|
||||
unsubscribers.forEach(call)
|
||||
authState.cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-enqueues event/req messages once if rejected due to auth-required
|
||||
* @param socket - a Socket object
|
||||
* @return a cleanup function
|
||||
*/
|
||||
export const socketPolicyRetryAuthRequired = (socket: Socket) => {
|
||||
const retried = new Set<string>()
|
||||
const pending = new Map<string, ClientMessage>()
|
||||
|
||||
const unsubscribers = [
|
||||
// Watch outgoing events and requests and keep a copy
|
||||
on(socket, SocketEventType.Send, (message: ClientMessage) => {
|
||||
if (isClientEvent(message)) {
|
||||
const [_, event] = message
|
||||
|
||||
if (!retried.has(event.id) && event.kind !== AUTH_JOIN) {
|
||||
pending.set(event.id, message)
|
||||
}
|
||||
}
|
||||
|
||||
if (isClientReq(message)) {
|
||||
const [_, id] = message
|
||||
|
||||
if (!retried.has(id)) {
|
||||
pending.set(id, message)
|
||||
}
|
||||
}
|
||||
}),
|
||||
// If a message is rejected with auth-required, re-enqueue it one time
|
||||
on(socket, SocketEventType.Receive, (message: RelayMessage) => {
|
||||
if (isRelayOk(message)) {
|
||||
const [_, id, ok, detail] = message
|
||||
const pendingMessage = pending.get(id)
|
||||
|
||||
if (pendingMessage && !ok && detail?.startsWith("auth-required:")) {
|
||||
socket.send(pendingMessage)
|
||||
retried.add(id)
|
||||
}
|
||||
|
||||
pending.delete(id)
|
||||
}
|
||||
|
||||
if (isRelayClosed(message)) {
|
||||
const [_, id, detail] = message
|
||||
const pendingMessage = pending.get(id)
|
||||
|
||||
if (pendingMessage && detail?.startsWith("auth-required:")) {
|
||||
socket.send(pendingMessage)
|
||||
retried.add(id)
|
||||
}
|
||||
|
||||
pending.delete(id)
|
||||
}
|
||||
}),
|
||||
]
|
||||
|
||||
return () => unsubscribers.forEach(call)
|
||||
}
|
||||
|
||||
/**
|
||||
* Auto-connects a closed socket when a message is sent unless there was a recent error
|
||||
* @param socket - a Socket object
|
||||
* @return a cleanup function
|
||||
*/
|
||||
export const socketPolicyConnectOnSend = (socket: Socket) => {
|
||||
let lastError = 0
|
||||
let currentStatus = SocketStatus.Closed
|
||||
|
||||
const unsubscribers = [
|
||||
on(socket, SocketEventType.Status, (newStatus: SocketStatus) => {
|
||||
// Keep track of the most recent error
|
||||
if (newStatus === SocketStatus.Error) {
|
||||
lastError = now()
|
||||
}
|
||||
|
||||
// Keep track of the current status
|
||||
currentStatus = newStatus
|
||||
}),
|
||||
on(socket, SocketEventType.Enqueue, (message: ClientMessage) => {
|
||||
// When a new message is sent, make sure the socket is open (unless there was a recent error)
|
||||
if (currentStatus === SocketStatus.Closed && lastError < ago(30)) {
|
||||
socket.open()
|
||||
}
|
||||
}),
|
||||
]
|
||||
|
||||
return () => unsubscribers.forEach(call)
|
||||
}
|
||||
|
||||
/**
|
||||
* Auto-closes a socket after 30 seconds of inactivity
|
||||
* @param socket - a Socket object
|
||||
* @return a cleanup function
|
||||
*/
|
||||
export const socketPolicyCloseOnTimeout = (socket: Socket) => {
|
||||
let lastActivity = now()
|
||||
|
||||
const unsubscribers = [
|
||||
on(socket, SocketEventType.Send, (message: ClientMessage) => {
|
||||
lastActivity = now()
|
||||
}),
|
||||
on(socket, SocketEventType.Receive, (message: RelayMessage) => {
|
||||
lastActivity = now()
|
||||
}),
|
||||
]
|
||||
|
||||
const interval = setInterval(() => {
|
||||
if (socket.status === SocketStatus.Open && lastActivity < ago(30)) {
|
||||
socket.close()
|
||||
}
|
||||
}, 3000)
|
||||
|
||||
return () => {
|
||||
unsubscribers.forEach(call)
|
||||
clearInterval(interval)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Automatically re-opens a socket if there are active requests or publishes
|
||||
* @param socket - a Socket object
|
||||
* @return a cleanup function
|
||||
*/
|
||||
export const socketPolicyReopenActive = (socket: Socket) => {
|
||||
const pending = new Map<string, ClientMessage>()
|
||||
|
||||
let lastOpen = Date.now()
|
||||
|
||||
const unsubscribers = [
|
||||
on(socket, SocketEventType.Status, (newStatus: SocketStatus) => {
|
||||
// Keep track of the most recent error
|
||||
if (newStatus === SocketStatus.Open) {
|
||||
lastOpen = Date.now()
|
||||
}
|
||||
|
||||
// If the socket closed and we have no error, reopen it but don't flap
|
||||
if (newStatus === SocketStatus.Closed && pending.size) {
|
||||
console.log("1")
|
||||
sleep(Math.max(0, 30_000 - (Date.now() - lastOpen))).then(() => {
|
||||
console.log("2")
|
||||
for (const message of pending.values()) {
|
||||
socket.send(message)
|
||||
}
|
||||
})
|
||||
}
|
||||
}),
|
||||
on(socket, SocketEventType.Send, (message: ClientMessage) => {
|
||||
if (isClientEvent(message)) {
|
||||
pending.set(message[1].id, message)
|
||||
}
|
||||
|
||||
if (isClientReq(message)) {
|
||||
pending.set(message[1], message)
|
||||
}
|
||||
|
||||
if (isClientClose(message)) {
|
||||
pending.delete(message[1])
|
||||
}
|
||||
}),
|
||||
on(socket, SocketEventType.Receive, (message: RelayMessage) => {
|
||||
if (isRelayClosed(message) || isRelayOk(message)) {
|
||||
pending.delete(message[1])
|
||||
}
|
||||
}),
|
||||
]
|
||||
|
||||
return () => unsubscribers.forEach(call)
|
||||
}
|
||||
|
||||
export const defaultSocketPolicies = [
|
||||
socketPolicyDeferOnAuth,
|
||||
socketPolicyRetryAuthRequired,
|
||||
socketPolicyConnectOnSend,
|
||||
socketPolicyCloseOnTimeout,
|
||||
socketPolicyReopenActive,
|
||||
]
|
||||
@@ -0,0 +1,213 @@
|
||||
import {EventEmitter} from "events"
|
||||
import {verifyEvent as nostrToolsVerifyEvent} from "nostr-tools/pure"
|
||||
import {on, call, randomId, yieldThread} from "@welshman/lib"
|
||||
import {Filter, matchFilter, SignedEvent} from "@welshman/util"
|
||||
import {RelayMessage, ClientMessageType, isRelayEvent, isRelayEose} from "./message.js"
|
||||
import {getAdapter, AdapterContext, AbstractAdapter, AdapterEventType} from "./adapter.js"
|
||||
import {SocketEventType, SocketStatus} from "./socket.js"
|
||||
import {TypedEmitter, Unsubscriber} from "./util.js"
|
||||
import {Tracker} from "./tracker.js"
|
||||
|
||||
export const defaultVerifyEvent = (event: SignedEvent) => {
|
||||
try {
|
||||
return nostrToolsVerifyEvent(event)
|
||||
} catch (e) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
export enum RequestEventType {
|
||||
Close = "request:event:close",
|
||||
Disconnect = "request:event:disconnect",
|
||||
Duplicate = "request:event:duplicate",
|
||||
Eose = "request:event:eose",
|
||||
Event = "request:event:event",
|
||||
Filtered = "request:event:filtered",
|
||||
Invalid = "request:event:invalid",
|
||||
}
|
||||
|
||||
// Unireq
|
||||
|
||||
export type UnireqEvents = {
|
||||
[RequestEventType.Event]: (event: SignedEvent) => void
|
||||
[RequestEventType.Invalid]: (event: SignedEvent) => void
|
||||
[RequestEventType.Filtered]: (event: SignedEvent) => void
|
||||
[RequestEventType.Duplicate]: (event: SignedEvent) => void
|
||||
[RequestEventType.Disconnect]: () => void
|
||||
[RequestEventType.Close]: () => void
|
||||
[RequestEventType.Eose]: () => void
|
||||
}
|
||||
|
||||
export type UnireqOptions = {
|
||||
relay: string
|
||||
filter: Filter
|
||||
context: AdapterContext
|
||||
timeout?: number
|
||||
tracker?: Tracker
|
||||
autoClose?: boolean
|
||||
verifyEvent?: (event: SignedEvent) => boolean
|
||||
}
|
||||
|
||||
export class Unireq extends (EventEmitter as new () => TypedEmitter<UnireqEvents>) {
|
||||
_id = `REQ-${randomId().slice(0, 8)}`
|
||||
_unsubscribers: Unsubscriber[] = []
|
||||
_adapter: AbstractAdapter
|
||||
_closed = false
|
||||
|
||||
constructor(readonly options: UnireqOptions) {
|
||||
super()
|
||||
|
||||
const tracker = options.tracker || new Tracker()
|
||||
|
||||
const verifyEvent = options.verifyEvent || defaultVerifyEvent
|
||||
|
||||
// Set up our adapter
|
||||
this._adapter = getAdapter(this.options.relay, this.options.context)
|
||||
|
||||
// Listen for event/eose messages from the adapter
|
||||
this._unsubscribers.push(
|
||||
on(this._adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => {
|
||||
if (isRelayEvent(message)) {
|
||||
const [_, id, event] = message
|
||||
|
||||
if (id !== this._id) return
|
||||
|
||||
if (tracker.track(event.id, url)) {
|
||||
this.emit(RequestEventType.Duplicate, event)
|
||||
} else if (verifyEvent?.(event) === false) {
|
||||
this.emit(RequestEventType.Invalid, event)
|
||||
} else if (!matchFilter(this.options.filter, event)) {
|
||||
this.emit(RequestEventType.Filtered, event)
|
||||
} else {
|
||||
this.emit(RequestEventType.Event, event)
|
||||
}
|
||||
}
|
||||
|
||||
if (isRelayEose(message)) {
|
||||
const [_, id] = message
|
||||
|
||||
if (id === this._id) {
|
||||
this.emit(RequestEventType.Eose)
|
||||
|
||||
if (this.options.autoClose) {
|
||||
this.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
// Listen to disconnects from any sockets
|
||||
for (const socket of this._adapter.sockets) {
|
||||
this._unsubscribers.push(
|
||||
on(socket, SocketEventType.Status, (status: SocketStatus) => {
|
||||
if (![SocketStatus.Open, SocketStatus.Opening].includes(status)) {
|
||||
this.emit(RequestEventType.Disconnect)
|
||||
|
||||
if (this.options.autoClose) {
|
||||
this.close()
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
// Timeout our subscription
|
||||
if (this.options.timeout) {
|
||||
setTimeout(() => this.close(), this.options.timeout)
|
||||
}
|
||||
|
||||
// Start asynchronously so the caller can set up listeners
|
||||
yieldThread().then(() => {
|
||||
this._adapter.send([ClientMessageType.Req, this._id, this.options.filter])
|
||||
})
|
||||
}
|
||||
|
||||
close() {
|
||||
if (this._closed) return
|
||||
|
||||
this._adapter.send(["CLOSE", this._id])
|
||||
this.emit(RequestEventType.Close)
|
||||
this.removeAllListeners()
|
||||
this._unsubscribers.map(call)
|
||||
this._adapter.cleanup()
|
||||
this._closed = true
|
||||
}
|
||||
}
|
||||
|
||||
// Multireq
|
||||
|
||||
export type MultireqEvents = {
|
||||
[RequestEventType.Event]: (event: SignedEvent, url: string) => void
|
||||
[RequestEventType.Invalid]: (event: SignedEvent, url: string) => void
|
||||
[RequestEventType.Filtered]: (event: SignedEvent, url: string) => void
|
||||
[RequestEventType.Duplicate]: (event: SignedEvent, url: string) => void
|
||||
[RequestEventType.Disconnect]: (url: string) => void
|
||||
[RequestEventType.Eose]: (url: string) => void
|
||||
[RequestEventType.Close]: () => void
|
||||
}
|
||||
|
||||
export type MultireqOptions = Omit<UnireqOptions, "relay"> & {
|
||||
relays: string[]
|
||||
}
|
||||
|
||||
export class Multireq extends (EventEmitter as new () => TypedEmitter<MultireqEvents>) {
|
||||
_children: Unireq[] = []
|
||||
_closed = new Set<string>()
|
||||
|
||||
constructor({relays, ...options}: MultireqOptions) {
|
||||
super()
|
||||
|
||||
const tracker = new Tracker()
|
||||
|
||||
for (const relay of relays) {
|
||||
const req = new Unireq({relay, tracker, ...options})
|
||||
|
||||
req.on(RequestEventType.Event, (event: SignedEvent) => {
|
||||
this.emit(RequestEventType.Event, event, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEventType.Invalid, (event: SignedEvent) => {
|
||||
this.emit(RequestEventType.Invalid, event, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEventType.Filtered, (event: SignedEvent) => {
|
||||
this.emit(RequestEventType.Filtered, event, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEventType.Duplicate, (event: SignedEvent) => {
|
||||
this.emit(RequestEventType.Duplicate, event, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEventType.Disconnect, () => {
|
||||
this.emit(RequestEventType.Disconnect, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEventType.Eose, () => {
|
||||
this.emit(RequestEventType.Eose, relay)
|
||||
})
|
||||
|
||||
req.on(RequestEventType.Close, () => {
|
||||
this._closed.add(relay)
|
||||
|
||||
if (this._closed.size === relays.length) {
|
||||
this.emit(RequestEventType.Close)
|
||||
}
|
||||
})
|
||||
|
||||
this._children.push(req)
|
||||
}
|
||||
}
|
||||
|
||||
close() {
|
||||
for (const child of this._children) {
|
||||
child.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Convenience functions
|
||||
|
||||
export const unireq = (options: UnireqOptions) => new Unireq(options)
|
||||
|
||||
export const multireq = (options: MultireqOptions) => new Multireq(options)
|
||||
@@ -1,16 +0,0 @@
|
||||
import {Emitter} from "@welshman/lib"
|
||||
import type {Message} from "../Socket.js"
|
||||
|
||||
export class Echo extends Emitter {
|
||||
get connections() {
|
||||
return []
|
||||
}
|
||||
|
||||
async send(...payload: Message) {
|
||||
this.emit(...payload)
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
}
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
import {Emitter} from "@welshman/lib"
|
||||
import {Relay, LOCAL_RELAY_URL} from "@welshman/util"
|
||||
import type {Message} from "../Socket.js"
|
||||
|
||||
export class Local extends Emitter {
|
||||
constructor(readonly relay: Relay) {
|
||||
super()
|
||||
|
||||
relay.on("*", this.onMessage)
|
||||
}
|
||||
|
||||
get connections() {
|
||||
return []
|
||||
}
|
||||
|
||||
async send(...payload: Message) {
|
||||
await this.relay.send(...payload)
|
||||
}
|
||||
|
||||
onMessage = (...message: Message) => {
|
||||
const [verb, ...payload] = message
|
||||
|
||||
this.emit(verb, LOCAL_RELAY_URL, ...payload)
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.relay.off("*", this.onMessage)
|
||||
}
|
||||
}
|
||||
@@ -1,26 +0,0 @@
|
||||
import {Emitter} from "@welshman/lib"
|
||||
import type {Message} from "../Socket.js"
|
||||
import type {Target} from "../Executor.js"
|
||||
|
||||
export class Multi extends Emitter {
|
||||
constructor(readonly targets: Target[]) {
|
||||
super()
|
||||
|
||||
targets.forEach(t => {
|
||||
t.on("*", (verb, ...args) => this.emit(verb, ...args))
|
||||
})
|
||||
}
|
||||
|
||||
get connections() {
|
||||
return this.targets.flatMap(t => t.connections)
|
||||
}
|
||||
|
||||
async send(...payload: Message) {
|
||||
await Promise.all(this.targets.map(t => t.send(...payload)))
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.targets.forEach(t => t.cleanup())
|
||||
}
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
import {Emitter} from "@welshman/lib"
|
||||
import {ConnectionEvent} from "../ConnectionEvent.js"
|
||||
import type {Message} from "../Socket.js"
|
||||
import type {Connection} from "../Connection.js"
|
||||
|
||||
export class Relay extends Emitter {
|
||||
constructor(readonly connection: Connection) {
|
||||
super()
|
||||
|
||||
this.connection.on(ConnectionEvent.Receive, this.onMessage)
|
||||
}
|
||||
|
||||
get connections() {
|
||||
return [this.connection]
|
||||
}
|
||||
|
||||
async send(...payload: Message) {
|
||||
await this.connection.send(payload)
|
||||
}
|
||||
|
||||
onMessage = (connection: Connection, [verb, ...payload]: Message) => {
|
||||
this.emit(verb, connection.url, ...payload)
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.connection.off(ConnectionEvent.Receive, this.onMessage)
|
||||
}
|
||||
}
|
||||
@@ -1,29 +0,0 @@
|
||||
import {Emitter} from "@welshman/lib"
|
||||
import type {Message} from "../Socket.js"
|
||||
import type {Connection} from "../Connection.js"
|
||||
import {ConnectionEvent} from "../ConnectionEvent.js"
|
||||
|
||||
export class Relays extends Emitter {
|
||||
constructor(readonly connections: Connection[]) {
|
||||
super()
|
||||
|
||||
connections.forEach(connection => {
|
||||
connection.on(ConnectionEvent.Receive, this.onMessage)
|
||||
})
|
||||
}
|
||||
|
||||
async send(...payload: Message) {
|
||||
await Promise.all(this.connections.map(c => c.send(payload)))
|
||||
}
|
||||
|
||||
onMessage = (connection: Connection, [verb, ...payload]: Message) => {
|
||||
this.emit(verb, connection.url, ...payload)
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.connections.forEach(connection => {
|
||||
connection.off(ConnectionEvent.Receive, this.onMessage)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
import TypedEventEmitter, {EventMap} from "typed-emitter"
|
||||
|
||||
export type TypedEmitter<T extends EventMap> = TypedEventEmitter.default<T>
|
||||
|
||||
export type Unsubscriber = () => void
|
||||
Reference in New Issue
Block a user