Rework billing

This commit is contained in:
Jon Staab
2026-04-07 14:40:48 -07:00
parent 65dfcaeb6c
commit 0980523a50
33 changed files with 1589 additions and 318 deletions
+2
View File
@@ -28,3 +28,5 @@ LIVEKIT_API_SECRET=
# Billing
NWC_URL= # Nostr Wallet Connect URL for generating Lightning invoices
STRIPE_SECRET_KEY= # Stripe API secret key (sk_...)
STRIPE_WEBHOOK_SECRET= # Secret for verifying Stripe webhook signatures (whsec_...)
+44 -17
View File
@@ -206,11 +206,14 @@ dependencies = [
"chrono",
"dotenvy",
"hex",
"hmac",
"nostr-sdk",
"nwc",
"rand 0.8.5",
"reqwest",
"serde",
"serde_json",
"sha2",
"sqlx",
"tokio",
"tower",
@@ -1293,6 +1296,12 @@ version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "lru"
version = "0.16.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593"
[[package]]
name = "lru-slab"
version = "0.1.2"
@@ -1370,12 +1379,6 @@ dependencies = [
"tempfile",
]
[[package]]
name = "negentropy"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e664971378a3987224f7a0e10059782035e89899ae403718ee07de85bec42afe"
[[package]]
name = "negentropy"
version = "0.5.0"
@@ -1394,9 +1397,9 @@ dependencies = [
[[package]]
name = "nostr"
version = "0.39.0"
version = "0.44.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d90b55eff1f0747d9e423972179672e1aacac3d3ccee4c1281147eaa90d6491e"
checksum = "3aa5e3b6a278ed061835fe1ee293b71641e6bf8b401cfe4e1834bbf4ef0a34e1"
dependencies = [
"aes",
"base64 0.22.1",
@@ -1407,6 +1410,7 @@ dependencies = [
"chacha20",
"chacha20poly1305",
"getrandom 0.2.17",
"hex",
"instant",
"scrypt",
"secp256k1",
@@ -1418,25 +1422,36 @@ dependencies = [
[[package]]
name = "nostr-database"
version = "0.39.0"
version = "0.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce07b47c77b8e5a856727885fe0ae47b9aa53d8d853a2190dd479b5a0d6e4f52"
checksum = "7462c9d8ae5ef6a28d66a192d399ad2530f1f2130b13186296dbb11bdef5b3d1"
dependencies = [
"lru",
"nostr",
"tokio",
]
[[package]]
name = "nostr-relay-pool"
version = "0.39.0"
name = "nostr-gossip"
version = "0.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "211ac5bbdda1a8eec0c21814a838da832038767a5d354fe2fcc1ca438cae56fd"
checksum = "ade30de16869618919c6b5efc8258f47b654a98b51541eb77f85e8ec5e3c83a6"
dependencies = [
"nostr",
]
[[package]]
name = "nostr-relay-pool"
version = "0.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b1073ccfbaea5549fb914a9d52c68dab2aecda61535e5143dd73e95445a804b"
dependencies = [
"async-utility",
"async-wsocket",
"atomic-destructor",
"negentropy 0.3.1",
"negentropy 0.5.0",
"hex",
"lru",
"negentropy",
"nostr",
"nostr-database",
"tokio",
@@ -1445,13 +1460,14 @@ dependencies = [
[[package]]
name = "nostr-sdk"
version = "0.39.0"
version = "0.44.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5baca581deb810a88bb51c54d1d7980f4506a64a3e9a19270829b406e47adf31"
checksum = "471732576710e779b64f04c55e3f8b5292f865fea228436daf19694f0bf70393"
dependencies = [
"async-utility",
"nostr",
"nostr-database",
"nostr-gossip",
"nostr-relay-pool",
"tokio",
"tracing",
@@ -1512,6 +1528,17 @@ dependencies = [
"libm",
]
[[package]]
name = "nwc"
version = "0.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f651e3c28dd9da0873151233e804a92b6240a1db1260f3c9d727950adb8e9036"
dependencies = [
"nostr",
"nostr-relay-pool",
"tracing",
]
[[package]]
name = "once_cell"
version = "1.21.3"
+4 -1
View File
@@ -12,13 +12,16 @@ sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "sqlite", "macros"
tokio = { version = "1.36", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
nostr-sdk = { version = "0.39", features = ["nip04", "nip47", "nip59", "nip98"] }
nostr-sdk = { version = "0.44", features = ["nip59", "nip98"] }
nwc = "0.44"
uuid = { version = "1.7", features = ["v4"] }
chrono = { version = "0.4", features = ["serde"] }
tower-http = { version = "0.5", features = ["cors"] }
reqwest = { version = "0.12", features = ["json", "rustls-tls"] }
rand = "0.8"
hex = "0.4"
hmac = "0.12"
sha2 = "0.10"
dotenvy = "0.15.7"
base64 = "0.22"
+3 -1
View File
@@ -10,9 +10,11 @@ CREATE TABLE IF NOT EXISTS activity (
CREATE TABLE IF NOT EXISTS tenant (
pubkey TEXT PRIMARY KEY,
nwc_url TEXT NOT NULL DEFAULT '',
nwc_error TEXT,
created_at INTEGER NOT NULL,
stripe_customer_id TEXT NOT NULL DEFAULT '',
stripe_subscription_id TEXT NOT NULL DEFAULT ''
stripe_subscription_id TEXT,
past_due_at INTEGER
);
CREATE TABLE IF NOT EXISTS relay (
+50 -4
View File
@@ -47,8 +47,8 @@ Notes:
- Serves `GET /identity`
- Authorizes anyone, but must be authorized
- If a tenant for the identity doesn't exist:
- Call the Stripe API to create a new customer and subscription
- Create a new tenant using `command.create_tenant` with payload and stripe info
- Call the Stripe API to create a new customer
- Create a new tenant using `command.create_tenant` with payload and `stripe_customer_id`. No subscription is created yet — that happens when the first relay is added.
- Return `data` is an `Identity` struct
--- Tenant routes
@@ -122,7 +122,7 @@ Notes:
- Serves `POST /relays/:id/deactivate`
- Authorizes admin or relay owner
- If relay is already inactive, return a `400` with `code=relay-is-inactive`
- Call `billing.deactivate_relay`
- Call `command.deactivate_relay`
- Return `data` is empty
## `async fn reactivate_relay(...) -> Response`
@@ -130,9 +130,55 @@ Notes:
- Serves `POST /relays/:id/reactivate`
- Authorizes admin or relay owner
- If relay is already active, return a `400` with `code=relay-is-active`
- Call `billing.reactivate_relay`
- Call `command.activate_relay`
- Return `data` is empty
--- Invoice routes
## `async fn list_tenant_invoices(...) -> Response`
- Serves `GET /tenants/:pubkey/invoices`
- Authorizes admin or matching tenant
- Looks up tenant by pubkey, fetches invoices from Stripe API using `stripe_customer_id`
- Return `data` is a list of Stripe invoice objects: `{ id, status, amount_due, currency, hosted_invoice_url, period_start, period_end }`
## `async fn get_invoice(...) -> Response`
- Serves `GET /invoices/:id`
- Fetches invoice from Stripe API by ID
- Looks up tenant by the invoice's `customer` field, authorizes admin or matching tenant
- Return `data` is a single Stripe invoice object
- If invoice does not exist, return `404` with `code=not-found`
## `async fn get_invoice_bolt11(...) -> Response`
- Serves `GET /invoices/:id/bolt11`
- Fetches invoice from Stripe API by ID
- Looks up tenant by the invoice's `customer` field, authorizes admin or matching tenant
- If invoice `status` is not `open`, return `400` with `code=invoice-not-open`
- Creates a bolt11 Lightning invoice for the invoice's `amount_due` using `billing.create_bolt11(amount_due)`
- Return `data` is `{ bolt11 }`
--- Stripe session route
## `async fn create_stripe_session(...) -> Response`
- Serves `GET /tenants/:pubkey/stripe/session`
- Authorizes admin or matching tenant
- Looks up tenant by pubkey
- Creates a Stripe Customer Portal session for the tenant's `stripe_customer_id`
- Return `data` is `{ url }` — the portal session URL
--- Stripe webhook route
## `async fn stripe_webhook(...) -> Response`
- Serves `POST /stripe/webhook`
- No NIP-98 authentication — uses Stripe signature verification instead
- Reads raw request body and `Stripe-Signature` header
- Calls `billing.handle_webhook(payload, signature)`
- Returns `200` on success, `400` on signature verification failure
--- Utilities
## `extract_auth_pubkey(&self, headers: &HeaderMap) -> Result<String>`
+95 -9
View File
@@ -1,10 +1,11 @@
# `pub struct Billing`
Billing encapsulates logic related to synchronizing state with Stripe.
Billing encapsulates logic related to synchronizing state with Stripe, processing payments via NWC, and managing subscription lifecycle.
Members:
- `nwc_url: String` - a nostr wallet connect URL used to **create** bolt11 invoices
- `nwc_url: String` - a nostr wallet connect URL used to **create** bolt11 invoices (i.e. receive payments), from `NWC_URL`
- `stripe_webhook_secret: String` - secret for verifying Stripe webhook signatures, from `STRIPE_WEBHOOK_SECRET`
- `query: Query`
- `command: Command`
- `robot: Robot`
@@ -15,12 +16,97 @@ Members:
## `pub fn start(&self)`
- Subscribes to `command.notify.notified`
- On `create_relay`, `update_relay`, `activate_relay`, `deactivate_relay`, `fail_relay_sync`, and `complete_relay_sync`, call `self.sync_relay_subscription_item`.
- Subscribes to `command.notify.subscribe()`
- On `create_relay`, `update_relay`, `activate_relay`, `deactivate_relay`, `fail_relay_sync`, and `complete_relay_sync`, call `self.sync_relay_subscription`.
## `pub fn sync_relay_subscription_item(&self, activity: &Activity)`
## `pub fn sync_relay_subscription(&self, activity: &Activity)`
Manages the Stripe subscription and subscription items for a relay's tenant. Only paid (non-free) relays interact with Stripe. Free-only tenants have no subscription. Must be idempotent.
- Fetch the relay and tenant associated with the `activity`
- **If relay plan is `free`**: if the relay has a `stripe_subscription_item_id`, delete it via the Stripe API and call `command.delete_relay_subscription_item`. Then check cleanup (below). Return early.
- **If relay is `inactive`**: if the relay has a `stripe_subscription_item_id`, delete it via the Stripe API and call `command.delete_relay_subscription_item`. Then check cleanup (below). Return early.
- **If relay is `active` and on a paid plan**:
- **Ensure subscription exists**: If the tenant has no `stripe_subscription_id`, create a Stripe subscription for the customer with `collection_method: "charge_automatically"` and the relay's price as the first item. Save the subscription ID via `command.set_tenant_subscription` and the item ID via `command.set_relay_subscription_item`. Return early.
- **Sync the subscription item**: If the tenant already has a subscription, create or update the relay's Stripe subscription item to the plan's `stripe_price_id` via the Stripe API, then call `command.set_relay_subscription_item`.
- **Clean up empty subscription**: After any item deletion, check if the tenant has any remaining active paid relays. If none and the tenant has a `stripe_subscription_id`, cancel the Stripe subscription immediately and call `command.clear_tenant_subscription`.
## `pub fn handle_webhook(&self, payload: &str, signature: &str) -> Result<()>`
- Verify the webhook signature using `self.stripe_webhook_secret`
- Parse the event and dispatch by type:
- `invoice.created` -> `self.handle_invoice_created`
- `invoice.paid` -> `self.handle_invoice_paid`
- `invoice.payment_failed` -> `self.handle_invoice_payment_failed`
- `invoice.overdue` -> `self.handle_invoice_overdue`
- `customer.subscription.updated` -> `self.handle_subscription_updated`
- `customer.subscription.deleted` -> `self.handle_subscription_deleted`
- Unknown event types are ignored (return Ok)
## `pub async fn stripe_list_invoices(&self, customer_id: &str) -> Result<Value>`
- Fetches invoices from Stripe API for the given customer
- Returns the `data` array from the Stripe response
## `pub async fn stripe_get_invoice(&self, invoice_id: &str) -> Result<Value>`
- Fetches a single invoice from Stripe API by ID
- Returns the full Stripe invoice object
## `pub async fn create_bolt11(&self, amount_due_cents: i64) -> Result<String>`
- Creates a bolt11 Lightning invoice for the given amount using the system NWC wallet (`self.nwc_url`)
- Returns the bolt11 invoice string
## `pub async fn stripe_create_portal_session(&self, customer_id: &str) -> Result<String>`
- Creates a Stripe Customer Portal session for the given customer
- Returns the portal session URL
## `fn handle_invoice_created(&self, invoice: &Invoice)`
Attempts to pay a new subscription invoice. Payment priority:
1. **NWC auto-pay**: If the tenant has a `nwc_url`:
- Create a bolt11 Lightning invoice for the invoice amount using `self.nwc_url` (the receiving/system wallet)
- Pay the bolt11 invoice using the tenant's `nwc_url` (the spending/tenant wallet)
- If payment succeeds: call Stripe `POST /v1/invoices/:id/pay` with `paid_out_of_band: true`. Clear `nwc_error` via `command.clear_tenant_nwc_error`.
- If payment fails: set `nwc_error` on tenant via `command.set_tenant_nwc_error`. Fall through to next option.
2. **Card on file**: If the tenant has a payment method on the Stripe customer, do nothing — Stripe will charge automatically.
3. **Manual payment**: If neither NWC nor card is available, send a DM via `robot.send_dm` notifying the tenant that payment is due with a link to the application for manual Lightning payment.
Skip invoices with `amount_due` of 0.
## `fn handle_invoice_paid(&self, invoice: &Invoice)`
- Look up tenant by `stripe_customer_id`
- If tenant has `past_due_at` set:
- Clear `past_due_at` via `command.clear_tenant_past_due`
- Find all `inactive` relays for the tenant that were deactivated due to non-payment (i.e. relays on paid plans that are inactive)
- Reactivate each one via `command.activate_relay`
## `fn handle_invoice_payment_failed(&self, invoice: &Invoice)`
- Look up tenant by `stripe_customer_id`
- If tenant does not already have `past_due_at` set:
- Set `past_due_at` to now via `command.set_tenant_past_due`
- Send a DM via `robot.send_dm` notifying the tenant that their payment has failed and their relays may be deactivated if not resolved.
## `fn handle_invoice_overdue(&self, invoice: &Invoice)`
- Look up tenant by `stripe_customer_id`
- Deactivate all active relays on paid plans via `command.deactivate_relay`
- Send a DM via `robot.send_dm` notifying the tenant that their paid relays have been deactivated due to non-payment
## `fn handle_subscription_updated(&self, subscription: &Subscription)`
- Look up tenant by `stripe_customer_id`
- If subscription status is `canceled` or `unpaid`:
- Clear `stripe_subscription_id` via `command.clear_tenant_subscription`
- Deactivate all active paid relays for the tenant via `command.deactivate_relay`
## `fn handle_subscription_deleted(&self, subscription: &Subscription)`
- Look up tenant by `stripe_customer_id`
- Clear `stripe_subscription_id` via `command.clear_tenant_subscription`
- Fetch the relay associated with the `activity`
- If the relay has `sync_error`, `synced` is false, `plan` is `free`, or `status` is `inactive`, delete the relay's subscription item using the Stripe api, and clear it with `command.delete_relay_subscription_item`.
- Otherwise, create/update the relay's subscription item to the appropriate Stripe price using the Stripe api and set it with `command.set_relay_subscription_item`.
- This method should be idempotent
+30
View File
@@ -69,3 +69,33 @@ Notes:
- Sets `stripe_subscription_item_id`
- Does not log activity
## `pub fn set_tenant_subscription(&self, pubkey: &str, stripe_subscription_id: &str) -> Result<()>`
- Sets `stripe_subscription_id` on the tenant
- Does not log activity
## `pub fn clear_tenant_subscription(&self, pubkey: &str) -> Result<()>`
- Sets `stripe_subscription_id = null` on the tenant
- Does not log activity
## `pub fn set_tenant_nwc_error(&self, pubkey: &str, error: &str) -> Result<()>`
- Sets `nwc_error` on the tenant
- Does not log activity
## `pub fn clear_tenant_nwc_error(&self, pubkey: &str) -> Result<()>`
- Sets `nwc_error = null` on the tenant
- Does not log activity
## `pub fn set_tenant_past_due(&self, pubkey: &str) -> Result<()>`
- Sets `past_due_at` to the current timestamp
- Does not log activity
## `pub fn clear_tenant_past_due(&self, pubkey: &str) -> Result<()>`
- Sets `past_due_at = null` on the tenant
- Does not log activity
+6 -4
View File
@@ -39,7 +39,7 @@ A plan represents a rate charged for relays at a given feature/usage limit. Plan
- `members` - the max number of members a relay can have before needing to upgrade. If empty, membership is not limited.
- `blossom` - whether blossom media hosting is available on this plan
- `livekit` - whether livekit audio/video calls are available on this plan
- `stripe_price_id` - the identifier of the price in Stripe
- `stripe_price_id` (nullable) - the identifier of the price in Stripe. Null for the free plan.
There are three plans available:
@@ -52,10 +52,12 @@ There are three plans available:
Tenants are customers of the service, identified by a nostr `pubkey`. Public metadata like name etc are pulled from the nostr network. They also have associated billing information.
- `pubkey` is the nostr public key identifying the tenant
- `nwc_url` (private) a nostr wallet connect URL used for **paying** invoices generated by the system
- `nwc_url` (private) a nostr wallet connect URL used for **paying** invoices generated by the system on the tenant's behalf
- `nwc_error` (private) a string indicating the most recent NWC payment error, if any. Cleared on successful NWC payment.
- `created_at` unix timestamp identifying tenant creation time
- `stripe_customer_id` a string identifying the associated stripe customer
- `stripe_subscription_id` a string identifying the associated stripe subscription
- `stripe_subscription_id` (nullable) a string identifying the associated stripe subscription. Created when the first paid (non-free) relay is activated, deleted when the last paid relay is removed or deactivated. Free-only tenants have no subscription.
- `past_due_at` (nullable) unix timestamp when the tenant's subscription became past due. Set on payment failure, cleared on payment success.
# Relay
@@ -66,7 +68,7 @@ A relay is a nostr relay owned by a `tenant` and hosted by the attached zooid in
- `schema` - the relay's db schema (read_only, calculated based on `subdomain` + `id`)
- `subdomain` - the relay's subdomain
- `plan` - the relay's plan
- `stripe_subscription_item_id` - the Stripe subscription item id.
- `stripe_subscription_item_id` (nullable) - the Stripe subscription item id. Only set for relays on paid plans.
- `status` - `active|inactive`. Only `active` relays count toward billing.
- `synced` - whether the relay has been successfully synced to zooid at least once.
- `sync_error` - a string indicating any errors encountered when synchronizing.
+8
View File
@@ -35,6 +35,14 @@ Members:
- Returns matching relay
## `pub fn get_tenant_by_stripe_customer_id(&self, stripe_customer_id: &str) -> Result<Tenant>`
- Returns the tenant matching the given `stripe_customer_id`
## `pub fn has_active_paid_relays(&self, tenant_id: &str) -> Result<bool>`
- Returns true if the tenant has any relays where `status = 'active'` and `plan != 'free'`
## `pub fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>>`
- Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id`
+140 -27
View File
@@ -16,6 +16,7 @@ use crate::billing::Billing;
use crate::command::Command;
use crate::models::{Relay, Tenant};
use crate::query::Query;
use axum::body::Bytes;
#[derive(Clone)]
pub struct Api {
@@ -26,6 +27,27 @@ pub struct Api {
billing: Billing,
}
async fn stripe_webhook(
State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> Response {
let signature = headers
.get("Stripe-Signature")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
let payload = match std::str::from_utf8(&body) {
Ok(s) => s,
Err(_) => return err(StatusCode::BAD_REQUEST, "bad-request", "invalid payload"),
};
match state.api.billing.handle_webhook(payload, signature).await {
Ok(()) => ok(StatusCode::OK, ()),
Err(e) => err(StatusCode::BAD_REQUEST, "webhook-error", &e.to_string()),
}
}
#[derive(Clone)]
struct AppState {
api: Arc<Api>,
@@ -47,6 +69,8 @@ struct ErrorResponse {
enum ApiError {
Unauthorized(anyhow::Error),
Forbidden(&'static str),
NotFound(&'static str),
Internal(String),
}
impl IntoResponse for ApiError {
@@ -54,6 +78,8 @@ impl IntoResponse for ApiError {
match self {
Self::Unauthorized(e) => err(StatusCode::UNAUTHORIZED, "unauthorized", &e.to_string()),
Self::Forbidden(message) => err(StatusCode::FORBIDDEN, "forbidden", message),
Self::NotFound(message) => err(StatusCode::NOT_FOUND, "not-found", message),
Self::Internal(message) => err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &message),
}
}
}
@@ -93,6 +119,11 @@ impl Api {
.route("/relays/:id/activity", get(list_relay_activity))
.route("/relays/:id/deactivate", post(deactivate_relay))
.route("/relays/:id/reactivate", post(reactivate_relay))
.route("/tenants/:pubkey/invoices", get(list_tenant_invoices))
.route("/invoices/:id", get(get_invoice))
.route("/invoices/:id/bolt11", get(get_invoice_bolt11))
.route("/tenants/:pubkey/stripe/session", get(create_stripe_session))
.route("/stripe/webhook", post(stripe_webhook))
.with_state(state)
}
@@ -173,6 +204,14 @@ impl Api {
}
}
async fn get_tenant_or_404(&self, pubkey: &str) -> std::result::Result<Tenant, ApiError> {
match self.query.get_tenant(pubkey).await {
Ok(Some(t)) => Ok(t),
Ok(None) => Err(ApiError::NotFound("tenant not found")),
Err(e) => Err(ApiError::Internal(e.to_string())),
}
}
fn prepare_relay(&self, mut relay: Relay) -> anyhow::Result<Relay> {
if !relay
.subdomain
@@ -328,16 +367,17 @@ async fn get_identity(
// Only create if tenant doesn't exist yet
if let Ok(None) = state.api.query.get_tenant(&pubkey).await {
// TODO: Call Stripe API to create customer and subscription
// TODO: Call Stripe API to create a new customer
let stripe_customer_id = String::new();
let stripe_subscription_id = String::new();
let tenant = Tenant {
pubkey: pubkey.clone(),
nwc_url: String::new(),
nwc_error: None,
created_at: now_ts(),
stripe_customer_id,
stripe_subscription_id,
stripe_subscription_id: None,
past_due_at: None,
};
match state.api.command.create_tenant(&tenant).await {
@@ -376,16 +416,8 @@ async fn get_tenant(
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin_or_tenant(&auth, &pubkey)?;
match state.api.query.get_tenant(&pubkey).await {
Ok(Some(tenant)) => Ok(ok(StatusCode::OK, tenant)),
Ok(None) => Ok(err(StatusCode::NOT_FOUND, "not-found", "tenant not found")),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
"internal",
&e.to_string(),
)),
}
let tenant = state.api.get_tenant_or_404(&pubkey).await?;
Ok(ok(StatusCode::OK, tenant))
}
async fn list_relays(
@@ -662,7 +694,7 @@ async fn deactivate_relay(
));
}
match state.api.billing.deactivate_relay(&id).await {
match state.api.command.deactivate_relay(&relay).await {
Ok(()) => Ok(ok(StatusCode::OK, ())),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
@@ -701,7 +733,7 @@ async fn reactivate_relay(
));
}
match state.api.billing.reactivate_relay(&id).await {
match state.api.command.activate_relay(&relay).await {
Ok(()) => Ok(ok(StatusCode::OK, ())),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
@@ -711,6 +743,98 @@ async fn reactivate_relay(
}
}
async fn list_tenant_invoices(
State(state): State<AppState>,
headers: HeaderMap,
Path(pubkey): Path<String>,
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin_or_tenant(&auth, &pubkey)?;
let tenant = state.api.get_tenant_or_404(&pubkey).await?;
match state
.api
.billing
.stripe_list_invoices(&tenant.stripe_customer_id)
.await
{
Ok(invoices) => Ok(ok(StatusCode::OK, invoices)),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
"internal",
&e.to_string(),
)),
}
}
async fn get_invoice(
State(state): State<AppState>,
headers: HeaderMap,
Path(id): Path<String>,
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
let (invoice, tenant) = state.api.billing.get_invoice_with_tenant(&id).await
.map_err(|e| ApiError::Internal(e.to_string()))?;
state.api.require_admin_or_tenant(&auth, &tenant.pubkey)?;
Ok(ok(StatusCode::OK, invoice))
}
async fn get_invoice_bolt11(
State(state): State<AppState>,
headers: HeaderMap,
Path(id): Path<String>,
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
let (invoice, tenant) = state.api.billing.get_invoice_with_tenant(&id).await
.map_err(|e| ApiError::Internal(e.to_string()))?;
state.api.require_admin_or_tenant(&auth, &tenant.pubkey)?;
let status = invoice["status"].as_str().unwrap_or_default();
if status != "open" {
return Ok(err(
StatusCode::BAD_REQUEST,
"invoice-not-open",
"invoice is not open",
));
}
let amount_due = invoice["amount_due"].as_i64().unwrap_or(0);
match state.api.billing.create_bolt11(amount_due).await {
Ok(bolt11) => Ok(ok(StatusCode::OK, serde_json::json!({ "bolt11": bolt11 }))),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
"internal",
&e.to_string(),
)),
}
}
async fn create_stripe_session(
State(state): State<AppState>,
headers: HeaderMap,
Path(pubkey): Path<String>,
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin_or_tenant(&auth, &pubkey)?;
let tenant = state.api.get_tenant_or_404(&pubkey).await?;
match state
.api
.billing
.stripe_create_portal_session(&tenant.stripe_customer_id)
.await
{
Ok(url) => Ok(ok(StatusCode::OK, serde_json::json!({ "url": url }))),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
"internal",
&e.to_string(),
)),
}
}
async fn update_tenant(
State(state): State<AppState>,
headers: HeaderMap,
@@ -719,18 +843,7 @@ async fn update_tenant(
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin_or_tenant(&auth, &pubkey)?;
let mut tenant = match state.api.query.get_tenant(&pubkey).await {
Ok(Some(t)) => t,
Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "tenant not found")),
Err(e) => {
return Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
"internal",
&e.to_string(),
));
}
};
let mut tenant = state.api.get_tenant_or_404(&pubkey).await?;
if let Some(nwc_url) = payload.nwc_url {
tenant.nwc_url = nwc_url;
+611 -25
View File
@@ -1,13 +1,38 @@
use anyhow::Result;
use anyhow::{Result, anyhow};
use hmac::{Hmac, Mac};
use nwc::prelude::{
MakeInvoiceRequest, NostrWalletConnectURI, PayInvoiceRequest as NwcPayInvoiceRequest, NWC,
};
use sha2::Sha256;
use crate::command::Command;
use crate::models::Activity;
use crate::query::Query;
use crate::robot::Robot;
type HmacSha256 = Hmac<Sha256>;
const STRIPE_API: &str = "https://api.stripe.com/v1";
const WEBHOOK_TOLERANCE_SECS: i64 = 300;
#[derive(serde::Deserialize)]
struct StripeEvent {
#[serde(rename = "type")]
event_type: String,
data: StripeEventData,
}
#[derive(serde::Deserialize)]
struct StripeEventData {
object: serde_json::Value,
}
#[derive(Clone)]
pub struct Billing {
nwc_url: String,
stripe_secret_key: String,
stripe_webhook_secret: String,
http: reqwest::Client,
query: Query,
command: Command,
robot: Robot,
@@ -16,8 +41,13 @@ pub struct Billing {
impl Billing {
pub fn new(query: Query, command: Command, robot: Robot) -> Self {
let nwc_url = std::env::var("NWC_URL").unwrap_or_default();
let stripe_secret_key = std::env::var("STRIPE_SECRET_KEY").unwrap_or_default();
let stripe_webhook_secret = std::env::var("STRIPE_WEBHOOK_SECRET").unwrap_or_default();
Self {
nwc_url,
stripe_secret_key,
stripe_webhook_secret,
http: reqwest::Client::new(),
query,
command,
robot,
@@ -50,51 +80,607 @@ impl Billing {
);
if needs_billing_sync {
self.sync_relay_subscription_item(activity).await?;
self.sync_relay_subscription(activity).await?;
}
Ok(())
}
async fn sync_relay_subscription_item(&self, activity: &Activity) -> Result<()> {
pub async fn sync_relay_subscription(&self, activity: &Activity) -> Result<()> {
let Some(relay) = self.query.get_relay(&activity.resource_id).await? else {
return Ok(());
};
let should_delete = !relay.sync_error.is_empty()
|| relay.synced == 0
|| relay.plan == "free"
|| relay.status == "inactive";
let Some(tenant) = self.query.get_tenant(&relay.tenant).await? else {
return Ok(());
};
if should_delete {
if relay.stripe_subscription_item_id.is_some() {
// TODO: Delete subscription item via Stripe API
// Free plan: remove subscription item if exists, then clean up
if relay.plan == "free" {
if let Some(ref item_id) = relay.stripe_subscription_item_id {
self.stripe_delete_subscription_item(item_id).await?;
self.command.delete_relay_subscription_item(&relay.id).await?;
}
self.cleanup_empty_subscription(&tenant.pubkey).await?;
return Ok(());
}
// Inactive relay: remove subscription item if exists, then clean up
if relay.status == "inactive" {
if let Some(ref item_id) = relay.stripe_subscription_item_id {
self.stripe_delete_subscription_item(item_id).await?;
self.command.delete_relay_subscription_item(&relay.id).await?;
}
self.cleanup_empty_subscription(&tenant.pubkey).await?;
return Ok(());
}
// Active relay on a paid plan
let plan = Query::list_plans()
.into_iter()
.find(|p| p.id == relay.plan);
let Some(plan) = plan else {
return Ok(());
};
let Some(ref stripe_price_id) = plan.stripe_price_id else {
return Ok(());
};
// Ensure subscription exists
if tenant.stripe_subscription_id.is_none() {
let (subscription_id, item_id) = self
.stripe_create_subscription(&tenant.stripe_customer_id, stripe_price_id)
.await?;
self.command
.set_tenant_subscription(&tenant.pubkey, &subscription_id)
.await?;
self.command
.set_relay_subscription_item(&relay.id, &item_id)
.await?;
return Ok(());
}
// Sync the subscription item: create or update
let subscription_id = tenant.stripe_subscription_id.as_ref().unwrap();
let item_id = if let Some(ref existing_item_id) = relay.stripe_subscription_item_id {
self.stripe_update_subscription_item(existing_item_id, stripe_price_id)
.await?
} else {
// TODO: Create or update subscription item via Stripe API
// let stripe_subscription_item_id = ...;
// self.command.set_relay_subscription_item(&relay.id, &stripe_subscription_item_id).await?;
self.stripe_create_subscription_item(subscription_id, stripe_price_id)
.await?
};
self.command
.set_relay_subscription_item(&relay.id, &item_id)
.await?;
Ok(())
}
async fn cleanup_empty_subscription(&self, tenant_pubkey: &str) -> Result<()> {
let has_paid = self.query.has_active_paid_relays(tenant_pubkey).await?;
if has_paid {
return Ok(());
}
let Some(tenant) = self.query.get_tenant(tenant_pubkey).await? else {
return Ok(());
};
if let Some(ref subscription_id) = tenant.stripe_subscription_id {
self.stripe_cancel_subscription(subscription_id).await?;
self.command.clear_tenant_subscription(tenant_pubkey).await?;
}
Ok(())
}
pub async fn deactivate_relay(&self, relay_id: &str) -> Result<()> {
let relay = self
.query
.get_relay(relay_id)
.await?
.ok_or_else(|| anyhow::anyhow!("relay not found"))?;
self.command.deactivate_relay(&relay).await
pub async fn handle_webhook(&self, payload: &str, signature: &str) -> Result<()> {
self.verify_webhook_signature(payload, signature)?;
let event: StripeEvent = serde_json::from_str(payload)?;
let obj = &event.data.object;
match event.event_type.as_str() {
"invoice.created" => {
let customer = obj["customer"].as_str().unwrap_or_default();
let amount_due = obj["amount_due"].as_i64().unwrap_or(0);
let invoice_id = obj["id"].as_str().unwrap_or_default();
self.handle_invoice_created(customer, amount_due, invoice_id)
.await?;
}
"invoice.paid" => {
let customer = obj["customer"].as_str().unwrap_or_default();
self.handle_invoice_paid(customer).await?;
}
"invoice.payment_failed" => {
let customer = obj["customer"].as_str().unwrap_or_default();
self.handle_invoice_payment_failed(customer).await?;
}
"invoice.overdue" => {
let customer = obj["customer"].as_str().unwrap_or_default();
self.handle_invoice_overdue(customer).await?;
}
"customer.subscription.updated" => {
let customer = obj["customer"].as_str().unwrap_or_default();
let status = obj["status"].as_str().unwrap_or_default();
self.handle_subscription_updated(customer, status).await?;
}
"customer.subscription.deleted" => {
let customer = obj["customer"].as_str().unwrap_or_default();
self.handle_subscription_deleted(customer).await?;
}
_ => {}
}
Ok(())
}
pub async fn reactivate_relay(&self, relay_id: &str) -> Result<()> {
let relay = self
fn verify_webhook_signature(&self, payload: &str, sig_header: &str) -> Result<()> {
let mut timestamp = None;
let mut signature = None;
for part in sig_header.split(',') {
if let Some(t) = part.strip_prefix("t=") {
timestamp = Some(t);
} else if let Some(v) = part.strip_prefix("v1=") {
signature = Some(v);
}
}
let timestamp = timestamp.ok_or_else(|| anyhow!("missing webhook timestamp"))?;
let signature = signature.ok_or_else(|| anyhow!("missing webhook signature"))?;
let signed_payload = format!("{timestamp}.{payload}");
let mut mac = HmacSha256::new_from_slice(self.stripe_webhook_secret.as_bytes())
.map_err(|e| anyhow!("invalid webhook secret: {e}"))?;
mac.update(signed_payload.as_bytes());
let expected = hex::encode(mac.finalize().into_bytes());
if expected != signature {
return Err(anyhow!("webhook signature mismatch"));
}
let ts: i64 = timestamp.parse().map_err(|_| anyhow!("bad webhook timestamp"))?;
let now = chrono::Utc::now().timestamp();
if (now - ts).abs() > WEBHOOK_TOLERANCE_SECS {
return Err(anyhow!("webhook timestamp outside tolerance"));
}
Ok(())
}
async fn handle_invoice_created(
&self,
stripe_customer_id: &str,
amount_due: i64,
invoice_id: &str,
) -> Result<()> {
if amount_due == 0 {
return Ok(());
}
let Some(tenant) = self
.query
.get_relay(relay_id)
.get_tenant_by_stripe_customer_id(stripe_customer_id)
.await?
.ok_or_else(|| anyhow::anyhow!("relay not found"))?;
self.command.activate_relay(&relay).await
else {
return Ok(());
};
// 1. NWC auto-pay: if the tenant has a nwc_url
if !tenant.nwc_url.is_empty() {
match self.nwc_pay_invoice(amount_due, &tenant.nwc_url).await {
Ok(()) => {
self.stripe_pay_invoice_out_of_band(invoice_id).await?;
self.command
.clear_tenant_nwc_error(&tenant.pubkey)
.await?;
return Ok(());
}
Err(e) => {
let error_msg = format!("{e}");
self.command
.set_tenant_nwc_error(&tenant.pubkey, &error_msg)
.await?;
// Fall through to next option
}
}
}
// 2. Card on file: if the tenant has a payment method, Stripe charges automatically
if self
.stripe_has_payment_method(&tenant.stripe_customer_id)
.await?
{
return Ok(());
}
// 3. Manual payment: send a DM
self.robot
.send_dm(
&tenant.pubkey,
"Payment is due for your relay subscription. Please visit the application to complete a manual Lightning payment.",
)
.await?;
Ok(())
}
async fn handle_invoice_paid(&self, stripe_customer_id: &str) -> Result<()> {
let Some(tenant) = self
.query
.get_tenant_by_stripe_customer_id(stripe_customer_id)
.await?
else {
return Ok(());
};
if tenant.past_due_at.is_some() {
self.command.clear_tenant_past_due(&tenant.pubkey).await?;
let relays = self.query.list_relays_for_tenant(&tenant.pubkey).await?;
for relay in relays {
if relay.status == "inactive" && relay.plan != "free" {
self.command.activate_relay(&relay).await?;
}
}
}
Ok(())
}
async fn handle_invoice_payment_failed(&self, stripe_customer_id: &str) -> Result<()> {
let Some(tenant) = self
.query
.get_tenant_by_stripe_customer_id(stripe_customer_id)
.await?
else {
return Ok(());
};
if tenant.past_due_at.is_none() {
self.command.set_tenant_past_due(&tenant.pubkey).await?;
self.robot
.send_dm(
&tenant.pubkey,
"Your payment has failed. Your relays may be deactivated if not resolved within a week.",
)
.await?;
}
Ok(())
}
async fn handle_subscription_updated(
&self,
stripe_customer_id: &str,
status: &str,
) -> Result<()> {
if status != "canceled" && status != "unpaid" {
return Ok(());
}
let Some(tenant) = self
.query
.get_tenant_by_stripe_customer_id(stripe_customer_id)
.await?
else {
return Ok(());
};
self.command
.clear_tenant_subscription(&tenant.pubkey)
.await?;
let relays = self.query.list_relays_for_tenant(&tenant.pubkey).await?;
for relay in relays {
if relay.status == "active" && relay.plan != "free" {
self.command.deactivate_relay(&relay).await?;
}
}
Ok(())
}
async fn handle_subscription_deleted(&self, stripe_customer_id: &str) -> Result<()> {
let Some(tenant) = self
.query
.get_tenant_by_stripe_customer_id(stripe_customer_id)
.await?
else {
return Ok(());
};
self.command
.clear_tenant_subscription(&tenant.pubkey)
.await?;
Ok(())
}
async fn handle_invoice_overdue(&self, stripe_customer_id: &str) -> Result<()> {
let Some(tenant) = self
.query
.get_tenant_by_stripe_customer_id(stripe_customer_id)
.await?
else {
return Ok(());
};
let relays = self.query.list_relays_for_tenant(&tenant.pubkey).await?;
for relay in relays {
if relay.status == "active" && relay.plan != "free" {
self.command.deactivate_relay(&relay).await?;
}
}
self.robot
.send_dm(
&tenant.pubkey,
"Your paid relays have been deactivated due to non-payment.",
)
.await?;
Ok(())
}
// --- Public API helpers ---
pub async fn get_invoice_with_tenant(
&self,
invoice_id: &str,
) -> Result<(serde_json::Value, crate::models::Tenant)> {
let invoice = self.stripe_get_invoice(invoice_id).await?;
let customer_id = invoice["customer"]
.as_str()
.ok_or_else(|| anyhow!("invoice missing customer"))?;
let tenant = self
.query
.get_tenant_by_stripe_customer_id(customer_id)
.await?
.ok_or_else(|| anyhow!("tenant not found for customer"))?;
Ok((invoice, tenant))
}
pub async fn stripe_list_invoices(&self, customer_id: &str) -> Result<serde_json::Value> {
let resp = self
.http
.get(format!("{STRIPE_API}/invoices"))
.bearer_auth(&self.stripe_secret_key)
.query(&[("customer", customer_id)])
.send()
.await?;
let body: serde_json::Value = resp.error_for_status()?.json().await?;
Ok(body["data"].clone())
}
pub async fn stripe_get_invoice(&self, invoice_id: &str) -> Result<serde_json::Value> {
let resp = self
.http
.get(format!("{STRIPE_API}/invoices/{invoice_id}"))
.bearer_auth(&self.stripe_secret_key)
.send()
.await?;
let body: serde_json::Value = resp.error_for_status()?.json().await?;
Ok(body)
}
pub async fn create_bolt11(&self, amount_due_cents: i64) -> Result<String> {
let amount_msats = (amount_due_cents as u64) * 1000; // placeholder conversion
let system_uri: NostrWalletConnectURI = self.nwc_url.parse()
.map_err(|_| anyhow!("invalid system NWC URL"))?;
let system_nwc = NWC::new(system_uri);
let make_req = MakeInvoiceRequest {
amount: amount_msats,
description: Some("Relay subscription payment".to_string()),
description_hash: None,
expiry: None,
};
let invoice_response = system_nwc
.make_invoice(make_req)
.await
.map_err(|e| anyhow!("failed to create invoice: {e}"))?;
system_nwc.shutdown().await;
Ok(invoice_response.invoice)
}
pub async fn stripe_create_portal_session(&self, customer_id: &str) -> Result<String> {
let resp = self
.http
.post(format!("{STRIPE_API}/billing_portal/sessions"))
.bearer_auth(&self.stripe_secret_key)
.form(&[("customer", customer_id)])
.send()
.await?;
let body: serde_json::Value = resp.error_for_status()?.json().await?;
let url = body["url"]
.as_str()
.ok_or_else(|| anyhow!("missing portal session url"))?
.to_string();
Ok(url)
}
// --- Stripe API helpers ---
async fn stripe_create_subscription(
&self,
customer_id: &str,
price_id: &str,
) -> Result<(String, String)> {
let resp = self
.http
.post(format!("{STRIPE_API}/subscriptions"))
.bearer_auth(&self.stripe_secret_key)
.form(&[
("customer", customer_id),
("collection_method", "charge_automatically"),
("items[0][price]", price_id),
])
.send()
.await?;
let body: serde_json::Value = resp.error_for_status()?.json().await?;
let subscription_id = body["id"]
.as_str()
.ok_or_else(|| anyhow!("missing subscription id"))?
.to_string();
let item_id = body["items"]["data"][0]["id"]
.as_str()
.ok_or_else(|| anyhow!("missing subscription item id"))?
.to_string();
Ok((subscription_id, item_id))
}
async fn stripe_create_subscription_item(
&self,
subscription_id: &str,
price_id: &str,
) -> Result<String> {
let resp = self
.http
.post(format!("{STRIPE_API}/subscription_items"))
.bearer_auth(&self.stripe_secret_key)
.form(&[
("subscription", subscription_id),
("price", price_id),
])
.send()
.await?;
let body: serde_json::Value = resp.error_for_status()?.json().await?;
let item_id = body["id"]
.as_str()
.ok_or_else(|| anyhow!("missing subscription item id"))?
.to_string();
Ok(item_id)
}
async fn stripe_update_subscription_item(
&self,
item_id: &str,
price_id: &str,
) -> Result<String> {
let resp = self
.http
.post(format!("{STRIPE_API}/subscription_items/{item_id}"))
.bearer_auth(&self.stripe_secret_key)
.form(&[("price", price_id)])
.send()
.await?;
let body: serde_json::Value = resp.error_for_status()?.json().await?;
let id = body["id"]
.as_str()
.ok_or_else(|| anyhow!("missing subscription item id"))?
.to_string();
Ok(id)
}
async fn stripe_delete_subscription_item(&self, item_id: &str) -> Result<()> {
self.http
.delete(format!("{STRIPE_API}/subscription_items/{item_id}"))
.bearer_auth(&self.stripe_secret_key)
.send()
.await?
.error_for_status()?;
Ok(())
}
async fn stripe_cancel_subscription(&self, subscription_id: &str) -> Result<()> {
self.http
.delete(format!("{STRIPE_API}/subscriptions/{subscription_id}"))
.bearer_auth(&self.stripe_secret_key)
.send()
.await?
.error_for_status()?;
Ok(())
}
async fn stripe_pay_invoice_out_of_band(&self, invoice_id: &str) -> Result<()> {
self.http
.post(format!("{STRIPE_API}/invoices/{invoice_id}/pay"))
.bearer_auth(&self.stripe_secret_key)
.form(&[("paid_out_of_band", "true")])
.send()
.await?
.error_for_status()?;
Ok(())
}
async fn stripe_has_payment_method(&self, customer_id: &str) -> Result<bool> {
let resp = self
.http
.get(format!("{STRIPE_API}/payment_methods"))
.bearer_auth(&self.stripe_secret_key)
.query(&[("customer", customer_id), ("type", "card")])
.send()
.await?;
let body: serde_json::Value = resp.error_for_status()?.json().await?;
let has_method = body["data"]
.as_array()
.map(|a| !a.is_empty())
.unwrap_or(false);
Ok(has_method)
}
// --- NWC helpers ---
async fn nwc_pay_invoice(&self, amount_due_cents: i64, tenant_nwc_url: &str) -> Result<()> {
// Convert USD cents to millisatoshis (approximate: 1 sat ≈ variable USD)
// amount_due is in cents from Stripe. We create a Lightning invoice for the exact amount.
// The NWC make_invoice amount is in millisatoshis.
let amount_msats = (amount_due_cents as u64) * 1000; // placeholder conversion, actual rate would come from exchange
// Create a bolt11 invoice using the system wallet (self.nwc_url)
let system_uri: NostrWalletConnectURI = self.nwc_url.parse()
.map_err(|_| anyhow!("invalid system NWC URL"))?;
let system_nwc = NWC::new(system_uri);
let make_req = MakeInvoiceRequest {
amount: amount_msats,
description: Some("Relay subscription payment".to_string()),
description_hash: None,
expiry: None,
};
let invoice_response = system_nwc
.make_invoice(make_req)
.await
.map_err(|e| anyhow!("failed to create invoice: {e}"))?;
system_nwc.shutdown().await;
// Pay the bolt11 invoice using the tenant's wallet
let tenant_uri: NostrWalletConnectURI = tenant_nwc_url.parse()
.map_err(|_| anyhow!("invalid tenant NWC URL"))?;
let tenant_nwc = NWC::new(tenant_uri);
let pay_req = NwcPayInvoiceRequest::new(invoice_response.invoice);
tenant_nwc
.pay_invoice(pay_req)
.await
.map_err(|e| anyhow!("failed to pay invoice: {e}"))?;
tenant_nwc.shutdown().await;
Ok(())
}
}
+54 -3
View File
@@ -67,14 +67,13 @@ impl Command {
let mut tx = self.pool.begin().await?;
sqlx::query(
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id, stripe_subscription_id)
VALUES (?, ?, ?, ?, ?)",
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id)
VALUES (?, ?, ?, ?)",
)
.bind(&tenant.pubkey)
.bind(&tenant.nwc_url)
.bind(tenant.created_at)
.bind(&tenant.stripe_customer_id)
.bind(&tenant.stripe_subscription_id)
.execute(&mut *tx)
.await?;
@@ -255,4 +254,56 @@ impl Command {
.await?;
Ok(())
}
pub async fn set_tenant_subscription(&self, pubkey: &str, stripe_subscription_id: &str) -> Result<()> {
sqlx::query("UPDATE tenant SET stripe_subscription_id = ? WHERE pubkey = ?")
.bind(stripe_subscription_id)
.bind(pubkey)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn clear_tenant_subscription(&self, pubkey: &str) -> Result<()> {
sqlx::query("UPDATE tenant SET stripe_subscription_id = NULL WHERE pubkey = ?")
.bind(pubkey)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn set_tenant_nwc_error(&self, pubkey: &str, error: &str) -> Result<()> {
sqlx::query("UPDATE tenant SET nwc_error = ? WHERE pubkey = ?")
.bind(error)
.bind(pubkey)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn clear_tenant_nwc_error(&self, pubkey: &str) -> Result<()> {
sqlx::query("UPDATE tenant SET nwc_error = NULL WHERE pubkey = ?")
.bind(pubkey)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn set_tenant_past_due(&self, pubkey: &str) -> Result<()> {
let now = chrono::Utc::now().timestamp();
sqlx::query("UPDATE tenant SET past_due_at = ? WHERE pubkey = ?")
.bind(now)
.bind(pubkey)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn clear_tenant_past_due(&self, pubkey: &str) -> Result<()> {
sqlx::query("UPDATE tenant SET past_due_at = NULL WHERE pubkey = ?")
.bind(pubkey)
.execute(&self.pool)
.await?;
Ok(())
}
}
+4 -3
View File
@@ -18,17 +18,18 @@ pub struct Plan {
pub members: Option<i64>,
pub blossom: bool,
pub livekit: bool,
pub stripe_price_id: String,
pub stripe_price_id: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Tenant {
pub pubkey: String,
#[serde(skip_serializing)]
pub nwc_url: String,
pub nwc_error: Option<String>,
pub created_at: i64,
pub stripe_customer_id: String,
pub stripe_subscription_id: String,
pub stripe_subscription_id: Option<String>,
pub past_due_at: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
+27 -5
View File
@@ -15,7 +15,7 @@ impl Query {
pub async fn list_tenants(&self) -> Result<Vec<Tenant>> {
let rows = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, created_at, stripe_customer_id, stripe_subscription_id
"SELECT pubkey, nwc_url, nwc_error, created_at, stripe_customer_id, stripe_subscription_id, past_due_at
FROM tenant
ORDER BY pubkey",
)
@@ -26,7 +26,7 @@ impl Query {
pub async fn get_tenant(&self, pubkey: &str) -> Result<Option<Tenant>> {
let row = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, created_at, stripe_customer_id, stripe_subscription_id
"SELECT pubkey, nwc_url, nwc_error, created_at, stripe_customer_id, stripe_subscription_id, past_due_at
FROM tenant
WHERE pubkey = ?",
)
@@ -45,7 +45,7 @@ impl Query {
members: Some(10),
blossom: false,
livekit: false,
stripe_price_id: String::new(),
stripe_price_id: None,
},
Plan {
id: "basic".to_string(),
@@ -54,7 +54,7 @@ impl Query {
members: Some(100),
blossom: true,
livekit: true,
stripe_price_id: std::env::var("STRIPE_PRICE_BASIC").unwrap_or_default(),
stripe_price_id: Some(std::env::var("STRIPE_PRICE_BASIC").unwrap_or_default()),
},
Plan {
id: "growth".to_string(),
@@ -63,7 +63,7 @@ impl Query {
members: None,
blossom: true,
livekit: true,
stripe_price_id: std::env::var("STRIPE_PRICE_GROWTH").unwrap_or_default(),
stripe_price_id: Some(std::env::var("STRIPE_PRICE_GROWTH").unwrap_or_default()),
},
]
}
@@ -119,6 +119,28 @@ impl Query {
Ok(row)
}
pub async fn get_tenant_by_stripe_customer_id(&self, stripe_customer_id: &str) -> Result<Option<Tenant>> {
let row = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, nwc_error, created_at, stripe_customer_id, stripe_subscription_id, past_due_at
FROM tenant
WHERE stripe_customer_id = ?",
)
.bind(stripe_customer_id)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
pub async fn has_active_paid_relays(&self, tenant_id: &str) -> Result<bool> {
let count = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM relay WHERE tenant = ? AND status = 'active' AND plan != 'free'",
)
.bind(tenant_id)
.fetch_one(&self.pool)
.await?;
Ok(count > 0)
}
pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>> {
let rows = sqlx::query_as::<_, Activity>(
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id