Rename everything to welshman
This commit is contained in:
@@ -0,0 +1,2 @@
|
||||
build
|
||||
normalize-url
|
||||
@@ -0,0 +1,117 @@
|
||||
import {Emitter, Worker} from '@welshman/lib'
|
||||
import {AuthStatus, ConnectionMeta} from './ConnectionMeta'
|
||||
import {Socket, isMessage, asMessage} from './Socket'
|
||||
import type {SocketMessage} from './Socket'
|
||||
|
||||
export class Connection extends Emitter {
|
||||
url: string
|
||||
socket: Socket
|
||||
sender: Worker<SocketMessage>
|
||||
receiver: Worker<SocketMessage>
|
||||
meta: ConnectionMeta
|
||||
|
||||
constructor(url: string) {
|
||||
super()
|
||||
|
||||
this.url = url
|
||||
this.socket = new Socket(url, this)
|
||||
this.sender = this.createSender()
|
||||
this.receiver = this.createReceiver()
|
||||
this.meta = new ConnectionMeta(this)
|
||||
this.setMaxListeners(100)
|
||||
}
|
||||
|
||||
createSender = () => {
|
||||
const worker = new Worker<SocketMessage>({
|
||||
shouldDefer: (message: SocketMessage) => {
|
||||
if (!this.socket.isReady()) {
|
||||
return true
|
||||
}
|
||||
|
||||
const [verb, ...extra] = asMessage(message)
|
||||
|
||||
if (['AUTH', 'CLOSE'].includes(verb)) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Allow relay requests through
|
||||
if (verb === 'EVENT' && extra[0].kind === 28934) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Only defer for auth if we're not multiplexing
|
||||
if (isMessage(message) && ![AuthStatus.Ok, AuthStatus.Pending].includes(this.meta.authStatus)) {
|
||||
return true
|
||||
}
|
||||
|
||||
if (verb === 'REQ') {
|
||||
return this.meta.pendingRequests.size >= 8
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
})
|
||||
|
||||
worker.addGlobalHandler((message: SocketMessage) => {
|
||||
// If we ended up handling a CLOSE before we handled the REQ, don't send the REQ
|
||||
if (message[0] === 'CLOSE') {
|
||||
worker.buffer = worker.buffer.filter(m => !(m[0] === 'REQ' && m[1] === message[1]))
|
||||
}
|
||||
|
||||
this.onSend(message)
|
||||
})
|
||||
|
||||
return worker
|
||||
}
|
||||
|
||||
createReceiver = () => {
|
||||
const worker = new Worker<SocketMessage>()
|
||||
|
||||
worker.addGlobalHandler(this.onReceive)
|
||||
|
||||
return worker
|
||||
}
|
||||
|
||||
send = (m: SocketMessage) => this.sender.push(m)
|
||||
|
||||
onOpen = () => this.emit('open', this)
|
||||
|
||||
onClose = () => this.emit('close', this)
|
||||
|
||||
onError = () => this.emit('fault', this)
|
||||
|
||||
onMessage = (m: SocketMessage) => this.receiver.push(m)
|
||||
|
||||
onSend = (message: SocketMessage) => {
|
||||
this.emit('send', this, message)
|
||||
this.socket.send(message)
|
||||
}
|
||||
|
||||
onReceive = (message: SocketMessage) => {
|
||||
this.emit('receive', this, message)
|
||||
}
|
||||
|
||||
ensureConnected = ({shouldReconnect = true}) => {
|
||||
if (shouldReconnect && !this.socket.isHealthy()) {
|
||||
this.disconnect()
|
||||
}
|
||||
|
||||
if (this.socket.isPending()) {
|
||||
this.socket.connect()
|
||||
}
|
||||
}
|
||||
|
||||
disconnect() {
|
||||
this.socket.disconnect()
|
||||
this.sender.clear()
|
||||
this.receiver.clear()
|
||||
this.meta.clearPending()
|
||||
}
|
||||
|
||||
destroy() {
|
||||
this.disconnect()
|
||||
this.removeAllListeners()
|
||||
this.sender.stop()
|
||||
this.receiver.stop()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,181 @@
|
||||
import type {Event, Filter} from 'nostr-tools'
|
||||
import type {Message} from '@welshman/util'
|
||||
import type {Connection} from './Connection'
|
||||
|
||||
export type PublishMeta = {
|
||||
sent: number
|
||||
event: Event
|
||||
}
|
||||
|
||||
export type RequestMeta = {
|
||||
sent: number
|
||||
filters: Filter[]
|
||||
eoseReceived: boolean
|
||||
}
|
||||
|
||||
export enum AuthStatus {
|
||||
Ok = 'ok',
|
||||
Pending = 'pending',
|
||||
Unauthorized = 'unauthorized',
|
||||
Forbidden = 'forbidden',
|
||||
}
|
||||
|
||||
export enum ConnectionStatus {
|
||||
Unauthorized = 'unauthorized',
|
||||
Forbidden = 'forbidden',
|
||||
Error = 'error',
|
||||
Closed = 'closed',
|
||||
Slow = 'slow',
|
||||
Ok = 'ok',
|
||||
}
|
||||
|
||||
export class ConnectionMeta {
|
||||
authStatus = AuthStatus.Pending
|
||||
pendingPublishes = new Map<string, PublishMeta>()
|
||||
pendingRequests = new Map<string, RequestMeta>()
|
||||
publishCount = 0
|
||||
requestCount = 0
|
||||
eventCount = 0
|
||||
lastOpen = 0
|
||||
lastClose = 0
|
||||
lastFault = 0
|
||||
lastPublish = 0
|
||||
lastRequest = 0
|
||||
lastEvent = 0
|
||||
responseCount = 0
|
||||
responseTimer = 0
|
||||
|
||||
constructor(readonly cxn: Connection) {
|
||||
cxn.on('open', () => {
|
||||
this.lastOpen = Date.now()
|
||||
})
|
||||
|
||||
cxn.on('close', () => {
|
||||
this.lastClose = Date.now()
|
||||
})
|
||||
|
||||
cxn.on('fault', () => {
|
||||
this.lastFault = Date.now()
|
||||
})
|
||||
|
||||
cxn.on('send', (cxn: Connection, message: Message) => {
|
||||
if (message[0] === 'REQ') this.onSendRequest(message)
|
||||
if (message[0] === 'CLOSE') this.onSendClose(message)
|
||||
if (message[0] === 'EVENT') this.onSendEvent(message)
|
||||
})
|
||||
|
||||
cxn.on('receive', (cxn: Connection, message: Message) => {
|
||||
if (message[0] === 'OK') this.onReceiveOk(message)
|
||||
if (message[0] === 'AUTH') this.onReceiveAuth(message)
|
||||
if (message[0] === 'EVENT') this.onReceiveEvent(message)
|
||||
if (message[0] === 'EOSE') this.onReceiveEose(message)
|
||||
if (message[0] === 'CLOSED') this.onReceiveClosed(message)
|
||||
if (message[0] === 'NOTICE') this.onReceiveNotice(message)
|
||||
})
|
||||
}
|
||||
|
||||
onSendRequest([verb, subId, ...filters]: Message) {
|
||||
this.requestCount++
|
||||
this.lastRequest = Date.now()
|
||||
this.pendingRequests.set(subId, {
|
||||
filters,
|
||||
sent: Date.now(),
|
||||
eoseReceived: false,
|
||||
})
|
||||
}
|
||||
|
||||
onSendClose([verb, subId]: Message) {
|
||||
this.pendingRequests.delete(subId)
|
||||
}
|
||||
|
||||
onSendEvent([verb, event]: Message) {
|
||||
this.publishCount++
|
||||
this.lastPublish = Date.now()
|
||||
this.pendingPublishes.set(event.id, {sent: Date.now(), event})
|
||||
}
|
||||
|
||||
onReceiveOk([verb, eventId, ok, notice]: Message) {
|
||||
const publish = this.pendingPublishes.get(eventId)
|
||||
|
||||
if (ok) {
|
||||
this.authStatus = AuthStatus.Ok
|
||||
} else if (notice.startsWith('auth-required:')) {
|
||||
// Re-enqueue pending reqs when auth challenge is received
|
||||
const pub = this.pendingPublishes.get(eventId)
|
||||
|
||||
if (pub) {
|
||||
this.cxn.send(['EVENT', pub.event])
|
||||
}
|
||||
}
|
||||
|
||||
if (publish) {
|
||||
this.responseCount++
|
||||
this.responseTimer += Date.now() - publish.sent
|
||||
this.pendingPublishes.delete(eventId)
|
||||
}
|
||||
}
|
||||
|
||||
onReceiveAuth([verb, eventId]: Message) {
|
||||
this.authStatus = AuthStatus.Unauthorized
|
||||
}
|
||||
|
||||
onReceiveEvent([verb, event]: Message) {
|
||||
this.eventCount++
|
||||
this.lastEvent = Date.now()
|
||||
}
|
||||
|
||||
onReceiveEose([verb, subId]: Message) {
|
||||
const request = this.pendingRequests.get(subId)
|
||||
|
||||
// Only count the first eose
|
||||
if (request && !request.eoseReceived) {
|
||||
request.eoseReceived = true
|
||||
|
||||
this.responseCount++
|
||||
this.responseTimer += Date.now() - request.sent
|
||||
}
|
||||
}
|
||||
|
||||
onReceiveClosed([verb, id, notice]: Message) {
|
||||
if (notice.startsWith('auth-required:')) {
|
||||
// Re-enqueue pending reqs when auth challenge is received
|
||||
const req = this.pendingRequests.get(id)
|
||||
|
||||
if (req) {
|
||||
this.cxn.send(['REQ', id, ...req.filters])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
onReceiveNotice([verb, notice]: Message) {
|
||||
console.warn('NOTICE', this.cxn.url, notice)
|
||||
}
|
||||
|
||||
clearPending = () => {
|
||||
this.pendingPublishes.clear()
|
||||
this.pendingRequests.clear()
|
||||
}
|
||||
|
||||
getSpeed = () => this.responseCount ? this.responseTimer / this.responseCount : 0
|
||||
|
||||
getStatus = () => {
|
||||
if (this.authStatus === AuthStatus.Unauthorized) return ConnectionStatus.Unauthorized
|
||||
if (this.authStatus === AuthStatus.Forbidden) return ConnectionStatus.Forbidden
|
||||
if (this.lastFault > this.lastOpen) return ConnectionStatus.Error
|
||||
if (this.lastClose > this.lastOpen) return ConnectionStatus.Closed
|
||||
if (this.getSpeed() > 1000) return ConnectionStatus.Slow
|
||||
|
||||
return ConnectionStatus.Ok
|
||||
}
|
||||
|
||||
getDescription = () => {
|
||||
switch (this.getStatus()) {
|
||||
case ConnectionStatus.Unauthorized: return 'Logging in'
|
||||
case ConnectionStatus.Forbidden: return 'Failed to log in'
|
||||
case ConnectionStatus.Error: return 'Failed to connect'
|
||||
case ConnectionStatus.Closed: return 'Waiting to reconnect'
|
||||
case ConnectionStatus.Slow: return 'Slow Connection'
|
||||
case ConnectionStatus.Ok: return 'Connected'
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
import type {Event} from 'nostr-tools'
|
||||
import {matchFilters, hasValidSignature} from '@welshman/util'
|
||||
import type {Filter} from '@welshman/util'
|
||||
import {Pool} from "./Pool"
|
||||
import {Executor} from "./Executor"
|
||||
import {Relays} from "./target/Relays"
|
||||
|
||||
export const defaultPool = new Pool()
|
||||
|
||||
export const defaultGetExecutor = (relays: string[]) =>
|
||||
new Executor(new Relays(relays.map((relay: string) => NetworkContext.pool.get(relay))))
|
||||
|
||||
const defaultOnEvent = (url: string, event: Event) => null
|
||||
|
||||
const defaultOnAuth = (url: string, challenge: string) => null
|
||||
|
||||
const defaultOnOk = (url: string, id: string, ok: boolean, message: string) => null
|
||||
|
||||
const defaultIsDeleted = (url: string, event: Event) => false
|
||||
|
||||
const defaultHasValidSignature = (url: string, event: Event) => hasValidSignature(event)
|
||||
|
||||
const defaultMatchFilters = (url: string, filters: Filter[], event: Event) => matchFilters(filters, event)
|
||||
|
||||
export const NetworkContext = {
|
||||
pool: defaultPool,
|
||||
getExecutor: defaultGetExecutor,
|
||||
onEvent: defaultOnEvent,
|
||||
onAuth: defaultOnAuth,
|
||||
onOk: defaultOnOk,
|
||||
isDeleted: defaultIsDeleted,
|
||||
hasValidSignature: defaultHasValidSignature,
|
||||
matchFilters: defaultMatchFilters,
|
||||
}
|
||||
@@ -0,0 +1,90 @@
|
||||
import type {Event, Filter} from 'nostr-tools'
|
||||
import type {Emitter} from '@welshman/lib'
|
||||
import type {Message} from '@welshman/util'
|
||||
import type {Connection} from './Connection'
|
||||
import {NetworkContext} from './Context'
|
||||
|
||||
export type Target = Emitter & {
|
||||
connections: Connection[]
|
||||
send: (...args: Message) => void
|
||||
cleanup: () => void
|
||||
}
|
||||
|
||||
type EventCallback = (url: string, event: Event) => void
|
||||
type EoseCallback = (url: string) => void
|
||||
type OkCallback = (url: string, id: string, ...extra: any[]) => void
|
||||
type ErrorCallback = (url: string, id: string, ...extra: any[]) => void
|
||||
type SubscribeOpts = {onEvent?: EventCallback, onEose?: EoseCallback}
|
||||
type PublishOpts = {verb?: string, onOk?: OkCallback, onError?: ErrorCallback}
|
||||
|
||||
const createSubId = (prefix: string) => [prefix, Math.random().toString().slice(2, 10)].join('-')
|
||||
|
||||
export class Executor {
|
||||
|
||||
constructor(readonly target: Target) {
|
||||
target.on('AUTH', NetworkContext.onAuth)
|
||||
target.on('OK', NetworkContext.onOk)
|
||||
}
|
||||
|
||||
subscribe(filters: Filter[], {onEvent, onEose}: SubscribeOpts = {}) {
|
||||
let closed = false
|
||||
|
||||
const id = createSubId('REQ')
|
||||
|
||||
const eventListener = (url: string, subid: string, e: Event) => {
|
||||
if (subid === id) {
|
||||
NetworkContext.onEvent(url, e)
|
||||
onEvent?.(url, e)
|
||||
}
|
||||
}
|
||||
|
||||
const eoseListener = (url: string, subid: string) => {
|
||||
if (subid === id) {
|
||||
onEose?.(url)
|
||||
}
|
||||
}
|
||||
|
||||
this.target.on('EVENT', eventListener)
|
||||
this.target.on('EOSE', eoseListener)
|
||||
this.target.send("REQ", id, ...filters)
|
||||
|
||||
return {
|
||||
unsubscribe: () => {
|
||||
if (!closed) {
|
||||
this.target.send("CLOSE", id)
|
||||
this.target.off('EVENT', eventListener)
|
||||
this.target.off('EOSE', eoseListener)
|
||||
}
|
||||
|
||||
closed = true
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
publish(event: Event, {verb = 'EVENT', onOk, onError}: PublishOpts = {}) {
|
||||
const okListener = (url: string, id: string, ...payload: any[]) => {
|
||||
if (id === event.id) {
|
||||
NetworkContext.onEvent(url, event)
|
||||
onOk?.(url, id, ...payload)
|
||||
}
|
||||
}
|
||||
|
||||
const errorListener = (url: string, id: string, ...payload: any[]) => {
|
||||
if (id === event.id) {
|
||||
onError?.(url, id, ...payload)
|
||||
}
|
||||
}
|
||||
|
||||
this.target.on('OK', okListener)
|
||||
this.target.on('ERROR', errorListener)
|
||||
this.target.send(verb, event)
|
||||
|
||||
return {
|
||||
unsubscribe: () => {
|
||||
this.target.off('OK', okListener)
|
||||
this.target.off('ERROR', errorListener)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
import {Emitter} from '@welshman/lib'
|
||||
import {Connection} from "./Connection"
|
||||
|
||||
export class Pool extends Emitter {
|
||||
data: Map<string, Connection>
|
||||
constructor() {
|
||||
super()
|
||||
|
||||
this.data = new Map()
|
||||
}
|
||||
has(url: string) {
|
||||
return this.data.has(url)
|
||||
}
|
||||
get(url: string, {autoConnect = true, reconnectAfter = 3000} = {}): Connection {
|
||||
let connection = this.data.get(url)
|
||||
|
||||
if (autoConnect) {
|
||||
if (!connection) {
|
||||
connection = new Connection(url)
|
||||
|
||||
this.data.set(url, connection)
|
||||
this.emit('init', connection)
|
||||
|
||||
connection.on('open', () => this.emit('open', connection))
|
||||
connection.on('close', () => this.emit('close', connection))
|
||||
}
|
||||
|
||||
connection.ensureConnected({
|
||||
shouldReconnect: connection.meta.lastClose < Date.now() - reconnectAfter,
|
||||
})
|
||||
}
|
||||
|
||||
return connection!
|
||||
}
|
||||
remove(url: string) {
|
||||
const connection = this.data.get(url)
|
||||
|
||||
if (connection) {
|
||||
connection.destroy()
|
||||
|
||||
this.data.delete(url)
|
||||
}
|
||||
}
|
||||
clear() {
|
||||
for (const url of this.data.keys()) {
|
||||
this.remove(url)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
import type {Event} from 'nostr-tools'
|
||||
import {Emitter, now, randomId, defer} from '@welshman/lib'
|
||||
import type {Deferred} from '@welshman/lib'
|
||||
import {asEvent,} from '@welshman/util'
|
||||
import {NetworkContext} from './Context'
|
||||
|
||||
export enum PublishStatus {
|
||||
Pending = "pending",
|
||||
Success = "success",
|
||||
Failure = "failure",
|
||||
Timeout = "timeout",
|
||||
}
|
||||
|
||||
export type PublishStatusMap = Map<string, PublishStatus>
|
||||
|
||||
export type PublishRequest = {
|
||||
event: Event
|
||||
relays: string[]
|
||||
timeout?: number
|
||||
verb?: "EVENT" | "AUTH"
|
||||
}
|
||||
|
||||
export type Publish = {
|
||||
id: string
|
||||
created_at: number
|
||||
emitter: Emitter
|
||||
request: PublishRequest
|
||||
status: PublishStatusMap
|
||||
result: Deferred<PublishStatusMap>
|
||||
}
|
||||
|
||||
export const makePublish = (request: PublishRequest) => {
|
||||
const id = randomId()
|
||||
const created_at = now()
|
||||
const emitter = new Emitter()
|
||||
const result: Publish['result'] = defer()
|
||||
const status: Publish['status'] = new Map()
|
||||
|
||||
return {id, created_at, request, emitter, result, status}
|
||||
}
|
||||
|
||||
export const publish = (request: PublishRequest) => {
|
||||
const pub = makePublish(request)
|
||||
const event = asEvent(request.event)
|
||||
const executor = NetworkContext.getExecutor(request.relays)
|
||||
|
||||
// Listen to updates and keep status up to date. Every time there's an update, check to
|
||||
// see if we're done. If we are, clear our timeout, executor, etc.
|
||||
pub.emitter.on("*", (status: PublishStatus, url: string) => {
|
||||
pub.status.set(url, status)
|
||||
|
||||
if (Array.from(pub.status.values()).every((s: PublishStatus) => s !== PublishStatus.Pending)) {
|
||||
clearTimeout(timeout)
|
||||
executorSub.unsubscribe()
|
||||
executor.target.cleanup()
|
||||
pub.result.resolve(pub.status)
|
||||
}
|
||||
})
|
||||
|
||||
// Start everything off as pending
|
||||
setTimeout(() => {
|
||||
for (const relay of request.relays) {
|
||||
pub.emitter.emit(PublishStatus.Pending, relay)
|
||||
}
|
||||
})
|
||||
|
||||
// Set a timeout
|
||||
const timeout = setTimeout(() => {
|
||||
for (const [url, status] of pub.status.entries()) {
|
||||
if (status === PublishStatus.Pending) {
|
||||
pub.emitter.emit(PublishStatus.Timeout, url)
|
||||
}
|
||||
}
|
||||
}, request.timeout || 10_000)
|
||||
|
||||
// Delegate to our executor
|
||||
const executorSub = executor.publish(event, {
|
||||
verb: request.verb || "EVENT",
|
||||
onOk: (url: string, eventId: string, ok: boolean) => {
|
||||
if (ok) {
|
||||
pub.emitter.emit(PublishStatus.Success, url)
|
||||
} else {
|
||||
pub.emitter.emit(PublishStatus.Failure, url)
|
||||
}
|
||||
},
|
||||
onError: (url: string) => {
|
||||
pub.emitter.emit(PublishStatus.Failure, url)
|
||||
},
|
||||
})
|
||||
|
||||
return pub
|
||||
}
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
# @welshman/net [](https://npmjs.com/package/@welshman/net)
|
||||
|
||||
Utilities having to do with connection management and nostr messages.
|
||||
|
||||
- `Connection` - a wrapper for `Socket` with send and receive queues, and a `ConnectionMeta` instance.
|
||||
- `ConnectionMeta` - tracks stats for a given `Connection`.
|
||||
- `Context` - an object containing a default `Pool` and global configuration options.
|
||||
- `Executor` - implements common nostr flows on a given `target`
|
||||
- `Pool` - a thin wrapper around `Map` which stores `Connection`s.
|
||||
- `Publish` - utilities for publishing events.
|
||||
- `Socket` - a wrapper around isomorphic-ws that handles json parsing/serialization.
|
||||
- `Subscribe` - utilities for making requests against nostr relays.
|
||||
- `Tracker` - tracks which relays a given event was seen on.
|
||||
|
||||
Executor `target`s extend `Emitter`, and have a `send` method, a `cleanup` method, and a `connections` getter. They are intended to be passed to an `Executor` for use.
|
||||
|
||||
- `targets/Multi` allows you to compose multiple targets together.
|
||||
- `targets/Plex` takes an array of urls and a `Connection` and sends and receives wrapped nostr messages over that connection.
|
||||
- `targets/Relay` takes a `Connection` and provides listeners for different verbs.
|
||||
- `targets/Relays` takes an array of `Connection`s and provides listeners for different verbs, merging all events into a single stream.
|
||||
@@ -0,0 +1,116 @@
|
||||
import WebSocket from "isomorphic-ws"
|
||||
import {Deferred, defer} from '@welshman/lib'
|
||||
import type {Message} from '@welshman/util'
|
||||
|
||||
export type PlexMessage = [{relays: string[]}, Message]
|
||||
|
||||
export type SocketMessage = Message | PlexMessage
|
||||
|
||||
export const isMessage = (m: SocketMessage): boolean => typeof m[0] === 'string'
|
||||
|
||||
export const asMessage = (m: SocketMessage): Message => isMessage(m) ? m : m[1]
|
||||
|
||||
export type SocketOpts = {
|
||||
onOpen: () => void
|
||||
onClose: () => void
|
||||
onError: () => void
|
||||
onMessage: (message: SocketMessage) => void
|
||||
}
|
||||
|
||||
export class Socket {
|
||||
url: string
|
||||
ws?: WebSocket
|
||||
ready: Deferred<boolean>
|
||||
failedToConnect = false
|
||||
|
||||
constructor(url: string, readonly opts: SocketOpts) {
|
||||
this.url = url
|
||||
this.ready = defer()
|
||||
}
|
||||
|
||||
isPending() {
|
||||
return !this.ws && !this.failedToConnect
|
||||
}
|
||||
|
||||
isConnecting() {
|
||||
return this.ws?.readyState === WebSocket.CONNECTING
|
||||
}
|
||||
|
||||
isReady() {
|
||||
return this.ws?.readyState === WebSocket.OPEN
|
||||
}
|
||||
|
||||
isClosing() {
|
||||
return this.ws?.readyState === WebSocket.CLOSING
|
||||
}
|
||||
|
||||
isClosed() {
|
||||
return this.ws?.readyState === WebSocket.CLOSED
|
||||
}
|
||||
|
||||
isHealthy() {
|
||||
return this.isPending() || this.isConnecting() || this.isReady()
|
||||
}
|
||||
|
||||
onOpen = () => {
|
||||
this.ready.resolve(true)
|
||||
this.opts.onOpen()
|
||||
}
|
||||
|
||||
onError = () => {
|
||||
this.opts.onError()
|
||||
this.disconnect()
|
||||
}
|
||||
|
||||
onMessage = (event: {data: string}) => {
|
||||
try {
|
||||
const message = JSON.parse(event.data as string)
|
||||
|
||||
if (Array.isArray(message)) {
|
||||
this.opts.onMessage(message as Message)
|
||||
} else {
|
||||
console.warn("Invalid messages received:", message)
|
||||
}
|
||||
} catch (e) {
|
||||
// pass
|
||||
}
|
||||
}
|
||||
|
||||
send = (message: any) => {
|
||||
if (!this.ws) {
|
||||
throw new Error('Send attempted before socket was opened')
|
||||
}
|
||||
|
||||
this.ws.send(JSON.stringify(message))
|
||||
}
|
||||
|
||||
connect = () => {
|
||||
if (this.ws) {
|
||||
throw new Error(`Already attempted connection for ${this.url}`)
|
||||
}
|
||||
|
||||
try {
|
||||
this.ws = new WebSocket(this.url)
|
||||
this.ws.onopen = this.onOpen
|
||||
this.ws.onerror = this.onError
|
||||
this.ws.onmessage = this.onMessage
|
||||
this.ws.onclose = this.disconnect
|
||||
} catch (e) {
|
||||
this.failedToConnect = true
|
||||
}
|
||||
}
|
||||
|
||||
disconnect = () => {
|
||||
if (this.ws) {
|
||||
const currentWs = this.ws
|
||||
|
||||
this.ready.then(() => currentWs.close())
|
||||
this.ready = defer()
|
||||
this.opts.onClose()
|
||||
this.ws = undefined
|
||||
|
||||
// Resolve a different instance of the promise
|
||||
this.ready.resolve(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,268 @@
|
||||
import type {Event} from 'nostr-tools'
|
||||
import {Emitter, randomId, groupBy, batch, defer, uniq, uniqBy} from '@welshman/lib'
|
||||
import type {Deferred} from '@welshman/lib'
|
||||
import {matchFilters, mergeFilters} from '@welshman/util'
|
||||
import type {Filter} from '@welshman/util'
|
||||
import {Tracker} from "./Tracker"
|
||||
import {Connection} from './Connection'
|
||||
import {NetworkContext} from './Context'
|
||||
|
||||
// `subscribe` is a super function that handles batching subscriptions by merging
|
||||
// them based on parameters (filters and subscribe opts), then splits them by relay.
|
||||
// This results in fewer REQs being opened per connection, fewer duplicate events
|
||||
// being downloaded, and therefore less signature validation.
|
||||
//
|
||||
// Behavior can be further configured using NetworkContext. This can be useful for
|
||||
// adding support for querying a local cache like a relay, tracking deleted events,
|
||||
// and bypassing validation for trusted relays.
|
||||
//
|
||||
// Urls that any given event was seen on are tracked using subscription request's `tracker`
|
||||
// property. These are merged across all subscription requests, so it is possible that an
|
||||
// event may be seen on more relays that were actually requested, in the case of overlapping
|
||||
// subscriptions.
|
||||
|
||||
export enum SubscriptionEvent {
|
||||
Eose = "eose",
|
||||
Close = "close",
|
||||
Event = "event",
|
||||
Abort = "abort",
|
||||
Complete = "complete",
|
||||
Duplicate = "duplicate",
|
||||
DeletedEvent = "deleted-event",
|
||||
FailedFilter = "failed-filter",
|
||||
InvalidSignature = "invalid-signature",
|
||||
}
|
||||
|
||||
export type SubscribeRequest = {
|
||||
relays: string[]
|
||||
filters: Filter[]
|
||||
timeout?: number
|
||||
immediate?: boolean
|
||||
closeOnEose?: boolean
|
||||
}
|
||||
|
||||
export type Subscription = {
|
||||
id: string
|
||||
emitter: Emitter
|
||||
tracker: Tracker
|
||||
result: Deferred<Event[]>
|
||||
request: SubscribeRequest
|
||||
close: () => void
|
||||
}
|
||||
|
||||
export const makeSubscription = (request: SubscribeRequest) => {
|
||||
const id = randomId()
|
||||
const emitter = new Emitter()
|
||||
const tracker = new Tracker()
|
||||
const result = defer<Event[]>()
|
||||
const close = () => emitter.emit('abort')
|
||||
|
||||
emitter.setMaxListeners(100)
|
||||
|
||||
return {id, request, emitter, tracker, result, close}
|
||||
}
|
||||
|
||||
export const calculateSubscriptionGroup = (sub: Subscription) => {
|
||||
const parts: string[] = []
|
||||
|
||||
if (sub.request.timeout) parts.push(`timeout:${sub.request.timeout}`)
|
||||
if (sub.request.closeOnEose) parts.push('closeOnEose')
|
||||
|
||||
return parts.join('|')
|
||||
}
|
||||
|
||||
export const mergeSubscriptions = (subs: Subscription[]) => {
|
||||
const completedRelays = new Set()
|
||||
const mergedSubscriptions = []
|
||||
|
||||
for (const group of Object.values(groupBy(calculateSubscriptionGroup, subs))) {
|
||||
for (const relay of uniq(group.flatMap((sub: Subscription) => sub.request.relays))) {
|
||||
const abortedSubs = new Set()
|
||||
const callerSubs = group.filter((sub: Subscription) => sub.request.relays.includes(relay))
|
||||
const mergedSub = makeSubscription({
|
||||
relays: [relay],
|
||||
timeout: callerSubs[0].request.timeout,
|
||||
filters: mergeFilters(callerSubs.flatMap((sub: Subscription) => sub.request.filters)),
|
||||
})
|
||||
|
||||
for (const {id, emitter} of callerSubs) {
|
||||
// Propagate abort event from the caller to the merged subscription
|
||||
emitter.on(SubscriptionEvent.Abort, () => {
|
||||
abortedSubs.add(id)
|
||||
|
||||
if (abortedSubs.size === callerSubs.length) {
|
||||
mergedSub.close()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: Event) => {
|
||||
for (const sub of callerSubs) {
|
||||
if (sub.tracker.track(event.id, url)) {
|
||||
continue
|
||||
}
|
||||
|
||||
if (!matchFilters(sub.request.filters, event)) {
|
||||
continue
|
||||
}
|
||||
|
||||
sub.emitter.emit(SubscriptionEvent.Event, url, event)
|
||||
}
|
||||
})
|
||||
|
||||
// Pass events back to caller
|
||||
const propagateEvent = (type: SubscriptionEvent, checkFilter: boolean) =>
|
||||
mergedSub.emitter.on(type, (url: string, event: Event) => {
|
||||
for (const sub of callerSubs) {
|
||||
if (!checkFilter || matchFilters(sub.request.filters, event)) {
|
||||
sub.emitter.emit(type, url, event)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
propagateEvent(SubscriptionEvent.Duplicate, true)
|
||||
propagateEvent(SubscriptionEvent.DeletedEvent, false)
|
||||
propagateEvent(SubscriptionEvent.FailedFilter, false)
|
||||
propagateEvent(SubscriptionEvent.InvalidSignature, true)
|
||||
|
||||
// Propagate eose
|
||||
mergedSub.emitter.on(SubscriptionEvent.Eose, (url: string) => {
|
||||
for (const sub of callerSubs) {
|
||||
sub.emitter.emit(SubscriptionEvent.Eose, url)
|
||||
}
|
||||
})
|
||||
|
||||
// Propagate close
|
||||
mergedSub.emitter.on(SubscriptionEvent.Close, (url: string) => {
|
||||
for (const sub of callerSubs) {
|
||||
sub.emitter.emit(SubscriptionEvent.Close, url)
|
||||
}
|
||||
})
|
||||
|
||||
// Propagate subscription completion. Since we split subs by relay, we need to wait
|
||||
// until all relays are completed before we notify
|
||||
mergedSub.emitter.on(SubscriptionEvent.Complete, () => {
|
||||
completedRelays.add(relay)
|
||||
|
||||
for (const sub of callerSubs) {
|
||||
if (sub.request.relays.every(url => completedRelays.has(url))) {
|
||||
sub.emitter.emit(SubscriptionEvent.Complete)
|
||||
}
|
||||
}
|
||||
|
||||
mergedSub.emitter.removeAllListeners()
|
||||
})
|
||||
|
||||
// Propagate promise resolution
|
||||
mergedSub.result.then((events: Event[]) => {
|
||||
events = uniqBy((event: Event) => event.id, events)
|
||||
|
||||
for (const sub of callerSubs) {
|
||||
sub.result.resolve(events.filter((e: Event) => matchFilters(sub.request.filters, e)))
|
||||
}
|
||||
})
|
||||
|
||||
mergedSubscriptions.push(mergedSub)
|
||||
}
|
||||
}
|
||||
|
||||
return mergedSubscriptions
|
||||
}
|
||||
|
||||
export const executeSubscription = (sub: Subscription) => {
|
||||
const {result, request, emitter, tracker} = sub
|
||||
const {timeout, filters, closeOnEose, relays} = request
|
||||
const executor = NetworkContext.getExecutor(relays)
|
||||
const events: Event[] = []
|
||||
|
||||
const completedRelays = new Set()
|
||||
let completed: number
|
||||
|
||||
const complete = () => {
|
||||
if (completed) return
|
||||
|
||||
// Mark as cleaned upp, unsubscribe our executor
|
||||
completed = Date.now()
|
||||
executorSub.unsubscribe()
|
||||
|
||||
// Resolve our promise
|
||||
result.resolve(events)
|
||||
|
||||
// Notify caller, clean up our event emitter
|
||||
emitter.emit(SubscriptionEvent.Complete)
|
||||
emitter.removeAllListeners()
|
||||
|
||||
// Remove our onClose handler from connections, since they are shared by many subs
|
||||
executor.target.connections.forEach((c: Connection) => c.off("close", onClose))
|
||||
executor.target.cleanup()
|
||||
}
|
||||
|
||||
const onEvent = (url: string, event: Event) => {
|
||||
// Check the signature and filters. If we've seen this event, don't re-validate.
|
||||
if (tracker.track(event.id, url)) {
|
||||
emitter.emit(SubscriptionEvent.Duplicate, url, event)
|
||||
} else if (NetworkContext.isDeleted(url, event)) {
|
||||
emitter.emit(SubscriptionEvent.DeletedEvent, url, event)
|
||||
} else if (!NetworkContext.matchFilters(url, filters, event)) {
|
||||
emitter.emit(SubscriptionEvent.FailedFilter, url, event)
|
||||
} else if (!NetworkContext.hasValidSignature(url, event)) {
|
||||
emitter.emit(SubscriptionEvent.InvalidSignature, url, event)
|
||||
} else {
|
||||
emitter.emit(SubscriptionEvent.Event, url, event)
|
||||
events.push(event)
|
||||
}
|
||||
}
|
||||
|
||||
const onEose = (url: string) => {
|
||||
completedRelays.add(url)
|
||||
|
||||
emitter.emit(SubscriptionEvent.Eose, url)
|
||||
|
||||
if (closeOnEose && completedRelays.size === executor.target.connections.length) {
|
||||
complete()
|
||||
}
|
||||
}
|
||||
|
||||
const onClose = (connection: Connection) => {
|
||||
completedRelays.add(connection.url)
|
||||
|
||||
emitter.emit(SubscriptionEvent.Close, connection.url)
|
||||
|
||||
if (completedRelays.size === executor.target.connections.length) {
|
||||
complete()
|
||||
}
|
||||
}
|
||||
|
||||
// Allow the caller to cancel the subscription
|
||||
emitter.on(SubscriptionEvent.Abort, complete)
|
||||
|
||||
// If we have a timeout, complete the subscription automatically
|
||||
if (timeout) setTimeout(complete, timeout)
|
||||
|
||||
// If one of our connections gets closed make sure to kill our sub
|
||||
executor.target.connections.forEach((c: Connection) => c.on('close', onClose))
|
||||
|
||||
// Finally, start our subscription
|
||||
const executorSub = executor.subscribe(filters, {onEvent, onEose})
|
||||
}
|
||||
|
||||
export const executeSubscriptions = (subs: Subscription[]) =>
|
||||
mergeSubscriptions(subs).forEach(executeSubscription)
|
||||
|
||||
export const executeSubscriptionBatched = batch(800, executeSubscriptions)
|
||||
|
||||
export const subscribe = (request: SubscribeRequest) => {
|
||||
const subscription: Subscription = makeSubscription(request)
|
||||
|
||||
if (request.filters.length === 0) {
|
||||
throw new Error("Zero filters passed to subscribe")
|
||||
}
|
||||
|
||||
if (request.immediate) {
|
||||
executeSubscription(subscription)
|
||||
} else {
|
||||
executeSubscriptionBatched(subscription)
|
||||
}
|
||||
|
||||
return subscription
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
import {writable} from '@welshman/lib'
|
||||
|
||||
export class Tracker {
|
||||
data = writable(new Map<string, Set<string>>())
|
||||
|
||||
getRelays = (eventId: string) => {
|
||||
const relays = new Set<string>()
|
||||
|
||||
for (const relay of this.data.get().get(eventId) || []) {
|
||||
relays.add(relay)
|
||||
}
|
||||
|
||||
return relays
|
||||
}
|
||||
|
||||
hasRelay = (eventId: string, relay: string) => {
|
||||
return this.getRelays(eventId).has(relay)
|
||||
}
|
||||
|
||||
addRelay = (eventId: string, relay: string) => {
|
||||
const relays = this.data.get().get(eventId) || new Set()
|
||||
|
||||
relays.add(relay)
|
||||
|
||||
this.data.update(m => {
|
||||
m.set(eventId, relays)
|
||||
|
||||
return m
|
||||
})
|
||||
}
|
||||
|
||||
track = (eventId: string, relay: string) => {
|
||||
const seen = this.data.get().has(eventId)
|
||||
|
||||
this.addRelay(eventId, relay)
|
||||
|
||||
return seen
|
||||
}
|
||||
|
||||
copy = (eventId1: string, eventId2: string) => {
|
||||
for (const relay of this.getRelays(eventId1)) {
|
||||
this.addRelay(eventId2, relay)
|
||||
}
|
||||
}
|
||||
|
||||
clear = () => {
|
||||
this.data.update(m => {
|
||||
m.clear()
|
||||
|
||||
return m
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
export * from "./Connection"
|
||||
export * from "./ConnectionMeta"
|
||||
export * from "./Context"
|
||||
export * from "./Executor"
|
||||
export * from "./Pool"
|
||||
export * from "./Publish"
|
||||
export * from "./Socket"
|
||||
export * from "./Subscribe"
|
||||
export * from "./Tracker"
|
||||
export * from "./target/Multi"
|
||||
export * from "./target/Plex"
|
||||
export * from "./target/Relay"
|
||||
export * from "./target/Relays"
|
||||
@@ -0,0 +1,40 @@
|
||||
{
|
||||
"name": "@welshman/net",
|
||||
"version": "0.0.1",
|
||||
"author": "hodlbod",
|
||||
"license": "MIT",
|
||||
"description": "Utilities for connecting with nostr relays.",
|
||||
"publishConfig": {
|
||||
"access": "public"
|
||||
},
|
||||
"type": "module",
|
||||
"files": [
|
||||
"build"
|
||||
],
|
||||
"types": "./build/index.d.ts",
|
||||
"exports": {
|
||||
".": {
|
||||
"types": "./build/index.d.ts",
|
||||
"import": "./build/index.mjs",
|
||||
"require": "./build/index.cjs"
|
||||
}
|
||||
},
|
||||
"scripts": {
|
||||
"pub": "npm run lint && npm run build && npm publish",
|
||||
"build": "gts clean && tsc-multi",
|
||||
"lint": "gts lint",
|
||||
"fix": "gts fix"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/events": "^3.0.3",
|
||||
"gts": "^5.0.1",
|
||||
"tsc-multi": "^1.1.0",
|
||||
"typescript": "~5.1.6"
|
||||
},
|
||||
"dependencies": {
|
||||
"@welshman/lib": "0.0.1",
|
||||
"@welshman/util": "0.0.1",
|
||||
"isomorphic-ws": "^5.0.0",
|
||||
"ws": "^8.16.0"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
import {Emitter} from '@welshman/lib'
|
||||
import type {Message} from '@welshman/util'
|
||||
import type {Target} from '../Executor'
|
||||
|
||||
export class Multi extends Emitter {
|
||||
constructor(readonly targets: Target[]) {
|
||||
super()
|
||||
|
||||
targets.forEach(t => {
|
||||
t.on('*', (verb, ...args) => this.emit(verb, ...args))
|
||||
})
|
||||
}
|
||||
|
||||
get connections() {
|
||||
return this.targets.flatMap(t => t.connections)
|
||||
}
|
||||
|
||||
send(...payload: Message) {
|
||||
this.targets.forEach(t => t.send(...payload))
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.targets.forEach(t => t.cleanup())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
import {Emitter} from '@welshman/lib'
|
||||
import type {Message} from '@welshman/util'
|
||||
import type {PlexMessage} from '../Socket'
|
||||
import type {Connection} from '../Connection'
|
||||
|
||||
export class Plex extends Emitter {
|
||||
constructor(readonly urls: string[], readonly connection: Connection) {
|
||||
super()
|
||||
|
||||
this.connection.on('receive', this.onMessage)
|
||||
}
|
||||
|
||||
get connections() {
|
||||
return [this.connection]
|
||||
}
|
||||
|
||||
send = (...payload: Message) => {
|
||||
this.connection.send([{relays: this.urls}, payload])
|
||||
}
|
||||
|
||||
onMessage = (connection: Connection, [{relays}, [verb, ...payload]]: PlexMessage) => {
|
||||
this.emit(verb, relays[0], ...payload)
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.connection.off('receive', this.onMessage)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
import {Emitter} from '@welshman/lib'
|
||||
import type {Message} from '@welshman/util'
|
||||
import type {Connection} from '../Connection'
|
||||
|
||||
export class Relay extends Emitter {
|
||||
constructor(readonly connection: Connection) {
|
||||
super()
|
||||
|
||||
this.connection.on('receive', this.onMessage)
|
||||
}
|
||||
|
||||
get connections() {
|
||||
return [this.connection]
|
||||
}
|
||||
|
||||
send(...payload: Message) {
|
||||
this.connection.send(payload)
|
||||
}
|
||||
|
||||
onMessage = (connection: Connection, [verb, ...payload]: Message) => {
|
||||
this.emit(verb, connection.url, ...payload)
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.connection.off('receive', this.onMessage)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
import {Emitter} from '@welshman/lib'
|
||||
import type {Message} from '@welshman/util'
|
||||
import type {Connection} from '../Connection'
|
||||
|
||||
export class Relays extends Emitter {
|
||||
constructor(readonly connections: Connection[]) {
|
||||
super()
|
||||
|
||||
connections.forEach(connection => {
|
||||
connection.on('receive', this.onMessage)
|
||||
})
|
||||
}
|
||||
|
||||
send = (...payload: Message) => {
|
||||
this.connections.forEach(connection => {
|
||||
connection.send(payload)
|
||||
})
|
||||
}
|
||||
|
||||
onMessage = (connection: Connection, [verb, ...payload]: Message) => {
|
||||
this.emit(verb, connection.url, ...payload)
|
||||
}
|
||||
|
||||
cleanup = () => {
|
||||
this.removeAllListeners()
|
||||
this.connections.forEach(connection => {
|
||||
connection.off('receive', this.onMessage)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"targets": [
|
||||
{"extname": ".cjs", "module": "commonjs"},
|
||||
{"extname": ".mjs", "module": "esnext", "moduleResolution": "node"}
|
||||
],
|
||||
"projects": ["tsconfig.json"]
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"extends": "../../node_modules/gts/tsconfig-google.json",
|
||||
"compilerOptions": {
|
||||
"rootDir": ".",
|
||||
"outDir": "build",
|
||||
"esModuleInterop": true,
|
||||
"skipLibCheck": true,
|
||||
"lib": ["esnext", "dom"]
|
||||
},
|
||||
"include": ["**/*.ts"]
|
||||
}
|
||||
Reference in New Issue
Block a user