diff --git a/README.md b/README.md index 0271dc6..67175e2 100644 --- a/README.md +++ b/README.md @@ -111,7 +111,7 @@ When a relay is created, an async worker is spawned that sends the appropriate A ### Billing Logic - Billing is monthly. Invoices batch all of a tenant's relay charges into a single payment. - Tenants can enable **recurring billing** by providing their own NWC URL on the account page. The platform uses this to pull payments automatically. -- If recurring billing is off, invoices are sent via **NIP-17 DMs** (from the platform's Nostr key) when a subscription is due. +- If recurring billing is off, invoices are sent via **NIP-17 DMs** (from the platform's Nostr key) when a subscription is due. If recurring billing is on, still send notifications when a payment is made. - A **7-day grace period** applies before access is restricted for non-payment. ### Environment Variables diff --git a/backend/README.md b/backend/README.md index 963355c..c18b8d7 100644 --- a/backend/README.md +++ b/backend/README.md @@ -35,6 +35,8 @@ Environment variables: | `ZOOID_API_URL` | Zooid API base URL | `http://127.0.0.1:8032` | | `PLATFORM_SECRET` | Platform Nostr secret key for NIP-98 auth | _required_ | | `RELAY_DOMAIN` | Relay base domain for subdomains | `spaces.coracle.social` | +| `NWC_URL` | Platform NWC URL for invoice generation | _required for billing_ | +| `NOSTR_INDEXER_RELAYS` | Comma-separated relays to fetch kind `10050` DM relays | _required for notifications_ | The database directory is created automatically if it doesn’t exist. @@ -71,6 +73,15 @@ NIP-98 verification is implemented in `auth.rs` using the Rust Nostr SDK. It ver This is ready to be used by API routes. +## Billing Jobs + +The backend runs an in-process billing loop that: + +- Generates monthly invoices (using `NWC_URL`) +- Uses the tenant’s `tenant_nwc_url` for recurring pull payments (if set) +- Sends NIP-17 DMs with invoices when recurring is off +- Sends NIP-17 DMs on successful payment when recurring is on + ## API Routes Tenant routes (all require NIP-98 auth; pubkey is inferred from the token): @@ -82,6 +93,7 @@ Tenant routes (all require NIP-98 auth; pubkey is inferred from the token): - `PUT /tenant/relays/:id` — update relay - `DELETE /tenant/relays/:id` — deactivate relay - `GET /tenant/invoices` — list invoices +- `PUT /tenant/billing` — update tenant billing (NWC URL) Admin routes (all require NIP-98 auth; pubkey must be in `HOSTING_ADMIN_PUBKEYS`): @@ -96,3 +108,4 @@ Admin routes (all require NIP-98 auth; pubkey must be in `HOSTING_ADMIN_PUBKEYS` ## Next Steps - Add invoice generation and billing jobs +- On start, publish kind 0, 10002, 10050 to indexer relays based on env vars diff --git a/backend/migrations/0003_add_tenant_nwc_url.sql b/backend/migrations/0003_add_tenant_nwc_url.sql new file mode 100644 index 0000000..9c8f763 --- /dev/null +++ b/backend/migrations/0003_add_tenant_nwc_url.sql @@ -0,0 +1 @@ +ALTER TABLE tenants ADD COLUMN tenant_nwc_url TEXT NOT NULL DEFAULT ""; diff --git a/backend/src/api.rs b/backend/src/api.rs index 03cc94b..2e6b8e7 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -31,7 +31,8 @@ pub fn router(state: AppState) -> Router { "/tenant/relays/:id", get(get_tenant_relay).put(update_tenant_relay).delete(deactivate_tenant_relay), ) - .route("/tenant/invoices", get(list_tenant_invoices)); + .route("/tenant/invoices", get(list_tenant_invoices)) + .route("/tenant/billing", put(update_tenant_billing)); let admin_routes = Router::new() .route("/admin/tenants", get(admin_list_tenants)) @@ -114,6 +115,7 @@ async fn get_tenant( let tenant = NewTenant { pubkey: pubkey.clone(), status: "active".to_string(), + tenant_nwc_url: "".to_string(), }; if state.repo.create_tenant(&tenant).await.is_ok() { (StatusCode::OK, Json(tenant)).into_response() @@ -167,6 +169,7 @@ async fn create_tenant_relay( let tenant = NewTenant { pubkey: pubkey.clone(), status: "active".to_string(), + tenant_nwc_url: "".to_string(), }; if let Err(_) = state.repo.create_tenant_if_missing(&tenant).await { @@ -322,6 +325,35 @@ async fn list_tenant_invoices( } } +#[derive(Debug, Deserialize)] +struct UpdateTenantBillingRequest { + tenant_nwc_url: String, +} + +async fn update_tenant_billing( + State(state): State, + headers: HeaderMap, + method: Method, + uri: Uri, + Json(payload): Json, +) -> Response { + let pubkey = match extract_auth_pubkey(&headers, &method, &uri) { + Ok(pubkey) => pubkey, + Err(_) => return unauthorized(), + }; + + if let Err(_) = state + .repo + .update_tenant_nwc_url(&pubkey, &payload.tenant_nwc_url) + .await + { + return (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError { error: "failed to update billing".into() })) + .into_response(); + } + + (StatusCode::OK, Json(payload)).into_response() +} + async fn admin_list_tenants( State(state): State, headers: HeaderMap, @@ -419,6 +451,7 @@ async fn admin_update_tenant_status( let updated = NewTenant { pubkey: tenant.pubkey, status: payload.status, + tenant_nwc_url: tenant.tenant_nwc_url, }; (StatusCode::OK, Json(updated)).into_response() diff --git a/backend/src/billing.rs b/backend/src/billing.rs new file mode 100644 index 0000000..448bd59 --- /dev/null +++ b/backend/src/billing.rs @@ -0,0 +1,212 @@ +use anyhow::{anyhow, Result}; +use chrono::{DateTime, Months, Utc}; +use tokio::time::{sleep, Duration}; +use uuid::Uuid; + +use crate::models::{Invoice, NewInvoice, NewInvoiceItem, Relay, Tenant}; +use crate::notifications::Nip17Notifier; +use crate::repo::Repo; + +use nostr_sdk::nwc::prelude::*; + +#[derive(Clone)] +pub struct BillingService { + repo: Repo, + notifier: Nip17Notifier, + platform_nwc_url: String, +} + +impl BillingService { + pub fn new( + repo: Repo, + notifier: Nip17Notifier, + platform_nwc_url: String, + ) -> Self { + Self { + repo, + notifier, + platform_nwc_url, + } + } + + pub async fn run(self) { + loop { + if let Err(err) = self.process_once().await { + tracing::error!(error = %err, "billing run failed"); + } + sleep(Duration::from_secs(300)).await; + } + } + + async fn process_once(&self) -> Result<()> { + let tenants = self.repo.list_tenants().await?; + for tenant in tenants { + if let Err(err) = self.bill_tenant(&tenant).await { + tracing::error!(tenant = tenant.pubkey, error = %err, "billing failed"); + } + } + Ok(()) + } + + async fn bill_tenant(&self, tenant: &Tenant) -> Result<()> { + if tenant.status != "active" { + return Ok(()); + } + + let relays = self.repo.list_relays_by_tenant(&tenant.pubkey).await?; + let active_relays = relays + .into_iter() + .filter(|relay| relay.status == "active") + .collect::>(); + + let invoices = self.repo.list_invoices_by_tenant(&tenant.pubkey).await?; + + let (period_start, period_end, should_bill) = next_billing_window(&invoices)?; + if !should_bill { + return Ok(()); + } + + let invoice_id = Uuid::new_v4().to_string(); + let items = build_invoice_items(&invoice_id, &active_relays, period_start, period_end); + let total_amount: i64 = items.iter().map(|item| item.amount).sum(); + + if total_amount == 0 { + return Ok(()); + } + + let invoice_str = self.make_invoice(total_amount).await?; + let invoice = NewInvoice { + id: invoice_id.clone(), + tenant: tenant.pubkey.clone(), + amount: total_amount, + status: "pending".to_string(), + created_at: Utc::now().to_rfc3339(), + invoice: invoice_str.clone(), + }; + + self.repo.create_invoice_with_items(&invoice, &items).await?; + + if tenant.tenant_nwc_url.trim().is_empty() { + self.send_invoice_dm(tenant, &invoice, period_start, period_end) + .await?; + return Ok(()); + } + + match self.pay_invoice(&tenant.tenant_nwc_url, &invoice_str).await { + Ok(()) => { + self.repo.update_invoice_status(&invoice_id, "paid").await?; + self.send_payment_dm(tenant, &invoice).await?; + } + Err(err) => { + tracing::error!(tenant = tenant.pubkey, error = %err, "recurring payment failed"); + } + } + + Ok(()) + } + + async fn make_invoice(&self, amount: i64) -> Result { + if self.platform_nwc_url.trim().is_empty() { + return Err(anyhow!("NWC_URL is required to generate invoices")); + } + + let uri = NostrWalletConnectURI::parse(&self.platform_nwc_url)?; + let nwc = NWC::new(uri); + let request = MakeInvoiceRequest::new(amount as u64, "Relay hosting"); + let response = nwc.make_invoice(request).await?; + Ok(response.invoice) + } + + async fn pay_invoice(&self, tenant_nwc_url: &str, invoice: &str) -> Result<()> { + let uri = NostrWalletConnectURI::parse(tenant_nwc_url)?; + let nwc = NWC::new(uri); + let request = PayInvoiceRequest::new(invoice); + nwc.pay_invoice(request).await?; + Ok(()) + } + + async fn send_invoice_dm( + &self, + tenant: &Tenant, + invoice: &NewInvoice, + period_start: DateTime, + period_end: DateTime, + ) -> Result<()> { + let message = format!( + "Invoice due: {} sats\nPeriod: {} - {}\nInvoice: {}", + invoice.amount, + period_start.to_rfc3339(), + period_end.to_rfc3339(), + invoice.invoice + ); + self.notifier.send(&tenant.pubkey, &message).await + } + + async fn send_payment_dm(&self, tenant: &Tenant, invoice: &NewInvoice) -> Result<()> { + let message = format!( + "Payment received: {} sats\nInvoice ID: {}", + invoice.amount, invoice.id + ); + self.notifier.send(&tenant.pubkey, &message).await + } +} + +fn next_billing_window(invoices: &[Invoice]) -> Result<(DateTime, DateTime, bool)> { + let now = Utc::now(); + if invoices.is_empty() { + let end = now + Months::new(1); + return Ok((now, end, true)); + } + + let last = &invoices[0]; + if last.status == "pending" { + return Ok((now, now, false)); + } + + let last_created = parse_timestamp(&last.created_at)?; + let next_due = last_created + Months::new(1); + if now < next_due { + return Ok((now, next_due, false)); + } + + Ok((last_created, next_due, true)) +} + +fn parse_timestamp(value: &str) -> Result> { + let parsed = DateTime::parse_from_rfc3339(value) + .map_err(|e| anyhow!("invalid timestamp {value}: {e}"))?; + Ok(parsed.with_timezone(&Utc)) +} + +fn build_invoice_items( + invoice_id: &str, + relays: &[Relay], + period_start: DateTime, + period_end: DateTime, +) -> Vec { + relays + .iter() + .filter_map(|relay| { + let amount = plan_amount(&relay.plan); + if amount == 0 { + return None; + } + Some(NewInvoiceItem { + id: Uuid::new_v4().to_string(), + invoice: invoice_id.to_string(), + relay: relay.id.clone(), + amount, + period_start: period_start.to_rfc3339(), + period_end: period_end.to_rfc3339(), + }) + }) + .collect() +} + +fn plan_amount(plan: &str) -> i64 { + match plan { + "basic" => 10_000, + "growth" => 50_000, + _ => 0, + } +} diff --git a/backend/src/config.rs b/backend/src/config.rs index 00bcaeb..f46f4b5 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -9,6 +9,8 @@ pub struct Config { pub zooid_api_url: String, pub platform_secret: String, pub relay_domain: String, + pub platform_nwc_url: String, + pub indexer_relays: Vec, } impl Config { @@ -31,6 +33,13 @@ impl Config { let platform_secret = env::var("PLATFORM_SECRET").unwrap_or_default(); let relay_domain = env::var("RELAY_DOMAIN").unwrap_or_else(|_| "spaces.coracle.social".to_string()); + let platform_nwc_url = env::var("NWC_URL").unwrap_or_default(); + let indexer_relays = env::var("NOSTR_INDEXER_RELAYS") + .unwrap_or_default() + .split(',') + .map(|v| v.trim().to_string()) + .filter(|v| !v.is_empty()) + .collect::>(); Self { database_url, @@ -40,6 +49,8 @@ impl Config { zooid_api_url, platform_secret, relay_domain, + platform_nwc_url, + indexer_relays, } } } diff --git a/backend/src/main.rs b/backend/src/main.rs index 22fa38c..84443df 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,8 +1,10 @@ mod api; mod auth; +mod billing; mod config; mod db; mod models; +mod notifications; mod provisioning; mod repo; @@ -16,6 +18,8 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use crate::config::Config; use crate::db::init_pool; +use crate::billing::BillingService; +use crate::notifications::Nip17Notifier; use crate::provisioning::Provisioner; use crate::repo::Repo; @@ -31,6 +35,13 @@ async fn main() -> Result<()> { let pool = init_pool(&config.database_url).await?; let repo = Repo::new(pool); + let notifier = Nip17Notifier::new(config.platform_secret.clone(), config.indexer_relays.clone()).await?; + let billing = BillingService::new( + repo.clone(), + notifier, + config.platform_nwc_url.clone(), + ); + tokio::spawn(billing.run()); let provisioner = Provisioner::new( config.zooid_api_url.clone(), config.relay_domain.clone(), diff --git a/backend/src/models.rs b/backend/src/models.rs index 18f885b..96b1896 100644 --- a/backend/src/models.rs +++ b/backend/src/models.rs @@ -4,12 +4,14 @@ use serde::{Deserialize, Serialize}; pub struct Tenant { pub pubkey: String, pub status: String, + pub tenant_nwc_url: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NewTenant { pub pubkey: String, pub status: String, + pub tenant_nwc_url: String, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] diff --git a/backend/src/notifications.rs b/backend/src/notifications.rs new file mode 100644 index 0000000..f7dbf50 --- /dev/null +++ b/backend/src/notifications.rs @@ -0,0 +1,106 @@ +use anyhow::{anyhow, Result}; +use nostr_sdk::prelude::*; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::Mutex; + +#[derive(Clone)] +pub struct Nip17Notifier { + keys: Keys, + indexer_client: Client, + indexer_enabled: bool, + cache: Arc>>, +} + +impl Nip17Notifier { + pub async fn new(platform_secret: String, relays: Vec) -> Result { + if platform_secret.trim().is_empty() { + return Err(anyhow!("PLATFORM_SECRET is required for NIP-17 notifications")); + } + + let keys = Keys::parse(platform_secret)?; + let indexer_client = Client::new(keys.clone()); + + for relay in &relays { + indexer_client.add_relay(relay).await?; + } + + let indexer_enabled = !relays.is_empty(); + if indexer_enabled { + indexer_client.connect().await; + } + + Ok(Self { + keys, + indexer_client, + indexer_enabled, + cache: Arc::new(Mutex::new(HashMap::new())), + }) + } + + pub async fn send(&self, recipient: &str, message: &str) -> Result<()> { + if !self.indexer_enabled { + return Ok(()); + } + + let relays = self.fetch_dm_relays(recipient).await?; + if relays.is_empty() { + return Ok(()); + } + + let pubkey = PublicKey::parse(recipient)?; + let client = Client::new(self.keys.clone()); + for relay in relays { + client.add_relay(relay).await?; + } + client.connect().await; + client.send_private_msg(pubkey, message, []).await?; + Ok(()) + } + + async fn fetch_dm_relays(&self, recipient: &str) -> Result> { + let mut cache = self.cache.lock().await; + if let Some(entry) = cache.get(recipient) { + if entry.fetched_at.elapsed() < Duration::from_secs(300) { + return Ok(entry.relays.clone()); + } + } + + let pubkey = PublicKey::parse(recipient)?; + let filter = Filter::new().kind(Kind::Custom(10050)).author(pubkey); + let events = self + .indexer_client + .get_events_of(vec![filter], Some(Duration::from_secs(5))) + .await?; + + let mut relays = Vec::new(); + if let Some(event) = events.into_iter().max_by_key(|event| event.created_at) { + for tag in event.tags.iter() { + if let Some(first) = tag.as_vec().get(0) { + if first == "relay" { + if let Some(value) = tag.as_vec().get(1) { + relays.push(value.to_string()); + } + } + } + } + } + + cache.insert( + recipient.to_string(), + CacheEntry { + relays: relays.clone(), + fetched_at: Instant::now(), + }, + ); + + Ok(relays) + } +} + +#[derive(Clone)] +struct CacheEntry { + relays: Vec, + fetched_at: Instant, +} diff --git a/backend/src/repo.rs b/backend/src/repo.rs index b76fb12..23354a9 100644 --- a/backend/src/repo.rs +++ b/backend/src/repo.rs @@ -15,21 +15,13 @@ impl Repo { Self { pool } } - pub async fn update_tenant_status(&self, pubkey: &str, status: &str) -> Result<()> { - sqlx::query("UPDATE tenants SET status = ? WHERE pubkey = ?") - .bind(status) - .bind(pubkey) - .execute(&self.pool) - .await?; - Ok(()) - } - pub async fn create_tenant(&self, tenant: &NewTenant) -> Result<()> { sqlx::query( - "INSERT INTO tenants (pubkey, status) VALUES (?, ?)", + "INSERT INTO tenants (pubkey, status, tenant_nwc_url) VALUES (?, ?, ?)", ) .bind(&tenant.pubkey) .bind(&tenant.status) + .bind(&tenant.tenant_nwc_url) .execute(&self.pool) .await?; Ok(()) @@ -37,10 +29,11 @@ impl Repo { pub async fn create_tenant_if_missing(&self, tenant: &NewTenant) -> Result<()> { sqlx::query( - "INSERT OR IGNORE INTO tenants (pubkey, status) VALUES (?, ?)", + "INSERT OR IGNORE INTO tenants (pubkey, status, tenant_nwc_url) VALUES (?, ?, ?)", ) .bind(&tenant.pubkey) .bind(&tenant.status) + .bind(&tenant.tenant_nwc_url) .execute(&self.pool) .await?; Ok(()) @@ -48,7 +41,7 @@ impl Repo { pub async fn get_tenant(&self, pubkey: &str) -> Result> { let tenant = sqlx::query_as::<_, Tenant>( - "SELECT pubkey, status FROM tenants WHERE pubkey = ?", + "SELECT pubkey, status, tenant_nwc_url FROM tenants WHERE pubkey = ?", ) .bind(pubkey) .fetch_optional(&self.pool) @@ -58,13 +51,31 @@ impl Repo { pub async fn list_tenants(&self) -> Result> { let tenants = sqlx::query_as::<_, Tenant>( - "SELECT pubkey, status FROM tenants ORDER BY pubkey", + "SELECT pubkey, status, tenant_nwc_url FROM tenants ORDER BY pubkey", ) .fetch_all(&self.pool) .await?; Ok(tenants) } + pub async fn update_tenant_status(&self, pubkey: &str, status: &str) -> Result<()> { + sqlx::query("UPDATE tenants SET status = ? WHERE pubkey = ?") + .bind(status) + .bind(pubkey) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn update_tenant_nwc_url(&self, pubkey: &str, tenant_nwc_url: &str) -> Result<()> { + sqlx::query("UPDATE tenants SET tenant_nwc_url = ? WHERE pubkey = ?") + .bind(tenant_nwc_url) + .bind(pubkey) + .execute(&self.pool) + .await?; + Ok(()) + } + pub async fn create_relay(&self, relay: &NewRelay) -> Result<()> { sqlx::query( "INSERT INTO relays (id, tenant, name, subdomain, schema, icon, description, plan, status) @@ -198,4 +209,13 @@ impl Repo { .await?; Ok(items) } + + pub async fn update_invoice_status(&self, id: &str, status: &str) -> Result<()> { + sqlx::query("UPDATE invoices SET status = ? WHERE id = ?") + .bind(status) + .bind(id) + .execute(&self.pool) + .await?; + Ok(()) + } }