Files
welshman/packages/net/__tests__/Sync.test.ts
T
2025-02-25 14:34:49 -08:00

274 lines
7.2 KiB
TypeScript

import {diff, pull, push, sync, pullWithoutNegentropy, pushWithoutNegentropy} from "../src/Sync"
import {ctx, now} from "@welshman/lib"
import type {SignedEvent, TrustedEvent, Filter} from "@welshman/util"
import {vi, describe, it, expect, beforeEach} from "vitest"
import {subscribe} from "../src/Subscribe"
import {publish} from "../src/Publish"
// Mock dependencies
vi.mock("../src/Subscribe", () => ({
subscribe: vi.fn(),
}))
vi.mock("../src/Publish", () => ({
publish: vi.fn(),
}))
vi.mock("@welshman/lib", async importOriginal => {
return {
...(await importOriginal()),
now: vi.fn().mockReturnValue(1000),
}
})
describe("Sync", () => {
let mockExecutor: any
let mockDiffSub: any
beforeEach(() => {
vi.clearAllMocks()
mockDiffSub = {unsubscribe: vi.fn()}
mockExecutor = {
diff: vi.fn().mockImplementation((filter, events, {onMessage, onClose}) => {
// Simulate diff message
onMessage("relay1", {have: ["id1"], need: ["id2"]})
onClose()
return mockDiffSub
}),
target: {
cleanup: vi.fn(),
},
}
ctx.net = {
...ctx.net,
getExecutor: vi.fn().mockReturnValue(mockExecutor),
}
// Mock subscribe to simulate event reception
vi.mocked(subscribe).mockImplementation(({onEvent, onClose, onComplete}) => {
if (onEvent) {
onEvent({id: "id2", created_at: 900} as TrustedEvent)
}
onClose?.("relay1")
onComplete?.()
return {close: vi.fn()}
})
// Mock publish to return resolved result
vi.mocked(publish).mockImplementation(() => ({
result: Promise.resolve(new Map()),
id: "pub1",
created_at: 1000,
emitter: {} as any,
request: {} as any,
status: new Map(),
}))
})
describe("diff", () => {
it("should aggregate diff results by relay", async () => {
const result = await diff({
relays: ["relay1", "relay2"],
filters: [{kinds: [1]}],
events: [{id: "id1"} as TrustedEvent],
})
expect(result).toEqual([
{
relay: "relay1",
have: ["id1"],
need: ["id2"],
},
{
relay: "relay2",
have: ["id1"],
need: ["id2"],
},
])
})
it("should handle multiple filters", async () => {
const result = await diff({
relays: ["relay1"],
filters: [{kinds: [1]}, {kinds: [2]}],
events: [{id: "id1"} as TrustedEvent],
})
expect(mockExecutor.diff).toHaveBeenCalledTimes(2)
})
it("should handle diff errors", async () => {
mockExecutor.diff.mockImplementation((filter, events, {onError}) => {
onError("relay1", "error message")
return mockDiffSub
})
await expect(
diff({
relays: ["relay1"],
filters: [{kinds: [1]}],
events: [],
}),
).rejects.toEqual("error message")
})
})
describe("pull", () => {
it("should pull needed events", async () => {
const onEvent = vi.fn()
const result = await pull({
relays: ["relay1"],
filters: [{kinds: [1]}],
events: [],
onEvent,
})
expect(result).toHaveLength(1)
expect(result[0].id).toBe("id2")
expect(onEvent).toHaveBeenCalled()
})
it("should limit duplicate pulls", async () => {
// Mock diff to return same need from multiple relays
mockExecutor.diff.mockImplementation((filter, events, {onMessage, onClose}) => {
onMessage("relay1", {have: [], need: ["id2"]})
onClose()
return mockDiffSub
})
await pull({
relays: ["relay1", "relay2", "relay3"],
filters: [{kinds: [1]}],
events: [],
})
// Should only subscribe maximum twice for the same ID
expect(subscribe).toHaveBeenCalledTimes(2)
})
it("should chunk large ID lists", async () => {
const manyIds = Array.from({length: 2000}, (_, i) => `id${i}`)
mockExecutor.diff.mockImplementation((filter, events, {onMessage, onClose}) => {
onMessage("relay1", {have: [], need: manyIds})
onClose()
return mockDiffSub
})
await pull({
relays: ["relay1"],
filters: [{kinds: [1]}],
events: [],
})
// Should split into chunks of 1024
expect(subscribe).toHaveBeenCalledTimes(2)
})
})
describe("push", () => {
it("should push events to relays that have them", async () => {
await push({
relays: ["relay1"],
filters: [{kinds: [1]}],
events: [{id: "id1"} as SignedEvent],
})
expect(publish).toHaveBeenCalledWith({
event: expect.any(Object),
relays: ["relay1"],
})
})
it("should skip events with no matching relays", async () => {
mockExecutor.diff.mockImplementation((filter, events, {onMessage, onClose}) => {
onMessage("relay1", {have: [], need: []})
onClose()
return mockDiffSub
})
await push({
relays: ["relay1"],
filters: [{kinds: [1]}],
events: [{id: "id1"} as SignedEvent],
})
expect(publish).not.toHaveBeenCalled()
})
})
describe("sync", () => {
it("should perform pull and push operations", async () => {
await sync({
relays: ["relay1"],
filters: [{kinds: [1]}],
events: [{id: "id1"} as SignedEvent],
})
expect(subscribe).toHaveBeenCalled()
expect(publish).toHaveBeenCalled()
})
})
describe("pullWithoutNegentropy", () => {
it("should pull events until no more results", async () => {
let callCount = 0
vi.mocked(subscribe).mockImplementation(({onEvent, onComplete}) => {
if (callCount++ < 2) {
onEvent?.({id: `id${callCount}`, created_at: 900} as TrustedEvent)
}
onComplete?.()
return {close: vi.fn()}
})
const result = await pullWithoutNegentropy({
relays: ["relay1"],
filters: [{kinds: [1]}],
})
expect(result).toHaveLength(2)
expect(subscribe).toHaveBeenCalledTimes(3) // 2 with results + 1 final check
})
it("should update until timestamp based on events", async () => {
let callCount = 0
vi.mocked(subscribe).mockImplementation(({onEvent, onComplete}) => {
if (!callCount) {
onEvent?.({id: "id1", created_at: 500} as TrustedEvent)
callCount++
}
onComplete?.()
return {close: vi.fn()}
})
await pullWithoutNegentropy({
relays: ["relay1"],
filters: [{kinds: [1]}],
})
// Second subscription should use updated until
expect(subscribe).toHaveBeenLastCalledWith(
expect.objectContaining({
filters: expect.arrayContaining([expect.objectContaining({until: 499})]),
}),
)
})
})
describe("pushWithoutNegentropy", () => {
it("should push all events to all relays", async () => {
await pushWithoutNegentropy({
relays: ["relay1", "relay2"],
events: [{id: "id1"} as SignedEvent, {id: "id2"} as SignedEvent],
})
expect(publish).toHaveBeenCalledTimes(2)
expect(publish).toHaveBeenCalledWith({
event: expect.any(Object),
relays: ["relay1", "relay2"],
})
})
})
})