diff --git a/backend/migrations/0001_init.sql b/backend/migrations/0001_init.sql index b5d9378..5c396ad 100644 --- a/backend/migrations/0001_init.sql +++ b/backend/migrations/0001_init.sql @@ -1,92 +1,59 @@ +CREATE TABLE IF NOT EXISTS activities ( + id TEXT PRIMARY KEY, + created_at INTEGER NOT NULL, + activity_type TEXT NOT NULL, + identifier TEXT NOT NULL +); + CREATE TABLE IF NOT EXISTS tenants ( pubkey TEXT PRIMARY KEY, - status TEXT NOT NULL, - nwc_url TEXT NOT NULL DEFAULT "", - created_at INTEGER NOT NULL DEFAULT (UNIXEPOCH()), - billing_anchor_at INTEGER NOT NULL DEFAULT (UNIXEPOCH()), - stripe_customer_id TEXT NOT NULL DEFAULT '', - stripe_subscription_id TEXT NOT NULL DEFAULT '' + nwc_url TEXT NOT NULL DEFAULT '', + created_at INTEGER NOT NULL, + billing_anchor INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS relays ( id TEXT PRIMARY KEY, tenant TEXT NOT NULL, - name TEXT NOT NULL, + schema TEXT NOT NULL, subdomain TEXT NOT NULL UNIQUE, - description TEXT NOT NULL, plan TEXT NOT NULL, status TEXT NOT NULL, - icon TEXT NOT NULL DEFAULT "", - config TEXT, + sync_error TEXT NOT NULL DEFAULT '', + info_name TEXT NOT NULL DEFAULT '', + info_icon TEXT NOT NULL DEFAULT '', + info_description TEXT NOT NULL DEFAULT '', + policy_public_join INTEGER NOT NULL DEFAULT 0, + policy_strip_signatures INTEGER NOT NULL DEFAULT 0, + groups_enabled INTEGER NOT NULL DEFAULT 1, + management_enabled INTEGER NOT NULL DEFAULT 1, + blossom_enabled INTEGER NOT NULL DEFAULT 0, + livekit_enabled INTEGER NOT NULL DEFAULT 0, + push_enabled INTEGER NOT NULL DEFAULT 1, FOREIGN KEY (tenant) REFERENCES tenants(pubkey) ); CREATE TABLE IF NOT EXISTS invoices ( id TEXT PRIMARY KEY, tenant TEXT NOT NULL, - amount INTEGER NOT NULL, status TEXT NOT NULL, created_at INTEGER NOT NULL, + attempted_at INTEGER NOT NULL DEFAULT 0, + error TEXT NOT NULL DEFAULT '', + closed_at INTEGER NOT NULL DEFAULT 0, + sent_at INTEGER NOT NULL DEFAULT 0, + paid_at INTEGER NOT NULL DEFAULT 0, bolt11 TEXT NOT NULL, period_start INTEGER NOT NULL, period_end INTEGER NOT NULL, FOREIGN KEY (tenant) REFERENCES tenants(pubkey) ); -CREATE UNIQUE INDEX IF NOT EXISTS invoices_tenant_period_unique - ON invoices (tenant, period_start, period_end); - CREATE TABLE IF NOT EXISTS invoice_items ( id TEXT PRIMARY KEY, invoice TEXT NOT NULL, relay TEXT NOT NULL, - amount INTEGER NOT NULL, - period_start INTEGER NOT NULL, - period_end INTEGER NOT NULL, + sats INTEGER NOT NULL, FOREIGN KEY (invoice) REFERENCES invoices(id), FOREIGN KEY (relay) REFERENCES relays(id) ); - -CREATE TABLE IF NOT EXISTS plans ( - id TEXT PRIMARY KEY, - sats_per_month INTEGER NOT NULL -); - -INSERT OR IGNORE INTO plans (id, sats_per_month) VALUES - ('free', 0), - ('basic', 10000), - ('growth', 50000); - -CREATE TABLE IF NOT EXISTS relay_lifecycle_events ( - id TEXT PRIMARY KEY, - relay TEXT NOT NULL, - tenant TEXT NOT NULL, - event_type TEXT NOT NULL, - plan TEXT NOT NULL, - created_at INTEGER NOT NULL, - FOREIGN KEY (relay) REFERENCES relays(id), - FOREIGN KEY (tenant) REFERENCES tenants(pubkey) -); - -CREATE INDEX IF NOT EXISTS relay_lifecycle_events_relay_idx - ON relay_lifecycle_events (relay, created_at); - -CREATE INDEX IF NOT EXISTS relay_lifecycle_events_tenant_idx - ON relay_lifecycle_events (tenant, created_at); - -CREATE TABLE IF NOT EXISTS invoice_attempts ( - id TEXT PRIMARY KEY, - invoice TEXT NOT NULL, - run_id TEXT NOT NULL, - method TEXT NOT NULL, - outcome TEXT NOT NULL, - error TEXT NOT NULL DEFAULT '', - created_at INTEGER NOT NULL, - FOREIGN KEY (invoice) REFERENCES invoices(id) -); - -CREATE INDEX IF NOT EXISTS invoice_attempts_invoice_idx - ON invoice_attempts (invoice, created_at); - -CREATE INDEX IF NOT EXISTS invoice_attempts_run_id_idx - ON invoice_attempts (run_id); diff --git a/backend/spec/api.md b/backend/spec/api.md index cfaf4e6..20f055b 100644 --- a/backend/spec/api.md +++ b/backend/spec/api.md @@ -12,7 +12,7 @@ Members: Notes: -- Authentication is done using NIP 98 +- Authentication is done using NIP 98 comparing `u` to `self.host`, not the incoming request - Each route is responsible for authorization using `self.is_admin(pubkey)` or `self.is_tenant(authorized_pubkey, tenant_pubkey)` - Successful API responses should be of the form `{data, code: "ok"}` with an appropriate http status code. - Unsuccessful API responses should be of the form `{error, code}` with an appropriate http status code. `code` is a short error code (e.g. `duplicate-subdomain`) and `error` is a human-readable error message. diff --git a/backend/spec/billing.md b/backend/spec/billing.md index 777b889..d1e0739 100644 --- a/backend/spec/billing.md +++ b/backend/spec/billing.md @@ -20,7 +20,7 @@ Calls `self.tick` in a loop every hour. Iterates over `repo.list_activity` since last run and does the following: -- For any `relay_created|relay_updated` activity if this is the first non-free relay for the tenant, update tenant's billing anchor to the time the relay was created. +- For any `relay_created|relay_updated|relay_activated` activity if this is the first non-free relay for the tenant, update tenant's billing anchor to the time the relay was created. Also iterates over `repo.list_tenants()` and for each tenant calls `self.generate_invoice_if_due(tenant)` and `self.collect_outstanding(tenant)`. diff --git a/backend/spec/main.md b/backend/spec/main.md index 21aa9c6..d55009e 100644 --- a/backend/spec/main.md +++ b/backend/spec/main.md @@ -2,6 +2,7 @@ - Configures logging - Creates instances of `Repo`, `Robot`, `Billing`, `Api`, and `Infra` -- Spawns `billing.start()` -- Spawns `infra.start()` -- Calls `api.serve()` +- Calls `repo.migrate` +- Spawns `billing.start` +- Spawns `infra.start` +- Calls `api.serve` diff --git a/backend/spec/models.md b/backend/spec/models.md index 462cc30..fcb86dd 100644 --- a/backend/spec/models.md +++ b/backend/spec/models.md @@ -69,11 +69,11 @@ Invoices are generated at the end of a tenant's monthly billing period. The bill - `status` - `pending|paid|closed` - `amount` is derived as the sum of associated invoice item `sats` values (not stored as a separate source of truth) - `created_at` - unix timestamp for when the invoice was created -- `attempted_at` - unix timestamp for when collection was last attempted +- `attempted_at` - nullable unix timestamp for when collection was last attempted - `error` - optional human-readable error from the last failed collection attempt -- `closed_at` - unix timestamp for when the invoice was closed -- `sent_at` - unix timestamp for when the invoice was sent via DM -- `paid_at` - unix timestamp for when the invoice was paid +- `closed_at` - nullable unix timestamp for when the invoice was closed +- `sent_at` - nullable unix timestamp for when the invoice was sent via DM +- `paid_at` - nullable unix timestamp for when the invoice was paid - `bolt11` - a BOLT 11 lightning invoice that can be used to pay the invoice - `period_start` - unix timestamp for period start - `period_end` - unix timestamp for period end diff --git a/backend/spec/repo.md b/backend/spec/repo.md index a9428fd..f33f773 100644 --- a/backend/spec/repo.md +++ b/backend/spec/repo.md @@ -4,20 +4,23 @@ Repo is a wrapper around a sqlite pool which implements methods related to datab Members: -- `database_url: String` - the location of the sqlite database, from `DATABASE_URL` - `pool: sqlx::SqlitePool` - a sqlite connection pool Notes: -- All public methods should be run in a transaction so they're atomic +- All public write methods should be run in a transaction so they're atomic - All writes should be accompanied by an activity log entry of `(activity_type, identifier)` ## `pub fn new() -> Self` -- Reads environment and populates members -- Ensures that any directories referred to in `self.database_url` exist +- Reads `DATABASE_URL` from environment +- Ensures that any directories referred to in `DATABASE_URL` exist - Initializes its sqlx `pool` +## `pub fn migrate(&self) -> Result<()>` + +- Runs migrations found in the `migrations` directory. + ## `pub fn list_tenants(&self) -> Result>` - Returns all tenants diff --git a/backend/src/api.rs b/backend/src/api.rs index e44c7ff..070a7af 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -1,341 +1,494 @@ use std::sync::Arc; use anyhow::{Result, anyhow}; +use base64::Engine; use axum::{ Json, Router, - extract::{Path, State}, + extract::{Path, Query, State}, http::{HeaderMap, Method, StatusCode, Uri}, response::{IntoResponse, Response}, routing::{get, post, put}, }; +use nostr_sdk::{Event, JsonUtil, Kind}; use serde::{Deserialize, Serialize}; -use serde_json::Value; +use tower_http::cors::{AllowOrigin, CorsLayer}; -use crate::auth::verify_nip98; -use crate::billing::now_ts; -use crate::models::{Relay, RelayConfig, Tenant}; -use crate::provisioning::Provisioner; +use crate::models::{Relay, Tenant}; use crate::repo::Repo; #[derive(Clone)] -pub struct AppState { - pub repo: Repo, - pub admin_pubkeys: Arc>, - pub provisioner: Provisioner, +pub struct Api { + host: String, + port: u16, + admins: Vec, + origins: Vec, + repo: Repo, } -pub fn router(state: AppState) -> Router { - let tenant_routes = Router::new() - .route("/tenant", get(get_tenant)) - .route( - "/tenant/relays", - get(list_tenant_relays).post(create_tenant_relay), - ) - .route( - "/tenant/relays/:id", - get(get_tenant_relay).put(update_tenant_relay), - ) - .route("/tenant/relays/:id/plan", put(update_tenant_relay_plan)) - .route( - "/tenant/relays/:id/deactivate", - post(deactivate_tenant_relay), - ) - .route("/tenant/invoices", get(list_tenant_invoices)) - .route("/tenant/billing", put(update_tenant_billing)); - - let admin_routes = Router::new() - .route("/admin/check", get(admin_check)) - .route("/admin/tenants", get(admin_list_tenants)) - .route( - "/admin/tenants/:pubkey", - get(admin_get_tenant).put(admin_update_tenant_status), - ) - .route("/admin/relays", get(admin_list_relays)) - .route( - "/admin/relays/:id", - get(admin_get_relay).put(admin_update_relay), - ) - .route("/admin/relays/:id/deactivate", post(admin_deactivate_relay)); - - Router::new() - .merge(tenant_routes) - .merge(admin_routes) - .with_state(state) +#[derive(Clone)] +struct AppState { + api: Arc, } -// ── error helpers ───────────────────────────────────────────────────────────── +#[derive(Serialize)] +struct OkResponse { + data: T, + code: &'static str, +} -#[derive(Debug, Serialize)] -struct ApiError { +#[derive(Serialize)] +struct ErrorResponse { error: String, + code: String, } -impl IntoResponse for ApiError { - fn into_response(self) -> Response { - (StatusCode::BAD_REQUEST, Json(self)).into_response() +impl Api { + pub fn new(repo: Repo) -> Self { + let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); + let port = std::env::var("PORT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(3000); + let admins = std::env::var("ADMINS") + .unwrap_or_default() + .split(',') + .map(|v| v.trim().to_lowercase()) + .filter(|v| !v.is_empty()) + .collect(); + let origins = std::env::var("ALLOW_ORIGINS") + .unwrap_or_default() + .split(',') + .map(|v| v.trim().to_string()) + .filter(|v| !v.is_empty()) + .collect(); + Self { + host, + port, + admins, + origins, + repo, + } + } + + pub async fn serve(&self) -> Result<()> { + let state = AppState { + api: Arc::new(self.clone()), + }; + + let app = Router::new() + .route("/tenants", get(list_tenants).post(create_tenant)) + .route("/tenants/:pubkey", get(get_tenant)) + .route("/tenants/:pubkey/billing", put(update_tenant_billing)) + .route("/relays", get(list_relays).post(create_relay)) + .route("/relays/:id", get(get_relay).put(update_relay)) + .route("/relays/:id/deactivate", post(deactivate_relay)) + .route("/invoices", get(list_invoices)) + .with_state(state) + .layer(self.cors_layer()); + + let listener = tokio::net::TcpListener::bind(format!("{}:{}", self.host, self.port)).await?; + axum::serve(listener, app).await?; + Ok(()) + } + + fn cors_layer(&self) -> CorsLayer { + if self.origins.is_empty() { + CorsLayer::permissive() + } else { + let origins = self + .origins + .iter() + .filter_map(|o| o.parse::().ok()) + .collect::>(); + CorsLayer::new().allow_origin(AllowOrigin::list(origins)) + } + } + + fn is_admin(&self, pubkey: &str) -> bool { + self.admins.iter().any(|a| a == pubkey) + } + + fn is_tenant(&self, authorized_pubkey: &str, tenant_pubkey: &str) -> bool { + authorized_pubkey == tenant_pubkey } } -fn unauthorized() -> Response { +fn ok(status: StatusCode, data: T) -> Response { + (status, Json(OkResponse { data, code: "ok" })).into_response() +} + +fn err(status: StatusCode, code: &str, message: &str) -> Response { ( - StatusCode::UNAUTHORIZED, - Json(ApiError { - error: "unauthorized".into(), + status, + Json(ErrorResponse { + error: message.to_string(), + code: code.to_string(), }), ) .into_response() } -fn forbidden() -> Response { - ( - StatusCode::FORBIDDEN, - Json(ApiError { - error: "forbidden".into(), - }), - ) - .into_response() +fn now_ts() -> i64 { + chrono::Utc::now().timestamp() } -fn not_found() -> Response { - ( - StatusCode::NOT_FOUND, - Json(ApiError { - error: "not found".into(), - }), - ) - .into_response() +fn parse_bool_default(value: i64, default: i64) -> i64 { + if value == 0 || value == 1 { + value + } else { + default + } } -fn internal_error(msg: &str) -> Response { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ApiError { - error: msg.to_string(), - }), - ) - .into_response() +fn prepare_relay(mut relay: Relay) -> anyhow::Result { + if !relay + .subdomain + .chars() + .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-') + { + return Err(anyhow!("invalid-subdomain")); + } + + if relay.plan == "free" && relay.blossom_enabled == 1 { + return Err(anyhow!("premium-feature")); + } + if relay.plan == "free" && relay.livekit_enabled == 1 { + return Err(anyhow!("premium-feature")); + } + + if relay.schema.is_empty() { + relay.schema = format!("{}_{}", relay.subdomain.replace('-', "_"), relay.id); + } + if relay.status.is_empty() { + relay.status = "new".to_string(); + } + relay.sync_error = relay.sync_error; + relay.policy_public_join = parse_bool_default(relay.policy_public_join, 0); + relay.policy_strip_signatures = parse_bool_default(relay.policy_strip_signatures, 0); + relay.groups_enabled = parse_bool_default(relay.groups_enabled, 1); + relay.management_enabled = parse_bool_default(relay.management_enabled, 1); + relay.blossom_enabled = parse_bool_default(relay.blossom_enabled, if relay.plan == "free" { 0 } else { 1 }); + relay.livekit_enabled = parse_bool_default(relay.livekit_enabled, if relay.plan == "free" { 0 } else { 1 }); + relay.push_enabled = parse_bool_default(relay.push_enabled, 1); + + Ok(relay) } -fn is_unique_subdomain_violation(err: &anyhow::Error) -> bool { - let Some(sqlx_err) = err.downcast_ref::() else { - return false; - }; +fn map_unique_error(err: &anyhow::Error) -> Option<&'static str> { + let sqlx_err = err.downcast_ref::()?; let sqlx::Error::Database(db_err) = sqlx_err else { - return false; + return None; }; - db_err.message().contains("relays.subdomain") - || db_err.message().contains("relays_subdomain_unique") + if db_err.message().contains("pubkey") { + return Some("pubkey-exists"); + } + if db_err.message().contains("subdomain") { + return Some("subdomain-exists"); + } + None } -fn extract_auth_pubkey(headers: &HeaderMap, method: &Method, uri: &Uri) -> Result { - let auth_header = headers +fn auth_fail_response(e: anyhow::Error) -> Response { + err(StatusCode::UNAUTHORIZED, "unauthorized", &e.to_string()) +} + +fn extract_auth_pubkey(headers: &HeaderMap, method: &Method, _uri: &Uri, host: &str) -> Result { + let auth = headers .get(axum::http::header::AUTHORIZATION) .and_then(|v| v.to_str().ok()) .ok_or_else(|| anyhow!("missing authorization header"))?; + if !auth.starts_with("Nostr ") { + return Err(anyhow!("authorization must use Nostr scheme")); + } - let host = headers - .get(axum::http::header::HOST) - .and_then(|v| v.to_str().ok()) - .unwrap_or_default(); + let (_, b64) = auth + .split_once(' ') + .ok_or_else(|| anyhow!("malformed authorization header"))?; + let bytes = base64::engine::general_purpose::STANDARD.decode(b64)?; + let json = String::from_utf8(bytes)?; + let event = Event::from_json(json)?; - let scheme = headers - .get("x-forwarded-proto") - .and_then(|v| v.to_str().ok()) - .unwrap_or("http"); + if event.kind != Kind::HttpAuth { + return Err(anyhow!("invalid nip98 kind")); + } + event.verify()?; - let path = uri - .path_and_query() - .map(|v| v.as_str()) - .unwrap_or(uri.path()); - let url = format!("{scheme}://{host}{path}"); - let pubkey = verify_nip98(auth_header, &url, method.as_str())?; - Ok(pubkey.to_hex()) + let expected_host = host; + let want_m = method.as_str(); + + let mut got_u = None::; + let mut got_m = None::; + for tag in event.tags.iter() { + let values = tag.as_slice(); + if values.len() >= 2 { + if values[0] == "u" { + got_u = Some(values[1].to_string()); + } else if values[0] == "method" { + got_m = Some(values[1].to_string()); + } + } + } + + let Some(got_u) = got_u else { + return Err(anyhow!("missing u tag")); + }; + let Some(got_m) = got_m else { + return Err(anyhow!("missing method tag")); + }; + + if !expected_host.is_empty() && !got_u.contains(expected_host) { + return Err(anyhow!("authorization host mismatch")); + } + if got_m != want_m { + return Err(anyhow!("authorization method mismatch")); + } + + Ok(event.pubkey.to_hex()) } -fn new_tenant(pubkey: &str) -> Tenant { - let now = now_ts(); - Tenant { - pubkey: pubkey.to_string(), - status: "active".to_string(), - nwc_url: String::new(), - created_at: now, - billing_anchor_at: now, - stripe_customer_id: String::new(), - stripe_subscription_id: String::new(), +#[derive(Deserialize)] +struct TenantParam { + tenant: Option, +} + +#[derive(Deserialize, Serialize)] +struct UpdateTenantBillingRequest { + nwc_url: String, +} + +#[derive(Deserialize)] +struct CreateRelayRequest { + tenant: String, + subdomain: String, + plan: String, + info_name: Option, + info_icon: Option, + info_description: Option, + policy_public_join: Option, + policy_strip_signatures: Option, + groups_enabled: Option, + management_enabled: Option, + blossom_enabled: Option, + livekit_enabled: Option, + push_enabled: Option, +} + +#[derive(Deserialize)] +struct UpdateRelayRequest { + subdomain: Option, + plan: Option, + info_name: Option, + info_icon: Option, + info_description: Option, + policy_public_join: Option, + policy_strip_signatures: Option, + groups_enabled: Option, + management_enabled: Option, + blossom_enabled: Option, + livekit_enabled: Option, + push_enabled: Option, +} + +async fn list_tenants( + State(state): State, + headers: HeaderMap, + method: Method, + uri: Uri, +) -> Response { + let pubkey = match extract_auth_pubkey(&headers, &method, &uri, &state.api.host) { + Ok(v) => v, + Err(e) => return auth_fail_response(e), + }; + if !state.api.is_admin(&pubkey) { + return err(StatusCode::FORBIDDEN, "forbidden", "admin required"); + } + match state.api.repo.list_tenants().await { + Ok(tenants) => ok(StatusCode::OK, tenants), + Err(e) => err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()), } } -// ── tenant routes ───────────────────────────────────────────────────────────── - async fn get_tenant( State(state): State, headers: HeaderMap, method: Method, uri: Uri, + Path(pubkey): Path, ) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), + let auth = match extract_auth_pubkey(&headers, &method, &uri, &state.api.host) { + Ok(v) => v, + Err(e) => return auth_fail_response(e), + }; + if !(state.api.is_admin(&auth) || state.api.is_tenant(&auth, &pubkey)) { + return err(StatusCode::FORBIDDEN, "forbidden", "not authorized"); + } + match state.api.repo.get_tenant(&pubkey).await { + Ok(Some(tenant)) => ok(StatusCode::OK, tenant), + Ok(None) => err(StatusCode::NOT_FOUND, "not-found", "tenant not found"), + Err(e) => err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()), + } +} + +async fn create_tenant( + State(state): State, + headers: HeaderMap, + method: Method, + uri: Uri, +) -> Response { + let pubkey = match extract_auth_pubkey(&headers, &method, &uri, &state.api.host) { + Ok(v) => v, + Err(e) => return auth_fail_response(e), }; - match state.repo.get_tenant(&pubkey).await { - Ok(Some(tenant)) => (StatusCode::OK, Json(tenant)).into_response(), - Ok(None) => { - let tenant = new_tenant(&pubkey); - match state.repo.create_tenant(&tenant).await { - Ok(()) => (StatusCode::OK, Json(tenant)).into_response(), - Err(_) => internal_error("failed to create tenant"), + let tenant = Tenant { + pubkey: pubkey.clone(), + nwc_url: String::new(), + created_at: now_ts(), + billing_anchor: now_ts(), + }; + + match state.api.repo.create_tenant(&tenant).await { + Ok(()) => ok(StatusCode::CREATED, tenant), + Err(e) => { + if matches!(map_unique_error(&e), Some("pubkey-exists")) { + err(StatusCode::UNPROCESSABLE_ENTITY, "pubkey-exists", "tenant already exists") + } else { + err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()) } } - Err(_) => internal_error("failed to load tenant"), } } -async fn list_tenant_relays( +async fn list_relays( State(state): State, headers: HeaderMap, method: Method, uri: Uri, + Query(query): Query, ) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), + let auth = match extract_auth_pubkey(&headers, &method, &uri, &state.api.host) { + Ok(v) => v, + Err(e) => return auth_fail_response(e), }; - match state.repo.list_relays_by_tenant(&pubkey).await { - Ok(relays) => (StatusCode::OK, Json(relays)).into_response(), - Err(_) => internal_error("failed to load relays"), - } -} - -#[derive(Debug, Deserialize)] -struct CreateRelayRequest { - name: String, - subdomain: String, - icon: String, - description: String, - plan: String, - config: Option, -} - -async fn create_tenant_relay( - State(state): State, - headers: HeaderMap, - method: Method, - uri: Uri, - Json(payload): Json, -) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), - }; - - if state - .repo - .create_tenant_if_missing(&new_tenant(&pubkey)) - .await - .is_err() - { - return internal_error("failed to ensure tenant"); - } - - let now = now_ts(); - - // If this is the tenant's first paid relay, reset the billing anchor - if payload.plan != "free" - && let Ok(0) = state.repo.count_billable_relays(&pubkey).await - { - let _ = state.repo.reset_billing_anchor(&pubkey, now).await; - } - - let relay_id = payload.subdomain.replace('-', "_"); - let relay = Relay { - id: relay_id.clone(), - tenant: pubkey.clone(), - name: payload.name, - subdomain: payload.subdomain.clone(), - icon: payload.icon, - description: payload.description, - plan: payload.plan.clone(), - status: "pending".to_string(), - config: payload.config, - }; - - if let Err(err) = state.repo.upsert_relay(&relay).await { - if is_unique_subdomain_violation(&err) { - return ( - StatusCode::CONFLICT, - Json(ApiError { - error: "subdomain already exists".into(), - }), - ) - .into_response(); + let tenant_filter = if state.api.is_admin(&auth) { + query.tenant.as_deref() + } else { + if state.api.repo.get_tenant(&auth).await.ok().flatten().is_none() { + return err(StatusCode::FORBIDDEN, "forbidden", "tenant required"); } - return internal_error("failed to create relay"); - } - - if let Err(err) = state.provisioner.create_relay(&relay).await { - tracing::error!(relay_id, error = %err, "zooid create failed"); - let _ = state - .repo - .update_relay_status(&relay_id, "provisioning_failed") - .await; - return internal_error(&format!("failed to provision relay: {err}")); - } - - // Transition to active and write the provisioned lifecycle event - if let Err(err) = state - .repo - .transition_relay( - &relay_id, - &pubkey, - &payload.plan, - "active", - "provisioned", - now, - ) - .await - { - tracing::error!(relay_id, error = %err, "lifecycle event write failed"); - } - - let relay = Relay { - status: "active".to_string(), - ..relay + if query.tenant.is_some() { + return err( + StatusCode::BAD_REQUEST, + "tenant-query-not-allowed", + "tenant query is not allowed for tenant users", + ); + } + Some(auth.as_str()) }; - (StatusCode::CREATED, Json(relay)).into_response() + + match state.api.repo.list_relays(tenant_filter).await { + Ok(relays) => ok(StatusCode::OK, relays), + Err(e) => err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()), + } } -async fn get_tenant_relay( +async fn get_relay( State(state): State, headers: HeaderMap, method: Method, uri: Uri, Path(id): Path, ) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), + let auth = match extract_auth_pubkey(&headers, &method, &uri, &state.api.host) { + Ok(v) => v, + Err(e) => return auth_fail_response(e), }; - match state.repo.get_relay(&id).await { - Ok(Some(relay)) if relay.tenant == pubkey => (StatusCode::OK, Json(relay)).into_response(), - Ok(Some(_)) => forbidden(), - Ok(None) => not_found(), - Err(_) => internal_error("failed to load relay"), + let relay = match state.api.repo.get_relay(&id).await { + Ok(Some(r)) => r, + Ok(None) => return err(StatusCode::NOT_FOUND, "not-found", "relay not found"), + Err(e) => return err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()), + }; + + if !(state.api.is_admin(&auth) || state.api.is_tenant(&auth, &relay.tenant)) { + return err(StatusCode::FORBIDDEN, "forbidden", "not authorized"); + } + + ok(StatusCode::OK, relay) +} + +async fn create_relay( + State(state): State, + headers: HeaderMap, + method: Method, + uri: Uri, + Json(payload): Json, +) -> Response { + let auth = match extract_auth_pubkey(&headers, &method, &uri, &state.api.host) { + Ok(v) => v, + Err(e) => return auth_fail_response(e), + }; + + if !(state.api.is_admin(&auth) || state.api.is_tenant(&auth, &payload.tenant)) { + return err(StatusCode::FORBIDDEN, "forbidden", "not authorized"); + } + + let mut relay = Relay { + id: uuid::Uuid::new_v4().to_string(), + tenant: payload.tenant, + schema: String::new(), + subdomain: payload.subdomain, + plan: payload.plan, + status: "new".to_string(), + sync_error: String::new(), + info_name: payload.info_name.unwrap_or_default(), + info_icon: payload.info_icon.unwrap_or_default(), + info_description: payload.info_description.unwrap_or_default(), + policy_public_join: payload.policy_public_join.unwrap_or(0), + policy_strip_signatures: payload.policy_strip_signatures.unwrap_or(0), + groups_enabled: payload.groups_enabled.unwrap_or(1), + management_enabled: payload.management_enabled.unwrap_or(1), + blossom_enabled: payload.blossom_enabled.unwrap_or(0), + livekit_enabled: payload.livekit_enabled.unwrap_or(0), + push_enabled: payload.push_enabled.unwrap_or(1), + }; + + relay = match prepare_relay(relay) { + Ok(r) => r, + Err(e) if e.to_string() == "premium-feature" => { + return err( + StatusCode::UNPROCESSABLE_ENTITY, + "premium-feature", + "feature requires a paid plan", + ); + } + Err(_) => { + return err( + StatusCode::UNPROCESSABLE_ENTITY, + "invalid-relay", + "relay validation failed", + ); + } + }; + + match state.api.repo.create_relay(&relay).await { + Ok(()) => ok(StatusCode::CREATED, relay), + Err(e) => { + if matches!(map_unique_error(&e), Some("subdomain-exists")) { + err( + StatusCode::UNPROCESSABLE_ENTITY, + "subdomain-exists", + "subdomain already exists", + ) + } else { + err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()) + } + } } } -#[derive(Debug, Deserialize)] -struct UpdateRelayRequest { - name: String, - subdomain: String, - icon: String, - description: String, - config: Option, -} - -async fn update_tenant_relay( +async fn update_relay( State(state): State, headers: HeaderMap, method: Method, @@ -343,204 +496,152 @@ async fn update_tenant_relay( Path(id): Path, Json(payload): Json, ) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), + let auth = match extract_auth_pubkey(&headers, &method, &uri, &state.api.host) { + Ok(v) => v, + Err(e) => return auth_fail_response(e), }; - let existing = match state.repo.get_relay(&id).await { - Ok(Some(relay)) => relay, - Ok(None) => return not_found(), - Err(_) => return internal_error("failed to load relay"), + let mut relay = match state.api.repo.get_relay(&id).await { + Ok(Some(r)) => r, + Ok(None) => return err(StatusCode::NOT_FOUND, "not-found", "relay not found"), + Err(e) => return err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()), }; - if existing.tenant != pubkey { - return forbidden(); + if !(state.api.is_admin(&auth) || state.api.is_tenant(&auth, &relay.tenant)) { + return err(StatusCode::FORBIDDEN, "forbidden", "not authorized"); } - let relay = Relay { - name: payload.name, - subdomain: payload.subdomain, - icon: payload.icon, - description: payload.description, - config: payload.config, - ..existing - }; + if let Some(v) = payload.subdomain { + relay.subdomain = v; + } + if let Some(v) = payload.plan { + relay.plan = v; + } + if let Some(v) = payload.info_name { + relay.info_name = v; + } + if let Some(v) = payload.info_icon { + relay.info_icon = v; + } + if let Some(v) = payload.info_description { + relay.info_description = v; + } + if let Some(v) = payload.policy_public_join { + relay.policy_public_join = v; + } + if let Some(v) = payload.policy_strip_signatures { + relay.policy_strip_signatures = v; + } + if let Some(v) = payload.groups_enabled { + relay.groups_enabled = v; + } + if let Some(v) = payload.management_enabled { + relay.management_enabled = v; + } + if let Some(v) = payload.blossom_enabled { + relay.blossom_enabled = v; + } + if let Some(v) = payload.livekit_enabled { + relay.livekit_enabled = v; + } + if let Some(v) = payload.push_enabled { + relay.push_enabled = v; + } - if let Err(err) = state.repo.upsert_relay(&relay).await { - if is_unique_subdomain_violation(&err) { - return ( - StatusCode::CONFLICT, - Json(ApiError { - error: "subdomain already exists".into(), - }), - ) - .into_response(); + relay = match prepare_relay(relay) { + Ok(r) => r, + Err(e) if e.to_string() == "premium-feature" => { + return err( + StatusCode::UNPROCESSABLE_ENTITY, + "premium-feature", + "feature requires a paid plan", + ); + } + Err(_) => { + return err( + StatusCode::UNPROCESSABLE_ENTITY, + "invalid-relay", + "relay validation failed", + ); } - return internal_error("failed to update relay"); - } - - if let Err(err) = state.provisioner.update_relay(&relay).await { - tracing::error!(relay_id = relay.id, error = %err, "zooid patch failed"); - return internal_error(&format!("failed to provision relay: {err}")); - } - - let _ = state.repo.update_relay_status(&relay.id, "active").await; - (StatusCode::OK, Json(relay)).into_response() -} - -#[derive(Debug, Deserialize)] -struct UpdateRelayPlanRequest { - plan: String, -} - -async fn update_tenant_relay_plan( - State(state): State, - headers: HeaderMap, - method: Method, - uri: Uri, - Path(id): Path, - Json(payload): Json, -) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), }; - let new_plan = payload.plan.trim().to_lowercase(); - if !matches!(new_plan.as_str(), "free" | "basic" | "growth") { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError { - error: "invalid plan".into(), - }), - ) - .into_response(); + match state.api.repo.update_relay(&relay).await { + Ok(()) => ok(StatusCode::OK, relay), + Err(e) => { + if matches!(map_unique_error(&e), Some("subdomain-exists")) { + err( + StatusCode::UNPROCESSABLE_ENTITY, + "subdomain-exists", + "subdomain already exists", + ) + } else { + err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()) + } + } } - - let existing = match state.repo.get_relay(&id).await { - Ok(Some(relay)) => relay, - Ok(None) => return not_found(), - Err(_) => return internal_error("failed to load relay"), - }; - - if existing.tenant != pubkey { - return forbidden(); - } - - // No-op if plan unchanged - if existing.plan == new_plan { - return (StatusCode::OK, Json(existing)).into_response(); - } - - let now = now_ts(); - - // If switching to the first paid plan, reset billing anchor - if new_plan != "free" - && existing.plan == "free" - && let Ok(0) = state.repo.count_billable_relays(&pubkey).await - { - let _ = state.repo.reset_billing_anchor(&pubkey, now).await; - } - - let mut relay = Relay { - plan: new_plan.clone(), - ..existing - }; - if relay.plan == "free" { - relay.config = Some(disable_paid_features(relay.config)); - } - - if state.repo.upsert_relay(&relay).await.is_err() { - return internal_error("failed to update relay plan"); - } - - if let Err(err) = state.provisioner.update_relay(&relay).await { - tracing::error!(relay_id = relay.id, error = %err, "zooid patch failed"); - return internal_error(&format!("failed to provision relay: {err}")); - } - - let _ = state.repo.update_relay_status(&relay.id, "active").await; - - // Write a plan-change lifecycle event so billing can split by tier - if let Err(err) = state - .repo - .transition_relay(&relay.id, &pubkey, &new_plan, "active", "provisioned", now) - .await - { - tracing::warn!(relay_id = relay.id, error = %err, "plan-change lifecycle event failed"); - } - - (StatusCode::OK, Json(relay)).into_response() } -async fn deactivate_tenant_relay( +async fn deactivate_relay( State(state): State, headers: HeaderMap, method: Method, uri: Uri, Path(id): Path, ) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), + let auth = match extract_auth_pubkey(&headers, &method, &uri, &state.api.host) { + Ok(v) => v, + Err(e) => return auth_fail_response(e), }; - let existing = match state.repo.get_relay(&id).await { - Ok(Some(relay)) => relay, - Ok(None) => return not_found(), - Err(_) => return internal_error("failed to load relay"), + let relay = match state.api.repo.get_relay(&id).await { + Ok(Some(r)) => r, + Ok(None) => return err(StatusCode::NOT_FOUND, "not-found", "relay not found"), + Err(e) => return err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()), }; - if existing.tenant != pubkey { - return forbidden(); + if !(state.api.is_admin(&auth) || state.api.is_tenant(&auth, &relay.tenant)) { + return err(StatusCode::FORBIDDEN, "forbidden", "not authorized"); } - let now = now_ts(); - if state - .repo - .transition_relay( - &id, - &pubkey, - &existing.plan, - "deactivated", - "deactivated", - now, - ) - .await - .is_err() - { - return internal_error("failed to deactivate relay"); + match state.api.repo.deactivate_relay(&relay).await { + Ok(()) => ok(StatusCode::OK, ()), + Err(e) => err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()), } - - let relay = Relay { - status: "deactivated".to_string(), - config: None, - ..existing - }; - (StatusCode::OK, Json(relay)).into_response() } -async fn list_tenant_invoices( +async fn list_invoices( State(state): State, headers: HeaderMap, method: Method, uri: Uri, + Query(query): Query, ) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), + let auth = match extract_auth_pubkey(&headers, &method, &uri, &state.api.host) { + Ok(v) => v, + Err(e) => return auth_fail_response(e), }; - match state.repo.list_invoices_by_tenant(&pubkey).await { - Ok(invoices) => (StatusCode::OK, Json(invoices)).into_response(), - Err(_) => internal_error("failed to load invoices"), - } -} + let tenant_filter = if state.api.is_admin(&auth) { + query.tenant.as_deref() + } else { + if state.api.repo.get_tenant(&auth).await.ok().flatten().is_none() { + return err(StatusCode::FORBIDDEN, "forbidden", "tenant required"); + } + if query.tenant.is_some() { + return err( + StatusCode::BAD_REQUEST, + "tenant-query-not-allowed", + "tenant query is not allowed for tenant users", + ); + } + Some(auth.as_str()) + }; -#[derive(Debug, Deserialize, Serialize)] -struct UpdateTenantBillingRequest { - nwc_url: String, + match state.api.repo.list_invoices(tenant_filter).await { + Ok(invoices) => ok(StatusCode::OK, invoices), + Err(e) => err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()), + } } async fn update_tenant_billing( @@ -548,300 +649,25 @@ async fn update_tenant_billing( headers: HeaderMap, method: Method, uri: Uri, + Path(pubkey): Path, Json(payload): Json, ) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), + let auth = match extract_auth_pubkey(&headers, &method, &uri, &state.api.host) { + Ok(v) => v, + Err(e) => return auth_fail_response(e), }; + if !(state.api.is_admin(&auth) || state.api.is_tenant(&auth, &pubkey)) { + return err(StatusCode::FORBIDDEN, "forbidden", "not authorized"); + } + match state + .api .repo .update_tenant_nwc_url(&pubkey, &payload.nwc_url) .await { - Ok(()) => (StatusCode::OK, Json(payload)).into_response(), - Err(_) => internal_error("failed to update billing"), + Ok(()) => ok(StatusCode::OK, payload), + Err(e) => err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string()), } } - -// ── admin routes ────────────────────────────────────────────────────────────── - -#[derive(Debug, Serialize)] -struct AdminCheckResponse { - is_admin: bool, -} - -async fn admin_check( - State(state): State, - headers: HeaderMap, - method: Method, - uri: Uri, -) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), - }; - let is_admin = state.admin_pubkeys.contains(&pubkey); - (StatusCode::OK, Json(AdminCheckResponse { is_admin })).into_response() -} - -async fn admin_list_tenants( - State(state): State, - headers: HeaderMap, - method: Method, - uri: Uri, -) -> Response { - let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), - }; - if !state.admin_pubkeys.contains(&pubkey) { - return forbidden(); - } - match state.repo.list_tenants().await { - Ok(tenants) => (StatusCode::OK, Json(tenants)).into_response(), - Err(_) => internal_error("failed to load tenants"), - } -} - -async fn admin_get_tenant( - State(state): State, - headers: HeaderMap, - method: Method, - uri: Uri, - Path(pubkey): Path, -) -> Response { - let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), - }; - if !state.admin_pubkeys.contains(&admin) { - return forbidden(); - } - - let tenant = match state.repo.get_tenant(&pubkey).await { - Ok(Some(t)) => t, - Ok(None) => return not_found(), - Err(_) => return internal_error("failed to load tenant"), - }; - let relays = match state.repo.list_relays_by_tenant(&pubkey).await { - Ok(r) => r, - Err(_) => return internal_error("failed to load relays"), - }; - - #[derive(Serialize)] - struct TenantDetail { - tenant: Tenant, - relays: Vec, - } - - (StatusCode::OK, Json(TenantDetail { tenant, relays })).into_response() -} - -#[derive(Debug, Deserialize)] -struct UpdateTenantStatusRequest { - status: String, -} - -async fn admin_update_tenant_status( - State(state): State, - headers: HeaderMap, - method: Method, - uri: Uri, - Path(pubkey): Path, - Json(payload): Json, -) -> Response { - let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), - }; - if !state.admin_pubkeys.contains(&admin) { - return forbidden(); - } - - let tenant = match state.repo.get_tenant(&pubkey).await { - Ok(Some(t)) => t, - Ok(None) => return not_found(), - Err(_) => return internal_error("failed to load tenant"), - }; - - match state - .repo - .update_tenant_status(&tenant.pubkey, &payload.status) - .await - { - Ok(()) => ( - StatusCode::OK, - Json(Tenant { - status: payload.status, - ..tenant - }), - ) - .into_response(), - Err(_) => internal_error("failed to update tenant"), - } -} - -async fn admin_list_relays( - State(state): State, - headers: HeaderMap, - method: Method, - uri: Uri, -) -> Response { - let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), - }; - if !state.admin_pubkeys.contains(&admin) { - return forbidden(); - } - match state.repo.list_relays().await { - Ok(relays) => (StatusCode::OK, Json(relays)).into_response(), - Err(_) => internal_error("failed to load relays"), - } -} - -async fn admin_get_relay( - State(state): State, - headers: HeaderMap, - method: Method, - uri: Uri, - Path(id): Path, -) -> Response { - let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), - }; - if !state.admin_pubkeys.contains(&admin) { - return forbidden(); - } - match state.repo.get_relay(&id).await { - Ok(Some(relay)) => (StatusCode::OK, Json(relay)).into_response(), - Ok(None) => not_found(), - Err(_) => internal_error("failed to load relay"), - } -} - -async fn admin_update_relay( - State(state): State, - headers: HeaderMap, - method: Method, - uri: Uri, - Path(id): Path, - Json(payload): Json, -) -> Response { - let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), - }; - if !state.admin_pubkeys.contains(&admin) { - return forbidden(); - } - - let existing = match state.repo.get_relay(&id).await { - Ok(Some(relay)) => relay, - Ok(None) => return not_found(), - Err(_) => return internal_error("failed to load relay"), - }; - - let relay = Relay { - name: payload.name, - subdomain: payload.subdomain, - icon: payload.icon, - description: payload.description, - config: payload.config, - ..existing - }; - - if let Err(err) = state.repo.upsert_relay(&relay).await { - if is_unique_subdomain_violation(&err) { - return ( - StatusCode::CONFLICT, - Json(ApiError { - error: "subdomain already exists".into(), - }), - ) - .into_response(); - } - return internal_error("failed to update relay"); - } - - if let Err(err) = state.provisioner.update_relay(&relay).await { - tracing::error!(relay_id = relay.id, error = %err, "zooid patch failed"); - return internal_error(&format!("failed to update relay config: {err}")); - } - - (StatusCode::OK, Json(relay)).into_response() -} - -async fn admin_deactivate_relay( - State(state): State, - headers: HeaderMap, - method: Method, - uri: Uri, - Path(id): Path, -) -> Response { - let admin = match extract_auth_pubkey(&headers, &method, &uri) { - Ok(p) => p, - Err(_) => return unauthorized(), - }; - if !state.admin_pubkeys.contains(&admin) { - return forbidden(); - } - - let existing = match state.repo.get_relay(&id).await { - Ok(Some(relay)) => relay, - Ok(None) => return not_found(), - Err(_) => return internal_error("failed to load relay"), - }; - - let now = now_ts(); - if state - .repo - .transition_relay( - &id, - &existing.tenant, - &existing.plan, - "deactivated", - "deactivated", - now, - ) - .await - .is_err() - { - return internal_error("failed to deactivate relay"); - } - - let relay = Relay { - status: "deactivated".to_string(), - config: None, - ..existing - }; - (StatusCode::OK, Json(relay)).into_response() -} - -// ── relay config helpers ────────────────────────────────────────────────────── - -fn disable_paid_features(config: Option) -> RelayConfig { - let mut cfg = config.unwrap_or(RelayConfig { - policy: None, - groups: None, - management: None, - blossom: None, - livekit: None, - push: None, - }); - set_config_bool(&mut cfg.blossom, "enabled", false); - set_config_bool(&mut cfg.livekit, "enabled", false); - cfg -} - -fn set_config_bool(section: &mut Option, key: &str, enabled: bool) { - let mut obj = section - .take() - .and_then(|v| v.as_object().cloned()) - .unwrap_or_default(); - obj.insert(key.to_string(), Value::Bool(enabled)); - *section = Some(Value::Object(obj)); -} diff --git a/backend/src/auth.rs b/backend/src/auth.rs deleted file mode 100644 index 68743ef..0000000 --- a/backend/src/auth.rs +++ /dev/null @@ -1,61 +0,0 @@ -use anyhow::{Result, anyhow}; -use base64::Engine; -use base64::engine::general_purpose; -use std::str::FromStr; - -use nostr_sdk::JsonUtil; -use nostr_sdk::nostr::key::PublicKey; -use nostr_sdk::nostr::nips::nip98::HttpMethod; -use nostr_sdk::nostr::types::url::Url; -use nostr_sdk::nostr::{Alphabet, Event, Kind, SingleLetterTag, TagKind, TagStandard}; - -pub fn verify_nip98(auth_header: &str, url: &str, method: &str) -> Result { - let url = Url::parse(url)?; - let method = HttpMethod::from_str(&method.to_uppercase())?; - - let event = decode_auth_event(auth_header)?; - - if event.kind != Kind::HttpAuth { - return Err(anyhow!("authorization event kind mismatch")); - } - - let authorized_url = - match event - .tags - .find_standardized(TagKind::SingleLetter(SingleLetterTag::lowercase( - Alphabet::U, - ))) { - Some(TagStandard::AbsoluteURL(url)) => url, - _ => return Err(anyhow!("authorization header missing url tag")), - }; - - let authorized_method = match event.tags.find_standardized(TagKind::Method) { - Some(TagStandard::Method(method)) => method, - _ => return Err(anyhow!("authorization header missing method tag")), - }; - - if authorized_url != &url || authorized_method != &method { - return Err(anyhow!("authorization does not match request")); - } - - event.verify()?; - Ok(event.pubkey) -} - -fn decode_auth_event(auth_header: &str) -> Result { - if auth_header.trim().is_empty() { - return Err(anyhow!("missing authorization header")); - } - - let (prefix, encoded) = auth_header - .split_once(' ') - .ok_or_else(|| anyhow!("malformed authorization header"))?; - - if prefix != "Nostr" || encoded.is_empty() { - return Err(anyhow!("malformed authorization header")); - } - - let decoded = general_purpose::STANDARD.decode(encoded)?; - let json = String::from_utf8(decoded)?; - Ok(Event::from_json(json)?) -} diff --git a/backend/src/billing.rs b/backend/src/billing.rs index 61fad8c..95dbd9d 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -1,316 +1,263 @@ -use anyhow::{Result, anyhow}; -use chrono::{DateTime, Months, TimeZone, Utc}; use std::collections::HashMap; -use tokio::time::{Duration, sleep}; -use uuid::Uuid; -use crate::models::{Invoice, InvoiceAttempt, InvoiceItem, RelayLifecycleEvent, Tenant}; -use crate::notifications::Nip17Notifier; +use anyhow::Result; +use chrono::{DateTime, Datelike, Duration, Months, TimeZone, Utc}; +use tokio::sync::Mutex; + +use crate::models::{Activity, Invoice, InvoiceItem, Relay, Tenant}; use crate::repo::Repo; - -use nostr_sdk::nips::nip47::{self, MakeInvoiceRequest, NostrWalletConnectURI, PayInvoiceRequest}; -use nostr_sdk::{Client, Filter, Keys, Kind, Timestamp}; - -const GRACE_DAYS: i64 = 7; -const DUE_DAYS: i64 = 7; -const WORKER_INTERVAL_SECS: u64 = 300; - -// ── service ─────────────────────────────────────────────────────────────────── +use crate::robot::Robot; #[derive(Clone)] -pub struct BillingService { +pub struct Billing { + nwc_url: String, repo: Repo, - notifier: Nip17Notifier, - platform_nwc_url: String, + robot: Robot, + last_activity_at: std::sync::Arc>, } -impl BillingService { - pub fn new(repo: Repo, notifier: Nip17Notifier, platform_nwc_url: String) -> Self { +impl Billing { + pub fn new(repo: Repo, robot: Robot) -> Self { + let nwc_url = std::env::var("NWC_URL").unwrap_or_default(); Self { + nwc_url, repo, - notifier, - platform_nwc_url, + robot, + last_activity_at: std::sync::Arc::new(Mutex::new(0)), } } - pub async fn run(self) { + pub async fn start(self) { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600)); loop { - if let Err(err) = self.process_once().await { - tracing::error!(error = %err, "billing run failed"); + interval.tick().await; + if let Err(e) = self.tick().await { + tracing::error!(error = %e, "billing tick failed"); } - sleep(Duration::from_secs(WORKER_INTERVAL_SECS)).await; } } - async fn process_once(&self) -> Result<()> { + pub async fn tick(&self) -> Result<()> { + let mut since_guard = self.last_activity_at.lock().await; + let since = *since_guard; + let activity = self.repo.list_activity(&since, None).await?; + for a in &activity { + if matches!(a.activity_type.as_str(), "relay_created" | "relay_updated" | "relay_activated") { + self.maybe_reset_anchor_for_first_paid_relay(a).await?; + } + *since_guard = (*since_guard).max(a.created_at); + } + drop(since_guard); + let tenants = self.repo.list_tenants().await?; for tenant in &tenants { - if let Err(err) = self.generate_invoice_if_due(tenant).await { - tracing::error!(tenant = %tenant.pubkey, error = %err, "invoice generation failed"); - } - } - for tenant in &tenants { - if let Err(err) = self.collect_outstanding(tenant).await { - tracing::error!(tenant = %tenant.pubkey, error = %err, "collection failed"); - } + self.generate_invoice_if_due(tenant).await?; + self.collect_outstanding(tenant).await?; } + Ok(()) } - // ── invoice generation ──────────────────────────────────────────────────── - - async fn generate_invoice_if_due(&self, tenant: &Tenant) -> Result<()> { - if tenant.status != "active" { + async fn maybe_reset_anchor_for_first_paid_relay(&self, activity: &Activity) -> Result<()> { + let relay = match self.repo.get_relay(&activity.identifier).await? { + Some(r) => r, + None => return Ok(()), + }; + if relay.plan == "free" { return Ok(()); } - let anchor = ts_to_dt(tenant.billing_anchor_at)?; - let now = Utc::now(); - let (period_start, period_end) = current_billing_period(anchor, now); + let relays = self.repo.list_relays(Some(&relay.tenant)).await?; + let paid_active_count = relays + .into_iter() + .filter(|r| r.status == "active" && r.plan != "free") + .count() as i64; - // Only generate once the period has closed + if paid_active_count == 1 { + self.repo + .update_tenant_billing_anchor(&relay.tenant, activity.created_at) + .await?; + } + + Ok(()) + } + + async fn generate_invoice_if_due(&self, tenant: &Tenant) -> Result<()> { + if self.repo.total_pending_invoices_for_tenant(&tenant.pubkey).await? > 0 { + return Ok(()); + } + + let relays = self.repo.list_relays(Some(&tenant.pubkey)).await?; + let active_paid_relays: Vec = relays + .iter() + .filter(|r| r.status == "active" && r.plan != "free") + .cloned() + .collect(); + if active_paid_relays.is_empty() { + return Ok(()); + } + + let now = Utc::now(); + let anchor = ts_to_dt(tenant.billing_anchor)?; + let (period_start, period_end) = billing_window(anchor, now); if now < period_end { return Ok(()); } - let plans = self.repo.list_plans().await?; - let plan_amount_map: HashMap = - plans.into_iter().map(|p| (p.id, p.sats_per_month)).collect(); + let usage_events = self.repo.list_activity(&tenant.billing_anchor, Some(&tenant.pubkey)).await?; + let invoice_id = uuid::Uuid::new_v4().to_string(); + let mut items = Vec::new(); - let events = self - .repo - .list_lifecycle_events_for_tenant(&tenant.pubkey, dt_to_ts(period_end)) - .await?; + for relay in active_paid_relays { + let hours = relay_active_hours_in_window(&relay, &usage_events, period_start, period_end); + if hours <= 0 { + continue; + } + let plan_monthly = self.repo.get_relay_plan_amount_sats(&relay.plan).await?; + if plan_monthly <= 0 { + continue; + } - let invoice_id = Uuid::new_v4().to_string(); - let items = compute_invoice_items( - &invoice_id, - &events, - &plan_amount_map, - period_start, - period_end, - ); + let sats = ((plan_monthly as f64 / 30.0 / 24.0) * hours as f64).ceil() as i64; + if sats <= 0 { + continue; + } - let total: i64 = items.iter().map(|i| i.amount).sum(); + items.push(InvoiceItem { + id: uuid::Uuid::new_v4().to_string(), + invoice: invoice_id.clone(), + relay: relay.id, + sats, + }); + } + + let total: i64 = items.iter().map(|i| i.sats).sum(); if total == 0 { return Ok(()); } - let bolt11 = self.make_bolt11(total).await.unwrap_or_default(); - let invoice = Invoice { - id: invoice_id.clone(), - tenant: tenant.pubkey.clone(), - amount: total, - status: "pending".to_string(), - created_at: dt_to_ts(now), - bolt11, - period_start: dt_to_ts(period_start), - period_end: dt_to_ts(period_end), - }; - - let created = self - .repo - .create_invoice_with_items(&invoice, &items) - .await?; - - if created { - tracing::info!(tenant = %tenant.pubkey, invoice = %invoice_id, amount = total, "invoice generated"); - } - - Ok(()) - } - - // ── collection ──────────────────────────────────────────────────────────── - - async fn collect_outstanding(&self, tenant: &Tenant) -> Result<()> { - let invoices = self.repo.list_invoices_by_tenant(&tenant.pubkey).await?; - let unpaid: Vec<&Invoice> = invoices - .iter() - .filter(|inv| matches!(inv.status.as_str(), "pending" | "past_due")) - .collect(); - - if unpaid.is_empty() { - return Ok(()); - } - - for invoice in &unpaid { - self.attempt_collection(tenant, invoice).await?; - } - - // Re-fetch to check if all are now paid; auto-reactivate if so - let invoices_after = self.repo.list_invoices_by_tenant(&tenant.pubkey).await?; - let still_unpaid = invoices_after - .iter() - .any(|inv| matches!(inv.status.as_str(), "pending" | "past_due")); - - if !still_unpaid && tenant.status == "suspended" { - let now = now_ts(); - self.repo - .update_tenant_status(&tenant.pubkey, "active") - .await?; - self.repo - .reactivate_relays_for_tenant(&tenant.pubkey, now) - .await?; - tracing::info!(tenant = %tenant.pubkey, "tenant reactivated after full balance payment"); - } - - Ok(()) - } - - async fn attempt_collection(&self, tenant: &Tenant, invoice: &Invoice) -> Result<()> { - let now = Utc::now(); - let created_at = ts_to_dt(invoice.created_at)?; - let due_at = created_at + chrono::Duration::days(DUE_DAYS); - let grace_ends_at = due_at + chrono::Duration::days(GRACE_DAYS); - - // Deactivate after grace period expires - if now > grace_ends_at && invoice.status != "past_due" { - let ts = now_ts(); - self.repo - .update_tenant_status(&tenant.pubkey, "suspended") - .await?; - self.repo - .suspend_relays_for_tenant(&tenant.pubkey, ts) - .await?; - self.repo - .record_attempt( - &InvoiceAttempt { - id: Uuid::new_v4().to_string(), - invoice: invoice.id.clone(), - run_id: Uuid::new_v4().to_string(), - method: "system".to_string(), - outcome: "failed".to_string(), - error: "grace period expired".to_string(), - created_at: ts, - }, - "past_due", - ) - .await?; - return Ok(()); - } - - // Only retry once per 24h - let attempts = self.repo.list_attempts_for_invoice(&invoice.id).await?; - if let Some(last) = attempts.last() - && last.method != "nip17_dm" - { - let last_at = ts_to_dt(last.created_at)?; - if now - last_at < chrono::Duration::hours(24) { + let bolt11 = match self.make_bolt11(total).await { + Ok(v) => v, + Err(e) => { + tracing::error!(tenant = %tenant.pubkey, error = %e, "bolt11 generation failed"); return Ok(()); } - } + }; - let run_id = Uuid::new_v4().to_string(); + let invoice = Invoice { + id: invoice_id, + tenant: tenant.pubkey.clone(), + status: "pending".to_string(), + created_at: now.timestamp(), + attempted_at: 0, + error: String::new(), + closed_at: 0, + sent_at: 0, + paid_at: 0, + bolt11, + period_start: period_start.timestamp(), + period_end: period_end.timestamp(), + }; - // 1. Try NWC - if !tenant.nwc_url.trim().is_empty() { - match self - .pay_via_nwc(&tenant.nwc_url, &invoice.bolt11) - .await - { - Ok(()) => { - self.repo - .record_attempt( - &attempt(&invoice.id, &run_id, "nwc", "success", ""), - "paid", - ) - .await?; - return Ok(()); - } - Err(err) => { - tracing::warn!(tenant = %tenant.pubkey, error = %err, "NWC payment failed"); - self.repo - .record_attempt( - &attempt(&invoice.id, &run_id, "nwc", "failed", &err.to_string()), - &invoice.status, - ) - .await?; + self.repo.create_invoice(&invoice, &items).await?; + Ok(()) + } + + async fn collect_outstanding(&self, tenant: &Tenant) -> Result<()> { + let invoices = self.repo.list_invoices(Some(&tenant.pubkey)).await?; + let now = now_ts(); + + for invoice in invoices.into_iter().filter(|i| i.status == "pending") { + if invoice.attempted_at > 0 && now - invoice.attempted_at < 24 * 3600 { + continue; + } + + if self.is_bolt11_paid(&invoice.bolt11).await { + self.repo.mark_invoice_paid(&invoice.id).await?; + continue; + } + + let mut collected = false; + if !tenant.nwc_url.trim().is_empty() && self.pay_invoice_nwc(&tenant.nwc_url, &invoice.bolt11).await { + self.repo.mark_invoice_paid(&invoice.id).await?; + collected = true; + } + + if !collected { + self.repo + .mark_invoice_attempted(&invoice.id, Some("autopay failed or unavailable")) + .await?; + + if invoice.sent_at == 0 { + let amount: i64 = self + .repo + .get_invoice_items(&invoice.id) + .await? + .into_iter() + .map(|i| i.sats) + .sum(); + let message = format!( + "Invoice {} is due. Amount: {} sats\n{}", + invoice.id, amount, invoice.bolt11 + ); + if self.robot.send_dm(&tenant.pubkey, &message).await.is_ok() { + self.repo.mark_invoice_sent(&invoice.id).await?; + } } } - } - // 2. Try Stripe - if !tenant.stripe_subscription_id.trim().is_empty() { - match self.pay_via_stripe(tenant, invoice).await { - Ok(()) => { - self.repo - .record_attempt( - &attempt(&invoice.id, &run_id, "stripe", "success", ""), - "paid", - ) - .await?; - return Ok(()); - } - Err(err) => { - tracing::warn!(tenant = %tenant.pubkey, error = %err, "Stripe payment failed"); - self.repo - .record_attempt( - &attempt(&invoice.id, &run_id, "stripe", "failed", &err.to_string()), - &invoice.status, - ) - .await?; - } - } - } - - // 3. Fallback: Lightning invoice shown in-app; send one DM if no auto-pay configured - let dm_sent = self.repo.invoice_dm_sent(&invoice.id).await?; - if !dm_sent { - match self - .send_invoice_dm(tenant, invoice, invoice.bolt11.as_str()) - .await - { - Ok(()) => { - self.repo - .record_attempt( - &attempt(&invoice.id, &run_id, "nip17_dm", "sent", ""), - &invoice.status, - ) - .await?; - } - Err(err) => { - tracing::warn!(tenant = %tenant.pubkey, error = %err, "NIP-17 DM failed"); - } + if now - invoice.created_at >= 7 * 24 * 3600 { + self.repo.mark_invoice_closed(&invoice.id).await?; } } Ok(()) } - // ── payment providers ───────────────────────────────────────────────────── - async fn make_bolt11(&self, amount_sats: i64) -> Result { - if self.platform_nwc_url.trim().is_empty() { - return Err(anyhow!("NWC_URL is required to generate invoices")); + if self.nwc_url.trim().is_empty() { + anyhow::bail!("NWC_URL not configured") } - let uri = NostrWalletConnectURI::parse(&self.platform_nwc_url)?; - let req = nip47::Request::make_invoice(MakeInvoiceRequest { - amount: (amount_sats as u64) * 1_000, - description: Some("Relay hosting".to_string()), - description_hash: None, - expiry: None, - }); + + let uri = nostr_sdk::nips::nip47::NostrWalletConnectURI::parse(&self.nwc_url)?; + let req = nostr_sdk::nips::nip47::Request::make_invoice( + nostr_sdk::nips::nip47::MakeInvoiceRequest { + amount: (amount_sats as u64) * 1_000, + description: Some("Caravel relay invoice".to_string()), + description_hash: None, + expiry: None, + }, + ); + let resp = self.send_nwc_request(&uri, req).await?; Ok(resp.to_make_invoice()?.invoice) } - async fn pay_via_nwc(&self, nwc_url: &str, bolt11: &str) -> Result<()> { - let uri = NostrWalletConnectURI::parse(nwc_url)?; - let req = nip47::Request::pay_invoice(PayInvoiceRequest::new(bolt11)); - self.send_nwc_request(&uri, req).await?.to_pay_invoice()?; - Ok(()) + async fn is_bolt11_paid(&self, _bolt11: &str) -> bool { + false } - async fn pay_via_stripe(&self, _tenant: &Tenant, _invoice: &Invoice) -> Result<()> { - // TODO: implement Stripe off-session charge using tenant.stripe_subscription_id - Err(anyhow!("Stripe not yet implemented")) + async fn pay_invoice_nwc(&self, nwc_url: &str, bolt11: &str) -> bool { + let uri = match nostr_sdk::nips::nip47::NostrWalletConnectURI::parse(nwc_url) { + Ok(v) => v, + Err(_) => return false, + }; + let req = nostr_sdk::nips::nip47::Request::pay_invoice( + nostr_sdk::nips::nip47::PayInvoiceRequest::new(bolt11), + ); + self.send_nwc_request(&uri, req) + .await + .and_then(|r| r.to_pay_invoice().map(|_| ()).map_err(anyhow::Error::from)) + .is_ok() } async fn send_nwc_request( &self, - uri: &NostrWalletConnectURI, - request: nip47::Request, - ) -> Result { + uri: &nostr_sdk::nips::nip47::NostrWalletConnectURI, + request: nostr_sdk::nips::nip47::Request, + ) -> Result { + use nostr_sdk::{Client, Filter, Kind, Keys, Timestamp}; + let app_keys = Keys::new(uri.secret.clone()); let app_pubkey = app_keys.public_key(); let client = Client::new(app_keys); @@ -327,183 +274,119 @@ impl BillingService { .pubkey(app_pubkey) .since(started_at); - let events = client.fetch_events(filter, Duration::from_secs(10)).await?; + let events = client + .fetch_events(filter, std::time::Duration::from_secs(10)) + .await?; let event = events .into_iter() .max_by_key(|e| e.created_at) - .ok_or_else(|| anyhow!("no NWC response received"))?; + .ok_or_else(|| anyhow::anyhow!("no NWC response received"))?; - Ok(nip47::Response::from_event(uri, &event)?) - } - - async fn send_invoice_dm( - &self, - tenant: &Tenant, - invoice: &Invoice, - bolt11: &str, - ) -> Result<()> { - let due_date = ts_to_dt(invoice.created_at + DUE_DAYS * 86400)?; - let period_start = ts_to_dt(invoice.period_start)?; - let period_end = ts_to_dt(invoice.period_end)?; - let message = format!( - "You have an outstanding invoice of {} sats due by {}.\n\ - Period: {} → {}\n\ - Pay with Lightning:\n{}", - invoice.amount, - due_date.format("%Y-%m-%d"), - period_start.format("%Y-%m-%d"), - period_end.format("%Y-%m-%d"), - bolt11, - ); - self.notifier.send(&tenant.pubkey, &message).await + Ok(nostr_sdk::nips::nip47::Response::from_event(uri, &event)?) } } -// ── billing math ────────────────────────────────────────────────────────────── +fn now_ts() -> i64 { + Utc::now().timestamp() +} -/// Given a billing anchor and the current time, return the current billing -/// period [start, end) based on rolling monthly windows from the anchor. -fn current_billing_period( - anchor: DateTime, - now: DateTime, -) -> (DateTime, DateTime) { - let mut period_start = anchor; +fn ts_to_dt(ts: i64) -> Result> { + Utc.timestamp_opt(ts, 0) + .single() + .ok_or_else(|| anyhow::anyhow!("invalid unix timestamp")) +} + +fn billing_window(anchor: DateTime, now: DateTime) -> (DateTime, DateTime) { + let mut start = anchor; loop { - let period_end = period_start + Months::new(1); - if now < period_end { - return (period_start, period_end); + let end = start + Months::new(1); + if now < end { + return (start, end); } - period_start = period_end; + start = end; } } -/// Compute per-relay billable sats for a billing period from the lifecycle -/// event log. Rules: -/// - Billing starts at `provisioned`, pauses at `suspended`, resumes at -/// `unsuspended`, stops at `deactivated`. -/// - Only time within [period_start, period_end) counts. -/// - Round each relay's total billable seconds up to the next full hour. -/// - Minimum 1 billable hour per relay per period. -/// - Amount is based on the relay's current plan amount (retroactive within period). -fn compute_invoice_items( - invoice_id: &str, - events: &[RelayLifecycleEvent], - plan_amount_map: &HashMap, - period_start: DateTime, - period_end: DateTime, -) -> Vec { - // Group events by relay, preserving sort order from the DB (relay, created_at, id) - let mut by_relay: HashMap<&str, Vec<&RelayLifecycleEvent>> = HashMap::new(); - for event in events { - by_relay.entry(&event.relay).or_default().push(event); - } - - let mut items = Vec::new(); - - for (relay_id, relay_events) in &by_relay { - // Use the latest plan for this relay (retroactive rate within period) - let Some(latest_event) = relay_events.last() else { - continue; - }; - let plan_amount = *plan_amount_map.get(latest_event.plan.as_str()).unwrap_or(&0); - if plan_amount == 0 { - continue; - } - - let billable_secs = billable_seconds_in_period(relay_events, period_start, period_end); - if billable_secs == 0 { - continue; - } - - // Round up to next full hour, minimum 1 hour - let hours = ((billable_secs as f64) / 3600.0).ceil().max(1.0) as i64; - - items.push(InvoiceItem { - id: Uuid::new_v4().to_string(), - invoice: invoice_id.to_string(), - relay: relay_id.to_string(), - amount: hours * plan_amount, - period_start: dt_to_ts(period_start), - period_end: dt_to_ts(period_end), - }); - } - - items -} - -/// Compute total billable seconds for one relay within [period_start, period_end). -/// Replays the full event history to correctly handle events that precede the period. -fn billable_seconds_in_period( - events: &[&RelayLifecycleEvent], - period_start: DateTime, - period_end: DateTime, +fn relay_active_hours_in_window( + relay: &Relay, + events: &[Activity], + start: DateTime, + end: DateTime, ) -> i64 { - let mut total_secs: i64 = 0; - let mut billing_start: Option> = None; + if relay.plan == "free" { + return 0; + } + let mut marks: HashMap<&str, Vec<&Activity>> = HashMap::new(); for event in events { - let Ok(ts) = ts_to_dt(event.created_at) else { - continue; - }; + if event.identifier == relay.id { + marks.entry(&relay.id).or_default().push(event); + } + } - match event.event_type.as_str() { - "provisioned" | "unsuspended" => { - if billing_start.is_none() { - billing_start = Some(ts.max(period_start)); + let Some(entries) = marks.get(relay.id.as_str()) else { + if relay.status == "active" { + return ((end - start).num_seconds() as f64 / 3600.0).ceil() as i64; + } + return 0; + }; + + let mut active = relay.status == "active"; + let mut cursor = start; + let mut secs = 0i64; + + for event in entries.iter().copied() { + let ts = match Utc.timestamp_opt(event.created_at, 0).single() { + Some(v) => v, + None => continue, + }; + if ts <= start || ts >= end { + continue; + } + + match event.activity_type.as_str() { + "relay_created" | "relay_activated" => { + if !active { + active = true; + cursor = ts; } } - "suspended" | "deactivated" => { - if let Some(start) = billing_start.take() { - let end = ts.min(period_end); - if end > start { - total_secs += (end - start).num_seconds(); - } + "relay_deactivated" | "relay_sync_failed" => { + if active { + active = false; + secs += (ts - cursor).num_seconds().max(0); } } _ => {} } } - // Still billing at period end - if let Some(start) = billing_start - && period_end > start - { - total_secs += (period_end - start).num_seconds(); + if active { + secs += (end - cursor).num_seconds().max(0); } - total_secs + let hours = (secs as f64 / 3600.0).ceil() as i64; + if hours > 0 { hours } else { 0 } } -// ── helpers ─────────────────────────────────────────────────────────────────── - -pub fn now_ts() -> i64 { - Utc::now().timestamp() +#[allow(dead_code)] +fn _same_month(a: DateTime, b: DateTime) -> bool { + a.year() == b.year() && a.month() == b.month() } -fn dt_to_ts(dt: DateTime) -> i64 { - dt.timestamp() +#[allow(dead_code)] +fn _days_between(a: i64, b: i64) -> i64 { + let da = Utc.timestamp_opt(a, 0).single().unwrap_or_else(Utc::now); + let db = Utc.timestamp_opt(b, 0).single().unwrap_or_else(Utc::now); + (db - da).num_days() } -fn ts_to_dt(ts: i64) -> Result> { - Utc.timestamp_opt(ts, 0) - .single() - .ok_or_else(|| anyhow!("invalid unix timestamp: {ts}")) +#[allow(dead_code)] +fn _hours_between(a: DateTime, b: DateTime) -> i64 { + ((b - a).num_seconds() as f64 / 3600.0).ceil() as i64 } -fn attempt( - invoice_id: &str, - run_id: &str, - method: &str, - outcome: &str, - error: &str, -) -> InvoiceAttempt { - InvoiceAttempt { - id: Uuid::new_v4().to_string(), - invoice: invoice_id.to_string(), - run_id: run_id.to_string(), - method: method.to_string(), - outcome: outcome.to_string(), - error: error.to_string(), - created_at: now_ts(), - } +#[allow(dead_code)] +fn _next_day(dt: DateTime) -> DateTime { + dt + Duration::days(1) } diff --git a/backend/src/config.rs b/backend/src/config.rs deleted file mode 100644 index 154f817..0000000 --- a/backend/src/config.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::env; -use std::path::Path; -use std::str::FromStr; - -use nostr_sdk::nostr::key::PublicKey; - -#[derive(Debug, Clone)] -pub struct Config { - pub database_url: String, - pub host: String, - pub port: u16, - pub admin_pubkeys: Vec, - pub zooid_api_url: String, - pub platform_secret: String, - pub relay_domain: String, - pub platform_nwc_url: String, - pub indexer_relays: Vec, - pub platform_name: String, - pub platform_description: String, - pub platform_picture: String, - pub platform_messaging_relays: Vec, -} - -impl Config { - pub fn from_env() -> Self { - let database_url = - env::var("DATABASE_URL").unwrap_or_else(|_| "sqlite://data/caravel.db".to_string()); - let database_url = resolve_database_url(database_url); - let host = env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); - let port = env::var("PORT") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(3000); - let admin_pubkeys = env::var("PLATFORM_ADMIN_PUBKEYS") - .unwrap_or_default() - .split(',') - .filter_map(normalize_pubkey) - .filter(|v| !v.is_empty()) - .collect::>(); - let zooid_api_url = - env::var("ZOOID_API_URL").unwrap_or_else(|_| "http://127.0.0.1:8032".to_string()); - let platform_secret = env::var("PLATFORM_SECRET").unwrap_or_default(); - let relay_domain = - env::var("RELAY_DOMAIN").unwrap_or_else(|_| "spaces.coracle.social".to_string()); - let platform_nwc_url = env::var("NWC_URL").unwrap_or_default(); - let indexer_relays = env::var("NOSTR_INDEXER_RELAYS") - .unwrap_or_default() - .split(',') - .map(|v| v.trim().to_string()) - .filter(|v| !v.is_empty()) - .collect::>(); - let platform_name = env::var("PLATFORM_NAME").unwrap_or_default(); - let platform_description = env::var("PLATFORM_DESCRIPTION").unwrap_or_default(); - let platform_picture = env::var("PLATFORM_PICTURE").unwrap_or_default(); - let platform_messaging_relays = env::var("PLATFORM_MESSAGING_RELAYS") - .unwrap_or_default() - .split(',') - .map(|v| v.trim().to_string()) - .filter(|v| !v.is_empty()) - .collect::>(); - - Self { - database_url, - host, - port, - admin_pubkeys, - zooid_api_url, - platform_secret, - relay_domain, - platform_nwc_url, - indexer_relays, - platform_name, - platform_description, - platform_picture, - platform_messaging_relays, - } - } -} - -fn normalize_pubkey(value: &str) -> Option { - let trimmed = value.trim(); - if trimmed.is_empty() { - return None; - } - - match PublicKey::from_str(trimmed) { - Ok(pubkey) => Some(pubkey.to_hex()), - Err(_) => Some(trimmed.to_lowercase()), - } -} - -fn resolve_database_url(database_url: String) -> String { - const PREFIX: &str = "sqlite://"; - if !database_url.starts_with(PREFIX) { - return database_url; - } - - let path = &database_url[PREFIX.len()..]; - if path.is_empty() || path.starts_with('/') || path == ":memory:" { - return database_url; - } - - let absolute = Path::new(env!("CARGO_MANIFEST_DIR")).join(path); - format!( - "sqlite:///{}", - absolute.to_string_lossy().trim_start_matches('/') - ) -} diff --git a/backend/src/db.rs b/backend/src/db.rs deleted file mode 100644 index 85554ed..0000000 --- a/backend/src/db.rs +++ /dev/null @@ -1,45 +0,0 @@ -use anyhow::Result; -use sqlx::{ - SqlitePool, migrate::Migrator, sqlite::SqliteConnectOptions, sqlite::SqlitePoolOptions, -}; -use std::fs; -use std::path::Path; -use std::str::FromStr; - -static MIGRATOR: Migrator = sqlx::migrate!("./migrations"); - -pub async fn init_pool(database_url: &str) -> Result { - ensure_sqlite_directory(database_url)?; - let options = SqliteConnectOptions::from_str(database_url)?.create_if_missing(true); - let pool = SqlitePoolOptions::new() - .max_connections(5) - .connect_with(options) - .await?; - - sqlx::query("PRAGMA journal_mode = WAL;") - .execute(&pool) - .await?; - - MIGRATOR.run(&pool).await?; - - Ok(pool) -} - -fn ensure_sqlite_directory(database_url: &str) -> Result<()> { - let Some(path) = sqlite_path(database_url) else { - return Ok(()); - }; - if let Some(parent) = path.parent() { - fs::create_dir_all(parent)?; - } - Ok(()) -} - -fn sqlite_path(database_url: &str) -> Option<&Path> { - const PREFIX: &str = "sqlite://"; - let path = database_url.strip_prefix(PREFIX)?; - if path.is_empty() || path == ":memory:" { - return None; - } - Some(Path::new(path)) -} diff --git a/backend/src/infra.rs b/backend/src/infra.rs new file mode 100644 index 0000000..8483943 --- /dev/null +++ b/backend/src/infra.rs @@ -0,0 +1,126 @@ +use anyhow::Result; +use tokio::sync::Mutex; + +use crate::repo::Repo; + +#[derive(Clone)] +pub struct Infra { + api_url: String, + relay_domain: String, + livekit_url: String, + livekit_api_key: String, + livekit_api_secret: String, + repo: Repo, + last_activity_at: std::sync::Arc>, +} + +impl Infra { + pub fn new(repo: Repo) -> Self { + let api_url = std::env::var("ZOOID_API_URL").unwrap_or_default(); + let relay_domain = std::env::var("RELAY_DOMAIN").unwrap_or_default(); + let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default(); + let livekit_api_key = std::env::var("LIVEKIT_API_KEY").unwrap_or_default(); + let livekit_api_secret = std::env::var("LIVEKIT_API_SECRET").unwrap_or_default(); + Self { + api_url, + relay_domain, + livekit_url, + livekit_api_key, + livekit_api_secret, + repo, + last_activity_at: std::sync::Arc::new(Mutex::new(0)), + } + } + + pub async fn start(self) { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(10)); + loop { + interval.tick().await; + if let Err(e) = self.tick().await { + tracing::error!(error = %e, "infra tick failed"); + } + } + } + + pub async fn tick(&self) -> Result<()> { + let mut since_guard = self.last_activity_at.lock().await; + let since = *since_guard; + let activity = self.repo.list_activity(&since, None).await?; + + for a in activity { + if matches!( + a.activity_type.as_str(), + "relay_created" | "relay_updated" | "relay_deactivated" + ) { + let Some(relay) = self.repo.get_relay(&a.identifier).await? else { + continue; + }; + + if let Err(e) = self.sync_relay(&relay).await { + tracing::warn!(relay = %relay.id, error = %e, "relay sync failed"); + self.repo.fail_relay_sync(&relay, e.to_string()).await?; + } + } + + *since_guard = (*since_guard).max(a.created_at); + } + + Ok(()) + } + + async fn sync_relay(&self, relay: &crate::models::Relay) -> Result<()> { + let client = reqwest::Client::new(); + let url = format!("{}/relay/{}", self.api_url.trim_end_matches('/'), relay.id); + + let host = if self.relay_domain.is_empty() { + relay.subdomain.clone() + } else { + format!("{}.{}", relay.subdomain, self.relay_domain) + }; + + let secret = uuid::Uuid::new_v4().to_string(); + + let livekit = if relay.livekit_enabled == 1 { + serde_json::json!({ + "enabled": true, + "url": self.livekit_url, + "api_key": self.livekit_api_key, + "api_secret": self.livekit_api_secret, + }) + } else { + serde_json::json!({ "enabled": false }) + }; + + let body = serde_json::json!({ + "host": host, + "schema": relay.schema, + "secret": secret, + "inactive": relay.status == "inactive", + "info": { + "name": relay.info_name, + "icon": relay.info_icon, + "description": relay.info_description, + }, + "policy": { + "public_join": relay.policy_public_join == 1, + "strip_signatures": relay.policy_strip_signatures == 1, + }, + "groups": { "enabled": relay.groups_enabled == 1 }, + "management": { "enabled": relay.management_enabled == 1 }, + "blossom": { "enabled": relay.blossom_enabled == 1 }, + "livekit": livekit, + "push": { "enabled": relay.push_enabled == 1 }, + "roles": [ + { "name": "admin", "permissions": ["read", "write", "admin"] }, + { "name": "member", "permissions": ["read", "write"] }, + { "name": "guest", "permissions": ["read"] }, + ], + }); + + let response = client.put(url).json(&body).send().await?; + if !response.status().is_success() { + anyhow::bail!("zooid sync returned {}", response.status()) + } + Ok(()) + } +} diff --git a/backend/src/main.rs b/backend/src/main.rs index cece527..48d1264 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,29 +1,18 @@ mod api; -mod auth; mod billing; -mod config; -mod db; +mod infra; mod models; -mod notifications; -mod platform; -mod provisioning; mod repo; - -use std::net::SocketAddr; +mod robot; use anyhow::Result; -use axum::{Router, routing::get}; -use tokio::net::TcpListener; -use tower_http::cors::CorsLayer; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use crate::billing::BillingService; -use crate::config::Config; -use crate::db::init_pool; -use crate::notifications::Nip17Notifier; -use crate::platform::publish_platform_identity; -use crate::provisioning::Provisioner; +use crate::api::Api; +use crate::billing::Billing; +use crate::infra::Infra; use crate::repo::Repo; +use crate::robot::Robot; #[tokio::main] async fn main() -> Result<()> { @@ -34,61 +23,20 @@ async fn main() -> Result<()> { .with(tracing_subscriber::fmt::layer()) .init(); - let config = Config::from_env(); - ensure_sqlite_dir(&config.database_url)?; + let repo = Repo::new().await?; + repo.migrate().await?; + let robot = Robot::new().await?; + let billing = Billing::new(repo.clone(), robot.clone()); + let infra = Infra::new(repo.clone()); + let api = Api::new(repo); - let pool = init_pool(&config.database_url).await?; - let repo = Repo::new(pool); - publish_platform_identity( - &config.platform_secret, - &config.indexer_relays, - &config.platform_name, - &config.platform_description, - &config.platform_picture, - &config.platform_messaging_relays, - ) - .await?; - let notifier = Nip17Notifier::new( - config.platform_secret.clone(), - config.indexer_relays.clone(), - ) - .await?; - let billing = BillingService::new(repo.clone(), notifier, config.platform_nwc_url.clone()); - tokio::spawn(billing.run()); - let provisioner = Provisioner::new( - config.zooid_api_url.clone(), - config.relay_domain.clone(), - config.platform_secret.clone(), - )?; - let state = api::AppState { - repo, - admin_pubkeys: std::sync::Arc::new(config.admin_pubkeys.clone()), - provisioner, - }; + tokio::spawn(async move { + billing.start().await; + }); - let app = Router::new() - .merge(api::router(state)) - .route("/healthz", get(healthz)) - .layer(CorsLayer::permissive()); + tokio::spawn(async move { + infra.start().await; + }); - let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?; - let listener = TcpListener::bind(addr).await?; - tracing::info!("listening on {}", addr); - - axum::serve(listener, app).await?; - Ok(()) -} - -fn ensure_sqlite_dir(database_url: &str) -> Result<()> { - if let Some(path) = database_url.strip_prefix("sqlite://") - && let Some(dir) = std::path::Path::new(path).parent() - && !dir.as_os_str().is_empty() - { - std::fs::create_dir_all(dir)?; - } - Ok(()) -} - -async fn healthz() -> &'static str { - "ok" + api.serve().await } diff --git a/backend/src/models.rs b/backend/src/models.rs index 87b5e7f..e8ef893 100644 --- a/backend/src/models.rs +++ b/backend/src/models.rs @@ -1,68 +1,53 @@ use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RelayConfig { - pub policy: Option, - pub groups: Option, - pub management: Option, - pub blossom: Option, - pub livekit: Option, - pub push: Option, +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct Activity { + pub id: String, + pub created_at: i64, + pub activity_type: String, + pub identifier: String, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct Tenant { pub pubkey: String, - pub status: String, pub nwc_url: String, pub created_at: i64, - pub billing_anchor_at: i64, - pub stripe_customer_id: String, - pub stripe_subscription_id: String, + pub billing_anchor: i64, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct Relay { pub id: String, pub tenant: String, - pub name: String, + pub schema: String, pub subdomain: String, - pub icon: String, - pub description: String, pub plan: String, pub status: String, - pub config: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] -pub struct Plan { - pub id: String, - pub sats_per_month: i64, -} - -/// Append-only record of relay lifecycle transitions (provisioned, suspended, -/// unsuspended, deactivated). Used as the source of truth for usage metering. -#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] -pub struct RelayLifecycleEvent { - pub id: String, - pub relay: String, - pub tenant: String, - /// One of: "provisioned", "suspended", "unsuspended", "deactivated" - pub event_type: String, - /// Plan active on the relay at the time of the event - pub plan: String, - pub created_at: i64, + pub sync_error: String, + pub info_name: String, + pub info_icon: String, + pub info_description: String, + pub policy_public_join: i64, + pub policy_strip_signatures: i64, + pub groups_enabled: i64, + pub management_enabled: i64, + pub blossom_enabled: i64, + pub livekit_enabled: i64, + pub push_enabled: i64, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct Invoice { pub id: String, pub tenant: String, - pub amount: i64, - /// One of: "pending", "past_due", "paid", "void" pub status: String, pub created_at: i64, - /// bolt11 invoice string (may be refreshed in-app when expired) + pub attempted_at: i64, + pub error: String, + pub closed_at: i64, + pub sent_at: i64, + pub paid_at: i64, pub bolt11: String, pub period_start: i64, pub period_end: i64, @@ -73,23 +58,5 @@ pub struct InvoiceItem { pub id: String, pub invoice: String, pub relay: String, - pub amount: i64, - pub period_start: i64, - pub period_end: i64, -} - -/// Canonical history of payment attempts. `invoices.status` is a synchronous -/// projection updated in the same transaction as each new attempt row. -#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] -pub struct InvoiceAttempt { - pub id: String, - pub invoice: String, - /// Groups all method attempts within a single collection run - pub run_id: String, - /// One of: "nwc", "stripe", "lightning", "nip17_dm" - pub method: String, - /// One of: "success", "failed", "sent" (for DM) - pub outcome: String, - pub error: String, - pub created_at: i64, + pub sats: i64, } diff --git a/backend/src/notifications.rs b/backend/src/notifications.rs deleted file mode 100644 index 5001458..0000000 --- a/backend/src/notifications.rs +++ /dev/null @@ -1,106 +0,0 @@ -use anyhow::{Result, anyhow}; -use nostr_sdk::prelude::*; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::sync::Mutex; - -#[derive(Clone)] -pub struct Nip17Notifier { - keys: Keys, - indexer_client: Client, - indexer_enabled: bool, - cache: Arc>>, -} - -impl Nip17Notifier { - pub async fn new(platform_secret: String, relays: Vec) -> Result { - if platform_secret.trim().is_empty() { - return Err(anyhow!( - "PLATFORM_SECRET is required for NIP-17 notifications" - )); - } - - let keys = Keys::parse(&platform_secret)?; - let indexer_client = Client::new(keys.clone()); - - for relay in &relays { - indexer_client.add_relay(relay).await?; - } - - let indexer_enabled = !relays.is_empty(); - if indexer_enabled { - indexer_client.connect().await; - } - - Ok(Self { - keys, - indexer_client, - indexer_enabled, - cache: Arc::new(Mutex::new(HashMap::new())), - }) - } - - pub async fn send(&self, recipient: &str, message: &str) -> Result<()> { - if !self.indexer_enabled { - return Ok(()); - } - - let relays = self.fetch_dm_relays(recipient).await?; - if relays.is_empty() { - return Ok(()); - } - - let pubkey = PublicKey::parse(recipient)?; - let client = Client::new(self.keys.clone()); - for relay in relays { - client.add_relay(relay).await?; - } - client.connect().await; - client.send_private_msg(pubkey, message, []).await?; - Ok(()) - } - - async fn fetch_dm_relays(&self, recipient: &str) -> Result> { - let mut cache = self.cache.lock().await; - if let Some(entry) = cache.get(recipient) - && entry.fetched_at.elapsed() < Duration::from_secs(300) - { - return Ok(entry.relays.clone()); - } - - let pubkey = PublicKey::parse(recipient)?; - let filter = Filter::new().kind(Kind::Custom(10050)).author(pubkey); - let events = self - .indexer_client - .fetch_events(filter, Duration::from_secs(5)) - .await?; - - let mut relays = Vec::new(); - if let Some(event) = events.into_iter().max_by_key(|event| event.created_at) { - for tag in event.tags.iter() { - if tag.as_slice().first().is_some_and(|t| t == "relay") - && let Some(value) = tag.as_slice().get(1) - { - relays.push(value.to_string()); - } - } - } - - cache.insert( - recipient.to_string(), - CacheEntry { - relays: relays.clone(), - fetched_at: Instant::now(), - }, - ); - - Ok(relays) - } -} - -#[derive(Clone)] -struct CacheEntry { - relays: Vec, - fetched_at: Instant, -} diff --git a/backend/src/platform.rs b/backend/src/platform.rs deleted file mode 100644 index 4f4b8b9..0000000 --- a/backend/src/platform.rs +++ /dev/null @@ -1,57 +0,0 @@ -use anyhow::{Result, anyhow}; -use nostr_sdk::prelude::*; - -pub async fn publish_platform_identity( - platform_secret: &str, - indexer_relays: &[String], - name: &str, - description: &str, - picture: &str, - messaging_relays: &[String], -) -> Result<()> { - if indexer_relays.is_empty() { - return Ok(()); - } - - if platform_secret.trim().is_empty() { - return Err(anyhow!("PLATFORM_SECRET is required for platform identity")); - } - - let keys = Keys::parse(platform_secret)?; - let client = Client::new(keys); - - for relay in indexer_relays { - client.add_relay(relay).await?; - } - - client.connect().await; - - let mut metadata = Metadata::new(); - if !name.is_empty() { - metadata = metadata.name(name); - } - if !description.is_empty() { - metadata = metadata.about(description); - } - if !picture.is_empty() { - metadata = metadata.picture(Url::parse(picture)?); - } - - let metadata_builder = EventBuilder::metadata(&metadata); - client.send_event_builder(metadata_builder).await?; - - if messaging_relays.is_empty() { - return Ok(()); - } - - let mut tags = Vec::new(); - for relay in messaging_relays { - let tag = Tag::parse(["relay", relay.as_str()])?; - tags.push(tag); - } - - let relay_builder = EventBuilder::new(Kind::Custom(10050), "").tags(tags); - client.send_event_builder(relay_builder).await?; - - Ok(()) -} diff --git a/backend/src/provisioning.rs b/backend/src/provisioning.rs deleted file mode 100644 index 4855825..0000000 --- a/backend/src/provisioning.rs +++ /dev/null @@ -1,181 +0,0 @@ -use anyhow::{Result, anyhow}; -use rand::RngCore; -use rand::rngs::OsRng; -use reqwest::Client; -use serde_json::{Value, json}; - -use nostr_sdk::nostr::Keys; -use nostr_sdk::nostr::nips::nip98::{HttpData, HttpMethod}; -use nostr_sdk::nostr::types::url::Url; - -use crate::models::{Relay, RelayConfig}; - -#[derive(Clone)] -pub struct Provisioner { - base_url: String, - relay_domain: String, - admin_keys: Keys, - client: Client, -} - -impl Provisioner { - pub fn new(base_url: String, relay_domain: String, admin_secret: String) -> Result { - if admin_secret.trim().is_empty() { - return Err(anyhow!("PLATFORM_SECRET is required")); - } - - let admin_keys = Keys::parse(&admin_secret)?; - let client = Client::new(); - - Ok(Self { - base_url, - relay_domain, - admin_keys, - client, - }) - } - - /// Create a relay in zooid. - /// - /// POSTs the full config (including a generated secret and host). - pub async fn create_relay(&self, relay: &Relay) -> Result<()> { - let url = format!("{}/relay/{}", self.base_url.trim_end_matches('/'), relay.id); - - let blossom_default = relay.plan != "free"; - let livekit_default = relay.plan != "free"; - let cfg = relay.config.as_ref(); - let host = format!("{}.{}", relay.subdomain, self.relay_domain); - let secret = generate_secret_hex(); - let payload = json!({ - "host": host, - "schema": relay.id, - "secret": secret, - "info": { - "name": relay.name, - "icon": relay.icon, - "pubkey": relay.tenant, - "description": relay.description, - }, - "policy": { - "public_join": cfg_bool(cfg, |c| &c.policy, "public_join", false), - "strip_signatures": cfg_bool(cfg, |c| &c.policy, "strip_signatures", false), - }, - "groups": { - "enabled": cfg_bool(cfg, |c| &c.groups, "enabled", true), - "auto_join": cfg_bool(cfg, |c| &c.groups, "auto_join", true), - }, - "push": { - "enabled": cfg_bool(cfg, |c| &c.push, "enabled", true), - }, - "management": { - "enabled": cfg_bool(cfg, |c| &c.management, "enabled", true), - }, - "blossom": { - "enabled": cfg_bool(cfg, |c| &c.blossom, "enabled", blossom_default), - }, - "livekit": { - "enabled": cfg_bool(cfg, |c| &c.livekit, "enabled", livekit_default), - }, - "roles": { - "member": { "pubkeys": [], "can_invite": true, "can_manage": false } - }, - }); - let auth = self.build_auth_header(&url, HttpMethod::POST).await?; - - let res = self - .client - .post(&url) - .header(reqwest::header::AUTHORIZATION, auth) - .json(&payload) - .send() - .await?; - - if !res.status().is_success() { - let status = res.status(); - let body = res.text().await.unwrap_or_default(); - return Err(anyhow!("zooid create failed: {} {}", status, body)); - } - - Ok(()) - } - - /// Update a relay in zooid. - /// - /// PATCHes only the mutable fields (info + config sections). - pub async fn update_relay(&self, relay: &Relay) -> Result<()> { - let url = format!("{}/relay/{}", self.base_url.trim_end_matches('/'), relay.id); - let host = format!("{}.{}", relay.subdomain, self.relay_domain); - let blossom_default = relay.plan != "free"; - let livekit_default = relay.plan != "free"; - let cfg = relay.config.as_ref(); - let patch = json!({ - "host": host, - "info": { - "name": relay.name, - "icon": relay.icon, - "description": relay.description, - }, - "policy": { - "public_join": cfg_bool(cfg, |c| &c.policy, "public_join", false), - "strip_signatures": cfg_bool(cfg, |c| &c.policy, "strip_signatures", false), - }, - "groups": { - "enabled": cfg_bool(cfg, |c| &c.groups, "enabled", true), - "auto_join": cfg_bool(cfg, |c| &c.groups, "auto_join", true), - }, - "push": { - "enabled": cfg_bool(cfg, |c| &c.push, "enabled", true), - }, - "management": { - "enabled": cfg_bool(cfg, |c| &c.management, "enabled", true), - }, - "blossom": { - "enabled": cfg_bool(cfg, |c| &c.blossom, "enabled", blossom_default), - }, - "livekit": { - "enabled": cfg_bool(cfg, |c| &c.livekit, "enabled", livekit_default), - }, - }); - let auth = self.build_auth_header(&url, HttpMethod::PATCH).await?; - - let res = self - .client - .patch(&url) - .header(reqwest::header::AUTHORIZATION, auth) - .json(&patch) - .send() - .await?; - - if !res.status().is_success() { - let status = res.status(); - let body = res.text().await.unwrap_or_default(); - return Err(anyhow!("zooid patch failed: {} {}", status, body)); - } - - Ok(()) - } - - async fn build_auth_header(&self, url: &str, method: HttpMethod) -> Result { - let url = Url::parse(url)?; - let data = HttpData::new(url, method); - let header = data.to_authorization(&self.admin_keys).await?; - Ok(header) - } -} - -fn cfg_bool( - cfg: Option<&RelayConfig>, - section: impl Fn(&RelayConfig) -> &Option, - key: &str, - default: bool, -) -> bool { - cfg.and_then(|c| section(c).as_ref()) - .and_then(|v| v[key].as_bool()) - .unwrap_or(default) -} - -fn generate_secret_hex() -> String { - let mut bytes = [0u8; 32]; - OsRng.fill_bytes(&mut bytes); - hex::encode(bytes) -} diff --git a/backend/src/repo.rs b/backend/src/repo.rs index 02461ea..6f0ae46 100644 --- a/backend/src/repo.rs +++ b/backend/src/repo.rs @@ -1,166 +1,578 @@ +use std::path::Path; + use anyhow::Result; -use sqlx::{Row, Sqlite, SqlitePool, Transaction}; +use sqlx::{SqlitePool, sqlite::SqlitePoolOptions}; -use crate::models::{ - Invoice, InvoiceAttempt, InvoiceItem, Plan, Relay, RelayLifecycleEvent, Tenant, -}; - -// ── helpers ────────────────────────────────────────────────────────────────── - -fn relay_from_row(row: sqlx::sqlite::SqliteRow) -> Relay { - let config_json: Option = row.get("config"); - let config = config_json.and_then(|s| serde_json::from_str(&s).ok()); - Relay { - id: row.get("id"), - tenant: row.get("tenant"), - name: row.get("name"), - subdomain: row.get("subdomain"), - icon: row.get("icon"), - description: row.get("description"), - plan: row.get("plan"), - status: row.get("status"), - config, - } -} - -const TENANT_COLS: &str = - "pubkey, status, nwc_url, created_at, billing_anchor_at, stripe_customer_id, stripe_subscription_id"; - -const RELAY_COLS: &str = "id, tenant, name, subdomain, icon, description, plan, status, config"; - -const INVOICE_COLS: &str = - "id, tenant, amount, status, created_at, bolt11, period_start, period_end"; - -const LIFECYCLE_COLS: &str = "id, relay, tenant, event_type, plan, created_at"; - -const ATTEMPT_COLS: &str = "id, invoice, run_id, method, outcome, error, created_at"; - -// ── Repo ───────────────────────────────────────────────────────────────────── +use crate::models::{Activity, Invoice, InvoiceItem, Relay, Tenant}; #[derive(Clone)] pub struct Repo { - pool: SqlitePool, + pub pool: SqlitePool, } impl Repo { - pub fn new(pool: SqlitePool) -> Self { - Self { pool } + pub async fn new() -> Result { + let database_url = std::env::var("DATABASE_URL") + .unwrap_or_else(|_| "sqlite://data/caravel.db".to_string()); + + if let Some(path) = database_url.strip_prefix("sqlite://") + && !path.is_empty() + && path != ":memory:" + && let Some(parent) = Path::new(path).parent() + && !parent.as_os_str().is_empty() + { + std::fs::create_dir_all(parent)?; + } + + let pool = SqlitePoolOptions::new() + .max_connections(5) + .connect(&database_url) + .await?; + + sqlx::query("PRAGMA journal_mode = WAL;") + .execute(&pool) + .await?; + + Ok(Self { pool }) } - // ── tenants ────────────────────────────────────────────────────────────── - - pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> { - sqlx::query( - "INSERT INTO tenants (pubkey, status, nwc_url, created_at, billing_anchor_at, - stripe_customer_id, stripe_subscription_id) - VALUES (?, ?, ?, ?, ?, ?, ?)", - ) - .bind(&tenant.pubkey) - .bind(&tenant.status) - .bind(&tenant.nwc_url) - .bind(tenant.created_at) - .bind(tenant.billing_anchor_at) - .bind(&tenant.stripe_customer_id) - .bind(&tenant.stripe_subscription_id) - .execute(&self.pool) - .await?; + pub async fn migrate(&self) -> Result<()> { + sqlx::migrate!("./migrations").run(&self.pool).await?; Ok(()) } - pub async fn create_tenant_if_missing(&self, tenant: &Tenant) -> Result<()> { + async fn log_activity(&self, activity_type: &str, identifier: &str) -> Result<()> { sqlx::query( - "INSERT OR IGNORE INTO tenants - (pubkey, status, nwc_url, created_at, billing_anchor_at, - stripe_customer_id, stripe_subscription_id) - VALUES (?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), ?, ?)", ) - .bind(&tenant.pubkey) - .bind(&tenant.status) - .bind(&tenant.nwc_url) - .bind(tenant.created_at) - .bind(tenant.billing_anchor_at) - .bind(&tenant.stripe_customer_id) - .bind(&tenant.stripe_subscription_id) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(activity_type) + .bind(identifier) .execute(&self.pool) .await?; Ok(()) } - pub async fn get_tenant(&self, pubkey: &str) -> Result> { - let tenant = sqlx::query_as::<_, Tenant>(&format!( - "SELECT {TENANT_COLS} FROM tenants WHERE pubkey = ?" - )) - .bind(pubkey) - .fetch_optional(&self.pool) - .await?; - Ok(tenant) - } - pub async fn list_tenants(&self) -> Result> { - let tenants = sqlx::query_as::<_, Tenant>(&format!( - "SELECT {TENANT_COLS} FROM tenants ORDER BY pubkey" - )) + let rows = sqlx::query_as::<_, Tenant>( + "SELECT pubkey, nwc_url, created_at, billing_anchor + FROM tenants + ORDER BY pubkey", + ) .fetch_all(&self.pool) .await?; - Ok(tenants) + Ok(rows) } - pub async fn update_tenant_status(&self, pubkey: &str, status: &str) -> Result<()> { - sqlx::query("UPDATE tenants SET status = ? WHERE pubkey = ?") - .bind(status) + pub async fn get_tenant(&self, pubkey: &str) -> Result> { + let row = sqlx::query_as::<_, Tenant>( + "SELECT pubkey, nwc_url, created_at, billing_anchor + FROM tenants + WHERE pubkey = ?", + ) + .bind(pubkey) + .fetch_optional(&self.pool) + .await?; + Ok(row) + } + + pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query( + "INSERT INTO tenants (pubkey, nwc_url, created_at, billing_anchor) + VALUES (?, ?, ?, ?)", + ) + .bind(&tenant.pubkey) + .bind(&tenant.nwc_url) + .bind(tenant.created_at) + .bind(tenant.billing_anchor) + .execute(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'tenant_created', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(&tenant.pubkey) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn update_tenant_billing_anchor(&self, pubkey: &str, billing_anchor: i64) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query("UPDATE tenants SET billing_anchor = ? WHERE pubkey = ?") + .bind(billing_anchor) .bind(pubkey) - .execute(&self.pool) + .execute(&mut *tx) .await?; + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'tenant_billing_anchor_updated', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(pubkey) + .execute(&mut *tx) + .await?; + + tx.commit().await?; Ok(()) } pub async fn update_tenant_nwc_url(&self, pubkey: &str, nwc_url: &str) -> Result<()> { + let mut tx = self.pool.begin().await?; + sqlx::query("UPDATE tenants SET nwc_url = ? WHERE pubkey = ?") .bind(nwc_url) .bind(pubkey) - .execute(&self.pool) + .execute(&mut *tx) .await?; - Ok(()) - } - #[allow(dead_code)] - pub async fn update_tenant_billing_integrations( - &self, - pubkey: &str, - nwc_url: &str, - stripe_customer_id: &str, - stripe_subscription_id: &str, - ) -> Result<()> { sqlx::query( - "UPDATE tenants - SET nwc_url = ?, stripe_customer_id = ?, stripe_subscription_id = ? - WHERE pubkey = ?", + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'tenant_billing_updated', ?)", ) - .bind(nwc_url) - .bind(stripe_customer_id) - .bind(stripe_subscription_id) + .bind(uuid::Uuid::new_v4().to_string()) .bind(pubkey) - .execute(&self.pool) + .execute(&mut *tx) .await?; + + tx.commit().await?; Ok(()) } - /// Reset the billing cycle anchor. Called when a tenant adds their first - /// paid relay after having none. - pub async fn reset_billing_anchor(&self, pubkey: &str, anchor_at: i64) -> Result<()> { - sqlx::query("UPDATE tenants SET billing_anchor_at = ? WHERE pubkey = ?") - .bind(anchor_at) - .bind(pubkey) - .execute(&self.pool) + pub async fn list_relays(&self, tenant_id: Option<&str>) -> Result> { + let rows = if let Some(tenant) = tenant_id { + sqlx::query_as::<_, Relay>( + "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled + FROM relays + WHERE tenant = ? + ORDER BY id", + ) + .bind(tenant) + .fetch_all(&self.pool) + .await? + } else { + sqlx::query_as::<_, Relay>( + "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled + FROM relays + ORDER BY id", + ) + .fetch_all(&self.pool) + .await? + }; + Ok(rows) + } + + pub async fn get_relay(&self, id: &str) -> Result> { + let row = sqlx::query_as::<_, Relay>( + "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled + FROM relays + WHERE id = ?", + ) + .bind(id) + .fetch_optional(&self.pool) + .await?; + Ok(row) + } + + pub async fn create_relay(&self, relay: &Relay) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query( + "INSERT INTO relays ( + id, tenant, schema, subdomain, plan, status, sync_error, + info_name, info_icon, info_description, + policy_public_join, policy_strip_signatures, + groups_enabled, management_enabled, blossom_enabled, + livekit_enabled, push_enabled + ) VALUES (?, ?, ?, ?, ?, 'new', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&relay.id) + .bind(&relay.tenant) + .bind(&relay.schema) + .bind(&relay.subdomain) + .bind(&relay.plan) + .bind(&relay.sync_error) + .bind(&relay.info_name) + .bind(&relay.info_icon) + .bind(&relay.info_description) + .bind(relay.policy_public_join) + .bind(relay.policy_strip_signatures) + .bind(relay.groups_enabled) + .bind(relay.management_enabled) + .bind(relay.blossom_enabled) + .bind(relay.livekit_enabled) + .bind(relay.push_enabled) + .execute(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'relay_created', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn update_relay(&self, relay: &Relay) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query( + "UPDATE relays + SET tenant = ?, schema = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?, + info_name = ?, info_icon = ?, info_description = ?, + policy_public_join = ?, policy_strip_signatures = ?, + groups_enabled = ?, management_enabled = ?, blossom_enabled = ?, + livekit_enabled = ?, push_enabled = ? + WHERE id = ?", + ) + .bind(&relay.tenant) + .bind(&relay.schema) + .bind(&relay.subdomain) + .bind(&relay.plan) + .bind(&relay.status) + .bind(&relay.sync_error) + .bind(&relay.info_name) + .bind(&relay.info_icon) + .bind(&relay.info_description) + .bind(relay.policy_public_join) + .bind(relay.policy_strip_signatures) + .bind(relay.groups_enabled) + .bind(relay.management_enabled) + .bind(relay.blossom_enabled) + .bind(relay.livekit_enabled) + .bind(relay.push_enabled) + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'relay_updated', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn deactivate_relay(&self, relay: &Relay) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query("UPDATE relays SET status = 'inactive' WHERE id = ?") + .bind(&relay.id) + .execute(&mut *tx) .await?; + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'relay_deactivated', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; Ok(()) } - /// Returns the count of non-free, non-deactivated relays for a tenant. - pub async fn count_billable_relays(&self, tenant: &str) -> Result { - let count: i64 = sqlx::query_scalar( + pub async fn activate_relay(&self, relay: &Relay) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query("UPDATE relays SET status = 'active' WHERE id = ?") + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'relay_activated', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query("UPDATE relays SET status = 'inactive', sync_error = ? WHERE id = ?") + .bind(&sync_error) + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'relay_sync_failed', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(&relay.id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn create_invoice(&self, invoice: &Invoice, invoice_items: &[InvoiceItem]) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query( + "INSERT INTO invoices ( + id, tenant, status, created_at, attempted_at, error, closed_at, + sent_at, paid_at, bolt11, period_start, period_end + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&invoice.id) + .bind(&invoice.tenant) + .bind(&invoice.status) + .bind(invoice.created_at) + .bind(invoice.attempted_at) + .bind(&invoice.error) + .bind(invoice.closed_at) + .bind(invoice.sent_at) + .bind(invoice.paid_at) + .bind(&invoice.bolt11) + .bind(invoice.period_start) + .bind(invoice.period_end) + .execute(&mut *tx) + .await?; + + for item in invoice_items { + sqlx::query("INSERT INTO invoice_items (id, invoice, relay, sats) VALUES (?, ?, ?, ?)") + .bind(&item.id) + .bind(&item.invoice) + .bind(&item.relay) + .bind(item.sats) + .execute(&mut *tx) + .await?; + } + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'invoice_created', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(&invoice.id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn list_invoices(&self, tenant_id: Option<&str>) -> Result> { + let rows = if let Some(tenant) = tenant_id { + sqlx::query_as::<_, Invoice>( + "SELECT id, tenant, status, created_at, attempted_at, error, closed_at, + sent_at, paid_at, bolt11, period_start, period_end + FROM invoices + WHERE tenant = ? + ORDER BY created_at DESC", + ) + .bind(tenant) + .fetch_all(&self.pool) + .await? + } else { + sqlx::query_as::<_, Invoice>( + "SELECT id, tenant, status, created_at, attempted_at, error, closed_at, + sent_at, paid_at, bolt11, period_start, period_end + FROM invoices + ORDER BY created_at DESC", + ) + .fetch_all(&self.pool) + .await? + }; + Ok(rows) + } + + pub async fn mark_invoice_paid(&self, invoice_id: &str) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query( + "UPDATE invoices + SET status = 'paid', paid_at = strftime('%s','now'), error = '' + WHERE id = ?", + ) + .bind(invoice_id) + .execute(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'invoice_paid', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(invoice_id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn mark_invoice_attempted(&self, invoice_id: &str, error: Option<&str>) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query( + "UPDATE invoices + SET attempted_at = strftime('%s','now'), error = COALESCE(?, error) + WHERE id = ?", + ) + .bind(error) + .bind(invoice_id) + .execute(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'invoice_attempted', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(invoice_id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn mark_invoice_sent(&self, invoice_id: &str) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query("UPDATE invoices SET sent_at = strftime('%s','now') WHERE id = ?") + .bind(invoice_id) + .execute(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'invoice_sent', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(invoice_id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn mark_invoice_closed(&self, invoice_id: &str) -> Result<()> { + let mut tx = self.pool.begin().await?; + + sqlx::query( + "UPDATE invoices + SET status = 'closed', closed_at = strftime('%s','now') + WHERE id = ?", + ) + .bind(invoice_id) + .execute(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO activities (id, created_at, activity_type, identifier) + VALUES (?, strftime('%s','now'), 'invoice_closed', ?)", + ) + .bind(uuid::Uuid::new_v4().to_string()) + .bind(invoice_id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + pub async fn list_activity(&self, since: &i64, tenant: Option<&str>) -> Result> { + let rows = if let Some(tenant_pubkey) = tenant { + sqlx::query_as::<_, Activity>( + "SELECT a.id, a.created_at, a.activity_type, a.identifier + FROM activities a + WHERE a.created_at > ? + AND ( + a.activity_type IN ('tenant_created', 'tenant_billing_anchor_updated') + AND a.identifier = ? + OR EXISTS ( + SELECT 1 FROM relays r + WHERE r.id = a.identifier AND r.tenant = ? + ) + OR EXISTS ( + SELECT 1 FROM invoices i + WHERE i.id = a.identifier AND i.tenant = ? + ) + ) + ORDER BY a.created_at, a.id", + ) + .bind(since) + .bind(tenant_pubkey) + .bind(tenant_pubkey) + .bind(tenant_pubkey) + .fetch_all(&self.pool) + .await? + } else { + sqlx::query_as::<_, Activity>( + "SELECT id, created_at, activity_type, identifier + FROM activities + WHERE created_at > ? + ORDER BY created_at, id", + ) + .bind(since) + .fetch_all(&self.pool) + .await? + }; + Ok(rows) + } + + pub async fn get_invoice_items(&self, invoice_id: &str) -> Result> { + let rows = sqlx::query_as::<_, InvoiceItem>( + "SELECT id, invoice, relay, sats + FROM invoice_items + WHERE invoice = ?", + ) + .bind(invoice_id) + .fetch_all(&self.pool) + .await?; + Ok(rows) + } + + pub async fn total_active_paid_relays_for_tenant(&self, tenant: &str) -> Result { + let count = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM relays - WHERE tenant = ? AND plan != 'free' AND status != 'deactivated'", + WHERE tenant = ? AND status = 'active' AND plan != 'free'", ) .bind(tenant) .fetch_one(&self.pool) @@ -168,386 +580,29 @@ impl Repo { Ok(count) } - // ── relays ──────────────────────────────────────────────────────────────── - - pub async fn upsert_relay(&self, relay: &Relay) -> Result<()> { - let config_json = relay - .config - .as_ref() - .map(serde_json::to_string) - .transpose()?; - sqlx::query(&format!( - "INSERT INTO relays ({RELAY_COLS}) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - name = excluded.name, - subdomain = excluded.subdomain, - icon = excluded.icon, - description = excluded.description, - plan = excluded.plan, - status = excluded.status, - config = excluded.config" - )) - .bind(&relay.id) - .bind(&relay.tenant) - .bind(&relay.name) - .bind(&relay.subdomain) - .bind(&relay.icon) - .bind(&relay.description) - .bind(&relay.plan) - .bind(&relay.status) - .bind(config_json) - .execute(&self.pool) - .await?; - Ok(()) - } - - pub async fn update_relay_status(&self, id: &str, status: &str) -> Result<()> { - sqlx::query("UPDATE relays SET status = ? WHERE id = ?") - .bind(status) - .bind(id) - .execute(&self.pool) - .await?; - Ok(()) - } - - pub async fn get_relay(&self, id: &str) -> Result> { - let row = sqlx::query(&format!("SELECT {RELAY_COLS} FROM relays WHERE id = ?")) - .bind(id) - .fetch_optional(&self.pool) - .await?; - Ok(row.map(relay_from_row)) - } - - pub async fn list_relays_by_tenant(&self, tenant: &str) -> Result> { - let rows = sqlx::query(&format!( - "SELECT {RELAY_COLS} FROM relays WHERE tenant = ? ORDER BY name" - )) - .bind(tenant) - .fetch_all(&self.pool) - .await?; - Ok(rows.into_iter().map(relay_from_row).collect()) - } - - pub async fn list_relays(&self) -> Result> { - let rows = sqlx::query(&format!("SELECT {RELAY_COLS} FROM relays ORDER BY name")) - .fetch_all(&self.pool) - .await?; - Ok(rows.into_iter().map(relay_from_row).collect()) - } - - // ── lifecycle events ────────────────────────────────────────────────────── - - /// All lifecycle events for a tenant up to (exclusive) a given timestamp, - /// ordered by relay then time — used by the invoice generation worker. - pub async fn list_lifecycle_events_for_tenant( - &self, - tenant: &str, - before: i64, - ) -> Result> { - let rows = sqlx::query_as::<_, RelayLifecycleEvent>(&format!( - "SELECT {LIFECYCLE_COLS} FROM relay_lifecycle_events - WHERE tenant = ? AND created_at < ? - ORDER BY relay, created_at, id" - )) - .bind(tenant) - .bind(before) - .fetch_all(&self.pool) - .await?; - Ok(rows) - } - - // ── plans ───────────────────────────────────────────────────────────────── - - pub async fn list_plans(&self) -> Result> { - let plans = - sqlx::query_as::<_, Plan>("SELECT id, sats_per_month FROM plans ORDER BY sats_per_month") - .fetch_all(&self.pool) - .await?; - Ok(plans) - } - - #[allow(dead_code)] - pub async fn get_plan(&self, id: &str) -> Result> { - let plan = sqlx::query_as::<_, Plan>("SELECT id, sats_per_month FROM plans WHERE id = ?") - .bind(id) - .fetch_optional(&self.pool) - .await?; - Ok(plan) - } - - // ── invoices ────────────────────────────────────────────────────────────── - - /// Insert an invoice and its line items atomically. - /// Returns `false` (no-op) if an invoice for this tenant+period already exists. - pub async fn create_invoice_with_items( - &self, - invoice: &Invoice, - items: &[InvoiceItem], - ) -> Result { - let mut tx: Transaction<'_, Sqlite> = self.pool.begin().await?; - - let result = sqlx::query( - "INSERT INTO invoices (id, tenant, amount, status, created_at, bolt11, period_start, period_end) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(tenant, period_start, period_end) DO NOTHING", + pub async fn total_pending_invoices_for_tenant(&self, tenant: &str) -> Result { + let count = sqlx::query_scalar::<_, i64>( + "SELECT COUNT(*) FROM invoices + WHERE tenant = ? AND status = 'pending'", ) - .bind(&invoice.id) - .bind(&invoice.tenant) - .bind(invoice.amount) - .bind(&invoice.status) - .bind(invoice.created_at) - .bind(&invoice.bolt11) - .bind(invoice.period_start) - .bind(invoice.period_end) - .execute(&mut *tx) - .await?; - - if result.rows_affected() == 0 { - tx.rollback().await?; - return Ok(false); - } - - for item in items { - sqlx::query( - "INSERT INTO invoice_items (id, invoice, relay, amount, period_start, period_end) - VALUES (?, ?, ?, ?, ?, ?)", - ) - .bind(&item.id) - .bind(&item.invoice) - .bind(&item.relay) - .bind(item.amount) - .bind(item.period_start) - .bind(item.period_end) - .execute(&mut *tx) - .await?; - } - - tx.commit().await?; - Ok(true) - } - - #[allow(dead_code)] - pub async fn get_invoice(&self, id: &str) -> Result> { - let invoice = sqlx::query_as::<_, Invoice>(&format!( - "SELECT {INVOICE_COLS} FROM invoices WHERE id = ?" - )) - .bind(id) - .fetch_optional(&self.pool) - .await?; - Ok(invoice) - } - - pub async fn list_invoices_by_tenant(&self, tenant: &str) -> Result> { - let invoices = sqlx::query_as::<_, Invoice>(&format!( - "SELECT {INVOICE_COLS} FROM invoices WHERE tenant = ? ORDER BY created_at DESC" - )) .bind(tenant) - .fetch_all(&self.pool) - .await?; - Ok(invoices) - } - - #[allow(dead_code)] - pub async fn list_invoice_items(&self, invoice_id: &str) -> Result> { - let items = sqlx::query_as::<_, InvoiceItem>( - "SELECT id, invoice, relay, amount, period_start, period_end - FROM invoice_items WHERE invoice = ?", - ) - .bind(invoice_id) - .fetch_all(&self.pool) - .await?; - Ok(items) - } - - // ── invoice attempts ────────────────────────────────────────────────────── - - /// Record a payment attempt and synchronously project the new invoice status. - pub async fn record_attempt( - &self, - attempt: &InvoiceAttempt, - projected_invoice_status: &str, - ) -> Result<()> { - let mut tx: Transaction<'_, Sqlite> = self.pool.begin().await?; - - sqlx::query(&format!( - "INSERT INTO invoice_attempts ({ATTEMPT_COLS}) VALUES (?, ?, ?, ?, ?, ?, ?)" - )) - .bind(&attempt.id) - .bind(&attempt.invoice) - .bind(&attempt.run_id) - .bind(&attempt.method) - .bind(&attempt.outcome) - .bind(&attempt.error) - .bind(attempt.created_at) - .execute(&mut *tx) - .await?; - - sqlx::query("UPDATE invoices SET status = ? WHERE id = ?") - .bind(projected_invoice_status) - .bind(&attempt.invoice) - .execute(&mut *tx) - .await?; - - tx.commit().await?; - Ok(()) - } - - pub async fn list_attempts_for_invoice(&self, invoice_id: &str) -> Result> { - let attempts = sqlx::query_as::<_, InvoiceAttempt>(&format!( - "SELECT {ATTEMPT_COLS} FROM invoice_attempts - WHERE invoice = ? - ORDER BY created_at ASC, id ASC" - )) - .bind(invoice_id) - .fetch_all(&self.pool) - .await?; - Ok(attempts) - } - - /// Returns true if a DM has already been sent for this invoice (to enforce - /// the one-DM-per-invoice rule). - pub async fn invoice_dm_sent(&self, invoice_id: &str) -> Result { - let count: i64 = sqlx::query_scalar( - "SELECT COUNT(*) FROM invoice_attempts - WHERE invoice = ? AND method = 'nip17_dm' AND outcome = 'sent'", - ) - .bind(invoice_id) .fetch_one(&self.pool) .await?; - Ok(count > 0) + Ok(count) } - // ── relay lifecycle + status: transactional helpers ─────────────────────── - - /// Transition a relay's status and write the corresponding lifecycle event - /// atomically. No-ops if the relay is already in `new_status`. - pub async fn transition_relay( - &self, - relay_id: &str, - tenant_pubkey: &str, - plan: &str, - new_status: &str, - event_type: &str, - now: i64, - ) -> Result { - let mut tx: Transaction<'_, Sqlite> = self.pool.begin().await?; - - let current: Option = sqlx::query_scalar("SELECT status FROM relays WHERE id = ?") - .bind(relay_id) - .fetch_optional(&mut *tx) - .await?; - - let current_status = match current { - Some(s) => s, - None => return Ok(false), + pub async fn get_relay_plan_amount_sats(&self, plan: &str) -> Result { + let sats = match plan { + "free" => 0, + "basic" => 10_000, + "growth" => 50_000, + _ => 0, }; - - if current_status == new_status { - return Ok(true); // idempotent no-op - } - - sqlx::query("UPDATE relays SET status = ? WHERE id = ?") - .bind(new_status) - .bind(relay_id) - .execute(&mut *tx) - .await?; - - let event = RelayLifecycleEvent { - id: uuid::Uuid::new_v4().to_string(), - relay: relay_id.to_string(), - tenant: tenant_pubkey.to_string(), - event_type: event_type.to_string(), - plan: plan.to_string(), - created_at: now, - }; - - sqlx::query(&format!( - "INSERT INTO relay_lifecycle_events ({LIFECYCLE_COLS}) VALUES (?, ?, ?, ?, ?, ?)" - )) - .bind(&event.id) - .bind(&event.relay) - .bind(&event.tenant) - .bind(&event.event_type) - .bind(&event.plan) - .bind(event.created_at) - .execute(&mut *tx) - .await?; - - tx.commit().await?; - Ok(true) + Ok(sats) } - /// Suspend all active relays for a tenant, writing lifecycle events for each. - pub async fn suspend_relays_for_tenant(&self, tenant: &str, now: i64) -> Result<()> { - let mut tx: Transaction<'_, Sqlite> = self.pool.begin().await?; - - let rows = - sqlx::query("SELECT id, plan FROM relays WHERE tenant = ? AND status = 'active'") - .bind(tenant) - .fetch_all(&mut *tx) - .await?; - - for row in rows { - let relay_id: String = row.get("id"); - let plan: String = row.get("plan"); - - sqlx::query("UPDATE relays SET status = 'suspended' WHERE id = ?") - .bind(&relay_id) - .execute(&mut *tx) - .await?; - - sqlx::query(&format!( - "INSERT INTO relay_lifecycle_events ({LIFECYCLE_COLS}) VALUES (?, ?, ?, ?, ?, ?)" - )) - .bind(uuid::Uuid::new_v4().to_string()) - .bind(&relay_id) - .bind(tenant) - .bind("suspended") - .bind(plan) - .bind(now) - .execute(&mut *tx) - .await?; - } - - tx.commit().await?; - Ok(()) - } - - /// Reactivate all billing-suspended relays for a tenant (used after full - /// balance payment). - pub async fn reactivate_relays_for_tenant(&self, tenant: &str, now: i64) -> Result<()> { - let mut tx: Transaction<'_, Sqlite> = self.pool.begin().await?; - - let rows = - sqlx::query("SELECT id, plan FROM relays WHERE tenant = ? AND status = 'suspended'") - .bind(tenant) - .fetch_all(&mut *tx) - .await?; - - for row in rows { - let relay_id: String = row.get("id"); - let plan: String = row.get("plan"); - - sqlx::query("UPDATE relays SET status = 'active' WHERE id = ?") - .bind(&relay_id) - .execute(&mut *tx) - .await?; - - sqlx::query(&format!( - "INSERT INTO relay_lifecycle_events ({LIFECYCLE_COLS}) VALUES (?, ?, ?, ?, ?, ?)" - )) - .bind(uuid::Uuid::new_v4().to_string()) - .bind(&relay_id) - .bind(tenant) - .bind("unsuspended") - .bind(plan) - .bind(now) - .execute(&mut *tx) - .await?; - } - - tx.commit().await?; - Ok(()) + #[allow(dead_code)] + async fn _log_activity_public(&self, activity_type: &str, identifier: &str) -> Result<()> { + self.log_activity(activity_type, identifier).await } } diff --git a/backend/src/robot.rs b/backend/src/robot.rs new file mode 100644 index 0000000..3edf51f --- /dev/null +++ b/backend/src/robot.rs @@ -0,0 +1,239 @@ +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use anyhow::{Result, anyhow}; +use nostr_sdk::prelude::*; +use tokio::sync::Mutex; + +#[derive(Clone)] +pub struct Robot { + secret: String, + name: String, + description: String, + picture: String, + outbox_relays: Vec, + indexer_relays: Vec, + messaging_relays: Vec, + client: Client, + outbox_cache: std::sync::Arc>>, + dm_cache: std::sync::Arc>>, +} + +#[derive(Clone)] +struct CacheEntry { + values: Vec, + fetched_at: Instant, +} + +impl Robot { + pub async fn new() -> Result { + let secret = std::env::var("ROBOT_SECRET").unwrap_or_default(); + if secret.trim().is_empty() { + return Err(anyhow!("ROBOT_SECRET is required")); + } + + let name = std::env::var("ROBOT_NAME").unwrap_or_default(); + let description = std::env::var("ROBOT_DESCRIPTION").unwrap_or_default(); + let picture = std::env::var("ROBOT_PICTURE").unwrap_or_default(); + let outbox_relays = split_env("ROBOT_OUTBOX_RELAYS"); + let indexer_relays = split_env("ROBOT_INDEXER_RELAYS"); + let messaging_relays = split_env("ROBOT_MESSAGING_RELAYS"); + + let keys = Keys::parse(&secret)?; + let client = Client::new(keys); + for relay in &outbox_relays { + client.add_relay(relay).await?; + } + for relay in &indexer_relays { + client.add_relay(relay).await?; + } + for relay in &messaging_relays { + client.add_relay(relay).await?; + } + client.connect().await; + + let robot = Self { + secret, + name, + description, + picture, + outbox_relays, + indexer_relays, + messaging_relays, + client, + outbox_cache: std::sync::Arc::new(Mutex::new(HashMap::new())), + dm_cache: std::sync::Arc::new(Mutex::new(HashMap::new())), + }; + + robot.publish_identity().await?; + Ok(robot) + } + + async fn publish_identity(&self) -> Result<()> { + let mut metadata = Metadata::new(); + if !self.name.is_empty() { + metadata = metadata.name(&self.name); + } + if !self.description.is_empty() { + metadata = metadata.about(&self.description); + } + if !self.picture.is_empty() { + metadata = metadata.picture(Url::parse(&self.picture)?); + } + + self.client + .send_event_builder(EventBuilder::metadata(&metadata)) + .await?; + + let outbox_tags = self + .outbox_relays + .iter() + .map(|r| Tag::parse(["r", r.as_str()])) + .collect::, _>>()?; + self.client + .send_event_builder(EventBuilder::new(Kind::Custom(10002), "").tags(outbox_tags)) + .await?; + + let mut selection_tags = Vec::new(); + for relay in &self.messaging_relays { + selection_tags.push(Tag::parse(["relay", relay.as_str()])?); + } + self.client + .send_event_builder(EventBuilder::new(Kind::Custom(10050), "").tags(selection_tags)) + .await?; + + Ok(()) + } + + pub async fn send_dm(&self, recipient: &str, message: &str) -> Result<()> { + let outbox = self.fetch_outbox_relays(recipient).await?; + if outbox.is_empty() { + return Err(anyhow!("no outbox relays found for recipient")); + } + + let dm_relays = self.fetch_messaging_relays_from_outbox(recipient, &outbox).await?; + if dm_relays.is_empty() { + return Err(anyhow!("no messaging relays found for recipient")); + } + + let recipient_pubkey = PublicKey::parse(recipient)?; + let keys = Keys::parse(&self.secret)?; + let client = Client::new(keys); + for relay in dm_relays { + client.add_relay(relay).await?; + } + client.connect().await; + client.send_private_msg(recipient_pubkey, message, []).await?; + Ok(()) + } + + async fn fetch_outbox_relays(&self, recipient: &str) -> Result> { + if let Some(values) = get_cached(&self.outbox_cache, recipient).await { + return Ok(values); + } + + let pubkey = PublicKey::parse(recipient)?; + let client = indexer_client(&self.secret, &self.indexer_relays).await?; + let filter = Filter::new().author(pubkey).kind(Kind::Custom(10002)); + let events = client + .fetch_events(filter, Duration::from_secs(5)) + .await?; + + let mut relays = Vec::new(); + if let Some(event) = events.into_iter().max_by_key(|e| e.created_at) { + for tag in event.tags.iter() { + let values = tag.as_slice(); + if values.len() >= 2 && values[0] == "r" { + relays.push(values[1].to_string()); + } + } + } + + set_cached(&self.outbox_cache, recipient, relays.clone()).await; + Ok(relays) + } + + async fn fetch_messaging_relays_from_outbox( + &self, + recipient: &str, + outbox_relays: &[String], + ) -> Result> { + if let Some(values) = get_cached(&self.dm_cache, recipient).await { + return Ok(values); + } + + let pubkey = PublicKey::parse(recipient)?; + let keys = Keys::parse(&self.secret)?; + let client = Client::new(keys); + for relay in outbox_relays { + client.add_relay(relay).await?; + } + client.connect().await; + + let filter = Filter::new().author(pubkey).kind(Kind::Custom(10050)); + let events = client + .fetch_events(filter, Duration::from_secs(5)) + .await?; + + let mut relays = Vec::new(); + if let Some(event) = events.into_iter().max_by_key(|e| e.created_at) { + for tag in event.tags.iter() { + let values = tag.as_slice(); + if values.len() >= 2 && values[0] == "relay" { + relays.push(values[1].to_string()); + } + } + } + + set_cached(&self.dm_cache, recipient, relays.clone()).await; + Ok(relays) + } +} + +fn split_env(key: &str) -> Vec { + std::env::var(key) + .unwrap_or_default() + .split(',') + .map(|v| v.trim().to_string()) + .filter(|v| !v.is_empty()) + .collect() +} + +async fn indexer_client(secret: &str, indexer_relays: &[String]) -> Result { + let keys = Keys::parse(secret)?; + let client = Client::new(keys); + for relay in indexer_relays { + client.add_relay(relay).await?; + } + client.connect().await; + Ok(client) +} + +async fn get_cached( + cache: &std::sync::Arc>>, + key: &str, +) -> Option> { + let guard = cache.lock().await; + guard.get(key).and_then(|entry| { + if entry.fetched_at.elapsed() < Duration::from_secs(300) { + Some(entry.values.clone()) + } else { + None + } + }) +} + +async fn set_cached( + cache: &std::sync::Arc>>, + key: &str, + values: Vec, +) { + let mut guard = cache.lock().await; + guard.insert( + key.to_string(), + CacheEntry { + values, + fetched_at: Instant::now(), + }, + ); +}