Review pass

This commit is contained in:
Jon Staab
2026-03-25 17:01:52 -07:00
parent 6f407fd681
commit 28e564e795
10 changed files with 70 additions and 154 deletions
+1 -1
View File
@@ -20,7 +20,7 @@ Calls `self.tick` in a loop every hour.
Iterates over `repo.list_activity` since last run and does the following: Iterates over `repo.list_activity` since last run and does the following:
- For any `relay_created|relay_updated|relay_activated` activity if this is the first non-free relay for the tenant, update tenant's billing anchor to the time the relay was created. - For any `create_relay|update_relay|activate_relay` activity if this is the first non-free relay for the tenant, update tenant's billing anchor to the time the relay was created.
Also iterates over `repo.list_tenants()` and for each tenant calls `self.generate_invoice_if_due(tenant)` and `self.collect_outstanding(tenant)`. Also iterates over `repo.list_tenants()` and for each tenant calls `self.generate_invoice_if_due(tenant)` and `self.collect_outstanding(tenant)`.
+2 -2
View File
@@ -19,6 +19,6 @@ Calls `self.tick` in a loop every 10 seconds.
Iterates over `repo.list_activity` since last run and does the following: Iterates over `repo.list_activity` since last run and does the following:
- For any `relay_created|relay_updated` activity, sync relay config to zooid. - For any `create_relay|update_relay` activity, sync relay config to zooid.
- For any `relay_deactivated` activity, sync relay config to zooid. - For any `deactivate_relay` activity, sync relay config to zooid.
- If unsuccessful, call `repo.fail_relay_sync`. - If unsuccessful, call `repo.fail_relay_sync`.
-1
View File
@@ -2,7 +2,6 @@
- Configures logging - Configures logging
- Creates instances of `Repo`, `Robot`, `Billing`, `Api`, and `Infra` - Creates instances of `Repo`, `Robot`, `Billing`, `Api`, and `Infra`
- Calls `repo.migrate`
- Spawns `billing.start` - Spawns `billing.start`
- Spawns `infra.start` - Spawns `infra.start`
- Calls `api.serve` - Calls `api.serve`
+17 -13
View File
@@ -1,5 +1,8 @@
This file describes the domain model. This description should be translated into standard structs and sqlite schemas in a way that makes sense. This file describes the domain model. This description should be translated into standard structs and sqlite schemas in a way that makes sense.
- Fields marked as private should use `#[serde(skip_serializing)]` in their definition.
- Fields marked as readonly should use `#[serde(skip_deserializing)]` in their definition.
# Activity # Activity
Activity is an audit log of all actions performed by a user or a worker process. This allows us to trace history to create invoices, synchronize actions to external services, and debug system behavior. Activity is an audit log of all actions performed by a user or a worker process. This allows us to trace history to create invoices, synchronize actions to external services, and debug system behavior.
@@ -7,18 +10,19 @@ Activity is an audit log of all actions performed by a user or a worker process.
- `id` - a random activity ID - `id` - a random activity ID
- `created_at` - unix timestamp when the activity was created - `created_at` - unix timestamp when the activity was created
- `activity_type` is one of: - `activity_type` is one of:
- `tenant_created` - `create_tenant`
- `tenant_billing_anchor_updated` - `update_tenant_billing_anchor`
- `relay_created` - `update_tenant_nwc_url`
- `relay_updated` - `create_relay`
- `relay_activated` - `update_relay`
- `relay_deactivated` - `activate_relay`
- `relay_sync_failed` - `deactivate_relay`
- `invoice_created` - `fail_relay_sync`
- `invoice_paid` - `create_invoice`
- `invoice_attempted` - `mark_invoice_paid`
- `invoice_sent` - `mark_invoice_attempted`
- `invoice_closed` - `mark_invoice_sent`
- `mark_invoice_closed`
- `identifier` is a string identifying the resource being modified. This id in interpreted depending on what the `activity_type` is. - `identifier` is a string identifying the resource being modified. This id in interpreted depending on what the `activity_type` is.
# Tenant # Tenant
@@ -26,7 +30,7 @@ Activity is an audit log of all actions performed by a user or a worker process.
Tenants are customers of the service, identified by a nostr `pubkey`. Public metadata like name etc are pulled from the nostr network. They also have associated billing information. Tenants are customers of the service, identified by a nostr `pubkey`. Public metadata like name etc are pulled from the nostr network. They also have associated billing information.
- `pubkey` is the nostr public key identifying the tenant - `pubkey` is the nostr public key identifying the tenant
- `nwc_url` a nostr wallet connect URL used for **paying** invoices generated by the system - `nwc_url` (private) a nostr wallet connect URL used for **paying** invoices generated by the system
- `created_at` unix timestamp identifying tenant creation time - `created_at` unix timestamp identifying tenant creation time
- `billing_anchor` unix timestamp identifying billing cycle anchor. This gets reset when the tenant has no paid relays and adds (or reactivates) one. - `billing_anchor` unix timestamp identifying billing cycle anchor. This gets reset when the tenant has no paid relays and adds (or reactivates) one.
+22 -15
View File
@@ -17,11 +17,13 @@ Notes:
- Reads `DATABASE_URL` from environment - Reads `DATABASE_URL` from environment
- Ensures that any directories referred to in `DATABASE_URL` exist - Ensures that any directories referred to in `DATABASE_URL` exist
- Initializes its sqlx `pool` - Initializes its sqlx `pool`
## `pub fn migrate(&self) -> Result<()>`
- Runs migrations found in the `migrations` directory. - Runs migrations found in the `migrations` directory.
## `fn insert_activity(activity_type, identifier) -> Result<()>`
- Private helper that inserts one row into `activity`
- Used by write methods to avoid repeating audit-log SQL
## `pub fn list_tenants(&self) -> Result<Vec<Tenant>>` ## `pub fn list_tenants(&self) -> Result<Vec<Tenant>>`
- Returns all tenants - Returns all tenants
@@ -33,12 +35,17 @@ Notes:
## `pub fn create_tenant(&self, tenant: &Tenant) -> Result<()>` ## `pub fn create_tenant(&self, tenant: &Tenant) -> Result<()>`
- Creates tenant, may throw sqlite uniqueness error on pubkey - Creates tenant, may throw sqlite uniqueness error on pubkey
- Logs activity as `(tenant_created, tenant_id)` - Logs activity as `(create_tenant, tenant_id)`
## `pub fn update_tenant_billing_anchor(&self, pubkey: &str, billing_anchor: i64) -> Result<()>` ## `pub fn update_tenant_billing_anchor(&self, pubkey: &str, billing_anchor: i64) -> Result<()>`
- Updates the tenant's `billing_anchor` - Updates the tenant's `billing_anchor`
- Logs activity as `(tenant_billing_anchor_updated, tenant_id)` - Logs activity as `(update_tenant_billing_anchor, tenant_id)`
## `pub fn update_tenant_nwc_url(&self, pubkey: &str, nwc_url: &str) -> Result<()>`
- Updates tenant `nwc_url`
- Logs activity as `(update_tenant_nwc_url, tenant_id)`
## `pub fn list_relays(&self, tenant_id: Option<&str>) -> Result<Vec<Relay>>` ## `pub fn list_relays(&self, tenant_id: Option<&str>) -> Result<Vec<Relay>>`
@@ -52,32 +59,32 @@ Notes:
- Creates relay, may throw sqlite uniqueness error on subdomain - Creates relay, may throw sqlite uniqueness error on subdomain
- Sets relay status to `new` - Sets relay status to `new`
- Logs activity as `(relay_created, relay_id)` - Logs activity as `(create_relay, relay_id)`
## `pub fn update_relay(&self, relay: &Relay) -> Result<()>` ## `pub fn update_relay(&self, relay: &Relay) -> Result<()>`
- Updates relay, may throw sqlite uniqueness error on subdomain - Updates relay, may throw sqlite uniqueness error on subdomain
- Logs activity as `(relay_updated, relay_id)` - Logs activity as `(update_relay, relay_id)`
## `pub fn deactivate_relay(&self, relay: &Relay) -> Result<()>` ## `pub fn deactivate_relay(&self, relay: &Relay) -> Result<()>`
- Sets relay status to `inactive` - Sets relay status to `inactive`
- Logs activity as `(relay_deactivated, relay_id)` - Logs activity as `(deactivate_relay, relay_id)`
## `pub fn activate_relay(&self, relay: &Relay) -> Result<()>` ## `pub fn activate_relay(&self, relay: &Relay) -> Result<()>`
- Sets relay status to `active` - Sets relay status to `active`
- Logs activity as `(relay_activated, relay_id)` - Logs activity as `(activate_relay, relay_id)`
## `pub fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()>` ## `pub fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()>`
- Sets relay status to `inactive`, sets `sync_error` - Sets relay status to `inactive`, sets `sync_error`
- Logs activity as `(relay_sync_failed, relay_id)` - Logs activity as `(fail_relay_sync, relay_id)`
## `pub fn create_invoice(&self, invoice: &Invoice, invoice_items: [&InvoiceItem]) -> Result<()>` ## `pub fn create_invoice(&self, invoice: &Invoice, invoice_items: [&InvoiceItem]) -> Result<()>`
- Saves an `invoice` row and related `invoice_item` rows - Saves an `invoice` row and related `invoice_item` rows
- Logs activity as `(invoice_created, invoice_id)` - Logs activity as `(create_invoice, invoice_id)`
## `pub fn list_invoices(tenant_id: Option<&str>) -> Result<Vec<Invoice>>` ## `pub fn list_invoices(tenant_id: Option<&str>) -> Result<Vec<Invoice>>`
@@ -88,26 +95,26 @@ Notes:
- Sets invoice status to `paid` - Sets invoice status to `paid`
- Sets `paid_at` to now - Sets `paid_at` to now
- Clears `error` if set - Clears `error` if set
- Logs activity as `(invoice_paid, invoice_id)` - Logs activity as `(mark_invoice_paid, invoice_id)`
## `pub fn mark_invoice_attempted(&self, invoice_id: &str, error: Option<&str>) -> Result<()>` ## `pub fn mark_invoice_attempted(&self, invoice_id: &str, error: Option<&str>) -> Result<()>`
- Sets `attempted_at` to now - Sets `attempted_at` to now
- Updates `error` if provided - Updates `error` if provided
- Leaves status as `pending` - Leaves status as `pending`
- Logs activity as `(invoice_attempted, invoice_id)` - Logs activity as `(mark_invoice_attempted, invoice_id)`
## `pub fn mark_invoice_sent(&self, invoice_id: &str) -> Result<()>` ## `pub fn mark_invoice_sent(&self, invoice_id: &str) -> Result<()>`
- Sets `sent_at` to now - Sets `sent_at` to now
- Leaves status as `pending` - Leaves status as `pending`
- Logs activity as `(invoice_sent, invoice_id)` - Logs activity as `(mark_invoice_sent, invoice_id)`
## `pub fn mark_invoice_closed(&self, invoice_id: &str) -> Result<()>` ## `pub fn mark_invoice_closed(&self, invoice_id: &str) -> Result<()>`
- Sets invoice status to `closed` - Sets invoice status to `closed`
- Sets `closed_at` to now - Sets `closed_at` to now
- Logs activity as `(invoice_closed, invoice_id)` - Logs activity as `(mark_invoice_closed, invoice_id)`
## `pub fn list_activity(&self, since: &i64, tenant: Option<&str>) -> Result<Vec<Activity>>` ## `pub fn list_activity(&self, since: &i64, tenant: Option<&str>) -> Result<Vec<Activity>>`
+3 -3
View File
@@ -42,7 +42,7 @@ impl Billing {
let since = *since_guard; let since = *since_guard;
let activity = self.repo.list_activity(&since, None).await?; let activity = self.repo.list_activity(&since, None).await?;
for a in &activity { for a in &activity {
if matches!(a.activity_type.as_str(), "relay_created" | "relay_updated" | "relay_activated") { if matches!(a.activity_type.as_str(), "create_relay" | "update_relay" | "activate_relay") {
self.maybe_reset_anchor_for_first_paid_relay(a).await?; self.maybe_reset_anchor_for_first_paid_relay(a).await?;
} }
*since_guard = (*since_guard).max(a.created_at); *since_guard = (*since_guard).max(a.created_at);
@@ -345,13 +345,13 @@ fn relay_active_hours_in_window(
} }
match event.activity_type.as_str() { match event.activity_type.as_str() {
"relay_created" | "relay_activated" => { "create_relay" | "activate_relay" => {
if !active { if !active {
active = true; active = true;
cursor = ts; cursor = ts;
} }
} }
"relay_deactivated" | "relay_sync_failed" => { "deactivate_relay" | "fail_relay_sync" => {
if active { if active {
active = false; active = false;
secs += (ts - cursor).num_seconds().max(0); secs += (ts - cursor).num_seconds().max(0);
+1 -1
View File
@@ -50,7 +50,7 @@ impl Infra {
for a in activity { for a in activity {
if matches!( if matches!(
a.activity_type.as_str(), a.activity_type.as_str(),
"relay_created" | "relay_updated" | "relay_deactivated" "create_relay" | "update_relay" | "deactivate_relay"
) { ) {
let Some(relay) = self.repo.get_relay(&a.identifier).await? else { let Some(relay) = self.repo.get_relay(&a.identifier).await? else {
continue; continue;
-1
View File
@@ -24,7 +24,6 @@ async fn main() -> Result<()> {
.init(); .init();
let repo = Repo::new().await?; let repo = Repo::new().await?;
repo.migrate().await?;
let robot = Robot::new().await?; let robot = Robot::new().await?;
let billing = Billing::new(repo.clone(), robot.clone()); let billing = Billing::new(repo.clone(), robot.clone());
let infra = Infra::new(repo.clone()); let infra = Infra::new(repo.clone());
+1
View File
@@ -11,6 +11,7 @@ pub struct Activity {
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Tenant { pub struct Tenant {
pub pubkey: String, pub pubkey: String,
#[serde(skip_serializing)]
pub nwc_url: String, pub nwc_url: String,
pub created_at: i64, pub created_at: i64,
pub billing_anchor: i64, pub billing_anchor: i64,
+23 -117
View File
@@ -1,7 +1,7 @@
use std::path::Path; use std::path::Path;
use anyhow::Result; use anyhow::Result;
use sqlx::{SqlitePool, sqlite::SqlitePoolOptions}; use sqlx::{Sqlite, SqlitePool, Transaction, sqlite::SqlitePoolOptions};
use crate::models::{Activity, Invoice, InvoiceItem, Relay, Tenant}; use crate::models::{Activity, Invoice, InvoiceItem, Relay, Tenant};
@@ -33,15 +33,16 @@ impl Repo {
.execute(&pool) .execute(&pool)
.await?; .await?;
sqlx::migrate!("./migrations").run(&pool).await?;
Ok(Self { pool }) Ok(Self { pool })
} }
pub async fn migrate(&self) -> Result<()> { async fn insert_activity(
sqlx::migrate!("./migrations").run(&self.pool).await?; tx: &mut Transaction<'_, Sqlite>,
Ok(()) activity_type: &str,
} identifier: &str,
) -> Result<()> {
async fn log_activity(&self, activity_type: &str, identifier: &str) -> Result<()> {
sqlx::query( sqlx::query(
"INSERT INTO activity (id, created_at, activity_type, identifier) "INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), ?, ?)", VALUES (?, strftime('%s','now'), ?, ?)",
@@ -49,7 +50,7 @@ impl Repo {
.bind(uuid::Uuid::new_v4().to_string()) .bind(uuid::Uuid::new_v4().to_string())
.bind(activity_type) .bind(activity_type)
.bind(identifier) .bind(identifier)
.execute(&self.pool) .execute(&mut **tx)
.await?; .await?;
Ok(()) Ok(())
} }
@@ -91,14 +92,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "create_tenant", &tenant.pubkey).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'tenant_created', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(&tenant.pubkey)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -113,14 +107,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "update_tenant_billing_anchor", pubkey).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'tenant_billing_anchor_updated', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(pubkey)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -135,14 +122,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "update_tenant_nwc_url", pubkey).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'tenant_billing_updated', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(pubkey)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -226,14 +206,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "create_relay", &relay.id).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'relay_created', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(&relay.id)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -271,14 +244,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "update_relay", &relay.id).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'relay_updated', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(&relay.id)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -292,14 +258,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "deactivate_relay", &relay.id).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'relay_deactivated', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(&relay.id)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -313,14 +272,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "activate_relay", &relay.id).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'relay_activated', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(&relay.id)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -335,14 +287,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "fail_relay_sync", &relay.id).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'relay_sync_failed', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(&relay.id)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -382,14 +327,7 @@ impl Repo {
.await?; .await?;
} }
sqlx::query( Self::insert_activity(&mut tx, "create_invoice", &invoice.id).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'invoice_created', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(&invoice.id)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -432,14 +370,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "mark_invoice_paid", invoice_id).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'invoice_paid', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(invoice_id)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -458,14 +389,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "mark_invoice_attempted", invoice_id).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'invoice_attempted', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(invoice_id)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -479,14 +403,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "mark_invoice_sent", invoice_id).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'invoice_sent', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(invoice_id)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -504,14 +421,7 @@ impl Repo {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
sqlx::query( Self::insert_activity(&mut tx, "mark_invoice_closed", invoice_id).await?;
"INSERT INTO activity (id, created_at, activity_type, identifier)
VALUES (?, strftime('%s','now'), 'invoice_closed', ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(invoice_id)
.execute(&mut *tx)
.await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -524,7 +434,7 @@ impl Repo {
FROM activity a FROM activity a
WHERE a.created_at > ? WHERE a.created_at > ?
AND ( AND (
a.activity_type IN ('tenant_created', 'tenant_billing_anchor_updated') a.activity_type IN ('create_tenant', 'update_tenant_billing_anchor')
AND a.identifier = ? AND a.identifier = ?
OR EXISTS ( OR EXISTS (
SELECT 1 FROM relay r SELECT 1 FROM relay r
@@ -601,8 +511,4 @@ impl Repo {
Ok(sats) Ok(sats)
} }
#[allow(dead_code)]
async fn _log_activity_public(&self, activity_type: &str, identifier: &str) -> Result<()> {
self.log_activity(activity_type, identifier).await
}
} }