From 95c971af1a0a477d1757670092d4c5f1b2af1585 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Tue, 31 Mar 2026 06:51:47 -0700 Subject: [PATCH] Avoid duplicate syncs --- backend/migrations/0002_relay_synced.sql | 1 + backend/spec/infra.md | 10 +++-- backend/spec/models.md | 1 + backend/spec/repo.md | 10 +++++ backend/src/api.rs | 2 + backend/src/infra.rs | 22 ++++++---- backend/src/models.rs | 2 + backend/src/repo.rs | 52 ++++++++++++++---------- 8 files changed, 65 insertions(+), 35 deletions(-) create mode 100644 backend/migrations/0002_relay_synced.sql diff --git a/backend/migrations/0002_relay_synced.sql b/backend/migrations/0002_relay_synced.sql new file mode 100644 index 0000000..ad7d37c --- /dev/null +++ b/backend/migrations/0002_relay_synced.sql @@ -0,0 +1 @@ +ALTER TABLE relay ADD COLUMN synced INTEGER NOT NULL DEFAULT 0; diff --git a/backend/spec/infra.md b/backend/spec/infra.md index 68e30d6..dd30dbe 100644 --- a/backend/spec/infra.md +++ b/backend/spec/infra.md @@ -13,16 +13,18 @@ Members: ## `pub async fn start(self)` -Calls `self.tick` in a loop every 10 seconds. +- Initializes `last_activity_at` from `repo.max_activity_at()` so historical activities are not replayed on restart. +- Calls `self.tick` in a loop every 10 seconds. ## `pub async fn tick(self)` Iterates over `repo.list_activity` since last run and does the following: -- For `create_relay` activity, create the relay in zooid via `POST /relay`. -- For `update_relay` or `deactivate_relay` activity, update the relay in zooid via `PUT /relay/:id`. +- For `create_relay`, `update_relay`, or `deactivate_relay` activity, sync the relay to zooid. + - Uses `relay.synced` to decide POST vs PUT (not the activity type), so already-synced relays always use PUT even on restart. + - On success, calls `repo.mark_relay_synced` to set `synced = 1`, `status = 'active'`, and clear `sync_error`. + - On failure, calls `repo.fail_relay_sync`. - All other activity types are ignored (e.g. `fail_relay_sync` must not trigger another sync). -- If unsuccessful, call `repo.fail_relay_sync`. ## `async fn sync_relay(&self, relay: &Relay, is_new: bool)` diff --git a/backend/spec/models.md b/backend/spec/models.md index 7edebb6..75501ac 100644 --- a/backend/spec/models.md +++ b/backend/spec/models.md @@ -81,6 +81,7 @@ A relay is a nostr relay owned by a `tenant` and hosted by the attached zooid in - `blossom_enabled` - whether blossom file storage is enabled - `livekit_enabled` - whether livekit calls are enabled - `push_enabled` - whether relay push is enabled +- `synced` (private) - whether the relay has been successfully synced to zooid at least once. Used by infra to decide POST vs PUT. Some attributes persisted to zooid via API have special handling: diff --git a/backend/spec/repo.md b/backend/spec/repo.md index 47b6412..608ada7 100644 --- a/backend/spec/repo.md +++ b/backend/spec/repo.md @@ -91,6 +91,16 @@ Notes: - Sets relay status to `inactive`, sets `sync_error` - Logs activity as `(fail_relay_sync, relay_id)` +## `pub fn mark_relay_synced(&self, relay_id: &str) -> Result<()>` + +- Sets `synced = 1`, `status = 'active'`, clears `sync_error` +- No activity log (called by infra after successful sync) + +## `pub fn max_activity_at(&self) -> Result` + +- Returns the maximum `created_at` value from the activity table, or 0 if empty +- Used by infra to initialize the since guard on startup + ## `pub fn create_invoice(&self, invoice: &Invoice, invoice_items: [&InvoiceItem]) -> Result<()>` - Saves an `invoice` row and related `invoice_item` rows diff --git a/backend/src/api.rs b/backend/src/api.rs index fe98ab8..d5455fc 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -520,6 +520,7 @@ async fn create_relay( blossom_enabled: payload.blossom_enabled.unwrap_or(0), livekit_enabled: payload.livekit_enabled.unwrap_or(0), push_enabled: payload.push_enabled.unwrap_or(1), + synced: 0, }; relay = match state.api.prepare_relay(relay) { @@ -902,6 +903,7 @@ mod tests { blossom_enabled: 0, livekit_enabled: 0, push_enabled: 1, + synced: 0, }; repo.create_relay(&relay).await.expect("create relay"); } diff --git a/backend/src/infra.rs b/backend/src/infra.rs index b6867ac..381ccd2 100644 --- a/backend/src/infra.rs +++ b/backend/src/infra.rs @@ -54,20 +54,24 @@ impl Infra { let activity = self.repo.list_activity(&since).await?; for a in activity { - let sync_type = match a.activity_type.as_str() { - "create_relay" => Some(true), - "update_relay" | "deactivate_relay" => Some(false), - _ => None, - }; + let needs_sync = matches!( + a.activity_type.as_str(), + "create_relay" | "update_relay" | "deactivate_relay" + ); - if let Some(is_new) = sync_type { + if needs_sync { let Some(relay) = self.repo.get_relay(&a.resource_id).await? else { continue; }; - if let Err(e) = self.sync_relay(&relay, is_new).await { - tracing::warn!(relay = %relay.id, error = %e, "relay sync failed"); - self.repo.fail_relay_sync(&relay, e.to_string()).await?; + let is_new = relay.synced == 0; + + match self.sync_relay(&relay, is_new).await { + Ok(()) => self.repo.mark_relay_synced(&relay.id).await?, + Err(e) => { + tracing::warn!(relay = %relay.id, error = %e, "relay sync failed"); + self.repo.fail_relay_sync(&relay, e.to_string()).await?; + } } } diff --git a/backend/src/models.rs b/backend/src/models.rs index 35709b5..f8a3528 100644 --- a/backend/src/models.rs +++ b/backend/src/models.rs @@ -48,6 +48,8 @@ pub struct Relay { pub blossom_enabled: i64, pub livekit_enabled: i64, pub push_enabled: i64, + #[serde(skip_serializing)] + pub synced: i64, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] diff --git a/backend/src/repo.rs b/backend/src/repo.rs index d5e7174..956a49b 100644 --- a/backend/src/repo.rs +++ b/backend/src/repo.rs @@ -162,13 +162,13 @@ impl Repo { pub async fn list_relays(&self) -> Result> { let rows = sqlx::query_as::<_, Relay>( - "SELECT id, tenant, schema, subdomain, plan, status, sync_error, - info_name, info_icon, info_description, - policy_public_join, policy_strip_signatures, - groups_enabled, management_enabled, blossom_enabled, - livekit_enabled, push_enabled - FROM relay - ORDER BY id", + "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled, synced + FROM relay + ORDER BY id", ) .fetch_all(&self.pool) .await?; @@ -177,14 +177,14 @@ impl Repo { pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result> { let rows = sqlx::query_as::<_, Relay>( - "SELECT id, tenant, schema, subdomain, plan, status, sync_error, - info_name, info_icon, info_description, - policy_public_join, policy_strip_signatures, - groups_enabled, management_enabled, blossom_enabled, - livekit_enabled, push_enabled - FROM relay - WHERE tenant = ? - ORDER BY id", + "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled, synced + FROM relay + WHERE tenant = ? + ORDER BY id", ) .bind(tenant_id) .fetch_all(&self.pool) @@ -194,13 +194,13 @@ impl Repo { pub async fn get_relay(&self, id: &str) -> Result> { let row = sqlx::query_as::<_, Relay>( - "SELECT id, tenant, schema, subdomain, plan, status, sync_error, - info_name, info_icon, info_description, - policy_public_join, policy_strip_signatures, - groups_enabled, management_enabled, blossom_enabled, - livekit_enabled, push_enabled - FROM relay - WHERE id = ?", + "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled, synced + FROM relay + WHERE id = ?", ) .bind(id) .fetch_optional(&self.pool) @@ -312,6 +312,14 @@ impl Repo { Ok(()) } + pub async fn mark_relay_synced(&self, relay_id: &str) -> Result<()> { + sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?") + .bind(relay_id) + .execute(&self.pool) + .await?; + Ok(()) + } + pub async fn create_invoice( &self, invoice: &Invoice,