Clear billing logic, do some cleanup

This commit is contained in:
Jon Staab
2026-04-01 14:30:09 -07:00
parent d1209c635b
commit baae65b8b2
13 changed files with 152 additions and 1330 deletions
+2 -27
View File
@@ -10,8 +10,7 @@ CREATE TABLE IF NOT EXISTS activity (
CREATE TABLE IF NOT EXISTS tenant ( CREATE TABLE IF NOT EXISTS tenant (
pubkey TEXT PRIMARY KEY, pubkey TEXT PRIMARY KEY,
nwc_url TEXT NOT NULL DEFAULT '', nwc_url TEXT NOT NULL DEFAULT '',
created_at INTEGER NOT NULL, created_at INTEGER NOT NULL
billing_anchor INTEGER NOT NULL
); );
CREATE TABLE IF NOT EXISTS relay ( CREATE TABLE IF NOT EXISTS relay (
@@ -21,6 +20,7 @@ CREATE TABLE IF NOT EXISTS relay (
subdomain TEXT NOT NULL UNIQUE, subdomain TEXT NOT NULL UNIQUE,
plan TEXT NOT NULL, plan TEXT NOT NULL,
status TEXT NOT NULL, status TEXT NOT NULL,
synced INTEGER NOT NULL DEFAULT 0,
sync_error TEXT NOT NULL DEFAULT '', sync_error TEXT NOT NULL DEFAULT '',
info_name TEXT NOT NULL DEFAULT '', info_name TEXT NOT NULL DEFAULT '',
info_icon TEXT NOT NULL DEFAULT '', info_icon TEXT NOT NULL DEFAULT '',
@@ -34,28 +34,3 @@ CREATE TABLE IF NOT EXISTS relay (
push_enabled INTEGER NOT NULL DEFAULT 1, push_enabled INTEGER NOT NULL DEFAULT 1,
FOREIGN KEY (tenant) REFERENCES tenant(pubkey) FOREIGN KEY (tenant) REFERENCES tenant(pubkey)
); );
CREATE TABLE IF NOT EXISTS invoice (
id TEXT PRIMARY KEY,
tenant TEXT NOT NULL,
status TEXT NOT NULL,
created_at INTEGER NOT NULL,
attempted_at INTEGER NOT NULL DEFAULT 0,
error TEXT NOT NULL DEFAULT '',
closed_at INTEGER NOT NULL DEFAULT 0,
sent_at INTEGER NOT NULL DEFAULT 0,
paid_at INTEGER NOT NULL DEFAULT 0,
bolt11 TEXT NOT NULL,
period_start INTEGER NOT NULL,
period_end INTEGER NOT NULL,
FOREIGN KEY (tenant) REFERENCES tenant(pubkey)
);
CREATE TABLE IF NOT EXISTS invoice_item (
id TEXT PRIMARY KEY,
invoice TEXT NOT NULL,
relay TEXT NOT NULL,
sats INTEGER NOT NULL,
FOREIGN KEY (invoice) REFERENCES invoice(id),
FOREIGN KEY (relay) REFERENCES relay(id)
);
-1
View File
@@ -1 +0,0 @@
ALTER TABLE relay ADD COLUMN synced INTEGER NOT NULL DEFAULT 0;
+19 -31
View File
@@ -21,11 +21,9 @@ Notes:
- Reads environment and populates members - Reads environment and populates members
## `pub fn serve(&self) -> Result<()>` ## `pub fn router(&self) -> Result<()>`
- Initializes an `axum::Router` - Returns an `axum::Router`
- Adds CORS middleware based on `origins`
- Calls `axum::serve` with a listener
--- Plan routes --- Plan routes
@@ -65,25 +63,19 @@ Notes:
- Authorizes admin or matching tenant - Authorizes admin or matching tenant
- Return `data` is a single tenant struct from `repo.get_tenant` - Return `data` is a single tenant struct from `repo.get_tenant`
## `async fn update_tenant(...) -> Response`
- Serves `PUT /tenants/:pubkey`
- Authorizes admin or matching tenant
- Updates tenant using `repo.update_tenant`
- Return `data` is the updated tenant struct
## `async fn list_tenant_relays(...) -> Response` ## `async fn list_tenant_relays(...) -> Response`
- Serves `GET /tenants/:pubkey/relays` - Serves `GET /tenants/:pubkey/relays`
- Authorizes admin or matching tenant - Authorizes admin or matching tenant
- Return `data` is a list of relay structs from `repo.list_relays_for_tenant` - Return `data` is a list of relay structs from `repo.list_relays_for_tenant`
## `async fn list_tenant_invoices(...) -> Response`
- Serves `GET /tenants/:pubkey/invoices`
- Authorizes admin or matching tenant
- Return `data` is a list of invoice structs from `repo.list_invoices_for_tenant`
## `async fn update_tenant_billing(...) -> Response`
- Serves `PUT /tenants/:pubkey/billing`
- Authorizes admin or matching tenant
- Updates tenant billing NWC URL using `repo.update_tenant_nwc_url`
- Return `data` is the submitted billing payload
--- Relay routes --- Relay routes
## `async fn list_relays(...) -> Response` ## `async fn list_relays(...) -> Response`
@@ -120,28 +112,24 @@ Notes:
- Serves `GET /relays/:id/activity` - Serves `GET /relays/:id/activity`
- Authorizes admin or relay owner - Authorizes admin or relay owner
- Return `data` is a list of activity structs from `repo.list_activity_for_relay` - Get activity from `repo.list_activity_for_relay`
- Return `data` is `{activity}`
## `async fn deactivate_relay(...) -> Response` ## `async fn deactivate_relay(...) -> Response`
- Serves `POST /relays/:id/deactivate` - Serves `POST /relays/:id/deactivate`
- Authorizes admin or relay owner - Authorizes admin or relay owner
- Deactivates relay using `repo.deactivate_relay` - If relay is already active, return a `400` with `code=relay-is-inactive`
- Call `billing.deactivate_relay`
- Return `data` is empty - Return `data` is empty
--- Invoice routes ## `async fn reactivate_relay(...) -> Response`
## `async fn list_invoices(...) -> Response` - Serves `POST /relays/:id/reactivate`
- Authorizes admin or relay owner
- Serves `GET /invoices` - If relay is already active, return a `400` with `code=relay-is-active`
- Authorizes admin only - Call `billing.reactivate_relay`
- Return `data` is a list of invoice structs from `repo.list_invoices` - Return `data` is empty
## `async fn get_invoice(...) -> Response`
- Serves `GET /invoices/:id`
- Authorizes admin or invoice owner
- Return `data` is a single invoice struct from `repo.get_invoice`
--- Utilities --- Utilities
+2 -39
View File
@@ -1,50 +1,13 @@
# `pub struct Billing` # `pub struct Billing`
Billing is a service which polls the database, creates invoices, and attempts to collect payment for invoices. Billing encapsulates logic related to synchronizing state with Stripe.
Members: Members:
- `nwc_url: String` - a nostr wallet connect URL used to **create** invoices - `nwc_url: String` - a nostr wallet connect URL used to **create** bolt11 invoices
- `repo: Repo` - `repo: Repo`
- `robot: Robot` - `robot: Robot`
## `pub fn new(repo: Repo, robot: Robot) -> Self` ## `pub fn new(repo: Repo, robot: Robot) -> Self`
- Reads environment and populates members - Reads environment and populates members
## `pub async fn start(self)`
Calls `self.tick` in a loop every hour.
## `pub async fn tick(self)`
Iterates over `repo.list_activity` since last run and does the following:
- For any `create_relay|update_relay|activate_relay` activity if this is the first non-free relay for the tenant, update tenant's billing anchor to the time the relay was created.
Also iterates over `repo.list_tenants()` and for each tenant calls `self.generate_invoice_if_due(tenant)` and `self.collect_outstanding(tenant)`.
## `async fn generate_invoice_if_due(&self, tenant: &Tenant)`
- Skip tenants that have a `pending` invoice or have no active non-free relays.
- Compute current billing period from `tenant.billing_anchor` as rolling monthly windows: `[period_start, period_end)`.
- Only generate an invoice once the period has closed (`now >= period_end`).
- Load activity needed to compute usage for the tenant using `repo.list_activity`.
- Calculate how many hours (rounded up) each relay was active during the window per paid plan.
- If total invoice amount is 0, return.
- Generate a `bolt11` invoice. If this fails, log the error and return.
- Generate an invoice for the tenant and an invoice item for each relay.
- Persist invoice + items atomically using `repo.create_invoice`.
## `async fn collect_outstanding(&self, tenant: &Tenant)`
- Load `pending` tenant invoices and attempt to collect each one.
- If `attempted_at` is less than 24 hours ago, skip it.
- If the `bolt11` invoice has been paid out of band, call `repo.mark_invoice_paid` and return.
- If the tenant has a `nwc_url`, attempt to pay the invoice with nwc.
- If collection succeeds, call `repo.mark_invoice_paid`.
- If collection fails, populate `repo.mark_invoice_attempted`.
- If nwc isn't set up or fails and `sent_at` is not set:
- Send a NIP 17 DM to the user with the invoice included.
- Call `repo.mark_invoice_sent`.
- If the invoice is 7 days past `created_at`, call `repo.mark_invoice_closed`.
+3 -2
View File
@@ -2,6 +2,7 @@
- Configures logging - Configures logging
- Creates instances of `Repo`, `Robot`, `Billing`, `Api`, and `Infra` - Creates instances of `Repo`, `Robot`, `Billing`, `Api`, and `Infra`
- Spawns `billing.start`
- Spawns `infra.start` - Spawns `infra.start`
- Calls `api.serve` - Get an axum router from `api.router`
- Adds CORS middleware based on `origins`
- Calls `axum::serve` with a listener
+4 -36
View File
@@ -19,18 +19,13 @@ Activity is an audit log of all actions performed by a user or a worker process.
- `created_at` - unix timestamp when the activity was created - `created_at` - unix timestamp when the activity was created
- `activity_type` is one of: - `activity_type` is one of:
- `create_tenant` - `create_tenant`
- `update_tenant_billing_anchor` - `update_tenant`
- `update_tenant_nwc_url`
- `create_relay` - `create_relay`
- `update_relay` - `update_relay`
- `update_relay_plan`
- `activate_relay` - `activate_relay`
- `deactivate_relay` - `deactivate_relay`
- `fail_relay_sync` - `fail_relay_sync`
- `create_invoice`
- `mark_invoice_paid`
- `mark_invoice_attempted`
- `mark_invoice_sent`
- `mark_invoice_closed`
- `resource_type` is a string identifying the resource type being modified. - `resource_type` is a string identifying the resource type being modified.
- `resource_id` is a string identifying the resource id being modified. - `resource_id` is a string identifying the resource id being modified.
@@ -58,7 +53,6 @@ Tenants are customers of the service, identified by a nostr `pubkey`. Public met
- `pubkey` is the nostr public key identifying the tenant - `pubkey` is the nostr public key identifying the tenant
- `nwc_url` (private) a nostr wallet connect URL used for **paying** invoices generated by the system - `nwc_url` (private) a nostr wallet connect URL used for **paying** invoices generated by the system
- `created_at` unix timestamp identifying tenant creation time - `created_at` unix timestamp identifying tenant creation time
- `billing_anchor` unix timestamp identifying billing cycle anchor. This gets reset when the tenant has no paid relays and adds (or reactivates) one.
# Relay # Relay
@@ -69,7 +63,8 @@ A relay is a nostr relay owned by a `tenant` and hosted by the attached zooid in
- `schema` - the relay's db schema (read_only, calculated based on `subdomain` + `id`) - `schema` - the relay's db schema (read_only, calculated based on `subdomain` + `id`)
- `subdomain` - the relay's subdomain - `subdomain` - the relay's subdomain
- `plan` - the relay's plan - `plan` - the relay's plan
- `status` - `new|active|inactive`. Only `active` relays count toward billing. - `status` - `active|inactive`. Only `active` relays count toward billing.
- `synced` - whether the relay has been successfully synced to zooid at least once.
- `sync_error` - a string indicating any errors encountered when synchronizing. - `sync_error` - a string indicating any errors encountered when synchronizing.
- `info_name` - the relay's name - `info_name` - the relay's name
- `info_icon` - the relay's icon image URL - `info_icon` - the relay's icon image URL
@@ -81,7 +76,6 @@ A relay is a nostr relay owned by a `tenant` and hosted by the attached zooid in
- `blossom_enabled` - whether blossom file storage is enabled - `blossom_enabled` - whether blossom file storage is enabled
- `livekit_enabled` - whether livekit calls are enabled - `livekit_enabled` - whether livekit calls are enabled
- `push_enabled` - whether relay push is enabled - `push_enabled` - whether relay push is enabled
- `synced` (private) - whether the relay has been successfully synced to zooid at least once. Used by infra to decide POST vs PUT.
Some attributes persisted to zooid via API have special handling: Some attributes persisted to zooid via API have special handling:
@@ -91,29 +85,3 @@ Some attributes persisted to zooid via API have special handling:
- The relay's `livekit_*` configuration is inferred based on environment variables and `livekit_enabled`. - The relay's `livekit_*` configuration is inferred based on environment variables and `livekit_enabled`.
- The relay's `roles` are hard-coded for now. - The relay's `roles` are hard-coded for now.
# Invoice
Invoices are generated at the end of a tenant's monthly billing period. The billing module is responsible for creating them, collecting them, and dunning them.
- `id` - random invoice ID
- `tenant` - tenant pubkey
- `status` - `pending|paid|closed`
- `amount` is derived as the sum of associated invoice item `sats` values (not stored as a separate source of truth)
- `created_at` - unix timestamp for when the invoice was created
- `attempted_at` - nullable unix timestamp for when collection was last attempted
- `error` - optional human-readable error from the last failed collection attempt
- `closed_at` - nullable unix timestamp for when the invoice was closed
- `sent_at` - nullable unix timestamp for when the invoice was sent via DM
- `paid_at` - nullable unix timestamp for when the invoice was paid
- `bolt11` - a BOLT 11 lightning invoice that can be used to pay the invoice
- `period_start` - unix timestamp for period start
- `period_end` - unix timestamp for period end
# Invoice Item
Invoice items are attached to an invoice and represent charges for a given relay.
- `id` - random invoice item ID
- `invoice` - invoice ID
- `relay` - relay ID
- `sats` - amount in satoshis
+4 -56
View File
@@ -10,7 +10,7 @@ Notes:
- All public write methods should be run in a transaction so they're atomic - All public write methods should be run in a transaction so they're atomic
- All writes should be accompanied by an activity log entry of `(tenant, activity_type, resource_type, resource_id)` - All writes should be accompanied by an activity log entry of `(tenant, activity_type, resource_type, resource_id)`
- Database table names are singular: `activity`, `tenant`, `relay`, `invoice`, `invoice_item` - Database table names are singular: `activity`, `tenant`, `relay`
## `pub fn new() -> Self` ## `pub fn new() -> Self`
@@ -38,15 +38,10 @@ Notes:
- Creates tenant, may throw sqlite uniqueness error on pubkey - Creates tenant, may throw sqlite uniqueness error on pubkey
- Logs activity as `(create_tenant, tenant_id)` - Logs activity as `(create_tenant, tenant_id)`
## `pub fn update_tenant_billing_anchor(&self, pubkey: &str, billing_anchor: i64) -> Result<()>` ## `pub fn update_tenant(&self, tenant: &Tenant) -> Result<()>`
- Updates the tenant's `billing_anchor` - Updates tenant
- Logs activity as `(update_tenant_billing_anchor, tenant_id)` - Logs activity as `(update_tenant, tenant_id)`
## `pub fn update_tenant_nwc_url(&self, pubkey: &str, nwc_url: &str) -> Result<()>`
- Updates tenant `nwc_url`
- Logs activity as `(update_tenant_nwc_url, tenant_id)`
## `pub fn list_plans() -> Vec<Plan>` ## `pub fn list_plans() -> Vec<Plan>`
@@ -101,59 +96,12 @@ Notes:
- Returns the maximum `created_at` value from the activity table, or 0 if empty - Returns the maximum `created_at` value from the activity table, or 0 if empty
- Used by infra to initialize the since guard on startup - Used by infra to initialize the since guard on startup
## `pub fn create_invoice(&self, invoice: &Invoice, invoice_items: [&InvoiceItem]) -> Result<()>`
- Saves an `invoice` row and related `invoice_item` rows
- Logs activity as `(create_invoice, invoice_id)`
## `pub fn list_invoices() -> Result<Vec<Invoice>>`
- Returns all invoices
## `pub fn list_invoices_for_tenant(tenant_id: &str) -> Result<Vec<Invoice>>`
- Returns all matching invoices
## `pub fn mark_invoice_paid(&self, invoice_id: &str) -> Result<()>`
- Sets invoice status to `paid`
- Sets `paid_at` to now
- Clears `error` if set
- Logs activity as `(mark_invoice_paid, invoice_id)`
## `pub fn mark_invoice_attempted(&self, invoice_id: &str, error: &str) -> Result<()>`
- Sets `attempted_at` to now
- Updates `error` if provided
- Leaves status as `pending`
- Logs activity as `(mark_invoice_attempted, invoice_id)`
## `pub fn mark_invoice_sent(&self, invoice_id: &str) -> Result<()>`
- Sets `sent_at` to now
- Leaves status as `pending`
- Logs activity as `(mark_invoice_sent, invoice_id)`
## `pub fn mark_invoice_closed(&self, invoice_id: &str) -> Result<()>`
- Sets invoice status to `closed`
- Sets `closed_at` to now
- Logs activity as `(mark_invoice_closed, invoice_id)`
## `pub fn list_activity(&self, since: &i64) -> Result<Vec<Activity>>` ## `pub fn list_activity(&self, since: &i64) -> Result<Vec<Activity>>`
- Returns all activity occuring after `since` - Returns all activity occuring after `since`
## `pub fn list_activity_for_tenant(&self, tenant: &str, since: &i64) -> Result<Vec<Activity>>`
- Returns all activity occuring after `since` matching `tenant`
## `pub fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>>` ## `pub fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>>`
- Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id` - Returns all activity where `resource_type = 'relay'` and `resource_id = relay_id`
- Ordered newest-first - Ordered newest-first
## `pub fn get_relay_plan_sats(&self, plan: &str) -> Result<i64>`
- Returns the monthly sats amount for a given plan id
- Uses `list_plans()` data for consistent pricing logic across API and billing
+64 -435
View File
@@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use anyhow::{Result, anyhow}; use anyhow::anyhow;
use axum::{ use axum::{
Json, Router, Json, Router,
extract::{Path, State}, extract::{Path, State},
@@ -11,18 +11,17 @@ use axum::{
use base64::Engine; use base64::Engine;
use nostr_sdk::{Event, JsonUtil, Kind}; use nostr_sdk::{Event, JsonUtil, Kind};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tower_http::cors::{AllowOrigin, CorsLayer};
use crate::billing::Billing;
use crate::models::{Relay, Tenant}; use crate::models::{Relay, Tenant};
use crate::repo::Repo; use crate::repo::Repo;
#[derive(Clone)] #[derive(Clone)]
pub struct Api { pub struct Api {
host: String, host: String,
port: u16,
admins: Vec<String>, admins: Vec<String>,
origins: Vec<String>,
repo: Repo, repo: Repo,
billing: Billing,
} }
#[derive(Clone)] #[derive(Clone)]
@@ -58,45 +57,25 @@ impl IntoResponse for ApiError {
} }
impl Api { impl Api {
pub fn new(repo: Repo) -> Self { pub fn new(repo: Repo, billing: Billing) -> Self {
let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let port = std::env::var("PORT")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(3000);
let admins = std::env::var("ADMINS") let admins = std::env::var("ADMINS")
.unwrap_or_default() .unwrap_or_default()
.split(',') .split(',')
.map(|v| v.trim().to_lowercase()) .map(|v| v.trim().to_lowercase())
.filter(|v| !v.is_empty()) .filter(|v| !v.is_empty())
.collect(); .collect();
let origins = std::env::var("ALLOW_ORIGINS")
.unwrap_or_default()
.split(',')
.map(|v| v.trim().to_string())
.filter(|v| !v.is_empty())
.collect();
Self { Self {
host, host,
port,
admins, admins,
origins,
repo, repo,
billing,
} }
} }
pub async fn serve(&self) -> Result<()> { pub fn router(self) -> Router {
let app = self.router();
let listener =
tokio::net::TcpListener::bind(format!("{}:{}", self.host, self.port)).await?;
axum::serve(listener, app).await?;
Ok(())
}
fn router(&self) -> Router {
let state = AppState { let state = AppState {
api: Arc::new(self.clone()), api: Arc::new(self),
}; };
Router::new() Router::new()
@@ -104,31 +83,14 @@ impl Api {
.route("/plans", get(list_plans)) .route("/plans", get(list_plans))
.route("/plans/:id", get(get_plan)) .route("/plans/:id", get(get_plan))
.route("/tenants", get(list_tenants)) .route("/tenants", get(list_tenants))
.route("/tenants/:pubkey", get(get_tenant)) .route("/tenants/:pubkey", get(get_tenant).put(update_tenant))
.route("/tenants/:pubkey/relays", get(list_tenant_relays)) .route("/tenants/:pubkey/relays", get(list_tenant_relays))
.route("/tenants/:pubkey/invoices", get(list_tenant_invoices))
.route("/tenants/:pubkey/billing", put(update_tenant_billing))
.route("/relays", get(list_relays).post(create_relay)) .route("/relays", get(list_relays).post(create_relay))
.route("/relays/:id", get(get_relay).put(update_relay)) .route("/relays/:id", get(get_relay).put(update_relay))
.route("/relays/:id/activity", get(list_relay_activity)) .route("/relays/:id/activity", get(list_relay_activity))
.route("/relays/:id/deactivate", post(deactivate_relay)) .route("/relays/:id/deactivate", post(deactivate_relay))
.route("/invoices", get(list_invoices)) .route("/relays/:id/reactivate", post(reactivate_relay))
.route("/invoices/:id", get(get_invoice))
.with_state(state) .with_state(state)
.layer(self.cors_layer())
}
fn cors_layer(&self) -> CorsLayer {
if self.origins.is_empty() {
CorsLayer::permissive()
} else {
let origins = self
.origins
.iter()
.filter_map(|o| o.parse::<axum::http::HeaderValue>().ok())
.collect::<Vec<_>>();
CorsLayer::new().allow_origin(AllowOrigin::list(origins))
}
} }
fn extract_auth_pubkey(&self, headers: &HeaderMap) -> std::result::Result<String, ApiError> { fn extract_auth_pubkey(&self, headers: &HeaderMap) -> std::result::Result<String, ApiError> {
@@ -289,9 +251,9 @@ fn map_unique_error(err: &anyhow::Error) -> Option<&'static str> {
None None
} }
#[derive(Deserialize, Serialize)] #[derive(Deserialize)]
struct UpdateTenantBillingRequest { struct UpdateTenantRequest {
nwc_url: String, nwc_url: Option<String>,
} }
#[derive(Serialize)] #[derive(Serialize)]
@@ -364,7 +326,6 @@ async fn get_identity(
pubkey: pubkey.clone(), pubkey: pubkey.clone(),
nwc_url: String::new(), nwc_url: String::new(),
created_at: now_ts(), created_at: now_ts(),
billing_anchor: now_ts(),
}; };
match state.api.repo.create_tenant(&tenant).await { match state.api.repo.create_tenant(&tenant).await {
@@ -489,7 +450,7 @@ async fn list_relay_activity(
state.api.require_admin_or_tenant(&auth, &relay.tenant)?; state.api.require_admin_or_tenant(&auth, &relay.tenant)?;
match state.api.repo.list_activity_for_relay(&id).await { match state.api.repo.list_activity_for_relay(&id).await {
Ok(activity) => Ok(ok(StatusCode::OK, activity)), Ok(activity) => Ok(ok(StatusCode::OK, serde_json::json!({ "activity": activity }))),
Err(e) => Ok(err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string())), Err(e) => Ok(err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string())),
} }
} }
@@ -679,7 +640,15 @@ async fn deactivate_relay(
state.api.require_admin_or_tenant(&auth, &relay.tenant)?; state.api.require_admin_or_tenant(&auth, &relay.tenant)?;
match state.api.repo.deactivate_relay(&relay).await { if relay.status == "inactive" {
return Ok(err(
StatusCode::BAD_REQUEST,
"relay-is-inactive",
"relay is already inactive",
));
}
match state.api.billing.deactivate_relay(&id).await {
Ok(()) => Ok(ok(StatusCode::OK, ())), Ok(()) => Ok(ok(StatusCode::OK, ())),
Err(e) => Ok(err( Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
@@ -689,51 +658,16 @@ async fn deactivate_relay(
} }
} }
async fn list_invoices( async fn reactivate_relay(
State(state): State<AppState>,
headers: HeaderMap,
) -> std::result::Result<Response, ApiError> {
let pubkey = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin(&pubkey)?;
match state.api.repo.list_invoices_with_items().await {
Ok(invoices) => Ok(ok(StatusCode::OK, invoices)),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
"internal",
&e.to_string(),
)),
}
}
async fn list_tenant_invoices(
State(state): State<AppState>,
headers: HeaderMap,
Path(pubkey): Path<String>,
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin_or_tenant(&auth, &pubkey)?;
match state.api.repo.list_invoices_for_tenant_with_items(&pubkey).await {
Ok(invoices) => Ok(ok(StatusCode::OK, invoices)),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
"internal",
&e.to_string(),
)),
}
}
async fn get_invoice(
State(state): State<AppState>, State(state): State<AppState>,
headers: HeaderMap, headers: HeaderMap,
Path(id): Path<String>, Path(id): Path<String>,
) -> std::result::Result<Response, ApiError> { ) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?; let auth = state.api.extract_auth_pubkey(&headers)?;
let invoice = match state.api.repo.get_invoice_with_items(&id).await { let relay = match state.api.repo.get_relay(&id).await {
Ok(Some(i)) => i, Ok(Some(r)) => r,
Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "invoice not found")), Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")),
Err(e) => { Err(e) => {
return Ok(err( return Ok(err(
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
@@ -743,27 +677,18 @@ async fn get_invoice(
} }
}; };
state.api.require_admin_or_tenant(&auth, &invoice.invoice.tenant)?; state.api.require_admin_or_tenant(&auth, &relay.tenant)?;
Ok(ok(StatusCode::OK, invoice)) if relay.status == "active" {
} return Ok(err(
StatusCode::BAD_REQUEST,
"relay-is-active",
"relay is already active",
));
}
async fn update_tenant_billing( match state.api.billing.reactivate_relay(&id).await {
State(state): State<AppState>, Ok(()) => Ok(ok(StatusCode::OK, ())),
headers: HeaderMap,
Path(pubkey): Path<String>,
Json(payload): Json<UpdateTenantBillingRequest>,
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin_or_tenant(&auth, &pubkey)?;
match state
.api
.repo
.update_tenant_nwc_url(&pubkey, &payload.nwc_url)
.await
{
Ok(()) => Ok(ok(StatusCode::OK, payload)),
Err(e) => Ok(err( Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
"internal", "internal",
@@ -772,334 +697,38 @@ async fn update_tenant_billing(
} }
} }
#[cfg(test)] async fn update_tenant(
mod tests { State(state): State<AppState>,
use std::str::FromStr; headers: HeaderMap,
Path(pubkey): Path<String>,
Json(payload): Json<UpdateTenantRequest>,
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin_or_tenant(&auth, &pubkey)?;
use axum::{ let mut tenant = match state.api.repo.get_tenant(&pubkey).await {
body::{Body, to_bytes}, Ok(Some(t)) => t,
http::{Request, StatusCode, header}, Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "tenant not found")),
Err(e) => {
return Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
"internal",
&e.to_string(),
));
}
}; };
use base64::Engine;
use nostr_sdk::{EventBuilder, JsonUtil, Keys, Kind, Tag};
use serde_json::{Value, json};
use sqlx::{SqlitePool, sqlite::{SqliteConnectOptions, SqlitePoolOptions}};
use tower::util::ServiceExt;
use crate::{models::{Relay, Tenant}, repo::Repo}; if let Some(nwc_url) = payload.nwc_url {
tenant.nwc_url = nwc_url;
use super::Api;
async fn test_repo() -> Repo {
let db_file = std::env::temp_dir().join(format!("caravel-api-test-{}.db", uuid::Uuid::new_v4()));
let database_url = format!("sqlite://{}", db_file.display());
let options = SqliteConnectOptions::from_str(&database_url)
.expect("sqlite options")
.create_if_missing(true);
let pool: SqlitePool = SqlitePoolOptions::new()
.max_connections(1)
.connect_with(options)
.await
.expect("connect sqlite");
sqlx::query("PRAGMA journal_mode = WAL;")
.execute(&pool)
.await
.expect("set WAL");
sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("run migrations");
Repo { pool }
} }
fn keys() -> (Keys, Keys, Keys) { match state.api.repo.update_tenant(&tenant).await {
(Keys::generate(), Keys::generate(), Keys::generate()) Ok(()) => Ok(ok(StatusCode::OK, tenant)),
} Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
fn pubkey_hex(keys: &Keys) -> String { "internal",
keys.public_key().to_hex() &e.to_string(),
} )),
fn auth_header(keys: &Keys, u: &str) -> String {
let tag = Tag::parse(["u", u]).expect("u tag");
let event = EventBuilder::new(Kind::HttpAuth, "").tags([tag])
.sign_with_keys(keys)
.expect("sign nip98 event");
let json = event.as_json();
let b64 = base64::engine::general_purpose::STANDARD.encode(json);
format!("Nostr {b64}")
}
fn make_api(repo: Repo, admin_pubkey: String) -> Api {
Api {
host: "api.test".to_string(),
port: 0,
admins: vec![admin_pubkey],
origins: vec![],
repo,
}
}
async fn request(
api: &Api,
method: &str,
path: &str,
auth: Option<String>,
body: Option<Value>,
) -> (StatusCode, Value) {
let mut builder = Request::builder().method(method).uri(path);
if let Some(auth) = auth {
builder = builder.header(header::AUTHORIZATION, auth);
}
let req = if let Some(body) = body {
builder
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(body.to_string()))
.expect("request")
} else {
builder.body(Body::empty()).expect("request")
};
let resp = api.router().oneshot(req).await.expect("router response");
let status = resp.status();
let bytes = to_bytes(resp.into_body(), usize::MAX)
.await
.expect("read body");
let payload: Value = serde_json::from_slice(&bytes).expect("json body");
(status, payload)
}
async fn create_tenant(repo: &Repo, pubkey: String) {
let tenant = Tenant {
pubkey,
nwc_url: String::new(),
created_at: 1,
billing_anchor: 1,
};
repo.create_tenant(&tenant).await.expect("create tenant");
}
async fn create_relay(repo: &Repo, id: &str, tenant: &str, subdomain: &str) {
let relay = Relay {
id: id.to_string(),
tenant: tenant.to_string(),
schema: format!("{}_{}", subdomain.replace('-', "_"), id),
subdomain: subdomain.to_string(),
plan: "free".to_string(),
status: "new".to_string(),
sync_error: String::new(),
info_name: String::new(),
info_icon: String::new(),
info_description: String::new(),
policy_public_join: 0,
policy_strip_signatures: 0,
groups_enabled: 1,
management_enabled: 1,
blossom_enabled: 0,
livekit_enabled: 0,
push_enabled: 1,
synced: 0,
};
repo.create_relay(&relay).await.expect("create relay");
}
#[tokio::test]
async fn missing_auth_returns_unauthorized() {
let repo = test_repo().await;
let (admin_keys, _, _) = keys();
let api = make_api(repo, pubkey_hex(&admin_keys));
let (status, body) = request(&api, "GET", "/plans", None, None).await;
assert_eq!(status, StatusCode::UNAUTHORIZED);
assert_eq!(body["code"], "unauthorized");
}
#[tokio::test]
async fn plans_endpoints_return_ok_and_not_found() {
let repo = test_repo().await;
let (admin_keys, _, _) = keys();
let api = make_api(repo, pubkey_hex(&admin_keys));
let auth = auth_header(&admin_keys, "https://api.test");
let (status, body) = request(&api, "GET", "/plans", Some(auth.clone()), None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["code"], "ok");
assert!(body["data"].as_array().expect("plans array").len() >= 3);
let (status, body) = request(&api, "GET", "/plans/does-not-exist", Some(auth), None).await;
assert_eq!(status, StatusCode::NOT_FOUND);
assert_eq!(body["code"], "not-found");
}
#[tokio::test]
async fn tenants_list_is_admin_only() {
let repo = test_repo().await;
let (admin_keys, tenant_keys, _) = keys();
let api = make_api(repo, pubkey_hex(&admin_keys));
let auth = auth_header(&tenant_keys, "https://api.test");
let (status, body) = request(&api, "GET", "/tenants", Some(auth), None).await;
assert_eq!(status, StatusCode::FORBIDDEN);
assert_eq!(body["code"], "forbidden");
}
#[tokio::test]
async fn identity_creates_tenant_if_missing() {
let repo = test_repo().await;
let (admin_keys, tenant_keys, _) = keys();
let api = make_api(repo.clone(), pubkey_hex(&admin_keys));
let auth = auth_header(&tenant_keys, "https://api.test");
let tenant_pubkey = pubkey_hex(&tenant_keys);
let (status, body) = request(&api, "GET", "/identity", Some(auth), None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["code"], "ok");
assert_eq!(body["data"]["pubkey"], tenant_pubkey);
assert_eq!(body["data"]["is_admin"], false);
let tenant = repo
.get_tenant(&tenant_pubkey)
.await
.expect("lookup tenant after identity");
assert!(tenant.is_some());
}
#[tokio::test]
async fn tenant_get_allows_owner_and_denies_other_tenant() {
let repo = test_repo().await;
let (admin_keys, tenant_a_keys, tenant_b_keys) = keys();
create_tenant(&repo, pubkey_hex(&tenant_a_keys)).await;
let api = make_api(repo, pubkey_hex(&admin_keys));
let owner_auth = auth_header(&tenant_a_keys, "https://api.test");
let (status, body) = request(
&api,
"GET",
&format!("/tenants/{}", pubkey_hex(&tenant_a_keys)),
Some(owner_auth),
None,
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["code"], "ok");
let other_auth = auth_header(&tenant_b_keys, "https://api.test");
let (status, body) = request(
&api,
"GET",
&format!("/tenants/{}", pubkey_hex(&tenant_a_keys)),
Some(other_auth),
None,
)
.await;
assert_eq!(status, StatusCode::FORBIDDEN);
assert_eq!(body["code"], "forbidden");
}
#[tokio::test]
async fn create_relay_validates_premium_feature_and_subdomain() {
let repo = test_repo().await;
let (admin_keys, tenant_keys, _) = keys();
create_tenant(&repo, pubkey_hex(&tenant_keys)).await;
let api = make_api(repo, pubkey_hex(&admin_keys));
let auth = auth_header(&tenant_keys, "https://api.test");
let (status, body) = request(
&api,
"POST",
"/relays",
Some(auth.clone()),
Some(json!({
"tenant": pubkey_hex(&tenant_keys),
"subdomain": "bad_subdomain",
"plan": "free"
})),
)
.await;
assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY);
assert_eq!(body["code"], "invalid-relay");
let (status, body) = request(
&api,
"POST",
"/relays",
Some(auth),
Some(json!({
"tenant": pubkey_hex(&tenant_keys),
"subdomain": "good-subdomain",
"plan": "free",
"blossom_enabled": 1
})),
)
.await;
assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY);
assert_eq!(body["code"], "premium-feature");
}
#[tokio::test]
async fn relay_get_honors_owner_and_duplicate_subdomain_is_422() {
let repo = test_repo().await;
let (admin_keys, tenant_a_keys, tenant_b_keys) = keys();
let tenant_a = pubkey_hex(&tenant_a_keys);
create_tenant(&repo, tenant_a.clone()).await;
create_tenant(&repo, pubkey_hex(&tenant_b_keys)).await;
create_relay(&repo, "relay-a", &tenant_a, "alpha").await;
let api = make_api(repo.clone(), pubkey_hex(&admin_keys));
let owner_auth = auth_header(&tenant_a_keys, "https://api.test");
let (status, body) = request(&api, "GET", "/relays/relay-a", Some(owner_auth), None).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["code"], "ok");
let other_auth = auth_header(&tenant_b_keys, "https://api.test");
let (status, body) = request(&api, "GET", "/relays/relay-a", Some(other_auth), None).await;
assert_eq!(status, StatusCode::FORBIDDEN);
assert_eq!(body["code"], "forbidden");
let (status, body) = request(
&api,
"POST",
"/relays",
Some(auth_header(&tenant_a_keys, "https://api.test")),
Some(json!({
"tenant": tenant_a,
"subdomain": "alpha",
"plan": "free"
})),
)
.await;
assert_eq!(status, StatusCode::UNPROCESSABLE_ENTITY);
assert_eq!(body["code"], "subdomain-exists");
}
#[tokio::test]
async fn update_tenant_billing_allows_self() {
let repo = test_repo().await;
let (admin_keys, tenant_keys, _) = keys();
let tenant = pubkey_hex(&tenant_keys);
create_tenant(&repo, tenant.clone()).await;
let api = make_api(repo.clone(), pubkey_hex(&admin_keys));
let auth = auth_header(&tenant_keys, "https://api.test");
let (status, body) = request(
&api,
"PUT",
&format!("/tenants/{tenant}/billing"),
Some(auth),
Some(json!({"nwc_url":"nostr+walletconnect://example"})),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["code"], "ok");
assert_eq!(body["data"]["nwc_url"], "nostr+walletconnect://example");
} }
} }
+12 -378
View File
@@ -1,10 +1,5 @@
use std::collections::HashMap;
use anyhow::Result; use anyhow::Result;
use chrono::{DateTime, Datelike, Duration, Months, TimeZone, Utc};
use tokio::sync::Mutex;
use crate::models::{Activity, Invoice, InvoiceItem, Relay, Tenant};
use crate::repo::Repo; use crate::repo::Repo;
use crate::robot::Robot; use crate::robot::Robot;
@@ -13,7 +8,6 @@ pub struct Billing {
nwc_url: String, nwc_url: String,
repo: Repo, repo: Repo,
robot: Robot, robot: Robot,
last_activity_at: std::sync::Arc<Mutex<i64>>,
} }
impl Billing { impl Billing {
@@ -23,384 +17,24 @@ impl Billing {
nwc_url, nwc_url,
repo, repo,
robot, robot,
last_activity_at: std::sync::Arc::new(Mutex::new(0)),
} }
} }
pub async fn start(self) { pub async fn deactivate_relay(&self, relay_id: &str) -> Result<()> {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600)); let relay = self
loop {
interval.tick().await;
if let Err(e) = self.tick().await {
tracing::error!(error = %e, "billing tick failed");
}
}
}
pub async fn tick(&self) -> Result<()> {
let mut since_guard = self.last_activity_at.lock().await;
let since = *since_guard;
let activity = self.repo.list_activity(&since).await?;
for a in &activity {
if matches!(
a.activity_type.as_str(),
"create_relay" | "update_relay" | "activate_relay"
) {
self.maybe_reset_anchor_for_first_paid_relay(a).await?;
}
*since_guard = (*since_guard).max(a.created_at);
}
drop(since_guard);
let tenants = self.repo.list_tenants().await?;
for tenant in &tenants {
self.generate_invoice_if_due(tenant).await?;
self.collect_outstanding(tenant).await?;
}
Ok(())
}
async fn maybe_reset_anchor_for_first_paid_relay(&self, activity: &Activity) -> Result<()> {
let relay = match self.repo.get_relay(&activity.resource_id).await? {
Some(r) => r,
None => return Ok(()),
};
if relay.plan == "free" {
return Ok(());
}
let relays = self.repo.list_relays_for_tenant(&relay.tenant).await?;
let paid_active_count = relays
.into_iter()
.filter(|r| r.status == "active" && r.plan != "free")
.count() as i64;
if paid_active_count == 1 {
self.repo
.update_tenant_billing_anchor(&relay.tenant, activity.created_at)
.await?;
}
Ok(())
}
async fn generate_invoice_if_due(&self, tenant: &Tenant) -> Result<()> {
if self
.repo .repo
.total_pending_invoices_for_tenant(&tenant.pubkey) .get_relay(relay_id)
.await? .await?
> 0 .ok_or_else(|| anyhow::anyhow!("relay not found"))?;
{ self.repo.deactivate_relay(&relay).await
return Ok(()); }
}
let relays = self.repo.list_relays_for_tenant(&tenant.pubkey).await?; pub async fn reactivate_relay(&self, relay_id: &str) -> Result<()> {
let active_paid_relays: Vec<Relay> = relays let relay = self
.iter()
.filter(|r| r.status == "active" && r.plan != "free")
.cloned()
.collect();
if active_paid_relays.is_empty() {
return Ok(());
}
let now = Utc::now();
let anchor = ts_to_dt(tenant.billing_anchor)?;
let (period_start, period_end) = billing_window(anchor, now);
if now < period_end {
return Ok(());
}
let usage_events = self
.repo .repo
.list_activity_for_tenant(&tenant.pubkey, &tenant.billing_anchor) .get_relay(relay_id)
.await?; .await?
let invoice_id = uuid::Uuid::new_v4().to_string(); .ok_or_else(|| anyhow::anyhow!("relay not found"))?;
let mut items = Vec::new(); self.repo.activate_relay(&relay).await
for relay in active_paid_relays {
let hours =
relay_active_hours_in_window(&relay, &usage_events, period_start, period_end);
if hours <= 0 {
continue;
}
let plan_monthly = self.repo.get_relay_plan_sats(&relay.plan).await?;
if plan_monthly <= 0 {
continue;
}
let sats = ((plan_monthly as f64 / 30.0 / 24.0) * hours as f64).ceil() as i64;
if sats <= 0 {
continue;
}
items.push(InvoiceItem {
id: uuid::Uuid::new_v4().to_string(),
invoice: invoice_id.clone(),
relay: relay.id,
sats,
});
}
let total: i64 = items.iter().map(|i| i.sats).sum();
if total == 0 {
return Ok(());
}
let bolt11 = match self.make_bolt11(total).await {
Ok(v) => v,
Err(e) => {
tracing::error!(tenant = %tenant.pubkey, error = %e, "bolt11 generation failed");
return Ok(());
}
};
let invoice = Invoice {
id: invoice_id,
tenant: tenant.pubkey.clone(),
status: "pending".to_string(),
created_at: now.timestamp(),
attempted_at: 0,
error: String::new(),
closed_at: 0,
sent_at: 0,
paid_at: 0,
bolt11,
period_start: period_start.timestamp(),
period_end: period_end.timestamp(),
};
self.repo.create_invoice(&invoice, &items).await?;
Ok(())
}
async fn collect_outstanding(&self, tenant: &Tenant) -> Result<()> {
let invoices = self.repo.list_invoices_for_tenant(&tenant.pubkey).await?;
let now = now_ts();
for invoice in invoices.into_iter().filter(|i| i.status == "pending") {
if invoice.attempted_at > 0 && now - invoice.attempted_at < 24 * 3600 {
continue;
}
if self.is_bolt11_paid(&invoice.bolt11).await {
self.repo.mark_invoice_paid(&invoice.id).await?;
continue;
}
let mut collected = false;
if !tenant.nwc_url.trim().is_empty()
&& self.pay_invoice_nwc(&tenant.nwc_url, &invoice.bolt11).await
{
self.repo.mark_invoice_paid(&invoice.id).await?;
collected = true;
}
if !collected {
self.repo
.mark_invoice_attempted(&invoice.id, Some("autopay failed or unavailable"))
.await?;
if invoice.sent_at == 0 {
let amount: i64 = self
.repo
.get_invoice_items(&invoice.id)
.await?
.into_iter()
.map(|i| i.sats)
.sum();
let message = format!(
"Invoice {} is due. Amount: {} sats\n{}",
invoice.id, amount, invoice.bolt11
);
if self.robot.send_dm(&tenant.pubkey, &message).await.is_ok() {
self.repo.mark_invoice_sent(&invoice.id).await?;
}
}
}
if now - invoice.created_at >= 7 * 24 * 3600 {
self.repo.mark_invoice_closed(&invoice.id).await?;
}
}
Ok(())
}
async fn make_bolt11(&self, sats: i64) -> Result<String> {
if self.nwc_url.trim().is_empty() {
anyhow::bail!("NWC_URL not configured")
}
let uri = nostr_sdk::nips::nip47::NostrWalletConnectURI::parse(&self.nwc_url)?;
let req = nostr_sdk::nips::nip47::Request::make_invoice(
nostr_sdk::nips::nip47::MakeInvoiceRequest {
amount: (sats as u64) * 1_000,
description: Some("Caravel relay invoice".to_string()),
description_hash: None,
expiry: None,
},
);
let resp = self.send_nwc_request(&uri, req).await?;
Ok(resp.to_make_invoice()?.invoice)
}
async fn is_bolt11_paid(&self, _bolt11: &str) -> bool {
false
}
async fn pay_invoice_nwc(&self, nwc_url: &str, bolt11: &str) -> bool {
let uri = match nostr_sdk::nips::nip47::NostrWalletConnectURI::parse(nwc_url) {
Ok(v) => v,
Err(_) => return false,
};
let req = nostr_sdk::nips::nip47::Request::pay_invoice(
nostr_sdk::nips::nip47::PayInvoiceRequest::new(bolt11),
);
self.send_nwc_request(&uri, req)
.await
.and_then(|r| r.to_pay_invoice().map(|_| ()).map_err(anyhow::Error::from))
.is_ok()
}
async fn send_nwc_request(
&self,
uri: &nostr_sdk::nips::nip47::NostrWalletConnectURI,
request: nostr_sdk::nips::nip47::Request,
) -> Result<nostr_sdk::nips::nip47::Response> {
use nostr_sdk::{Client, Filter, Keys, Kind, Timestamp};
let app_keys = Keys::new(uri.secret.clone());
let app_pubkey = app_keys.public_key();
let client = Client::new(app_keys);
client.add_relay(uri.relay_url.clone()).await?;
client.connect().await;
let started_at = Timestamp::now();
let event = request.to_event(uri)?;
client.send_event(event).await?;
let filter = Filter::new()
.kind(Kind::WalletConnectResponse)
.author(uri.public_key)
.pubkey(app_pubkey)
.since(started_at);
let events = client
.fetch_events(filter, std::time::Duration::from_secs(10))
.await?;
let event = events
.into_iter()
.max_by_key(|e| e.created_at)
.ok_or_else(|| anyhow::anyhow!("no NWC response received"))?;
Ok(nostr_sdk::nips::nip47::Response::from_event(uri, &event)?)
} }
} }
fn now_ts() -> i64 {
Utc::now().timestamp()
}
fn ts_to_dt(ts: i64) -> Result<DateTime<Utc>> {
Utc.timestamp_opt(ts, 0)
.single()
.ok_or_else(|| anyhow::anyhow!("invalid unix timestamp"))
}
fn billing_window(anchor: DateTime<Utc>, now: DateTime<Utc>) -> (DateTime<Utc>, DateTime<Utc>) {
let mut start = anchor;
loop {
let end = start + Months::new(1);
if now < end {
return (start, end);
}
start = end;
}
}
fn relay_active_hours_in_window(
relay: &Relay,
events: &[Activity],
start: DateTime<Utc>,
end: DateTime<Utc>,
) -> i64 {
if relay.plan == "free" {
return 0;
}
let mut marks: HashMap<&str, Vec<&Activity>> = HashMap::new();
for event in events {
if event.resource_type == "relay" && event.resource_id == relay.id {
marks.entry(&relay.id).or_default().push(event);
}
}
let Some(entries) = marks.get(relay.id.as_str()) else {
if relay.status == "active" {
return ((end - start).num_seconds() as f64 / 3600.0).ceil() as i64;
}
return 0;
};
let mut active = relay.status == "active";
let mut cursor = start;
let mut secs = 0i64;
for event in entries.iter().copied() {
let ts = match Utc.timestamp_opt(event.created_at, 0).single() {
Some(v) => v,
None => continue,
};
if ts <= start || ts >= end {
continue;
}
match event.activity_type.as_str() {
"create_relay" | "activate_relay" => {
if !active {
active = true;
cursor = ts;
}
}
"deactivate_relay" | "fail_relay_sync" => {
if active {
active = false;
secs += (ts - cursor).num_seconds().max(0);
}
}
_ => {}
}
}
if active {
secs += (end - cursor).num_seconds().max(0);
}
let hours = (secs as f64 / 3600.0).ceil() as i64;
if hours > 0 { hours } else { 0 }
}
#[allow(dead_code)]
fn _same_month(a: DateTime<Utc>, b: DateTime<Utc>) -> bool {
a.year() == b.year() && a.month() == b.month()
}
#[allow(dead_code)]
fn _days_between(a: i64, b: i64) -> i64 {
let da = Utc.timestamp_opt(a, 0).single().unwrap_or_else(Utc::now);
let db = Utc.timestamp_opt(b, 0).single().unwrap_or_else(Utc::now);
(db - da).num_days()
}
#[allow(dead_code)]
fn _hours_between(a: DateTime<Utc>, b: DateTime<Utc>) -> i64 {
((b - a).num_seconds() as f64 / 3600.0).ceil() as i64
}
#[allow(dead_code)]
fn _next_day(dt: DateTime<Utc>) -> DateTime<Utc> {
dt + Duration::days(1)
}
+29 -5
View File
@@ -8,6 +8,8 @@ mod robot;
use anyhow::Result; use anyhow::Result;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use tower_http::cors::{AllowOrigin, CorsLayer};
use crate::api::Api; use crate::api::Api;
use crate::billing::Billing; use crate::billing::Billing;
use crate::infra::Infra; use crate::infra::Infra;
@@ -27,15 +29,37 @@ async fn main() -> Result<()> {
let robot = Robot::new().await?; let robot = Robot::new().await?;
let billing = Billing::new(repo.clone(), robot.clone()); let billing = Billing::new(repo.clone(), robot.clone());
let infra = Infra::new(repo.clone()); let infra = Infra::new(repo.clone());
let api = Api::new(repo); let api = Api::new(repo, billing.clone());
tokio::spawn(async move { let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
billing.start().await; let port: u16 = std::env::var("PORT")
}); .ok()
.and_then(|v| v.parse().ok())
.unwrap_or(3000);
let origins: Vec<String> = std::env::var("ALLOW_ORIGINS")
.unwrap_or_default()
.split(',')
.map(|v| v.trim().to_string())
.filter(|v| !v.is_empty())
.collect();
let cors = if origins.is_empty() {
CorsLayer::permissive()
} else {
let parsed = origins
.iter()
.filter_map(|o| o.parse::<axum::http::HeaderValue>().ok())
.collect::<Vec<_>>();
CorsLayer::new().allow_origin(AllowOrigin::list(parsed))
};
let app = api.router().layer(cors);
tokio::spawn(async move { tokio::spawn(async move {
infra.start().await; infra.start().await;
}); });
api.serve().await let listener = tokio::net::TcpListener::bind(format!("{host}:{port}")).await?;
axum::serve(listener, app).await?;
Ok(())
} }
-33
View File
@@ -26,7 +26,6 @@ pub struct Tenant {
#[serde(skip_serializing)] #[serde(skip_serializing)]
pub nwc_url: String, pub nwc_url: String,
pub created_at: i64, pub created_at: i64,
pub billing_anchor: i64,
} }
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
@@ -48,37 +47,5 @@ pub struct Relay {
pub blossom_enabled: i64, pub blossom_enabled: i64,
pub livekit_enabled: i64, pub livekit_enabled: i64,
pub push_enabled: i64, pub push_enabled: i64,
#[serde(skip_serializing)]
pub synced: i64, pub synced: i64,
} }
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Invoice {
pub id: String,
pub tenant: String,
pub status: String,
pub created_at: i64,
pub attempted_at: i64,
pub error: String,
pub closed_at: i64,
pub sent_at: i64,
pub paid_at: i64,
pub bolt11: String,
pub period_start: i64,
pub period_end: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct InvoiceItem {
pub id: String,
pub invoice: String,
pub relay: String,
pub sats: i64,
}
#[derive(Debug, Clone, Serialize)]
pub struct InvoiceWithItems {
#[serde(flatten)]
pub invoice: Invoice,
pub items: Vec<InvoiceItem>,
}
+10 -281
View File
@@ -7,7 +7,7 @@ use sqlx::{
sqlite::{SqliteConnectOptions, SqlitePoolOptions}, sqlite::{SqliteConnectOptions, SqlitePoolOptions},
}; };
use crate::models::{Activity, Invoice, InvoiceItem, InvoiceWithItems, Plan, Relay, Tenant}; use crate::models::{Activity, Plan, Relay, Tenant};
#[derive(Clone)] #[derive(Clone)]
pub struct Repo { pub struct Repo {
@@ -60,12 +60,6 @@ impl Repo {
.fetch_one(&mut **tx) .fetch_one(&mut **tx)
.await? .await?
} }
"invoice" => {
sqlx::query_scalar::<_, String>("SELECT tenant FROM invoice WHERE id = ?")
.bind(resource_id)
.fetch_one(&mut **tx)
.await?
}
_ => anyhow::bail!("unknown resource_type: {}", resource_type), _ => anyhow::bail!("unknown resource_type: {}", resource_type),
}; };
@@ -85,7 +79,7 @@ impl Repo {
pub async fn list_tenants(&self) -> Result<Vec<Tenant>> { pub async fn list_tenants(&self) -> Result<Vec<Tenant>> {
let rows = sqlx::query_as::<_, Tenant>( let rows = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, created_at, billing_anchor "SELECT pubkey, nwc_url, created_at
FROM tenant FROM tenant
ORDER BY pubkey", ORDER BY pubkey",
) )
@@ -96,7 +90,7 @@ impl Repo {
pub async fn get_tenant(&self, pubkey: &str) -> Result<Option<Tenant>> { pub async fn get_tenant(&self, pubkey: &str) -> Result<Option<Tenant>> {
let row = sqlx::query_as::<_, Tenant>( let row = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, created_at, billing_anchor "SELECT pubkey, nwc_url, created_at
FROM tenant FROM tenant
WHERE pubkey = ?", WHERE pubkey = ?",
) )
@@ -110,13 +104,12 @@ impl Repo {
let mut tx = self.pool.begin().await?; let mut tx = self.pool.begin().await?;
sqlx::query( sqlx::query(
"INSERT INTO tenant (pubkey, nwc_url, created_at, billing_anchor) "INSERT INTO tenant (pubkey, nwc_url, created_at)
VALUES (?, ?, ?, ?)", VALUES (?, ?, ?)",
) )
.bind(&tenant.pubkey) .bind(&tenant.pubkey)
.bind(&tenant.nwc_url) .bind(&tenant.nwc_url)
.bind(tenant.created_at) .bind(tenant.created_at)
.bind(tenant.billing_anchor)
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
@@ -126,35 +119,16 @@ impl Repo {
Ok(()) Ok(())
} }
pub async fn update_tenant_billing_anchor( pub async fn update_tenant(&self, tenant: &Tenant) -> Result<()> {
&self,
pubkey: &str,
billing_anchor: i64,
) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE tenant SET billing_anchor = ? WHERE pubkey = ?")
.bind(billing_anchor)
.bind(pubkey)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "update_tenant_billing_anchor", "tenant", pubkey).await?;
tx.commit().await?;
Ok(())
}
pub async fn update_tenant_nwc_url(&self, pubkey: &str, nwc_url: &str) -> Result<()> {
let mut tx = self.pool.begin().await?; let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?") sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
.bind(nwc_url) .bind(&tenant.nwc_url)
.bind(pubkey) .bind(&tenant.pubkey)
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
Self::insert_activity(&mut tx, "update_tenant_nwc_url", "tenant", pubkey).await?; Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?;
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
@@ -313,172 +287,10 @@ impl Repo {
} }
pub async fn mark_relay_synced(&self, relay_id: &str) -> Result<()> { pub async fn mark_relay_synced(&self, relay_id: &str) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?") sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?")
.bind(relay_id) .bind(relay_id)
.execute(&mut *tx) .execute(&self.pool)
.await?; .await?;
Self::insert_activity(&mut tx, "mark_relay_synced", "relay", relay_id).await?;
tx.commit().await?;
Ok(())
}
pub async fn create_invoice(
&self,
invoice: &Invoice,
invoice_items: &[InvoiceItem],
) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"INSERT INTO invoice (
id, tenant, status, created_at, attempted_at, error, closed_at,
sent_at, paid_at, bolt11, period_start, period_end
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&invoice.id)
.bind(&invoice.tenant)
.bind(&invoice.status)
.bind(invoice.created_at)
.bind(invoice.attempted_at)
.bind(&invoice.error)
.bind(invoice.closed_at)
.bind(invoice.sent_at)
.bind(invoice.paid_at)
.bind(&invoice.bolt11)
.bind(invoice.period_start)
.bind(invoice.period_end)
.execute(&mut *tx)
.await?;
for item in invoice_items {
sqlx::query("INSERT INTO invoice_item (id, invoice, relay, sats) VALUES (?, ?, ?, ?)")
.bind(&item.id)
.bind(&item.invoice)
.bind(&item.relay)
.bind(item.sats)
.execute(&mut *tx)
.await?;
}
Self::insert_activity(&mut tx, "create_invoice", "invoice", &invoice.id).await?;
tx.commit().await?;
Ok(())
}
pub async fn list_invoices(&self) -> Result<Vec<Invoice>> {
let rows = sqlx::query_as::<_, Invoice>(
"SELECT id, tenant, status, created_at, attempted_at, error, closed_at,
sent_at, paid_at, bolt11, period_start, period_end
FROM invoice
ORDER BY created_at DESC",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_invoices_for_tenant(&self, tenant_id: &str) -> Result<Vec<Invoice>> {
let rows = sqlx::query_as::<_, Invoice>(
"SELECT id, tenant, status, created_at, attempted_at, error, closed_at,
sent_at, paid_at, bolt11, period_start, period_end
FROM invoice
WHERE tenant = ?
ORDER BY created_at DESC",
)
.bind(tenant_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_invoice(&self, id: &str) -> Result<Option<Invoice>> {
let row = sqlx::query_as::<_, Invoice>(
"SELECT id, tenant, status, created_at, attempted_at, error, closed_at,
sent_at, paid_at, bolt11, period_start, period_end
FROM invoice
WHERE id = ?",
)
.bind(id)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
pub async fn mark_invoice_paid(&self, invoice_id: &str) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"UPDATE invoice
SET status = 'paid', paid_at = strftime('%s','now'), error = ''
WHERE id = ?",
)
.bind(invoice_id)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "mark_invoice_paid", "invoice", invoice_id).await?;
tx.commit().await?;
Ok(())
}
pub async fn mark_invoice_attempted(
&self,
invoice_id: &str,
error: Option<&str>,
) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"UPDATE invoice
SET attempted_at = strftime('%s','now'), error = COALESCE(?, error)
WHERE id = ?",
)
.bind(error)
.bind(invoice_id)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "mark_invoice_attempted", "invoice", invoice_id).await?;
tx.commit().await?;
Ok(())
}
pub async fn mark_invoice_sent(&self, invoice_id: &str) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE invoice SET sent_at = strftime('%s','now') WHERE id = ?")
.bind(invoice_id)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "mark_invoice_sent", "invoice", invoice_id).await?;
tx.commit().await?;
Ok(())
}
pub async fn mark_invoice_closed(&self, invoice_id: &str) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"UPDATE invoice
SET status = 'closed', closed_at = strftime('%s','now')
WHERE id = ?",
)
.bind(invoice_id)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "mark_invoice_closed", "invoice", invoice_id).await?;
tx.commit().await?;
Ok(()) Ok(())
} }
@@ -504,24 +316,6 @@ impl Repo {
Ok(rows) Ok(rows)
} }
pub async fn list_activity_for_tenant(
&self,
tenant: &str,
since: &i64,
) -> Result<Vec<Activity>> {
let rows = sqlx::query_as::<_, Activity>(
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
FROM activity
WHERE created_at > ? AND tenant = ?
ORDER BY created_at, id",
)
.bind(since)
.bind(tenant)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>> { pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>> {
let rows = sqlx::query_as::<_, Activity>( let rows = sqlx::query_as::<_, Activity>(
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id "SELECT id, tenant, created_at, activity_type, resource_type, resource_id
@@ -535,71 +329,6 @@ impl Repo {
Ok(rows) Ok(rows)
} }
pub async fn list_invoices_with_items(&self) -> Result<Vec<InvoiceWithItems>> {
let invoices = self.list_invoices().await?;
let mut result = Vec::with_capacity(invoices.len());
for invoice in invoices {
let items = self.get_invoice_items(&invoice.id).await?;
result.push(InvoiceWithItems { invoice, items });
}
Ok(result)
}
pub async fn list_invoices_for_tenant_with_items(
&self,
tenant_id: &str,
) -> Result<Vec<InvoiceWithItems>> {
let invoices = self.list_invoices_for_tenant(tenant_id).await?;
let mut result = Vec::with_capacity(invoices.len());
for invoice in invoices {
let items = self.get_invoice_items(&invoice.id).await?;
result.push(InvoiceWithItems { invoice, items });
}
Ok(result)
}
pub async fn get_invoice_with_items(&self, id: &str) -> Result<Option<InvoiceWithItems>> {
match self.get_invoice(id).await? {
Some(invoice) => {
let items = self.get_invoice_items(&invoice.id).await?;
Ok(Some(InvoiceWithItems { invoice, items }))
}
None => Ok(None),
}
}
pub async fn get_invoice_items(&self, invoice_id: &str) -> Result<Vec<InvoiceItem>> {
let rows = sqlx::query_as::<_, InvoiceItem>(
"SELECT id, invoice, relay, sats
FROM invoice_item
WHERE invoice = ?",
)
.bind(invoice_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn total_pending_invoices_for_tenant(&self, tenant: &str) -> Result<i64> {
let count = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM invoice
WHERE tenant = ? AND status = 'pending'",
)
.bind(tenant)
.fetch_one(&self.pool)
.await?;
Ok(count)
}
pub async fn get_relay_plan_sats(&self, plan: &str) -> Result<i64> {
let sats = Self::list_plans()
.into_iter()
.find(|p| p.id == plan)
.map(|p| p.sats)
.unwrap_or(0);
Ok(sats)
}
pub fn list_plans() -> Vec<Plan> { pub fn list_plans() -> Vec<Plan> {
vec![ vec![
Plan { Plan {
+3 -6
View File
@@ -1,6 +1,3 @@
- [ ] Show relay status on details page - [ ] Split repo into queries and commands
- [ ] Show relay activity history on details page (add activity route and repo method to backend spec + implementation) - [ ] Fix billing by using stripe as a backend to do proration, then mark invoices paid manually when using bitcoin.
- [ ] Infra provisioning isn't happening. At least, the relay's status isn't being updated - [ ] Send a payment link instead of an invoice so we can generate/pay on the fly
- [ ] If relay is inactive, show "reactivate" instead of "inactivate" in the relay detal menu
- [ ] Invoices
- [ ] Stripe