diff --git a/backend/migrations/0001_init.sql b/backend/migrations/0001_init.sql index a34b7b0..264e380 100644 --- a/backend/migrations/0001_init.sql +++ b/backend/migrations/0001_init.sql @@ -5,7 +5,8 @@ CREATE TABLE IF NOT EXISTS activity ( activity_type TEXT NOT NULL, resource_type TEXT NOT NULL, resource_id TEXT NOT NULL, - billed_at INTEGER + billed_at INTEGER, + plan_id TEXT ); CREATE TABLE IF NOT EXISTS tenant ( @@ -52,14 +53,15 @@ CREATE TABLE IF NOT EXISTS invoice ( CREATE TABLE IF NOT EXISTS invoice_item ( id TEXT PRIMARY KEY, - invoice_id TEXT NOT NULL, - activity_id TEXT NOT NULL, + invoice_id TEXT, + activity_id TEXT, tenant_pubkey TEXT NOT NULL, relay_id TEXT NOT NULL, plan TEXT NOT NULL, amount INTEGER NOT NULL, description TEXT NOT NULL DEFAULT '', created_at INTEGER NOT NULL, + period_start INTEGER, FOREIGN KEY (invoice_id) REFERENCES invoice(id), FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey) ); @@ -90,12 +92,14 @@ CREATE INDEX IF NOT EXISTS idx_activity_resource_created ON activity (resource_i CREATE INDEX IF NOT EXISTS idx_activity_unbilled ON activity (tenant, created_at) WHERE billed_at IS NULL; -CREATE UNIQUE INDEX IF NOT EXISTS uniq_invoice_tenant_period ON invoice (tenant_pubkey, period_start); - CREATE INDEX IF NOT EXISTS idx_invoice_tenant_created ON invoice (tenant_pubkey, created_at); CREATE INDEX IF NOT EXISTS idx_invoice_item_invoice ON invoice_item (invoice_id); +CREATE INDEX IF NOT EXISTS idx_invoice_item_outstanding ON invoice_item (tenant_pubkey) WHERE invoice_id IS NULL; + +CREATE INDEX IF NOT EXISTS idx_invoice_item_renewal ON invoice_item (tenant_pubkey, period_start); + CREATE INDEX IF NOT EXISTS idx_bolt11_invoice_created ON bolt11 (invoice_id, created_at); CREATE INDEX IF NOT EXISTS idx_intent_invoice ON intent (invoice_id); diff --git a/backend/src/api.rs b/backend/src/api.rs index 748641b..76c3aee 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -35,7 +35,7 @@ use crate::query; use crate::robot::Robot; use crate::stripe::Stripe; use crate::routes::identity::get_identity; -use crate::routes::invoices::{get_invoice, get_invoice_bolt11}; +use crate::routes::invoices::{get_invoice, get_invoice_bolt11, get_tenant_latest_invoice}; use crate::routes::plans::{get_plan, list_plans}; use crate::routes::relays::{ create_relay, deactivate_relay, get_relay, list_relay_activity, list_relay_members, @@ -76,6 +76,10 @@ impl Api { .route("/tenants/:pubkey", get(get_tenant).put(update_tenant)) .route("/tenants/:pubkey/relays", get(list_tenant_relays)) .route("/tenants/:pubkey/invoices", get(list_tenant_invoices)) + .route( + "/tenants/:pubkey/invoices/latest", + get(get_tenant_latest_invoice), + ) .route("/tenants/:pubkey/stripe/session", get(create_stripe_session)) .route("/relays", get(list_relays).post(create_relay)) .route("/relays/:id", get(get_relay).put(update_relay)) diff --git a/backend/src/billing.rs b/backend/src/billing.rs index 4a08e2c..0066ecd 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -1,4 +1,5 @@ use anyhow::{Result, anyhow}; +use std::collections::HashMap; use std::time::Duration; use crate::bitcoin; @@ -93,30 +94,23 @@ impl Billing { Ok(()) } - /// If `tenant`'s subscription has rolled into a new billing period, claim it by - /// atomically recording an `autogenerate_invoice` activity, then turn that into an invoice. + /// Poll entry point: generate the tenant's invoice for the current period + /// (adding any due renewals) and, if one results, collect payment. async fn autogenerate_invoice(&self, tenant: &Tenant) -> Result<()> { - // A subscription only exists once a billing anchor is set; until then - // there is no schedule to renew against. - let Some(billing_anchor) = tenant.billing_anchor else { - return Ok(()); - }; - - let now = chrono::Utc::now().timestamp(); - let period_start = period_start_at(billing_anchor, now); - - command::try_autogenerate_invoice(&tenant.pubkey, period_start).await?; + if let Some(invoice) = self.generate_invoice(tenant).await? { + self.attempt_payment(tenant, &invoice).await?; + } Ok(()) } async fn handle_activity(&self, activity: &Activity) -> Result<()> { - let should_sync = matches!( + let should_reconcile = matches!( activity.activity_type.as_str(), - "create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" | "autogenerate_invoice" + "create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" ); - if should_sync + if should_reconcile && let Some(tenant) = query::get_tenant(&activity.tenant).await? { self.reconcile_subscription(&tenant).await?; @@ -148,93 +142,205 @@ impl Billing { Ok(()) } - // --- Invoice generation and autopayment --- + // --- Reconciliation, renewal, and on-demand billing --- - /// Scan a tenant's activity for changes not yet reflected in an invoice and, - /// if there are any, create an invoice with the corresponding line items and - /// attempt to collect payment. async fn reconcile_subscription(&self, tenant: &Tenant) -> Result<()> { - let now = chrono::Utc::now().timestamp(); - let invoice_id = uuid::Uuid::new_v4().to_string(); + let mut tenant = tenant.clone(); - let activities = query::list_billable_activity_for_tenant(&tenant.pubkey).await?; - let billed_activity_ids: Vec = activities.iter().map(|a| a.id.clone()).collect(); - - let mut invoice_items: Vec = Vec::new(); - - for activity in &activities { - // TODO: this is gross - let relay = if activity.resource_type == "relay" { - query::get_relay(&activity.resource_id).await? - } else { - None - }; - - match activity.activity_type.as_str() { - "create_relay" => { - if let Some(relay) = &relay - && let Some(plan) = query::get_plan(&relay.plan) - && plan.amount > 0 - { - // TODO: prorate amount based on billing anchor - invoice_items.push(InvoiceItem { - id: uuid::Uuid::new_v4().to_string(), - invoice_id: invoice_id.clone(), - activity_id: activity.id.clone(), - tenant_pubkey: tenant.pubkey.clone(), - relay_id: activity.resource_id.clone(), - plan: plan.id, - amount: plan.amount, - description: "New relay created".to_string(), - created_at: activity.created_at, - }); - } - } - "update_relay" => { - // TODO: refund/charge prorated amount - } - "activate_relay" => { - // TODO: charge prorated amount - } - "deactivate_relay" => { - // TODO: refund prorated amount - } - "autogenerate_invoice" => { - // TODO: we're at the beginning of a new period, add invoice - // items for all active/paid relays - } - _ => {} + for activity in query::list_billable_activity_for_tenant(&tenant.pubkey).await? { + if tenant.billing_anchor.is_none() { + tenant.billing_anchor = Some(activity.created_at); + command::set_tenant_billing_anchor(&tenant).await?; } + + self.reconcile_activity(&tenant, &activity).await?; } - // No line items (e.g. only free-plan or not-yet-prorated changes): still - // stamp the activities billed so a recovery pass doesn't re-scan them. - if invoice_items.is_empty() { - command::mark_activities_billed(&billed_activity_ids).await?; - return Ok(()); + Ok(()) + } + + /// Reconcile one activity into the ledger: build its line item (if any) and + /// persist it with the activity's billed marker. Activities that produce no + /// item (e.g. free-plan changes) are still marked billed so they aren't + /// re-scanned. + async fn reconcile_activity(&self, tenant: &Tenant, activity: &Activity) -> Result<()> { + let invoice_item = match activity.activity_type.as_str() { + "create_relay" => { + self.make_prorated_item(tenant, activity, 1, "New relay created") + .await? + } + "activate_relay" => { + self.make_prorated_item(tenant, activity, 1, "Relay reactivated") + .await? + } + "deactivate_relay" => { + self.make_prorated_item(tenant, activity, -1, "Relay deactivated (prorated credit)") + .await? + } + "update_relay" => self.make_plan_change_item(tenant, activity).await?, + _ => None, + }; + + match invoice_item { + Some(item) => command::insert_invoice_item_for_activity(&item, &activity.id).await, + None => command::mark_activity_billed(&activity.id).await, + } + } + + /// A prorated charge (or credit, with `sign` = -1) for the relay's current + /// plan. `None` for a missing relay or a free plan. Mid-period items don't + /// stamp `period_start` — the renewal decides coverage from activity history. + async fn make_prorated_item( + &self, + tenant: &Tenant, + activity: &Activity, + sign: i64, + description: &str, + ) -> Result> { + let Some(relay) = query::get_relay(&activity.resource_id).await? else { + return Ok(None); + }; + let Some(plan) = query::get_plan(&relay.plan) else { + return Ok(None); + }; + if plan.amount <= 0 { + return Ok(None); } - let period_start = invoice_items - .iter() - .map(|item| item.created_at) - .max() - .unwrap_or(now); + let anchor = tenant + .billing_anchor + .ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?; + let fraction = period_fraction_remaining(anchor, activity.created_at); + let amount = sign * prorate(plan.amount, fraction); + + Ok(Some(line_item(activity, &relay.id, plan.id, amount, description, None))) + } + + /// The prorated delta for a plan change, read straight from the activity log: + /// `new` is this `update_relay` activity's recorded plan, `old` is the relay's + /// plan immediately before it. Because the renewal charges the relay's plan as + /// of the period boundary, this delta composes to the correct total regardless + /// of ordering and needs no coverage gate. `None` when nothing changed. + async fn make_plan_change_item( + &self, + tenant: &Tenant, + activity: &Activity, + ) -> Result> { + let Some(new_plan_id) = activity.plan_id.as_deref() else { + return Ok(None); + }; + let Some(old_plan_id) = + query::get_relay_plan_before(&activity.resource_id, activity.created_at).await? + else { + return Ok(None); + }; + if old_plan_id == new_plan_id { + return Ok(None); + } + let (Some(new_plan), Some(old_plan)) = + (query::get_plan(new_plan_id), query::get_plan(&old_plan_id)) + else { + return Ok(None); + }; + + let anchor = tenant + .billing_anchor + .ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?; + let fraction = period_fraction_remaining(anchor, activity.created_at); + let amount = prorate(new_plan.amount, fraction) - prorate(old_plan.amount, fraction); + if amount == 0 { + return Ok(None); + } + + let description = format!("Plan changed from {} to {}", old_plan.name, new_plan.name); + Ok(Some(line_item( + activity, + &activity.resource_id, + new_plan.id, + amount, + &description, + None, + ))) + } + + /// Reconcile pending activity, add this period's renewals for any relay due, + /// and claim everything outstanding onto an invoice. Shared by the poll and + /// the on-demand invoice endpoint — safe to call either way: renewals are + /// per-relay idempotent. No payment is attempted here; callers that want + /// auto-pay do it on the returned invoice. `None` when nothing is owed. + pub async fn generate_invoice(&self, tenant: &Tenant) -> Result> { + self.reconcile_subscription(tenant).await?; + + // reconcile may have just set the anchor (first activity); re-read it. + let Some(tenant) = query::get_tenant(&tenant.pubkey).await? else { + return Ok(None); + }; + let Some(anchor) = tenant.billing_anchor else { + return Ok(None); + }; + + let now = chrono::Utc::now().timestamp(); + let period_start = period_start_at(anchor, now); let period_end = add_one_month(period_start); - let invoice = command::create_invoice( + self.renew_period(&tenant, period_start).await?; + self.claim_outstanding(&tenant, period_start, period_end).await + } + + /// Charge a full-period renewal for every relay that was active on a paid plan + /// as of `period_start`, reconstructing that state from the activity log + /// (status from create/activate/deactivate, plan from create/update). Per-relay + /// idempotent via `period_start`, so calling it on every generation can't + /// renew a relay twice; a relay created/activated *within* the period isn't + /// active before the boundary, so it's covered by its own prorated charge. + async fn renew_period(&self, tenant: &Tenant, period_start: i64) -> Result<()> { + let activities = query::list_relay_activity_before(&tenant.pubkey, period_start).await?; + + let mut renewal_items = Vec::new(); + for (relay_id, state) in relay_states(&activities) { + if !state.active { + continue; + } + let Some(plan) = state.plan.and_then(|id| query::get_plan(&id)) else { + continue; + }; + if plan.amount <= 0 { + continue; + } + renewal_items.push(InvoiceItem { + id: uuid::Uuid::new_v4().to_string(), + invoice_id: None, + activity_id: None, + tenant_pubkey: tenant.pubkey.clone(), + relay_id, + plan: plan.id, + amount: plan.amount, + description: "Subscription renewal".to_string(), + created_at: period_start, + period_start: Some(period_start), + }); + } + + command::create_renewal_items(&renewal_items).await + } + + /// Claim the tenant's outstanding items onto a fresh invoice if they net + /// positive; `None` when nothing is owed (a net credit stays outstanding and + /// carries to the next positive invoice). + async fn claim_outstanding( + &self, + tenant: &Tenant, + period_start: i64, + period_end: i64, + ) -> Result> { + let invoice_id = uuid::Uuid::new_v4().to_string(); + command::claim_outstanding_into_invoice( &invoice_id, &tenant.pubkey, period_start, period_end, - &invoice_items, - &billed_activity_ids, - tenant.billing_anchor.is_none().then_some(now), ) - .await?; - - self.attempt_payment(tenant, &invoice).await?; - - Ok(()) + .await } pub async fn attempt_payment(&self, tenant: &Tenant, invoice: &Invoice) -> Result<()> { @@ -428,6 +534,86 @@ fn add_one_month(ts: i64) -> i64 { .unwrap_or(ts) } +/// Fraction of the current billing period still unused at `at`, in `[0.0, 1.0]`, +/// for prorating a mid-period charge or credit. With no billing anchor yet the +/// period is only just beginning, so the whole period remains (full price). +fn period_fraction_remaining(billing_anchor: i64, at: i64) -> f64 { + let period_start = period_start_at(billing_anchor, at); + let period_end = add_one_month(period_start); + let period_len = (period_end - period_start) as f64; + if period_len <= 0.0 { + return 1.0; + } + + (((period_end - at) as f64) / period_len).clamp(0.0, 1.0) +} + +/// Prorate a minor-unit `amount` by `fraction`, rounded to the nearest unit. +fn prorate(amount: i64, fraction: f64) -> i64 { + (amount as f64 * fraction).round() as i64 +} + +/// Build an outstanding (unassigned, `invoice_id = None`) line item from a +/// reconciled activity. `period_start` is `Some` only for coverage charges +/// (creation/activation), which mark the relay-period as paid. +fn line_item( + activity: &Activity, + relay_id: &str, + plan: String, + amount: i64, + description: &str, + period_start: Option, +) -> InvoiceItem { + InvoiceItem { + id: uuid::Uuid::new_v4().to_string(), + invoice_id: None, + activity_id: Some(activity.id.clone()), + tenant_pubkey: activity.tenant.clone(), + relay_id: relay_id.to_string(), + plan, + amount, + description: description.to_string(), + created_at: activity.created_at, + period_start, + } +} + +/// A relay's billing-relevant state at a point in time, reconstructed by folding +/// its activity log. +#[derive(Default)] +struct RelayState { + active: bool, + plan: Option, +} + +/// Fold relay activities (which must be oldest-first) into each relay's +/// `(active, plan)` state. `create`/`activate`/`deactivate` drive status; +/// `create`/`update` carry the plan via `plan_id`. Feed it activities up to a +/// cutoff to get each relay's state as of that moment (e.g. the period boundary). +fn relay_states(activities: &[Activity]) -> HashMap { + let mut states: HashMap = HashMap::new(); + + for activity in activities { + let state = states.entry(activity.resource_id.clone()).or_default(); + match activity.activity_type.as_str() { + "create_relay" => { + state.active = true; + state.plan = activity.plan_id.clone(); + } + "update_relay" => { + if activity.plan_id.is_some() { + state.plan = activity.plan_id.clone(); + } + } + "activate_relay" => state.active = true, + "deactivate_relay" => state.active = false, + _ => {} + } + } + + states +} + fn summarize_error_message(error: &str) -> Option { let normalized = error.split_whitespace().collect::>().join(" "); if normalized.is_empty() { diff --git a/backend/src/command.rs b/backend/src/command.rs index 0df92c9..efe7e4f 100644 --- a/backend/src/command.rs +++ b/backend/src/command.rs @@ -7,69 +7,6 @@ use crate::models::{ RELAY_STATUS_INACTIVE, Relay, Tenant, }; -// --- Activity --- - -/// Stamp `billed_at` on activities that were reconciled without producing an -/// invoice (e.g. free-plan or not-yet-prorated changes), so a recovery pass -/// doesn't re-scan them. -pub async fn mark_activities_billed(activity_ids: &[String]) -> Result<()> { - if activity_ids.is_empty() { - return Ok(()); - } - - let now = chrono::Utc::now().timestamp(); - with_tx(async |tx| mark_activities_billed_tx(tx, activity_ids, now).await).await -} - -/// Atomically record an `autogenerate_invoice` activity for the tenant, but only -/// if none has been recorded since `since` (the start of the current billing -/// period). Returns whether a new activity was inserted; `false` means the -/// period was already claimed. -/// -/// The existence check and insert are a single statement, which SQLite runs -/// atomically, so concurrent pollers (or a restart racing the previous run) -/// can't both claim the same period. On success the activity is broadcast so the -/// billing consumer reconciles it like any other. -pub async fn try_autogenerate_invoice(tenant_pubkey: &str, since: i64) -> Result { - let id = uuid::Uuid::new_v4().to_string(); - let created_at = chrono::Utc::now().timestamp(); - - let result = sqlx::query( - "INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id) - SELECT ?, ?, ?, 'autogenerate_invoice', 'tenant', ? - WHERE NOT EXISTS ( - SELECT 1 FROM activity - WHERE tenant = ? - AND activity_type = 'autogenerate_invoice' - AND created_at >= ? - )", - ) - .bind(&id) - .bind(tenant_pubkey) - .bind(created_at) - .bind(tenant_pubkey) - .bind(tenant_pubkey) - .bind(since) - .execute(pool()) - .await?; - - if result.rows_affected() == 0 { - return Ok(false); - } - - publish(Activity { - id, - tenant: tenant_pubkey.to_string(), - created_at, - activity_type: "autogenerate_invoice".to_string(), - resource_type: "tenant".to_string(), - resource_id: tenant_pubkey.to_string(), - billed_at: None, - }); - - Ok(true) -} - // --- Tenants --- pub async fn create_tenant(tenant: &Tenant) -> Result<()> { @@ -84,7 +21,7 @@ pub async fn create_tenant(tenant: &Tenant) -> Result<()> { .bind(&tenant.stripe_customer_id) .execute(&mut **tx) .await?; - insert_activity_tx(tx, "create_tenant", "tenant", &tenant.pubkey).await + insert_activity_tx(tx, "create_tenant", "tenant", &tenant.pubkey, None).await }) .await?; publish(activity); @@ -98,13 +35,22 @@ pub async fn update_tenant(tenant: &Tenant) -> Result<()> { .bind(&tenant.pubkey) .execute(&mut **tx) .await?; - insert_activity_tx(tx, "update_tenant", "tenant", &tenant.pubkey).await + insert_activity_tx(tx, "update_tenant", "tenant", &tenant.pubkey, None).await }) .await?; publish(activity); Ok(()) } +pub async fn set_tenant_billing_anchor(tenant: &Tenant) -> Result<()> { + sqlx::query("UPDATE tenant SET billing_anchor = ? WHERE pubkey = ?") + .bind(tenant.billing_anchor) + .bind(&tenant.pubkey) + .execute(pool()) + .await?; + Ok(()) +} + pub async fn clear_tenant_nwc_error(pubkey: &str) -> Result<()> { sqlx::query("UPDATE tenant SET nwc_error = NULL WHERE pubkey = ?") .bind(pubkey) @@ -143,7 +89,7 @@ pub async fn create_relay(relay: &Relay) -> Result<()> { .bind(relay.push_enabled) .execute(&mut **tx) .await?; - insert_activity_tx(tx, "create_relay", "relay", &relay.id).await + insert_activity_tx(tx, "create_relay", "relay", &relay.id, Some(&relay.plan)).await }) .await?; publish(activity); @@ -179,7 +125,7 @@ pub async fn update_relay(relay: &Relay) -> Result<()> { .bind(&relay.id) .execute(&mut **tx) .await?; - insert_activity_tx(tx, "update_relay", "relay", &relay.id).await + insert_activity_tx(tx, "update_relay", "relay", &relay.id, Some(&relay.plan)).await }) .await?; publish(activity); @@ -206,7 +152,7 @@ async fn set_relay_status(relay_id: &str, status: &str, activity_type: &str) -> .bind(relay_id) .execute(&mut **tx) .await?; - insert_activity_tx(tx, activity_type, "relay", relay_id).await + insert_activity_tx(tx, activity_type, "relay", relay_id, None).await }) .await?; publish(activity); @@ -220,7 +166,7 @@ pub async fn fail_relay_sync(relay: &Relay, sync_error: String) -> Result<()> { .bind(&relay.id) .execute(&mut **tx) .await?; - insert_activity_tx(tx, "fail_relay_sync", "relay", &relay.id).await + insert_activity_tx(tx, "fail_relay_sync", "relay", &relay.id, None).await }) .await?; publish(activity); @@ -233,44 +179,108 @@ pub async fn complete_relay_sync(relay_id: &str) -> Result<()> { .bind(relay_id) .execute(&mut **tx) .await?; - insert_activity_tx(tx, "complete_relay_sync", "relay", relay_id).await + insert_activity_tx(tx, "complete_relay_sync", "relay", relay_id, None).await }) .await?; publish(activity); Ok(()) } +// --- Invoice items (the outstanding-charge ledger) --- + +pub async fn insert_invoice_item_for_activity(invoice_item: &InvoiceItem, activity_id: &str) -> Result<()> { + let now = chrono::Utc::now().timestamp(); + + with_tx(async |tx| { + insert_invoice_item_tx(tx, invoice_item).await?; + mark_activity_billed_tx(tx, activity_id, now).await?; + + Ok(()) + }) + .await +} + +/// Mark an activity billed without a line item — for activities that produce no +/// charge (e.g. free-plan changes), so a recovery pass doesn't re-scan them. +pub async fn mark_activity_billed(activity_id: &str) -> Result<()> { + let now = chrono::Utc::now().timestamp(); + + with_tx(async |tx| mark_activity_billed_tx(tx, activity_id, now).await).await +} + +/// Insert renewal line items, skipping any relay already covered for the item's +/// `period_start`. The per-relay existence check and insert are a single +/// statement, so neither a re-tick nor a relay's own creation/activation charge +/// (which also stamps `period_start`) can bill the same relay-period twice. +pub async fn create_renewal_items(items: &[InvoiceItem]) -> Result<()> { + with_tx(async |tx| { + for item in items { + sqlx::query( + "INSERT INTO invoice_item + (id, invoice_id, activity_id, tenant_pubkey, relay_id, plan, amount, description, created_at, period_start) + SELECT ?, NULL, NULL, ?, ?, ?, ?, ?, ?, ? + WHERE NOT EXISTS ( + SELECT 1 FROM invoice_item WHERE relay_id = ? AND period_start = ? + )", + ) + .bind(&item.id) + .bind(&item.tenant_pubkey) + .bind(&item.relay_id) + .bind(&item.plan) + .bind(item.amount) + .bind(&item.description) + .bind(item.created_at) + .bind(item.period_start) + .bind(&item.relay_id) + .bind(item.period_start) + .execute(&mut **tx) + .await?; + } + + Ok(()) + }) + .await +} + // --- Invoices --- -/// Create an invoice with its line items, stamp `billed_at` on the activities -/// that produced them, and set the tenant's billing anchor when this is their -/// first invoice — all in one transaction. Returns the inserted invoice. -pub async fn create_invoice( +/// Claim all of a tenant's outstanding items onto a new invoice — but only if +/// they sum to a positive amount. A non-positive balance (net credit or nothing +/// owed) leaves the items outstanding so the credit carries to the next positive +/// invoice. The sum, insert, and claim run in one transaction. Returns the +/// invoice, or `None` when there's nothing to bill. +pub async fn claim_outstanding_into_invoice( invoice_id: &str, tenant_pubkey: &str, period_start: i64, period_end: i64, - items: &[InvoiceItem], - billed_activity_ids: &[String], - new_billing_anchor: Option, -) -> Result { - let now = chrono::Utc::now().timestamp(); - +) -> Result> { with_tx(async |tx| { + let total = sqlx::query_scalar::<_, i64>( + "SELECT COALESCE(SUM(amount), 0) FROM invoice_item + WHERE tenant_pubkey = ? AND invoice_id IS NULL", + ) + .bind(tenant_pubkey) + .fetch_one(&mut **tx) + .await?; + + if total <= 0 { + return Ok(None); + } + let invoice = insert_invoice_tx(tx, invoice_id, tenant_pubkey, period_start, period_end).await?; - for item in items { - insert_invoice_item_tx(tx, item).await?; - } + sqlx::query( + "UPDATE invoice_item SET invoice_id = ? + WHERE tenant_pubkey = ? AND invoice_id IS NULL", + ) + .bind(invoice_id) + .bind(tenant_pubkey) + .execute(&mut **tx) + .await?; - mark_activities_billed_tx(tx, billed_activity_ids, now).await?; - - if let Some(anchor) = new_billing_anchor { - set_tenant_billing_anchor_tx(tx, tenant_pubkey, anchor).await?; - } - - Ok(invoice) + Ok(Some(invoice)) }) .await } @@ -285,7 +295,7 @@ pub async fn mark_invoice_paid(invoice_id: &str, method: &str) -> Result<()> { .bind(invoice_id) .execute(&mut **tx) .await?; - insert_activity_tx(tx, "invoice_paid", "invoice", invoice_id).await + insert_activity_tx(tx, "invoice_paid", "invoice", invoice_id, None).await }) .await?; publish(activity); @@ -355,6 +365,7 @@ async fn insert_activity_tx( activity_type: &str, resource_type: &str, resource_id: &str, + plan_id: Option<&str>, ) -> Result { let tenant = match resource_type { "tenant" => resource_id.to_string(), @@ -371,8 +382,8 @@ async fn insert_activity_tx( let created_at = chrono::Utc::now().timestamp(); sqlx::query( - "INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id) - VALUES (?, ?, ?, ?, ?, ?)", + "INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id, plan_id) + VALUES (?, ?, ?, ?, ?, ?, ?)", ) .bind(&id) .bind(&tenant) @@ -380,6 +391,7 @@ async fn insert_activity_tx( .bind(activity_type) .bind(resource_type) .bind(resource_id) + .bind(plan_id) .execute(&mut **tx) .await?; @@ -391,6 +403,7 @@ async fn insert_activity_tx( resource_type: resource_type.to_string(), resource_id: resource_id.to_string(), billed_at: None, + plan_id: plan_id.map(str::to_string), }) } @@ -420,8 +433,8 @@ async fn insert_invoice_tx( async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &InvoiceItem) -> Result<()> { sqlx::query( "INSERT INTO invoice_item - (id, invoice_id, activity_id, tenant_pubkey, relay_id, plan, amount, description, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + (id, invoice_id, activity_id, tenant_pubkey, relay_id, plan, amount, description, created_at, period_start) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ) .bind(&item.id) .bind(&item.invoice_id) @@ -432,34 +445,20 @@ async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &Invoice .bind(item.amount) .bind(&item.description) .bind(item.created_at) + .bind(item.period_start) .execute(&mut **tx) .await?; Ok(()) } -async fn mark_activities_billed_tx( +async fn mark_activity_billed_tx( tx: &mut Transaction<'_, Sqlite>, - activity_ids: &[String], + activity_id: &str, billed_at: i64, ) -> Result<()> { - for id in activity_ids { - sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ?") - .bind(billed_at) - .bind(id) - .execute(&mut **tx) - .await?; - } - Ok(()) -} - -async fn set_tenant_billing_anchor_tx( - tx: &mut Transaction<'_, Sqlite>, - tenant_pubkey: &str, - billing_anchor: i64, -) -> Result<()> { - sqlx::query("UPDATE tenant SET billing_anchor = ? WHERE pubkey = ?") - .bind(billing_anchor) - .bind(tenant_pubkey) + sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ?") + .bind(billed_at) + .bind(activity_id) .execute(&mut **tx) .await?; Ok(()) diff --git a/backend/src/models.rs b/backend/src/models.rs index f3011c3..061e88f 100644 --- a/backend/src/models.rs +++ b/backend/src/models.rs @@ -13,6 +13,9 @@ pub struct Activity { pub resource_type: String, pub resource_id: String, pub billed_at: Option, + /// The relay's plan at the time of a `create_relay`/`update_relay` activity; + /// `None` for all other activity types. + pub plan_id: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -94,14 +97,19 @@ pub struct Invoice { #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct InvoiceItem { pub id: String, - pub invoice_id: String, - pub activity_id: String, + /// `None` while outstanding; set once the item is claimed onto an invoice. + pub invoice_id: Option, + /// `None` for renewal items, which have no source activity. + pub activity_id: Option, pub tenant_pubkey: String, pub relay_id: String, pub plan: String, pub amount: i64, pub description: String, pub created_at: i64, + /// Set only on renewal items: the billing period this item renews. Doubles as + /// the marker that prevents a period from being renewed twice. + pub period_start: Option, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] diff --git a/backend/src/query.rs b/backend/src/query.rs index c2fe6ef..04d5037 100644 --- a/backend/src/query.rs +++ b/backend/src/query.rs @@ -113,6 +113,15 @@ pub async fn list_invoices(tenant_pubkey: &str) -> Result> { .await?) } +pub async fn get_latest_invoice(tenant_pubkey: &str) -> Result> { + Ok(sqlx::query_as::<_, Invoice>( + "SELECT * FROM invoice WHERE tenant_pubkey = ? ORDER BY created_at DESC LIMIT 1", + ) + .bind(tenant_pubkey) + .fetch_optional(pool()) + .await?) +} + pub async fn get_invoice_items_for_invoice(invoice_id: &str) -> Result> { Ok( sqlx::query_as::<_, InvoiceItem>("SELECT * FROM invoice_item WHERE invoice_id = ?") @@ -122,6 +131,25 @@ pub async fn get_invoice_items_for_invoice(invoice_id: &str) -> Result Result> { + Ok(sqlx::query_scalar::<_, String>( + "SELECT plan_id FROM activity + WHERE resource_id = ? + AND created_at < ? + AND activity_type IN ('create_relay', 'update_relay') + AND plan_id IS NOT NULL + ORDER BY created_at DESC + LIMIT 1", + ) + .bind(relay_id) + .bind(before) + .fetch_optional(pool()) + .await?) +} + pub async fn get_bolt11(bolt11_id: &str) -> Result> { Ok(sqlx::query_as::<_, Bolt11>("SELECT * FROM bolt11 WHERE id = ?") .bind(bolt11_id) @@ -149,8 +177,7 @@ pub async fn list_billable_activity_for_tenant(tenant_pubkey: &str) -> Result Result Result> { + Ok(sqlx::query_as::<_, Activity>(&select_activity( + "WHERE tenant = ? + AND resource_type = 'relay' + AND activity_type IN ( + 'create_relay', 'update_relay', 'activate_relay', 'deactivate_relay' + ) + AND created_at < ? + ORDER BY created_at ASC", + )) + .bind(tenant_pubkey) + .bind(before) + .fetch_all(pool()) + .await?) +} + pub async fn list_activity_for_resource(resource_id: &str) -> Result> { Ok(sqlx::query_as::<_, Activity>(&select_activity( "WHERE resource_id = ? ORDER BY created_at DESC", diff --git a/backend/src/routes/invoices.rs b/backend/src/routes/invoices.rs index 7929ff5..7dd0de9 100644 --- a/backend/src/routes/invoices.rs +++ b/backend/src/routes/invoices.rs @@ -6,6 +6,31 @@ use crate::api::{Api, AuthedPubkey}; use crate::query; use crate::web::{ApiResult, internal, not_found, ok}; +/// The tenant's most recent invoice, after first materializing any outstanding +/// line items into a fresh one — so the frontend can collect payment right after +/// a change (e.g. creating a relay). Payment isn't attempted here; the caller +/// drives it via the bolt11/Stripe endpoints. `null` when the tenant has no +/// invoices and nothing is outstanding. +pub async fn get_tenant_latest_invoice( + State(api): State>, + AuthedPubkey(auth): AuthedPubkey, + Path(pubkey): Path, +) -> ApiResult { + api.require_admin_or_tenant(&auth, &pubkey)?; + let tenant = api.get_tenant_or_404(&pubkey).await?; + + // Roll any outstanding charges (and due renewals) into an invoice, then + // return the latest. + api.billing + .generate_invoice(&tenant) + .await + .map_err(internal)?; + + let invoice = query::get_latest_invoice(&pubkey).await.map_err(internal)?; + + ok(invoice) +} + pub async fn get_invoice( State(api): State>, AuthedPubkey(auth): AuthedPubkey,