diff --git a/packages/net/__tests__/pool.test.ts b/packages/net/__tests__/pool.test.ts index 0273273..6e1aeea 100644 --- a/packages/net/__tests__/pool.test.ts +++ b/packages/net/__tests__/pool.test.ts @@ -94,7 +94,7 @@ describe("Pool", () => { describe("remove", () => { it("should remove and cleanup existing socket", () => { - const mockSocket = {url: "wss://test.relay", cleanup: vi.fn()} + const mockSocket = {url: "wss://test.relay/", cleanup: vi.fn()} pool._data.set(mockSocket.url, mockSocket as unknown as Socket) pool.remove(mockSocket.url) @@ -111,7 +111,7 @@ describe("Pool", () => { describe("clear", () => { it("should remove all sockets", () => { - const urls = ["wss://test1.relay", "wss://test2.relay"] + const urls = ["wss://test1.relay/", "wss://test2.relay/"] const mockSockets = urls.map(url => ({url, cleanup: vi.fn()})) for (const mockSocket of mockSockets) { diff --git a/packages/net/src/auth.ts b/packages/net/src/auth.ts index a5c264b..e5f7766 100644 --- a/packages/net/src/auth.ts +++ b/packages/net/src/auth.ts @@ -100,12 +100,20 @@ export class AuthState extends EventEmitter { throw new Error(`Attempted to authenticate when auth is already ${this.status}`) } + const challenge = this.challenge + this.setStatus(AuthStatus.PendingSignature) - const template = makeRelayAuth(this.socket.url, this.challenge) + const template = makeRelayAuth(this.socket.url, challenge) const event = await tryCatch(() => sign(template)) if (event) { + // If a new challenge arrived while signing, our signature is stale, so + // abort rather than responding to an obsolete challenge + if (this.challenge !== challenge) { + return + } + this.request = event.id this.socket.send(["AUTH", event]) } else { diff --git a/packages/net/src/diff.ts b/packages/net/src/diff.ts index 4599c4a..0832fbd 100644 --- a/packages/net/src/diff.ts +++ b/packages/net/src/diff.ts @@ -41,6 +41,7 @@ export class Difference extends EventEmitter { _unsubscriber: () => void _adapter: AbstractAdapter _closed = false + _reconcileQueue: Promise = Promise.resolve() constructor(readonly options: DifferenceOptions) { super() @@ -70,23 +71,27 @@ export class Difference extends EventEmitter { const [_, negid, msg] = message if (negid === this._id) { - const [newMsg, have, need] = await neg.reconcile(msg) + // Serialize reconcile calls through a promise queue so that concurrent + // messages don't corrupt negentropy's internal timestamp state. + await (this._reconcileQueue = this._reconcileQueue.then(async () => { + const [newMsg, have, need] = await neg.reconcile(msg) - for (const id of have) { - this.have.add(id) - } + for (const id of have) { + this.have.add(id) + } - for (const id of need) { - this.need.add(id) - } + for (const id of need) { + this.need.add(id) + } - this.emit(DifferenceEvent.Message, {have, need}, url) + this.emit(DifferenceEvent.Message, {have, need}, url) - if (newMsg) { - this._adapter.send([RelayMessageType.NegMsg, this._id, newMsg]) - } else { - this.close() - } + if (newMsg) { + this._adapter.send([RelayMessageType.NegMsg, this._id, newMsg]) + } else { + this.close() + } + })) } } diff --git a/packages/net/src/policy.ts b/packages/net/src/policy.ts index f032d6d..58f05cb 100644 --- a/packages/net/src/policy.ts +++ b/packages/net/src/policy.ts @@ -154,7 +154,7 @@ export const socketPolicyCloseInactive = (socket: Socket) => { const unsubscribers = [ on(socket, SocketEvent.Status, (newStatus: SocketStatus) => { - const isClosed = [SocketStatus.Closed, SocketStatus.Error].includes(socket.status) + const isClosed = [SocketStatus.Closed, SocketStatus.Error].includes(newStatus) // Keep track of the most recent open if (newStatus === SocketStatus.Open) { diff --git a/packages/net/src/pool.ts b/packages/net/src/pool.ts index fcdc913..0382bf6 100644 --- a/packages/net/src/pool.ts +++ b/packages/net/src/pool.ts @@ -65,12 +65,12 @@ export class Pool { } remove(url: string) { - const socket = this._data.get(url) + const socket = this._data.get(normalizeRelayUrl(url)) if (socket) { socket.cleanup() - this._data.delete(url) + this._data.delete(normalizeRelayUrl(url)) } } diff --git a/packages/net/src/publish.ts b/packages/net/src/publish.ts index 4207a50..f541872 100644 --- a/packages/net/src/publish.ts +++ b/packages/net/src/publish.ts @@ -1,4 +1,4 @@ -import {fromPairs} from "@welshman/lib" +import {fromPairs, once} from "@welshman/lib" import {SignedEvent} from "@welshman/util" import {RelayMessage, ClientMessageType, isRelayOk} from "./message.js" import {AdapterEvent, AdapterContext, getAdapter} from "./adapter.js" @@ -44,11 +44,26 @@ export const publishOne = (options: PublishOneOptions) => options.onPending?.(result) - const cleanup = () => { + let timeoutId: ReturnType | null = null + + const abort = () => { + if (result.status === PublishStatus.Pending) { + result.status = PublishStatus.Aborted + result.detail = "aborted" + + options.onAborted?.(result) + } + + cleanup() + } + + const cleanup = once(() => { + options.signal?.removeEventListener("abort", abort) options.onComplete?.(result) + clearTimeout(timeoutId) adapter.cleanup() resolve(result) - } + }) adapter.on(AdapterEvent.Receive, (message: RelayMessage, url: string) => { if (isRelayOk(message)) { @@ -72,18 +87,11 @@ export const publishOne = (options: PublishOneOptions) => } }) - options.signal?.addEventListener("abort", () => { - if (result.status === PublishStatus.Pending) { - result.status = PublishStatus.Aborted - result.detail = "aborted" + if (options.signal) { + options.signal.addEventListener("abort", abort) + } - options.onAborted?.(result) - } - - cleanup() - }) - - setTimeout(() => { + timeoutId = setTimeout(() => { if (result.status === PublishStatus.Pending) { result.status = PublishStatus.Timeout result.detail = "timed out" diff --git a/packages/net/src/repository.ts b/packages/net/src/repository.ts index 8ba613d..28410e4 100644 --- a/packages/net/src/repository.ts +++ b/packages/net/src/repository.ts @@ -327,7 +327,11 @@ export class Repository extends Emitter { a.push(add) } - m.set(k, a) + if (a.length > 0) { + m.set(k, a) + } else { + m.delete(k) + } } _getEvents = (ids: Iterable) => { diff --git a/packages/net/src/request.ts b/packages/net/src/request.ts index c151011..8eda0d4 100644 --- a/packages/net/src/request.ts +++ b/packages/net/src/request.ts @@ -148,7 +148,10 @@ export const requestOne = (options: RequestOneOptions) => { } // Handle abort signal - options.signal?.addEventListener("abort", close) + if (options.signal) { + options.signal.addEventListener("abort", close) + unsubscribers.push(() => options.signal.removeEventListener("abort", close)) + } // If we're auto-closing, make sure it happens even if the relay doesn't send an eose // and the caller doesn't provide a signal, in order to avoid memory leaks @@ -247,6 +250,8 @@ export const makeLoader = (options: LoaderOptions) => const threshold = options.threshold || 1 const tracker = new Tracker() + const abortHandlersByRequest = new Map void>() + const close = (relay: string, request: LoadOptions) => { addToMapKey(closedRequestsByRelay, relay, request) addToMapKey(closedRelaysByRequest, request, relay) @@ -257,6 +262,13 @@ export const makeLoader = (options: LoaderOptions) => request.onClose?.() resultsByRequest.get(request)?.resolve(events) + + // Clean up the abort listener once the request is fully resolved + const abortHandler = abortHandlersByRequest.get(request) + if (abortHandler) { + request.signal?.removeEventListener("abort", abortHandler) + abortHandlersByRequest.delete(request) + } } if (closedRequestsByRelay.get(relay)?.size === requestsByRelay.get(relay)?.length) { @@ -270,7 +282,11 @@ export const makeLoader = (options: LoaderOptions) => resultsByRequest.set(request, defer()) // Propagate abort when all requests have been closed for a given relay - request.signal?.addEventListener("abort", () => close(relay, request)) + if (request.signal) { + const abortHandler = () => close(relay, request) + abortHandlersByRequest.set(request, abortHandler) + request.signal.addEventListener("abort", abortHandler) + } } } diff --git a/packages/net/src/socket.ts b/packages/net/src/socket.ts index 4107928..c839d1c 100644 --- a/packages/net/src/socket.ts +++ b/packages/net/src/socket.ts @@ -141,9 +141,9 @@ export class Socket extends EventEmitter { } cleanup = () => { + this.unsubscribers.forEach(call) this.close() this.auth.cleanup() - this.unsubscribers.forEach(call) this._recvQueue.clear() this._sendQueue.clear() this.removeAllListeners() diff --git a/packages/net/src/tracker.ts b/packages/net/src/tracker.ts index 7c7bbb3..02eddd3 100644 --- a/packages/net/src/tracker.ts +++ b/packages/net/src/tracker.ts @@ -40,10 +40,10 @@ export class Tracker extends Emitter { } removeRelay = (eventId: string, relay: string) => { - const didDeleteRelay = this.relaysById.get(eventId)?.delete(relay) - const didDeleteId = this.idsByRelay.get(relay)?.delete(eventId) + const didDeleteRelay = this.relaysById.get(eventId)?.delete(relay) ?? false + const didDeleteId = this.idsByRelay.get(relay)?.delete(eventId) ?? false - if (!didDeleteRelay && !didDeleteId) return + if (!didDeleteRelay || !didDeleteId) return this.emit("remove", eventId, relay) }