From 3ae540e4379dd5cc6428dc23eab9764f599939ab Mon Sep 17 00:00:00 2001 From: userAdityaa Date: Sat, 11 Apr 2026 17:28:58 +0545 Subject: [PATCH] fix(billing): ensure all tenants have valid Stripe customer IDs --- .../0002_tenant_stripe_customer_id_guards.sql | 19 ++++ backend/src/api.rs | 93 ++++++++++++++----- backend/src/billing.rs | 88 ++++++++++++++---- backend/src/command.rs | 24 +++++ 4 files changed, 180 insertions(+), 44 deletions(-) create mode 100644 backend/migrations/0002_tenant_stripe_customer_id_guards.sql diff --git a/backend/migrations/0002_tenant_stripe_customer_id_guards.sql b/backend/migrations/0002_tenant_stripe_customer_id_guards.sql new file mode 100644 index 0000000..13bee6c --- /dev/null +++ b/backend/migrations/0002_tenant_stripe_customer_id_guards.sql @@ -0,0 +1,19 @@ +CREATE UNIQUE INDEX IF NOT EXISTS tenant_stripe_customer_id_unique +ON tenant(stripe_customer_id) +WHERE stripe_customer_id <> ''; + +CREATE TRIGGER IF NOT EXISTS tenant_stripe_customer_id_not_empty_insert +BEFORE INSERT ON tenant +FOR EACH ROW +WHEN trim(NEW.stripe_customer_id) = '' +BEGIN + SELECT RAISE(ABORT, 'stripe_customer_id cannot be empty'); +END; + +CREATE TRIGGER IF NOT EXISTS tenant_stripe_customer_id_not_empty_update +BEFORE UPDATE OF stripe_customer_id ON tenant +FOR EACH ROW +WHEN trim(NEW.stripe_customer_id) = '' +BEGIN + SELECT RAISE(ABORT, 'stripe_customer_id cannot be empty'); +END; diff --git a/backend/src/api.rs b/backend/src/api.rs index e193ff4..fc4999a 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -365,32 +365,77 @@ async fn get_identity( let pubkey = state.api.extract_auth_pubkey(&headers)?; let is_admin = state.api.admins.iter().any(|a| a == &pubkey); - // Only create if tenant doesn't exist yet - if let Ok(None) = state.api.query.get_tenant(&pubkey).await { - // TODO: Call Stripe API to create a new customer - let stripe_customer_id = String::new(); + // Ensure tenant exists and always has a Stripe customer id. + match state.api.query.get_tenant(&pubkey).await { + Ok(Some(existing_tenant)) => { + if existing_tenant.stripe_customer_id.trim().is_empty() { + let stripe_customer_id = + match state.api.billing.stripe_create_customer(&pubkey).await { + Ok(id) => id, + Err(e) => { + return Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "stripe-customer-create-failed", + &e.to_string(), + )); + } + }; - let tenant = Tenant { - pubkey: pubkey.clone(), - nwc_url: String::new(), - nwc_error: None, - created_at: now_ts(), - stripe_customer_id, - stripe_subscription_id: None, - past_due_at: None, - }; - - match state.api.command.create_tenant(&tenant).await { - Ok(()) => {} - Err(e) if matches!(map_unique_error(&e), Some("pubkey-exists")) => {} - Err(e) => { - return Ok(err( - StatusCode::INTERNAL_SERVER_ERROR, - "internal", - &e.to_string(), - )); + if let Err(e) = state + .api + .command + .set_tenant_stripe_customer_id(&pubkey, &stripe_customer_id) + .await + { + return Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "internal", + &e.to_string(), + )); + } } - }; + } + Ok(None) => { + let stripe_customer_id = match state.api.billing.stripe_create_customer(&pubkey).await { + Ok(id) => id, + Err(e) => { + return Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "stripe-customer-create-failed", + &e.to_string(), + )); + } + }; + + let tenant = Tenant { + pubkey: pubkey.clone(), + nwc_url: String::new(), + nwc_error: None, + created_at: now_ts(), + stripe_customer_id, + stripe_subscription_id: None, + past_due_at: None, + }; + + match state.api.command.create_tenant(&tenant).await { + Ok(()) => {} + Err(e) if matches!(map_unique_error(&e), Some("pubkey-exists")) => {} + Err(e) => { + return Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "internal", + &e.to_string(), + )); + } + }; + } + Err(e) => { + return Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "internal", + &e.to_string(), + )); + } } Ok(ok( diff --git a/backend/src/billing.rs b/backend/src/billing.rs index 89d84e8..b9013ee 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -1,7 +1,7 @@ use anyhow::{Result, anyhow}; use hmac::{Hmac, Mac}; use nwc::prelude::{ - MakeInvoiceRequest, NostrWalletConnectURI, PayInvoiceRequest as NwcPayInvoiceRequest, NWC, + MakeInvoiceRequest, NWC, NostrWalletConnectURI, PayInvoiceRequest as NwcPayInvoiceRequest, }; use sha2::Sha256; @@ -75,8 +75,12 @@ impl Billing { async fn handle_activity(&self, activity: &Activity) -> Result<()> { let needs_billing_sync = matches!( activity.activity_type.as_str(), - "create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" - | "fail_relay_sync" | "complete_relay_sync" + "create_relay" + | "update_relay" + | "activate_relay" + | "deactivate_relay" + | "fail_relay_sync" + | "complete_relay_sync" ); if needs_billing_sync { @@ -99,7 +103,9 @@ impl Billing { if relay.plan == "free" { if let Some(ref item_id) = relay.stripe_subscription_item_id { self.stripe_delete_subscription_item(item_id).await?; - self.command.delete_relay_subscription_item(&relay.id).await?; + self.command + .delete_relay_subscription_item(&relay.id) + .await?; } self.cleanup_empty_subscription(&tenant.pubkey).await?; return Ok(()); @@ -109,16 +115,16 @@ impl Billing { if relay.status == "inactive" { if let Some(ref item_id) = relay.stripe_subscription_item_id { self.stripe_delete_subscription_item(item_id).await?; - self.command.delete_relay_subscription_item(&relay.id).await?; + self.command + .delete_relay_subscription_item(&relay.id) + .await?; } self.cleanup_empty_subscription(&tenant.pubkey).await?; return Ok(()); } // Active relay on a paid plan - let plan = Query::list_plans() - .into_iter() - .find(|p| p.id == relay.plan); + let plan = Query::list_plans().into_iter().find(|p| p.id == relay.plan); let Some(plan) = plan else { return Ok(()); @@ -128,6 +134,13 @@ impl Billing { return Ok(()); }; + if tenant.stripe_customer_id.trim().is_empty() { + return Err(anyhow!( + "tenant {} has no stripe_customer_id", + tenant.pubkey + )); + } + // Ensure subscription exists if tenant.stripe_subscription_id.is_none() { let (subscription_id, item_id) = self @@ -170,7 +183,9 @@ impl Billing { if let Some(ref subscription_id) = tenant.stripe_subscription_id { self.stripe_cancel_subscription(subscription_id).await?; - self.command.clear_tenant_subscription(tenant_pubkey).await?; + self.command + .clear_tenant_subscription(tenant_pubkey) + .await?; } Ok(()) @@ -240,7 +255,9 @@ impl Billing { return Err(anyhow!("webhook signature mismatch")); } - let ts: i64 = timestamp.parse().map_err(|_| anyhow!("bad webhook timestamp"))?; + let ts: i64 = timestamp + .parse() + .map_err(|_| anyhow!("bad webhook timestamp"))?; let now = chrono::Utc::now().timestamp(); if (now - ts).abs() > WEBHOOK_TOLERANCE_SECS { return Err(anyhow!("webhook timestamp outside tolerance")); @@ -272,9 +289,7 @@ impl Billing { match self.nwc_pay_invoice(amount_due, &tenant.nwc_url).await { Ok(()) => { self.stripe_pay_invoice_out_of_band(invoice_id).await?; - self.command - .clear_tenant_nwc_error(&tenant.pubkey) - .await?; + self.command.clear_tenant_nwc_error(&tenant.pubkey).await?; return Ok(()); } Err(e) => { @@ -442,6 +457,37 @@ impl Billing { Ok((invoice, tenant)) } + pub async fn stripe_create_customer(&self, tenant_pubkey: &str) -> Result { + if self.stripe_secret_key.trim().is_empty() { + return Err(anyhow!("missing STRIPE_SECRET_KEY")); + } + + let short_pubkey: String = tenant_pubkey.chars().take(12).collect(); + let display_name = format!("Caravel tenant {short_pubkey}"); + + let resp = self + .http + .post(format!("{STRIPE_API}/customers")) + .bearer_auth(&self.stripe_secret_key) + .form(&[ + ("name", display_name.as_str()), + ("metadata[tenant_pubkey]", tenant_pubkey), + ]) + .send() + .await?; + + let body: serde_json::Value = resp.error_for_status()?.json().await?; + let customer_id = body["id"] + .as_str() + .ok_or_else(|| anyhow!("missing customer id"))?; + + if !customer_id.starts_with("cus_") { + return Err(anyhow!("unexpected customer id format")); + } + + Ok(customer_id.to_string()) + } + pub async fn stripe_list_invoices(&self, customer_id: &str) -> Result { let resp = self .http @@ -470,7 +516,9 @@ impl Billing { pub async fn create_bolt11(&self, amount_due_cents: i64) -> Result { let amount_msats = (amount_due_cents as u64) * 1000; // placeholder conversion - let system_uri: NostrWalletConnectURI = self.nwc_url.parse() + let system_uri: NostrWalletConnectURI = self + .nwc_url + .parse() .map_err(|_| anyhow!("invalid system NWC URL"))?; let system_nwc = NWC::new(system_uri); @@ -550,10 +598,7 @@ impl Billing { .http .post(format!("{STRIPE_API}/subscription_items")) .bearer_auth(&self.stripe_secret_key) - .form(&[ - ("subscription", subscription_id), - ("price", price_id), - ]) + .form(&[("subscription", subscription_id), ("price", price_id)]) .send() .await?; @@ -649,7 +694,9 @@ impl Billing { let amount_msats = (amount_due_cents as u64) * 1000; // placeholder conversion, actual rate would come from exchange // Create a bolt11 invoice using the system wallet (self.nwc_url) - let system_uri: NostrWalletConnectURI = self.nwc_url.parse() + let system_uri: NostrWalletConnectURI = self + .nwc_url + .parse() .map_err(|_| anyhow!("invalid system NWC URL"))?; let system_nwc = NWC::new(system_uri); @@ -668,7 +715,8 @@ impl Billing { system_nwc.shutdown().await; // Pay the bolt11 invoice using the tenant's wallet - let tenant_uri: NostrWalletConnectURI = tenant_nwc_url.parse() + let tenant_uri: NostrWalletConnectURI = tenant_nwc_url + .parse() .map_err(|_| anyhow!("invalid tenant NWC URL"))?; let tenant_nwc = NWC::new(tenant_uri); diff --git a/backend/src/command.rs b/backend/src/command.rs index 72bd22c..ae2a58f 100644 --- a/backend/src/command.rs +++ b/backend/src/command.rs @@ -64,6 +64,10 @@ impl Command { } pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> { + if tenant.stripe_customer_id.trim().is_empty() { + anyhow::bail!("stripe_customer_id is required"); + } + let mut tx = self.pool.begin().await?; sqlx::query( @@ -264,6 +268,26 @@ impl Command { Ok(()) } + pub async fn set_tenant_stripe_customer_id(&self, pubkey: &str, stripe_customer_id: &str) -> Result<()> { + if stripe_customer_id.trim().is_empty() { + anyhow::bail!("stripe_customer_id is required"); + } + + let mut tx = self.pool.begin().await?; + + sqlx::query("UPDATE tenant SET stripe_customer_id = ? WHERE pubkey = ?") + .bind(stripe_customer_id) + .bind(pubkey) + .execute(&mut *tx) + .await?; + + let activity = Self::insert_activity(&mut tx, "update_tenant", "tenant", pubkey).await?; + + tx.commit().await?; + self.emit(activity); + Ok(()) + } + pub async fn clear_tenant_subscription(&self, pubkey: &str) -> Result<()> { sqlx::query("UPDATE tenant SET stripe_subscription_id = NULL WHERE pubkey = ?") .bind(pubkey)