Fix some net bugs

This commit is contained in:
Jon Staab
2026-05-28 11:30:54 -07:00
parent 4137f35cee
commit 3b2f2b14e4
10 changed files with 81 additions and 40 deletions
+2 -2
View File
@@ -94,7 +94,7 @@ describe("Pool", () => {
describe("remove", () => { describe("remove", () => {
it("should remove and cleanup existing socket", () => { it("should remove and cleanup existing socket", () => {
const mockSocket = {url: "wss://test.relay", cleanup: vi.fn()} const mockSocket = {url: "wss://test.relay/", cleanup: vi.fn()}
pool._data.set(mockSocket.url, mockSocket as unknown as Socket) pool._data.set(mockSocket.url, mockSocket as unknown as Socket)
pool.remove(mockSocket.url) pool.remove(mockSocket.url)
@@ -111,7 +111,7 @@ describe("Pool", () => {
describe("clear", () => { describe("clear", () => {
it("should remove all sockets", () => { it("should remove all sockets", () => {
const urls = ["wss://test1.relay", "wss://test2.relay"] const urls = ["wss://test1.relay/", "wss://test2.relay/"]
const mockSockets = urls.map(url => ({url, cleanup: vi.fn()})) const mockSockets = urls.map(url => ({url, cleanup: vi.fn()}))
for (const mockSocket of mockSockets) { for (const mockSocket of mockSockets) {
+9 -1
View File
@@ -100,12 +100,20 @@ export class AuthState extends EventEmitter {
throw new Error(`Attempted to authenticate when auth is already ${this.status}`) throw new Error(`Attempted to authenticate when auth is already ${this.status}`)
} }
const challenge = this.challenge
this.setStatus(AuthStatus.PendingSignature) this.setStatus(AuthStatus.PendingSignature)
const template = makeRelayAuth(this.socket.url, this.challenge) const template = makeRelayAuth(this.socket.url, challenge)
const event = await tryCatch(() => sign(template)) const event = await tryCatch(() => sign(template))
if (event) { if (event) {
// If a new challenge arrived while signing, our signature is stale, so
// abort rather than responding to an obsolete challenge
if (this.challenge !== challenge) {
return
}
this.request = event.id this.request = event.id
this.socket.send(["AUTH", event]) this.socket.send(["AUTH", event])
} else { } else {
+18 -13
View File
@@ -41,6 +41,7 @@ export class Difference extends EventEmitter {
_unsubscriber: () => void _unsubscriber: () => void
_adapter: AbstractAdapter _adapter: AbstractAdapter
_closed = false _closed = false
_reconcileQueue: Promise<void> = Promise.resolve()
constructor(readonly options: DifferenceOptions) { constructor(readonly options: DifferenceOptions) {
super() super()
@@ -70,23 +71,27 @@ export class Difference extends EventEmitter {
const [_, negid, msg] = message const [_, negid, msg] = message
if (negid === this._id) { if (negid === this._id) {
const [newMsg, have, need] = await neg.reconcile(msg) // Serialize reconcile calls through a promise queue so that concurrent
// messages don't corrupt negentropy's internal timestamp state.
await (this._reconcileQueue = this._reconcileQueue.then(async () => {
const [newMsg, have, need] = await neg.reconcile(msg)
for (const id of have) { for (const id of have) {
this.have.add(id) this.have.add(id)
} }
for (const id of need) { for (const id of need) {
this.need.add(id) this.need.add(id)
} }
this.emit(DifferenceEvent.Message, {have, need}, url) this.emit(DifferenceEvent.Message, {have, need}, url)
if (newMsg) { if (newMsg) {
this._adapter.send([RelayMessageType.NegMsg, this._id, newMsg]) this._adapter.send([RelayMessageType.NegMsg, this._id, newMsg])
} else { } else {
this.close() this.close()
} }
}))
} }
} }
+1 -1
View File
@@ -154,7 +154,7 @@ export const socketPolicyCloseInactive = (socket: Socket) => {
const unsubscribers = [ const unsubscribers = [
on(socket, SocketEvent.Status, (newStatus: SocketStatus) => { on(socket, SocketEvent.Status, (newStatus: SocketStatus) => {
const isClosed = [SocketStatus.Closed, SocketStatus.Error].includes(socket.status) const isClosed = [SocketStatus.Closed, SocketStatus.Error].includes(newStatus)
// Keep track of the most recent open // Keep track of the most recent open
if (newStatus === SocketStatus.Open) { if (newStatus === SocketStatus.Open) {
+2 -2
View File
@@ -65,12 +65,12 @@ export class Pool {
} }
remove(url: string) { remove(url: string) {
const socket = this._data.get(url) const socket = this._data.get(normalizeRelayUrl(url))
if (socket) { if (socket) {
socket.cleanup() socket.cleanup()
this._data.delete(url) this._data.delete(normalizeRelayUrl(url))
} }
} }
+22 -14
View File
@@ -1,4 +1,4 @@
import {fromPairs} from "@welshman/lib" import {fromPairs, once} from "@welshman/lib"
import {SignedEvent} from "@welshman/util" import {SignedEvent} from "@welshman/util"
import {RelayMessage, ClientMessageType, isRelayOk} from "./message.js" import {RelayMessage, ClientMessageType, isRelayOk} from "./message.js"
import {AdapterEvent, AdapterContext, getAdapter} from "./adapter.js" import {AdapterEvent, AdapterContext, getAdapter} from "./adapter.js"
@@ -44,11 +44,26 @@ export const publishOne = (options: PublishOneOptions) =>
options.onPending?.(result) options.onPending?.(result)
const cleanup = () => { let timeoutId: ReturnType<typeof setTimeout> | null = null
const abort = () => {
if (result.status === PublishStatus.Pending) {
result.status = PublishStatus.Aborted
result.detail = "aborted"
options.onAborted?.(result)
}
cleanup()
}
const cleanup = once(() => {
options.signal?.removeEventListener("abort", abort)
options.onComplete?.(result) options.onComplete?.(result)
clearTimeout(timeoutId)
adapter.cleanup() adapter.cleanup()
resolve(result) resolve(result)
} })
adapter.on(AdapterEvent.Receive, (message: RelayMessage, url: string) => { adapter.on(AdapterEvent.Receive, (message: RelayMessage, url: string) => {
if (isRelayOk(message)) { if (isRelayOk(message)) {
@@ -72,18 +87,11 @@ export const publishOne = (options: PublishOneOptions) =>
} }
}) })
options.signal?.addEventListener("abort", () => { if (options.signal) {
if (result.status === PublishStatus.Pending) { options.signal.addEventListener("abort", abort)
result.status = PublishStatus.Aborted }
result.detail = "aborted"
options.onAborted?.(result) timeoutId = setTimeout(() => {
}
cleanup()
})
setTimeout(() => {
if (result.status === PublishStatus.Pending) { if (result.status === PublishStatus.Pending) {
result.status = PublishStatus.Timeout result.status = PublishStatus.Timeout
result.detail = "timed out" result.detail = "timed out"
+5 -1
View File
@@ -327,7 +327,11 @@ export class Repository extends Emitter {
a.push(add) a.push(add)
} }
m.set(k, a) if (a.length > 0) {
m.set(k, a)
} else {
m.delete(k)
}
} }
_getEvents = (ids: Iterable<string>) => { _getEvents = (ids: Iterable<string>) => {
+18 -2
View File
@@ -148,7 +148,10 @@ export const requestOne = (options: RequestOneOptions) => {
} }
// Handle abort signal // Handle abort signal
options.signal?.addEventListener("abort", close) if (options.signal) {
options.signal.addEventListener("abort", close)
unsubscribers.push(() => options.signal.removeEventListener("abort", close))
}
// If we're auto-closing, make sure it happens even if the relay doesn't send an eose // If we're auto-closing, make sure it happens even if the relay doesn't send an eose
// and the caller doesn't provide a signal, in order to avoid memory leaks // and the caller doesn't provide a signal, in order to avoid memory leaks
@@ -247,6 +250,8 @@ export const makeLoader = (options: LoaderOptions) =>
const threshold = options.threshold || 1 const threshold = options.threshold || 1
const tracker = new Tracker() const tracker = new Tracker()
const abortHandlersByRequest = new Map<LoadOptions, (relay: string) => void>()
const close = (relay: string, request: LoadOptions) => { const close = (relay: string, request: LoadOptions) => {
addToMapKey(closedRequestsByRelay, relay, request) addToMapKey(closedRequestsByRelay, relay, request)
addToMapKey(closedRelaysByRequest, request, relay) addToMapKey(closedRelaysByRequest, request, relay)
@@ -257,6 +262,13 @@ export const makeLoader = (options: LoaderOptions) =>
request.onClose?.() request.onClose?.()
resultsByRequest.get(request)?.resolve(events) resultsByRequest.get(request)?.resolve(events)
// Clean up the abort listener once the request is fully resolved
const abortHandler = abortHandlersByRequest.get(request)
if (abortHandler) {
request.signal?.removeEventListener("abort", abortHandler)
abortHandlersByRequest.delete(request)
}
} }
if (closedRequestsByRelay.get(relay)?.size === requestsByRelay.get(relay)?.length) { if (closedRequestsByRelay.get(relay)?.size === requestsByRelay.get(relay)?.length) {
@@ -270,7 +282,11 @@ export const makeLoader = (options: LoaderOptions) =>
resultsByRequest.set(request, defer()) resultsByRequest.set(request, defer())
// Propagate abort when all requests have been closed for a given relay // Propagate abort when all requests have been closed for a given relay
request.signal?.addEventListener("abort", () => close(relay, request)) if (request.signal) {
const abortHandler = () => close(relay, request)
abortHandlersByRequest.set(request, abortHandler)
request.signal.addEventListener("abort", abortHandler)
}
} }
} }
+1 -1
View File
@@ -141,9 +141,9 @@ export class Socket extends EventEmitter {
} }
cleanup = () => { cleanup = () => {
this.unsubscribers.forEach(call)
this.close() this.close()
this.auth.cleanup() this.auth.cleanup()
this.unsubscribers.forEach(call)
this._recvQueue.clear() this._recvQueue.clear()
this._sendQueue.clear() this._sendQueue.clear()
this.removeAllListeners() this.removeAllListeners()
+3 -3
View File
@@ -40,10 +40,10 @@ export class Tracker extends Emitter {
} }
removeRelay = (eventId: string, relay: string) => { removeRelay = (eventId: string, relay: string) => {
const didDeleteRelay = this.relaysById.get(eventId)?.delete(relay) const didDeleteRelay = this.relaysById.get(eventId)?.delete(relay) ?? false
const didDeleteId = this.idsByRelay.get(relay)?.delete(eventId) const didDeleteId = this.idsByRelay.get(relay)?.delete(eventId) ?? false
if (!didDeleteRelay && !didDeleteId) return if (!didDeleteRelay || !didDeleteId) return
this.emit("remove", eventId, relay) this.emit("remove", eventId, relay)
} }