Update backend implementation to fit spec

This commit is contained in:
Jon Staab
2026-03-25 11:43:09 -07:00
parent 2e0740910c
commit cb2e37c74a
19 changed files with 1798 additions and 2341 deletions
+276 -393
View File
@@ -1,316 +1,263 @@
use anyhow::{Result, anyhow};
use chrono::{DateTime, Months, TimeZone, Utc};
use std::collections::HashMap;
use tokio::time::{Duration, sleep};
use uuid::Uuid;
use crate::models::{Invoice, InvoiceAttempt, InvoiceItem, RelayLifecycleEvent, Tenant};
use crate::notifications::Nip17Notifier;
use anyhow::Result;
use chrono::{DateTime, Datelike, Duration, Months, TimeZone, Utc};
use tokio::sync::Mutex;
use crate::models::{Activity, Invoice, InvoiceItem, Relay, Tenant};
use crate::repo::Repo;
use nostr_sdk::nips::nip47::{self, MakeInvoiceRequest, NostrWalletConnectURI, PayInvoiceRequest};
use nostr_sdk::{Client, Filter, Keys, Kind, Timestamp};
const GRACE_DAYS: i64 = 7;
const DUE_DAYS: i64 = 7;
const WORKER_INTERVAL_SECS: u64 = 300;
// ── service ───────────────────────────────────────────────────────────────────
use crate::robot::Robot;
#[derive(Clone)]
pub struct BillingService {
pub struct Billing {
nwc_url: String,
repo: Repo,
notifier: Nip17Notifier,
platform_nwc_url: String,
robot: Robot,
last_activity_at: std::sync::Arc<Mutex<i64>>,
}
impl BillingService {
pub fn new(repo: Repo, notifier: Nip17Notifier, platform_nwc_url: String) -> Self {
impl Billing {
pub fn new(repo: Repo, robot: Robot) -> Self {
let nwc_url = std::env::var("NWC_URL").unwrap_or_default();
Self {
nwc_url,
repo,
notifier,
platform_nwc_url,
robot,
last_activity_at: std::sync::Arc::new(Mutex::new(0)),
}
}
pub async fn run(self) {
pub async fn start(self) {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600));
loop {
if let Err(err) = self.process_once().await {
tracing::error!(error = %err, "billing run failed");
interval.tick().await;
if let Err(e) = self.tick().await {
tracing::error!(error = %e, "billing tick failed");
}
sleep(Duration::from_secs(WORKER_INTERVAL_SECS)).await;
}
}
async fn process_once(&self) -> Result<()> {
pub async fn tick(&self) -> Result<()> {
let mut since_guard = self.last_activity_at.lock().await;
let since = *since_guard;
let activity = self.repo.list_activity(&since, None).await?;
for a in &activity {
if matches!(a.activity_type.as_str(), "relay_created" | "relay_updated" | "relay_activated") {
self.maybe_reset_anchor_for_first_paid_relay(a).await?;
}
*since_guard = (*since_guard).max(a.created_at);
}
drop(since_guard);
let tenants = self.repo.list_tenants().await?;
for tenant in &tenants {
if let Err(err) = self.generate_invoice_if_due(tenant).await {
tracing::error!(tenant = %tenant.pubkey, error = %err, "invoice generation failed");
}
}
for tenant in &tenants {
if let Err(err) = self.collect_outstanding(tenant).await {
tracing::error!(tenant = %tenant.pubkey, error = %err, "collection failed");
}
self.generate_invoice_if_due(tenant).await?;
self.collect_outstanding(tenant).await?;
}
Ok(())
}
// ── invoice generation ────────────────────────────────────────────────────
async fn generate_invoice_if_due(&self, tenant: &Tenant) -> Result<()> {
if tenant.status != "active" {
async fn maybe_reset_anchor_for_first_paid_relay(&self, activity: &Activity) -> Result<()> {
let relay = match self.repo.get_relay(&activity.identifier).await? {
Some(r) => r,
None => return Ok(()),
};
if relay.plan == "free" {
return Ok(());
}
let anchor = ts_to_dt(tenant.billing_anchor_at)?;
let now = Utc::now();
let (period_start, period_end) = current_billing_period(anchor, now);
let relays = self.repo.list_relays(Some(&relay.tenant)).await?;
let paid_active_count = relays
.into_iter()
.filter(|r| r.status == "active" && r.plan != "free")
.count() as i64;
// Only generate once the period has closed
if paid_active_count == 1 {
self.repo
.update_tenant_billing_anchor(&relay.tenant, activity.created_at)
.await?;
}
Ok(())
}
async fn generate_invoice_if_due(&self, tenant: &Tenant) -> Result<()> {
if self.repo.total_pending_invoices_for_tenant(&tenant.pubkey).await? > 0 {
return Ok(());
}
let relays = self.repo.list_relays(Some(&tenant.pubkey)).await?;
let active_paid_relays: Vec<Relay> = relays
.iter()
.filter(|r| r.status == "active" && r.plan != "free")
.cloned()
.collect();
if active_paid_relays.is_empty() {
return Ok(());
}
let now = Utc::now();
let anchor = ts_to_dt(tenant.billing_anchor)?;
let (period_start, period_end) = billing_window(anchor, now);
if now < period_end {
return Ok(());
}
let plans = self.repo.list_plans().await?;
let plan_amount_map: HashMap<String, i64> =
plans.into_iter().map(|p| (p.id, p.sats_per_month)).collect();
let usage_events = self.repo.list_activity(&tenant.billing_anchor, Some(&tenant.pubkey)).await?;
let invoice_id = uuid::Uuid::new_v4().to_string();
let mut items = Vec::new();
let events = self
.repo
.list_lifecycle_events_for_tenant(&tenant.pubkey, dt_to_ts(period_end))
.await?;
for relay in active_paid_relays {
let hours = relay_active_hours_in_window(&relay, &usage_events, period_start, period_end);
if hours <= 0 {
continue;
}
let plan_monthly = self.repo.get_relay_plan_amount_sats(&relay.plan).await?;
if plan_monthly <= 0 {
continue;
}
let invoice_id = Uuid::new_v4().to_string();
let items = compute_invoice_items(
&invoice_id,
&events,
&plan_amount_map,
period_start,
period_end,
);
let sats = ((plan_monthly as f64 / 30.0 / 24.0) * hours as f64).ceil() as i64;
if sats <= 0 {
continue;
}
let total: i64 = items.iter().map(|i| i.amount).sum();
items.push(InvoiceItem {
id: uuid::Uuid::new_v4().to_string(),
invoice: invoice_id.clone(),
relay: relay.id,
sats,
});
}
let total: i64 = items.iter().map(|i| i.sats).sum();
if total == 0 {
return Ok(());
}
let bolt11 = self.make_bolt11(total).await.unwrap_or_default();
let invoice = Invoice {
id: invoice_id.clone(),
tenant: tenant.pubkey.clone(),
amount: total,
status: "pending".to_string(),
created_at: dt_to_ts(now),
bolt11,
period_start: dt_to_ts(period_start),
period_end: dt_to_ts(period_end),
};
let created = self
.repo
.create_invoice_with_items(&invoice, &items)
.await?;
if created {
tracing::info!(tenant = %tenant.pubkey, invoice = %invoice_id, amount = total, "invoice generated");
}
Ok(())
}
// ── collection ────────────────────────────────────────────────────────────
async fn collect_outstanding(&self, tenant: &Tenant) -> Result<()> {
let invoices = self.repo.list_invoices_by_tenant(&tenant.pubkey).await?;
let unpaid: Vec<&Invoice> = invoices
.iter()
.filter(|inv| matches!(inv.status.as_str(), "pending" | "past_due"))
.collect();
if unpaid.is_empty() {
return Ok(());
}
for invoice in &unpaid {
self.attempt_collection(tenant, invoice).await?;
}
// Re-fetch to check if all are now paid; auto-reactivate if so
let invoices_after = self.repo.list_invoices_by_tenant(&tenant.pubkey).await?;
let still_unpaid = invoices_after
.iter()
.any(|inv| matches!(inv.status.as_str(), "pending" | "past_due"));
if !still_unpaid && tenant.status == "suspended" {
let now = now_ts();
self.repo
.update_tenant_status(&tenant.pubkey, "active")
.await?;
self.repo
.reactivate_relays_for_tenant(&tenant.pubkey, now)
.await?;
tracing::info!(tenant = %tenant.pubkey, "tenant reactivated after full balance payment");
}
Ok(())
}
async fn attempt_collection(&self, tenant: &Tenant, invoice: &Invoice) -> Result<()> {
let now = Utc::now();
let created_at = ts_to_dt(invoice.created_at)?;
let due_at = created_at + chrono::Duration::days(DUE_DAYS);
let grace_ends_at = due_at + chrono::Duration::days(GRACE_DAYS);
// Deactivate after grace period expires
if now > grace_ends_at && invoice.status != "past_due" {
let ts = now_ts();
self.repo
.update_tenant_status(&tenant.pubkey, "suspended")
.await?;
self.repo
.suspend_relays_for_tenant(&tenant.pubkey, ts)
.await?;
self.repo
.record_attempt(
&InvoiceAttempt {
id: Uuid::new_v4().to_string(),
invoice: invoice.id.clone(),
run_id: Uuid::new_v4().to_string(),
method: "system".to_string(),
outcome: "failed".to_string(),
error: "grace period expired".to_string(),
created_at: ts,
},
"past_due",
)
.await?;
return Ok(());
}
// Only retry once per 24h
let attempts = self.repo.list_attempts_for_invoice(&invoice.id).await?;
if let Some(last) = attempts.last()
&& last.method != "nip17_dm"
{
let last_at = ts_to_dt(last.created_at)?;
if now - last_at < chrono::Duration::hours(24) {
let bolt11 = match self.make_bolt11(total).await {
Ok(v) => v,
Err(e) => {
tracing::error!(tenant = %tenant.pubkey, error = %e, "bolt11 generation failed");
return Ok(());
}
}
};
let run_id = Uuid::new_v4().to_string();
let invoice = Invoice {
id: invoice_id,
tenant: tenant.pubkey.clone(),
status: "pending".to_string(),
created_at: now.timestamp(),
attempted_at: 0,
error: String::new(),
closed_at: 0,
sent_at: 0,
paid_at: 0,
bolt11,
period_start: period_start.timestamp(),
period_end: period_end.timestamp(),
};
// 1. Try NWC
if !tenant.nwc_url.trim().is_empty() {
match self
.pay_via_nwc(&tenant.nwc_url, &invoice.bolt11)
.await
{
Ok(()) => {
self.repo
.record_attempt(
&attempt(&invoice.id, &run_id, "nwc", "success", ""),
"paid",
)
.await?;
return Ok(());
}
Err(err) => {
tracing::warn!(tenant = %tenant.pubkey, error = %err, "NWC payment failed");
self.repo
.record_attempt(
&attempt(&invoice.id, &run_id, "nwc", "failed", &err.to_string()),
&invoice.status,
)
.await?;
self.repo.create_invoice(&invoice, &items).await?;
Ok(())
}
async fn collect_outstanding(&self, tenant: &Tenant) -> Result<()> {
let invoices = self.repo.list_invoices(Some(&tenant.pubkey)).await?;
let now = now_ts();
for invoice in invoices.into_iter().filter(|i| i.status == "pending") {
if invoice.attempted_at > 0 && now - invoice.attempted_at < 24 * 3600 {
continue;
}
if self.is_bolt11_paid(&invoice.bolt11).await {
self.repo.mark_invoice_paid(&invoice.id).await?;
continue;
}
let mut collected = false;
if !tenant.nwc_url.trim().is_empty() && self.pay_invoice_nwc(&tenant.nwc_url, &invoice.bolt11).await {
self.repo.mark_invoice_paid(&invoice.id).await?;
collected = true;
}
if !collected {
self.repo
.mark_invoice_attempted(&invoice.id, Some("autopay failed or unavailable"))
.await?;
if invoice.sent_at == 0 {
let amount: i64 = self
.repo
.get_invoice_items(&invoice.id)
.await?
.into_iter()
.map(|i| i.sats)
.sum();
let message = format!(
"Invoice {} is due. Amount: {} sats\n{}",
invoice.id, amount, invoice.bolt11
);
if self.robot.send_dm(&tenant.pubkey, &message).await.is_ok() {
self.repo.mark_invoice_sent(&invoice.id).await?;
}
}
}
}
// 2. Try Stripe
if !tenant.stripe_subscription_id.trim().is_empty() {
match self.pay_via_stripe(tenant, invoice).await {
Ok(()) => {
self.repo
.record_attempt(
&attempt(&invoice.id, &run_id, "stripe", "success", ""),
"paid",
)
.await?;
return Ok(());
}
Err(err) => {
tracing::warn!(tenant = %tenant.pubkey, error = %err, "Stripe payment failed");
self.repo
.record_attempt(
&attempt(&invoice.id, &run_id, "stripe", "failed", &err.to_string()),
&invoice.status,
)
.await?;
}
}
}
// 3. Fallback: Lightning invoice shown in-app; send one DM if no auto-pay configured
let dm_sent = self.repo.invoice_dm_sent(&invoice.id).await?;
if !dm_sent {
match self
.send_invoice_dm(tenant, invoice, invoice.bolt11.as_str())
.await
{
Ok(()) => {
self.repo
.record_attempt(
&attempt(&invoice.id, &run_id, "nip17_dm", "sent", ""),
&invoice.status,
)
.await?;
}
Err(err) => {
tracing::warn!(tenant = %tenant.pubkey, error = %err, "NIP-17 DM failed");
}
if now - invoice.created_at >= 7 * 24 * 3600 {
self.repo.mark_invoice_closed(&invoice.id).await?;
}
}
Ok(())
}
// ── payment providers ─────────────────────────────────────────────────────
async fn make_bolt11(&self, amount_sats: i64) -> Result<String> {
if self.platform_nwc_url.trim().is_empty() {
return Err(anyhow!("NWC_URL is required to generate invoices"));
if self.nwc_url.trim().is_empty() {
anyhow::bail!("NWC_URL not configured")
}
let uri = NostrWalletConnectURI::parse(&self.platform_nwc_url)?;
let req = nip47::Request::make_invoice(MakeInvoiceRequest {
amount: (amount_sats as u64) * 1_000,
description: Some("Relay hosting".to_string()),
description_hash: None,
expiry: None,
});
let uri = nostr_sdk::nips::nip47::NostrWalletConnectURI::parse(&self.nwc_url)?;
let req = nostr_sdk::nips::nip47::Request::make_invoice(
nostr_sdk::nips::nip47::MakeInvoiceRequest {
amount: (amount_sats as u64) * 1_000,
description: Some("Caravel relay invoice".to_string()),
description_hash: None,
expiry: None,
},
);
let resp = self.send_nwc_request(&uri, req).await?;
Ok(resp.to_make_invoice()?.invoice)
}
async fn pay_via_nwc(&self, nwc_url: &str, bolt11: &str) -> Result<()> {
let uri = NostrWalletConnectURI::parse(nwc_url)?;
let req = nip47::Request::pay_invoice(PayInvoiceRequest::new(bolt11));
self.send_nwc_request(&uri, req).await?.to_pay_invoice()?;
Ok(())
async fn is_bolt11_paid(&self, _bolt11: &str) -> bool {
false
}
async fn pay_via_stripe(&self, _tenant: &Tenant, _invoice: &Invoice) -> Result<()> {
// TODO: implement Stripe off-session charge using tenant.stripe_subscription_id
Err(anyhow!("Stripe not yet implemented"))
async fn pay_invoice_nwc(&self, nwc_url: &str, bolt11: &str) -> bool {
let uri = match nostr_sdk::nips::nip47::NostrWalletConnectURI::parse(nwc_url) {
Ok(v) => v,
Err(_) => return false,
};
let req = nostr_sdk::nips::nip47::Request::pay_invoice(
nostr_sdk::nips::nip47::PayInvoiceRequest::new(bolt11),
);
self.send_nwc_request(&uri, req)
.await
.and_then(|r| r.to_pay_invoice().map(|_| ()).map_err(anyhow::Error::from))
.is_ok()
}
async fn send_nwc_request(
&self,
uri: &NostrWalletConnectURI,
request: nip47::Request,
) -> Result<nip47::Response> {
uri: &nostr_sdk::nips::nip47::NostrWalletConnectURI,
request: nostr_sdk::nips::nip47::Request,
) -> Result<nostr_sdk::nips::nip47::Response> {
use nostr_sdk::{Client, Filter, Kind, Keys, Timestamp};
let app_keys = Keys::new(uri.secret.clone());
let app_pubkey = app_keys.public_key();
let client = Client::new(app_keys);
@@ -327,183 +274,119 @@ impl BillingService {
.pubkey(app_pubkey)
.since(started_at);
let events = client.fetch_events(filter, Duration::from_secs(10)).await?;
let events = client
.fetch_events(filter, std::time::Duration::from_secs(10))
.await?;
let event = events
.into_iter()
.max_by_key(|e| e.created_at)
.ok_or_else(|| anyhow!("no NWC response received"))?;
.ok_or_else(|| anyhow::anyhow!("no NWC response received"))?;
Ok(nip47::Response::from_event(uri, &event)?)
}
async fn send_invoice_dm(
&self,
tenant: &Tenant,
invoice: &Invoice,
bolt11: &str,
) -> Result<()> {
let due_date = ts_to_dt(invoice.created_at + DUE_DAYS * 86400)?;
let period_start = ts_to_dt(invoice.period_start)?;
let period_end = ts_to_dt(invoice.period_end)?;
let message = format!(
"You have an outstanding invoice of {} sats due by {}.\n\
Period: {}{}\n\
Pay with Lightning:\n{}",
invoice.amount,
due_date.format("%Y-%m-%d"),
period_start.format("%Y-%m-%d"),
period_end.format("%Y-%m-%d"),
bolt11,
);
self.notifier.send(&tenant.pubkey, &message).await
Ok(nostr_sdk::nips::nip47::Response::from_event(uri, &event)?)
}
}
// ── billing math ──────────────────────────────────────────────────────────────
fn now_ts() -> i64 {
Utc::now().timestamp()
}
/// Given a billing anchor and the current time, return the current billing
/// period [start, end) based on rolling monthly windows from the anchor.
fn current_billing_period(
anchor: DateTime<Utc>,
now: DateTime<Utc>,
) -> (DateTime<Utc>, DateTime<Utc>) {
let mut period_start = anchor;
fn ts_to_dt(ts: i64) -> Result<DateTime<Utc>> {
Utc.timestamp_opt(ts, 0)
.single()
.ok_or_else(|| anyhow::anyhow!("invalid unix timestamp"))
}
fn billing_window(anchor: DateTime<Utc>, now: DateTime<Utc>) -> (DateTime<Utc>, DateTime<Utc>) {
let mut start = anchor;
loop {
let period_end = period_start + Months::new(1);
if now < period_end {
return (period_start, period_end);
let end = start + Months::new(1);
if now < end {
return (start, end);
}
period_start = period_end;
start = end;
}
}
/// Compute per-relay billable sats for a billing period from the lifecycle
/// event log. Rules:
/// - Billing starts at `provisioned`, pauses at `suspended`, resumes at
/// `unsuspended`, stops at `deactivated`.
/// - Only time within [period_start, period_end) counts.
/// - Round each relay's total billable seconds up to the next full hour.
/// - Minimum 1 billable hour per relay per period.
/// - Amount is based on the relay's current plan amount (retroactive within period).
fn compute_invoice_items(
invoice_id: &str,
events: &[RelayLifecycleEvent],
plan_amount_map: &HashMap<String, i64>,
period_start: DateTime<Utc>,
period_end: DateTime<Utc>,
) -> Vec<InvoiceItem> {
// Group events by relay, preserving sort order from the DB (relay, created_at, id)
let mut by_relay: HashMap<&str, Vec<&RelayLifecycleEvent>> = HashMap::new();
for event in events {
by_relay.entry(&event.relay).or_default().push(event);
}
let mut items = Vec::new();
for (relay_id, relay_events) in &by_relay {
// Use the latest plan for this relay (retroactive rate within period)
let Some(latest_event) = relay_events.last() else {
continue;
};
let plan_amount = *plan_amount_map.get(latest_event.plan.as_str()).unwrap_or(&0);
if plan_amount == 0 {
continue;
}
let billable_secs = billable_seconds_in_period(relay_events, period_start, period_end);
if billable_secs == 0 {
continue;
}
// Round up to next full hour, minimum 1 hour
let hours = ((billable_secs as f64) / 3600.0).ceil().max(1.0) as i64;
items.push(InvoiceItem {
id: Uuid::new_v4().to_string(),
invoice: invoice_id.to_string(),
relay: relay_id.to_string(),
amount: hours * plan_amount,
period_start: dt_to_ts(period_start),
period_end: dt_to_ts(period_end),
});
}
items
}
/// Compute total billable seconds for one relay within [period_start, period_end).
/// Replays the full event history to correctly handle events that precede the period.
fn billable_seconds_in_period(
events: &[&RelayLifecycleEvent],
period_start: DateTime<Utc>,
period_end: DateTime<Utc>,
fn relay_active_hours_in_window(
relay: &Relay,
events: &[Activity],
start: DateTime<Utc>,
end: DateTime<Utc>,
) -> i64 {
let mut total_secs: i64 = 0;
let mut billing_start: Option<DateTime<Utc>> = None;
if relay.plan == "free" {
return 0;
}
let mut marks: HashMap<&str, Vec<&Activity>> = HashMap::new();
for event in events {
let Ok(ts) = ts_to_dt(event.created_at) else {
continue;
};
if event.identifier == relay.id {
marks.entry(&relay.id).or_default().push(event);
}
}
match event.event_type.as_str() {
"provisioned" | "unsuspended" => {
if billing_start.is_none() {
billing_start = Some(ts.max(period_start));
let Some(entries) = marks.get(relay.id.as_str()) else {
if relay.status == "active" {
return ((end - start).num_seconds() as f64 / 3600.0).ceil() as i64;
}
return 0;
};
let mut active = relay.status == "active";
let mut cursor = start;
let mut secs = 0i64;
for event in entries.iter().copied() {
let ts = match Utc.timestamp_opt(event.created_at, 0).single() {
Some(v) => v,
None => continue,
};
if ts <= start || ts >= end {
continue;
}
match event.activity_type.as_str() {
"relay_created" | "relay_activated" => {
if !active {
active = true;
cursor = ts;
}
}
"suspended" | "deactivated" => {
if let Some(start) = billing_start.take() {
let end = ts.min(period_end);
if end > start {
total_secs += (end - start).num_seconds();
}
"relay_deactivated" | "relay_sync_failed" => {
if active {
active = false;
secs += (ts - cursor).num_seconds().max(0);
}
}
_ => {}
}
}
// Still billing at period end
if let Some(start) = billing_start
&& period_end > start
{
total_secs += (period_end - start).num_seconds();
if active {
secs += (end - cursor).num_seconds().max(0);
}
total_secs
let hours = (secs as f64 / 3600.0).ceil() as i64;
if hours > 0 { hours } else { 0 }
}
// ── helpers ───────────────────────────────────────────────────────────────────
pub fn now_ts() -> i64 {
Utc::now().timestamp()
#[allow(dead_code)]
fn _same_month(a: DateTime<Utc>, b: DateTime<Utc>) -> bool {
a.year() == b.year() && a.month() == b.month()
}
fn dt_to_ts(dt: DateTime<Utc>) -> i64 {
dt.timestamp()
#[allow(dead_code)]
fn _days_between(a: i64, b: i64) -> i64 {
let da = Utc.timestamp_opt(a, 0).single().unwrap_or_else(Utc::now);
let db = Utc.timestamp_opt(b, 0).single().unwrap_or_else(Utc::now);
(db - da).num_days()
}
fn ts_to_dt(ts: i64) -> Result<DateTime<Utc>> {
Utc.timestamp_opt(ts, 0)
.single()
.ok_or_else(|| anyhow!("invalid unix timestamp: {ts}"))
#[allow(dead_code)]
fn _hours_between(a: DateTime<Utc>, b: DateTime<Utc>) -> i64 {
((b - a).num_seconds() as f64 / 3600.0).ceil() as i64
}
fn attempt(
invoice_id: &str,
run_id: &str,
method: &str,
outcome: &str,
error: &str,
) -> InvoiceAttempt {
InvoiceAttempt {
id: Uuid::new_v4().to_string(),
invoice: invoice_id.to_string(),
run_id: run_id.to_string(),
method: method.to_string(),
outcome: outcome.to_string(),
error: error.to_string(),
created_at: now_ts(),
}
#[allow(dead_code)]
fn _next_day(dt: DateTime<Utc>) -> DateTime<Utc> {
dt + Duration::days(1)
}