refactor query

This commit is contained in:
Jon Staab
2026-05-19 17:04:10 -07:00
parent 7134915665
commit dde4b981b2
10 changed files with 130 additions and 216 deletions
+36
View File
@@ -39,3 +39,39 @@ CREATE TABLE IF NOT EXISTS relay (
push_enabled INTEGER NOT NULL DEFAULT 1,
FOREIGN KEY (tenant) REFERENCES tenant(pubkey)
);
CREATE TABLE IF NOT EXISTS invoice_nwc_payment (
invoice_id TEXT PRIMARY KEY,
tenant_pubkey TEXT NOT NULL,
state TEXT NOT NULL CHECK (state IN ('pending', 'paid')),
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey)
);
CREATE TABLE IF NOT EXISTS invoice_manual_lightning_payment (
invoice_id TEXT PRIMARY KEY,
tenant_pubkey TEXT NOT NULL,
bolt11 TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey)
);
CREATE INDEX IF NOT EXISTS idx_tenant_stripe_customer_id
ON tenant (stripe_customer_id);
CREATE INDEX IF NOT EXISTS idx_relay_tenant_id
ON relay (tenant, id);
CREATE INDEX IF NOT EXISTS idx_relay_tenant_status_plan
ON relay (tenant, status, plan);
CREATE INDEX IF NOT EXISTS idx_activity_resource_type_resource_id_created_at_id
ON activity (resource_type, resource_id, created_at DESC, id DESC);
CREATE INDEX IF NOT EXISTS idx_invoice_nwc_payment_tenant_pubkey
ON invoice_nwc_payment (tenant_pubkey);
CREATE INDEX IF NOT EXISTS idx_invoice_manual_lightning_payment_tenant_pubkey
ON invoice_manual_lightning_payment (tenant_pubkey);
@@ -1,11 +0,0 @@
CREATE INDEX IF NOT EXISTS idx_tenant_stripe_customer_id
ON tenant (stripe_customer_id);
CREATE INDEX IF NOT EXISTS idx_relay_tenant_id
ON relay (tenant, id);
CREATE INDEX IF NOT EXISTS idx_relay_tenant_status_plan
ON relay (tenant, status, plan);
CREATE INDEX IF NOT EXISTS idx_activity_resource_type_resource_id_created_at_id
ON activity (resource_type, resource_id, created_at DESC, id DESC);
@@ -1,11 +0,0 @@
CREATE TABLE IF NOT EXISTS invoice_nwc_payment (
invoice_id TEXT PRIMARY KEY,
tenant_pubkey TEXT NOT NULL,
state TEXT NOT NULL CHECK (state IN ('pending', 'paid')),
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey)
);
CREATE INDEX IF NOT EXISTS idx_invoice_nwc_payment_tenant_pubkey
ON invoice_nwc_payment (tenant_pubkey);
@@ -1,11 +0,0 @@
CREATE TABLE IF NOT EXISTS invoice_manual_lightning_payment (
invoice_id TEXT PRIMARY KEY,
tenant_pubkey TEXT NOT NULL,
bolt11 TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey)
);
CREATE INDEX IF NOT EXISTS idx_invoice_manual_lightning_payment_tenant_pubkey
ON invoice_manual_lightning_payment (tenant_pubkey);
+1 -1
View File
@@ -136,7 +136,7 @@ Notes:
- Serves `GET /relays/:id/activity`
- Authorizes admin or relay owner
- Get activity from `query.list_activity_for_relay`
- Get activity from `query.list_activity_for_resource`
- Return `data` is `{activity}`
## `async fn deactivate_relay(...) -> Response`
+1 -1
View File
@@ -39,7 +39,7 @@ Members:
- Returns the tenant matching the given `stripe_customer_id`
## `pub fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>>`
## `pub fn list_activity_for_resource(&self, relay_id: &str) -> Result<Vec<Activity>>`
- Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id`
- Ordered newest-first
+3 -62
View File
@@ -454,7 +454,7 @@ impl Billing {
let relays = self.query.list_relays_for_tenant(&tenant.pubkey).await?;
for relay in relays {
if Self::should_reactivate_after_payment(&relay) {
if relay.status == RELAY_STATUS_DELINQUENT && self.query.is_paid_plan(&relay.plan) {
self.command.activate_relay(&relay).await?;
}
}
@@ -508,7 +508,7 @@ impl Billing {
let relays = self.query.list_relays_for_tenant(&tenant.pubkey).await?;
for relay in relays {
if relay.status == RELAY_STATUS_ACTIVE && Query::is_paid_plan(&relay.plan) {
if relay.status == RELAY_STATUS_ACTIVE && self.query.is_paid_plan(&relay.plan) {
self.command.mark_relay_delinquent(&relay).await?;
}
}
@@ -543,7 +543,7 @@ impl Billing {
let relays = self.query.list_relays_for_tenant(&tenant.pubkey).await?;
for relay in relays {
if relay.status == RELAY_STATUS_ACTIVE && Query::is_paid_plan(&relay.plan) {
if relay.status == RELAY_STATUS_ACTIVE && self.query.is_paid_plan(&relay.plan) {
self.command.mark_relay_delinquent(&relay).await?;
}
}
@@ -962,10 +962,6 @@ impl Billing {
))),
}
}
fn should_reactivate_after_payment(relay: &Relay) -> bool {
relay.status == RELAY_STATUS_DELINQUENT && Query::is_paid_plan(&relay.plan)
}
}
fn summarize_nwc_error_for_dm(error: &str) -> Option<String> {
@@ -992,58 +988,3 @@ fn manual_lightning_payment_dm(nwc_error: Option<&str>) -> String {
_ => MANUAL_LIGHTNING_PAYMENT_DM.to_string(),
}
}
#[cfg(test)]
mod tests {
use super::Billing;
use crate::models::{
RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay,
};
fn relay_fixture(status: &str, plan: &str) -> Relay {
Relay {
id: "relay-1".to_string(),
tenant: "tenant-1".to_string(),
schema: "tenant_1".to_string(),
subdomain: "relay-1".to_string(),
plan: plan.to_string(),
stripe_subscription_item_id: None,
status: status.to_string(),
sync_error: String::new(),
info_name: String::new(),
info_icon: String::new(),
info_description: String::new(),
policy_public_join: 0,
policy_strip_signatures: 0,
groups_enabled: 1,
management_enabled: 1,
blossom_enabled: 1,
livekit_enabled: 1,
push_enabled: 1,
synced: 1,
}
}
#[test]
fn reactivates_only_delinquent_paid_relays_after_payment() {
let delinquent_paid = relay_fixture(RELAY_STATUS_DELINQUENT, "basic");
assert!(Billing::should_reactivate_after_payment(&delinquent_paid));
let manually_inactive_paid = relay_fixture(RELAY_STATUS_INACTIVE, "basic");
assert!(!Billing::should_reactivate_after_payment(
&manually_inactive_paid
));
let free_delinquent = relay_fixture(RELAY_STATUS_DELINQUENT, "free");
assert!(!Billing::should_reactivate_after_payment(&free_delinquent));
let active_paid = relay_fixture(RELAY_STATUS_ACTIVE, "basic");
assert!(!Billing::should_reactivate_after_payment(&active_paid));
let unknown_status_paid = relay_fixture("suspended", "basic");
assert!(!Billing::should_reactivate_after_payment(
&unknown_status_paid
));
}
}
+6 -2
View File
@@ -112,7 +112,7 @@ impl Infra {
Some(Duration::from_secs(delay_secs))
}
let activities = self.query.list_activity_for_relay(relay_id).await?;
let activities = self.query.list_activity_for_resource(relay_id).await?;
let consecutive_failures = activities
.iter()
.take_while(|activity| activity.activity_type == "fail_relay_sync")
@@ -177,7 +177,11 @@ impl Infra {
// otherwise check the activity history so that a re-sync after an update
// (which resets `synced` to 0) PATCHes instead of clobbering the secret.
let is_new = relay.synced != 1
&& !self.query.relay_has_completed_sync(&relay.id).await?;
&& self
.query
.get_latest_activity_for_resource_and_type(&relay.id, "complete_relay_sync")
.await?
.is_none();
let mut body = serde_json::json!({
"host": format!("{}.{}", relay.subdomain, self.env.relay_domain),
+82 -116
View File
@@ -4,6 +4,18 @@ use sqlx::SqlitePool;
use crate::env::Env;
use crate::models::{Activity, Plan, Relay, Tenant};
fn select_tenant(tail: &str) -> String {
format!("SELECT * FROM tenant {tail}")
}
fn select_relay(tail: &str) -> String {
format!("SELECT * FROM relay {tail}")
}
fn select_activity(tail: &str) -> String {
format!("SELECT * FROM activity {tail}")
}
#[derive(Clone)]
pub struct Query {
pool: SqlitePool,
@@ -18,28 +30,7 @@ impl Query {
}
}
pub async fn list_tenants(&self) -> Result<Vec<Tenant>> {
let rows = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, nwc_error, created_at, stripe_customer_id, stripe_subscription_id, past_due_at
FROM tenant
ORDER BY pubkey",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_tenant(&self, pubkey: &str) -> Result<Option<Tenant>> {
let row = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, nwc_error, created_at, stripe_customer_id, stripe_subscription_id, past_due_at
FROM tenant
WHERE pubkey = ?",
)
.bind(pubkey)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
// Plans
pub fn list_plans(&self) -> Vec<Plan> {
vec![
@@ -77,77 +68,24 @@ impl Query {
self.list_plans().into_iter().find(|p| p.id == plan_id)
}
/// True for any plan that costs money. Doesn't require an instance because
/// the answer doesn't depend on Stripe price ids — only the canonical plan id.
pub fn is_paid_plan(plan_id: &str) -> bool {
matches!(plan_id, "basic" | "growth")
pub fn is_paid_plan(&self, plan_id: &str) -> bool {
self.get_plan(plan_id).is_some_and(|p| p.amount > 0)
}
pub async fn list_relays(&self) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id,
status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
ORDER BY id",
)
.fetch_all(&self.pool)
.await?;
// Tenants
pub async fn list_tenants(&self) -> Result<Vec<Tenant>> {
let rows = sqlx::query_as::<_, Tenant>(&select_tenant(""))
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_relays_pending_sync(&self) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id,
status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE synced = 0 OR TRIM(sync_error) != ''
ORDER BY id",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id,
status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE tenant = ?
ORDER BY id",
)
.bind(tenant_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_relay(&self, id: &str) -> Result<Option<Relay>> {
let row = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id,
status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE id = ?",
)
.bind(id)
.fetch_optional(&self.pool)
.await?;
pub async fn get_tenant(&self, pubkey: &str) -> Result<Option<Tenant>> {
let row = sqlx::query_as::<_, Tenant>(&select_tenant("WHERE pubkey = ?"))
.bind(pubkey)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
@@ -155,17 +93,49 @@ impl Query {
&self,
stripe_customer_id: &str,
) -> Result<Option<Tenant>> {
let row = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, nwc_error, created_at, stripe_customer_id, stripe_subscription_id, past_due_at
FROM tenant
WHERE stripe_customer_id = ?",
)
.bind(stripe_customer_id)
.fetch_optional(&self.pool)
.await?;
let row = sqlx::query_as::<_, Tenant>(&select_tenant("WHERE stripe_customer_id = ?"))
.bind(stripe_customer_id)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
// Relays
pub async fn list_relays(&self) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(&select_relay(""))
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_relays_pending_sync(&self) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(&select_relay(
"WHERE synced = 0 OR TRIM(sync_error) != ''",
))
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(&select_relay("WHERE tenant = ?"))
.bind(tenant_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_relay(&self, id: &str) -> Result<Option<Relay>> {
let row = sqlx::query_as::<_, Relay>(&select_relay("WHERE id = ?"))
.bind(id)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
// Invoice state
pub async fn get_invoice_nwc_payment_state(&self, invoice_id: &str) -> Result<Option<String>> {
let state = sqlx::query_scalar::<_, String>(
"SELECT state FROM invoice_nwc_payment WHERE invoice_id = ?",
@@ -189,32 +159,28 @@ impl Query {
Ok(bolt11)
}
pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>> {
let rows = sqlx::query_as::<_, Activity>(
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
FROM activity
WHERE resource_type = 'relay' AND resource_id = ?
ORDER BY created_at DESC, id DESC",
)
.bind(relay_id)
// Activity
pub async fn list_activity_for_resource(&self, resource_id: &str) -> Result<Vec<Activity>> {
let rows = sqlx::query_as::<_, Activity>(&select_activity("WHERE resource_id = ? ORDER BY created_at DESC"))
.bind(resource_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn relay_has_completed_sync(&self, relay_id: &str) -> Result<bool> {
let found = sqlx::query_scalar::<_, i64>(
"SELECT 1
FROM activity
WHERE resource_type = 'relay'
AND resource_id = ?
AND activity_type = 'complete_relay_sync'
LIMIT 1",
)
.bind(relay_id)
pub async fn get_latest_activity_for_resource_and_type(
&self,
resource_id: &str,
activity_type: &str,
) -> Result<Option<Activity>> {
let row = sqlx::query_as::<_, Activity>(&select_activity(
"WHERE resource_id = ? AND activity_type = ? ORDER BY created_at DESC LIMIT 1",
))
.bind(resource_id)
.bind(activity_type)
.fetch_optional(&self.pool)
.await?;
Ok(found.is_some())
Ok(row)
}
}
+1 -1
View File
@@ -80,7 +80,7 @@ pub async fn list_relay_activity(
let activity = api
.query
.list_activity_for_relay(&id)
.list_activity_for_resource(&id)
.await
.map_err(internal)?;
ok(serde_json::json!({ "activity": activity }))