Split repo methods into tenant and non-tenant versions

This commit is contained in:
Jon Staab
2026-03-26 12:53:21 -07:00
parent 1b3fe346f5
commit 619fd0c2ce
5 changed files with 112 additions and 87 deletions
+18 -6
View File
@@ -53,9 +53,13 @@ Notes:
- Returns the hardcoded relay plans used by the system (`free`, `basic`, `growth`) - Returns the hardcoded relay plans used by the system (`free`, `basic`, `growth`)
- This is the source of truth for plan metadata exposed via API - This is the source of truth for plan metadata exposed via API
## `pub fn list_relays(&self, tenant_id: Option<&str>) -> Result<Vec<Relay>>` ## `pub fn list_relays(&self) -> Result<Vec<Relay>>`
- Returns all matching relays - Returns all relays
## `pub fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>>`
- Returns all relays belonging to the given tenant
## `pub fn get_relay(&self, id: &str) -> Result<Relay>` ## `pub fn get_relay(&self, id: &str) -> Result<Relay>`
@@ -82,7 +86,7 @@ Notes:
- Sets relay status to `active` - Sets relay status to `active`
- Logs activity as `(activate_relay, relay_id)` - Logs activity as `(activate_relay, relay_id)`
## `pub fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()>` ## `pub fn fail_relay_sync(&self, relay: &Relay, sync_error: &str) -> Result<()>`
- Sets relay status to `inactive`, sets `sync_error` - Sets relay status to `inactive`, sets `sync_error`
- Logs activity as `(fail_relay_sync, relay_id)` - Logs activity as `(fail_relay_sync, relay_id)`
@@ -92,7 +96,11 @@ Notes:
- Saves an `invoice` row and related `invoice_item` rows - Saves an `invoice` row and related `invoice_item` rows
- Logs activity as `(create_invoice, invoice_id)` - Logs activity as `(create_invoice, invoice_id)`
## `pub fn list_invoices(tenant_id: Option<&str>) -> Result<Vec<Invoice>>` ## `pub fn list_invoices() -> Result<Vec<Invoice>>`
- Returns all invoices
## `pub fn list_invoices_for_tenant(tenant_id: &str) -> Result<Vec<Invoice>>`
- Returns all matching invoices - Returns all matching invoices
@@ -103,7 +111,7 @@ Notes:
- Clears `error` if set - Clears `error` if set
- Logs activity as `(mark_invoice_paid, invoice_id)` - Logs activity as `(mark_invoice_paid, invoice_id)`
## `pub fn mark_invoice_attempted(&self, invoice_id: &str, error: Option<&str>) -> Result<()>` ## `pub fn mark_invoice_attempted(&self, invoice_id: &str, error: &str) -> Result<()>`
- Sets `attempted_at` to now - Sets `attempted_at` to now
- Updates `error` if provided - Updates `error` if provided
@@ -122,7 +130,11 @@ Notes:
- Sets `closed_at` to now - Sets `closed_at` to now
- Logs activity as `(mark_invoice_closed, invoice_id)` - Logs activity as `(mark_invoice_closed, invoice_id)`
## `pub fn list_activity(&self, since: &i64, tenant: Option<&str>) -> Result<Vec<Activity>>` ## `pub fn list_activity(&self, since: &i64) -> Result<Vec<Activity>>`
- Returns all activity occuring after `since`
## `pub fn list_activity_for_tenant(&self, tenant: &str, since: &i64) -> Result<Vec<Activity>>`
- Returns all activity occuring after `since` matching `tenant` - Returns all activity occuring after `since` matching `tenant`
+12 -2
View File
@@ -454,7 +454,12 @@ async fn list_relays(
Some(auth.as_str()) Some(auth.as_str())
}; };
match state.api.repo.list_relays(tenant_filter).await { let relays = match tenant_filter {
Some(tenant) => state.api.repo.list_relays_for_tenant(tenant).await,
None => state.api.repo.list_relays().await,
};
match relays {
Ok(relays) => ok(StatusCode::OK, relays), Ok(relays) => ok(StatusCode::OK, relays),
Err(e) => err( Err(e) => err(
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
@@ -747,7 +752,12 @@ async fn list_invoices(
Some(auth.as_str()) Some(auth.as_str())
}; };
match state.api.repo.list_invoices(tenant_filter).await { let invoices = match tenant_filter {
Some(tenant) => state.api.repo.list_invoices_for_tenant(tenant).await,
None => state.api.repo.list_invoices().await,
};
match invoices {
Ok(invoices) => ok(StatusCode::OK, invoices), Ok(invoices) => ok(StatusCode::OK, invoices),
Err(e) => err( Err(e) => err(
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
+5 -5
View File
@@ -40,7 +40,7 @@ impl Billing {
pub async fn tick(&self) -> Result<()> { pub async fn tick(&self) -> Result<()> {
let mut since_guard = self.last_activity_at.lock().await; let mut since_guard = self.last_activity_at.lock().await;
let since = *since_guard; let since = *since_guard;
let activity = self.repo.list_activity(&since, None).await?; let activity = self.repo.list_activity(&since).await?;
for a in &activity { for a in &activity {
if matches!( if matches!(
a.activity_type.as_str(), a.activity_type.as_str(),
@@ -70,7 +70,7 @@ impl Billing {
return Ok(()); return Ok(());
} }
let relays = self.repo.list_relays(Some(&relay.tenant)).await?; let relays = self.repo.list_relays_for_tenant(&relay.tenant).await?;
let paid_active_count = relays let paid_active_count = relays
.into_iter() .into_iter()
.filter(|r| r.status == "active" && r.plan != "free") .filter(|r| r.status == "active" && r.plan != "free")
@@ -95,7 +95,7 @@ impl Billing {
return Ok(()); return Ok(());
} }
let relays = self.repo.list_relays(Some(&tenant.pubkey)).await?; let relays = self.repo.list_relays_for_tenant(&tenant.pubkey).await?;
let active_paid_relays: Vec<Relay> = relays let active_paid_relays: Vec<Relay> = relays
.iter() .iter()
.filter(|r| r.status == "active" && r.plan != "free") .filter(|r| r.status == "active" && r.plan != "free")
@@ -114,7 +114,7 @@ impl Billing {
let usage_events = self let usage_events = self
.repo .repo
.list_activity(&tenant.billing_anchor, Some(&tenant.pubkey)) .list_activity_for_tenant(&tenant.pubkey, &tenant.billing_anchor)
.await?; .await?;
let invoice_id = uuid::Uuid::new_v4().to_string(); let invoice_id = uuid::Uuid::new_v4().to_string();
let mut items = Vec::new(); let mut items = Vec::new();
@@ -176,7 +176,7 @@ impl Billing {
} }
async fn collect_outstanding(&self, tenant: &Tenant) -> Result<()> { async fn collect_outstanding(&self, tenant: &Tenant) -> Result<()> {
let invoices = self.repo.list_invoices(Some(&tenant.pubkey)).await?; let invoices = self.repo.list_invoices_for_tenant(&tenant.pubkey).await?;
let now = now_ts(); let now = now_ts();
for invoice in invoices.into_iter().filter(|i| i.status == "pending") { for invoice in invoices.into_iter().filter(|i| i.status == "pending") {
+1 -1
View File
@@ -45,7 +45,7 @@ impl Infra {
pub async fn tick(&self) -> Result<()> { pub async fn tick(&self) -> Result<()> {
let mut since_guard = self.last_activity_at.lock().await; let mut since_guard = self.last_activity_at.lock().await;
let since = *since_guard; let since = *since_guard;
let activity = self.repo.list_activity(&since, None).await?; let activity = self.repo.list_activity(&since).await?;
for a in activity { for a in activity {
if a.resource_type == "relay" if a.resource_type == "relay"
+76 -73
View File
@@ -160,34 +160,35 @@ impl Repo {
Ok(()) Ok(())
} }
pub async fn list_relays(&self, tenant_id: Option<&str>) -> Result<Vec<Relay>> { pub async fn list_relays(&self) -> Result<Vec<Relay>> {
let rows = if let Some(tenant) = tenant_id { let rows = sqlx::query_as::<_, Relay>(
sqlx::query_as::<_, Relay>( "SELECT id, tenant, schema, subdomain, plan, status, sync_error,
"SELECT id, tenant, schema, subdomain, plan, status, sync_error, info_name, info_icon, info_description,
info_name, info_icon, info_description, policy_public_join, policy_strip_signatures,
policy_public_join, policy_strip_signatures, groups_enabled, management_enabled, blossom_enabled,
groups_enabled, management_enabled, blossom_enabled, livekit_enabled, push_enabled
livekit_enabled, push_enabled FROM relay
FROM relay ORDER BY id",
WHERE tenant = ? )
ORDER BY id", .fetch_all(&self.pool)
) .await?;
.bind(tenant) Ok(rows)
.fetch_all(&self.pool) }
.await?
} else { pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>> {
sqlx::query_as::<_, Relay>( let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, status, sync_error, "SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description, info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures, policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled, groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled livekit_enabled, push_enabled
FROM relay FROM relay
ORDER BY id", WHERE tenant = ?
) ORDER BY id",
.fetch_all(&self.pool) )
.await? .bind(tenant_id)
}; .fetch_all(&self.pool)
.await?;
Ok(rows) Ok(rows)
} }
@@ -355,28 +356,29 @@ impl Repo {
Ok(()) Ok(())
} }
pub async fn list_invoices(&self, tenant_id: Option<&str>) -> Result<Vec<Invoice>> { pub async fn list_invoices(&self) -> Result<Vec<Invoice>> {
let rows = if let Some(tenant) = tenant_id { let rows = sqlx::query_as::<_, Invoice>(
sqlx::query_as::<_, Invoice>( "SELECT id, tenant, status, created_at, attempted_at, error, closed_at,
"SELECT id, tenant, status, created_at, attempted_at, error, closed_at, sent_at, paid_at, bolt11, period_start, period_end
sent_at, paid_at, bolt11, period_start, period_end FROM invoice
FROM invoice ORDER BY created_at DESC",
WHERE tenant = ? )
ORDER BY created_at DESC", .fetch_all(&self.pool)
) .await?;
.bind(tenant) Ok(rows)
.fetch_all(&self.pool) }
.await?
} else { pub async fn list_invoices_for_tenant(&self, tenant_id: &str) -> Result<Vec<Invoice>> {
sqlx::query_as::<_, Invoice>( let rows = sqlx::query_as::<_, Invoice>(
"SELECT id, tenant, status, created_at, attempted_at, error, closed_at, "SELECT id, tenant, status, created_at, attempted_at, error, closed_at,
sent_at, paid_at, bolt11, period_start, period_end sent_at, paid_at, bolt11, period_start, period_end
FROM invoice FROM invoice
ORDER BY created_at DESC", WHERE tenant = ?
) ORDER BY created_at DESC",
.fetch_all(&self.pool) )
.await? .bind(tenant_id)
}; .fetch_all(&self.pool)
.await?;
Ok(rows) Ok(rows)
} }
@@ -453,29 +455,30 @@ impl Repo {
Ok(()) Ok(())
} }
pub async fn list_activity(&self, since: &i64, tenant: Option<&str>) -> Result<Vec<Activity>> { pub async fn list_activity(&self, since: &i64) -> Result<Vec<Activity>> {
let rows = if let Some(tenant_pubkey) = tenant { let rows = sqlx::query_as::<_, Activity>(
sqlx::query_as::<_, Activity>( "SELECT id, tenant, created_at, activity_type, resource_type, resource_id
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id FROM activity
FROM activity WHERE created_at > ?
WHERE created_at > ? AND tenant = ? ORDER BY created_at, id",
ORDER BY created_at, id", )
) .bind(since)
.bind(since) .fetch_all(&self.pool)
.bind(tenant_pubkey) .await?;
.fetch_all(&self.pool) Ok(rows)
.await? }
} else {
sqlx::query_as::<_, Activity>( pub async fn list_activity_for_tenant(&self, tenant: &str, since: &i64) -> Result<Vec<Activity>> {
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id let rows = sqlx::query_as::<_, Activity>(
FROM activity "SELECT id, tenant, created_at, activity_type, resource_type, resource_id
WHERE created_at > ? FROM activity
ORDER BY created_at, id", WHERE created_at > ? AND tenant = ?
) ORDER BY created_at, id",
.bind(since) )
.fetch_all(&self.pool) .bind(since)
.await? .bind(tenant)
}; .fetch_all(&self.pool)
.await?;
Ok(rows) Ok(rows)
} }