From 9491d608ae3148daf546a94931277e74d97dba3c Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Mon, 23 Mar 2026 17:44:03 -0700 Subject: [PATCH] More billing work --- backend/BILLING.md | 94 +++++ backend/README.md | 2 +- backend/migrations/0001_init.sql | 63 +++- backend/src/api.rs | 596 +++++++++++++------------------ backend/src/billing.rs | 510 +++++++++++++++++++------- backend/src/main.rs | 11 +- backend/src/models.rs | 73 ++-- backend/src/notifications.rs | 18 +- backend/src/provisioning.rs | 1 - backend/src/repo.rs | 495 ++++++++++++++++++++----- justfile | 27 +- 11 files changed, 1253 insertions(+), 637 deletions(-) create mode 100644 backend/BILLING.md diff --git a/backend/BILLING.md b/backend/BILLING.md new file mode 100644 index 0000000..11c6d16 --- /dev/null +++ b/backend/BILLING.md @@ -0,0 +1,94 @@ +# Billing Architecture (Agreed) + +This document summarizes the agreed billing architecture for Caravel backend. + +## Billing model + +- Usage-based billing: **sats/hour** for relay operation. +- A relay is billable when it is provisioned/active in lifecycle terms. +- Billing is **monthly**, with a **rolling cycle anchored to tenant signup**. +- One **consolidated invoice per tenant** per billing period. + +## Metering and lifecycle + +- Add an append-only lifecycle event table in the backend database. +- Events are the source for usage computation. +- Canonical event timestamp field name: `created_at` (UTC). +- Lifecycle behavior is treated as a state machine for billing math (idempotent outcomes for repeated/no-op transitions). +- Transition validation is permissive (any transition can be recorded); billing logic interprets sequences. +- Billable time behavior: + - Start on `provisioned` + - Pause on `suspended` + - Stop on `deactivated` + - Resume immediately on unsuspend + +## Pricing + +- Price is per relay plan/tier in **sats/hour**. +- Rates are stored in a mutable `plans` table (current rate only). +- Mid-cycle plan changes are billed by time spent in each plan. +- Plan rate changes are retroactive for un-invoiced usage in the current open period. + +## Rounding and minimums + +- Round usage up to the next full hour. +- Minimum charge: **1 billable hour per relay per month**. + +## Invoice generation + +- A periodic worker creates invoices at billing boundaries. +- Existing relays at launch start billing from launch timestamp only (no historical backfill). +- Avoid duplicate invoices with a DB unique constraint on: + - `(tenant, period_start, period_end)` + +## Invoice status and attempts + +- `invoice_attempts` is the canonical history/state source. +- `invoices.status` is a synchronous projection updated in the same transaction as attempt writes. +- Each payment method attempt is its own row in `invoice_attempts`. +- Attempts in a single retry pass share a `run_id` UUID. + +## Collection order and fallback + +For each invoice collection run: + +1. Try **NWC** auto-pay +2. If not paid, try **Stripe** auto-pay +3. If still unpaid/unavailable, create Lightning invoice and show QR in-app +4. If neither NWC nor Stripe is configured, send a one-time **NIP-17 DM** with invoice/subscription status + +Notes: + +- Retry cadence: every 24 hours (NWC/Stripe retries). +- Do **not** resend DMs on retries. +- Lightning invoice refresh is in-app only when prior invoice expires. +- DM send is recorded as an `invoice_attempts` row (same `run_id` as triggering run). + +## Due dates, grace, and enforcement + +- Invoice due time is derived as: `invoice.created_at + 7 days`. +- Grace period: 7 days, relay service remains fully active during grace. +- If still unpaid after grace, billing flow marks tenant/account past due and performs billing-side handling. +- Full outstanding balance must be paid before billing status is considered clear. + +## Tenant and integration storage + +- Store billing cycle anchor on `tenants` (e.g., `billing_anchor_at`). +- Anchor can be reset when tenant goes from no non-free relays to having one again. +- Determine “has billable relays” by querying relays on demand (no counter cache). +- Keep NWC config in `tenants.nwc_url`. +- Store Stripe IDs directly on `tenants`. + +## Worker and runtime model + +- Scheduler runs inside backend service process. +- Multiple instances may run; correctness relies on DB idempotency and unique constraints. + +## Repository impact + +- Add migration(s) for lifecycle events and billing-related schema changes. +- Add repository methods in `backend/src/repo.rs` for: + - writing lifecycle events + - reading lifecycle events by relay/tenant/time window + - creating/fetching invoices with period boundaries + - writing invoice attempts and projecting invoice status atomically diff --git a/backend/README.md b/backend/README.md index f94f0f1..9b21117 100644 --- a/backend/README.md +++ b/backend/README.md @@ -86,7 +86,7 @@ This is ready to be used by API routes. The backend runs an in-process billing loop that: - Generates monthly invoices (using `NWC_URL`) -- Uses the tenant’s `tenant_nwc_url` for recurring pull payments (if set) +- Uses the tenant’s `nwc_url` for recurring pull payments (if set) - Sends NIP-17 DMs with invoices when recurring is off - Sends NIP-17 DMs on successful payment when recurring is on diff --git a/backend/migrations/0001_init.sql b/backend/migrations/0001_init.sql index 809f2d8..b5d9378 100644 --- a/backend/migrations/0001_init.sql +++ b/backend/migrations/0001_init.sql @@ -1,7 +1,11 @@ CREATE TABLE IF NOT EXISTS tenants ( pubkey TEXT PRIMARY KEY, status TEXT NOT NULL, - tenant_nwc_url TEXT NOT NULL DEFAULT "" + nwc_url TEXT NOT NULL DEFAULT "", + created_at INTEGER NOT NULL DEFAULT (UNIXEPOCH()), + billing_anchor_at INTEGER NOT NULL DEFAULT (UNIXEPOCH()), + stripe_customer_id TEXT NOT NULL DEFAULT '', + stripe_subscription_id TEXT NOT NULL DEFAULT '' ); CREATE TABLE IF NOT EXISTS relays ( @@ -22,18 +26,67 @@ CREATE TABLE IF NOT EXISTS invoices ( tenant TEXT NOT NULL, amount INTEGER NOT NULL, status TEXT NOT NULL, - created_at TEXT NOT NULL, - invoice TEXT NOT NULL, + created_at INTEGER NOT NULL, + bolt11 TEXT NOT NULL, + period_start INTEGER NOT NULL, + period_end INTEGER NOT NULL, FOREIGN KEY (tenant) REFERENCES tenants(pubkey) ); +CREATE UNIQUE INDEX IF NOT EXISTS invoices_tenant_period_unique + ON invoices (tenant, period_start, period_end); + CREATE TABLE IF NOT EXISTS invoice_items ( id TEXT PRIMARY KEY, invoice TEXT NOT NULL, relay TEXT NOT NULL, amount INTEGER NOT NULL, - period_start TEXT NOT NULL, - period_end TEXT NOT NULL, + period_start INTEGER NOT NULL, + period_end INTEGER NOT NULL, FOREIGN KEY (invoice) REFERENCES invoices(id), FOREIGN KEY (relay) REFERENCES relays(id) ); + +CREATE TABLE IF NOT EXISTS plans ( + id TEXT PRIMARY KEY, + sats_per_month INTEGER NOT NULL +); + +INSERT OR IGNORE INTO plans (id, sats_per_month) VALUES + ('free', 0), + ('basic', 10000), + ('growth', 50000); + +CREATE TABLE IF NOT EXISTS relay_lifecycle_events ( + id TEXT PRIMARY KEY, + relay TEXT NOT NULL, + tenant TEXT NOT NULL, + event_type TEXT NOT NULL, + plan TEXT NOT NULL, + created_at INTEGER NOT NULL, + FOREIGN KEY (relay) REFERENCES relays(id), + FOREIGN KEY (tenant) REFERENCES tenants(pubkey) +); + +CREATE INDEX IF NOT EXISTS relay_lifecycle_events_relay_idx + ON relay_lifecycle_events (relay, created_at); + +CREATE INDEX IF NOT EXISTS relay_lifecycle_events_tenant_idx + ON relay_lifecycle_events (tenant, created_at); + +CREATE TABLE IF NOT EXISTS invoice_attempts ( + id TEXT PRIMARY KEY, + invoice TEXT NOT NULL, + run_id TEXT NOT NULL, + method TEXT NOT NULL, + outcome TEXT NOT NULL, + error TEXT NOT NULL DEFAULT '', + created_at INTEGER NOT NULL, + FOREIGN KEY (invoice) REFERENCES invoices(id) +); + +CREATE INDEX IF NOT EXISTS invoice_attempts_invoice_idx + ON invoice_attempts (invoice, created_at); + +CREATE INDEX IF NOT EXISTS invoice_attempts_run_id_idx + ON invoice_attempts (run_id); diff --git a/backend/src/api.rs b/backend/src/api.rs index 4f6fecd..e44c7ff 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -12,7 +12,8 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use crate::auth::verify_nip98; -use crate::models::{NewTenant, Relay, RelayConfig}; +use crate::billing::now_ts; +use crate::models::{Relay, RelayConfig, Tenant}; use crate::provisioning::Provisioner; use crate::repo::Repo; @@ -62,6 +63,8 @@ pub fn router(state: AppState) -> Router { .with_state(state) } +// ── error helpers ───────────────────────────────────────────────────────────── + #[derive(Debug, Serialize)] struct ApiError { error: String, @@ -103,15 +106,23 @@ fn not_found() -> Response { .into_response() } +fn internal_error(msg: &str) -> Response { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError { + error: msg.to_string(), + }), + ) + .into_response() +} + fn is_unique_subdomain_violation(err: &anyhow::Error) -> bool { let Some(sqlx_err) = err.downcast_ref::() else { return false; }; - let sqlx::Error::Database(db_err) = sqlx_err else { return false; }; - db_err.message().contains("relays.subdomain") || db_err.message().contains("relays_subdomain_unique") } @@ -136,11 +147,26 @@ fn extract_auth_pubkey(headers: &HeaderMap, method: &Method, uri: &Uri) -> Resul .path_and_query() .map(|v| v.as_str()) .unwrap_or(uri.path()); - let url = format!("{}://{}{}", scheme, host, path); + let url = format!("{scheme}://{host}{path}"); let pubkey = verify_nip98(auth_header, &url, method.as_str())?; Ok(pubkey.to_hex()) } +fn new_tenant(pubkey: &str) -> Tenant { + let now = now_ts(); + Tenant { + pubkey: pubkey.to_string(), + status: "active".to_string(), + nwc_url: String::new(), + created_at: now, + billing_anchor_at: now, + stripe_customer_id: String::new(), + stripe_subscription_id: String::new(), + } +} + +// ── tenant routes ───────────────────────────────────────────────────────────── + async fn get_tenant( State(state): State, headers: HeaderMap, @@ -148,37 +174,20 @@ async fn get_tenant( uri: Uri, ) -> Response { let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; match state.repo.get_tenant(&pubkey).await { Ok(Some(tenant)) => (StatusCode::OK, Json(tenant)).into_response(), Ok(None) => { - let tenant = NewTenant { - pubkey: pubkey.clone(), - status: "active".to_string(), - tenant_nwc_url: "".to_string(), - }; - if state.repo.create_tenant(&tenant).await.is_ok() { - (StatusCode::OK, Json(tenant)).into_response() - } else { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to create tenant".into(), - }), - ) - .into_response() + let tenant = new_tenant(&pubkey); + match state.repo.create_tenant(&tenant).await { + Ok(()) => (StatusCode::OK, Json(tenant)).into_response(), + Err(_) => internal_error("failed to create tenant"), } } - Err(_) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load tenant".into(), - }), - ) - .into_response(), + Err(_) => internal_error("failed to load tenant"), } } @@ -189,19 +198,13 @@ async fn list_tenant_relays( uri: Uri, ) -> Response { let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; match state.repo.list_relays_by_tenant(&pubkey).await { Ok(relays) => (StatusCode::OK, Json(relays)).into_response(), - Err(_) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load relays".into(), - }), - ) - .into_response(), + Err(_) => internal_error("failed to load relays"), } } @@ -223,35 +226,37 @@ async fn create_tenant_relay( Json(payload): Json, ) -> Response { let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; - let tenant = NewTenant { - pubkey: pubkey.clone(), - status: "active".to_string(), - tenant_nwc_url: "".to_string(), - }; - - if let Err(_) = state.repo.create_tenant_if_missing(&tenant).await { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to ensure tenant".into(), - }), - ) - .into_response(); + if state + .repo + .create_tenant_if_missing(&new_tenant(&pubkey)) + .await + .is_err() + { + return internal_error("failed to ensure tenant"); } - let id = payload.subdomain.replace('-', "_"); + let now = now_ts(); + + // If this is the tenant's first paid relay, reset the billing anchor + if payload.plan != "free" + && let Ok(0) = state.repo.count_billable_relays(&pubkey).await + { + let _ = state.repo.reset_billing_anchor(&pubkey, now).await; + } + + let relay_id = payload.subdomain.replace('-', "_"); let relay = Relay { - id: id.clone(), + id: relay_id.clone(), tenant: pubkey.clone(), name: payload.name, subdomain: payload.subdomain.clone(), icon: payload.icon, description: payload.description, - plan: payload.plan, + plan: payload.plan.clone(), status: "pending".to_string(), config: payload.config, }; @@ -266,28 +271,38 @@ async fn create_tenant_relay( ) .into_response(); } - - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to create relay".into(), - }), - ) - .into_response(); + return internal_error("failed to create relay"); } if let Err(err) = state.provisioner.create_relay(&relay).await { - tracing::error!(relay_id = relay.id, error = %err, "zooid create failed"); - let _ = state.repo.update_relay_status(&relay.id, "provisioning_failed").await; - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { error: format!("failed to provision relay: {err}") }), - ) - .into_response(); + tracing::error!(relay_id, error = %err, "zooid create failed"); + let _ = state + .repo + .update_relay_status(&relay_id, "provisioning_failed") + .await; + return internal_error(&format!("failed to provision relay: {err}")); } - let _ = state.repo.update_relay_status(&relay.id, "active").await; + // Transition to active and write the provisioned lifecycle event + if let Err(err) = state + .repo + .transition_relay( + &relay_id, + &pubkey, + &payload.plan, + "active", + "provisioned", + now, + ) + .await + { + tracing::error!(relay_id, error = %err, "lifecycle event write failed"); + } + let relay = Relay { + status: "active".to_string(), + ..relay + }; (StatusCode::CREATED, Json(relay)).into_response() } @@ -299,7 +314,7 @@ async fn get_tenant_relay( Path(id): Path, ) -> Response { let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; @@ -307,13 +322,7 @@ async fn get_tenant_relay( Ok(Some(relay)) if relay.tenant == pubkey => (StatusCode::OK, Json(relay)).into_response(), Ok(Some(_)) => forbidden(), Ok(None) => not_found(), - Err(_) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load relay".into(), - }), - ) - .into_response(), + Err(_) => internal_error("failed to load relay"), } } @@ -335,22 +344,14 @@ async fn update_tenant_relay( Json(payload): Json, ) -> Response { let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; let existing = match state.repo.get_relay(&id).await { Ok(Some(relay)) => relay, Ok(None) => return not_found(), - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load relay".into(), - }), - ) - .into_response(); - } + Err(_) => return internal_error("failed to load relay"), }; if existing.tenant != pubkey { @@ -358,15 +359,12 @@ async fn update_tenant_relay( } let relay = Relay { - id: existing.id, - tenant: existing.tenant, name: payload.name, - subdomain: payload.subdomain.clone(), + subdomain: payload.subdomain, icon: payload.icon, description: payload.description, - plan: existing.plan, - status: existing.status, config: payload.config, + ..existing }; if let Err(err) = state.repo.upsert_relay(&relay).await { @@ -379,27 +377,15 @@ async fn update_tenant_relay( ) .into_response(); } - - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to update relay".into(), - }), - ) - .into_response(); + return internal_error("failed to update relay"); } if let Err(err) = state.provisioner.update_relay(&relay).await { tracing::error!(relay_id = relay.id, error = %err, "zooid patch failed"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { error: format!("failed to provision relay: {err}") }), - ) - .into_response(); + return internal_error(&format!("failed to provision relay: {err}")); } let _ = state.repo.update_relay_status(&relay.id, "active").await; - (StatusCode::OK, Json(relay)).into_response() } @@ -417,12 +403,12 @@ async fn update_tenant_relay_plan( Json(payload): Json, ) -> Response { let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; - let plan = payload.plan.trim().to_lowercase(); - if !matches!(plan.as_str(), "free" | "basic" | "growth") { + let new_plan = payload.plan.trim().to_lowercase(); + if !matches!(new_plan.as_str(), "free" | "basic" | "growth") { return ( StatusCode::BAD_REQUEST, Json(ApiError { @@ -435,81 +421,57 @@ async fn update_tenant_relay_plan( let existing = match state.repo.get_relay(&id).await { Ok(Some(relay)) => relay, Ok(None) => return not_found(), - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load relay".into(), - }), - ) - .into_response(); - } + Err(_) => return internal_error("failed to load relay"), }; if existing.tenant != pubkey { return forbidden(); } + // No-op if plan unchanged + if existing.plan == new_plan { + return (StatusCode::OK, Json(existing)).into_response(); + } + + let now = now_ts(); + + // If switching to the first paid plan, reset billing anchor + if new_plan != "free" + && existing.plan == "free" + && let Ok(0) = state.repo.count_billable_relays(&pubkey).await + { + let _ = state.repo.reset_billing_anchor(&pubkey, now).await; + } + let mut relay = Relay { - plan, + plan: new_plan.clone(), ..existing }; - if relay.plan == "free" { relay.config = Some(disable_paid_features(relay.config)); } - if let Err(_) = state.repo.upsert_relay(&relay).await { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to update relay plan".into(), - }), - ) - .into_response(); + if state.repo.upsert_relay(&relay).await.is_err() { + return internal_error("failed to update relay plan"); } if let Err(err) = state.provisioner.update_relay(&relay).await { tracing::error!(relay_id = relay.id, error = %err, "zooid patch failed"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: format!("failed to provision relay: {err}"), - }), - ) - .into_response(); + return internal_error(&format!("failed to provision relay: {err}")); } let _ = state.repo.update_relay_status(&relay.id, "active").await; - (StatusCode::OK, Json(relay)).into_response() -} - -fn disable_paid_features(config: Option) -> RelayConfig { - let mut cfg = config.unwrap_or_else(empty_relay_config); - set_config_bool(&mut cfg.blossom, "enabled", false); - set_config_bool(&mut cfg.livekit, "enabled", false); - cfg -} - -fn empty_relay_config() -> RelayConfig { - RelayConfig { - policy: None, - groups: None, - management: None, - blossom: None, - livekit: None, - push: None, + // Write a plan-change lifecycle event so billing can split by tier + if let Err(err) = state + .repo + .transition_relay(&relay.id, &pubkey, &new_plan, "active", "provisioned", now) + .await + { + tracing::warn!(relay_id = relay.id, error = %err, "plan-change lifecycle event failed"); } -} -fn set_config_bool(section: &mut Option, key: &str, enabled: bool) { - let mut object = section - .take() - .and_then(|value| value.as_object().cloned()) - .unwrap_or_default(); - object.insert(key.to_string(), Value::Bool(enabled)); - *section = Some(Value::Object(object)); + (StatusCode::OK, Json(relay)).into_response() } async fn deactivate_tenant_relay( @@ -520,44 +482,42 @@ async fn deactivate_tenant_relay( Path(id): Path, ) -> Response { let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; let existing = match state.repo.get_relay(&id).await { Ok(Some(relay)) => relay, Ok(None) => return not_found(), - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load relay".into(), - }), - ) - .into_response(); - } + Err(_) => return internal_error("failed to load relay"), }; if existing.tenant != pubkey { return forbidden(); } + let now = now_ts(); + if state + .repo + .transition_relay( + &id, + &pubkey, + &existing.plan, + "deactivated", + "deactivated", + now, + ) + .await + .is_err() + { + return internal_error("failed to deactivate relay"); + } + let relay = Relay { status: "deactivated".to_string(), config: None, ..existing }; - - if let Err(_) = state.repo.upsert_relay(&relay).await { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to deactivate relay".into(), - }), - ) - .into_response(); - } - (StatusCode::OK, Json(relay)).into_response() } @@ -568,25 +528,19 @@ async fn list_tenant_invoices( uri: Uri, ) -> Response { let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; match state.repo.list_invoices_by_tenant(&pubkey).await { Ok(invoices) => (StatusCode::OK, Json(invoices)).into_response(), - Err(_) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load invoices".into(), - }), - ) - .into_response(), + Err(_) => internal_error("failed to load invoices"), } } #[derive(Debug, Deserialize, Serialize)] struct UpdateTenantBillingRequest { - tenant_nwc_url: String, + nwc_url: String, } async fn update_tenant_billing( @@ -597,54 +551,22 @@ async fn update_tenant_billing( Json(payload): Json, ) -> Response { let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; - if let Err(_) = state + match state .repo - .update_tenant_nwc_url(&pubkey, &payload.tenant_nwc_url) + .update_tenant_nwc_url(&pubkey, &payload.nwc_url) .await { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to update billing".into(), - }), - ) - .into_response(); - } - - (StatusCode::OK, Json(payload)).into_response() -} - -async fn admin_list_tenants( - State(state): State, - headers: HeaderMap, - method: Method, - uri: Uri, -) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, - Err(_) => return unauthorized(), - }; - - if !state.admin_pubkeys.contains(&pubkey) { - return forbidden(); - } - - match state.repo.list_tenants().await { - Ok(tenants) => (StatusCode::OK, Json(tenants)).into_response(), - Err(_) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load tenants".into(), - }), - ) - .into_response(), + Ok(()) => (StatusCode::OK, Json(payload)).into_response(), + Err(_) => internal_error("failed to update billing"), } } +// ── admin routes ────────────────────────────────────────────────────────────── + #[derive(Debug, Serialize)] struct AdminCheckResponse { is_admin: bool, @@ -657,14 +579,32 @@ async fn admin_check( uri: Uri, ) -> Response { let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; - let is_admin = state.admin_pubkeys.contains(&pubkey); (StatusCode::OK, Json(AdminCheckResponse { is_admin })).into_response() } +async fn admin_list_tenants( + State(state): State, + headers: HeaderMap, + method: Method, + uri: Uri, +) -> Response { + let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { + Ok(p) => p, + Err(_) => return unauthorized(), + }; + if !state.admin_pubkeys.contains(&pubkey) { + return forbidden(); + } + match state.repo.list_tenants().await { + Ok(tenants) => (StatusCode::OK, Json(tenants)).into_response(), + Err(_) => internal_error("failed to load tenants"), + } +} + async fn admin_get_tenant( State(state): State, headers: HeaderMap, @@ -673,44 +613,26 @@ async fn admin_get_tenant( Path(pubkey): Path, ) -> Response { let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; - if !state.admin_pubkeys.contains(&admin) { return forbidden(); } let tenant = match state.repo.get_tenant(&pubkey).await { - Ok(Some(tenant)) => tenant, + Ok(Some(t)) => t, Ok(None) => return not_found(), - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load tenant".into(), - }), - ) - .into_response(); - } + Err(_) => return internal_error("failed to load tenant"), }; - let relays = match state.repo.list_relays_by_tenant(&pubkey).await { - Ok(relays) => relays, - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load relays".into(), - }), - ) - .into_response(); - } + Ok(r) => r, + Err(_) => return internal_error("failed to load relays"), }; #[derive(Serialize)] struct TenantDetail { - tenant: crate::models::Tenant, + tenant: Tenant, relays: Vec, } @@ -731,49 +653,34 @@ async fn admin_update_tenant_status( Json(payload): Json, ) -> Response { let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; - if !state.admin_pubkeys.contains(&admin) { return forbidden(); } let tenant = match state.repo.get_tenant(&pubkey).await { - Ok(Some(tenant)) => tenant, + Ok(Some(t)) => t, Ok(None) => return not_found(), - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load tenant".into(), - }), - ) - .into_response(); - } + Err(_) => return internal_error("failed to load tenant"), }; - if let Err(_) = state + match state .repo .update_tenant_status(&tenant.pubkey, &payload.status) .await { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to update tenant".into(), + Ok(()) => ( + StatusCode::OK, + Json(Tenant { + status: payload.status, + ..tenant }), ) - .into_response(); + .into_response(), + Err(_) => internal_error("failed to update tenant"), } - - let updated = NewTenant { - pubkey: tenant.pubkey, - status: payload.status, - tenant_nwc_url: tenant.tenant_nwc_url, - }; - - (StatusCode::OK, Json(updated)).into_response() } async fn admin_list_relays( @@ -783,23 +690,15 @@ async fn admin_list_relays( uri: Uri, ) -> Response { let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; - if !state.admin_pubkeys.contains(&admin) { return forbidden(); } - match state.repo.list_relays().await { Ok(relays) => (StatusCode::OK, Json(relays)).into_response(), - Err(_) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load relays".into(), - }), - ) - .into_response(), + Err(_) => internal_error("failed to load relays"), } } @@ -811,24 +710,16 @@ async fn admin_get_relay( Path(id): Path, ) -> Response { let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; - if !state.admin_pubkeys.contains(&admin) { return forbidden(); } - match state.repo.get_relay(&id).await { Ok(Some(relay)) => (StatusCode::OK, Json(relay)).into_response(), Ok(None) => not_found(), - Err(_) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load relay".into(), - }), - ) - .into_response(), + Err(_) => internal_error("failed to load relay"), } } @@ -841,10 +732,9 @@ async fn admin_update_relay( Json(payload): Json, ) -> Response { let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; - if !state.admin_pubkeys.contains(&admin) { return forbidden(); } @@ -852,27 +742,16 @@ async fn admin_update_relay( let existing = match state.repo.get_relay(&id).await { Ok(Some(relay)) => relay, Ok(None) => return not_found(), - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load relay".into(), - }), - ) - .into_response(); - } + Err(_) => return internal_error("failed to load relay"), }; let relay = Relay { - id: existing.id, - tenant: existing.tenant, name: payload.name, - subdomain: payload.subdomain.clone(), + subdomain: payload.subdomain, icon: payload.icon, description: payload.description, - plan: existing.plan, - status: existing.status, config: payload.config, + ..existing }; if let Err(err) = state.repo.upsert_relay(&relay).await { @@ -885,23 +764,12 @@ async fn admin_update_relay( ) .into_response(); } - - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to update relay".into(), - }), - ) - .into_response(); + return internal_error("failed to update relay"); } if let Err(err) = state.provisioner.update_relay(&relay).await { tracing::error!(relay_id = relay.id, error = %err, "zooid patch failed"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { error: format!("failed to update relay config: {err}") }), - ) - .into_response(); + return internal_error(&format!("failed to update relay config: {err}")); } (StatusCode::OK, Json(relay)).into_response() @@ -915,10 +783,9 @@ async fn admin_deactivate_relay( Path(id): Path, ) -> Response { let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(pubkey) => pubkey, + Ok(p) => p, Err(_) => return unauthorized(), }; - if !state.admin_pubkeys.contains(&admin) { return forbidden(); } @@ -926,32 +793,55 @@ async fn admin_deactivate_relay( let existing = match state.repo.get_relay(&id).await { Ok(Some(relay)) => relay, Ok(None) => return not_found(), - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to load relay".into(), - }), - ) - .into_response(); - } + Err(_) => return internal_error("failed to load relay"), }; + let now = now_ts(); + if state + .repo + .transition_relay( + &id, + &existing.tenant, + &existing.plan, + "deactivated", + "deactivated", + now, + ) + .await + .is_err() + { + return internal_error("failed to deactivate relay"); + } + let relay = Relay { status: "deactivated".to_string(), config: None, ..existing }; - - if let Err(_) = state.repo.upsert_relay(&relay).await { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: "failed to deactivate relay".into(), - }), - ) - .into_response(); - } - (StatusCode::OK, Json(relay)).into_response() } + +// ── relay config helpers ────────────────────────────────────────────────────── + +fn disable_paid_features(config: Option) -> RelayConfig { + let mut cfg = config.unwrap_or(RelayConfig { + policy: None, + groups: None, + management: None, + blossom: None, + livekit: None, + push: None, + }); + set_config_bool(&mut cfg.blossom, "enabled", false); + set_config_bool(&mut cfg.livekit, "enabled", false); + cfg +} + +fn set_config_bool(section: &mut Option, key: &str, enabled: bool) { + let mut obj = section + .take() + .and_then(|v| v.as_object().cloned()) + .unwrap_or_default(); + obj.insert(key.to_string(), Value::Bool(enabled)); + *section = Some(Value::Object(obj)); +} diff --git a/backend/src/billing.rs b/backend/src/billing.rs index 6d7dbb0..61fad8c 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -1,15 +1,22 @@ use anyhow::{Result, anyhow}; -use chrono::{DateTime, Months, Utc}; +use chrono::{DateTime, Months, TimeZone, Utc}; +use std::collections::HashMap; use tokio::time::{Duration, sleep}; use uuid::Uuid; -use crate::models::{Invoice, NewInvoice, NewInvoiceItem, Relay, Tenant}; +use crate::models::{Invoice, InvoiceAttempt, InvoiceItem, RelayLifecycleEvent, Tenant}; use crate::notifications::Nip17Notifier; use crate::repo::Repo; use nostr_sdk::nips::nip47::{self, MakeInvoiceRequest, NostrWalletConnectURI, PayInvoiceRequest}; use nostr_sdk::{Client, Filter, Keys, Kind, Timestamp}; +const GRACE_DAYS: i64 = 7; +const DUE_DAYS: i64 = 7; +const WORKER_INTERVAL_SECS: u64 = 300; + +// ── service ─────────────────────────────────────────────────────────────────── + #[derive(Clone)] pub struct BillingService { repo: Repo, @@ -31,135 +38,274 @@ impl BillingService { if let Err(err) = self.process_once().await { tracing::error!(error = %err, "billing run failed"); } - sleep(Duration::from_secs(300)).await; + sleep(Duration::from_secs(WORKER_INTERVAL_SECS)).await; } } async fn process_once(&self) -> Result<()> { let tenants = self.repo.list_tenants().await?; - for tenant in tenants { - if let Err(err) = self.bill_tenant(&tenant).await { - tracing::error!(tenant = tenant.pubkey, error = %err, "billing failed"); + for tenant in &tenants { + if let Err(err) = self.generate_invoice_if_due(tenant).await { + tracing::error!(tenant = %tenant.pubkey, error = %err, "invoice generation failed"); } - if let Err(err) = self.suspend_if_delinquent(&tenant).await { - tracing::error!(tenant = tenant.pubkey, error = %err, "grace period enforcement failed"); + } + for tenant in &tenants { + if let Err(err) = self.collect_outstanding(tenant).await { + tracing::error!(tenant = %tenant.pubkey, error = %err, "collection failed"); } } Ok(()) } - async fn bill_tenant(&self, tenant: &Tenant) -> Result<()> { + // ── invoice generation ──────────────────────────────────────────────────── + + async fn generate_invoice_if_due(&self, tenant: &Tenant) -> Result<()> { if tenant.status != "active" { return Ok(()); } - let relays = self.repo.list_relays_by_tenant(&tenant.pubkey).await?; - let active_relays = relays - .into_iter() - .filter(|relay| relay.status == "active") - .collect::>(); + let anchor = ts_to_dt(tenant.billing_anchor_at)?; + let now = Utc::now(); + let (period_start, period_end) = current_billing_period(anchor, now); - let invoices = self.repo.list_invoices_by_tenant(&tenant.pubkey).await?; - - let (period_start, period_end, should_bill) = next_billing_window(&invoices)?; - if !should_bill { + // Only generate once the period has closed + if now < period_end { return Ok(()); } + let plans = self.repo.list_plans().await?; + let plan_amount_map: HashMap = + plans.into_iter().map(|p| (p.id, p.sats_per_month)).collect(); + + let events = self + .repo + .list_lifecycle_events_for_tenant(&tenant.pubkey, dt_to_ts(period_end)) + .await?; + let invoice_id = Uuid::new_v4().to_string(); - let items = build_invoice_items(&invoice_id, &active_relays, period_start, period_end); - let total_amount: i64 = items.iter().map(|item| item.amount).sum(); + let items = compute_invoice_items( + &invoice_id, + &events, + &plan_amount_map, + period_start, + period_end, + ); - if total_amount == 0 { + let total: i64 = items.iter().map(|i| i.amount).sum(); + if total == 0 { return Ok(()); } - let invoice_str = self.make_invoice(total_amount).await?; - let invoice = NewInvoice { + let bolt11 = self.make_bolt11(total).await.unwrap_or_default(); + let invoice = Invoice { id: invoice_id.clone(), tenant: tenant.pubkey.clone(), - amount: total_amount, + amount: total, status: "pending".to_string(), - created_at: Utc::now().to_rfc3339(), - invoice: invoice_str.clone(), + created_at: dt_to_ts(now), + bolt11, + period_start: dt_to_ts(period_start), + period_end: dt_to_ts(period_end), }; - self.repo + let created = self + .repo .create_invoice_with_items(&invoice, &items) .await?; - if tenant.tenant_nwc_url.trim().is_empty() { - self.send_invoice_dm(tenant, &invoice, period_start, period_end) + if created { + tracing::info!(tenant = %tenant.pubkey, invoice = %invoice_id, amount = total, "invoice generated"); + } + + Ok(()) + } + + // ── collection ──────────────────────────────────────────────────────────── + + async fn collect_outstanding(&self, tenant: &Tenant) -> Result<()> { + let invoices = self.repo.list_invoices_by_tenant(&tenant.pubkey).await?; + let unpaid: Vec<&Invoice> = invoices + .iter() + .filter(|inv| matches!(inv.status.as_str(), "pending" | "past_due")) + .collect(); + + if unpaid.is_empty() { + return Ok(()); + } + + for invoice in &unpaid { + self.attempt_collection(tenant, invoice).await?; + } + + // Re-fetch to check if all are now paid; auto-reactivate if so + let invoices_after = self.repo.list_invoices_by_tenant(&tenant.pubkey).await?; + let still_unpaid = invoices_after + .iter() + .any(|inv| matches!(inv.status.as_str(), "pending" | "past_due")); + + if !still_unpaid && tenant.status == "suspended" { + let now = now_ts(); + self.repo + .update_tenant_status(&tenant.pubkey, "active") + .await?; + self.repo + .reactivate_relays_for_tenant(&tenant.pubkey, now) + .await?; + tracing::info!(tenant = %tenant.pubkey, "tenant reactivated after full balance payment"); + } + + Ok(()) + } + + async fn attempt_collection(&self, tenant: &Tenant, invoice: &Invoice) -> Result<()> { + let now = Utc::now(); + let created_at = ts_to_dt(invoice.created_at)?; + let due_at = created_at + chrono::Duration::days(DUE_DAYS); + let grace_ends_at = due_at + chrono::Duration::days(GRACE_DAYS); + + // Deactivate after grace period expires + if now > grace_ends_at && invoice.status != "past_due" { + let ts = now_ts(); + self.repo + .update_tenant_status(&tenant.pubkey, "suspended") + .await?; + self.repo + .suspend_relays_for_tenant(&tenant.pubkey, ts) + .await?; + self.repo + .record_attempt( + &InvoiceAttempt { + id: Uuid::new_v4().to_string(), + invoice: invoice.id.clone(), + run_id: Uuid::new_v4().to_string(), + method: "system".to_string(), + outcome: "failed".to_string(), + error: "grace period expired".to_string(), + created_at: ts, + }, + "past_due", + ) .await?; return Ok(()); } - match self.pay_invoice(&tenant.tenant_nwc_url, &invoice_str).await { - Ok(()) => { - self.repo.update_invoice_status(&invoice_id, "paid").await?; - self.send_payment_dm(tenant, &invoice).await?; + // Only retry once per 24h + let attempts = self.repo.list_attempts_for_invoice(&invoice.id).await?; + if let Some(last) = attempts.last() + && last.method != "nip17_dm" + { + let last_at = ts_to_dt(last.created_at)?; + if now - last_at < chrono::Duration::hours(24) { + return Ok(()); } - Err(err) => { - tracing::error!(tenant = tenant.pubkey, error = %err, "recurring payment failed"); + } + + let run_id = Uuid::new_v4().to_string(); + + // 1. Try NWC + if !tenant.nwc_url.trim().is_empty() { + match self + .pay_via_nwc(&tenant.nwc_url, &invoice.bolt11) + .await + { + Ok(()) => { + self.repo + .record_attempt( + &attempt(&invoice.id, &run_id, "nwc", "success", ""), + "paid", + ) + .await?; + return Ok(()); + } + Err(err) => { + tracing::warn!(tenant = %tenant.pubkey, error = %err, "NWC payment failed"); + self.repo + .record_attempt( + &attempt(&invoice.id, &run_id, "nwc", "failed", &err.to_string()), + &invoice.status, + ) + .await?; + } + } + } + + // 2. Try Stripe + if !tenant.stripe_subscription_id.trim().is_empty() { + match self.pay_via_stripe(tenant, invoice).await { + Ok(()) => { + self.repo + .record_attempt( + &attempt(&invoice.id, &run_id, "stripe", "success", ""), + "paid", + ) + .await?; + return Ok(()); + } + Err(err) => { + tracing::warn!(tenant = %tenant.pubkey, error = %err, "Stripe payment failed"); + self.repo + .record_attempt( + &attempt(&invoice.id, &run_id, "stripe", "failed", &err.to_string()), + &invoice.status, + ) + .await?; + } + } + } + + // 3. Fallback: Lightning invoice shown in-app; send one DM if no auto-pay configured + let dm_sent = self.repo.invoice_dm_sent(&invoice.id).await?; + if !dm_sent { + match self + .send_invoice_dm(tenant, invoice, invoice.bolt11.as_str()) + .await + { + Ok(()) => { + self.repo + .record_attempt( + &attempt(&invoice.id, &run_id, "nip17_dm", "sent", ""), + &invoice.status, + ) + .await?; + } + Err(err) => { + tracing::warn!(tenant = %tenant.pubkey, error = %err, "NIP-17 DM failed"); + } } } Ok(()) } - async fn suspend_if_delinquent(&self, tenant: &Tenant) -> Result<()> { - if tenant.status != "active" { - return Ok(()); - } + // ── payment providers ───────────────────────────────────────────────────── - let invoices = self.repo.list_invoices_by_tenant(&tenant.pubkey).await?; - let latest = match invoices.first() { - Some(invoice) => invoice, - None => return Ok(()), - }; - - if latest.status != "pending" { - return Ok(()); - } - - let created_at = parse_timestamp(&latest.created_at)?; - let deadline = created_at + chrono::Duration::days(7); - if Utc::now() < deadline { - return Ok(()); - } - - self.repo - .update_tenant_status(&tenant.pubkey, "suspended") - .await?; - self.repo.suspend_relays_for_tenant(&tenant.pubkey).await?; - Ok(()) - } - - async fn make_invoice(&self, amount: i64) -> Result { + async fn make_bolt11(&self, amount_sats: i64) -> Result { if self.platform_nwc_url.trim().is_empty() { return Err(anyhow!("NWC_URL is required to generate invoices")); } - let uri = NostrWalletConnectURI::parse(&self.platform_nwc_url)?; - let request = nip47::Request::make_invoice(MakeInvoiceRequest { - amount: (amount as u64) * 1_000, + let req = nip47::Request::make_invoice(MakeInvoiceRequest { + amount: (amount_sats as u64) * 1_000, description: Some("Relay hosting".to_string()), description_hash: None, expiry: None, }); - let response = self.send_nwc_request(&uri, request).await?; - Ok(response.to_make_invoice()?.invoice) + let resp = self.send_nwc_request(&uri, req).await?; + Ok(resp.to_make_invoice()?.invoice) } - async fn pay_invoice(&self, tenant_nwc_url: &str, invoice: &str) -> Result<()> { - let uri = NostrWalletConnectURI::parse(tenant_nwc_url)?; - let request = nip47::Request::pay_invoice(PayInvoiceRequest::new(invoice)); - self.send_nwc_request(&uri, request) - .await? - .to_pay_invoice()?; + async fn pay_via_nwc(&self, nwc_url: &str, bolt11: &str) -> Result<()> { + let uri = NostrWalletConnectURI::parse(nwc_url)?; + let req = nip47::Request::pay_invoice(PayInvoiceRequest::new(bolt11)); + self.send_nwc_request(&uri, req).await?.to_pay_invoice()?; Ok(()) } + async fn pay_via_stripe(&self, _tenant: &Tenant, _invoice: &Invoice) -> Result<()> { + // TODO: implement Stripe off-session charge using tenant.stripe_subscription_id + Err(anyhow!("Stripe not yet implemented")) + } + async fn send_nwc_request( &self, uri: &NostrWalletConnectURI, @@ -184,8 +330,8 @@ impl BillingService { let events = client.fetch_events(filter, Duration::from_secs(10)).await?; let event = events .into_iter() - .max_by_key(|event| event.created_at) - .ok_or_else(|| anyhow!("no NWC response"))?; + .max_by_key(|e| e.created_at) + .ok_or_else(|| anyhow!("no NWC response received"))?; Ok(nip47::Response::from_event(uri, &event)?) } @@ -193,85 +339,171 @@ impl BillingService { async fn send_invoice_dm( &self, tenant: &Tenant, - invoice: &NewInvoice, - period_start: DateTime, - period_end: DateTime, + invoice: &Invoice, + bolt11: &str, ) -> Result<()> { + let due_date = ts_to_dt(invoice.created_at + DUE_DAYS * 86400)?; + let period_start = ts_to_dt(invoice.period_start)?; + let period_end = ts_to_dt(invoice.period_end)?; let message = format!( - "Invoice due: {} sats\nPeriod: {} - {}\nInvoice: {}", + "You have an outstanding invoice of {} sats due by {}.\n\ + Period: {} → {}\n\ + Pay with Lightning:\n{}", invoice.amount, - period_start.to_rfc3339(), - period_end.to_rfc3339(), - invoice.invoice - ); - self.notifier.send(&tenant.pubkey, &message).await - } - - async fn send_payment_dm(&self, tenant: &Tenant, invoice: &NewInvoice) -> Result<()> { - let message = format!( - "Payment received: {} sats\nInvoice ID: {}", - invoice.amount, invoice.id + due_date.format("%Y-%m-%d"), + period_start.format("%Y-%m-%d"), + period_end.format("%Y-%m-%d"), + bolt11, ); self.notifier.send(&tenant.pubkey, &message).await } } -fn next_billing_window(invoices: &[Invoice]) -> Result<(DateTime, DateTime, bool)> { - let now = Utc::now(); - if invoices.is_empty() { - let end = now + Months::new(1); - return Ok((now, end, true)); - } +// ── billing math ────────────────────────────────────────────────────────────── - let last = &invoices[0]; - if last.status == "pending" { - return Ok((now, now, false)); +/// Given a billing anchor and the current time, return the current billing +/// period [start, end) based on rolling monthly windows from the anchor. +fn current_billing_period( + anchor: DateTime, + now: DateTime, +) -> (DateTime, DateTime) { + let mut period_start = anchor; + loop { + let period_end = period_start + Months::new(1); + if now < period_end { + return (period_start, period_end); + } + period_start = period_end; } - - let last_created = parse_timestamp(&last.created_at)?; - let next_due = last_created + Months::new(1); - if now < next_due { - return Ok((now, next_due, false)); - } - - Ok((last_created, next_due, true)) } -fn parse_timestamp(value: &str) -> Result> { - let parsed = DateTime::parse_from_rfc3339(value) - .map_err(|e| anyhow!("invalid timestamp {value}: {e}"))?; - Ok(parsed.with_timezone(&Utc)) -} - -fn build_invoice_items( +/// Compute per-relay billable sats for a billing period from the lifecycle +/// event log. Rules: +/// - Billing starts at `provisioned`, pauses at `suspended`, resumes at +/// `unsuspended`, stops at `deactivated`. +/// - Only time within [period_start, period_end) counts. +/// - Round each relay's total billable seconds up to the next full hour. +/// - Minimum 1 billable hour per relay per period. +/// - Amount is based on the relay's current plan amount (retroactive within period). +fn compute_invoice_items( invoice_id: &str, - relays: &[Relay], + events: &[RelayLifecycleEvent], + plan_amount_map: &HashMap, period_start: DateTime, period_end: DateTime, -) -> Vec { - relays - .iter() - .filter_map(|relay| { - let amount = plan_amount(&relay.plan); - if amount == 0 { - return None; - } - Some(NewInvoiceItem { - id: Uuid::new_v4().to_string(), - invoice: invoice_id.to_string(), - relay: relay.id.clone(), - amount, - period_start: period_start.to_rfc3339(), - period_end: period_end.to_rfc3339(), - }) - }) - .collect() +) -> Vec { + // Group events by relay, preserving sort order from the DB (relay, created_at, id) + let mut by_relay: HashMap<&str, Vec<&RelayLifecycleEvent>> = HashMap::new(); + for event in events { + by_relay.entry(&event.relay).or_default().push(event); + } + + let mut items = Vec::new(); + + for (relay_id, relay_events) in &by_relay { + // Use the latest plan for this relay (retroactive rate within period) + let Some(latest_event) = relay_events.last() else { + continue; + }; + let plan_amount = *plan_amount_map.get(latest_event.plan.as_str()).unwrap_or(&0); + if plan_amount == 0 { + continue; + } + + let billable_secs = billable_seconds_in_period(relay_events, period_start, period_end); + if billable_secs == 0 { + continue; + } + + // Round up to next full hour, minimum 1 hour + let hours = ((billable_secs as f64) / 3600.0).ceil().max(1.0) as i64; + + items.push(InvoiceItem { + id: Uuid::new_v4().to_string(), + invoice: invoice_id.to_string(), + relay: relay_id.to_string(), + amount: hours * plan_amount, + period_start: dt_to_ts(period_start), + period_end: dt_to_ts(period_end), + }); + } + + items } -fn plan_amount(plan: &str) -> i64 { - match plan { - "basic" => 10_000, - "growth" => 50_000, - _ => 0, +/// Compute total billable seconds for one relay within [period_start, period_end). +/// Replays the full event history to correctly handle events that precede the period. +fn billable_seconds_in_period( + events: &[&RelayLifecycleEvent], + period_start: DateTime, + period_end: DateTime, +) -> i64 { + let mut total_secs: i64 = 0; + let mut billing_start: Option> = None; + + for event in events { + let Ok(ts) = ts_to_dt(event.created_at) else { + continue; + }; + + match event.event_type.as_str() { + "provisioned" | "unsuspended" => { + if billing_start.is_none() { + billing_start = Some(ts.max(period_start)); + } + } + "suspended" | "deactivated" => { + if let Some(start) = billing_start.take() { + let end = ts.min(period_end); + if end > start { + total_secs += (end - start).num_seconds(); + } + } + } + _ => {} + } + } + + // Still billing at period end + if let Some(start) = billing_start + && period_end > start + { + total_secs += (period_end - start).num_seconds(); + } + + total_secs +} + +// ── helpers ─────────────────────────────────────────────────────────────────── + +pub fn now_ts() -> i64 { + Utc::now().timestamp() +} + +fn dt_to_ts(dt: DateTime) -> i64 { + dt.timestamp() +} + +fn ts_to_dt(ts: i64) -> Result> { + Utc.timestamp_opt(ts, 0) + .single() + .ok_or_else(|| anyhow!("invalid unix timestamp: {ts}")) +} + +fn attempt( + invoice_id: &str, + run_id: &str, + method: &str, + outcome: &str, + error: &str, +) -> InvoiceAttempt { + InvoiceAttempt { + id: Uuid::new_v4().to_string(), + invoice: invoice_id.to_string(), + run_id: run_id.to_string(), + method: method.to_string(), + outcome: outcome.to_string(), + error: error.to_string(), + created_at: now_ts(), } } diff --git a/backend/src/main.rs b/backend/src/main.rs index c8c495a..cece527 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -80,12 +80,11 @@ async fn main() -> Result<()> { } fn ensure_sqlite_dir(database_url: &str) -> Result<()> { - if let Some(path) = database_url.strip_prefix("sqlite://") { - if let Some(dir) = std::path::Path::new(path).parent() { - if !dir.as_os_str().is_empty() { - std::fs::create_dir_all(dir)?; - } - } + if let Some(path) = database_url.strip_prefix("sqlite://") + && let Some(dir) = std::path::Path::new(path).parent() + && !dir.as_os_str().is_empty() + { + std::fs::create_dir_all(dir)?; } Ok(()) } diff --git a/backend/src/models.rs b/backend/src/models.rs index 83be9d1..87b5e7f 100644 --- a/backend/src/models.rs +++ b/backend/src/models.rs @@ -14,14 +14,11 @@ pub struct RelayConfig { pub struct Tenant { pub pubkey: String, pub status: String, - pub tenant_nwc_url: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NewTenant { - pub pubkey: String, - pub status: String, - pub tenant_nwc_url: String, + pub nwc_url: String, + pub created_at: i64, + pub billing_anchor_at: i64, + pub stripe_customer_id: String, + pub stripe_subscription_id: String, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -37,24 +34,38 @@ pub struct Relay { pub config: Option, } +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct Plan { + pub id: String, + pub sats_per_month: i64, +} + +/// Append-only record of relay lifecycle transitions (provisioned, suspended, +/// unsuspended, deactivated). Used as the source of truth for usage metering. +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct RelayLifecycleEvent { + pub id: String, + pub relay: String, + pub tenant: String, + /// One of: "provisioned", "suspended", "unsuspended", "deactivated" + pub event_type: String, + /// Plan active on the relay at the time of the event + pub plan: String, + pub created_at: i64, +} + #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct Invoice { pub id: String, pub tenant: String, pub amount: i64, + /// One of: "pending", "past_due", "paid", "void" pub status: String, - pub created_at: String, - pub invoice: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NewInvoice { - pub id: String, - pub tenant: String, - pub amount: i64, - pub status: String, - pub created_at: String, - pub invoice: String, + pub created_at: i64, + /// bolt11 invoice string (may be refreshed in-app when expired) + pub bolt11: String, + pub period_start: i64, + pub period_end: i64, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] @@ -63,16 +74,22 @@ pub struct InvoiceItem { pub invoice: String, pub relay: String, pub amount: i64, - pub period_start: String, - pub period_end: String, + pub period_start: i64, + pub period_end: i64, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NewInvoiceItem { +/// Canonical history of payment attempts. `invoices.status` is a synchronous +/// projection updated in the same transaction as each new attempt row. +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct InvoiceAttempt { pub id: String, pub invoice: String, - pub relay: String, - pub amount: i64, - pub period_start: String, - pub period_end: String, + /// Groups all method attempts within a single collection run + pub run_id: String, + /// One of: "nwc", "stripe", "lightning", "nip17_dm" + pub method: String, + /// One of: "success", "failed", "sent" (for DM) + pub outcome: String, + pub error: String, + pub created_at: i64, } diff --git a/backend/src/notifications.rs b/backend/src/notifications.rs index 7a6f13d..5001458 100644 --- a/backend/src/notifications.rs +++ b/backend/src/notifications.rs @@ -63,10 +63,10 @@ impl Nip17Notifier { async fn fetch_dm_relays(&self, recipient: &str) -> Result> { let mut cache = self.cache.lock().await; - if let Some(entry) = cache.get(recipient) { - if entry.fetched_at.elapsed() < Duration::from_secs(300) { - return Ok(entry.relays.clone()); - } + if let Some(entry) = cache.get(recipient) + && entry.fetched_at.elapsed() < Duration::from_secs(300) + { + return Ok(entry.relays.clone()); } let pubkey = PublicKey::parse(recipient)?; @@ -79,12 +79,10 @@ impl Nip17Notifier { let mut relays = Vec::new(); if let Some(event) = events.into_iter().max_by_key(|event| event.created_at) { for tag in event.tags.iter() { - if let Some(first) = tag.as_slice().get(0) { - if first == "relay" { - if let Some(value) = tag.as_slice().get(1) { - relays.push(value.to_string()); - } - } + if tag.as_slice().first().is_some_and(|t| t == "relay") + && let Some(value) = tag.as_slice().get(1) + { + relays.push(value.to_string()); } } } diff --git a/backend/src/provisioning.rs b/backend/src/provisioning.rs index 688f89c..4855825 100644 --- a/backend/src/provisioning.rs +++ b/backend/src/provisioning.rs @@ -174,7 +174,6 @@ fn cfg_bool( .unwrap_or(default) } - fn generate_secret_hex() -> String { let mut bytes = [0u8; 32]; OsRng.fill_bytes(&mut bytes); diff --git a/backend/src/repo.rs b/backend/src/repo.rs index e94cf83..02461ea 100644 --- a/backend/src/repo.rs +++ b/backend/src/repo.rs @@ -2,9 +2,11 @@ use anyhow::Result; use sqlx::{Row, Sqlite, SqlitePool, Transaction}; use crate::models::{ - Invoice, InvoiceItem, NewInvoice, NewInvoiceItem, NewTenant, Relay, Tenant, + Invoice, InvoiceAttempt, InvoiceItem, Plan, Relay, RelayLifecycleEvent, Tenant, }; +// ── helpers ────────────────────────────────────────────────────────────────── + fn relay_from_row(row: sqlx::sqlite::SqliteRow) -> Relay { let config_json: Option = row.get("config"); let config = config_json.and_then(|s| serde_json::from_str(&s).ok()); @@ -21,6 +23,20 @@ fn relay_from_row(row: sqlx::sqlite::SqliteRow) -> Relay { } } +const TENANT_COLS: &str = + "pubkey, status, nwc_url, created_at, billing_anchor_at, stripe_customer_id, stripe_subscription_id"; + +const RELAY_COLS: &str = "id, tenant, name, subdomain, icon, description, plan, status, config"; + +const INVOICE_COLS: &str = + "id, tenant, amount, status, created_at, bolt11, period_start, period_end"; + +const LIFECYCLE_COLS: &str = "id, relay, tenant, event_type, plan, created_at"; + +const ATTEMPT_COLS: &str = "id, invoice, run_id, method, outcome, error, created_at"; + +// ── Repo ───────────────────────────────────────────────────────────────────── + #[derive(Clone)] pub struct Repo { pool: SqlitePool, @@ -31,32 +47,49 @@ impl Repo { Self { pool } } - pub async fn create_tenant(&self, tenant: &NewTenant) -> Result<()> { - sqlx::query("INSERT INTO tenants (pubkey, status, tenant_nwc_url) VALUES (?, ?, ?)") - .bind(&tenant.pubkey) - .bind(&tenant.status) - .bind(&tenant.tenant_nwc_url) - .execute(&self.pool) - .await?; - Ok(()) - } + // ── tenants ────────────────────────────────────────────────────────────── - pub async fn create_tenant_if_missing(&self, tenant: &NewTenant) -> Result<()> { + pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> { sqlx::query( - "INSERT OR IGNORE INTO tenants (pubkey, status, tenant_nwc_url) VALUES (?, ?, ?)", + "INSERT INTO tenants (pubkey, status, nwc_url, created_at, billing_anchor_at, + stripe_customer_id, stripe_subscription_id) + VALUES (?, ?, ?, ?, ?, ?, ?)", ) .bind(&tenant.pubkey) .bind(&tenant.status) - .bind(&tenant.tenant_nwc_url) + .bind(&tenant.nwc_url) + .bind(tenant.created_at) + .bind(tenant.billing_anchor_at) + .bind(&tenant.stripe_customer_id) + .bind(&tenant.stripe_subscription_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn create_tenant_if_missing(&self, tenant: &Tenant) -> Result<()> { + sqlx::query( + "INSERT OR IGNORE INTO tenants + (pubkey, status, nwc_url, created_at, billing_anchor_at, + stripe_customer_id, stripe_subscription_id) + VALUES (?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&tenant.pubkey) + .bind(&tenant.status) + .bind(&tenant.nwc_url) + .bind(tenant.created_at) + .bind(tenant.billing_anchor_at) + .bind(&tenant.stripe_customer_id) + .bind(&tenant.stripe_subscription_id) .execute(&self.pool) .await?; Ok(()) } pub async fn get_tenant(&self, pubkey: &str) -> Result> { - let tenant = sqlx::query_as::<_, Tenant>( - "SELECT pubkey, status, tenant_nwc_url FROM tenants WHERE pubkey = ?", - ) + let tenant = sqlx::query_as::<_, Tenant>(&format!( + "SELECT {TENANT_COLS} FROM tenants WHERE pubkey = ?" + )) .bind(pubkey) .fetch_optional(&self.pool) .await?; @@ -64,9 +97,9 @@ impl Repo { } pub async fn list_tenants(&self) -> Result> { - let tenants = sqlx::query_as::<_, Tenant>( - "SELECT pubkey, status, tenant_nwc_url FROM tenants ORDER BY pubkey", - ) + let tenants = sqlx::query_as::<_, Tenant>(&format!( + "SELECT {TENANT_COLS} FROM tenants ORDER BY pubkey" + )) .fetch_all(&self.pool) .await?; Ok(tenants) @@ -81,29 +114,80 @@ impl Repo { Ok(()) } - pub async fn update_tenant_nwc_url(&self, pubkey: &str, tenant_nwc_url: &str) -> Result<()> { - sqlx::query("UPDATE tenants SET tenant_nwc_url = ? WHERE pubkey = ?") - .bind(tenant_nwc_url) + pub async fn update_tenant_nwc_url(&self, pubkey: &str, nwc_url: &str) -> Result<()> { + sqlx::query("UPDATE tenants SET nwc_url = ? WHERE pubkey = ?") + .bind(nwc_url) .bind(pubkey) .execute(&self.pool) .await?; Ok(()) } - pub async fn upsert_relay(&self, relay: &Relay) -> Result<()> { - let config_json = relay.config.as_ref().map(serde_json::to_string).transpose()?; + #[allow(dead_code)] + pub async fn update_tenant_billing_integrations( + &self, + pubkey: &str, + nwc_url: &str, + stripe_customer_id: &str, + stripe_subscription_id: &str, + ) -> Result<()> { sqlx::query( - "INSERT INTO relays (id, tenant, name, subdomain, icon, description, plan, status, config) + "UPDATE tenants + SET nwc_url = ?, stripe_customer_id = ?, stripe_subscription_id = ? + WHERE pubkey = ?", + ) + .bind(nwc_url) + .bind(stripe_customer_id) + .bind(stripe_subscription_id) + .bind(pubkey) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Reset the billing cycle anchor. Called when a tenant adds their first + /// paid relay after having none. + pub async fn reset_billing_anchor(&self, pubkey: &str, anchor_at: i64) -> Result<()> { + sqlx::query("UPDATE tenants SET billing_anchor_at = ? WHERE pubkey = ?") + .bind(anchor_at) + .bind(pubkey) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Returns the count of non-free, non-deactivated relays for a tenant. + pub async fn count_billable_relays(&self, tenant: &str) -> Result { + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM relays + WHERE tenant = ? AND plan != 'free' AND status != 'deactivated'", + ) + .bind(tenant) + .fetch_one(&self.pool) + .await?; + Ok(count) + } + + // ── relays ──────────────────────────────────────────────────────────────── + + pub async fn upsert_relay(&self, relay: &Relay) -> Result<()> { + let config_json = relay + .config + .as_ref() + .map(serde_json::to_string) + .transpose()?; + sqlx::query(&format!( + "INSERT INTO relays ({RELAY_COLS}) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET - name = excluded.name, - subdomain = excluded.subdomain, - icon = excluded.icon, + name = excluded.name, + subdomain = excluded.subdomain, + icon = excluded.icon, description = excluded.description, - plan = excluded.plan, - status = excluded.status, - config = excluded.config", - ) + plan = excluded.plan, + status = excluded.status, + config = excluded.config" + )) .bind(&relay.id) .bind(&relay.tenant) .bind(&relay.name) @@ -127,30 +211,18 @@ impl Repo { Ok(()) } - pub async fn suspend_relays_for_tenant(&self, tenant: &str) -> Result<()> { - sqlx::query( - "UPDATE relays SET status = 'suspended' WHERE tenant = ? AND status = 'active'", - ) - .bind(tenant) - .execute(&self.pool) - .await?; - Ok(()) - } - pub async fn get_relay(&self, id: &str) -> Result> { - let row = sqlx::query( - "SELECT id, tenant, name, subdomain, icon, description, plan, status, config FROM relays WHERE id = ?", - ) - .bind(id) - .fetch_optional(&self.pool) - .await?; + let row = sqlx::query(&format!("SELECT {RELAY_COLS} FROM relays WHERE id = ?")) + .bind(id) + .fetch_optional(&self.pool) + .await?; Ok(row.map(relay_from_row)) } pub async fn list_relays_by_tenant(&self, tenant: &str) -> Result> { - let rows = sqlx::query( - "SELECT id, tenant, name, subdomain, icon, description, plan, status, config FROM relays WHERE tenant = ? ORDER BY name", - ) + let rows = sqlx::query(&format!( + "SELECT {RELAY_COLS} FROM relays WHERE tenant = ? ORDER BY name" + )) .bind(tenant) .fetch_all(&self.pool) .await?; @@ -158,34 +230,84 @@ impl Repo { } pub async fn list_relays(&self) -> Result> { - let rows = sqlx::query( - "SELECT id, tenant, name, subdomain, icon, description, plan, status, config FROM relays ORDER BY name", - ) - .fetch_all(&self.pool) - .await?; + let rows = sqlx::query(&format!("SELECT {RELAY_COLS} FROM relays ORDER BY name")) + .fetch_all(&self.pool) + .await?; Ok(rows.into_iter().map(relay_from_row).collect()) } + // ── lifecycle events ────────────────────────────────────────────────────── + + /// All lifecycle events for a tenant up to (exclusive) a given timestamp, + /// ordered by relay then time — used by the invoice generation worker. + pub async fn list_lifecycle_events_for_tenant( + &self, + tenant: &str, + before: i64, + ) -> Result> { + let rows = sqlx::query_as::<_, RelayLifecycleEvent>(&format!( + "SELECT {LIFECYCLE_COLS} FROM relay_lifecycle_events + WHERE tenant = ? AND created_at < ? + ORDER BY relay, created_at, id" + )) + .bind(tenant) + .bind(before) + .fetch_all(&self.pool) + .await?; + Ok(rows) + } + + // ── plans ───────────────────────────────────────────────────────────────── + + pub async fn list_plans(&self) -> Result> { + let plans = + sqlx::query_as::<_, Plan>("SELECT id, sats_per_month FROM plans ORDER BY sats_per_month") + .fetch_all(&self.pool) + .await?; + Ok(plans) + } + + #[allow(dead_code)] + pub async fn get_plan(&self, id: &str) -> Result> { + let plan = sqlx::query_as::<_, Plan>("SELECT id, sats_per_month FROM plans WHERE id = ?") + .bind(id) + .fetch_optional(&self.pool) + .await?; + Ok(plan) + } + + // ── invoices ────────────────────────────────────────────────────────────── + + /// Insert an invoice and its line items atomically. + /// Returns `false` (no-op) if an invoice for this tenant+period already exists. pub async fn create_invoice_with_items( &self, - invoice: &NewInvoice, - items: &[NewInvoiceItem], - ) -> Result<()> { + invoice: &Invoice, + items: &[InvoiceItem], + ) -> Result { let mut tx: Transaction<'_, Sqlite> = self.pool.begin().await?; - sqlx::query( - "INSERT INTO invoices (id, tenant, amount, status, created_at, invoice) - VALUES (?, ?, ?, ?, ?, ?)", + let result = sqlx::query( + "INSERT INTO invoices (id, tenant, amount, status, created_at, bolt11, period_start, period_end) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(tenant, period_start, period_end) DO NOTHING", ) .bind(&invoice.id) .bind(&invoice.tenant) .bind(invoice.amount) .bind(&invoice.status) - .bind(&invoice.created_at) - .bind(&invoice.invoice) + .bind(invoice.created_at) + .bind(&invoice.bolt11) + .bind(invoice.period_start) + .bind(invoice.period_end) .execute(&mut *tx) .await?; + if result.rows_affected() == 0 { + tx.rollback().await?; + return Ok(false); + } + for item in items { sqlx::query( "INSERT INTO invoice_items (id, invoice, relay, amount, period_start, period_end) @@ -195,8 +317,195 @@ impl Repo { .bind(&item.invoice) .bind(&item.relay) .bind(item.amount) - .bind(&item.period_start) - .bind(&item.period_end) + .bind(item.period_start) + .bind(item.period_end) + .execute(&mut *tx) + .await?; + } + + tx.commit().await?; + Ok(true) + } + + #[allow(dead_code)] + pub async fn get_invoice(&self, id: &str) -> Result> { + let invoice = sqlx::query_as::<_, Invoice>(&format!( + "SELECT {INVOICE_COLS} FROM invoices WHERE id = ?" + )) + .bind(id) + .fetch_optional(&self.pool) + .await?; + Ok(invoice) + } + + pub async fn list_invoices_by_tenant(&self, tenant: &str) -> Result> { + let invoices = sqlx::query_as::<_, Invoice>(&format!( + "SELECT {INVOICE_COLS} FROM invoices WHERE tenant = ? ORDER BY created_at DESC" + )) + .bind(tenant) + .fetch_all(&self.pool) + .await?; + Ok(invoices) + } + + #[allow(dead_code)] + pub async fn list_invoice_items(&self, invoice_id: &str) -> Result> { + let items = sqlx::query_as::<_, InvoiceItem>( + "SELECT id, invoice, relay, amount, period_start, period_end + FROM invoice_items WHERE invoice = ?", + ) + .bind(invoice_id) + .fetch_all(&self.pool) + .await?; + Ok(items) + } + + // ── invoice attempts ────────────────────────────────────────────────────── + + /// Record a payment attempt and synchronously project the new invoice status. + pub async fn record_attempt( + &self, + attempt: &InvoiceAttempt, + projected_invoice_status: &str, + ) -> Result<()> { + let mut tx: Transaction<'_, Sqlite> = self.pool.begin().await?; + + sqlx::query(&format!( + "INSERT INTO invoice_attempts ({ATTEMPT_COLS}) VALUES (?, ?, ?, ?, ?, ?, ?)" + )) + .bind(&attempt.id) + .bind(&attempt.invoice) + .bind(&attempt.run_id) + .bind(&attempt.method) + .bind(&attempt.outcome) + .bind(&attempt.error) + .bind(attempt.created_at) + .execute(&mut *tx) + .await?; + + sqlx::query("UPDATE invoices SET status = ? WHERE id = ?") + .bind(projected_invoice_status) + .bind(&attempt.invoice) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn list_attempts_for_invoice(&self, invoice_id: &str) -> Result> { + let attempts = sqlx::query_as::<_, InvoiceAttempt>(&format!( + "SELECT {ATTEMPT_COLS} FROM invoice_attempts + WHERE invoice = ? + ORDER BY created_at ASC, id ASC" + )) + .bind(invoice_id) + .fetch_all(&self.pool) + .await?; + Ok(attempts) + } + + /// Returns true if a DM has already been sent for this invoice (to enforce + /// the one-DM-per-invoice rule). + pub async fn invoice_dm_sent(&self, invoice_id: &str) -> Result { + let count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM invoice_attempts + WHERE invoice = ? AND method = 'nip17_dm' AND outcome = 'sent'", + ) + .bind(invoice_id) + .fetch_one(&self.pool) + .await?; + Ok(count > 0) + } + + // ── relay lifecycle + status: transactional helpers ─────────────────────── + + /// Transition a relay's status and write the corresponding lifecycle event + /// atomically. No-ops if the relay is already in `new_status`. + pub async fn transition_relay( + &self, + relay_id: &str, + tenant_pubkey: &str, + plan: &str, + new_status: &str, + event_type: &str, + now: i64, + ) -> Result { + let mut tx: Transaction<'_, Sqlite> = self.pool.begin().await?; + + let current: Option = sqlx::query_scalar("SELECT status FROM relays WHERE id = ?") + .bind(relay_id) + .fetch_optional(&mut *tx) + .await?; + + let current_status = match current { + Some(s) => s, + None => return Ok(false), + }; + + if current_status == new_status { + return Ok(true); // idempotent no-op + } + + sqlx::query("UPDATE relays SET status = ? WHERE id = ?") + .bind(new_status) + .bind(relay_id) + .execute(&mut *tx) + .await?; + + let event = RelayLifecycleEvent { + id: uuid::Uuid::new_v4().to_string(), + relay: relay_id.to_string(), + tenant: tenant_pubkey.to_string(), + event_type: event_type.to_string(), + plan: plan.to_string(), + created_at: now, + }; + + sqlx::query(&format!( + "INSERT INTO relay_lifecycle_events ({LIFECYCLE_COLS}) VALUES (?, ?, ?, ?, ?, ?)" + )) + .bind(&event.id) + .bind(&event.relay) + .bind(&event.tenant) + .bind(&event.event_type) + .bind(&event.plan) + .bind(event.created_at) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(true) + } + + /// Suspend all active relays for a tenant, writing lifecycle events for each. + pub async fn suspend_relays_for_tenant(&self, tenant: &str, now: i64) -> Result<()> { + let mut tx: Transaction<'_, Sqlite> = self.pool.begin().await?; + + let rows = + sqlx::query("SELECT id, plan FROM relays WHERE tenant = ? AND status = 'active'") + .bind(tenant) + .fetch_all(&mut *tx) + .await?; + + for row in rows { + let relay_id: String = row.get("id"); + let plan: String = row.get("plan"); + + sqlx::query("UPDATE relays SET status = 'suspended' WHERE id = ?") + .bind(&relay_id) + .execute(&mut *tx) + .await?; + + sqlx::query(&format!( + "INSERT INTO relay_lifecycle_events ({LIFECYCLE_COLS}) VALUES (?, ?, ?, ?, ?, ?)" + )) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(&relay_id) + .bind(tenant) + .bind("suspended") + .bind(plan) + .bind(now) .execute(&mut *tx) .await?; } @@ -205,32 +514,40 @@ impl Repo { Ok(()) } - pub async fn list_invoices_by_tenant(&self, tenant: &str) -> Result> { - let invoices = sqlx::query_as::<_, Invoice>( - "SELECT id, tenant, amount, status, created_at, invoice FROM invoices WHERE tenant = ? ORDER BY created_at DESC", - ) - .bind(tenant) - .fetch_all(&self.pool) - .await?; - Ok(invoices) - } + /// Reactivate all billing-suspended relays for a tenant (used after full + /// balance payment). + pub async fn reactivate_relays_for_tenant(&self, tenant: &str, now: i64) -> Result<()> { + let mut tx: Transaction<'_, Sqlite> = self.pool.begin().await?; - pub async fn list_invoice_items(&self, invoice_id: &str) -> Result> { - let items = sqlx::query_as::<_, InvoiceItem>( - "SELECT id, invoice, relay, amount, period_start, period_end FROM invoice_items WHERE invoice = ?", - ) - .bind(invoice_id) - .fetch_all(&self.pool) - .await?; - Ok(items) - } + let rows = + sqlx::query("SELECT id, plan FROM relays WHERE tenant = ? AND status = 'suspended'") + .bind(tenant) + .fetch_all(&mut *tx) + .await?; - pub async fn update_invoice_status(&self, id: &str, status: &str) -> Result<()> { - sqlx::query("UPDATE invoices SET status = ? WHERE id = ?") - .bind(status) - .bind(id) - .execute(&self.pool) + for row in rows { + let relay_id: String = row.get("id"); + let plan: String = row.get("plan"); + + sqlx::query("UPDATE relays SET status = 'active' WHERE id = ?") + .bind(&relay_id) + .execute(&mut *tx) + .await?; + + sqlx::query(&format!( + "INSERT INTO relay_lifecycle_events ({LIFECYCLE_COLS}) VALUES (?, ?, ?, ?, ?, ?)" + )) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(&relay_id) + .bind(tenant) + .bind("unsuspended") + .bind(plan) + .bind(now) + .execute(&mut *tx) .await?; + } + + tx.commit().await?; Ok(()) } } diff --git a/justfile b/justfile index 2ca7edc..0035973 100644 --- a/justfile +++ b/justfile @@ -1,6 +1,23 @@ dev: - #!/usr/bin/env sh - trap 'kill 0' EXIT - cd backend && onchange src -ik -- bash -c 'RUST_LOG=backend=info cargo run' & - cd frontend && bun dev & - wait + #!/usr/bin/env sh + trap 'kill 0' EXIT + cd backend && onchange src -ik -- bash -c 'RUST_LOG=backend=info cargo run' & + cd frontend && bun dev & + wait + +fmt-backend: + cd backend && cargo fmt + +fmt: fmt-backend + +lint-backend: + cd backend && cargo clippy -- -D warnings + +lint: lint-backend + +build-backend: + cd backend && cargo build + +build: build-backend + +check: fmt lint build