Significant refactor of activity reconciliation

This commit is contained in:
Jon Staab
2026-05-27 14:16:21 -07:00
parent 7a2baf6f82
commit f37bb55286
7 changed files with 488 additions and 212 deletions
+114 -115
View File
@@ -7,69 +7,6 @@ use crate::models::{
RELAY_STATUS_INACTIVE, Relay, Tenant,
};
// --- Activity ---
/// Stamp `billed_at` on activities that were reconciled without producing an
/// invoice (e.g. free-plan or not-yet-prorated changes), so a recovery pass
/// doesn't re-scan them.
pub async fn mark_activities_billed(activity_ids: &[String]) -> Result<()> {
if activity_ids.is_empty() {
return Ok(());
}
let now = chrono::Utc::now().timestamp();
with_tx(async |tx| mark_activities_billed_tx(tx, activity_ids, now).await).await
}
/// Atomically record an `autogenerate_invoice` activity for the tenant, but only
/// if none has been recorded since `since` (the start of the current billing
/// period). Returns whether a new activity was inserted; `false` means the
/// period was already claimed.
///
/// The existence check and insert are a single statement, which SQLite runs
/// atomically, so concurrent pollers (or a restart racing the previous run)
/// can't both claim the same period. On success the activity is broadcast so the
/// billing consumer reconciles it like any other.
pub async fn try_autogenerate_invoice(tenant_pubkey: &str, since: i64) -> Result<bool> {
let id = uuid::Uuid::new_v4().to_string();
let created_at = chrono::Utc::now().timestamp();
let result = sqlx::query(
"INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id)
SELECT ?, ?, ?, 'autogenerate_invoice', 'tenant', ?
WHERE NOT EXISTS (
SELECT 1 FROM activity
WHERE tenant = ?
AND activity_type = 'autogenerate_invoice'
AND created_at >= ?
)",
)
.bind(&id)
.bind(tenant_pubkey)
.bind(created_at)
.bind(tenant_pubkey)
.bind(tenant_pubkey)
.bind(since)
.execute(pool())
.await?;
if result.rows_affected() == 0 {
return Ok(false);
}
publish(Activity {
id,
tenant: tenant_pubkey.to_string(),
created_at,
activity_type: "autogenerate_invoice".to_string(),
resource_type: "tenant".to_string(),
resource_id: tenant_pubkey.to_string(),
billed_at: None,
});
Ok(true)
}
// --- Tenants ---
pub async fn create_tenant(tenant: &Tenant) -> Result<()> {
@@ -84,7 +21,7 @@ pub async fn create_tenant(tenant: &Tenant) -> Result<()> {
.bind(&tenant.stripe_customer_id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "create_tenant", "tenant", &tenant.pubkey).await
insert_activity_tx(tx, "create_tenant", "tenant", &tenant.pubkey, None).await
})
.await?;
publish(activity);
@@ -98,13 +35,22 @@ pub async fn update_tenant(tenant: &Tenant) -> Result<()> {
.bind(&tenant.pubkey)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "update_tenant", "tenant", &tenant.pubkey).await
insert_activity_tx(tx, "update_tenant", "tenant", &tenant.pubkey, None).await
})
.await?;
publish(activity);
Ok(())
}
pub async fn set_tenant_billing_anchor(tenant: &Tenant) -> Result<()> {
sqlx::query("UPDATE tenant SET billing_anchor = ? WHERE pubkey = ?")
.bind(tenant.billing_anchor)
.bind(&tenant.pubkey)
.execute(pool())
.await?;
Ok(())
}
pub async fn clear_tenant_nwc_error(pubkey: &str) -> Result<()> {
sqlx::query("UPDATE tenant SET nwc_error = NULL WHERE pubkey = ?")
.bind(pubkey)
@@ -143,7 +89,7 @@ pub async fn create_relay(relay: &Relay) -> Result<()> {
.bind(relay.push_enabled)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "create_relay", "relay", &relay.id).await
insert_activity_tx(tx, "create_relay", "relay", &relay.id, Some(&relay.plan)).await
})
.await?;
publish(activity);
@@ -179,7 +125,7 @@ pub async fn update_relay(relay: &Relay) -> Result<()> {
.bind(&relay.id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "update_relay", "relay", &relay.id).await
insert_activity_tx(tx, "update_relay", "relay", &relay.id, Some(&relay.plan)).await
})
.await?;
publish(activity);
@@ -206,7 +152,7 @@ async fn set_relay_status(relay_id: &str, status: &str, activity_type: &str) ->
.bind(relay_id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, activity_type, "relay", relay_id).await
insert_activity_tx(tx, activity_type, "relay", relay_id, None).await
})
.await?;
publish(activity);
@@ -220,7 +166,7 @@ pub async fn fail_relay_sync(relay: &Relay, sync_error: String) -> Result<()> {
.bind(&relay.id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "fail_relay_sync", "relay", &relay.id).await
insert_activity_tx(tx, "fail_relay_sync", "relay", &relay.id, None).await
})
.await?;
publish(activity);
@@ -233,44 +179,108 @@ pub async fn complete_relay_sync(relay_id: &str) -> Result<()> {
.bind(relay_id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "complete_relay_sync", "relay", relay_id).await
insert_activity_tx(tx, "complete_relay_sync", "relay", relay_id, None).await
})
.await?;
publish(activity);
Ok(())
}
// --- Invoice items (the outstanding-charge ledger) ---
pub async fn insert_invoice_item_for_activity(invoice_item: &InvoiceItem, activity_id: &str) -> Result<()> {
let now = chrono::Utc::now().timestamp();
with_tx(async |tx| {
insert_invoice_item_tx(tx, invoice_item).await?;
mark_activity_billed_tx(tx, activity_id, now).await?;
Ok(())
})
.await
}
/// Mark an activity billed without a line item — for activities that produce no
/// charge (e.g. free-plan changes), so a recovery pass doesn't re-scan them.
pub async fn mark_activity_billed(activity_id: &str) -> Result<()> {
let now = chrono::Utc::now().timestamp();
with_tx(async |tx| mark_activity_billed_tx(tx, activity_id, now).await).await
}
/// Insert renewal line items, skipping any relay already covered for the item's
/// `period_start`. The per-relay existence check and insert are a single
/// statement, so neither a re-tick nor a relay's own creation/activation charge
/// (which also stamps `period_start`) can bill the same relay-period twice.
pub async fn create_renewal_items(items: &[InvoiceItem]) -> Result<()> {
with_tx(async |tx| {
for item in items {
sqlx::query(
"INSERT INTO invoice_item
(id, invoice_id, activity_id, tenant_pubkey, relay_id, plan, amount, description, created_at, period_start)
SELECT ?, NULL, NULL, ?, ?, ?, ?, ?, ?, ?
WHERE NOT EXISTS (
SELECT 1 FROM invoice_item WHERE relay_id = ? AND period_start = ?
)",
)
.bind(&item.id)
.bind(&item.tenant_pubkey)
.bind(&item.relay_id)
.bind(&item.plan)
.bind(item.amount)
.bind(&item.description)
.bind(item.created_at)
.bind(item.period_start)
.bind(&item.relay_id)
.bind(item.period_start)
.execute(&mut **tx)
.await?;
}
Ok(())
})
.await
}
// --- Invoices ---
/// Create an invoice with its line items, stamp `billed_at` on the activities
/// that produced them, and set the tenant's billing anchor when this is their
/// first invoice — all in one transaction. Returns the inserted invoice.
pub async fn create_invoice(
/// Claim all of a tenant's outstanding items onto a new invoice — but only if
/// they sum to a positive amount. A non-positive balance (net credit or nothing
/// owed) leaves the items outstanding so the credit carries to the next positive
/// invoice. The sum, insert, and claim run in one transaction. Returns the
/// invoice, or `None` when there's nothing to bill.
pub async fn claim_outstanding_into_invoice(
invoice_id: &str,
tenant_pubkey: &str,
period_start: i64,
period_end: i64,
items: &[InvoiceItem],
billed_activity_ids: &[String],
new_billing_anchor: Option<i64>,
) -> Result<Invoice> {
let now = chrono::Utc::now().timestamp();
) -> Result<Option<Invoice>> {
with_tx(async |tx| {
let total = sqlx::query_scalar::<_, i64>(
"SELECT COALESCE(SUM(amount), 0) FROM invoice_item
WHERE tenant_pubkey = ? AND invoice_id IS NULL",
)
.bind(tenant_pubkey)
.fetch_one(&mut **tx)
.await?;
if total <= 0 {
return Ok(None);
}
let invoice =
insert_invoice_tx(tx, invoice_id, tenant_pubkey, period_start, period_end).await?;
for item in items {
insert_invoice_item_tx(tx, item).await?;
}
sqlx::query(
"UPDATE invoice_item SET invoice_id = ?
WHERE tenant_pubkey = ? AND invoice_id IS NULL",
)
.bind(invoice_id)
.bind(tenant_pubkey)
.execute(&mut **tx)
.await?;
mark_activities_billed_tx(tx, billed_activity_ids, now).await?;
if let Some(anchor) = new_billing_anchor {
set_tenant_billing_anchor_tx(tx, tenant_pubkey, anchor).await?;
}
Ok(invoice)
Ok(Some(invoice))
})
.await
}
@@ -285,7 +295,7 @@ pub async fn mark_invoice_paid(invoice_id: &str, method: &str) -> Result<()> {
.bind(invoice_id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "invoice_paid", "invoice", invoice_id).await
insert_activity_tx(tx, "invoice_paid", "invoice", invoice_id, None).await
})
.await?;
publish(activity);
@@ -355,6 +365,7 @@ async fn insert_activity_tx(
activity_type: &str,
resource_type: &str,
resource_id: &str,
plan_id: Option<&str>,
) -> Result<Activity> {
let tenant = match resource_type {
"tenant" => resource_id.to_string(),
@@ -371,8 +382,8 @@ async fn insert_activity_tx(
let created_at = chrono::Utc::now().timestamp();
sqlx::query(
"INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id)
VALUES (?, ?, ?, ?, ?, ?)",
"INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id, plan_id)
VALUES (?, ?, ?, ?, ?, ?, ?)",
)
.bind(&id)
.bind(&tenant)
@@ -380,6 +391,7 @@ async fn insert_activity_tx(
.bind(activity_type)
.bind(resource_type)
.bind(resource_id)
.bind(plan_id)
.execute(&mut **tx)
.await?;
@@ -391,6 +403,7 @@ async fn insert_activity_tx(
resource_type: resource_type.to_string(),
resource_id: resource_id.to_string(),
billed_at: None,
plan_id: plan_id.map(str::to_string),
})
}
@@ -420,8 +433,8 @@ async fn insert_invoice_tx(
async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &InvoiceItem) -> Result<()> {
sqlx::query(
"INSERT INTO invoice_item
(id, invoice_id, activity_id, tenant_pubkey, relay_id, plan, amount, description, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(id, invoice_id, activity_id, tenant_pubkey, relay_id, plan, amount, description, created_at, period_start)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&item.id)
.bind(&item.invoice_id)
@@ -432,34 +445,20 @@ async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &Invoice
.bind(item.amount)
.bind(&item.description)
.bind(item.created_at)
.bind(item.period_start)
.execute(&mut **tx)
.await?;
Ok(())
}
async fn mark_activities_billed_tx(
async fn mark_activity_billed_tx(
tx: &mut Transaction<'_, Sqlite>,
activity_ids: &[String],
activity_id: &str,
billed_at: i64,
) -> Result<()> {
for id in activity_ids {
sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ?")
.bind(billed_at)
.bind(id)
.execute(&mut **tx)
.await?;
}
Ok(())
}
async fn set_tenant_billing_anchor_tx(
tx: &mut Transaction<'_, Sqlite>,
tenant_pubkey: &str,
billing_anchor: i64,
) -> Result<()> {
sqlx::query("UPDATE tenant SET billing_anchor = ? WHERE pubkey = ?")
.bind(billing_anchor)
.bind(tenant_pubkey)
sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ?")
.bind(billed_at)
.bind(activity_id)
.execute(&mut **tx)
.await?;
Ok(())