From eef32ca11e44848dfa4c6b2389218a71ff0bc95f Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 26 Feb 2026 14:29:13 -0800 Subject: [PATCH] Make sync logic more robust --- src/app/core/sync.ts | 143 ++++++++++++++++++++++++++++++------------- 1 file changed, 100 insertions(+), 43 deletions(-) diff --git a/src/app/core/sync.ts b/src/app/core/sync.ts index 155b9add..96eab24f 100644 --- a/src/app/core/sync.ts +++ b/src/app/core/sync.ts @@ -20,7 +20,7 @@ import { unionFilters, } from "@welshman/util" import type {Filter} from "@welshman/util" -import {request, pull} from "@welshman/net" +import {request, requestOne, Difference, DifferenceEvent} from "@welshman/net" import { pubkey, loadRelay, @@ -64,26 +64,47 @@ type SyncOpts = { filters: Filter[] } -export const pullWithFallback = ({url, signal, filters}: SyncOpts) => { - const relays = [url] - const events = repository.query(filters).filter(isSignedEvent) +const pullOneWithFallback = async (url: string, filter: Filter, signal: AbortSignal) => { + const cachedEvents = repository.query([filter]).filter(isSignedEvent) + const since = last(cachedEvents.slice(10))?.created_at || 0 - if (hasNegentropy(url)) { - pull({relays, signal, events, filters}) - } else { - request({ - relays, - signal, - autoClose: true, - filters: filters.map(assoc("since", last(events.slice(10))?.created_at || 0)), - }) + const shouldFallback = + !hasNegentropy(url) || + (await new Promise(resolve => { + const diff = new Difference({relay: url, filter, events: cachedEvents, signal}) + + diff.on(DifferenceEvent.Error, () => { + resolve(true) + }) + + diff.on(DifferenceEvent.Close, () => { + for (const ids of chunk(100, Array.from(diff.need))) { + requestOne({relay: url, signal, autoClose: true, filters: [{ids}]}) + } + + resolve(false) + }) + })) + + if (shouldFallback && !signal.aborted) { + request({relays: [url], signal, autoClose: true, filters: [{...filter, since}]}) + } +} + +export const pullWithFallback = async ({url, signal, filters}: SyncOpts) => { + await loadRelay(url) + + if (signal.aborted) return + + for (const filter of filters) { + pullOneWithFallback(url, filter, signal) } } const listen = ({url, signal, filters}: SyncOpts) => { const relays = [url] - request({relays, signal, filters: unionFilters(filters).map(assoc("limit", 0))}) + request({relays, signal, filters: unionFilters(filters.map(assoc("limit", 0)))}) } const pullAndListen = ({url, filters, signal}: SyncOpts) => { @@ -232,69 +253,105 @@ const syncUserData = () => { // Spaces -const syncSpace = (url: string) => { +const syncSpace = (url: string, rooms: string[]) => { const controller = new AbortController() - // These are separated so that old versions of relay29 don't barf - + // Relay-level kinds don't need #h tags pullAndListen({ url, signal: controller.signal, filters: [{kinds: [RELAY_MEMBERS, RELAY_ADD_MEMBER, RELAY_REMOVE_MEMBER]}], }) + // Room metadata uses #d tags, not #h, so no filtering needed pullAndListen({ url, signal: controller.signal, filters: [{kinds: [ROOM_META, ROOM_ADMINS, ROOM_MEMBERS]}], }) - pullAndListen({ - url, - signal: controller.signal, - filters: [{kinds: [ROOM_DELETE, ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER]}], - }) - + // Room-scoped kinds: add #h tags when we know which rooms the user is in. + // This avoids sending broad filters that picky relays reject. + const roomKinds = [ROOM_DELETE, ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER] const since = ago(WEEK) - pullAndListen({ - url, - signal: controller.signal, - filters: [{kinds: MESSAGE_KINDS, since}, makeCommentFilter(CONTENT_KINDS, {since})], - }) + if (rooms.length > 0) { + pullAndListen({ + url, + signal: controller.signal, + filters: [{kinds: roomKinds, "#h": rooms}], + }) - listen({ - url, - signal: controller.signal, - filters: [{kinds: REACTION_KINDS, limit: 0}], - }) + pullAndListen({ + url, + signal: controller.signal, + filters: [ + {kinds: MESSAGE_KINDS, "#h": rooms, since}, + makeCommentFilter(CONTENT_KINDS, {"#h": rooms, since}), + ], + }) + + listen({ + url, + signal: controller.signal, + filters: [{kinds: REACTION_KINDS, "#h": rooms}], + }) + } else { + pullAndListen({ + url, + signal: controller.signal, + filters: [{kinds: roomKinds}], + }) + + pullAndListen({ + url, + signal: controller.signal, + filters: [{kinds: MESSAGE_KINDS, since}, makeCommentFilter(CONTENT_KINDS, {since})], + }) + + listen({ + url, + signal: controller.signal, + filters: [{kinds: REACTION_KINDS}], + }) + } return () => controller.abort() } const syncSpaces = () => { - const store = derived([userSpaceUrls, page], identity) + const store = derived([userGroupList, page], identity) const unsubscribersByUrl = new Map() - const unsubscribe = store.subscribe(([$userSpaceUrls, $page]) => { - const urls = Array.from($userSpaceUrls) + const roomsByUrl = new Map() + + const unsubscribe = store.subscribe(([$userGroupList, $page]) => { + const urls = new Set(getSpaceUrlsFromGroupList($userGroupList)) if ($page.params.relay) { - urls.push(decodeRelay($page.params.relay)) + urls.add(decodeRelay($page.params.relay)) } - // stop syncing removed spaces + // Stop syncing removed spaces for (const [url, unsubscribe] of unsubscribersByUrl.entries()) { - if (!urls.includes(url)) { + if (!urls.has(url)) { unsubscribersByUrl.delete(url) + roomsByUrl.delete(url) unsubscribe() } } - // Start syncing newly added spaces + // Start or restart syncing for each space for (const url of urls) { - if (!unsubscribersByUrl.has(url)) { - unsubscribersByUrl.set(url, syncSpace(url)) - } + const rooms = getSpaceRoomsFromGroupList(url, $userGroupList) + const roomsKey = rooms.join(",") + + if (unsubscribersByUrl.has(url) && roomsByUrl.get(url) === roomsKey) continue + + // Tear down existing sync if rooms changed + unsubscribersByUrl.get(url)?.() + + roomsByUrl.set(url, roomsKey) + unsubscribersByUrl.set(url, syncSpace(url, rooms)) } })