Add snapshots to activity

This commit is contained in:
Jon Staab
2026-05-28 15:53:02 -07:00
parent eb0123abef
commit f7bd3e53fe
8 changed files with 133 additions and 152 deletions
+24 -63
View File
@@ -1,11 +1,10 @@
use anyhow::{Result, anyhow};
use std::collections::HashMap;
use std::time::Duration;
use crate::bitcoin;
use crate::command;
use crate::env;
use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, Tenant};
use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, RELAY_STATUS_ACTIVE, Snapshot, Tenant};
use crate::query;
use crate::robot::Robot;
use crate::stripe::Stripe;
@@ -141,9 +140,7 @@ impl Billing {
let Some(relay) = query::get_relay(&activity.resource_id).await? else {
return Err(anyhow!("activity resource was not a valid relay"));
};
let Some(plan) = query::get_plan(&relay.plan_id) else {
return Err(anyhow!("activity plan was not a valid plan"));
};
let plan = query::get_plan(&relay.plan_id)?;
if plan.amount <= 0 {
return Ok(None);
}
@@ -175,23 +172,19 @@ impl Billing {
tenant: &Tenant,
activity: &Activity,
) -> Result<Option<InvoiceItem>> {
let Some(new_plan_id) = activity.plan_id.as_deref() else {
return Err(anyhow!("activity plan was not a valid plan"));
let new_plan_id = match &*activity.snapshot {
Snapshot::Relay { plan, .. } => plan,
};
let Some(old_plan_id) =
query::get_relay_plan_before(&activity.resource_id, activity.created_at).await?
else {
return Err(anyhow!("no previous plan found for relay update activity"));
};
if old_plan_id == new_plan_id {
if &old_plan_id == new_plan_id {
return Ok(None);
}
let Some(new_plan) = query::get_plan(new_plan_id) else {
return Err(anyhow!("new plan is an invalid plan"));
};
let Some(old_plan) = query::get_plan(&old_plan_id) else {
return Err(anyhow!("old plan is an invalid plan"));
};
let new_plan = query::get_plan(new_plan_id)?;
let old_plan = query::get_plan(&old_plan_id)?;
let period = BillingPeriod::at(tenant, activity.created_at)
.ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?;
@@ -216,23 +209,27 @@ impl Billing {
}
/// Charge a full-period renewal for every relay that was active on a paid plan
/// as of `period.start`, reconstructing that state from the activity log
/// (status from create/activate/deactivate, plan from create/update).
/// Idempotent per period via the tenant's `renewed_at` marker, so calling it
/// on every generation can't renew twice; a relay created/activated *within*
/// the period isn't active before the boundary, so it's covered by its own
/// prorated charge instead.
/// as of `period.start`, reading that state from each relay's most recent
/// activity snapshot before the boundary (relays with no prior activity didn't
/// exist yet and are skipped). Idempotent per period via the tenant's
/// `renewed_at` marker, so calling it on every generation can't renew twice;
/// a relay created/activated *within* the period isn't active before the
/// boundary, so it's covered by its own prorated charge instead.
async fn reconcile_renewal(&self, tenant: &Tenant, period: &BillingPeriod) -> Result<()> {
let activities = query::list_relay_activity_before(&tenant.pubkey, period.start).await?;
let relays = query::list_relays_for_tenant(&tenant.pubkey).await?;
let mut line_items = Vec::new();
for (relay_id, state) in relay_states(&activities) {
if !state.active {
continue;
}
let Some(plan) = state.plan_id.and_then(|id| query::get_plan(&id)) else {
for relay in relays {
let Some(activity) =
query::get_latest_relay_activity_before(&relay.id, period.start).await?
else {
continue;
};
let Snapshot::Relay { plan: plan_id, status, .. } = &*activity.snapshot;
if status != RELAY_STATUS_ACTIVE {
continue;
}
let plan = query::get_plan(plan_id)?;
if plan.amount <= 0 {
continue;
}
@@ -241,7 +238,7 @@ impl Billing {
invoice_id: None,
activity_id: None,
tenant_pubkey: tenant.pubkey.clone(),
relay_id,
relay_id: relay.id,
plan_id: plan.id,
amount: plan.amount,
description: "Subscription renewal".to_string(),
@@ -476,42 +473,6 @@ impl BillingPeriod {
}
}
/// A relay's billing-relevant state at a point in time, reconstructed by folding
/// its activity log.
#[derive(Default)]
struct RelayState {
active: bool,
plan_id: Option<String>,
}
/// Fold relay activities (which must be oldest-first) into each relay's
/// `(active, plan)` state. `create`/`activate`/`deactivate` drive status;
/// `create`/`update` carry the plan via `plan_id`. Feed it activities up to a
/// cutoff to get each relay's state as of that moment (e.g. the period boundary).
fn relay_states(activities: &[Activity]) -> HashMap<String, RelayState> {
let mut states: HashMap<String, RelayState> = HashMap::new();
for activity in activities {
let state = states.entry(activity.resource_id.clone()).or_default();
match activity.activity_type.as_str() {
"create_relay" => {
state.active = true;
state.plan_id = activity.plan_id.clone();
}
"update_relay" => {
if activity.plan_id.is_some() {
state.plan_id = activity.plan_id.clone();
}
}
"activate_relay" => state.active = true,
"deactivate_relay" => state.active = false,
_ => {}
}
}
states
}
fn summarize_error_message(error: &str) -> Option<String> {
let normalized = error.split_whitespace().collect::<Vec<_>>().join(" ");
if normalized.is_empty() {
+62 -57
View File
@@ -1,45 +1,36 @@
use anyhow::Result;
use sqlx::types::Json;
use sqlx::{Sqlite, Transaction};
use crate::billing::BillingPeriod;
use crate::db::{pool, publish, with_tx};
use crate::models::{
Activity, Bolt11, Invoice, InvoiceItem, RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT,
RELAY_STATUS_INACTIVE, Relay, Tenant,
RELAY_STATUS_INACTIVE, Relay, Snapshot, Tenant,
};
// --- Tenants ---
pub async fn create_tenant(tenant: &Tenant) -> Result<()> {
let activity = with_tx(async |tx| {
sqlx::query(
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id)
VALUES (?, ?, ?, ?)",
)
.bind(&tenant.pubkey)
.bind(&tenant.nwc_url)
.bind(tenant.created_at)
.bind(&tenant.stripe_customer_id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "create_tenant", "tenant", &tenant.pubkey, None).await
})
sqlx::query(
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id)
VALUES (?, ?, ?, ?)",
)
.bind(&tenant.pubkey)
.bind(&tenant.nwc_url)
.bind(tenant.created_at)
.bind(&tenant.stripe_customer_id)
.execute(pool())
.await?;
publish(activity);
Ok(())
}
pub async fn update_tenant(tenant: &Tenant) -> Result<()> {
let activity = with_tx(async |tx| {
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
.bind(&tenant.nwc_url)
.bind(&tenant.pubkey)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "update_tenant", "tenant", &tenant.pubkey, None).await
})
.await?;
publish(activity);
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
.bind(&tenant.nwc_url)
.bind(&tenant.pubkey)
.execute(pool())
.await?;
Ok(())
}
@@ -90,7 +81,11 @@ pub async fn create_relay(relay: &Relay) -> Result<()> {
.bind(relay.push_enabled)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "create_relay", "relay", &relay.id, Some(&relay.plan_id)).await
let snapshot = Snapshot::Relay {
plan: relay.plan_id.clone(),
status: RELAY_STATUS_ACTIVE.to_string(),
};
insert_activity_tx(tx, "create_relay", &relay.id, snapshot).await
})
.await?;
publish(activity);
@@ -126,7 +121,11 @@ pub async fn update_relay(relay: &Relay) -> Result<()> {
.bind(&relay.id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "update_relay", "relay", &relay.id, Some(&relay.plan_id)).await
let snapshot = Snapshot::Relay {
plan: relay.plan_id.clone(),
status: relay.status.clone(),
};
insert_activity_tx(tx, "update_relay", &relay.id, snapshot).await
})
.await?;
publish(activity);
@@ -134,26 +133,30 @@ pub async fn update_relay(relay: &Relay) -> Result<()> {
}
pub async fn activate_relay(relay: &Relay) -> Result<()> {
set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay").await
set_relay_status(relay, RELAY_STATUS_ACTIVE, "activate_relay").await
}
pub async fn deactivate_relay(relay: &Relay) -> Result<()> {
set_relay_status(&relay.id, RELAY_STATUS_INACTIVE, "deactivate_relay").await
set_relay_status(relay, RELAY_STATUS_INACTIVE, "deactivate_relay").await
}
#[allow(dead_code)] // wired up by the delinquency flow (not yet implemented)
pub async fn mark_relay_delinquent(relay: &Relay) -> Result<()> {
set_relay_status(&relay.id, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent").await
set_relay_status(relay, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent").await
}
async fn set_relay_status(relay_id: &str, status: &str, activity_type: &str) -> Result<()> {
async fn set_relay_status(relay: &Relay, status: &str, activity_type: &str) -> Result<()> {
let activity = with_tx(async |tx| {
sqlx::query("UPDATE relay SET status = ?, synced = 0 WHERE id = ?")
.bind(status)
.bind(relay_id)
.bind(&relay.id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, activity_type, "relay", relay_id, None).await
let snapshot = Snapshot::Relay {
plan: relay.plan_id.clone(),
status: status.to_string(),
};
insert_activity_tx(tx, activity_type, &relay.id, snapshot).await
})
.await?;
publish(activity);
@@ -167,20 +170,28 @@ pub async fn fail_relay_sync(relay: &Relay, sync_error: String) -> Result<()> {
.bind(&relay.id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "fail_relay_sync", "relay", &relay.id, None).await
let snapshot = Snapshot::Relay {
plan: relay.plan_id.clone(),
status: relay.status.clone(),
};
insert_activity_tx(tx, "fail_relay_sync", &relay.id, snapshot).await
})
.await?;
publish(activity);
Ok(())
}
pub async fn complete_relay_sync(relay_id: &str) -> Result<()> {
pub async fn complete_relay_sync(relay: &Relay) -> Result<()> {
let activity = with_tx(async |tx| {
sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?")
.bind(relay_id)
.bind(&relay.id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "complete_relay_sync", "relay", relay_id, None).await
let snapshot = Snapshot::Relay {
plan: relay.plan_id.clone(),
status: relay.status.clone(),
};
insert_activity_tx(tx, "complete_relay_sync", &relay.id, snapshot).await
})
.await?;
publish(activity);
@@ -297,17 +308,12 @@ pub async fn create_invoice(tenant: &Tenant, period: &BillingPeriod) -> Result<O
pub async fn mark_invoice_paid(invoice_id: &str, method: &str) -> Result<()> {
let updated_at = chrono::Utc::now().timestamp();
let activity = with_tx(async |tx| {
sqlx::query("UPDATE invoice SET status = 'paid', method = ?, updated_at = ? WHERE id = ?")
.bind(method)
.bind(updated_at)
.bind(invoice_id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "invoice_paid", "invoice", invoice_id, None).await
})
.await?;
publish(activity);
sqlx::query("UPDATE invoice SET status = 'paid', method = ?, updated_at = ? WHERE id = ?")
.bind(method)
.bind(updated_at)
.bind(invoice_id)
.execute(pool())
.await?;
Ok(())
}
@@ -372,26 +378,25 @@ pub async fn insert_intent(intent_id: &str, invoice_id: &str) -> Result<()> {
async fn insert_activity_tx(
tx: &mut Transaction<'_, Sqlite>,
activity_type: &str,
resource_type: &str,
resource_id: &str,
plan_id: Option<&str>,
snapshot: Snapshot,
) -> Result<Activity> {
let tenant_pubkey = match resource_type {
"tenant" => resource_id.to_string(),
"relay" => {
let resource_type = snapshot.resource_type();
let tenant_pubkey = match &snapshot {
Snapshot::Relay { .. } => {
sqlx::query_scalar::<_, String>("SELECT tenant_pubkey FROM relay WHERE id = ?")
.bind(resource_id)
.fetch_one(&mut **tx)
.await?
}
_ => anyhow::bail!("unknown resource_type: {resource_type}"),
};
let id = uuid::Uuid::new_v4().to_string();
let created_at = chrono::Utc::now().timestamp();
let snapshot = Json(snapshot);
sqlx::query(
"INSERT INTO activity (id, tenant_pubkey, created_at, activity_type, resource_type, resource_id, plan_id)
"INSERT INTO activity (id, tenant_pubkey, created_at, activity_type, resource_type, resource_id, snapshot)
VALUES (?, ?, ?, ?, ?, ?, ?)",
)
.bind(&id)
@@ -400,7 +405,7 @@ async fn insert_activity_tx(
.bind(activity_type)
.bind(resource_type)
.bind(resource_id)
.bind(plan_id)
.bind(&snapshot)
.execute(&mut **tx)
.await?;
@@ -412,7 +417,7 @@ async fn insert_activity_tx(
resource_type: resource_type.to_string(),
resource_id: resource_id.to_string(),
billed_at: None,
plan_id: plan_id.map(str::to_string),
snapshot,
})
}
+1 -1
View File
@@ -148,7 +148,7 @@ async fn sync_relay(relay: &Relay) {
match try_sync_relay(relay).await {
Ok(()) => {
tracing::info!(relay = %relay.id, "relay sync succeeded");
if let Err(e) = command::complete_relay_sync(&relay.id).await {
if let Err(e) = command::complete_relay_sync(relay).await {
tracing::error!(relay = %relay.id, error = %e, "failed to mark sync complete");
}
}
+20 -3
View File
@@ -1,9 +1,28 @@
use serde::{Deserialize, Serialize};
use sqlx::types::Json;
pub const RELAY_STATUS_ACTIVE: &str = "active";
pub const RELAY_STATUS_INACTIVE: &str = "inactive";
pub const RELAY_STATUS_DELINQUENT: &str = "delinquent";
/// Per-resource_type snapshot of a resource's state captured on each activity,
/// stored as JSON in `activity.snapshot`. Tagged on `resource_type` so the JSON
/// is self-describing and the variant matches the activity row's column. Add a
/// variant per resource type that needs state preserved on the activity log.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "resource_type", rename_all = "snake_case")]
pub enum Snapshot {
Relay { plan: String, status: String },
}
impl Snapshot {
pub fn resource_type(&self) -> &'static str {
match self {
Self::Relay { .. } => "relay",
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Plan {
pub id: String,
@@ -36,9 +55,7 @@ pub struct Activity {
pub resource_type: String,
pub resource_id: String,
pub billed_at: Option<i64>,
/// The relay's plan at the time of a `create_relay`/`update_relay` activity;
/// `None` for all other activity types.
pub plan_id: Option<String>,
pub snapshot: Json<Snapshot>,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
+21 -20
View File
@@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{Result, anyhow};
use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, Plan, Relay, Tenant};
use crate::db::pool;
@@ -46,8 +46,11 @@ pub fn list_plans() -> Vec<Plan> {
]
}
pub fn get_plan(plan_id: &str) -> Option<Plan> {
list_plans().into_iter().find(|p| p.id == plan_id)
pub fn get_plan(plan_id: &str) -> Result<Plan> {
list_plans()
.into_iter()
.find(|p| p.id == plan_id)
.ok_or_else(|| anyhow!("plan not found: {plan_id}"))
}
// --- Tenants ---
@@ -95,15 +98,14 @@ pub async fn get_relay(id: &str) -> Result<Option<Relay>> {
.await?)
}
/// The relay's plan immediately before `before`, read from the activity log
/// (the most recent `create_relay`/`update_relay` with `created_at < before`).
/// Billing uses this as the `old` side of a plan-change delta.
/// The relay's plan immediately before `before`, read from the most recent
/// relay-activity snapshot with `created_at < before`. Billing uses this as
/// the `old` side of a plan-change delta.
pub async fn get_relay_plan_before(relay_id: &str, before: i64) -> Result<Option<String>> {
Ok(sqlx::query_scalar::<_, String>(
"SELECT plan_id FROM activity
"SELECT json_extract(snapshot, '$.plan') FROM activity
WHERE resource_id = ?
AND resource_type = 'relay'
AND plan_id IS NOT NULL
AND created_at < ?
ORDER BY created_at DESC
LIMIT 1",
@@ -188,26 +190,25 @@ pub async fn list_billable_activity_for_tenant(tenant_pubkey: &str) -> Result<Ve
.await?)
}
/// A tenant's relay status/plan activity strictly before `before`, oldest-first
/// — folded by billing to reconstruct each relay's state as of a period boundary.
/// The relay's most recent activity strictly before `before`, or `None` if it
/// had no activity yet — i.e. the relay didn't exist at that point. Billing
/// reads its snapshot to recover the relay's state as of a period boundary.
/// Strict `<` so a relay created exactly at the boundary isn't counted active
/// there (its own creation charge covers that period).
pub async fn list_relay_activity_before(
tenant_pubkey: &str,
pub async fn get_latest_relay_activity_before(
relay_id: &str,
before: i64,
) -> Result<Vec<Activity>> {
) -> Result<Option<Activity>> {
Ok(sqlx::query_as::<_, Activity>(&select_activity(
"WHERE tenant_pubkey = ?
"WHERE resource_id = ?
AND resource_type = 'relay'
AND activity_type IN (
'create_relay', 'update_relay', 'activate_relay', 'deactivate_relay'
)
AND created_at < ?
ORDER BY created_at ASC",
ORDER BY created_at DESC
LIMIT 1",
))
.bind(tenant_pubkey)
.bind(relay_id)
.bind(before)
.fetch_all(pool())
.fetch_optional(pool())
.await?)
}
+2 -4
View File
@@ -11,8 +11,6 @@ pub async fn list_plans(State(_api): State<Arc<Api>>) -> ApiResult {
}
pub async fn get_plan(State(_api): State<Arc<Api>>, Path(id): Path<String>) -> ApiResult {
match query::get_plan(&id) {
Some(plan) => ok(plan),
None => Err(not_found("plan not found")),
}
let plan = query::get_plan(&id).map_err(|_| not_found("plan not found"))?;
ok(plan)
}
+2 -3
View File
@@ -194,8 +194,7 @@ pub async fn update_relay(
.is_some_and(|requested| requested != current_plan);
if plan_changed {
let selected_plan =
query::get_plan(&relay.plan_id).expect("validated plan must exist");
let selected_plan = query::get_plan(&relay.plan_id).map_err(internal)?;
if let Some(limit) = selected_plan.members {
let current_members = fetch_relay_members(&relay)
.await
@@ -288,7 +287,7 @@ fn prepare_relay(mut relay: Relay) -> Result<Relay, ApiError> {
}
let plan = query::get_plan(&relay.plan_id)
.ok_or_else(|| unprocessable("invalid-plan", "plan not found"))?;
.map_err(|_| unprocessable("invalid-plan", "plan not found"))?;
if (!plan.blossom && relay.blossom_enabled == 1) || (!plan.livekit && relay.livekit_enabled == 1) {
return Err(unprocessable("premium-feature", "feature requires a paid plan"));