Fix tags on deletes other than a/e
This commit is contained in:
@@ -143,74 +143,76 @@ export const optimizeSubscriptions = (subs: Subscription[]) => {
|
|||||||
const mergedSubs: Subscription[] = []
|
const mergedSubs: Subscription[] = []
|
||||||
|
|
||||||
for (const {relays, filters} of ctx.net.optimizeSubscriptions(group)) {
|
for (const {relays, filters} of ctx.net.optimizeSubscriptions(group)) {
|
||||||
const mergedSub = makeSubscription({
|
for (const filter of filters) {
|
||||||
filters,
|
const mergedSub = makeSubscription({
|
||||||
relays,
|
filters: [filter],
|
||||||
timeout,
|
relays,
|
||||||
authTimeout,
|
timeout,
|
||||||
closeOnEose
|
authTimeout,
|
||||||
})
|
closeOnEose
|
||||||
|
})
|
||||||
|
|
||||||
for (const {id, controller, request} of group) {
|
for (const {id, controller, request} of group) {
|
||||||
const onAbort = () => {
|
const onAbort = () => {
|
||||||
abortedSubs.add(id)
|
abortedSubs.add(id)
|
||||||
|
|
||||||
if (abortedSubs.size === group.length) {
|
if (abortedSubs.size === group.length) {
|
||||||
mergedSub.close()
|
mergedSub.close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
request.signal?.addEventListener('abort', onAbort)
|
||||||
|
controller.signal.addEventListener('abort', onAbort)
|
||||||
}
|
}
|
||||||
|
|
||||||
request.signal?.addEventListener('abort', onAbort)
|
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
|
||||||
controller.signal.addEventListener('abort', onAbort)
|
|
||||||
}
|
|
||||||
|
|
||||||
mergedSub.emitter.on(SubscriptionEvent.Event, (url: string, event: TrustedEvent) => {
|
|
||||||
for (const sub of group) {
|
|
||||||
if (matchFilters(sub.request.filters, event) && !sub.tracker.track(event.id, url)) {
|
|
||||||
sub.emitter.emit(SubscriptionEvent.Event, url, event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// Pass events back to caller
|
|
||||||
const propagateEvent = (type: SubscriptionEvent) =>
|
|
||||||
mergedSub.emitter.on(type, (url: string, event: TrustedEvent) => {
|
|
||||||
for (const sub of group) {
|
for (const sub of group) {
|
||||||
if (matchFilters(sub.request.filters, event)) {
|
if (matchFilters(sub.request.filters, event) && !sub.tracker.track(event.id, url)) {
|
||||||
sub.emitter.emit(type, url, event)
|
sub.emitter.emit(SubscriptionEvent.Event, url, event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
propagateEvent(SubscriptionEvent.Duplicate)
|
// Pass events back to caller
|
||||||
propagateEvent(SubscriptionEvent.DeletedEvent)
|
const propagateEvent = (type: SubscriptionEvent) =>
|
||||||
propagateEvent(SubscriptionEvent.Invalid)
|
mergedSub.emitter.on(type, (url: string, event: TrustedEvent) => {
|
||||||
|
|
||||||
const propagateFinality = (type: SubscriptionEvent, subIds: Set<string>) =>
|
|
||||||
mergedSub.emitter.on(type, (...args: any[]) => {
|
|
||||||
subIds.add(mergedSub.id)
|
|
||||||
|
|
||||||
// Wait for all subscriptions to complete before reporting finality to the caller.
|
|
||||||
// This is sub-optimal, but because we're outsourcing filter/relay optimization
|
|
||||||
// we can't make any assumptions about which caller subscriptions have completed
|
|
||||||
// at any given time.
|
|
||||||
if (subIds.size === mergedSubs.length) {
|
|
||||||
for (const sub of group) {
|
for (const sub of group) {
|
||||||
sub.emitter.emit(type, ...args)
|
if (matchFilters(sub.request.filters, event)) {
|
||||||
|
sub.emitter.emit(type, url, event)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
|
|
||||||
if (type === SubscriptionEvent.Complete) {
|
propagateEvent(SubscriptionEvent.Duplicate)
|
||||||
mergedSub.emitter.removeAllListeners()
|
propagateEvent(SubscriptionEvent.DeletedEvent)
|
||||||
}
|
propagateEvent(SubscriptionEvent.Invalid)
|
||||||
})
|
|
||||||
|
|
||||||
propagateFinality(SubscriptionEvent.Send, sentSubs)
|
const propagateFinality = (type: SubscriptionEvent, subIds: Set<string>) =>
|
||||||
propagateFinality(SubscriptionEvent.Eose, eosedSubs)
|
mergedSub.emitter.on(type, (...args: any[]) => {
|
||||||
propagateFinality(SubscriptionEvent.Close, closedSubs)
|
subIds.add(mergedSub.id)
|
||||||
propagateFinality(SubscriptionEvent.Complete, completedSubs)
|
|
||||||
|
|
||||||
mergedSubs.push(mergedSub)
|
// Wait for all subscriptions to complete before reporting finality to the caller.
|
||||||
|
// This is sub-optimal, but because we're outsourcing filter/relay optimization
|
||||||
|
// we can't make any assumptions about which caller subscriptions have completed
|
||||||
|
// at any given time.
|
||||||
|
if (subIds.size === mergedSubs.length) {
|
||||||
|
for (const sub of group) {
|
||||||
|
sub.emitter.emit(type, ...args)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type === SubscriptionEvent.Complete) {
|
||||||
|
mergedSub.emitter.removeAllListeners()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
propagateFinality(SubscriptionEvent.Send, sentSubs)
|
||||||
|
propagateFinality(SubscriptionEvent.Eose, eosedSubs)
|
||||||
|
propagateFinality(SubscriptionEvent.Close, closedSubs)
|
||||||
|
propagateFinality(SubscriptionEvent.Complete, completedSubs)
|
||||||
|
|
||||||
|
mergedSubs.push(mergedSub)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return mergedSubs
|
return mergedSubs
|
||||||
|
|||||||
@@ -222,7 +222,7 @@ export class Repository<E extends HashedEvent = TrustedEvent> extends Emitter {
|
|||||||
|
|
||||||
// If this is a delete event, the tag value is an id or address. Track when it was
|
// If this is a delete event, the tag value is an id or address. Track when it was
|
||||||
// deleted so that replaceables can be restored.
|
// deleted so that replaceables can be restored.
|
||||||
if (event.kind === DELETE) {
|
if (event.kind === DELETE && ["a", "e"].includes(tag[0]) && tag[1]) {
|
||||||
this.deletes.set(tag[1], Math.max(event.created_at, this.deletes.get(tag[1]) || 0))
|
this.deletes.set(tag[1], Math.max(event.created_at, this.deletes.get(tag[1]) || 0))
|
||||||
|
|
||||||
const deletedEvent = this.getEvent(tag[1])
|
const deletedEvent = this.getEvent(tag[1])
|
||||||
|
|||||||
Reference in New Issue
Block a user