Add Stripe subscription sync
This commit is contained in:
+1
-22
@@ -38,15 +38,8 @@ impl Infra {
|
||||
}
|
||||
|
||||
pub async fn start(self) {
|
||||
// Subscribe before catch-up so no activities are missed between query and listen
|
||||
let mut rx = self.command.notify.subscribe();
|
||||
|
||||
// Catch up on any unsynced relays from before this process started
|
||||
match self.catch_up().await {
|
||||
Ok(()) => {}
|
||||
Err(e) => tracing::error!(error = %e, "infra catch-up failed"),
|
||||
}
|
||||
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(activity) => {
|
||||
@@ -55,27 +48,13 @@ impl Infra {
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
tracing::warn!(missed = n, "infra lagged, running catch-up");
|
||||
if let Err(e) = self.catch_up().await {
|
||||
tracing::error!(error = %e, "infra catch-up failed");
|
||||
}
|
||||
tracing::warn!(missed = n, "infra lagged");
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn catch_up(&self) -> Result<()> {
|
||||
let relays = self.query.list_relays().await?;
|
||||
for relay in relays {
|
||||
if relay.synced == 0 && relay.sync_error.is_empty() {
|
||||
let is_new = relay.synced == 0;
|
||||
self.sync_and_report(&relay, is_new).await;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_activity(&self, activity: &Activity) -> Result<()> {
|
||||
let needs_sync = matches!(
|
||||
activity.activity_type.as_str(),
|
||||
|
||||
@@ -113,28 +113,6 @@ impl Query {
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
pub async fn max_activity_at(&self) -> Result<i64> {
|
||||
let val = sqlx::query_scalar::<_, Option<i64>>(
|
||||
"SELECT MAX(created_at) FROM activity",
|
||||
)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
Ok(val.unwrap_or(0))
|
||||
}
|
||||
|
||||
pub async fn list_activity(&self, since: &i64) -> Result<Vec<Activity>> {
|
||||
let rows = sqlx::query_as::<_, Activity>(
|
||||
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
|
||||
FROM activity
|
||||
WHERE created_at > ?
|
||||
ORDER BY created_at, id",
|
||||
)
|
||||
.bind(since)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>> {
|
||||
let rows = sqlx::query_as::<_, Activity>(
|
||||
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
|
||||
|
||||
Reference in New Issue
Block a user