Compare commits

..

1 Commits

Author SHA1 Message Date
userAdityaa 5848355428 fix: silent relay state drift when activity bus drops events 2026-04-29 23:45:30 +05:30
2 changed files with 3 additions and 28 deletions
+3 -12
View File
@@ -93,7 +93,7 @@ impl Infra {
return Ok(());
};
let is_new = self.relay_sync_is_new(&relay).await?;
let is_new = relay.synced == 0;
self.sync_and_report(&relay, is_new).await;
Ok(())
@@ -110,7 +110,7 @@ impl Infra {
for relay in relays {
if relay.sync_error.trim().is_empty() {
let is_new = self.relay_sync_is_new(&relay).await?;
let is_new = relay.synced == 0;
self.sync_and_report(&relay, is_new).await;
} else {
self.schedule_relay_sync_retry(&relay.id, source).await?;
@@ -166,20 +166,11 @@ impl Infra {
return Ok(());
}
let is_new = self.relay_sync_is_new(&relay).await?;
let is_new = relay.synced == 0;
self.sync_and_report(&relay, is_new).await;
Ok(())
}
async fn relay_sync_is_new(&self, relay: &Relay) -> Result<bool> {
if relay.synced == 1 {
return Ok(false);
}
let has_completed_sync = self.query.relay_has_completed_sync(&relay.id).await?;
Ok(!has_completed_sync)
}
async fn sync_and_report(&self, relay: &Relay, is_new: bool) {
match self.sync_relay(relay, is_new).await {
Ok(()) => {
-16
View File
@@ -194,20 +194,4 @@ impl Query {
.await?;
Ok(rows)
}
pub async fn relay_has_completed_sync(&self, relay_id: &str) -> Result<bool> {
let found = sqlx::query_scalar::<_, i64>(
"SELECT 1
FROM activity
WHERE resource_type = 'relay'
AND resource_id = ?
AND activity_type = 'complete_relay_sync'
LIMIT 1",
)
.bind(relay_id)
.fetch_optional(&self.pool)
.await?;
Ok(found.is_some())
}
}