forked from coracle/flotilla
Make space syncing more robust
This commit is contained in:
+70
-71
@@ -1,7 +1,7 @@
|
|||||||
import {page} from "$app/stores"
|
import {page} from "$app/stores"
|
||||||
import type {Unsubscriber} from "svelte/store"
|
import type {Unsubscriber} from "svelte/store"
|
||||||
import {derived, get} from "svelte/store"
|
import {derived, get} from "svelte/store"
|
||||||
import {last, call, assoc, chunk, sleep, identity, WEEK, ago} from "@welshman/lib"
|
import {last, call, ifLet, assoc, chunk, sleep, identity, WEEK, ago} from "@welshman/lib"
|
||||||
import {
|
import {
|
||||||
getListTags,
|
getListTags,
|
||||||
getRelayTagValues,
|
getRelayTagValues,
|
||||||
@@ -13,14 +13,14 @@ import {
|
|||||||
ROOM_ADD_MEMBER,
|
ROOM_ADD_MEMBER,
|
||||||
ROOM_REMOVE_MEMBER,
|
ROOM_REMOVE_MEMBER,
|
||||||
ROOM_CREATE_PERMISSION,
|
ROOM_CREATE_PERMISSION,
|
||||||
ROOM_JOIN,
|
|
||||||
RELAY_MEMBERS,
|
RELAY_MEMBERS,
|
||||||
RELAY_ADD_MEMBER,
|
RELAY_ADD_MEMBER,
|
||||||
RELAY_REMOVE_MEMBER,
|
RELAY_REMOVE_MEMBER,
|
||||||
isSignedEvent,
|
isSignedEvent,
|
||||||
unionFilters,
|
unionFilters,
|
||||||
|
getTagValue,
|
||||||
} from "@welshman/util"
|
} from "@welshman/util"
|
||||||
import type {Filter} from "@welshman/util"
|
import type {Filter, TrustedEvent} from "@welshman/util"
|
||||||
import {request, requestOne, Difference, DifferenceEvent} from "@welshman/net"
|
import {request, requestOne, Difference, DifferenceEvent} from "@welshman/net"
|
||||||
import {
|
import {
|
||||||
pubkey,
|
pubkey,
|
||||||
@@ -64,12 +64,24 @@ type SyncOpts = {
|
|||||||
url: string
|
url: string
|
||||||
signal: AbortSignal
|
signal: AbortSignal
|
||||||
filters: Filter[]
|
filters: Filter[]
|
||||||
|
onEvent?: (event: TrustedEvent) => void
|
||||||
}
|
}
|
||||||
|
|
||||||
const pullOneWithFallback = async (url: string, filter: Filter, signal: AbortSignal) => {
|
const pullOneWithFallback = async (
|
||||||
|
url: string,
|
||||||
|
filter: Filter,
|
||||||
|
signal: AbortSignal,
|
||||||
|
onEvent?: (event: TrustedEvent) => void,
|
||||||
|
) => {
|
||||||
const cachedEvents = repository.query([filter]).filter(isSignedEvent)
|
const cachedEvents = repository.query([filter]).filter(isSignedEvent)
|
||||||
const since = last(cachedEvents.slice(10))?.created_at || 0
|
const since = last(cachedEvents.slice(10))?.created_at || 0
|
||||||
|
|
||||||
|
if (onEvent) {
|
||||||
|
for (const event of cachedEvents) {
|
||||||
|
onEvent(event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const shouldFallback =
|
const shouldFallback =
|
||||||
!hasNegentropy(url) ||
|
!hasNegentropy(url) ||
|
||||||
(await new Promise(resolve => {
|
(await new Promise(resolve => {
|
||||||
@@ -81,7 +93,7 @@ const pullOneWithFallback = async (url: string, filter: Filter, signal: AbortSig
|
|||||||
|
|
||||||
diff.on(DifferenceEvent.Close, () => {
|
diff.on(DifferenceEvent.Close, () => {
|
||||||
for (const ids of chunk(100, Array.from(diff.need))) {
|
for (const ids of chunk(100, Array.from(diff.need))) {
|
||||||
requestOne({relay: url, signal, autoClose: true, filters: [{ids}]})
|
requestOne({relay: url, signal, autoClose: true, filters: [{ids}], onEvent})
|
||||||
}
|
}
|
||||||
|
|
||||||
resolve(false)
|
resolve(false)
|
||||||
@@ -89,29 +101,29 @@ const pullOneWithFallback = async (url: string, filter: Filter, signal: AbortSig
|
|||||||
}))
|
}))
|
||||||
|
|
||||||
if (shouldFallback && !signal.aborted) {
|
if (shouldFallback && !signal.aborted) {
|
||||||
request({relays: [url], signal, autoClose: true, filters: [{...filter, since}]})
|
request({relays: [url], signal, autoClose: true, filters: [{since, ...filter}], onEvent})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const pullWithFallback = async ({url, signal, filters}: SyncOpts) => {
|
export const pullWithFallback = async ({url, signal, filters, onEvent}: SyncOpts) => {
|
||||||
await loadRelay(url)
|
await loadRelay(url)
|
||||||
|
|
||||||
if (signal.aborted) return
|
if (signal.aborted) return
|
||||||
|
|
||||||
for (const filter of filters) {
|
for (const filter of filters) {
|
||||||
pullOneWithFallback(url, filter, signal)
|
pullOneWithFallback(url, filter, signal, onEvent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const listen = ({url, signal, filters}: SyncOpts) => {
|
const listen = ({url, signal, filters, onEvent}: SyncOpts) => {
|
||||||
const relays = [url]
|
const relays = [url]
|
||||||
|
|
||||||
request({relays, signal, filters: unionFilters(filters.map(assoc("limit", 0)))})
|
request({relays, signal, filters: unionFilters(filters.map(assoc("limit", 0))), onEvent})
|
||||||
}
|
}
|
||||||
|
|
||||||
const pullAndListen = ({url, filters, signal}: SyncOpts) => {
|
const pullAndListen = (options: SyncOpts) => {
|
||||||
pullWithFallback({url, signal, filters})
|
pullWithFallback(options)
|
||||||
listen({url, signal, filters})
|
listen(options)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Relays
|
// Relays
|
||||||
@@ -256,68 +268,55 @@ const syncUserData = () => {
|
|||||||
// Spaces
|
// Spaces
|
||||||
|
|
||||||
const syncSpace = (url: string, rooms: string[]) => {
|
const syncSpace = (url: string, rooms: string[]) => {
|
||||||
|
const since = ago(WEEK)
|
||||||
|
const seen = new Set<string>()
|
||||||
const controller = new AbortController()
|
const controller = new AbortController()
|
||||||
|
|
||||||
// Relay-level kinds don't need #h tags
|
const pullRoomContent = (room: string) => {
|
||||||
pullAndListen({
|
if (!seen.has(room)) {
|
||||||
url,
|
seen.add(room)
|
||||||
signal: controller.signal,
|
pullAndListen({
|
||||||
filters: [{kinds: [RELAY_MEMBERS, RELAY_ADD_MEMBER, RELAY_REMOVE_MEMBER]}],
|
url,
|
||||||
})
|
signal: controller.signal,
|
||||||
|
filters: [
|
||||||
// Room metadata uses #d tags, not #h, so no filtering needed
|
{kinds: MESSAGE_KINDS, since, "#h": [room]},
|
||||||
pullAndListen({
|
makeCommentFilter(CONTENT_KINDS, {since, "#h": [room]}),
|
||||||
url,
|
],
|
||||||
signal: controller.signal,
|
})
|
||||||
filters: [{kinds: [ROOM_META, ROOM_ADMINS, ROOM_MEMBERS, LIVEKIT_PARTICIPANTS, ROOM_JOIN]}],
|
}
|
||||||
})
|
|
||||||
|
|
||||||
// Room-scoped kinds: add #h tags when we know which rooms the user is in.
|
|
||||||
// This avoids sending broad filters that picky relays reject.
|
|
||||||
const roomKinds = [ROOM_DELETE, ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER]
|
|
||||||
const since = ago(WEEK)
|
|
||||||
|
|
||||||
if (rooms.length > 0) {
|
|
||||||
pullAndListen({
|
|
||||||
url,
|
|
||||||
signal: controller.signal,
|
|
||||||
filters: [{kinds: roomKinds, "#h": rooms}],
|
|
||||||
})
|
|
||||||
|
|
||||||
pullAndListen({
|
|
||||||
url,
|
|
||||||
signal: controller.signal,
|
|
||||||
filters: [
|
|
||||||
{kinds: MESSAGE_KINDS, "#h": rooms, since},
|
|
||||||
makeCommentFilter(CONTENT_KINDS, {"#h": rooms, since}),
|
|
||||||
],
|
|
||||||
})
|
|
||||||
|
|
||||||
listen({
|
|
||||||
url,
|
|
||||||
signal: controller.signal,
|
|
||||||
filters: [{kinds: REACTION_KINDS, "#h": rooms}],
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
pullAndListen({
|
|
||||||
url,
|
|
||||||
signal: controller.signal,
|
|
||||||
filters: [{kinds: roomKinds}],
|
|
||||||
})
|
|
||||||
|
|
||||||
pullAndListen({
|
|
||||||
url,
|
|
||||||
signal: controller.signal,
|
|
||||||
filters: [{kinds: MESSAGE_KINDS, since}, makeCommentFilter(CONTENT_KINDS, {since})],
|
|
||||||
})
|
|
||||||
|
|
||||||
listen({
|
|
||||||
url,
|
|
||||||
signal: controller.signal,
|
|
||||||
filters: [{kinds: REACTION_KINDS}],
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (const room of rooms) {
|
||||||
|
pullRoomContent(room)
|
||||||
|
}
|
||||||
|
|
||||||
|
const relayKinds = [RELAY_MEMBERS, RELAY_ADD_MEMBER, RELAY_REMOVE_MEMBER]
|
||||||
|
const roomMetaKinds = [ROOM_META, ROOM_ADMINS, ROOM_MEMBERS, LIVEKIT_PARTICIPANTS]
|
||||||
|
const roomMemberKinds = [ROOM_DELETE, ROOM_ADD_MEMBER, ROOM_REMOVE_MEMBER]
|
||||||
|
|
||||||
|
pullAndListen({
|
||||||
|
url,
|
||||||
|
signal: controller.signal,
|
||||||
|
filters: [
|
||||||
|
{kinds: relayKinds},
|
||||||
|
{kinds: roomMetaKinds},
|
||||||
|
{kinds: roomMemberKinds},
|
||||||
|
{kinds: MESSAGE_KINDS, since},
|
||||||
|
makeCommentFilter(CONTENT_KINDS, {since}),
|
||||||
|
],
|
||||||
|
onEvent: event => {
|
||||||
|
if (event.kind === ROOM_META) {
|
||||||
|
ifLet(getTagValue("d", event.tags), pullRoomContent)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
listen({
|
||||||
|
url,
|
||||||
|
signal: controller.signal,
|
||||||
|
filters: [{kinds: REACTION_KINDS}],
|
||||||
|
})
|
||||||
|
|
||||||
return () => controller.abort()
|
return () => controller.abort()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user