Use default pool, make adapter context optional

This commit is contained in:
Jon Staab
2025-03-31 09:35:48 -07:00
parent 51dd44161a
commit 4237b145ae
17 changed files with 293 additions and 290 deletions
+10
View File
@@ -19,10 +19,20 @@ export type PoolOptions = {
makeSocket?: (url: string) => Socket
}
export let poolSingleton: Pool
export class Pool {
_data = new Map<string, Socket>()
_subs: PoolSubscription[] = []
static getSingleton() {
if (!poolSingleton) {
poolSingleton = new Pool()
}
return poolSingleton
}
constructor(readonly options: PoolOptions = {}) {}
has(url: string) {
+29 -29
View File
@@ -2,7 +2,7 @@ import {EventEmitter} from "events"
import {on, fromPairs, sleep, yieldThread} from "@welshman/lib"
import {SignedEvent} from "@welshman/util"
import {RelayMessage, ClientMessageType, isRelayOk} from "./message.js"
import {AbstractAdapter, AdapterEventType, AdapterContext, getAdapter} from "./adapter.js"
import {AbstractAdapter, AdapterEvent, AdapterContext, getAdapter} from "./adapter.js"
import {TypedEmitter} from "./util.js"
export enum PublishStatus {
@@ -13,7 +13,7 @@ export enum PublishStatus {
Aborted = "publish:status:aborted",
}
export enum PublishEventType {
export enum PublishEvent {
Success = "publish:event:success",
Failure = "publish:event:failure",
Timeout = "publish:event:timeout",
@@ -24,17 +24,17 @@ export enum PublishEventType {
// Unicast
export type UnicastEvents = {
[PublishEventType.Success]: (id: string, detail: string) => void
[PublishEventType.Failure]: (id: string, detail: string) => void
[PublishEventType.Timeout]: () => void
[PublishEventType.Aborted]: () => void
[PublishEventType.Complete]: () => void
[PublishEvent.Success]: (id: string, detail: string) => void
[PublishEvent.Failure]: (id: string, detail: string) => void
[PublishEvent.Timeout]: () => void
[PublishEvent.Aborted]: () => void
[PublishEvent.Complete]: () => void
}
export type UnicastOptions = {
event: SignedEvent
relay: string
context: AdapterContext
context?: AdapterContext
timeout?: number
}
@@ -53,7 +53,7 @@ export class Unicast extends (EventEmitter as new () => TypedEmitter<UnicastEven
// Listen for Unicast result
this._unsubscriber = on(
this._adapter,
AdapterEventType.Receive,
AdapterEvent.Receive,
(message: RelayMessage, url: string) => {
if (isRelayOk(message)) {
const [_, id, ok, detail] = message
@@ -62,10 +62,10 @@ export class Unicast extends (EventEmitter as new () => TypedEmitter<UnicastEven
if (ok) {
this.status = PublishStatus.Success
this.emit(PublishEventType.Success, id, detail)
this.emit(PublishEvent.Success, id, detail)
} else {
this.status = PublishStatus.Failure
this.emit(PublishEventType.Failure, id, detail)
this.emit(PublishEvent.Failure, id, detail)
}
this.cleanup()
@@ -77,7 +77,7 @@ export class Unicast extends (EventEmitter as new () => TypedEmitter<UnicastEven
sleep(this.options.timeout || 10_000).then(() => {
if (this.status === PublishStatus.Pending) {
this.status = PublishStatus.Timeout
this.emit(PublishEventType.Timeout)
this.emit(PublishEvent.Timeout)
}
this.cleanup()
@@ -92,13 +92,13 @@ export class Unicast extends (EventEmitter as new () => TypedEmitter<UnicastEven
abort = () => {
if (this.status === PublishStatus.Pending) {
this.status = PublishStatus.Aborted
this.emit(PublishEventType.Aborted)
this.emit(PublishEvent.Aborted)
this.cleanup()
}
}
cleanup = () => {
this.emit(PublishEventType.Complete)
this.emit(PublishEvent.Complete)
this.removeAllListeners()
this._adapter.cleanup()
this._unsubscriber()
@@ -108,11 +108,11 @@ export class Unicast extends (EventEmitter as new () => TypedEmitter<UnicastEven
// Multicast
export type MulticastEvents = {
[PublishEventType.Success]: (id: string, detail: string, url: string) => void
[PublishEventType.Failure]: (id: string, detail: string, url: string) => void
[PublishEventType.Timeout]: (url: string) => void
[PublishEventType.Aborted]: (url: string) => void
[PublishEventType.Complete]: () => void
[PublishEvent.Success]: (id: string, detail: string, url: string) => void
[PublishEvent.Failure]: (id: string, detail: string, url: string) => void
[PublishEvent.Timeout]: (url: string) => void
[PublishEvent.Aborted]: (url: string) => void
[PublishEvent.Complete]: () => void
}
export type MulticastOptions = Omit<UnicastOptions, "relay"> & {
@@ -133,32 +133,32 @@ export class Multicast extends (EventEmitter as new () => TypedEmitter<Multicast
for (const relay of relays) {
const unicast = new Unicast({relay, ...options})
unicast.on(PublishEventType.Success, (id: string, detail: string) => {
unicast.on(PublishEvent.Success, (id: string, detail: string) => {
this.status[relay] = unicast.status
this.emit(PublishEventType.Success, id, detail, relay)
this.emit(PublishEvent.Success, id, detail, relay)
})
unicast.on(PublishEventType.Failure, (id: string, detail: string) => {
unicast.on(PublishEvent.Failure, (id: string, detail: string) => {
this.status[relay] = unicast.status
this.emit(PublishEventType.Failure, id, detail, relay)
this.emit(PublishEvent.Failure, id, detail, relay)
})
unicast.on(PublishEventType.Timeout, () => {
unicast.on(PublishEvent.Timeout, () => {
this.status[relay] = unicast.status
this.emit(PublishEventType.Timeout, relay)
this.emit(PublishEvent.Timeout, relay)
})
unicast.on(PublishEventType.Aborted, () => {
unicast.on(PublishEvent.Aborted, () => {
this.status[relay] = unicast.status
this.emit(PublishEventType.Aborted, relay)
this.emit(PublishEvent.Aborted, relay)
})
unicast.on(PublishEventType.Complete, () => {
unicast.on(PublishEvent.Complete, () => {
this._completed.add(relay)
this.status[relay] = unicast.status
if (this._completed.size === relays.length) {
this.emit(PublishEventType.Complete)
this.emit(PublishEvent.Complete)
this.cleanup()
}
})
+17 -17
View File
@@ -13,7 +13,7 @@ export enum SocketStatus {
Invalid = "socket:status:invalid",
}
export enum SocketEventType {
export enum SocketEvent {
Error = "socket:event:error",
Status = "socket:event:status",
Send = "socket:event:send",
@@ -22,11 +22,11 @@ export enum SocketEventType {
}
export type SocketEvents = {
[SocketEventType.Error]: (error: string, url: string) => void
[SocketEventType.Status]: (status: SocketStatus, url: string) => void
[SocketEventType.Send]: (message: ClientMessage, url: string) => void
[SocketEventType.Enqueue]: (message: ClientMessage, url: string) => void
[SocketEventType.Receive]: (message: RelayMessage, url: string) => void
[SocketEvent.Error]: (error: string, url: string) => void
[SocketEvent.Status]: (status: SocketStatus, url: string) => void
[SocketEvent.Send]: (message: ClientMessage, url: string) => void
[SocketEvent.Enqueue]: (message: ClientMessage, url: string) => void
[SocketEvent.Receive]: (message: RelayMessage, url: string) => void
}
export class Socket extends (EventEmitter as new () => TypedEmitter<SocketEvents>) {
@@ -43,18 +43,18 @@ export class Socket extends (EventEmitter as new () => TypedEmitter<SocketEvents
batchSize: 50,
processItem: (message: ClientMessage) => {
this._ws?.send(JSON.stringify(message))
this.emit(SocketEventType.Send, message, this.url)
this.emit(SocketEvent.Send, message, this.url)
},
})
this._recvQueue = new TaskQueue<RelayMessage>({
batchSize: 50,
processItem: (message: RelayMessage) => {
this.emit(SocketEventType.Receive, message, this.url)
this.emit(SocketEvent.Receive, message, this.url)
},
})
this.on(SocketEventType.Status, (status: SocketStatus) => {
this.on(SocketEvent.Status, (status: SocketStatus) => {
this.status = status
})
}
@@ -66,21 +66,21 @@ export class Socket extends (EventEmitter as new () => TypedEmitter<SocketEvents
try {
this._ws = new WebSocket(this.url)
this.emit(SocketEventType.Status, SocketStatus.Opening, this.url)
this.emit(SocketEvent.Status, SocketStatus.Opening, this.url)
this._ws.onopen = () => {
this.emit(SocketEventType.Status, SocketStatus.Open, this.url)
this.emit(SocketEvent.Status, SocketStatus.Open, this.url)
this._sendQueue.start()
}
this._ws.onerror = () => {
this.emit(SocketEventType.Status, SocketStatus.Error, this.url)
this.emit(SocketEvent.Status, SocketStatus.Error, this.url)
this._sendQueue.stop()
this._ws = undefined
}
this._ws.onclose = () => {
this.emit(SocketEventType.Status, SocketStatus.Closed, this.url)
this.emit(SocketEvent.Status, SocketStatus.Closed, this.url)
this._sendQueue.stop()
this._ws = undefined
}
@@ -94,14 +94,14 @@ export class Socket extends (EventEmitter as new () => TypedEmitter<SocketEvents
if (Array.isArray(message)) {
this._recvQueue.push(message as RelayMessage)
} else {
this.emit(SocketEventType.Error, "Invalid message received", this.url)
this.emit(SocketEvent.Error, "Invalid message received", this.url)
}
} catch (e) {
this.emit(SocketEventType.Error, "Invalid message received", this.url)
this.emit(SocketEvent.Error, "Invalid message received", this.url)
}
}
} catch (e) {
this.emit(SocketEventType.Status, SocketStatus.Invalid, this.url)
this.emit(SocketEvent.Status, SocketStatus.Invalid, this.url)
}
}
@@ -125,6 +125,6 @@ export class Socket extends (EventEmitter as new () => TypedEmitter<SocketEvents
send = (message: ClientMessage) => {
this._sendQueue.push(message)
this.emit(SocketEventType.Enqueue, message, this.url)
this.emit(SocketEvent.Enqueue, message, this.url)
}
}
+22 -16
View File
@@ -2,16 +2,16 @@ import EventEmitter from "events"
import {call, on} from "@welshman/lib"
import {Relay, LOCAL_RELAY_URL, isRelayUrl} from "@welshman/util"
import {RelayMessage, ClientMessage} from "./message.js"
import {Socket, SocketEventType} from "./socket.js"
import {Socket, SocketEvent} from "./socket.js"
import {TypedEmitter, Unsubscriber} from "./util.js"
import {Pool} from "./pool.js"
export enum AdapterEventType {
export enum AdapterEvent {
Receive = "adapter:event:receive",
}
export type AdapterEvents = {
[AdapterEventType.Receive]: (message: RelayMessage, url: string) => void
[AdapterEvent.Receive]: (message: RelayMessage, url: string) => void
}
export abstract class AbstractAdapter extends (EventEmitter as new () => TypedEmitter<AdapterEvents>) {
@@ -32,8 +32,8 @@ export class SocketAdapter extends AbstractAdapter {
super()
this._unsubscribers.push(
on(socket, SocketEventType.Receive, (message: RelayMessage, url: string) => {
this.emit(AdapterEventType.Receive, message, url)
on(socket, SocketEvent.Receive, (message: RelayMessage, url: string) => {
this.emit(AdapterEvent.Receive, message, url)
}),
)
}
@@ -57,7 +57,7 @@ export class LocalAdapter extends AbstractAdapter {
this._unsubscribers.push(
on(relay, "*", (...message: RelayMessage) => {
this.emit(AdapterEventType.Receive, message, LOCAL_RELAY_URL)
this.emit(AdapterEvent.Receive, message, LOCAL_RELAY_URL)
}),
)
}
@@ -77,13 +77,25 @@ export class LocalAdapter extends AbstractAdapter {
}
}
export class EmptyAdapter extends AbstractAdapter {
get sockets() {
return []
}
get urls() {
return []
}
send(message: ClientMessage) {}
}
export type AdapterContext = {
pool?: Pool
relay?: Relay
getAdapter?: (url: string, context: AdapterContext) => AbstractAdapter
}
export const getAdapter = (url: string, context: AdapterContext) => {
export const getAdapter = (url: string, context: AdapterContext = {}) => {
if (context.getAdapter) {
const adapter = context.getAdapter(url, context)
@@ -93,19 +105,13 @@ export const getAdapter = (url: string, context: AdapterContext) => {
}
if (url === LOCAL_RELAY_URL) {
if (!context.relay) {
throw new Error(`Unable to get local relay for ${url}`)
}
return new LocalAdapter(context.relay)
return context.relay ? new LocalAdapter(context.relay) : new EmptyAdapter()
}
if (isRelayUrl(url)) {
if (!context.pool) {
throw new Error(`Unable to get socket for ${url}`)
}
const pool = context.pool || Pool.getSingleton()
return new SocketAdapter(context.pool.get(url))
return new SocketAdapter(pool.get(url))
}
throw new Error(`Invalid relay url ${url}`)
+8 -8
View File
@@ -3,7 +3,7 @@ import {on, call, sleep} from "@welshman/lib"
import type {SignedEvent, StampedEvent} from "@welshman/util"
import {makeEvent, CLIENT_AUTH} from "@welshman/util"
import {isRelayAuth, isClientAuth, isRelayOk, RelayMessage} from "./message.js"
import {Socket, SocketStatus, SocketEventType} from "./socket.js"
import {Socket, SocketStatus, SocketEvent} from "./socket.js"
import {TypedEmitter, Unsubscriber} from "./util.js"
export const makeAuthEvent = (url: string, challenge: string) =>
@@ -29,12 +29,12 @@ export type AuthResult = {
reason?: string
}
export enum AuthStateEventType {
export enum AuthStateEvent {
Status = "auth:event:status",
}
export type AuthStateEvents = {
[AuthStateEventType.Status]: (status: AuthStatus) => void
[AuthStateEvent.Status]: (status: AuthStatus) => void
}
export class AuthState extends (EventEmitter as new () => TypedEmitter<AuthStateEvents>) {
@@ -48,7 +48,7 @@ export class AuthState extends (EventEmitter as new () => TypedEmitter<AuthState
super()
this._unsubscribers.push(
on(socket, SocketEventType.Receive, (message: RelayMessage) => {
on(socket, SocketEvent.Receive, (message: RelayMessage) => {
if (isRelayOk(message)) {
const [_, id, ok, details] = message
@@ -72,12 +72,12 @@ export class AuthState extends (EventEmitter as new () => TypedEmitter<AuthState
this.setStatus(AuthStatus.Requested)
}
}),
on(socket, SocketEventType.Enqueue, (message: RelayMessage) => {
on(socket, SocketEvent.Enqueue, (message: RelayMessage) => {
if (isClientAuth(message)) {
this.setStatus(AuthStatus.PendingResponse)
}
}),
on(socket, SocketEventType.Status, (status: SocketStatus) => {
on(socket, SocketEvent.Status, (status: SocketStatus) => {
if (status === SocketStatus.Closed) {
this.challenge = undefined
this.request = undefined
@@ -90,7 +90,7 @@ export class AuthState extends (EventEmitter as new () => TypedEmitter<AuthState
setStatus(status: AuthStatus) {
this.status = status
this.emit(AuthStateEventType.Status, status)
this.emit(AuthStateEvent.Status, status)
}
cleanup() {
@@ -112,7 +112,7 @@ export class AuthManager {
readonly options: AuthManagerOptions,
) {
this.state = new AuthState(socket)
this.state.on(AuthStateEventType.Status, (status: string) => {
this.state.on(AuthStateEvent.Status, (status: string) => {
if (status === AuthStatus.Requested && options.eager) {
this.respond()
}
+18 -18
View File
@@ -9,28 +9,28 @@ import {
RelayMessageType,
ClientMessageType,
} from "./message.js"
import {getAdapter, AdapterContext, AbstractAdapter, AdapterEventType} from "./adapter.js"
import {getAdapter, AdapterContext, AbstractAdapter, AdapterEvent} from "./adapter.js"
import {Negentropy, NegentropyStorageVector} from "./negentropy.js"
import {unireq, RequestEventType} from "./request.js"
import {multicast, PublishEventType} from "./publish.js"
import {unireq, RequestEvent} from "./request.js"
import {multicast, PublishEvent} from "./publish.js"
export enum DifferenceEventType {
export enum DifferenceEvent {
Message = "difference:event:message",
Error = "difference:event:error",
Close = "difference:event:close",
}
export type DifferenceEvents = {
[DifferenceEventType.Message]: (payload: {have: string[]; need: string[]}, url: string) => void
[DifferenceEventType.Error]: (error: string, url: string) => void
[DifferenceEventType.Close]: () => void
[DifferenceEvent.Message]: (payload: {have: string[]; need: string[]}, url: string) => void
[DifferenceEvent.Error]: (error: string, url: string) => void
[DifferenceEvent.Close]: () => void
}
export type DifferenceOptions = {
relay: string
filter: Filter
events: SignedEvent[]
context: AdapterContext
context?: AdapterContext
}
export class Difference extends (EventEmitter as new () => TypedEmitter<DifferenceEvents>) {
@@ -61,7 +61,7 @@ export class Difference extends (EventEmitter as new () => TypedEmitter<Differen
// Add listeners
this._unsubscriber = on(
this._adapter,
AdapterEventType.Receive,
AdapterEvent.Receive,
async (message: RelayMessage, url: string) => {
if (isRelayNegMsg(message)) {
const [_, negid, msg] = message
@@ -77,7 +77,7 @@ export class Difference extends (EventEmitter as new () => TypedEmitter<Differen
this.need.add(id)
}
this.emit(DifferenceEventType.Message, {have, need}, url)
this.emit(DifferenceEvent.Message, {have, need}, url)
if (newMsg) {
this._adapter.send([RelayMessageType.NegMsg, this._id, newMsg])
@@ -89,7 +89,7 @@ export class Difference extends (EventEmitter as new () => TypedEmitter<Differen
const [_, negid, msg] = message
if (negid === this._id) {
this.emit(DifferenceEventType.Error, msg, url)
this.emit(DifferenceEvent.Error, msg, url)
}
}
},
@@ -104,7 +104,7 @@ export class Difference extends (EventEmitter as new () => TypedEmitter<Differen
if (this._closed) return
this._adapter.send([ClientMessageType.NegClose, this._id])
this.emit(DifferenceEventType.Close)
this.emit(DifferenceEvent.Close)
this.removeAllListeners()
this._adapter.cleanup()
this._unsubscriber()
@@ -118,7 +118,7 @@ export type DiffOptions = {
relays: string[]
filters: Filter[]
events: SignedEvent[]
context: AdapterContext
context?: AdapterContext
}
export type DiffItem = {
@@ -137,12 +137,12 @@ export const diff = async ({relays, filters, ...options}: DiffOptions) => {
new Promise<DiffItem>((resolve, reject) => {
const diff = new Difference({relay, filter, ...options})
diff.on(DifferenceEventType.Close, () => {
diff.on(DifferenceEvent.Close, () => {
resolve({relay, have: diff.have, need: diff.need})
diff.close()
})
diff.on(DifferenceEventType.Error, (url, message) => {
diff.on(DifferenceEvent.Error, (url, message) => {
reject(message)
diff.close()
})
@@ -206,8 +206,8 @@ export const pull = async ({context, ...options}: PullOptions) => {
return new Promise<void>(resolve => {
const req = unireq({relay, context, filter: {ids}, autoClose: true})
req.on(RequestEventType.Close, resolve)
req.on(RequestEventType.Event, event => result.push(event))
req.on(RequestEvent.Close, resolve)
req.on(RequestEvent.Event, event => result.push(event))
})
}),
)
@@ -236,7 +236,7 @@ export const push = async ({context, events, ...options}: PushOptions) => {
if (relays) {
new Promise<void>(resolve => {
multicast({event, relays, context}).on(PublishEventType.Complete, resolve)
multicast({event, relays, context}).on(PublishEvent.Complete, resolve)
})
}
}),
+13 -13
View File
@@ -11,8 +11,8 @@ import {
isRelayOk,
isRelayClosed,
} from "./message.js"
import {Socket, SocketStatus, SocketEventType} from "./socket.js"
import {AuthState, AuthStatus, AuthStateEventType} from "./auth.js"
import {Socket, SocketStatus, SocketEvent} from "./socket.js"
import {AuthState, AuthStatus, AuthStateEvent} from "./auth.js"
/**
* Defers sending messages when a challenge has been presented and not answered yet
@@ -26,7 +26,7 @@ export const socketPolicyDeferOnAuth = (socket: Socket) => {
const unsubscribers = [
// Pause sending certain messages when we're not authenticated
on(socket, SocketEventType.Enqueue, (message: ClientMessage) => {
on(socket, SocketEvent.Enqueue, (message: ClientMessage) => {
// If we're closing a request, but it never got sent, remove both from the queue
// Otherwise, always send CLOSE
if (isClientClose(message)) {
@@ -53,7 +53,7 @@ export const socketPolicyDeferOnAuth = (socket: Socket) => {
}
}),
// Send buffered messages when we get successful auth
on(authState, AuthStateEventType.Status, (status: AuthStatus) => {
on(authState, AuthStateEvent.Status, (status: AuthStatus) => {
if (okStatuses.includes(status) && buffer.length > 0) {
for (const message of buffer.splice(0)) {
socket.send(message)
@@ -79,7 +79,7 @@ export const socketPolicyRetryAuthRequired = (socket: Socket) => {
const unsubscribers = [
// Watch outgoing events and requests and keep a copy
on(socket, SocketEventType.Send, (message: ClientMessage) => {
on(socket, SocketEvent.Send, (message: ClientMessage) => {
if (isClientEvent(message)) {
const [_, event] = message
@@ -97,7 +97,7 @@ export const socketPolicyRetryAuthRequired = (socket: Socket) => {
}
}),
// If a message is rejected with auth-required, re-enqueue it one time
on(socket, SocketEventType.Receive, (message: RelayMessage) => {
on(socket, SocketEvent.Receive, (message: RelayMessage) => {
if (isRelayOk(message)) {
const [_, id, ok, detail] = message
const pendingMessage = pending.get(id)
@@ -137,7 +137,7 @@ export const socketPolicyConnectOnSend = (socket: Socket) => {
let currentStatus = SocketStatus.Closed
const unsubscribers = [
on(socket, SocketEventType.Status, (newStatus: SocketStatus) => {
on(socket, SocketEvent.Status, (newStatus: SocketStatus) => {
// Keep track of the most recent error
if (newStatus === SocketStatus.Error) {
lastError = now()
@@ -146,7 +146,7 @@ export const socketPolicyConnectOnSend = (socket: Socket) => {
// Keep track of the current status
currentStatus = newStatus
}),
on(socket, SocketEventType.Enqueue, (message: ClientMessage) => {
on(socket, SocketEvent.Enqueue, (message: ClientMessage) => {
// When a new message is sent, make sure the socket is open (unless there was a recent error)
if (currentStatus === SocketStatus.Closed && lastError < ago(30)) {
socket.open()
@@ -166,10 +166,10 @@ export const socketPolicyCloseOnTimeout = (socket: Socket) => {
let lastActivity = now()
const unsubscribers = [
on(socket, SocketEventType.Send, (message: ClientMessage) => {
on(socket, SocketEvent.Send, (message: ClientMessage) => {
lastActivity = now()
}),
on(socket, SocketEventType.Receive, (message: RelayMessage) => {
on(socket, SocketEvent.Receive, (message: RelayMessage) => {
lastActivity = now()
}),
]
@@ -197,7 +197,7 @@ export const socketPolicyReopenActive = (socket: Socket) => {
let lastOpen = Date.now()
const unsubscribers = [
on(socket, SocketEventType.Status, (newStatus: SocketStatus) => {
on(socket, SocketEvent.Status, (newStatus: SocketStatus) => {
// Keep track of the most recent error
if (newStatus === SocketStatus.Open) {
lastOpen = Date.now()
@@ -214,7 +214,7 @@ export const socketPolicyReopenActive = (socket: Socket) => {
})
}
}),
on(socket, SocketEventType.Send, (message: ClientMessage) => {
on(socket, SocketEvent.Send, (message: ClientMessage) => {
if (isClientEvent(message)) {
pending.set(message[1].id, message)
}
@@ -227,7 +227,7 @@ export const socketPolicyReopenActive = (socket: Socket) => {
pending.delete(message[1])
}
}),
on(socket, SocketEventType.Receive, (message: RelayMessage) => {
on(socket, SocketEvent.Receive, (message: RelayMessage) => {
if (isRelayClosed(message) || isRelayOk(message)) {
pending.delete(message[1])
}
+41 -41
View File
@@ -3,8 +3,8 @@ import {verifyEvent as nostrToolsVerifyEvent} from "nostr-tools/pure"
import {on, call, randomId, yieldThread} from "@welshman/lib"
import {Filter, matchFilter, SignedEvent} from "@welshman/util"
import {RelayMessage, ClientMessageType, isRelayEvent, isRelayEose} from "./message.js"
import {getAdapter, AdapterContext, AbstractAdapter, AdapterEventType} from "./adapter.js"
import {SocketEventType, SocketStatus} from "./socket.js"
import {getAdapter, AdapterContext, AbstractAdapter, AdapterEvent} from "./adapter.js"
import {SocketEvent, SocketStatus} from "./socket.js"
import {TypedEmitter, Unsubscriber} from "./util.js"
import {Tracker} from "./tracker.js"
@@ -16,7 +16,7 @@ export const defaultVerifyEvent = (event: SignedEvent) => {
}
}
export enum RequestEventType {
export enum RequestEvent {
Close = "request:event:close",
Disconnect = "request:event:disconnect",
Duplicate = "request:event:duplicate",
@@ -29,19 +29,19 @@ export enum RequestEventType {
// Unireq
export type UnireqEvents = {
[RequestEventType.Event]: (event: SignedEvent) => void
[RequestEventType.Invalid]: (event: SignedEvent) => void
[RequestEventType.Filtered]: (event: SignedEvent) => void
[RequestEventType.Duplicate]: (event: SignedEvent) => void
[RequestEventType.Disconnect]: () => void
[RequestEventType.Close]: () => void
[RequestEventType.Eose]: () => void
[RequestEvent.Event]: (event: SignedEvent) => void
[RequestEvent.Invalid]: (event: SignedEvent) => void
[RequestEvent.Filtered]: (event: SignedEvent) => void
[RequestEvent.Duplicate]: (event: SignedEvent) => void
[RequestEvent.Disconnect]: () => void
[RequestEvent.Close]: () => void
[RequestEvent.Eose]: () => void
}
export type UnireqOptions = {
relay: string
filter: Filter
context: AdapterContext
context?: AdapterContext
timeout?: number
tracker?: Tracker
autoClose?: boolean
@@ -66,20 +66,20 @@ export class Unireq extends (EventEmitter as new () => TypedEmitter<UnireqEvents
// Listen for event/eose messages from the adapter
this._unsubscribers.push(
on(this._adapter, AdapterEventType.Receive, (message: RelayMessage, url: string) => {
on(this._adapter, AdapterEvent.Receive, (message: RelayMessage, url: string) => {
if (isRelayEvent(message)) {
const [_, id, event] = message
if (id !== this._id) return
if (tracker.track(event.id, url)) {
this.emit(RequestEventType.Duplicate, event)
this.emit(RequestEvent.Duplicate, event)
} else if (verifyEvent?.(event) === false) {
this.emit(RequestEventType.Invalid, event)
this.emit(RequestEvent.Invalid, event)
} else if (!matchFilter(this.options.filter, event)) {
this.emit(RequestEventType.Filtered, event)
this.emit(RequestEvent.Filtered, event)
} else {
this.emit(RequestEventType.Event, event)
this.emit(RequestEvent.Event, event)
}
}
@@ -87,7 +87,7 @@ export class Unireq extends (EventEmitter as new () => TypedEmitter<UnireqEvents
const [_, id] = message
if (id === this._id) {
this.emit(RequestEventType.Eose)
this.emit(RequestEvent.Eose)
if (this.options.autoClose) {
this.close()
@@ -100,9 +100,9 @@ export class Unireq extends (EventEmitter as new () => TypedEmitter<UnireqEvents
// Listen to disconnects from any sockets
for (const socket of this._adapter.sockets) {
this._unsubscribers.push(
on(socket, SocketEventType.Status, (status: SocketStatus) => {
on(socket, SocketEvent.Status, (status: SocketStatus) => {
if (![SocketStatus.Open, SocketStatus.Opening].includes(status)) {
this.emit(RequestEventType.Disconnect)
this.emit(RequestEvent.Disconnect)
if (this.options.autoClose) {
this.close()
@@ -127,7 +127,7 @@ export class Unireq extends (EventEmitter as new () => TypedEmitter<UnireqEvents
if (this._closed) return
this._adapter.send(["CLOSE", this._id])
this.emit(RequestEventType.Close)
this.emit(RequestEvent.Close)
this.removeAllListeners()
this._unsubscribers.map(call)
this._adapter.cleanup()
@@ -138,13 +138,13 @@ export class Unireq extends (EventEmitter as new () => TypedEmitter<UnireqEvents
// Multireq
export type MultireqEvents = {
[RequestEventType.Event]: (event: SignedEvent, url: string) => void
[RequestEventType.Invalid]: (event: SignedEvent, url: string) => void
[RequestEventType.Filtered]: (event: SignedEvent, url: string) => void
[RequestEventType.Duplicate]: (event: SignedEvent, url: string) => void
[RequestEventType.Disconnect]: (url: string) => void
[RequestEventType.Eose]: (url: string) => void
[RequestEventType.Close]: () => void
[RequestEvent.Event]: (event: SignedEvent, url: string) => void
[RequestEvent.Invalid]: (event: SignedEvent, url: string) => void
[RequestEvent.Filtered]: (event: SignedEvent, url: string) => void
[RequestEvent.Duplicate]: (event: SignedEvent, url: string) => void
[RequestEvent.Disconnect]: (url: string) => void
[RequestEvent.Eose]: (url: string) => void
[RequestEvent.Close]: () => void
}
export type MultireqOptions = Omit<UnireqOptions, "relay"> & {
@@ -163,35 +163,35 @@ export class Multireq extends (EventEmitter as new () => TypedEmitter<MultireqEv
for (const relay of relays) {
const req = new Unireq({relay, tracker, ...options})
req.on(RequestEventType.Event, (event: SignedEvent) => {
this.emit(RequestEventType.Event, event, relay)
req.on(RequestEvent.Event, (event: SignedEvent) => {
this.emit(RequestEvent.Event, event, relay)
})
req.on(RequestEventType.Invalid, (event: SignedEvent) => {
this.emit(RequestEventType.Invalid, event, relay)
req.on(RequestEvent.Invalid, (event: SignedEvent) => {
this.emit(RequestEvent.Invalid, event, relay)
})
req.on(RequestEventType.Filtered, (event: SignedEvent) => {
this.emit(RequestEventType.Filtered, event, relay)
req.on(RequestEvent.Filtered, (event: SignedEvent) => {
this.emit(RequestEvent.Filtered, event, relay)
})
req.on(RequestEventType.Duplicate, (event: SignedEvent) => {
this.emit(RequestEventType.Duplicate, event, relay)
req.on(RequestEvent.Duplicate, (event: SignedEvent) => {
this.emit(RequestEvent.Duplicate, event, relay)
})
req.on(RequestEventType.Disconnect, () => {
this.emit(RequestEventType.Disconnect, relay)
req.on(RequestEvent.Disconnect, () => {
this.emit(RequestEvent.Disconnect, relay)
})
req.on(RequestEventType.Eose, () => {
this.emit(RequestEventType.Eose, relay)
req.on(RequestEvent.Eose, () => {
this.emit(RequestEvent.Eose, relay)
})
req.on(RequestEventType.Close, () => {
req.on(RequestEvent.Close, () => {
this._closed.add(relay)
if (this._closed.size === relays.length) {
this.emit(RequestEventType.Close)
this.emit(RequestEvent.Close)
}
})