Fix fallback pull race after abort #167

Merged
hodlbod merged 6 commits from nayan9617/flotilla:fix/pull-fallback-abort-race into dev 2026-04-08 16:43:05 +00:00
2 changed files with 92 additions and 47 deletions
+1
View File
@@ -28,6 +28,7 @@ node_modules/
.pnpm-store/
build/
.svelte-kit/
.next/
# Rust/Tauri
*target/
+91 -47
View File
@@ -1,8 +1,9 @@
import {page} from "$app/stores"
import type {Unsubscriber} from "svelte/store"
import {derived, get} from "svelte/store"
import {last, call, ifLet, assoc, chunk, sleep, identity, WEEK, ago} from "@welshman/lib"
import {get} from "svelte/store"
import {last, call, ifLet, assoc, chunk, sleep, WEEK, ago} from "@welshman/lib"
import {PollResponse} from "nostr-tools/kinds"
import {merged} from "@welshman/store"
import {
getListTags,
getRelayTagValues,
hodlbod marked this conversation as resolved Outdated
Outdated
Review

Use import type rather than mixing type/variable imports

Use `import type` rather than mixing type/variable imports
@@ -21,7 +22,7 @@ import {
unionFilters,
getTagValue,
} from "@welshman/util"
import type {Filter, TrustedEvent} from "@welshman/util"
import type {Filter, List, PublishedList, TrustedEvent} from "@welshman/util"
import {request, requestOne, Difference, DifferenceEvent} from "@welshman/net"
import {
pubkey,
@@ -74,6 +75,8 @@ const pullOneWithFallback = async (
signal: AbortSignal,
onEvent?: (event: TrustedEvent) => void,
) => {
if (signal.aborted) return
const cachedEvents = repository.query([filter]).filter(isSignedEvent)
const since = last(cachedEvents.slice(10))?.created_at || 0
@@ -86,6 +89,12 @@ const pullOneWithFallback = async (
const shouldFallback =
!hasNegentropy(url) ||
(await new Promise(resolve => {
if (signal.aborted) {
resolve(false)
return
}
// If teardown wins while the diff is opening, skip the fallback path and let cleanup stay in control.
const diff = new Difference({relay: url, filter, events: cachedEvents, signal})
diff.on(DifferenceEvent.Error, () => {
@@ -111,9 +120,7 @@ export const pullWithFallback = async ({url, signal, filters, onEvent}: SyncOpts
if (signal.aborted) return
for (const filter of filters) {
pullOneWithFallback(url, filter, signal, onEvent)
}
await Promise.all(filters.map(filter => pullOneWithFallback(url, filter, signal, onEvent)))
}
const listen = ({url, signal, filters, onEvent}: SyncOpts) => {
@@ -123,6 +130,8 @@ const listen = ({url, signal, filters, onEvent}: SyncOpts) => {
}
const pullAndListen = (options: SyncOpts) => {
if (options.signal.aborted) return
pullWithFallback(options)
listen(options)
}
@@ -197,7 +206,7 @@ const syncUserRoomMembership = (url: string, h: string) => {
const syncUserData = () => {
const unsubscribersByKey = new Map<string, Unsubscriber>()
const unsubscribeGroupList = userGroupList.subscribe($userGroupList => {
const syncGroupList = ($userGroupList: List | undefined) => {
if ($userGroupList) {
const keys = new Set<string>()
@@ -226,26 +235,32 @@ const syncUserData = () => {
}
}
}
})
}
const unsubscribeRelayList = userRelayList.subscribe($userRelayList => {
if ($userRelayList) {
loadBlossomServerList($userRelayList.event.pubkey)
loadBlockedRelayList($userRelayList.event.pubkey)
loadFollowList($userRelayList.event.pubkey)
loadGroupList($userRelayList.event.pubkey)
loadMuteList($userRelayList.event.pubkey)
loadProfile($userRelayList.event.pubkey)
loadSettings($userRelayList.event.pubkey)
loadFeedsForPubkey($userRelayList.event.pubkey)
}
})
const syncRelayList = ($userRelayList: PublishedList | undefined) => {
const pubkey = $userRelayList?.event?.pubkey
hodlbod marked this conversation as resolved Outdated
Outdated
Review

You can use PublishedList here for a slightly more narrow type

You can use `PublishedList` here for a slightly more narrow type
const unsubscribeFollows = userFollowList.subscribe(async $userFollowList => {
if (!pubkey) return
loadBlossomServerList(pubkey)
loadBlockedRelayList(pubkey)
loadFollowList(pubkey)
loadGroupList(pubkey)
loadMuteList(pubkey)
loadProfile(pubkey)
loadSettings(pubkey)
loadFeedsForPubkey(pubkey)
}
const syncFollowList = async (signal: AbortSignal) => {
for (const pubkeys of chunk(10, get(bootstrapPubkeys))) {
if (signal.aborted) return
hodlbod marked this conversation as resolved Outdated
Outdated
Review

This actually changes the behavior to flood relays with requests. This needs to be a trickle in the background to prevent interrupting other requests. If we need to cancel it, then we should add that explicitly.

This actually changes the behavior to flood relays with requests. This needs to be a trickle in the background to prevent interrupting other requests. If we need to cancel it, then we should add that explicitly.
// This isn't urgent, avoid clogging other stuff up
await sleep(1000)
if (signal.aborted) return
await Promise.all(
pubkeys.flatMap(pk => [
loadRelayList(pk),
@@ -256,9 +271,26 @@ const syncUserData = () => {
]),
)
}
}
let bootstrapFollowController = new AbortController()
const unsubscribeGroupList = merged([userGroupList]).subscribe(([$userGroupList]) => {
syncGroupList($userGroupList)
})
const unsubscribeRelayList = merged([userRelayList]).subscribe(([$userRelayList]) => {
syncRelayList($userRelayList)
})
const unsubscribeFollows = merged([userFollowList]).subscribe(() => {
bootstrapFollowController.abort()
bootstrapFollowController = new AbortController()
void syncFollowList(bootstrapFollowController.signal)
})
return () => {
bootstrapFollowController.abort()
unsubscribersByKey.forEach(call)
unsubscribeGroupList()
unsubscribeRelayList()
hodlbod marked this conversation as resolved Outdated
Outdated
Review

Instead of derived/identity you can use merged from @welshman/store

Instead of derived/identity you can use `merged` from `@welshman/store`
@@ -321,7 +353,7 @@ const syncSpace = (url: string, rooms: string[]) => {
}
const syncSpaces = () => {
const store = derived([userGroupList, page], identity)
const store = merged([userGroupList, page])
const unsubscribersByUrl = new Map<string, Unsubscriber>()
const roomsByUrl = new Map<string, string>()
@@ -383,6 +415,7 @@ const syncDMs = () => {
const unsubscribersByUrl = new Map<string, Unsubscriber>()
let currentPubkey: string | undefined
let currentShouldUnwrap = false
const unsubscribeAll = () => {
for (const [url, unsubscribe] of unsubscribersByUrl.entries()) {
@@ -391,6 +424,34 @@ const syncDMs = () => {
}
}
const syncPubkey = ($pubkey: string | undefined, $shouldUnwrap: boolean) => {
if ($pubkey !== currentPubkey) {
unsubscribeAll()
}
if ($pubkey && $shouldUnwrap) {
loadRelayList($pubkey)
.then(() => loadMessagingRelayList($pubkey))
.then($l => {
if ($l && currentPubkey === $pubkey && currentShouldUnwrap === $shouldUnwrap) {
subscribeAll($pubkey, getRelayTagValues(getListTags($l)))
}
})
}
currentPubkey = $pubkey
currentShouldUnwrap = $shouldUnwrap
}
const syncList = ($userMessagingRelayList: List | undefined) => {
const $pubkey = pubkey.get()
const $shouldUnwrap = shouldUnwrap.get()
if ($pubkey && $shouldUnwrap) {
subscribeAll($pubkey, getRelayTagValues(getListTags($userMessagingRelayList)))
}
}
const subscribeAll = (pubkey: string, urls: string[]) => {
// Start syncing newly added relays
for (const url of urls) {
@@ -408,33 +469,16 @@ const syncDMs = () => {
}
}
// When pubkey changes, re-sync
const unsubscribePubkey = derived([pubkey, shouldUnwrap], identity).subscribe(
([$pubkey, $shouldUnwrap]) => {
if ($pubkey !== currentPubkey) {
unsubscribeAll()
}
// If we have a pubkey, refresh our user's relay list then sync our subscriptions
if ($pubkey && $shouldUnwrap) {
loadRelayList($pubkey)
.then(() => loadMessagingRelayList($pubkey))
.then($l => subscribeAll($pubkey, getRelayTagValues(getListTags($l))))
}
currentPubkey = $pubkey
},
)
const unsubscribePubkey = merged([pubkey, shouldUnwrap]).subscribe(([$pubkey, $shouldUnwrap]) => {
syncPubkey($pubkey, $shouldUnwrap)
})
// When user messaging relays change, update synchronization
const unsubscribeList = userMessagingRelayList.subscribe($userMessagingRelayList => {
const $pubkey = pubkey.get()
const $shouldUnwrap = shouldUnwrap.get()
if ($pubkey && $shouldUnwrap) {
subscribeAll($pubkey, getRelayTagValues(getListTags($userMessagingRelayList)))
}
})
const unsubscribeList = merged([userMessagingRelayList]).subscribe(
([$userMessagingRelayList]) => {
syncList($userMessagingRelayList)
},
)
return () => {
unsubscribeAll()