diff --git a/src/app/commands.ts b/src/app/commands.ts index 030cb1c5..27c1189e 100644 --- a/src/app/commands.ts +++ b/src/app/commands.ts @@ -27,8 +27,8 @@ import { getRelayTagValues, } from "@welshman/util" import type {TrustedEvent, EventTemplate, List} from "@welshman/util" -import type {SubscribeRequestWithHandlers, Subscription} from "@welshman/net" -import {PublishStatus, AuthStatus, SocketStatus, SubscriptionEvent} from "@welshman/net" +import type {SubscribeRequestWithHandlers} from "@welshman/net" +import {PublishStatus, AuthStatus, SocketStatus} from "@welshman/net" import {Nip59, makeSecret, stamp, Nip46Broker} from "@welshman/signer" import { pubkey, @@ -51,7 +51,6 @@ import { nip44EncryptToSelf, loadRelay, addSession, - subscribe, clearStorage, dropSession, } from "@welshman/app" @@ -97,33 +96,6 @@ export const makeIMeta = (url: string, data: Record) => [ ...Object.entries(data).map(([k, v]) => [k, v].join(" ")), ] -export const subscribePersistent = (request: SubscribeRequestWithHandlers) => { - let sub: Subscription - let done = false - - const start = async () => { - // If the subscription gets closed quickly, don't start flapping - await Promise.all([ - sleep(30_000), - new Promise(resolve => { - sub = subscribe(request) - sub.emitter.on(SubscriptionEvent.Complete, resolve) - }), - ]) - - if (!done) { - start() - } - } - - start() - - return () => { - done = true - sub?.close() - } -} - export const getThunkError = async (thunk: Thunk) => { const result = await thunk.result const [{status, message}] = Object.values(result) as any diff --git a/src/app/components/MenuSpace.svelte b/src/app/components/MenuSpace.svelte index 6f9d3daa..f57675c1 100644 --- a/src/app/components/MenuSpace.svelte +++ b/src/app/components/MenuSpace.svelte @@ -1,7 +1,6 @@ diff --git a/src/app/components/ReactionSummary.svelte b/src/app/components/ReactionSummary.svelte index 3670291a..479793a0 100644 --- a/src/app/components/ReactionSummary.svelte +++ b/src/app/components/ReactionSummary.svelte @@ -1,7 +1,8 @@ diff --git a/src/app/requests.ts b/src/app/requests.ts new file mode 100644 index 00000000..be938bd5 --- /dev/null +++ b/src/app/requests.ts @@ -0,0 +1,106 @@ +import type {Unsubscriber} from "svelte/store" +import {sleep, partition, assoc, now, sortBy} from "@welshman/lib" +import {MESSAGE, DELETE, THREAD, COMMENT} from "@welshman/util" +import type {SubscribeRequestWithHandlers, Subscription} from "@welshman/net" +import {SubscriptionEvent} from "@welshman/net" +import type {AppSyncOpts} from "@welshman/app" +import {subscribe, load, pull, repository, hasNegentropy} from "@welshman/app" +import {userRoomsByUrl, LEGACY_MESSAGE, GENERAL} from "@app/state" + +// Utils + +export const pullConservatively = ({relays, filters}: AppSyncOpts) => { + const [smart, dumb] = partition(hasNegentropy, relays) + const promises = [pull({relays: smart, filters})] + + // Since pulling from relays without negentropy is expensive, limit how many + // duplicates we repeatedly download + if (dumb.length > 0) { + const events = sortBy(e => -e.created_at, repository.query(filters)) + + if (events.length > 100) { + filters = filters.map(assoc("since", events[10]!.created_at)) + } + + promises.push(pull({relays: dumb, filters})) + } + + return Promise.all(promises) +} + +export const subscribePersistent = (request: SubscribeRequestWithHandlers) => { + let sub: Subscription + let done = false + + const start = async () => { + // If the subscription gets closed quickly, don't start flapping + await Promise.all([ + sleep(30_000), + new Promise(resolve => { + sub = subscribe(request) + sub.emitter.on(SubscriptionEvent.Complete, resolve) + }), + ]) + + if (!done) { + start() + } + } + + start() + + return () => { + done = true + sub?.close() + } +} + +// Application requests + +export const listenForNotifications = () => { + const since = now() + const unsubscribers: Unsubscriber[] = [] + + for (const [url, rooms] of userRoomsByUrl.get()) { + load({ + relays: [url], + filters: [ + {kinds: [THREAD], limit: 1}, + {kinds: [COMMENT], "#K": [String(THREAD)], limit: 1}, + ...Array.from(rooms).map(room => ({kinds: [MESSAGE], "#h": [room], limit: 1})), + ], + }) + + unsubscribers.push( + subscribePersistent({ + relays: [url], + filters: [ + {kinds: [THREAD], since}, + {kinds: [COMMENT], "#K": [String(THREAD)], since}, + {kinds: [MESSAGE], "#h": Array.from(rooms), since}, + ], + }), + ) + } + + return () => { + for (const unsubscribe of unsubscribers) { + unsubscribe() + } + } +} + +export const listenForChannelMessages = (url: string, room: string) => { + const since = now() + const relays = [url] + const legacyRoom = room === GENERAL ? "general" : room + + // Load legacy immediate so our request doesn't get rejected by nip29 relays + load({relays, filters: [{kinds: [LEGACY_MESSAGE], "#~": [legacyRoom]}], delay: 0}) + + // Load historical state with negentropy if available + pullConservatively({relays, filters: [{kinds: [MESSAGE, DELETE], "#h": [room]}]}) + + // Listen for new messages + return subscribePersistent({relays, filters: [{kinds: [MESSAGE, DELETE], "#h": [room], since}]}) +} diff --git a/src/app/state.ts b/src/app/state.ts index 59fad0a2..765cbb4d 100644 --- a/src/app/state.ts +++ b/src/app/state.ts @@ -5,11 +5,9 @@ import { ctx, setContext, remove, - assoc, sortBy, sort, uniq, - partition, nth, pushToMapKey, nthEq, @@ -17,6 +15,7 @@ import { parseJson, fromPairs, memoize, + addToMapKey, } from "@welshman/lib" import { getIdFilters, @@ -57,15 +56,13 @@ import { relay, getSession, getSigner, - hasNegentropy, - pull, createSearch, userFollows, ensurePlaintext, thunks, walkThunks, } from "@welshman/app" -import type {AppSyncOpts, Thunk} from "@welshman/app" +import type {Thunk} from "@welshman/app" import type {SubscribeRequestWithHandlers} from "@welshman/net" import {deriveEvents, deriveEventsMapped, withGetter, synced} from "@welshman/store" @@ -209,25 +206,6 @@ export const ensureUnwrapped = async (event: TrustedEvent) => { return rumor } -export const pullConservatively = ({relays, filters}: AppSyncOpts) => { - const [smart, dumb] = partition(hasNegentropy, relays) - const promises = [pull({relays: smart, filters})] - - // Since pulling from relays without negentropy is expensive, limit how many - // duplicates we repeatedly download - if (dumb.length > 0) { - const events = sortBy(e => -e.created_at, repository.query(filters)) - - if (events.length > 100) { - filters = filters.map(assoc("since", events[100]!.created_at)) - } - - promises.push(pull({relays: dumb, filters})) - } - - return Promise.all(promises) -} - export const trackerStore = makeTrackerStore() export const deriveEvent = (idOrAddress: string, hints: string[] = []) => { @@ -382,10 +360,7 @@ export const { store: memberships, getKey: list => list.event.pubkey, load: (pubkey: string, request: Partial = {}) => - load({ - ...request, - filters: [{kinds: [GROUPS], authors: [pubkey]}], - }), + load({...request, filters: [{kinds: [GROUPS], authors: [pubkey]}]}), }) // Chats @@ -614,11 +589,26 @@ export const userMembership = withGetter( }), ) +export const userRoomsByUrl = withGetter( + derived(userMembership, $userMembership => { + const $userRoomsByUrl = new Map>() + + for (const [_, room, url] of getGroupTags(getListTags($userMembership))) { + addToMapKey($userRoomsByUrl, url, room) + } + + for (const url of $userRoomsByUrl.keys()) { + addToMapKey($userRoomsByUrl, url, GENERAL) + } + + return $userRoomsByUrl + }), +) + export const deriveUserRooms = (url: string) => - derived(userMembership, $userMembership => [ - GENERAL, - ...sortBy(roomComparator(url), getMembershipRoomsByUrl(url, $userMembership)), - ]) + derived(userRoomsByUrl, $userRoomsByUrl => + sortBy(roomComparator(url), Array.from($userRoomsByUrl.get(url) || [])), + ) export const deriveOtherRooms = (url: string) => derived([deriveUserRooms(url), channelsByUrl], ([$userRooms, $channelsByUrl]) => diff --git a/src/routes/+layout.svelte b/src/routes/+layout.svelte index 7760e7c8..855ecbed 100644 --- a/src/routes/+layout.svelte +++ b/src/routes/+layout.svelte @@ -5,7 +5,7 @@ import {get, derived} from "svelte/store" import {dev} from "$app/environment" import {bytesToHex, hexToBytes} from "@noble/hashes/utils" - import {identity, uniq, sleep, take, sortBy, ago, now, HOUR, WEEK, Worker} from "@welshman/lib" + import {identity, sleep, take, sortBy, ago, now, HOUR, WEEK, Worker} from "@welshman/lib" import type {TrustedEvent} from "@welshman/util" import { PROFILE, @@ -15,9 +15,6 @@ RELAYS, INBOX_RELAYS, WRAP, - MESSAGE, - COMMENT, - THREAD, getPubkeyTagValues, getListTags, } from "@welshman/util" @@ -39,7 +36,6 @@ dropSession, getRelayUrls, userInboxRelaySelections, - load, } from "@welshman/app" import * as lib from "@welshman/lib" import * as util from "@welshman/util" @@ -51,17 +47,11 @@ import {setupTracking} from "@app/tracking" import {setupAnalytics} from "@app/analytics" import {theme} from "@app/theme" - import { - INDEXER_RELAYS, - getMembershipUrls, - getMembershipRooms, - userMembership, - ensureUnwrapped, - canDecrypt, - GENERAL, - } from "@app/state" - import {loadUserData, subscribePersistent} from "@app/commands" + import {INDEXER_RELAYS, userMembership, ensureUnwrapped, canDecrypt} from "@app/state" + import {loadUserData} from "@app/commands" + import {subscribePersistent, listenForNotifications} from "@app/requests" import * as commands from "@app/commands" + import * as requests from "@app/requests" import {checked} from "@app/notifications" import * as notifications from "@app/notifications" import * as state from "@app/state" @@ -86,6 +76,7 @@ ...app, ...state, ...commands, + ...requests, ...notifications, }) @@ -199,30 +190,7 @@ userMembership.subscribe($membership => { unsubSpaces?.() - - const since = ago(30) - const rooms = uniq(getMembershipRooms($membership).map(m => m.room)).concat(GENERAL) - const relays = uniq(getMembershipUrls($membership)) - - // Get one event for each of our notification categories - load({ - relays, - filters: [ - {kinds: [THREAD], limit: 1}, - {kinds: [COMMENT], "#K": [String(THREAD)], limit: 1}, - ...rooms.map(room => ({kinds: [MESSAGE], "#h": [room], limit: 1})), - ], - }) - - // Listen for new notifications/memberships - unsubSpaces = subscribePersistent({ - relays, - filters: [ - {kinds: [THREAD], since}, - {kinds: [COMMENT], "#K": [String(THREAD)], since}, - {kinds: [MESSAGE], "#h": rooms, since}, - ], - }) + unsubSpaces = listenForNotifications() }) // Listen for chats, populate chat-based notifications diff --git a/src/routes/chat/+layout.svelte b/src/routes/chat/+layout.svelte index a33d1955..0b69b52d 100644 --- a/src/routes/chat/+layout.svelte +++ b/src/routes/chat/+layout.svelte @@ -12,7 +12,8 @@ import SecondaryNavSection from "@lib/components/SecondaryNavSection.svelte" import ChatStart from "@app/components/ChatStart.svelte" import ChatItem from "@app/components/ChatItem.svelte" - import {chatSearch, pullConservatively} from "@app/state" + import {chatSearch} from "@app/state" + import {pullConservatively} from "@app/requests" import {pushModal} from "@app/modal" const startChat = () => pushModal(ChatStart) diff --git a/src/routes/spaces/[relay]/[room]/+page.svelte b/src/routes/spaces/[relay]/[room]/+page.svelte index 07c3daa2..e1cd86d9 100644 --- a/src/routes/spaces/[relay]/[room]/+page.svelte +++ b/src/routes/spaces/[relay]/[room]/+page.svelte @@ -5,11 +5,11 @@ import {derived} from "svelte/store" import type {Editor} from "svelte-tiptap" import {page} from "$app/stores" - import {sleep, now, ctx} from "@welshman/lib" + import {sleep, ctx} from "@welshman/lib" import type {TrustedEvent, EventContent} from "@welshman/util" import {throttled} from "@welshman/store" - import {createEvent, DELETE, MESSAGE} from "@welshman/util" - import {formatTimestampAsDate, load, publishThunk, deriveRelay} from "@welshman/app" + import {createEvent, MESSAGE} from "@welshman/util" + import {formatTimestampAsDate, publishThunk, deriveRelay} from "@welshman/app" import {slide} from "@lib/transition" import {createScroller, type Scroller} from "@lib/html" import Icon from "@lib/components/Icon.svelte" @@ -22,7 +22,6 @@ import ChannelMessage from "@app/components/ChannelMessage.svelte" import ChannelCompose from "@app/components/ChannelCompose.svelte" import { - pullConservatively, userSettingValues, userMembership, decodeRelay, @@ -34,13 +33,8 @@ displayChannel, } from "@app/state" import {setChecked} from "@app/notifications" - import { - nip29, - addRoomMembership, - removeRoomMembership, - getThunkError, - subscribePersistent, - } from "@app/commands" + import {nip29, addRoomMembership, removeRoomMembership, getThunkError} from "@app/commands" + import {listenForChannelMessages} from "@app/requests" import {PROTECTED} from "@app/state" import {popKey} from "@app/implicit" import {pushToast} from "@app/toast" @@ -96,7 +90,7 @@ delay: $userSettingValues.send_delay, }) - let limit = 15 + let limit = 30 let loading = true let unsub: () => void let element: HTMLElement @@ -135,32 +129,16 @@ // Sveltekiiit await sleep(100) - if (!nip29.isSupported($relay)) { - load({ - delay: 0, - relays: [url], - filters: [{kinds: [LEGACY_MESSAGE], "#~": [legacyRoom]}], - }) - } - - pullConservatively({ - relays: [url], - filters: [{kinds: [MESSAGE, DELETE], "#h": [room]}], - }) - scroller = createScroller({ element, delay: 300, threshold: 3000, onScroll: () => { - limit += 15 + limit += 30 }, }) - unsub = subscribePersistent({ - relays: [url], - filters: [{kinds: [MESSAGE], "#h": [room], since: now()}], - }) + unsub = listenForChannelMessages(url, room) }) onDestroy(() => { diff --git a/src/routes/spaces/[relay]/calendar/+page.svelte b/src/routes/spaces/[relay]/calendar/+page.svelte index 88e39ad2..6ce88596 100644 --- a/src/routes/spaces/[relay]/calendar/+page.svelte +++ b/src/routes/spaces/[relay]/calendar/+page.svelte @@ -14,7 +14,8 @@ import EventItem from "@app/components/EventItem.svelte" import EventCreate from "@app/components/EventCreate.svelte" import {pushModal} from "@app/modal" - import {deriveEventsForUrl, pullConservatively, decodeRelay} from "@app/state" + import {deriveEventsForUrl, decodeRelay} from "@app/state" + import {pullConservatively} from "@app/requests" import {setChecked} from "@app/notifications" const url = decodeRelay($page.params.relay) diff --git a/src/routes/spaces/[relay]/threads/[id]/+page.svelte b/src/routes/spaces/[relay]/threads/[id]/+page.svelte index 23242f59..bdaad3e8 100644 --- a/src/routes/spaces/[relay]/threads/[id]/+page.svelte +++ b/src/routes/spaces/[relay]/threads/[id]/+page.svelte @@ -15,7 +15,7 @@ import ThreadActions from "@app/components/ThreadActions.svelte" import ThreadReply from "@app/components/ThreadReply.svelte" import {deriveEvent, decodeRelay} from "@app/state" - import {subscribePersistent} from "@app/commands" + import {subscribePersistent} from "@app/requests" import {setChecked} from "@app/notifications" const {relay, id} = $page.params