From f7bd3e53fe1bda3c0af54ae8a25c8a15e08cbe78 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 28 May 2026 15:53:02 -0700 Subject: [PATCH] Add snapshots to activity --- backend/migrations/0001_init.sql | 2 +- backend/src/billing.rs | 87 +++++++--------------- backend/src/command.rs | 119 ++++++++++++++++--------------- backend/src/infra.rs | 2 +- backend/src/models.rs | 23 +++++- backend/src/query.rs | 41 +++++------ backend/src/routes/plans.rs | 6 +- backend/src/routes/relays.rs | 5 +- 8 files changed, 133 insertions(+), 152 deletions(-) diff --git a/backend/migrations/0001_init.sql b/backend/migrations/0001_init.sql index 97f400a..e2a2f05 100644 --- a/backend/migrations/0001_init.sql +++ b/backend/migrations/0001_init.sql @@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS activity ( resource_type TEXT NOT NULL, resource_id TEXT NOT NULL, billed_at INTEGER, - plan_id TEXT, + snapshot TEXT NOT NULL, FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey) ); diff --git a/backend/src/billing.rs b/backend/src/billing.rs index c3c34bc..72b55c1 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -1,11 +1,10 @@ use anyhow::{Result, anyhow}; -use std::collections::HashMap; use std::time::Duration; use crate::bitcoin; use crate::command; use crate::env; -use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, Tenant}; +use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, RELAY_STATUS_ACTIVE, Snapshot, Tenant}; use crate::query; use crate::robot::Robot; use crate::stripe::Stripe; @@ -141,9 +140,7 @@ impl Billing { let Some(relay) = query::get_relay(&activity.resource_id).await? else { return Err(anyhow!("activity resource was not a valid relay")); }; - let Some(plan) = query::get_plan(&relay.plan_id) else { - return Err(anyhow!("activity plan was not a valid plan")); - }; + let plan = query::get_plan(&relay.plan_id)?; if plan.amount <= 0 { return Ok(None); } @@ -175,23 +172,19 @@ impl Billing { tenant: &Tenant, activity: &Activity, ) -> Result> { - let Some(new_plan_id) = activity.plan_id.as_deref() else { - return Err(anyhow!("activity plan was not a valid plan")); + let new_plan_id = match &*activity.snapshot { + Snapshot::Relay { plan, .. } => plan, }; let Some(old_plan_id) = query::get_relay_plan_before(&activity.resource_id, activity.created_at).await? else { return Err(anyhow!("no previous plan found for relay update activity")); }; - if old_plan_id == new_plan_id { + if &old_plan_id == new_plan_id { return Ok(None); } - let Some(new_plan) = query::get_plan(new_plan_id) else { - return Err(anyhow!("new plan is an invalid plan")); - }; - let Some(old_plan) = query::get_plan(&old_plan_id) else { - return Err(anyhow!("old plan is an invalid plan")); - }; + let new_plan = query::get_plan(new_plan_id)?; + let old_plan = query::get_plan(&old_plan_id)?; let period = BillingPeriod::at(tenant, activity.created_at) .ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?; @@ -216,23 +209,27 @@ impl Billing { } /// Charge a full-period renewal for every relay that was active on a paid plan - /// as of `period.start`, reconstructing that state from the activity log - /// (status from create/activate/deactivate, plan from create/update). - /// Idempotent per period via the tenant's `renewed_at` marker, so calling it - /// on every generation can't renew twice; a relay created/activated *within* - /// the period isn't active before the boundary, so it's covered by its own - /// prorated charge instead. + /// as of `period.start`, reading that state from each relay's most recent + /// activity snapshot before the boundary (relays with no prior activity didn't + /// exist yet and are skipped). Idempotent per period via the tenant's + /// `renewed_at` marker, so calling it on every generation can't renew twice; + /// a relay created/activated *within* the period isn't active before the + /// boundary, so it's covered by its own prorated charge instead. async fn reconcile_renewal(&self, tenant: &Tenant, period: &BillingPeriod) -> Result<()> { - let activities = query::list_relay_activity_before(&tenant.pubkey, period.start).await?; + let relays = query::list_relays_for_tenant(&tenant.pubkey).await?; let mut line_items = Vec::new(); - for (relay_id, state) in relay_states(&activities) { - if !state.active { - continue; - } - let Some(plan) = state.plan_id.and_then(|id| query::get_plan(&id)) else { + for relay in relays { + let Some(activity) = + query::get_latest_relay_activity_before(&relay.id, period.start).await? + else { continue; }; + let Snapshot::Relay { plan: plan_id, status, .. } = &*activity.snapshot; + if status != RELAY_STATUS_ACTIVE { + continue; + } + let plan = query::get_plan(plan_id)?; if plan.amount <= 0 { continue; } @@ -241,7 +238,7 @@ impl Billing { invoice_id: None, activity_id: None, tenant_pubkey: tenant.pubkey.clone(), - relay_id, + relay_id: relay.id, plan_id: plan.id, amount: plan.amount, description: "Subscription renewal".to_string(), @@ -476,42 +473,6 @@ impl BillingPeriod { } } -/// A relay's billing-relevant state at a point in time, reconstructed by folding -/// its activity log. -#[derive(Default)] -struct RelayState { - active: bool, - plan_id: Option, -} - -/// Fold relay activities (which must be oldest-first) into each relay's -/// `(active, plan)` state. `create`/`activate`/`deactivate` drive status; -/// `create`/`update` carry the plan via `plan_id`. Feed it activities up to a -/// cutoff to get each relay's state as of that moment (e.g. the period boundary). -fn relay_states(activities: &[Activity]) -> HashMap { - let mut states: HashMap = HashMap::new(); - - for activity in activities { - let state = states.entry(activity.resource_id.clone()).or_default(); - match activity.activity_type.as_str() { - "create_relay" => { - state.active = true; - state.plan_id = activity.plan_id.clone(); - } - "update_relay" => { - if activity.plan_id.is_some() { - state.plan_id = activity.plan_id.clone(); - } - } - "activate_relay" => state.active = true, - "deactivate_relay" => state.active = false, - _ => {} - } - } - - states -} - fn summarize_error_message(error: &str) -> Option { let normalized = error.split_whitespace().collect::>().join(" "); if normalized.is_empty() { diff --git a/backend/src/command.rs b/backend/src/command.rs index be35fbb..baa090c 100644 --- a/backend/src/command.rs +++ b/backend/src/command.rs @@ -1,45 +1,36 @@ use anyhow::Result; +use sqlx::types::Json; use sqlx::{Sqlite, Transaction}; use crate::billing::BillingPeriod; use crate::db::{pool, publish, with_tx}; use crate::models::{ Activity, Bolt11, Invoice, InvoiceItem, RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, - RELAY_STATUS_INACTIVE, Relay, Tenant, + RELAY_STATUS_INACTIVE, Relay, Snapshot, Tenant, }; // --- Tenants --- pub async fn create_tenant(tenant: &Tenant) -> Result<()> { - let activity = with_tx(async |tx| { - sqlx::query( - "INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id) - VALUES (?, ?, ?, ?)", - ) - .bind(&tenant.pubkey) - .bind(&tenant.nwc_url) - .bind(tenant.created_at) - .bind(&tenant.stripe_customer_id) - .execute(&mut **tx) - .await?; - insert_activity_tx(tx, "create_tenant", "tenant", &tenant.pubkey, None).await - }) + sqlx::query( + "INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id) + VALUES (?, ?, ?, ?)", + ) + .bind(&tenant.pubkey) + .bind(&tenant.nwc_url) + .bind(tenant.created_at) + .bind(&tenant.stripe_customer_id) + .execute(pool()) .await?; - publish(activity); Ok(()) } pub async fn update_tenant(tenant: &Tenant) -> Result<()> { - let activity = with_tx(async |tx| { - sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?") - .bind(&tenant.nwc_url) - .bind(&tenant.pubkey) - .execute(&mut **tx) - .await?; - insert_activity_tx(tx, "update_tenant", "tenant", &tenant.pubkey, None).await - }) - .await?; - publish(activity); + sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?") + .bind(&tenant.nwc_url) + .bind(&tenant.pubkey) + .execute(pool()) + .await?; Ok(()) } @@ -90,7 +81,11 @@ pub async fn create_relay(relay: &Relay) -> Result<()> { .bind(relay.push_enabled) .execute(&mut **tx) .await?; - insert_activity_tx(tx, "create_relay", "relay", &relay.id, Some(&relay.plan_id)).await + let snapshot = Snapshot::Relay { + plan: relay.plan_id.clone(), + status: RELAY_STATUS_ACTIVE.to_string(), + }; + insert_activity_tx(tx, "create_relay", &relay.id, snapshot).await }) .await?; publish(activity); @@ -126,7 +121,11 @@ pub async fn update_relay(relay: &Relay) -> Result<()> { .bind(&relay.id) .execute(&mut **tx) .await?; - insert_activity_tx(tx, "update_relay", "relay", &relay.id, Some(&relay.plan_id)).await + let snapshot = Snapshot::Relay { + plan: relay.plan_id.clone(), + status: relay.status.clone(), + }; + insert_activity_tx(tx, "update_relay", &relay.id, snapshot).await }) .await?; publish(activity); @@ -134,26 +133,30 @@ pub async fn update_relay(relay: &Relay) -> Result<()> { } pub async fn activate_relay(relay: &Relay) -> Result<()> { - set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay").await + set_relay_status(relay, RELAY_STATUS_ACTIVE, "activate_relay").await } pub async fn deactivate_relay(relay: &Relay) -> Result<()> { - set_relay_status(&relay.id, RELAY_STATUS_INACTIVE, "deactivate_relay").await + set_relay_status(relay, RELAY_STATUS_INACTIVE, "deactivate_relay").await } #[allow(dead_code)] // wired up by the delinquency flow (not yet implemented) pub async fn mark_relay_delinquent(relay: &Relay) -> Result<()> { - set_relay_status(&relay.id, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent").await + set_relay_status(relay, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent").await } -async fn set_relay_status(relay_id: &str, status: &str, activity_type: &str) -> Result<()> { +async fn set_relay_status(relay: &Relay, status: &str, activity_type: &str) -> Result<()> { let activity = with_tx(async |tx| { sqlx::query("UPDATE relay SET status = ?, synced = 0 WHERE id = ?") .bind(status) - .bind(relay_id) + .bind(&relay.id) .execute(&mut **tx) .await?; - insert_activity_tx(tx, activity_type, "relay", relay_id, None).await + let snapshot = Snapshot::Relay { + plan: relay.plan_id.clone(), + status: status.to_string(), + }; + insert_activity_tx(tx, activity_type, &relay.id, snapshot).await }) .await?; publish(activity); @@ -167,20 +170,28 @@ pub async fn fail_relay_sync(relay: &Relay, sync_error: String) -> Result<()> { .bind(&relay.id) .execute(&mut **tx) .await?; - insert_activity_tx(tx, "fail_relay_sync", "relay", &relay.id, None).await + let snapshot = Snapshot::Relay { + plan: relay.plan_id.clone(), + status: relay.status.clone(), + }; + insert_activity_tx(tx, "fail_relay_sync", &relay.id, snapshot).await }) .await?; publish(activity); Ok(()) } -pub async fn complete_relay_sync(relay_id: &str) -> Result<()> { +pub async fn complete_relay_sync(relay: &Relay) -> Result<()> { let activity = with_tx(async |tx| { sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?") - .bind(relay_id) + .bind(&relay.id) .execute(&mut **tx) .await?; - insert_activity_tx(tx, "complete_relay_sync", "relay", relay_id, None).await + let snapshot = Snapshot::Relay { + plan: relay.plan_id.clone(), + status: relay.status.clone(), + }; + insert_activity_tx(tx, "complete_relay_sync", &relay.id, snapshot).await }) .await?; publish(activity); @@ -297,17 +308,12 @@ pub async fn create_invoice(tenant: &Tenant, period: &BillingPeriod) -> Result Result<()> { let updated_at = chrono::Utc::now().timestamp(); - let activity = with_tx(async |tx| { - sqlx::query("UPDATE invoice SET status = 'paid', method = ?, updated_at = ? WHERE id = ?") - .bind(method) - .bind(updated_at) - .bind(invoice_id) - .execute(&mut **tx) - .await?; - insert_activity_tx(tx, "invoice_paid", "invoice", invoice_id, None).await - }) - .await?; - publish(activity); + sqlx::query("UPDATE invoice SET status = 'paid', method = ?, updated_at = ? WHERE id = ?") + .bind(method) + .bind(updated_at) + .bind(invoice_id) + .execute(pool()) + .await?; Ok(()) } @@ -372,26 +378,25 @@ pub async fn insert_intent(intent_id: &str, invoice_id: &str) -> Result<()> { async fn insert_activity_tx( tx: &mut Transaction<'_, Sqlite>, activity_type: &str, - resource_type: &str, resource_id: &str, - plan_id: Option<&str>, + snapshot: Snapshot, ) -> Result { - let tenant_pubkey = match resource_type { - "tenant" => resource_id.to_string(), - "relay" => { + let resource_type = snapshot.resource_type(); + let tenant_pubkey = match &snapshot { + Snapshot::Relay { .. } => { sqlx::query_scalar::<_, String>("SELECT tenant_pubkey FROM relay WHERE id = ?") .bind(resource_id) .fetch_one(&mut **tx) .await? } - _ => anyhow::bail!("unknown resource_type: {resource_type}"), }; let id = uuid::Uuid::new_v4().to_string(); let created_at = chrono::Utc::now().timestamp(); + let snapshot = Json(snapshot); sqlx::query( - "INSERT INTO activity (id, tenant_pubkey, created_at, activity_type, resource_type, resource_id, plan_id) + "INSERT INTO activity (id, tenant_pubkey, created_at, activity_type, resource_type, resource_id, snapshot) VALUES (?, ?, ?, ?, ?, ?, ?)", ) .bind(&id) @@ -400,7 +405,7 @@ async fn insert_activity_tx( .bind(activity_type) .bind(resource_type) .bind(resource_id) - .bind(plan_id) + .bind(&snapshot) .execute(&mut **tx) .await?; @@ -412,7 +417,7 @@ async fn insert_activity_tx( resource_type: resource_type.to_string(), resource_id: resource_id.to_string(), billed_at: None, - plan_id: plan_id.map(str::to_string), + snapshot, }) } diff --git a/backend/src/infra.rs b/backend/src/infra.rs index f79d762..d534362 100644 --- a/backend/src/infra.rs +++ b/backend/src/infra.rs @@ -148,7 +148,7 @@ async fn sync_relay(relay: &Relay) { match try_sync_relay(relay).await { Ok(()) => { tracing::info!(relay = %relay.id, "relay sync succeeded"); - if let Err(e) = command::complete_relay_sync(&relay.id).await { + if let Err(e) = command::complete_relay_sync(relay).await { tracing::error!(relay = %relay.id, error = %e, "failed to mark sync complete"); } } diff --git a/backend/src/models.rs b/backend/src/models.rs index 6e6f19f..00cbd55 100644 --- a/backend/src/models.rs +++ b/backend/src/models.rs @@ -1,9 +1,28 @@ use serde::{Deserialize, Serialize}; +use sqlx::types::Json; pub const RELAY_STATUS_ACTIVE: &str = "active"; pub const RELAY_STATUS_INACTIVE: &str = "inactive"; pub const RELAY_STATUS_DELINQUENT: &str = "delinquent"; +/// Per-resource_type snapshot of a resource's state captured on each activity, +/// stored as JSON in `activity.snapshot`. Tagged on `resource_type` so the JSON +/// is self-describing and the variant matches the activity row's column. Add a +/// variant per resource type that needs state preserved on the activity log. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "resource_type", rename_all = "snake_case")] +pub enum Snapshot { + Relay { plan: String, status: String }, +} + +impl Snapshot { + pub fn resource_type(&self) -> &'static str { + match self { + Self::Relay { .. } => "relay", + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Plan { pub id: String, @@ -36,9 +55,7 @@ pub struct Activity { pub resource_type: String, pub resource_id: String, pub billed_at: Option, - /// The relay's plan at the time of a `create_relay`/`update_relay` activity; - /// `None` for all other activity types. - pub plan_id: Option, + pub snapshot: Json, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] diff --git a/backend/src/query.rs b/backend/src/query.rs index 8262b66..fd961af 100644 --- a/backend/src/query.rs +++ b/backend/src/query.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{Result, anyhow}; use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, Plan, Relay, Tenant}; use crate::db::pool; @@ -46,8 +46,11 @@ pub fn list_plans() -> Vec { ] } -pub fn get_plan(plan_id: &str) -> Option { - list_plans().into_iter().find(|p| p.id == plan_id) +pub fn get_plan(plan_id: &str) -> Result { + list_plans() + .into_iter() + .find(|p| p.id == plan_id) + .ok_or_else(|| anyhow!("plan not found: {plan_id}")) } // --- Tenants --- @@ -95,15 +98,14 @@ pub async fn get_relay(id: &str) -> Result> { .await?) } -/// The relay's plan immediately before `before`, read from the activity log -/// (the most recent `create_relay`/`update_relay` with `created_at < before`). -/// Billing uses this as the `old` side of a plan-change delta. +/// The relay's plan immediately before `before`, read from the most recent +/// relay-activity snapshot with `created_at < before`. Billing uses this as +/// the `old` side of a plan-change delta. pub async fn get_relay_plan_before(relay_id: &str, before: i64) -> Result> { Ok(sqlx::query_scalar::<_, String>( - "SELECT plan_id FROM activity + "SELECT json_extract(snapshot, '$.plan') FROM activity WHERE resource_id = ? AND resource_type = 'relay' - AND plan_id IS NOT NULL AND created_at < ? ORDER BY created_at DESC LIMIT 1", @@ -188,26 +190,25 @@ pub async fn list_billable_activity_for_tenant(tenant_pubkey: &str) -> Result Result> { +) -> Result> { Ok(sqlx::query_as::<_, Activity>(&select_activity( - "WHERE tenant_pubkey = ? + "WHERE resource_id = ? AND resource_type = 'relay' - AND activity_type IN ( - 'create_relay', 'update_relay', 'activate_relay', 'deactivate_relay' - ) AND created_at < ? - ORDER BY created_at ASC", + ORDER BY created_at DESC + LIMIT 1", )) - .bind(tenant_pubkey) + .bind(relay_id) .bind(before) - .fetch_all(pool()) + .fetch_optional(pool()) .await?) } diff --git a/backend/src/routes/plans.rs b/backend/src/routes/plans.rs index a5d9a6e..0461b46 100644 --- a/backend/src/routes/plans.rs +++ b/backend/src/routes/plans.rs @@ -11,8 +11,6 @@ pub async fn list_plans(State(_api): State>) -> ApiResult { } pub async fn get_plan(State(_api): State>, Path(id): Path) -> ApiResult { - match query::get_plan(&id) { - Some(plan) => ok(plan), - None => Err(not_found("plan not found")), - } + let plan = query::get_plan(&id).map_err(|_| not_found("plan not found"))?; + ok(plan) } diff --git a/backend/src/routes/relays.rs b/backend/src/routes/relays.rs index 1c66e6e..ad032fb 100644 --- a/backend/src/routes/relays.rs +++ b/backend/src/routes/relays.rs @@ -194,8 +194,7 @@ pub async fn update_relay( .is_some_and(|requested| requested != current_plan); if plan_changed { - let selected_plan = - query::get_plan(&relay.plan_id).expect("validated plan must exist"); + let selected_plan = query::get_plan(&relay.plan_id).map_err(internal)?; if let Some(limit) = selected_plan.members { let current_members = fetch_relay_members(&relay) .await @@ -288,7 +287,7 @@ fn prepare_relay(mut relay: Relay) -> Result { } let plan = query::get_plan(&relay.plan_id) - .ok_or_else(|| unprocessable("invalid-plan", "plan not found"))?; + .map_err(|_| unprocessable("invalid-plan", "plan not found"))?; if (!plan.blossom && relay.blossom_enabled == 1) || (!plan.livekit && relay.livekit_enabled == 1) { return Err(unprocessable("premium-feature", "feature requires a paid plan"));