forked from coracle/flotilla
Make sync logic more robust
This commit is contained in:
+100
-43
@@ -20,7 +20,7 @@ import {
|
||||
unionFilters,
|
||||
} from "@welshman/util"
|
||||
import type {Filter} from "@welshman/util"
|
||||
import {request, pull} from "@welshman/net"
|
||||
import {request, requestOne, Difference, DifferenceEvent} from "@welshman/net"
|
||||
import {
|
||||
pubkey,
|
||||
loadRelay,
|
||||
@@ -64,26 +64,47 @@ type SyncOpts = {
|
||||
filters: Filter[]
|
||||
}
|
||||
|
||||
export const pullWithFallback = ({url, signal, filters}: SyncOpts) => {
|
||||
const relays = [url]
|
||||
const events = repository.query(filters).filter(isSignedEvent)
|
||||
const pullOneWithFallback = async (url: string, filter: Filter, signal: AbortSignal) => {
|
||||
const cachedEvents = repository.query([filter]).filter(isSignedEvent)
|
||||
const since = last(cachedEvents.slice(10))?.created_at || 0
|
||||
|
||||
if (hasNegentropy(url)) {
|
||||
pull({relays, signal, events, filters})
|
||||
} else {
|
||||
request({
|
||||
relays,
|
||||
signal,
|
||||
autoClose: true,
|
||||
filters: filters.map(assoc("since", last(events.slice(10))?.created_at || 0)),
|
||||
})
|
||||
const shouldFallback =
|
||||
!hasNegentropy(url) ||
|
||||
(await new Promise(resolve => {
|
||||
const diff = new Difference({relay: url, filter, events: cachedEvents, signal})
|
||||
|
||||
diff.on(DifferenceEvent.Error, () => {
|
||||
resolve(true)
|
||||
})
|
||||
|
||||
diff.on(DifferenceEvent.Close, () => {
|
||||
for (const ids of chunk(100, Array.from(diff.need))) {
|
||||
requestOne({relay: url, signal, autoClose: true, filters: [{ids}]})
|
||||
}
|
||||
|
||||
resolve(false)
|
||||
})
|
||||
}))
|
||||
|
||||
if (shouldFallback && !signal.aborted) {
|
||||
request({relays: [url], signal, autoClose: true, filters: [{...filter, since}]})
|
||||
}
|
||||
}
|
||||
|
||||
export const pullWithFallback = async ({url, signal, filters}: SyncOpts) => {
|
||||
await loadRelay(url)
|
||||
|
||||
if (signal.aborted) return
|
||||
|
||||
for (const filter of filters) {
|
||||
pullOneWithFallback(url, filter, signal)
|
||||
}
|
||||
}
|
||||
|
||||
const listen = ({url, signal, filters}: SyncOpts) => {
|
||||
const relays = [url]
|
||||
|
||||
request({relays, signal, filters: unionFilters(filters).map(assoc("limit", 0))})
|
||||
request({relays, signal, filters: unionFilters(filters.map(assoc("limit", 0)))})
|
||||
}
|
||||
|
||||
const pullAndListen = ({url, filters, signal}: SyncOpts) => {
|
||||
@@ -232,69 +253,105 @@ const syncUserData = () => {
|
||||
|
||||
// Spaces
|
||||
|
||||
const syncSpace = (url: string) => {
|
||||
const syncSpace = (url: string, rooms: string[]) => {
|
||||
const controller = new AbortController()
|
||||
|
||||
// These are separated so that old versions of relay29 don't barf
|
||||
|
||||
// Relay-level kinds don't need #h tags
|
||||
pullAndListen({
|
||||
url,
|
||||
signal: controller.signal,
|
||||
filters: [{kinds: [RELAY_MEMBERS, RELAY_ADD_MEMBER, RELAY_REMOVE_MEMBER]}],
|
||||
})
|
||||
|
||||
// Room metadata uses #d tags, not #h, so no filtering needed
|
||||
pullAndListen({
|
||||
url,
|
||||
signal: controller.signal,
|
||||
filters: [{kinds: [ROOM_META, ROOM_ADMINS, ROOM_MEMBERS]}],
|
||||
})
|
||||
|
||||
pullAndListen({
|
||||
url,
|
||||
signal: controller.signal,
|
||||
filters: [{kinds: [ROOM_DELETE, ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER]}],
|
||||
})
|
||||
|
||||
// Room-scoped kinds: add #h tags when we know which rooms the user is in.
|
||||
// This avoids sending broad filters that picky relays reject.
|
||||
const roomKinds = [ROOM_DELETE, ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER]
|
||||
const since = ago(WEEK)
|
||||
|
||||
pullAndListen({
|
||||
url,
|
||||
signal: controller.signal,
|
||||
filters: [{kinds: MESSAGE_KINDS, since}, makeCommentFilter(CONTENT_KINDS, {since})],
|
||||
})
|
||||
if (rooms.length > 0) {
|
||||
pullAndListen({
|
||||
url,
|
||||
signal: controller.signal,
|
||||
filters: [{kinds: roomKinds, "#h": rooms}],
|
||||
})
|
||||
|
||||
listen({
|
||||
url,
|
||||
signal: controller.signal,
|
||||
filters: [{kinds: REACTION_KINDS, limit: 0}],
|
||||
})
|
||||
pullAndListen({
|
||||
url,
|
||||
signal: controller.signal,
|
||||
filters: [
|
||||
{kinds: MESSAGE_KINDS, "#h": rooms, since},
|
||||
makeCommentFilter(CONTENT_KINDS, {"#h": rooms, since}),
|
||||
],
|
||||
})
|
||||
|
||||
listen({
|
||||
url,
|
||||
signal: controller.signal,
|
||||
filters: [{kinds: REACTION_KINDS, "#h": rooms}],
|
||||
})
|
||||
} else {
|
||||
pullAndListen({
|
||||
url,
|
||||
signal: controller.signal,
|
||||
filters: [{kinds: roomKinds}],
|
||||
})
|
||||
|
||||
pullAndListen({
|
||||
url,
|
||||
signal: controller.signal,
|
||||
filters: [{kinds: MESSAGE_KINDS, since}, makeCommentFilter(CONTENT_KINDS, {since})],
|
||||
})
|
||||
|
||||
listen({
|
||||
url,
|
||||
signal: controller.signal,
|
||||
filters: [{kinds: REACTION_KINDS}],
|
||||
})
|
||||
}
|
||||
|
||||
return () => controller.abort()
|
||||
}
|
||||
|
||||
const syncSpaces = () => {
|
||||
const store = derived([userSpaceUrls, page], identity)
|
||||
const store = derived([userGroupList, page], identity)
|
||||
const unsubscribersByUrl = new Map<string, Unsubscriber>()
|
||||
const unsubscribe = store.subscribe(([$userSpaceUrls, $page]) => {
|
||||
const urls = Array.from($userSpaceUrls)
|
||||
const roomsByUrl = new Map<string, string>()
|
||||
|
||||
const unsubscribe = store.subscribe(([$userGroupList, $page]) => {
|
||||
const urls = new Set(getSpaceUrlsFromGroupList($userGroupList))
|
||||
|
||||
if ($page.params.relay) {
|
||||
urls.push(decodeRelay($page.params.relay))
|
||||
urls.add(decodeRelay($page.params.relay))
|
||||
}
|
||||
|
||||
// stop syncing removed spaces
|
||||
// Stop syncing removed spaces
|
||||
for (const [url, unsubscribe] of unsubscribersByUrl.entries()) {
|
||||
if (!urls.includes(url)) {
|
||||
if (!urls.has(url)) {
|
||||
unsubscribersByUrl.delete(url)
|
||||
roomsByUrl.delete(url)
|
||||
unsubscribe()
|
||||
}
|
||||
}
|
||||
|
||||
// Start syncing newly added spaces
|
||||
// Start or restart syncing for each space
|
||||
for (const url of urls) {
|
||||
if (!unsubscribersByUrl.has(url)) {
|
||||
unsubscribersByUrl.set(url, syncSpace(url))
|
||||
}
|
||||
const rooms = getSpaceRoomsFromGroupList(url, $userGroupList)
|
||||
const roomsKey = rooms.join(",")
|
||||
|
||||
if (unsubscribersByUrl.has(url) && roomsByUrl.get(url) === roomsKey) continue
|
||||
|
||||
// Tear down existing sync if rooms changed
|
||||
unsubscribersByUrl.get(url)?.()
|
||||
|
||||
roomsByUrl.set(url, roomsKey)
|
||||
unsubscribersByUrl.set(url, syncSpace(url, rooms))
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user