diff --git a/packages/net/src/Connection.ts b/packages/net/src/Connection.ts index bbab68c..d52a7c8 100644 --- a/packages/net/src/Connection.ts +++ b/packages/net/src/Connection.ts @@ -44,11 +44,13 @@ export class Connection extends Emitter { emit = (type: ConnectionEvent, ...args: any[]) => super.emit(type, this, ...args) send = async (message: Message) => { + if (this.status !== Ready) { + throw new Error(`Attempted to send message on ${this.status} connection`) + } + await this.socket.open() - if (this.status === Ready) { - this.sender.push(message) - } + this.sender.push(message) } close = async () => { diff --git a/packages/net/src/Executor.ts b/packages/net/src/Executor.ts index 5264849..0aa121e 100644 --- a/packages/net/src/Executor.ts +++ b/packages/net/src/Executor.ts @@ -1,4 +1,4 @@ -import {ctx} from '@welshman/lib' +import {ctx, noop} from '@welshman/lib' import type {Emitter} from '@welshman/lib' import type {SignedEvent, TrustedEvent, Filter} from '@welshman/util' import type {Message} from './Socket' @@ -7,7 +7,7 @@ import {Negentropy, NegentropyStorageVector} from './Negentropy' export type Target = Emitter & { connections: Connection[] - send: (...args: Message) => void + send: (...args: Message) => Promise cleanup: () => void } @@ -58,7 +58,7 @@ export class Executor { unsubscribe: () => { if (closed) return - this.target.send("CLOSE", id) + this.target.send("CLOSE", id).catch(noop) this.target.off('EVENT', eventListener) this.target.off('EOSE', eoseListener) @@ -132,7 +132,7 @@ export class Executor { const close = () => { if (closed) return - this.target.send('NEG-CLOSE', id) + this.target.send('NEG-CLOSE', id).catch(noop) this.target.off('NEG-MSG', msgListener) this.target.off('NEG-ERR', errListener) diff --git a/packages/net/src/Pool.ts b/packages/net/src/Pool.ts index d198491..581f165 100644 --- a/packages/net/src/Pool.ts +++ b/packages/net/src/Pool.ts @@ -3,14 +3,17 @@ import {Connection} from "./Connection" export class Pool extends Emitter { data: Map + constructor() { super() this.data = new Map() } + has(url: string) { return this.data.has(url) } + get(url: string): Connection { const oldConnection = this.data.get(url) @@ -25,6 +28,7 @@ export class Pool extends Emitter { return newConnection } + remove(url: string) { const connection = this.data.get(url) @@ -34,6 +38,7 @@ export class Pool extends Emitter { this.data.delete(url) } } + clear() { for (const url of this.data.keys()) { this.remove(url) diff --git a/packages/net/src/Subscribe.ts b/packages/net/src/Subscribe.ts index 87beb55..e0e3704 100644 --- a/packages/net/src/Subscribe.ts +++ b/packages/net/src/Subscribe.ts @@ -247,10 +247,7 @@ const _executeSubscription = (sub: Subscription) => { emitter.on(SubscriptionEvent.Complete, () => { emitter.removeAllListeners() subs.forEach(sub => sub.unsubscribe()) - executor.target.connections.forEach((c: Connection) => { - c.off(ConnectionEvent.Close, onClose) - }) - + executor.target.connections.forEach(c => c.off(ConnectionEvent.Close, onClose)) executor.target.cleanup() }) diff --git a/packages/net/src/target/Echo.ts b/packages/net/src/target/Echo.ts index 4a68c42..04fcd49 100644 --- a/packages/net/src/target/Echo.ts +++ b/packages/net/src/target/Echo.ts @@ -6,7 +6,7 @@ export class Echo extends Emitter { return [] } - send(...payload: Message) { + async send(...payload: Message) { this.emit(...payload) } diff --git a/packages/net/src/target/Local.ts b/packages/net/src/target/Local.ts index 9dfb4fa..b975941 100644 --- a/packages/net/src/target/Local.ts +++ b/packages/net/src/target/Local.ts @@ -13,8 +13,8 @@ export class Local extends Emitter { return [] } - send(...payload: Message) { - this.relay.send(...payload) + async send(...payload: Message) { + await this.relay.send(...payload) } onMessage = (...message: Message) => { diff --git a/packages/net/src/target/Multi.ts b/packages/net/src/target/Multi.ts index 3e79a35..83fbd51 100644 --- a/packages/net/src/target/Multi.ts +++ b/packages/net/src/target/Multi.ts @@ -15,8 +15,8 @@ export class Multi extends Emitter { return this.targets.flatMap(t => t.connections) } - send(...payload: Message) { - this.targets.forEach(t => t.send(...payload)) + async send(...payload: Message) { + await Promise.all(this.targets.map(t => t.send(...payload))) } cleanup = () => { diff --git a/packages/net/src/target/Relay.ts b/packages/net/src/target/Relay.ts index 3751fe2..51b12d7 100644 --- a/packages/net/src/target/Relay.ts +++ b/packages/net/src/target/Relay.ts @@ -14,8 +14,8 @@ export class Relay extends Emitter { return [this.connection] } - send(...payload: Message) { - this.connection.send(payload) + async send(...payload: Message) { + await this.connection.send(payload) } onMessage = (connection: Connection, [verb, ...payload]: Message) => { diff --git a/packages/net/src/target/Relays.ts b/packages/net/src/target/Relays.ts index 0baa004..d942d50 100644 --- a/packages/net/src/target/Relays.ts +++ b/packages/net/src/target/Relays.ts @@ -12,10 +12,8 @@ export class Relays extends Emitter { }) } - send = (...payload: Message) => { - this.connections.forEach(connection => { - connection.send(payload) - }) + async send(...payload: Message) { + await Promise.all(this.connections.map(c => c.send(payload))) } onMessage = (connection: Connection, [verb, ...payload]: Message) => {