Avoid duplicate syncs

This commit is contained in:
Jon Staab
2026-03-31 06:51:47 -07:00
parent e6eda81920
commit 95c971af1a
8 changed files with 65 additions and 35 deletions
+1
View File
@@ -0,0 +1 @@
ALTER TABLE relay ADD COLUMN synced INTEGER NOT NULL DEFAULT 0;
+6 -4
View File
@@ -13,16 +13,18 @@ Members:
## `pub async fn start(self)`
Calls `self.tick` in a loop every 10 seconds.
- Initializes `last_activity_at` from `repo.max_activity_at()` so historical activities are not replayed on restart.
- Calls `self.tick` in a loop every 10 seconds.
## `pub async fn tick(self)`
Iterates over `repo.list_activity` since last run and does the following:
- For `create_relay` activity, create the relay in zooid via `POST /relay`.
- For `update_relay` or `deactivate_relay` activity, update the relay in zooid via `PUT /relay/:id`.
- For `create_relay`, `update_relay`, or `deactivate_relay` activity, sync the relay to zooid.
- Uses `relay.synced` to decide POST vs PUT (not the activity type), so already-synced relays always use PUT even on restart.
- On success, calls `repo.mark_relay_synced` to set `synced = 1`, `status = 'active'`, and clear `sync_error`.
- On failure, calls `repo.fail_relay_sync`.
- All other activity types are ignored (e.g. `fail_relay_sync` must not trigger another sync).
- If unsuccessful, call `repo.fail_relay_sync`.
## `async fn sync_relay(&self, relay: &Relay, is_new: bool)`
+1
View File
@@ -81,6 +81,7 @@ A relay is a nostr relay owned by a `tenant` and hosted by the attached zooid in
- `blossom_enabled` - whether blossom file storage is enabled
- `livekit_enabled` - whether livekit calls are enabled
- `push_enabled` - whether relay push is enabled
- `synced` (private) - whether the relay has been successfully synced to zooid at least once. Used by infra to decide POST vs PUT.
Some attributes persisted to zooid via API have special handling:
+10
View File
@@ -91,6 +91,16 @@ Notes:
- Sets relay status to `inactive`, sets `sync_error`
- Logs activity as `(fail_relay_sync, relay_id)`
## `pub fn mark_relay_synced(&self, relay_id: &str) -> Result<()>`
- Sets `synced = 1`, `status = 'active'`, clears `sync_error`
- No activity log (called by infra after successful sync)
## `pub fn max_activity_at(&self) -> Result<i64>`
- Returns the maximum `created_at` value from the activity table, or 0 if empty
- Used by infra to initialize the since guard on startup
## `pub fn create_invoice(&self, invoice: &Invoice, invoice_items: [&InvoiceItem]) -> Result<()>`
- Saves an `invoice` row and related `invoice_item` rows
+2
View File
@@ -520,6 +520,7 @@ async fn create_relay(
blossom_enabled: payload.blossom_enabled.unwrap_or(0),
livekit_enabled: payload.livekit_enabled.unwrap_or(0),
push_enabled: payload.push_enabled.unwrap_or(1),
synced: 0,
};
relay = match state.api.prepare_relay(relay) {
@@ -902,6 +903,7 @@ mod tests {
blossom_enabled: 0,
livekit_enabled: 0,
push_enabled: 1,
synced: 0,
};
repo.create_relay(&relay).await.expect("create relay");
}
+13 -9
View File
@@ -54,20 +54,24 @@ impl Infra {
let activity = self.repo.list_activity(&since).await?;
for a in activity {
let sync_type = match a.activity_type.as_str() {
"create_relay" => Some(true),
"update_relay" | "deactivate_relay" => Some(false),
_ => None,
};
let needs_sync = matches!(
a.activity_type.as_str(),
"create_relay" | "update_relay" | "deactivate_relay"
);
if let Some(is_new) = sync_type {
if needs_sync {
let Some(relay) = self.repo.get_relay(&a.resource_id).await? else {
continue;
};
if let Err(e) = self.sync_relay(&relay, is_new).await {
tracing::warn!(relay = %relay.id, error = %e, "relay sync failed");
self.repo.fail_relay_sync(&relay, e.to_string()).await?;
let is_new = relay.synced == 0;
match self.sync_relay(&relay, is_new).await {
Ok(()) => self.repo.mark_relay_synced(&relay.id).await?,
Err(e) => {
tracing::warn!(relay = %relay.id, error = %e, "relay sync failed");
self.repo.fail_relay_sync(&relay, e.to_string()).await?;
}
}
}
+2
View File
@@ -48,6 +48,8 @@ pub struct Relay {
pub blossom_enabled: i64,
pub livekit_enabled: i64,
pub push_enabled: i64,
#[serde(skip_serializing)]
pub synced: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
+30 -22
View File
@@ -162,13 +162,13 @@ impl Repo {
pub async fn list_relays(&self) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled
FROM relay
ORDER BY id",
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
ORDER BY id",
)
.fetch_all(&self.pool)
.await?;
@@ -177,14 +177,14 @@ impl Repo {
pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled
FROM relay
WHERE tenant = ?
ORDER BY id",
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE tenant = ?
ORDER BY id",
)
.bind(tenant_id)
.fetch_all(&self.pool)
@@ -194,13 +194,13 @@ impl Repo {
pub async fn get_relay(&self, id: &str) -> Result<Option<Relay>> {
let row = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled
FROM relay
WHERE id = ?",
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE id = ?",
)
.bind(id)
.fetch_optional(&self.pool)
@@ -312,6 +312,14 @@ impl Repo {
Ok(())
}
pub async fn mark_relay_synced(&self, relay_id: &str) -> Result<()> {
sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?")
.bind(relay_id)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn create_invoice(
&self,
invoice: &Invoice,