From 05e4eac025005cdc0042533d92cdd47d639c5dfc Mon Sep 17 00:00:00 2001 From: Jon Staab Date: Wed, 1 Apr 2026 16:26:54 -0700 Subject: [PATCH] Add Stripe subscription sync --- backend/spec/api.md | 6 ++++-- backend/spec/billing.md | 11 +++++++++++ backend/spec/command.md | 14 ++++++++++++-- backend/spec/infra.md | 8 +------- backend/spec/main.md | 1 + backend/spec/models.md | 14 +++++++++----- backend/spec/query.md | 9 --------- backend/src/infra.rs | 23 +---------------------- backend/src/query.rs | 22 ---------------------- 9 files changed, 39 insertions(+), 69 deletions(-) diff --git a/backend/spec/api.md b/backend/spec/api.md index b9154ea..e71adbd 100644 --- a/backend/spec/api.md +++ b/backend/spec/api.md @@ -46,7 +46,9 @@ Notes: - Serves `GET /identity` - Authorizes anyone, but must be authorized -- If a tenant for the identity doesn't exist, one is created +- If a tenant for the identity doesn't exist: + - Call the Stripe API to create a new customer and subscription + - Create a new tenant using `command.create_tenant` with payload and stripe info - Return `data` is an `Identity` struct --- Tenant routes @@ -119,7 +121,7 @@ Notes: - Serves `POST /relays/:id/deactivate` - Authorizes admin or relay owner -- If relay is already active, return a `400` with `code=relay-is-inactive` +- If relay is already inactive, return a `400` with `code=relay-is-inactive` - Call `billing.deactivate_relay` - Return `data` is empty diff --git a/backend/spec/billing.md b/backend/spec/billing.md index 51fd577..50a7f93 100644 --- a/backend/spec/billing.md +++ b/backend/spec/billing.md @@ -12,4 +12,15 @@ Members: ## `pub fn new(query: Query, command: Command, robot: Robot) -> Self` - Reads environment and populates members + +## `pub fn start(&self)` + - Subscribes to `command.notify.notified` + - On `create_relay`, `update_relay`, `activate_relay`, `deactivate_relay`, `fail_relay_sync`, and `complete_relay_sync`, call `self.sync_relay_subscription_item`. + +## `pub fn sync_relay_subscription_item(&self, activity: &Activity)` + +- Fetch the relay associated with the `activity` +- If the relay has `sync_error`, `synced` is false, `plan` is `free`, or `status` is `inactive`, delete the relay's subscription item using the Stripe api, and clear it with `command.delete_relay_subscription_item`. +- Otherwise, create/update the relay's subscription item to the appropriate Stripe price using the Stripe api and set it with `command.set_relay_subscription_item`. +- This method should be idempotent diff --git a/backend/spec/command.md b/backend/spec/command.md index 47621d0..f07af92 100644 --- a/backend/spec/command.md +++ b/backend/spec/command.md @@ -52,10 +52,20 @@ Notes: ## `pub fn fail_relay_sync(&self, relay: &Relay, sync_error: &str) -> Result<()>` -- Sets relay status to `inactive`, sets `sync_error` +- Sets `sync_error` on the relay - Logs activity as `(fail_relay_sync, relay_id)` ## `pub fn complete_relay_sync(&self, relay_id: &str) -> Result<()>` -- Sets `synced = 1`, `status = 'active'`, clears `sync_error` +- Sets `synced = 1`, clears `sync_error` - Logs activity as `(complete_relay_sync, relay_id)` + +## `pub fn delete_relay_subscription_item(&self, relay_id: &str) -> Result<()>` + +- Sets `stripe_subscription_item_id = null` +- Does not log activity + +## `pub fn set_relay_subscription_item(&self, relay_id: &str, stripe_subscription_item_id: &str) -> Result<()>` + +- Sets `stripe_subscription_item_id` +- Does not log activity diff --git a/backend/spec/infra.md b/backend/spec/infra.md index c372d49..94d7204 100644 --- a/backend/spec/infra.md +++ b/backend/spec/infra.md @@ -14,14 +14,8 @@ Members: ## `pub async fn start(self)` -- Subscribes to `command.notify` before doing anything else so no activities are missed. -- Calls `catch_up` to sync any relays that need it from before this process started. +- Subscribes to `command.notify` - Loops on `rx.recv()`, calling `handle_activity` for each received `Activity`. -- On `Lagged`, logs a warning and runs `catch_up` to recover. - -## `async fn catch_up(&self)` - -- Lists all relays via `query.list_relays()` and syncs any where `synced == 0` and `sync_error` is empty. ## `async fn handle_activity(&self, activity: &Activity)` diff --git a/backend/spec/main.md b/backend/spec/main.md index a26b2e0..c7e44dd 100644 --- a/backend/spec/main.md +++ b/backend/spec/main.md @@ -6,3 +6,4 @@ - Adds CORS middleware based on `origins` - Calls `axum::serve` with a listener - Spawns `infra.start` +- Spawns `billing.start` diff --git a/backend/spec/models.md b/backend/spec/models.md index 0af1ea3..8451fc3 100644 --- a/backend/spec/models.md +++ b/backend/spec/models.md @@ -22,10 +22,10 @@ Activity is an audit log of all actions performed by a user or a worker process. - `update_tenant` - `create_relay` - `update_relay` - - `update_relay_plan` - `activate_relay` - `deactivate_relay` - `fail_relay_sync` + - `complete_relay_sync` - `resource_type` is a string identifying the resource type being modified. - `resource_id` is a string identifying the resource id being modified. @@ -35,16 +35,17 @@ A plan represents a rate charged for relays at a given feature/usage limit. Plan - `id` - the plan slug - `name` - the plan name -- `sats` - the plan't cost per month +- `amount` - the plan monthly cost in USD - `members` - the max number of members a relay can have before needing to upgrade. If empty, membership is not limited. - `blossom` - whether blossom media hosting is available on this plan - `livekit` - whether livekit audio/video calls are available on this plan +- `stripe_price_id` - the identifier of the price in Stripe There are three plans available: -- `free` - 0 sats/mo, up to 10 members, no blossom/livekit -- `basic` - 10k sats/mo, up to 100 members, includes blossom/livekit -- `growth` - 50k sats/mo, unlimited members, includes blossom/livekit +- `free` - $0/mo, up to 10 members, no blossom/livekit +- `basic` - $5/mo, up to 100 members, includes blossom/livekit +- `growth` - $25/mo, unlimited members, includes blossom/livekit # Tenant @@ -53,6 +54,8 @@ Tenants are customers of the service, identified by a nostr `pubkey`. Public met - `pubkey` is the nostr public key identifying the tenant - `nwc_url` (private) a nostr wallet connect URL used for **paying** invoices generated by the system - `created_at` unix timestamp identifying tenant creation time +- `stripe_customer_id` a string identifying the associated stripe customer +- `stripe_subscription_id` a string identifying the associated stripe subscription # Relay @@ -63,6 +66,7 @@ A relay is a nostr relay owned by a `tenant` and hosted by the attached zooid in - `schema` - the relay's db schema (read_only, calculated based on `subdomain` + `id`) - `subdomain` - the relay's subdomain - `plan` - the relay's plan +- `stripe_subscription_item_id` - the Stripe subscription item id. - `status` - `active|inactive`. Only `active` relays count toward billing. - `synced` - whether the relay has been successfully synced to zooid at least once. - `sync_error` - a string indicating any errors encountered when synchronizing. diff --git a/backend/spec/query.md b/backend/spec/query.md index d5dd856..49264b2 100644 --- a/backend/spec/query.md +++ b/backend/spec/query.md @@ -35,15 +35,6 @@ Members: - Returns matching relay -## `pub fn max_activity_at(&self) -> Result` - -- Returns the maximum `created_at` value from the activity table, or 0 if empty -- Used by infra to initialize the since guard on startup - -## `pub fn list_activity(&self, since: &i64) -> Result>` - -- Returns all activity occuring after `since` - ## `pub fn list_activity_for_relay(&self, relay_id: &str) -> Result>` - Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id` diff --git a/backend/src/infra.rs b/backend/src/infra.rs index f82593d..8a96841 100644 --- a/backend/src/infra.rs +++ b/backend/src/infra.rs @@ -38,15 +38,8 @@ impl Infra { } pub async fn start(self) { - // Subscribe before catch-up so no activities are missed between query and listen let mut rx = self.command.notify.subscribe(); - // Catch up on any unsynced relays from before this process started - match self.catch_up().await { - Ok(()) => {} - Err(e) => tracing::error!(error = %e, "infra catch-up failed"), - } - loop { match rx.recv().await { Ok(activity) => { @@ -55,27 +48,13 @@ impl Infra { } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!(missed = n, "infra lagged, running catch-up"); - if let Err(e) = self.catch_up().await { - tracing::error!(error = %e, "infra catch-up failed"); - } + tracing::warn!(missed = n, "infra lagged"); } Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } } - async fn catch_up(&self) -> Result<()> { - let relays = self.query.list_relays().await?; - for relay in relays { - if relay.synced == 0 && relay.sync_error.is_empty() { - let is_new = relay.synced == 0; - self.sync_and_report(&relay, is_new).await; - } - } - Ok(()) - } - async fn handle_activity(&self, activity: &Activity) -> Result<()> { let needs_sync = matches!( activity.activity_type.as_str(), diff --git a/backend/src/query.rs b/backend/src/query.rs index 9782055..4ff4199 100644 --- a/backend/src/query.rs +++ b/backend/src/query.rs @@ -113,28 +113,6 @@ impl Query { Ok(row) } - pub async fn max_activity_at(&self) -> Result { - let val = sqlx::query_scalar::<_, Option>( - "SELECT MAX(created_at) FROM activity", - ) - .fetch_one(&self.pool) - .await?; - Ok(val.unwrap_or(0)) - } - - pub async fn list_activity(&self, since: &i64) -> Result> { - let rows = sqlx::query_as::<_, Activity>( - "SELECT id, tenant, created_at, activity_type, resource_type, resource_id - FROM activity - WHERE created_at > ? - ORDER BY created_at, id", - ) - .bind(since) - .fetch_all(&self.pool) - .await?; - Ok(rows) - } - pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result> { let rows = sqlx::query_as::<_, Activity>( "SELECT id, tenant, created_at, activity_type, resource_type, resource_id