From d1730e74a227fa965b627c4065061868371350ed Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 26 Sep 2024 17:36:08 -0700 Subject: [PATCH] Add sync util --- packages/net/src/Executor.ts | 13 +++- packages/net/src/Sync.ts | 139 +++++++++++++++++++++++++++++++++++ packages/net/src/index.ts | 1 + 3 files changed, 149 insertions(+), 4 deletions(-) create mode 100644 packages/net/src/Sync.ts diff --git a/packages/net/src/Executor.ts b/packages/net/src/Executor.ts index 1ed0d21..83758c8 100644 --- a/packages/net/src/Executor.ts +++ b/packages/net/src/Executor.ts @@ -1,6 +1,6 @@ import {ctx} from '@welshman/lib' import type {Emitter} from '@welshman/lib' -import type {SignedEvent, Filter} from '@welshman/util' +import type {SignedEvent, TrustedEvent, Filter} from '@welshman/util' import type {Message} from './Socket' import type {Connection} from './Connection' import {Negentropy, NegentropyStorageVector} from './Negentropy' @@ -11,7 +11,12 @@ export type Target = Emitter & { cleanup: () => void } -type EventCallback = (url: string, event: SignedEvent) => void +export type NegentropyMessage = { + have: string[] + need: string[] +} + +type EventCallback = (url: string, event: TrustedEvent) => void type EoseCallback = (url: string) => void type CloseCallback = () => void type OkCallback = (url: string, id: string, ...extra: any[]) => void @@ -35,7 +40,7 @@ export class Executor { const id = createSubId('REQ') - const eventListener = (url: string, subid: string, e: SignedEvent) => { + const eventListener = (url: string, subid: string, e: TrustedEvent) => { if (subid === id) { ctx.net.onEvent(url, e) onEvent?.(url, e) @@ -91,7 +96,7 @@ export class Executor { } } - diff(filter: Filter, events: SignedEvent[], {onMessage, onError, onClose}: DiffOpts = {}) { + diff(filter: Filter, events: TrustedEvent[], {onMessage, onError, onClose}: DiffOpts = {}) { let closed = false const id = createSubId('NEG') diff --git a/packages/net/src/Sync.ts b/packages/net/src/Sync.ts new file mode 100644 index 0000000..d162aa7 --- /dev/null +++ b/packages/net/src/Sync.ts @@ -0,0 +1,139 @@ +import {ctx, pushToMapKey, inc, flatten, chunk} from '@welshman/lib' +import type {SignedEvent, TrustedEvent, Filter} from '@welshman/util' +import type {NegentropyMessage} from './Executor' +import {subscribe} from './Subscribe' +import {publish} from './Publish' + +export type DiffOneOpts = { + relay: string + filter: Filter + events: TrustedEvent[] +} + +export const diffOne = ({relay, filter, events}: DiffOneOpts) => { + const executor = ctx.net.getExecutor([relay]) + const have = new Set() + const need = new Set() + + return new Promise((resolve, reject) => { + executor.diff(filter, events, { + onClose: () => resolve({have: Array.from(have), need: Array.from(need)}), + onError: (_, message) => reject(message), + onMessage: (_, message) => { + for (const id of message.have) { + have.add(id) + } + + for (const id of message.need) { + need.add(id) + } + }, + }) + }) +} + +export type DiffAllOpts = { + relays: string[] + filters: Filter[] + events: TrustedEvent[] +} + +export const diffAll = async ({relays, filters, events}: DiffAllOpts) => + flatten( + await Promise.all( + relays.flatMap(async relay => { + return await Promise.all( + filters.map(async filter => { + return {relay, ...await diffOne({relay, filter, events})} + }) + ) + }) + ) + ) + +export type PullOpts = { + relays: string[] + filters: Filter[] + events: TrustedEvent[] + onEvent?: (event: TrustedEvent) => void +} + +export const pull = async ({relays, filters, events, onEvent}: PullOpts) => { + const countById = new Map() + const idsByRelay = new Map() + + for (const {relay, need} of await diffAll({relays, filters, events})) { + for (const id of need) { + const count = countById.get(id) || 0 + + // Reduce, but don't completely eliminate duplicates, just in case a relay + // won't give us what we ask for. + if (count < 2) { + pushToMapKey(idsByRelay, relay, id) + countById.set(id, inc(count)) + } + } + } + + const result: TrustedEvent[] = [] + + await Promise.all( + Array.from(idsByRelay.entries()).map(([relay, allIds]) => { + return Promise.all( + chunk(1024, allIds).map(ids => { + return new Promise(resolve => { + subscribe({ + relays: [relay], + filters: [{ids}], + closeOnEose: true, + onClose: resolve, + onEvent: event => { + result.push(event) + onEvent?.(event) + }, + }) + }) + }) + ) + }) + ) + + return result +} + +export type PushOpts = { + relays: string[] + filters: Filter[] + events: SignedEvent[] +} + +export const push = async ({relays, filters, events}: PushOpts) => { + const relaysById = new Map() + + for (const {relay, have} of await diffAll({relays, filters, events})) { + for (const id of have) { + pushToMapKey(relaysById, id, relay) + } + } + + await Promise.all( + events.map(async event => { + const relays = relaysById.get(event.id) + + if (relays) { + await publish({event, relays}).result + } + }) + ) +} + +export type SyncOpts = { + relays: string[] + filters: Filter[] + events: SignedEvent[] +} + +export const sync = async (opts: SyncOpts) => { + await pull(opts) + await push(opts) +} diff --git a/packages/net/src/index.ts b/packages/net/src/index.ts index a1016c7..355e058 100644 --- a/packages/net/src/index.ts +++ b/packages/net/src/index.ts @@ -6,6 +6,7 @@ export * from "./Pool" export * from "./Publish" export * from "./Socket" export * from "./Subscribe" +export * from "./Sync" export * from "./Tracker" export * from "./target/Echo" export * from "./target/Multi"