Fix relay syncing
This commit is contained in:
@@ -19,6 +19,13 @@ Calls `self.tick` in a loop every 10 seconds.
|
|||||||
|
|
||||||
Iterates over `repo.list_activity` since last run and does the following:
|
Iterates over `repo.list_activity` since last run and does the following:
|
||||||
|
|
||||||
- For any `create_relay|update_relay` activity, sync relay config to zooid.
|
- For `create_relay` activity, create the relay in zooid via `POST /relay`.
|
||||||
- For any `deactivate_relay` activity, sync relay config to zooid.
|
- For `update_relay` or `deactivate_relay` activity, update the relay in zooid via `PUT /relay/:id`.
|
||||||
|
- All other activity types are ignored (e.g. `fail_relay_sync` must not trigger another sync).
|
||||||
- If unsuccessful, call `repo.fail_relay_sync`.
|
- If unsuccessful, call `repo.fail_relay_sync`.
|
||||||
|
|
||||||
|
## `async fn sync_relay(&self, relay: &Relay, is_new: bool)`
|
||||||
|
|
||||||
|
- If `is_new`, sends `POST /relay` to create the relay in zooid.
|
||||||
|
- Otherwise, sends `PUT /relay/:id` to update it.
|
||||||
|
- Passes full relay configuration in the body including host, schema, secret, inactive flag, info, policy, groups, management, blossom, livekit, push, and roles.
|
||||||
|
|||||||
+22
-10
@@ -33,6 +33,12 @@ impl Infra {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start(self) {
|
pub async fn start(self) {
|
||||||
|
// Initialize from DB so we don't replay historical activities on restart
|
||||||
|
match self.repo.max_activity_at().await {
|
||||||
|
Ok(ts) => *self.last_activity_at.lock().await = ts,
|
||||||
|
Err(e) => tracing::error!(error = %e, "failed to read max activity timestamp"),
|
||||||
|
}
|
||||||
|
|
||||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
|
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
@@ -48,17 +54,18 @@ impl Infra {
|
|||||||
let activity = self.repo.list_activity(&since).await?;
|
let activity = self.repo.list_activity(&since).await?;
|
||||||
|
|
||||||
for a in activity {
|
for a in activity {
|
||||||
if a.resource_type == "relay"
|
let sync_type = match a.activity_type.as_str() {
|
||||||
&& matches!(
|
"create_relay" => Some(true),
|
||||||
a.activity_type.as_str(),
|
"update_relay" | "deactivate_relay" => Some(false),
|
||||||
"create_relay" | "update_relay" | "deactivate_relay"
|
_ => None,
|
||||||
)
|
};
|
||||||
{
|
|
||||||
|
if let Some(is_new) = sync_type {
|
||||||
let Some(relay) = self.repo.get_relay(&a.resource_id).await? else {
|
let Some(relay) = self.repo.get_relay(&a.resource_id).await? else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(e) = self.sync_relay(&relay).await {
|
if let Err(e) = self.sync_relay(&relay, is_new).await {
|
||||||
tracing::warn!(relay = %relay.id, error = %e, "relay sync failed");
|
tracing::warn!(relay = %relay.id, error = %e, "relay sync failed");
|
||||||
self.repo.fail_relay_sync(&relay, e.to_string()).await?;
|
self.repo.fail_relay_sync(&relay, e.to_string()).await?;
|
||||||
}
|
}
|
||||||
@@ -70,9 +77,9 @@ impl Infra {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn sync_relay(&self, relay: &crate::models::Relay) -> Result<()> {
|
async fn sync_relay(&self, relay: &crate::models::Relay, is_new: bool) -> Result<()> {
|
||||||
let client = reqwest::Client::new();
|
let client = reqwest::Client::new();
|
||||||
let url = format!("{}/relay/{}", self.api_url.trim_end_matches('/'), relay.id);
|
let base = self.api_url.trim_end_matches('/');
|
||||||
|
|
||||||
let host = if self.relay_domain.is_empty() {
|
let host = if self.relay_domain.is_empty() {
|
||||||
relay.subdomain.clone()
|
relay.subdomain.clone()
|
||||||
@@ -119,7 +126,12 @@ impl Infra {
|
|||||||
],
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
let response = client.put(url).json(&body).send().await?;
|
let response = if is_new {
|
||||||
|
client.post(format!("{}/relay", base)).json(&body).send().await?
|
||||||
|
} else {
|
||||||
|
client.put(format!("{}/relay/{}", base, relay.id)).json(&body).send().await?
|
||||||
|
};
|
||||||
|
|
||||||
if !response.status().is_success() {
|
if !response.status().is_success() {
|
||||||
anyhow::bail!("zooid sync returned {}", response.status())
|
anyhow::bail!("zooid sync returned {}", response.status())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -468,6 +468,15 @@ impl Repo {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn max_activity_at(&self) -> Result<i64> {
|
||||||
|
let val = sqlx::query_scalar::<_, Option<i64>>(
|
||||||
|
"SELECT MAX(created_at) FROM activity",
|
||||||
|
)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(val.unwrap_or(0))
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn list_activity(&self, since: &i64) -> Result<Vec<Activity>> {
|
pub async fn list_activity(&self, since: &i64) -> Result<Vec<Activity>> {
|
||||||
let rows = sqlx::query_as::<_, Activity>(
|
let rows = sqlx::query_as::<_, Activity>(
|
||||||
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
|
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
|
||||||
|
|||||||
Reference in New Issue
Block a user