From b7737967436005b2b9cc955a55c86b413d841107 Mon Sep 17 00:00:00 2001 From: userAdityaa Date: Sat, 11 Apr 2026 17:28:58 +0545 Subject: [PATCH] fix(billing): ensure all tenants have valid Stripe customer IDs --- backend/migrations/0001_init.sql | 2 +- backend/src/api.rs | 110 ++++++++++++++++++++----------- backend/src/billing.rs | 88 +++++++++++++++++++------ backend/src/command.rs | 31 +++++++-- 4 files changed, 166 insertions(+), 65 deletions(-) diff --git a/backend/migrations/0001_init.sql b/backend/migrations/0001_init.sql index 1e77dda..3bf6c47 100644 --- a/backend/migrations/0001_init.sql +++ b/backend/migrations/0001_init.sql @@ -12,7 +12,7 @@ CREATE TABLE IF NOT EXISTS tenant ( nwc_url TEXT NOT NULL DEFAULT '', nwc_error TEXT, created_at INTEGER NOT NULL, - stripe_customer_id TEXT NOT NULL DEFAULT '', + stripe_customer_id TEXT NOT NULL, stripe_subscription_id TEXT, past_due_at INTEGER ); diff --git a/backend/src/api.rs b/backend/src/api.rs index e193ff4..958ee4d 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -122,7 +122,10 @@ impl Api { .route("/tenants/:pubkey/invoices", get(list_tenant_invoices)) .route("/invoices/:id", get(get_invoice)) .route("/invoices/:id/bolt11", get(get_invoice_bolt11)) - .route("/tenants/:pubkey/stripe/session", get(create_stripe_session)) + .route( + "/tenants/:pubkey/stripe/session", + get(create_stripe_session), + ) .route("/stripe/webhook", post(stripe_webhook)) .with_state(state) } @@ -365,41 +368,53 @@ async fn get_identity( let pubkey = state.api.extract_auth_pubkey(&headers)?; let is_admin = state.api.admins.iter().any(|a| a == &pubkey); - // Only create if tenant doesn't exist yet - if let Ok(None) = state.api.query.get_tenant(&pubkey).await { - // TODO: Call Stripe API to create a new customer - let stripe_customer_id = String::new(); + // Ensure tenant exists. + match state.api.query.get_tenant(&pubkey).await { + Ok(Some(_)) => {} + Ok(None) => { + let stripe_customer_id = match state.api.billing.stripe_create_customer(&pubkey).await { + Ok(id) => id, + Err(e) => { + return Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "stripe-customer-create-failed", + &e.to_string(), + )); + } + }; - let tenant = Tenant { - pubkey: pubkey.clone(), - nwc_url: String::new(), - nwc_error: None, - created_at: now_ts(), - stripe_customer_id, - stripe_subscription_id: None, - past_due_at: None, - }; + let tenant = Tenant { + pubkey: pubkey.clone(), + nwc_url: String::new(), + nwc_error: None, + created_at: now_ts(), + stripe_customer_id, + stripe_subscription_id: None, + past_due_at: None, + }; - match state.api.command.create_tenant(&tenant).await { - Ok(()) => {} - Err(e) if matches!(map_unique_error(&e), Some("pubkey-exists")) => {} - Err(e) => { - return Ok(err( - StatusCode::INTERNAL_SERVER_ERROR, - "internal", - &e.to_string(), - )); - } - }; + match state.api.command.create_tenant(&tenant).await { + Ok(()) => {} + Err(e) if matches!(map_unique_error(&e), Some("pubkey-exists")) => {} + Err(e) => { + return Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "internal", + &e.to_string(), + )); + } + }; + } + Err(e) => { + return Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "internal", + &e.to_string(), + )); + } } - Ok(ok( - StatusCode::OK, - IdentityResponse { - pubkey, - is_admin, - }, - )) + Ok(ok(StatusCode::OK, IdentityResponse { pubkey, is_admin })) } async fn get_plan(Path(id): Path) -> Response { @@ -489,14 +504,27 @@ async fn list_relay_activity( let relay = match state.api.query.get_relay(&id).await { Ok(Some(r)) => r, Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")), - Err(e) => return Ok(err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string())), + Err(e) => { + return Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "internal", + &e.to_string(), + )); + } }; state.api.require_admin_or_tenant(&auth, &relay.tenant)?; match state.api.query.list_activity_for_relay(&id).await { - Ok(activity) => Ok(ok(StatusCode::OK, serde_json::json!({ "activity": activity }))), - Err(e) => Ok(err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string())), + Ok(activity) => Ok(ok( + StatusCode::OK, + serde_json::json!({ "activity": activity }), + )), + Err(e) => Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "internal", + &e.to_string(), + )), } } @@ -773,7 +801,11 @@ async fn get_invoice( Path(id): Path, ) -> std::result::Result { let auth = state.api.extract_auth_pubkey(&headers)?; - let (invoice, tenant) = state.api.billing.get_invoice_with_tenant(&id).await + let (invoice, tenant) = state + .api + .billing + .get_invoice_with_tenant(&id) + .await .map_err(|e| ApiError::Internal(e.to_string()))?; state.api.require_admin_or_tenant(&auth, &tenant.pubkey)?; @@ -786,7 +818,11 @@ async fn get_invoice_bolt11( Path(id): Path, ) -> std::result::Result { let auth = state.api.extract_auth_pubkey(&headers)?; - let (invoice, tenant) = state.api.billing.get_invoice_with_tenant(&id).await + let (invoice, tenant) = state + .api + .billing + .get_invoice_with_tenant(&id) + .await .map_err(|e| ApiError::Internal(e.to_string()))?; state.api.require_admin_or_tenant(&auth, &tenant.pubkey)?; diff --git a/backend/src/billing.rs b/backend/src/billing.rs index 89d84e8..b9013ee 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -1,7 +1,7 @@ use anyhow::{Result, anyhow}; use hmac::{Hmac, Mac}; use nwc::prelude::{ - MakeInvoiceRequest, NostrWalletConnectURI, PayInvoiceRequest as NwcPayInvoiceRequest, NWC, + MakeInvoiceRequest, NWC, NostrWalletConnectURI, PayInvoiceRequest as NwcPayInvoiceRequest, }; use sha2::Sha256; @@ -75,8 +75,12 @@ impl Billing { async fn handle_activity(&self, activity: &Activity) -> Result<()> { let needs_billing_sync = matches!( activity.activity_type.as_str(), - "create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" - | "fail_relay_sync" | "complete_relay_sync" + "create_relay" + | "update_relay" + | "activate_relay" + | "deactivate_relay" + | "fail_relay_sync" + | "complete_relay_sync" ); if needs_billing_sync { @@ -99,7 +103,9 @@ impl Billing { if relay.plan == "free" { if let Some(ref item_id) = relay.stripe_subscription_item_id { self.stripe_delete_subscription_item(item_id).await?; - self.command.delete_relay_subscription_item(&relay.id).await?; + self.command + .delete_relay_subscription_item(&relay.id) + .await?; } self.cleanup_empty_subscription(&tenant.pubkey).await?; return Ok(()); @@ -109,16 +115,16 @@ impl Billing { if relay.status == "inactive" { if let Some(ref item_id) = relay.stripe_subscription_item_id { self.stripe_delete_subscription_item(item_id).await?; - self.command.delete_relay_subscription_item(&relay.id).await?; + self.command + .delete_relay_subscription_item(&relay.id) + .await?; } self.cleanup_empty_subscription(&tenant.pubkey).await?; return Ok(()); } // Active relay on a paid plan - let plan = Query::list_plans() - .into_iter() - .find(|p| p.id == relay.plan); + let plan = Query::list_plans().into_iter().find(|p| p.id == relay.plan); let Some(plan) = plan else { return Ok(()); @@ -128,6 +134,13 @@ impl Billing { return Ok(()); }; + if tenant.stripe_customer_id.trim().is_empty() { + return Err(anyhow!( + "tenant {} has no stripe_customer_id", + tenant.pubkey + )); + } + // Ensure subscription exists if tenant.stripe_subscription_id.is_none() { let (subscription_id, item_id) = self @@ -170,7 +183,9 @@ impl Billing { if let Some(ref subscription_id) = tenant.stripe_subscription_id { self.stripe_cancel_subscription(subscription_id).await?; - self.command.clear_tenant_subscription(tenant_pubkey).await?; + self.command + .clear_tenant_subscription(tenant_pubkey) + .await?; } Ok(()) @@ -240,7 +255,9 @@ impl Billing { return Err(anyhow!("webhook signature mismatch")); } - let ts: i64 = timestamp.parse().map_err(|_| anyhow!("bad webhook timestamp"))?; + let ts: i64 = timestamp + .parse() + .map_err(|_| anyhow!("bad webhook timestamp"))?; let now = chrono::Utc::now().timestamp(); if (now - ts).abs() > WEBHOOK_TOLERANCE_SECS { return Err(anyhow!("webhook timestamp outside tolerance")); @@ -272,9 +289,7 @@ impl Billing { match self.nwc_pay_invoice(amount_due, &tenant.nwc_url).await { Ok(()) => { self.stripe_pay_invoice_out_of_band(invoice_id).await?; - self.command - .clear_tenant_nwc_error(&tenant.pubkey) - .await?; + self.command.clear_tenant_nwc_error(&tenant.pubkey).await?; return Ok(()); } Err(e) => { @@ -442,6 +457,37 @@ impl Billing { Ok((invoice, tenant)) } + pub async fn stripe_create_customer(&self, tenant_pubkey: &str) -> Result { + if self.stripe_secret_key.trim().is_empty() { + return Err(anyhow!("missing STRIPE_SECRET_KEY")); + } + + let short_pubkey: String = tenant_pubkey.chars().take(12).collect(); + let display_name = format!("Caravel tenant {short_pubkey}"); + + let resp = self + .http + .post(format!("{STRIPE_API}/customers")) + .bearer_auth(&self.stripe_secret_key) + .form(&[ + ("name", display_name.as_str()), + ("metadata[tenant_pubkey]", tenant_pubkey), + ]) + .send() + .await?; + + let body: serde_json::Value = resp.error_for_status()?.json().await?; + let customer_id = body["id"] + .as_str() + .ok_or_else(|| anyhow!("missing customer id"))?; + + if !customer_id.starts_with("cus_") { + return Err(anyhow!("unexpected customer id format")); + } + + Ok(customer_id.to_string()) + } + pub async fn stripe_list_invoices(&self, customer_id: &str) -> Result { let resp = self .http @@ -470,7 +516,9 @@ impl Billing { pub async fn create_bolt11(&self, amount_due_cents: i64) -> Result { let amount_msats = (amount_due_cents as u64) * 1000; // placeholder conversion - let system_uri: NostrWalletConnectURI = self.nwc_url.parse() + let system_uri: NostrWalletConnectURI = self + .nwc_url + .parse() .map_err(|_| anyhow!("invalid system NWC URL"))?; let system_nwc = NWC::new(system_uri); @@ -550,10 +598,7 @@ impl Billing { .http .post(format!("{STRIPE_API}/subscription_items")) .bearer_auth(&self.stripe_secret_key) - .form(&[ - ("subscription", subscription_id), - ("price", price_id), - ]) + .form(&[("subscription", subscription_id), ("price", price_id)]) .send() .await?; @@ -649,7 +694,9 @@ impl Billing { let amount_msats = (amount_due_cents as u64) * 1000; // placeholder conversion, actual rate would come from exchange // Create a bolt11 invoice using the system wallet (self.nwc_url) - let system_uri: NostrWalletConnectURI = self.nwc_url.parse() + let system_uri: NostrWalletConnectURI = self + .nwc_url + .parse() .map_err(|_| anyhow!("invalid system NWC URL"))?; let system_nwc = NWC::new(system_uri); @@ -668,7 +715,8 @@ impl Billing { system_nwc.shutdown().await; // Pay the bolt11 invoice using the tenant's wallet - let tenant_uri: NostrWalletConnectURI = tenant_nwc_url.parse() + let tenant_uri: NostrWalletConnectURI = tenant_nwc_url + .parse() .map_err(|_| anyhow!("invalid tenant NWC URL"))?; let tenant_nwc = NWC::new(tenant_uri); diff --git a/backend/src/command.rs b/backend/src/command.rs index 72bd22c..ce19174 100644 --- a/backend/src/command.rs +++ b/backend/src/command.rs @@ -64,6 +64,10 @@ impl Command { } pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> { + if tenant.stripe_customer_id.trim().is_empty() { + anyhow::bail!("stripe_customer_id is required"); + } + let mut tx = self.pool.begin().await?; sqlx::query( @@ -77,7 +81,8 @@ impl Command { .execute(&mut *tx) .await?; - let activity = Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?; + let activity = + Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?; tx.commit().await?; self.emit(activity); @@ -93,7 +98,8 @@ impl Command { .execute(&mut *tx) .await?; - let activity = Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?; + let activity = + Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?; tx.commit().await?; self.emit(activity); @@ -185,7 +191,8 @@ impl Command { .execute(&mut *tx) .await?; - let activity = Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?; + let activity = + Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?; tx.commit().await?; self.emit(activity); @@ -216,7 +223,8 @@ impl Command { .execute(&mut *tx) .await?; - let activity = Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?; + let activity = + Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?; tx.commit().await?; self.emit(activity); @@ -231,7 +239,8 @@ impl Command { .execute(&mut *tx) .await?; - let activity = Self::insert_activity(&mut tx, "complete_relay_sync", "relay", relay_id).await?; + let activity = + Self::insert_activity(&mut tx, "complete_relay_sync", "relay", relay_id).await?; tx.commit().await?; self.emit(activity); @@ -246,7 +255,11 @@ impl Command { Ok(()) } - pub async fn set_relay_subscription_item(&self, relay_id: &str, stripe_subscription_item_id: &str) -> Result<()> { + pub async fn set_relay_subscription_item( + &self, + relay_id: &str, + stripe_subscription_item_id: &str, + ) -> Result<()> { sqlx::query("UPDATE relay SET stripe_subscription_item_id = ? WHERE id = ?") .bind(stripe_subscription_item_id) .bind(relay_id) @@ -255,7 +268,11 @@ impl Command { Ok(()) } - pub async fn set_tenant_subscription(&self, pubkey: &str, stripe_subscription_id: &str) -> Result<()> { + pub async fn set_tenant_subscription( + &self, + pubkey: &str, + stripe_subscription_id: &str, + ) -> Result<()> { sqlx::query("UPDATE tenant SET stripe_subscription_id = ? WHERE pubkey = ?") .bind(stripe_subscription_id) .bind(pubkey)