Clean up billing a bit
This commit is contained in:
+47
-128
@@ -4,7 +4,6 @@ use std::time::Duration;
|
||||
|
||||
use crate::bitcoin;
|
||||
use crate::command;
|
||||
use crate::db;
|
||||
use crate::env;
|
||||
use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, Tenant};
|
||||
use crate::query;
|
||||
@@ -34,107 +33,25 @@ impl Billing {
|
||||
// --- lifecycle methods ---
|
||||
|
||||
pub async fn start(self) {
|
||||
let mut rx = db::subscribe();
|
||||
|
||||
tokio::spawn({
|
||||
let billing = self.clone();
|
||||
async move { billing.poll().await }
|
||||
});
|
||||
|
||||
if let Err(error) = self.reconcile_subscriptions("startup").await {
|
||||
tracing::error!(error = %error, "failed to reconcile subscriptions on startup");
|
||||
}
|
||||
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(activity) => {
|
||||
if let Err(e) = self.handle_activity(&activity).await {
|
||||
tracing::error!(error = %e, "billing handle_activity failed");
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
tracing::warn!(missed = n, "billing lagged, reconciling all subscriptions");
|
||||
|
||||
if let Err(error) = self.reconcile_subscriptions("lagged").await {
|
||||
tracing::error!(error = %error, "failed to reconcile after lag");
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn poll(&self) {
|
||||
let mut interval = tokio::time::interval(POLL_INTERVAL);
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
if let Err(error) = self.autogenerate_invoices().await {
|
||||
if let Err(error) = self.reconcile_subscriptions().await {
|
||||
tracing::error!(error = %error, "billing poll failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn autogenerate_invoices(&self) -> Result<()> {
|
||||
async fn reconcile_subscriptions(&self) -> Result<()> {
|
||||
let tenants = query::list_tenants().await?;
|
||||
|
||||
tracing::info!(
|
||||
tenant_count = tenants.len(),
|
||||
"polling tenants for subscription renewal"
|
||||
);
|
||||
|
||||
for tenant in tenants {
|
||||
if let Err(error) = self.autogenerate_invoice(&tenant).await {
|
||||
tracing::error!(
|
||||
tenant = %tenant.pubkey,
|
||||
error = ?error,
|
||||
"failed to autogenerate invoice"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Periodically generate the tenant's invoice for the current period
|
||||
/// (adding any due renewals) and, if one results, attempt payment.
|
||||
async fn autogenerate_invoice(&self, tenant: &Tenant) -> Result<()> {
|
||||
if let Some(invoice) = self.generate_invoice(tenant).await? {
|
||||
self.attempt_payment(tenant, &invoice).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_activity(&self, activity: &Activity) -> Result<()> {
|
||||
let should_reconcile = matches!(
|
||||
activity.activity_type.as_str(),
|
||||
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay"
|
||||
);
|
||||
|
||||
if should_reconcile
|
||||
&& let Some(tenant) = query::get_tenant(&activity.tenant).await?
|
||||
{
|
||||
self.reconcile_subscription(&tenant).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn reconcile_subscriptions(&self, source: &str) -> Result<()> {
|
||||
let tenants = query::list_tenants().await?;
|
||||
|
||||
tracing::info!(
|
||||
source,
|
||||
tenant_count = tenants.len(),
|
||||
"reconciling all subscriptions"
|
||||
);
|
||||
tracing::info!(tenant_count = tenants.len(), "reconciling all subscriptions");
|
||||
|
||||
for tenant in tenants {
|
||||
if let Err(error) = self.reconcile_subscription(&tenant).await {
|
||||
tracing::error!(
|
||||
source,
|
||||
tenant = %tenant.pubkey,
|
||||
error = ?error,
|
||||
"failed to reconcile subscription"
|
||||
@@ -145,22 +62,41 @@ impl Billing {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// --- Reconciliation, renewal, and on-demand billing ---
|
||||
// --- Reconciliation of activity/renewals ---
|
||||
|
||||
/// Lists billable activity, setting the tenant's billing anchor to the first
|
||||
/// activity in the process.
|
||||
async fn reconcile_subscription(&self, tenant: &Tenant) -> Result<()> {
|
||||
/// activity in the process. Generates an invoice for the current period if due
|
||||
/// for renewal or any billable activities have occurred. Attempts payment if
|
||||
/// an invoice is generated.
|
||||
pub async fn reconcile_subscription(&self, tenant: &Tenant) -> Result<()> {
|
||||
let mut tenant = tenant.clone();
|
||||
|
||||
// Reconcile all activity, setting the tenant's billing anchor on the first
|
||||
// positive-balance line item if not already set.
|
||||
for activity in query::list_billable_activity_for_tenant(&tenant.pubkey).await? {
|
||||
if tenant.billing_anchor.is_none() {
|
||||
tenant.billing_anchor = Some(activity.created_at);
|
||||
command::set_tenant_billing_anchor(&tenant).await?;
|
||||
}
|
||||
|
||||
self.reconcile_activity(&tenant, &activity).await?;
|
||||
let invoice_item = self.reconcile_activity(&tenant, &activity).await?;
|
||||
}
|
||||
|
||||
// If the tenant has no billing anchor, they have nothing to bill
|
||||
let Some(period) = BillingPeriod::current(&tenant) else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// If tenant is due for renewal, bill any active relays.
|
||||
if tenant.renewed_at.is_none_or(|at| at < period.start) {
|
||||
self.reconcile_renewal(&tenant, &period).await?;
|
||||
}
|
||||
|
||||
// Create the invoice, but only if non-zero and attempt payment
|
||||
if let Some(invoice) = command::create_invoice(&tenant, &period).await? {
|
||||
self.attempt_payment(&tenant, &invoice).await?;
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -168,7 +104,7 @@ impl Billing {
|
||||
/// persist it with the activity's billed marker. Activities that produce no
|
||||
/// item (e.g. free-plan changes) are still marked billed so they aren't
|
||||
/// re-scanned.
|
||||
async fn reconcile_activity(&self, tenant: &Tenant, activity: &Activity) -> Result<()> {
|
||||
async fn reconcile_activity(&self, tenant: &Tenant, activity: &Activity) -> Result<Option<InvoiceItem>> {
|
||||
let invoice_item = match activity.activity_type.as_str() {
|
||||
"create_relay" => {
|
||||
self.make_prorated_item(tenant, activity, 1, "New relay created")
|
||||
@@ -187,9 +123,11 @@ impl Billing {
|
||||
};
|
||||
|
||||
match invoice_item {
|
||||
Some(item) => command::insert_invoice_item_for_activity(&item, &activity.id).await,
|
||||
None => command::mark_activity_billed(&activity.id).await,
|
||||
Some(ref item) => command::insert_invoice_item_for_activity(&item, &activity.id).await?,
|
||||
None => command::mark_activity_billed(&activity.id).await?,
|
||||
}
|
||||
|
||||
Ok(invoice_item)
|
||||
}
|
||||
|
||||
/// A prorated charge (or credit, with `sign` = -1) for the relay's current
|
||||
@@ -203,16 +141,16 @@ impl Billing {
|
||||
description: &str,
|
||||
) -> Result<Option<InvoiceItem>> {
|
||||
let Some(relay) = query::get_relay(&activity.resource_id).await? else {
|
||||
return Ok(None);
|
||||
return anyhow!("activity resource was not a valid relay");
|
||||
};
|
||||
let Some(plan) = query::get_plan(&relay.plan) else {
|
||||
return Ok(None);
|
||||
return anyhow!("activity plan was not a valid plan");
|
||||
};
|
||||
if plan.amount <= 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let period = BillingPeriod::new(tenant, activity.created_at)
|
||||
let period = BillingPeriod::at(tenant, activity.created_at)
|
||||
.ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?;
|
||||
let amount = sign * period.prorate(plan.amount, activity.created_at);
|
||||
|
||||
@@ -240,23 +178,24 @@ impl Billing {
|
||||
activity: &Activity,
|
||||
) -> Result<Option<InvoiceItem>> {
|
||||
let Some(new_plan_id) = activity.plan_id.as_deref() else {
|
||||
return Ok(None);
|
||||
return anyhow!("activity plan was not a valid plan");
|
||||
};
|
||||
let Some(old_plan_id) =
|
||||
query::get_relay_plan_before(&activity.resource_id, activity.created_at).await?
|
||||
else {
|
||||
return Ok(None);
|
||||
return anyhow!("no previous plan found for relay update activity");
|
||||
};
|
||||
if old_plan_id == new_plan_id {
|
||||
return Ok(None);
|
||||
}
|
||||
let (Some(new_plan), Some(old_plan)) =
|
||||
(query::get_plan(new_plan_id), query::get_plan(&old_plan_id))
|
||||
else {
|
||||
return Ok(None);
|
||||
let Some(new_plan) = query::get_plan(new_plan_id) else {
|
||||
return anyhow!("new plan is an invalid plan");
|
||||
};
|
||||
let Some(old_plan) = query::get_plan(old_plan_id) else {
|
||||
return anyhow!("old plan is an invalid plan");
|
||||
};
|
||||
|
||||
let period = BillingPeriod::new(tenant, activity.created_at)
|
||||
let period = BillingPeriod::at(tenant, activity.created_at)
|
||||
.ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?;
|
||||
let amount = period.prorate(new_plan.amount, activity.created_at)
|
||||
- period.prorate(old_plan.amount, activity.created_at);
|
||||
@@ -278,28 +217,6 @@ impl Billing {
|
||||
}))
|
||||
}
|
||||
|
||||
/// Reconcile pending activity, add this period's renewals if they're due, and
|
||||
/// claim everything outstanding onto an invoice. Idempotent per period.
|
||||
pub async fn generate_invoice(&self, tenant: &Tenant) -> Result<Option<Invoice>> {
|
||||
self.reconcile_subscription(tenant).await?;
|
||||
|
||||
// reconcile may have just set the anchor (first activity); re-read it.
|
||||
let Some(tenant) = query::get_tenant(&tenant.pubkey).await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let Some(period) = BillingPeriod::current(&tenant) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// Short-circuit the renewal scan if this period is already renewed, for
|
||||
// performance. Renew_tenant re-checks this in-tx as the real guard.
|
||||
if tenant.renewed_at.is_none_or(|at| at < period.start) {
|
||||
self.renew_period(&tenant, &period).await?;
|
||||
}
|
||||
|
||||
command::create_invoice(&tenant.pubkey, period.start, period.end).await
|
||||
}
|
||||
|
||||
/// Charge a full-period renewal for every relay that was active on a paid plan
|
||||
/// as of `period.start`, reconstructing that state from the activity log
|
||||
/// (status from create/activate/deactivate, plan from create/update).
|
||||
@@ -307,7 +224,7 @@ impl Billing {
|
||||
/// on every generation can't renew twice; a relay created/activated *within*
|
||||
/// the period isn't active before the boundary, so it's covered by its own
|
||||
/// prorated charge instead.
|
||||
async fn renew_period(&self, tenant: &Tenant, period: &BillingPeriod) -> Result<()> {
|
||||
async fn reconcile_renewal(&self, tenant: &Tenant, period: &BillingPeriod) -> Result<()> {
|
||||
let activities = query::list_relay_activity_before(&tenant.pubkey, period.start).await?;
|
||||
|
||||
let mut line_items = Vec::new();
|
||||
@@ -339,6 +256,8 @@ impl Billing {
|
||||
command::insert_invoice_items_for_renewal(&line_items, period).await
|
||||
}
|
||||
|
||||
// --- Payments ---
|
||||
|
||||
pub async fn attempt_payment(&self, tenant: &Tenant, invoice: &Invoice) -> Result<()> {
|
||||
let mut error_message: Option<String> = None;
|
||||
|
||||
@@ -510,12 +429,12 @@ impl BillingPeriod {
|
||||
/// The period containing `chrono::Utc::now()` for `tenant`. `None` when the
|
||||
/// tenant has no `billing_anchor` yet — i.e. no billable activity has been seen.
|
||||
fn current(tenant: &Tenant) -> Option<Self> {
|
||||
Self::new(tenant, chrono::Utc::now().timestamp())
|
||||
Self::at(tenant, chrono::Utc::now().timestamp())
|
||||
}
|
||||
|
||||
/// The period containing `at` for `tenant`. `None` when the tenant has no
|
||||
/// `billing_anchor` yet — i.e. no billable activity has been seen.
|
||||
fn new(tenant: &Tenant, at: i64) -> Option<Self> {
|
||||
fn at(tenant: &Tenant, at: i64) -> Option<Self> {
|
||||
use chrono::{DateTime, Months, Utc};
|
||||
|
||||
let anchor = tenant.billing_anchor?;
|
||||
|
||||
+9
-15
@@ -264,17 +264,13 @@ pub async fn mark_activity_billed(activity_id: &str) -> Result<()> {
|
||||
/// Claim all of a tenant's outstanding items onto a new invoice. A non-positive
|
||||
/// balance leaves the items outstanding so the credit carries to the next positive
|
||||
/// invoice. Returns the invoice, or `None` when there's nothing to bill.
|
||||
pub async fn create_invoice(
|
||||
tenant_pubkey: &str,
|
||||
period_start: i64,
|
||||
period_end: i64,
|
||||
) -> Result<Option<Invoice>> {
|
||||
pub async fn create_invoice(tenant: &Tenant, period: &BillingPeriod) -> Result<Option<Invoice>> {
|
||||
with_tx(async |tx| {
|
||||
let total = sqlx::query_scalar::<_, i64>(
|
||||
"SELECT COALESCE(SUM(amount), 0) FROM invoice_item
|
||||
WHERE tenant_pubkey = ? AND invoice_id IS NULL",
|
||||
)
|
||||
.bind(tenant_pubkey)
|
||||
.bind(&tenant.pubkey)
|
||||
.fetch_one(&mut **tx)
|
||||
.await?;
|
||||
|
||||
@@ -282,15 +278,14 @@ pub async fn create_invoice(
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let invoice =
|
||||
insert_invoice_tx(tx, tenant_pubkey, period_start, period_end).await?;
|
||||
let invoice = insert_invoice_tx(tx, &tenant, &period).await?;
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE invoice_item SET invoice_id = ?
|
||||
WHERE tenant_pubkey = ? AND invoice_id IS NULL",
|
||||
)
|
||||
.bind(&invoice.id)
|
||||
.bind(tenant_pubkey)
|
||||
.bind(&tenant.pubkey)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
@@ -423,9 +418,8 @@ async fn insert_activity_tx(
|
||||
|
||||
async fn insert_invoice_tx(
|
||||
tx: &mut Transaction<'_, Sqlite>,
|
||||
tenant_pubkey: &str,
|
||||
period_start: i64,
|
||||
period_end: i64,
|
||||
tenant: &Tenant,
|
||||
period: &BillingPeriod,
|
||||
) -> Result<Invoice> {
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let invoice_id = uuid::Uuid::new_v4().to_string();
|
||||
@@ -435,9 +429,9 @@ async fn insert_invoice_tx(
|
||||
VALUES (?, ?, 'open', ?, ?, ?, ?) RETURNING *",
|
||||
)
|
||||
.bind(invoice_id)
|
||||
.bind(tenant_pubkey)
|
||||
.bind(period_start)
|
||||
.bind(period_end)
|
||||
.bind(&tenant.pubkey)
|
||||
.bind(&period.start)
|
||||
.bind(period.end)
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.fetch_one(&mut **tx)
|
||||
|
||||
@@ -19,12 +19,7 @@ pub async fn get_tenant_latest_invoice(
|
||||
api.require_admin_or_tenant(&auth, &pubkey)?;
|
||||
let tenant = api.get_tenant_or_404(&pubkey).await?;
|
||||
|
||||
// Roll any outstanding charges (and due renewals) into an invoice, then
|
||||
// return the latest.
|
||||
api.billing
|
||||
.generate_invoice(&tenant)
|
||||
.await
|
||||
.map_err(internal)?;
|
||||
api.billing.reconcile_subscription(&tenant).await.map_err(internal)?;
|
||||
|
||||
let invoice = query::get_latest_invoice_for_tenant(&pubkey).await.map_err(internal)?;
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ export default function PaymentDialog(props: PaymentDialogProps) {
|
||||
const planById = new Map(plans().map((p) => [p.id, p]))
|
||||
return (relays() ?? [])
|
||||
.map((relay) => ({ relay, plan: planById.get(relay.plan) }))
|
||||
.filter((entry) => entry.plan?.amount > 0)
|
||||
.filter((entry) => Boolean(entry.plan?.amount))
|
||||
})
|
||||
|
||||
async function loadBolt11() {
|
||||
|
||||
Reference in New Issue
Block a user