Add legacy alternatives to sync
This commit is contained in:
+104
-36
@@ -1,56 +1,66 @@
|
|||||||
import {ctx, pushToMapKey, inc, flatten, chunk} from '@welshman/lib'
|
import {ctx, groupBy, now, pushToMapKey, inc, flatten, chunk} from '@welshman/lib'
|
||||||
import type {SignedEvent, TrustedEvent, Filter} from '@welshman/util'
|
import type {SignedEvent, TrustedEvent, Filter} from '@welshman/util'
|
||||||
import type {NegentropyMessage} from './Executor'
|
|
||||||
import {subscribe} from './Subscribe'
|
import {subscribe} from './Subscribe'
|
||||||
import {publish} from './Publish'
|
import {publish} from './Publish'
|
||||||
|
|
||||||
export type DiffOneOpts = {
|
export type DiffOpts = {
|
||||||
relay: string
|
|
||||||
filter: Filter
|
|
||||||
events: TrustedEvent[]
|
|
||||||
}
|
|
||||||
|
|
||||||
export const diffOne = ({relay, filter, events}: DiffOneOpts) => {
|
|
||||||
const executor = ctx.net.getExecutor([relay])
|
|
||||||
const have = new Set<string>()
|
|
||||||
const need = new Set<string>()
|
|
||||||
|
|
||||||
return new Promise<NegentropyMessage>((resolve, reject) => {
|
|
||||||
executor.diff(filter, events, {
|
|
||||||
onClose: () => resolve({have: Array.from(have), need: Array.from(need)}),
|
|
||||||
onError: (_, message) => reject(message),
|
|
||||||
onMessage: (_, message) => {
|
|
||||||
for (const id of message.have) {
|
|
||||||
have.add(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const id of message.need) {
|
|
||||||
need.add(id)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
export type DiffAllOpts = {
|
|
||||||
relays: string[]
|
relays: string[]
|
||||||
filters: Filter[]
|
filters: Filter[]
|
||||||
events: TrustedEvent[]
|
events: TrustedEvent[]
|
||||||
}
|
}
|
||||||
|
|
||||||
export const diffAll = async ({relays, filters, events}: DiffAllOpts) =>
|
export const diff = async ({relays, filters, events}: DiffOpts) => {
|
||||||
flatten(
|
const diffs = flatten(
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
relays.flatMap(async relay => {
|
relays.flatMap(async relay => {
|
||||||
return await Promise.all(
|
return await Promise.all(
|
||||||
filters.map(async filter => {
|
filters.map(async filter => {
|
||||||
return {relay, ...await diffOne({relay, filter, events})}
|
const executor = ctx.net.getExecutor([relay])
|
||||||
|
const have = new Set<string>()
|
||||||
|
const need = new Set<string>()
|
||||||
|
|
||||||
|
await new Promise<void>((resolve, reject) => {
|
||||||
|
executor.diff(filter, events, {
|
||||||
|
onClose: resolve,
|
||||||
|
onError: (_, message) => reject(message),
|
||||||
|
onMessage: (_, message) => {
|
||||||
|
for (const id of message.have) {
|
||||||
|
have.add(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const id of message.need) {
|
||||||
|
need.add(id)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
return {relay, have, need}
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
return Array.from(groupBy(diff => diff.relay, diffs).entries())
|
||||||
|
.map(([relay, diffs]) => {
|
||||||
|
const have = new Set<string>()
|
||||||
|
const need = new Set<string>()
|
||||||
|
|
||||||
|
for (const diff of diffs) {
|
||||||
|
for (const id of diff.have) {
|
||||||
|
have.add(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const id of diff.need) {
|
||||||
|
need.add(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {relay, have: Array.from(have), need: Array.from(need)}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
export type PullOpts = {
|
export type PullOpts = {
|
||||||
relays: string[]
|
relays: string[]
|
||||||
filters: Filter[]
|
filters: Filter[]
|
||||||
@@ -62,7 +72,7 @@ export const pull = async ({relays, filters, events, onEvent}: PullOpts) => {
|
|||||||
const countById = new Map<string, number>()
|
const countById = new Map<string, number>()
|
||||||
const idsByRelay = new Map<string, string[]>()
|
const idsByRelay = new Map<string, string[]>()
|
||||||
|
|
||||||
for (const {relay, need} of await diffAll({relays, filters, events})) {
|
for (const {relay, need} of await diff({relays, filters, events})) {
|
||||||
for (const id of need) {
|
for (const id of need) {
|
||||||
const count = countById.get(id) || 0
|
const count = countById.get(id) || 0
|
||||||
|
|
||||||
@@ -110,7 +120,7 @@ export type PushOpts = {
|
|||||||
export const push = async ({relays, filters, events}: PushOpts) => {
|
export const push = async ({relays, filters, events}: PushOpts) => {
|
||||||
const relaysById = new Map<string, string[]>()
|
const relaysById = new Map<string, string[]>()
|
||||||
|
|
||||||
for (const {relay, have} of await diffAll({relays, filters, events})) {
|
for (const {relay, have} of await diff({relays, filters, events})) {
|
||||||
for (const id of have) {
|
for (const id of have) {
|
||||||
pushToMapKey(relaysById, id, relay)
|
pushToMapKey(relaysById, id, relay)
|
||||||
}
|
}
|
||||||
@@ -137,3 +147,61 @@ export const sync = async (opts: SyncOpts) => {
|
|||||||
await pull(opts)
|
await pull(opts)
|
||||||
await push(opts)
|
await push(opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Legacy alternatives for use with relays that don't support negentropy
|
||||||
|
|
||||||
|
export type PullWithoutNegentropyOpts = {
|
||||||
|
relays: string[]
|
||||||
|
filters: Filter[]
|
||||||
|
onEvent?: (event: TrustedEvent) => void
|
||||||
|
}
|
||||||
|
|
||||||
|
export const pullWithoutNegentropy = async ({relays, filters, onEvent}: PullWithoutNegentropyOpts) => {
|
||||||
|
let done = false
|
||||||
|
let until = now() + 30
|
||||||
|
|
||||||
|
const result: TrustedEvent[] = []
|
||||||
|
|
||||||
|
while (!done) {
|
||||||
|
let anyResults = false
|
||||||
|
|
||||||
|
await new Promise<void>(resolve => {
|
||||||
|
subscribe({
|
||||||
|
relays,
|
||||||
|
filters: filters
|
||||||
|
.filter(filter => filter.since && filter.since > until)
|
||||||
|
.map(filter => ({...filter, until})),
|
||||||
|
closeOnEose: true,
|
||||||
|
onClose: () => {
|
||||||
|
done = !anyResults
|
||||||
|
resolve()
|
||||||
|
},
|
||||||
|
onEvent: event => {
|
||||||
|
anyResults = true
|
||||||
|
until = Math.min(until, event.created_at - 1)
|
||||||
|
result.push(event)
|
||||||
|
onEvent?.(event)
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
export type PushWithoutNegentropyOpts = {
|
||||||
|
relays: string[]
|
||||||
|
events: SignedEvent[]
|
||||||
|
}
|
||||||
|
|
||||||
|
export const pushWithoutNegentropy = ({relays, events}: PushWithoutNegentropyOpts) =>
|
||||||
|
Promise.all(
|
||||||
|
events.map(async event => {
|
||||||
|
await publish({event, relays}).result
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
export const syncWithoutNegentropy = async (opts: SyncOpts) => {
|
||||||
|
await pullWithoutNegentropy(opts)
|
||||||
|
await pushWithoutNegentropy(opts)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user