diff --git a/backend/migrations/0001_init.sql b/backend/migrations/0001_init.sql index 2acf311..1dd4688 100644 --- a/backend/migrations/0001_init.sql +++ b/backend/migrations/0001_init.sql @@ -10,8 +10,7 @@ CREATE TABLE IF NOT EXISTS activity ( CREATE TABLE IF NOT EXISTS tenant ( pubkey TEXT PRIMARY KEY, nwc_url TEXT NOT NULL DEFAULT '', - created_at INTEGER NOT NULL, - billing_anchor INTEGER NOT NULL + created_at INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS relay ( @@ -21,6 +20,7 @@ CREATE TABLE IF NOT EXISTS relay ( subdomain TEXT NOT NULL UNIQUE, plan TEXT NOT NULL, status TEXT NOT NULL, + synced INTEGER NOT NULL DEFAULT 0, sync_error TEXT NOT NULL DEFAULT '', info_name TEXT NOT NULL DEFAULT '', info_icon TEXT NOT NULL DEFAULT '', @@ -34,28 +34,3 @@ CREATE TABLE IF NOT EXISTS relay ( push_enabled INTEGER NOT NULL DEFAULT 1, FOREIGN KEY (tenant) REFERENCES tenant(pubkey) ); - -CREATE TABLE IF NOT EXISTS invoice ( - id TEXT PRIMARY KEY, - tenant TEXT NOT NULL, - status TEXT NOT NULL, - created_at INTEGER NOT NULL, - attempted_at INTEGER NOT NULL DEFAULT 0, - error TEXT NOT NULL DEFAULT '', - closed_at INTEGER NOT NULL DEFAULT 0, - sent_at INTEGER NOT NULL DEFAULT 0, - paid_at INTEGER NOT NULL DEFAULT 0, - bolt11 TEXT NOT NULL, - period_start INTEGER NOT NULL, - period_end INTEGER NOT NULL, - FOREIGN KEY (tenant) REFERENCES tenant(pubkey) -); - -CREATE TABLE IF NOT EXISTS invoice_item ( - id TEXT PRIMARY KEY, - invoice TEXT NOT NULL, - relay TEXT NOT NULL, - sats INTEGER NOT NULL, - FOREIGN KEY (invoice) REFERENCES invoice(id), - FOREIGN KEY (relay) REFERENCES relay(id) -); diff --git a/backend/migrations/0002_relay_synced.sql b/backend/migrations/0002_relay_synced.sql deleted file mode 100644 index ad7d37c..0000000 --- a/backend/migrations/0002_relay_synced.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE relay ADD COLUMN synced INTEGER NOT NULL DEFAULT 0; diff --git a/backend/spec/api.md b/backend/spec/api.md index 89e6c85..e304ccc 100644 --- a/backend/spec/api.md +++ b/backend/spec/api.md @@ -21,11 +21,9 @@ Notes: - Reads environment and populates members -## `pub fn serve(&self) -> Result<()>` +## `pub fn router(&self) -> Result<()>` -- Initializes an `axum::Router` -- Adds CORS middleware based on `origins` -- Calls `axum::serve` with a listener +- Returns an `axum::Router` --- Plan routes @@ -65,25 +63,19 @@ Notes: - Authorizes admin or matching tenant - Return `data` is a single tenant struct from `repo.get_tenant` +## `async fn update_tenant(...) -> Response` + +- Serves `PUT /tenants/:pubkey` +- Authorizes admin or matching tenant +- Updates tenant using `repo.update_tenant` +- Return `data` is the updated tenant struct + ## `async fn list_tenant_relays(...) -> Response` - Serves `GET /tenants/:pubkey/relays` - Authorizes admin or matching tenant - Return `data` is a list of relay structs from `repo.list_relays_for_tenant` -## `async fn list_tenant_invoices(...) -> Response` - -- Serves `GET /tenants/:pubkey/invoices` -- Authorizes admin or matching tenant -- Return `data` is a list of invoice structs from `repo.list_invoices_for_tenant` - -## `async fn update_tenant_billing(...) -> Response` - -- Serves `PUT /tenants/:pubkey/billing` -- Authorizes admin or matching tenant -- Updates tenant billing NWC URL using `repo.update_tenant_nwc_url` -- Return `data` is the submitted billing payload - --- Relay routes ## `async fn list_relays(...) -> Response` @@ -120,28 +112,24 @@ Notes: - Serves `GET /relays/:id/activity` - Authorizes admin or relay owner -- Return `data` is a list of activity structs from `repo.list_activity_for_relay` +- Get activity from `repo.list_activity_for_relay` +- Return `data` is `{activity}` ## `async fn deactivate_relay(...) -> Response` - Serves `POST /relays/:id/deactivate` - Authorizes admin or relay owner -- Deactivates relay using `repo.deactivate_relay` +- If relay is already active, return a `400` with `code=relay-is-inactive` +- Call `billing.deactivate_relay` - Return `data` is empty ---- Invoice routes +## `async fn reactivate_relay(...) -> Response` -## `async fn list_invoices(...) -> Response` - -- Serves `GET /invoices` -- Authorizes admin only -- Return `data` is a list of invoice structs from `repo.list_invoices` - -## `async fn get_invoice(...) -> Response` - -- Serves `GET /invoices/:id` -- Authorizes admin or invoice owner -- Return `data` is a single invoice struct from `repo.get_invoice` +- Serves `POST /relays/:id/reactivate` +- Authorizes admin or relay owner +- If relay is already active, return a `400` with `code=relay-is-active` +- Call `billing.reactivate_relay` +- Return `data` is empty --- Utilities diff --git a/backend/spec/billing.md b/backend/spec/billing.md index fe28c18..0d21a9a 100644 --- a/backend/spec/billing.md +++ b/backend/spec/billing.md @@ -1,50 +1,13 @@ # `pub struct Billing` -Billing is a service which polls the database, creates invoices, and attempts to collect payment for invoices. +Billing encapsulates logic related to synchronizing state with Stripe. Members: -- `nwc_url: String` - a nostr wallet connect URL used to **create** invoices +- `nwc_url: String` - a nostr wallet connect URL used to **create** bolt11 invoices - `repo: Repo` - `robot: Robot` ## `pub fn new(repo: Repo, robot: Robot) -> Self` - Reads environment and populates members - -## `pub async fn start(self)` - -Calls `self.tick` in a loop every hour. - -## `pub async fn tick(self)` - -Iterates over `repo.list_activity` since last run and does the following: - -- For any `create_relay|update_relay|activate_relay` activity if this is the first non-free relay for the tenant, update tenant's billing anchor to the time the relay was created. - -Also iterates over `repo.list_tenants()` and for each tenant calls `self.generate_invoice_if_due(tenant)` and `self.collect_outstanding(tenant)`. - -## `async fn generate_invoice_if_due(&self, tenant: &Tenant)` - -- Skip tenants that have a `pending` invoice or have no active non-free relays. -- Compute current billing period from `tenant.billing_anchor` as rolling monthly windows: `[period_start, period_end)`. -- Only generate an invoice once the period has closed (`now >= period_end`). -- Load activity needed to compute usage for the tenant using `repo.list_activity`. -- Calculate how many hours (rounded up) each relay was active during the window per paid plan. -- If total invoice amount is 0, return. -- Generate a `bolt11` invoice. If this fails, log the error and return. -- Generate an invoice for the tenant and an invoice item for each relay. -- Persist invoice + items atomically using `repo.create_invoice`. - -## `async fn collect_outstanding(&self, tenant: &Tenant)` - -- Load `pending` tenant invoices and attempt to collect each one. -- If `attempted_at` is less than 24 hours ago, skip it. -- If the `bolt11` invoice has been paid out of band, call `repo.mark_invoice_paid` and return. -- If the tenant has a `nwc_url`, attempt to pay the invoice with nwc. -- If collection succeeds, call `repo.mark_invoice_paid`. -- If collection fails, populate `repo.mark_invoice_attempted`. -- If nwc isn't set up or fails and `sent_at` is not set: - - Send a NIP 17 DM to the user with the invoice included. - - Call `repo.mark_invoice_sent`. -- If the invoice is 7 days past `created_at`, call `repo.mark_invoice_closed`. diff --git a/backend/spec/main.md b/backend/spec/main.md index 1740e24..7125d69 100644 --- a/backend/spec/main.md +++ b/backend/spec/main.md @@ -2,6 +2,7 @@ - Configures logging - Creates instances of `Repo`, `Robot`, `Billing`, `Api`, and `Infra` -- Spawns `billing.start` - Spawns `infra.start` -- Calls `api.serve` +- Get an axum router from `api.router` +- Adds CORS middleware based on `origins` +- Calls `axum::serve` with a listener diff --git a/backend/spec/models.md b/backend/spec/models.md index 75501ac..0af1ea3 100644 --- a/backend/spec/models.md +++ b/backend/spec/models.md @@ -19,18 +19,13 @@ Activity is an audit log of all actions performed by a user or a worker process. - `created_at` - unix timestamp when the activity was created - `activity_type` is one of: - `create_tenant` - - `update_tenant_billing_anchor` - - `update_tenant_nwc_url` + - `update_tenant` - `create_relay` - `update_relay` + - `update_relay_plan` - `activate_relay` - `deactivate_relay` - `fail_relay_sync` - - `create_invoice` - - `mark_invoice_paid` - - `mark_invoice_attempted` - - `mark_invoice_sent` - - `mark_invoice_closed` - `resource_type` is a string identifying the resource type being modified. - `resource_id` is a string identifying the resource id being modified. @@ -58,7 +53,6 @@ Tenants are customers of the service, identified by a nostr `pubkey`. Public met - `pubkey` is the nostr public key identifying the tenant - `nwc_url` (private) a nostr wallet connect URL used for **paying** invoices generated by the system - `created_at` unix timestamp identifying tenant creation time -- `billing_anchor` unix timestamp identifying billing cycle anchor. This gets reset when the tenant has no paid relays and adds (or reactivates) one. # Relay @@ -69,7 +63,8 @@ A relay is a nostr relay owned by a `tenant` and hosted by the attached zooid in - `schema` - the relay's db schema (read_only, calculated based on `subdomain` + `id`) - `subdomain` - the relay's subdomain - `plan` - the relay's plan -- `status` - `new|active|inactive`. Only `active` relays count toward billing. +- `status` - `active|inactive`. Only `active` relays count toward billing. +- `synced` - whether the relay has been successfully synced to zooid at least once. - `sync_error` - a string indicating any errors encountered when synchronizing. - `info_name` - the relay's name - `info_icon` - the relay's icon image URL @@ -81,7 +76,6 @@ A relay is a nostr relay owned by a `tenant` and hosted by the attached zooid in - `blossom_enabled` - whether blossom file storage is enabled - `livekit_enabled` - whether livekit calls are enabled - `push_enabled` - whether relay push is enabled -- `synced` (private) - whether the relay has been successfully synced to zooid at least once. Used by infra to decide POST vs PUT. Some attributes persisted to zooid via API have special handling: @@ -91,29 +85,3 @@ Some attributes persisted to zooid via API have special handling: - The relay's `livekit_*` configuration is inferred based on environment variables and `livekit_enabled`. - The relay's `roles` are hard-coded for now. -# Invoice - -Invoices are generated at the end of a tenant's monthly billing period. The billing module is responsible for creating them, collecting them, and dunning them. - -- `id` - random invoice ID -- `tenant` - tenant pubkey -- `status` - `pending|paid|closed` -- `amount` is derived as the sum of associated invoice item `sats` values (not stored as a separate source of truth) -- `created_at` - unix timestamp for when the invoice was created -- `attempted_at` - nullable unix timestamp for when collection was last attempted -- `error` - optional human-readable error from the last failed collection attempt -- `closed_at` - nullable unix timestamp for when the invoice was closed -- `sent_at` - nullable unix timestamp for when the invoice was sent via DM -- `paid_at` - nullable unix timestamp for when the invoice was paid -- `bolt11` - a BOLT 11 lightning invoice that can be used to pay the invoice -- `period_start` - unix timestamp for period start -- `period_end` - unix timestamp for period end - -# Invoice Item - -Invoice items are attached to an invoice and represent charges for a given relay. - -- `id` - random invoice item ID -- `invoice` - invoice ID -- `relay` - relay ID -- `sats` - amount in satoshis diff --git a/backend/spec/repo.md b/backend/spec/repo.md index 608ada7..59df2c3 100644 --- a/backend/spec/repo.md +++ b/backend/spec/repo.md @@ -10,7 +10,7 @@ Notes: - All public write methods should be run in a transaction so they're atomic - All writes should be accompanied by an activity log entry of `(tenant, activity_type, resource_type, resource_id)` -- Database table names are singular: `activity`, `tenant`, `relay`, `invoice`, `invoice_item` +- Database table names are singular: `activity`, `tenant`, `relay` ## `pub fn new() -> Self` @@ -38,15 +38,10 @@ Notes: - Creates tenant, may throw sqlite uniqueness error on pubkey - Logs activity as `(create_tenant, tenant_id)` -## `pub fn update_tenant_billing_anchor(&self, pubkey: &str, billing_anchor: i64) -> Result<()>` +## `pub fn update_tenant(&self, tenant: &Tenant) -> Result<()>` -- Updates the tenant's `billing_anchor` -- Logs activity as `(update_tenant_billing_anchor, tenant_id)` - -## `pub fn update_tenant_nwc_url(&self, pubkey: &str, nwc_url: &str) -> Result<()>` - -- Updates tenant `nwc_url` -- Logs activity as `(update_tenant_nwc_url, tenant_id)` +- Updates tenant +- Logs activity as `(update_tenant, tenant_id)` ## `pub fn list_plans() -> Vec` @@ -101,59 +96,12 @@ Notes: - Returns the maximum `created_at` value from the activity table, or 0 if empty - Used by infra to initialize the since guard on startup -## `pub fn create_invoice(&self, invoice: &Invoice, invoice_items: [&InvoiceItem]) -> Result<()>` - -- Saves an `invoice` row and related `invoice_item` rows -- Logs activity as `(create_invoice, invoice_id)` - -## `pub fn list_invoices() -> Result>` - -- Returns all invoices - -## `pub fn list_invoices_for_tenant(tenant_id: &str) -> Result>` - -- Returns all matching invoices - -## `pub fn mark_invoice_paid(&self, invoice_id: &str) -> Result<()>` - -- Sets invoice status to `paid` -- Sets `paid_at` to now -- Clears `error` if set -- Logs activity as `(mark_invoice_paid, invoice_id)` - -## `pub fn mark_invoice_attempted(&self, invoice_id: &str, error: &str) -> Result<()>` - -- Sets `attempted_at` to now -- Updates `error` if provided -- Leaves status as `pending` -- Logs activity as `(mark_invoice_attempted, invoice_id)` - -## `pub fn mark_invoice_sent(&self, invoice_id: &str) -> Result<()>` - -- Sets `sent_at` to now -- Leaves status as `pending` -- Logs activity as `(mark_invoice_sent, invoice_id)` - -## `pub fn mark_invoice_closed(&self, invoice_id: &str) -> Result<()>` - -- Sets invoice status to `closed` -- Sets `closed_at` to now -- Logs activity as `(mark_invoice_closed, invoice_id)` - ## `pub fn list_activity(&self, since: &i64) -> Result>` - Returns all activity occuring after `since` -## `pub fn list_activity_for_tenant(&self, tenant: &str, since: &i64) -> Result>` - -- Returns all activity occuring after `since` matching `tenant` - ## `pub fn list_activity_for_relay(&self, relay_id: &str) -> Result>` - Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id` - Ordered newest-first -## `pub fn get_relay_plan_sats(&self, plan: &str) -> Result` - -- Returns the monthly sats amount for a given plan id -- Uses `list_plans()` data for consistent pricing logic across API and billing diff --git a/backend/src/api.rs b/backend/src/api.rs index 7a75bf7..6624d96 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use anyhow::{Result, anyhow}; +use anyhow::anyhow; use axum::{ Json, Router, extract::{Path, State}, @@ -11,18 +11,17 @@ use axum::{ use base64::Engine; use nostr_sdk::{Event, JsonUtil, Kind}; use serde::{Deserialize, Serialize}; -use tower_http::cors::{AllowOrigin, CorsLayer}; +use crate::billing::Billing; use crate::models::{Relay, Tenant}; use crate::repo::Repo; #[derive(Clone)] pub struct Api { host: String, - port: u16, admins: Vec, - origins: Vec, repo: Repo, + billing: Billing, } #[derive(Clone)] @@ -58,45 +57,25 @@ impl IntoResponse for ApiError { } impl Api { - pub fn new(repo: Repo) -> Self { + pub fn new(repo: Repo, billing: Billing) -> Self { let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); - let port = std::env::var("PORT") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(3000); let admins = std::env::var("ADMINS") .unwrap_or_default() .split(',') .map(|v| v.trim().to_lowercase()) .filter(|v| !v.is_empty()) .collect(); - let origins = std::env::var("ALLOW_ORIGINS") - .unwrap_or_default() - .split(',') - .map(|v| v.trim().to_string()) - .filter(|v| !v.is_empty()) - .collect(); Self { host, - port, admins, - origins, repo, + billing, } } - pub async fn serve(&self) -> Result<()> { - let app = self.router(); - - let listener = - tokio::net::TcpListener::bind(format!("{}:{}", self.host, self.port)).await?; - axum::serve(listener, app).await?; - Ok(()) - } - - fn router(&self) -> Router { + pub fn router(self) -> Router { let state = AppState { - api: Arc::new(self.clone()), + api: Arc::new(self), }; Router::new() @@ -104,31 +83,14 @@ impl Api { .route("/plans", get(list_plans)) .route("/plans/:id", get(get_plan)) .route("/tenants", get(list_tenants)) - .route("/tenants/:pubkey", get(get_tenant)) + .route("/tenants/:pubkey", get(get_tenant).put(update_tenant)) .route("/tenants/:pubkey/relays", get(list_tenant_relays)) - .route("/tenants/:pubkey/invoices", get(list_tenant_invoices)) - .route("/tenants/:pubkey/billing", put(update_tenant_billing)) .route("/relays", get(list_relays).post(create_relay)) .route("/relays/:id", get(get_relay).put(update_relay)) .route("/relays/:id/activity", get(list_relay_activity)) .route("/relays/:id/deactivate", post(deactivate_relay)) - .route("/invoices", get(list_invoices)) - .route("/invoices/:id", get(get_invoice)) + .route("/relays/:id/reactivate", post(reactivate_relay)) .with_state(state) - .layer(self.cors_layer()) - } - - fn cors_layer(&self) -> CorsLayer { - if self.origins.is_empty() { - CorsLayer::permissive() - } else { - let origins = self - .origins - .iter() - .filter_map(|o| o.parse::().ok()) - .collect::>(); - CorsLayer::new().allow_origin(AllowOrigin::list(origins)) - } } fn extract_auth_pubkey(&self, headers: &HeaderMap) -> std::result::Result { @@ -289,9 +251,9 @@ fn map_unique_error(err: &anyhow::Error) -> Option<&'static str> { None } -#[derive(Deserialize, Serialize)] -struct UpdateTenantBillingRequest { - nwc_url: String, +#[derive(Deserialize)] +struct UpdateTenantRequest { + nwc_url: Option, } #[derive(Serialize)] @@ -364,7 +326,6 @@ async fn get_identity( pubkey: pubkey.clone(), nwc_url: String::new(), created_at: now_ts(), - billing_anchor: now_ts(), }; match state.api.repo.create_tenant(&tenant).await { @@ -489,7 +450,7 @@ async fn list_relay_activity( state.api.require_admin_or_tenant(&auth, &relay.tenant)?; match state.api.repo.list_activity_for_relay(&id).await { - Ok(activity) => Ok(ok(StatusCode::OK, activity)), + Ok(activity) => Ok(ok(StatusCode::OK, serde_json::json!({ "activity": activity }))), Err(e) => Ok(err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string())), } } @@ -679,7 +640,15 @@ async fn deactivate_relay( state.api.require_admin_or_tenant(&auth, &relay.tenant)?; - match state.api.repo.deactivate_relay(&relay).await { + if relay.status == "inactive" { + return Ok(err( + StatusCode::BAD_REQUEST, + "relay-is-inactive", + "relay is already inactive", + )); + } + + match state.api.billing.deactivate_relay(&id).await { Ok(()) => Ok(ok(StatusCode::OK, ())), Err(e) => Ok(err( StatusCode::INTERNAL_SERVER_ERROR, @@ -689,51 +658,16 @@ async fn deactivate_relay( } } -async fn list_invoices( - State(state): State, - headers: HeaderMap, -) -> std::result::Result { - let pubkey = state.api.extract_auth_pubkey(&headers)?; - state.api.require_admin(&pubkey)?; - - match state.api.repo.list_invoices_with_items().await { - Ok(invoices) => Ok(ok(StatusCode::OK, invoices)), - Err(e) => Ok(err( - StatusCode::INTERNAL_SERVER_ERROR, - "internal", - &e.to_string(), - )), - } -} - -async fn list_tenant_invoices( - State(state): State, - headers: HeaderMap, - Path(pubkey): Path, -) -> std::result::Result { - let auth = state.api.extract_auth_pubkey(&headers)?; - state.api.require_admin_or_tenant(&auth, &pubkey)?; - - match state.api.repo.list_invoices_for_tenant_with_items(&pubkey).await { - Ok(invoices) => Ok(ok(StatusCode::OK, invoices)), - Err(e) => Ok(err( - StatusCode::INTERNAL_SERVER_ERROR, - "internal", - &e.to_string(), - )), - } -} - -async fn get_invoice( +async fn reactivate_relay( State(state): State, headers: HeaderMap, Path(id): Path, ) -> std::result::Result { let auth = state.api.extract_auth_pubkey(&headers)?; - let invoice = match state.api.repo.get_invoice_with_items(&id).await { - Ok(Some(i)) => i, - Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "invoice not found")), + let relay = match state.api.repo.get_relay(&id).await { + Ok(Some(r)) => r, + Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")), Err(e) => { return Ok(err( StatusCode::INTERNAL_SERVER_ERROR, @@ -743,27 +677,18 @@ async fn get_invoice( } }; - state.api.require_admin_or_tenant(&auth, &invoice.invoice.tenant)?; + state.api.require_admin_or_tenant(&auth, &relay.tenant)?; - Ok(ok(StatusCode::OK, invoice)) -} + if relay.status == "active" { + return Ok(err( + StatusCode::BAD_REQUEST, + "relay-is-active", + "relay is already active", + )); + } -async fn update_tenant_billing( - State(state): State, - headers: HeaderMap, - Path(pubkey): Path, - Json(payload): Json, -) -> std::result::Result { - let auth = state.api.extract_auth_pubkey(&headers)?; - state.api.require_admin_or_tenant(&auth, &pubkey)?; - - match state - .api - .repo - .update_tenant_nwc_url(&pubkey, &payload.nwc_url) - .await - { - Ok(()) => Ok(ok(StatusCode::OK, payload)), + match state.api.billing.reactivate_relay(&id).await { + Ok(()) => Ok(ok(StatusCode::OK, ())), Err(e) => Ok(err( StatusCode::INTERNAL_SERVER_ERROR, "internal", @@ -772,334 +697,38 @@ async fn update_tenant_billing( } } -#[cfg(test)] -mod tests { - use std::str::FromStr; +async fn update_tenant( + State(state): State, + headers: HeaderMap, + Path(pubkey): Path, + Json(payload): Json, +) -> std::result::Result { + let auth = state.api.extract_auth_pubkey(&headers)?; + state.api.require_admin_or_tenant(&auth, &pubkey)?; - use axum::{ - body::{Body, to_bytes}, - http::{Request, StatusCode, header}, + let mut tenant = match state.api.repo.get_tenant(&pubkey).await { + Ok(Some(t)) => t, + Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "tenant not found")), + Err(e) => { + return Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "internal", + &e.to_string(), + )); + } }; - use base64::Engine; - use nostr_sdk::{EventBuilder, JsonUtil, Keys, Kind, Tag}; - use serde_json::{Value, json}; - use sqlx::{SqlitePool, sqlite::{SqliteConnectOptions, SqlitePoolOptions}}; - use tower::util::ServiceExt; - use crate::{models::{Relay, Tenant}, repo::Repo}; - - use super::Api; - - async fn test_repo() -> Repo { - let db_file = std::env::temp_dir().join(format!("caravel-api-test-{}.db", uuid::Uuid::new_v4())); - let database_url = format!("sqlite://{}", db_file.display()); - - let options = SqliteConnectOptions::from_str(&database_url) - .expect("sqlite options") - .create_if_missing(true); - let pool: SqlitePool = SqlitePoolOptions::new() - .max_connections(1) - .connect_with(options) - .await - .expect("connect sqlite"); - - sqlx::query("PRAGMA journal_mode = WAL;") - .execute(&pool) - .await - .expect("set WAL"); - - sqlx::migrate!("./migrations") - .run(&pool) - .await - .expect("run migrations"); - - Repo { pool } + if let Some(nwc_url) = payload.nwc_url { + tenant.nwc_url = nwc_url; } - fn keys() -> (Keys, Keys, Keys) { - (Keys::generate(), Keys::generate(), Keys::generate()) - } - - fn pubkey_hex(keys: &Keys) -> String { - keys.public_key().to_hex() - } - - fn auth_header(keys: &Keys, u: &str) -> String { - let tag = Tag::parse(["u", u]).expect("u tag"); - let event = EventBuilder::new(Kind::HttpAuth, "").tags([tag]) - .sign_with_keys(keys) - .expect("sign nip98 event"); - let json = event.as_json(); - let b64 = base64::engine::general_purpose::STANDARD.encode(json); - format!("Nostr {b64}") - } - - fn make_api(repo: Repo, admin_pubkey: String) -> Api { - Api { - host: "api.test".to_string(), - port: 0, - admins: vec![admin_pubkey], - origins: vec![], - repo, - } - } - - async fn request( - api: &Api, - method: &str, - path: &str, - auth: Option, - body: Option, - ) -> (StatusCode, Value) { - let mut builder = Request::builder().method(method).uri(path); - if let Some(auth) = auth { - builder = builder.header(header::AUTHORIZATION, auth); - } - - let req = if let Some(body) = body { - builder - .header(header::CONTENT_TYPE, "application/json") - .body(Body::from(body.to_string())) - .expect("request") - } else { - builder.body(Body::empty()).expect("request") - }; - - let resp = api.router().oneshot(req).await.expect("router response"); - let status = resp.status(); - let bytes = to_bytes(resp.into_body(), usize::MAX) - .await - .expect("read body"); - let payload: Value = serde_json::from_slice(&bytes).expect("json body"); - (status, payload) - } - - async fn create_tenant(repo: &Repo, pubkey: String) { - let tenant = Tenant { - pubkey, - nwc_url: String::new(), - created_at: 1, - billing_anchor: 1, - }; - repo.create_tenant(&tenant).await.expect("create tenant"); - } - - async fn create_relay(repo: &Repo, id: &str, tenant: &str, subdomain: &str) { - let relay = Relay { - id: id.to_string(), - tenant: tenant.to_string(), - schema: format!("{}_{}", subdomain.replace('-', "_"), id), - subdomain: subdomain.to_string(), - plan: "free".to_string(), - status: "new".to_string(), - sync_error: String::new(), - info_name: String::new(), - info_icon: String::new(), - info_description: String::new(), - policy_public_join: 0, - policy_strip_signatures: 0, - groups_enabled: 1, - management_enabled: 1, - blossom_enabled: 0, - livekit_enabled: 0, - push_enabled: 1, - synced: 0, - }; - repo.create_relay(&relay).await.expect("create relay"); - } - - #[tokio::test] - async fn missing_auth_returns_unauthorized() { - let repo = test_repo().await; - let (admin_keys, _, _) = keys(); - let api = make_api(repo, pubkey_hex(&admin_keys)); - - let (status, body) = request(&api, "GET", "/plans", None, None).await; - - assert_eq!(status, StatusCode::UNAUTHORIZED); - assert_eq!(body["code"], "unauthorized"); - } - - #[tokio::test] - async fn plans_endpoints_return_ok_and_not_found() { - let repo = test_repo().await; - let (admin_keys, _, _) = keys(); - let api = make_api(repo, pubkey_hex(&admin_keys)); - let auth = auth_header(&admin_keys, "https://api.test"); - - let (status, body) = request(&api, "GET", "/plans", Some(auth.clone()), None).await; - assert_eq!(status, StatusCode::OK); - assert_eq!(body["code"], "ok"); - assert!(body["data"].as_array().expect("plans array").len() >= 3); - - let (status, body) = request(&api, "GET", "/plans/does-not-exist", Some(auth), None).await; - assert_eq!(status, StatusCode::NOT_FOUND); - assert_eq!(body["code"], "not-found"); - } - - #[tokio::test] - async fn tenants_list_is_admin_only() { - let repo = test_repo().await; - let (admin_keys, tenant_keys, _) = keys(); - let api = make_api(repo, pubkey_hex(&admin_keys)); - - let auth = auth_header(&tenant_keys, "https://api.test"); - let (status, body) = request(&api, "GET", "/tenants", Some(auth), None).await; - - assert_eq!(status, StatusCode::FORBIDDEN); - assert_eq!(body["code"], "forbidden"); - } - - #[tokio::test] - async fn identity_creates_tenant_if_missing() { - let repo = test_repo().await; - let (admin_keys, tenant_keys, _) = keys(); - let api = make_api(repo.clone(), pubkey_hex(&admin_keys)); - let auth = auth_header(&tenant_keys, "https://api.test"); - let tenant_pubkey = pubkey_hex(&tenant_keys); - - let (status, body) = request(&api, "GET", "/identity", Some(auth), None).await; - assert_eq!(status, StatusCode::OK); - assert_eq!(body["code"], "ok"); - assert_eq!(body["data"]["pubkey"], tenant_pubkey); - assert_eq!(body["data"]["is_admin"], false); - - let tenant = repo - .get_tenant(&tenant_pubkey) - .await - .expect("lookup tenant after identity"); - assert!(tenant.is_some()); - } - - #[tokio::test] - async fn tenant_get_allows_owner_and_denies_other_tenant() { - let repo = test_repo().await; - let (admin_keys, tenant_a_keys, tenant_b_keys) = keys(); - create_tenant(&repo, pubkey_hex(&tenant_a_keys)).await; - let api = make_api(repo, pubkey_hex(&admin_keys)); - - let owner_auth = auth_header(&tenant_a_keys, "https://api.test"); - let (status, body) = request( - &api, - "GET", - &format!("/tenants/{}", pubkey_hex(&tenant_a_keys)), - Some(owner_auth), - None, - ) - .await; - assert_eq!(status, StatusCode::OK); - assert_eq!(body["code"], "ok"); - - let other_auth = auth_header(&tenant_b_keys, "https://api.test"); - let (status, body) = request( - &api, - "GET", - &format!("/tenants/{}", pubkey_hex(&tenant_a_keys)), - Some(other_auth), - None, - ) - .await; - assert_eq!(status, StatusCode::FORBIDDEN); - assert_eq!(body["code"], "forbidden"); - } - - #[tokio::test] - async fn create_relay_validates_premium_feature_and_subdomain() { - let repo = test_repo().await; - let (admin_keys, tenant_keys, _) = keys(); - create_tenant(&repo, pubkey_hex(&tenant_keys)).await; - let api = make_api(repo, pubkey_hex(&admin_keys)); - let auth = auth_header(&tenant_keys, "https://api.test"); - - let (status, body) = request( - &api, - "POST", - "/relays", - Some(auth.clone()), - Some(json!({ - "tenant": pubkey_hex(&tenant_keys), - "subdomain": "bad_subdomain", - "plan": "free" - })), - ) - .await; - assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY); - assert_eq!(body["code"], "invalid-relay"); - - let (status, body) = request( - &api, - "POST", - "/relays", - Some(auth), - Some(json!({ - "tenant": pubkey_hex(&tenant_keys), - "subdomain": "good-subdomain", - "plan": "free", - "blossom_enabled": 1 - })), - ) - .await; - assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY); - assert_eq!(body["code"], "premium-feature"); - } - - #[tokio::test] - async fn relay_get_honors_owner_and_duplicate_subdomain_is_422() { - let repo = test_repo().await; - let (admin_keys, tenant_a_keys, tenant_b_keys) = keys(); - let tenant_a = pubkey_hex(&tenant_a_keys); - create_tenant(&repo, tenant_a.clone()).await; - create_tenant(&repo, pubkey_hex(&tenant_b_keys)).await; - create_relay(&repo, "relay-a", &tenant_a, "alpha").await; - - let api = make_api(repo.clone(), pubkey_hex(&admin_keys)); - let owner_auth = auth_header(&tenant_a_keys, "https://api.test"); - - let (status, body) = request(&api, "GET", "/relays/relay-a", Some(owner_auth), None).await; - assert_eq!(status, StatusCode::OK); - assert_eq!(body["code"], "ok"); - - let other_auth = auth_header(&tenant_b_keys, "https://api.test"); - let (status, body) = request(&api, "GET", "/relays/relay-a", Some(other_auth), None).await; - assert_eq!(status, StatusCode::FORBIDDEN); - assert_eq!(body["code"], "forbidden"); - - let (status, body) = request( - &api, - "POST", - "/relays", - Some(auth_header(&tenant_a_keys, "https://api.test")), - Some(json!({ - "tenant": tenant_a, - "subdomain": "alpha", - "plan": "free" - })), - ) - .await; - assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY); - assert_eq!(body["code"], "subdomain-exists"); - } - - #[tokio::test] - async fn update_tenant_billing_allows_self() { - let repo = test_repo().await; - let (admin_keys, tenant_keys, _) = keys(); - let tenant = pubkey_hex(&tenant_keys); - create_tenant(&repo, tenant.clone()).await; - - let api = make_api(repo.clone(), pubkey_hex(&admin_keys)); - let auth = auth_header(&tenant_keys, "https://api.test"); - - let (status, body) = request( - &api, - "PUT", - &format!("/tenants/{tenant}/billing"), - Some(auth), - Some(json!({"nwc_url":"nostr+walletconnect://example"})), - ) - .await; - assert_eq!(status, StatusCode::OK); - assert_eq!(body["code"], "ok"); - assert_eq!(body["data"]["nwc_url"], "nostr+walletconnect://example"); + match state.api.repo.update_tenant(&tenant).await { + Ok(()) => Ok(ok(StatusCode::OK, tenant)), + Err(e) => Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "internal", + &e.to_string(), + )), } } + diff --git a/backend/src/billing.rs b/backend/src/billing.rs index 8ce39ce..b8d8ab1 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -1,10 +1,5 @@ -use std::collections::HashMap; - use anyhow::Result; -use chrono::{DateTime, Datelike, Duration, Months, TimeZone, Utc}; -use tokio::sync::Mutex; -use crate::models::{Activity, Invoice, InvoiceItem, Relay, Tenant}; use crate::repo::Repo; use crate::robot::Robot; @@ -13,7 +8,6 @@ pub struct Billing { nwc_url: String, repo: Repo, robot: Robot, - last_activity_at: std::sync::Arc>, } impl Billing { @@ -23,384 +17,24 @@ impl Billing { nwc_url, repo, robot, - last_activity_at: std::sync::Arc::new(Mutex::new(0)), } } - pub async fn start(self) { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600)); - loop { - interval.tick().await; - if let Err(e) = self.tick().await { - tracing::error!(error = %e, "billing tick failed"); - } - } - } - - pub async fn tick(&self) -> Result<()> { - let mut since_guard = self.last_activity_at.lock().await; - let since = *since_guard; - let activity = self.repo.list_activity(&since).await?; - for a in &activity { - if matches!( - a.activity_type.as_str(), - "create_relay" | "update_relay" | "activate_relay" - ) { - self.maybe_reset_anchor_for_first_paid_relay(a).await?; - } - *since_guard = (*since_guard).max(a.created_at); - } - drop(since_guard); - - let tenants = self.repo.list_tenants().await?; - for tenant in &tenants { - self.generate_invoice_if_due(tenant).await?; - self.collect_outstanding(tenant).await?; - } - - Ok(()) - } - - async fn maybe_reset_anchor_for_first_paid_relay(&self, activity: &Activity) -> Result<()> { - let relay = match self.repo.get_relay(&activity.resource_id).await? { - Some(r) => r, - None => return Ok(()), - }; - if relay.plan == "free" { - return Ok(()); - } - - let relays = self.repo.list_relays_for_tenant(&relay.tenant).await?; - let paid_active_count = relays - .into_iter() - .filter(|r| r.status == "active" && r.plan != "free") - .count() as i64; - - if paid_active_count == 1 { - self.repo - .update_tenant_billing_anchor(&relay.tenant, activity.created_at) - .await?; - } - - Ok(()) - } - - async fn generate_invoice_if_due(&self, tenant: &Tenant) -> Result<()> { - if self + pub async fn deactivate_relay(&self, relay_id: &str) -> Result<()> { + let relay = self .repo - .total_pending_invoices_for_tenant(&tenant.pubkey) + .get_relay(relay_id) .await? - > 0 - { - return Ok(()); - } + .ok_or_else(|| anyhow::anyhow!("relay not found"))?; + self.repo.deactivate_relay(&relay).await + } - let relays = self.repo.list_relays_for_tenant(&tenant.pubkey).await?; - let active_paid_relays: Vec = relays - .iter() - .filter(|r| r.status == "active" && r.plan != "free") - .cloned() - .collect(); - if active_paid_relays.is_empty() { - return Ok(()); - } - - let now = Utc::now(); - let anchor = ts_to_dt(tenant.billing_anchor)?; - let (period_start, period_end) = billing_window(anchor, now); - if now < period_end { - return Ok(()); - } - - let usage_events = self + pub async fn reactivate_relay(&self, relay_id: &str) -> Result<()> { + let relay = self .repo - .list_activity_for_tenant(&tenant.pubkey, &tenant.billing_anchor) - .await?; - let invoice_id = uuid::Uuid::new_v4().to_string(); - let mut items = Vec::new(); - - for relay in active_paid_relays { - let hours = - relay_active_hours_in_window(&relay, &usage_events, period_start, period_end); - if hours <= 0 { - continue; - } - let plan_monthly = self.repo.get_relay_plan_sats(&relay.plan).await?; - if plan_monthly <= 0 { - continue; - } - - let sats = ((plan_monthly as f64 / 30.0 / 24.0) * hours as f64).ceil() as i64; - if sats <= 0 { - continue; - } - - items.push(InvoiceItem { - id: uuid::Uuid::new_v4().to_string(), - invoice: invoice_id.clone(), - relay: relay.id, - sats, - }); - } - - let total: i64 = items.iter().map(|i| i.sats).sum(); - if total == 0 { - return Ok(()); - } - - let bolt11 = match self.make_bolt11(total).await { - Ok(v) => v, - Err(e) => { - tracing::error!(tenant = %tenant.pubkey, error = %e, "bolt11 generation failed"); - return Ok(()); - } - }; - - let invoice = Invoice { - id: invoice_id, - tenant: tenant.pubkey.clone(), - status: "pending".to_string(), - created_at: now.timestamp(), - attempted_at: 0, - error: String::new(), - closed_at: 0, - sent_at: 0, - paid_at: 0, - bolt11, - period_start: period_start.timestamp(), - period_end: period_end.timestamp(), - }; - - self.repo.create_invoice(&invoice, &items).await?; - Ok(()) - } - - async fn collect_outstanding(&self, tenant: &Tenant) -> Result<()> { - let invoices = self.repo.list_invoices_for_tenant(&tenant.pubkey).await?; - let now = now_ts(); - - for invoice in invoices.into_iter().filter(|i| i.status == "pending") { - if invoice.attempted_at > 0 && now - invoice.attempted_at < 24 * 3600 { - continue; - } - - if self.is_bolt11_paid(&invoice.bolt11).await { - self.repo.mark_invoice_paid(&invoice.id).await?; - continue; - } - - let mut collected = false; - if !tenant.nwc_url.trim().is_empty() - && self.pay_invoice_nwc(&tenant.nwc_url, &invoice.bolt11).await - { - self.repo.mark_invoice_paid(&invoice.id).await?; - collected = true; - } - - if !collected { - self.repo - .mark_invoice_attempted(&invoice.id, Some("autopay failed or unavailable")) - .await?; - - if invoice.sent_at == 0 { - let amount: i64 = self - .repo - .get_invoice_items(&invoice.id) - .await? - .into_iter() - .map(|i| i.sats) - .sum(); - let message = format!( - "Invoice {} is due. Amount: {} sats\n{}", - invoice.id, amount, invoice.bolt11 - ); - if self.robot.send_dm(&tenant.pubkey, &message).await.is_ok() { - self.repo.mark_invoice_sent(&invoice.id).await?; - } - } - } - - if now - invoice.created_at >= 7 * 24 * 3600 { - self.repo.mark_invoice_closed(&invoice.id).await?; - } - } - - Ok(()) - } - - async fn make_bolt11(&self, sats: i64) -> Result { - if self.nwc_url.trim().is_empty() { - anyhow::bail!("NWC_URL not configured") - } - - let uri = nostr_sdk::nips::nip47::NostrWalletConnectURI::parse(&self.nwc_url)?; - let req = nostr_sdk::nips::nip47::Request::make_invoice( - nostr_sdk::nips::nip47::MakeInvoiceRequest { - amount: (sats as u64) * 1_000, - description: Some("Caravel relay invoice".to_string()), - description_hash: None, - expiry: None, - }, - ); - - let resp = self.send_nwc_request(&uri, req).await?; - Ok(resp.to_make_invoice()?.invoice) - } - - async fn is_bolt11_paid(&self, _bolt11: &str) -> bool { - false - } - - async fn pay_invoice_nwc(&self, nwc_url: &str, bolt11: &str) -> bool { - let uri = match nostr_sdk::nips::nip47::NostrWalletConnectURI::parse(nwc_url) { - Ok(v) => v, - Err(_) => return false, - }; - let req = nostr_sdk::nips::nip47::Request::pay_invoice( - nostr_sdk::nips::nip47::PayInvoiceRequest::new(bolt11), - ); - self.send_nwc_request(&uri, req) - .await - .and_then(|r| r.to_pay_invoice().map(|_| ()).map_err(anyhow::Error::from)) - .is_ok() - } - - async fn send_nwc_request( - &self, - uri: &nostr_sdk::nips::nip47::NostrWalletConnectURI, - request: nostr_sdk::nips::nip47::Request, - ) -> Result { - use nostr_sdk::{Client, Filter, Keys, Kind, Timestamp}; - - let app_keys = Keys::new(uri.secret.clone()); - let app_pubkey = app_keys.public_key(); - let client = Client::new(app_keys); - client.add_relay(uri.relay_url.clone()).await?; - client.connect().await; - - let started_at = Timestamp::now(); - let event = request.to_event(uri)?; - client.send_event(event).await?; - - let filter = Filter::new() - .kind(Kind::WalletConnectResponse) - .author(uri.public_key) - .pubkey(app_pubkey) - .since(started_at); - - let events = client - .fetch_events(filter, std::time::Duration::from_secs(10)) - .await?; - let event = events - .into_iter() - .max_by_key(|e| e.created_at) - .ok_or_else(|| anyhow::anyhow!("no NWC response received"))?; - - Ok(nostr_sdk::nips::nip47::Response::from_event(uri, &event)?) + .get_relay(relay_id) + .await? + .ok_or_else(|| anyhow::anyhow!("relay not found"))?; + self.repo.activate_relay(&relay).await } } - -fn now_ts() -> i64 { - Utc::now().timestamp() -} - -fn ts_to_dt(ts: i64) -> Result> { - Utc.timestamp_opt(ts, 0) - .single() - .ok_or_else(|| anyhow::anyhow!("invalid unix timestamp")) -} - -fn billing_window(anchor: DateTime, now: DateTime) -> (DateTime, DateTime) { - let mut start = anchor; - loop { - let end = start + Months::new(1); - if now < end { - return (start, end); - } - start = end; - } -} - -fn relay_active_hours_in_window( - relay: &Relay, - events: &[Activity], - start: DateTime, - end: DateTime, -) -> i64 { - if relay.plan == "free" { - return 0; - } - - let mut marks: HashMap<&str, Vec<&Activity>> = HashMap::new(); - for event in events { - if event.resource_type == "relay" && event.resource_id == relay.id { - marks.entry(&relay.id).or_default().push(event); - } - } - - let Some(entries) = marks.get(relay.id.as_str()) else { - if relay.status == "active" { - return ((end - start).num_seconds() as f64 / 3600.0).ceil() as i64; - } - return 0; - }; - - let mut active = relay.status == "active"; - let mut cursor = start; - let mut secs = 0i64; - - for event in entries.iter().copied() { - let ts = match Utc.timestamp_opt(event.created_at, 0).single() { - Some(v) => v, - None => continue, - }; - if ts <= start || ts >= end { - continue; - } - - match event.activity_type.as_str() { - "create_relay" | "activate_relay" => { - if !active { - active = true; - cursor = ts; - } - } - "deactivate_relay" | "fail_relay_sync" => { - if active { - active = false; - secs += (ts - cursor).num_seconds().max(0); - } - } - _ => {} - } - } - - if active { - secs += (end - cursor).num_seconds().max(0); - } - - let hours = (secs as f64 / 3600.0).ceil() as i64; - if hours > 0 { hours } else { 0 } -} - -#[allow(dead_code)] -fn _same_month(a: DateTime, b: DateTime) -> bool { - a.year() == b.year() && a.month() == b.month() -} - -#[allow(dead_code)] -fn _days_between(a: i64, b: i64) -> i64 { - let da = Utc.timestamp_opt(a, 0).single().unwrap_or_else(Utc::now); - let db = Utc.timestamp_opt(b, 0).single().unwrap_or_else(Utc::now); - (db - da).num_days() -} - -#[allow(dead_code)] -fn _hours_between(a: DateTime, b: DateTime) -> i64 { - ((b - a).num_seconds() as f64 / 3600.0).ceil() as i64 -} - -#[allow(dead_code)] -fn _next_day(dt: DateTime) -> DateTime { - dt + Duration::days(1) -} diff --git a/backend/src/main.rs b/backend/src/main.rs index 356907a..01c58fb 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -8,6 +8,8 @@ mod robot; use anyhow::Result; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tower_http::cors::{AllowOrigin, CorsLayer}; + use crate::api::Api; use crate::billing::Billing; use crate::infra::Infra; @@ -27,15 +29,37 @@ async fn main() -> Result<()> { let robot = Robot::new().await?; let billing = Billing::new(repo.clone(), robot.clone()); let infra = Infra::new(repo.clone()); - let api = Api::new(repo); + let api = Api::new(repo, billing.clone()); - tokio::spawn(async move { - billing.start().await; - }); + let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); + let port: u16 = std::env::var("PORT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(3000); + let origins: Vec = std::env::var("ALLOW_ORIGINS") + .unwrap_or_default() + .split(',') + .map(|v| v.trim().to_string()) + .filter(|v| !v.is_empty()) + .collect(); + + let cors = if origins.is_empty() { + CorsLayer::permissive() + } else { + let parsed = origins + .iter() + .filter_map(|o| o.parse::().ok()) + .collect::>(); + CorsLayer::new().allow_origin(AllowOrigin::list(parsed)) + }; + + let app = api.router().layer(cors); tokio::spawn(async move { infra.start().await; }); - api.serve().await + let listener = tokio::net::TcpListener::bind(format!("{host}:{port}")).await?; + axum::serve(listener, app).await?; + Ok(()) } diff --git a/backend/src/models.rs b/backend/src/models.rs index e365347..6cc6a03 100644 --- a/backend/src/models.rs +++ b/backend/src/models.rs @@ -26,7 +26,6 @@ pub struct Tenant { #[serde(skip_serializing)] pub nwc_url: String, pub created_at: i64, - pub billing_anchor: i64, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] @@ -48,37 +47,5 @@ pub struct Relay { pub blossom_enabled: i64, pub livekit_enabled: i64, pub push_enabled: i64, - #[serde(skip_serializing)] pub synced: i64, } - -#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] -pub struct Invoice { - pub id: String, - pub tenant: String, - pub status: String, - pub created_at: i64, - pub attempted_at: i64, - pub error: String, - pub closed_at: i64, - pub sent_at: i64, - pub paid_at: i64, - pub bolt11: String, - pub period_start: i64, - pub period_end: i64, -} - -#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] -pub struct InvoiceItem { - pub id: String, - pub invoice: String, - pub relay: String, - pub sats: i64, -} - -#[derive(Debug, Clone, Serialize)] -pub struct InvoiceWithItems { - #[serde(flatten)] - pub invoice: Invoice, - pub items: Vec, -} diff --git a/backend/src/repo.rs b/backend/src/repo.rs index 60fe8de..d1adc5f 100644 --- a/backend/src/repo.rs +++ b/backend/src/repo.rs @@ -7,7 +7,7 @@ use sqlx::{ sqlite::{SqliteConnectOptions, SqlitePoolOptions}, }; -use crate::models::{Activity, Invoice, InvoiceItem, InvoiceWithItems, Plan, Relay, Tenant}; +use crate::models::{Activity, Plan, Relay, Tenant}; #[derive(Clone)] pub struct Repo { @@ -60,12 +60,6 @@ impl Repo { .fetch_one(&mut **tx) .await? } - "invoice" => { - sqlx::query_scalar::<_, String>("SELECT tenant FROM invoice WHERE id = ?") - .bind(resource_id) - .fetch_one(&mut **tx) - .await? - } _ => anyhow::bail!("unknown resource_type: {}", resource_type), }; @@ -85,7 +79,7 @@ impl Repo { pub async fn list_tenants(&self) -> Result> { let rows = sqlx::query_as::<_, Tenant>( - "SELECT pubkey, nwc_url, created_at, billing_anchor + "SELECT pubkey, nwc_url, created_at FROM tenant ORDER BY pubkey", ) @@ -96,7 +90,7 @@ impl Repo { pub async fn get_tenant(&self, pubkey: &str) -> Result> { let row = sqlx::query_as::<_, Tenant>( - "SELECT pubkey, nwc_url, created_at, billing_anchor + "SELECT pubkey, nwc_url, created_at FROM tenant WHERE pubkey = ?", ) @@ -110,13 +104,12 @@ impl Repo { let mut tx = self.pool.begin().await?; sqlx::query( - "INSERT INTO tenant (pubkey, nwc_url, created_at, billing_anchor) - VALUES (?, ?, ?, ?)", + "INSERT INTO tenant (pubkey, nwc_url, created_at) + VALUES (?, ?, ?)", ) .bind(&tenant.pubkey) .bind(&tenant.nwc_url) .bind(tenant.created_at) - .bind(tenant.billing_anchor) .execute(&mut *tx) .await?; @@ -126,35 +119,16 @@ impl Repo { Ok(()) } - pub async fn update_tenant_billing_anchor( - &self, - pubkey: &str, - billing_anchor: i64, - ) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query("UPDATE tenant SET billing_anchor = ? WHERE pubkey = ?") - .bind(billing_anchor) - .bind(pubkey) - .execute(&mut *tx) - .await?; - - Self::insert_activity(&mut tx, "update_tenant_billing_anchor", "tenant", pubkey).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn update_tenant_nwc_url(&self, pubkey: &str, nwc_url: &str) -> Result<()> { + pub async fn update_tenant(&self, tenant: &Tenant) -> Result<()> { let mut tx = self.pool.begin().await?; sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?") - .bind(nwc_url) - .bind(pubkey) + .bind(&tenant.nwc_url) + .bind(&tenant.pubkey) .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "update_tenant_nwc_url", "tenant", pubkey).await?; + Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?; tx.commit().await?; Ok(()) @@ -313,172 +287,10 @@ impl Repo { } pub async fn mark_relay_synced(&self, relay_id: &str) -> Result<()> { - let mut tx = self.pool.begin().await?; - sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?") .bind(relay_id) - .execute(&mut *tx) + .execute(&self.pool) .await?; - - Self::insert_activity(&mut tx, "mark_relay_synced", "relay", relay_id).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn create_invoice( - &self, - invoice: &Invoice, - invoice_items: &[InvoiceItem], - ) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query( - "INSERT INTO invoice ( - id, tenant, status, created_at, attempted_at, error, closed_at, - sent_at, paid_at, bolt11, period_start, period_end - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - ) - .bind(&invoice.id) - .bind(&invoice.tenant) - .bind(&invoice.status) - .bind(invoice.created_at) - .bind(invoice.attempted_at) - .bind(&invoice.error) - .bind(invoice.closed_at) - .bind(invoice.sent_at) - .bind(invoice.paid_at) - .bind(&invoice.bolt11) - .bind(invoice.period_start) - .bind(invoice.period_end) - .execute(&mut *tx) - .await?; - - for item in invoice_items { - sqlx::query("INSERT INTO invoice_item (id, invoice, relay, sats) VALUES (?, ?, ?, ?)") - .bind(&item.id) - .bind(&item.invoice) - .bind(&item.relay) - .bind(item.sats) - .execute(&mut *tx) - .await?; - } - - Self::insert_activity(&mut tx, "create_invoice", "invoice", &invoice.id).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn list_invoices(&self) -> Result> { - let rows = sqlx::query_as::<_, Invoice>( - "SELECT id, tenant, status, created_at, attempted_at, error, closed_at, - sent_at, paid_at, bolt11, period_start, period_end - FROM invoice - ORDER BY created_at DESC", - ) - .fetch_all(&self.pool) - .await?; - Ok(rows) - } - - pub async fn list_invoices_for_tenant(&self, tenant_id: &str) -> Result> { - let rows = sqlx::query_as::<_, Invoice>( - "SELECT id, tenant, status, created_at, attempted_at, error, closed_at, - sent_at, paid_at, bolt11, period_start, period_end - FROM invoice - WHERE tenant = ? - ORDER BY created_at DESC", - ) - .bind(tenant_id) - .fetch_all(&self.pool) - .await?; - Ok(rows) - } - - pub async fn get_invoice(&self, id: &str) -> Result> { - let row = sqlx::query_as::<_, Invoice>( - "SELECT id, tenant, status, created_at, attempted_at, error, closed_at, - sent_at, paid_at, bolt11, period_start, period_end - FROM invoice - WHERE id = ?", - ) - .bind(id) - .fetch_optional(&self.pool) - .await?; - Ok(row) - } - - pub async fn mark_invoice_paid(&self, invoice_id: &str) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query( - "UPDATE invoice - SET status = 'paid', paid_at = strftime('%s','now'), error = '' - WHERE id = ?", - ) - .bind(invoice_id) - .execute(&mut *tx) - .await?; - - Self::insert_activity(&mut tx, "mark_invoice_paid", "invoice", invoice_id).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn mark_invoice_attempted( - &self, - invoice_id: &str, - error: Option<&str>, - ) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query( - "UPDATE invoice - SET attempted_at = strftime('%s','now'), error = COALESCE(?, error) - WHERE id = ?", - ) - .bind(error) - .bind(invoice_id) - .execute(&mut *tx) - .await?; - - Self::insert_activity(&mut tx, "mark_invoice_attempted", "invoice", invoice_id).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn mark_invoice_sent(&self, invoice_id: &str) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query("UPDATE invoice SET sent_at = strftime('%s','now') WHERE id = ?") - .bind(invoice_id) - .execute(&mut *tx) - .await?; - - Self::insert_activity(&mut tx, "mark_invoice_sent", "invoice", invoice_id).await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn mark_invoice_closed(&self, invoice_id: &str) -> Result<()> { - let mut tx = self.pool.begin().await?; - - sqlx::query( - "UPDATE invoice - SET status = 'closed', closed_at = strftime('%s','now') - WHERE id = ?", - ) - .bind(invoice_id) - .execute(&mut *tx) - .await?; - - Self::insert_activity(&mut tx, "mark_invoice_closed", "invoice", invoice_id).await?; - - tx.commit().await?; Ok(()) } @@ -504,24 +316,6 @@ impl Repo { Ok(rows) } - pub async fn list_activity_for_tenant( - &self, - tenant: &str, - since: &i64, - ) -> Result> { - let rows = sqlx::query_as::<_, Activity>( - "SELECT id, tenant, created_at, activity_type, resource_type, resource_id - FROM activity - WHERE created_at > ? AND tenant = ? - ORDER BY created_at, id", - ) - .bind(since) - .bind(tenant) - .fetch_all(&self.pool) - .await?; - Ok(rows) - } - pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result> { let rows = sqlx::query_as::<_, Activity>( "SELECT id, tenant, created_at, activity_type, resource_type, resource_id @@ -535,71 +329,6 @@ impl Repo { Ok(rows) } - pub async fn list_invoices_with_items(&self) -> Result> { - let invoices = self.list_invoices().await?; - let mut result = Vec::with_capacity(invoices.len()); - for invoice in invoices { - let items = self.get_invoice_items(&invoice.id).await?; - result.push(InvoiceWithItems { invoice, items }); - } - Ok(result) - } - - pub async fn list_invoices_for_tenant_with_items( - &self, - tenant_id: &str, - ) -> Result> { - let invoices = self.list_invoices_for_tenant(tenant_id).await?; - let mut result = Vec::with_capacity(invoices.len()); - for invoice in invoices { - let items = self.get_invoice_items(&invoice.id).await?; - result.push(InvoiceWithItems { invoice, items }); - } - Ok(result) - } - - pub async fn get_invoice_with_items(&self, id: &str) -> Result> { - match self.get_invoice(id).await? { - Some(invoice) => { - let items = self.get_invoice_items(&invoice.id).await?; - Ok(Some(InvoiceWithItems { invoice, items })) - } - None => Ok(None), - } - } - - pub async fn get_invoice_items(&self, invoice_id: &str) -> Result> { - let rows = sqlx::query_as::<_, InvoiceItem>( - "SELECT id, invoice, relay, sats - FROM invoice_item - WHERE invoice = ?", - ) - .bind(invoice_id) - .fetch_all(&self.pool) - .await?; - Ok(rows) - } - - pub async fn total_pending_invoices_for_tenant(&self, tenant: &str) -> Result { - let count = sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM invoice - WHERE tenant = ? AND status = 'pending'", - ) - .bind(tenant) - .fetch_one(&self.pool) - .await?; - Ok(count) - } - - pub async fn get_relay_plan_sats(&self, plan: &str) -> Result { - let sats = Self::list_plans() - .into_iter() - .find(|p| p.id == plan) - .map(|p| p.sats) - .unwrap_or(0); - Ok(sats) - } - pub fn list_plans() -> Vec { vec![ Plan { diff --git a/todo.md b/todo.md index c4d7558..32b6bfe 100644 --- a/todo.md +++ b/todo.md @@ -1,6 +1,3 @@ -- [ ] Show relay status on details page -- [ ] Show relay activity history on details page (add activity route and repo method to backend spec + implementation) -- [ ] Infra provisioning isn't happening. At least, the relay's status isn't being updated -- [ ] If relay is inactive, show "reactivate" instead of "inactivate" in the relay detal menu -- [ ] Invoices -- [ ] Stripe +- [ ] Split repo into queries and commands +- [ ] Fix billing by using stripe as a backend to do proration, then mark invoices paid manually when using bitcoin. +- [ ] Send a payment link instead of an invoice so we can generate/pay on the fly