Compare commits

..

1 Commits

Author SHA1 Message Date
userAdityaa 5b53659a5b fix: relay sync failures with delayed bounded retries 2026-04-23 17:20:13 +05:45
+11 -69
View File
@@ -17,9 +17,6 @@ type HmacSha256 = Hmac<Sha256>;
const STRIPE_API: &str = "https://api.stripe.com/v1"; const STRIPE_API: &str = "https://api.stripe.com/v1";
const COINBASE_SPOT_API: &str = "https://api.coinbase.com/v2/prices"; const COINBASE_SPOT_API: &str = "https://api.coinbase.com/v2/prices";
const WEBHOOK_TOLERANCE_SECS: i64 = 300; const WEBHOOK_TOLERANCE_SECS: i64 = 300;
const MANUAL_LIGHTNING_PAYMENT_DM: &str = "Payment is due for your relay subscription. Please visit the application to complete a manual Lightning payment.";
const NWC_ERROR_DM_PREFIX: &str = "NWC auto-payment failed:";
const NWC_ERROR_DM_MAX_CHARS: usize = 240;
#[derive(Debug)] #[derive(Debug)]
pub enum InvoiceLookupError { pub enum InvoiceLookupError {
@@ -83,6 +80,7 @@ pub struct Billing {
nwc_url: String, nwc_url: String,
stripe_secret_key: String, stripe_secret_key: String,
stripe_webhook_secret: String, stripe_webhook_secret: String,
btc_quote_api_base: String,
http: reqwest::Client, http: reqwest::Client,
query: Query, query: Query,
command: Command, command: Command,
@@ -100,10 +98,13 @@ impl Billing {
if stripe_webhook_secret.trim().is_empty() { if stripe_webhook_secret.trim().is_empty() {
panic!("missing STRIPE_WEBHOOK_SECRET environment variable"); panic!("missing STRIPE_WEBHOOK_SECRET environment variable");
} }
let btc_quote_api_base =
std::env::var("BTC_PRICE_API_BASE").unwrap_or_else(|_| COINBASE_SPOT_API.to_string());
Self { Self {
nwc_url, nwc_url,
stripe_secret_key, stripe_secret_key,
stripe_webhook_secret, stripe_webhook_secret,
btc_quote_api_base,
http: reqwest::Client::new(), http: reqwest::Client::new(),
query, query,
command, command,
@@ -359,8 +360,6 @@ impl Billing {
return Ok(()); return Ok(());
}; };
let mut nwc_error_for_dm: Option<String> = None;
// 1. NWC auto-pay: if the tenant has a nwc_url // 1. NWC auto-pay: if the tenant has a nwc_url
if !tenant.nwc_url.is_empty() { if !tenant.nwc_url.is_empty() {
match self match self
@@ -377,14 +376,6 @@ impl Billing {
self.command self.command
.set_tenant_nwc_error(&tenant.pubkey, &error_msg) .set_tenant_nwc_error(&tenant.pubkey, &error_msg)
.await?; .await?;
tracing::warn!(
error = %e,
tenant_pubkey = %tenant.pubkey,
stripe_customer_id,
invoice_id,
"nwc auto-payment failed for invoice.created"
);
nwc_error_for_dm = summarize_nwc_error_for_dm(&error_msg);
// Fall through to next option // Fall through to next option
} }
} }
@@ -399,8 +390,12 @@ impl Billing {
} }
// 3. Manual payment: send a DM // 3. Manual payment: send a DM
let dm_message = manual_lightning_payment_dm(nwc_error_for_dm.as_deref()); self.robot
self.robot.send_dm(&tenant.pubkey, &dm_message).await?; .send_dm(
&tenant.pubkey,
"Payment is due for your relay subscription. Please visit the application to complete a manual Lightning payment.",
)
.await?;
Ok(()) Ok(())
} }
@@ -643,13 +638,11 @@ impl Billing {
pub async fn stripe_create_customer(&self, tenant_pubkey: &str) -> Result<String> { pub async fn stripe_create_customer(&self, tenant_pubkey: &str) -> Result<String> {
let short_pubkey: String = tenant_pubkey.chars().take(12).collect(); let short_pubkey: String = tenant_pubkey.chars().take(12).collect();
let display_name = format!("Caravel tenant {short_pubkey}"); let display_name = format!("Caravel tenant {short_pubkey}");
let idempotency_key = self.idempotency_key(&["create_customer", tenant_pubkey]);
let resp = self let resp = self
.http .http
.post(format!("{STRIPE_API}/customers")) .post(format!("{STRIPE_API}/customers"))
.bearer_auth(&self.stripe_secret_key) .bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.form(&[ .form(&[
("name", display_name.as_str()), ("name", display_name.as_str()),
("metadata[tenant_pubkey]", tenant_pubkey), ("metadata[tenant_pubkey]", tenant_pubkey),
@@ -836,30 +829,15 @@ impl Billing {
// --- Stripe API helpers --- // --- Stripe API helpers ---
fn idempotency_key(&self, parts: &[&str]) -> String {
let mut mac = HmacSha256::new_from_slice(self.stripe_secret_key.as_bytes())
.expect("HMAC accepts any key length");
for (i, part) in parts.iter().enumerate() {
if i > 0 {
mac.update(b":");
}
mac.update(part.as_bytes());
}
hex::encode(mac.finalize().into_bytes())
}
async fn stripe_create_subscription( async fn stripe_create_subscription(
&self, &self,
customer_id: &str, customer_id: &str,
price_id: &str, price_id: &str,
) -> Result<(String, String)> { ) -> Result<(String, String)> {
let idempotency_key =
self.idempotency_key(&["create_subscription", customer_id, price_id]);
let resp = self let resp = self
.http .http
.post(format!("{STRIPE_API}/subscriptions")) .post(format!("{STRIPE_API}/subscriptions"))
.bearer_auth(&self.stripe_secret_key) .bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.form(&[ .form(&[
("customer", customer_id), ("customer", customer_id),
("collection_method", "charge_automatically"), ("collection_method", "charge_automatically"),
@@ -886,13 +864,10 @@ impl Billing {
subscription_id: &str, subscription_id: &str,
price_id: &str, price_id: &str,
) -> Result<String> { ) -> Result<String> {
let idempotency_key =
self.idempotency_key(&["create_subscription_item", subscription_id, price_id]);
let resp = self let resp = self
.http .http
.post(format!("{STRIPE_API}/subscription_items")) .post(format!("{STRIPE_API}/subscription_items"))
.bearer_auth(&self.stripe_secret_key) .bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.form(&[("subscription", subscription_id), ("price", price_id)]) .form(&[("subscription", subscription_id), ("price", price_id)])
.send() .send()
.await?; .await?;
@@ -911,13 +886,10 @@ impl Billing {
item_id: &str, item_id: &str,
price_id: &str, price_id: &str,
) -> Result<String> { ) -> Result<String> {
let idempotency_key =
self.idempotency_key(&["update_subscription_item", item_id, price_id]);
let resp = self let resp = self
.http .http
.post(format!("{STRIPE_API}/subscription_items/{item_id}")) .post(format!("{STRIPE_API}/subscription_items/{item_id}"))
.bearer_auth(&self.stripe_secret_key) .bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.form(&[("price", price_id)]) .form(&[("price", price_id)])
.send() .send()
.await?; .await?;
@@ -954,11 +926,9 @@ impl Billing {
} }
async fn stripe_pay_invoice(&self, invoice_id: &str) -> Result<()> { async fn stripe_pay_invoice(&self, invoice_id: &str) -> Result<()> {
let idempotency_key = self.idempotency_key(&["pay_invoice", invoice_id]);
self.http self.http
.post(format!("{STRIPE_API}/invoices/{invoice_id}/pay")) .post(format!("{STRIPE_API}/invoices/{invoice_id}/pay"))
.bearer_auth(&self.stripe_secret_key) .bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.send() .send()
.await? .await?
.error_for_status()?; .error_for_status()?;
@@ -998,11 +968,9 @@ impl Billing {
} }
async fn stripe_pay_invoice_out_of_band(&self, invoice_id: &str) -> Result<()> { async fn stripe_pay_invoice_out_of_band(&self, invoice_id: &str) -> Result<()> {
let idempotency_key = self.idempotency_key(&["pay_invoice_oob", invoice_id]);
self.http self.http
.post(format!("{STRIPE_API}/invoices/{invoice_id}/pay")) .post(format!("{STRIPE_API}/invoices/{invoice_id}/pay"))
.bearer_auth(&self.stripe_secret_key) .bearer_auth(&self.stripe_secret_key)
.header("Idempotency-Key", idempotency_key)
.form(&[("paid_out_of_band", "true")]) .form(&[("paid_out_of_band", "true")])
.send() .send()
.await? .await?
@@ -1089,7 +1057,7 @@ impl Billing {
} }
async fn fetch_btc_spot_price(&self, currency: &str) -> Result<f64> { async fn fetch_btc_spot_price(&self, currency: &str) -> Result<f64> {
fetch_btc_spot_price_from_base(&self.http, COINBASE_SPOT_API, currency).await fetch_btc_spot_price_from_base(&self.http, &self.btc_quote_api_base, currency).await
} }
fn currency_minor_exponent(currency: &str) -> Result<u8> { fn currency_minor_exponent(currency: &str) -> Result<u8> {
@@ -1133,31 +1101,6 @@ pub async fn fetch_btc_spot_price_from_base(
Ok(amount) Ok(amount)
} }
fn summarize_nwc_error_for_dm(error: &str) -> Option<String> {
let normalized = error.split_whitespace().collect::<Vec<_>>().join(" ");
if normalized.is_empty() {
return None;
}
if normalized.chars().count() <= NWC_ERROR_DM_MAX_CHARS {
return Some(normalized);
}
let prefix_len = NWC_ERROR_DM_MAX_CHARS.saturating_sub(3);
let mut truncated = normalized.chars().take(prefix_len).collect::<String>();
truncated.push_str("...");
Some(truncated)
}
fn manual_lightning_payment_dm(nwc_error: Option<&str>) -> String {
match nwc_error {
Some(error) if !error.is_empty() => {
format!("{MANUAL_LIGHTNING_PAYMENT_DM}\n\n{NWC_ERROR_DM_PREFIX} {error}")
}
_ => MANUAL_LIGHTNING_PAYMENT_DM.to_string(),
}
}
pub fn fiat_minor_to_msats_from_quote( pub fn fiat_minor_to_msats_from_quote(
amount_due_minor: i64, amount_due_minor: i64,
currency: &str, currency: &str,
@@ -1455,5 +1398,4 @@ mod tests {
assert_eq!(billing.stripe_secret_key, "sk_test_dummy"); assert_eq!(billing.stripe_secret_key, "sk_test_dummy");
assert_eq!(billing.stripe_webhook_secret, "whsec_test_dummy"); assert_eq!(billing.stripe_webhook_secret, "whsec_test_dummy");
} }
} }