diff --git a/backend/src/infra.rs b/backend/src/infra.rs index 6ef88c2..a0f480e 100644 --- a/backend/src/infra.rs +++ b/backend/src/infra.rs @@ -93,7 +93,7 @@ impl Infra { return Ok(()); }; - let is_new = relay.synced == 0; + let is_new = self.relay_sync_is_new(&relay).await?; self.sync_and_report(&relay, is_new).await; Ok(()) @@ -110,7 +110,7 @@ impl Infra { for relay in relays { if relay.sync_error.trim().is_empty() { - let is_new = relay.synced == 0; + let is_new = self.relay_sync_is_new(&relay).await?; self.sync_and_report(&relay, is_new).await; } else { self.schedule_relay_sync_retry(&relay.id, source).await?; @@ -166,11 +166,20 @@ impl Infra { return Ok(()); } - let is_new = relay.synced == 0; + let is_new = self.relay_sync_is_new(&relay).await?; self.sync_and_report(&relay, is_new).await; Ok(()) } + async fn relay_sync_is_new(&self, relay: &Relay) -> Result { + if relay.synced == 1 { + return Ok(false); + } + + let has_completed_sync = self.query.relay_has_completed_sync(&relay.id).await?; + Ok(!has_completed_sync) + } + async fn sync_and_report(&self, relay: &Relay, is_new: bool) { match self.sync_relay(relay, is_new).await { Ok(()) => { diff --git a/backend/src/query.rs b/backend/src/query.rs index d231055..c1e3dd0 100644 --- a/backend/src/query.rs +++ b/backend/src/query.rs @@ -194,4 +194,20 @@ impl Query { .await?; Ok(rows) } + + pub async fn relay_has_completed_sync(&self, relay_id: &str) -> Result { + let found = sqlx::query_scalar::<_, i64>( + "SELECT 1 + FROM activity + WHERE resource_type = 'relay' + AND resource_id = ? + AND activity_type = 'complete_relay_sync' + LIMIT 1", + ) + .bind(relay_id) + .fetch_optional(&self.pool) + .await?; + + Ok(found.is_some()) + } }