Update publish

This commit is contained in:
Jon Staab
2025-04-11 08:03:44 -07:00
parent 989fc74374
commit 837fd1ab66
12 changed files with 219 additions and 274 deletions
+4 -9
View File
@@ -1,5 +1,5 @@
import {nthEq, partition, race, now} from "@welshman/lib"
import {createEvent, getPubkeyTagValues} from "@welshman/util"
import {createEvent, getPubkeyTagValues, TrustedEvent} from "@welshman/util"
import {request, Tracker} from "@welshman/net"
import {Scope, FeedController, RequestOpts, FeedOptions, DVMOpts, Feed} from "@welshman/feeds"
import {makeDvmRequest, DVMEvent} from "@welshman/dvm"
@@ -20,14 +20,9 @@ export const makeFeedRequestHandler = ({signal}: FeedRequestHandlerOptions) =>
const requestOptions = {}
if (relays.length > 0) {
await new Promise(resolve => {
const req = request({tracker, signal, autoClose: true, relays, filters})
req.on(RequestEvent.Event, onEvent)
req.on(RequestEvent.Close, resolve)
})
await request({tracker, signal, relays, filters, onEvent, autoClose: true})
} else {
const promises: Promise<TrustedEvent>[][] = []
const promises: Promise<TrustedEvent[]>[] = []
const [withSearch, withoutSearch] = partition(f => Boolean(f.search), filters)
if (withSearch.length > 0) {
@@ -35,7 +30,7 @@ export const makeFeedRequestHandler = ({signal}: FeedRequestHandlerOptions) =>
request({
signal,
tracker,
onEvent
onEvent,
threshold: 0.1,
autoClose: true,
filters: withSearch,
+2 -8
View File
@@ -10,7 +10,7 @@ import {
getRelayTagValues,
} from "@welshman/util"
import {TrustedEvent, Filter, PublishedList, List} from "@welshman/util"
import {request, load, RequestEvent} from "@welshman/net"
import {request, load} from "@welshman/net"
import {deriveEventsMapped} from "@welshman/store"
import {repository} from "./core.js"
import {Router} from "./router.js"
@@ -52,13 +52,7 @@ export const makeOutboxLoader = (kinds: number[]) => {
const filters = [{authors, kinds}]
const relays = router.merge(scenarios).getUrls()
const promise = new Promise<void>(resolve => {
const req = request({filters, relays, autoClose: true})
req.on(RequestEvent.Eose, () => resolve())
req.on(RequestEvent.Close, () => resolve())
})
const promise = request({filters, relays, autoClose: true})
return requests.map(always(promise))
})
+2 -10
View File
@@ -3,8 +3,7 @@ import {isSignedEvent, SignedEvent} from "@welshman/util"
import {
push as basePush,
pull as basePull,
PublishEvent,
SinglePublish,
publishOne,
requestOne,
} from "@welshman/net"
import {repository} from "./core.js"
@@ -46,14 +45,7 @@ export const push = async ({relays, filters}: AppSyncOpts) => {
relays.map(async relay => {
await (hasNegentropy(relay)
? basePush({filters, events, relays: [relay]})
: Promise.all(
events.map(
(event: SignedEvent) =>
new Promise<void>(resolve => {
new SinglePublish({event, relay}).on(PublishEvent.Complete, resolve)
}),
),
))
: Promise.all(events.map((event: SignedEvent) => publishOne({event, relay}))))
}),
)
}
+30 -35
View File
@@ -26,7 +26,7 @@ import {
isUnwrappedEvent,
isSignedEvent,
} from "@welshman/util"
import {MultiPublish, AdapterContext, PublishStatus, PublishEvent} from "@welshman/net"
import {publish, AdapterContext, PublishStatus} from "@welshman/net"
import {repository, tracker} from "./core.js"
import {pubkey, getSession, getSigner} from "./session.js"
@@ -47,14 +47,14 @@ export type ThunkStatus = {
status: PublishStatus
}
export type ThunkStatusByUrl = Record<string, ThunkStatus>
export type ThunkStatusByRelay = Record<string, ThunkStatus>
export type Thunk = {
event: TrustedEvent
request: ThunkRequest
controller: AbortController
result: Deferred<ThunkStatusByUrl>
status: Writable<ThunkStatusByUrl>
result: Deferred<ThunkStatusByRelay>
status: Writable<ThunkStatusByRelay>
}
export const prepEvent = (event: ThunkEvent) => {
@@ -85,8 +85,8 @@ export const makeThunk = (request: ThunkRequest) => {
export type MergedThunk = {
thunks: Thunk[]
controller: AbortController
result: Promise<ThunkStatusByUrl[]>
status: Readable<ThunkStatusByUrl>
result: Promise<ThunkStatusByRelay[]>
status: Readable<ThunkStatusByRelay>
}
export const isMergedThunk = (thunk: Thunk | MergedThunk): thunk is MergedThunk =>
@@ -108,7 +108,7 @@ export const mergeThunks = (thunks: Thunk[]) => {
status: derived(
thunks.map(thunk => thunk.status),
statuses => {
const mergedStatus: ThunkStatusByUrl = {}
const mergedStatus: ThunkStatusByRelay = {}
for (const url of uniq(statuses.flatMap(s => Object.keys(s)))) {
const urlStatuses = statuses.map(s => s[url])
@@ -196,7 +196,7 @@ export const thunkQueue = new TaskQueue<Thunk>({
// Avoid making this function async so multiple publishes can run concurrently
Promise.resolve().then(async () => {
const fail = (message: string) => {
const status: ThunkStatusByUrl = {}
const status: ThunkStatusByRelay = {}
for (const url of thunk.request.relays) {
status[url] = {status: PublishStatus.Failure, message}
@@ -239,17 +239,37 @@ export const thunkQueue = new TaskQueue<Thunk>({
fromPairs(
thunk.request.relays.map(url => [
url,
{status: PublishStatus.Pending, message: "Sending your message..."},
{status: Pending, message: "Sending your message..."},
]),
),
)
// Send it off
const pub = new MultiPublish({
publish({
event: signedEvent,
relays: thunk.request.relays,
context: thunk.request.context,
timeout: thunk.request.timeout,
onSuccess: (message: string, url: string) => {
tracker.track(signedEvent.id, url)
thunk.status.update(assoc(url, {status: PublishStatus.Success, message}))
},
onFailure: (message: string, url: string) => {
thunk.status.update(assoc(url, {status: PublishStatus.Failure, message}))
},
onTimeout: (url: string) => {
const message = "Publish timed out"
thunk.status.update(assoc(url, {status: PublishStatus.Timeout, message}))
},
onAborted: (url: string) => {
const message = "Publish was aborted"
thunk.status.update(assoc(url, {status: PublishStatus.Aborted, message}))
},
onComplete: () => {
thunk.result.resolve(get(thunk.status))
},
})
// Copy the signature over since we had deferred it
@@ -259,31 +279,6 @@ export const thunkQueue = new TaskQueue<Thunk>({
if (savedEvent) {
savedEvent.sig = signedEvent.sig
}
pub.on(PublishEvent.Success, (id: string, message: string, url: string) => {
tracker.track(id, url)
thunk.status.update(assoc(url, {status: PublishStatus.Success, message}))
})
pub.on(PublishEvent.Failure, (id: string, message: string, url: string) => {
thunk.status.update(assoc(url, {status: PublishStatus.Failure, message}))
})
pub.on(PublishEvent.Timeout, (url: string) => {
thunk.status.update(
assoc(url, {status: PublishStatus.Timeout, message: "Publish timed out"}),
)
})
pub.on(PublishEvent.Aborted, (url: string) => {
thunk.status.update(
assoc(url, {status: PublishStatus.Aborted, message: "Publish was aborted"}),
)
})
pub.on(PublishEvent.Complete, () => {
thunk.result.resolve(get(thunk.status))
})
})
},
})