Add tenant to activity table, include resource type

This commit is contained in:
Jon Staab
2026-03-26 08:17:40 -07:00
parent 28e564e795
commit b796665e31
7 changed files with 63 additions and 48 deletions
+3 -1
View File
@@ -1,8 +1,10 @@
CREATE TABLE IF NOT EXISTS activity (
id TEXT PRIMARY KEY,
tenant TEXT NOT NULL,
created_at INTEGER NOT NULL,
activity_type TEXT NOT NULL,
identifier TEXT NOT NULL
resource_type TEXT NOT NULL,
resource_id TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS tenant (
+3 -1
View File
@@ -8,6 +8,7 @@ This file describes the domain model. This description should be translated into
Activity is an audit log of all actions performed by a user or a worker process. This allows us to trace history to create invoices, synchronize actions to external services, and debug system behavior.
- `id` - a random activity ID
- `tenant` - a tenant ID
- `created_at` - unix timestamp when the activity was created
- `activity_type` is one of:
- `create_tenant`
@@ -23,7 +24,8 @@ Activity is an audit log of all actions performed by a user or a worker process.
- `mark_invoice_attempted`
- `mark_invoice_sent`
- `mark_invoice_closed`
- `identifier` is a string identifying the resource being modified. This id in interpreted depending on what the `activity_type` is.
- `resource_type` is a string identifying the resource type being modified.
- `resource_id` is a string identifying the resource id being modified.
# Tenant
+3 -2
View File
@@ -9,7 +9,7 @@ Members:
Notes:
- All public write methods should be run in a transaction so they're atomic
- All writes should be accompanied by an activity log entry of `(activity_type, identifier)`
- All writes should be accompanied by an activity log entry of `(tenant, activity_type, resource_type, resource_id)`
- Database table names are singular: `activity`, `tenant`, `relay`, `invoice`, `invoice_item`
## `pub fn new() -> Self`
@@ -19,9 +19,10 @@ Notes:
- Initializes its sqlx `pool`
- Runs migrations found in the `migrations` directory.
## `fn insert_activity(activity_type, identifier) -> Result<()>`
## `fn insert_activity(activity_type, resource_type, resource_id) -> Result<()>`
- Private helper that inserts one row into `activity`
- Infers `tenant` from `resource_type` and `resource_id`
- Used by write methods to avoid repeating audit-log SQL
## `pub fn list_tenants(&self) -> Result<Vec<Tenant>>`
+2 -2
View File
@@ -59,7 +59,7 @@ impl Billing {
}
async fn maybe_reset_anchor_for_first_paid_relay(&self, activity: &Activity) -> Result<()> {
let relay = match self.repo.get_relay(&activity.identifier).await? {
let relay = match self.repo.get_relay(&activity.resource_id).await? {
Some(r) => r,
None => return Ok(()),
};
@@ -319,7 +319,7 @@ fn relay_active_hours_in_window(
let mut marks: HashMap<&str, Vec<&Activity>> = HashMap::new();
for event in events {
if event.identifier == relay.id {
if event.resource_type == "relay" && event.resource_id == relay.id {
marks.entry(&relay.id).or_default().push(event);
}
}
+7 -5
View File
@@ -48,11 +48,13 @@ impl Infra {
let activity = self.repo.list_activity(&since, None).await?;
for a in activity {
if matches!(
a.activity_type.as_str(),
"create_relay" | "update_relay" | "deactivate_relay"
) {
let Some(relay) = self.repo.get_relay(&a.identifier).await? else {
if a.resource_type == "relay"
&& matches!(
a.activity_type.as_str(),
"create_relay" | "update_relay" | "deactivate_relay"
)
{
let Some(relay) = self.repo.get_relay(&a.resource_id).await? else {
continue;
};
+3 -1
View File
@@ -3,9 +3,11 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Activity {
pub id: String,
pub tenant: String,
pub created_at: i64,
pub activity_type: String,
pub identifier: String,
pub resource_type: String,
pub resource_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
+42 -36
View File
@@ -41,15 +41,35 @@ impl Repo {
async fn insert_activity(
tx: &mut Transaction<'_, Sqlite>,
activity_type: &str,
identifier: &str,
resource_type: &str,
resource_id: &str,
) -> Result<()> {
let tenant = match resource_type {
"tenant" => resource_id.to_string(),
"relay" => {
sqlx::query_scalar::<_, String>("SELECT tenant FROM relay WHERE id = ?")
.bind(resource_id)
.fetch_one(&mut **tx)
.await?
}
"invoice" => {
sqlx::query_scalar::<_, String>("SELECT tenant FROM invoice WHERE id = ?")
.bind(resource_id)
.fetch_one(&mut **tx)
.await?
}
_ => anyhow::bail!("unknown resource_type: {}", resource_type),
};
sqlx::query(
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), ?, ?)",
"INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id)
VALUES (?, ?, strftime('%s','now'), ?, ?, ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(tenant)
.bind(activity_type)
.bind(identifier)
.bind(resource_type)
.bind(resource_id)
.execute(&mut **tx)
.await?;
Ok(())
@@ -92,7 +112,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "create_tenant", &tenant.pubkey).await?;
Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?;
tx.commit().await?;
Ok(())
@@ -107,7 +127,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "update_tenant_billing_anchor", pubkey).await?;
Self::insert_activity(&mut tx, "update_tenant_billing_anchor", "tenant", pubkey).await?;
tx.commit().await?;
Ok(())
@@ -122,7 +142,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "update_tenant_nwc_url", pubkey).await?;
Self::insert_activity(&mut tx, "update_tenant_nwc_url", "tenant", pubkey).await?;
tx.commit().await?;
Ok(())
@@ -206,7 +226,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "create_relay", &relay.id).await?;
Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
@@ -244,7 +264,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "update_relay", &relay.id).await?;
Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
@@ -258,7 +278,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "deactivate_relay", &relay.id).await?;
Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
@@ -272,7 +292,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "activate_relay", &relay.id).await?;
Self::insert_activity(&mut tx, "activate_relay", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
@@ -287,7 +307,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "fail_relay_sync", &relay.id).await?;
Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
@@ -327,7 +347,7 @@ impl Repo {
.await?;
}
Self::insert_activity(&mut tx, "create_invoice", &invoice.id).await?;
Self::insert_activity(&mut tx, "create_invoice", "invoice", &invoice.id).await?;
tx.commit().await?;
Ok(())
@@ -370,7 +390,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "mark_invoice_paid", invoice_id).await?;
Self::insert_activity(&mut tx, "mark_invoice_paid", "invoice", invoice_id).await?;
tx.commit().await?;
Ok(())
@@ -389,7 +409,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "mark_invoice_attempted", invoice_id).await?;
Self::insert_activity(&mut tx, "mark_invoice_attempted", "invoice", invoice_id).await?;
tx.commit().await?;
Ok(())
@@ -403,7 +423,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "mark_invoice_sent", invoice_id).await?;
Self::insert_activity(&mut tx, "mark_invoice_sent", "invoice", invoice_id).await?;
tx.commit().await?;
Ok(())
@@ -421,7 +441,7 @@ impl Repo {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "mark_invoice_closed", invoice_id).await?;
Self::insert_activity(&mut tx, "mark_invoice_closed", "invoice", invoice_id).await?;
tx.commit().await?;
Ok(())
@@ -430,32 +450,18 @@ impl Repo {
pub async fn list_activity(&self, since: &i64, tenant: Option<&str>) -> Result<Vec<Activity>> {
let rows = if let Some(tenant_pubkey) = tenant {
sqlx::query_as::<_, Activity>(
"SELECT a.id, a.created_at, a.activity_type, a.identifier
FROM activity a
WHERE a.created_at > ?
AND (
a.activity_type IN ('create_tenant', 'update_tenant_billing_anchor')
AND a.identifier = ?
OR EXISTS (
SELECT 1 FROM relay r
WHERE r.id = a.identifier AND r.tenant = ?
)
OR EXISTS (
SELECT 1 FROM invoice i
WHERE i.id = a.identifier AND i.tenant = ?
)
)
ORDER BY a.created_at, a.id",
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
FROM activity
WHERE created_at > ? AND tenant = ?
ORDER BY created_at, id",
)
.bind(since)
.bind(tenant_pubkey)
.bind(tenant_pubkey)
.bind(tenant_pubkey)
.fetch_all(&self.pool)
.await?
} else {
sqlx::query_as::<_, Activity>(
"SELECT id, created_at, activity_type, identifier
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
FROM activity
WHERE created_at > ?
ORDER BY created_at, id",