Massive user-story-oriented refactor

This commit is contained in:
Jon Staab
2026-06-01 10:24:21 -07:00
parent 0018a5d4f3
commit f5403b6aef
28 changed files with 971 additions and 428 deletions
+10 -8
View File
@@ -18,8 +18,7 @@ use std::sync::Arc;
use anyhow::{Result, anyhow, ensure};
use axum::{
Router,
async_trait,
Router, async_trait,
extract::FromRequestParts,
http::{HeaderMap, request::Parts},
routing::{get, post},
@@ -32,9 +31,8 @@ use crate::env;
use crate::models::{Relay, Tenant};
use crate::query;
use crate::robot::Robot;
use crate::stripe::Stripe;
use crate::routes::identity::get_identity;
use crate::routes::invoices::{get_invoice, get_invoice_bolt11, get_tenant_latest_invoice};
use crate::routes::invoices::{get_invoice, get_invoice_bolt11, list_invoice_items};
use crate::routes::plans::{get_plan, list_plans};
use crate::routes::relays::{
create_relay, deactivate_relay, get_relay, list_relay_activity, list_relay_members,
@@ -44,6 +42,7 @@ use crate::routes::tenants::{
create_stripe_session, create_tenant, get_tenant, list_tenant_invoices, list_tenant_relays,
list_tenants, update_tenant,
};
use crate::stripe::Stripe;
use crate::web::{ApiError, forbidden, internal, not_found, unauthorized};
#[derive(Clone)]
@@ -74,10 +73,9 @@ impl Api {
.route("/tenants/:pubkey/relays", get(list_tenant_relays))
.route("/tenants/:pubkey/invoices", get(list_tenant_invoices))
.route(
"/tenants/:pubkey/invoices/latest",
get(get_tenant_latest_invoice),
"/tenants/:pubkey/stripe/session",
get(create_stripe_session),
)
.route("/tenants/:pubkey/stripe/session", get(create_stripe_session))
.route("/relays", get(list_relays).post(create_relay))
.route("/relays/:id", get(get_relay).put(update_relay))
.route("/relays/:id/members", get(list_relay_members))
@@ -86,6 +84,7 @@ impl Api {
.route("/relays/:id/reactivate", post(reactivate_relay))
.route("/invoices/:id", get(get_invoice))
.route("/invoices/:id/bolt11", get(get_invoice_bolt11))
.route("/invoices/:id/items", get(list_invoice_items))
.with_state(api)
}
@@ -185,7 +184,10 @@ impl Api {
.last()
.ok_or_else(|| anyhow!("missing u tag"))?;
ensure!(got_u == env::get().server_host, "authorization host mismatch");
ensure!(
got_u == env::get().server_host,
"authorization host mismatch"
);
Ok(event.pubkey.to_hex())
}
+110 -66
View File
@@ -14,9 +14,12 @@ use crate::wallet::Wallet;
const POLL_INTERVAL: Duration = Duration::from_secs(60 * 60);
const GRACE_PERIOD_SECS: i64 = 7 * 24 * 60 * 60;
/// Hold the manual-payment DM until an open invoice is at least this old. A freshly
/// issued invoice is surfaced to the tenant in-app first (e.g. right after they
/// create a relay), so we don't also nag by DM on the first dunning cycles.
const FRESH_INVOICE_DM_GRACE_SECS: i64 = 24 * 60 * 60;
const MANUAL_PAYMENT_DM: &str = "Payment is due for your relay subscription. Open the link below to review the invoice and pay by Lightning or card:";
const USER_ERROR_PREFIX: &str = "Auto-payment failed:";
const USER_ERROR_MAX_CHARS: usize = 240;
const CHURN_DM: &str = "Your relay subscription is past due, so your relays have been paused. You can restore service any time by adding a payment method or paying an invoice from your dashboard:";
/// Owns subscription billing: it reconciles tenant activity into invoice items,
/// renews subscriptions each period, and collects payment (Lightning, then a
@@ -54,10 +57,13 @@ impl Billing {
async fn reconcile_subscriptions(&self) -> Result<()> {
let tenants = query::list_tenants().await?;
tracing::info!(tenant_count = tenants.len(), "reconciling all subscriptions");
tracing::info!(
tenant_count = tenants.len(),
"reconciling all subscriptions"
);
for tenant in tenants {
if let Err(error) = self.reconcile_subscription(&tenant).await {
if let Err(error) = self.reconcile_subscription(&tenant, true).await {
tracing::error!(
tenant = %tenant.pubkey,
error = ?error,
@@ -73,10 +79,13 @@ impl Billing {
/// Reconciles a tenant's billing: re-activates them if a churned tenant has
/// re-engaged, folds billable activity into line items (setting the billing
/// anchor on the first), renews the current period if due, claims outstanding
/// items onto an invoice, and then collects every open invoice — churning the
/// tenant if one has gone unpaid past the grace period.
pub async fn reconcile_subscription(&self, tenant: &Tenant) -> Result<()> {
/// anchor based on the first one), renews the current period if due, and claims
/// outstanding items onto an invoice.
pub async fn reconcile_subscription(
&self,
tenant: &Tenant,
attempt_payment: bool,
) -> Result<()> {
let mut tenant = tenant.clone();
let activities = query::list_billable_activity(&tenant.pubkey).await?;
@@ -110,9 +119,11 @@ impl Billing {
command::create_invoice(&tenant, &period).await?;
}
// Retry payment on every open invoice (this also pays one just created),
// churning the tenant if the oldest has aged past the grace period.
self.collect_open_invoices(&tenant).await?;
// Attempt payment on every open invoice after syncing with stripe.
if attempt_payment {
self.sync_stripe_customer(&mut tenant).await?;
self.collect_open_invoices(&tenant).await?;
}
Ok(())
}
@@ -140,7 +151,7 @@ impl Billing {
};
match invoice_item {
Some(ref item) => command::insert_invoice_item_for_activity(&item, &activity.id).await,
Some(ref item) => command::insert_invoice_item_for_activity(item, &activity.id).await,
None => command::mark_activity_billed(&activity.id).await,
}
}
@@ -242,7 +253,11 @@ impl Billing {
else {
continue;
};
let Snapshot::Relay { plan: plan_id, status, .. } = &*activity.snapshot;
let Snapshot::Relay {
plan: plan_id,
status,
..
} = &*activity.snapshot;
if status != RELAY_STATUS_ACTIVE {
continue;
}
@@ -270,6 +285,37 @@ impl Billing {
// --- Payments ---
/// Dunning pass over a tenant's open invoices: if the oldest has been unpaid
/// past the grace period, churn the tenant; otherwise retry payment on each.
async fn collect_open_invoices(&self, tenant: &Tenant) -> Result<()> {
let open = query::list_open_invoices(&tenant.pubkey).await?;
let Some(oldest) = open.first() else {
return Ok(());
};
let now = chrono::Utc::now().timestamp();
if now - oldest.created_at >= GRACE_PERIOD_SECS {
if tenant.churned_at.is_none() {
let relays = query::list_relays_for_tenant(&tenant.pubkey).await?;
command::churn_tenant(&tenant.pubkey, now, &relays).await?;
// Notify the tenant once, on the transition into churn (the guard
// above fires this a single time). Log-and-continue on failure.
let message = format!("{CHURN_DM}\n\n{}/account", env::get().app_url);
if let Err(e) = self.robot.send_dm(&tenant.pubkey, &message).await {
tracing::error!(tenant = %tenant.pubkey, error = %e, "failed to send churn DM");
}
}
return Ok(());
}
for invoice in &open {
self.attempt_payment(tenant, invoice).await?;
}
Ok(())
}
/// Collect an invoice via NWC, then a saved card, then a manual DM. A failing
/// method's error is stored on the tenant (to warn them in the UI) but never
/// aborts the cascade or future retries; a method's error is cleared when it
@@ -294,12 +340,11 @@ impl Billing {
return Ok(());
}
// 3. Payment method on file: if the tenant has one saved, charge it via Stripe.
if let Some(payment_method) =
self.stripe.get_saved_payment_method(&tenant.stripe_customer_id).await?
{
// 3. Payment method on file: charge the tenant's cached Stripe payment
// method, kept fresh by sync_stripe_payment_method before collection.
if let Some(payment_method) = &tenant.stripe_payment_method_id {
match self
.attempt_payment_using_stripe(tenant, invoice, &payment_method)
.attempt_payment_using_stripe(tenant, invoice, payment_method)
.await
{
Ok(()) => return Ok(()),
@@ -308,8 +353,10 @@ impl Billing {
}
// 4. Manual payment: DM a link to the in-app payment page for this invoice.
let summary = error_message.as_deref().and_then(summarize_error_message);
if let Err(e) = self.attempt_payment_using_dm(tenant, invoice, summary).await {
if let Err(e) = self
.attempt_payment_using_dm(tenant, invoice, error_message)
.await
{
tracing::error!(
tenant = %tenant.pubkey,
error = %e,
@@ -375,15 +422,23 @@ impl Billing {
&self,
tenant: &Tenant,
invoice: &Invoice,
error_message: Option<String>,
error: Option<String>,
) -> Result<()> {
// If the invoice was just generated, give the user a chance to check the dashboard before nagging them.
let now = chrono::Utc::now().timestamp();
if now - invoice.created_at < FRESH_INVOICE_DM_GRACE_SECS {
return Ok(());
}
let invoice_id = &invoice.id;
let url_base = &env::get().app_url;
let payment_url = format!("{url_base}/account?invoice={invoice_id}");
let base = format!("{MANUAL_PAYMENT_DM}\n\n{payment_url}");
let dm_message = match error_message {
Some(error_message) if !error_message.is_empty() => {
format!("{base}\n\n{USER_ERROR_PREFIX} {error_message}")
let dm_message = match error {
Some(error) if !error.is_empty() => {
let limit: usize = 240;
let summary = error.chars().take(limit.saturating_sub(3)).collect::<String>();
format!("{base}\n\nAuto-payment failed: {summary}")
}
_ => base,
};
@@ -391,32 +446,6 @@ impl Billing {
self.robot.send_dm(&tenant.pubkey, &dm_message).await
}
// --- Dunning ---
/// Dunning pass over a tenant's open invoices: if the oldest has been unpaid
/// past the grace period, churn the tenant; otherwise retry payment on each.
async fn collect_open_invoices(&self, tenant: &Tenant) -> Result<()> {
let open = query::list_open_invoices(&tenant.pubkey).await?;
let Some(oldest) = open.first() else {
return Ok(());
};
let now = chrono::Utc::now().timestamp();
if now - oldest.created_at >= GRACE_PERIOD_SECS {
if tenant.churned_at.is_none() {
let relays = query::list_relays_for_tenant(&tenant.pubkey).await?;
command::churn_tenant(&tenant.pubkey, now, &relays).await?;
}
return Ok(());
}
for invoice in &open {
self.attempt_payment(tenant, invoice).await?;
}
Ok(())
}
// --- Bolt11 utils ---
pub async fn ensure_bolt11(&self, invoice: &Invoice) -> Result<Bolt11> {
@@ -455,6 +484,37 @@ impl Billing {
Ok(bolt11)
}
}
// --- Stripe utils ---
/// Copy down any stripe-related stuff to our local tenant model. Fail gracefully.
pub async fn sync_stripe_customer(&self, tenant: &mut Tenant) -> Result<()> {
match self.sync_stripe_payment_method(tenant).await {
Ok(payment_method_id) => {
tenant.stripe_payment_method_id = payment_method_id;
}
Err(error) => {
tracing::error!(tenant = %tenant.pubkey, error = ?error, "failed to sync payment method");
}
};
Ok(())
}
/// Refresh the cached Stripe payment method from Stripe so collection can charge
/// it directly and the UI reflects cards added via the portal.
async fn sync_stripe_payment_method(&self, tenant: &Tenant) -> Result<Option<String>> {
let payment_method_id = self
.stripe
.get_saved_payment_method(&tenant.stripe_customer_id)
.await?;
if payment_method_id != tenant.stripe_payment_method_id {
command::set_tenant_stripe_payment_method(&tenant.pubkey, &payment_method_id).await?;
}
Ok(payment_method_id)
}
}
/// One tenant's monthly billing period containing some timestamp, anchored at
@@ -517,19 +577,3 @@ impl BillingPeriod {
(amount as f64 * self.fraction_remaining(at)).round() as i64
}
}
fn summarize_error_message(error: &str) -> Option<String> {
let normalized = error.split_whitespace().collect::<Vec<_>>().join(" ");
if normalized.is_empty() {
return None;
}
if normalized.chars().count() <= USER_ERROR_MAX_CHARS {
return Some(normalized);
}
let prefix_len = USER_ERROR_MAX_CHARS.saturating_sub(3);
let mut truncated = normalized.chars().take(prefix_len).collect::<String>();
truncated.push_str("...");
Some(truncated)
}
+1 -2
View File
@@ -27,8 +27,7 @@ pub async fn get_bitcoin_price(currency: &str) -> Result<f64> {
let resp = http.get(url).send().await?;
let body: CoinbaseSpotPriceResponse = resp.error_for_status()?.json().await?;
body
.data
body.data
.amount
.parse::<f64>()
.map_err(|e| anyhow!("invalid BTC spot quote for {currency}: {e}"))
+57 -21
View File
@@ -25,8 +25,10 @@ pub async fn create_tenant(tenant: &Tenant) -> Result<()> {
Ok(())
}
/// Update a tenant's NWC credentials, clearing any stored NWC error so a fresh
/// wallet starts from a clean slate (it re-errors on the next charge if invalid).
pub async fn update_tenant(tenant: &Tenant) -> Result<()> {
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
sqlx::query("UPDATE tenant SET nwc_url = ?, nwc_error = NULL WHERE pubkey = ?")
.bind(&tenant.nwc_url)
.bind(&tenant.pubkey)
.execute(pool())
@@ -34,6 +36,24 @@ pub async fn update_tenant(tenant: &Tenant) -> Result<()> {
Ok(())
}
/// Cache the tenant's Stripe payment method id (or clear it with `None`) and clear
/// any stored Stripe error. Called when a card is (re)attached via the portal or
/// detected during reconciliation, so collection can charge it directly and the UI
/// reflects the change.
pub async fn set_tenant_stripe_payment_method(
pubkey: &str,
payment_method_id: &Option<String>,
) -> Result<()> {
sqlx::query(
"UPDATE tenant SET stripe_payment_method_id = ?, stripe_error = NULL WHERE pubkey = ?",
)
.bind(payment_method_id)
.bind(pubkey)
.execute(pool())
.await?;
Ok(())
}
pub async fn set_tenant_billing_anchor(tenant: &Tenant) -> Result<()> {
sqlx::query("UPDATE tenant SET billing_anchor = ? WHERE pubkey = ?")
.bind(tenant.billing_anchor)
@@ -70,9 +90,13 @@ pub async fn churn_tenant(tenant_pubkey: &str, now: i64, relays: &[Relay]) -> Re
let mut activities = Vec::new();
for relay in relays {
if relay.status == RELAY_STATUS_ACTIVE {
let activity =
set_relay_status_tx(tx, relay, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent")
.await?;
let activity = set_relay_status_tx(
tx,
relay,
RELAY_STATUS_DELINQUENT,
"mark_relay_delinquent",
)
.await?;
activities.push(activity);
}
}
@@ -252,7 +276,10 @@ pub async fn complete_relay_sync(relay: &Relay) -> Result<()> {
/// Persist a reconciled activity's line item and mark the activity billed in one
/// transaction, so a recovery pass never re-bills it.
pub async fn insert_invoice_item_for_activity(invoice_item: &InvoiceItem, activity_id: &str) -> Result<()> {
pub async fn insert_invoice_item_for_activity(
invoice_item: &InvoiceItem,
activity_id: &str,
) -> Result<()> {
let now = chrono::Utc::now().timestamp();
with_tx(async |tx| {
@@ -282,12 +309,11 @@ pub async fn insert_invoice_items_for_renewal(
with_tx(async |tx| {
// Re-read the marker inside the transaction so the guard and the writes
// commit together — this ensures idempotency so we don't double-invoice.
let renewed_at = sqlx::query_scalar::<_, Option<i64>>(
"SELECT renewed_at FROM tenant WHERE pubkey = ?",
)
.bind(tenant_pubkey)
.fetch_one(&mut **tx)
.await?;
let renewed_at =
sqlx::query_scalar::<_, Option<i64>>("SELECT renewed_at FROM tenant WHERE pubkey = ?")
.bind(tenant_pubkey)
.fetch_one(&mut **tx)
.await?;
if renewed_at.is_some_and(|at| at >= period.start) {
return Ok(());
@@ -339,7 +365,7 @@ pub async fn create_invoice(tenant: &Tenant, period: &BillingPeriod) -> Result<O
return Ok(None);
}
let invoice = insert_invoice_tx(tx, &tenant, &period, total).await?;
let invoice = insert_invoice_tx(tx, tenant, period, total).await?;
sqlx::query(
"UPDATE invoice_item SET invoice_id = ?
@@ -485,14 +511,17 @@ async fn insert_invoice_tx(
.bind(invoice_id)
.bind(&tenant.pubkey)
.bind(amount)
.bind(&period.start)
.bind(period.start)
.bind(period.end)
.bind(now)
.fetch_one(&mut **tx)
.await?)
}
async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &InvoiceItem) -> Result<()> {
async fn insert_invoice_item_tx(
tx: &mut Transaction<'_, Sqlite>,
item: &InvoiceItem,
) -> Result<()> {
sqlx::query(
"INSERT INTO invoice_item
(id, invoice_id, activity_id, tenant_pubkey, relay_id, plan_id, amount, description, created_at)
@@ -520,11 +549,12 @@ async fn mark_activity_billed_tx(
activity_id: &str,
billed_at: i64,
) -> Result<bool> {
let result = sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ? AND billed_at IS NULL")
.bind(billed_at)
.bind(activity_id)
.execute(&mut **tx)
.await?;
let result =
sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ? AND billed_at IS NULL")
.bind(billed_at)
.bind(activity_id)
.execute(&mut **tx)
.await?;
Ok(result.rows_affected() > 0)
}
@@ -578,7 +608,10 @@ async fn mark_invoice_paid_tx(
/// Void all of a tenant's open invoices, forgiving the balance — used when a
/// tenant churns or re-activates, so old debt never has to be collected.
async fn void_open_invoices_tx(tx: &mut Transaction<'_, Sqlite>, tenant_pubkey: &str) -> Result<()> {
async fn void_open_invoices_tx(
tx: &mut Transaction<'_, Sqlite>,
tenant_pubkey: &str,
) -> Result<()> {
let voided_at = chrono::Utc::now().timestamp();
sqlx::query(
@@ -615,7 +648,10 @@ async fn clear_tenant_nwc_error_tx(tx: &mut Transaction<'_, Sqlite>, pubkey: &st
Ok(())
}
async fn clear_tenant_stripe_error_tx(tx: &mut Transaction<'_, Sqlite>, pubkey: &str) -> Result<()> {
async fn clear_tenant_stripe_error_tx(
tx: &mut Transaction<'_, Sqlite>,
pubkey: &str,
) -> Result<()> {
sqlx::query("UPDATE tenant SET stripe_error = NULL WHERE pubkey = ?")
.bind(pubkey)
.execute(&mut **tx)
+10 -2
View File
@@ -74,7 +74,11 @@ async fn reconcile_relay_state(source: &str) -> Result<()> {
return Ok(());
}
tracing::info!(source, relay_count = relays.len(), "reconciling pending relay state");
tracing::info!(
source,
relay_count = relays.len(),
"reconciling pending relay state"
);
for relay in relays {
if relay.sync_error.trim().is_empty() {
@@ -229,7 +233,11 @@ async fn try_sync_relay(relay: &Relay) -> Result<()> {
);
}
let method = if is_new { HttpMethod::POST } else { HttpMethod::PATCH };
let method = if is_new {
HttpMethod::POST
} else {
HttpMethod::PATCH
};
request(method, &format!("relay/{}", relay.id), Some(&body)).await?;
Ok(())
}
+1 -1
View File
@@ -2,10 +2,10 @@ pub mod api;
pub mod billing;
pub mod bitcoin;
pub mod command;
pub mod db;
pub mod env;
pub mod infra;
pub mod models;
pub mod db;
pub mod query;
pub mod robot;
pub mod routes;
+3 -4
View File
@@ -2,10 +2,10 @@ mod api;
mod billing;
mod bitcoin;
mod command;
mod db;
mod env;
mod infra;
mod models;
mod db;
mod query;
mod robot;
mod routes;
@@ -16,7 +16,7 @@ mod web;
use anyhow::Result;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tower_http::cors::{AllowOrigin, CorsLayer, Any};
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use crate::api::Api;
use crate::billing::Billing;
@@ -61,8 +61,7 @@ async fn main() -> Result<()> {
billing.start().await;
});
let listener =
tokio::net::TcpListener::bind(format!(
let listener = tokio::net::TcpListener::bind(format!(
"{}:{}",
env::get().server_host,
env::get().server_port
+33 -29
View File
@@ -39,7 +39,8 @@ pub struct Tenant {
pub nwc_url: String,
/// Last NWC auto-payment error, or `None` when the wallet last paid (or has
/// never been tried). Surfaced in the UI to warn the user; it never blocks a
/// retry — the next reconcile attempts payment again regardless.
/// retry — the next reconcile attempts payment again regardless. Also cleared
/// when the tenant updates their NWC credentials.
pub nwc_error: Option<String>,
/// Last Stripe auto-payment error, with the same semantics as `nwc_error`.
pub stripe_error: Option<String>,
@@ -122,40 +123,43 @@ impl Default for Relay {
/// balance forgiven when the tenant churns).
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Invoice {
pub id: String,
pub tenant_pubkey: String,
/// The total owed, fixed when the invoice is cut from its outstanding line
/// items, so collection never has to re-sum them.
pub amount: i64,
pub period_start: i64,
pub period_end: i64,
pub created_at: i64,
pub paid_at: Option<i64>,
pub voided_at: Option<i64>,
pub id: String,
pub tenant_pubkey: String,
/// The total owed, fixed when the invoice is cut from its outstanding line
/// items, so collection never has to re-sum them.
pub amount: i64,
pub period_start: i64,
pub period_end: i64,
pub created_at: i64,
pub paid_at: Option<i64>,
pub voided_at: Option<i64>,
/// How the invoice was paid — `nwc`, `stripe`, or `oob` (out-of-band
/// Lightning) — set when it is marked paid; `None` while open or void.
pub method: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct InvoiceItem {
pub id: String,
/// `None` while outstanding; set once the item is claimed onto an invoice.
pub invoice_id: Option<String>,
/// `None` for renewal items, which have no source activity.
pub activity_id: Option<String>,
pub tenant_pubkey: String,
pub relay_id: String,
pub plan_id: String,
pub amount: i64,
pub description: String,
pub created_at: i64,
pub id: String,
/// `None` while outstanding; set once the item is claimed onto an invoice.
pub invoice_id: Option<String>,
/// `None` for renewal items, which have no source activity.
pub activity_id: Option<String>,
pub tenant_pubkey: String,
pub relay_id: String,
pub plan_id: String,
pub amount: i64,
pub description: String,
pub created_at: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Bolt11 {
pub id: String,
pub invoice_id: String,
pub lnbc: String,
pub msats: i64,
pub created_at: i64,
pub expires_at: i64,
pub settled_at: Option<i64>,
pub id: String,
pub invoice_id: String,
pub lnbc: String,
pub msats: i64,
pub created_at: i64,
pub expires_at: i64,
pub settled_at: Option<i64>,
}
+32 -22
View File
@@ -1,7 +1,7 @@
use anyhow::{Result, anyhow};
use crate::models::{Activity, Bolt11, Invoice, Plan, Relay, Tenant};
use crate::db::pool;
use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, Plan, Relay, Tenant};
fn select_tenant(tail: &str) -> String {
format!("SELECT * FROM tenant {tail}")
@@ -62,10 +62,12 @@ pub async fn list_tenants() -> Result<Vec<Tenant>> {
}
pub async fn get_tenant(pubkey: &str) -> Result<Option<Tenant>> {
Ok(sqlx::query_as::<_, Tenant>(&select_tenant("WHERE pubkey = ?"))
.bind(pubkey)
.fetch_optional(pool())
.await?)
Ok(
sqlx::query_as::<_, Tenant>(&select_tenant("WHERE pubkey = ?"))
.bind(pubkey)
.fetch_optional(pool())
.await?,
)
}
// --- Relays ---
@@ -85,10 +87,12 @@ pub async fn list_relays_pending_sync() -> Result<Vec<Relay>> {
}
pub async fn list_relays_for_tenant(tenant_pubkey: &str) -> Result<Vec<Relay>> {
Ok(sqlx::query_as::<_, Relay>(&select_relay("WHERE tenant_pubkey = ?"))
.bind(tenant_pubkey)
.fetch_all(pool())
.await?)
Ok(
sqlx::query_as::<_, Relay>(&select_relay("WHERE tenant_pubkey = ?"))
.bind(tenant_pubkey)
.fetch_all(pool())
.await?,
)
}
pub async fn get_relay(id: &str) -> Result<Option<Relay>> {
@@ -119,10 +123,12 @@ pub async fn get_relay_plan_before(relay_id: &str, before: i64) -> Result<Option
// --- Invoices ---
pub async fn get_invoice(invoice_id: &str) -> Result<Option<Invoice>> {
Ok(sqlx::query_as::<_, Invoice>("SELECT * FROM invoice WHERE id = ?")
.bind(invoice_id)
.fetch_optional(pool())
.await?)
Ok(
sqlx::query_as::<_, Invoice>("SELECT * FROM invoice WHERE id = ?")
.bind(invoice_id)
.fetch_optional(pool())
.await?,
)
}
pub async fn list_invoices(tenant_pubkey: &str) -> Result<Vec<Invoice>> {
@@ -134,12 +140,14 @@ pub async fn list_invoices(tenant_pubkey: &str) -> Result<Vec<Invoice>> {
.await?)
}
pub async fn get_latest_invoice(tenant_pubkey: &str) -> Result<Option<Invoice>> {
Ok(sqlx::query_as::<_, Invoice>(
"SELECT * FROM invoice WHERE tenant_pubkey = ? ORDER BY created_at DESC LIMIT 1",
/// The line items claimed onto an invoice, oldest first. Used to render an
/// invoice's contents (and its downloadable copy) from what was actually billed.
pub async fn list_invoice_items_for_invoice(invoice_id: &str) -> Result<Vec<InvoiceItem>> {
Ok(sqlx::query_as::<_, InvoiceItem>(
"SELECT * FROM invoice_item WHERE invoice_id = ? ORDER BY created_at ASC",
)
.bind(tenant_pubkey)
.fetch_optional(pool())
.bind(invoice_id)
.fetch_all(pool())
.await?)
}
@@ -159,10 +167,12 @@ pub async fn list_open_invoices(tenant_pubkey: &str) -> Result<Vec<Invoice>> {
// --- Bolt11 ---
pub async fn get_bolt11(bolt11_id: &str) -> Result<Option<Bolt11>> {
Ok(sqlx::query_as::<_, Bolt11>("SELECT * FROM bolt11 WHERE id = ?")
.bind(bolt11_id)
.fetch_optional(pool())
.await?)
Ok(
sqlx::query_as::<_, Bolt11>("SELECT * FROM bolt11 WHERE id = ?")
.bind(bolt11_id)
.fetch_optional(pool())
.await?,
)
}
pub async fn get_bolt11_for_invoice(invoice_id: &str) -> Result<Option<Bolt11>> {
+16 -9
View File
@@ -43,10 +43,7 @@ impl Robot {
Ok(client)
}
async fn publish_identity(
&self,
) -> Result<()> {
async fn publish_identity(&self) -> Result<()> {
let mut metadata = Metadata::new();
if !env::get().robot_name.is_empty() {
metadata = metadata.name(&env::get().robot_name);
@@ -65,7 +62,8 @@ impl Robot {
.send_event_builder(EventBuilder::metadata(&metadata))
.await?;
let outbox_tags = env::get().robot_outbox_relays
let outbox_tags = env::get()
.robot_outbox_relays
.iter()
.map(|r| Tag::parse(["r", r.as_str()]))
.collect::<std::result::Result<Vec<_>, _>>()?;
@@ -73,7 +71,8 @@ impl Robot {
.send_event_builder(EventBuilder::new(Kind::Custom(10002), "").tags(outbox_tags))
.await?;
let messaging_tags = env::get().robot_messaging_relays
let messaging_tags = env::get()
.robot_messaging_relays
.iter()
.map(|r| Tag::parse(["relay", r.as_str()]))
.collect::<std::result::Result<Vec<_>, _>>()?;
@@ -100,7 +99,9 @@ impl Robot {
let recipient_pubkey = PublicKey::parse(recipient)?;
let client = self.make_client(&dm_relays).await?;
client.send_private_msg(recipient_pubkey, message, []).await?;
client
.send_private_msg(recipient_pubkey, message, [])
.await?;
Ok(())
}
@@ -132,8 +133,14 @@ impl Robot {
pub async fn fetch_nostr_name(&self, pubkey: &str) -> Option<String> {
let pubkey = PublicKey::parse(pubkey).ok()?;
let filter = Filter::new().author(pubkey).kind(Kind::Metadata).limit(1);
let client = self.make_client(&env::get().robot_indexer_relays).await.ok()?;
let events = client.fetch_events(filter, Duration::from_secs(5)).await.ok()?;
let client = self
.make_client(&env::get().robot_indexer_relays)
.await
.ok()?;
let events = client
.fetch_events(filter, Duration::from_secs(5))
.await
.ok()?;
let event = events.into_iter().max_by_key(|e| e.created_at)?;
let content: serde_json::Value = serde_json::from_str(&event.content).ok()?;
let name = content
+21 -20
View File
@@ -6,26 +6,6 @@ use crate::api::{Api, AuthedPubkey};
use crate::query;
use crate::web::{ApiResult, internal, not_found, ok};
/// The tenant's most recent invoice, after first materializing any outstanding
/// line items into a fresh one — so the frontend can collect payment right after
/// a change (e.g. creating a relay). Payment isn't attempted here; the caller
/// drives it via the bolt11/Stripe endpoints. `null` when the tenant has no
/// invoices and nothing is outstanding.
pub async fn get_tenant_latest_invoice(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(pubkey): Path<String>,
) -> ApiResult {
api.require_admin_or_tenant(&auth, &pubkey)?;
let tenant = api.get_tenant_or_404(&pubkey).await?;
api.billing.reconcile_subscription(&tenant).await.map_err(internal)?;
let invoice = query::get_latest_invoice(&pubkey).await.map_err(internal)?;
ok(invoice)
}
pub async fn get_invoice(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
@@ -63,3 +43,24 @@ pub async fn get_invoice_bolt11(
ok(serde_json::json!(bolt11))
}
/// The line items billed on an invoice, for rendering its contents and a
/// downloadable copy.
pub async fn list_invoice_items(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(invoice_id): Path<String>,
) -> ApiResult {
let invoice = query::get_invoice(&invoice_id)
.await
.map_err(internal)?
.ok_or_else(|| not_found("invoice not found"))?;
api.require_admin_or_tenant(&auth, &invoice.tenant_pubkey)?;
let items = query::list_invoice_items_for_invoice(&invoice_id)
.await
.map_err(internal)?;
ok(items)
}
+19 -17
View File
@@ -9,14 +9,12 @@ use regex::Regex;
use serde::Deserialize;
use crate::api::{Api, AuthedPubkey};
use crate::{command, infra, query};
use crate::models::{
RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay,
};
use crate::models::{RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay};
use crate::web::{
ApiError, ApiResult, bad_request, created, internal, map_unique_error, ok,
parse_bool_default, unprocessable,
ApiError, ApiResult, bad_request, created, internal, map_unique_error, ok, parse_bool_default,
unprocessable,
};
use crate::{command, infra, query};
pub async fn list_relays(
State(api): State<Arc<Api>>,
@@ -196,10 +194,7 @@ pub async fn update_relay(
if plan_changed {
let selected_plan = query::get_plan(&relay.plan_id).map_err(internal)?;
if let Some(limit) = selected_plan.members {
let current_members = fetch_relay_members(&relay)
.await
.map_err(internal)?
.len() as i64;
let current_members = fetch_relay_members(&relay).await.map_err(internal)?.len() as i64;
if current_members > limit {
let message = format!(
@@ -231,12 +226,13 @@ pub async fn deactivate_relay(
}
if relay.status == RELAY_STATUS_INACTIVE {
return Err(bad_request("relay-is-inactive", "relay is already inactive"));
return Err(bad_request(
"relay-is-inactive",
"relay is already inactive",
));
}
command::deactivate_relay(&relay)
.await
.map_err(internal)?;
command::deactivate_relay(&relay).await.map_err(internal)?;
ok(())
}
@@ -282,15 +278,21 @@ static SUBDOMAIN_RE: LazyLock<Regex> =
/// premium features, and coerce the boolean columns to 0/1.
fn prepare_relay(mut relay: Relay) -> Result<Relay, ApiError> {
if !SUBDOMAIN_RE.is_match(&relay.subdomain)
|| RESERVED_SUBDOMAINS.contains(&relay.subdomain.as_str()) {
|| RESERVED_SUBDOMAINS.contains(&relay.subdomain.as_str())
{
return Err(unprocessable("invalid-subdomain", "subdomain is invalid"));
}
let plan = query::get_plan(&relay.plan_id)
.map_err(|_| unprocessable("invalid-plan", "plan not found"))?;
if (!plan.blossom && relay.blossom_enabled == 1) || (!plan.livekit && relay.livekit_enabled == 1) {
return Err(unprocessable("premium-feature", "feature requires a paid plan"));
if (!plan.blossom && relay.blossom_enabled == 1)
|| (!plan.livekit && relay.livekit_enabled == 1)
{
return Err(unprocessable(
"premium-feature",
"feature requires a paid plan",
));
}
relay.policy_public_join = parse_bool_default(relay.policy_public_join, 0);
+23 -12
View File
@@ -56,6 +56,21 @@ pub async fn list_tenants(
.collect::<Vec<_>>())
}
/// Fetch a tenant by pubkey. Automatically refreshes the tenant's stripe payment data.
pub async fn get_tenant(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(pubkey): Path<String>,
) -> ApiResult {
api.require_admin_or_tenant(&auth, &pubkey)?;
let mut tenant = api.get_tenant_or_404(&pubkey).await?;
api.billing.sync_stripe_customer(&mut tenant).await.map_err(internal)?;
ok(TenantResponse::from(tenant))
}
/// Create the tenant row for the calling pubkey and provision its Stripe
/// customer. Idempotent: an existing tenant (including one created by a
/// concurrent unique-constraint race) is returned as-is.
@@ -99,16 +114,6 @@ pub async fn create_tenant(
}
}
pub async fn get_tenant(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(pubkey): Path<String>,
) -> ApiResult {
api.require_admin_or_tenant(&auth, &pubkey)?;
let tenant = api.get_tenant_or_404(&pubkey).await?;
ok(TenantResponse::from(tenant))
}
#[derive(Deserialize)]
pub struct UpdateTenantRequest {
pub nwc_url: Option<String>,
@@ -136,6 +141,7 @@ pub async fn update_tenant(
ok(TenantResponse::from(tenant))
}
/// List a tenant's relays.
pub async fn list_tenant_relays(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
@@ -149,7 +155,7 @@ pub async fn list_tenant_relays(
ok(relays)
}
/// List a tenant's invoices, most recent first.
/// List a tenant's invoices after reconciling the tenant's billing state.
pub async fn list_tenant_invoices(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
@@ -157,10 +163,15 @@ pub async fn list_tenant_invoices(
) -> ApiResult {
api.require_admin_or_tenant(&auth, &pubkey)?;
let invoices = query::list_invoices(&pubkey)
let tenant = api.get_tenant_or_404(&pubkey).await?;
api.billing
.reconcile_subscription(&tenant, false)
.await
.map_err(internal)?;
let invoices = query::list_invoices(&pubkey).await.map_err(internal)?;
ok(invoices)
}
+3 -1
View File
@@ -14,7 +14,9 @@ pub struct Wallet {
impl Wallet {
pub fn from_url(url: &str) -> Result<Self> {
Ok(Self { url: url.parse::<NostrWalletConnectURI>()? })
Ok(Self {
url: url.parse::<NostrWalletConnectURI>()?,
})
}
pub async fn make_invoice(
+5 -1
View File
@@ -71,7 +71,11 @@ pub fn err(status: StatusCode, code: &str, message: &str) -> ApiError {
}
pub fn unauthorized(reason: impl Display) -> ApiError {
err(StatusCode::UNAUTHORIZED, "unauthorized", &reason.to_string())
err(
StatusCode::UNAUTHORIZED,
"unauthorized",
&reason.to_string(),
)
}
pub fn forbidden(message: &str) -> ApiError {