21 KiB
name, description
| name | description |
|---|---|
| welshman-net | Use this skill when working with @welshman/net: relay connections, request/publish flows, auth, relay pool management, adapters, policies, or low-level nostr network I/O. |
welshman/net — Relay Network Layer
@welshman/net is the core networking layer for welshman-based nostr apps. It manages WebSocket relay connections, subscriptions, event publishing, NIP-42 auth, and NIP-77 negentropy sync. It sits below @welshman/app (which provides higher-level reactive stores and routing) and depends on @welshman/util for event types and @welshman/lib for utilities.
Installation
npm install @welshman/net
# or
pnpm add @welshman/net
yarn add @welshman/net
Key Exports
Pool & Sockets
| Export | Description |
|---|---|
Pool |
Singleton connection pool; creates and manages Socket instances per relay URL |
Pool.get() |
Returns the singleton Pool instance |
pool.get(url) |
Gets or lazily creates a Socket for the given relay URL |
pool.remove(url) |
Removes and cleans up a socket |
pool.subscribe(cb) |
Fires cb(socket) each time a new socket is created; returns unsubscriber |
Socket |
WebSocket wrapper with status tracking, send queue, and auth state |
SocketStatus |
Enum: Open, Opening, Closing, Closed, Error |
SocketEvent |
Enum: Status, Send, Sending, Receive, Receiving, Error |
socket.auth |
AuthState instance for NIP-42 on this connection |
Request
| Export | Description |
|---|---|
requestOne(options) |
Subscribe to a single relay; returns Promise<TrustedEvent[]> |
request(options) |
Subscribe to multiple relays in parallel; returns Promise<TrustedEvent[]> |
makeLoader(options) |
Creates a batching load function with configurable delay/timeout/threshold |
load(options) |
Pre-built loader: 200 ms batch delay, 3 s timeout, 0.5 threshold. Simpler than request() when you just want events — auto-closes after EOSE, timeout, or disconnect; resolves when half the relays' subscriptions have closed; returns a Promise<TrustedEvent[]>. When used with @welshman/app, received events auto-flow into the repository and tracker. |
request / requestOne options (key fields):
relay/relays— relay URL(s)filters— array of nostrFilterobjectsautoClose?: boolean— close subscription after EOSE or on socket disconnectsignal?: AbortSignal— cancellationtracker?: Tracker— cross-relay deduplication (shared automatically byrequest)- Callbacks:
onEvent(event, url),onEose(url),onClose(),onDisconnect(url),onFiltered,onDuplicate,onDeleted,onInvalid,onClosed(reason, url)
request-only options:
threshold?: number— fraction of relays that must close before the promise resolves (default1)
Without autoClose or a signal, requestOne streams indefinitely — the returned promise only resolves if the relay sends CLOSED for all active subscription IDs. Default policies also re-send the REQ when sockets reconnect.
Publish
| Export | Description |
|---|---|
publish(options) |
Publishes to multiple relays; resolves to PublishResultsByRelay |
publishOne(options) |
Publishes to a single relay; resolves to PublishResult |
PublishStatus |
Enum: Sending, Pending, Success, Failure, Timeout, Aborted |
PublishResult |
{ relay: string, status: PublishStatus, detail: string } |
PublishResultsByRelay |
Record<string, PublishResult> |
publish options: event, relays, timeout? (default 10 s), signal?, context?, plus callbacks onSuccess, onFailure, onPending, onTimeout, onAborted, onComplete.
Auth (NIP-42)
| Export | Description |
|---|---|
AuthState |
Manages auth state for one socket; available as socket.auth |
AuthStatus |
Enum: None, Requested, PendingSignature, DeniedSignature, PendingResponse, Forbidden, Ok |
AuthStateEvent.Status |
Emitted when auth status changes |
makeSocketPolicyAuth(options) |
Creates a socket policy that auto-handles auth challenges |
defaultSocketPolicies |
Mutable array of policies applied to every new socket |
Policies
| Export | Description |
|---|---|
socketPolicyPing |
Sends a PING frame every 30 s when the socket is open and idle, to keep the connection alive |
socketPolicyAuthBuffer |
Buffers outgoing messages during auth and replays after success |
socketPolicyConnectOnSend |
Auto-opens closed sockets when a message is queued |
socketPolicyCloseInactive |
Closes idle sockets after 30 s (when no pending work remains); if the socket closes with pending work it delays and reopens, replaying queued messages |
defaultSocketPolicies |
Array of the four above; passed to every socket created by Pool |
A SocketPolicy is (socket: Socket) => Unsubscriber.
Repository
| Export | Description |
|---|---|
Repository |
In-memory indexed event store with delete/expiry support |
Repository.get() |
Returns the singleton instance |
repository.publish(event) |
Stores an event; returns false if duplicate/stale |
repository.query(filters, opts?) |
Returns matching TrustedEvent[] sorted by created_at desc |
repository.getEvent(idOrAddress) |
Look up by id or NIP-01 address (kind:pubkey:d) |
repository.isDeleted(event) |
true if a kind-5 delete covers this event |
repository.dump() |
Returns all stored events as TrustedEvent[] |
repository.load(events) |
Bulk-replaces all stored events; emits a single "update" diff. Events with event[verifiedSymbol] = true skip signature re-verification. |
LOCAL_RELAY_URL |
"local://welshman.relay/" — conventional URL for the local repository |
RepositoryUpdate |
{ added: TrustedEvent[], removed: Set<string> } — payload of "update" events |
mergeRepositoryUpdates(updates) |
Merges an array of RepositoryUpdate objects into one |
Emits "update" with RepositoryUpdate ({ added: TrustedEvent[], removed: Set<string> }) on every change.
Prefer
LOCAL_RELAY_URLover direct repository access. Rather than callingrepository.query()orrepository.publish()directly, passLOCAL_RELAY_URLas a relay URL to the standardload(),request(), andpublish()functions. This keeps local reads/writes going through the same policy, deduplication, and tracking pipeline as remote relay operations. Direct repository access is appropriate only for bulk startup (repository.load()) and low-level introspection (repository.getEvent(),repository.isDeleted()).
Tracker
| Export | Description |
|---|---|
Tracker |
Bidirectional map of eventId ↔ Set<relayUrl> |
tracker.track(eventId, relay) |
Records relay; returns true if the event was already seen |
tracker.getRelays(eventId) |
Set of relay URLs that have sent this event |
tracker.getIds(relay) |
Set of event ids seen from a relay |
tracker.copy(id1, id2) |
Copies relay associations from one id to another (used for gift wraps) |
tracker.load(relaysById) |
Bulk-replaces all relay mappings from a Map<string, Set<string>>; emits "load" |
tracker.clear() |
Removes all relay mappings; emits "clear" |
Adapters
| Export | Description |
|---|---|
getAdapter(url, context?) |
Factory: returns SocketAdapter, LocalAdapter, or custom adapter |
SocketAdapter |
WebSocket relay adapter |
LocalAdapter |
In-memory relay adapter |
MockAdapter |
Test adapter with manual send control |
AbstractAdapter |
Base class for custom adapters |
AdapterEvent.Receive |
Emitted when a relay message arrives |
Context
| Export | Description |
|---|---|
netContext |
Global NetContext config object |
NetContext |
{ pool, repository, isEventValid, isEventDeleted, getAdapter? } |
Mutate netContext fields directly to change global defaults; pass context to individual calls to override per-request.
Negentropy / Diff (NIP-77)
| Export | Description |
|---|---|
diff(options) |
Compares local events against relays; returns { relay, have, need }[] |
pull(options) |
Fetches events relays have that you don't |
push(options) |
Publishes events you have that relays don't |
Difference |
Low-level per-relay negentropy session |
Messages
| Export | Description |
|---|---|
RelayMessageType |
Enum of relay→client message types |
ClientMessageType |
Enum of client→relay message types |
isRelayEvent(), isRelayEose(), isRelayOk(), isRelayAuth(), etc. |
Type guards for relay messages |
isClientReq(), isClientEvent(), etc. |
Type guards for client messages |
WrapManager
| Export | Description |
|---|---|
WrapManager |
Tracks NIP-59 gift wrap → rumor relationships; stores decrypted rumors in the repository and copies relay tracking from the wrap to the rumor |
Common Patterns
Connect to a relay and stream events
import {Pool, SocketEvent, SocketStatus} from '@welshman/net'
const pool = Pool.get()
const socket = pool.get('wss://relay.example.com')
socket.on(SocketEvent.Status, (status: SocketStatus) => {
console.log('status:', status)
})
// Send REQ directly (prefer request() for higher-level use)
socket.send(['REQ', 'my-sub', {kinds: [1], limit: 10}])
Load events (one-shot, batched)
import {load} from '@welshman/net'
// load() batches multiple concurrent calls within 200 ms into a single REQ per relay.
// It auto-closes after EOSE, timeout, or disconnect, and resolves at 50 % relay threshold.
const events = await load({
relays: ['wss://relay.example.com', 'wss://relay2.example.com'],
filters: [{kinds: [0], authors: ['<pubkey>']}],
})
Stream events indefinitely
import {request} from '@welshman/net'
import {now} from '@welshman/lib'
// Without autoClose this will stream forever.
// The returned promise never settles unless all relays close the subscription.
const ctrl = new AbortController()
request({
relays: ['wss://relay.example.com'],
filters: [{kinds: [1], since: now()}],
signal: ctrl.signal,
onEvent: (event, url) => console.log(event.id, 'from', url),
})
// Later:
ctrl.abort()
Publish an event
import {publish, PublishStatus} from '@welshman/net'
const results = await publish({
event: signedEvent,
relays: ['wss://relay.example.com', 'wss://relay2.example.com'],
timeout: 5000,
onSuccess: r => console.log('accepted by', r.relay),
onFailure: r => console.warn('rejected by', r.relay, r.detail),
})
for (const [relay, result] of Object.entries(results)) {
if (result.status === PublishStatus.Success) {
console.log(relay, 'ok')
}
}
Enable NIP-42 auth globally
import {defaultSocketPolicies, makeSocketPolicyAuth} from '@welshman/net'
import type {StampedEvent} from '@welshman/util'
// Call once at app startup, before any sockets are opened.
defaultSocketPolicies.push(
makeSocketPolicyAuth({
sign: (event: StampedEvent) => mySigner.sign(event),
shouldAuth: (socket) => true, // auth on every relay
}),
)
Custom socket policies
A SocketPolicy is (socket: Socket) => Unsubscriber. It receives the socket when it is created, attaches listeners or patches socket methods, and returns a cleanup function. Push custom policies onto defaultSocketPolicies before any sockets are opened.
import {writable} from 'svelte/store'
import {on} from '@welshman/lib'
import {defaultSocketPolicies, SocketEvent, isRelayEvent} from '@welshman/net'
import type {Socket, RelayMessage} from '@welshman/net'
// Track how many events each relay has delivered this session
export const eventCountByRelay = writable<Record<string, number>>({})
const eventCountPolicy = (socket: Socket) => {
const unsub = on(socket, SocketEvent.Receive, (message: RelayMessage) => {
if (isRelayEvent(message)) {
eventCountByRelay.update(counts => ({
...counts,
[socket.url]: (counts[socket.url] ?? 0) + 1,
}))
}
})
return unsub // called when the socket is destroyed
}
defaultSocketPolicies.push(eventCountPolicy)
The same structure applies to more advanced patterns — patch socket.open to block connections, listen to SocketEvent.Sending/SocketEvent.Receiving to intercept messages before they are processed, or manipulate socket._recvQueue directly to suppress or replay messages.
Custom adapter (e.g. non-WebSocket backend)
import {AbstractAdapter, AdapterEvent, request} from '@welshman/net'
import type {ClientMessage} from '@welshman/net'
class MyAdapter extends AbstractAdapter {
constructor(private url: string) {
super()
// set up your transport here
}
get urls() { return [this.url] }
get sockets() { return [] }
send(message: ClientMessage) {
// forward message to your backend; call this.emit(AdapterEvent.Receive, replyMsg, this.url) when data arrives
}
}
request({
relays: ['myscheme://some-id'],
filters: [{kinds: [1]}],
autoClose: true,
context: {
getAdapter: (url) => url.startsWith('myscheme://') ? new MyAdapter(url) : undefined,
},
})
Use LOCAL_RELAY_URL to read/write the local repository
Pass LOCAL_RELAY_URL as a relay to the standard net functions so local operations go through the same pipeline as remote ones (policies, deduplication, tracker):
import {load, publish, request, LOCAL_RELAY_URL} from '@welshman/net'
import {now} from '@welshman/lib'
// Read from the local repository the same way you'd read from a remote relay
const events = await load({
relays: [LOCAL_RELAY_URL],
filters: [{kinds: [1], authors: ['<pubkey>'], limit: 20}],
})
// Write to the local repository (and any remote relays) in one call
await publish({
event: signedEvent,
relays: [LOCAL_RELAY_URL, 'wss://relay.example.com'],
})
// Subscribe to new local events in real time
request({
relays: [LOCAL_RELAY_URL],
filters: [{kinds: [1], since: now()}],
onEvent: (event) => console.log('new local event', event.id),
})
Direct repository API calls (repository.load(), repository.getEvent(), repository.isDeleted(), repository.dump()) are still appropriate for bulk startup and low-level introspection — but for routine reads and writes prefer LOCAL_RELAY_URL.
Startup: bulk-load persisted events (skip re-verification)
import {Repository} from '@welshman/net'
import {verifiedSymbol} from '@welshman/util'
import type {TrustedEvent} from '@welshman/util'
const repo = Repository.get()
// Mark events as already-verified so welshman skips signature checks
const storedEvents: TrustedEvent[] = await loadFromStorage()
for (const event of storedEvents) {
event[verifiedSymbol] = true
}
// Replaces all in-memory events in one pass; emits a single "update"
repo.load(storedEvents)
Startup: bulk-load Tracker state
import {tracker} from '@welshman/app' // singleton wired to the pool and repository
// Build the map from your stored relay<->event mappings
const relaysById = new Map<string, Set<string>>()
for (const {id, relays} of storedTrackerItems) {
if (repo.getEvent(id)) { // skip orphaned entries
relaysById.set(id, new Set(relays))
}
}
// Takes Map<string, Set<string>> — same shape as tracker.relaysById
tracker.load(relaysById)
Persist repository changes to IndexedDB (canonical pattern)
import {on, batch} from '@welshman/lib'
import {repository} from '@welshman/app' // singleton; or Repository.get() standalone
import type {RepositoryUpdate} from '@welshman/net'
import type {TrustedEvent} from '@welshman/util'
// batch(ms, fn) collects all "update" events fired within `ms` and calls fn once
on(
repository,
'update',
batch(3000, async (updates: RepositoryUpdate[]) => {
const toAdd: TrustedEvent[] = []
const toRemove = new Set<string>()
for (const {added, removed} of updates) {
for (const event of added) toAdd.push(event)
for (const id of removed) toRemove.add(id)
}
const tx = db.transaction('events', 'readwrite')
await Promise.all([
...toAdd.map(e => tx.store.put(e)),
...Array.from(toRemove).map(id => tx.store.delete(id)),
tx.done,
])
}),
)
Integration Notes
@welshman/util— providesTrustedEvent,SignedEvent,Filter,verifyEvent,matchFilters,getAddress, etc. All event objects flowing through@welshman/netareTrustedEvent(already verified).@welshman/lib— utility helpers (Emitter,batcher,defer,on, etc.) used internally;Emitter(from@welshman/lib) is the base class forTracker,Repository, andWrapManager.Socket,AuthState,AbstractAdapter, andDifferenceextend node's built-inEventEmitterdirectly.@welshman/app— wraps@welshman/netwith reactive Svelte stores, a router, and higher-level helpers. Most app-level code should use@welshman/app; drop down to@welshman/netonly for raw relay I/O or when building non-Svelte clients.netContext— shared singleton used as the default byrequest,requestOne, and the repository. Override fields onnetContextat startup, or pass acontextobject per-call to isolate behavior.
Gotchas & Tips
-
Use
LOCAL_RELAY_URL, not direct repository calls, for routine reads/writes. PassingLOCAL_RELAY_URLtoload(),publish(), orrequest()routes through the normal net pipeline (policies, deduplication, tracker). Callingrepository.query()/repository.publish()directly bypasses all of that. Reserve the direct API for bulk startup (repository.load()), introspection (getEvent,isDeleted,dump), and listening to"update"events. -
request()withoutautoCloseorsignalnever resolves. Always passautoClose: trueor anAbortSignalwhen you just want a one-shot fetch. Useload()for the common case. -
load()setsautoClose: trueinternally and uses a 0.5 relay threshold; it resolves when half the relays' subscriptions have closed (typically after EOSE, timeout, or disconnect) — useful when some relays are slow or offline. -
Relay URL normalization happens inside
Pool.get(url)vianormalizeRelayUrl. Pass raw URLs everywhere; the pool handles canonicalization. -
defaultSocketPoliciesis mutable. Push policies before any sockets are created. Sockets created before a policy is pushed will not have it applied. -
socketPolicyCloseInactiveonly replays pending work on unexpected close. It reopens and replays queued messages when a socket closes while work is pending — it does not proactively open sockets when new work is queued (that issocketPolicyConnectOnSend's job). Afterpool.remove(url)the socket is cleaned up including its policy listeners, sosocketPolicyCloseInactivecan no longer reopen it. -
Pool.get(url)lazily creates a new socket on every call afterpool.remove(url). Callingpool.remove(url)forgets the URL and cleans up the socket — any subsequentpool.get(url)will construct a fresh socket. Callpool.remove()only when you want the pool to forget the URL entirely, not merely to disconnect temporarily. -
Trackeris shared across relays inrequest(). This meansonDuplicatefires for events received from more than one relay — expected behavior for cross-relay deduplication. -
Repository.publish()returnsfalsefor stale replaceable events. If a newer version of a replaceable event is already stored, the older one is silently dropped. -
WrapManagerstores the decrypted rumor in theRepositoryand copies relay tracking from the gift-wrap event id to the rumor id. Keep a reference to theWrapManagerinstance alongside yourRepositoryandTrackersingletons. -
makeSocketPolicyAuthrequires asignfunction that returns aPromise<SignedEvent>. If the user cancels signing, have thesignfunction throw or reject;doAuthwill catch the failure viatryCatchand automatically transition toAuthStatus.DeniedSignature, preventing infinite retry loops. -
Each filter in
filtersarray generates a separate REQ insiderequestOne. For large filter arrays consider merging them withunionFiltersfrom@welshman/utilbefore callingrequest. -
repository.load()replaces all events, not appends. It clears internal indexes first, then re-inserts every event. Emit a single batched"update"diff — do not call it repeatedly for incremental updates; userepository.publish(event)for that. -
RepositoryUpdate.removedisSet<string>, not an array. Iterate withfor...oforArray.from(removed). Thebatch()helper from@welshman/libdelivers updates asRepositoryUpdate[]to your flush callback — merge them yourself or usemergeRepositoryUpdates. -
tracker.load()takesMap<string, Set<string>>(the same type astracker.relaysById). Load it afterrepository.load()so you can filter out orphaned event ids.