Refactor synchronization logic

This commit is contained in:
Jon Staab
2025-10-06 11:23:19 -07:00
committed by hodlbod
parent d0491ed202
commit e0099141aa
17 changed files with 357 additions and 375 deletions
+163 -31
View File
@@ -1,19 +1,31 @@
import {page} from "$app/stores"
import type {Unsubscriber} from "svelte/store"
import {derived, get} from "svelte/store"
import {call, chunk, sleep, now, identity, WEEK, ago} from "@welshman/lib"
import {
partition,
call,
sortBy,
assoc,
chunk,
sleep,
now,
identity,
WEEK,
MONTH,
ago,
} from "@welshman/lib"
import {
getListTags,
getRelayTagValues,
WRAP,
MESSAGE,
ZAP_GOAL,
THREAD,
EVENT_TIME,
COMMENT,
ROOM_META,
ROOM_ADD_USER,
ROOM_REMOVE_USER,
isSignedEvent,
normalizeRelayUrl,
} from "@welshman/util"
import {request, pull} from "@welshman/net"
import type {Filter, TrustedEvent} from "@welshman/util"
import {request, load, pull} from "@welshman/net"
import {
pubkey,
loadRelay,
@@ -27,17 +39,54 @@ import {
loadMutes,
loadProfile,
repository,
hasNegentropy,
} from "@welshman/app"
import {
MESSAGE_FILTER,
COMMENT_FILTER,
INDEXER_RELAYS,
REACTION_KINDS,
canDecrypt,
loadSettings,
userMembership,
defaultPubkeys,
decodeRelay,
loadMembership,
getUrlsForEvent,
} from "@app/core/state"
import {loadAlerts, loadAlertStatuses} from "@app/core/requests"
import {hasBlossomSupport} from "@app/core/commands"
// Utils
type PullOpts = {
relays: string[]
filters: Filter[]
signal: AbortSignal
}
const pullConservatively = ({relays, filters, signal}: PullOpts) => {
const $getUrlsForEvent = get(getUrlsForEvent)
const [smart, dumb] = partition(hasNegentropy, relays)
const events = repository.query(filters, {shouldSort: false}).filter(isSignedEvent)
const promises: Promise<TrustedEvent[]>[] = [pull({relays: smart, filters, signal, events})]
// Since pulling from relays without negentropy is expensive, limit how many
// duplicates we repeatedly download
for (const url of dumb) {
const urlEvents = events.filter(e => $getUrlsForEvent(e.id).includes(url))
if (urlEvents.length >= 100) {
filters = filters.map(assoc("since", sortBy(e => -e.created_at, urlEvents)[10]!.created_at))
}
promises.push(load({relays: [url], filters, signal}))
}
return Promise.all(promises)
}
// Relays
const syncRelays = () => {
for (const url of INDEXER_RELAYS) {
@@ -46,7 +95,10 @@ const syncRelays = () => {
const unsubscribePage = page.subscribe($page => {
if ($page.params.relay) {
loadRelay(decodeRelay($page.params.relay))
const url = decodeRelay($page.params.relay)
loadRelay(url)
hasBlossomSupport(url)
}
})
@@ -62,6 +114,8 @@ const syncRelays = () => {
}
}
// User data
const syncUserData = () => {
const unsubscribePubkey = pubkey.subscribe($pubkey => {
if ($pubkey) {
@@ -107,40 +161,40 @@ const syncUserData = () => {
}
}
const syncSpace = (url: string) => {
// Memberships
const syncMembership = (url: string) => {
const controller = new AbortController()
// Load historical data
pull({
// Load group metadata
pullConservatively({
relays: [url],
signal: controller.signal,
filters: [{kinds: [ZAP_GOAL, EVENT_TIME, THREAD, MESSAGE, COMMENT]}],
events: repository
.query([{kinds: [ZAP_GOAL, EVENT_TIME, THREAD, MESSAGE, COMMENT]}])
.filter(isSignedEvent),
filters: [{kinds: [ROOM_META]}],
})
// Load new events
// Load historical data from up to a month ago for quick page loading
pullConservatively({
relays: [url],
signal: controller.signal,
filters: [MESSAGE_FILTER, COMMENT_FILTER].map(assoc("since", ago(MONTH))),
})
// Listen for new events
request({
relays: [url],
signal: controller.signal,
filters: [{kinds: [ZAP_GOAL, EVENT_TIME, THREAD, MESSAGE, COMMENT], since: now()}],
filters: [MESSAGE_FILTER, COMMENT_FILTER].map(assoc("since", now())),
})
return () => controller.abort()
}
const syncSpaces = () => {
const syncMemberships = () => {
const unsubscribersByUrl = new Map<string, Unsubscriber>()
const unsubscribeMembership = userMembership.subscribe($l => {
const urls = getRelayTagValues(getListTags($l))
// Start syncing newly added spaces
for (const url of urls) {
if (!unsubscribersByUrl.has(url)) {
unsubscribersByUrl.set(url, syncSpace(url))
}
}
const unsubscribeMembership = userMembership.subscribe($l => {
const urls = getRelayTagValues(getListTags($l)).map(normalizeRelayUrl)
// stop syncing removed spaces
for (const [url, unsubscribe] of unsubscribersByUrl.entries()) {
@@ -149,6 +203,13 @@ const syncSpaces = () => {
unsubscribe()
}
}
// Start syncing newly added spaces
for (const url of urls) {
if (!unsubscribersByUrl.has(url)) {
unsubscribersByUrl.set(url, syncMembership(url))
}
}
})
return () => {
@@ -157,17 +218,80 @@ const syncSpaces = () => {
}
}
// Sync extra stuff for the current space
const syncSpace = (url: string) => {
const $pubkey = pubkey.get()
const controller = new AbortController()
// Load all membership changes for the current user
if ($pubkey) {
pullConservatively({
relays: [url],
signal: controller.signal,
filters: [
{
kinds: [ROOM_ADD_USER, ROOM_REMOVE_USER],
"#p": [$pubkey],
},
],
})
}
// Listen actively for all current membership changes, reports, reactions, zaps, etc
request({
relays: [url],
signal: controller.signal,
filters: [
{
kinds: [ROOM_ADD_USER, ROOM_REMOVE_USER, ...REACTION_KINDS],
since: now(),
},
],
})
return () => controller.abort()
}
const syncCurrentSpace = () => {
const unsubscribersByUrl = new Map<string, Unsubscriber>()
// Sync the space the user is currently visiting
const unsubscribePage = page.subscribe($page => {
if ($page.params.relay) {
const url = decodeRelay($page.params.relay)
if (!unsubscribersByUrl.has(url)) {
unsubscribersByUrl.set(url, syncSpace(url))
}
for (const [oldUrl, unsubscribe] of unsubscribersByUrl.entries()) {
if (url !== oldUrl) {
unsubscribersByUrl.delete(oldUrl)
unsubscribe()
}
}
} else {
Array.from(unsubscribersByUrl.values()).forEach(call)
}
})
return () => {
Array.from(unsubscribersByUrl.values()).forEach(call)
unsubscribePage()
}
}
// DMs
const syncDMRelay = (url: string, pubkey: string) => {
const controller = new AbortController()
// Load historical data
pull({
pullConservatively({
relays: [url],
signal: controller.signal,
filters: [{kinds: [WRAP], "#p": [pubkey], until: ago(WEEK, 2)}],
events: repository
.query([{kinds: [ZAP_GOAL, EVENT_TIME, THREAD, MESSAGE, COMMENT]}])
.filter(isSignedEvent),
})
// Load new events
@@ -243,8 +367,16 @@ const syncDMs = () => {
}
}
// Merge all synchronization functions
export const syncApplicationData = () => {
const unsubscribers = [syncRelays(), syncUserData(), syncSpaces(), syncDMs()]
const unsubscribers = [
syncRelays(),
syncUserData(),
syncMemberships(),
syncCurrentSpace(),
syncDMs(),
]
return () => unsubscribers.forEach(call)
}