forked from coracle/caravel
Group subscription items by price
This commit is contained in:
@@ -31,3 +31,5 @@ NWC_URL= # Nostr Wallet Connect URL for generating Lightnin
|
|||||||
ENCRYPTION_SECRET= # Nostr secret key (hex or nsec) used to encrypt tenant NWC URLs at rest
|
ENCRYPTION_SECRET= # Nostr secret key (hex or nsec) used to encrypt tenant NWC URLs at rest
|
||||||
STRIPE_SECRET_KEY= # Required Stripe API secret key (sk_...)
|
STRIPE_SECRET_KEY= # Required Stripe API secret key (sk_...)
|
||||||
STRIPE_WEBHOOK_SECRET=whsec_test_00000000000000000000000000 # Webhook signing secret (use real value in production)
|
STRIPE_WEBHOOK_SECRET=whsec_test_00000000000000000000000000 # Webhook signing secret (use real value in production)
|
||||||
|
STRIPE_PRICE_BASIC= # Stripe price ID (price_...) for the Basic plan; required for paid plans
|
||||||
|
STRIPE_PRICE_GROWTH= # Stripe price ID (price_...) for the Growth plan; required for paid plans
|
||||||
|
|||||||
@@ -47,6 +47,8 @@ Environment variables:
|
|||||||
| `ENCRYPTION_SECRET` | Nostr secret key (hex or nsec) used to encrypt tenant NWC URLs at rest | _required_ |
|
| `ENCRYPTION_SECRET` | Nostr secret key (hex or nsec) used to encrypt tenant NWC URLs at rest | _required_ |
|
||||||
| `STRIPE_SECRET_KEY` | Stripe API secret key used for billing API operations | _required_ |
|
| `STRIPE_SECRET_KEY` | Stripe API secret key used for billing API operations | _required_ |
|
||||||
| `STRIPE_WEBHOOK_SECRET` | Stripe webhook signing secret used to verify `Stripe-Signature` headers | _required_ |
|
| `STRIPE_WEBHOOK_SECRET` | Stripe webhook signing secret used to verify `Stripe-Signature` headers | _required_ |
|
||||||
|
| `STRIPE_PRICE_BASIC` | Stripe price ID (`price_...`) backing the Basic plan | _required for paid plans_ |
|
||||||
|
| `STRIPE_PRICE_GROWTH` | Stripe price ID (`price_...`) backing the Growth plan | _required for paid plans_ |
|
||||||
| `ROBOT_SECRET` | Robot Nostr secret key | _required_ |
|
| `ROBOT_SECRET` | Robot Nostr secret key | _required_ |
|
||||||
| `ROBOT_NAME` | Robot display name (kind `0`) | _optional_ |
|
| `ROBOT_NAME` | Robot display name (kind `0`) | _optional_ |
|
||||||
| `ROBOT_DESCRIPTION` | Robot description (kind `0`) | _optional_ |
|
| `ROBOT_DESCRIPTION` | Robot description (kind `0`) | _optional_ |
|
||||||
|
|||||||
+14
-9
@@ -24,18 +24,23 @@ Members:
|
|||||||
|
|
||||||
## `pub fn sync_relay_subscription(&self, activity: &Activity)`
|
## `pub fn sync_relay_subscription(&self, activity: &Activity)`
|
||||||
|
|
||||||
Manages the Stripe subscription and subscription items for a relay's tenant. Only paid (non-free) relays interact with Stripe. Free-only tenants have no subscription. Must be idempotent.
|
Resolves the relay associated with `activity` and reconciles that relay's tenant via `sync_tenant_subscription`. The startup/lagged reconcile loop calls `sync_tenant_subscription` for every tenant.
|
||||||
|
|
||||||
|
## `fn sync_tenant_subscription(&self, tenant_pubkey: &str)`
|
||||||
|
|
||||||
|
Reconciles a tenant's single Stripe subscription with the set of relays that should be billed. Only paid (non-free) relays interact with Stripe. Free-only tenants have no subscription. Must be idempotent.
|
||||||
|
|
||||||
|
Stripe forbids two subscription items on the same subscription from sharing a price, so billing is modeled as **one subscription item per plan (price), with `quantity` equal to the number of the tenant's `active` relays on that plan**. Every such relay's `stripe_subscription_item_id` points at the shared item for its plan; relays that aren't billed (free, inactive, delinquent) have it cleared.
|
||||||
|
|
||||||
Stripe uses **pay-in-advance** by default: when a subscription is first created, Stripe immediately generates an open invoice for the current period. The `invoice.created` webhook fires shortly after and `handle_invoice_created` attempts payment.
|
Stripe uses **pay-in-advance** by default: when a subscription is first created, Stripe immediately generates an open invoice for the current period. The `invoice.created` webhook fires shortly after and `handle_invoice_created` attempts payment.
|
||||||
|
|
||||||
- Fetch the relay and tenant associated with the `activity`
|
- Fetch the tenant and its relays. Build the desired state: for each `active` relay on a paid plan with a non-empty `stripe_price_id`, count relays per price.
|
||||||
- **If relay plan is `free`**: if the relay has a `stripe_subscription_item_id`, delete it via the Stripe API and call `command.delete_relay_subscription_item`. Then run downgrade proration validation by previewing the upcoming invoice and logging proration lines/amounts. Then check cleanup (below). Return early.
|
- **Resolve the live subscription**: if the tenant has a `stripe_subscription_id`, fetch it. If Stripe no longer knows about it, or its status is `canceled`/`incomplete_expired`, call `command.clear_tenant_subscription` and treat the tenant as having no subscription.
|
||||||
- **If relay is `inactive` or `delinquent`**: if the relay has a `stripe_subscription_item_id`, delete it via the Stripe API and call `command.delete_relay_subscription_item`. Then check cleanup (below). Return early.
|
- **No relays to bill** (desired state empty): if the tenant still has a `stripe_subscription_id`, cancel the Stripe subscription and call `command.clear_tenant_subscription`. Clear `stripe_subscription_item_id` on every relay that has one. Return.
|
||||||
- **If relay is `active` and on a paid plan**:
|
- **No subscription yet**: create a Stripe subscription for the customer with `collection_method: "charge_automatically"` and one item per `(price, quantity)`. Save the subscription ID via `command.set_tenant_subscription`.
|
||||||
- **Ensure subscription exists**: If the tenant has no `stripe_subscription_id`, create a Stripe subscription for the customer with `collection_method: "charge_automatically"` and the relay's price as the first item. Save the subscription ID via `command.set_tenant_subscription` and the item ID via `command.set_relay_subscription_item`. Return early.
|
- **Existing subscription**: fetch its current items. For each desired `(price, quantity)`: update the matching item's quantity if it differs, otherwise create the item. Delete any item whose price no longer appears in the desired state.
|
||||||
- **Sync the subscription item**: If the tenant already has a subscription, create or update the relay's Stripe subscription item to the plan's `stripe_price_id` via the Stripe API, then call `command.set_relay_subscription_item`.
|
- **Point relays at items**: for each relay, set `stripe_subscription_item_id` (via `command.set_relay_subscription_item`) to the shared item for its plan, or clear it (via `command.delete_relay_subscription_item`) if the relay is not billed.
|
||||||
- **Downgrade validation**: when changing an existing subscription item, detect if the new plan amount is lower than the current one. If yes, preview the upcoming Stripe invoice and log proration line/amount details to validate expected credit/proration behavior.
|
- **Downgrade validation**: if any quantity decreased or any item was removed, preview the upcoming Stripe invoice and log proration line/amount details to validate expected credit/proration behavior.
|
||||||
- **Clean up empty subscription**: After any item deletion, check if the tenant has any remaining active paid relays. If none and the tenant has a `stripe_subscription_id`, cancel the Stripe subscription immediately and call `command.clear_tenant_subscription`.
|
|
||||||
|
|
||||||
## `pub fn handle_webhook(&self, payload: &str, signature: &str) -> Result<()>`
|
## `pub fn handle_webhook(&self, payload: &str, signature: &str) -> Result<()>`
|
||||||
|
|
||||||
|
|||||||
@@ -39,10 +39,6 @@ Members:
|
|||||||
|
|
||||||
- Returns the tenant matching the given `stripe_customer_id`
|
- Returns the tenant matching the given `stripe_customer_id`
|
||||||
|
|
||||||
## `pub fn has_active_paid_relays(&self, tenant_id: &str) -> Result<bool>`
|
|
||||||
|
|
||||||
- Returns true if the tenant has any relays where `status = 'active'` and `plan != 'free'`
|
|
||||||
|
|
||||||
## `pub fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>>`
|
## `pub fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>>`
|
||||||
|
|
||||||
- Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id`
|
- Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id`
|
||||||
|
|||||||
+298
-187
@@ -5,11 +5,10 @@ use nwc::prelude::{
|
|||||||
PayInvoiceRequest as NwcPayInvoiceRequest, TransactionState,
|
PayInvoiceRequest as NwcPayInvoiceRequest, TransactionState,
|
||||||
};
|
};
|
||||||
use sha2::Sha256;
|
use sha2::Sha256;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use crate::command::Command;
|
use crate::command::Command;
|
||||||
use crate::models::{
|
use crate::models::{Activity, RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, Relay};
|
||||||
Activity, RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay,
|
|
||||||
};
|
|
||||||
use crate::query::Query;
|
use crate::query::Query;
|
||||||
use crate::robot::Robot;
|
use crate::robot::Robot;
|
||||||
|
|
||||||
@@ -145,24 +144,24 @@ impl Billing {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn reconcile_relay_subscriptions(&self, source: &str) -> Result<()> {
|
async fn reconcile_relay_subscriptions(&self, source: &str) -> Result<()> {
|
||||||
let relays = self.query.list_relays().await?;
|
let tenants = self.query.list_tenants().await?;
|
||||||
|
|
||||||
if relays.is_empty() {
|
if tenants.is_empty() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
source,
|
source,
|
||||||
relay_count = relays.len(),
|
tenant_count = tenants.len(),
|
||||||
"reconciling relay billing state"
|
"reconciling relay billing state"
|
||||||
);
|
);
|
||||||
|
|
||||||
for relay in relays {
|
for tenant in tenants {
|
||||||
if let Err(error) = self.sync_relay_subscription_for_relay(&relay).await {
|
if let Err(error) = self.sync_tenant_subscription(&tenant.pubkey).await {
|
||||||
tracing::error!(
|
tracing::error!(
|
||||||
source,
|
source,
|
||||||
relay = %relay.id,
|
tenant = %tenant.pubkey,
|
||||||
error = %error,
|
error = ?error,
|
||||||
"failed to reconcile relay billing state"
|
"failed to reconcile relay billing state"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -194,113 +193,174 @@ impl Billing {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
self.sync_relay_subscription_for_relay(&relay).await
|
self.sync_tenant_subscription(&relay.tenant).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn sync_relay_subscription_for_relay(&self, relay: &Relay) -> Result<()> {
|
/// Reconciles a tenant's single Stripe subscription with the set of relays that
|
||||||
let Some(tenant) = self.query.get_tenant(&relay.tenant).await? else {
|
/// should be billed.
|
||||||
|
///
|
||||||
|
/// Stripe forbids two subscription items on the same subscription from sharing a
|
||||||
|
/// price, so billing is modeled as one subscription item per plan (price) with
|
||||||
|
/// `quantity` equal to the number of the tenant's `active` relays on that plan.
|
||||||
|
/// Every such relay's `stripe_subscription_item_id` points at the shared item for
|
||||||
|
/// its plan; relays that aren't billed (free, inactive, delinquent) have it
|
||||||
|
/// cleared. Must be idempotent.
|
||||||
|
async fn sync_tenant_subscription(&self, tenant_pubkey: &str) -> Result<()> {
|
||||||
|
let Some(mut tenant) = self.query.get_tenant(tenant_pubkey).await? else {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
let plan = Query::get_plan(&relay.plan)
|
let relays = self.query.list_relays_for_tenant(tenant_pubkey).await?;
|
||||||
.ok_or_else(|| anyhow!("unknown relay plan id: {}", relay.plan))?;
|
|
||||||
|
|
||||||
// Free plan: remove subscription item if exists, then clean up
|
// Desired billed state: price id -> quantity, plus which relays map to which price.
|
||||||
if plan.id == "free" {
|
let mut desired: BTreeMap<String, i64> = BTreeMap::new();
|
||||||
if let Some(ref item_id) = relay.stripe_subscription_item_id {
|
let mut relay_price: BTreeMap<String, String> = BTreeMap::new();
|
||||||
self.stripe_delete_subscription_item(item_id).await?;
|
for relay in &relays {
|
||||||
self.command
|
if relay.status != RELAY_STATUS_ACTIVE {
|
||||||
.delete_relay_subscription_item(&relay.id)
|
continue;
|
||||||
.await?;
|
|
||||||
self.validate_downgrade_proration(&tenant, "free-plan-downgrade")
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
self.cleanup_empty_subscription(&tenant.pubkey).await?;
|
let Some(plan) = Query::get_plan(&relay.plan) else {
|
||||||
return Ok(());
|
tracing::warn!(relay = %relay.id, plan = %relay.plan, "active relay on unknown plan; not billed");
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let Some(price_id) = plan.stripe_price_id else {
|
||||||
|
continue; // free plan: nothing to bill
|
||||||
|
};
|
||||||
|
if price_id.trim().is_empty() {
|
||||||
|
tracing::warn!(relay = %relay.id, plan = %relay.plan, "active relay on a paid plan with no configured Stripe price id; not billed");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
*desired.entry(price_id.clone()).or_insert(0) += 1;
|
||||||
|
relay_price.insert(relay.id.clone(), price_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inactive relay: remove subscription item if exists, then clean up
|
// Resolve the live subscription, dropping a stale reference to one that no
|
||||||
if relay.status == RELAY_STATUS_INACTIVE || relay.status == RELAY_STATUS_DELINQUENT {
|
// longer exists or has been canceled.
|
||||||
if let Some(ref item_id) = relay.stripe_subscription_item_id {
|
let subscription = match tenant.stripe_subscription_id.as_deref() {
|
||||||
self.stripe_delete_subscription_item(item_id).await?;
|
Some(subscription_id) => match self.stripe_get_subscription(subscription_id).await? {
|
||||||
|
Some(sub)
|
||||||
|
if !matches!(
|
||||||
|
sub["status"].as_str().unwrap_or_default(),
|
||||||
|
"canceled" | "incomplete_expired"
|
||||||
|
) =>
|
||||||
|
{
|
||||||
|
Some(sub)
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
self.command
|
||||||
|
.clear_tenant_subscription(tenant_pubkey)
|
||||||
|
.await?;
|
||||||
|
tenant.stripe_subscription_id = None;
|
||||||
|
None
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
// No relays to bill: tear everything down.
|
||||||
|
if desired.is_empty() {
|
||||||
|
if let Some(ref subscription_id) = tenant.stripe_subscription_id {
|
||||||
|
self.stripe_cancel_subscription(subscription_id).await?;
|
||||||
self.command
|
self.command
|
||||||
.delete_relay_subscription_item(&relay.id)
|
.clear_tenant_subscription(tenant_pubkey)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
self.cleanup_empty_subscription(&tenant.pubkey).await?;
|
for relay in &relays {
|
||||||
return Ok(());
|
if relay.stripe_subscription_item_id.is_some() {
|
||||||
}
|
self.command
|
||||||
|
.delete_relay_subscription_item(&relay.id)
|
||||||
// Active relay on a paid plan
|
.await?;
|
||||||
let Some(ref stripe_price_id) = plan.stripe_price_id else {
|
}
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
// Ensure subscription exists
|
|
||||||
if tenant.stripe_subscription_id.is_none() {
|
|
||||||
let (subscription_id, item_id) = self
|
|
||||||
.stripe_create_subscription(&tenant.stripe_customer_id, stripe_price_id)
|
|
||||||
.await?;
|
|
||||||
self.command
|
|
||||||
.set_tenant_subscription(&tenant.pubkey, &subscription_id)
|
|
||||||
.await?;
|
|
||||||
self.command
|
|
||||||
.set_relay_subscription_item(&relay.id, &item_id)
|
|
||||||
.await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sync the subscription item: create or update
|
|
||||||
let subscription_id = tenant.stripe_subscription_id.as_ref().unwrap();
|
|
||||||
let item_id = if let Some(ref existing_item_id) = relay.stripe_subscription_item_id {
|
|
||||||
let is_downgrade = self
|
|
||||||
.is_subscription_item_downgrade(existing_item_id, plan.amount)
|
|
||||||
.await
|
|
||||||
.unwrap_or_else(|error| {
|
|
||||||
tracing::warn!(
|
|
||||||
error = %error,
|
|
||||||
relay_id = %relay.id,
|
|
||||||
"failed to determine relay plan downgrade direction"
|
|
||||||
);
|
|
||||||
false
|
|
||||||
});
|
|
||||||
|
|
||||||
let updated_item_id = self
|
|
||||||
.stripe_update_subscription_item(existing_item_id, stripe_price_id)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if is_downgrade {
|
|
||||||
self.validate_downgrade_proration(&tenant, "paid-plan-downgrade")
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
updated_item_id
|
|
||||||
} else {
|
|
||||||
self.stripe_create_subscription_item(subscription_id, stripe_price_id)
|
|
||||||
.await?
|
|
||||||
};
|
|
||||||
self.command
|
|
||||||
.set_relay_subscription_item(&relay.id, &item_id)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn cleanup_empty_subscription(&self, tenant_pubkey: &str) -> Result<()> {
|
|
||||||
let has_paid = self.query.has_active_paid_relays(tenant_pubkey).await?;
|
|
||||||
if has_paid {
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let Some(tenant) = self.query.get_tenant(tenant_pubkey).await? else {
|
// Bring the subscription's items in line with `desired`. `price_to_item` ends
|
||||||
return Ok(());
|
// up mapping every desired price to its (possibly newly created) item id.
|
||||||
};
|
let mut price_to_item: BTreeMap<String, String> = BTreeMap::new();
|
||||||
|
let mut downgraded = false;
|
||||||
|
|
||||||
if let Some(ref subscription_id) = tenant.stripe_subscription_id {
|
match subscription {
|
||||||
self.stripe_cancel_subscription(subscription_id).await?;
|
None => {
|
||||||
self.command
|
let (subscription_id, items) = self
|
||||||
.clear_tenant_subscription(tenant_pubkey)
|
.stripe_create_subscription(&tenant.stripe_customer_id, &desired)
|
||||||
.await?;
|
.await?;
|
||||||
|
self.command
|
||||||
|
.set_tenant_subscription(tenant_pubkey, &subscription_id)
|
||||||
|
.await?;
|
||||||
|
tenant.stripe_subscription_id = Some(subscription_id);
|
||||||
|
price_to_item = items;
|
||||||
|
}
|
||||||
|
Some(sub) => {
|
||||||
|
let subscription_id = sub["id"]
|
||||||
|
.as_str()
|
||||||
|
.ok_or_else(|| anyhow!("missing subscription id"))?
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
// price id -> (item id, quantity) for items currently on the subscription.
|
||||||
|
let mut current: BTreeMap<String, (String, i64)> = BTreeMap::new();
|
||||||
|
for item in sub["items"]["data"].as_array().into_iter().flatten() {
|
||||||
|
let (Some(item_id), Some(price_id)) =
|
||||||
|
(item["id"].as_str(), item["price"]["id"].as_str())
|
||||||
|
else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let quantity = item["quantity"].as_i64().unwrap_or(1);
|
||||||
|
current.insert(price_id.to_string(), (item_id.to_string(), quantity));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (price_id, &quantity) in &desired {
|
||||||
|
if let Some((item_id, current_quantity)) = current.remove(price_id) {
|
||||||
|
if current_quantity != quantity {
|
||||||
|
if quantity < current_quantity {
|
||||||
|
downgraded = true;
|
||||||
|
}
|
||||||
|
self.stripe_set_subscription_item_quantity(&item_id, quantity)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
price_to_item.insert(price_id.clone(), item_id);
|
||||||
|
} else {
|
||||||
|
let item_id = self
|
||||||
|
.stripe_create_subscription_item(&subscription_id, price_id, quantity)
|
||||||
|
.await?;
|
||||||
|
price_to_item.insert(price_id.clone(), item_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Items for plans no relay is on anymore.
|
||||||
|
for (_, (item_id, _)) in current {
|
||||||
|
downgraded = true;
|
||||||
|
self.stripe_delete_subscription_item(&item_id).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Point each relay at the shared item for its plan (or clear it if unbilled).
|
||||||
|
for relay in &relays {
|
||||||
|
match relay_price.get(&relay.id) {
|
||||||
|
Some(price_id) => {
|
||||||
|
let item_id = price_to_item
|
||||||
|
.get(price_id)
|
||||||
|
.ok_or_else(|| anyhow!("missing subscription item for price {price_id}"))?;
|
||||||
|
if relay.stripe_subscription_item_id.as_deref() != Some(item_id.as_str()) {
|
||||||
|
self.command
|
||||||
|
.set_relay_subscription_item(&relay.id, item_id)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
if relay.stripe_subscription_item_id.is_some() {
|
||||||
|
self.command
|
||||||
|
.delete_relay_subscription_item(&relay.id)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if downgraded {
|
||||||
|
self.validate_downgrade_proration(&tenant, "tenant-subscription-sync")
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -624,33 +684,6 @@ impl Billing {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn is_subscription_item_downgrade(
|
|
||||||
&self,
|
|
||||||
item_id: &str,
|
|
||||||
next_plan_amount: i64,
|
|
||||||
) -> Result<bool> {
|
|
||||||
let Some(current_price_id) = self.stripe_get_subscription_item_price_id(item_id).await?
|
|
||||||
else {
|
|
||||||
return Ok(false);
|
|
||||||
};
|
|
||||||
|
|
||||||
let Some(current_plan_amount) = Self::plan_amount_from_price_id(¤t_price_id) else {
|
|
||||||
return Ok(false);
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(next_plan_amount < current_plan_amount)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn plan_amount_from_price_id(price_id: &str) -> Option<i64> {
|
|
||||||
Query::list_plans().into_iter().find_map(|plan| {
|
|
||||||
if plan.stripe_price_id.as_deref() == Some(price_id) {
|
|
||||||
Some(plan.amount)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn validate_downgrade_proration(&self, tenant: &crate::models::Tenant, context: &str) {
|
async fn validate_downgrade_proration(&self, tenant: &crate::models::Tenant, context: &str) {
|
||||||
match self
|
match self
|
||||||
.stripe_preview_upcoming_invoice(
|
.stripe_preview_upcoming_invoice(
|
||||||
@@ -786,7 +819,7 @@ impl Billing {
|
|||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let body: serde_json::Value = resp.error_for_status()?.json().await?;
|
let body: serde_json::Value = stripe_error_for_status(resp).await?.json().await?;
|
||||||
let customer_id = body["id"]
|
let customer_id = body["id"]
|
||||||
.as_str()
|
.as_str()
|
||||||
.ok_or_else(|| anyhow!("missing customer id"))?;
|
.ok_or_else(|| anyhow!("missing customer id"))?;
|
||||||
@@ -807,7 +840,7 @@ impl Billing {
|
|||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let body: serde_json::Value = resp.error_for_status()?.json().await?;
|
let body: serde_json::Value = stripe_error_for_status(resp).await?.json().await?;
|
||||||
Ok(body["data"].clone())
|
Ok(body["data"].clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -828,7 +861,7 @@ impl Billing {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
let body: serde_json::Value = resp.error_for_status()?.json().await?;
|
let body: serde_json::Value = stripe_error_for_status(resp).await?.json().await?;
|
||||||
Ok(body)
|
Ok(body)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -983,7 +1016,7 @@ impl Billing {
|
|||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let body: serde_json::Value = resp.error_for_status()?.json().await?;
|
let body: serde_json::Value = stripe_error_for_status(resp).await?.json().await?;
|
||||||
let url = body["url"]
|
let url = body["url"]
|
||||||
.as_str()
|
.as_str()
|
||||||
.ok_or_else(|| anyhow!("missing portal session url"))?
|
.ok_or_else(|| anyhow!("missing portal session url"))?
|
||||||
@@ -1006,55 +1039,105 @@ impl Billing {
|
|||||||
hex::encode(mac.finalize().into_bytes())
|
hex::encode(mac.finalize().into_bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fetches a subscription, returning `None` if Stripe no longer knows about it
|
||||||
|
/// (so callers can recover from a stale `stripe_subscription_id`).
|
||||||
|
async fn stripe_get_subscription(
|
||||||
|
&self,
|
||||||
|
subscription_id: &str,
|
||||||
|
) -> Result<Option<serde_json::Value>> {
|
||||||
|
let resp = self
|
||||||
|
.http
|
||||||
|
.get(format!("{STRIPE_API}/subscriptions/{subscription_id}"))
|
||||||
|
.bearer_auth(&self.stripe_secret_key)
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if resp.status() == reqwest::StatusCode::NOT_FOUND {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let body: serde_json::Value = stripe_error_for_status(resp).await?.json().await?;
|
||||||
|
Ok(Some(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a subscription with one item per `(price_id, quantity)` entry. Returns
|
||||||
|
/// the subscription id and a map from price id to the created subscription item id.
|
||||||
async fn stripe_create_subscription(
|
async fn stripe_create_subscription(
|
||||||
&self,
|
&self,
|
||||||
customer_id: &str,
|
customer_id: &str,
|
||||||
price_id: &str,
|
items: &BTreeMap<String, i64>,
|
||||||
) -> Result<(String, String)> {
|
) -> Result<(String, BTreeMap<String, String>)> {
|
||||||
let idempotency_key = self.idempotency_key(&["create_subscription", customer_id, price_id]);
|
let mut form: Vec<(String, String)> = vec![
|
||||||
|
("customer".to_string(), customer_id.to_string()),
|
||||||
|
(
|
||||||
|
"collection_method".to_string(),
|
||||||
|
"charge_automatically".to_string(),
|
||||||
|
),
|
||||||
|
];
|
||||||
|
let mut key_parts: Vec<String> =
|
||||||
|
vec!["create_subscription".to_string(), customer_id.to_string()];
|
||||||
|
for (index, (price_id, quantity)) in items.iter().enumerate() {
|
||||||
|
form.push((format!("items[{index}][price]"), price_id.clone()));
|
||||||
|
form.push((format!("items[{index}][quantity]"), quantity.to_string()));
|
||||||
|
key_parts.push(format!("{price_id}={quantity}"));
|
||||||
|
}
|
||||||
|
let key_refs: Vec<&str> = key_parts.iter().map(String::as_str).collect();
|
||||||
|
let idempotency_key = self.idempotency_key(&key_refs);
|
||||||
|
|
||||||
let resp = self
|
let resp = self
|
||||||
.http
|
.http
|
||||||
.post(format!("{STRIPE_API}/subscriptions"))
|
.post(format!("{STRIPE_API}/subscriptions"))
|
||||||
.bearer_auth(&self.stripe_secret_key)
|
.bearer_auth(&self.stripe_secret_key)
|
||||||
.header("Idempotency-Key", idempotency_key)
|
.header("Idempotency-Key", idempotency_key)
|
||||||
.form(&[
|
.form(&form)
|
||||||
("customer", customer_id),
|
|
||||||
("collection_method", "charge_automatically"),
|
|
||||||
("items[0][price]", price_id),
|
|
||||||
])
|
|
||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let body: serde_json::Value = resp.error_for_status()?.json().await?;
|
let body: serde_json::Value = stripe_error_for_status(resp).await?.json().await?;
|
||||||
let subscription_id = body["id"]
|
let subscription_id = body["id"]
|
||||||
.as_str()
|
.as_str()
|
||||||
.ok_or_else(|| anyhow!("missing subscription id"))?
|
.ok_or_else(|| anyhow!("missing subscription id"))?
|
||||||
.to_string();
|
.to_string();
|
||||||
let item_id = body["items"]["data"][0]["id"]
|
let mut price_to_item = BTreeMap::new();
|
||||||
.as_str()
|
for item in body["items"]["data"]
|
||||||
.ok_or_else(|| anyhow!("missing subscription item id"))?
|
.as_array()
|
||||||
.to_string();
|
.ok_or_else(|| anyhow!("missing subscription items"))?
|
||||||
|
{
|
||||||
|
let item_id = item["id"]
|
||||||
|
.as_str()
|
||||||
|
.ok_or_else(|| anyhow!("missing subscription item id"))?;
|
||||||
|
let price_id = item["price"]["id"]
|
||||||
|
.as_str()
|
||||||
|
.ok_or_else(|| anyhow!("missing subscription item price id"))?;
|
||||||
|
price_to_item.insert(price_id.to_string(), item_id.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
Ok((subscription_id, item_id))
|
Ok((subscription_id, price_to_item))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stripe_create_subscription_item(
|
async fn stripe_create_subscription_item(
|
||||||
&self,
|
&self,
|
||||||
subscription_id: &str,
|
subscription_id: &str,
|
||||||
price_id: &str,
|
price_id: &str,
|
||||||
|
quantity: i64,
|
||||||
) -> Result<String> {
|
) -> Result<String> {
|
||||||
let idempotency_key =
|
let idempotency_key =
|
||||||
self.idempotency_key(&["create_subscription_item", subscription_id, price_id]);
|
self.idempotency_key(&["create_subscription_item", subscription_id, price_id]);
|
||||||
|
let quantity = quantity.to_string();
|
||||||
let resp = self
|
let resp = self
|
||||||
.http
|
.http
|
||||||
.post(format!("{STRIPE_API}/subscription_items"))
|
.post(format!("{STRIPE_API}/subscription_items"))
|
||||||
.bearer_auth(&self.stripe_secret_key)
|
.bearer_auth(&self.stripe_secret_key)
|
||||||
.header("Idempotency-Key", idempotency_key)
|
.header("Idempotency-Key", idempotency_key)
|
||||||
.form(&[("subscription", subscription_id), ("price", price_id)])
|
.form(&[
|
||||||
|
("subscription", subscription_id),
|
||||||
|
("price", price_id),
|
||||||
|
("quantity", quantity.as_str()),
|
||||||
|
])
|
||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let body: serde_json::Value = resp.error_for_status()?.json().await?;
|
let body: serde_json::Value = stripe_error_for_status(resp).await?.json().await?;
|
||||||
let item_id = body["id"]
|
let item_id = body["id"]
|
||||||
.as_str()
|
.as_str()
|
||||||
.ok_or_else(|| anyhow!("missing subscription item id"))?
|
.ok_or_else(|| anyhow!("missing subscription item id"))?
|
||||||
@@ -1063,78 +1146,63 @@ impl Billing {
|
|||||||
Ok(item_id)
|
Ok(item_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stripe_update_subscription_item(
|
/// Sets a subscription item's quantity. No idempotency key: this is a
|
||||||
|
/// reconcile-to-desired-state write, and re-applying the same target is a no-op.
|
||||||
|
async fn stripe_set_subscription_item_quantity(
|
||||||
&self,
|
&self,
|
||||||
item_id: &str,
|
item_id: &str,
|
||||||
price_id: &str,
|
quantity: i64,
|
||||||
) -> Result<String> {
|
) -> Result<()> {
|
||||||
let idempotency_key =
|
|
||||||
self.idempotency_key(&["update_subscription_item", item_id, price_id]);
|
|
||||||
let resp = self
|
let resp = self
|
||||||
.http
|
.http
|
||||||
.post(format!("{STRIPE_API}/subscription_items/{item_id}"))
|
.post(format!("{STRIPE_API}/subscription_items/{item_id}"))
|
||||||
.bearer_auth(&self.stripe_secret_key)
|
.bearer_auth(&self.stripe_secret_key)
|
||||||
.header("Idempotency-Key", idempotency_key)
|
.form(&[("quantity", quantity.to_string())])
|
||||||
.form(&[("price", price_id)])
|
|
||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
|
stripe_error_for_status(resp).await?;
|
||||||
|
|
||||||
let body: serde_json::Value = resp.error_for_status()?.json().await?;
|
Ok(())
|
||||||
let id = body["id"]
|
|
||||||
.as_str()
|
|
||||||
.ok_or_else(|| anyhow!("missing subscription item id"))?
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
Ok(id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stripe_delete_subscription_item(&self, item_id: &str) -> Result<()> {
|
async fn stripe_delete_subscription_item(&self, item_id: &str) -> Result<()> {
|
||||||
self.http
|
let resp = self
|
||||||
|
.http
|
||||||
.delete(format!("{STRIPE_API}/subscription_items/{item_id}"))
|
.delete(format!("{STRIPE_API}/subscription_items/{item_id}"))
|
||||||
.bearer_auth(&self.stripe_secret_key)
|
.bearer_auth(&self.stripe_secret_key)
|
||||||
.send()
|
.send()
|
||||||
.await?
|
.await?;
|
||||||
.error_for_status()?;
|
stripe_error_for_status(resp).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stripe_cancel_subscription(&self, subscription_id: &str) -> Result<()> {
|
async fn stripe_cancel_subscription(&self, subscription_id: &str) -> Result<()> {
|
||||||
self.http
|
let resp = self
|
||||||
|
.http
|
||||||
.delete(format!("{STRIPE_API}/subscriptions/{subscription_id}"))
|
.delete(format!("{STRIPE_API}/subscriptions/{subscription_id}"))
|
||||||
.bearer_auth(&self.stripe_secret_key)
|
.bearer_auth(&self.stripe_secret_key)
|
||||||
.send()
|
.send()
|
||||||
.await?
|
.await?;
|
||||||
.error_for_status()?;
|
stripe_error_for_status(resp).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stripe_pay_invoice(&self, invoice_id: &str) -> Result<()> {
|
async fn stripe_pay_invoice(&self, invoice_id: &str) -> Result<()> {
|
||||||
let idempotency_key = self.idempotency_key(&["pay_invoice", invoice_id]);
|
let idempotency_key = self.idempotency_key(&["pay_invoice", invoice_id]);
|
||||||
self.http
|
let resp = self
|
||||||
|
.http
|
||||||
.post(format!("{STRIPE_API}/invoices/{invoice_id}/pay"))
|
.post(format!("{STRIPE_API}/invoices/{invoice_id}/pay"))
|
||||||
.bearer_auth(&self.stripe_secret_key)
|
.bearer_auth(&self.stripe_secret_key)
|
||||||
.header("Idempotency-Key", idempotency_key)
|
.header("Idempotency-Key", idempotency_key)
|
||||||
.send()
|
.send()
|
||||||
.await?
|
.await?;
|
||||||
.error_for_status()?;
|
stripe_error_for_status(resp).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stripe_get_subscription_item_price_id(&self, item_id: &str) -> Result<Option<String>> {
|
|
||||||
let resp = self
|
|
||||||
.http
|
|
||||||
.get(format!("{STRIPE_API}/subscription_items/{item_id}"))
|
|
||||||
.bearer_auth(&self.stripe_secret_key)
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let body: serde_json::Value = resp.error_for_status()?.json().await?;
|
|
||||||
Ok(body["price"]["id"].as_str().map(ToString::to_string))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn stripe_preview_upcoming_invoice(
|
async fn stripe_preview_upcoming_invoice(
|
||||||
&self,
|
&self,
|
||||||
customer_id: &str,
|
customer_id: &str,
|
||||||
@@ -1150,20 +1218,24 @@ impl Billing {
|
|||||||
req = req.query(&[("subscription", subscription_id)]);
|
req = req.query(&[("subscription", subscription_id)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
let body: serde_json::Value = req.send().await?.error_for_status()?.json().await?;
|
let body: serde_json::Value = stripe_error_for_status(req.send().await?)
|
||||||
|
.await?
|
||||||
|
.json()
|
||||||
|
.await?;
|
||||||
Ok(body)
|
Ok(body)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stripe_pay_invoice_out_of_band(&self, invoice_id: &str) -> Result<()> {
|
async fn stripe_pay_invoice_out_of_band(&self, invoice_id: &str) -> Result<()> {
|
||||||
let idempotency_key = self.idempotency_key(&["pay_invoice_oob", invoice_id]);
|
let idempotency_key = self.idempotency_key(&["pay_invoice_oob", invoice_id]);
|
||||||
self.http
|
let resp = self
|
||||||
|
.http
|
||||||
.post(format!("{STRIPE_API}/invoices/{invoice_id}/pay"))
|
.post(format!("{STRIPE_API}/invoices/{invoice_id}/pay"))
|
||||||
.bearer_auth(&self.stripe_secret_key)
|
.bearer_auth(&self.stripe_secret_key)
|
||||||
.header("Idempotency-Key", idempotency_key)
|
.header("Idempotency-Key", idempotency_key)
|
||||||
.form(&[("paid_out_of_band", "true")])
|
.form(&[("paid_out_of_band", "true")])
|
||||||
.send()
|
.send()
|
||||||
.await?
|
.await?;
|
||||||
.error_for_status()?;
|
stripe_error_for_status(resp).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -1177,7 +1249,7 @@ impl Billing {
|
|||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let body: serde_json::Value = resp.error_for_status()?.json().await?;
|
let body: serde_json::Value = stripe_error_for_status(resp).await?.json().await?;
|
||||||
let has_method = body["data"]
|
let has_method = body["data"]
|
||||||
.as_array()
|
.as_array()
|
||||||
.map(|a| !a.is_empty())
|
.map(|a| !a.is_empty())
|
||||||
@@ -1391,6 +1463,45 @@ impl Billing {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Like [`reqwest::Response::error_for_status`], but on a 4xx/5xx response it reads
|
||||||
|
/// the body and folds Stripe's JSON error payload (`error.message`/`code`/`param`)
|
||||||
|
/// into the returned error, so callers get an actionable message instead of a bare
|
||||||
|
/// "400 Bad Request" with only the URL.
|
||||||
|
async fn stripe_error_for_status(resp: reqwest::Response) -> Result<reqwest::Response> {
|
||||||
|
let status = resp.status();
|
||||||
|
if !status.is_client_error() && !status.is_server_error() {
|
||||||
|
return Ok(resp);
|
||||||
|
}
|
||||||
|
|
||||||
|
let url = resp.url().clone();
|
||||||
|
let body = resp.text().await.unwrap_or_default();
|
||||||
|
let detail = serde_json::from_str::<serde_json::Value>(&body)
|
||||||
|
.ok()
|
||||||
|
.and_then(|json| {
|
||||||
|
let error = &json["error"];
|
||||||
|
let message = error["message"].as_str()?.to_string();
|
||||||
|
let mut detail = message;
|
||||||
|
if let Some(code) = error["type"].as_str().or_else(|| error["code"].as_str()) {
|
||||||
|
detail.push_str(&format!(" [{code}]"));
|
||||||
|
}
|
||||||
|
if let Some(param) = error["param"].as_str() {
|
||||||
|
detail.push_str(&format!(" (param: {param})"));
|
||||||
|
}
|
||||||
|
Some(detail)
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|| {
|
||||||
|
if body.trim().is_empty() {
|
||||||
|
"<empty response body>".to_string()
|
||||||
|
} else {
|
||||||
|
body
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Err(anyhow!(
|
||||||
|
"Stripe API request to {url} failed with status {status}: {detail}"
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn fetch_btc_spot_price_from_base(
|
pub async fn fetch_btc_spot_price_from_base(
|
||||||
http: &reqwest::Client,
|
http: &reqwest::Client,
|
||||||
api_base: &str,
|
api_base: &str,
|
||||||
|
|||||||
@@ -184,17 +184,6 @@ impl Query {
|
|||||||
Ok(bolt11)
|
Ok(bolt11)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn has_active_paid_relays(&self, tenant_id: &str) -> Result<bool> {
|
|
||||||
let plans = sqlx::query_scalar::<_, String>(
|
|
||||||
"SELECT plan FROM relay WHERE tenant = ? AND status = 'active'",
|
|
||||||
)
|
|
||||||
.bind(tenant_id)
|
|
||||||
.fetch_all(&self.pool)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(plans.into_iter().any(|plan| Self::is_paid_plan(&plan)))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>> {
|
pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>> {
|
||||||
let rows = sqlx::query_as::<_, Activity>(
|
let rows = sqlx::query_as::<_, Activity>(
|
||||||
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
|
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
|
||||||
|
|||||||
Reference in New Issue
Block a user