From 2d5eb0ca84016a06beec7f80c5adeaec3b5dcf88 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Tue, 19 May 2026 17:20:00 -0700 Subject: [PATCH] Refactor commands --- backend/src/billing.rs | 6 +- backend/src/bitcoin.rs | 4 +- backend/src/command.rs | 487 ++++++++++++++++++----------------------- backend/src/infra.rs | 12 +- backend/src/models.rs | 16 +- 5 files changed, 222 insertions(+), 303 deletions(-) diff --git a/backend/src/billing.rs b/backend/src/billing.rs index 4b027e2..13d84f6 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -4,7 +4,7 @@ use std::collections::BTreeMap; use crate::bitcoin; use crate::command::Command; use crate::env::Env; -use crate::models::{Activity, RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, Relay}; +use crate::models::{Activity, RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT}; use crate::query::Query; use crate::robot::Robot; use crate::stripe::{InvoiceLookupError, Stripe}; @@ -191,7 +191,7 @@ impl Billing { for relay in &relays { if relay.stripe_subscription_item_id.is_some() { self.command - .delete_relay_subscription_item(&relay.id) + .clear_relay_subscription_item(&relay.id) .await?; } } @@ -277,7 +277,7 @@ impl Billing { None => { if relay.stripe_subscription_item_id.is_some() { self.command - .delete_relay_subscription_item(&relay.id) + .clear_relay_subscription_item(&relay.id) .await?; } } diff --git a/backend/src/bitcoin.rs b/backend/src/bitcoin.rs index 0f977e2..b768dfe 100644 --- a/backend/src/bitcoin.rs +++ b/backend/src/bitcoin.rs @@ -24,11 +24,11 @@ pub async fn get_bitcoin_price(currency: &str) -> Result { let resp = http.get(url).send().await?; let body: CoinbaseSpotPriceResponse = resp.error_for_status()?.json().await?; - Ok(body + body .data .amount .parse::() - .map_err(|e| anyhow!("invalid BTC spot quote for {currency}: {e}"))?) + .map_err(|e| anyhow!("invalid BTC spot quote for {currency}: {e}")) } /// Number of decimal places in `currency`'s minor unit, following Stripe's diff --git a/backend/src/command.rs b/backend/src/command.rs index 199462c..18d6644 100644 --- a/backend/src/command.rs +++ b/backend/src/command.rs @@ -18,6 +18,8 @@ impl Command { Self { pool, notify } } + // Activity + async fn insert_activity( tx: &mut Transaction<'_, Sqlite>, activity_type: &str, @@ -61,218 +63,55 @@ impl Command { }) } - fn emit(&self, activity: Activity) { + /// Run `f` inside a transaction, record an activity row, commit, and broadcast. + async fn with_activity( + &self, + activity_type: &str, + resource_type: &str, + resource_id: &str, + f: F, + ) -> Result<()> + where + F: AsyncFnOnce(&mut Transaction<'_, Sqlite>) -> Result<()>, + { + let mut tx = self.pool.begin().await?; + f(&mut tx).await?; + let activity = + Self::insert_activity(&mut tx, activity_type, resource_type, resource_id).await?; + tx.commit().await?; let _ = self.notify.send(activity); + Ok(()) } + // Tenants + pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> { - if tenant.stripe_customer_id.trim().is_empty() { - anyhow::bail!("stripe_customer_id is required"); - } - - let mut tx = self.pool.begin().await?; - - sqlx::query( - "INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id) - VALUES (?, ?, ?, ?)", - ) - .bind(&tenant.pubkey) - .bind(&tenant.nwc_url) - .bind(tenant.created_at) - .bind(&tenant.stripe_customer_id) - .execute(&mut *tx) - .await?; - - let activity = - Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?; - - tx.commit().await?; - self.emit(activity); - Ok(()) + self.with_activity("create_tenant", "tenant", &tenant.pubkey, async |tx| { + sqlx::query( + "INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id) + VALUES (?, ?, ?, ?)", + ) + .bind(&tenant.pubkey) + .bind(&tenant.nwc_url) + .bind(tenant.created_at) + .bind(&tenant.stripe_customer_id) + .execute(&mut **tx) + .await?; + Ok(()) + }) + .await } pub async fn update_tenant(&self, tenant: &Tenant) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?") - .bind(&tenant.nwc_url) - .bind(&tenant.pubkey) - .execute(&mut *tx) - .await?; - - let activity = - Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?; - - tx.commit().await?; - self.emit(activity); - Ok(()) - } - - pub async fn create_relay(&self, relay: &Relay) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query( - "INSERT INTO relay ( - id, tenant, schema, subdomain, plan, status, synced, sync_error, - info_name, info_icon, info_description, - policy_public_join, policy_strip_signatures, - groups_enabled, management_enabled, blossom_enabled, - livekit_enabled, push_enabled - ) VALUES (?, ?, ?, ?, ?, 'active', 0, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - ) - .bind(&relay.id) - .bind(&relay.tenant) - .bind(&relay.schema) - .bind(&relay.subdomain) - .bind(&relay.plan) - .bind(&relay.sync_error) - .bind(&relay.info_name) - .bind(&relay.info_icon) - .bind(&relay.info_description) - .bind(relay.policy_public_join) - .bind(relay.policy_strip_signatures) - .bind(relay.groups_enabled) - .bind(relay.management_enabled) - .bind(relay.blossom_enabled) - .bind(relay.livekit_enabled) - .bind(relay.push_enabled) - .execute(&mut *tx) - .await?; - - let activity = Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?; - - tx.commit().await?; - self.emit(activity); - Ok(()) - } - - pub async fn update_relay(&self, relay: &Relay) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query( - "UPDATE relay - SET tenant = ?, schema = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?, synced = 0, - info_name = ?, info_icon = ?, info_description = ?, - policy_public_join = ?, policy_strip_signatures = ?, - groups_enabled = ?, management_enabled = ?, blossom_enabled = ?, - livekit_enabled = ?, push_enabled = ? - WHERE id = ?", - ) - .bind(&relay.tenant) - .bind(&relay.schema) - .bind(&relay.subdomain) - .bind(&relay.plan) - .bind(&relay.status) - .bind(&relay.sync_error) - .bind(&relay.info_name) - .bind(&relay.info_icon) - .bind(&relay.info_description) - .bind(relay.policy_public_join) - .bind(relay.policy_strip_signatures) - .bind(relay.groups_enabled) - .bind(relay.management_enabled) - .bind(relay.blossom_enabled) - .bind(relay.livekit_enabled) - .bind(relay.push_enabled) - .bind(&relay.id) - .execute(&mut *tx) - .await?; - - let activity = Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?; - - tx.commit().await?; - self.emit(activity); - Ok(()) - } - - pub async fn deactivate_relay(&self, relay: &Relay) -> Result<()> { - self.set_relay_status(&relay.id, RELAY_STATUS_INACTIVE, "deactivate_relay") - .await - } - - pub async fn mark_relay_delinquent(&self, relay: &Relay) -> Result<()> { - self.set_relay_status(&relay.id, RELAY_STATUS_DELINQUENT, "deactivate_relay") - .await - } - - async fn set_relay_status( - &self, - relay_id: &str, - status: &str, - activity_type: &str, - ) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query("UPDATE relay SET status = ?, synced = 0 WHERE id = ?") - .bind(status) - .bind(relay_id) - .execute(&mut *tx) - .await?; - - let activity = Self::insert_activity(&mut tx, activity_type, "relay", relay_id).await?; - - tx.commit().await?; - self.emit(activity); - Ok(()) - } - - pub async fn activate_relay(&self, relay: &Relay) -> Result<()> { - self.set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay") - .await - } - - pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query("UPDATE relay SET synced = 0, sync_error = ? WHERE id = ?") - .bind(&sync_error) - .bind(&relay.id) - .execute(&mut *tx) - .await?; - - let activity = - Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?; - - tx.commit().await?; - self.emit(activity); - Ok(()) - } - - pub async fn complete_relay_sync(&self, relay_id: &str) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?") - .bind(relay_id) - .execute(&mut *tx) - .await?; - - let activity = - Self::insert_activity(&mut tx, "complete_relay_sync", "relay", relay_id).await?; - - tx.commit().await?; - self.emit(activity); - Ok(()) - } - - pub async fn delete_relay_subscription_item(&self, relay_id: &str) -> Result<()> { - sqlx::query("UPDATE relay SET stripe_subscription_item_id = NULL WHERE id = ?") - .bind(relay_id) - .execute(&self.pool) - .await?; - Ok(()) - } - - pub async fn set_relay_subscription_item( - &self, - relay_id: &str, - stripe_subscription_item_id: &str, - ) -> Result<()> { - sqlx::query("UPDATE relay SET stripe_subscription_item_id = ? WHERE id = ?") - .bind(stripe_subscription_item_id) - .bind(relay_id) - .execute(&self.pool) - .await?; - Ok(()) + self.with_activity("update_tenant", "tenant", &tenant.pubkey, async |tx| { + sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?") + .bind(&tenant.nwc_url) + .bind(&tenant.pubkey) + .execute(&mut **tx) + .await?; + Ok(()) + }) + .await } pub async fn set_tenant_subscription( @@ -313,6 +152,173 @@ impl Command { Ok(()) } + pub async fn set_tenant_past_due(&self, pubkey: &str) -> Result<()> { + let now = chrono::Utc::now().timestamp(); + sqlx::query("UPDATE tenant SET past_due_at = ? WHERE pubkey = ?") + .bind(now) + .bind(pubkey) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn clear_tenant_past_due(&self, pubkey: &str) -> Result<()> { + sqlx::query("UPDATE tenant SET past_due_at = NULL WHERE pubkey = ?") + .bind(pubkey) + .execute(&self.pool) + .await?; + Ok(()) + } + + // Relays + + pub async fn create_relay(&self, relay: &Relay) -> Result<()> { + self.with_activity("create_relay", "relay", &relay.id, async |tx| { + sqlx::query( + "INSERT INTO relay ( + id, tenant, schema, subdomain, plan, status, synced, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled + ) VALUES (?, ?, ?, ?, ?, 'active', 0, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&relay.id) + .bind(&relay.tenant) + .bind(&relay.schema) + .bind(&relay.subdomain) + .bind(&relay.plan) + .bind(&relay.sync_error) + .bind(&relay.info_name) + .bind(&relay.info_icon) + .bind(&relay.info_description) + .bind(relay.policy_public_join) + .bind(relay.policy_strip_signatures) + .bind(relay.groups_enabled) + .bind(relay.management_enabled) + .bind(relay.blossom_enabled) + .bind(relay.livekit_enabled) + .bind(relay.push_enabled) + .execute(&mut **tx) + .await?; + Ok(()) + }) + .await + } + + pub async fn update_relay(&self, relay: &Relay) -> Result<()> { + self.with_activity("update_relay", "relay", &relay.id, async |tx| { + sqlx::query( + "UPDATE relay + SET tenant = ?, schema = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?, synced = 0, + info_name = ?, info_icon = ?, info_description = ?, + policy_public_join = ?, policy_strip_signatures = ?, + groups_enabled = ?, management_enabled = ?, blossom_enabled = ?, + livekit_enabled = ?, push_enabled = ? + WHERE id = ?", + ) + .bind(&relay.tenant) + .bind(&relay.schema) + .bind(&relay.subdomain) + .bind(&relay.plan) + .bind(&relay.status) + .bind(&relay.sync_error) + .bind(&relay.info_name) + .bind(&relay.info_icon) + .bind(&relay.info_description) + .bind(relay.policy_public_join) + .bind(relay.policy_strip_signatures) + .bind(relay.groups_enabled) + .bind(relay.management_enabled) + .bind(relay.blossom_enabled) + .bind(relay.livekit_enabled) + .bind(relay.push_enabled) + .bind(&relay.id) + .execute(&mut **tx) + .await?; + Ok(()) + }) + .await + } + + pub async fn deactivate_relay(&self, relay: &Relay) -> Result<()> { + self.set_relay_status(&relay.id, RELAY_STATUS_INACTIVE, "deactivate_relay") + .await + } + + pub async fn mark_relay_delinquent(&self, relay: &Relay) -> Result<()> { + self.set_relay_status(&relay.id, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent") + .await + } + + pub async fn activate_relay(&self, relay: &Relay) -> Result<()> { + self.set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay") + .await + } + + async fn set_relay_status( + &self, + relay_id: &str, + status: &str, + activity_type: &str, + ) -> Result<()> { + self.with_activity(activity_type, "relay", relay_id, async |tx| { + sqlx::query("UPDATE relay SET status = ?, synced = 0 WHERE id = ?") + .bind(status) + .bind(relay_id) + .execute(&mut **tx) + .await?; + Ok(()) + }) + .await + } + + pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> { + self.with_activity("fail_relay_sync", "relay", &relay.id, async |tx| { + sqlx::query("UPDATE relay SET synced = 0, sync_error = ? WHERE id = ?") + .bind(&sync_error) + .bind(&relay.id) + .execute(&mut **tx) + .await?; + Ok(()) + }) + .await + } + + pub async fn complete_relay_sync(&self, relay_id: &str) -> Result<()> { + self.with_activity("complete_relay_sync", "relay", relay_id, async |tx| { + sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?") + .bind(relay_id) + .execute(&mut **tx) + .await?; + Ok(()) + }) + .await + } + + pub async fn clear_relay_subscription_item(&self, relay_id: &str) -> Result<()> { + sqlx::query("UPDATE relay SET stripe_subscription_item_id = NULL WHERE id = ?") + .bind(relay_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn set_relay_subscription_item( + &self, + relay_id: &str, + stripe_subscription_item_id: &str, + ) -> Result<()> { + sqlx::query("UPDATE relay SET stripe_subscription_item_id = ? WHERE id = ?") + .bind(stripe_subscription_item_id) + .bind(relay_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + // Invoices + pub async fn insert_pending_invoice_nwc_payment( &self, invoice_id: &str, @@ -376,75 +382,4 @@ impl Command { Ok(result.rows_affected() > 0) } - - pub async fn set_tenant_past_due(&self, pubkey: &str) -> Result<()> { - let now = chrono::Utc::now().timestamp(); - sqlx::query("UPDATE tenant SET past_due_at = ? WHERE pubkey = ?") - .bind(now) - .bind(pubkey) - .execute(&self.pool) - .await?; - Ok(()) - } - - pub async fn clear_tenant_past_due(&self, pubkey: &str) -> Result<()> { - sqlx::query("UPDATE tenant SET past_due_at = NULL WHERE pubkey = ?") - .bind(pubkey) - .execute(&self.pool) - .await?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use sqlx::SqlitePool; - use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; - use std::str::FromStr; - - async fn test_pool() -> SqlitePool { - let connect_options = SqliteConnectOptions::from_str("sqlite::memory:") - .expect("valid sqlite memory url") - .create_if_missing(true); - - let pool = SqlitePoolOptions::new() - .max_connections(1) - .connect_with(connect_options) - .await - .expect("connect sqlite memory db"); - - sqlx::migrate!("./migrations") - .run(&pool) - .await - .expect("run migrations"); - - pool - } - - #[tokio::test] - async fn create_tenant_rejects_empty_stripe_customer_id() { - let pool = test_pool().await; - let command = Command::new(pool); - - let tenant = Tenant { - pubkey: "tenant_pubkey".to_string(), - nwc_url: String::new(), - nwc_error: None, - created_at: 0, - stripe_customer_id: " ".to_string(), - stripe_subscription_id: None, - past_due_at: None, - }; - - let err = command - .create_tenant(&tenant) - .await - .expect_err("empty customer id must be rejected"); - - assert!( - err.to_string().contains("stripe_customer_id is required"), - "unexpected error: {err}" - ); - } } diff --git a/backend/src/infra.rs b/backend/src/infra.rs index 3d3effb..9177d77 100644 --- a/backend/src/infra.rs +++ b/backend/src/infra.rs @@ -234,13 +234,11 @@ impl Infra { }); // Only provide a secret if the relay is new. This allows us to not store the relay secrets on our side. - if is_new { - if let Some(obj) = body.as_object_mut() { - obj.insert( - "secret".to_string(), - serde_json::Value::String(Keys::generate().secret_key().to_secret_hex()), - ); - } + if is_new && let Some(obj) = body.as_object_mut() { + obj.insert( + "secret".to_string(), + serde_json::Value::String(Keys::generate().secret_key().to_secret_hex()), + ); } let method = if is_new { HttpMethod::POST } else { HttpMethod::PATCH }; diff --git a/backend/src/models.rs b/backend/src/models.rs index 8b4d6f8..d7c4037 100644 --- a/backend/src/models.rs +++ b/backend/src/models.rs @@ -25,7 +25,7 @@ pub struct Plan { pub stripe_price_id: Option, } -#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct Tenant { pub pubkey: String, pub nwc_url: String, @@ -36,20 +36,6 @@ pub struct Tenant { pub past_due_at: Option, } -impl Default for Tenant { - fn default() -> Self { - Self { - pubkey: String::new(), - nwc_url: String::new(), - nwc_error: None, - created_at: 0, - stripe_customer_id: String::new(), - stripe_subscription_id: None, - past_due_at: None, - } - } -} - #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct Relay { pub id: String,