From b796665e314750d629919390b364f8ee1dc6cbf2 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Thu, 26 Mar 2026 08:17:40 -0700 Subject: [PATCH] Add tenant to activity table, include resource type --- backend/migrations/0001_init.sql | 4 +- backend/spec/models.md | 4 +- backend/spec/repo.md | 5 +- backend/src/billing.rs | 4 +- backend/src/infra.rs | 12 +++-- backend/src/models.rs | 4 +- backend/src/repo.rs | 78 +++++++++++++++++--------------- 7 files changed, 63 insertions(+), 48 deletions(-) diff --git a/backend/migrations/0001_init.sql b/backend/migrations/0001_init.sql index 87e5f2b..2acf311 100644 --- a/backend/migrations/0001_init.sql +++ b/backend/migrations/0001_init.sql @@ -1,8 +1,10 @@ CREATE TABLE IF NOT EXISTS activity ( id TEXT PRIMARY KEY, + tenant TEXT NOT NULL, created_at INTEGER NOT NULL, activity_type TEXT NOT NULL, - identifier TEXT NOT NULL + resource_type TEXT NOT NULL, + resource_id TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS tenant ( diff --git a/backend/spec/models.md b/backend/spec/models.md index e4e7758..b561840 100644 --- a/backend/spec/models.md +++ b/backend/spec/models.md @@ -8,6 +8,7 @@ This file describes the domain model. This description should be translated into Activity is an audit log of all actions performed by a user or a worker process. This allows us to trace history to create invoices, synchronize actions to external services, and debug system behavior. - `id` - a random activity ID +- `tenant` - a tenant ID - `created_at` - unix timestamp when the activity was created - `activity_type` is one of: - `create_tenant` @@ -23,7 +24,8 @@ Activity is an audit log of all actions performed by a user or a worker process. - `mark_invoice_attempted` - `mark_invoice_sent` - `mark_invoice_closed` -- `identifier` is a string identifying the resource being modified. This id in interpreted depending on what the `activity_type` is. +- `resource_type` is a string identifying the resource type being modified. +- `resource_id` is a string identifying the resource id being modified. # Tenant diff --git a/backend/spec/repo.md b/backend/spec/repo.md index 6484c2b..e2a3d85 100644 --- a/backend/spec/repo.md +++ b/backend/spec/repo.md @@ -9,7 +9,7 @@ Members: Notes: - All public write methods should be run in a transaction so they're atomic -- All writes should be accompanied by an activity log entry of `(activity_type, identifier)` +- All writes should be accompanied by an activity log entry of `(tenant, activity_type, resource_type, resource_id)` - Database table names are singular: `activity`, `tenant`, `relay`, `invoice`, `invoice_item` ## `pub fn new() -> Self` @@ -19,9 +19,10 @@ Notes: - Initializes its sqlx `pool` - Runs migrations found in the `migrations` directory. -## `fn insert_activity(activity_type, identifier) -> Result<()>` +## `fn insert_activity(activity_type, resource_type, resource_id) -> Result<()>` - Private helper that inserts one row into `activity` +- Infers `tenant` from `resource_type` and `resource_id` - Used by write methods to avoid repeating audit-log SQL ## `pub fn list_tenants(&self) -> Result>` diff --git a/backend/src/billing.rs b/backend/src/billing.rs index d7ff3b8..fcf67f0 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -59,7 +59,7 @@ impl Billing { } async fn maybe_reset_anchor_for_first_paid_relay(&self, activity: &Activity) -> Result<()> { - let relay = match self.repo.get_relay(&activity.identifier).await? { + let relay = match self.repo.get_relay(&activity.resource_id).await? { Some(r) => r, None => return Ok(()), }; @@ -319,7 +319,7 @@ fn relay_active_hours_in_window( let mut marks: HashMap<&str, Vec<&Activity>> = HashMap::new(); for event in events { - if event.identifier == relay.id { + if event.resource_type == "relay" && event.resource_id == relay.id { marks.entry(&relay.id).or_default().push(event); } } diff --git a/backend/src/infra.rs b/backend/src/infra.rs index 58d4c89..1cda24f 100644 --- a/backend/src/infra.rs +++ b/backend/src/infra.rs @@ -48,11 +48,13 @@ impl Infra { let activity = self.repo.list_activity(&since, None).await?; for a in activity { - if matches!( - a.activity_type.as_str(), - "create_relay" | "update_relay" | "deactivate_relay" - ) { - let Some(relay) = self.repo.get_relay(&a.identifier).await? else { + if a.resource_type == "relay" + && matches!( + a.activity_type.as_str(), + "create_relay" | "update_relay" | "deactivate_relay" + ) + { + let Some(relay) = self.repo.get_relay(&a.resource_id).await? else { continue; }; diff --git a/backend/src/models.rs b/backend/src/models.rs index a47ef9c..34c2e94 100644 --- a/backend/src/models.rs +++ b/backend/src/models.rs @@ -3,9 +3,11 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct Activity { pub id: String, + pub tenant: String, pub created_at: i64, pub activity_type: String, - pub identifier: String, + pub resource_type: String, + pub resource_id: String, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] diff --git a/backend/src/repo.rs b/backend/src/repo.rs index c2ad246..93de9cb 100644 --- a/backend/src/repo.rs +++ b/backend/src/repo.rs @@ -41,15 +41,35 @@ impl Repo { async fn insert_activity( tx: &mut Transaction<'_, Sqlite>, activity_type: &str, - identifier: &str, + resource_type: &str, + resource_id: &str, ) -> Result<()> { + let tenant = match resource_type { + "tenant" => resource_id.to_string(), + "relay" => { + sqlx::query_scalar::<_, String>("SELECT tenant FROM relay WHERE id = ?") + .bind(resource_id) + .fetch_one(&mut **tx) + .await? + } + "invoice" => { + sqlx::query_scalar::<_, String>("SELECT tenant FROM invoice WHERE id = ?") + .bind(resource_id) + .fetch_one(&mut **tx) + .await? + } + _ => anyhow::bail!("unknown resource_type: {}", resource_type), + }; + sqlx::query( - "INSERT INTO activity (id, created_at, activity_type, identifier) - VALUES (?, strftime('%s','now'), ?, ?)", + "INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id) + VALUES (?, ?, strftime('%s','now'), ?, ?, ?)", ) .bind(uuid::Uuid::new_v4().to_string()) + .bind(tenant) .bind(activity_type) - .bind(identifier) + .bind(resource_type) + .bind(resource_id) .execute(&mut **tx) .await?; Ok(()) @@ -92,7 +112,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "create_tenant", &tenant.pubkey).await?; + Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?; tx.commit().await?; Ok(()) @@ -107,7 +127,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "update_tenant_billing_anchor", pubkey).await?; + Self::insert_activity(&mut tx, "update_tenant_billing_anchor", "tenant", pubkey).await?; tx.commit().await?; Ok(()) @@ -122,7 +142,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "update_tenant_nwc_url", pubkey).await?; + Self::insert_activity(&mut tx, "update_tenant_nwc_url", "tenant", pubkey).await?; tx.commit().await?; Ok(()) @@ -206,7 +226,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "create_relay", &relay.id).await?; + Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?; tx.commit().await?; Ok(()) @@ -244,7 +264,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "update_relay", &relay.id).await?; + Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?; tx.commit().await?; Ok(()) @@ -258,7 +278,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "deactivate_relay", &relay.id).await?; + Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?; tx.commit().await?; Ok(()) @@ -272,7 +292,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "activate_relay", &relay.id).await?; + Self::insert_activity(&mut tx, "activate_relay", "relay", &relay.id).await?; tx.commit().await?; Ok(()) @@ -287,7 +307,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "fail_relay_sync", &relay.id).await?; + Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?; tx.commit().await?; Ok(()) @@ -327,7 +347,7 @@ impl Repo { .await?; } - Self::insert_activity(&mut tx, "create_invoice", &invoice.id).await?; + Self::insert_activity(&mut tx, "create_invoice", "invoice", &invoice.id).await?; tx.commit().await?; Ok(()) @@ -370,7 +390,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "mark_invoice_paid", invoice_id).await?; + Self::insert_activity(&mut tx, "mark_invoice_paid", "invoice", invoice_id).await?; tx.commit().await?; Ok(()) @@ -389,7 +409,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "mark_invoice_attempted", invoice_id).await?; + Self::insert_activity(&mut tx, "mark_invoice_attempted", "invoice", invoice_id).await?; tx.commit().await?; Ok(()) @@ -403,7 +423,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "mark_invoice_sent", invoice_id).await?; + Self::insert_activity(&mut tx, "mark_invoice_sent", "invoice", invoice_id).await?; tx.commit().await?; Ok(()) @@ -421,7 +441,7 @@ impl Repo { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "mark_invoice_closed", invoice_id).await?; + Self::insert_activity(&mut tx, "mark_invoice_closed", "invoice", invoice_id).await?; tx.commit().await?; Ok(()) @@ -430,32 +450,18 @@ impl Repo { pub async fn list_activity(&self, since: &i64, tenant: Option<&str>) -> Result> { let rows = if let Some(tenant_pubkey) = tenant { sqlx::query_as::<_, Activity>( - "SELECT a.id, a.created_at, a.activity_type, a.identifier - FROM activity a - WHERE a.created_at > ? - AND ( - a.activity_type IN ('create_tenant', 'update_tenant_billing_anchor') - AND a.identifier = ? - OR EXISTS ( - SELECT 1 FROM relay r - WHERE r.id = a.identifier AND r.tenant = ? - ) - OR EXISTS ( - SELECT 1 FROM invoice i - WHERE i.id = a.identifier AND i.tenant = ? - ) - ) - ORDER BY a.created_at, a.id", + "SELECT id, tenant, created_at, activity_type, resource_type, resource_id + FROM activity + WHERE created_at > ? AND tenant = ? + ORDER BY created_at, id", ) .bind(since) .bind(tenant_pubkey) - .bind(tenant_pubkey) - .bind(tenant_pubkey) .fetch_all(&self.pool) .await? } else { sqlx::query_as::<_, Activity>( - "SELECT id, created_at, activity_type, identifier + "SELECT id, tenant, created_at, activity_type, resource_type, resource_id FROM activity WHERE created_at > ? ORDER BY created_at, id",