diff --git a/backend/migrations/0001_init.sql b/backend/migrations/0001_init.sql index ae8f81e..723a392 100644 --- a/backend/migrations/0001_init.sql +++ b/backend/migrations/0001_init.sql @@ -98,6 +98,9 @@ CREATE INDEX IF NOT EXISTS idx_invoice_item_invoice ON invoice_item (invoice_id) CREATE INDEX IF NOT EXISTS idx_invoice_item_outstanding ON invoice_item (tenant_pubkey) WHERE invoice_id IS NULL; +-- At most one line item per billable activity to ensure no double-billing. +CREATE UNIQUE INDEX IF NOT EXISTS idx_invoice_item_activity ON invoice_item (activity_id); + CREATE INDEX IF NOT EXISTS idx_bolt11_invoice_created ON bolt11 (invoice_id, created_at); CREATE INDEX IF NOT EXISTS idx_intent_invoice ON intent (invoice_id); diff --git a/backend/src/billing.rs b/backend/src/billing.rs index a9fd127..6636a7a 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -97,8 +97,8 @@ impl Billing { Ok(()) } - /// Poll entry point: generate the tenant's invoice for the current period - /// (adding any due renewals) and, if one results, collect payment. + /// Periodically generate the tenant's invoice for the current period + /// (adding any due renewals) and, if one results, attempt payment. async fn autogenerate_invoice(&self, tenant: &Tenant) -> Result<()> { if let Some(invoice) = self.generate_invoice(tenant).await? { self.attempt_payment(tenant, &invoice).await?; @@ -266,11 +266,7 @@ impl Billing { } /// Reconcile pending activity, add this period's renewals if they're due, and - /// claim everything outstanding onto an invoice. Shared by the poll and the - /// on-demand invoice endpoint — safe to call either way: renewal is idempotent - /// per period (see [`command::renew_tenant`]). No payment is attempted here; - /// callers that want auto-pay do it on the returned invoice. `None` when - /// nothing is owed. + /// claim everything outstanding onto an invoice. Idempotent per period. pub async fn generate_invoice(&self, tenant: &Tenant) -> Result> { self.reconcile_subscription(tenant).await?; @@ -286,14 +282,16 @@ impl Billing { let period_start = period_start_at(anchor, now); let period_end = add_one_month(period_start); - // Short-circuit the renewal scan once this period is already renewed — the - // common case on all but the first poll of a period (saving ~720 scans a - // month per tenant). renew_tenant re-checks this in-tx as the real guard. + // Short-circuit the renewal scan if this period is already renewed, for + // performance. Renew_tenant re-checks this in-tx as the real guard. if tenant.renewed_at.is_none_or(|at| at < period_start) { self.renew_period(&tenant, period_start).await?; } - self.claim_outstanding(&tenant, period_start, period_end).await + /// Claim the tenant's outstanding items onto a fresh invoice if they net + /// positive; `None` when nothing is owed (a net credit stays outstanding and + /// carries to the next positive invoice). + command::create_invoice(&tenant.pubkey, period_start, period_end).await } /// Charge a full-period renewal for every relay that was active on a paid plan @@ -335,25 +333,6 @@ impl Billing { command::renew_tenant(&tenant.pubkey, period_start, &renewal_items).await } - /// Claim the tenant's outstanding items onto a fresh invoice if they net - /// positive; `None` when nothing is owed (a net credit stays outstanding and - /// carries to the next positive invoice). - async fn claim_outstanding( - &self, - tenant: &Tenant, - period_start: i64, - period_end: i64, - ) -> Result> { - let invoice_id = uuid::Uuid::new_v4().to_string(); - command::claim_outstanding_into_invoice( - &invoice_id, - &tenant.pubkey, - period_start, - period_end, - ) - .await - } - pub async fn attempt_payment(&self, tenant: &Tenant, invoice: &Invoice) -> Result<()> { let mut error_message: Option = None; diff --git a/backend/src/command.rs b/backend/src/command.rs index 44e1ffb..6d3aea9 100644 --- a/backend/src/command.rs +++ b/backend/src/command.rs @@ -59,6 +59,42 @@ pub async fn clear_tenant_nwc_error(pubkey: &str) -> Result<()> { Ok(()) } +/// Insert this period's renewal items and advance the tenant's `renewed_at` +/// marker to `period_start`, atomically and idempotently. +pub async fn renew_tenant( + tenant_pubkey: &str, + period_start: i64, + items: &[InvoiceItem], +) -> Result<()> { + with_tx(async |tx| { + // Re-read the marker inside the transaction so the guard and the writes + // commit together — this ensures idempotency so we don't double-invoice. + let renewed_at = sqlx::query_scalar::<_, Option>( + "SELECT renewed_at FROM tenant WHERE pubkey = ?", + ) + .bind(tenant_pubkey) + .fetch_one(&mut **tx) + .await?; + + if renewed_at.is_some_and(|at| at >= period_start) { + return Ok(()); + } + + for item in items { + insert_invoice_item_tx(tx, item).await?; + } + + sqlx::query("UPDATE tenant SET renewed_at = ? WHERE pubkey = ?") + .bind(period_start) + .bind(tenant_pubkey) + .execute(&mut **tx) + .await?; + + Ok(()) + }) + .await +} + // --- Relays --- pub async fn create_relay(relay: &Relay) -> Result<()> { @@ -132,6 +168,10 @@ pub async fn update_relay(relay: &Relay) -> Result<()> { Ok(()) } +pub async fn activate_relay(relay: &Relay) -> Result<()> { + set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay").await +} + pub async fn deactivate_relay(relay: &Relay) -> Result<()> { set_relay_status(&relay.id, RELAY_STATUS_INACTIVE, "deactivate_relay").await } @@ -141,10 +181,6 @@ pub async fn mark_relay_delinquent(relay: &Relay) -> Result<()> { set_relay_status(&relay.id, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent").await } -pub async fn activate_relay(relay: &Relay) -> Result<()> { - set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay").await -} - async fn set_relay_status(relay_id: &str, status: &str, activity_type: &str) -> Result<()> { let activity = with_tx(async |tx| { sqlx::query("UPDATE relay SET status = ?, synced = 0 WHERE id = ?") @@ -186,7 +222,7 @@ pub async fn complete_relay_sync(relay_id: &str) -> Result<()> { Ok(()) } -// --- Invoice items (the outstanding-charge ledger) --- +// --- Invoice items --- /// Persist a reconciled activity's line item and mark the activity billed in one /// transaction, so a recovery pass never re-bills it. @@ -194,8 +230,11 @@ pub async fn insert_invoice_item_for_activity(invoice_item: &InvoiceItem, activi let now = chrono::Utc::now().timestamp(); with_tx(async |tx| { - insert_invoice_item_tx(tx, invoice_item).await?; - mark_activity_billed_tx(tx, activity_id, now).await?; + // Claim the activity first. If a concurrent reconcile pass already billed + // it, the claim no-ops and we skip the item rather than duplicating it. + if mark_activity_billed_tx(tx, activity_id, now).await? { + insert_invoice_item_tx(tx, invoice_item).await?; + } Ok(()) }) @@ -207,42 +246,8 @@ pub async fn insert_invoice_item_for_activity(invoice_item: &InvoiceItem, activi pub async fn mark_activity_billed(activity_id: &str) -> Result<()> { let now = chrono::Utc::now().timestamp(); - with_tx(async |tx| mark_activity_billed_tx(tx, activity_id, now).await).await -} - -/// Insert this period's renewal items and advance the tenant's `renewed_at` -/// marker to `period_start`, atomically. Idempotent: a repeat call for an -/// already-renewed period is a no-op, so a crash mid-renewal or a poll racing -/// the on-demand endpoint can't bill the same period twice. -pub async fn renew_tenant( - tenant_pubkey: &str, - period_start: i64, - items: &[InvoiceItem], -) -> Result<()> { with_tx(async |tx| { - // Re-read the marker inside the transaction so the guard and the writes - // commit together — this is the real idempotency backstop. - let renewed_at = sqlx::query_scalar::<_, Option>( - "SELECT renewed_at FROM tenant WHERE pubkey = ?", - ) - .bind(tenant_pubkey) - .fetch_one(&mut **tx) - .await?; - - if renewed_at.is_some_and(|at| at >= period_start) { - return Ok(()); - } - - for item in items { - insert_invoice_item_tx(tx, item).await?; - } - - sqlx::query("UPDATE tenant SET renewed_at = ? WHERE pubkey = ?") - .bind(period_start) - .bind(tenant_pubkey) - .execute(&mut **tx) - .await?; - + mark_activity_billed_tx(tx, activity_id, now).await?; Ok(()) }) .await @@ -250,12 +255,10 @@ pub async fn renew_tenant( // --- Invoices --- -/// Claim all of a tenant's outstanding items onto a new invoice — but only if -/// they sum to a positive amount. A non-positive balance (net credit or nothing -/// owed) leaves the items outstanding so the credit carries to the next positive +/// Claim all of a tenant's outstanding items onto a new invoice. A non-positive +/// balance leaves the items outstanding so the credit carries to the next positive /// invoice. Returns the invoice, or `None` when there's nothing to bill. -pub async fn claim_outstanding_into_invoice( - invoice_id: &str, +pub async fn create_invoice( tenant_pubkey: &str, period_start: i64, period_end: i64, @@ -274,13 +277,13 @@ pub async fn claim_outstanding_into_invoice( } let invoice = - insert_invoice_tx(tx, invoice_id, tenant_pubkey, period_start, period_end).await?; + insert_invoice_tx(tx, tenant_pubkey, period_start, period_end).await?; sqlx::query( "UPDATE invoice_item SET invoice_id = ? WHERE tenant_pubkey = ? AND invoice_id IS NULL", ) - .bind(invoice_id) + .bind(&invoice.id) .bind(tenant_pubkey) .execute(&mut **tx) .await?; @@ -414,12 +417,12 @@ async fn insert_activity_tx( async fn insert_invoice_tx( tx: &mut Transaction<'_, Sqlite>, - invoice_id: &str, tenant_pubkey: &str, period_start: i64, period_end: i64, ) -> Result { let now = chrono::Utc::now().timestamp(); + let invoice_id = uuid::Uuid::new_v4().to_string(); Ok(sqlx::query_as::<_, Invoice>( "INSERT INTO invoice (id, tenant_pubkey, status, period_start, period_end, created_at, updated_at) @@ -455,15 +458,18 @@ async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &Invoice Ok(()) } +/// Claim an activity as billed. Returns `true` if this call set the marker, and +/// `false` if it was already set — e.g. a concurrent reconcile pass won the race — +/// so callers can skip work that would otherwise double-bill. async fn mark_activity_billed_tx( tx: &mut Transaction<'_, Sqlite>, activity_id: &str, billed_at: i64, -) -> Result<()> { - sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ?") +) -> Result { + let result = sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ? AND billed_at IS NULL") .bind(billed_at) .bind(activity_id) .execute(&mut **tx) .await?; - Ok(()) + Ok(result.rows_affected() > 0) }