Avoid re-opening connections after they're closed

This commit is contained in:
Jon Staab
2024-12-16 15:20:10 -08:00
parent 8825ed4d57
commit 887fbfc25d
9 changed files with 24 additions and 22 deletions
+5 -3
View File
@@ -44,11 +44,13 @@ export class Connection extends Emitter {
emit = (type: ConnectionEvent, ...args: any[]) => super.emit(type, this, ...args) emit = (type: ConnectionEvent, ...args: any[]) => super.emit(type, this, ...args)
send = async (message: Message) => { send = async (message: Message) => {
if (this.status !== Ready) {
throw new Error(`Attempted to send message on ${this.status} connection`)
}
await this.socket.open() await this.socket.open()
if (this.status === Ready) { this.sender.push(message)
this.sender.push(message)
}
} }
close = async () => { close = async () => {
+4 -4
View File
@@ -1,4 +1,4 @@
import {ctx} from '@welshman/lib' import {ctx, noop} from '@welshman/lib'
import type {Emitter} from '@welshman/lib' import type {Emitter} from '@welshman/lib'
import type {SignedEvent, TrustedEvent, Filter} from '@welshman/util' import type {SignedEvent, TrustedEvent, Filter} from '@welshman/util'
import type {Message} from './Socket' import type {Message} from './Socket'
@@ -7,7 +7,7 @@ import {Negentropy, NegentropyStorageVector} from './Negentropy'
export type Target = Emitter & { export type Target = Emitter & {
connections: Connection[] connections: Connection[]
send: (...args: Message) => void send: (...args: Message) => Promise<void>
cleanup: () => void cleanup: () => void
} }
@@ -58,7 +58,7 @@ export class Executor {
unsubscribe: () => { unsubscribe: () => {
if (closed) return if (closed) return
this.target.send("CLOSE", id) this.target.send("CLOSE", id).catch(noop)
this.target.off('EVENT', eventListener) this.target.off('EVENT', eventListener)
this.target.off('EOSE', eoseListener) this.target.off('EOSE', eoseListener)
@@ -132,7 +132,7 @@ export class Executor {
const close = () => { const close = () => {
if (closed) return if (closed) return
this.target.send('NEG-CLOSE', id) this.target.send('NEG-CLOSE', id).catch(noop)
this.target.off('NEG-MSG', msgListener) this.target.off('NEG-MSG', msgListener)
this.target.off('NEG-ERR', errListener) this.target.off('NEG-ERR', errListener)
+5
View File
@@ -3,14 +3,17 @@ import {Connection} from "./Connection"
export class Pool extends Emitter { export class Pool extends Emitter {
data: Map<string, Connection> data: Map<string, Connection>
constructor() { constructor() {
super() super()
this.data = new Map() this.data = new Map()
} }
has(url: string) { has(url: string) {
return this.data.has(url) return this.data.has(url)
} }
get(url: string): Connection { get(url: string): Connection {
const oldConnection = this.data.get(url) const oldConnection = this.data.get(url)
@@ -25,6 +28,7 @@ export class Pool extends Emitter {
return newConnection return newConnection
} }
remove(url: string) { remove(url: string) {
const connection = this.data.get(url) const connection = this.data.get(url)
@@ -34,6 +38,7 @@ export class Pool extends Emitter {
this.data.delete(url) this.data.delete(url)
} }
} }
clear() { clear() {
for (const url of this.data.keys()) { for (const url of this.data.keys()) {
this.remove(url) this.remove(url)
+1 -4
View File
@@ -247,10 +247,7 @@ const _executeSubscription = (sub: Subscription) => {
emitter.on(SubscriptionEvent.Complete, () => { emitter.on(SubscriptionEvent.Complete, () => {
emitter.removeAllListeners() emitter.removeAllListeners()
subs.forEach(sub => sub.unsubscribe()) subs.forEach(sub => sub.unsubscribe())
executor.target.connections.forEach((c: Connection) => { executor.target.connections.forEach(c => c.off(ConnectionEvent.Close, onClose))
c.off(ConnectionEvent.Close, onClose)
})
executor.target.cleanup() executor.target.cleanup()
}) })
+1 -1
View File
@@ -6,7 +6,7 @@ export class Echo extends Emitter {
return [] return []
} }
send(...payload: Message) { async send(...payload: Message) {
this.emit(...payload) this.emit(...payload)
} }
+2 -2
View File
@@ -13,8 +13,8 @@ export class Local extends Emitter {
return [] return []
} }
send(...payload: Message) { async send(...payload: Message) {
this.relay.send(...payload) await this.relay.send(...payload)
} }
onMessage = (...message: Message) => { onMessage = (...message: Message) => {
+2 -2
View File
@@ -15,8 +15,8 @@ export class Multi extends Emitter {
return this.targets.flatMap(t => t.connections) return this.targets.flatMap(t => t.connections)
} }
send(...payload: Message) { async send(...payload: Message) {
this.targets.forEach(t => t.send(...payload)) await Promise.all(this.targets.map(t => t.send(...payload)))
} }
cleanup = () => { cleanup = () => {
+2 -2
View File
@@ -14,8 +14,8 @@ export class Relay extends Emitter {
return [this.connection] return [this.connection]
} }
send(...payload: Message) { async send(...payload: Message) {
this.connection.send(payload) await this.connection.send(payload)
} }
onMessage = (connection: Connection, [verb, ...payload]: Message) => { onMessage = (connection: Connection, [verb, ...payload]: Message) => {
+2 -4
View File
@@ -12,10 +12,8 @@ export class Relays extends Emitter {
}) })
} }
send = (...payload: Message) => { async send(...payload: Message) {
this.connections.forEach(connection => { await Promise.all(this.connections.map(c => c.send(payload)))
connection.send(payload)
})
} }
onMessage = (connection: Connection, [verb, ...payload]: Message) => { onMessage = (connection: Connection, [verb, ...payload]: Message) => {