diff --git a/backend/src/billing.rs b/backend/src/billing.rs index 66d55b9..1c1ebc0 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -4,7 +4,6 @@ use std::time::Duration; use crate::bitcoin; use crate::command; -use crate::db; use crate::env; use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, Tenant}; use crate::query; @@ -34,107 +33,25 @@ impl Billing { // --- lifecycle methods --- pub async fn start(self) { - let mut rx = db::subscribe(); - - tokio::spawn({ - let billing = self.clone(); - async move { billing.poll().await } - }); - - if let Err(error) = self.reconcile_subscriptions("startup").await { - tracing::error!(error = %error, "failed to reconcile subscriptions on startup"); - } - - loop { - match rx.recv().await { - Ok(activity) => { - if let Err(e) = self.handle_activity(&activity).await { - tracing::error!(error = %e, "billing handle_activity failed"); - } - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!(missed = n, "billing lagged, reconciling all subscriptions"); - - if let Err(error) = self.reconcile_subscriptions("lagged").await { - tracing::error!(error = %error, "failed to reconcile after lag"); - } - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - } - - async fn poll(&self) { let mut interval = tokio::time::interval(POLL_INTERVAL); loop { interval.tick().await; - if let Err(error) = self.autogenerate_invoices().await { + if let Err(error) = self.reconcile_subscriptions().await { tracing::error!(error = %error, "billing poll failed"); } } } - async fn autogenerate_invoices(&self) -> Result<()> { + async fn reconcile_subscriptions(&self) -> Result<()> { let tenants = query::list_tenants().await?; - tracing::info!( - tenant_count = tenants.len(), - "polling tenants for subscription renewal" - ); - - for tenant in tenants { - if let Err(error) = self.autogenerate_invoice(&tenant).await { - tracing::error!( - tenant = %tenant.pubkey, - error = ?error, - "failed to autogenerate invoice" - ); - } - } - - Ok(()) - } - - /// Periodically generate the tenant's invoice for the current period - /// (adding any due renewals) and, if one results, attempt payment. - async fn autogenerate_invoice(&self, tenant: &Tenant) -> Result<()> { - if let Some(invoice) = self.generate_invoice(tenant).await? { - self.attempt_payment(tenant, &invoice).await?; - } - - Ok(()) - } - - async fn handle_activity(&self, activity: &Activity) -> Result<()> { - let should_reconcile = matches!( - activity.activity_type.as_str(), - "create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" - ); - - if should_reconcile - && let Some(tenant) = query::get_tenant(&activity.tenant).await? - { - self.reconcile_subscription(&tenant).await?; - } - - Ok(()) - } - - async fn reconcile_subscriptions(&self, source: &str) -> Result<()> { - let tenants = query::list_tenants().await?; - - tracing::info!( - source, - tenant_count = tenants.len(), - "reconciling all subscriptions" - ); + tracing::info!(tenant_count = tenants.len(), "reconciling all subscriptions"); for tenant in tenants { if let Err(error) = self.reconcile_subscription(&tenant).await { tracing::error!( - source, tenant = %tenant.pubkey, error = ?error, "failed to reconcile subscription" @@ -145,22 +62,41 @@ impl Billing { Ok(()) } - // --- Reconciliation, renewal, and on-demand billing --- + // --- Reconciliation of activity/renewals --- /// Lists billable activity, setting the tenant's billing anchor to the first - /// activity in the process. - async fn reconcile_subscription(&self, tenant: &Tenant) -> Result<()> { + /// activity in the process. Generates an invoice for the current period if due + /// for renewal or any billable activities have occurred. Attempts payment if + /// an invoice is generated. + pub async fn reconcile_subscription(&self, tenant: &Tenant) -> Result<()> { let mut tenant = tenant.clone(); + // Reconcile all activity, setting the tenant's billing anchor on the first + // positive-balance line item if not already set. for activity in query::list_billable_activity_for_tenant(&tenant.pubkey).await? { if tenant.billing_anchor.is_none() { tenant.billing_anchor = Some(activity.created_at); command::set_tenant_billing_anchor(&tenant).await?; } - self.reconcile_activity(&tenant, &activity).await?; + let invoice_item = self.reconcile_activity(&tenant, &activity).await?; } + // If the tenant has no billing anchor, they have nothing to bill + let Some(period) = BillingPeriod::current(&tenant) else { + return Ok(()); + }; + + // If tenant is due for renewal, bill any active relays. + if tenant.renewed_at.is_none_or(|at| at < period.start) { + self.reconcile_renewal(&tenant, &period).await?; + } + + // Create the invoice, but only if non-zero and attempt payment + if let Some(invoice) = command::create_invoice(&tenant, &period).await? { + self.attempt_payment(&tenant, &invoice).await?; + }; + Ok(()) } @@ -168,7 +104,7 @@ impl Billing { /// persist it with the activity's billed marker. Activities that produce no /// item (e.g. free-plan changes) are still marked billed so they aren't /// re-scanned. - async fn reconcile_activity(&self, tenant: &Tenant, activity: &Activity) -> Result<()> { + async fn reconcile_activity(&self, tenant: &Tenant, activity: &Activity) -> Result> { let invoice_item = match activity.activity_type.as_str() { "create_relay" => { self.make_prorated_item(tenant, activity, 1, "New relay created") @@ -187,9 +123,11 @@ impl Billing { }; match invoice_item { - Some(item) => command::insert_invoice_item_for_activity(&item, &activity.id).await, - None => command::mark_activity_billed(&activity.id).await, + Some(ref item) => command::insert_invoice_item_for_activity(&item, &activity.id).await?, + None => command::mark_activity_billed(&activity.id).await?, } + + Ok(invoice_item) } /// A prorated charge (or credit, with `sign` = -1) for the relay's current @@ -203,16 +141,16 @@ impl Billing { description: &str, ) -> Result> { let Some(relay) = query::get_relay(&activity.resource_id).await? else { - return Ok(None); + return anyhow!("activity resource was not a valid relay"); }; let Some(plan) = query::get_plan(&relay.plan) else { - return Ok(None); + return anyhow!("activity plan was not a valid plan"); }; if plan.amount <= 0 { return Ok(None); } - let period = BillingPeriod::new(tenant, activity.created_at) + let period = BillingPeriod::at(tenant, activity.created_at) .ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?; let amount = sign * period.prorate(plan.amount, activity.created_at); @@ -240,23 +178,24 @@ impl Billing { activity: &Activity, ) -> Result> { let Some(new_plan_id) = activity.plan_id.as_deref() else { - return Ok(None); + return anyhow!("activity plan was not a valid plan"); }; let Some(old_plan_id) = query::get_relay_plan_before(&activity.resource_id, activity.created_at).await? else { - return Ok(None); + return anyhow!("no previous plan found for relay update activity"); }; if old_plan_id == new_plan_id { return Ok(None); } - let (Some(new_plan), Some(old_plan)) = - (query::get_plan(new_plan_id), query::get_plan(&old_plan_id)) - else { - return Ok(None); + let Some(new_plan) = query::get_plan(new_plan_id) else { + return anyhow!("new plan is an invalid plan"); + }; + let Some(old_plan) = query::get_plan(old_plan_id) else { + return anyhow!("old plan is an invalid plan"); }; - let period = BillingPeriod::new(tenant, activity.created_at) + let period = BillingPeriod::at(tenant, activity.created_at) .ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?; let amount = period.prorate(new_plan.amount, activity.created_at) - period.prorate(old_plan.amount, activity.created_at); @@ -278,28 +217,6 @@ impl Billing { })) } - /// Reconcile pending activity, add this period's renewals if they're due, and - /// claim everything outstanding onto an invoice. Idempotent per period. - pub async fn generate_invoice(&self, tenant: &Tenant) -> Result> { - self.reconcile_subscription(tenant).await?; - - // reconcile may have just set the anchor (first activity); re-read it. - let Some(tenant) = query::get_tenant(&tenant.pubkey).await? else { - return Ok(None); - }; - let Some(period) = BillingPeriod::current(&tenant) else { - return Ok(None); - }; - - // Short-circuit the renewal scan if this period is already renewed, for - // performance. Renew_tenant re-checks this in-tx as the real guard. - if tenant.renewed_at.is_none_or(|at| at < period.start) { - self.renew_period(&tenant, &period).await?; - } - - command::create_invoice(&tenant.pubkey, period.start, period.end).await - } - /// Charge a full-period renewal for every relay that was active on a paid plan /// as of `period.start`, reconstructing that state from the activity log /// (status from create/activate/deactivate, plan from create/update). @@ -307,7 +224,7 @@ impl Billing { /// on every generation can't renew twice; a relay created/activated *within* /// the period isn't active before the boundary, so it's covered by its own /// prorated charge instead. - async fn renew_period(&self, tenant: &Tenant, period: &BillingPeriod) -> Result<()> { + async fn reconcile_renewal(&self, tenant: &Tenant, period: &BillingPeriod) -> Result<()> { let activities = query::list_relay_activity_before(&tenant.pubkey, period.start).await?; let mut line_items = Vec::new(); @@ -339,6 +256,8 @@ impl Billing { command::insert_invoice_items_for_renewal(&line_items, period).await } + // --- Payments --- + pub async fn attempt_payment(&self, tenant: &Tenant, invoice: &Invoice) -> Result<()> { let mut error_message: Option = None; @@ -510,12 +429,12 @@ impl BillingPeriod { /// The period containing `chrono::Utc::now()` for `tenant`. `None` when the /// tenant has no `billing_anchor` yet — i.e. no billable activity has been seen. fn current(tenant: &Tenant) -> Option { - Self::new(tenant, chrono::Utc::now().timestamp()) + Self::at(tenant, chrono::Utc::now().timestamp()) } /// The period containing `at` for `tenant`. `None` when the tenant has no /// `billing_anchor` yet — i.e. no billable activity has been seen. - fn new(tenant: &Tenant, at: i64) -> Option { + fn at(tenant: &Tenant, at: i64) -> Option { use chrono::{DateTime, Months, Utc}; let anchor = tenant.billing_anchor?; diff --git a/backend/src/command.rs b/backend/src/command.rs index ebec273..430c2dc 100644 --- a/backend/src/command.rs +++ b/backend/src/command.rs @@ -264,17 +264,13 @@ pub async fn mark_activity_billed(activity_id: &str) -> Result<()> { /// Claim all of a tenant's outstanding items onto a new invoice. A non-positive /// balance leaves the items outstanding so the credit carries to the next positive /// invoice. Returns the invoice, or `None` when there's nothing to bill. -pub async fn create_invoice( - tenant_pubkey: &str, - period_start: i64, - period_end: i64, -) -> Result> { +pub async fn create_invoice(tenant: &Tenant, period: &BillingPeriod) -> Result> { with_tx(async |tx| { let total = sqlx::query_scalar::<_, i64>( "SELECT COALESCE(SUM(amount), 0) FROM invoice_item WHERE tenant_pubkey = ? AND invoice_id IS NULL", ) - .bind(tenant_pubkey) + .bind(&tenant.pubkey) .fetch_one(&mut **tx) .await?; @@ -282,15 +278,14 @@ pub async fn create_invoice( return Ok(None); } - let invoice = - insert_invoice_tx(tx, tenant_pubkey, period_start, period_end).await?; + let invoice = insert_invoice_tx(tx, &tenant, &period).await?; sqlx::query( "UPDATE invoice_item SET invoice_id = ? WHERE tenant_pubkey = ? AND invoice_id IS NULL", ) .bind(&invoice.id) - .bind(tenant_pubkey) + .bind(&tenant.pubkey) .execute(&mut **tx) .await?; @@ -423,9 +418,8 @@ async fn insert_activity_tx( async fn insert_invoice_tx( tx: &mut Transaction<'_, Sqlite>, - tenant_pubkey: &str, - period_start: i64, - period_end: i64, + tenant: &Tenant, + period: &BillingPeriod, ) -> Result { let now = chrono::Utc::now().timestamp(); let invoice_id = uuid::Uuid::new_v4().to_string(); @@ -435,9 +429,9 @@ async fn insert_invoice_tx( VALUES (?, ?, 'open', ?, ?, ?, ?) RETURNING *", ) .bind(invoice_id) - .bind(tenant_pubkey) - .bind(period_start) - .bind(period_end) + .bind(&tenant.pubkey) + .bind(&period.start) + .bind(period.end) .bind(now) .bind(now) .fetch_one(&mut **tx) diff --git a/backend/src/routes/invoices.rs b/backend/src/routes/invoices.rs index 5ca51e7..b1a66f5 100644 --- a/backend/src/routes/invoices.rs +++ b/backend/src/routes/invoices.rs @@ -19,12 +19,7 @@ pub async fn get_tenant_latest_invoice( api.require_admin_or_tenant(&auth, &pubkey)?; let tenant = api.get_tenant_or_404(&pubkey).await?; - // Roll any outstanding charges (and due renewals) into an invoice, then - // return the latest. - api.billing - .generate_invoice(&tenant) - .await - .map_err(internal)?; + api.billing.reconcile_subscription(&tenant).await.map_err(internal)?; let invoice = query::get_latest_invoice_for_tenant(&pubkey).await.map_err(internal)?; diff --git a/frontend/src/components/PaymentDialog.tsx b/frontend/src/components/PaymentDialog.tsx index 6e56bf5..0849841 100644 --- a/frontend/src/components/PaymentDialog.tsx +++ b/frontend/src/components/PaymentDialog.tsx @@ -33,7 +33,7 @@ export default function PaymentDialog(props: PaymentDialogProps) { const planById = new Map(plans().map((p) => [p.id, p])) return (relays() ?? []) .map((relay) => ({ relay, plan: planById.get(relay.plan) })) - .filter((entry) => entry.plan?.amount > 0) + .filter((entry) => Boolean(entry.plan?.amount)) }) async function loadBolt11() {