Compare commits

..

1 Commits

4 changed files with 10 additions and 158 deletions
+2 -5
View File
@@ -15,15 +15,12 @@ Members:
## `pub async fn start(self)`
- Subscribes to `command.notify`
- On startup, schedules delayed sync retries for relays whose `sync_error` is non-empty.
- Loops on `rx.recv()`, calling `handle_activity` for each received `Activity`.
## `async fn handle_activity(&self, activity: &Activity)`
- For `create_relay`, `update_relay`, `activate_relay`, or `deactivate_relay` activity, calls `sync_and_report` immediately.
- For `fail_relay_sync`, schedules a delayed retry using exponential backoff based on consecutive failures for the relay.
- Retry scheduling stops after the configured max attempts to avoid infinite retry loops.
- Other activity types are ignored (e.g. `complete_relay_sync`).
- For `create_relay`, `update_relay`, `activate_relay`, or `deactivate_relay` activity, calls `sync_and_report`.
- All other activity types are ignored (e.g. `fail_relay_sync`, `complete_relay_sync`).
## `async fn sync_and_report(&self, relay: &Relay, is_new: bool)`
-28
View File
@@ -643,13 +643,11 @@ impl Billing {
pub async fn stripe_create_customer(&self, tenant_pubkey: &str) -> Result<String> {
let short_pubkey: String = tenant_pubkey.chars().take(12).collect();
let display_name = format!("Caravel tenant {short_pubkey}");
let idempotency_key = self.idempotency_key(&["create_customer", tenant_pubkey]);
let resp = self
.http
.post(format!("{STRIPE_API}/customers"))
.bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.form(&[
("name", display_name.as_str()),
("metadata[tenant_pubkey]", tenant_pubkey),
@@ -836,30 +834,15 @@ impl Billing {
// --- Stripe API helpers ---
fn idempotency_key(&self, parts: &[&str]) -> String {
let mut mac = HmacSha256::new_from_slice(self.stripe_secret_key.as_bytes())
.expect("HMAC accepts any key length");
for (i, part) in parts.iter().enumerate() {
if i > 0 {
mac.update(b":");
}
mac.update(part.as_bytes());
}
hex::encode(mac.finalize().into_bytes())
}
async fn stripe_create_subscription(
&self,
customer_id: &str,
price_id: &str,
) -> Result<(String, String)> {
let idempotency_key =
self.idempotency_key(&["create_subscription", customer_id, price_id]);
let resp = self
.http
.post(format!("{STRIPE_API}/subscriptions"))
.bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.form(&[
("customer", customer_id),
("collection_method", "charge_automatically"),
@@ -886,13 +869,10 @@ impl Billing {
subscription_id: &str,
price_id: &str,
) -> Result<String> {
let idempotency_key =
self.idempotency_key(&["create_subscription_item", subscription_id, price_id]);
let resp = self
.http
.post(format!("{STRIPE_API}/subscription_items"))
.bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.form(&[("subscription", subscription_id), ("price", price_id)])
.send()
.await?;
@@ -911,13 +891,10 @@ impl Billing {
item_id: &str,
price_id: &str,
) -> Result<String> {
let idempotency_key =
self.idempotency_key(&["update_subscription_item", item_id, price_id]);
let resp = self
.http
.post(format!("{STRIPE_API}/subscription_items/{item_id}"))
.bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.form(&[("price", price_id)])
.send()
.await?;
@@ -954,11 +931,9 @@ impl Billing {
}
async fn stripe_pay_invoice(&self, invoice_id: &str) -> Result<()> {
let idempotency_key = self.idempotency_key(&["pay_invoice", invoice_id]);
self.http
.post(format!("{STRIPE_API}/invoices/{invoice_id}/pay"))
.bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.send()
.await?
.error_for_status()?;
@@ -998,11 +973,9 @@ impl Billing {
}
async fn stripe_pay_invoice_out_of_band(&self, invoice_id: &str) -> Result<()> {
let idempotency_key = self.idempotency_key(&["pay_invoice_oob", invoice_id]);
self.http
.post(format!("{STRIPE_API}/invoices/{invoice_id}/pay"))
.bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.form(&[("paid_out_of_band", "true")])
.send()
.await?
@@ -1455,5 +1428,4 @@ mod tests {
assert_eq!(billing.stripe_secret_key, "sk_test_dummy");
assert_eq!(billing.stripe_webhook_secret, "whsec_test_dummy");
}
}
+8 -108
View File
@@ -1,15 +1,10 @@
use anyhow::Result;
use nostr_sdk::prelude::*;
use std::time::Duration;
use crate::command::Command;
use crate::models::{Activity, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay};
use crate::query::Query;
const RELAY_SYNC_RETRY_BASE_DELAY_SECS: u64 = 30;
const RELAY_SYNC_RETRY_MAX_DELAY_SECS: u64 = 15 * 60;
const RELAY_SYNC_RETRY_MAX_ATTEMPTS: usize = 6;
#[derive(Clone)]
pub struct Infra {
api_url: String,
@@ -53,10 +48,6 @@ impl Infra {
pub async fn start(self) {
let mut rx = self.command.notify.subscribe();
if let Err(e) = self.schedule_startup_retries().await {
tracing::error!(error = %e, "failed to schedule relay sync retries on startup");
}
loop {
match rx.recv().await {
Ok(activity) => {
@@ -75,84 +66,15 @@ impl Infra {
async fn handle_activity(&self, activity: &Activity) -> Result<()> {
let needs_sync = should_sync_relay_activity(activity.activity_type.as_str());
if !needs_sync || activity.resource_type != "relay" {
return Ok(());
if needs_sync {
let Some(relay) = self.query.get_relay(&activity.resource_id).await? else {
return Ok(());
};
let is_new = relay.synced == 0;
self.sync_and_report(&relay, is_new).await;
}
if activity.activity_type == "fail_relay_sync" {
self.schedule_relay_sync_retry(&activity.resource_id, "activity")
.await?;
return Ok(());
}
let Some(relay) = self.query.get_relay(&activity.resource_id).await? else {
return Ok(());
};
let is_new = relay.synced == 0;
self.sync_and_report(&relay, is_new).await;
Ok(())
}
async fn schedule_startup_retries(&self) -> Result<()> {
let relays = self.query.list_relays_with_sync_error().await?;
for relay in relays {
self.schedule_relay_sync_retry(&relay.id, "startup").await?;
}
Ok(())
}
async fn schedule_relay_sync_retry(&self, relay_id: &str, source: &str) -> Result<()> {
let activities = self.query.list_activity_for_relay(relay_id).await?;
let consecutive_failures = consecutive_sync_failures(&activities);
let Some(delay) = relay_sync_retry_delay(consecutive_failures) else {
tracing::warn!(
relay = relay_id,
consecutive_failures,
max_attempts = RELAY_SYNC_RETRY_MAX_ATTEMPTS,
"relay sync retries exhausted; awaiting manual intervention"
);
return Ok(());
};
tracing::info!(
relay = relay_id,
source,
consecutive_failures,
delay_secs = delay.as_secs(),
"scheduled relay sync retry"
);
let relay_id = relay_id.to_string();
let infra = self.clone();
tokio::spawn(async move {
tokio::time::sleep(delay).await;
if let Err(e) = infra.retry_relay_sync(&relay_id).await {
tracing::error!(relay = %relay_id, error = %e, "relay sync retry task failed");
}
});
Ok(())
}
async fn retry_relay_sync(&self, relay_id: &str) -> Result<()> {
let Some(relay) = self.query.get_relay(relay_id).await? else {
return Ok(());
};
if relay.sync_error.trim().is_empty() {
tracing::debug!(relay = %relay.id, "skip relay sync retry; relay has no sync_error");
return Ok(());
}
let is_new = relay.synced == 0;
self.sync_and_report(&relay, is_new).await;
Ok(())
}
@@ -336,28 +258,6 @@ fn relay_sync_body(
fn should_sync_relay_activity(activity_type: &str) -> bool {
matches!(
activity_type,
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" | "fail_relay_sync"
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay"
)
}
fn consecutive_sync_failures(activities: &[Activity]) -> usize {
activities
.iter()
.take_while(|activity| activity.activity_type == "fail_relay_sync")
.count()
}
fn relay_sync_retry_delay(consecutive_failures: usize) -> Option<Duration> {
let retry_attempt = consecutive_failures.max(1);
if retry_attempt > RELAY_SYNC_RETRY_MAX_ATTEMPTS {
return None;
}
let exponent = (retry_attempt - 1).min(31);
let multiplier = 1u64 << exponent;
let delay_secs = RELAY_SYNC_RETRY_BASE_DELAY_SECS
.saturating_mul(multiplier)
.min(RELAY_SYNC_RETRY_MAX_DELAY_SECS);
Some(Duration::from_secs(delay_secs))
}
-17
View File
@@ -94,23 +94,6 @@ impl Query {
Ok(rows)
}
pub async fn list_relays_with_sync_error(&self) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id,
status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE TRIM(sync_error) != ''
ORDER BY id",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id,