From 5b53659a5b5467a2bc81fcf4dee4d67ae8dc97ba Mon Sep 17 00:00:00 2001 From: userAdityaa Date: Thu, 23 Apr 2026 17:20:13 +0545 Subject: [PATCH] fix: relay sync failures with delayed bounded retries --- backend/spec/infra.md | 7 ++- backend/src/infra.rs | 116 +++++++++++++++++++++++++++++++++++++++--- backend/src/query.rs | 17 +++++++ 3 files changed, 130 insertions(+), 10 deletions(-) diff --git a/backend/spec/infra.md b/backend/spec/infra.md index 4c4b45a..7383ab6 100644 --- a/backend/spec/infra.md +++ b/backend/spec/infra.md @@ -15,12 +15,15 @@ Members: ## `pub async fn start(self)` - Subscribes to `command.notify` +- On startup, schedules delayed sync retries for relays whose `sync_error` is non-empty. - Loops on `rx.recv()`, calling `handle_activity` for each received `Activity`. ## `async fn handle_activity(&self, activity: &Activity)` -- For `create_relay`, `update_relay`, `activate_relay`, or `deactivate_relay` activity, calls `sync_and_report`. -- All other activity types are ignored (e.g. `fail_relay_sync`, `complete_relay_sync`). +- For `create_relay`, `update_relay`, `activate_relay`, or `deactivate_relay` activity, calls `sync_and_report` immediately. +- For `fail_relay_sync`, schedules a delayed retry using exponential backoff based on consecutive failures for the relay. +- Retry scheduling stops after the configured max attempts to avoid infinite retry loops. +- Other activity types are ignored (e.g. `complete_relay_sync`). ## `async fn sync_and_report(&self, relay: &Relay, is_new: bool)` diff --git a/backend/src/infra.rs b/backend/src/infra.rs index cd922fe..6ac956d 100644 --- a/backend/src/infra.rs +++ b/backend/src/infra.rs @@ -1,10 +1,15 @@ use anyhow::Result; use nostr_sdk::prelude::*; +use std::time::Duration; use crate::command::Command; use crate::models::{Activity, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay}; use crate::query::Query; +const RELAY_SYNC_RETRY_BASE_DELAY_SECS: u64 = 30; +const RELAY_SYNC_RETRY_MAX_DELAY_SECS: u64 = 15 * 60; +const RELAY_SYNC_RETRY_MAX_ATTEMPTS: usize = 6; + #[derive(Clone)] pub struct Infra { api_url: String, @@ -48,6 +53,10 @@ impl Infra { pub async fn start(self) { let mut rx = self.command.notify.subscribe(); + if let Err(e) = self.schedule_startup_retries().await { + tracing::error!(error = %e, "failed to schedule relay sync retries on startup"); + } + loop { match rx.recv().await { Ok(activity) => { @@ -66,15 +75,84 @@ impl Infra { async fn handle_activity(&self, activity: &Activity) -> Result<()> { let needs_sync = should_sync_relay_activity(activity.activity_type.as_str()); - if needs_sync { - let Some(relay) = self.query.get_relay(&activity.resource_id).await? else { - return Ok(()); - }; - - let is_new = relay.synced == 0; - self.sync_and_report(&relay, is_new).await; + if !needs_sync || activity.resource_type != "relay" { + return Ok(()); } + if activity.activity_type == "fail_relay_sync" { + self.schedule_relay_sync_retry(&activity.resource_id, "activity") + .await?; + return Ok(()); + } + + let Some(relay) = self.query.get_relay(&activity.resource_id).await? else { + return Ok(()); + }; + + let is_new = relay.synced == 0; + self.sync_and_report(&relay, is_new).await; + + Ok(()) + } + + async fn schedule_startup_retries(&self) -> Result<()> { + let relays = self.query.list_relays_with_sync_error().await?; + + for relay in relays { + self.schedule_relay_sync_retry(&relay.id, "startup").await?; + } + + Ok(()) + } + + async fn schedule_relay_sync_retry(&self, relay_id: &str, source: &str) -> Result<()> { + let activities = self.query.list_activity_for_relay(relay_id).await?; + let consecutive_failures = consecutive_sync_failures(&activities); + + let Some(delay) = relay_sync_retry_delay(consecutive_failures) else { + tracing::warn!( + relay = relay_id, + consecutive_failures, + max_attempts = RELAY_SYNC_RETRY_MAX_ATTEMPTS, + "relay sync retries exhausted; awaiting manual intervention" + ); + return Ok(()); + }; + + tracing::info!( + relay = relay_id, + source, + consecutive_failures, + delay_secs = delay.as_secs(), + "scheduled relay sync retry" + ); + + let relay_id = relay_id.to_string(); + let infra = self.clone(); + + tokio::spawn(async move { + tokio::time::sleep(delay).await; + + if let Err(e) = infra.retry_relay_sync(&relay_id).await { + tracing::error!(relay = %relay_id, error = %e, "relay sync retry task failed"); + } + }); + + Ok(()) + } + + async fn retry_relay_sync(&self, relay_id: &str) -> Result<()> { + let Some(relay) = self.query.get_relay(relay_id).await? else { + return Ok(()); + }; + + if relay.sync_error.trim().is_empty() { + tracing::debug!(relay = %relay.id, "skip relay sync retry; relay has no sync_error"); + return Ok(()); + } + + let is_new = relay.synced == 0; + self.sync_and_report(&relay, is_new).await; Ok(()) } @@ -258,6 +336,28 @@ fn relay_sync_body( fn should_sync_relay_activity(activity_type: &str) -> bool { matches!( activity_type, - "create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" + "create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" | "fail_relay_sync" ) } + +fn consecutive_sync_failures(activities: &[Activity]) -> usize { + activities + .iter() + .take_while(|activity| activity.activity_type == "fail_relay_sync") + .count() +} + +fn relay_sync_retry_delay(consecutive_failures: usize) -> Option { + let retry_attempt = consecutive_failures.max(1); + if retry_attempt > RELAY_SYNC_RETRY_MAX_ATTEMPTS { + return None; + } + + let exponent = (retry_attempt - 1).min(31); + let multiplier = 1u64 << exponent; + let delay_secs = RELAY_SYNC_RETRY_BASE_DELAY_SECS + .saturating_mul(multiplier) + .min(RELAY_SYNC_RETRY_MAX_DELAY_SECS); + + Some(Duration::from_secs(delay_secs)) +} diff --git a/backend/src/query.rs b/backend/src/query.rs index 7cfd77b..278553c 100644 --- a/backend/src/query.rs +++ b/backend/src/query.rs @@ -94,6 +94,23 @@ impl Query { Ok(rows) } + pub async fn list_relays_with_sync_error(&self) -> Result> { + let rows = sqlx::query_as::<_, Relay>( + "SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id, + status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled, synced + FROM relay + WHERE TRIM(sync_error) != '' + ORDER BY id", + ) + .fetch_all(&self.pool) + .await?; + Ok(rows) + } + pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result> { let rows = sqlx::query_as::<_, Relay>( "SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id, -- 2.52.0