From 65dfcaeb6c08225e59ff9862d130099aa026415f Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Tue, 7 Apr 2026 11:21:40 -0700 Subject: [PATCH] Implement stripe subscription sync --- backend/migrations/0001_init.sql | 5 ++- backend/src/api.rs | 45 +++++++++++++++---------- backend/src/billing.rs | 57 ++++++++++++++++++++++++++++++++ backend/src/command.rs | 27 ++++++++++++--- backend/src/main.rs | 4 +++ backend/src/models.rs | 6 +++- backend/src/query.rs | 22 +++++++----- 7 files changed, 135 insertions(+), 31 deletions(-) diff --git a/backend/migrations/0001_init.sql b/backend/migrations/0001_init.sql index 1dd4688..a72f10a 100644 --- a/backend/migrations/0001_init.sql +++ b/backend/migrations/0001_init.sql @@ -10,7 +10,9 @@ CREATE TABLE IF NOT EXISTS activity ( CREATE TABLE IF NOT EXISTS tenant ( pubkey TEXT PRIMARY KEY, nwc_url TEXT NOT NULL DEFAULT '', - created_at INTEGER NOT NULL + created_at INTEGER NOT NULL, + stripe_customer_id TEXT NOT NULL DEFAULT '', + stripe_subscription_id TEXT NOT NULL DEFAULT '' ); CREATE TABLE IF NOT EXISTS relay ( @@ -19,6 +21,7 @@ CREATE TABLE IF NOT EXISTS relay ( schema TEXT NOT NULL, subdomain TEXT NOT NULL UNIQUE, plan TEXT NOT NULL, + stripe_subscription_item_id TEXT, status TEXT NOT NULL, synced INTEGER NOT NULL DEFAULT 0, sync_error TEXT NOT NULL DEFAULT '', diff --git a/backend/src/api.rs b/backend/src/api.rs index 78cef2e..6aa188e 100644 --- a/backend/src/api.rs +++ b/backend/src/api.rs @@ -6,7 +6,7 @@ use axum::{ extract::{Path, State}, http::{HeaderMap, StatusCode}, response::{IntoResponse, Response}, - routing::{get, post, put}, + routing::{get, post}, }; use base64::Engine; use nostr_sdk::{Event, JsonUtil, Kind}; @@ -325,23 +325,33 @@ async fn get_identity( ) -> std::result::Result { let pubkey = state.api.extract_auth_pubkey(&headers)?; let is_admin = state.api.admins.iter().any(|a| a == &pubkey); - let tenant = Tenant { - pubkey: pubkey.clone(), - nwc_url: String::new(), - created_at: now_ts(), - }; - match state.api.command.create_tenant(&tenant).await { - Ok(()) => true, - Err(e) if matches!(map_unique_error(&e), Some("pubkey-exists")) => true, - Err(e) => { - return Ok(err( - StatusCode::INTERNAL_SERVER_ERROR, - "internal", - &e.to_string(), - )); - } - }; + // Only create if tenant doesn't exist yet + if let Ok(None) = state.api.query.get_tenant(&pubkey).await { + // TODO: Call Stripe API to create customer and subscription + let stripe_customer_id = String::new(); + let stripe_subscription_id = String::new(); + + let tenant = Tenant { + pubkey: pubkey.clone(), + nwc_url: String::new(), + created_at: now_ts(), + stripe_customer_id, + stripe_subscription_id, + }; + + match state.api.command.create_tenant(&tenant).await { + Ok(()) => {} + Err(e) if matches!(map_unique_error(&e), Some("pubkey-exists")) => {} + Err(e) => { + return Ok(err( + StatusCode::INTERNAL_SERVER_ERROR, + "internal", + &e.to_string(), + )); + } + }; + } Ok(ok( StatusCode::OK, @@ -472,6 +482,7 @@ async fn create_relay( schema: String::new(), subdomain: payload.subdomain, plan: payload.plan, + stripe_subscription_item_id: None, status: "active".to_string(), sync_error: String::new(), info_name: payload.info_name.unwrap_or_default(), diff --git a/backend/src/billing.rs b/backend/src/billing.rs index ae29e47..240d085 100644 --- a/backend/src/billing.rs +++ b/backend/src/billing.rs @@ -1,6 +1,7 @@ use anyhow::Result; use crate::command::Command; +use crate::models::Activity; use crate::query::Query; use crate::robot::Robot; @@ -23,6 +24,62 @@ impl Billing { } } + pub async fn start(self) { + let mut rx = self.command.notify.subscribe(); + + loop { + match rx.recv().await { + Ok(activity) => { + if let Err(e) = self.handle_activity(&activity).await { + tracing::error!(error = %e, "billing handle_activity failed"); + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!(missed = n, "billing lagged"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + } + + async fn handle_activity(&self, activity: &Activity) -> Result<()> { + let needs_billing_sync = matches!( + activity.activity_type.as_str(), + "create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" + | "fail_relay_sync" | "complete_relay_sync" + ); + + if needs_billing_sync { + self.sync_relay_subscription_item(activity).await?; + } + + Ok(()) + } + + async fn sync_relay_subscription_item(&self, activity: &Activity) -> Result<()> { + let Some(relay) = self.query.get_relay(&activity.resource_id).await? else { + return Ok(()); + }; + + let should_delete = !relay.sync_error.is_empty() + || relay.synced == 0 + || relay.plan == "free" + || relay.status == "inactive"; + + if should_delete { + if relay.stripe_subscription_item_id.is_some() { + // TODO: Delete subscription item via Stripe API + self.command.delete_relay_subscription_item(&relay.id).await?; + } + } else { + // TODO: Create or update subscription item via Stripe API + // let stripe_subscription_item_id = ...; + // self.command.set_relay_subscription_item(&relay.id, &stripe_subscription_item_id).await?; + } + + Ok(()) + } + pub async fn deactivate_relay(&self, relay_id: &str) -> Result<()> { let relay = self .query diff --git a/backend/src/command.rs b/backend/src/command.rs index caabab9..265139a 100644 --- a/backend/src/command.rs +++ b/backend/src/command.rs @@ -67,12 +67,14 @@ impl Command { let mut tx = self.pool.begin().await?; sqlx::query( - "INSERT INTO tenant (pubkey, nwc_url, created_at) - VALUES (?, ?, ?)", + "INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id, stripe_subscription_id) + VALUES (?, ?, ?, ?, ?)", ) .bind(&tenant.pubkey) .bind(&tenant.nwc_url) .bind(tenant.created_at) + .bind(&tenant.stripe_customer_id) + .bind(&tenant.stripe_subscription_id) .execute(&mut *tx) .await?; @@ -209,7 +211,7 @@ impl Command { pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> { let mut tx = self.pool.begin().await?; - sqlx::query("UPDATE relay SET status = 'inactive', sync_error = ? WHERE id = ?") + sqlx::query("UPDATE relay SET sync_error = ? WHERE id = ?") .bind(&sync_error) .bind(&relay.id) .execute(&mut *tx) @@ -225,7 +227,7 @@ impl Command { pub async fn complete_relay_sync(&self, relay_id: &str) -> Result<()> { let mut tx = self.pool.begin().await?; - sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?") + sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?") .bind(relay_id) .execute(&mut *tx) .await?; @@ -236,4 +238,21 @@ impl Command { self.emit(activity); Ok(()) } + + pub async fn delete_relay_subscription_item(&self, relay_id: &str) -> Result<()> { + sqlx::query("UPDATE relay SET stripe_subscription_item_id = NULL WHERE id = ?") + .bind(relay_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn set_relay_subscription_item(&self, relay_id: &str, stripe_subscription_item_id: &str) -> Result<()> { + sqlx::query("UPDATE relay SET stripe_subscription_item_id = ? WHERE id = ?") + .bind(stripe_subscription_item_id) + .bind(relay_id) + .execute(&self.pool) + .await?; + Ok(()) + } } diff --git a/backend/src/main.rs b/backend/src/main.rs index 1174a4b..336e372 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -64,6 +64,10 @@ async fn main() -> Result<()> { infra.start().await; }); + tokio::spawn(async move { + billing.start().await; + }); + let listener = tokio::net::TcpListener::bind(format!("{host}:{port}")).await?; axum::serve(listener, app).await?; Ok(()) diff --git a/backend/src/models.rs b/backend/src/models.rs index 6cc6a03..e5b4311 100644 --- a/backend/src/models.rs +++ b/backend/src/models.rs @@ -14,10 +14,11 @@ pub struct Activity { pub struct Plan { pub id: String, pub name: String, - pub sats: i64, + pub amount: i64, pub members: Option, pub blossom: bool, pub livekit: bool, + pub stripe_price_id: String, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] @@ -26,6 +27,8 @@ pub struct Tenant { #[serde(skip_serializing)] pub nwc_url: String, pub created_at: i64, + pub stripe_customer_id: String, + pub stripe_subscription_id: String, } #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] @@ -35,6 +38,7 @@ pub struct Relay { pub schema: String, pub subdomain: String, pub plan: String, + pub stripe_subscription_item_id: Option, pub status: String, pub sync_error: String, pub info_name: String, diff --git a/backend/src/query.rs b/backend/src/query.rs index 4ff4199..cad61c0 100644 --- a/backend/src/query.rs +++ b/backend/src/query.rs @@ -15,7 +15,7 @@ impl Query { pub async fn list_tenants(&self) -> Result> { let rows = sqlx::query_as::<_, Tenant>( - "SELECT pubkey, nwc_url, created_at + "SELECT pubkey, nwc_url, created_at, stripe_customer_id, stripe_subscription_id FROM tenant ORDER BY pubkey", ) @@ -26,7 +26,7 @@ impl Query { pub async fn get_tenant(&self, pubkey: &str) -> Result> { let row = sqlx::query_as::<_, Tenant>( - "SELECT pubkey, nwc_url, created_at + "SELECT pubkey, nwc_url, created_at, stripe_customer_id, stripe_subscription_id FROM tenant WHERE pubkey = ?", ) @@ -41,33 +41,37 @@ impl Query { Plan { id: "free".to_string(), name: "Free".to_string(), - sats: 0, + amount: 0, members: Some(10), blossom: false, livekit: false, + stripe_price_id: String::new(), }, Plan { id: "basic".to_string(), name: "Basic".to_string(), - sats: 10_000, + amount: 500, members: Some(100), blossom: true, livekit: true, + stripe_price_id: std::env::var("STRIPE_PRICE_BASIC").unwrap_or_default(), }, Plan { id: "growth".to_string(), name: "Growth".to_string(), - sats: 50_000, + amount: 2500, members: None, blossom: true, livekit: true, + stripe_price_id: std::env::var("STRIPE_PRICE_GROWTH").unwrap_or_default(), }, ] } pub async fn list_relays(&self) -> Result> { let rows = sqlx::query_as::<_, Relay>( - "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + "SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id, + status, sync_error, info_name, info_icon, info_description, policy_public_join, policy_strip_signatures, groups_enabled, management_enabled, blossom_enabled, @@ -82,7 +86,8 @@ impl Query { pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result> { let rows = sqlx::query_as::<_, Relay>( - "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + "SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id, + status, sync_error, info_name, info_icon, info_description, policy_public_join, policy_strip_signatures, groups_enabled, management_enabled, blossom_enabled, @@ -99,7 +104,8 @@ impl Query { pub async fn get_relay(&self, id: &str) -> Result> { let row = sqlx::query_as::<_, Relay>( - "SELECT id, tenant, schema, subdomain, plan, status, sync_error, + "SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id, + status, sync_error, info_name, info_icon, info_description, policy_public_join, policy_strip_signatures, groups_enabled, management_enabled, blossom_enabled,