forked from coracle/flotilla
Make sync logic more robust
This commit is contained in:
+100
-43
@@ -20,7 +20,7 @@ import {
|
|||||||
unionFilters,
|
unionFilters,
|
||||||
} from "@welshman/util"
|
} from "@welshman/util"
|
||||||
import type {Filter} from "@welshman/util"
|
import type {Filter} from "@welshman/util"
|
||||||
import {request, pull} from "@welshman/net"
|
import {request, requestOne, Difference, DifferenceEvent} from "@welshman/net"
|
||||||
import {
|
import {
|
||||||
pubkey,
|
pubkey,
|
||||||
loadRelay,
|
loadRelay,
|
||||||
@@ -64,26 +64,47 @@ type SyncOpts = {
|
|||||||
filters: Filter[]
|
filters: Filter[]
|
||||||
}
|
}
|
||||||
|
|
||||||
export const pullWithFallback = ({url, signal, filters}: SyncOpts) => {
|
const pullOneWithFallback = async (url: string, filter: Filter, signal: AbortSignal) => {
|
||||||
const relays = [url]
|
const cachedEvents = repository.query([filter]).filter(isSignedEvent)
|
||||||
const events = repository.query(filters).filter(isSignedEvent)
|
const since = last(cachedEvents.slice(10))?.created_at || 0
|
||||||
|
|
||||||
if (hasNegentropy(url)) {
|
const shouldFallback =
|
||||||
pull({relays, signal, events, filters})
|
!hasNegentropy(url) ||
|
||||||
} else {
|
(await new Promise(resolve => {
|
||||||
request({
|
const diff = new Difference({relay: url, filter, events: cachedEvents, signal})
|
||||||
relays,
|
|
||||||
signal,
|
diff.on(DifferenceEvent.Error, () => {
|
||||||
autoClose: true,
|
resolve(true)
|
||||||
filters: filters.map(assoc("since", last(events.slice(10))?.created_at || 0)),
|
})
|
||||||
})
|
|
||||||
|
diff.on(DifferenceEvent.Close, () => {
|
||||||
|
for (const ids of chunk(100, Array.from(diff.need))) {
|
||||||
|
requestOne({relay: url, signal, autoClose: true, filters: [{ids}]})
|
||||||
|
}
|
||||||
|
|
||||||
|
resolve(false)
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
|
||||||
|
if (shouldFallback && !signal.aborted) {
|
||||||
|
request({relays: [url], signal, autoClose: true, filters: [{...filter, since}]})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const pullWithFallback = async ({url, signal, filters}: SyncOpts) => {
|
||||||
|
await loadRelay(url)
|
||||||
|
|
||||||
|
if (signal.aborted) return
|
||||||
|
|
||||||
|
for (const filter of filters) {
|
||||||
|
pullOneWithFallback(url, filter, signal)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const listen = ({url, signal, filters}: SyncOpts) => {
|
const listen = ({url, signal, filters}: SyncOpts) => {
|
||||||
const relays = [url]
|
const relays = [url]
|
||||||
|
|
||||||
request({relays, signal, filters: unionFilters(filters).map(assoc("limit", 0))})
|
request({relays, signal, filters: unionFilters(filters.map(assoc("limit", 0)))})
|
||||||
}
|
}
|
||||||
|
|
||||||
const pullAndListen = ({url, filters, signal}: SyncOpts) => {
|
const pullAndListen = ({url, filters, signal}: SyncOpts) => {
|
||||||
@@ -232,69 +253,105 @@ const syncUserData = () => {
|
|||||||
|
|
||||||
// Spaces
|
// Spaces
|
||||||
|
|
||||||
const syncSpace = (url: string) => {
|
const syncSpace = (url: string, rooms: string[]) => {
|
||||||
const controller = new AbortController()
|
const controller = new AbortController()
|
||||||
|
|
||||||
// These are separated so that old versions of relay29 don't barf
|
// Relay-level kinds don't need #h tags
|
||||||
|
|
||||||
pullAndListen({
|
pullAndListen({
|
||||||
url,
|
url,
|
||||||
signal: controller.signal,
|
signal: controller.signal,
|
||||||
filters: [{kinds: [RELAY_MEMBERS, RELAY_ADD_MEMBER, RELAY_REMOVE_MEMBER]}],
|
filters: [{kinds: [RELAY_MEMBERS, RELAY_ADD_MEMBER, RELAY_REMOVE_MEMBER]}],
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Room metadata uses #d tags, not #h, so no filtering needed
|
||||||
pullAndListen({
|
pullAndListen({
|
||||||
url,
|
url,
|
||||||
signal: controller.signal,
|
signal: controller.signal,
|
||||||
filters: [{kinds: [ROOM_META, ROOM_ADMINS, ROOM_MEMBERS]}],
|
filters: [{kinds: [ROOM_META, ROOM_ADMINS, ROOM_MEMBERS]}],
|
||||||
})
|
})
|
||||||
|
|
||||||
pullAndListen({
|
// Room-scoped kinds: add #h tags when we know which rooms the user is in.
|
||||||
url,
|
// This avoids sending broad filters that picky relays reject.
|
||||||
signal: controller.signal,
|
const roomKinds = [ROOM_DELETE, ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER]
|
||||||
filters: [{kinds: [ROOM_DELETE, ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER]}],
|
|
||||||
})
|
|
||||||
|
|
||||||
const since = ago(WEEK)
|
const since = ago(WEEK)
|
||||||
|
|
||||||
pullAndListen({
|
if (rooms.length > 0) {
|
||||||
url,
|
pullAndListen({
|
||||||
signal: controller.signal,
|
url,
|
||||||
filters: [{kinds: MESSAGE_KINDS, since}, makeCommentFilter(CONTENT_KINDS, {since})],
|
signal: controller.signal,
|
||||||
})
|
filters: [{kinds: roomKinds, "#h": rooms}],
|
||||||
|
})
|
||||||
|
|
||||||
listen({
|
pullAndListen({
|
||||||
url,
|
url,
|
||||||
signal: controller.signal,
|
signal: controller.signal,
|
||||||
filters: [{kinds: REACTION_KINDS, limit: 0}],
|
filters: [
|
||||||
})
|
{kinds: MESSAGE_KINDS, "#h": rooms, since},
|
||||||
|
makeCommentFilter(CONTENT_KINDS, {"#h": rooms, since}),
|
||||||
|
],
|
||||||
|
})
|
||||||
|
|
||||||
|
listen({
|
||||||
|
url,
|
||||||
|
signal: controller.signal,
|
||||||
|
filters: [{kinds: REACTION_KINDS, "#h": rooms}],
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
pullAndListen({
|
||||||
|
url,
|
||||||
|
signal: controller.signal,
|
||||||
|
filters: [{kinds: roomKinds}],
|
||||||
|
})
|
||||||
|
|
||||||
|
pullAndListen({
|
||||||
|
url,
|
||||||
|
signal: controller.signal,
|
||||||
|
filters: [{kinds: MESSAGE_KINDS, since}, makeCommentFilter(CONTENT_KINDS, {since})],
|
||||||
|
})
|
||||||
|
|
||||||
|
listen({
|
||||||
|
url,
|
||||||
|
signal: controller.signal,
|
||||||
|
filters: [{kinds: REACTION_KINDS}],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
return () => controller.abort()
|
return () => controller.abort()
|
||||||
}
|
}
|
||||||
|
|
||||||
const syncSpaces = () => {
|
const syncSpaces = () => {
|
||||||
const store = derived([userSpaceUrls, page], identity)
|
const store = derived([userGroupList, page], identity)
|
||||||
const unsubscribersByUrl = new Map<string, Unsubscriber>()
|
const unsubscribersByUrl = new Map<string, Unsubscriber>()
|
||||||
const unsubscribe = store.subscribe(([$userSpaceUrls, $page]) => {
|
const roomsByUrl = new Map<string, string>()
|
||||||
const urls = Array.from($userSpaceUrls)
|
|
||||||
|
const unsubscribe = store.subscribe(([$userGroupList, $page]) => {
|
||||||
|
const urls = new Set(getSpaceUrlsFromGroupList($userGroupList))
|
||||||
|
|
||||||
if ($page.params.relay) {
|
if ($page.params.relay) {
|
||||||
urls.push(decodeRelay($page.params.relay))
|
urls.add(decodeRelay($page.params.relay))
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop syncing removed spaces
|
// Stop syncing removed spaces
|
||||||
for (const [url, unsubscribe] of unsubscribersByUrl.entries()) {
|
for (const [url, unsubscribe] of unsubscribersByUrl.entries()) {
|
||||||
if (!urls.includes(url)) {
|
if (!urls.has(url)) {
|
||||||
unsubscribersByUrl.delete(url)
|
unsubscribersByUrl.delete(url)
|
||||||
|
roomsByUrl.delete(url)
|
||||||
unsubscribe()
|
unsubscribe()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start syncing newly added spaces
|
// Start or restart syncing for each space
|
||||||
for (const url of urls) {
|
for (const url of urls) {
|
||||||
if (!unsubscribersByUrl.has(url)) {
|
const rooms = getSpaceRoomsFromGroupList(url, $userGroupList)
|
||||||
unsubscribersByUrl.set(url, syncSpace(url))
|
const roomsKey = rooms.join(",")
|
||||||
}
|
|
||||||
|
if (unsubscribersByUrl.has(url) && roomsByUrl.get(url) === roomsKey) continue
|
||||||
|
|
||||||
|
// Tear down existing sync if rooms changed
|
||||||
|
unsubscribersByUrl.get(url)?.()
|
||||||
|
|
||||||
|
roomsByUrl.set(url, roomsKey)
|
||||||
|
unsubscribersByUrl.set(url, syncSpace(url, rooms))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user