From bc94c705f3123a17fdc8dbd723a66a370339cd1c Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Wed, 25 Mar 2026 14:36:48 -0700 Subject: [PATCH] Make space syncing more robust --- src/app/core/sync.ts | 141 +++++++++++++++++++++---------------------- 1 file changed, 70 insertions(+), 71 deletions(-) diff --git a/src/app/core/sync.ts b/src/app/core/sync.ts index dd48dd3a..32bb33a8 100644 --- a/src/app/core/sync.ts +++ b/src/app/core/sync.ts @@ -1,7 +1,7 @@ import {page} from "$app/stores" import type {Unsubscriber} from "svelte/store" import {derived, get} from "svelte/store" -import {last, call, assoc, chunk, sleep, identity, WEEK, ago} from "@welshman/lib" +import {last, call, ifLet, assoc, chunk, sleep, identity, WEEK, ago} from "@welshman/lib" import { getListTags, getRelayTagValues, @@ -13,14 +13,14 @@ import { ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER, ROOM_CREATE_PERMISSION, - ROOM_JOIN, RELAY_MEMBERS, RELAY_ADD_MEMBER, RELAY_REMOVE_MEMBER, isSignedEvent, unionFilters, + getTagValue, } from "@welshman/util" -import type {Filter} from "@welshman/util" +import type {Filter, TrustedEvent} from "@welshman/util" import {request, requestOne, Difference, DifferenceEvent} from "@welshman/net" import { pubkey, @@ -64,12 +64,24 @@ type SyncOpts = { url: string signal: AbortSignal filters: Filter[] + onEvent?: (event: TrustedEvent) => void } -const pullOneWithFallback = async (url: string, filter: Filter, signal: AbortSignal) => { +const pullOneWithFallback = async ( + url: string, + filter: Filter, + signal: AbortSignal, + onEvent?: (event: TrustedEvent) => void, +) => { const cachedEvents = repository.query([filter]).filter(isSignedEvent) const since = last(cachedEvents.slice(10))?.created_at || 0 + if (onEvent) { + for (const event of cachedEvents) { + onEvent(event) + } + } + const shouldFallback = !hasNegentropy(url) || (await new Promise(resolve => { @@ -81,7 +93,7 @@ const pullOneWithFallback = async (url: string, filter: Filter, signal: AbortSig diff.on(DifferenceEvent.Close, () => { for (const ids of chunk(100, Array.from(diff.need))) { - requestOne({relay: url, signal, autoClose: true, filters: [{ids}]}) + requestOne({relay: url, signal, autoClose: true, filters: [{ids}], onEvent}) } resolve(false) @@ -89,29 +101,29 @@ const pullOneWithFallback = async (url: string, filter: Filter, signal: AbortSig })) if (shouldFallback && !signal.aborted) { - request({relays: [url], signal, autoClose: true, filters: [{...filter, since}]}) + request({relays: [url], signal, autoClose: true, filters: [{since, ...filter}], onEvent}) } } -export const pullWithFallback = async ({url, signal, filters}: SyncOpts) => { +export const pullWithFallback = async ({url, signal, filters, onEvent}: SyncOpts) => { await loadRelay(url) if (signal.aborted) return for (const filter of filters) { - pullOneWithFallback(url, filter, signal) + pullOneWithFallback(url, filter, signal, onEvent) } } -const listen = ({url, signal, filters}: SyncOpts) => { +const listen = ({url, signal, filters, onEvent}: SyncOpts) => { const relays = [url] - request({relays, signal, filters: unionFilters(filters.map(assoc("limit", 0)))}) + request({relays, signal, filters: unionFilters(filters.map(assoc("limit", 0))), onEvent}) } -const pullAndListen = ({url, filters, signal}: SyncOpts) => { - pullWithFallback({url, signal, filters}) - listen({url, signal, filters}) +const pullAndListen = (options: SyncOpts) => { + pullWithFallback(options) + listen(options) } // Relays @@ -256,68 +268,55 @@ const syncUserData = () => { // Spaces const syncSpace = (url: string, rooms: string[]) => { + const since = ago(WEEK) + const seen = new Set() const controller = new AbortController() - // Relay-level kinds don't need #h tags - pullAndListen({ - url, - signal: controller.signal, - filters: [{kinds: [RELAY_MEMBERS, RELAY_ADD_MEMBER, RELAY_REMOVE_MEMBER]}], - }) - - // Room metadata uses #d tags, not #h, so no filtering needed - pullAndListen({ - url, - signal: controller.signal, - filters: [{kinds: [ROOM_META, ROOM_ADMINS, ROOM_MEMBERS, LIVEKIT_PARTICIPANTS, ROOM_JOIN]}], - }) - - // Room-scoped kinds: add #h tags when we know which rooms the user is in. - // This avoids sending broad filters that picky relays reject. - const roomKinds = [ROOM_DELETE, ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER] - const since = ago(WEEK) - - if (rooms.length > 0) { - pullAndListen({ - url, - signal: controller.signal, - filters: [{kinds: roomKinds, "#h": rooms}], - }) - - pullAndListen({ - url, - signal: controller.signal, - filters: [ - {kinds: MESSAGE_KINDS, "#h": rooms, since}, - makeCommentFilter(CONTENT_KINDS, {"#h": rooms, since}), - ], - }) - - listen({ - url, - signal: controller.signal, - filters: [{kinds: REACTION_KINDS, "#h": rooms}], - }) - } else { - pullAndListen({ - url, - signal: controller.signal, - filters: [{kinds: roomKinds}], - }) - - pullAndListen({ - url, - signal: controller.signal, - filters: [{kinds: MESSAGE_KINDS, since}, makeCommentFilter(CONTENT_KINDS, {since})], - }) - - listen({ - url, - signal: controller.signal, - filters: [{kinds: REACTION_KINDS}], - }) + const pullRoomContent = (room: string) => { + if (!seen.has(room)) { + seen.add(room) + pullAndListen({ + url, + signal: controller.signal, + filters: [ + {kinds: MESSAGE_KINDS, since, "#h": [room]}, + makeCommentFilter(CONTENT_KINDS, {since, "#h": [room]}), + ], + }) + } } + for (const room of rooms) { + pullRoomContent(room) + } + + const relayKinds = [RELAY_MEMBERS, RELAY_ADD_MEMBER, RELAY_REMOVE_MEMBER] + const roomMetaKinds = [ROOM_META, ROOM_ADMINS, ROOM_MEMBERS, LIVEKIT_PARTICIPANTS] + const roomMemberKinds = [ROOM_DELETE, ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER] + + pullAndListen({ + url, + signal: controller.signal, + filters: [ + {kinds: relayKinds}, + {kinds: roomMetaKinds}, + {kinds: roomMemberKinds}, + {kinds: MESSAGE_KINDS, since}, + makeCommentFilter(CONTENT_KINDS, {since}), + ], + onEvent: event => { + if (event.kind === ROOM_META) { + ifLet(getTagValue("d", event.tags), pullRoomContent) + } + }, + }) + + listen({ + url, + signal: controller.signal, + filters: [{kinds: REACTION_KINDS}], + }) + return () => controller.abort() }