diff --git a/backend/src/billing.rs b/backend/src/billing.rs index 16220a4..f4be823 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -4,10 +4,10 @@ use std::collections::BTreeMap; use crate::bitcoin; use crate::command::Command; use crate::env::Env; -use crate::models::{Activity, RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT}; +use crate::models::{Activity, Tenant, RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT}; use crate::query::Query; use crate::robot::Robot; -use crate::stripe::{Stripe, StripeInvoice}; +use crate::stripe::{Stripe, StripeInvoice, StripeSubscription}; use crate::wallet::Wallet; const MANUAL_LIGHTNING_PAYMENT_DM: &str = "Payment is due for your relay subscription. Please visit the application to complete a manual Lightning payment."; @@ -34,10 +34,7 @@ pub struct Billing { impl Billing { pub fn new(query: Query, command: Command, robot: Robot, env: &Env) -> Self { Self { - stripe: Stripe::new( - env.stripe_secret_key.clone(), - env.stripe_webhook_secret.clone(), - ), + stripe: Stripe::new(env), wallet: Wallet::from_url(&env.robot_wallet).expect("invalid ROBOT_WALLET"), env: env.clone(), query, @@ -49,7 +46,7 @@ impl Billing { pub async fn start(self) { let mut rx = self.command.notify.subscribe(); - if let Err(error) = self.reconcile_relay_subscriptions("startup").await { + if let Err(error) = self.reconcile_subscriptions("startup").await { tracing::error!(error = %error, "failed to reconcile relay billing state on startup"); } @@ -63,7 +60,7 @@ impl Billing { Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { tracing::warn!(missed = n, "billing lagged"); - if let Err(error) = self.reconcile_relay_subscriptions("lagged").await { + if let Err(error) = self.reconcile_subscriptions("lagged").await { tracing::error!(error = %error, "failed to reconcile relay billing state after lag"); } } @@ -72,7 +69,7 @@ impl Billing { } } - async fn reconcile_relay_subscriptions(&self, source: &str) -> Result<()> { + async fn reconcile_subscriptions(&self, source: &str) -> Result<()> { let tenants = self.query.list_tenants().await?; if tenants.is_empty() { @@ -86,7 +83,7 @@ impl Billing { ); for tenant in tenants { - if let Err(error) = self.sync_tenant_subscription(&tenant.pubkey).await { + if let Err(error) = self.sync_tenant(&tenant.pubkey).await { tracing::error!( source, tenant = %tenant.pubkey, @@ -110,10 +107,8 @@ impl Billing { | "complete_relay_sync" ); - if needs_billing_sync - && let Some(relay) = self.query.get_relay(&activity.resource_id).await? - { - self.sync_tenant_subscription(&relay.tenant).await?; + if needs_billing_sync { + self.sync_tenant(&activity.tenant).await?; } Ok(()) @@ -125,135 +120,125 @@ impl Billing { /// Stripe forbids two subscription items on the same subscription from sharing a /// price, so billing is modeled as one subscription item per plan (price) with /// `quantity` equal to the number of the tenant's `active` relays on that plan. - async fn sync_tenant_subscription(&self, tenant_pubkey: &str) -> Result<()> { - let Some(mut tenant) = self.query.get_tenant(tenant_pubkey).await? else { + async fn sync_tenant(&self, tenant_pubkey: &str) -> Result<()> { + let Some(tenant) = self.query.get_tenant(tenant_pubkey).await? else { return Ok(()); }; - let relays = self.query.list_relays_for_tenant(tenant_pubkey).await?; + let quantity_by_price_id = self.get_quantity_by_price_id(&tenant).await?; - // Desired billed state: price id -> quantity. - let mut desired: BTreeMap = BTreeMap::new(); - for relay in &relays { + // If we've got no subscription items, we can cancel and clear the tenant's subscription + if quantity_by_price_id.is_empty() { + self.ensure_subscription_is_inactive(&tenant).await?; + return Ok(()); + } + + let subscription = self + .ensure_subscription_is_active(&tenant, &quantity_by_price_id) + .await?; + + self.ensure_subscription_items(subscription, quantity_by_price_id).await + } + + // --Stripe helpers-- + + /// Gets a map of stripe_price_id -> quantity based on the tenant's current relays + async fn get_quantity_by_price_id(&self, tenant: &Tenant) -> Result> { + let mut quantity_by_price_id = BTreeMap::new(); + for relay in self.query.list_relays_for_tenant(&tenant.pubkey).await? { if relay.status != RELAY_STATUS_ACTIVE { continue; } - let Some(plan) = self.query.get_plan(&relay.plan) else { - tracing::warn!(relay = %relay.id, plan = %relay.plan, "active relay on unknown plan; not billed"); + let Some(price_id) = self.query.get_plan(&relay.plan).and_then(|p| p.stripe_price_id) else { continue; }; - let Some(price_id) = plan.stripe_price_id else { - continue; // free plan: nothing to bill - }; - if price_id.trim().is_empty() { - tracing::warn!(relay = %relay.id, plan = %relay.plan, "active relay on a paid plan with no configured Stripe price id; not billed"); - continue; - } - *desired.entry(price_id).or_insert(0) += 1; + *quantity_by_price_id.entry(price_id).or_insert(0) += 1; } + Ok(quantity_by_price_id) + } - // Resolve the live subscription, dropping a stale reference to one that no - // longer exists or has been canceled. - let subscription = match tenant.stripe_subscription_id.as_deref() { - Some(subscription_id) => match self.stripe.get_subscription(subscription_id).await? { - Some(sub) if !matches!(sub.status.as_str(), "canceled" | "incomplete_expired") => { - Some(sub) - } - _ => { - self.command - .clear_tenant_subscription(tenant_pubkey) - .await?; - tenant.stripe_subscription_id = None; - None - } - }, + /// Fetch the tenant's current subscription from Stripe, if it has one + async fn get_subscription(&self, tenant: &Tenant) -> Result> { + let subscription = match &tenant.stripe_subscription_id { + Some(id) => self.stripe.get_subscription(id).await?, None => None, }; - // No relays to bill: tear everything down. - if desired.is_empty() { - if let Some(ref subscription_id) = tenant.stripe_subscription_id { - self.stripe.cancel_subscription(subscription_id).await?; - self.command - .clear_tenant_subscription(tenant_pubkey) - .await?; - } - return Ok(()); + // If it's canceled, clear the subscription id and return nothing for simplicity + if subscription + .as_ref() + .is_some_and(|s| matches!(s.status.as_str(), "canceled" | "incomplete_expired")) + { + self.command.clear_tenant_subscription(&tenant.pubkey).await?; + return Ok(None); } - // Bring the subscription's items in line with `desired`. - let mut downgraded = false; + Ok(subscription) + } - match subscription { - None => { - let sub = self - .stripe - .create_subscription(&tenant.stripe_customer_id, &desired) - .await?; - self.command - .set_tenant_subscription(tenant_pubkey, &sub.id) - .await?; - tenant.stripe_subscription_id = Some(sub.id); - } - Some(sub) => { - let subscription_id = sub.id; - - // price id -> (item id, quantity) for items currently on the subscription. - let mut current: BTreeMap = BTreeMap::new(); - for item in sub.items { - current.insert(item.price.id, (item.id, item.quantity)); - } - - for (price_id, &quantity) in &desired { - if let Some((item_id, current_quantity)) = current.remove(price_id) { - if current_quantity != quantity { - if quantity < current_quantity { - downgraded = true; - } - self.stripe - .set_subscription_item_quantity(&item_id, quantity) - .await?; - } - } else { - self.stripe - .create_subscription_item(&subscription_id, price_id, quantity) - .await?; - } - } - - // Items for plans no relay is on anymore. - for (_, (item_id, _)) in current { - downgraded = true; - self.stripe.delete_subscription_item(&item_id).await?; - } - } + /// Make sure the tenant has an active subscription, creating one with the desired + /// items if it doesn't (Stripe rejects an itemless subscription). + async fn ensure_subscription_is_active( + &self, + tenant: &Tenant, + quantity_by_price_id: &BTreeMap, + ) -> Result { + if let Some(sub) = self.get_subscription(tenant).await? { + return Ok(sub); } - if downgraded { - self.validate_downgrade_proration(&tenant, "tenant-subscription-sync") - .await; + let sub = self + .stripe + .create_subscription(&tenant.stripe_customer_id, quantity_by_price_id) + .await?; + self.command.set_tenant_subscription(&tenant.pubkey, &sub.id).await?; + Ok(sub) + } + + /// If the tenant has a subscription, cancel and clear it + async fn ensure_subscription_is_inactive(&self, tenant: &Tenant) -> Result<()> { + if let Some(s) = self.get_subscription(tenant).await? { + self.stripe.cancel_subscription(&s.id).await?; + self.command.clear_tenant_subscription(&tenant.pubkey).await?; } Ok(()) } - async fn existing_invoice_nwc_payment_outcome( + /// Sync desired quantity_by_price_id with stripe + async fn ensure_subscription_items( &self, - invoice_id: &str, - ) -> Result> { - let state = self.query.get_invoice_nwc_payment_state(invoice_id).await?; - match state.as_deref() { - Some("paid") => Ok(Some(NwcInvoicePaymentOutcome::Paid)), - Some("pending") => Ok(Some(NwcInvoicePaymentOutcome::Pending(anyhow!( - "invoice {invoice_id} has a pending NWC reconciliation; refusing to create a new Lightning charge" - )))), - Some(other) => Err(anyhow!( - "unknown invoice_nwc_payment state '{other}' for invoice {invoice_id}" - )), - None => Ok(None), + subscription: StripeSubscription, + quantity_by_price_id: BTreeMap, + ) -> Result<()> { + let mut current: BTreeMap = BTreeMap::new(); + for item in subscription.items { + current.insert(item.price.id, (item.id, item.quantity)); } + + for (price_id, &quantity) in &quantity_by_price_id { + if let Some((item_id, current_quantity)) = current.remove(price_id) { + if current_quantity != quantity { + self.stripe + .set_subscription_item_quantity(&item_id, quantity) + .await?; + } + } else { + self.stripe + .create_subscription_item(&subscription.id, price_id, quantity) + .await?; + } + } + + for (_, (item_id, _)) in current { + self.stripe.delete_subscription_item(&item_id).await?; + } + + Ok(()) } + // --Stripe Webhooks-- + pub async fn handle_webhook(&self, payload: &str, signature: &str) -> Result<()> { let event = self.stripe.get_webhook_event(payload, signature)?; let obj = &event.data.object; @@ -520,47 +505,6 @@ impl Billing { Ok(()) } - async fn validate_downgrade_proration(&self, tenant: &crate::models::Tenant, context: &str) { - match self - .stripe - .preview_invoice( - &tenant.stripe_customer_id, - tenant.stripe_subscription_id.as_deref(), - ) - .await - { - Ok(upcoming) => { - let proration_lines = upcoming.lines.iter().filter(|line| line.proration).count(); - - tracing::info!( - tenant_pubkey = %tenant.pubkey, - stripe_customer_id = %tenant.stripe_customer_id, - context, - proration_lines, - amount_due = upcoming.amount_due, - currency = %upcoming.currency, - "validated Stripe proration preview for downgrade" - ); - - if proration_lines == 0 { - tracing::warn!( - tenant_pubkey = %tenant.pubkey, - context, - "downgrade proration preview has no proration lines; verify in Stripe dashboard" - ); - } - } - Err(error) => { - tracing::warn!( - error = %error, - tenant_pubkey = %tenant.pubkey, - context, - "failed to fetch downgrade proration preview" - ); - } - } - } - // --- Public API helpers --- /// Returns `Ok(None)` if Stripe has no such invoice; the route turns that into a 404. @@ -888,6 +832,23 @@ impl Billing { ))), } } + + async fn existing_invoice_nwc_payment_outcome( + &self, + invoice_id: &str, + ) -> Result> { + let state = self.query.get_invoice_nwc_payment_state(invoice_id).await?; + match state.as_deref() { + Some("paid") => Ok(Some(NwcInvoicePaymentOutcome::Paid)), + Some("pending") => Ok(Some(NwcInvoicePaymentOutcome::Pending(anyhow!( + "invoice {invoice_id} has a pending NWC reconciliation; refusing to create a new Lightning charge" + )))), + Some(other) => Err(anyhow!( + "unknown invoice_nwc_payment state '{other}' for invoice {invoice_id}" + )), + None => Ok(None), + } + } } fn summarize_nwc_error_for_dm(error: &str) -> Option { diff --git a/backend/src/stripe.rs b/backend/src/stripe.rs index 6e6d239..3591525 100644 --- a/backend/src/stripe.rs +++ b/backend/src/stripe.rs @@ -9,6 +9,8 @@ use hmac::{Hmac, Mac}; use sha2::Sha256; use std::collections::BTreeMap; +use crate::env::Env; + const STRIPE_API: &str = "https://api.stripe.com/v1"; // Webhooks @@ -57,22 +59,6 @@ pub struct StripeInvoice { pub status: String, pub amount_due: i64, pub currency: String, - #[serde(deserialize_with = "deserialize_list")] - pub lines: Vec, -} - -#[derive(serde::Deserialize)] -pub struct StripeInvoicePreview { - pub amount_due: i64, - pub currency: String, - #[serde(deserialize_with = "deserialize_list")] - pub lines: Vec, -} - -#[derive(serde::Deserialize, serde::Serialize, Clone)] -pub struct StripeInvoiceLine { - #[serde(default)] - pub proration: bool, } #[derive(serde::Deserialize)] @@ -96,16 +82,14 @@ fn default_quantity() -> i64 { #[derive(Clone)] pub struct Stripe { - pub(crate) secret_key: String, - pub(crate) webhook_secret: String, + env: Env, http: reqwest::Client, } impl Stripe { - pub fn new(secret_key: String, webhook_secret: String) -> Self { + pub fn new(env: &Env) -> Self { Self { - secret_key, - webhook_secret, + env: env.clone(), http: reqwest::Client::new(), } } @@ -115,23 +99,23 @@ impl Stripe { fn get(&self, path: &str) -> reqwest::RequestBuilder { self.http .get(format!("{STRIPE_API}{path}")) - .bearer_auth(&self.secret_key) + .bearer_auth(&self.env.stripe_secret_key) } fn post(&self, path: &str) -> reqwest::RequestBuilder { self.http .post(format!("{STRIPE_API}{path}")) - .bearer_auth(&self.secret_key) + .bearer_auth(&self.env.stripe_secret_key) } fn delete(&self, path: &str) -> reqwest::RequestBuilder { self.http .delete(format!("{STRIPE_API}{path}")) - .bearer_auth(&self.secret_key) + .bearer_auth(&self.env.stripe_secret_key) } fn idempotency_key(&self, parts: &[&str]) -> String { - let mut mac = Hmac::::new_from_slice(self.secret_key.as_bytes()) + let mut mac = Hmac::::new_from_slice(self.env.stripe_secret_key.as_bytes()) .expect("HMAC accepts any key length"); for (i, part) in parts.iter().enumerate() { if i > 0 { @@ -175,6 +159,8 @@ impl Stripe { .map_err(Into::into) } + /// Stripe requires at least one item to create a subscription, so the desired + /// items are sent inline here; [`crate::billing`] reconciles from there. pub async fn create_subscription( &self, customer_id: &str, @@ -296,18 +282,6 @@ impl Stripe { Ok(()) } - pub async fn preview_invoice( - &self, - customer_id: &str, - subscription_id: Option<&str>, - ) -> Result { - let mut req = self.get("/invoices/upcoming").query(&[("customer", customer_id)]); - if let Some(subscription_id) = subscription_id { - req = req.query(&[("subscription", subscription_id)]); - } - Ok(req.send_ok().await?.json().await?) - } - // --- Payment methods --- pub async fn has_payment_method(&self, customer_id: &str) -> Result { @@ -357,7 +331,7 @@ impl Stripe { let signature = sig.ok_or_else(|| anyhow!("missing webhook signature"))?; let signed_payload = format!("{timestamp}.{payload}"); - let mut mac = Hmac::::new_from_slice(self.webhook_secret.as_bytes()) + let mut mac = Hmac::::new_from_slice(self.env.stripe_webhook_secret.as_bytes()) .map_err(|e| anyhow!("invalid webhook secret: {e}"))?; mac.update(signed_payload.as_bytes()); let expected = hex::encode(mac.finalize().into_bytes());