Improve join/leave, publish messages

This commit is contained in:
Jon Staab
2024-08-23 13:33:36 -07:00
parent d6fa0a85bc
commit f12e7ef77c
24 changed files with 412 additions and 297 deletions
+62 -30
View File
@@ -5,6 +5,7 @@ import {get, writable, readable, derived} from "svelte/store"
import type {Maybe} from "@welshman/lib"
import {
max,
append,
between,
uniqBy,
groupBy,
@@ -38,6 +39,8 @@ import {
displayPubkey,
GROUP_JOIN,
GROUP_ADD_USER,
isStampedEvent,
isEventTemplate,
} from "@welshman/util"
import type {SignedEvent, HashedEvent, EventTemplate, TrustedEvent, PublishedProfile, PublishedList} from "@welshman/util"
import type {SubscribeRequest, PublishRequest} from "@welshman/net"
@@ -129,9 +132,13 @@ export type Thunk = {
relays: string[]
}
export const thunkWorker = new Worker<Thunk>()
export type ThunkWithResolve = Thunk & {
resolve: (data: PublishStatusDataByUrl) => void
}
thunkWorker.addGlobalHandler(async ({event, relays}: Thunk) => {
export const thunkWorker = new Worker<ThunkWithResolve>()
thunkWorker.addGlobalHandler(async ({event, relays, resolve}: ThunkWithResolve) => {
const session = getSession(event.pubkey)
if (!session) {
@@ -139,26 +146,20 @@ thunkWorker.addGlobalHandler(async ({event, relays}: Thunk) => {
}
const signedEvent = await getSigner(session)!.sign(event)
const savedEvent = repository.getEvent(signedEvent.id)
const pub = basePublish({event: signedEvent, relays})
// Copy the signature over since we had deferred it
if (savedEvent) {
savedEvent.sig = signedEvent.sig
}
;(repository.getEvent(signedEvent.id) as SignedEvent).sig = signedEvent.sig
const failures = new Set<string>()
// Track publish success
const {id} = event
const statusByUrl: PublishStatusDataByUrl = {}
// Watch for failures
pub.emitter.on('*', (status: PublishStatus, url: string) => {
console.log('pub status', status, url)
pub.emitter.on('*', (status: PublishStatus, url: string, message: string) => {
publishStatusData.update(assoc(id, Object.assign(statusByUrl, {[url]: {id, url, status, message}})))
if ([PublishStatus.Failure, PublishStatus.Timeout].includes(status)) {
failures.add(url)
}
if (failures.size === relays.length) {
console.warn("Failed to publish", pub)
if (Object.values(statusByUrl).filter(s => s.status !== PublishStatus.Pending).length === relays.length) {
resolve(statusByUrl)
}
})
})
@@ -178,17 +179,20 @@ export const makeThunk = ({event, relays}: ThunkParams) => {
return {event: hash(own(stamp(event), $pk)), relays}
}
export const publishThunk = (thunk: Thunk) => {
thunkWorker.push(thunk)
repository.publish(thunk.event)
}
export const publishThunk = (thunk: Thunk) =>
new Promise<PublishStatusDataByUrl>(resolve => {
thunkWorker.push({...thunk, resolve})
repository.publish(thunk.event)
})
// Subscribe
export const subscribe = (request: SubscribeRequest) => {
const sub = baseSubscribe({delay: 50, authTimeout: 3000, ...request})
sub.emitter.on("event", (url: string, e: SignedEvent) => repository.publish(e))
sub.emitter.on("event", (url: string, e: SignedEvent) => {
repository.publish(e)
})
return sub
}
@@ -198,14 +202,27 @@ export const load = (request: SubscribeRequest) =>
const sub = subscribe({closeOnEose: true, timeout: 3000, ...request})
const events: TrustedEvent[] = []
sub.emitter.on("event", (url: string, e: SignedEvent) => {
repository.publish(e)
events.push(e)
})
sub.emitter.on("event", (url: string, e: SignedEvent) => events.push(e))
sub.emitter.on("complete", () => resolve(events))
})
// Publish status
export type PublishStatusData = {
id: string
url: string
message: string
status: PublishStatus
}
export type PublishStatusDataByUrl = Record<string, PublishStatusData>
export type PublishStatusDataByUrlById = Record<string, PublishStatusDataByUrl>
export const publishStatusData = writable<PublishStatusDataByUrlById>({})
// Freshness
export const freshness = withGetter(writable<Record<string, number>>({}))
@@ -255,7 +272,7 @@ export type Topic = {
count: number
}
export const topics = custom<Topic[]>(setter => {
export const topics = custom<Topic[]>(setter => {
const getTopics = () => {
const topics = new Map<string, number>()
for (const tagString of repository.eventsByTag.keys()) {
@@ -290,7 +307,7 @@ export const searchTopics = derived(topics, $topics =>
export const relays = writable<Relay[]>([])
export const relaysByPubkey = derived(relays, $relays =>
groupBy(($relay: Relay) => $relay.pubkey, $relays),
groupBy(($relay: Relay) => $relay.pubkey, $relays.filter(r => r.pubkey)),
)
export const {
@@ -508,7 +525,7 @@ export const getGroupName = (e?: TrustedEvent) => e?.tags.find(nthEq(0, "name"))
export const getGroupPicture = (e?: TrustedEvent) => e?.tags.find(nthEq(0, "picture"))?.[1]
export const displayGroup = (group?: Group) => group?.name || "[no name]"
export const displayGroup = (group?: Group) => group?.name || group?.nom || "[no name]"
export type Group = {
nom: string
@@ -524,8 +541,8 @@ export type PublishedGroup = Omit<Group, "event"> & {
export const readGroup = (event: TrustedEvent) => {
const nom = getIdentifier(event)!
const name = event?.tags.find(nthEq(0, "name"))?.[1] || "[no name]"
const about = event?.tags.find(nthEq(0, "about"))?.[1] || ""
const name = event?.tags.find(nthEq(0, "name"))?.[1]
const about = event?.tags.find(nthEq(0, "about"))?.[1]
const picture = event?.tags.find(nthEq(0, "picture"))?.[1]
return {nom, name, about, picture, event}
@@ -759,3 +776,18 @@ export const userGroupsByNom = withGetter(
return $userGroupsByNom
}),
)
export const userRelayUrlsByNom = derived(
userGroupsByNom,
$userGroupsByNom => {
const $userRelayUrlsByNom = new Map()
for (const [nom, groups] of $userGroupsByNom.entries()) {
for (const group of groups) {
pushToMapKey($userRelayUrlsByNom, nom, group.relay.url)
}
}
return $userRelayUrlsByNom
}
)