Fix some memory leaks, add executor.load and some utils
This commit is contained in:
@@ -108,5 +108,7 @@ export class Connection extends Emitter {
|
||||
destroy() {
|
||||
this.socket.disconnect()
|
||||
this.removeAllListeners()
|
||||
this.sendQueue.stop()
|
||||
this.receiveQueue.stop()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,11 +12,13 @@ export type Target = Emitter & {
|
||||
|
||||
type EventCallback = (url: string, event: Event) => void
|
||||
type EoseCallback = (url: string) => void
|
||||
type CloseCallback = () => void
|
||||
type AuthCallback = (url: string, challenge: string) => void
|
||||
type OkCallback = (url: string, id: string, ...extra: any[]) => void
|
||||
type ErrorCallback = (url: string, id: string, ...extra: any[]) => void
|
||||
type CountCallback = (url: string, ...extra: any[]) => void
|
||||
type SubscribeOpts = {onEvent?: EventCallback, onEose?: EoseCallback}
|
||||
type LoadOpts = SubscribeOpts & {timeout?: number, onClose?: CloseCallback}
|
||||
type PublishOpts = {verb: string, onOk: OkCallback, onError: ErrorCallback}
|
||||
type CountOpts = {onCount: CountCallback}
|
||||
type AuthOpts = {onAuth: AuthCallback, onOk: OkCallback}
|
||||
@@ -95,4 +97,29 @@ export class Executor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
load(filters: Filter[], {timeout = 30_000, onEvent, onEose, onClose}: LoadOpts) {
|
||||
const eose = new Set()
|
||||
|
||||
const close = () => {
|
||||
onClose?.()
|
||||
sub.unsubscribe()
|
||||
clearTimeout(handle)
|
||||
}
|
||||
|
||||
const handle = setTimeout(close, timeout)
|
||||
|
||||
const sub = this.subscribe(filters, {
|
||||
onEvent,
|
||||
onEose: (url: string) => {
|
||||
onEose?.(url)
|
||||
eose.add(url)
|
||||
|
||||
if (eose.size === this.target.connections.length) {
|
||||
close()
|
||||
}
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ export * from "./Connection"
|
||||
export * from "./ConnectionMeta"
|
||||
export * from "./Executor"
|
||||
export * from "./Pool"
|
||||
export * from "./util/nostr"
|
||||
export * from "./util/Deferred"
|
||||
export * from "./util/Emitter"
|
||||
export * from "./util/Queue"
|
||||
|
||||
@@ -50,4 +50,8 @@ export class Queue {
|
||||
|
||||
this.timeout = setTimeout(() => this.doWork(), 100) as NodeJS.Timeout
|
||||
}
|
||||
|
||||
stop() {
|
||||
clearTimeout(this.timeout)
|
||||
}
|
||||
}
|
||||
|
||||
+15
-7
@@ -23,6 +23,7 @@ export class Socket {
|
||||
url: string
|
||||
ws?: WebSocket
|
||||
ready: Deferred<void>
|
||||
failedToConnect = false
|
||||
|
||||
constructor(url: string, readonly opts: SocketOpts) {
|
||||
this.url = url
|
||||
@@ -39,7 +40,7 @@ export class Socket {
|
||||
}
|
||||
|
||||
isPending() {
|
||||
return !this.ws
|
||||
return !this.ws && !this.failedToConnect
|
||||
}
|
||||
|
||||
isConnecting() {
|
||||
@@ -70,7 +71,10 @@ export class Socket {
|
||||
onClose = () => {
|
||||
this.ready.reject()
|
||||
this.opts.onClose()
|
||||
this._close()
|
||||
|
||||
if (this.ws) {
|
||||
this._close()
|
||||
}
|
||||
}
|
||||
|
||||
onError = () => {
|
||||
@@ -100,11 +104,15 @@ export class Socket {
|
||||
throw new Error(`Already attempted connection for ${this.url}`)
|
||||
}
|
||||
|
||||
this.ws = new WebSocket(this.url)
|
||||
this.ws.onopen = this.onOpen
|
||||
this.ws.onclose = this.onClose
|
||||
this.ws.onerror = this.onError
|
||||
this.ws.onmessage = this.onMessage
|
||||
try {
|
||||
this.ws = new WebSocket(this.url)
|
||||
this.ws.onopen = this.onOpen
|
||||
this.ws.onclose = this.onClose
|
||||
this.ws.onerror = this.onError
|
||||
this.ws.onmessage = this.onMessage
|
||||
} catch (e) {
|
||||
this.failedToConnect = true
|
||||
}
|
||||
}
|
||||
|
||||
disconnect() {
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
export const stripProto = (url: string) => url.replace(/.*:\/\//, "")
|
||||
|
||||
export const isShareableRelay = (url: string) =>
|
||||
// Is it actually a websocket url
|
||||
url.match(/^wss:\/\/.+/) &&
|
||||
// Sometimes bugs cause multiple relays to get concatenated
|
||||
url.match(/:\/\//g)?.length === 1 &&
|
||||
// It shouldn't have any whitespace
|
||||
!url.match(/\s/) &&
|
||||
// Don't match stuff with a port number
|
||||
!url.slice(6).match(/:\d+/) &&
|
||||
// Don't match raw ip addresses
|
||||
!url.slice(6).match(/\d+\.\d+\.\d+\.\d+/) &&
|
||||
// Skip nostr.wine's virtual relays
|
||||
!url.slice(6).match(/\/npub/)
|
||||
|
||||
export const normalizeRelayUrl = (url: string) => {
|
||||
// If it doesn't start with a compatible protocol, strip the proto and add wss
|
||||
if (!url.match(/^(wss|local):\/\/.+/)) {
|
||||
url = "wss://" + stripProto(url)
|
||||
}
|
||||
|
||||
try {
|
||||
return new URL(url).href.replace(/\/+$/, "").toLowerCase()
|
||||
} catch (e) {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
export const fromNostrURI = (s: string) => s.replace(/^[\w+]+:\/?\/?/, "")
|
||||
|
||||
export const toNostrURI = (s: string) => `nostr:${s}`
|
||||
Reference in New Issue
Block a user