diff --git a/.gitignore b/.gitignore index 0981428d..39569863 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,7 @@ node_modules/ .pnpm-store/ build/ .svelte-kit/ +.next/ # Rust/Tauri *target/ diff --git a/src/app/core/sync.ts b/src/app/core/sync.ts index 131730a9..84eee092 100644 --- a/src/app/core/sync.ts +++ b/src/app/core/sync.ts @@ -1,8 +1,9 @@ import {page} from "$app/stores" import type {Unsubscriber} from "svelte/store" -import {derived, get} from "svelte/store" -import {last, call, ifLet, assoc, chunk, sleep, identity, WEEK, ago} from "@welshman/lib" +import {get} from "svelte/store" +import {last, call, ifLet, assoc, chunk, sleep, WEEK, ago} from "@welshman/lib" import {PollResponse} from "nostr-tools/kinds" +import {merged} from "@welshman/store" import { getListTags, getRelayTagValues, @@ -21,7 +22,7 @@ import { unionFilters, getTagValue, } from "@welshman/util" -import type {Filter, TrustedEvent} from "@welshman/util" +import type {Filter, List, PublishedList, TrustedEvent} from "@welshman/util" import {request, requestOne, Difference, DifferenceEvent} from "@welshman/net" import { pubkey, @@ -74,6 +75,8 @@ const pullOneWithFallback = async ( signal: AbortSignal, onEvent?: (event: TrustedEvent) => void, ) => { + if (signal.aborted) return + const cachedEvents = repository.query([filter]).filter(isSignedEvent) const since = last(cachedEvents.slice(10))?.created_at || 0 @@ -86,6 +89,12 @@ const pullOneWithFallback = async ( const shouldFallback = !hasNegentropy(url) || (await new Promise(resolve => { + if (signal.aborted) { + resolve(false) + return + } + + // If teardown wins while the diff is opening, skip the fallback path and let cleanup stay in control. const diff = new Difference({relay: url, filter, events: cachedEvents, signal}) diff.on(DifferenceEvent.Error, () => { @@ -111,9 +120,7 @@ export const pullWithFallback = async ({url, signal, filters, onEvent}: SyncOpts if (signal.aborted) return - for (const filter of filters) { - pullOneWithFallback(url, filter, signal, onEvent) - } + await Promise.all(filters.map(filter => pullOneWithFallback(url, filter, signal, onEvent))) } const listen = ({url, signal, filters, onEvent}: SyncOpts) => { @@ -123,6 +130,8 @@ const listen = ({url, signal, filters, onEvent}: SyncOpts) => { } const pullAndListen = (options: SyncOpts) => { + if (options.signal.aborted) return + pullWithFallback(options) listen(options) } @@ -197,7 +206,7 @@ const syncUserRoomMembership = (url: string, h: string) => { const syncUserData = () => { const unsubscribersByKey = new Map() - const unsubscribeGroupList = userGroupList.subscribe($userGroupList => { + const syncGroupList = ($userGroupList: List | undefined) => { if ($userGroupList) { const keys = new Set() @@ -226,26 +235,32 @@ const syncUserData = () => { } } } - }) + } - const unsubscribeRelayList = userRelayList.subscribe($userRelayList => { - if ($userRelayList) { - loadBlossomServerList($userRelayList.event.pubkey) - loadBlockedRelayList($userRelayList.event.pubkey) - loadFollowList($userRelayList.event.pubkey) - loadGroupList($userRelayList.event.pubkey) - loadMuteList($userRelayList.event.pubkey) - loadProfile($userRelayList.event.pubkey) - loadSettings($userRelayList.event.pubkey) - loadFeedsForPubkey($userRelayList.event.pubkey) - } - }) + const syncRelayList = ($userRelayList: PublishedList | undefined) => { + const pubkey = $userRelayList?.event?.pubkey - const unsubscribeFollows = userFollowList.subscribe(async $userFollowList => { + if (!pubkey) return + + loadBlossomServerList(pubkey) + loadBlockedRelayList(pubkey) + loadFollowList(pubkey) + loadGroupList(pubkey) + loadMuteList(pubkey) + loadProfile(pubkey) + loadSettings(pubkey) + loadFeedsForPubkey(pubkey) + } + + const syncFollowList = async (signal: AbortSignal) => { for (const pubkeys of chunk(10, get(bootstrapPubkeys))) { + if (signal.aborted) return + // This isn't urgent, avoid clogging other stuff up await sleep(1000) + if (signal.aborted) return + await Promise.all( pubkeys.flatMap(pk => [ loadRelayList(pk), @@ -256,9 +271,26 @@ const syncUserData = () => { ]), ) } + } + + let bootstrapFollowController = new AbortController() + + const unsubscribeGroupList = merged([userGroupList]).subscribe(([$userGroupList]) => { + syncGroupList($userGroupList) + }) + + const unsubscribeRelayList = merged([userRelayList]).subscribe(([$userRelayList]) => { + syncRelayList($userRelayList) + }) + + const unsubscribeFollows = merged([userFollowList]).subscribe(() => { + bootstrapFollowController.abort() + bootstrapFollowController = new AbortController() + void syncFollowList(bootstrapFollowController.signal) }) return () => { + bootstrapFollowController.abort() unsubscribersByKey.forEach(call) unsubscribeGroupList() unsubscribeRelayList() @@ -321,7 +353,7 @@ const syncSpace = (url: string, rooms: string[]) => { } const syncSpaces = () => { - const store = derived([userGroupList, page], identity) + const store = merged([userGroupList, page]) const unsubscribersByUrl = new Map() const roomsByUrl = new Map() @@ -383,6 +415,7 @@ const syncDMs = () => { const unsubscribersByUrl = new Map() let currentPubkey: string | undefined + let currentShouldUnwrap = false const unsubscribeAll = () => { for (const [url, unsubscribe] of unsubscribersByUrl.entries()) { @@ -391,6 +424,34 @@ const syncDMs = () => { } } + const syncPubkey = ($pubkey: string | undefined, $shouldUnwrap: boolean) => { + if ($pubkey !== currentPubkey) { + unsubscribeAll() + } + + if ($pubkey && $shouldUnwrap) { + loadRelayList($pubkey) + .then(() => loadMessagingRelayList($pubkey)) + .then($l => { + if ($l && currentPubkey === $pubkey && currentShouldUnwrap === $shouldUnwrap) { + subscribeAll($pubkey, getRelayTagValues(getListTags($l))) + } + }) + } + + currentPubkey = $pubkey + currentShouldUnwrap = $shouldUnwrap + } + + const syncList = ($userMessagingRelayList: List | undefined) => { + const $pubkey = pubkey.get() + const $shouldUnwrap = shouldUnwrap.get() + + if ($pubkey && $shouldUnwrap) { + subscribeAll($pubkey, getRelayTagValues(getListTags($userMessagingRelayList))) + } + } + const subscribeAll = (pubkey: string, urls: string[]) => { // Start syncing newly added relays for (const url of urls) { @@ -408,33 +469,16 @@ const syncDMs = () => { } } - // When pubkey changes, re-sync - const unsubscribePubkey = derived([pubkey, shouldUnwrap], identity).subscribe( - ([$pubkey, $shouldUnwrap]) => { - if ($pubkey !== currentPubkey) { - unsubscribeAll() - } - - // If we have a pubkey, refresh our user's relay list then sync our subscriptions - if ($pubkey && $shouldUnwrap) { - loadRelayList($pubkey) - .then(() => loadMessagingRelayList($pubkey)) - .then($l => subscribeAll($pubkey, getRelayTagValues(getListTags($l)))) - } - - currentPubkey = $pubkey - }, - ) + const unsubscribePubkey = merged([pubkey, shouldUnwrap]).subscribe(([$pubkey, $shouldUnwrap]) => { + syncPubkey($pubkey, $shouldUnwrap) + }) // When user messaging relays change, update synchronization - const unsubscribeList = userMessagingRelayList.subscribe($userMessagingRelayList => { - const $pubkey = pubkey.get() - const $shouldUnwrap = shouldUnwrap.get() - - if ($pubkey && $shouldUnwrap) { - subscribeAll($pubkey, getRelayTagValues(getListTags($userMessagingRelayList))) - } - }) + const unsubscribeList = merged([userMessagingRelayList]).subscribe( + ([$userMessagingRelayList]) => { + syncList($userMessagingRelayList) + }, + ) return () => { unsubscribeAll()