From 0c2e58811b8e2b2c1e7accbf2d60a5a5038c0695 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Fri, 27 Sep 2024 08:59:53 -0700 Subject: [PATCH] Add legacy alternatives to sync --- packages/net/src/Sync.ts | 140 +++++++++++++++++++++++++++++---------- 1 file changed, 104 insertions(+), 36 deletions(-) diff --git a/packages/net/src/Sync.ts b/packages/net/src/Sync.ts index d162aa7..b53e802 100644 --- a/packages/net/src/Sync.ts +++ b/packages/net/src/Sync.ts @@ -1,56 +1,66 @@ -import {ctx, pushToMapKey, inc, flatten, chunk} from '@welshman/lib' +import {ctx, groupBy, now, pushToMapKey, inc, flatten, chunk} from '@welshman/lib' import type {SignedEvent, TrustedEvent, Filter} from '@welshman/util' -import type {NegentropyMessage} from './Executor' import {subscribe} from './Subscribe' import {publish} from './Publish' -export type DiffOneOpts = { - relay: string - filter: Filter - events: TrustedEvent[] -} - -export const diffOne = ({relay, filter, events}: DiffOneOpts) => { - const executor = ctx.net.getExecutor([relay]) - const have = new Set() - const need = new Set() - - return new Promise((resolve, reject) => { - executor.diff(filter, events, { - onClose: () => resolve({have: Array.from(have), need: Array.from(need)}), - onError: (_, message) => reject(message), - onMessage: (_, message) => { - for (const id of message.have) { - have.add(id) - } - - for (const id of message.need) { - need.add(id) - } - }, - }) - }) -} - -export type DiffAllOpts = { +export type DiffOpts = { relays: string[] filters: Filter[] events: TrustedEvent[] } -export const diffAll = async ({relays, filters, events}: DiffAllOpts) => - flatten( +export const diff = async ({relays, filters, events}: DiffOpts) => { + const diffs = flatten( await Promise.all( relays.flatMap(async relay => { return await Promise.all( filters.map(async filter => { - return {relay, ...await diffOne({relay, filter, events})} + const executor = ctx.net.getExecutor([relay]) + const have = new Set() + const need = new Set() + + await new Promise((resolve, reject) => { + executor.diff(filter, events, { + onClose: resolve, + onError: (_, message) => reject(message), + onMessage: (_, message) => { + for (const id of message.have) { + have.add(id) + } + + for (const id of message.need) { + need.add(id) + } + }, + }) + }) + + return {relay, have, need} }) ) }) ) ) + return Array.from(groupBy(diff => diff.relay, diffs).entries()) + .map(([relay, diffs]) => { + const have = new Set() + const need = new Set() + + for (const diff of diffs) { + for (const id of diff.have) { + have.add(id) + } + + for (const id of diff.need) { + need.add(id) + } + } + + return {relay, have: Array.from(have), need: Array.from(need)} + }) +} + export type PullOpts = { relays: string[] filters: Filter[] @@ -62,7 +72,7 @@ export const pull = async ({relays, filters, events, onEvent}: PullOpts) => { const countById = new Map() const idsByRelay = new Map() - for (const {relay, need} of await diffAll({relays, filters, events})) { + for (const {relay, need} of await diff({relays, filters, events})) { for (const id of need) { const count = countById.get(id) || 0 @@ -110,7 +120,7 @@ export type PushOpts = { export const push = async ({relays, filters, events}: PushOpts) => { const relaysById = new Map() - for (const {relay, have} of await diffAll({relays, filters, events})) { + for (const {relay, have} of await diff({relays, filters, events})) { for (const id of have) { pushToMapKey(relaysById, id, relay) } @@ -137,3 +147,61 @@ export const sync = async (opts: SyncOpts) => { await pull(opts) await push(opts) } + +// Legacy alternatives for use with relays that don't support negentropy + +export type PullWithoutNegentropyOpts = { + relays: string[] + filters: Filter[] + onEvent?: (event: TrustedEvent) => void +} + +export const pullWithoutNegentropy = async ({relays, filters, onEvent}: PullWithoutNegentropyOpts) => { + let done = false + let until = now() + 30 + + const result: TrustedEvent[] = [] + + while (!done) { + let anyResults = false + + await new Promise(resolve => { + subscribe({ + relays, + filters: filters + .filter(filter => filter.since && filter.since > until) + .map(filter => ({...filter, until})), + closeOnEose: true, + onClose: () => { + done = !anyResults + resolve() + }, + onEvent: event => { + anyResults = true + until = Math.min(until, event.created_at - 1) + result.push(event) + onEvent?.(event) + }, + }) + }) + } + + return result +} + +export type PushWithoutNegentropyOpts = { + relays: string[] + events: SignedEvent[] +} + +export const pushWithoutNegentropy = ({relays, events}: PushWithoutNegentropyOpts) => + Promise.all( + events.map(async event => { + await publish({event, relays}).result + }) + ) + +export const syncWithoutNegentropy = async (opts: SyncOpts) => { + await pullWithoutNegentropy(opts) + await pushWithoutNegentropy(opts) +}