forked from coracle/caravel
Refactor commands
This commit is contained in:
+211
-276
@@ -18,6 +18,8 @@ impl Command {
|
||||
Self { pool, notify }
|
||||
}
|
||||
|
||||
// Activity
|
||||
|
||||
async fn insert_activity(
|
||||
tx: &mut Transaction<'_, Sqlite>,
|
||||
activity_type: &str,
|
||||
@@ -61,218 +63,55 @@ impl Command {
|
||||
})
|
||||
}
|
||||
|
||||
fn emit(&self, activity: Activity) {
|
||||
/// Run `f` inside a transaction, record an activity row, commit, and broadcast.
|
||||
async fn with_activity<F>(
|
||||
&self,
|
||||
activity_type: &str,
|
||||
resource_type: &str,
|
||||
resource_id: &str,
|
||||
f: F,
|
||||
) -> Result<()>
|
||||
where
|
||||
F: AsyncFnOnce(&mut Transaction<'_, Sqlite>) -> Result<()>,
|
||||
{
|
||||
let mut tx = self.pool.begin().await?;
|
||||
f(&mut tx).await?;
|
||||
let activity =
|
||||
Self::insert_activity(&mut tx, activity_type, resource_type, resource_id).await?;
|
||||
tx.commit().await?;
|
||||
let _ = self.notify.send(activity);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Tenants
|
||||
|
||||
pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> {
|
||||
if tenant.stripe_customer_id.trim().is_empty() {
|
||||
anyhow::bail!("stripe_customer_id is required");
|
||||
}
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id)
|
||||
VALUES (?, ?, ?, ?)",
|
||||
)
|
||||
.bind(&tenant.pubkey)
|
||||
.bind(&tenant.nwc_url)
|
||||
.bind(tenant.created_at)
|
||||
.bind(&tenant.stripe_customer_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let activity =
|
||||
Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
self.emit(activity);
|
||||
Ok(())
|
||||
self.with_activity("create_tenant", "tenant", &tenant.pubkey, async |tx| {
|
||||
sqlx::query(
|
||||
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id)
|
||||
VALUES (?, ?, ?, ?)",
|
||||
)
|
||||
.bind(&tenant.pubkey)
|
||||
.bind(&tenant.nwc_url)
|
||||
.bind(tenant.created_at)
|
||||
.bind(&tenant.stripe_customer_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn update_tenant(&self, tenant: &Tenant) -> Result<()> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
|
||||
.bind(&tenant.nwc_url)
|
||||
.bind(&tenant.pubkey)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let activity =
|
||||
Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
self.emit(activity);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn create_relay(&self, relay: &Relay) -> Result<()> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO relay (
|
||||
id, tenant, schema, subdomain, plan, status, synced, sync_error,
|
||||
info_name, info_icon, info_description,
|
||||
policy_public_join, policy_strip_signatures,
|
||||
groups_enabled, management_enabled, blossom_enabled,
|
||||
livekit_enabled, push_enabled
|
||||
) VALUES (?, ?, ?, ?, ?, 'active', 0, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
)
|
||||
.bind(&relay.id)
|
||||
.bind(&relay.tenant)
|
||||
.bind(&relay.schema)
|
||||
.bind(&relay.subdomain)
|
||||
.bind(&relay.plan)
|
||||
.bind(&relay.sync_error)
|
||||
.bind(&relay.info_name)
|
||||
.bind(&relay.info_icon)
|
||||
.bind(&relay.info_description)
|
||||
.bind(relay.policy_public_join)
|
||||
.bind(relay.policy_strip_signatures)
|
||||
.bind(relay.groups_enabled)
|
||||
.bind(relay.management_enabled)
|
||||
.bind(relay.blossom_enabled)
|
||||
.bind(relay.livekit_enabled)
|
||||
.bind(relay.push_enabled)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let activity = Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
self.emit(activity);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_relay(&self, relay: &Relay) -> Result<()> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE relay
|
||||
SET tenant = ?, schema = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?, synced = 0,
|
||||
info_name = ?, info_icon = ?, info_description = ?,
|
||||
policy_public_join = ?, policy_strip_signatures = ?,
|
||||
groups_enabled = ?, management_enabled = ?, blossom_enabled = ?,
|
||||
livekit_enabled = ?, push_enabled = ?
|
||||
WHERE id = ?",
|
||||
)
|
||||
.bind(&relay.tenant)
|
||||
.bind(&relay.schema)
|
||||
.bind(&relay.subdomain)
|
||||
.bind(&relay.plan)
|
||||
.bind(&relay.status)
|
||||
.bind(&relay.sync_error)
|
||||
.bind(&relay.info_name)
|
||||
.bind(&relay.info_icon)
|
||||
.bind(&relay.info_description)
|
||||
.bind(relay.policy_public_join)
|
||||
.bind(relay.policy_strip_signatures)
|
||||
.bind(relay.groups_enabled)
|
||||
.bind(relay.management_enabled)
|
||||
.bind(relay.blossom_enabled)
|
||||
.bind(relay.livekit_enabled)
|
||||
.bind(relay.push_enabled)
|
||||
.bind(&relay.id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let activity = Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
self.emit(activity);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn deactivate_relay(&self, relay: &Relay) -> Result<()> {
|
||||
self.set_relay_status(&relay.id, RELAY_STATUS_INACTIVE, "deactivate_relay")
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn mark_relay_delinquent(&self, relay: &Relay) -> Result<()> {
|
||||
self.set_relay_status(&relay.id, RELAY_STATUS_DELINQUENT, "deactivate_relay")
|
||||
.await
|
||||
}
|
||||
|
||||
async fn set_relay_status(
|
||||
&self,
|
||||
relay_id: &str,
|
||||
status: &str,
|
||||
activity_type: &str,
|
||||
) -> Result<()> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
sqlx::query("UPDATE relay SET status = ?, synced = 0 WHERE id = ?")
|
||||
.bind(status)
|
||||
.bind(relay_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let activity = Self::insert_activity(&mut tx, activity_type, "relay", relay_id).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
self.emit(activity);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn activate_relay(&self, relay: &Relay) -> Result<()> {
|
||||
self.set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay")
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
sqlx::query("UPDATE relay SET synced = 0, sync_error = ? WHERE id = ?")
|
||||
.bind(&sync_error)
|
||||
.bind(&relay.id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let activity =
|
||||
Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
self.emit(activity);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn complete_relay_sync(&self, relay_id: &str) -> Result<()> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?")
|
||||
.bind(relay_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let activity =
|
||||
Self::insert_activity(&mut tx, "complete_relay_sync", "relay", relay_id).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
self.emit(activity);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_relay_subscription_item(&self, relay_id: &str) -> Result<()> {
|
||||
sqlx::query("UPDATE relay SET stripe_subscription_item_id = NULL WHERE id = ?")
|
||||
.bind(relay_id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn set_relay_subscription_item(
|
||||
&self,
|
||||
relay_id: &str,
|
||||
stripe_subscription_item_id: &str,
|
||||
) -> Result<()> {
|
||||
sqlx::query("UPDATE relay SET stripe_subscription_item_id = ? WHERE id = ?")
|
||||
.bind(stripe_subscription_item_id)
|
||||
.bind(relay_id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
self.with_activity("update_tenant", "tenant", &tenant.pubkey, async |tx| {
|
||||
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
|
||||
.bind(&tenant.nwc_url)
|
||||
.bind(&tenant.pubkey)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn set_tenant_subscription(
|
||||
@@ -313,6 +152,173 @@ impl Command {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn set_tenant_past_due(&self, pubkey: &str) -> Result<()> {
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
sqlx::query("UPDATE tenant SET past_due_at = ? WHERE pubkey = ?")
|
||||
.bind(now)
|
||||
.bind(pubkey)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn clear_tenant_past_due(&self, pubkey: &str) -> Result<()> {
|
||||
sqlx::query("UPDATE tenant SET past_due_at = NULL WHERE pubkey = ?")
|
||||
.bind(pubkey)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Relays
|
||||
|
||||
pub async fn create_relay(&self, relay: &Relay) -> Result<()> {
|
||||
self.with_activity("create_relay", "relay", &relay.id, async |tx| {
|
||||
sqlx::query(
|
||||
"INSERT INTO relay (
|
||||
id, tenant, schema, subdomain, plan, status, synced, sync_error,
|
||||
info_name, info_icon, info_description,
|
||||
policy_public_join, policy_strip_signatures,
|
||||
groups_enabled, management_enabled, blossom_enabled,
|
||||
livekit_enabled, push_enabled
|
||||
) VALUES (?, ?, ?, ?, ?, 'active', 0, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
)
|
||||
.bind(&relay.id)
|
||||
.bind(&relay.tenant)
|
||||
.bind(&relay.schema)
|
||||
.bind(&relay.subdomain)
|
||||
.bind(&relay.plan)
|
||||
.bind(&relay.sync_error)
|
||||
.bind(&relay.info_name)
|
||||
.bind(&relay.info_icon)
|
||||
.bind(&relay.info_description)
|
||||
.bind(relay.policy_public_join)
|
||||
.bind(relay.policy_strip_signatures)
|
||||
.bind(relay.groups_enabled)
|
||||
.bind(relay.management_enabled)
|
||||
.bind(relay.blossom_enabled)
|
||||
.bind(relay.livekit_enabled)
|
||||
.bind(relay.push_enabled)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn update_relay(&self, relay: &Relay) -> Result<()> {
|
||||
self.with_activity("update_relay", "relay", &relay.id, async |tx| {
|
||||
sqlx::query(
|
||||
"UPDATE relay
|
||||
SET tenant = ?, schema = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?, synced = 0,
|
||||
info_name = ?, info_icon = ?, info_description = ?,
|
||||
policy_public_join = ?, policy_strip_signatures = ?,
|
||||
groups_enabled = ?, management_enabled = ?, blossom_enabled = ?,
|
||||
livekit_enabled = ?, push_enabled = ?
|
||||
WHERE id = ?",
|
||||
)
|
||||
.bind(&relay.tenant)
|
||||
.bind(&relay.schema)
|
||||
.bind(&relay.subdomain)
|
||||
.bind(&relay.plan)
|
||||
.bind(&relay.status)
|
||||
.bind(&relay.sync_error)
|
||||
.bind(&relay.info_name)
|
||||
.bind(&relay.info_icon)
|
||||
.bind(&relay.info_description)
|
||||
.bind(relay.policy_public_join)
|
||||
.bind(relay.policy_strip_signatures)
|
||||
.bind(relay.groups_enabled)
|
||||
.bind(relay.management_enabled)
|
||||
.bind(relay.blossom_enabled)
|
||||
.bind(relay.livekit_enabled)
|
||||
.bind(relay.push_enabled)
|
||||
.bind(&relay.id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn deactivate_relay(&self, relay: &Relay) -> Result<()> {
|
||||
self.set_relay_status(&relay.id, RELAY_STATUS_INACTIVE, "deactivate_relay")
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn mark_relay_delinquent(&self, relay: &Relay) -> Result<()> {
|
||||
self.set_relay_status(&relay.id, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent")
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn activate_relay(&self, relay: &Relay) -> Result<()> {
|
||||
self.set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay")
|
||||
.await
|
||||
}
|
||||
|
||||
async fn set_relay_status(
|
||||
&self,
|
||||
relay_id: &str,
|
||||
status: &str,
|
||||
activity_type: &str,
|
||||
) -> Result<()> {
|
||||
self.with_activity(activity_type, "relay", relay_id, async |tx| {
|
||||
sqlx::query("UPDATE relay SET status = ?, synced = 0 WHERE id = ?")
|
||||
.bind(status)
|
||||
.bind(relay_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> {
|
||||
self.with_activity("fail_relay_sync", "relay", &relay.id, async |tx| {
|
||||
sqlx::query("UPDATE relay SET synced = 0, sync_error = ? WHERE id = ?")
|
||||
.bind(&sync_error)
|
||||
.bind(&relay.id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn complete_relay_sync(&self, relay_id: &str) -> Result<()> {
|
||||
self.with_activity("complete_relay_sync", "relay", relay_id, async |tx| {
|
||||
sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?")
|
||||
.bind(relay_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn clear_relay_subscription_item(&self, relay_id: &str) -> Result<()> {
|
||||
sqlx::query("UPDATE relay SET stripe_subscription_item_id = NULL WHERE id = ?")
|
||||
.bind(relay_id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn set_relay_subscription_item(
|
||||
&self,
|
||||
relay_id: &str,
|
||||
stripe_subscription_item_id: &str,
|
||||
) -> Result<()> {
|
||||
sqlx::query("UPDATE relay SET stripe_subscription_item_id = ? WHERE id = ?")
|
||||
.bind(stripe_subscription_item_id)
|
||||
.bind(relay_id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Invoices
|
||||
|
||||
pub async fn insert_pending_invoice_nwc_payment(
|
||||
&self,
|
||||
invoice_id: &str,
|
||||
@@ -376,75 +382,4 @@ impl Command {
|
||||
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
pub async fn set_tenant_past_due(&self, pubkey: &str) -> Result<()> {
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
sqlx::query("UPDATE tenant SET past_due_at = ? WHERE pubkey = ?")
|
||||
.bind(now)
|
||||
.bind(pubkey)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn clear_tenant_past_due(&self, pubkey: &str) -> Result<()> {
|
||||
sqlx::query("UPDATE tenant SET past_due_at = NULL WHERE pubkey = ?")
|
||||
.bind(pubkey)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use sqlx::SqlitePool;
|
||||
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
|
||||
use std::str::FromStr;
|
||||
|
||||
async fn test_pool() -> SqlitePool {
|
||||
let connect_options = SqliteConnectOptions::from_str("sqlite::memory:")
|
||||
.expect("valid sqlite memory url")
|
||||
.create_if_missing(true);
|
||||
|
||||
let pool = SqlitePoolOptions::new()
|
||||
.max_connections(1)
|
||||
.connect_with(connect_options)
|
||||
.await
|
||||
.expect("connect sqlite memory db");
|
||||
|
||||
sqlx::migrate!("./migrations")
|
||||
.run(&pool)
|
||||
.await
|
||||
.expect("run migrations");
|
||||
|
||||
pool
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_tenant_rejects_empty_stripe_customer_id() {
|
||||
let pool = test_pool().await;
|
||||
let command = Command::new(pool);
|
||||
|
||||
let tenant = Tenant {
|
||||
pubkey: "tenant_pubkey".to_string(),
|
||||
nwc_url: String::new(),
|
||||
nwc_error: None,
|
||||
created_at: 0,
|
||||
stripe_customer_id: " ".to_string(),
|
||||
stripe_subscription_id: None,
|
||||
past_due_at: None,
|
||||
};
|
||||
|
||||
let err = command
|
||||
.create_tenant(&tenant)
|
||||
.await
|
||||
.expect_err("empty customer id must be rejected");
|
||||
|
||||
assert!(
|
||||
err.to_string().contains("stripe_customer_id is required"),
|
||||
"unexpected error: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user