From 35d9aab02ad05a6af7561a6a2ccb057371071037 Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Wed, 27 May 2026 17:26:47 -0700 Subject: [PATCH] Make infra module free functions --- backend/src/api.rs | 5 +- backend/src/infra.rs | 496 ++++++++++++++++----------------- backend/src/main.rs | 8 +- backend/src/query.rs | 52 ++-- backend/src/routes/invoices.rs | 2 +- backend/src/routes/relays.rs | 10 +- backend/src/routes/tenants.rs | 2 +- 7 files changed, 275 insertions(+), 300 deletions(-) diff --git a/backend/src/api.rs b/backend/src/api.rs index 76c3aee..03b6e01 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -29,7 +29,6 @@ use nostr_sdk::{Event, JsonUtil, Kind}; use crate::billing::Billing; use crate::env; -use crate::infra::Infra; use crate::models::{Relay, Tenant}; use crate::query; use crate::robot::Robot; @@ -52,16 +51,14 @@ pub struct Api { pub billing: Billing, pub stripe: Stripe, pub robot: Robot, - pub infra: Infra, } impl Api { - pub fn new(billing: Billing, stripe: Stripe, robot: Robot, infra: Infra) -> Self { + pub fn new(billing: Billing, stripe: Stripe, robot: Robot) -> Self { Self { billing, stripe, robot, - infra, } } diff --git a/backend/src/infra.rs b/backend/src/infra.rs index 177a6b4..40fca45 100644 --- a/backend/src/infra.rs +++ b/backend/src/infra.rs @@ -1,3 +1,7 @@ +//! The relay-provisioning reactor: it keeps the external relay backend (the +//! zooid API) in sync with our relay rows, reacting to relay activity and +//! retrying failed syncs with backoff. + use anyhow::Result; use nostr_sdk::prelude::*; use std::time::Duration; @@ -12,298 +16,272 @@ const RELAY_SYNC_RETRY_BASE_DELAY_SECS: u64 = 30; const RELAY_SYNC_RETRY_MAX_DELAY_SECS: u64 = 15 * 60; const RELAY_SYNC_RETRY_MAX_ATTEMPTS: usize = 6; -/// The relay-provisioning reactor: it keeps the external relay backend (the -/// zooid API) in sync with our relay rows, reacting to relay activity and -/// retrying failed syncs with backoff. -#[derive(Clone)] -pub struct Infra; +/// Run the reactor for the life of the process: reconcile any relays left +/// unsynced from a previous run, then sync each relay as its activity arrives. +pub async fn start() { + let mut rx = db::subscribe(); -impl Default for Infra { - fn default() -> Self { - Self::new() + if let Err(error) = reconcile_relay_state("startup").await { + tracing::error!(error = %error, "failed to reconcile relay state on startup"); + } + + loop { + match rx.recv().await { + Ok(activity) => { + if let Err(e) = handle_activity(&activity).await { + tracing::error!(error = %e, "infra handle_activity failed"); + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!(missed = n, "infra lagged"); + + if let Err(error) = reconcile_relay_state("lagged").await { + tracing::error!(error = %error, "failed to reconcile relay state after lag"); + } + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } } } -impl Infra { - pub fn new() -> Self { - Self +async fn handle_activity(activity: &Activity) -> Result<()> { + let needs_sync = matches!( + activity.activity_type.as_str(), + "create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" | "fail_relay_sync" + ); + + if activity.resource_type != "relay" || !needs_sync { + return Ok(()); } - /// Run the reactor for the life of the process: reconcile any relays left - /// unsynced from a previous run, then sync each relay as its activity arrives. - pub async fn start(self) { - let mut rx = db::subscribe(); + if activity.activity_type == "fail_relay_sync" { + schedule_relay_sync_retry(&activity.resource_id, "activity").await?; + return Ok(()); + } - if let Err(error) = self.reconcile_relay_state("startup").await { - tracing::error!(error = %error, "failed to reconcile relay state on startup"); - } + let Some(relay) = query::get_relay(&activity.resource_id).await? else { + return Ok(()); + }; - loop { - match rx.recv().await { - Ok(activity) => { - if let Err(e) = self.handle_activity(&activity).await { - tracing::error!(error = %e, "infra handle_activity failed"); - } - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!(missed = n, "infra lagged"); + sync_relay(&relay).await; + Ok(()) +} - if let Err(error) = self.reconcile_relay_state("lagged").await { - tracing::error!(error = %error, "failed to reconcile relay state after lag"); - } - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } +async fn reconcile_relay_state(source: &str) -> Result<()> { + let relays = query::list_relays_pending_sync().await?; + + if relays.is_empty() { + return Ok(()); + } + + tracing::info!(source, relay_count = relays.len(), "reconciling pending relay state"); + + for relay in relays { + if relay.sync_error.trim().is_empty() { + sync_relay(&relay).await; + } else { + schedule_relay_sync_retry(&relay.id, source).await?; } } - async fn handle_activity(&self, activity: &Activity) -> Result<()> { - let needs_sync = matches!( - activity.activity_type.as_str(), - "create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" | "fail_relay_sync" - ); + Ok(()) +} - if activity.resource_type != "relay" || !needs_sync { - return Ok(()); +async fn schedule_relay_sync_retry(relay_id: &str, source: &str) -> Result<()> { + fn get_retry_delay(consecutive_failures: usize) -> Option { + let retry_attempt = consecutive_failures.max(1); + if retry_attempt > RELAY_SYNC_RETRY_MAX_ATTEMPTS { + return None; } - if activity.activity_type == "fail_relay_sync" { - self.schedule_relay_sync_retry(&activity.resource_id, "activity").await?; - return Ok(()); - } + let exponent = (retry_attempt - 1).min(31); + let multiplier = 1u64 << exponent; + let delay_secs = RELAY_SYNC_RETRY_BASE_DELAY_SECS + .saturating_mul(multiplier) + .min(RELAY_SYNC_RETRY_MAX_DELAY_SECS); - let Some(relay) = query::get_relay(&activity.resource_id).await? else { - return Ok(()); - }; - - self.sync_relay(&relay).await; - Ok(()) + Some(Duration::from_secs(delay_secs)) } - async fn reconcile_relay_state(&self, source: &str) -> Result<()> { - let relays = query::list_relays_pending_sync().await?; + let activities = query::list_activity_for_resource(relay_id).await?; + let consecutive_failures = activities + .iter() + .take_while(|activity| activity.activity_type == "fail_relay_sync") + .count(); - if relays.is_empty() { - return Ok(()); - } - - tracing::info!(source, relay_count = relays.len(), "reconciling pending relay state"); - - for relay in relays { - if relay.sync_error.trim().is_empty() { - self.sync_relay(&relay).await; - } else { - self.schedule_relay_sync_retry(&relay.id, source).await?; - } - } - - Ok(()) - } - - async fn schedule_relay_sync_retry(&self, relay_id: &str, source: &str) -> Result<()> { - fn get_retry_delay(consecutive_failures: usize) -> Option { - let retry_attempt = consecutive_failures.max(1); - if retry_attempt > RELAY_SYNC_RETRY_MAX_ATTEMPTS { - return None; - } - - let exponent = (retry_attempt - 1).min(31); - let multiplier = 1u64 << exponent; - let delay_secs = RELAY_SYNC_RETRY_BASE_DELAY_SECS - .saturating_mul(multiplier) - .min(RELAY_SYNC_RETRY_MAX_DELAY_SECS); - - Some(Duration::from_secs(delay_secs)) - } - - let activities = query::list_activity_for_resource(relay_id).await?; - let consecutive_failures = activities - .iter() - .take_while(|activity| activity.activity_type == "fail_relay_sync") - .count(); - - let Some(delay) = get_retry_delay(consecutive_failures) else { - tracing::warn!( - relay = relay_id, - consecutive_failures, - max_attempts = RELAY_SYNC_RETRY_MAX_ATTEMPTS, - "relay sync retries exhausted; awaiting manual intervention" - ); - return Ok(()); - }; - - tracing::info!( + let Some(delay) = get_retry_delay(consecutive_failures) else { + tracing::warn!( relay = relay_id, - source, consecutive_failures, - delay_secs = delay.as_secs(), - "scheduled relay sync retry" + max_attempts = RELAY_SYNC_RETRY_MAX_ATTEMPTS, + "relay sync retries exhausted; awaiting manual intervention" ); + return Ok(()); + }; - let relay_id = relay_id.to_string(); - let infra = self.clone(); + tracing::info!( + relay = relay_id, + source, + consecutive_failures, + delay_secs = delay.as_secs(), + "scheduled relay sync retry" + ); - tokio::spawn(async move { - tokio::time::sleep(delay).await; + let relay_id = relay_id.to_string(); - match query::get_relay(&relay_id).await { - Ok(Some(relay)) => infra.sync_relay(&relay).await, - Ok(None) => {} - Err(e) => { - tracing::error!(relay = %relay_id, error = %e, "relay sync retry task failed"); - } - } - }); + tokio::spawn(async move { + tokio::time::sleep(delay).await; - Ok(()) - } - - async fn sync_relay(&self, relay: &Relay) { - match self.try_sync_relay(relay).await { - Ok(()) => { - tracing::info!(relay = %relay.id, "relay sync succeeded"); - if let Err(e) = command::complete_relay_sync(&relay.id).await { - tracing::error!(relay = %relay.id, error = %e, "failed to mark sync complete"); - } - } + match query::get_relay(&relay_id).await { + Ok(Some(relay)) => sync_relay(&relay).await, + Ok(None) => {} Err(e) => { - tracing::warn!(relay = %relay.id, error = %e, "relay sync failed"); - if let Err(e2) = command::fail_relay_sync(relay, e.to_string()).await { - tracing::error!(relay = %relay.id, error = %e2, "failed to record sync failure"); - } + tracing::error!(relay = %relay_id, error = %e, "relay sync retry task failed"); } } - } + }); - async fn try_sync_relay(&self, relay: &Relay) -> Result<()> { - // A relay is "new" (POST with a freshly generated secret) only if it has - // never completed a sync. `synced == 1` short-circuits the activity lookup; - // otherwise check the activity history so that a re-sync after an update - // (which resets `synced` to 0) PATCHes instead of clobbering the secret. - let is_new = relay.synced != 1 - && query::get_latest_activity_for_resource_and_type(&relay.id, "complete_relay_sync") - .await? - .is_none(); + Ok(()) +} - let mut body = serde_json::json!({ - "host": format!("{}.{}", relay.subdomain, env::get().relay_domain), - "schema": relay.id, - "inactive": relay.status == RELAY_STATUS_INACTIVE - || relay.status == RELAY_STATUS_DELINQUENT, - "info": { - "name": relay.info_name, - "icon": relay.info_icon, - "description": relay.info_description, - "pubkey": relay.tenant, - }, - "policy": { - "public_join": relay.policy_public_join == 1, - "strip_signatures": relay.policy_strip_signatures == 1, - }, - "groups": { "enabled": relay.groups_enabled == 1 }, - "management": { "enabled": relay.management_enabled == 1 }, - "blossom": if relay.blossom_enabled == 1 { - serde_json::json!({ - "enabled": true, - "adapter": "s3", - "s3": { - "endpoint": env::get().blossom_s3_endpoint, - "region": env::get().blossom_s3_region, - "bucket": env::get().blossom_s3_bucket, - "access_key": env::get().blossom_s3_access_key, - "secret_key": env::get().blossom_s3_secret_key, - "key_prefix": relay.id, - }, - }) - } else { - serde_json::json!({ "enabled": false }) - }, - "livekit": if relay.livekit_enabled == 1 { - serde_json::json!({ - "enabled": true, - "server_url": env::get().livekit_url, - "api_key": env::get().livekit_api_key, - "api_secret": env::get().livekit_api_secret, - }) - } else { - serde_json::json!({ "enabled": false }) - }, - "push": { "enabled": relay.push_enabled == 1 }, - "roles": { - "admin": { "can_manage": true, "can_invite": true }, - "member": { "can_invite": true }, - }, - }); - - // Only provide a secret if the relay is new. This allows us to not store the relay secrets on our side. - if is_new && let Some(obj) = body.as_object_mut() { - obj.insert( - "secret".to_string(), - serde_json::Value::String(Keys::generate().secret_key().to_secret_hex()), - ); +async fn sync_relay(relay: &Relay) { + match try_sync_relay(relay).await { + Ok(()) => { + tracing::info!(relay = %relay.id, "relay sync succeeded"); + if let Err(e) = command::complete_relay_sync(&relay.id).await { + tracing::error!(relay = %relay.id, error = %e, "failed to mark sync complete"); + } } - - let method = if is_new { HttpMethod::POST } else { HttpMethod::PATCH }; - self.request(method, &format!("relay/{}", relay.id), Some(&body)) - .await?; - Ok(()) - } - - /// Fetch the member pubkeys of a relay from the zooid API. - pub async fn list_relay_members(&self, relay_id: &str) -> Result> { - #[derive(serde::Deserialize)] - struct MembersResponse { - members: Vec, + Err(e) => { + tracing::warn!(relay = %relay.id, error = %e, "relay sync failed"); + if let Err(e2) = command::fail_relay_sync(relay, e.to_string()).await { + tracing::error!(relay = %relay.id, error = %e2, "failed to record sync failure"); + } } - - let response = self - .request(HttpMethod::GET, &format!("relay/{relay_id}/members"), None) - .await?; - let parsed: MembersResponse = response.json().await?; - Ok(parsed.members) - } - - // Internal utilities - - /// Sends an authenticated request to the zooid API at `path` (relative to - /// `env.zooid_api_url`). Returns the response on 2xx; bails with the body - /// text otherwise. - async fn request( - &self, - method: HttpMethod, - path: &str, - body: Option<&serde_json::Value>, - ) -> Result { - let client = reqwest::Client::builder() - .timeout(Duration::from_secs(5)) - .build()?; - let base = env::get().zooid_api_url.trim_end_matches('/'); - let path = path.trim_start_matches('/'); - let url = format!("{base}/{path}"); - let auth = env::get().make_auth(&url, method).await?; - - let reqwest_method = match method { - HttpMethod::GET => reqwest::Method::GET, - HttpMethod::POST => reqwest::Method::POST, - HttpMethod::PUT => reqwest::Method::PUT, - HttpMethod::PATCH => reqwest::Method::PATCH, - }; - - let mut req = client - .request(reqwest_method, &url) - .header("Authorization", auth); - if let Some(body) = body { - req = req.json(body); - } - - let response = req.send().await?; - - if !response.status().is_success() { - let status = response.status(); - let text = response.text().await.unwrap_or_default(); - anyhow::bail!("zooid {method} {path} returned {status}: {text}"); - } - Ok(response) } } +async fn try_sync_relay(relay: &Relay) -> Result<()> { + // A relay is "new" (POST with a freshly generated secret) only if it has + // never completed a sync. `synced == 1` short-circuits the activity lookup; + // otherwise check the activity history so that a re-sync after an update + // (which resets `synced` to 0) PATCHes instead of clobbering the secret. + let is_new = relay.synced != 1 + && query::get_latest_activity_for_resource_and_type(&relay.id, "complete_relay_sync") + .await? + .is_none(); + + let mut body = serde_json::json!({ + "host": format!("{}.{}", relay.subdomain, env::get().relay_domain), + "schema": relay.id, + "inactive": relay.status == RELAY_STATUS_INACTIVE + || relay.status == RELAY_STATUS_DELINQUENT, + "info": { + "name": relay.info_name, + "icon": relay.info_icon, + "description": relay.info_description, + "pubkey": relay.tenant, + }, + "policy": { + "public_join": relay.policy_public_join == 1, + "strip_signatures": relay.policy_strip_signatures == 1, + }, + "groups": { "enabled": relay.groups_enabled == 1 }, + "management": { "enabled": relay.management_enabled == 1 }, + "blossom": if relay.blossom_enabled == 1 { + serde_json::json!({ + "enabled": true, + "adapter": "s3", + "s3": { + "endpoint": env::get().blossom_s3_endpoint, + "region": env::get().blossom_s3_region, + "bucket": env::get().blossom_s3_bucket, + "access_key": env::get().blossom_s3_access_key, + "secret_key": env::get().blossom_s3_secret_key, + "key_prefix": relay.id, + }, + }) + } else { + serde_json::json!({ "enabled": false }) + }, + "livekit": if relay.livekit_enabled == 1 { + serde_json::json!({ + "enabled": true, + "server_url": env::get().livekit_url, + "api_key": env::get().livekit_api_key, + "api_secret": env::get().livekit_api_secret, + }) + } else { + serde_json::json!({ "enabled": false }) + }, + "push": { "enabled": relay.push_enabled == 1 }, + "roles": { + "admin": { "can_manage": true, "can_invite": true }, + "member": { "can_invite": true }, + }, + }); + + // Only provide a secret if the relay is new. This allows us to not store the relay secrets on our side. + if is_new && let Some(obj) = body.as_object_mut() { + obj.insert( + "secret".to_string(), + serde_json::Value::String(Keys::generate().secret_key().to_secret_hex()), + ); + } + + let method = if is_new { HttpMethod::POST } else { HttpMethod::PATCH }; + request(method, &format!("relay/{}", relay.id), Some(&body)).await?; + Ok(()) +} + +/// Fetch the member pubkeys of a relay from the zooid API. +pub async fn list_relay_members(relay_id: &str) -> Result> { + #[derive(serde::Deserialize)] + struct MembersResponse { + members: Vec, + } + + let response = request(HttpMethod::GET, &format!("relay/{relay_id}/members"), None).await?; + let parsed: MembersResponse = response.json().await?; + Ok(parsed.members) +} + +/// Sends an authenticated request to the zooid API at `path` (relative to +/// `env.zooid_api_url`). Returns the response on 2xx; bails with the body +/// text otherwise. +async fn request( + method: HttpMethod, + path: &str, + body: Option<&serde_json::Value>, +) -> Result { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(5)) + .build()?; + let base = env::get().zooid_api_url.trim_end_matches('/'); + let path = path.trim_start_matches('/'); + let url = format!("{base}/{path}"); + let auth = env::get().make_auth(&url, method).await?; + + let reqwest_method = match method { + HttpMethod::GET => reqwest::Method::GET, + HttpMethod::POST => reqwest::Method::POST, + HttpMethod::PUT => reqwest::Method::PUT, + HttpMethod::PATCH => reqwest::Method::PATCH, + }; + + let mut req = client + .request(reqwest_method, &url) + .header("Authorization", auth); + if let Some(body) = body { + req = req.json(body); + } + + let response = req.send().await?; + + if !response.status().is_success() { + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + anyhow::bail!("zooid {method} {path} returned {status}: {text}"); + } + Ok(response) +} diff --git a/backend/src/main.rs b/backend/src/main.rs index 1def69c..9b52f2c 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -20,7 +20,6 @@ use tower_http::cors::{AllowOrigin, CorsLayer, Any}; use crate::api::Api; use crate::billing::Billing; -use crate::infra::Infra; use crate::robot::Robot; use crate::stripe::Stripe; @@ -39,9 +38,8 @@ async fn main() -> Result<()> { let robot = Robot::new().await?; let stripe = Stripe::new(); - let infra = Infra::new(); let billing = Billing::new(robot.clone()); - let api = Api::new(billing.clone(), stripe, robot, infra.clone()); + let api = Api::new(billing.clone(), stripe, robot); let parsed = env::get() .server_allow_origins @@ -55,8 +53,8 @@ async fn main() -> Result<()> { let app = api.router().layer(cors); - tokio::spawn(async move { - infra.start().await; + tokio::spawn(async { + infra::start().await; }); tokio::spawn(async move { diff --git a/backend/src/query.rs b/backend/src/query.rs index 04d5037..9b1c04b 100644 --- a/backend/src/query.rs +++ b/backend/src/query.rs @@ -15,7 +15,7 @@ fn select_activity(tail: &str) -> String { format!("SELECT * FROM activity {tail}") } -// Plans +// --- Plans --- pub fn list_plans() -> Vec { vec![ @@ -50,7 +50,7 @@ pub fn get_plan(plan_id: &str) -> Option { list_plans().into_iter().find(|p| p.id == plan_id) } -// Tenants +// --- Tenants --- pub async fn list_tenants() -> Result> { Ok(sqlx::query_as::<_, Tenant>(&select_tenant("")) @@ -65,7 +65,7 @@ pub async fn get_tenant(pubkey: &str) -> Result> { .await?) } -// Relays +// --- Relays --- pub async fn list_relays() -> Result> { Ok(sqlx::query_as::<_, Relay>(&select_relay("")) @@ -95,7 +95,26 @@ pub async fn get_relay(id: &str) -> Result> { .await?) } -// Invoices +/// The relay's plan immediately before `before`, read from the activity log +/// (the most recent `create_relay`/`update_relay` with `created_at < before`). +/// Billing uses this as the `old` side of a plan-change delta. +pub async fn get_relay_plan_before(relay_id: &str, before: i64) -> Result> { + Ok(sqlx::query_scalar::<_, String>( + "SELECT plan_id FROM activity + WHERE resource_id = ? + AND resource_type = 'relay' + AND plan_id IS NOT NULL + AND created_at < ? + ORDER BY created_at DESC + LIMIT 1", + ) + .bind(relay_id) + .bind(before) + .fetch_optional(pool()) + .await?) +} + +// --- Invoices --- pub async fn get_invoice(invoice_id: &str) -> Result> { Ok(sqlx::query_as::<_, Invoice>("SELECT * FROM invoice WHERE id = ?") @@ -104,7 +123,7 @@ pub async fn get_invoice(invoice_id: &str) -> Result> { .await?) } -pub async fn list_invoices(tenant_pubkey: &str) -> Result> { +pub async fn list_invoices_for_tenant(tenant_pubkey: &str) -> Result> { Ok(sqlx::query_as::<_, Invoice>( "SELECT * FROM invoice WHERE tenant_pubkey = ? ORDER BY created_at DESC", ) @@ -113,7 +132,7 @@ pub async fn list_invoices(tenant_pubkey: &str) -> Result> { .await?) } -pub async fn get_latest_invoice(tenant_pubkey: &str) -> Result> { +pub async fn get_latest_invoice_for_tenant(tenant_pubkey: &str) -> Result> { Ok(sqlx::query_as::<_, Invoice>( "SELECT * FROM invoice WHERE tenant_pubkey = ? ORDER BY created_at DESC LIMIT 1", ) @@ -131,24 +150,7 @@ pub async fn get_invoice_items_for_invoice(invoice_id: &str) -> Result Result> { - Ok(sqlx::query_scalar::<_, String>( - "SELECT plan_id FROM activity - WHERE resource_id = ? - AND created_at < ? - AND activity_type IN ('create_relay', 'update_relay') - AND plan_id IS NOT NULL - ORDER BY created_at DESC - LIMIT 1", - ) - .bind(relay_id) - .bind(before) - .fetch_optional(pool()) - .await?) -} +// --- Bolt11 --- pub async fn get_bolt11(bolt11_id: &str) -> Result> { Ok(sqlx::query_as::<_, Bolt11>("SELECT * FROM bolt11 WHERE id = ?") @@ -166,7 +168,7 @@ pub async fn get_bolt11_for_invoice(invoice_id: &str) -> Result> .await?) } -// Activity +// --- Activity --- /// Billable activity for a tenant not yet folded into an invoice. The /// activity-type filter and the `billed_at IS NULL` guard live here so the diff --git a/backend/src/routes/invoices.rs b/backend/src/routes/invoices.rs index 415a507..5ca51e7 100644 --- a/backend/src/routes/invoices.rs +++ b/backend/src/routes/invoices.rs @@ -26,7 +26,7 @@ pub async fn get_tenant_latest_invoice( .await .map_err(internal)?; - let invoice = query::get_latest_invoice(&pubkey).await.map_err(internal)?; + let invoice = query::get_latest_invoice_for_tenant(&pubkey).await.map_err(internal)?; ok(invoice) } diff --git a/backend/src/routes/relays.rs b/backend/src/routes/relays.rs index fd606aa..c1feafd 100644 --- a/backend/src/routes/relays.rs +++ b/backend/src/routes/relays.rs @@ -9,7 +9,7 @@ use regex::Regex; use serde::Deserialize; use crate::api::{Api, AuthedPubkey}; -use crate::{command, query}; +use crate::{command, infra, query}; use crate::models::{ RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay, }; @@ -60,7 +60,7 @@ pub async fn list_relay_members( let relay = api.get_relay_or_404(&id).await?; api.require_admin_or_tenant(&auth, &relay.tenant)?; - let members = fetch_relay_members(&api, &relay).await.map_err(internal)?; + let members = fetch_relay_members(&relay).await.map_err(internal)?; ok(serde_json::json!({ "members": members })) } @@ -197,7 +197,7 @@ pub async fn update_relay( let selected_plan = query::get_plan(&relay.plan).expect("validated plan must exist"); if let Some(limit) = selected_plan.members { - let current_members = fetch_relay_members(&api, &relay) + let current_members = fetch_relay_members(&relay) .await .map_err(internal)? .len() as i64; @@ -265,12 +265,12 @@ pub async fn reactivate_relay( // --- helpers ---------------------------------------------------------------- -async fn fetch_relay_members(api: &Api, relay: &Relay) -> Result> { +async fn fetch_relay_members(relay: &Relay) -> Result> { if relay.synced == 0 { return Ok(Vec::new()); } - api.infra.list_relay_members(&relay.id).await + infra::list_relay_members(&relay.id).await } const RESERVED_SUBDOMAINS: [&str; 3] = ["api", "admin", "internal"]; diff --git a/backend/src/routes/tenants.rs b/backend/src/routes/tenants.rs index 974dd9c..1cfc85d 100644 --- a/backend/src/routes/tenants.rs +++ b/backend/src/routes/tenants.rs @@ -149,7 +149,7 @@ pub async fn list_tenant_invoices( ) -> ApiResult { api.require_admin_or_tenant(&auth, &pubkey)?; - let invoices = query::list_invoices(&pubkey) + let invoices = query::list_invoices_for_tenant(&pubkey) .await .map_err(internal)?;