diff --git a/backend/spec/api.md b/backend/spec/api.md index e304ccc..b9154ea 100644 --- a/backend/spec/api.md +++ b/backend/spec/api.md @@ -5,10 +5,10 @@ Api manages the HTTP interface for the application Members: - `host: String` - the hostname of the service for checking NIP 98 auth, from `HOST` -- `port: u16` - a port to run the server on from `PORT` - `admins: Vec` - a list of admin pubkeys from `ADMINS` -- `origins: Vec` - to be used in CORS headers, from `ALLOW_ORIGINS` -- `repo: Repo` +- `query: Query` +- `command: Command` +- `billing: Billing` Notes: @@ -31,7 +31,7 @@ Notes: - Serves `GET /plans` - No authentication required -- Return `data` is a list of plan structs from `Repo::list_plans` +- Return `data` is a list of plan structs from `Query::list_plans` ## `async fn get_plan(...) -> Response` @@ -55,26 +55,26 @@ Notes: - Serves `GET /tenants` - Authorizes admin only -- Return `data` is a list of tenant structs from `repo.list_tenants` +- Return `data` is a list of tenant structs from `query.list_tenants` ## `async fn get_tenant(...) -> Response` - Serves `GET /tenants/:pubkey` - Authorizes admin or matching tenant -- Return `data` is a single tenant struct from `repo.get_tenant` +- Return `data` is a single tenant struct from `query.get_tenant` ## `async fn update_tenant(...) -> Response` - Serves `PUT /tenants/:pubkey` - Authorizes admin or matching tenant -- Updates tenant using `repo.update_tenant` +- Updates tenant using `command.update_tenant` - Return `data` is the updated tenant struct ## `async fn list_tenant_relays(...) -> Response` - Serves `GET /tenants/:pubkey/relays` - Authorizes admin or matching tenant -- Return `data` is a list of relay structs from `repo.list_relays_for_tenant` +- Return `data` is a list of relay structs from `query.list_relays_for_tenant` --- Relay routes @@ -82,20 +82,20 @@ Notes: - Serves `GET /relays` - Authorizes admin only -- Return `data` is a list of relay structs from `repo.list_relays` +- Return `data` is a list of relay structs from `query.list_relays` ## `async fn get_relay(...) -> Response` - Serves `GET /relays/:id` - Authorizes admin or relay owner -- Return `data` is a single relay struct from `repo.get_relay` +- Return `data` is a single relay struct from `query.get_relay` ## `async fn create_relay(...) -> Response` - Serves `POST /relays` - Authorizes admin or matching tenant pubkey in request body - Validates/prepares the relay data to be saved using `prepare_relay` -- Creates a new relay using `repo.create_relay` +- Creates a new relay using `command.create_relay` - If relay is a duplicate by subdomain, return a `422` with `code=subdomain-exists` - Return `data` is a single relay struct. Use HTTP `201`. @@ -104,7 +104,7 @@ Notes: - Serves `PUT /relays/:id` - Authorizes admin or relay owner - Validates/prepares the relay data to be saved using `prepare_relay` -- Updates the given relay using `repo.update_relay` +- Updates the given relay using `command.update_relay` - If relay is a duplicate by subdomain, return a `422` with `code=subdomain-exists` - Return `data` is a single relay struct. @@ -112,7 +112,7 @@ Notes: - Serves `GET /relays/:id/activity` - Authorizes admin or relay owner -- Get activity from `repo.list_activity_for_relay` +- Get activity from `query.list_activity_for_relay` - Return `data` is `{activity}` ## `async fn deactivate_relay(...) -> Response` diff --git a/backend/spec/billing.md b/backend/spec/billing.md index 0d21a9a..40b111e 100644 --- a/backend/spec/billing.md +++ b/backend/spec/billing.md @@ -5,9 +5,10 @@ Billing encapsulates logic related to synchronizing state with Stripe. Members: - `nwc_url: String` - a nostr wallet connect URL used to **create** bolt11 invoices -- `repo: Repo` +- `query: Query` +- `command: Command` - `robot: Robot` -## `pub fn new(repo: Repo, robot: Robot) -> Self` +## `pub fn new(query: Query, command: Command, robot: Robot) -> Self` - Reads environment and populates members diff --git a/backend/spec/command.md b/backend/spec/command.md new file mode 100644 index 0000000..19b8a7c --- /dev/null +++ b/backend/spec/command.md @@ -0,0 +1,57 @@ +# `pub struct Command` + +Command writes to the database. + +Members: + +- `pool: SqlitePool` - a sqlite connection pool + +Notes: + +- All public write methods should be atomic +- All writes should be accompanied by an activity log entry of `(tenant, activity_type, resource_type, resource_id)` + +## `pub fn new(&self, pool: SqlitePool) -> Self` + +- Assigns pool to self + +## `pub fn create_tenant(&self, tenant: &Tenant) -> Result<()>` + +- Creates tenant, may throw sqlite uniqueness error on pubkey +- Logs activity as `(create_tenant, tenant_id)` + +## `pub fn update_tenant(&self, tenant: &Tenant) -> Result<()>` + +- Updates tenant +- Logs activity as `(update_tenant, tenant_id)` + +## `pub fn create_relay(&self, relay: &Relay) -> Result<()>` + +- Creates relay, may throw sqlite uniqueness error on subdomain +- Sets relay status to `new` +- Logs activity as `(create_relay, relay_id)` + +## `pub fn update_relay(&self, relay: &Relay) -> Result<()>` + +- Updates relay, may throw sqlite uniqueness error on subdomain +- Logs activity as `(update_relay, relay_id)` + +## `pub fn deactivate_relay(&self, relay: &Relay) -> Result<()>` + +- Sets relay status to `inactive` +- Logs activity as `(deactivate_relay, relay_id)` + +## `pub fn activate_relay(&self, relay: &Relay) -> Result<()>` + +- Sets relay status to `active` +- Logs activity as `(activate_relay, relay_id)` + +## `pub fn fail_relay_sync(&self, relay: &Relay, sync_error: &str) -> Result<()>` + +- Sets relay status to `inactive`, sets `sync_error` +- Logs activity as `(fail_relay_sync, relay_id)` + +## `pub fn mark_relay_synced(&self, relay_id: &str) -> Result<()>` + +- Sets `synced = 1`, `status = 'active'`, clears `sync_error` +- No activity log (called by infra after successful sync) diff --git a/backend/spec/infra.md b/backend/spec/infra.md index 7fb30c2..a1dd60f 100644 --- a/backend/spec/infra.md +++ b/backend/spec/infra.md @@ -5,25 +5,26 @@ Infra is a service which polls the database and synchronizes updates to relays t Members: - `api_url: String` - the URL of the zooid instance to be managed, from `ZOOID_API_URL` -- `repo: Repo` +- `query: Query` +- `command: Command` -## `pub fn new(repo: Repo) -> Self` +## `pub fn new(query: Query, command: Command) -> Self` - Reads environment and populates members ## `pub async fn start(self)` -- Initializes `last_activity_at` from `repo.max_activity_at()` so historical activities are not replayed on restart. +- Initializes `last_activity_at` from `query.max_activity_at()` so historical activities are not replayed on restart. - Calls `self.tick` in a loop every 10 seconds. ## `pub async fn tick(self)` -Iterates over `repo.list_activity` since last run and does the following: +Iterates over `query.list_activity` since last run and does the following: - For `create_relay`, `update_relay`, or `deactivate_relay` activity, sync the relay to zooid. - Uses `relay.synced` to decide POST vs PUT (not the activity type), so already-synced relays always use PUT even on restart. - - On success, calls `repo.mark_relay_synced` to set `synced = 1`, `status = 'active'`, and clear `sync_error`. - - On failure, calls `repo.fail_relay_sync`. + - On success, calls `command.mark_relay_synced` to set `synced = 1`, `status = 'active'`, and clear `sync_error`. + - On failure, calls `command.fail_relay_sync`. - All other activity types are ignored (e.g. `fail_relay_sync` must not trigger another sync). ## `async fn sync_relay(&self, relay: &Relay, is_new: bool)` diff --git a/backend/spec/main.md b/backend/spec/main.md index 7125d69..a26b2e0 100644 --- a/backend/spec/main.md +++ b/backend/spec/main.md @@ -1,8 +1,8 @@ # `async fn main() -> Result<()>` - Configures logging -- Creates instances of `Repo`, `Robot`, `Billing`, `Api`, and `Infra` -- Spawns `infra.start` +- Calls `create_pool` to get a `SqlitePool`, then creates `Query`, `Command`, `Robot`, `Billing`, `Api`, and `Infra` - Get an axum router from `api.router` - Adds CORS middleware based on `origins` - Calls `axum::serve` with a listener +- Spawns `infra.start` diff --git a/backend/spec/pool.md b/backend/spec/pool.md new file mode 100644 index 0000000..d42ad79 --- /dev/null +++ b/backend/spec/pool.md @@ -0,0 +1,14 @@ +# `pub async fn create_pool() -> Result` + +Creates and returns a sqlite connection pool. + +Notes: + +- Database table names are singular: `activity`, `tenant`, `relay` + +Steps: + +- Reads `DATABASE_URL` from environment +- Ensures that any directories referred to in `DATABASE_URL` exist +- Initializes the sqlx pool +- Runs migrations found in the `migrations` directory diff --git a/backend/spec/query.md b/backend/spec/query.md new file mode 100644 index 0000000..d5dd856 --- /dev/null +++ b/backend/spec/query.md @@ -0,0 +1,50 @@ +# `pub struct Query` + +Query reads from the database. + +Members: + +- `pool: SqlitePool` - a sqlite connection pool + +## `pub fn new(&self, pool: SqlitePool) -> Self` + +- Assigns pool to self + +## `pub fn list_tenants(&self) -> Result>` + +- Returns all tenants + +## `pub fn get_tenant(&self, pubkey: &str) -> Result` + +- Returns matching tenant + +## `pub fn list_plans() -> Vec` + +- Returns the hardcoded relay plans used by the system (`free`, `basic`, `growth`) +- This is the source of truth for plan metadata exposed via API + +## `pub fn list_relays(&self) -> Result>` + +- Returns all relays + +## `pub fn list_relays_for_tenant(&self, tenant_id: &str) -> Result>` + +- Returns all relays belonging to the given tenant + +## `pub fn get_relay(&self, id: &str) -> Result` + +- Returns matching relay + +## `pub fn max_activity_at(&self) -> Result` + +- Returns the maximum `created_at` value from the activity table, or 0 if empty +- Used by infra to initialize the since guard on startup + +## `pub fn list_activity(&self, since: &i64) -> Result>` + +- Returns all activity occuring after `since` + +## `pub fn list_activity_for_relay(&self, relay_id: &str) -> Result>` + +- Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id` +- Ordered newest-first diff --git a/backend/spec/repo.md b/backend/spec/repo.md deleted file mode 100644 index 59df2c3..0000000 --- a/backend/spec/repo.md +++ /dev/null @@ -1,107 +0,0 @@ -# `pub struct Repo` - -Repo is a wrapper around a sqlite pool which implements methods related to database access. - -Members: - -- `pool: sqlx::SqlitePool` - a sqlite connection pool - -Notes: - -- All public write methods should be run in a transaction so they're atomic -- All writes should be accompanied by an activity log entry of `(tenant, activity_type, resource_type, resource_id)` -- Database table names are singular: `activity`, `tenant`, `relay` - -## `pub fn new() -> Self` - -- Reads `DATABASE_URL` from environment -- Ensures that any directories referred to in `DATABASE_URL` exist -- Initializes its sqlx `pool` -- Runs migrations found in the `migrations` directory. - -## `fn insert_activity(activity_type, resource_type, resource_id) -> Result<()>` - -- Private helper that inserts one row into `activity` -- Infers `tenant` from `resource_type` and `resource_id` -- Used by write methods to avoid repeating audit-log SQL - -## `pub fn list_tenants(&self) -> Result>` - -- Returns all tenants - -## `pub fn get_tenant(&self, pubkey: &str) -> Result` - -- Returns matching tenant - -## `pub fn create_tenant(&self, tenant: &Tenant) -> Result<()>` - -- Creates tenant, may throw sqlite uniqueness error on pubkey -- Logs activity as `(create_tenant, tenant_id)` - -## `pub fn update_tenant(&self, tenant: &Tenant) -> Result<()>` - -- Updates tenant -- Logs activity as `(update_tenant, tenant_id)` - -## `pub fn list_plans() -> Vec` - -- Returns the hardcoded relay plans used by the system (`free`, `basic`, `growth`) -- This is the source of truth for plan metadata exposed via API - -## `pub fn list_relays(&self) -> Result>` - -- Returns all relays - -## `pub fn list_relays_for_tenant(&self, tenant_id: &str) -> Result>` - -- Returns all relays belonging to the given tenant - -## `pub fn get_relay(&self, id: &str) -> Result` - -- Returns matching relay - -## `pub fn create_relay(&self, relay: &Relay) -> Result<()>` - -- Creates relay, may throw sqlite uniqueness error on subdomain -- Sets relay status to `new` -- Logs activity as `(create_relay, relay_id)` - -## `pub fn update_relay(&self, relay: &Relay) -> Result<()>` - -- Updates relay, may throw sqlite uniqueness error on subdomain -- Logs activity as `(update_relay, relay_id)` - -## `pub fn deactivate_relay(&self, relay: &Relay) -> Result<()>` - -- Sets relay status to `inactive` -- Logs activity as `(deactivate_relay, relay_id)` - -## `pub fn activate_relay(&self, relay: &Relay) -> Result<()>` - -- Sets relay status to `active` -- Logs activity as `(activate_relay, relay_id)` - -## `pub fn fail_relay_sync(&self, relay: &Relay, sync_error: &str) -> Result<()>` - -- Sets relay status to `inactive`, sets `sync_error` -- Logs activity as `(fail_relay_sync, relay_id)` - -## `pub fn mark_relay_synced(&self, relay_id: &str) -> Result<()>` - -- Sets `synced = 1`, `status = 'active'`, clears `sync_error` -- No activity log (called by infra after successful sync) - -## `pub fn max_activity_at(&self) -> Result` - -- Returns the maximum `created_at` value from the activity table, or 0 if empty -- Used by infra to initialize the since guard on startup - -## `pub fn list_activity(&self, since: &i64) -> Result>` - -- Returns all activity occuring after `since` - -## `pub fn list_activity_for_relay(&self, relay_id: &str) -> Result>` - -- Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id` -- Ordered newest-first - diff --git a/backend/src/api.rs b/backend/src/api.rs index 6624d96..cf48c4c 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -13,14 +13,16 @@ use nostr_sdk::{Event, JsonUtil, Kind}; use serde::{Deserialize, Serialize}; use crate::billing::Billing; +use crate::command::Command; use crate::models::{Relay, Tenant}; -use crate::repo::Repo; +use crate::query::Query; #[derive(Clone)] pub struct Api { host: String, admins: Vec, - repo: Repo, + query: Query, + command: Command, billing: Billing, } @@ -57,7 +59,7 @@ impl IntoResponse for ApiError { } impl Api { - pub fn new(repo: Repo, billing: Billing) -> Self { + pub fn new(query: Query, command: Command, billing: Billing) -> Self { let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); let admins = std::env::var("ADMINS") .unwrap_or_default() @@ -68,7 +70,8 @@ impl Api { Self { host, admins, - repo, + query, + command, billing, } } @@ -302,7 +305,7 @@ async fn list_tenants( let pubkey = state.api.extract_auth_pubkey(&headers)?; state.api.require_admin(&pubkey)?; - match state.api.repo.list_tenants().await { + match state.api.query.list_tenants().await { Ok(tenants) => Ok(ok(StatusCode::OK, tenants)), Err(e) => Ok(err( StatusCode::INTERNAL_SERVER_ERROR, @@ -313,7 +316,7 @@ async fn list_tenants( } async fn list_plans() -> Response { - ok(StatusCode::OK, Repo::list_plans()) + ok(StatusCode::OK, Query::list_plans()) } async fn get_identity( @@ -328,7 +331,7 @@ async fn get_identity( created_at: now_ts(), }; - match state.api.repo.create_tenant(&tenant).await { + match state.api.command.create_tenant(&tenant).await { Ok(()) => true, Err(e) if matches!(map_unique_error(&e), Some("pubkey-exists")) => true, Err(e) => { @@ -350,7 +353,7 @@ async fn get_identity( } async fn get_plan(Path(id): Path) -> Response { - match Repo::list_plans().into_iter().find(|p| p.id == id) { + match Query::list_plans().into_iter().find(|p| p.id == id) { Some(plan) => ok(StatusCode::OK, plan), None => err(StatusCode::NOT_FOUND, "not-found", "plan not found"), } @@ -364,7 +367,7 @@ async fn get_tenant( let auth = state.api.extract_auth_pubkey(&headers)?; state.api.require_admin_or_tenant(&auth, &pubkey)?; - match state.api.repo.get_tenant(&pubkey).await { + match state.api.query.get_tenant(&pubkey).await { Ok(Some(tenant)) => Ok(ok(StatusCode::OK, tenant)), Ok(None) => Ok(err(StatusCode::NOT_FOUND, "not-found", "tenant not found")), Err(e) => Ok(err( @@ -382,7 +385,7 @@ async fn list_relays( let pubkey = state.api.extract_auth_pubkey(&headers)?; state.api.require_admin(&pubkey)?; - match state.api.repo.list_relays().await { + match state.api.query.list_relays().await { Ok(relays) => Ok(ok(StatusCode::OK, relays)), Err(e) => Ok(err( StatusCode::INTERNAL_SERVER_ERROR, @@ -400,7 +403,7 @@ async fn list_tenant_relays( let auth = state.api.extract_auth_pubkey(&headers)?; state.api.require_admin_or_tenant(&auth, &pubkey)?; - match state.api.repo.list_relays_for_tenant(&pubkey).await { + match state.api.query.list_relays_for_tenant(&pubkey).await { Ok(relays) => Ok(ok(StatusCode::OK, relays)), Err(e) => Ok(err( StatusCode::INTERNAL_SERVER_ERROR, @@ -417,7 +420,7 @@ async fn get_relay( ) -> std::result::Result { let auth = state.api.extract_auth_pubkey(&headers)?; - let relay = match state.api.repo.get_relay(&id).await { + let relay = match state.api.query.get_relay(&id).await { Ok(Some(r)) => r, Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")), Err(e) => { @@ -441,7 +444,7 @@ async fn list_relay_activity( ) -> std::result::Result { let auth = state.api.extract_auth_pubkey(&headers)?; - let relay = match state.api.repo.get_relay(&id).await { + let relay = match state.api.query.get_relay(&id).await { Ok(Some(r)) => r, Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")), Err(e) => return Ok(err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string())), @@ -449,7 +452,7 @@ async fn list_relay_activity( state.api.require_admin_or_tenant(&auth, &relay.tenant)?; - match state.api.repo.list_activity_for_relay(&id).await { + match state.api.query.list_activity_for_relay(&id).await { Ok(activity) => Ok(ok(StatusCode::OK, serde_json::json!({ "activity": activity }))), Err(e) => Ok(err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string())), } @@ -502,7 +505,7 @@ async fn create_relay( } }; - match state.api.repo.create_relay(&relay).await { + match state.api.command.create_relay(&relay).await { Ok(()) => Ok(ok(StatusCode::CREATED, relay)), Err(e) => { if matches!(map_unique_error(&e), Some("subdomain-exists")) { @@ -530,7 +533,7 @@ async fn update_relay( ) -> std::result::Result { let auth = state.api.extract_auth_pubkey(&headers)?; - let mut relay = match state.api.repo.get_relay(&id).await { + let mut relay = match state.api.query.get_relay(&id).await { Ok(Some(r)) => r, Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")), Err(e) => { @@ -599,7 +602,7 @@ async fn update_relay( } }; - match state.api.repo.update_relay(&relay).await { + match state.api.command.update_relay(&relay).await { Ok(()) => Ok(ok(StatusCode::OK, relay)), Err(e) => { if matches!(map_unique_error(&e), Some("subdomain-exists")) { @@ -626,7 +629,7 @@ async fn deactivate_relay( ) -> std::result::Result { let auth = state.api.extract_auth_pubkey(&headers)?; - let relay = match state.api.repo.get_relay(&id).await { + let relay = match state.api.query.get_relay(&id).await { Ok(Some(r)) => r, Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")), Err(e) => { @@ -665,7 +668,7 @@ async fn reactivate_relay( ) -> std::result::Result { let auth = state.api.extract_auth_pubkey(&headers)?; - let relay = match state.api.repo.get_relay(&id).await { + let relay = match state.api.query.get_relay(&id).await { Ok(Some(r)) => r, Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")), Err(e) => { @@ -706,7 +709,7 @@ async fn update_tenant( let auth = state.api.extract_auth_pubkey(&headers)?; state.api.require_admin_or_tenant(&auth, &pubkey)?; - let mut tenant = match state.api.repo.get_tenant(&pubkey).await { + let mut tenant = match state.api.query.get_tenant(&pubkey).await { Ok(Some(t)) => t, Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "tenant not found")), Err(e) => { @@ -722,7 +725,7 @@ async fn update_tenant( tenant.nwc_url = nwc_url; } - match state.api.repo.update_tenant(&tenant).await { + match state.api.command.update_tenant(&tenant).await { Ok(()) => Ok(ok(StatusCode::OK, tenant)), Err(e) => Ok(err( StatusCode::INTERNAL_SERVER_ERROR, @@ -731,4 +734,3 @@ async fn update_tenant( )), } } - diff --git a/backend/src/billing.rs b/backend/src/billing.rs index b8d8ab1..ae29e47 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -1,40 +1,43 @@ use anyhow::Result; -use crate::repo::Repo; +use crate::command::Command; +use crate::query::Query; use crate::robot::Robot; #[derive(Clone)] pub struct Billing { nwc_url: String, - repo: Repo, + query: Query, + command: Command, robot: Robot, } impl Billing { - pub fn new(repo: Repo, robot: Robot) -> Self { + pub fn new(query: Query, command: Command, robot: Robot) -> Self { let nwc_url = std::env::var("NWC_URL").unwrap_or_default(); Self { nwc_url, - repo, + query, + command, robot, } } pub async fn deactivate_relay(&self, relay_id: &str) -> Result<()> { let relay = self - .repo + .query .get_relay(relay_id) .await? .ok_or_else(|| anyhow::anyhow!("relay not found"))?; - self.repo.deactivate_relay(&relay).await + self.command.deactivate_relay(&relay).await } pub async fn reactivate_relay(&self, relay_id: &str) -> Result<()> { let relay = self - .repo + .query .get_relay(relay_id) .await? .ok_or_else(|| anyhow::anyhow!("relay not found"))?; - self.repo.activate_relay(&relay).await + self.command.activate_relay(&relay).await } } diff --git a/backend/src/command.rs b/backend/src/command.rs new file mode 100644 index 0000000..790b57e --- /dev/null +++ b/backend/src/command.rs @@ -0,0 +1,206 @@ +use anyhow::Result; +use sqlx::{Sqlite, SqlitePool, Transaction}; + +use crate::models::{Relay, Tenant}; + +#[derive(Clone)] +pub struct Command { + pool: SqlitePool, +} + +impl Command { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } + + async fn insert_activity( + tx: &mut Transaction<'_, Sqlite>, + activity_type: &str, + resource_type: &str, + resource_id: &str, + ) -> Result<()> { + let tenant = match resource_type { + "tenant" => resource_id.to_string(), + "relay" => { + sqlx::query_scalar::<_, String>("SELECT tenant FROM relay WHERE id = ?") + .bind(resource_id) + .fetch_one(&mut **tx) + .await? + } + _ => anyhow::bail!("unknown resource_type: {}", resource_type), + }; + + sqlx::query( + "INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id) + VALUES (?, ?, strftime('%s','now'), ?, ?, ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(tenant) + .bind(activity_type) + .bind(resource_type) + .bind(resource_id) + .execute(&mut **tx) + .await?; + Ok(()) + } + + pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query( + "INSERT INTO tenant (pubkey, nwc_url, created_at) + VALUES (?, ?, ?)", + ) + .bind(&tenant.pubkey) + .bind(&tenant.nwc_url) + .bind(tenant.created_at) + .execute(&mut *tx) + .await?; + + Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn update_tenant(&self, tenant: &Tenant) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?") + .bind(&tenant.nwc_url) + .bind(&tenant.pubkey) + .execute(&mut *tx) + .await?; + + Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn create_relay(&self, relay: &Relay) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query( + "INSERT INTO relay ( + id, tenant, schema, subdomain, plan, status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled + ) VALUES (?, ?, ?, ?, ?, 'new', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&relay.id) + .bind(&relay.tenant) + .bind(&relay.schema) + .bind(&relay.subdomain) + .bind(&relay.plan) + .bind(&relay.sync_error) + .bind(&relay.info_name) + .bind(&relay.info_icon) + .bind(&relay.info_description) + .bind(relay.policy_public_join) + .bind(relay.policy_strip_signatures) + .bind(relay.groups_enabled) + .bind(relay.management_enabled) + .bind(relay.blossom_enabled) + .bind(relay.livekit_enabled) + .bind(relay.push_enabled) + .execute(&mut *tx) + .await?; + + Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn update_relay(&self, relay: &Relay) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query( + "UPDATE relay + SET tenant = ?, schema = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?, + info_name = ?, info_icon = ?, info_description = ?, + policy_public_join = ?, policy_strip_signatures = ?, + groups_enabled = ?, management_enabled = ?, blossom_enabled = ?, + livekit_enabled = ?, push_enabled = ? + WHERE id = ?", + ) + .bind(&relay.tenant) + .bind(&relay.schema) + .bind(&relay.subdomain) + .bind(&relay.plan) + .bind(&relay.status) + .bind(&relay.sync_error) + .bind(&relay.info_name) + .bind(&relay.info_icon) + .bind(&relay.info_description) + .bind(relay.policy_public_join) + .bind(relay.policy_strip_signatures) + .bind(relay.groups_enabled) + .bind(relay.management_enabled) + .bind(relay.blossom_enabled) + .bind(relay.livekit_enabled) + .bind(relay.push_enabled) + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn deactivate_relay(&self, relay: &Relay) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query("UPDATE relay SET status = 'inactive' WHERE id = ?") + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn activate_relay(&self, relay: &Relay) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query("UPDATE relay SET status = 'active' WHERE id = ?") + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + Self::insert_activity(&mut tx, "activate_relay", "relay", &relay.id).await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query("UPDATE relay SET status = 'inactive', sync_error = ? WHERE id = ?") + .bind(&sync_error) + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn mark_relay_synced(&self, relay_id: &str) -> Result<()> { + sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?") + .bind(relay_id) + .execute(&self.pool) + .await?; + Ok(()) + } +} diff --git a/backend/src/infra.rs b/backend/src/infra.rs index 766a4f1..f57c90a 100644 --- a/backend/src/infra.rs +++ b/backend/src/infra.rs @@ -2,7 +2,8 @@ use anyhow::Result; use nostr_sdk::prelude::*; use tokio::sync::Mutex; -use crate::repo::Repo; +use crate::command::Command; +use crate::query::Query; #[derive(Clone)] pub struct Infra { @@ -12,12 +13,13 @@ pub struct Infra { livekit_api_key: String, livekit_api_secret: String, api_secret: String, - repo: Repo, + query: Query, + command: Command, last_activity_at: std::sync::Arc>, } impl Infra { - pub fn new(repo: Repo) -> Self { + pub fn new(query: Query, command: Command) -> Self { let api_url = std::env::var("ZOOID_API_URL").unwrap_or_default(); let relay_domain = std::env::var("RELAY_DOMAIN").unwrap_or_default(); let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default(); @@ -31,14 +33,15 @@ impl Infra { livekit_api_key, livekit_api_secret, api_secret, - repo, + query, + command, last_activity_at: std::sync::Arc::new(Mutex::new(0)), } } pub async fn start(self) { // Initialize from DB so we don't replay historical activities on restart - match self.repo.max_activity_at().await { + match self.query.max_activity_at().await { Ok(ts) => *self.last_activity_at.lock().await = ts, Err(e) => tracing::error!(error = %e, "failed to read max activity timestamp"), } @@ -55,7 +58,7 @@ impl Infra { pub async fn tick(&self) -> Result<()> { let mut since_guard = self.last_activity_at.lock().await; let since = *since_guard; - let activity = self.repo.list_activity(&since).await?; + let activity = self.query.list_activity(&since).await?; for a in activity { let needs_sync = matches!( @@ -64,7 +67,7 @@ impl Infra { ); if needs_sync { - let Some(relay) = self.repo.get_relay(&a.resource_id).await? else { + let Some(relay) = self.query.get_relay(&a.resource_id).await? else { continue; }; @@ -73,11 +76,11 @@ impl Infra { match self.sync_relay(&relay, is_new).await { Ok(()) => { tracing::info!(relay = %relay.id, "relay sync succeeded"); - self.repo.mark_relay_synced(&relay.id).await? + self.command.mark_relay_synced(&relay.id).await? } Err(e) => { tracing::warn!(relay = %relay.id, error = %e, "relay sync failed"); - self.repo.fail_relay_sync(&relay, e.to_string()).await?; + self.command.fail_relay_sync(&relay, e.to_string()).await?; } } } diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 7b16abe..98f3936 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -1,6 +1,8 @@ pub mod api; pub mod billing; +pub mod command; pub mod infra; pub mod models; -pub mod repo; +pub mod pool; +pub mod query; pub mod robot; diff --git a/backend/src/main.rs b/backend/src/main.rs index 01c58fb..1174a4b 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,8 +1,10 @@ mod api; mod billing; +mod command; mod infra; mod models; -mod repo; +mod query; +mod pool; mod robot; use anyhow::Result; @@ -12,8 +14,9 @@ use tower_http::cors::{AllowOrigin, CorsLayer}; use crate::api::Api; use crate::billing::Billing; +use crate::command::Command; use crate::infra::Infra; -use crate::repo::Repo; +use crate::query::Query; use crate::robot::Robot; #[tokio::main] @@ -25,11 +28,13 @@ async fn main() -> Result<()> { .with(tracing_subscriber::fmt::layer()) .init(); - let repo = Repo::new().await?; + let pool = pool::create_pool().await?; let robot = Robot::new().await?; - let billing = Billing::new(repo.clone(), robot.clone()); - let infra = Infra::new(repo.clone()); - let api = Api::new(repo, billing.clone()); + let query = Query::new(pool.clone()); + let command = Command::new(pool); + let billing = Billing::new(query.clone(), command.clone(), robot.clone()); + let infra = Infra::new(query.clone(), command.clone()); + let api = Api::new(query, command, billing.clone()); let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); let port: u16 = std::env::var("PORT") diff --git a/backend/src/pool.rs b/backend/src/pool.rs new file mode 100644 index 0000000..1fbe829 --- /dev/null +++ b/backend/src/pool.rs @@ -0,0 +1,51 @@ +use std::path::Path; +use std::str::FromStr; + +use anyhow::Result; +use sqlx::{ + SqlitePool, + sqlite::{SqliteConnectOptions, SqlitePoolOptions}, +}; + +pub async fn create_pool() -> Result { + let raw_database_url = std::env::var("DATABASE_URL") + .unwrap_or_else(|_| format!("sqlite://{}/data/caravel.db", env!("CARGO_MANIFEST_DIR"))); + let database_url = normalize_sqlite_url(&raw_database_url); + + if let Some(path) = database_url.strip_prefix("sqlite://") + && !path.is_empty() + && path != ":memory:" + && let Some(parent) = Path::new(path).parent() + && !parent.as_os_str().is_empty() + { + std::fs::create_dir_all(parent)?; + } + + let connect_options = + SqliteConnectOptions::from_str(&database_url)?.create_if_missing(true); + + let pool = SqlitePoolOptions::new() + .max_connections(5) + .connect_with(connect_options) + .await?; + + sqlx::query("PRAGMA journal_mode = WAL;") + .execute(&pool) + .await?; + + sqlx::migrate!("./migrations").run(&pool).await?; + + Ok(pool) +} + +fn normalize_sqlite_url(url: &str) -> String { + let Some(path) = url.strip_prefix("sqlite://") else { + return url.to_string(); + }; + + if path.is_empty() || path == ":memory:" || Path::new(path).is_absolute() { + return url.to_string(); + } + + format!("sqlite://{}/{}", env!("CARGO_MANIFEST_DIR"), path) +} diff --git a/backend/src/query.rs b/backend/src/query.rs new file mode 100644 index 0000000..9782055 --- /dev/null +++ b/backend/src/query.rs @@ -0,0 +1,150 @@ +use anyhow::Result; +use sqlx::SqlitePool; + +use crate::models::{Activity, Plan, Relay, Tenant}; + +#[derive(Clone)] +pub struct Query { + pool: SqlitePool, +} + +impl Query { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } + + pub async fn list_tenants(&self) -> Result> { + let rows = sqlx::query_as::<_, Tenant>( + "SELECT pubkey, nwc_url, created_at + FROM tenant + ORDER BY pubkey", + ) + .fetch_all(&self.pool) + .await?; + Ok(rows) + } + + pub async fn get_tenant(&self, pubkey: &str) -> Result> { + let row = sqlx::query_as::<_, Tenant>( + "SELECT pubkey, nwc_url, created_at + FROM tenant + WHERE pubkey = ?", + ) + .bind(pubkey) + .fetch_optional(&self.pool) + .await?; + Ok(row) + } + + pub fn list_plans() -> Vec { + vec![ + Plan { + id: "free".to_string(), + name: "Free".to_string(), + sats: 0, + members: Some(10), + blossom: false, + livekit: false, + }, + Plan { + id: "basic".to_string(), + name: "Basic".to_string(), + sats: 10_000, + members: Some(100), + blossom: true, + livekit: true, + }, + Plan { + id: "growth".to_string(), + name: "Growth".to_string(), + sats: 50_000, + members: None, + blossom: true, + livekit: true, + }, + ] + } + + pub async fn list_relays(&self) -> Result> { + let rows = sqlx::query_as::<_, Relay>( + "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled, synced + FROM relay + ORDER BY id", + ) + .fetch_all(&self.pool) + .await?; + Ok(rows) + } + + pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result> { + let rows = sqlx::query_as::<_, Relay>( + "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled, synced + FROM relay + WHERE tenant = ? + ORDER BY id", + ) + .bind(tenant_id) + .fetch_all(&self.pool) + .await?; + Ok(rows) + } + + pub async fn get_relay(&self, id: &str) -> Result> { + let row = sqlx::query_as::<_, Relay>( + "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled, synced + FROM relay + WHERE id = ?", + ) + .bind(id) + .fetch_optional(&self.pool) + .await?; + Ok(row) + } + + pub async fn max_activity_at(&self) -> Result { + let val = sqlx::query_scalar::<_, Option>( + "SELECT MAX(created_at) FROM activity", + ) + .fetch_one(&self.pool) + .await?; + Ok(val.unwrap_or(0)) + } + + pub async fn list_activity(&self, since: &i64) -> Result> { + let rows = sqlx::query_as::<_, Activity>( + "SELECT id, tenant, created_at, activity_type, resource_type, resource_id + FROM activity + WHERE created_at > ? + ORDER BY created_at, id", + ) + .bind(since) + .fetch_all(&self.pool) + .await?; + Ok(rows) + } + + pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result> { + let rows = sqlx::query_as::<_, Activity>( + "SELECT id, tenant, created_at, activity_type, resource_type, resource_id + FROM activity + WHERE resource_type = 'relay' AND resource_id = ? + ORDER BY created_at DESC, id DESC", + ) + .bind(relay_id) + .fetch_all(&self.pool) + .await?; + Ok(rows) + } +} diff --git a/backend/src/repo.rs b/backend/src/repo.rs deleted file mode 100644 index d1adc5f..0000000 --- a/backend/src/repo.rs +++ /dev/null @@ -1,372 +0,0 @@ -use std::path::Path; -use std::str::FromStr; - -use anyhow::Result; -use sqlx::{ - Sqlite, SqlitePool, Transaction, - sqlite::{SqliteConnectOptions, SqlitePoolOptions}, -}; - -use crate::models::{Activity, Plan, Relay, Tenant}; - -#[derive(Clone)] -pub struct Repo { - pub pool: SqlitePool, -} - -impl Repo { - pub async fn new() -> Result { - let raw_database_url = std::env::var("DATABASE_URL") - .unwrap_or_else(|_| format!("sqlite://{}/data/caravel.db", env!("CARGO_MANIFEST_DIR"))); - let database_url = normalize_sqlite_url(&raw_database_url); - - if let Some(path) = database_url.strip_prefix("sqlite://") - && !path.is_empty() - && path != ":memory:" - && let Some(parent) = Path::new(path).parent() - && !parent.as_os_str().is_empty() - { - std::fs::create_dir_all(parent)?; - } - - let connect_options = - SqliteConnectOptions::from_str(&database_url)?.create_if_missing(true); - - let pool = SqlitePoolOptions::new() - .max_connections(5) - .connect_with(connect_options) - .await?; - - sqlx::query("PRAGMA journal_mode = WAL;") - .execute(&pool) - .await?; - - sqlx::migrate!("./migrations").run(&pool).await?; - - Ok(Self { pool }) - } - - async fn insert_activity( - tx: &mut Transaction<'_, Sqlite>, - activity_type: &str, - resource_type: &str, - resource_id: &str, - ) -> Result<()> { - let tenant = match resource_type { - "tenant" => resource_id.to_string(), - "relay" => { - sqlx::query_scalar::<_, String>("SELECT tenant FROM relay WHERE id = ?") - .bind(resource_id) - .fetch_one(&mut **tx) - .await? - } - _ => anyhow::bail!("unknown resource_type: {}", resource_type), - }; - - sqlx::query( - "INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id) - VALUES (?, ?, strftime('%s','now'), ?, ?, ?)", - ) - .bind(uuid::Uuid::new_v4().to_string()) - .bind(tenant) - .bind(activity_type) - .bind(resource_type) - .bind(resource_id) - .execute(&mut **tx) - .await?; - Ok(()) - } - - pub async fn list_tenants(&self) -> Result> { - let rows = sqlx::query_as::<_, Tenant>( - "SELECT pubkey, nwc_url, created_at - FROM tenant - ORDER BY pubkey", - ) - .fetch_all(&self.pool) - .await?; - Ok(rows) - } - - pub async fn get_tenant(&self, pubkey: &str) -> Result> { - let row = sqlx::query_as::<_, Tenant>( - "SELECT pubkey, nwc_url, created_at - FROM tenant - WHERE pubkey = ?", - ) - .bind(pubkey) - .fetch_optional(&self.pool) - .await?; - Ok(row) - } - - pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query( - "INSERT INTO tenant (pubkey, nwc_url, created_at) - VALUES (?, ?, ?)", - ) - .bind(&tenant.pubkey) - .bind(&tenant.nwc_url) - .bind(tenant.created_at) - .execute(&mut *tx) - .await?; - - Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn update_tenant(&self, tenant: &Tenant) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?") - .bind(&tenant.nwc_url) - .bind(&tenant.pubkey) - .execute(&mut *tx) - .await?; - - Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn list_relays(&self) -> Result> { - let rows = sqlx::query_as::<_, Relay>( - "SELECT id, tenant, schema, subdomain, plan, status, sync_error, - info_name, info_icon, info_description, - policy_public_join, policy_strip_signatures, - groups_enabled, management_enabled, blossom_enabled, - livekit_enabled, push_enabled, synced - FROM relay - ORDER BY id", - ) - .fetch_all(&self.pool) - .await?; - Ok(rows) - } - - pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result> { - let rows = sqlx::query_as::<_, Relay>( - "SELECT id, tenant, schema, subdomain, plan, status, sync_error, - info_name, info_icon, info_description, - policy_public_join, policy_strip_signatures, - groups_enabled, management_enabled, blossom_enabled, - livekit_enabled, push_enabled, synced - FROM relay - WHERE tenant = ? - ORDER BY id", - ) - .bind(tenant_id) - .fetch_all(&self.pool) - .await?; - Ok(rows) - } - - pub async fn get_relay(&self, id: &str) -> Result> { - let row = sqlx::query_as::<_, Relay>( - "SELECT id, tenant, schema, subdomain, plan, status, sync_error, - info_name, info_icon, info_description, - policy_public_join, policy_strip_signatures, - groups_enabled, management_enabled, blossom_enabled, - livekit_enabled, push_enabled, synced - FROM relay - WHERE id = ?", - ) - .bind(id) - .fetch_optional(&self.pool) - .await?; - Ok(row) - } - - pub async fn create_relay(&self, relay: &Relay) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query( - "INSERT INTO relay ( - id, tenant, schema, subdomain, plan, status, sync_error, - info_name, info_icon, info_description, - policy_public_join, policy_strip_signatures, - groups_enabled, management_enabled, blossom_enabled, - livekit_enabled, push_enabled - ) VALUES (?, ?, ?, ?, ?, 'new', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - ) - .bind(&relay.id) - .bind(&relay.tenant) - .bind(&relay.schema) - .bind(&relay.subdomain) - .bind(&relay.plan) - .bind(&relay.sync_error) - .bind(&relay.info_name) - .bind(&relay.info_icon) - .bind(&relay.info_description) - .bind(relay.policy_public_join) - .bind(relay.policy_strip_signatures) - .bind(relay.groups_enabled) - .bind(relay.management_enabled) - .bind(relay.blossom_enabled) - .bind(relay.livekit_enabled) - .bind(relay.push_enabled) - .execute(&mut *tx) - .await?; - - Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn update_relay(&self, relay: &Relay) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query( - "UPDATE relay - SET tenant = ?, schema = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?, - info_name = ?, info_icon = ?, info_description = ?, - policy_public_join = ?, policy_strip_signatures = ?, - groups_enabled = ?, management_enabled = ?, blossom_enabled = ?, - livekit_enabled = ?, push_enabled = ? - WHERE id = ?", - ) - .bind(&relay.tenant) - .bind(&relay.schema) - .bind(&relay.subdomain) - .bind(&relay.plan) - .bind(&relay.status) - .bind(&relay.sync_error) - .bind(&relay.info_name) - .bind(&relay.info_icon) - .bind(&relay.info_description) - .bind(relay.policy_public_join) - .bind(relay.policy_strip_signatures) - .bind(relay.groups_enabled) - .bind(relay.management_enabled) - .bind(relay.blossom_enabled) - .bind(relay.livekit_enabled) - .bind(relay.push_enabled) - .bind(&relay.id) - .execute(&mut *tx) - .await?; - - Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn deactivate_relay(&self, relay: &Relay) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query("UPDATE relay SET status = 'inactive' WHERE id = ?") - .bind(&relay.id) - .execute(&mut *tx) - .await?; - - Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query("UPDATE relay SET status = 'inactive', sync_error = ? WHERE id = ?") - .bind(&sync_error) - .bind(&relay.id) - .execute(&mut *tx) - .await?; - - Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn mark_relay_synced(&self, relay_id: &str) -> Result<()> { - sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?") - .bind(relay_id) - .execute(&self.pool) - .await?; - Ok(()) - } - - pub async fn max_activity_at(&self) -> Result { - let val = sqlx::query_scalar::<_, Option>( - "SELECT MAX(created_at) FROM activity", - ) - .fetch_one(&self.pool) - .await?; - Ok(val.unwrap_or(0)) - } - - pub async fn list_activity(&self, since: &i64) -> Result> { - let rows = sqlx::query_as::<_, Activity>( - "SELECT id, tenant, created_at, activity_type, resource_type, resource_id - FROM activity - WHERE created_at > ? - ORDER BY created_at, id", - ) - .bind(since) - .fetch_all(&self.pool) - .await?; - Ok(rows) - } - - pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result> { - let rows = sqlx::query_as::<_, Activity>( - "SELECT id, tenant, created_at, activity_type, resource_type, resource_id - FROM activity - WHERE resource_type = 'relay' AND resource_id = ? - ORDER BY created_at DESC, id DESC", - ) - .bind(relay_id) - .fetch_all(&self.pool) - .await?; - Ok(rows) - } - - pub fn list_plans() -> Vec { - vec![ - Plan { - id: "free".to_string(), - name: "Free".to_string(), - sats: 0, - members: Some(10), - blossom: false, - livekit: false, - }, - Plan { - id: "basic".to_string(), - name: "Basic".to_string(), - sats: 10_000, - members: Some(100), - blossom: true, - livekit: true, - }, - Plan { - id: "growth".to_string(), - name: "Growth".to_string(), - sats: 50_000, - members: None, - blossom: true, - livekit: true, - }, - ] - } -} - -fn normalize_sqlite_url(url: &str) -> String { - let Some(path) = url.strip_prefix("sqlite://") else { - return url.to_string(); - }; - - if path.is_empty() || path == ":memory:" || Path::new(path).is_absolute() { - return url.to_string(); - } - - format!("sqlite://{}/{}", env!("CARGO_MANIFEST_DIR"), path) -} diff --git a/todo.md b/todo.md index 32b6bfe..73d56ef 100644 --- a/todo.md +++ b/todo.md @@ -1,3 +1,3 @@ -- [ ] Split repo into queries and commands +- [ ] Update infra to listen to sqlite - [ ] Fix billing by using stripe as a backend to do proration, then mark invoices paid manually when using bitcoin. - [ ] Send a payment link instead of an invoice so we can generate/pay on the fly