Compare commits

...

23 Commits

Author SHA1 Message Date
Jon Staab a654096f25 Refactor stripe module 2026-05-19 18:19:58 -07:00
Jon Staab b49d62f1dd Remove InvoiceLookupError 2026-05-19 17:29:47 -07:00
Jon Staab 2d5eb0ca84 Refactor commands 2026-05-19 17:20:00 -07:00
Jon Staab dde4b981b2 refactor query 2026-05-19 17:04:10 -07:00
Jon Staab 7134915665 Refactor infra 2026-05-15 14:30:26 -07:00
Jon Staab cfa52d739f Clean up relay validation 2026-05-15 13:15:57 -07:00
Jon Staab 6abe62b569 remove invoice auto collection on nwc_url update 2026-05-15 12:54:40 -07:00
Jon Staab cd7b84439e define defaults on the model, simplify create relay payload 2026-05-15 11:25:25 -07:00
Jon Staab 1c3e0d619a Refactor error handling 2026-05-15 11:07:27 -07:00
Jon Staab 5590b14074 Refactor api into different route files 2026-05-15 09:28:12 -07:00
Jon Staab 26f05e8b8f Add env struct 2026-05-14 15:33:28 -07:00
Jon Staab 066c91a4d1 Refactor bitcoin exchange rate fetching and wallet 2026-05-14 12:47:32 -07:00
userAdityaa 3ed021214a feat(infra): pass Blossom S3 config to Zooid with schema key prefix (#69)
Reviewed-on: coracle/caravel#69
Co-authored-by: userAdityaa <aditya.chaudhary1558@gmail.com>
Co-committed-by: userAdityaa <aditya.chaudhary1558@gmail.com>
2026-05-13 15:47:08 +00:00
Jon Staab c0aff5f7cf Refactor billing module 2026-05-12 16:32:05 -07:00
Jon Staab c9c1dd2c4c Group subscription items by price 2026-05-12 15:53:17 -07:00
Jon Staab 679a56edc3 Add docker publish workflow 2026-05-12 14:48:50 -07:00
userAdityaa e7efd9d08b fix: stripe portal dead-end with callback return flow (#67)
Co-authored-by: userAdityaa <aditya.chaudhary1558@gmail.com>
Co-committed-by: userAdityaa <aditya.chaudhary1558@gmail.com>
2026-05-11 20:48:55 +00:00
userAdityaa 0151762362 chore: improve billing customer name using Nostr kind 0 with pubkey fallback (#66)
Co-authored-by: userAdityaa <aditya.chaudhary1558@gmail.com>
Co-committed-by: userAdityaa <aditya.chaudhary1558@gmail.com>
2026-05-08 22:52:13 +00:00
userAdityaa a79c43e17e feat: open payment modal immediately on relay plan upgrade (#64)
Co-authored-by: userAdityaa <aditya.chaudhary1558@gmail.com>
Co-committed-by: userAdityaa <aditya.chaudhary1558@gmail.com>
2026-05-07 18:35:24 +00:00
Jon Staab dbe25c372f Conflate id and schema 2026-05-05 17:47:13 -07:00
userAdityaa 80a86452d0 chore: encrypt tenant NWC URL at rest and stop secret exposure in tenant APIs (#58)
Co-authored-by: userAdityaa <aditya.chaudhary1558@gmail.com>
Co-committed-by: userAdityaa <aditya.chaudhary1558@gmail.com>
2026-05-05 20:42:12 +00:00
userAdityaa b1e3747ddb fix: manual Lightning payment reconciliation with Stripe invoice state (#54)
Reviewed-on: coracle/caravel#54
Co-authored-by: userAdityaa <aditya.chaudhary1558@gmail.com>
Co-committed-by: userAdityaa <aditya.chaudhary1558@gmail.com>
2026-05-01 23:38:57 +00:00
userAdityaa 29f657635c fix: relay sync create/update classification to prevent false create mode on updates (#56)
Co-authored-by: userAdityaa <aditya.chaudhary1558@gmail.com>
Co-committed-by: userAdityaa <aditya.chaudhary1558@gmail.com>
2026-05-01 14:21:37 +00:00
49 changed files with 2916 additions and 2959 deletions
+59
View File
@@ -0,0 +1,59 @@
name: Docker
on:
push:
branches: [master]
env:
REGISTRY: gitea.coracle.social
jobs:
build-and-push-image:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
strategy:
matrix:
include:
- component: frontend
image: coracle/caravel-frontend
context: frontend
- component: backend
image: coracle/caravel-backend
context: backend
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Log in to the Container registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: hodlbod
password: ${{ secrets.PACKAGE_TOKEN }}
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ matrix.image }}
tags: |
type=raw,value=latest,enable=${{ github.ref == 'refs/heads/master' }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
driver: docker-container
- name: Build and push Docker image
id: push
uses: docker/build-push-action@v5
with:
context: ${{ matrix.context }}
push: true
platforms: linux/amd64,linux/arm64
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
+1
View File
@@ -1,4 +1,5 @@
ref ref
todo.md
node_modules node_modules
target target
data data
+1 -1
View File
@@ -26,7 +26,7 @@ docker run -it \
-v ./config:/app/config \ -v ./config:/app/config \
-v ./media:/app/media \ -v ./media:/app/media \
-v ./data:/app/data \ -v ./data:/app/data \
ghcr.io/coracle-social/zooid gitea.coracle.social/coracle/zooid
``` ```
### 2. Configure the backend ### 2. Configure the backend
+17 -11
View File
@@ -1,10 +1,8 @@
# Server # Server
HOST=127.0.0.1 SERVER_HOST=127.0.0.1
PORT=2892 SERVER_PORT=2892
ALLOW_ORIGINS= # Optional comma-separated allowed CORS origins; empty = permissive SERVER_ALLOW_ORIGINS= # Optional comma-separated allowed CORS origins; empty = permissive
SERVER_ADMIN_PUBKEYS= # Comma-separated hex pubkeys with admin access
# Auth
ADMINS= # Comma-separated hex pubkeys with admin access
# Database # Database
DATABASE_URL=sqlite://data/caravel.db DATABASE_URL=sqlite://data/caravel.db
@@ -14,19 +12,27 @@ ROBOT_SECRET= # Nostr private key (hex)
ROBOT_NAME= ROBOT_NAME=
ROBOT_DESCRIPTION= ROBOT_DESCRIPTION=
ROBOT_PICTURE= ROBOT_PICTURE=
ROBOT_OUTBOX_RELAYS=relay.damus.io,relay.primal.net,nos.lol ROBOT_WALLET= # Nostr Wallet Connect URL for generating Lightning invoices
ROBOT_INDEXER_RELAYS=purplepag.es,relay.damus.io,indexer.coracle.social ROBOT_OUTBOX_RELAYS=wss://relay.damus.io,wss://relay.primal.net,wss://nos.lol
ROBOT_MESSAGING_RELAYS=auth.nostr1.com,relay.keychat.io,relay.ditto.pub ROBOT_INDEXER_RELAYS=wss://purplepag.es,wss://relay.damus.io,wss://indexer.coracle.social
ROBOT_MESSAGING_RELAYS=wss://auth.nostr1.com,wss://relay.keychat.io,wss://relay.ditto.pub
# Zooid # Zooid
ZOOID_API_URL=http://127.0.0.1:3334 ZOOID_API_URL=http://127.0.0.1:3334
ZOOID_API_SECRET=
RELAY_DOMAIN=spaces.coracle.social RELAY_DOMAIN=spaces.coracle.social
LIVEKIT_URL= LIVEKIT_URL=
LIVEKIT_API_KEY= LIVEKIT_API_KEY=
LIVEKIT_API_SECRET= LIVEKIT_API_SECRET=
# Blossom S3 (optional; when region, bucket, access key, and secret are all set, relays with blossom enabled sync with adapter s3 and key_prefix = relay schema)
BLOSSOM_S3_ENDPOINT=
BLOSSOM_S3_REGION=
BLOSSOM_S3_BUCKET=
BLOSSOM_S3_ACCESS_KEY=
BLOSSOM_S3_SECRET_KEY=
# Billing # Billing
NWC_URL= # Nostr Wallet Connect URL for generating Lightning invoices
STRIPE_SECRET_KEY= # Required Stripe API secret key (sk_...) STRIPE_SECRET_KEY= # Required Stripe API secret key (sk_...)
STRIPE_WEBHOOK_SECRET=whsec_test_00000000000000000000000000 # Webhook signing secret (use real value in production) STRIPE_WEBHOOK_SECRET=whsec_test_00000000000000000000000000 # Webhook signing secret (use real value in production)
STRIPE_PRICE_BASIC= # Stripe price ID (price_...) for the Basic plan; required for paid plans
STRIPE_PRICE_GROWTH= # Stripe price ID (price_...) for the Growth plan; required for paid plans
+13
View File
@@ -210,6 +210,7 @@ dependencies = [
"nostr-sdk", "nostr-sdk",
"nwc", "nwc",
"rand 0.8.5", "rand 0.8.5",
"regex",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
@@ -1894,6 +1895,18 @@ dependencies = [
"bitflags", "bitflags",
] ]
[[package]]
name = "regex"
version = "1.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]] [[package]]
name = "regex-automata" name = "regex-automata"
version = "0.4.14" version = "0.4.14"
+1
View File
@@ -24,6 +24,7 @@ hmac = "0.12"
sha2 = "0.10" sha2 = "0.10"
dotenvy = "0.15.7" dotenvy = "0.15.7"
base64 = "0.22" base64 = "0.22"
regex = "1"
[dev-dependencies] [dev-dependencies]
tower = { version = "0.5", features = ["util"] } tower = { version = "0.5", features = ["util"] }
+8 -1
View File
@@ -38,14 +38,21 @@ Environment variables:
| `ADMINS` | Comma-separated admin pubkeys (hex) | _optional_ | | `ADMINS` | Comma-separated admin pubkeys (hex) | _optional_ |
| `ALLOW_ORIGINS` | Comma-separated CORS origins. If empty, CORS is permissive. | _optional_ | | `ALLOW_ORIGINS` | Comma-separated CORS origins. If empty, CORS is permissive. | _optional_ |
| `ZOOID_API_URL` | Zooid API base URL used by infra worker | _required for infra sync_ | | `ZOOID_API_URL` | Zooid API base URL used by infra worker | _required for infra sync_ |
| `ZOOID_API_SECRET` | Nostr secret key used for authentication of requests to the zooid API | _required_ |
| `RELAY_DOMAIN` | Base domain appended to relay subdomains | empty | | `RELAY_DOMAIN` | Base domain appended to relay subdomains | empty |
| `LIVEKIT_URL` | LiveKit URL sent to zooid when relay livekit is enabled | _optional_ | | `LIVEKIT_URL` | LiveKit URL sent to zooid when relay livekit is enabled | _optional_ |
| `LIVEKIT_API_KEY` | LiveKit API key sent to zooid | _optional_ | | `LIVEKIT_API_KEY` | LiveKit API key sent to zooid | _optional_ |
| `LIVEKIT_API_SECRET` | LiveKit API secret sent to zooid | _optional_ | | `LIVEKIT_API_SECRET` | LiveKit API secret sent to zooid | _optional_ |
| `BLOSSOM_S3_ENDPOINT` | S3-compatible endpoint URL for Blossom; omit for AWS S3 | _optional_ |
| `BLOSSOM_S3_REGION` | S3 region; with bucket, access key, and secret enables S3 for Blossom | _optional_ |
| `BLOSSOM_S3_BUCKET` | S3 bucket name | _optional_ |
| `BLOSSOM_S3_ACCESS_KEY` | S3 access key ID | _optional_ |
| `BLOSSOM_S3_SECRET_KEY` | S3 secret access key | _optional_ |
| `NWC_URL` | Platform NWC URL used to generate BOLT11 invoices | _required for invoice generation_ | | `NWC_URL` | Platform NWC URL used to generate BOLT11 invoices | _required for invoice generation_ |
| `ENCRYPTION_SECRET` | Nostr secret key (hex or nsec) used to encrypt tenant NWC URLs at rest | _required_ |
| `STRIPE_SECRET_KEY` | Stripe API secret key used for billing API operations | _required_ | | `STRIPE_SECRET_KEY` | Stripe API secret key used for billing API operations | _required_ |
| `STRIPE_WEBHOOK_SECRET` | Stripe webhook signing secret used to verify `Stripe-Signature` headers | _required_ | | `STRIPE_WEBHOOK_SECRET` | Stripe webhook signing secret used to verify `Stripe-Signature` headers | _required_ |
| `STRIPE_PRICE_BASIC` | Stripe price ID (`price_...`) backing the Basic plan | _required for paid plans_ |
| `STRIPE_PRICE_GROWTH` | Stripe price ID (`price_...`) backing the Growth plan | _required for paid plans_ |
| `ROBOT_SECRET` | Robot Nostr secret key | _required_ | | `ROBOT_SECRET` | Robot Nostr secret key | _required_ |
| `ROBOT_NAME` | Robot display name (kind `0`) | _optional_ | | `ROBOT_NAME` | Robot display name (kind `0`) | _optional_ |
| `ROBOT_DESCRIPTION` | Robot description (kind `0`) | _optional_ | | `ROBOT_DESCRIPTION` | Robot description (kind `0`) | _optional_ |
+36
View File
@@ -39,3 +39,39 @@ CREATE TABLE IF NOT EXISTS relay (
push_enabled INTEGER NOT NULL DEFAULT 1, push_enabled INTEGER NOT NULL DEFAULT 1,
FOREIGN KEY (tenant) REFERENCES tenant(pubkey) FOREIGN KEY (tenant) REFERENCES tenant(pubkey)
); );
CREATE TABLE IF NOT EXISTS invoice_nwc_payment (
invoice_id TEXT PRIMARY KEY,
tenant_pubkey TEXT NOT NULL,
state TEXT NOT NULL CHECK (state IN ('pending', 'paid')),
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey)
);
CREATE TABLE IF NOT EXISTS invoice_manual_lightning_payment (
invoice_id TEXT PRIMARY KEY,
tenant_pubkey TEXT NOT NULL,
bolt11 TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey)
);
CREATE INDEX IF NOT EXISTS idx_tenant_stripe_customer_id
ON tenant (stripe_customer_id);
CREATE INDEX IF NOT EXISTS idx_relay_tenant_id
ON relay (tenant, id);
CREATE INDEX IF NOT EXISTS idx_relay_tenant_status_plan
ON relay (tenant, status, plan);
CREATE INDEX IF NOT EXISTS idx_activity_resource_type_resource_id_created_at_id
ON activity (resource_type, resource_id, created_at DESC, id DESC);
CREATE INDEX IF NOT EXISTS idx_invoice_nwc_payment_tenant_pubkey
ON invoice_nwc_payment (tenant_pubkey);
CREATE INDEX IF NOT EXISTS idx_invoice_manual_lightning_payment_tenant_pubkey
ON invoice_manual_lightning_payment (tenant_pubkey);
@@ -1,11 +0,0 @@
CREATE INDEX IF NOT EXISTS idx_tenant_stripe_customer_id
ON tenant (stripe_customer_id);
CREATE INDEX IF NOT EXISTS idx_relay_tenant_id
ON relay (tenant, id);
CREATE INDEX IF NOT EXISTS idx_relay_tenant_status_plan
ON relay (tenant, status, plan);
CREATE INDEX IF NOT EXISTS idx_activity_resource_type_resource_id_created_at_id
ON activity (resource_type, resource_id, created_at DESC, id DESC);
@@ -1,11 +0,0 @@
CREATE TABLE IF NOT EXISTS invoice_nwc_payment (
invoice_id TEXT PRIMARY KEY,
tenant_pubkey TEXT NOT NULL,
state TEXT NOT NULL CHECK (state IN ('pending', 'paid')),
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey)
);
CREATE INDEX IF NOT EXISTS idx_invoice_nwc_payment_tenant_pubkey
ON invoice_nwc_payment (tenant_pubkey);
+6 -5
View File
@@ -57,7 +57,7 @@ Notes:
- Serves `GET /tenants` - Serves `GET /tenants`
- Authorizes admin only - Authorizes admin only
- Return `data` is a list of tenant structs from `query.list_tenants` - Return `data` is a list of `TenantResponse` structs (contains `nwc_is_set: bool` instead of `nwc_url`)
## `async fn create_tenant(...) -> Response` ## `async fn create_tenant(...) -> Response`
@@ -69,20 +69,21 @@ Notes:
- On unique-constraint race (`pubkey-exists`), re-fetch and return the existing tenant - On unique-constraint race (`pubkey-exists`), re-fetch and return the existing tenant
- If Stripe customer creation fails, return `code=stripe-customer-create-failed` - If Stripe customer creation fails, return `code=stripe-customer-create-failed`
- Always returns `200` (create-or-get is uniform) - Always returns `200` (create-or-get is uniform)
- Return `data` is a single `Tenant` struct - Return `data` is a single `TenantResponse` struct (contains `nwc_is_set: bool` instead of `nwc_url`)
## `async fn get_tenant(...) -> Response` ## `async fn get_tenant(...) -> Response`
- Serves `GET /tenants/:pubkey` - Serves `GET /tenants/:pubkey`
- Authorizes admin or matching tenant - Authorizes admin or matching tenant
- Return `data` is a single tenant struct from `query.get_tenant` - Return `data` is a single `TenantResponse` struct (contains `nwc_is_set: bool` instead of `nwc_url`)
## `async fn update_tenant(...) -> Response` ## `async fn update_tenant(...) -> Response`
- Serves `PUT /tenants/:pubkey` - Serves `PUT /tenants/:pubkey`
- Authorizes admin or matching tenant - Authorizes admin or matching tenant
- Accepts `nwc_url` in the request body; encrypts it before storage using `cipher::encrypt`
- Updates tenant using `command.update_tenant` - Updates tenant using `command.update_tenant`
- Return `data` is the updated tenant struct - Return `data` is the updated `TenantResponse` struct (contains `nwc_is_set: bool` instead of `nwc_url`)
## `async fn list_tenant_relays(...) -> Response` ## `async fn list_tenant_relays(...) -> Response`
@@ -135,7 +136,7 @@ Notes:
- Serves `GET /relays/:id/activity` - Serves `GET /relays/:id/activity`
- Authorizes admin or relay owner - Authorizes admin or relay owner
- Get activity from `query.list_activity_for_relay` - Get activity from `query.list_activity_for_resource`
- Return `data` is `{activity}` - Return `data` is `{activity}`
## `async fn deactivate_relay(...) -> Response` ## `async fn deactivate_relay(...) -> Response`
+56 -41
View File
@@ -2,45 +2,47 @@
Billing encapsulates logic related to synchronizing state with Stripe, processing payments via NWC, and managing subscription lifecycle. Billing encapsulates logic related to synchronizing state with Stripe, processing payments via NWC, and managing subscription lifecycle.
It owns the domain logic only: Stripe REST calls go through `Stripe` (see `spec/stripe.md`), NWC wallet operations through `Wallet` (see `spec/wallet.md`), and fiat → msats conversion through `bitcoin` (see `spec/bitcoin.md`).
Members: Members:
- `nwc_url: String` - a nostr wallet connect URL used to **create** bolt11 invoices (i.e. receive payments), from `NWC_URL` - `stripe: Stripe` - Stripe REST wrapper (see `spec/stripe.md`)
- `stripe_secret_key: String` - Stripe API key used for billing API operations, from `STRIPE_SECRET_KEY` - `wallet: Wallet` - the system NWC wallet, used to issue and look up bolt11 invoices (see `spec/wallet.md`)
- `stripe_webhook_secret: String` - secret for verifying Stripe webhook signatures, from `STRIPE_WEBHOOK_SECRET`
- `query: Query` - `query: Query`
- `command: Command` - `command: Command`
- `robot: Robot` - `robot: Robot`
## `pub fn new(query: Query, command: Command, robot: Robot) -> Self` ## `pub fn new(query: Query, command: Command, robot: Robot) -> Self`
- Reads environment and populates members - Builds `stripe` via `Stripe::from_env()` and `wallet` from `NWC_URL`
- Panics if `STRIPE_SECRET_KEY` is missing/empty - Panics if `STRIPE_SECRET_KEY`, `STRIPE_WEBHOOK_SECRET`, or `NWC_URL` is missing or malformed
- Panics if `STRIPE_WEBHOOK_SECRET` is missing/empty
## `pub fn start(&self)` ## `pub fn start(&self)`
- Subscribes to `command.notify.subscribe()` - Subscribes to `command.notify.subscribe()`
- On `create_relay`, `update_relay`, `activate_relay`, `deactivate_relay`, `fail_relay_sync`, and `complete_relay_sync`, call `self.sync_relay_subscription`. - On `create_relay`, `update_relay`, `activate_relay`, `deactivate_relay`, `fail_relay_sync`, and `complete_relay_sync`: resolve the relay named by the activity (skip if it no longer exists) and reconcile its tenant via `sync_tenant_subscription`.
- The startup/lagged reconcile loop calls `sync_tenant_subscription` for every tenant.
## `pub fn sync_relay_subscription(&self, activity: &Activity)` ## `fn sync_tenant_subscription(&self, tenant_pubkey: &str)`
Manages the Stripe subscription and subscription items for a relay's tenant. Only paid (non-free) relays interact with Stripe. Free-only tenants have no subscription. Must be idempotent. Reconciles a tenant's single Stripe subscription with the set of relays that should be billed. Only paid (non-free) relays interact with Stripe. Free-only tenants have no subscription. Must be idempotent.
Stripe forbids two subscription items on the same subscription from sharing a price, so billing is modeled as **one subscription item per plan (price), with `quantity` equal to the number of the tenant's `active` relays on that plan**. Every such relay's `stripe_subscription_item_id` points at the shared item for its plan; relays that aren't billed (free, inactive, delinquent) have it cleared.
Stripe uses **pay-in-advance** by default: when a subscription is first created, Stripe immediately generates an open invoice for the current period. The `invoice.created` webhook fires shortly after and `handle_invoice_created` attempts payment. Stripe uses **pay-in-advance** by default: when a subscription is first created, Stripe immediately generates an open invoice for the current period. The `invoice.created` webhook fires shortly after and `handle_invoice_created` attempts payment.
- Fetch the relay and tenant associated with the `activity` - Fetch the tenant and its relays. Build the desired state: for each `active` relay on a paid plan with a non-empty `stripe_price_id`, count relays per price.
- **If relay plan is `free`**: if the relay has a `stripe_subscription_item_id`, delete it via the Stripe API and call `command.delete_relay_subscription_item`. Then run downgrade proration validation by previewing the upcoming invoice and logging proration lines/amounts. Then check cleanup (below). Return early. - **Resolve the live subscription**: if the tenant has a `stripe_subscription_id`, fetch it. If Stripe no longer knows about it, or its status is `canceled`/`incomplete_expired`, call `command.clear_tenant_subscription` and treat the tenant as having no subscription.
- **If relay is `inactive` or `delinquent`**: if the relay has a `stripe_subscription_item_id`, delete it via the Stripe API and call `command.delete_relay_subscription_item`. Then check cleanup (below). Return early. - **No relays to bill** (desired state empty): if the tenant still has a `stripe_subscription_id`, cancel the Stripe subscription and call `command.clear_tenant_subscription`. Clear `stripe_subscription_item_id` on every relay that has one. Return.
- **If relay is `active` and on a paid plan**: - **No subscription yet**: create a Stripe subscription for the customer with `collection_method: "charge_automatically"` and one item per `(price, quantity)`. Save the subscription ID via `command.set_tenant_subscription`.
- **Ensure subscription exists**: If the tenant has no `stripe_subscription_id`, create a Stripe subscription for the customer with `collection_method: "charge_automatically"` and the relay's price as the first item. Save the subscription ID via `command.set_tenant_subscription` and the item ID via `command.set_relay_subscription_item`. Return early. - **Existing subscription**: fetch its current items. For each desired `(price, quantity)`: update the matching item's quantity if it differs, otherwise create the item. Delete any item whose price no longer appears in the desired state.
- **Sync the subscription item**: If the tenant already has a subscription, create or update the relay's Stripe subscription item to the plan's `stripe_price_id` via the Stripe API, then call `command.set_relay_subscription_item`. - **Point relays at items**: for each relay, set `stripe_subscription_item_id` (via `command.set_relay_subscription_item`) to the shared item for its plan, or clear it (via `command.delete_relay_subscription_item`) if the relay is not billed.
- **Downgrade validation**: when changing an existing subscription item, detect if the new plan amount is lower than the current one. If yes, preview the upcoming Stripe invoice and log proration line/amount details to validate expected credit/proration behavior. - **Downgrade validation**: if any quantity decreased or any item was removed, preview the upcoming Stripe invoice and log proration line/amount details to validate expected credit/proration behavior.
- **Clean up empty subscription**: After any item deletion, check if the tenant has any remaining active paid relays. If none and the tenant has a `stripe_subscription_id`, cancel the Stripe subscription immediately and call `command.clear_tenant_subscription`.
## `pub fn handle_webhook(&self, payload: &str, signature: &str) -> Result<()>` ## `pub fn handle_webhook(&self, payload: &str, signature: &str) -> Result<()>`
- Verify the webhook signature using `self.stripe_webhook_secret` - Verify and parse the event via `self.stripe.construct_event(payload, signature)` (checks the `Stripe-Signature` HMAC and timestamp tolerance — see `spec/stripe.md`)
- Parse the event and dispatch by type: - Dispatch by type:
- `invoice.created` -> `self.handle_invoice_created` - `invoice.created` -> `self.handle_invoice_created`
- `invoice.paid` -> `self.handle_invoice_paid` - `invoice.paid` -> `self.handle_invoice_paid`
- `invoice.payment_failed` -> `self.handle_invoice_payment_failed` - `invoice.payment_failed` -> `self.handle_invoice_payment_failed`
@@ -50,56 +52,69 @@ Stripe uses **pay-in-advance** by default: when a subscription is first created,
- `payment_method.attached` -> `self.handle_payment_method_attached` - `payment_method.attached` -> `self.handle_payment_method_attached`
- Unknown event types are ignored (return Ok) - Unknown event types are ignored (return Ok)
## `pub async fn stripe_create_customer(&self, tenant_pubkey: &str) -> Result<String>`
- Resolves a display name via `robot.fetch_nostr_name(tenant_pubkey)`, falling back to the first 8 chars of the pubkey
- Creates the Stripe customer via `stripe.create_customer(display_name, tenant_pubkey)` and returns its id
## `pub async fn stripe_list_invoices(&self, customer_id: &str) -> Result<Value>` ## `pub async fn stripe_list_invoices(&self, customer_id: &str) -> Result<Value>`
- Fetches invoices from Stripe API for the given customer - Delegates to `stripe.list_invoices` — returns the `data` array of the customer's invoices
- Returns the `data` array from the Stripe response
## `pub async fn stripe_get_invoice(&self, invoice_id: &str) -> Result<Value>` ## `pub async fn stripe_create_portal_session(&self, customer_id: &str, return_url: Option<&str>) -> Result<String>`
- Fetches a single invoice from Stripe API by ID - Delegates to `stripe.create_portal_session` — returns the Customer Portal session URL
- Returns the full Stripe invoice object
## `pub async fn create_bolt11(&self, amount_due_cents: i64) -> Result<String>` ## `pub async fn get_invoice_with_tenant(&self, invoice_id: &str) -> Result<(Value, Tenant), InvoiceLookupError>`
- Creates a bolt11 Lightning invoice for the given amount using the system NWC wallet (`self.nwc_url`) - Fetches the invoice via `stripe.get_invoice` (a Stripe 4xx surfaces as `InvoiceLookupError::StripeClient`)
- Looks up the tenant by the invoice's `customer` field; errors if the invoice has no customer or no tenant matches
## `pub async fn reconcile_manual_lightning_invoice(&self, invoice_id: &str, invoice: &Value) -> Result<Value, InvoiceLookupError>`
If `invoice.status == "open"` and a manual-Lightning bolt11 was previously issued for it (`query.get_invoice_manual_lightning_bolt11`), check whether that bolt11 has settled (`self.wallet.is_settled(...)`). If it has, mark the Stripe invoice paid out of band (`stripe.pay_invoice_out_of_band`) and return the refreshed invoice. On any lookup/settlement failure, log and return the invoice unchanged.
## `pub async fn get_or_create_manual_lightning_bolt11(&self, invoice_id: &str, tenant_pubkey: &str, amount_due_minor: i64, currency: &str) -> Result<String>`
- Returns the existing bolt11 if one is already recorded for the invoice
- Otherwise creates one via `create_bolt11`, records it with `command.insert_manual_lightning_invoice_payment`, and returns it (re-reading the stored row if the insert lost a race)
## `pub async fn create_bolt11(&self, amount_due_minor: i64, currency: &str) -> Result<String>`
- Converts the fiat amount to msats via `bitcoin::fiat_to_msats` (fetches the live BTC spot price — see `spec/bitcoin.md`)
- Issues a bolt11 invoice for that amount on the system NWC wallet (`self.wallet.make_invoice(...)`)
- Returns the bolt11 invoice string - Returns the bolt11 invoice string
## `pub async fn stripe_create_portal_session(&self, customer_id: &str) -> Result<String>`
- Creates a Stripe Customer Portal session for the given customer
- Returns the portal session URL
## `pub async fn pay_outstanding_nwc_invoices(&self, tenant: &Tenant) -> Result<()>` ## `pub async fn pay_outstanding_nwc_invoices(&self, tenant: &Tenant) -> Result<()>`
Called when a tenant first sets their NWC URL (via `PUT /tenants/:pubkey`). Attempts to pay any currently open invoices for the tenant using their NWC wallet, so that invoices created before NWC was configured are not left unpaid. Called when a tenant first sets their NWC URL (via `PUT /tenants/:pubkey`). Attempts to pay any currently open invoices for the tenant using their NWC wallet, so that invoices created before NWC was configured are not left unpaid.
- If `tenant.nwc_url` is empty, return early. - If `tenant.nwc_url` is empty, return early.
- List all Stripe invoices for `tenant.stripe_customer_id` via `stripe_list_invoices`. - List all Stripe invoices for `tenant.stripe_customer_id` via `stripe.list_invoices`.
- For each invoice with `status == "open"` and `amount_due > 0`: - For each invoice with `status == "open"` and `amount_due > 0`:
- Attempt NWC payment via `nwc_pay_invoice`. - Attempt NWC payment via `nwc_pay_invoice`.
- On success: call `stripe_pay_invoice_out_of_band` and `command.clear_tenant_nwc_error`. - On success: mark the Stripe invoice paid out of band (`stripe.pay_invoice_out_of_band`) and call `command.clear_tenant_nwc_error`.
- On failure: call `command.set_tenant_nwc_error` and log the error; continue to the next invoice. - On failure: call `command.set_tenant_nwc_error` and log the error; continue to the next invoice.
## `pub async fn pay_outstanding_card_invoices(&self, tenant: &Tenant) -> Result<()>` ## `pub async fn pay_outstanding_card_invoices(&self, tenant: &Tenant) -> Result<()>`
Attempts Stripe-side collection for open invoices when the tenant has a card on file. Attempts Stripe-side collection for open invoices when the tenant has a card on file.
- If tenant has no card payment method, return early. - If tenant has no card payment method (`stripe.has_payment_method`), return early.
- List all Stripe invoices for `tenant.stripe_customer_id`. - List all Stripe invoices for `tenant.stripe_customer_id` via `stripe.list_invoices`.
- For each invoice with `status == "open"` and `amount_due > 0`: - For each invoice with `status == "open"` and `amount_due > 0`:
- Call Stripe `POST /v1/invoices/:id/pay` to retry collection using the card on file. - Call `stripe.pay_invoice` to retry collection using the card on file.
- Log and continue on failures. - Log and continue on failures.
## `fn handle_invoice_created(&self, invoice: &Invoice)` ## `fn handle_invoice_created(&self, invoice: &Invoice)`
Attempts to pay a new subscription invoice. Because Stripe defaults to pay-in-advance, this webhook fires immediately when a subscription is created (i.e. when a paid relay is added or a plan is upgraded). Payment priority: Attempts to pay a new subscription invoice. Because Stripe defaults to pay-in-advance, this webhook fires immediately when a subscription is created (i.e. when a paid relay is added or a plan is upgraded). Payment priority:
1. **NWC auto-pay**: If the tenant has a `nwc_url`: 1. **NWC auto-pay**: If the tenant has a `nwc_url`, run `nwc_pay_invoice` (decrypting the tenant's stored `nwc_url` first):
- Create a bolt11 Lightning invoice for the invoice amount using `self.nwc_url` (the receiving/system wallet) - The system wallet (`self.wallet`) issues a bolt11 invoice for the fiat amount; the tenant's wallet (`Wallet::from_url` of the decrypted URL) pays it. A `pending` row in `invoice_nwc_payment` guards against double-charging across retries.
- Pay the bolt11 invoice using the tenant's `nwc_url` (the spending/tenant wallet) - If payment succeeds: mark the Stripe invoice paid out of band (`stripe.pay_invoice_out_of_band`) and clear `nwc_error` via `command.clear_tenant_nwc_error`. Done.
- If payment succeeds: call Stripe `POST /v1/invoices/:id/pay` with `paid_out_of_band: true`. Clear `nwc_error` via `command.clear_tenant_nwc_error`. - If it fails before any charge could have gone out: set `nwc_error` on the tenant via `command.set_tenant_nwc_error`, and fall through to the next option (carrying a short summary of the error into the eventual DM).
- If payment fails: set `nwc_error` on tenant via `command.set_tenant_nwc_error`. Fall through to next option. - If it fails after a charge may have gone out (needs reconciliation): set `nwc_error` and return the error without falling through — a human must reconcile before any retry.
2. **Card on file**: If the tenant has a payment method on the Stripe customer, do nothing here — Stripe will charge automatically for this invoice attempt. 2. **Card on file**: If the tenant has a payment method on the Stripe customer, do nothing here — Stripe will charge automatically for this invoice attempt.
3. **Manual payment**: If neither NWC nor card is available, send a DM via `robot.send_dm` notifying the tenant that payment is due with a link to the application for manual Lightning payment. 3. **Manual payment**: If neither NWC nor card is available, send a DM via `robot.send_dm` notifying the tenant that payment is due with a link to the application for manual Lightning payment.
+11
View File
@@ -0,0 +1,11 @@
# `bitcoin` — fiat ↔ Bitcoin conversion
Free async helpers for pricing fiat amounts in Lightning units against a live BTC spot price. The NWC wallet lives in `spec/wallet.md`; billing orchestration lives in `spec/billing.md`.
## `pub async fn fiat_to_msats(amount_fiat_minor: i64, currency: &str) -> Result<u64>`
Converts a Stripe-style minor-unit fiat amount to millisatoshis using the live BTC spot price for `currency` and Stripe's per-currency decimal exponent (most currencies 2; `JPY`/`KRW`/… 0; `BHD`/`KWD`/… 3).
## `pub async fn get_bitcoin_price(currency: &str) -> Result<f64>`
Returns the current BTC spot price in `currency`, fetched from Coinbase's public spot-price endpoint.
+2
View File
@@ -5,6 +5,7 @@ Infra is a service which listens for activity and synchronizes relay updates to
Members: Members:
- `api_url: String` - the URL of the zooid instance to be managed, from `ZOOID_API_URL` - `api_url: String` - the URL of the zooid instance to be managed, from `ZOOID_API_URL`
- `blossom_s3: Option<BlossomS3Sync>` - shared Blossom S3 settings from `BLOSSOM_S3_*` when region, bucket, access key, and secret are all non-empty after trim
- `query: Query` - `query: Query`
- `command: Command` - `command: Command`
@@ -36,3 +37,4 @@ Members:
- Otherwise, sends `PATCH /relay/:id` to update it. - Otherwise, sends `PATCH /relay/:id` to update it.
- Includes `secret` only for relay creation (`POST`) so updates do not rotate relay identity. - Includes `secret` only for relay creation (`POST`) so updates do not rotate relay identity.
- Passes relay configuration in the body including host, schema, inactive flag, info, policy, groups, management, blossom, livekit, push, and roles. - Passes relay configuration in the body including host, schema, inactive flag, info, policy, groups, management, blossom, livekit, push, and roles.
- When `blossom_s3` is configured and the relay has blossom enabled, the blossom section includes `adapter: "s3"`, S3 fields from the environment, and `s3.key_prefix` set to the relay's `schema`. Otherwise blossom omits S3 (zooid defaults to local storage) or sends `{ "enabled": false }` when blossom is disabled.
+3 -3
View File
@@ -52,7 +52,7 @@ There are three plans available:
Tenants are customers of the service, identified by a nostr `pubkey`. Public metadata like name etc are pulled from the nostr network. They also have associated billing information. Tenants are customers of the service, identified by a nostr `pubkey`. Public metadata like name etc are pulled from the nostr network. They also have associated billing information.
- `pubkey` is the nostr public key identifying the tenant - `pubkey` is the nostr public key identifying the tenant
- `nwc_url` (private) a nostr wallet connect URL used for **paying** invoices generated by the system on the tenant's behalf - `nwc_url` (private) a nostr wallet connect URL used for **paying** invoices generated by the system on the tenant's behalf; stored encrypted at rest using NIP-44 via `ENCRYPTION_SECRET`; never serialized to API responses — tenant API endpoints expose `nwc_is_set: bool` instead
- `nwc_error` (private) a string indicating the most recent NWC payment error, if any. Cleared on successful NWC payment. - `nwc_error` (private) a string indicating the most recent NWC payment error, if any. Cleared on successful NWC payment.
- `created_at` unix timestamp identifying tenant creation time - `created_at` unix timestamp identifying tenant creation time
- `stripe_customer_id` a string identifying the associated stripe customer - `stripe_customer_id` a string identifying the associated stripe customer
@@ -63,9 +63,9 @@ Tenants are customers of the service, identified by a nostr `pubkey`. Public met
A relay is a nostr relay owned by a `tenant` and hosted by the attached zooid instance. Relay subdomains MUST be unique. A relay is a nostr relay owned by a `tenant` and hosted by the attached zooid instance. Relay subdomains MUST be unique.
- `id` - a random ID identifying the relay - `id` - calculated based on `subdomain` + 8 random hex chars
- `tenant` - the tenant's pubkey - `tenant` - the tenant's pubkey
- `schema` - the relay's db schema (read_only, calculated based on `subdomain` + `id`) - `schema` - the relay's db schema (read only, same as `id`)
- `subdomain` - the relay's subdomain - `subdomain` - the relay's subdomain
- `plan` - the relay's plan - `plan` - the relay's plan
- `stripe_subscription_item_id` (nullable) - the Stripe subscription item id. Only set for relays on paid plans. - `stripe_subscription_item_id` (nullable) - the Stripe subscription item id. Only set for relays on paid plans.
+1 -5
View File
@@ -39,11 +39,7 @@ Members:
- Returns the tenant matching the given `stripe_customer_id` - Returns the tenant matching the given `stripe_customer_id`
## `pub fn has_active_paid_relays(&self, tenant_id: &str) -> Result<bool>` ## `pub fn list_activity_for_resource(&self, relay_id: &str) -> Result<Vec<Activity>>`
- Returns true if the tenant has any relays where `status = 'active'` and `plan != 'free'`
## `pub fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>>`
- Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id` - Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id`
- Ordered newest-first - Ordered newest-first
+113
View File
@@ -0,0 +1,113 @@
# `pub struct Stripe`
A thin async wrapper around the subset of the Stripe REST API this service uses. It knows nothing about relays, tenants, or the database — it just speaks HTTP to Stripe and returns `serde_json::Value` (or small typed results). The domain logic that drives it lives in `spec/billing.md`.
Members:
- `secret_key: String` - Stripe API key, used as the bearer token and as the HMAC key for idempotency keys
- `webhook_secret: String` - secret for verifying Stripe webhook signatures
- `http: reqwest::Client`
All requests authenticate with `secret_key` via `Authorization: Bearer`. Write requests that must not be retried twice (customer/subscription/item creation, invoice payment) send a deterministic `Idempotency-Key` derived by HMAC-ing the operation name and its arguments with `secret_key`. Reconcile-to-desired-state writes (e.g. setting an item quantity) intentionally omit the idempotency key, since re-applying the same target is a no-op.
On any 4xx/5xx response, the wrapper reads the body and folds Stripe's JSON error payload (`error.message` / `error.type` / `error.code` / `error.param`) into the returned error so callers get an actionable message instead of a bare status line.
## `pub fn from_env() -> Self`
Reads `STRIPE_SECRET_KEY` and `STRIPE_WEBHOOK_SECRET` (both required) and constructs the client. Panics if either is missing or blank. This is what `Billing::new` calls.
## `pub fn new(secret_key: String, webhook_secret: String) -> Self`
Constructs the client with a fresh `reqwest::Client` from explicit keys (does not touch the environment).
## `pub async fn create_customer(&self, name: &str, tenant_pubkey: &str) -> Result<String>`
- `POST /v1/customers` with `name` and `metadata[tenant_pubkey]`
- Idempotent on `tenant_pubkey`
- Returns the new customer id; errors if it isn't a `cus_…` id
## `pub async fn get_subscription(&self, subscription_id: &str) -> Result<Option<Value>>`
- `GET /v1/subscriptions/:id`
- Returns `None` if Stripe responds `404` (so callers can recover from a stale subscription id), otherwise the subscription object
## `pub async fn create_subscription(&self, customer_id: &str, items: &BTreeMap<String, i64>) -> Result<(String, BTreeMap<String, String>)>`
- `POST /v1/subscriptions` with `collection_method: charge_automatically` and one `items[n][price]` / `items[n][quantity]` pair per entry
- Idempotent on the customer and the `(price, quantity)` set
- Returns the subscription id and a map from price id to the created subscription item id
## `pub async fn create_subscription_item(&self, subscription_id: &str, price_id: &str, quantity: i64) -> Result<String>`
- `POST /v1/subscription_items`
- Idempotent on `(subscription_id, price_id)`
- Returns the new subscription item id
## `pub async fn set_subscription_item_quantity(&self, item_id: &str, quantity: i64) -> Result<()>`
- `POST /v1/subscription_items/:id` with `quantity`
- No idempotency key (reconcile-to-target write)
## `pub async fn delete_subscription_item(&self, item_id: &str) -> Result<()>`
- `DELETE /v1/subscription_items/:id`
## `pub async fn cancel_subscription(&self, subscription_id: &str) -> Result<()>`
- `DELETE /v1/subscriptions/:id`
## `pub async fn list_invoices(&self, customer_id: &str) -> Result<Value>`
- `GET /v1/invoices?customer=…`
- Returns the `data` array
## `pub async fn get_invoice(&self, invoice_id: &str) -> Result<Value, InvoiceLookupError>`
- `GET /v1/invoices/:id`
- On a 4xx response, returns `InvoiceLookupError::StripeClient { status }` (callers usually surface this as a client error, e.g. `404` "no such invoice"); other failures are `InvoiceLookupError::Internal`
- Returns the full invoice object
## `pub async fn pay_invoice(&self, invoice_id: &str) -> Result<()>`
- `POST /v1/invoices/:id/pay` (retries collection using the customer's default payment method)
- Idempotent on `invoice_id`
## `pub async fn pay_invoice_out_of_band(&self, invoice_id: &str) -> Result<()>`
- `POST /v1/invoices/:id/pay` with `paid_out_of_band: true` — used when payment was collected over Lightning rather than through Stripe
- Idempotent on `invoice_id`
## `pub async fn preview_invoice(&self, customer_id: &str, subscription_id: Option<&str>) -> Result<Value>`
- `GET /v1/invoices/upcoming?customer=…[&subscription=…]`
- Used to validate proration when a subscription is downgraded
## `pub async fn has_payment_method(&self, customer_id: &str) -> Result<bool>`
- `GET /v1/payment_methods?customer=…&type=card`
- Returns whether the customer has at least one card on file
## `pub async fn create_portal_session(&self, customer_id: &str, return_url: Option<&str>) -> Result<String>`
- `POST /v1/billing_portal/sessions` with `customer` and optional `return_url`
- Returns the portal session URL
## `pub fn construct_event(&self, payload: &str, sig_header: &str) -> Result<Event>`
Verifies the `Stripe-Signature` header against `webhook_secret` and parses the body.
- Parse `t=` (timestamp) and `v1=` (signature) from the header
- Compute `HMAC-SHA256(webhook_secret, "{t}.{payload}")`, hex-encode it, and compare to `v1=`; error on mismatch
- Error if the timestamp is more than 300 seconds from now
- Returns the deserialized `Event` (`{ event_type, data: { object } }`)
# `pub enum InvoiceLookupError`
- `StripeClient { status: reqwest::StatusCode }` - Stripe returned a 4xx for an invoice lookup
- `Internal(anyhow::Error)` - any other failure
Implements `Display`/`Error` and `From<anyhow::Error>` / `From<reqwest::Error>` (both mapping to `Internal`).
# `pub struct Event` / `pub struct EventData`
The verified, parsed webhook event: `Event { event_type: String, data: EventData }`, `EventData { object: serde_json::Value }`.
+23
View File
@@ -0,0 +1,23 @@
# `pub struct Wallet`
A handle to a single Nostr Wallet Connect (NWC) wallet. `Billing` holds one as its system wallet (receives — issues and looks up invoices); tenant wallets (pay invoices) are constructed ad-hoc from the decrypted `tenant.nwc_url` at the call site.
Member:
- `url: NostrWalletConnectURI` — the parsed `nostr+walletconnect://…` URI
## `pub fn from_url(url: &str) -> Result<Self>`
Parses an `nostr+walletconnect://` URI.
## `pub async fn make_invoice(&self, amount_msats: u64, description: &str) -> Result<String>`
Issues a bolt11 invoice for `amount_msats` and returns it.
## `pub async fn pay_invoice(&self, bolt11: String) -> Result<()>`
Pays a bolt11 invoice.
## `pub async fn is_settled(&self, bolt11: &str) -> Result<bool>`
Returns whether a bolt11 invoice (previously issued by this wallet) has settled.
+134 -1039
View File
File diff suppressed because it is too large Load Diff
+346 -1003
View File
File diff suppressed because it is too large Load Diff
+48
View File
@@ -0,0 +1,48 @@
use anyhow::{Result, anyhow};
pub async fn fiat_to_msats(amount_fiat_minor: i64, currency: &str) -> Result<u64> {
let price = get_bitcoin_price(&currency.to_uppercase()).await?;
let divisor = 10_f64.powi(currency_minor_exponent(currency)? as i32);
let amount_fiat = (amount_fiat_minor as f64) / divisor;
let amount_msats = (amount_fiat / price * 100_000_000_000.0).round();
Ok(amount_msats as u64)
}
#[derive(serde::Deserialize)]
struct CoinbaseSpotPriceResponse {
data: CoinbaseSpotPriceData,
}
#[derive(serde::Deserialize)]
struct CoinbaseSpotPriceData {
amount: String,
}
pub async fn get_bitcoin_price(currency: &str) -> Result<f64> {
let http = reqwest::Client::new();
let url = format!("https://api.coinbase.com/v2/prices/BTC-{currency}/spot");
let resp = http.get(url).send().await?;
let body: CoinbaseSpotPriceResponse = resp.error_for_status()?.json().await?;
body
.data
.amount
.parse::<f64>()
.map_err(|e| anyhow!("invalid BTC spot quote for {currency}: {e}"))
}
/// Number of decimal places in `currency`'s minor unit, following Stripe's
/// currency conventions (most are 2, JPY/KRW/… are 0, BHD/KWD/… are 3).
fn currency_minor_exponent(currency: &str) -> Result<u8> {
let normalized = currency.to_uppercase();
let exponent = match normalized.as_str() {
// Zero-decimal currencies in Stripe.
"BIF" | "CLP" | "DJF" | "GNF" | "JPY" | "KMF" | "KRW" | "MGA" | "PYG" | "RWF" | "UGX"
| "VND" | "VUV" | "XAF" | "XOF" | "XPF" => 0,
// Three-decimal currencies in Stripe.
"BHD" | "JOD" | "KWD" | "OMR" | "TND" => 3,
_ if normalized.chars().all(|c| c.is_ascii_alphabetic()) && normalized.len() == 3 => 2,
_ => return Err(anyhow!("invalid currency code: {currency}")),
};
Ok(exponent)
}
+231 -272
View File
@@ -18,6 +18,8 @@ impl Command {
Self { pool, notify } Self { pool, notify }
} }
// Activity
async fn insert_activity( async fn insert_activity(
tx: &mut Transaction<'_, Sqlite>, tx: &mut Transaction<'_, Sqlite>,
activity_type: &str, activity_type: &str,
@@ -61,218 +63,55 @@ impl Command {
}) })
} }
fn emit(&self, activity: Activity) { /// Run `f` inside a transaction, record an activity row, commit, and broadcast.
async fn with_activity<F>(
&self,
activity_type: &str,
resource_type: &str,
resource_id: &str,
f: F,
) -> Result<()>
where
F: AsyncFnOnce(&mut Transaction<'_, Sqlite>) -> Result<()>,
{
let mut tx = self.pool.begin().await?;
f(&mut tx).await?;
let activity =
Self::insert_activity(&mut tx, activity_type, resource_type, resource_id).await?;
tx.commit().await?;
let _ = self.notify.send(activity); let _ = self.notify.send(activity);
Ok(())
} }
// Tenants
pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> { pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> {
if tenant.stripe_customer_id.trim().is_empty() { self.with_activity("create_tenant", "tenant", &tenant.pubkey, async |tx| {
anyhow::bail!("stripe_customer_id is required"); sqlx::query(
} "INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id)
VALUES (?, ?, ?, ?)",
let mut tx = self.pool.begin().await?; )
.bind(&tenant.pubkey)
sqlx::query( .bind(&tenant.nwc_url)
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id) .bind(tenant.created_at)
VALUES (?, ?, ?, ?)", .bind(&tenant.stripe_customer_id)
) .execute(&mut **tx)
.bind(&tenant.pubkey) .await?;
.bind(&tenant.nwc_url) Ok(())
.bind(tenant.created_at) })
.bind(&tenant.stripe_customer_id) .await
.execute(&mut *tx)
.await?;
let activity =
Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
} }
pub async fn update_tenant(&self, tenant: &Tenant) -> Result<()> { pub async fn update_tenant(&self, tenant: &Tenant) -> Result<()> {
let mut tx = self.pool.begin().await?; self.with_activity("update_tenant", "tenant", &tenant.pubkey, async |tx| {
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?") .bind(&tenant.nwc_url)
.bind(&tenant.nwc_url) .bind(&tenant.pubkey)
.bind(&tenant.pubkey) .execute(&mut **tx)
.execute(&mut *tx) .await?;
.await?; Ok(())
})
let activity = .await
Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
pub async fn create_relay(&self, relay: &Relay) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"INSERT INTO relay (
id, tenant, schema, subdomain, plan, status, synced, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled
) VALUES (?, ?, ?, ?, ?, 'active', 0, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&relay.id)
.bind(&relay.tenant)
.bind(&relay.schema)
.bind(&relay.subdomain)
.bind(&relay.plan)
.bind(&relay.sync_error)
.bind(&relay.info_name)
.bind(&relay.info_icon)
.bind(&relay.info_description)
.bind(relay.policy_public_join)
.bind(relay.policy_strip_signatures)
.bind(relay.groups_enabled)
.bind(relay.management_enabled)
.bind(relay.blossom_enabled)
.bind(relay.livekit_enabled)
.bind(relay.push_enabled)
.execute(&mut *tx)
.await?;
let activity = Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
pub async fn update_relay(&self, relay: &Relay) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"UPDATE relay
SET tenant = ?, schema = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?, synced = 0,
info_name = ?, info_icon = ?, info_description = ?,
policy_public_join = ?, policy_strip_signatures = ?,
groups_enabled = ?, management_enabled = ?, blossom_enabled = ?,
livekit_enabled = ?, push_enabled = ?
WHERE id = ?",
)
.bind(&relay.tenant)
.bind(&relay.schema)
.bind(&relay.subdomain)
.bind(&relay.plan)
.bind(&relay.status)
.bind(&relay.sync_error)
.bind(&relay.info_name)
.bind(&relay.info_icon)
.bind(&relay.info_description)
.bind(relay.policy_public_join)
.bind(relay.policy_strip_signatures)
.bind(relay.groups_enabled)
.bind(relay.management_enabled)
.bind(relay.blossom_enabled)
.bind(relay.livekit_enabled)
.bind(relay.push_enabled)
.bind(&relay.id)
.execute(&mut *tx)
.await?;
let activity = Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
pub async fn deactivate_relay(&self, relay: &Relay) -> Result<()> {
self.set_relay_status(&relay.id, RELAY_STATUS_INACTIVE, "deactivate_relay")
.await
}
pub async fn mark_relay_delinquent(&self, relay: &Relay) -> Result<()> {
self.set_relay_status(&relay.id, RELAY_STATUS_DELINQUENT, "deactivate_relay")
.await
}
async fn set_relay_status(
&self,
relay_id: &str,
status: &str,
activity_type: &str,
) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE relay SET status = ?, synced = 0 WHERE id = ?")
.bind(status)
.bind(relay_id)
.execute(&mut *tx)
.await?;
let activity = Self::insert_activity(&mut tx, activity_type, "relay", relay_id).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
pub async fn activate_relay(&self, relay: &Relay) -> Result<()> {
self.set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay")
.await
}
pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE relay SET synced = 0, sync_error = ? WHERE id = ?")
.bind(&sync_error)
.bind(&relay.id)
.execute(&mut *tx)
.await?;
let activity =
Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
pub async fn complete_relay_sync(&self, relay_id: &str) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?")
.bind(relay_id)
.execute(&mut *tx)
.await?;
let activity =
Self::insert_activity(&mut tx, "complete_relay_sync", "relay", relay_id).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
pub async fn delete_relay_subscription_item(&self, relay_id: &str) -> Result<()> {
sqlx::query("UPDATE relay SET stripe_subscription_item_id = NULL WHERE id = ?")
.bind(relay_id)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn set_relay_subscription_item(
&self,
relay_id: &str,
stripe_subscription_item_id: &str,
) -> Result<()> {
sqlx::query("UPDATE relay SET stripe_subscription_item_id = ? WHERE id = ?")
.bind(stripe_subscription_item_id)
.bind(relay_id)
.execute(&self.pool)
.await?;
Ok(())
} }
pub async fn set_tenant_subscription( pub async fn set_tenant_subscription(
@@ -313,6 +152,173 @@ impl Command {
Ok(()) Ok(())
} }
pub async fn set_tenant_past_due(&self, pubkey: &str) -> Result<()> {
let now = chrono::Utc::now().timestamp();
sqlx::query("UPDATE tenant SET past_due_at = ? WHERE pubkey = ?")
.bind(now)
.bind(pubkey)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn clear_tenant_past_due(&self, pubkey: &str) -> Result<()> {
sqlx::query("UPDATE tenant SET past_due_at = NULL WHERE pubkey = ?")
.bind(pubkey)
.execute(&self.pool)
.await?;
Ok(())
}
// Relays
pub async fn create_relay(&self, relay: &Relay) -> Result<()> {
self.with_activity("create_relay", "relay", &relay.id, async |tx| {
sqlx::query(
"INSERT INTO relay (
id, tenant, schema, subdomain, plan, status, synced, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled
) VALUES (?, ?, ?, ?, ?, 'active', 0, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&relay.id)
.bind(&relay.tenant)
.bind(&relay.schema)
.bind(&relay.subdomain)
.bind(&relay.plan)
.bind(&relay.sync_error)
.bind(&relay.info_name)
.bind(&relay.info_icon)
.bind(&relay.info_description)
.bind(relay.policy_public_join)
.bind(relay.policy_strip_signatures)
.bind(relay.groups_enabled)
.bind(relay.management_enabled)
.bind(relay.blossom_enabled)
.bind(relay.livekit_enabled)
.bind(relay.push_enabled)
.execute(&mut **tx)
.await?;
Ok(())
})
.await
}
pub async fn update_relay(&self, relay: &Relay) -> Result<()> {
self.with_activity("update_relay", "relay", &relay.id, async |tx| {
sqlx::query(
"UPDATE relay
SET tenant = ?, schema = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?, synced = 0,
info_name = ?, info_icon = ?, info_description = ?,
policy_public_join = ?, policy_strip_signatures = ?,
groups_enabled = ?, management_enabled = ?, blossom_enabled = ?,
livekit_enabled = ?, push_enabled = ?
WHERE id = ?",
)
.bind(&relay.tenant)
.bind(&relay.schema)
.bind(&relay.subdomain)
.bind(&relay.plan)
.bind(&relay.status)
.bind(&relay.sync_error)
.bind(&relay.info_name)
.bind(&relay.info_icon)
.bind(&relay.info_description)
.bind(relay.policy_public_join)
.bind(relay.policy_strip_signatures)
.bind(relay.groups_enabled)
.bind(relay.management_enabled)
.bind(relay.blossom_enabled)
.bind(relay.livekit_enabled)
.bind(relay.push_enabled)
.bind(&relay.id)
.execute(&mut **tx)
.await?;
Ok(())
})
.await
}
pub async fn deactivate_relay(&self, relay: &Relay) -> Result<()> {
self.set_relay_status(&relay.id, RELAY_STATUS_INACTIVE, "deactivate_relay")
.await
}
pub async fn mark_relay_delinquent(&self, relay: &Relay) -> Result<()> {
self.set_relay_status(&relay.id, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent")
.await
}
pub async fn activate_relay(&self, relay: &Relay) -> Result<()> {
self.set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay")
.await
}
async fn set_relay_status(
&self,
relay_id: &str,
status: &str,
activity_type: &str,
) -> Result<()> {
self.with_activity(activity_type, "relay", relay_id, async |tx| {
sqlx::query("UPDATE relay SET status = ?, synced = 0 WHERE id = ?")
.bind(status)
.bind(relay_id)
.execute(&mut **tx)
.await?;
Ok(())
})
.await
}
pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> {
self.with_activity("fail_relay_sync", "relay", &relay.id, async |tx| {
sqlx::query("UPDATE relay SET synced = 0, sync_error = ? WHERE id = ?")
.bind(&sync_error)
.bind(&relay.id)
.execute(&mut **tx)
.await?;
Ok(())
})
.await
}
pub async fn complete_relay_sync(&self, relay_id: &str) -> Result<()> {
self.with_activity("complete_relay_sync", "relay", relay_id, async |tx| {
sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?")
.bind(relay_id)
.execute(&mut **tx)
.await?;
Ok(())
})
.await
}
pub async fn clear_relay_subscription_item(&self, relay_id: &str) -> Result<()> {
sqlx::query("UPDATE relay SET stripe_subscription_item_id = NULL WHERE id = ?")
.bind(relay_id)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn set_relay_subscription_item(
&self,
relay_id: &str,
stripe_subscription_item_id: &str,
) -> Result<()> {
sqlx::query("UPDATE relay SET stripe_subscription_item_id = ? WHERE id = ?")
.bind(stripe_subscription_item_id)
.bind(relay_id)
.execute(&self.pool)
.await?;
Ok(())
}
// Invoices
pub async fn insert_pending_invoice_nwc_payment( pub async fn insert_pending_invoice_nwc_payment(
&self, &self,
invoice_id: &str, invoice_id: &str,
@@ -353,74 +359,27 @@ impl Command {
Ok(()) Ok(())
} }
pub async fn set_tenant_past_due(&self, pubkey: &str) -> Result<()> { pub async fn insert_manual_lightning_invoice_payment(
&self,
invoice_id: &str,
tenant_pubkey: &str,
bolt11: &str,
) -> Result<bool> {
let now = chrono::Utc::now().timestamp(); let now = chrono::Utc::now().timestamp();
sqlx::query("UPDATE tenant SET past_due_at = ? WHERE pubkey = ?") let result = sqlx::query(
.bind(now) "INSERT INTO invoice_manual_lightning_payment
.bind(pubkey) (invoice_id, tenant_pubkey, bolt11, created_at, updated_at)
.execute(&self.pool) VALUES (?, ?, ?, ?, ?)
.await?; ON CONFLICT(invoice_id) DO NOTHING",
Ok(()) )
} .bind(invoice_id)
.bind(tenant_pubkey)
.bind(bolt11)
.bind(now)
.bind(now)
.execute(&self.pool)
.await?;
pub async fn clear_tenant_past_due(&self, pubkey: &str) -> Result<()> { Ok(result.rows_affected() > 0)
sqlx::query("UPDATE tenant SET past_due_at = NULL WHERE pubkey = ?")
.bind(pubkey)
.execute(&self.pool)
.await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use sqlx::SqlitePool;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use std::str::FromStr;
async fn test_pool() -> SqlitePool {
let connect_options = SqliteConnectOptions::from_str("sqlite::memory:")
.expect("valid sqlite memory url")
.create_if_missing(true);
let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect_with(connect_options)
.await
.expect("connect sqlite memory db");
sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("run migrations");
pool
}
#[tokio::test]
async fn create_tenant_rejects_empty_stripe_customer_id() {
let pool = test_pool().await;
let command = Command::new(pool);
let tenant = Tenant {
pubkey: "tenant_pubkey".to_string(),
nwc_url: String::new(),
nwc_error: None,
created_at: 0,
stripe_customer_id: " ".to_string(),
stripe_subscription_id: None,
past_due_at: None,
};
let err = command
.create_tenant(&tenant)
.await
.expect_err("empty customer id must be rejected");
assert!(
err.to_string().contains("stripe_customer_id is required"),
"unexpected error: {err}"
);
} }
} }
+126
View File
@@ -0,0 +1,126 @@
use anyhow::{Result, anyhow};
use nostr_sdk::prelude::*;
#[derive(Clone)]
pub struct Env {
pub server_host: String,
pub server_port: u16,
pub server_admin_pubkeys: Vec<String>,
pub server_allow_origins: Vec<String>,
pub database_url: String,
pub robot_name: String,
pub robot_wallet: String,
pub robot_picture: String,
pub robot_description: String,
pub robot_outbox_relays: Vec<String>,
pub robot_indexer_relays: Vec<String>,
pub robot_messaging_relays: Vec<String>,
pub blossom_s3_region: String,
pub blossom_s3_bucket: String,
pub blossom_s3_endpoint: String,
pub blossom_s3_access_key: String,
pub blossom_s3_secret_key: String,
pub zooid_api_url: String,
pub relay_domain: String,
pub livekit_url: String,
pub livekit_api_key: String,
pub livekit_api_secret: String,
pub stripe_secret_key: String,
pub stripe_webhook_secret: String,
pub stripe_price_basic: String,
pub stripe_price_growth: String,
/// Parsed from `robot_secret`; used for nostr signing and nip44 encryption.
pub keys: Keys,
}
impl Env {
pub fn load() -> Self {
let keys = Keys::parse(&require_str("ROBOT_SECRET"))
.expect("ROBOT_SECRET is not a valid nostr secret key");
Self {
server_host: require_str("SERVER_HOST"),
server_port: require_u16("SERVER_PORT"),
server_admin_pubkeys: require_csv("SERVER_ADMIN_PUBKEYS"),
server_allow_origins: require_csv("SERVER_ALLOW_ORIGINS"),
database_url: require_str("DATABASE_URL"),
robot_name: require_str("ROBOT_NAME"),
robot_wallet: require_str("ROBOT_WALLET"),
robot_picture: require_str("ROBOT_PICTURE"),
robot_description: require_str("ROBOT_DESCRIPTION"),
robot_outbox_relays: require_csv("ROBOT_OUTBOX_RELAYS"),
robot_indexer_relays: require_csv("ROBOT_INDEXER_RELAYS"),
robot_messaging_relays: require_csv("ROBOT_MESSAGING_RELAYS"),
blossom_s3_region: require_str("BLOSSOM_S3_REGION"),
blossom_s3_bucket: require_str("BLOSSOM_S3_BUCKET"),
blossom_s3_endpoint: require_str("BLOSSOM_S3_ENDPOINT"),
blossom_s3_access_key: require_str("BLOSSOM_S3_ACCESS_KEY"),
blossom_s3_secret_key: require_str("BLOSSOM_S3_SECRET_KEY"),
zooid_api_url: require_str("ZOOID_API_URL"),
relay_domain: require_str("RELAY_DOMAIN"),
livekit_url: require_str("LIVEKIT_URL"),
livekit_api_key: require_str("LIVEKIT_API_KEY"),
livekit_api_secret: require_str("LIVEKIT_API_SECRET"),
stripe_secret_key: require_str("STRIPE_SECRET_KEY"),
stripe_webhook_secret: require_str("STRIPE_WEBHOOK_SECRET"),
stripe_price_basic: require_str("STRIPE_PRICE_BASIC"),
stripe_price_growth: require_str("STRIPE_PRICE_GROWTH"),
keys,
}
}
pub fn encrypt(&self, plaintext: &str) -> Result<String> {
nip44::encrypt(
self.keys.secret_key(),
&self.keys.public_key(),
plaintext,
nip44::Version::V2,
)
.map_err(|e| anyhow!("encryption failed: {e}"))
}
pub fn decrypt(&self, ciphertext: &str) -> Result<String> {
nip44::decrypt(self.keys.secret_key(), &self.keys.public_key(), ciphertext)
.map_err(|e| anyhow!("decryption failed: {e}"))
}
pub async fn make_auth(&self, url: &str, method: HttpMethod) -> Result<String> {
let server_url = Url::parse(url)?;
let auth = HttpData::new(server_url, method)
.to_authorization(&self.keys)
.await?;
Ok(auth)
}
}
fn require_str(key: &str) -> String {
let v = std::env::var(key)
.unwrap_or_else(|_| panic!("{key} is required"))
.trim()
.to_string();
if v.is_empty() {
panic!("{key} is required")
}
v
}
fn require_u16(key: &str) -> u16 {
require_str(key)
.parse()
.unwrap_or_else(|_| panic!("{key} is invalid"))
}
fn require_csv(key: &str) -> Vec<String> {
let v: Vec<String> = std::env::var(key)
.unwrap_or_default()
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
if v.is_empty() {
panic!("{key} is required");
}
v
}
+163 -235
View File
@@ -3,6 +3,7 @@ use nostr_sdk::prelude::*;
use std::time::Duration; use std::time::Duration;
use crate::command::Command; use crate::command::Command;
use crate::env::Env;
use crate::models::{Activity, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay}; use crate::models::{Activity, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay};
use crate::query::Query; use crate::query::Query;
@@ -12,42 +13,18 @@ const RELAY_SYNC_RETRY_MAX_ATTEMPTS: usize = 6;
#[derive(Clone)] #[derive(Clone)]
pub struct Infra { pub struct Infra {
api_url: String, env: Env,
relay_domain: String,
livekit_url: String,
livekit_api_key: String,
livekit_api_secret: String,
api_secret: String,
query: Query, query: Query,
command: Command, command: Command,
} }
impl Infra { impl Infra {
pub fn new(query: Query, command: Command) -> Result<Self> { pub fn new(query: Query, command: Command, env: &Env) -> Self {
let api_url = std::env::var("ZOOID_API_URL").unwrap_or_default(); Self {
let relay_domain = std::env::var("RELAY_DOMAIN").unwrap_or_default(); env: env.clone(),
let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
let livekit_api_key = std::env::var("LIVEKIT_API_KEY").unwrap_or_default();
let livekit_api_secret = std::env::var("LIVEKIT_API_SECRET").unwrap_or_default();
let api_secret = std::env::var("ZOOID_API_SECRET").unwrap_or_default();
if api_url.trim().is_empty() {
anyhow::bail!("missing ZOOID_API_URL");
}
if api_secret.trim().is_empty() {
anyhow::bail!("missing ZOOID_API_SECRET");
}
Ok(Self {
api_url,
relay_domain,
livekit_url,
livekit_api_key,
livekit_api_secret,
api_secret,
query, query,
command, command,
}) }
} }
pub async fn start(self) { pub async fn start(self) {
@@ -77,15 +54,17 @@ impl Infra {
} }
async fn handle_activity(&self, activity: &Activity) -> Result<()> { async fn handle_activity(&self, activity: &Activity) -> Result<()> {
let needs_sync = should_sync_relay_activity(activity.activity_type.as_str()); let needs_sync = matches!(
activity.activity_type.as_str(),
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" | "fail_relay_sync"
);
if !needs_sync || activity.resource_type != "relay" { if activity.resource_type != "relay" || !needs_sync {
return Ok(()); return Ok(());
} }
if activity.activity_type == "fail_relay_sync" { if activity.activity_type == "fail_relay_sync" {
self.schedule_relay_sync_retry(&activity.resource_id, "activity") self.schedule_relay_sync_retry(&activity.resource_id, "activity").await?;
.await?;
return Ok(()); return Ok(());
} }
@@ -93,9 +72,7 @@ impl Infra {
return Ok(()); return Ok(());
}; };
let is_new = relay.synced == 0; self.sync_relay(&relay).await;
self.sync_and_report(&relay, is_new).await;
Ok(()) Ok(())
} }
@@ -110,8 +87,7 @@ impl Infra {
for relay in relays { for relay in relays {
if relay.sync_error.trim().is_empty() { if relay.sync_error.trim().is_empty() {
let is_new = relay.synced == 0; self.sync_relay(&relay).await;
self.sync_and_report(&relay, is_new).await;
} else { } else {
self.schedule_relay_sync_retry(&relay.id, source).await?; self.schedule_relay_sync_retry(&relay.id, source).await?;
} }
@@ -121,10 +97,28 @@ impl Infra {
} }
async fn schedule_relay_sync_retry(&self, relay_id: &str, source: &str) -> Result<()> { async fn schedule_relay_sync_retry(&self, relay_id: &str, source: &str) -> Result<()> {
let activities = self.query.list_activity_for_relay(relay_id).await?; fn get_retry_delay(consecutive_failures: usize) -> Option<Duration> {
let consecutive_failures = consecutive_sync_failures(&activities); let retry_attempt = consecutive_failures.max(1);
if retry_attempt > RELAY_SYNC_RETRY_MAX_ATTEMPTS {
return None;
}
let Some(delay) = relay_sync_retry_delay(consecutive_failures) else { let exponent = (retry_attempt - 1).min(31);
let multiplier = 1u64 << exponent;
let delay_secs = RELAY_SYNC_RETRY_BASE_DELAY_SECS
.saturating_mul(multiplier)
.min(RELAY_SYNC_RETRY_MAX_DELAY_SECS);
Some(Duration::from_secs(delay_secs))
}
let activities = self.query.list_activity_for_resource(relay_id).await?;
let consecutive_failures = activities
.iter()
.take_while(|activity| activity.activity_type == "fail_relay_sync")
.count();
let Some(delay) = get_retry_delay(consecutive_failures) else {
tracing::warn!( tracing::warn!(
relay = relay_id, relay = relay_id,
consecutive_failures, consecutive_failures,
@@ -148,31 +142,20 @@ impl Infra {
tokio::spawn(async move { tokio::spawn(async move {
tokio::time::sleep(delay).await; tokio::time::sleep(delay).await;
if let Err(e) = infra.retry_relay_sync(&relay_id).await { match infra.query.get_relay(&relay_id).await {
tracing::error!(relay = %relay_id, error = %e, "relay sync retry task failed"); Ok(Some(relay)) => infra.sync_relay(&relay).await,
Ok(None) => {}
Err(e) => {
tracing::error!(relay = %relay_id, error = %e, "relay sync retry task failed");
}
} }
}); });
Ok(()) Ok(())
} }
async fn retry_relay_sync(&self, relay_id: &str) -> Result<()> { async fn sync_relay(&self, relay: &Relay) {
let Some(relay) = self.query.get_relay(relay_id).await? else { match self.try_sync_relay(relay).await {
return Ok(());
};
if relay.sync_error.trim().is_empty() {
tracing::debug!(relay = %relay.id, "skip relay sync retry; relay has no sync_error");
return Ok(());
}
let is_new = relay.synced == 0;
self.sync_and_report(&relay, is_new).await;
Ok(())
}
async fn sync_and_report(&self, relay: &Relay, is_new: bool) {
match self.sync_relay(relay, is_new).await {
Ok(()) => { Ok(()) => {
tracing::info!(relay = %relay.id, "relay sync succeeded"); tracing::info!(relay = %relay.id, "relay sync succeeded");
if let Err(e) = self.command.complete_relay_sync(&relay.id).await { if let Err(e) = self.command.complete_relay_sync(&relay.id).await {
@@ -188,191 +171,136 @@ impl Infra {
} }
} }
async fn nip98_auth(&self, url: &str, method: HttpMethod) -> Result<String> { async fn try_sync_relay(&self, relay: &Relay) -> Result<()> {
let keys = Keys::parse(&self.api_secret)?; // A relay is "new" (POST with a freshly generated secret) only if it has
let server_url = Url::parse(url)?; // never completed a sync. `synced == 1` short-circuits the activity lookup;
let auth = HttpData::new(server_url, method) // otherwise check the activity history so that a re-sync after an update
.to_authorization(&keys) // (which resets `synced` to 0) PATCHes instead of clobbering the secret.
let is_new = relay.synced != 1
&& self
.query
.get_latest_activity_for_resource_and_type(&relay.id, "complete_relay_sync")
.await?
.is_none();
let mut body = serde_json::json!({
"host": format!("{}.{}", relay.subdomain, self.env.relay_domain),
"schema": relay.schema,
"inactive": relay.status == RELAY_STATUS_INACTIVE
|| relay.status == RELAY_STATUS_DELINQUENT,
"info": {
"name": relay.info_name,
"icon": relay.info_icon,
"description": relay.info_description,
"pubkey": relay.tenant,
},
"policy": {
"public_join": relay.policy_public_join == 1,
"strip_signatures": relay.policy_strip_signatures == 1,
},
"groups": { "enabled": relay.groups_enabled == 1 },
"management": { "enabled": relay.management_enabled == 1 },
"blossom": if relay.blossom_enabled == 1 {
serde_json::json!({
"enabled": true,
"adapter": "s3",
"s3": {
"endpoint": self.env.blossom_s3_endpoint,
"region": self.env.blossom_s3_region,
"bucket": self.env.blossom_s3_bucket,
"access_key": self.env.blossom_s3_access_key,
"secret_key": self.env.blossom_s3_secret_key,
"key_prefix": relay.schema,
},
})
} else {
serde_json::json!({ "enabled": false })
},
"livekit": if relay.livekit_enabled == 1 {
serde_json::json!({
"enabled": true,
"server_url": self.env.livekit_url,
"api_key": self.env.livekit_api_key,
"api_secret": self.env.livekit_api_secret,
})
} else {
serde_json::json!({ "enabled": false })
},
"push": { "enabled": relay.push_enabled == 1 },
"roles": {
"admin": { "can_manage": true, "can_invite": true },
"member": { "can_invite": true },
},
});
// Only provide a secret if the relay is new. This allows us to not store the relay secrets on our side.
if is_new && let Some(obj) = body.as_object_mut() {
obj.insert(
"secret".to_string(),
serde_json::Value::String(Keys::generate().secret_key().to_secret_hex()),
);
}
let method = if is_new { HttpMethod::POST } else { HttpMethod::PATCH };
self.request(method, &format!("relay/{}", relay.id), Some(&body))
.await?; .await?;
Ok(auth) Ok(())
} }
pub async fn list_relay_members(&self, relay_id: &str) -> Result<Vec<String>> { pub async fn list_relay_members(&self, relay_id: &str) -> Result<Vec<String>> {
let client = reqwest::Client::new(); #[derive(serde::Deserialize)]
let base = self.api_url.trim_end_matches('/'); struct MembersResponse {
let url = format!("{base}/relay/{relay_id}/members"); members: Vec<String>,
let auth = self.nip98_auth(&url, HttpMethod::GET).await?; }
let response = client let response = self
.get(&url) .request(HttpMethod::GET, &format!("relay/{relay_id}/members"), None)
.header("Authorization", auth)
.send()
.await?; .await?;
let parsed: MembersResponse = response.json().await?;
Ok(parsed.members)
}
// Internal utilities
/// Sends an authenticated request to the zooid API at `path` (relative to
/// `env.zooid_api_url`). Returns the response on 2xx; bails with the body
/// text otherwise.
async fn request(
&self,
method: HttpMethod,
path: &str,
body: Option<&serde_json::Value>,
) -> Result<reqwest::Response> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(5))
.build()?;
let base = self.env.zooid_api_url.trim_end_matches('/');
let path = path.trim_start_matches('/');
let url = format!("{base}/{path}");
let auth = self.env.make_auth(&url, method).await?;
let reqwest_method = match method {
HttpMethod::GET => reqwest::Method::GET,
HttpMethod::POST => reqwest::Method::POST,
HttpMethod::PUT => reqwest::Method::PUT,
HttpMethod::PATCH => reqwest::Method::PATCH,
};
let mut req = client
.request(reqwest_method, &url)
.header("Authorization", auth);
if let Some(body) = body {
req = req.json(body);
}
let response = req.send().await?;
if !response.status().is_success() { if !response.status().is_success() {
let status = response.status(); let status = response.status();
let body = response.text().await.unwrap_or_default(); let text = response.text().await.unwrap_or_default();
anyhow::bail!("zooid members returned {status}: {body}"); anyhow::bail!("zooid {method} {path} returned {status}: {text}");
} }
Ok(response)
let body = response.text().await?;
parse_relay_members_response(&body)
}
async fn sync_relay(&self, relay: &Relay, is_new: bool) -> Result<()> {
let client = reqwest::Client::new();
let base = self.api_url.trim_end_matches('/');
let host = if self.relay_domain.is_empty() {
relay.subdomain.clone()
} else {
format!("{}.{}", relay.subdomain, self.relay_domain)
};
let livekit = if relay.livekit_enabled == 1 {
serde_json::json!({
"enabled": true,
"server_url": self.livekit_url,
"api_key": self.livekit_api_key,
"api_secret": self.livekit_api_secret,
})
} else {
serde_json::json!({ "enabled": false })
};
let body = relay_sync_body(
relay,
host,
livekit,
is_new.then(|| Keys::generate().secret_key().to_secret_hex()),
);
let url = format!("{}/relay/{}", base, relay.id);
let auth = self
.nip98_auth(&url, zooid_sync_http_method(is_new))
.await?;
let request = if is_new {
client.post(&url)
} else {
client.patch(&url)
};
let response = request
.header("Authorization", auth)
.json(&body)
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
anyhow::bail!("zooid sync returned {status}: {body}")
}
Ok(())
} }
} }
fn zooid_sync_http_method(is_new: bool) -> HttpMethod {
if is_new {
HttpMethod::POST
} else {
HttpMethod::PATCH
}
}
fn parse_relay_members_response(body: &str) -> Result<Vec<String>> {
let value: serde_json::Value = serde_json::from_str(body)?;
if let Some(members) = members_from_value(&value) {
return Ok(members);
}
if let Some(members) = value.get("members").and_then(members_from_value) {
return Ok(members);
}
if let Some(members) = value
.get("data")
.and_then(|data| data.get("members"))
.and_then(members_from_value)
{
return Ok(members);
}
anyhow::bail!("zooid members response missing members array")
}
fn members_from_value(value: &serde_json::Value) -> Option<Vec<String>> {
let values = value.as_array()?;
values
.iter()
.map(|value| value.as_str().map(ToString::to_string))
.collect()
}
fn relay_sync_body(
relay: &Relay,
host: String,
livekit: serde_json::Value,
secret: Option<String>,
) -> serde_json::Value {
let mut body = serde_json::json!({
"host": host,
"schema": relay.schema,
"inactive": relay.status == RELAY_STATUS_INACTIVE
|| relay.status == RELAY_STATUS_DELINQUENT,
"info": {
"name": relay.info_name,
"icon": relay.info_icon,
"description": relay.info_description,
"pubkey": relay.tenant,
},
"policy": {
"public_join": relay.policy_public_join == 1,
"strip_signatures": relay.policy_strip_signatures == 1,
},
"groups": { "enabled": relay.groups_enabled == 1 },
"management": { "enabled": relay.management_enabled == 1 },
"blossom": { "enabled": relay.blossom_enabled == 1 },
"livekit": livekit,
"push": { "enabled": relay.push_enabled == 1 },
"roles": {
"admin": { "can_manage": true, "can_invite": true },
"member": { "can_invite": true },
},
});
if let (Some(secret), Some(body_obj)) = (secret, body.as_object_mut()) {
body_obj.insert("secret".to_string(), serde_json::Value::String(secret));
}
body
}
fn should_sync_relay_activity(activity_type: &str) -> bool {
matches!(
activity_type,
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" | "fail_relay_sync"
)
}
fn consecutive_sync_failures(activities: &[Activity]) -> usize {
activities
.iter()
.take_while(|activity| activity.activity_type == "fail_relay_sync")
.count()
}
fn relay_sync_retry_delay(consecutive_failures: usize) -> Option<Duration> {
let retry_attempt = consecutive_failures.max(1);
if retry_attempt > RELAY_SYNC_RETRY_MAX_ATTEMPTS {
return None;
}
let exponent = (retry_attempt - 1).min(31);
let multiplier = 1u64 << exponent;
let delay_secs = RELAY_SYNC_RETRY_BASE_DELAY_SECS
.saturating_mul(multiplier)
.min(RELAY_SYNC_RETRY_MAX_DELAY_SECS);
Some(Duration::from_secs(delay_secs))
}
+6
View File
@@ -1,8 +1,14 @@
pub mod api; pub mod api;
pub mod billing; pub mod billing;
pub mod bitcoin;
pub mod command; pub mod command;
pub mod env;
pub mod infra; pub mod infra;
pub mod models; pub mod models;
pub mod pool; pub mod pool;
pub mod query; pub mod query;
pub mod robot; pub mod robot;
pub mod routes;
pub mod stripe;
pub mod wallet;
pub mod web;
+20 -21
View File
@@ -1,11 +1,17 @@
mod api; mod api;
mod billing; mod billing;
mod bitcoin;
mod command; mod command;
mod env;
mod infra; mod infra;
mod models; mod models;
mod pool; mod pool;
mod query; mod query;
mod robot; mod robot;
mod routes;
mod stripe;
mod wallet;
mod web;
use anyhow::Result; use anyhow::Result;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -15,6 +21,7 @@ use tower_http::cors::{AllowOrigin, CorsLayer};
use crate::api::Api; use crate::api::Api;
use crate::billing::Billing; use crate::billing::Billing;
use crate::command::Command; use crate::command::Command;
use crate::env::Env;
use crate::infra::Infra; use crate::infra::Infra;
use crate::query::Query; use crate::query::Query;
use crate::robot::Robot; use crate::robot::Robot;
@@ -28,30 +35,21 @@ async fn main() -> Result<()> {
.with(tracing_subscriber::fmt::layer()) .with(tracing_subscriber::fmt::layer())
.init(); .init();
let pool = pool::create_pool().await?; let env = Env::load();
let robot = Robot::new().await?;
let query = Query::new(pool.clone()); let pool = pool::create_pool(&env.database_url).await?;
let robot = Robot::new(&env).await?;
let query = Query::new(pool.clone(), &env);
let command = Command::new(pool); let command = Command::new(pool);
let billing = Billing::new(query.clone(), command.clone(), robot.clone()); let billing = Billing::new(query.clone(), command.clone(), robot.clone(), &env);
let infra = Infra::new(query.clone(), command.clone())?; let infra = Infra::new(query.clone(), command.clone(), &env);
let api = Api::new(query, command, billing.clone(), infra.clone()); let api = Api::new(query, command, billing.clone(), infra.clone(), &env);
let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); let cors = if env.server_allow_origins.is_empty() {
let port: u16 = std::env::var("PORT")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(2892);
let origins: Vec<String> = std::env::var("ALLOW_ORIGINS")
.unwrap_or_default()
.split(',')
.map(|v| v.trim().to_string())
.filter(|v| !v.is_empty())
.collect();
let cors = if origins.is_empty() {
CorsLayer::permissive() CorsLayer::permissive()
} else { } else {
let parsed = origins let parsed = env
.server_allow_origins
.iter() .iter()
.filter_map(|o| o.parse::<axum::http::HeaderValue>().ok()) .filter_map(|o| o.parse::<axum::http::HeaderValue>().ok())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@@ -68,7 +66,8 @@ async fn main() -> Result<()> {
billing.start().await; billing.start().await;
}); });
let listener = tokio::net::TcpListener::bind(format!("{host}:{port}")).await?; let listener =
tokio::net::TcpListener::bind(format!("{}:{}", env.server_host, env.server_port)).await?;
axum::serve(listener, app).await?; axum::serve(listener, app).await?;
Ok(()) Ok(())
} }
+27 -1
View File
@@ -25,7 +25,7 @@ pub struct Plan {
pub stripe_price_id: Option<String>, pub stripe_price_id: Option<String>,
} }
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] #[derive(Debug, Default, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Tenant { pub struct Tenant {
pub pubkey: String, pub pubkey: String,
pub nwc_url: String, pub nwc_url: String,
@@ -58,3 +58,29 @@ pub struct Relay {
pub push_enabled: i64, pub push_enabled: i64,
pub synced: i64, pub synced: i64,
} }
impl Default for Relay {
fn default() -> Self {
Self {
id: String::new(),
tenant: String::new(),
schema: String::new(),
subdomain: String::new(),
plan: String::new(),
stripe_subscription_item_id: None,
status: RELAY_STATUS_ACTIVE.to_string(),
sync_error: String::new(),
info_name: String::new(),
info_icon: String::new(),
info_description: String::new(),
policy_public_join: 0,
policy_strip_signatures: 0,
groups_enabled: 1,
management_enabled: 1,
blossom_enabled: 0,
livekit_enabled: 0,
push_enabled: 1,
synced: 0,
}
}
}
+2 -4
View File
@@ -7,10 +7,8 @@ use sqlx::{
sqlite::{SqliteConnectOptions, SqlitePoolOptions}, sqlite::{SqliteConnectOptions, SqlitePoolOptions},
}; };
pub async fn create_pool() -> Result<SqlitePool> { pub async fn create_pool(database_url: &str) -> Result<SqlitePool> {
let raw_database_url = std::env::var("DATABASE_URL") let database_url = normalize_sqlite_url(database_url);
.unwrap_or_else(|_| format!("sqlite://{}/data/caravel.db", env!("CARGO_MANIFEST_DIR")));
let database_url = normalize_sqlite_url(&raw_database_url);
if let Some(path) = database_url.strip_prefix("sqlite://") if let Some(path) = database_url.strip_prefix("sqlite://")
&& !path.is_empty() && !path.is_empty()
+107 -118
View File
@@ -1,42 +1,38 @@
use anyhow::Result; use anyhow::Result;
use sqlx::SqlitePool; use sqlx::SqlitePool;
use crate::env::Env;
use crate::models::{Activity, Plan, Relay, Tenant}; use crate::models::{Activity, Plan, Relay, Tenant};
fn select_tenant(tail: &str) -> String {
format!("SELECT * FROM tenant {tail}")
}
fn select_relay(tail: &str) -> String {
format!("SELECT * FROM relay {tail}")
}
fn select_activity(tail: &str) -> String {
format!("SELECT * FROM activity {tail}")
}
#[derive(Clone)] #[derive(Clone)]
pub struct Query { pub struct Query {
pool: SqlitePool, pool: SqlitePool,
env: Env,
} }
impl Query { impl Query {
pub fn new(pool: SqlitePool) -> Self { pub fn new(pool: SqlitePool, env: &Env) -> Self {
Self { pool } Self {
pool,
env: env.clone(),
}
} }
pub async fn list_tenants(&self) -> Result<Vec<Tenant>> { // Plans
let rows = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, nwc_error, created_at, stripe_customer_id, stripe_subscription_id, past_due_at
FROM tenant
ORDER BY pubkey",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_tenant(&self, pubkey: &str) -> Result<Option<Tenant>> { pub fn list_plans(&self) -> Vec<Plan> {
let row = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, nwc_error, created_at, stripe_customer_id, stripe_subscription_id, past_due_at
FROM tenant
WHERE pubkey = ?",
)
.bind(pubkey)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
pub fn list_plans() -> Vec<Plan> {
vec![ vec![
Plan { Plan {
id: "free".to_string(), id: "free".to_string(),
@@ -54,7 +50,7 @@ impl Query {
members: Some(100), members: Some(100),
blossom: true, blossom: true,
livekit: true, livekit: true,
stripe_price_id: Some(std::env::var("STRIPE_PRICE_BASIC").unwrap_or_default()), stripe_price_id: Some(self.env.stripe_price_basic.clone()),
}, },
Plan { Plan {
id: "growth".to_string(), id: "growth".to_string(),
@@ -63,86 +59,33 @@ impl Query {
members: None, members: None,
blossom: true, blossom: true,
livekit: true, livekit: true,
stripe_price_id: Some(std::env::var("STRIPE_PRICE_GROWTH").unwrap_or_default()), stripe_price_id: Some(self.env.stripe_price_growth.clone()),
}, },
] ]
} }
pub fn get_plan(plan_id: &str) -> Option<Plan> { pub fn get_plan(&self, plan_id: &str) -> Option<Plan> {
Self::list_plans().into_iter().find(|p| p.id == plan_id) self.list_plans().into_iter().find(|p| p.id == plan_id)
} }
pub fn is_paid_plan(plan_id: &str) -> bool { pub fn is_paid_plan(&self, plan_id: &str) -> bool {
Self::get_plan(plan_id) self.get_plan(plan_id).is_some_and(|p| p.amount > 0)
.map(|p| p.id != "free")
.unwrap_or(false)
} }
pub async fn list_relays(&self) -> Result<Vec<Relay>> { // Tenants
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id, pub async fn list_tenants(&self) -> Result<Vec<Tenant>> {
status, sync_error, let rows = sqlx::query_as::<_, Tenant>(&select_tenant(""))
info_name, info_icon, info_description, .fetch_all(&self.pool)
policy_public_join, policy_strip_signatures, .await?;
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
ORDER BY id",
)
.fetch_all(&self.pool)
.await?;
Ok(rows) Ok(rows)
} }
pub async fn list_relays_pending_sync(&self) -> Result<Vec<Relay>> { pub async fn get_tenant(&self, pubkey: &str) -> Result<Option<Tenant>> {
let rows = sqlx::query_as::<_, Relay>( let row = sqlx::query_as::<_, Tenant>(&select_tenant("WHERE pubkey = ?"))
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id, .bind(pubkey)
status, sync_error, .fetch_optional(&self.pool)
info_name, info_icon, info_description, .await?;
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE synced = 0 OR TRIM(sync_error) != ''
ORDER BY id",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id,
status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE tenant = ?
ORDER BY id",
)
.bind(tenant_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_relay(&self, id: &str) -> Result<Option<Relay>> {
let row = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id,
status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE id = ?",
)
.bind(id)
.fetch_optional(&self.pool)
.await?;
Ok(row) Ok(row)
} }
@@ -150,17 +93,49 @@ impl Query {
&self, &self,
stripe_customer_id: &str, stripe_customer_id: &str,
) -> Result<Option<Tenant>> { ) -> Result<Option<Tenant>> {
let row = sqlx::query_as::<_, Tenant>( let row = sqlx::query_as::<_, Tenant>(&select_tenant("WHERE stripe_customer_id = ?"))
"SELECT pubkey, nwc_url, nwc_error, created_at, stripe_customer_id, stripe_subscription_id, past_due_at .bind(stripe_customer_id)
FROM tenant .fetch_optional(&self.pool)
WHERE stripe_customer_id = ?", .await?;
)
.bind(stripe_customer_id)
.fetch_optional(&self.pool)
.await?;
Ok(row) Ok(row)
} }
// Relays
pub async fn list_relays(&self) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(&select_relay(""))
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_relays_pending_sync(&self) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(&select_relay(
"WHERE synced = 0 OR TRIM(sync_error) != ''",
))
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(&select_relay("WHERE tenant = ?"))
.bind(tenant_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_relay(&self, id: &str) -> Result<Option<Relay>> {
let row = sqlx::query_as::<_, Relay>(&select_relay("WHERE id = ?"))
.bind(id)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
// Invoice state
pub async fn get_invoice_nwc_payment_state(&self, invoice_id: &str) -> Result<Option<String>> { pub async fn get_invoice_nwc_payment_state(&self, invoice_id: &str) -> Result<Option<String>> {
let state = sqlx::query_scalar::<_, String>( let state = sqlx::query_scalar::<_, String>(
"SELECT state FROM invoice_nwc_payment WHERE invoice_id = ?", "SELECT state FROM invoice_nwc_payment WHERE invoice_id = ?",
@@ -171,27 +146,41 @@ impl Query {
Ok(state) Ok(state)
} }
pub async fn has_active_paid_relays(&self, tenant_id: &str) -> Result<bool> { pub async fn get_invoice_manual_lightning_bolt11(
let plans = sqlx::query_scalar::<_, String>( &self,
"SELECT plan FROM relay WHERE tenant = ? AND status = 'active'", invoice_id: &str,
) -> Result<Option<String>> {
let bolt11 = sqlx::query_scalar::<_, String>(
"SELECT bolt11 FROM invoice_manual_lightning_payment WHERE invoice_id = ?",
) )
.bind(tenant_id) .bind(invoice_id)
.fetch_all(&self.pool) .fetch_optional(&self.pool)
.await?; .await?;
Ok(bolt11)
Ok(plans.into_iter().any(|plan| Self::is_paid_plan(&plan)))
} }
pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>> { // Activity
let rows = sqlx::query_as::<_, Activity>(
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id pub async fn list_activity_for_resource(&self, resource_id: &str) -> Result<Vec<Activity>> {
FROM activity let rows = sqlx::query_as::<_, Activity>(&select_activity("WHERE resource_id = ? ORDER BY created_at DESC"))
WHERE resource_type = 'relay' AND resource_id = ? .bind(resource_id)
ORDER BY created_at DESC, id DESC",
)
.bind(relay_id)
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await?; .await?;
Ok(rows) Ok(rows)
} }
pub async fn get_latest_activity_for_resource_and_type(
&self,
resource_id: &str,
activity_type: &str,
) -> Result<Option<Activity>> {
let row = sqlx::query_as::<_, Activity>(&select_activity(
"WHERE resource_id = ? AND activity_type = ? ORDER BY created_at DESC LIMIT 1",
))
.bind(resource_id)
.bind(activity_type)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
} }
+51 -127
View File
@@ -5,15 +5,11 @@ use anyhow::{Result, anyhow};
use nostr_sdk::prelude::*; use nostr_sdk::prelude::*;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use crate::env::Env;
#[derive(Clone)] #[derive(Clone)]
pub struct Robot { pub struct Robot {
secret: String, env: Env,
name: String,
description: String,
picture: String,
outbox_client: Client,
indexer_client: Client,
messaging_client: Client,
outbox_cache: std::sync::Arc<Mutex<HashMap<String, CacheEntry>>>, outbox_cache: std::sync::Arc<Mutex<HashMap<String, CacheEntry>>>,
dm_cache: std::sync::Arc<Mutex<HashMap<String, CacheEntry>>>, dm_cache: std::sync::Arc<Mutex<HashMap<String, CacheEntry>>>,
} }
@@ -25,84 +21,61 @@ struct CacheEntry {
} }
impl Robot { impl Robot {
pub async fn new() -> Result<Self> { pub async fn new(env: &Env) -> Result<Self> {
let secret = std::env::var("ROBOT_SECRET").unwrap_or_default();
if secret.trim().is_empty() {
return Err(anyhow!("ROBOT_SECRET is required"));
}
let name = std::env::var("ROBOT_NAME").unwrap_or_default();
let description = std::env::var("ROBOT_DESCRIPTION").unwrap_or_default();
let picture = std::env::var("ROBOT_PICTURE").unwrap_or_default();
let outbox_relays = split_relays("ROBOT_OUTBOX_RELAYS");
let indexer_relays = split_relays("ROBOT_INDEXER_RELAYS");
let messaging_relays = split_relays("ROBOT_MESSAGING_RELAYS");
if outbox_relays.is_empty() {
return Err(anyhow!("ROBOT_OUTBOX_RELAYS is required"));
}
if indexer_relays.is_empty() {
return Err(anyhow!("ROBOT_INDEXER_RELAYS is required"));
}
if messaging_relays.is_empty() {
return Err(anyhow!("ROBOT_MESSAGING_RELAYS is required"));
}
let outbox_client = client_with_relays(&secret, &outbox_relays).await?;
let indexer_client = client_with_relays(&secret, &indexer_relays).await?;
let messaging_client = client_with_relays(&secret, &messaging_relays).await?;
let robot = Self { let robot = Self {
secret, env: env.clone(),
name,
description,
picture,
outbox_client,
indexer_client,
messaging_client,
outbox_cache: std::sync::Arc::new(Mutex::new(HashMap::new())), outbox_cache: std::sync::Arc::new(Mutex::new(HashMap::new())),
dm_cache: std::sync::Arc::new(Mutex::new(HashMap::new())), dm_cache: std::sync::Arc::new(Mutex::new(HashMap::new())),
}; };
robot robot.publish_identity().await?;
.publish_identity(&outbox_relays, &messaging_relays)
.await?;
Ok(robot) Ok(robot)
} }
async fn make_client(&self, relays: &[String]) -> Result<Client> {
let client = Client::new(self.env.keys.clone());
for relay in relays {
client.add_relay(relay).await?;
}
client.connect().await;
Ok(client)
}
async fn publish_identity( async fn publish_identity(
&self, &self,
outbox_relays: &[String],
messaging_relays: &[String],
) -> Result<()> { ) -> Result<()> {
let mut metadata = Metadata::new(); let mut metadata = Metadata::new();
if !self.name.is_empty() { if !self.env.robot_name.is_empty() {
metadata = metadata.name(&self.name); metadata = metadata.name(&self.env.robot_name);
} }
if !self.description.is_empty() { if !self.env.robot_description.is_empty() {
metadata = metadata.about(&self.description); metadata = metadata.about(&self.env.robot_description);
} }
if !self.picture.is_empty() { if !self.env.robot_picture.is_empty() {
metadata = metadata.picture(Url::parse(&self.picture)?); metadata = metadata.picture(Url::parse(&self.env.robot_picture)?);
} }
self.outbox_client let outbox_client = self.make_client(&self.env.robot_outbox_relays).await?;
let indexer_client = self.make_client(&self.env.robot_indexer_relays).await?;
outbox_client
.send_event_builder(EventBuilder::metadata(&metadata)) .send_event_builder(EventBuilder::metadata(&metadata))
.await?; .await?;
let outbox_tags = outbox_relays let outbox_tags = self.env.robot_outbox_relays
.iter() .iter()
.map(|r| Tag::parse(["r", r.as_str()])) .map(|r| Tag::parse(["r", r.as_str()]))
.collect::<std::result::Result<Vec<_>, _>>()?; .collect::<std::result::Result<Vec<_>, _>>()?;
self.outbox_client outbox_client
.send_event_builder(EventBuilder::new(Kind::Custom(10002), "").tags(outbox_tags)) .send_event_builder(EventBuilder::new(Kind::Custom(10002), "").tags(outbox_tags))
.await?; .await?;
let messaging_tags = messaging_relays let messaging_tags = self.env.robot_messaging_relays
.iter() .iter()
.map(|r| Tag::parse(["relay", r.as_str()])) .map(|r| Tag::parse(["relay", r.as_str()]))
.collect::<std::result::Result<Vec<_>, _>>()?; .collect::<std::result::Result<Vec<_>, _>>()?;
self.indexer_client indexer_client
.send_event_builder(EventBuilder::new(Kind::Custom(10050), "").tags(messaging_tags)) .send_event_builder(EventBuilder::new(Kind::Custom(10050), "").tags(messaging_tags))
.await?; .await?;
@@ -123,14 +96,8 @@ impl Robot {
} }
let recipient_pubkey = PublicKey::parse(recipient)?; let recipient_pubkey = PublicKey::parse(recipient)?;
let client = self.messaging_client.clone(); let client = self.make_client(&dm_relays).await?;
for relay in dm_relays { client.send_private_msg(recipient_pubkey, message, []).await?;
let _ = client.add_relay(relay).await;
}
client.connect().await;
client
.send_private_msg(recipient_pubkey, message, [])
.await?;
Ok(()) Ok(())
} }
@@ -141,10 +108,8 @@ impl Robot {
let pubkey = PublicKey::parse(recipient)?; let pubkey = PublicKey::parse(recipient)?;
let filter = Filter::new().author(pubkey).kind(Kind::Custom(10002)); let filter = Filter::new().author(pubkey).kind(Kind::Custom(10002));
let events = self let client = self.make_client(&self.env.robot_indexer_relays).await?;
.indexer_client let events = client.fetch_events(filter, Duration::from_secs(5)).await?;
.fetch_events(filter, Duration::from_secs(5))
.await?;
let mut relays = Vec::new(); let mut relays = Vec::new();
if let Some(event) = events.into_iter().max_by_key(|e| e.created_at) { if let Some(event) = events.into_iter().max_by_key(|e| e.created_at) {
@@ -160,6 +125,22 @@ impl Robot {
Ok(relays) Ok(relays)
} }
pub async fn fetch_nostr_name(&self, pubkey: &str) -> Option<String> {
let pubkey = PublicKey::parse(pubkey).ok()?;
let filter = Filter::new().author(pubkey).kind(Kind::Metadata).limit(1);
let client = self.make_client(&self.env.robot_indexer_relays).await.ok()?;
let events = client.fetch_events(filter, Duration::from_secs(5)).await.ok()?;
let event = events.into_iter().max_by_key(|e| e.created_at)?;
let content: serde_json::Value = serde_json::from_str(&event.content).ok()?;
let name = content
.get("display_name")
.or_else(|| content.get("name"))
.and_then(|v| v.as_str())
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())?;
Some(name)
}
async fn fetch_messaging_relays_from_outbox( async fn fetch_messaging_relays_from_outbox(
&self, &self,
recipient: &str, recipient: &str,
@@ -170,13 +151,7 @@ impl Robot {
} }
let pubkey = PublicKey::parse(recipient)?; let pubkey = PublicKey::parse(recipient)?;
let keys = Keys::parse(&self.secret)?; let client = self.make_client(outbox_relays).await?;
let client = Client::new(keys);
for relay in outbox_relays {
client.add_relay(relay).await?;
}
client.connect().await;
let filter = Filter::new().author(pubkey).kind(Kind::Custom(10050)); let filter = Filter::new().author(pubkey).kind(Kind::Custom(10050));
let events = client.fetch_events(filter, Duration::from_secs(5)).await?; let events = client.fetch_events(filter, Duration::from_secs(5)).await?;
@@ -195,37 +170,6 @@ impl Robot {
} }
} }
fn split_relays(key: &str) -> Vec<String> {
std::env::var(key)
.unwrap_or_default()
.split(',')
.map(|v| normalize_relay_url(v.trim()))
.filter(|v| !v.is_empty())
.collect()
}
fn normalize_relay_url(url: &str) -> String {
if url.is_empty() {
return String::new();
}
if url.starts_with("ws://") || url.starts_with("wss://") {
url.to_string()
} else {
format!("wss://{url}")
}
}
async fn client_with_relays(secret: &str, relays: &[String]) -> Result<Client> {
let keys = Keys::parse(secret)?;
let client = Client::new(keys);
for relay in relays {
client.add_relay(relay).await?;
}
client.connect().await;
Ok(client)
}
async fn get_cached( async fn get_cached(
cache: &std::sync::Arc<Mutex<HashMap<String, CacheEntry>>>, cache: &std::sync::Arc<Mutex<HashMap<String, CacheEntry>>>,
key: &str, key: &str,
@@ -254,23 +198,3 @@ async fn set_cached(
}, },
); );
} }
#[cfg(test)]
impl Robot {
pub fn test_stub() -> Self {
let keys = Keys::generate();
let client = Client::new(keys);
Self {
secret: String::new(),
name: String::new(),
description: String::new(),
picture: String::new(),
outbox_client: client.clone(),
indexer_client: client.clone(),
messaging_client: client,
outbox_cache: std::sync::Arc::new(Mutex::new(HashMap::new())),
dm_cache: std::sync::Arc::new(Mutex::new(HashMap::new())),
}
}
}
+21
View File
@@ -0,0 +1,21 @@
use std::sync::Arc;
use axum::extract::State;
use serde::Serialize;
use crate::api::{Api, AuthedPubkey};
use crate::web::{ApiResult, ok};
#[derive(Serialize)]
struct IdentityResponse {
pubkey: String,
is_admin: bool,
}
pub async fn get_identity(
State(api): State<Arc<Api>>,
AuthedPubkey(pubkey): AuthedPubkey,
) -> ApiResult {
let is_admin = api.is_admin(&pubkey);
ok(IdentityResponse { pubkey, is_admin })
}
+80
View File
@@ -0,0 +1,80 @@
use std::sync::Arc;
use axum::extract::{Path, State};
use crate::api::{Api, AuthedPubkey};
use crate::web::{ApiResult, bad_request, internal, not_found, ok};
pub async fn list_tenant_invoices(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(pubkey): Path<String>,
) -> ApiResult {
api.require_admin_or_tenant(&auth, &pubkey)?;
let tenant = api.get_tenant_or_404(&pubkey).await?;
let invoices = api
.billing
.stripe_list_invoices(&tenant.stripe_customer_id)
.await
.map_err(internal)?;
ok(invoices)
}
pub async fn get_invoice(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(id): Path<String>,
) -> ApiResult {
let (invoice, tenant) = api
.billing
.get_invoice_with_tenant(&id)
.await
.map_err(internal)?
.ok_or_else(|| not_found("invoice not found"))?;
api.require_admin_or_tenant(&auth, &tenant.pubkey)?;
let invoice = api
.billing
.reconcile_manual_lightning_invoice(&id, &invoice)
.await
.map_err(internal)?;
ok(invoice)
}
pub async fn get_invoice_bolt11(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(id): Path<String>,
) -> ApiResult {
let (invoice, tenant) = api
.billing
.get_invoice_with_tenant(&id)
.await
.map_err(internal)?
.ok_or_else(|| not_found("invoice not found"))?;
api.require_admin_or_tenant(&auth, &tenant.pubkey)?;
let invoice = api
.billing
.reconcile_manual_lightning_invoice(&id, &invoice)
.await
.map_err(internal)?;
if invoice.status != "open" {
return Err(bad_request("invoice-not-open", "invoice is not open"));
}
let bolt11 = api
.billing
.get_or_create_manual_lightning_bolt11(
&id,
&tenant.pubkey,
invoice.amount_due,
&invoice.currency,
)
.await
.map_err(internal)?;
ok(serde_json::json!({ "bolt11": bolt11 }))
}
+6
View File
@@ -0,0 +1,6 @@
pub mod identity;
pub mod invoices;
pub mod plans;
pub mod relays;
pub mod stripe;
pub mod tenants;
+17
View File
@@ -0,0 +1,17 @@
use std::sync::Arc;
use axum::extract::{Path, State};
use crate::api::Api;
use crate::web::{ApiResult, not_found, ok};
pub async fn list_plans(State(api): State<Arc<Api>>) -> ApiResult {
ok(api.query.list_plans())
}
pub async fn get_plan(State(api): State<Arc<Api>>, Path(id): Path<String>) -> ApiResult {
match api.query.get_plan(&id) {
Some(plan) => ok(plan),
None => Err(not_found("plan not found")),
}
}
+315
View File
@@ -0,0 +1,315 @@
use std::sync::{Arc, LazyLock};
use anyhow::Result;
use axum::{
Json,
extract::{Path, State},
};
use regex::Regex;
use serde::Deserialize;
use crate::api::{Api, AuthedPubkey};
use crate::models::{
RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay,
};
use crate::web::{
ApiError, ApiResult, bad_request, created, internal, map_unique_error, ok,
parse_bool_default, unprocessable,
};
#[derive(Deserialize)]
pub struct CreateRelayRequest {
pub tenant: String,
pub subdomain: String,
pub plan: String,
pub info_name: String,
pub info_icon: String,
pub info_description: String,
pub policy_public_join: i64,
pub policy_strip_signatures: i64,
pub groups_enabled: i64,
pub management_enabled: i64,
pub blossom_enabled: i64,
pub livekit_enabled: i64,
pub push_enabled: i64,
}
#[derive(Deserialize)]
pub struct UpdateRelayRequest {
pub subdomain: Option<String>,
pub plan: Option<String>,
pub info_name: Option<String>,
pub info_icon: Option<String>,
pub info_description: Option<String>,
pub policy_public_join: Option<i64>,
pub policy_strip_signatures: Option<i64>,
pub groups_enabled: Option<i64>,
pub management_enabled: Option<i64>,
pub blossom_enabled: Option<i64>,
pub livekit_enabled: Option<i64>,
pub push_enabled: Option<i64>,
}
pub async fn list_relays(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
) -> ApiResult {
api.require_admin(&auth)?;
let relays = api.query.list_relays().await.map_err(internal)?;
ok(relays)
}
pub async fn get_relay(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(id): Path<String>,
) -> ApiResult {
let relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
ok(relay)
}
pub async fn list_relay_activity(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(id): Path<String>,
) -> ApiResult {
let relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
let activity = api
.query
.list_activity_for_resource(&id)
.await
.map_err(internal)?;
ok(serde_json::json!({ "activity": activity }))
}
pub async fn list_relay_members(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(id): Path<String>,
) -> ApiResult {
let relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
let members = fetch_relay_members(&api, &relay).await.map_err(internal)?;
ok(serde_json::json!({ "members": members }))
}
pub async fn create_relay(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Json(payload): Json<CreateRelayRequest>,
) -> ApiResult {
api.require_admin_or_tenant(&auth, &payload.tenant)?;
let relay_id = format!(
"{}_{}",
payload.subdomain.replace('-', "_"),
&uuid::Uuid::new_v4().simple().to_string()[..8]
);
let relay = Relay {
id: relay_id.clone(),
tenant: payload.tenant,
schema: relay_id.clone(),
subdomain: payload.subdomain,
plan: payload.plan,
info_name: payload.info_name,
info_icon: payload.info_icon,
info_description: payload.info_description,
policy_public_join: payload.policy_public_join,
policy_strip_signatures: payload.policy_strip_signatures,
groups_enabled: payload.groups_enabled,
management_enabled: payload.management_enabled,
blossom_enabled: payload.blossom_enabled,
livekit_enabled: payload.livekit_enabled,
push_enabled: payload.push_enabled,
..Default::default()
};
let relay = prepare_relay(&api, relay)?;
api.command
.create_relay(&relay)
.await
.map_err(map_relay_write_error)?;
created(relay)
}
pub async fn update_relay(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(id): Path<String>,
Json(payload): Json<UpdateRelayRequest>,
) -> ApiResult {
let mut relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
let current_plan = relay.plan.clone();
let requested_plan = payload.plan.clone();
if let Some(v) = payload.subdomain {
relay.subdomain = v;
}
if let Some(v) = requested_plan.clone() {
relay.plan = v;
}
if let Some(v) = payload.info_name {
relay.info_name = v;
}
if let Some(v) = payload.info_icon {
relay.info_icon = v;
}
if let Some(v) = payload.info_description {
relay.info_description = v;
}
if let Some(v) = payload.policy_public_join {
relay.policy_public_join = v;
}
if let Some(v) = payload.policy_strip_signatures {
relay.policy_strip_signatures = v;
}
if let Some(v) = payload.groups_enabled {
relay.groups_enabled = v;
}
if let Some(v) = payload.management_enabled {
relay.management_enabled = v;
}
if let Some(v) = payload.blossom_enabled {
relay.blossom_enabled = v;
}
if let Some(v) = payload.livekit_enabled {
relay.livekit_enabled = v;
}
if let Some(v) = payload.push_enabled {
relay.push_enabled = v;
}
let relay = prepare_relay(&api, relay)?;
let plan_changed = requested_plan
.as_deref()
.is_some_and(|requested| requested != current_plan);
if plan_changed {
let selected_plan = api
.query
.get_plan(&relay.plan)
.expect("validated plan must exist");
if let Some(limit) = selected_plan.members {
let current_members = fetch_relay_members(&api, &relay)
.await
.map_err(internal)?
.len() as i64;
if current_members > limit {
let message = format!(
"relay has {current_members} members, which exceeds the {} plan limit of {limit}",
selected_plan.name.to_lowercase()
);
return Err(unprocessable("member-limit-exceeded", &message));
}
}
}
api.command
.update_relay(&relay)
.await
.map_err(map_relay_write_error)?;
ok(relay)
}
pub async fn deactivate_relay(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(id): Path<String>,
) -> ApiResult {
let relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
if relay.status == RELAY_STATUS_DELINQUENT {
return Err(bad_request("relay-is-delinquent", "relay is delinquent"));
}
if relay.status == RELAY_STATUS_INACTIVE {
return Err(bad_request("relay-is-inactive", "relay is already inactive"));
}
api.command
.deactivate_relay(&relay)
.await
.map_err(internal)?;
ok(())
}
pub async fn reactivate_relay(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(id): Path<String>,
) -> ApiResult {
let relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
if relay.status == RELAY_STATUS_DELINQUENT {
return Err(bad_request("relay-is-delinquent", "relay is delinquent"));
}
if relay.status == RELAY_STATUS_ACTIVE {
return Err(bad_request("relay-is-active", "relay is already active"));
}
api.command.activate_relay(&relay).await.map_err(internal)?;
ok(())
}
// --- helpers ----------------------------------------------------------------
async fn fetch_relay_members(api: &Api, relay: &Relay) -> Result<Vec<String>> {
if relay.synced == 0 {
return Ok(Vec::new());
}
api.infra.list_relay_members(&relay.id).await
}
const RESERVED_SUBDOMAINS: [&str; 3] = ["api", "admin", "internal"];
static SUBDOMAIN_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$").unwrap());
fn prepare_relay(api: &Api, mut relay: Relay) -> Result<Relay, ApiError> {
if !SUBDOMAIN_RE.is_match(&relay.subdomain)
|| RESERVED_SUBDOMAINS.contains(&relay.subdomain.as_str()) {
return Err(unprocessable("invalid-subdomain", "subdomain is invalid"));
}
let plan = api
.query
.get_plan(&relay.plan)
.ok_or_else(|| unprocessable("invalid-plan", "plan not found"))?;
if (!plan.blossom && relay.blossom_enabled == 1) || (!plan.livekit && relay.livekit_enabled == 1) {
return Err(unprocessable("premium-feature", "feature requires a paid plan"));
}
relay.policy_public_join = parse_bool_default(relay.policy_public_join, 0);
relay.policy_strip_signatures = parse_bool_default(relay.policy_strip_signatures, 0);
relay.groups_enabled = parse_bool_default(relay.groups_enabled, 1);
relay.management_enabled = parse_bool_default(relay.management_enabled, 1);
relay.blossom_enabled = parse_bool_default(relay.blossom_enabled, 0);
relay.livekit_enabled = parse_bool_default(relay.livekit_enabled, 0);
relay.push_enabled = parse_bool_default(relay.push_enabled, 1);
Ok(relay)
}
fn map_relay_write_error(e: anyhow::Error) -> ApiError {
if matches!(map_unique_error(&e), Some("subdomain-exists")) {
unprocessable("subdomain-exists", "subdomain already exists")
} else {
internal(e)
}
}
+55
View File
@@ -0,0 +1,55 @@
use std::sync::Arc;
use axum::{
body::Bytes,
extract::{Path, Query as QueryParams, State},
http::HeaderMap,
};
use serde::Deserialize;
use crate::api::{Api, AuthedPubkey};
use crate::web::{ApiResult, bad_request, internal, ok};
#[derive(Deserialize)]
pub struct StripeSessionParams {
return_url: Option<String>,
}
pub async fn create_stripe_session(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(pubkey): Path<String>,
QueryParams(params): QueryParams<StripeSessionParams>,
) -> ApiResult {
api.require_admin_or_tenant(&auth, &pubkey)?;
let tenant = api.get_tenant_or_404(&pubkey).await?;
let url = api
.billing
.stripe_create_portal_session(&tenant.stripe_customer_id, params.return_url.as_deref())
.await
.map_err(internal)?;
ok(serde_json::json!({ "url": url }))
}
/// Stripe webhook endpoint. Authenticated via `Stripe-Signature` verification
/// on the raw body, not via NIP-98, so it does not use `AuthedPubkey`.
pub async fn stripe_webhook(
State(api): State<Arc<Api>>,
headers: HeaderMap,
body: Bytes,
) -> ApiResult {
let signature = headers
.get("Stripe-Signature")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
let payload = std::str::from_utf8(&body)
.map_err(|_| bad_request("bad-request", "invalid payload"))?;
api.billing
.handle_webhook(payload, signature)
.await
.map_err(|e| bad_request("webhook-error", &e.to_string()))?;
ok(())
}
+139
View File
@@ -0,0 +1,139 @@
use std::sync::Arc;
use axum::{
Json,
extract::{Path, State},
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use crate::api::{Api, AuthedPubkey};
use crate::models::Tenant;
use crate::web::{ApiResult, internal, map_unique_error, ok};
#[derive(Serialize)]
pub struct TenantResponse {
pub pubkey: String,
pub nwc_is_set: bool,
pub nwc_error: Option<String>,
pub created_at: i64,
pub stripe_customer_id: String,
pub stripe_subscription_id: Option<String>,
pub past_due_at: Option<i64>,
}
impl From<Tenant> for TenantResponse {
fn from(t: Tenant) -> Self {
TenantResponse {
nwc_is_set: !t.nwc_url.is_empty(),
pubkey: t.pubkey,
nwc_error: t.nwc_error,
created_at: t.created_at,
stripe_customer_id: t.stripe_customer_id,
stripe_subscription_id: t.stripe_subscription_id,
past_due_at: t.past_due_at,
}
}
}
#[derive(Deserialize)]
pub struct UpdateTenantRequest {
pub nwc_url: Option<String>,
}
pub async fn list_tenants(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
) -> ApiResult {
api.require_admin(&auth)?;
let tenants = api.query.list_tenants().await.map_err(internal)?;
ok(tenants
.into_iter()
.map(TenantResponse::from)
.collect::<Vec<_>>())
}
/// Creates the tenant row for the calling pubkey. Idempotent: if the tenant
/// already exists (including a unique-constraint race) we return the existing
/// row.
pub async fn create_tenant(
State(api): State<Arc<Api>>,
AuthedPubkey(pubkey): AuthedPubkey,
) -> ApiResult {
if let Some(t) = api.query.get_tenant(&pubkey).await.map_err(internal)? {
return ok(TenantResponse::from(t));
}
let stripe_customer_id = api
.billing
.stripe_create_customer(&pubkey)
.await
.map_err(internal)?;
let tenant = Tenant {
pubkey: pubkey.clone(),
created_at: Utc::now().timestamp(),
stripe_customer_id,
..Default::default()
};
match api.command.create_tenant(&tenant).await {
Ok(()) => ok(TenantResponse::from(tenant)),
Err(e) if matches!(map_unique_error(&e), Some("pubkey-exists")) => {
match api.query.get_tenant(&pubkey).await {
Ok(Some(t)) => ok(TenantResponse::from(t)),
Ok(None) => Err(internal("tenant row missing after unique-constraint race")),
Err(e) => Err(internal(e)),
}
}
Err(e) => Err(internal(e)),
}
}
pub async fn get_tenant(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(pubkey): Path<String>,
) -> ApiResult {
api.require_admin_or_tenant(&auth, &pubkey)?;
let tenant = api.get_tenant_or_404(&pubkey).await?;
ok(TenantResponse::from(tenant))
}
pub async fn update_tenant(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(pubkey): Path<String>,
Json(payload): Json<UpdateTenantRequest>,
) -> ApiResult {
api.require_admin_or_tenant(&auth, &pubkey)?;
let mut tenant = api.get_tenant_or_404(&pubkey).await?;
// Encrypt tenant's nwc_url at rest
if let Some(nwc_url) = payload.nwc_url {
if nwc_url.is_empty() {
tenant.nwc_url = String::new();
} else {
tenant.nwc_url = api.env.encrypt(&nwc_url).map_err(internal)?;
}
}
api.command.update_tenant(&tenant).await.map_err(internal)?;
ok(TenantResponse::from(tenant))
}
pub async fn list_tenant_relays(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
Path(pubkey): Path<String>,
) -> ApiResult {
api.require_admin_or_tenant(&auth, &pubkey)?;
let relays = api
.query
.list_relays_for_tenant(&pubkey)
.await
.map_err(internal)?;
ok(relays)
}
+442
View File
@@ -0,0 +1,442 @@
//! A thin async wrapper around the subset of the Stripe REST API this service uses.
//!
//! Nothing here knows about relays, tenants, or our database — it just speaks HTTP
//! to Stripe and hands back `serde_json::Value` (or small typed results). The
//! domain logic lives in [`crate::billing`].
use anyhow::{Result, anyhow};
use hmac::{Hmac, Mac};
use sha2::Sha256;
use std::collections::BTreeMap;
const STRIPE_API: &str = "https://api.stripe.com/v1";
// Webhooks
const WEBHOOK_TOLERANCE_SECS: i64 = 300;
#[derive(serde::Deserialize)]
pub struct StripeWebhookEvent {
#[serde(rename = "type")]
pub event_type: String,
pub data: StripeWebhookEventData,
}
#[derive(serde::Deserialize)]
pub struct StripeWebhookEventData {
pub object: serde_json::Value,
}
// API return types
#[derive(serde::Deserialize)]
pub struct StripeSubscription {
pub id: String,
pub status: String,
#[serde(deserialize_with = "deserialize_list")]
pub items: Vec<StripeSubscriptionItem>,
}
#[derive(serde::Deserialize)]
pub struct StripeSubscriptionItem {
pub id: String,
pub price: StripePrice,
#[serde(default = "default_quantity")]
pub quantity: i64,
}
#[derive(serde::Deserialize)]
pub struct StripePrice {
pub id: String,
}
#[derive(serde::Deserialize, serde::Serialize, Clone)]
pub struct StripeInvoice {
pub id: String,
pub customer: String,
pub status: String,
pub amount_due: i64,
pub currency: String,
#[serde(deserialize_with = "deserialize_list")]
pub lines: Vec<StripeInvoiceLine>,
}
#[derive(serde::Deserialize)]
pub struct StripeInvoicePreview {
pub amount_due: i64,
pub currency: String,
#[serde(deserialize_with = "deserialize_list")]
pub lines: Vec<StripeInvoiceLine>,
}
#[derive(serde::Deserialize, serde::Serialize, Clone)]
pub struct StripeInvoiceLine {
#[serde(default)]
pub proration: bool,
}
#[derive(serde::Deserialize)]
struct StripeList<T> {
data: Vec<T>,
}
fn deserialize_list<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
where
D: serde::Deserializer<'de>,
T: serde::Deserialize<'de>,
{
Ok(<StripeList<T> as serde::Deserialize>::deserialize(deserializer)?.data)
}
fn default_quantity() -> i64 {
1
}
// Stripe struct and impl
#[derive(Clone)]
pub struct Stripe {
pub(crate) secret_key: String,
pub(crate) webhook_secret: String,
http: reqwest::Client,
}
impl Stripe {
pub fn new(secret_key: String, webhook_secret: String) -> Self {
Self {
secret_key,
webhook_secret,
http: reqwest::Client::new(),
}
}
// --- Request helpers ---
fn get(&self, path: &str) -> reqwest::RequestBuilder {
self.http
.get(format!("{STRIPE_API}{path}"))
.bearer_auth(&self.secret_key)
}
fn post(&self, path: &str) -> reqwest::RequestBuilder {
self.http
.post(format!("{STRIPE_API}{path}"))
.bearer_auth(&self.secret_key)
}
fn delete(&self, path: &str) -> reqwest::RequestBuilder {
self.http
.delete(format!("{STRIPE_API}{path}"))
.bearer_auth(&self.secret_key)
}
fn idempotency_key(&self, parts: &[&str]) -> String {
let mut mac = Hmac::<Sha256>::new_from_slice(self.secret_key.as_bytes())
.expect("HMAC accepts any key length");
for (i, part) in parts.iter().enumerate() {
if i > 0 {
mac.update(b":");
}
mac.update(part.as_bytes());
}
hex::encode(mac.finalize().into_bytes())
}
// --- Customers ---
pub async fn create_customer(&self, tenant_pubkey: &str, name: &str) -> Result<String> {
let body = self
.post("/customers")
.header(
"Idempotency-Key",
self.idempotency_key(&["create_customer", tenant_pubkey]),
)
.form(&[("name", name), ("metadata[tenant_pubkey]", tenant_pubkey)])
.send_json()
.await?;
let customer_id = body["id"]
.as_str()
.ok_or_else(|| anyhow!("missing customer id"))?;
Ok(customer_id.to_string())
}
// --- Subscriptions ---
pub async fn get_subscription(
&self,
subscription_id: &str,
) -> Result<Option<StripeSubscription>> {
let body = self
.get(&format!("/subscriptions/{subscription_id}"))
.send_optional_json()
.await?;
body.map(serde_json::from_value)
.transpose()
.map_err(Into::into)
}
pub async fn create_subscription(
&self,
customer_id: &str,
items: &BTreeMap<String, i64>,
) -> Result<StripeSubscription> {
let mut form: Vec<(String, String)> = vec![
("customer".to_string(), customer_id.to_string()),
(
"collection_method".to_string(),
"charge_automatically".to_string(),
),
];
let mut key_parts: Vec<String> =
vec!["create_subscription".to_string(), customer_id.to_string()];
for (index, (price_id, quantity)) in items.iter().enumerate() {
form.push((format!("items[{index}][price]"), price_id.clone()));
form.push((format!("items[{index}][quantity]"), quantity.to_string()));
key_parts.push(format!("{price_id}={quantity}"));
}
let key_refs: Vec<&str> = key_parts.iter().map(String::as_str).collect();
Ok(self
.post("/subscriptions")
.header("Idempotency-Key", self.idempotency_key(&key_refs))
.form(&form)
.send_ok()
.await?
.json()
.await?)
}
pub async fn create_subscription_item(
&self,
subscription_id: &str,
price_id: &str,
quantity: i64,
) -> Result<String> {
let quantity = quantity.to_string();
let body = self
.post("/subscription_items")
.header(
"Idempotency-Key",
self.idempotency_key(&["create_subscription_item", subscription_id, price_id]),
)
.form(&[
("subscription", subscription_id),
("price", price_id),
("quantity", quantity.as_str()),
])
.send_json()
.await?;
body["id"]
.as_str()
.map(str::to_string)
.ok_or_else(|| anyhow!("missing subscription item id"))
}
pub async fn set_subscription_item_quantity(&self, item_id: &str, quantity: i64) -> Result<()> {
self.post(&format!("/subscription_items/{item_id}"))
.form(&[("quantity", quantity.to_string())])
.send_ok()
.await?;
Ok(())
}
pub async fn delete_subscription_item(&self, item_id: &str) -> Result<()> {
self.delete(&format!("/subscription_items/{item_id}"))
.send_ok()
.await?;
Ok(())
}
pub async fn cancel_subscription(&self, subscription_id: &str) -> Result<()> {
self.delete(&format!("/subscriptions/{subscription_id}"))
.send_ok()
.await?;
Ok(())
}
// --- Invoices ---
pub async fn list_invoices(&self, customer_id: &str) -> Result<Vec<StripeInvoice>> {
let list: StripeList<StripeInvoice> = self
.get("/invoices")
.query(&[("customer", customer_id)])
.send_ok()
.await?
.json()
.await?;
Ok(list.data)
}
pub async fn get_invoice(&self, invoice_id: &str) -> Result<Option<StripeInvoice>> {
let body = self
.get(&format!("/invoices/{invoice_id}"))
.send_optional_json()
.await?;
body.map(serde_json::from_value)
.transpose()
.map_err(Into::into)
}
pub async fn pay_invoice(&self, invoice_id: &str) -> Result<()> {
self.post(&format!("/invoices/{invoice_id}/pay"))
.header(
"Idempotency-Key",
self.idempotency_key(&["pay_invoice", invoice_id]),
)
.send_ok()
.await?;
Ok(())
}
pub async fn pay_invoice_out_of_band(&self, invoice_id: &str) -> Result<()> {
self.post(&format!("/invoices/{invoice_id}/pay"))
.header(
"Idempotency-Key",
self.idempotency_key(&["pay_invoice_oob", invoice_id]),
)
.form(&[("paid_out_of_band", "true")])
.send_ok()
.await?;
Ok(())
}
pub async fn preview_invoice(
&self,
customer_id: &str,
subscription_id: Option<&str>,
) -> Result<StripeInvoicePreview> {
let mut req = self.get("/invoices/upcoming").query(&[("customer", customer_id)]);
if let Some(subscription_id) = subscription_id {
req = req.query(&[("subscription", subscription_id)]);
}
Ok(req.send_ok().await?.json().await?)
}
// --- Payment methods ---
pub async fn has_payment_method(&self, customer_id: &str) -> Result<bool> {
let body = self
.get("/payment_methods")
.query(&[("customer", customer_id), ("type", "card")])
.send_json()
.await?;
Ok(body["data"].as_array().is_some_and(|a| !a.is_empty()))
}
// --- Portal ---
pub async fn create_portal_session(
&self,
customer_id: &str,
return_url: Option<&str>,
) -> Result<String> {
let mut params = vec![("customer", customer_id.to_string())];
if let Some(url) = return_url {
params.push(("return_url", url.to_string()));
}
let body = self
.post("/billing_portal/sessions")
.form(&params)
.send_json()
.await?;
body["url"]
.as_str()
.map(str::to_string)
.ok_or_else(|| anyhow!("missing portal session url"))
}
// --- Webhooks ---
pub fn get_webhook_event(&self, payload: &str, signature: &str) -> Result<StripeWebhookEvent> {
let mut timestamp = None;
let mut sig = None;
for part in signature.split(',') {
if let Some(t) = part.strip_prefix("t=") {
timestamp = Some(t);
} else if let Some(v) = part.strip_prefix("v1=") {
sig = Some(v);
}
}
let timestamp = timestamp.ok_or_else(|| anyhow!("missing webhook timestamp"))?;
let signature = sig.ok_or_else(|| anyhow!("missing webhook signature"))?;
let signed_payload = format!("{timestamp}.{payload}");
let mut mac = Hmac::<Sha256>::new_from_slice(self.webhook_secret.as_bytes())
.map_err(|e| anyhow!("invalid webhook secret: {e}"))?;
mac.update(signed_payload.as_bytes());
let expected = hex::encode(mac.finalize().into_bytes());
if expected != signature {
return Err(anyhow!("webhook signature mismatch"));
}
let ts: i64 = timestamp
.parse()
.map_err(|_| anyhow!("bad webhook timestamp"))?;
let now = chrono::Utc::now().timestamp();
if (now - ts).abs() > WEBHOOK_TOLERANCE_SECS {
return Err(anyhow!("webhook timestamp outside tolerance"));
}
Ok(serde_json::from_str(payload)?)
}
}
trait StripeRequest {
async fn send_ok(self) -> Result<reqwest::Response>;
async fn send_json(self) -> Result<serde_json::Value>;
async fn send_optional_json(self) -> Result<Option<serde_json::Value>>;
}
impl StripeRequest for reqwest::RequestBuilder {
async fn send_ok(self) -> Result<reqwest::Response> {
error_for_status(self.send().await?).await
}
async fn send_json(self) -> Result<serde_json::Value> {
Ok(self.send_ok().await?.json().await?)
}
async fn send_optional_json(self) -> Result<Option<serde_json::Value>> {
let resp = self.send().await?;
if resp.status() == reqwest::StatusCode::NOT_FOUND {
return Ok(None);
}
Ok(Some(error_for_status(resp).await?.json().await?))
}
}
/// Give callers an actionable message instead of a bare "400 Bad Request"
async fn error_for_status(resp: reqwest::Response) -> Result<reqwest::Response> {
let status = resp.status();
if !status.is_client_error() && !status.is_server_error() {
return Ok(resp);
}
let url = resp.url().clone();
let body = resp.text().await.unwrap_or_default();
let detail = serde_json::from_str::<serde_json::Value>(&body)
.ok()
.and_then(|json| {
let error = &json["error"];
let message = error["message"].as_str()?.to_string();
let mut detail = message;
if let Some(code) = error["type"].as_str().or_else(|| error["code"].as_str()) {
detail.push_str(&format!(" [{code}]"));
}
if let Some(param) = error["param"].as_str() {
detail.push_str(&format!(" (param: {param})"));
}
Some(detail)
})
.unwrap_or_else(|| {
if body.trim().is_empty() {
"<empty response body>".to_string()
} else {
body
}
});
Err(anyhow!(
"Stripe API request to {url} failed with status {status}: {detail}"
))
}
+55
View File
@@ -0,0 +1,55 @@
use anyhow::{Result, anyhow};
use nwc::prelude::{
LookupInvoiceRequest, MakeInvoiceRequest, NWC, NostrWalletConnectURI, PayInvoiceRequest,
TransactionState,
};
#[derive(Clone)]
pub struct Wallet {
url: NostrWalletConnectURI,
}
impl Wallet {
pub fn from_url(url: &str) -> Result<Self> {
let url = url
.parse::<NostrWalletConnectURI>()
.map_err(|_| anyhow!("invalid NWC URL"))?;
Ok(Self { url })
}
pub async fn make_invoice(&self, amount_msats: u64, description: &str) -> Result<String> {
let nwc = NWC::new(self.url.clone());
let result = nwc
.make_invoice(MakeInvoiceRequest {
amount: amount_msats,
description: Some(description.to_string()),
description_hash: None,
expiry: None,
})
.await;
nwc.shutdown().await;
Ok(result
.map_err(|e| anyhow!("failed to create invoice: {e}"))?
.invoice)
}
pub async fn pay_invoice(&self, bolt11: String) -> Result<()> {
let nwc = NWC::new(self.url.clone());
let result = nwc.pay_invoice(PayInvoiceRequest::new(bolt11)).await;
nwc.shutdown().await;
result.map(|_| ()).map_err(|e| anyhow!("{e}"))
}
pub async fn is_settled(&self, bolt11: &str) -> Result<bool> {
let nwc = NWC::new(self.url.clone());
let result = nwc
.lookup_invoice(LookupInvoiceRequest {
payment_hash: None,
invoice: Some(bolt11.to_string()),
})
.await;
nwc.shutdown().await;
let response = result.map_err(|e| anyhow!("failed to lookup invoice: {e}"))?;
Ok(response.state == Some(TransactionState::Settled) || response.settled_at.is_some())
}
}
+125
View File
@@ -0,0 +1,125 @@
//! General-purpose HTTP helpers shared across route handlers.
//!
//! Success builders (`res`, `ok`, `created`) return [`ApiResult`] so they
//! can sit at the end of a handler without an `Ok(..)` wrap. Error builders
//! (`err`, `not_found`, `forbidden`, …) return [`ApiError`] so they compose
//! with `.map_err(...)` and with explicit `Err(...)` returns.
use std::fmt::Display;
use axum::{
Json,
http::StatusCode,
response::{IntoResponse, Response},
};
use serde::Serialize;
pub struct ApiError(pub Box<Response>);
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
*self.0
}
}
impl From<Response> for ApiError {
fn from(r: Response) -> Self {
Self(Box::new(r))
}
}
pub type ApiResult = Result<Response, ApiError>;
#[derive(Serialize)]
pub struct DataResponse<T: Serialize> {
pub data: T,
pub code: &'static str,
}
#[derive(Serialize)]
pub struct ErrorResponse {
pub error: String,
pub code: String,
}
// --- success builders (return ApiResult) ------------------------------------
pub fn res<T: Serialize>(status: StatusCode, data: T) -> ApiResult {
Ok((status, Json(DataResponse { data, code: "ok" })).into_response())
}
pub fn ok<T: Serialize>(data: T) -> ApiResult {
res(StatusCode::OK, data)
}
pub fn created<T: Serialize>(data: T) -> ApiResult {
res(StatusCode::CREATED, data)
}
// --- error builders (return ApiError) ---------------------------------------
pub fn err(status: StatusCode, code: &str, message: &str) -> ApiError {
(
status,
Json(ErrorResponse {
error: message.to_string(),
code: code.to_string(),
}),
)
.into_response()
.into()
}
pub fn unauthorized(reason: impl Display) -> ApiError {
err(StatusCode::UNAUTHORIZED, "unauthorized", &reason.to_string())
}
pub fn forbidden(message: &str) -> ApiError {
err(StatusCode::FORBIDDEN, "forbidden", message)
}
pub fn not_found(message: &str) -> ApiError {
err(StatusCode::NOT_FOUND, "not-found", message)
}
pub fn bad_request(code: &str, message: &str) -> ApiError {
err(StatusCode::BAD_REQUEST, code, message)
}
pub fn unprocessable(code: &str, message: &str) -> ApiError {
err(StatusCode::UNPROCESSABLE_ENTITY, code, message)
}
pub fn internal(reason: impl Display) -> ApiError {
err(
StatusCode::INTERNAL_SERVER_ERROR,
"internal",
&reason.to_string(),
)
}
// --- misc utilities ---------------------------------------------------------
pub fn parse_bool_default(value: i64, default: i64) -> i64 {
if value == 0 || value == 1 {
value
} else {
default
}
}
/// Recognize sqlite UNIQUE constraint violations on known columns so the
/// caller can translate them into 422 responses instead of opaque 500s.
pub fn map_unique_error(err: &anyhow::Error) -> Option<&'static str> {
let sqlx_err = err.downcast_ref::<sqlx::Error>()?;
let sqlx::Error::Database(db_err) = sqlx_err else {
return None;
};
if db_err.message().contains("pubkey") {
return Some("pubkey-exists");
}
if db_err.message().contains("subdomain") {
return Some("subdomain-exists");
}
None
}
-31
View File
@@ -1,31 +0,0 @@
use axum::{Json, Router, routing::get};
use backend::billing::{fetch_btc_spot_price_from_base, fiat_minor_to_msats_from_quote};
#[tokio::test]
async fn quote_endpoint_can_be_stubbed_deterministically() {
async fn spot() -> Json<serde_json::Value> {
Json(serde_json::json!({ "data": { "amount": "50000.00" } }))
}
let app = Router::new().route("/v2/prices/BTC-USD/spot", get(spot));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind test server");
let addr = listener.local_addr().expect("get local addr");
tokio::spawn(async move {
axum::serve(listener, app).await.expect("serve quote stub");
});
let client = reqwest::Client::new();
let base = format!("http://{addr}/v2/prices");
let btc_price = fetch_btc_spot_price_from_base(&client, &base, "USD")
.await
.expect("fetch stubbed quote");
assert_eq!(btc_price, 50_000.0);
let msats =
fiat_minor_to_msats_from_quote(100, "USD", btc_price).expect("convert quoted fiat amount");
assert_eq!(msats, 2_000_000);
}
+1 -1
View File
@@ -40,7 +40,7 @@ export default function PaymentSetup(props: PaymentSetupProps) {
setRedirecting(true) setRedirecting(true)
setError("") setError("")
try { try {
const { url } = await createPortalSession(account()!.pubkey) const { url } = await createPortalSession(account()!.pubkey, window.location.href)
window.location.href = url window.location.href = url
} catch (e) { } catch (e) {
setError(e instanceof Error ? e.message : "Failed to open billing portal") setError(e instanceof Error ? e.message : "Failed to open billing portal")
+3 -2
View File
@@ -253,8 +253,9 @@ export function reactivateRelay(id: string) {
return callApi<undefined, void>("POST", `/relays/${id}/reactivate`) return callApi<undefined, void>("POST", `/relays/${id}/reactivate`)
} }
export function createPortalSession(pubkey: string) { export function createPortalSession(pubkey: string, returnUrl?: string) {
return callApi<undefined, { url: string }>("GET", `/tenants/${pubkey}/stripe/session`) const query = returnUrl ? `?return_url=${encodeURIComponent(returnUrl)}` : ""
return callApi<undefined, { url: string }>("GET", `/tenants/${pubkey}/stripe/session${query}`)
} }
export function getInvoiceBolt11(invoiceId: string) { export function getInvoiceBolt11(invoiceId: string) {
+9 -4
View File
@@ -1,5 +1,5 @@
import { createSignal } from "solid-js" import { createSignal } from "solid-js"
import { updateRelayById, deactivateRelayById, reactivateRelayById, getLatestOpenInvoice, type Relay } from "@/lib/hooks" import { updateRelayById, deactivateRelayById, reactivateRelayById, getLatestOpenInvoice, tenantNeedsPaymentSetup, type Relay } from "@/lib/hooks"
import { setToastMessage } from "@/components/Toast" import { setToastMessage } from "@/components/Toast"
import type { Invoice, PlanId } from "@/lib/api" import type { Invoice, PlanId } from "@/lib/api"
@@ -31,6 +31,7 @@ export default function useRelayToggles(
) { ) {
const [busy, setBusy] = createSignal(false) const [busy, setBusy] = createSignal(false)
const [pendingInvoice, setPendingInvoice] = createSignal<Invoice | undefined>() const [pendingInvoice, setPendingInvoice] = createSignal<Invoice | undefined>()
const [pendingPaymentSetup, setPendingPaymentSetup] = createSignal(false)
async function updateRelay(next: Relay, previous: Relay) { async function updateRelay(next: Relay, previous: Relay) {
mutate(next) mutate(next)
@@ -101,8 +102,12 @@ export default function useRelayToggles(
} }
if (plan !== "free") { if (plan !== "free") {
const invoice = await getLatestOpenInvoice() const needsSetup = await tenantNeedsPaymentSetup()
if (invoice) setPendingInvoice(invoice) if (needsSetup) {
const invoice = await getLatestOpenInvoice()
if (invoice) setPendingInvoice(invoice)
setPendingPaymentSetup(true)
}
} }
} }
@@ -116,5 +121,5 @@ export default function useRelayToggles(
onToggleLivekitSupport: () => toggle("livekit_enabled", relay()?.plan !== "free"), onToggleLivekitSupport: () => toggle("livekit_enabled", relay()?.plan !== "free"),
} }
return { busy, handleDeactivate, handleReactivate, handleUpdatePlan, pendingInvoice, clearPendingInvoice: () => setPendingInvoice(undefined), toggles } return { busy, handleDeactivate, handleReactivate, handleUpdatePlan, pendingInvoice, clearPendingInvoice: () => setPendingInvoice(undefined), pendingPaymentSetup, clearPendingPaymentSetup: () => setPendingPaymentSetup(false), toggles }
} }
+1 -1
View File
@@ -49,7 +49,7 @@ export default function Account() {
async function openPortal() { async function openPortal() {
setPortalLoading(true) setPortalLoading(true)
try { try {
const { url } = await createPortalSession(account()!.pubkey) const { url } = await createPortalSession(account()!.pubkey, window.location.href)
window.location.href = url window.location.href = url
} catch (e) { } catch (e) {
setError(e instanceof Error ? e.message : "Failed to open billing portal") setError(e instanceof Error ? e.message : "Failed to open billing portal")
+9 -2
View File
@@ -1,5 +1,5 @@
import { useParams } from "@solidjs/router" import { useParams } from "@solidjs/router"
import { createMemo, createResource, createSignal, Show } from "solid-js" import { createEffect, createMemo, createResource, createSignal, Show } from "solid-js"
import BackLink from "@/components/BackLink" import BackLink from "@/components/BackLink"
import PageContainer from "@/components/PageContainer" import PageContainer from "@/components/PageContainer"
import PaymentDialog from "@/components/PaymentDialog" import PaymentDialog from "@/components/PaymentDialog"
@@ -28,13 +28,20 @@ export default function RelayDetail() {
}) })
const loading = useMinLoading(() => relay.loading && !relay()) const loading = useMinLoading(() => relay.loading && !relay())
const [activity] = useRelayActivity(relayId) const [activity] = useRelayActivity(relayId)
const { busy, handleDeactivate, handleReactivate, handleUpdatePlan, pendingInvoice, clearPendingInvoice, toggles } = useRelayToggles(relayId, relay, { refetch, mutate }) const { busy, handleDeactivate, handleReactivate, handleUpdatePlan, pendingInvoice, clearPendingInvoice, pendingPaymentSetup, clearPendingPaymentSetup, toggles } = useRelayToggles(relayId, relay, { refetch, mutate })
const [tenant, { refetch: refetchTenant }] = useTenant() const [tenant, { refetch: refetchTenant }] = useTenant()
const [paymentSetupOpen, setPaymentSetupOpen] = createSignal(false) const [paymentSetupOpen, setPaymentSetupOpen] = createSignal(false)
const [invoiceDialogOpen, setInvoiceDialogOpen] = createSignal(false) const [invoiceDialogOpen, setInvoiceDialogOpen] = createSignal(false)
const [paymentBannerDismissed, setPaymentBannerDismissed] = createSignal(false) const [paymentBannerDismissed, setPaymentBannerDismissed] = createSignal(false)
createEffect(() => {
if (pendingPaymentSetup() && !pendingInvoice()) {
setPaymentSetupOpen(true)
clearPendingPaymentSetup()
}
})
const isPaidRelay = createMemo(() => { const isPaidRelay = createMemo(() => {
const r = relay() const r = relay()
if (!r) return false if (!r) return false
+22 -6
View File
@@ -3,13 +3,15 @@ import { useNavigate } from "@solidjs/router"
import BackLink from "@/components/BackLink" import BackLink from "@/components/BackLink"
import PageContainer from "@/components/PageContainer" import PageContainer from "@/components/PageContainer"
import PaymentDialog from "@/components/PaymentDialog" import PaymentDialog from "@/components/PaymentDialog"
import PaymentSetup from "@/components/PaymentSetup"
import RelayForm, { type RelayFormValues } from "@/components/RelayForm" import RelayForm, { type RelayFormValues } from "@/components/RelayForm"
import { createRelayForActiveTenant, getLatestOpenInvoice } from "@/lib/hooks" import { createRelayForActiveTenant, getLatestOpenInvoice, tenantNeedsPaymentSetup } from "@/lib/hooks"
import type { Invoice } from "@/lib/api" import type { Invoice } from "@/lib/api"
export default function RelayNew() { export default function RelayNew() {
const navigate = useNavigate() const navigate = useNavigate()
const [pendingInvoice, setPendingInvoice] = createSignal<Invoice | undefined>() const [pendingInvoice, setPendingInvoice] = createSignal<Invoice | undefined>()
const [paymentSetupOpen, setPaymentSetupOpen] = createSignal(false)
let createdRelayId = "" let createdRelayId = ""
async function handleSubmit(values: RelayFormValues) { async function handleSubmit(values: RelayFormValues) {
@@ -17,9 +19,14 @@ export default function RelayNew() {
createdRelayId = relay.id createdRelayId = relay.id
if (values.plan !== "free") { if (values.plan !== "free") {
const invoice = await getLatestOpenInvoice() const needsSetup = await tenantNeedsPaymentSetup()
if (invoice) { if (needsSetup) {
setPendingInvoice(invoice) const invoice = await getLatestOpenInvoice()
if (invoice) {
setPendingInvoice(invoice)
return
}
setPaymentSetupOpen(true)
return return
} }
} }
@@ -27,8 +34,13 @@ export default function RelayNew() {
navigate(`/relays/${relay.id}`) navigate(`/relays/${relay.id}`)
} }
function handleDialogClose() { function handleInvoiceClose() {
setPendingInvoice(undefined) setPendingInvoice(undefined)
setPaymentSetupOpen(true)
}
function handleSetupClose() {
setPaymentSetupOpen(false)
navigate(`/relays/${createdRelayId}`) navigate(`/relays/${createdRelayId}`)
} }
@@ -47,10 +59,14 @@ export default function RelayNew() {
<PaymentDialog <PaymentDialog
invoice={inv()} invoice={inv()}
open={true} open={true}
onClose={handleDialogClose} onClose={handleInvoiceClose}
/> />
)} )}
</Show> </Show>
<PaymentSetup
open={paymentSetupOpen()}
onClose={handleSetupClose}
/>
</PageContainer> </PageContainer>
) )
} }
+4 -1
View File
@@ -5,6 +5,9 @@ dev:
cd frontend && bun dev & cd frontend && bun dev &
wait wait
dev-backend:
cd backend && onchange src -ik -- bash -c 'RUST_LOG=backend=info cargo run'
dev-frontend: dev-frontend:
cd frontend && bun run dev cd frontend && bun run dev
@@ -27,7 +30,7 @@ build-backend:
cd backend && cargo build cd backend && cargo build
build-frontend: build-frontend:
cd frontend && bun run build cd frontend && bun i && bun run build
fmt: fmt-backend fmt: fmt-backend
-2
View File
@@ -1,2 +0,0 @@
- [ ] Fix billing by using stripe as a backend to do proration, then mark invoices paid manually when using bitcoin.
- [ ] Send a payment link instead of an invoice so we can generate/pay on the fly