Add since to reqs that get re-opened to avoid re-downloading and gaps in active subscriptions. Also add some getters for event stores and event sort utils
This commit is contained in:
@@ -207,7 +207,6 @@ export const round = (precision: number, x: number) =>
|
|||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
|
||||||
/** One minute in seconds */
|
/** One minute in seconds */
|
||||||
|
|
||||||
export const MINUTE = 60
|
export const MINUTE = 60
|
||||||
|
|
||||||
/** One hour in seconds */
|
/** One hour in seconds */
|
||||||
|
|||||||
@@ -368,7 +368,7 @@ describe("policy", () => {
|
|||||||
socket.emit(SocketEvent.Status, SocketStatus.Closed)
|
socket.emit(SocketEvent.Status, SocketStatus.Closed)
|
||||||
|
|
||||||
// Advance past the reopen delay
|
// Advance past the reopen delay
|
||||||
await vi.advanceTimersByTimeAsync(30000)
|
await vi.advanceTimersByTimeAsync(31000)
|
||||||
|
|
||||||
// Should resend the pending event
|
// Should resend the pending event
|
||||||
expect(sendSpy).toHaveBeenCalledWith(event)
|
expect(sendSpy).toHaveBeenCalledWith(event)
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import {on, ms, nthNe, always, call, sleep, ago, now} from "@welshman/lib"
|
import {on, ms, omit, nthNe, always, call, sleep, ago, now} from "@welshman/lib"
|
||||||
import {RELAY_JOIN, StampedEvent, SignedEvent} from "@welshman/util"
|
import {RELAY_JOIN, StampedEvent, SignedEvent, Filter} from "@welshman/util"
|
||||||
import {
|
import {
|
||||||
ClientMessage,
|
ClientMessage,
|
||||||
isClientAuth,
|
isClientAuth,
|
||||||
@@ -142,11 +142,29 @@ export const socketPolicyCloseInactive = (socket: Socket) => {
|
|||||||
|
|
||||||
// If the socket closed and we have no error, reopen it but don't flap
|
// If the socket closed and we have no error, reopen it but don't flap
|
||||||
if (isClosed && pending.size) {
|
if (isClosed && pending.size) {
|
||||||
sleep(Math.max(0, ms(5 - (now() - lastOpen)))).then(() => {
|
const since = now()
|
||||||
|
const delay = Math.max(0, ms(5 - (now() - lastOpen)))
|
||||||
|
|
||||||
|
sleep(delay).then(() => {
|
||||||
socket.attemptToOpen()
|
socket.attemptToOpen()
|
||||||
|
|
||||||
for (const message of pending.values()) {
|
for (const message of pending.values()) {
|
||||||
socket.send(message)
|
// Add since to avoid re-downloading stuff on reconnect. If limit=0, remove it to catch up
|
||||||
|
if (isClientReq(message) && delay > 0) {
|
||||||
|
const filters: Filter[] = []
|
||||||
|
|
||||||
|
for (let filter of message.slice(2) as Filter[]) {
|
||||||
|
if (filter.limit === 0) {
|
||||||
|
filter = omit(["limit"], filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
filters.push({...filter, since})
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.send([...message.slice(0, 2), ...filters])
|
||||||
|
} else {
|
||||||
|
socket.send(message)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,13 @@
|
|||||||
import {readable, Readable} from "svelte/store"
|
import {readable, Readable} from "svelte/store"
|
||||||
import {on, assoc, now, mapPop, Maybe, MaybeAsync, call, sortBy, first} from "@welshman/lib"
|
import {on, assoc, now, mapPop, Maybe, MaybeAsync, call, sortBy, first} from "@welshman/lib"
|
||||||
import {matchFilters, getIdFilters, Filter, TrustedEvent} from "@welshman/util"
|
import {
|
||||||
|
matchFilters,
|
||||||
|
getIdFilters,
|
||||||
|
Filter,
|
||||||
|
TrustedEvent,
|
||||||
|
sortEventsAsc,
|
||||||
|
sortEventsDesc,
|
||||||
|
} from "@welshman/util"
|
||||||
import {Repository, RepositoryUpdate, Tracker} from "@welshman/net"
|
import {Repository, RepositoryUpdate, Tracker} from "@welshman/net"
|
||||||
import {deriveDeduplicated} from "./misc.js"
|
import {deriveDeduplicated} from "./misc.js"
|
||||||
|
|
||||||
@@ -8,23 +15,25 @@ import {deriveDeduplicated} from "./misc.js"
|
|||||||
|
|
||||||
export type EventsById = Map<string, TrustedEvent>
|
export type EventsById = Map<string, TrustedEvent>
|
||||||
|
|
||||||
export type DeriveEventsByIdOptions = {
|
export type EventsByIdOptions = {
|
||||||
filters: Filter[]
|
filters: Filter[]
|
||||||
repository: Repository
|
repository: Repository
|
||||||
includeDeleted?: boolean
|
includeDeleted?: boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
export const deriveEventsById = ({
|
export const getEventsById = ({filters, repository, includeDeleted}: EventsByIdOptions) => {
|
||||||
filters,
|
|
||||||
repository,
|
|
||||||
includeDeleted,
|
|
||||||
}: DeriveEventsByIdOptions) => {
|
|
||||||
const eventsById = new Map<string, TrustedEvent>()
|
const eventsById = new Map<string, TrustedEvent>()
|
||||||
|
|
||||||
return readable(eventsById, set => {
|
for (const event of repository.query(filters, {includeDeleted})) {
|
||||||
for (const event of repository.query(filters, {includeDeleted})) {
|
eventsById.set(event.id, event)
|
||||||
eventsById.set(event.id, event)
|
}
|
||||||
}
|
|
||||||
|
return eventsById
|
||||||
|
}
|
||||||
|
|
||||||
|
export const deriveEventsById = ({filters, repository, includeDeleted}: EventsByIdOptions) =>
|
||||||
|
readable<EventsById>(new Map(), set => {
|
||||||
|
const eventsById = getEventsById({filters, repository, includeDeleted})
|
||||||
|
|
||||||
set(eventsById)
|
set(eventsById)
|
||||||
|
|
||||||
@@ -49,28 +58,23 @@ export const deriveEventsById = ({
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
|
||||||
|
|
||||||
export const deriveArray = <T>(itemsByIdStore: Readable<Map<string, T>>) =>
|
export const deriveArray = <T>(itemsByIdStore: Readable<Map<string, T>>) =>
|
||||||
deriveDeduplicated(itemsByIdStore, itemsById => Array.from(itemsById.values()))
|
deriveDeduplicated(itemsByIdStore, itemsById => Array.from(itemsById.values()))
|
||||||
|
|
||||||
export const deriveEventsAsc = (eventsByIdStore: Readable<EventsById>) =>
|
export const deriveEventsAsc = (eventsByIdStore: Readable<EventsById>) =>
|
||||||
deriveDeduplicated(eventsByIdStore, eventsById => sortBy(e => e.created_at, eventsById.values()))
|
deriveDeduplicated(eventsByIdStore, eventsById => sortEventsAsc(eventsById.values()))
|
||||||
|
|
||||||
export const deriveEventsDesc = (eventsByIdStore: Readable<EventsById>) =>
|
export const deriveEventsDesc = (eventsByIdStore: Readable<EventsById>) =>
|
||||||
deriveDeduplicated(eventsByIdStore, eventsById => sortBy(e => -e.created_at, eventsById.values()))
|
deriveDeduplicated(eventsByIdStore, eventsById => sortEventsDesc(eventsById.values()))
|
||||||
|
|
||||||
export type DeriveEventOptions = {
|
export type EventOptions = {
|
||||||
repository: Repository
|
repository: Repository
|
||||||
includeDeleted?: boolean
|
includeDeleted?: boolean
|
||||||
onDerive?: (filters: Filter[], ...args: any[]) => void
|
onDerive?: (filters: Filter[], ...args: any[]) => void
|
||||||
}
|
}
|
||||||
|
|
||||||
export const makeDeriveEvent = ({
|
export const makeDeriveEvent = ({repository, includeDeleted = false, onDerive}: EventOptions) => {
|
||||||
repository,
|
|
||||||
includeDeleted = false,
|
|
||||||
onDerive,
|
|
||||||
}: DeriveEventOptions) => {
|
|
||||||
return (idOrAddress: string, ...args: any[]) => {
|
return (idOrAddress: string, ...args: any[]) => {
|
||||||
const filters = getIdFilters([idOrAddress])
|
const filters = getIdFilters([idOrAddress])
|
||||||
|
|
||||||
@@ -103,58 +107,75 @@ export const makeDeriveEvent = ({
|
|||||||
|
|
||||||
export type EventsByIdByUrl = Map<string, EventsById>
|
export type EventsByIdByUrl = Map<string, EventsById>
|
||||||
|
|
||||||
export type DeriveEventsByIdByUrlOptions = DeriveEventsByIdOptions & {
|
export type EventsByIdByUrlOptions = EventsByIdOptions & {
|
||||||
tracker: Tracker
|
tracker: Tracker
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const getEventsByIdByUrl = ({
|
||||||
|
filters,
|
||||||
|
tracker,
|
||||||
|
repository,
|
||||||
|
includeDeleted,
|
||||||
|
}: EventsByIdByUrlOptions) => {
|
||||||
|
const eventsByIdByUrl: EventsByIdByUrl = new Map()
|
||||||
|
|
||||||
|
for (const event of repository.query(filters, {includeDeleted})) {
|
||||||
|
for (const url of tracker.getRelays(event.id)) {
|
||||||
|
let eventsById = eventsByIdByUrl.get(url)
|
||||||
|
if (!eventsById) {
|
||||||
|
eventsById = new Map()
|
||||||
|
eventsByIdByUrl.set(url, eventsById)
|
||||||
|
}
|
||||||
|
|
||||||
|
eventsById.set(event.id, event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return eventsByIdByUrl
|
||||||
|
}
|
||||||
|
|
||||||
export const deriveEventsByIdByUrl = ({
|
export const deriveEventsByIdByUrl = ({
|
||||||
filters,
|
filters,
|
||||||
tracker,
|
tracker,
|
||||||
repository,
|
repository,
|
||||||
includeDeleted,
|
includeDeleted,
|
||||||
}: DeriveEventsByIdByUrlOptions) => {
|
}: EventsByIdByUrlOptions) =>
|
||||||
const eventsByIdByUrl: EventsByIdByUrl = new Map()
|
readable<EventsByIdByUrl>(new Map(), set => {
|
||||||
|
const eventsByIdByUrl = getEventsByIdByUrl({filters, tracker, repository, includeDeleted})
|
||||||
|
|
||||||
const addEvent = (url: string, event: TrustedEvent) => {
|
const addEvent = (url: string, event: TrustedEvent) => {
|
||||||
if (!matchFilters(filters, event)) return false
|
if (!matchFilters(filters, event)) return false
|
||||||
|
|
||||||
const eventsById = eventsByIdByUrl.get(url)
|
const eventsById = eventsByIdByUrl.get(url)
|
||||||
|
|
||||||
if (eventsById?.has(event.id)) return false
|
if (eventsById?.has(event.id)) return false
|
||||||
|
|
||||||
// Create a new map so we can detect which key changed
|
// Create a new map so we can detect which key changed
|
||||||
const newEventsById = new Map(eventsById)
|
const newEventsById = new Map(eventsById)
|
||||||
|
|
||||||
newEventsById.set(event.id, event)
|
newEventsById.set(event.id, event)
|
||||||
eventsByIdByUrl.set(url, newEventsById)
|
eventsByIdByUrl.set(url, newEventsById)
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
const removeEvent = (url: string, id: string) => {
|
|
||||||
const eventsById = eventsByIdByUrl.get(url)
|
|
||||||
|
|
||||||
if (eventsById?.has(id)) {
|
|
||||||
eventsById.delete(id)
|
|
||||||
|
|
||||||
if (eventsById.size === 0) {
|
|
||||||
eventsByIdByUrl.delete(url)
|
|
||||||
} else {
|
|
||||||
// Create a new map so we can detect which key changed
|
|
||||||
eventsByIdByUrl.set(url, new Map(eventsById))
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
const removeEvent = (url: string, id: string) => {
|
||||||
}
|
const eventsById = eventsByIdByUrl.get(url)
|
||||||
|
|
||||||
return readable(eventsByIdByUrl, set => {
|
if (eventsById?.has(id)) {
|
||||||
for (const event of repository.query(filters, {includeDeleted})) {
|
eventsById.delete(id)
|
||||||
for (const url of tracker.getRelays(event.id)) {
|
|
||||||
addEvent(url, event)
|
if (eventsById.size === 0) {
|
||||||
|
eventsByIdByUrl.delete(url)
|
||||||
|
} else {
|
||||||
|
// Create a new map so we can detect which key changed
|
||||||
|
eventsByIdByUrl.set(url, new Map(eventsById))
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
set(eventsByIdByUrl)
|
set(eventsByIdByUrl)
|
||||||
@@ -211,33 +232,42 @@ export const deriveEventsByIdByUrl = ({
|
|||||||
|
|
||||||
return () => unsubscribers.forEach(call)
|
return () => unsubscribers.forEach(call)
|
||||||
})
|
})
|
||||||
}
|
|
||||||
|
|
||||||
export type DeriveEventsByIdForUrlOptions = DeriveEventsByIdOptions & {
|
export type EventsByIdForUrlOptions = EventsByIdOptions & {
|
||||||
url: string
|
url: string
|
||||||
tracker: Tracker
|
tracker: Tracker
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const getEventsByIdForUrl = ({
|
||||||
|
url,
|
||||||
|
filters,
|
||||||
|
tracker,
|
||||||
|
repository,
|
||||||
|
includeDeleted,
|
||||||
|
}: EventsByIdForUrlOptions) => {
|
||||||
|
const initialIds = Array.from(tracker.getIds(url))
|
||||||
|
const initialFilters = filters.map(assoc("ids", initialIds))
|
||||||
|
const eventsById: EventsById = new Map()
|
||||||
|
|
||||||
|
for (const event of repository.query(initialFilters, {includeDeleted})) {
|
||||||
|
eventsById.set(event.id, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
return eventsById
|
||||||
|
}
|
||||||
|
|
||||||
export const deriveEventsByIdForUrl = ({
|
export const deriveEventsByIdForUrl = ({
|
||||||
url,
|
url,
|
||||||
filters,
|
filters,
|
||||||
tracker,
|
tracker,
|
||||||
repository,
|
repository,
|
||||||
includeDeleted,
|
includeDeleted,
|
||||||
}: DeriveEventsByIdForUrlOptions) => {
|
}: EventsByIdForUrlOptions) => {
|
||||||
const eventsById: EventsById = new Map()
|
let eventsById = getEventsByIdForUrl({url, filters, tracker, repository, includeDeleted})
|
||||||
|
|
||||||
return readable(eventsById, set => {
|
return readable(eventsById, set => {
|
||||||
const reset = () => {
|
const reset = () => {
|
||||||
const initialIds = Array.from(tracker.getIds(url))
|
eventsById = getEventsByIdForUrl({url, filters, tracker, repository, includeDeleted})
|
||||||
const initialFilters = filters.map(assoc("ids", initialIds))
|
|
||||||
|
|
||||||
eventsById.clear()
|
|
||||||
|
|
||||||
for (const event of repository.query(initialFilters, {includeDeleted})) {
|
|
||||||
eventsById.set(event.id, event)
|
|
||||||
}
|
|
||||||
|
|
||||||
set(eventsById)
|
set(eventsById)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -295,7 +325,7 @@ export type EventToItem<T> = (event: TrustedEvent) => MaybeAsync<Maybe<T>>
|
|||||||
|
|
||||||
export type GetItem<T> = (key: string, ...args: any[]) => Maybe<T>
|
export type GetItem<T> = (key: string, ...args: any[]) => Maybe<T>
|
||||||
|
|
||||||
export type DeriveItemsByKeyOptions<T> = {
|
export type ItemsByKeyOptions<T> = {
|
||||||
getKey: (item: T) => string
|
getKey: (item: T) => string
|
||||||
filters: Filter[]
|
filters: Filter[]
|
||||||
repository: Repository
|
repository: Repository
|
||||||
@@ -309,7 +339,7 @@ export const deriveItemsByKey = <T>({
|
|||||||
repository,
|
repository,
|
||||||
eventToItem,
|
eventToItem,
|
||||||
includeDeleted,
|
includeDeleted,
|
||||||
}: DeriveItemsByKeyOptions<T>) => {
|
}: ItemsByKeyOptions<T>) => {
|
||||||
const deferred = new Map<string, Promise<Maybe<T>>>()
|
const deferred = new Map<string, Promise<Maybe<T>>>()
|
||||||
const itemsByKey = new Map<string, T>()
|
const itemsByKey = new Map<string, T>()
|
||||||
const idsByKey = new Map<string, string>()
|
const idsByKey = new Map<string, string>()
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import {verifiedSymbol, verifyEvent as verifyEventPure} from "nostr-tools/pure"
|
import {verifiedSymbol, verifyEvent as verifyEventPure} from "nostr-tools/pure"
|
||||||
import {setNostrWasm, verifyEvent as verifyEventWasm} from "nostr-tools/wasm"
|
import {setNostrWasm, verifyEvent as verifyEventWasm} from "nostr-tools/wasm"
|
||||||
import {initNostrWasm} from "nostr-wasm"
|
import {initNostrWasm} from "nostr-wasm"
|
||||||
import {mapVals, lte, first, pick, now} from "@welshman/lib"
|
import {mapVals, sortBy, lte, first, pick, now} from "@welshman/lib"
|
||||||
import {getReplyTags, getCommentTags, getReplyTagValues, getCommentTagValues} from "./Tags.js"
|
import {getReplyTags, getCommentTags, getReplyTagValues, getCommentTagValues} from "./Tags.js"
|
||||||
import {getAddress, Address} from "./Address.js"
|
import {getAddress, Address} from "./Address.js"
|
||||||
import {
|
import {
|
||||||
@@ -195,3 +195,7 @@ export const isChildOf = (child: EventTemplate, parent: HashedEvent) => {
|
|||||||
|
|
||||||
return getIdAndAddress(parent).some(x => idsAndAddrs.includes(x))
|
return getIdAndAddress(parent).some(x => idsAndAddrs.includes(x))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const sortEventsAsc = (events: Iterable<TrustedEvent>) => sortBy(e => e.created_at, events)
|
||||||
|
|
||||||
|
export const sortEventsDesc = (events: Iterable<TrustedEvent>) => sortBy(e => -e.created_at, events)
|
||||||
|
|||||||
Reference in New Issue
Block a user