Fix fallback pull race after abort #167
@@ -28,6 +28,7 @@ node_modules/
|
||||
.pnpm-store/
|
||||
build/
|
||||
.svelte-kit/
|
||||
.next/
|
||||
|
||||
# Rust/Tauri
|
||||
*target/
|
||||
|
||||
+91
-47
@@ -1,8 +1,9 @@
|
||||
import {page} from "$app/stores"
|
||||
import type {Unsubscriber} from "svelte/store"
|
||||
import {derived, get} from "svelte/store"
|
||||
import {last, call, ifLet, assoc, chunk, sleep, identity, WEEK, ago} from "@welshman/lib"
|
||||
import {get} from "svelte/store"
|
||||
import {last, call, ifLet, assoc, chunk, sleep, WEEK, ago} from "@welshman/lib"
|
||||
import {PollResponse} from "nostr-tools/kinds"
|
||||
import {merged} from "@welshman/store"
|
||||
import {
|
||||
getListTags,
|
||||
getRelayTagValues,
|
||||
|
hodlbod marked this conversation as resolved
Outdated
|
||||
@@ -21,7 +22,7 @@ import {
|
||||
unionFilters,
|
||||
getTagValue,
|
||||
} from "@welshman/util"
|
||||
import type {Filter, TrustedEvent} from "@welshman/util"
|
||||
import type {Filter, List, PublishedList, TrustedEvent} from "@welshman/util"
|
||||
import {request, requestOne, Difference, DifferenceEvent} from "@welshman/net"
|
||||
import {
|
||||
pubkey,
|
||||
@@ -74,6 +75,8 @@ const pullOneWithFallback = async (
|
||||
signal: AbortSignal,
|
||||
onEvent?: (event: TrustedEvent) => void,
|
||||
) => {
|
||||
if (signal.aborted) return
|
||||
|
||||
const cachedEvents = repository.query([filter]).filter(isSignedEvent)
|
||||
const since = last(cachedEvents.slice(10))?.created_at || 0
|
||||
|
||||
@@ -86,6 +89,12 @@ const pullOneWithFallback = async (
|
||||
const shouldFallback =
|
||||
!hasNegentropy(url) ||
|
||||
(await new Promise(resolve => {
|
||||
if (signal.aborted) {
|
||||
resolve(false)
|
||||
return
|
||||
}
|
||||
|
||||
// If teardown wins while the diff is opening, skip the fallback path and let cleanup stay in control.
|
||||
const diff = new Difference({relay: url, filter, events: cachedEvents, signal})
|
||||
|
||||
diff.on(DifferenceEvent.Error, () => {
|
||||
@@ -111,9 +120,7 @@ export const pullWithFallback = async ({url, signal, filters, onEvent}: SyncOpts
|
||||
|
||||
if (signal.aborted) return
|
||||
|
||||
for (const filter of filters) {
|
||||
pullOneWithFallback(url, filter, signal, onEvent)
|
||||
}
|
||||
await Promise.all(filters.map(filter => pullOneWithFallback(url, filter, signal, onEvent)))
|
||||
}
|
||||
|
||||
const listen = ({url, signal, filters, onEvent}: SyncOpts) => {
|
||||
@@ -123,6 +130,8 @@ const listen = ({url, signal, filters, onEvent}: SyncOpts) => {
|
||||
}
|
||||
|
||||
const pullAndListen = (options: SyncOpts) => {
|
||||
if (options.signal.aborted) return
|
||||
|
||||
pullWithFallback(options)
|
||||
listen(options)
|
||||
}
|
||||
@@ -197,7 +206,7 @@ const syncUserRoomMembership = (url: string, h: string) => {
|
||||
const syncUserData = () => {
|
||||
const unsubscribersByKey = new Map<string, Unsubscriber>()
|
||||
|
||||
const unsubscribeGroupList = userGroupList.subscribe($userGroupList => {
|
||||
const syncGroupList = ($userGroupList: List | undefined) => {
|
||||
if ($userGroupList) {
|
||||
const keys = new Set<string>()
|
||||
|
||||
@@ -226,26 +235,32 @@ const syncUserData = () => {
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
const unsubscribeRelayList = userRelayList.subscribe($userRelayList => {
|
||||
if ($userRelayList) {
|
||||
loadBlossomServerList($userRelayList.event.pubkey)
|
||||
loadBlockedRelayList($userRelayList.event.pubkey)
|
||||
loadFollowList($userRelayList.event.pubkey)
|
||||
loadGroupList($userRelayList.event.pubkey)
|
||||
loadMuteList($userRelayList.event.pubkey)
|
||||
loadProfile($userRelayList.event.pubkey)
|
||||
loadSettings($userRelayList.event.pubkey)
|
||||
loadFeedsForPubkey($userRelayList.event.pubkey)
|
||||
}
|
||||
})
|
||||
const syncRelayList = ($userRelayList: PublishedList | undefined) => {
|
||||
const pubkey = $userRelayList?.event?.pubkey
|
||||
|
hodlbod marked this conversation as resolved
Outdated
hodlbod
commented
You can use You can use `PublishedList` here for a slightly more narrow type
|
||||
|
||||
const unsubscribeFollows = userFollowList.subscribe(async $userFollowList => {
|
||||
if (!pubkey) return
|
||||
|
||||
loadBlossomServerList(pubkey)
|
||||
loadBlockedRelayList(pubkey)
|
||||
loadFollowList(pubkey)
|
||||
loadGroupList(pubkey)
|
||||
loadMuteList(pubkey)
|
||||
loadProfile(pubkey)
|
||||
loadSettings(pubkey)
|
||||
loadFeedsForPubkey(pubkey)
|
||||
}
|
||||
|
||||
const syncFollowList = async (signal: AbortSignal) => {
|
||||
for (const pubkeys of chunk(10, get(bootstrapPubkeys))) {
|
||||
if (signal.aborted) return
|
||||
|
||||
|
hodlbod marked this conversation as resolved
Outdated
hodlbod
commented
This actually changes the behavior to flood relays with requests. This needs to be a trickle in the background to prevent interrupting other requests. If we need to cancel it, then we should add that explicitly. This actually changes the behavior to flood relays with requests. This needs to be a trickle in the background to prevent interrupting other requests. If we need to cancel it, then we should add that explicitly.
|
||||
// This isn't urgent, avoid clogging other stuff up
|
||||
await sleep(1000)
|
||||
|
||||
if (signal.aborted) return
|
||||
|
||||
await Promise.all(
|
||||
pubkeys.flatMap(pk => [
|
||||
loadRelayList(pk),
|
||||
@@ -256,9 +271,26 @@ const syncUserData = () => {
|
||||
]),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
let bootstrapFollowController = new AbortController()
|
||||
|
||||
const unsubscribeGroupList = merged([userGroupList]).subscribe(([$userGroupList]) => {
|
||||
syncGroupList($userGroupList)
|
||||
})
|
||||
|
||||
const unsubscribeRelayList = merged([userRelayList]).subscribe(([$userRelayList]) => {
|
||||
syncRelayList($userRelayList)
|
||||
})
|
||||
|
||||
const unsubscribeFollows = merged([userFollowList]).subscribe(() => {
|
||||
bootstrapFollowController.abort()
|
||||
bootstrapFollowController = new AbortController()
|
||||
void syncFollowList(bootstrapFollowController.signal)
|
||||
})
|
||||
|
||||
return () => {
|
||||
bootstrapFollowController.abort()
|
||||
unsubscribersByKey.forEach(call)
|
||||
unsubscribeGroupList()
|
||||
unsubscribeRelayList()
|
||||
|
hodlbod marked this conversation as resolved
Outdated
hodlbod
commented
Instead of derived/identity you can use Instead of derived/identity you can use `merged` from `@welshman/store`
|
||||
@@ -321,7 +353,7 @@ const syncSpace = (url: string, rooms: string[]) => {
|
||||
}
|
||||
|
||||
const syncSpaces = () => {
|
||||
const store = derived([userGroupList, page], identity)
|
||||
const store = merged([userGroupList, page])
|
||||
const unsubscribersByUrl = new Map<string, Unsubscriber>()
|
||||
const roomsByUrl = new Map<string, string>()
|
||||
|
||||
@@ -383,6 +415,7 @@ const syncDMs = () => {
|
||||
const unsubscribersByUrl = new Map<string, Unsubscriber>()
|
||||
|
||||
let currentPubkey: string | undefined
|
||||
let currentShouldUnwrap = false
|
||||
|
||||
const unsubscribeAll = () => {
|
||||
for (const [url, unsubscribe] of unsubscribersByUrl.entries()) {
|
||||
@@ -391,6 +424,34 @@ const syncDMs = () => {
|
||||
}
|
||||
}
|
||||
|
||||
const syncPubkey = ($pubkey: string | undefined, $shouldUnwrap: boolean) => {
|
||||
if ($pubkey !== currentPubkey) {
|
||||
unsubscribeAll()
|
||||
}
|
||||
|
||||
if ($pubkey && $shouldUnwrap) {
|
||||
loadRelayList($pubkey)
|
||||
.then(() => loadMessagingRelayList($pubkey))
|
||||
.then($l => {
|
||||
if ($l && currentPubkey === $pubkey && currentShouldUnwrap === $shouldUnwrap) {
|
||||
subscribeAll($pubkey, getRelayTagValues(getListTags($l)))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
currentPubkey = $pubkey
|
||||
currentShouldUnwrap = $shouldUnwrap
|
||||
}
|
||||
|
||||
const syncList = ($userMessagingRelayList: List | undefined) => {
|
||||
const $pubkey = pubkey.get()
|
||||
const $shouldUnwrap = shouldUnwrap.get()
|
||||
|
||||
if ($pubkey && $shouldUnwrap) {
|
||||
subscribeAll($pubkey, getRelayTagValues(getListTags($userMessagingRelayList)))
|
||||
}
|
||||
}
|
||||
|
||||
const subscribeAll = (pubkey: string, urls: string[]) => {
|
||||
// Start syncing newly added relays
|
||||
for (const url of urls) {
|
||||
@@ -408,33 +469,16 @@ const syncDMs = () => {
|
||||
}
|
||||
}
|
||||
|
||||
// When pubkey changes, re-sync
|
||||
const unsubscribePubkey = derived([pubkey, shouldUnwrap], identity).subscribe(
|
||||
([$pubkey, $shouldUnwrap]) => {
|
||||
if ($pubkey !== currentPubkey) {
|
||||
unsubscribeAll()
|
||||
}
|
||||
|
||||
// If we have a pubkey, refresh our user's relay list then sync our subscriptions
|
||||
if ($pubkey && $shouldUnwrap) {
|
||||
loadRelayList($pubkey)
|
||||
.then(() => loadMessagingRelayList($pubkey))
|
||||
.then($l => subscribeAll($pubkey, getRelayTagValues(getListTags($l))))
|
||||
}
|
||||
|
||||
currentPubkey = $pubkey
|
||||
},
|
||||
)
|
||||
const unsubscribePubkey = merged([pubkey, shouldUnwrap]).subscribe(([$pubkey, $shouldUnwrap]) => {
|
||||
syncPubkey($pubkey, $shouldUnwrap)
|
||||
})
|
||||
|
||||
// When user messaging relays change, update synchronization
|
||||
const unsubscribeList = userMessagingRelayList.subscribe($userMessagingRelayList => {
|
||||
const $pubkey = pubkey.get()
|
||||
const $shouldUnwrap = shouldUnwrap.get()
|
||||
|
||||
if ($pubkey && $shouldUnwrap) {
|
||||
subscribeAll($pubkey, getRelayTagValues(getListTags($userMessagingRelayList)))
|
||||
}
|
||||
})
|
||||
const unsubscribeList = merged([userMessagingRelayList]).subscribe(
|
||||
([$userMessagingRelayList]) => {
|
||||
syncList($userMessagingRelayList)
|
||||
},
|
||||
)
|
||||
|
||||
return () => {
|
||||
unsubscribeAll()
|
||||
|
||||
Reference in New Issue
Block a user
Use
import typerather than mixing type/variable imports