Compare commits

..

7 Commits

Author SHA1 Message Date
Jon Staab f7bd3e53fe Add snapshots to activity 2026-05-28 15:53:02 -07:00
Jon Staab eb0123abef Rename tenant fields to tenant_pubkey and plan to plan_id 2026-05-28 15:18:41 -07:00
Jon Staab 9f599d66be Clean up billing a bit 2026-05-28 14:34:19 -07:00
Jon Staab 72b30489b9 Add BillingPeriod helper 2026-05-28 13:20:17 -07:00
Jon Staab b11fb5dc25 Fix possible race condition related to billing an activity 2026-05-28 12:45:21 -07:00
Jon Staab 35d9aab02a Make infra module free functions 2026-05-27 17:26:47 -07:00
Jon Staab 0f47b483aa Update docs 2026-05-27 16:56:34 -07:00
27 changed files with 772 additions and 875 deletions
+24 -19
View File
@@ -1,14 +1,3 @@
CREATE TABLE IF NOT EXISTS activity (
id TEXT PRIMARY KEY,
tenant TEXT NOT NULL,
created_at INTEGER NOT NULL,
activity_type TEXT NOT NULL,
resource_type TEXT NOT NULL,
resource_id TEXT NOT NULL,
billed_at INTEGER,
plan_id TEXT
);
CREATE TABLE IF NOT EXISTS tenant (
pubkey TEXT PRIMARY KEY,
nwc_url TEXT NOT NULL DEFAULT '',
@@ -19,11 +8,24 @@ CREATE TABLE IF NOT EXISTS tenant (
renewed_at INTEGER
);
CREATE TABLE IF NOT EXISTS activity (
id TEXT PRIMARY KEY,
tenant_pubkey TEXT NOT NULL,
created_at INTEGER NOT NULL,
activity_type TEXT NOT NULL,
resource_type TEXT NOT NULL,
resource_id TEXT NOT NULL,
billed_at INTEGER,
snapshot TEXT NOT NULL,
FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey)
);
CREATE TABLE IF NOT EXISTS relay (
id TEXT PRIMARY KEY,
tenant TEXT NOT NULL,
tenant_pubkey TEXT NOT NULL,
subdomain TEXT NOT NULL UNIQUE,
plan TEXT NOT NULL,
plan_id TEXT NOT NULL,
status TEXT NOT NULL,
synced INTEGER NOT NULL DEFAULT 0,
sync_error TEXT NOT NULL DEFAULT '',
@@ -37,7 +39,7 @@ CREATE TABLE IF NOT EXISTS relay (
blossom_enabled INTEGER NOT NULL DEFAULT 0,
livekit_enabled INTEGER NOT NULL DEFAULT 0,
push_enabled INTEGER NOT NULL DEFAULT 1,
FOREIGN KEY (tenant) REFERENCES tenant(pubkey)
FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey)
);
CREATE TABLE IF NOT EXISTS invoice (
@@ -58,7 +60,7 @@ CREATE TABLE IF NOT EXISTS invoice_item (
activity_id TEXT,
tenant_pubkey TEXT NOT NULL,
relay_id TEXT NOT NULL,
plan TEXT NOT NULL,
plan_id TEXT NOT NULL,
amount INTEGER NOT NULL,
description TEXT NOT NULL DEFAULT '',
created_at INTEGER NOT NULL,
@@ -84,13 +86,13 @@ CREATE TABLE IF NOT EXISTS intent (
FOREIGN KEY (invoice_id) REFERENCES invoice(id)
);
CREATE INDEX IF NOT EXISTS idx_relay_tenant ON relay (tenant);
CREATE INDEX IF NOT EXISTS idx_activity_tenant_created ON activity (tenant, created_at);
CREATE INDEX IF NOT EXISTS idx_activity_tenant_created ON activity (tenant_pubkey, created_at);
CREATE INDEX IF NOT EXISTS idx_activity_resource_created ON activity (resource_id, created_at);
CREATE INDEX IF NOT EXISTS idx_activity_unbilled ON activity (tenant, created_at) WHERE billed_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_activity_unbilled ON activity (tenant_pubkey, created_at) WHERE billed_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_relay_tenant_pubkey ON relay (tenant_pubkey);
CREATE INDEX IF NOT EXISTS idx_invoice_tenant_created ON invoice (tenant_pubkey, created_at);
@@ -98,6 +100,9 @@ CREATE INDEX IF NOT EXISTS idx_invoice_item_invoice ON invoice_item (invoice_id)
CREATE INDEX IF NOT EXISTS idx_invoice_item_outstanding ON invoice_item (tenant_pubkey) WHERE invoice_id IS NULL;
-- At most one line item per billable activity to ensure no double-billing.
CREATE UNIQUE INDEX IF NOT EXISTS idx_invoice_item_activity ON invoice_item (activity_id);
CREATE INDEX IF NOT EXISTS idx_bolt11_invoice_created ON bolt11 (invoice_id, created_at);
CREATE INDEX IF NOT EXISTS idx_intent_invoice ON intent (invoice_id);
+1 -4
View File
@@ -29,7 +29,6 @@ use nostr_sdk::{Event, JsonUtil, Kind};
use crate::billing::Billing;
use crate::env;
use crate::infra::Infra;
use crate::models::{Relay, Tenant};
use crate::query;
use crate::robot::Robot;
@@ -52,16 +51,14 @@ pub struct Api {
pub billing: Billing,
pub stripe: Stripe,
pub robot: Robot,
pub infra: Infra,
}
impl Api {
pub fn new(billing: Billing, stripe: Stripe, robot: Robot, infra: Infra) -> Self {
pub fn new(billing: Billing, stripe: Stripe, robot: Robot) -> Self {
Self {
billing,
stripe,
robot,
infra,
}
}
+142 -287
View File
@@ -1,17 +1,18 @@
use anyhow::{Result, anyhow};
use std::collections::HashMap;
use std::time::Duration;
use crate::bitcoin;
use crate::command;
use crate::db;
use crate::env;
use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, Tenant};
use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, RELAY_STATUS_ACTIVE, Snapshot, Tenant};
use crate::query;
use crate::robot::Robot;
use crate::stripe::Stripe;
use crate::wallet::Wallet;
/// Owns subscription billing: it reconciles tenant activity into invoice items,
/// renews subscriptions each period, and collects payment (Lightning, then a
/// card on file, then a manual DM link).
#[derive(Clone)]
pub struct Billing {
stripe: Stripe,
@@ -31,107 +32,25 @@ impl Billing {
// --- lifecycle methods ---
pub async fn start(self) {
let mut rx = db::subscribe();
tokio::spawn({
let billing = self.clone();
async move { billing.poll().await }
});
if let Err(error) = self.reconcile_subscriptions("startup").await {
tracing::error!(error = %error, "failed to reconcile subscriptions on startup");
}
loop {
match rx.recv().await {
Ok(activity) => {
if let Err(e) = self.handle_activity(&activity).await {
tracing::error!(error = %e, "billing handle_activity failed");
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(missed = n, "billing lagged, reconciling all subscriptions");
if let Err(error) = self.reconcile_subscriptions("lagged").await {
tracing::error!(error = %error, "failed to reconcile after lag");
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
}
async fn poll(&self) {
let mut interval = tokio::time::interval(POLL_INTERVAL);
loop {
interval.tick().await;
if let Err(error) = self.autogenerate_invoices().await {
if let Err(error) = self.reconcile_subscriptions().await {
tracing::error!(error = %error, "billing poll failed");
}
}
}
async fn autogenerate_invoices(&self) -> Result<()> {
async fn reconcile_subscriptions(&self) -> Result<()> {
let tenants = query::list_tenants().await?;
tracing::info!(
tenant_count = tenants.len(),
"polling tenants for subscription renewal"
);
for tenant in tenants {
if let Err(error) = self.autogenerate_invoice(&tenant).await {
tracing::error!(
tenant = %tenant.pubkey,
error = ?error,
"failed to autogenerate invoice"
);
}
}
Ok(())
}
/// Poll entry point: generate the tenant's invoice for the current period
/// (adding any due renewals) and, if one results, collect payment.
async fn autogenerate_invoice(&self, tenant: &Tenant) -> Result<()> {
if let Some(invoice) = self.generate_invoice(tenant).await? {
self.attempt_payment(tenant, &invoice).await?;
}
Ok(())
}
async fn handle_activity(&self, activity: &Activity) -> Result<()> {
let should_reconcile = matches!(
activity.activity_type.as_str(),
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay"
);
if should_reconcile
&& let Some(tenant) = query::get_tenant(&activity.tenant).await?
{
self.reconcile_subscription(&tenant).await?;
}
Ok(())
}
async fn reconcile_subscriptions(&self, source: &str) -> Result<()> {
let tenants = query::list_tenants().await?;
tracing::info!(
source,
tenant_count = tenants.len(),
"reconciling all subscriptions"
);
tracing::info!(tenant_count = tenants.len(), "reconciling all subscriptions");
for tenant in tenants {
if let Err(error) = self.reconcile_subscription(&tenant).await {
tracing::error!(
source,
tenant = %tenant.pubkey,
error = ?error,
"failed to reconcile subscription"
@@ -142,11 +61,17 @@ impl Billing {
Ok(())
}
// --- Reconciliation, renewal, and on-demand billing ---
// --- Reconciliation of activity/renewals ---
async fn reconcile_subscription(&self, tenant: &Tenant) -> Result<()> {
/// Lists billable activity, setting the tenant's billing anchor to the first
/// activity in the process. Generates an invoice for the current period if due
/// for renewal or any billable activities have occurred. Attempts payment if
/// an invoice is generated.
pub async fn reconcile_subscription(&self, tenant: &Tenant) -> Result<()> {
let mut tenant = tenant.clone();
// Reconcile all activity, setting the tenant's billing anchor on the first
// positive-balance line item if not already set.
for activity in query::list_billable_activity_for_tenant(&tenant.pubkey).await? {
if tenant.billing_anchor.is_none() {
tenant.billing_anchor = Some(activity.created_at);
@@ -156,6 +81,21 @@ impl Billing {
self.reconcile_activity(&tenant, &activity).await?;
}
// If the tenant has no billing anchor, they have nothing to bill
let Some(period) = BillingPeriod::current(&tenant) else {
return Ok(());
};
// If tenant is due for renewal, bill any active relays.
if tenant.renewed_at.is_none_or(|at| at < period.start) {
self.reconcile_renewal(&tenant, &period).await?;
}
// Create the invoice, but only if non-zero and attempt payment
if let Some(invoice) = command::create_invoice(&tenant, &period).await? {
self.attempt_payment(&tenant, &invoice).await?;
};
Ok(())
}
@@ -182,14 +122,14 @@ impl Billing {
};
match invoice_item {
Some(item) => command::insert_invoice_item_for_activity(&item, &activity.id).await,
Some(ref item) => command::insert_invoice_item_for_activity(&item, &activity.id).await,
None => command::mark_activity_billed(&activity.id).await,
}
}
/// A prorated charge (or credit, with `sign` = -1) for the relay's current
/// plan. `None` for a missing relay or a free plan. Mid-period items don't
/// stamp `period_start` — the renewal decides coverage from activity history.
/// plan, covering the fraction of the period remaining at the activity.
/// `None` for a missing relay or a free plan.
async fn make_prorated_item(
&self,
tenant: &Tenant,
@@ -198,22 +138,28 @@ impl Billing {
description: &str,
) -> Result<Option<InvoiceItem>> {
let Some(relay) = query::get_relay(&activity.resource_id).await? else {
return Ok(None);
};
let Some(plan) = query::get_plan(&relay.plan) else {
return Ok(None);
return Err(anyhow!("activity resource was not a valid relay"));
};
let plan = query::get_plan(&relay.plan_id)?;
if plan.amount <= 0 {
return Ok(None);
}
let anchor = tenant
.billing_anchor
let period = BillingPeriod::at(tenant, activity.created_at)
.ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?;
let fraction = period_fraction_remaining(anchor, activity.created_at);
let amount = sign * prorate(plan.amount, fraction);
let amount = sign * period.prorate(plan.amount, activity.created_at);
Ok(Some(line_item(activity, &relay.id, plan.id, amount, description)))
Ok(Some(InvoiceItem {
id: uuid::Uuid::new_v4().to_string(),
invoice_id: None,
activity_id: Some(activity.id.clone()),
tenant_pubkey: activity.tenant_pubkey.clone(),
relay_id: activity.resource_id.clone(),
plan_id: plan.id,
amount,
description: description.to_string(),
created_at: activity.created_at,
}))
}
/// The prorated delta for a plan change, read straight from the activity log:
@@ -226,128 +172,86 @@ impl Billing {
tenant: &Tenant,
activity: &Activity,
) -> Result<Option<InvoiceItem>> {
let Some(new_plan_id) = activity.plan_id.as_deref() else {
return Ok(None);
let new_plan_id = match &*activity.snapshot {
Snapshot::Relay { plan, .. } => plan,
};
let Some(old_plan_id) =
query::get_relay_plan_before(&activity.resource_id, activity.created_at).await?
else {
return Ok(None);
return Err(anyhow!("no previous plan found for relay update activity"));
};
if old_plan_id == new_plan_id {
if &old_plan_id == new_plan_id {
return Ok(None);
}
let (Some(new_plan), Some(old_plan)) =
(query::get_plan(new_plan_id), query::get_plan(&old_plan_id))
else {
return Ok(None);
};
let new_plan = query::get_plan(new_plan_id)?;
let old_plan = query::get_plan(&old_plan_id)?;
let anchor = tenant
.billing_anchor
let period = BillingPeriod::at(tenant, activity.created_at)
.ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?;
let fraction = period_fraction_remaining(anchor, activity.created_at);
let amount = prorate(new_plan.amount, fraction) - prorate(old_plan.amount, fraction);
let amount = period.prorate(new_plan.amount, activity.created_at)
- period.prorate(old_plan.amount, activity.created_at);
if amount == 0 {
return Ok(None);
}
let description = format!("Plan changed from {} to {}", old_plan.name, new_plan.name);
Ok(Some(line_item(
activity,
&activity.resource_id,
new_plan.id,
Ok(Some(InvoiceItem {
id: uuid::Uuid::new_v4().to_string(),
invoice_id: None,
activity_id: Some(activity.id.clone()),
tenant_pubkey: activity.tenant_pubkey.clone(),
relay_id: activity.resource_id.clone(),
plan_id: new_plan.id,
amount,
&description,
)))
}
/// Reconcile pending activity, add this period's renewals for any relay due,
/// and claim everything outstanding onto an invoice. Shared by the poll and
/// the on-demand invoice endpoint — safe to call either way: renewals are
/// per-relay idempotent. No payment is attempted here; callers that want
/// auto-pay do it on the returned invoice. `None` when nothing is owed.
pub async fn generate_invoice(&self, tenant: &Tenant) -> Result<Option<Invoice>> {
self.reconcile_subscription(tenant).await?;
// reconcile may have just set the anchor (first activity); re-read it.
let Some(tenant) = query::get_tenant(&tenant.pubkey).await? else {
return Ok(None);
};
let Some(anchor) = tenant.billing_anchor else {
return Ok(None);
};
let now = chrono::Utc::now().timestamp();
let period_start = period_start_at(anchor, now);
let period_end = add_one_month(period_start);
// Short-circuit the renewal scan once this period is already renewed — the
// common case on all but the first poll of a period (saving ~720 scans a
// month per tenant). renew_tenant re-checks this in-tx as the real guard.
if tenant.renewed_at.is_none_or(|at| at < period_start) {
self.renew_period(&tenant, period_start).await?;
}
self.claim_outstanding(&tenant, period_start, period_end).await
description,
created_at: activity.created_at,
}))
}
/// Charge a full-period renewal for every relay that was active on a paid plan
/// as of `period_start`, reconstructing that state from the activity log
/// (status from create/activate/deactivate, plan from create/update). Per-relay
/// idempotent via `period_start`, so calling it on every generation can't
/// renew a relay twice; a relay created/activated *within* the period isn't
/// active before the boundary, so it's covered by its own prorated charge.
async fn renew_period(&self, tenant: &Tenant, period_start: i64) -> Result<()> {
let activities = query::list_relay_activity_before(&tenant.pubkey, period_start).await?;
/// as of `period.start`, reading that state from each relay's most recent
/// activity snapshot before the boundary (relays with no prior activity didn't
/// exist yet and are skipped). Idempotent per period via the tenant's
/// `renewed_at` marker, so calling it on every generation can't renew twice;
/// a relay created/activated *within* the period isn't active before the
/// boundary, so it's covered by its own prorated charge instead.
async fn reconcile_renewal(&self, tenant: &Tenant, period: &BillingPeriod) -> Result<()> {
let relays = query::list_relays_for_tenant(&tenant.pubkey).await?;
let mut renewal_items = Vec::new();
for (relay_id, state) in relay_states(&activities) {
if !state.active {
continue;
}
let Some(plan) = state.plan.and_then(|id| query::get_plan(&id)) else {
let mut line_items = Vec::new();
for relay in relays {
let Some(activity) =
query::get_latest_relay_activity_before(&relay.id, period.start).await?
else {
continue;
};
let Snapshot::Relay { plan: plan_id, status, .. } = &*activity.snapshot;
if status != RELAY_STATUS_ACTIVE {
continue;
}
let plan = query::get_plan(plan_id)?;
if plan.amount <= 0 {
continue;
}
renewal_items.push(InvoiceItem {
line_items.push(InvoiceItem {
id: uuid::Uuid::new_v4().to_string(),
invoice_id: None,
activity_id: None,
tenant_pubkey: tenant.pubkey.clone(),
relay_id,
plan: plan.id,
relay_id: relay.id,
plan_id: plan.id,
amount: plan.amount,
description: "Subscription renewal".to_string(),
created_at: period_start,
created_at: period.start,
});
}
// Inserts the items and advances `renewed_at` to `period_start` in one
// Inserts the items and advances `renewed_at` to `period.start` in one
// transaction (idempotent via an in-tx guard), so a re-tick is a no-op.
command::renew_tenant(&tenant.pubkey, period_start, &renewal_items).await
command::insert_invoice_items_for_renewal(&line_items, period).await
}
/// Claim the tenant's outstanding items onto a fresh invoice if they net
/// positive; `None` when nothing is owed (a net credit stays outstanding and
/// carries to the next positive invoice).
async fn claim_outstanding(
&self,
tenant: &Tenant,
period_start: i64,
period_end: i64,
) -> Result<Option<Invoice>> {
let invoice_id = uuid::Uuid::new_v4().to_string();
command::claim_outstanding_into_invoice(
&invoice_id,
&tenant.pubkey,
period_start,
period_end,
)
.await
}
// --- Payments ---
pub async fn attempt_payment(&self, tenant: &Tenant, invoice: &Invoice) -> Result<()> {
let mut error_message: Option<String> = None;
@@ -508,114 +412,65 @@ const MANUAL_PAYMENT_DM: &str = "Payment is due for your relay subscription. Ope
const USER_ERROR_PREFIX: &str = "NWC auto-payment failed:";
const USER_ERROR_MAX_CHARS: usize = 240;
/// The start of the billing period containing `now`, for monthly periods
/// anchored at `anchor`. Steps forward in whole calendar months so boundaries
/// track months (2831 days) rather than a fixed span of seconds.
fn period_start_at(anchor: i64, now: i64) -> i64 {
use chrono::{DateTime, Months, Utc};
/// One tenant's monthly billing period containing some timestamp, anchored at
/// the tenant's `billing_anchor`. Half-open `[start, end)` so a moment at
/// exactly `end` belongs to the next period.
pub struct BillingPeriod {
pub start: i64,
pub end: i64,
}
let anchor_dt = DateTime::<Utc>::from_timestamp(anchor, 0).unwrap_or_default();
let mut start = anchor_dt;
let mut months = 1u32;
while let Some(next) = anchor_dt.checked_add_months(Months::new(months)) {
if next.timestamp() > now {
break;
}
start = next;
months += 1;
impl BillingPeriod {
/// The period containing `chrono::Utc::now()` for `tenant`. `None` when the
/// tenant has no `billing_anchor` yet — i.e. no billable activity has been seen.
fn current(tenant: &Tenant) -> Option<Self> {
Self::at(tenant, chrono::Utc::now().timestamp())
}
start.timestamp()
}
/// The period containing `at` for `tenant`. `None` when the tenant has no
/// `billing_anchor` yet — i.e. no billable activity has been seen.
fn at(tenant: &Tenant, at: i64) -> Option<Self> {
use chrono::{DateTime, Months, Utc};
/// One calendar month after `ts` (a unix timestamp), falling back to `ts` if the
/// shifted date can't be represented.
fn add_one_month(ts: i64) -> i64 {
use chrono::{DateTime, Months, Utc};
let anchor = tenant.billing_anchor?;
let anchor_dt = DateTime::<Utc>::from_timestamp(anchor, 0).unwrap_or_default();
DateTime::<Utc>::from_timestamp(ts, 0)
.and_then(|dt| dt.checked_add_months(Months::new(1)))
.map(|dt| dt.timestamp())
.unwrap_or(ts)
}
/// Fraction of the current billing period still unused at `at`, in `[0.0, 1.0]`,
/// for prorating a mid-period charge or credit. With no billing anchor yet the
/// period is only just beginning, so the whole period remains (full price).
fn period_fraction_remaining(billing_anchor: i64, at: i64) -> f64 {
let period_start = period_start_at(billing_anchor, at);
let period_end = add_one_month(period_start);
let period_len = (period_end - period_start) as f64;
if period_len <= 0.0 {
return 1.0;
}
(((period_end - at) as f64) / period_len).clamp(0.0, 1.0)
}
/// Prorate a minor-unit `amount` by `fraction`, rounded to the nearest unit.
fn prorate(amount: i64, fraction: f64) -> i64 {
(amount as f64 * fraction).round() as i64
}
/// Build an outstanding (unassigned, `invoice_id = None`) line item from a
/// reconciled activity. `period_start` is `Some` only for coverage charges
/// (creation/activation), which mark the relay-period as paid.
fn line_item(
activity: &Activity,
relay_id: &str,
plan: String,
amount: i64,
description: &str,
) -> InvoiceItem {
InvoiceItem {
id: uuid::Uuid::new_v4().to_string(),
invoice_id: None,
activity_id: Some(activity.id.clone()),
tenant_pubkey: activity.tenant.clone(),
relay_id: relay_id.to_string(),
plan,
amount,
description: description.to_string(),
created_at: activity.created_at,
}
}
/// A relay's billing-relevant state at a point in time, reconstructed by folding
/// its activity log.
#[derive(Default)]
struct RelayState {
active: bool,
plan: Option<String>,
}
/// Fold relay activities (which must be oldest-first) into each relay's
/// `(active, plan)` state. `create`/`activate`/`deactivate` drive status;
/// `create`/`update` carry the plan via `plan_id`. Feed it activities up to a
/// cutoff to get each relay's state as of that moment (e.g. the period boundary).
fn relay_states(activities: &[Activity]) -> HashMap<String, RelayState> {
let mut states: HashMap<String, RelayState> = HashMap::new();
for activity in activities {
let state = states.entry(activity.resource_id.clone()).or_default();
match activity.activity_type.as_str() {
"create_relay" => {
state.active = true;
state.plan = activity.plan_id.clone();
// Walk forward in whole calendar months from the anchor until the next
// step would pass `at`, so boundaries track months (2831 days) rather
// than a fixed span of seconds.
let mut start = anchor_dt;
let mut months = 1u32;
while let Some(next) = anchor_dt.checked_add_months(Months::new(months)) {
if next.timestamp() > at {
break;
}
"update_relay" => {
if activity.plan_id.is_some() {
state.plan = activity.plan_id.clone();
}
}
"activate_relay" => state.active = true,
"deactivate_relay" => state.active = false,
_ => {}
start = next;
months += 1;
}
let end = start.checked_add_months(Months::new(1)).unwrap_or(start);
Some(Self {
start: start.timestamp(),
end: end.timestamp(),
})
}
states
/// Fraction of this period still unused at `at`, in `[0.0, 1.0]`, for
/// prorating a mid-period charge or credit.
fn fraction_remaining(&self, at: i64) -> f64 {
let len = (self.end - self.start) as f64;
if len <= 0.0 {
return 1.0;
}
(((self.end - at) as f64) / len).clamp(0.0, 1.0)
}
/// Prorate a minor-unit `amount` by the fraction of this period remaining
/// at `at`, rounded to the nearest unit.
fn prorate(&self, amount: i64, at: i64) -> i64 {
(amount as f64 * self.fraction_remaining(at)).round() as i64
}
}
fn summarize_error_message(error: &str) -> Option<String> {
+3
View File
@@ -1,5 +1,7 @@
use anyhow::{Result, anyhow};
/// Convert a fiat amount in minor units (e.g. USD cents) to millisatoshis at the
/// current spot price, for pricing a Lightning invoice from an invoice total.
pub async fn fiat_to_msats(amount_fiat_minor: i64, currency: &str) -> Result<u64> {
let price = get_bitcoin_price(&currency.to_uppercase()).await?;
let divisor = 10_f64.powi(currency_minor_exponent(currency)? as i32);
@@ -18,6 +20,7 @@ struct CoinbaseSpotPriceData {
amount: String,
}
/// The current Bitcoin spot price in `currency`, from Coinbase.
pub async fn get_bitcoin_price(currency: &str) -> Result<f64> {
let http = reqwest::Client::new();
let url = format!("https://api.coinbase.com/v2/prices/BTC-{currency}/spot");
+147 -138
View File
@@ -1,44 +1,36 @@
use anyhow::Result;
use sqlx::types::Json;
use sqlx::{Sqlite, Transaction};
use crate::billing::BillingPeriod;
use crate::db::{pool, publish, with_tx};
use crate::models::{
Activity, Bolt11, Invoice, InvoiceItem, RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT,
RELAY_STATUS_INACTIVE, Relay, Tenant,
RELAY_STATUS_INACTIVE, Relay, Snapshot, Tenant,
};
// --- Tenants ---
pub async fn create_tenant(tenant: &Tenant) -> Result<()> {
let activity = with_tx(async |tx| {
sqlx::query(
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id)
VALUES (?, ?, ?, ?)",
)
.bind(&tenant.pubkey)
.bind(&tenant.nwc_url)
.bind(tenant.created_at)
.bind(&tenant.stripe_customer_id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "create_tenant", "tenant", &tenant.pubkey, None).await
})
sqlx::query(
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id)
VALUES (?, ?, ?, ?)",
)
.bind(&tenant.pubkey)
.bind(&tenant.nwc_url)
.bind(tenant.created_at)
.bind(&tenant.stripe_customer_id)
.execute(pool())
.await?;
publish(activity);
Ok(())
}
pub async fn update_tenant(tenant: &Tenant) -> Result<()> {
let activity = with_tx(async |tx| {
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
.bind(&tenant.nwc_url)
.bind(&tenant.pubkey)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "update_tenant", "tenant", &tenant.pubkey, None).await
})
.await?;
publish(activity);
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
.bind(&tenant.nwc_url)
.bind(&tenant.pubkey)
.execute(pool())
.await?;
Ok(())
}
@@ -65,7 +57,7 @@ pub async fn create_relay(relay: &Relay) -> Result<()> {
let activity = with_tx(async |tx| {
sqlx::query(
"INSERT INTO relay (
id, tenant, subdomain, plan, status, synced, sync_error,
id, tenant_pubkey, subdomain, plan_id, status, synced, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
@@ -73,9 +65,9 @@ pub async fn create_relay(relay: &Relay) -> Result<()> {
) VALUES (?, ?, ?, ?, 'active', 0, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&relay.id)
.bind(&relay.tenant)
.bind(&relay.tenant_pubkey)
.bind(&relay.subdomain)
.bind(&relay.plan)
.bind(&relay.plan_id)
.bind(&relay.sync_error)
.bind(&relay.info_name)
.bind(&relay.info_icon)
@@ -89,7 +81,11 @@ pub async fn create_relay(relay: &Relay) -> Result<()> {
.bind(relay.push_enabled)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "create_relay", "relay", &relay.id, Some(&relay.plan)).await
let snapshot = Snapshot::Relay {
plan: relay.plan_id.clone(),
status: RELAY_STATUS_ACTIVE.to_string(),
};
insert_activity_tx(tx, "create_relay", &relay.id, snapshot).await
})
.await?;
publish(activity);
@@ -100,16 +96,16 @@ pub async fn update_relay(relay: &Relay) -> Result<()> {
let activity = with_tx(async |tx| {
sqlx::query(
"UPDATE relay
SET tenant = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?, synced = 0,
SET tenant_pubkey = ?, subdomain = ?, plan_id = ?, status = ?, sync_error = ?, synced = 0,
info_name = ?, info_icon = ?, info_description = ?,
policy_public_join = ?, policy_strip_signatures = ?,
groups_enabled = ?, management_enabled = ?, blossom_enabled = ?,
livekit_enabled = ?, push_enabled = ?
WHERE id = ?",
)
.bind(&relay.tenant)
.bind(&relay.tenant_pubkey)
.bind(&relay.subdomain)
.bind(&relay.plan)
.bind(&relay.plan_id)
.bind(&relay.status)
.bind(&relay.sync_error)
.bind(&relay.info_name)
@@ -125,34 +121,42 @@ pub async fn update_relay(relay: &Relay) -> Result<()> {
.bind(&relay.id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "update_relay", "relay", &relay.id, Some(&relay.plan)).await
let snapshot = Snapshot::Relay {
plan: relay.plan_id.clone(),
status: relay.status.clone(),
};
insert_activity_tx(tx, "update_relay", &relay.id, snapshot).await
})
.await?;
publish(activity);
Ok(())
}
pub async fn activate_relay(relay: &Relay) -> Result<()> {
set_relay_status(relay, RELAY_STATUS_ACTIVE, "activate_relay").await
}
pub async fn deactivate_relay(relay: &Relay) -> Result<()> {
set_relay_status(&relay.id, RELAY_STATUS_INACTIVE, "deactivate_relay").await
set_relay_status(relay, RELAY_STATUS_INACTIVE, "deactivate_relay").await
}
#[allow(dead_code)] // wired up by the delinquency flow (not yet implemented)
pub async fn mark_relay_delinquent(relay: &Relay) -> Result<()> {
set_relay_status(&relay.id, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent").await
set_relay_status(relay, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent").await
}
pub async fn activate_relay(relay: &Relay) -> Result<()> {
set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay").await
}
async fn set_relay_status(relay_id: &str, status: &str, activity_type: &str) -> Result<()> {
async fn set_relay_status(relay: &Relay, status: &str, activity_type: &str) -> Result<()> {
let activity = with_tx(async |tx| {
sqlx::query("UPDATE relay SET status = ?, synced = 0 WHERE id = ?")
.bind(status)
.bind(relay_id)
.bind(&relay.id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, activity_type, "relay", relay_id, None).await
let snapshot = Snapshot::Relay {
plan: relay.plan_id.clone(),
status: status.to_string(),
};
insert_activity_tx(tx, activity_type, &relay.id, snapshot).await
})
.await?;
publish(activity);
@@ -166,34 +170,88 @@ pub async fn fail_relay_sync(relay: &Relay, sync_error: String) -> Result<()> {
.bind(&relay.id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "fail_relay_sync", "relay", &relay.id, None).await
let snapshot = Snapshot::Relay {
plan: relay.plan_id.clone(),
status: relay.status.clone(),
};
insert_activity_tx(tx, "fail_relay_sync", &relay.id, snapshot).await
})
.await?;
publish(activity);
Ok(())
}
pub async fn complete_relay_sync(relay_id: &str) -> Result<()> {
pub async fn complete_relay_sync(relay: &Relay) -> Result<()> {
let activity = with_tx(async |tx| {
sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?")
.bind(relay_id)
.bind(&relay.id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "complete_relay_sync", "relay", relay_id, None).await
let snapshot = Snapshot::Relay {
plan: relay.plan_id.clone(),
status: relay.status.clone(),
};
insert_activity_tx(tx, "complete_relay_sync", &relay.id, snapshot).await
})
.await?;
publish(activity);
Ok(())
}
// --- Invoice items (the outstanding-charge ledger) ---
// --- Invoice items ---
/// Persist a reconciled activity's line item and mark the activity billed in one
/// transaction, so a recovery pass never re-bills it.
pub async fn insert_invoice_item_for_activity(invoice_item: &InvoiceItem, activity_id: &str) -> Result<()> {
let now = chrono::Utc::now().timestamp();
with_tx(async |tx| {
insert_invoice_item_tx(tx, invoice_item).await?;
mark_activity_billed_tx(tx, activity_id, now).await?;
// Claim the activity first. If a concurrent reconcile pass already billed
// it, the claim no-ops and we skip the item rather than duplicating it.
if mark_activity_billed_tx(tx, activity_id, now).await? {
insert_invoice_item_tx(tx, invoice_item).await?;
}
Ok(())
})
.await
}
/// Insert this period's renewal items and advance the tenant's `renewed_at`
/// marker to `period.start`, atomically and idempotently. Empty `items` is a
/// no-op — a tenant with no active paid relays has nothing to renew.
pub async fn insert_invoice_items_for_renewal(
items: &[InvoiceItem],
period: &BillingPeriod,
) -> Result<()> {
let Some(first) = items.first() else {
return Ok(());
};
let tenant_pubkey = &first.tenant_pubkey;
with_tx(async |tx| {
// Re-read the marker inside the transaction so the guard and the writes
// commit together — this ensures idempotency so we don't double-invoice.
let renewed_at = sqlx::query_scalar::<_, Option<i64>>(
"SELECT renewed_at FROM tenant WHERE pubkey = ?",
)
.bind(tenant_pubkey)
.fetch_one(&mut **tx)
.await?;
if renewed_at.is_some_and(|at| at >= period.start) {
return Ok(());
}
for item in items {
insert_invoice_item_tx(tx, item).await?;
}
sqlx::query("UPDATE tenant SET renewed_at = ? WHERE pubkey = ?")
.bind(period.start)
.bind(tenant_pubkey)
.execute(&mut **tx)
.await?;
Ok(())
})
@@ -205,45 +263,8 @@ pub async fn insert_invoice_item_for_activity(invoice_item: &InvoiceItem, activi
pub async fn mark_activity_billed(activity_id: &str) -> Result<()> {
let now = chrono::Utc::now().timestamp();
with_tx(async |tx| mark_activity_billed_tx(tx, activity_id, now).await).await
}
/// Insert renewal line items, skipping any relay already covered for the item's
/// `period_start`. The per-relay existence check and insert are a single
/// statement, so neither a re-tick nor a relay's own creation/activation charge
/// (which also stamps `period_start`) can bill the same relay-period twice.
pub async fn renew_tenant(
tenant_pubkey: &str,
period_start: i64,
items: &[InvoiceItem],
) -> Result<()> {
with_tx(async |tx| {
// In-tx guard: bail if this tenant has already been renewed for this
// period (or later). This is the correctness backstop — it keeps renewal
// idempotent under a crash mid-renewal or a poll racing the eager
// endpoint, since the item inserts and the `renewed_at` write commit
// together.
let renewed_at = sqlx::query_scalar::<_, Option<i64>>(
"SELECT renewed_at FROM tenant WHERE pubkey = ?",
)
.bind(tenant_pubkey)
.fetch_one(&mut **tx)
.await?;
if renewed_at.is_some_and(|at| at >= period_start) {
return Ok(());
}
for item in items {
insert_invoice_item_tx(tx, item).await?;
}
sqlx::query("UPDATE tenant SET renewed_at = ? WHERE pubkey = ?")
.bind(period_start)
.bind(tenant_pubkey)
.execute(&mut **tx)
.await?;
mark_activity_billed_tx(tx, activity_id, now).await?;
Ok(())
})
.await
@@ -251,23 +272,16 @@ pub async fn renew_tenant(
// --- Invoices ---
/// Claim all of a tenant's outstanding items onto a new invoice — but only if
/// they sum to a positive amount. A non-positive balance (net credit or nothing
/// owed) leaves the items outstanding so the credit carries to the next positive
/// invoice. The sum, insert, and claim run in one transaction. Returns the
/// invoice, or `None` when there's nothing to bill.
pub async fn claim_outstanding_into_invoice(
invoice_id: &str,
tenant_pubkey: &str,
period_start: i64,
period_end: i64,
) -> Result<Option<Invoice>> {
/// Claim all of a tenant's outstanding items onto a new invoice. A non-positive
/// balance leaves the items outstanding so the credit carries to the next positive
/// invoice. Returns the invoice, or `None` when there's nothing to bill.
pub async fn create_invoice(tenant: &Tenant, period: &BillingPeriod) -> Result<Option<Invoice>> {
with_tx(async |tx| {
let total = sqlx::query_scalar::<_, i64>(
"SELECT COALESCE(SUM(amount), 0) FROM invoice_item
WHERE tenant_pubkey = ? AND invoice_id IS NULL",
)
.bind(tenant_pubkey)
.bind(&tenant.pubkey)
.fetch_one(&mut **tx)
.await?;
@@ -275,15 +289,14 @@ pub async fn claim_outstanding_into_invoice(
return Ok(None);
}
let invoice =
insert_invoice_tx(tx, invoice_id, tenant_pubkey, period_start, period_end).await?;
let invoice = insert_invoice_tx(tx, &tenant, &period).await?;
sqlx::query(
"UPDATE invoice_item SET invoice_id = ?
WHERE tenant_pubkey = ? AND invoice_id IS NULL",
)
.bind(invoice_id)
.bind(tenant_pubkey)
.bind(&invoice.id)
.bind(&tenant.pubkey)
.execute(&mut **tx)
.await?;
@@ -295,17 +308,12 @@ pub async fn claim_outstanding_into_invoice(
pub async fn mark_invoice_paid(invoice_id: &str, method: &str) -> Result<()> {
let updated_at = chrono::Utc::now().timestamp();
let activity = with_tx(async |tx| {
sqlx::query("UPDATE invoice SET status = 'paid', method = ?, updated_at = ? WHERE id = ?")
.bind(method)
.bind(updated_at)
.bind(invoice_id)
.execute(&mut **tx)
.await?;
insert_activity_tx(tx, "invoice_paid", "invoice", invoice_id, None).await
})
.await?;
publish(activity);
sqlx::query("UPDATE invoice SET status = 'paid', method = ?, updated_at = ? WHERE id = ?")
.bind(method)
.bind(updated_at)
.bind(invoice_id)
.execute(pool())
.await?;
Ok(())
}
@@ -370,67 +378,65 @@ pub async fn insert_intent(intent_id: &str, invoice_id: &str) -> Result<()> {
async fn insert_activity_tx(
tx: &mut Transaction<'_, Sqlite>,
activity_type: &str,
resource_type: &str,
resource_id: &str,
plan_id: Option<&str>,
snapshot: Snapshot,
) -> Result<Activity> {
let tenant = match resource_type {
"tenant" => resource_id.to_string(),
"relay" => {
sqlx::query_scalar::<_, String>("SELECT tenant FROM relay WHERE id = ?")
let resource_type = snapshot.resource_type();
let tenant_pubkey = match &snapshot {
Snapshot::Relay { .. } => {
sqlx::query_scalar::<_, String>("SELECT tenant_pubkey FROM relay WHERE id = ?")
.bind(resource_id)
.fetch_one(&mut **tx)
.await?
}
_ => anyhow::bail!("unknown resource_type: {resource_type}"),
};
let id = uuid::Uuid::new_v4().to_string();
let created_at = chrono::Utc::now().timestamp();
let snapshot = Json(snapshot);
sqlx::query(
"INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id, plan_id)
"INSERT INTO activity (id, tenant_pubkey, created_at, activity_type, resource_type, resource_id, snapshot)
VALUES (?, ?, ?, ?, ?, ?, ?)",
)
.bind(&id)
.bind(&tenant)
.bind(&tenant_pubkey)
.bind(created_at)
.bind(activity_type)
.bind(resource_type)
.bind(resource_id)
.bind(plan_id)
.bind(&snapshot)
.execute(&mut **tx)
.await?;
Ok(Activity {
id,
tenant,
tenant_pubkey,
created_at,
activity_type: activity_type.to_string(),
resource_type: resource_type.to_string(),
resource_id: resource_id.to_string(),
billed_at: None,
plan_id: plan_id.map(str::to_string),
snapshot,
})
}
async fn insert_invoice_tx(
tx: &mut Transaction<'_, Sqlite>,
invoice_id: &str,
tenant_pubkey: &str,
period_start: i64,
period_end: i64,
tenant: &Tenant,
period: &BillingPeriod,
) -> Result<Invoice> {
let now = chrono::Utc::now().timestamp();
let invoice_id = uuid::Uuid::new_v4().to_string();
Ok(sqlx::query_as::<_, Invoice>(
"INSERT INTO invoice (id, tenant_pubkey, status, period_start, period_end, created_at, updated_at)
VALUES (?, ?, 'open', ?, ?, ?, ?) RETURNING *",
)
.bind(invoice_id)
.bind(tenant_pubkey)
.bind(period_start)
.bind(period_end)
.bind(&tenant.pubkey)
.bind(&period.start)
.bind(period.end)
.bind(now)
.bind(now)
.fetch_one(&mut **tx)
@@ -440,7 +446,7 @@ async fn insert_invoice_tx(
async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &InvoiceItem) -> Result<()> {
sqlx::query(
"INSERT INTO invoice_item
(id, invoice_id, activity_id, tenant_pubkey, relay_id, plan, amount, description, created_at)
(id, invoice_id, activity_id, tenant_pubkey, relay_id, plan_id, amount, description, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&item.id)
@@ -448,7 +454,7 @@ async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &Invoice
.bind(&item.activity_id)
.bind(&item.tenant_pubkey)
.bind(&item.relay_id)
.bind(&item.plan)
.bind(&item.plan_id)
.bind(item.amount)
.bind(&item.description)
.bind(item.created_at)
@@ -457,15 +463,18 @@ async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &Invoice
Ok(())
}
/// Claim an activity as billed. Returns `true` if this call set the marker, and
/// `false` if it was already set — e.g. a concurrent reconcile pass won the race —
/// so callers can skip work that would otherwise double-bill.
async fn mark_activity_billed_tx(
tx: &mut Transaction<'_, Sqlite>,
activity_id: &str,
billed_at: i64,
) -> Result<()> {
sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ?")
) -> Result<bool> {
let result = sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ? AND billed_at IS NULL")
.bind(billed_at)
.bind(activity_id)
.execute(&mut **tx)
.await?;
Ok(())
Ok(result.rows_affected() > 0)
}
+261 -271
View File
@@ -1,3 +1,7 @@
//! The relay-provisioning reactor: it keeps the external relay backend (the
//! zooid API) in sync with our relay rows, reacting to relay activity and
//! retrying failed syncs with backoff.
use anyhow::Result;
use nostr_sdk::prelude::*;
use std::time::Duration;
@@ -12,286 +16,272 @@ const RELAY_SYNC_RETRY_BASE_DELAY_SECS: u64 = 30;
const RELAY_SYNC_RETRY_MAX_DELAY_SECS: u64 = 15 * 60;
const RELAY_SYNC_RETRY_MAX_ATTEMPTS: usize = 6;
#[derive(Clone)]
pub struct Infra;
/// Run the reactor for the life of the process: reconcile any relays left
/// unsynced from a previous run, then sync each relay as its activity arrives.
pub async fn start() {
let mut rx = db::subscribe();
impl Infra {
pub fn new() -> Self {
Self
if let Err(error) = reconcile_relay_state("startup").await {
tracing::error!(error = %error, "failed to reconcile relay state on startup");
}
pub async fn start(self) {
let mut rx = db::subscribe();
if let Err(error) = self.reconcile_relay_state("startup").await {
tracing::error!(error = %error, "failed to reconcile relay state on startup");
}
loop {
match rx.recv().await {
Ok(activity) => {
if let Err(e) = self.handle_activity(&activity).await {
tracing::error!(error = %e, "infra handle_activity failed");
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(missed = n, "infra lagged");
if let Err(error) = self.reconcile_relay_state("lagged").await {
tracing::error!(error = %error, "failed to reconcile relay state after lag");
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
}
async fn handle_activity(&self, activity: &Activity) -> Result<()> {
let needs_sync = matches!(
activity.activity_type.as_str(),
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" | "fail_relay_sync"
);
if activity.resource_type != "relay" || !needs_sync {
return Ok(());
}
if activity.activity_type == "fail_relay_sync" {
self.schedule_relay_sync_retry(&activity.resource_id, "activity").await?;
return Ok(());
}
let Some(relay) = query::get_relay(&activity.resource_id).await? else {
return Ok(());
};
self.sync_relay(&relay).await;
Ok(())
}
async fn reconcile_relay_state(&self, source: &str) -> Result<()> {
let relays = query::list_relays_pending_sync().await?;
if relays.is_empty() {
return Ok(());
}
tracing::info!(source, relay_count = relays.len(), "reconciling pending relay state");
for relay in relays {
if relay.sync_error.trim().is_empty() {
self.sync_relay(&relay).await;
} else {
self.schedule_relay_sync_retry(&relay.id, source).await?;
}
}
Ok(())
}
async fn schedule_relay_sync_retry(&self, relay_id: &str, source: &str) -> Result<()> {
fn get_retry_delay(consecutive_failures: usize) -> Option<Duration> {
let retry_attempt = consecutive_failures.max(1);
if retry_attempt > RELAY_SYNC_RETRY_MAX_ATTEMPTS {
return None;
}
let exponent = (retry_attempt - 1).min(31);
let multiplier = 1u64 << exponent;
let delay_secs = RELAY_SYNC_RETRY_BASE_DELAY_SECS
.saturating_mul(multiplier)
.min(RELAY_SYNC_RETRY_MAX_DELAY_SECS);
Some(Duration::from_secs(delay_secs))
}
let activities = query::list_activity_for_resource(relay_id).await?;
let consecutive_failures = activities
.iter()
.take_while(|activity| activity.activity_type == "fail_relay_sync")
.count();
let Some(delay) = get_retry_delay(consecutive_failures) else {
tracing::warn!(
relay = relay_id,
consecutive_failures,
max_attempts = RELAY_SYNC_RETRY_MAX_ATTEMPTS,
"relay sync retries exhausted; awaiting manual intervention"
);
return Ok(());
};
tracing::info!(
relay = relay_id,
source,
consecutive_failures,
delay_secs = delay.as_secs(),
"scheduled relay sync retry"
);
let relay_id = relay_id.to_string();
let infra = self.clone();
tokio::spawn(async move {
tokio::time::sleep(delay).await;
match query::get_relay(&relay_id).await {
Ok(Some(relay)) => infra.sync_relay(&relay).await,
Ok(None) => {}
Err(e) => {
tracing::error!(relay = %relay_id, error = %e, "relay sync retry task failed");
loop {
match rx.recv().await {
Ok(activity) => {
if let Err(e) = handle_activity(&activity).await {
tracing::error!(error = %e, "infra handle_activity failed");
}
}
});
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(missed = n, "infra lagged");
Ok(())
}
async fn sync_relay(&self, relay: &Relay) {
match self.try_sync_relay(relay).await {
Ok(()) => {
tracing::info!(relay = %relay.id, "relay sync succeeded");
if let Err(e) = command::complete_relay_sync(&relay.id).await {
tracing::error!(relay = %relay.id, error = %e, "failed to mark sync complete");
}
}
Err(e) => {
tracing::warn!(relay = %relay.id, error = %e, "relay sync failed");
if let Err(e2) = command::fail_relay_sync(relay, e.to_string()).await {
tracing::error!(relay = %relay.id, error = %e2, "failed to record sync failure");
if let Err(error) = reconcile_relay_state("lagged").await {
tracing::error!(error = %error, "failed to reconcile relay state after lag");
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
async fn try_sync_relay(&self, relay: &Relay) -> Result<()> {
// A relay is "new" (POST with a freshly generated secret) only if it has
// never completed a sync. `synced == 1` short-circuits the activity lookup;
// otherwise check the activity history so that a re-sync after an update
// (which resets `synced` to 0) PATCHes instead of clobbering the secret.
let is_new = relay.synced != 1
&& query::get_latest_activity_for_resource_and_type(&relay.id, "complete_relay_sync")
.await?
.is_none();
let mut body = serde_json::json!({
"host": format!("{}.{}", relay.subdomain, env::get().relay_domain),
"schema": relay.id,
"inactive": relay.status == RELAY_STATUS_INACTIVE
|| relay.status == RELAY_STATUS_DELINQUENT,
"info": {
"name": relay.info_name,
"icon": relay.info_icon,
"description": relay.info_description,
"pubkey": relay.tenant,
},
"policy": {
"public_join": relay.policy_public_join == 1,
"strip_signatures": relay.policy_strip_signatures == 1,
},
"groups": { "enabled": relay.groups_enabled == 1 },
"management": { "enabled": relay.management_enabled == 1 },
"blossom": if relay.blossom_enabled == 1 {
serde_json::json!({
"enabled": true,
"adapter": "s3",
"s3": {
"endpoint": env::get().blossom_s3_endpoint,
"region": env::get().blossom_s3_region,
"bucket": env::get().blossom_s3_bucket,
"access_key": env::get().blossom_s3_access_key,
"secret_key": env::get().blossom_s3_secret_key,
"key_prefix": relay.id,
},
})
} else {
serde_json::json!({ "enabled": false })
},
"livekit": if relay.livekit_enabled == 1 {
serde_json::json!({
"enabled": true,
"server_url": env::get().livekit_url,
"api_key": env::get().livekit_api_key,
"api_secret": env::get().livekit_api_secret,
})
} else {
serde_json::json!({ "enabled": false })
},
"push": { "enabled": relay.push_enabled == 1 },
"roles": {
"admin": { "can_manage": true, "can_invite": true },
"member": { "can_invite": true },
},
});
// Only provide a secret if the relay is new. This allows us to not store the relay secrets on our side.
if is_new && let Some(obj) = body.as_object_mut() {
obj.insert(
"secret".to_string(),
serde_json::Value::String(Keys::generate().secret_key().to_secret_hex()),
);
}
let method = if is_new { HttpMethod::POST } else { HttpMethod::PATCH };
self.request(method, &format!("relay/{}", relay.id), Some(&body))
.await?;
Ok(())
}
pub async fn list_relay_members(&self, relay_id: &str) -> Result<Vec<String>> {
#[derive(serde::Deserialize)]
struct MembersResponse {
members: Vec<String>,
}
let response = self
.request(HttpMethod::GET, &format!("relay/{relay_id}/members"), None)
.await?;
let parsed: MembersResponse = response.json().await?;
Ok(parsed.members)
}
// Internal utilities
/// Sends an authenticated request to the zooid API at `path` (relative to
/// `env.zooid_api_url`). Returns the response on 2xx; bails with the body
/// text otherwise.
async fn request(
&self,
method: HttpMethod,
path: &str,
body: Option<&serde_json::Value>,
) -> Result<reqwest::Response> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(5))
.build()?;
let base = env::get().zooid_api_url.trim_end_matches('/');
let path = path.trim_start_matches('/');
let url = format!("{base}/{path}");
let auth = env::get().make_auth(&url, method).await?;
let reqwest_method = match method {
HttpMethod::GET => reqwest::Method::GET,
HttpMethod::POST => reqwest::Method::POST,
HttpMethod::PUT => reqwest::Method::PUT,
HttpMethod::PATCH => reqwest::Method::PATCH,
};
let mut req = client
.request(reqwest_method, &url)
.header("Authorization", auth);
if let Some(body) = body {
req = req.json(body);
}
let response = req.send().await?;
if !response.status().is_success() {
let status = response.status();
let text = response.text().await.unwrap_or_default();
anyhow::bail!("zooid {method} {path} returned {status}: {text}");
}
Ok(response)
}
}
async fn handle_activity(activity: &Activity) -> Result<()> {
let needs_sync = matches!(
activity.activity_type.as_str(),
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" | "fail_relay_sync"
);
if activity.resource_type != "relay" || !needs_sync {
return Ok(());
}
if activity.activity_type == "fail_relay_sync" {
schedule_relay_sync_retry(&activity.resource_id, "activity").await?;
return Ok(());
}
let Some(relay) = query::get_relay(&activity.resource_id).await? else {
return Ok(());
};
sync_relay(&relay).await;
Ok(())
}
async fn reconcile_relay_state(source: &str) -> Result<()> {
let relays = query::list_relays_pending_sync().await?;
if relays.is_empty() {
return Ok(());
}
tracing::info!(source, relay_count = relays.len(), "reconciling pending relay state");
for relay in relays {
if relay.sync_error.trim().is_empty() {
sync_relay(&relay).await;
} else {
schedule_relay_sync_retry(&relay.id, source).await?;
}
}
Ok(())
}
async fn schedule_relay_sync_retry(relay_id: &str, source: &str) -> Result<()> {
fn get_retry_delay(consecutive_failures: usize) -> Option<Duration> {
let retry_attempt = consecutive_failures.max(1);
if retry_attempt > RELAY_SYNC_RETRY_MAX_ATTEMPTS {
return None;
}
let exponent = (retry_attempt - 1).min(31);
let multiplier = 1u64 << exponent;
let delay_secs = RELAY_SYNC_RETRY_BASE_DELAY_SECS
.saturating_mul(multiplier)
.min(RELAY_SYNC_RETRY_MAX_DELAY_SECS);
Some(Duration::from_secs(delay_secs))
}
let activities = query::list_activity_for_resource(relay_id).await?;
let consecutive_failures = activities
.iter()
.take_while(|activity| activity.activity_type == "fail_relay_sync")
.count();
let Some(delay) = get_retry_delay(consecutive_failures) else {
tracing::warn!(
relay = relay_id,
consecutive_failures,
max_attempts = RELAY_SYNC_RETRY_MAX_ATTEMPTS,
"relay sync retries exhausted; awaiting manual intervention"
);
return Ok(());
};
tracing::info!(
relay = relay_id,
source,
consecutive_failures,
delay_secs = delay.as_secs(),
"scheduled relay sync retry"
);
let relay_id = relay_id.to_string();
tokio::spawn(async move {
tokio::time::sleep(delay).await;
match query::get_relay(&relay_id).await {
Ok(Some(relay)) => sync_relay(&relay).await,
Ok(None) => {}
Err(e) => {
tracing::error!(relay = %relay_id, error = %e, "relay sync retry task failed");
}
}
});
Ok(())
}
async fn sync_relay(relay: &Relay) {
match try_sync_relay(relay).await {
Ok(()) => {
tracing::info!(relay = %relay.id, "relay sync succeeded");
if let Err(e) = command::complete_relay_sync(relay).await {
tracing::error!(relay = %relay.id, error = %e, "failed to mark sync complete");
}
}
Err(e) => {
tracing::warn!(relay = %relay.id, error = %e, "relay sync failed");
if let Err(e2) = command::fail_relay_sync(relay, e.to_string()).await {
tracing::error!(relay = %relay.id, error = %e2, "failed to record sync failure");
}
}
}
}
async fn try_sync_relay(relay: &Relay) -> Result<()> {
// A relay is "new" (POST with a freshly generated secret) only if it has
// never completed a sync. `synced == 1` short-circuits the activity lookup;
// otherwise check the activity history so that a re-sync after an update
// (which resets `synced` to 0) PATCHes instead of clobbering the secret.
let is_new = relay.synced != 1
&& query::get_latest_activity_for_resource_and_type(&relay.id, "complete_relay_sync")
.await?
.is_none();
let mut body = serde_json::json!({
"host": format!("{}.{}", relay.subdomain, env::get().relay_domain),
"schema": relay.id,
"inactive": relay.status == RELAY_STATUS_INACTIVE
|| relay.status == RELAY_STATUS_DELINQUENT,
"info": {
"name": relay.info_name,
"icon": relay.info_icon,
"description": relay.info_description,
"pubkey": relay.tenant_pubkey,
},
"policy": {
"public_join": relay.policy_public_join == 1,
"strip_signatures": relay.policy_strip_signatures == 1,
},
"groups": { "enabled": relay.groups_enabled == 1 },
"management": { "enabled": relay.management_enabled == 1 },
"blossom": if relay.blossom_enabled == 1 {
serde_json::json!({
"enabled": true,
"adapter": "s3",
"s3": {
"endpoint": env::get().blossom_s3_endpoint,
"region": env::get().blossom_s3_region,
"bucket": env::get().blossom_s3_bucket,
"access_key": env::get().blossom_s3_access_key,
"secret_key": env::get().blossom_s3_secret_key,
"key_prefix": relay.id,
},
})
} else {
serde_json::json!({ "enabled": false })
},
"livekit": if relay.livekit_enabled == 1 {
serde_json::json!({
"enabled": true,
"server_url": env::get().livekit_url,
"api_key": env::get().livekit_api_key,
"api_secret": env::get().livekit_api_secret,
})
} else {
serde_json::json!({ "enabled": false })
},
"push": { "enabled": relay.push_enabled == 1 },
"roles": {
"admin": { "can_manage": true, "can_invite": true },
"member": { "can_invite": true },
},
});
// Only provide a secret if the relay is new. This allows us to not store the relay secrets on our side.
if is_new && let Some(obj) = body.as_object_mut() {
obj.insert(
"secret".to_string(),
serde_json::Value::String(Keys::generate().secret_key().to_secret_hex()),
);
}
let method = if is_new { HttpMethod::POST } else { HttpMethod::PATCH };
request(method, &format!("relay/{}", relay.id), Some(&body)).await?;
Ok(())
}
/// Fetch the member pubkeys of a relay from the zooid API.
pub async fn list_relay_members(relay_id: &str) -> Result<Vec<String>> {
#[derive(serde::Deserialize)]
struct MembersResponse {
members: Vec<String>,
}
let response = request(HttpMethod::GET, &format!("relay/{relay_id}/members"), None).await?;
let parsed: MembersResponse = response.json().await?;
Ok(parsed.members)
}
/// Sends an authenticated request to the zooid API at `path` (relative to
/// `env.zooid_api_url`). Returns the response on 2xx; bails with the body
/// text otherwise.
async fn request(
method: HttpMethod,
path: &str,
body: Option<&serde_json::Value>,
) -> Result<reqwest::Response> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(5))
.build()?;
let base = env::get().zooid_api_url.trim_end_matches('/');
let path = path.trim_start_matches('/');
let url = format!("{base}/{path}");
let auth = env::get().make_auth(&url, method).await?;
let reqwest_method = match method {
HttpMethod::GET => reqwest::Method::GET,
HttpMethod::POST => reqwest::Method::POST,
HttpMethod::PUT => reqwest::Method::PUT,
HttpMethod::PATCH => reqwest::Method::PATCH,
};
let mut req = client
.request(reqwest_method, &url)
.header("Authorization", auth);
if let Some(body) = body {
req = req.json(body);
}
let response = req.send().await?;
if !response.status().is_success() {
let status = response.status();
let text = response.text().await.unwrap_or_default();
anyhow::bail!("zooid {method} {path} returned {status}: {text}");
}
Ok(response)
}
+3 -5
View File
@@ -20,7 +20,6 @@ use tower_http::cors::{AllowOrigin, CorsLayer, Any};
use crate::api::Api;
use crate::billing::Billing;
use crate::infra::Infra;
use crate::robot::Robot;
use crate::stripe::Stripe;
@@ -39,9 +38,8 @@ async fn main() -> Result<()> {
let robot = Robot::new().await?;
let stripe = Stripe::new();
let infra = Infra::new();
let billing = Billing::new(robot.clone());
let api = Api::new(billing.clone(), stripe, robot, infra.clone());
let api = Api::new(billing.clone(), stripe, robot);
let parsed = env::get()
.server_allow_origins
@@ -55,8 +53,8 @@ async fn main() -> Result<()> {
let app = api.router().layer(cors);
tokio::spawn(async move {
infra.start().await;
tokio::spawn(async {
infra::start().await;
});
tokio::spawn(async move {
+35 -18
View File
@@ -1,21 +1,26 @@
use serde::{Deserialize, Serialize};
use sqlx::types::Json;
pub const RELAY_STATUS_ACTIVE: &str = "active";
pub const RELAY_STATUS_INACTIVE: &str = "inactive";
pub const RELAY_STATUS_DELINQUENT: &str = "delinquent";
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Activity {
pub id: String,
pub tenant: String,
pub created_at: i64,
pub activity_type: String,
pub resource_type: String,
pub resource_id: String,
pub billed_at: Option<i64>,
/// The relay's plan at the time of a `create_relay`/`update_relay` activity;
/// `None` for all other activity types.
pub plan_id: Option<String>,
/// Per-resource_type snapshot of a resource's state captured on each activity,
/// stored as JSON in `activity.snapshot`. Tagged on `resource_type` so the JSON
/// is self-describing and the variant matches the activity row's column. Add a
/// variant per resource type that needs state preserved on the activity log.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "resource_type", rename_all = "snake_case")]
pub enum Snapshot {
Relay { plan: String, status: String },
}
impl Snapshot {
pub fn resource_type(&self) -> &'static str {
match self {
Self::Relay { .. } => "relay",
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -41,12 +46,24 @@ pub struct Tenant {
pub renewed_at: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Activity {
pub id: String,
pub tenant_pubkey: String,
pub created_at: i64,
pub activity_type: String,
pub resource_type: String,
pub resource_id: String,
pub billed_at: Option<i64>,
pub snapshot: Json<Snapshot>,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Relay {
pub id: String,
pub tenant: String,
pub tenant_pubkey: String,
pub subdomain: String,
pub plan: String,
pub plan_id: String,
pub status: String,
pub sync_error: String,
pub info_name: String,
@@ -66,9 +83,9 @@ impl Default for Relay {
fn default() -> Self {
Self {
id: String::new(),
tenant: String::new(),
tenant_pubkey: String::new(),
subdomain: String::new(),
plan: String::new(),
plan_id: String::new(),
status: RELAY_STATUS_ACTIVE.to_string(),
sync_error: String::new(),
info_name: String::new(),
@@ -106,7 +123,7 @@ pub struct InvoiceItem {
pub activity_id: Option<String>,
pub tenant_pubkey: String,
pub relay_id: String,
pub plan: String,
pub plan_id: String,
pub amount: i64,
pub description: String,
pub created_at: i64,
@@ -123,7 +140,7 @@ pub struct Bolt11 {
pub settled_at: Option<i64>,
}
#[allow(dead_code)] // backs the `intent` table for the (not yet implemented) Stripe intent flow
#[allow(dead_code)] // mirrors the `intent` table; rows record paid Stripe PaymentIntents but aren't read back into this struct yet
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Intent {
pub id: String,
+47 -44
View File
@@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{Result, anyhow};
use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, Plan, Relay, Tenant};
use crate::db::pool;
@@ -15,7 +15,7 @@ fn select_activity(tail: &str) -> String {
format!("SELECT * FROM activity {tail}")
}
// Plans
// --- Plans ---
pub fn list_plans() -> Vec<Plan> {
vec![
@@ -46,11 +46,14 @@ pub fn list_plans() -> Vec<Plan> {
]
}
pub fn get_plan(plan_id: &str) -> Option<Plan> {
list_plans().into_iter().find(|p| p.id == plan_id)
pub fn get_plan(plan_id: &str) -> Result<Plan> {
list_plans()
.into_iter()
.find(|p| p.id == plan_id)
.ok_or_else(|| anyhow!("plan not found: {plan_id}"))
}
// Tenants
// --- Tenants ---
pub async fn list_tenants() -> Result<Vec<Tenant>> {
Ok(sqlx::query_as::<_, Tenant>(&select_tenant(""))
@@ -65,7 +68,7 @@ pub async fn get_tenant(pubkey: &str) -> Result<Option<Tenant>> {
.await?)
}
// Relays
// --- Relays ---
pub async fn list_relays() -> Result<Vec<Relay>> {
Ok(sqlx::query_as::<_, Relay>(&select_relay(""))
@@ -81,9 +84,9 @@ pub async fn list_relays_pending_sync() -> Result<Vec<Relay>> {
)
}
pub async fn list_relays_for_tenant(tenant_id: &str) -> Result<Vec<Relay>> {
Ok(sqlx::query_as::<_, Relay>(&select_relay("WHERE tenant = ?"))
.bind(tenant_id)
pub async fn list_relays_for_tenant(tenant_pubkey: &str) -> Result<Vec<Relay>> {
Ok(sqlx::query_as::<_, Relay>(&select_relay("WHERE tenant_pubkey = ?"))
.bind(tenant_pubkey)
.fetch_all(pool())
.await?)
}
@@ -95,7 +98,25 @@ pub async fn get_relay(id: &str) -> Result<Option<Relay>> {
.await?)
}
// Invoices
/// The relay's plan immediately before `before`, read from the most recent
/// relay-activity snapshot with `created_at < before`. Billing uses this as
/// the `old` side of a plan-change delta.
pub async fn get_relay_plan_before(relay_id: &str, before: i64) -> Result<Option<String>> {
Ok(sqlx::query_scalar::<_, String>(
"SELECT json_extract(snapshot, '$.plan') FROM activity
WHERE resource_id = ?
AND resource_type = 'relay'
AND created_at < ?
ORDER BY created_at DESC
LIMIT 1",
)
.bind(relay_id)
.bind(before)
.fetch_optional(pool())
.await?)
}
// --- Invoices ---
pub async fn get_invoice(invoice_id: &str) -> Result<Option<Invoice>> {
Ok(sqlx::query_as::<_, Invoice>("SELECT * FROM invoice WHERE id = ?")
@@ -104,7 +125,7 @@ pub async fn get_invoice(invoice_id: &str) -> Result<Option<Invoice>> {
.await?)
}
pub async fn list_invoices(tenant_pubkey: &str) -> Result<Vec<Invoice>> {
pub async fn list_invoices_for_tenant(tenant_pubkey: &str) -> Result<Vec<Invoice>> {
Ok(sqlx::query_as::<_, Invoice>(
"SELECT * FROM invoice WHERE tenant_pubkey = ? ORDER BY created_at DESC",
)
@@ -113,7 +134,7 @@ pub async fn list_invoices(tenant_pubkey: &str) -> Result<Vec<Invoice>> {
.await?)
}
pub async fn get_latest_invoice(tenant_pubkey: &str) -> Result<Option<Invoice>> {
pub async fn get_latest_invoice_for_tenant(tenant_pubkey: &str) -> Result<Option<Invoice>> {
Ok(sqlx::query_as::<_, Invoice>(
"SELECT * FROM invoice WHERE tenant_pubkey = ? ORDER BY created_at DESC LIMIT 1",
)
@@ -131,24 +152,7 @@ pub async fn get_invoice_items_for_invoice(invoice_id: &str) -> Result<Vec<Invoi
)
}
/// The relay's plan immediately before `before`, read from the activity log
/// (the most recent `create_relay`/`update_relay` with `created_at < before`).
/// Billing uses this as the `old` side of a plan-change delta.
pub async fn get_relay_plan_before(relay_id: &str, before: i64) -> Result<Option<String>> {
Ok(sqlx::query_scalar::<_, String>(
"SELECT plan_id FROM activity
WHERE resource_id = ?
AND created_at < ?
AND activity_type IN ('create_relay', 'update_relay')
AND plan_id IS NOT NULL
ORDER BY created_at DESC
LIMIT 1",
)
.bind(relay_id)
.bind(before)
.fetch_optional(pool())
.await?)
}
// --- Bolt11 ---
pub async fn get_bolt11(bolt11_id: &str) -> Result<Option<Bolt11>> {
Ok(sqlx::query_as::<_, Bolt11>("SELECT * FROM bolt11 WHERE id = ?")
@@ -166,7 +170,7 @@ pub async fn get_bolt11_for_invoice(invoice_id: &str) -> Result<Option<Bolt11>>
.await?)
}
// Activity
// --- Activity ---
/// Billable activity for a tenant not yet folded into an invoice. The
/// activity-type filter and the `billed_at IS NULL` guard live here so the
@@ -174,7 +178,7 @@ pub async fn get_bolt11_for_invoice(invoice_id: &str) -> Result<Option<Bolt11>>
/// Ordered oldest-first so line items and proration apply in event order.
pub async fn list_billable_activity_for_tenant(tenant_pubkey: &str) -> Result<Vec<Activity>> {
Ok(sqlx::query_as::<_, Activity>(&select_activity(
"WHERE tenant = ?
"WHERE tenant_pubkey = ?
AND billed_at IS NULL
AND activity_type IN (
'create_relay', 'update_relay', 'activate_relay', 'deactivate_relay'
@@ -186,26 +190,25 @@ pub async fn list_billable_activity_for_tenant(tenant_pubkey: &str) -> Result<Ve
.await?)
}
/// A tenant's relay status/plan activity strictly before `before`, oldest-first
/// — folded by billing to reconstruct each relay's state as of a period boundary.
/// The relay's most recent activity strictly before `before`, or `None` if it
/// had no activity yet — i.e. the relay didn't exist at that point. Billing
/// reads its snapshot to recover the relay's state as of a period boundary.
/// Strict `<` so a relay created exactly at the boundary isn't counted active
/// there (its own creation charge covers that period).
pub async fn list_relay_activity_before(
tenant_pubkey: &str,
pub async fn get_latest_relay_activity_before(
relay_id: &str,
before: i64,
) -> Result<Vec<Activity>> {
) -> Result<Option<Activity>> {
Ok(sqlx::query_as::<_, Activity>(&select_activity(
"WHERE tenant = ?
"WHERE resource_id = ?
AND resource_type = 'relay'
AND activity_type IN (
'create_relay', 'update_relay', 'activate_relay', 'deactivate_relay'
)
AND created_at < ?
ORDER BY created_at ASC",
ORDER BY created_at DESC
LIMIT 1",
))
.bind(tenant_pubkey)
.bind(relay_id)
.bind(before)
.fetch_all(pool())
.fetch_optional(pool())
.await?)
}
+6
View File
@@ -7,6 +7,9 @@ use tokio::sync::Mutex;
use crate::env;
/// The service's Nostr identity: it publishes the robot's profile and relay
/// lists and sends encrypted direct messages to tenants, caching recipients'
/// relay lists between sends.
#[derive(Clone)]
pub struct Robot {
outbox_cache: std::sync::Arc<Mutex<HashMap<String, CacheEntry>>>,
@@ -20,6 +23,7 @@ struct CacheEntry {
}
impl Robot {
/// Build the robot and publish its Nostr identity (profile and relay lists).
pub async fn new() -> Result<Self> {
let robot = Self {
outbox_cache: std::sync::Arc::new(Mutex::new(HashMap::new())),
@@ -80,6 +84,7 @@ impl Robot {
Ok(())
}
/// Send an encrypted direct message to a recipient over their messaging relays.
pub async fn send_dm(&self, recipient: &str, message: &str) -> Result<()> {
let outbox = self.fetch_outbox_relays(recipient).await?;
if outbox.is_empty() {
@@ -123,6 +128,7 @@ impl Robot {
Ok(relays)
}
/// The recipient's display name from their Nostr profile, if they have one.
pub async fn fetch_nostr_name(&self, pubkey: &str) -> Option<String> {
let pubkey = PublicKey::parse(pubkey).ok()?;
let filter = Filter::new().author(pubkey).kind(Kind::Metadata).limit(1);
+4 -7
View File
@@ -19,14 +19,9 @@ pub async fn get_tenant_latest_invoice(
api.require_admin_or_tenant(&auth, &pubkey)?;
let tenant = api.get_tenant_or_404(&pubkey).await?;
// Roll any outstanding charges (and due renewals) into an invoice, then
// return the latest.
api.billing
.generate_invoice(&tenant)
.await
.map_err(internal)?;
api.billing.reconcile_subscription(&tenant).await.map_err(internal)?;
let invoice = query::get_latest_invoice(&pubkey).await.map_err(internal)?;
let invoice = query::get_latest_invoice_for_tenant(&pubkey).await.map_err(internal)?;
ok(invoice)
}
@@ -46,6 +41,8 @@ pub async fn get_invoice(
ok(invoice)
}
/// Return a payable Lightning invoice (bolt11) for an invoice, minting one if
/// needed and first settling it if it was already paid out of band.
pub async fn get_invoice_bolt11(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
+2 -4
View File
@@ -11,8 +11,6 @@ pub async fn list_plans(State(_api): State<Arc<Api>>) -> ApiResult {
}
pub async fn get_plan(State(_api): State<Arc<Api>>, Path(id): Path<String>) -> ApiResult {
match query::get_plan(&id) {
Some(plan) => ok(plan),
None => Err(not_found("plan not found")),
}
let plan = query::get_plan(&id).map_err(|_| not_found("plan not found"))?;
ok(plan)
}
+27 -24
View File
@@ -9,7 +9,7 @@ use regex::Regex;
use serde::Deserialize;
use crate::api::{Api, AuthedPubkey};
use crate::{command, query};
use crate::{command, infra, query};
use crate::models::{
RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay,
};
@@ -34,7 +34,7 @@ pub async fn get_relay(
Path(id): Path<String>,
) -> ApiResult {
let relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
ok(relay)
}
@@ -44,7 +44,7 @@ pub async fn list_relay_activity(
Path(id): Path<String>,
) -> ApiResult {
let relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
let activity = query::list_activity_for_resource(&id)
.await
@@ -58,17 +58,17 @@ pub async fn list_relay_members(
Path(id): Path<String>,
) -> ApiResult {
let relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
let members = fetch_relay_members(&api, &relay).await.map_err(internal)?;
let members = fetch_relay_members(&relay).await.map_err(internal)?;
ok(serde_json::json!({ "members": members }))
}
#[derive(Deserialize)]
pub struct CreateRelayRequest {
pub tenant: String,
pub tenant_pubkey: String,
pub subdomain: String,
pub plan: String,
pub plan_id: String,
pub info_name: String,
pub info_icon: String,
pub info_description: String,
@@ -86,7 +86,7 @@ pub async fn create_relay(
AuthedPubkey(auth): AuthedPubkey,
Json(payload): Json<CreateRelayRequest>,
) -> ApiResult {
api.require_admin_or_tenant(&auth, &payload.tenant)?;
api.require_admin_or_tenant(&auth, &payload.tenant_pubkey)?;
let relay_id = format!(
"{}_{}",
@@ -96,9 +96,9 @@ pub async fn create_relay(
let relay = Relay {
id: relay_id.clone(),
tenant: payload.tenant,
tenant_pubkey: payload.tenant_pubkey,
subdomain: payload.subdomain,
plan: payload.plan,
plan_id: payload.plan_id,
info_name: payload.info_name,
info_icon: payload.info_icon,
info_description: payload.info_description,
@@ -124,7 +124,7 @@ pub async fn create_relay(
#[derive(Deserialize)]
pub struct UpdateRelayRequest {
pub subdomain: Option<String>,
pub plan: Option<String>,
pub plan_id: Option<String>,
pub info_name: Option<String>,
pub info_icon: Option<String>,
pub info_description: Option<String>,
@@ -145,16 +145,16 @@ pub async fn update_relay(
) -> ApiResult {
let mut relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
let current_plan = relay.plan.clone();
let requested_plan = payload.plan.clone();
let current_plan = relay.plan_id.clone();
let requested_plan = payload.plan_id.clone();
if let Some(v) = payload.subdomain {
relay.subdomain = v;
}
if let Some(v) = requested_plan.clone() {
relay.plan = v;
relay.plan_id = v;
}
if let Some(v) = payload.info_name {
relay.info_name = v;
@@ -194,10 +194,9 @@ pub async fn update_relay(
.is_some_and(|requested| requested != current_plan);
if plan_changed {
let selected_plan =
query::get_plan(&relay.plan).expect("validated plan must exist");
let selected_plan = query::get_plan(&relay.plan_id).map_err(internal)?;
if let Some(limit) = selected_plan.members {
let current_members = fetch_relay_members(&api, &relay)
let current_members = fetch_relay_members(&relay)
.await
.map_err(internal)?
.len() as i64;
@@ -225,7 +224,7 @@ pub async fn deactivate_relay(
Path(id): Path<String>,
) -> ApiResult {
let relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
if relay.status == RELAY_STATUS_DELINQUENT {
return Err(bad_request("relay-is-delinquent", "relay is delinquent"));
@@ -248,7 +247,7 @@ pub async fn reactivate_relay(
Path(id): Path<String>,
) -> ApiResult {
let relay = api.get_relay_or_404(&id).await?;
api.require_admin_or_tenant(&auth, &relay.tenant)?;
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
if relay.status == RELAY_STATUS_DELINQUENT {
return Err(bad_request("relay-is-delinquent", "relay is delinquent"));
@@ -265,12 +264,12 @@ pub async fn reactivate_relay(
// --- helpers ----------------------------------------------------------------
async fn fetch_relay_members(api: &Api, relay: &Relay) -> Result<Vec<String>> {
async fn fetch_relay_members(relay: &Relay) -> Result<Vec<String>> {
if relay.synced == 0 {
return Ok(Vec::new());
}
api.infra.list_relay_members(&relay.id).await
infra::list_relay_members(&relay.id).await
}
const RESERVED_SUBDOMAINS: [&str; 3] = ["api", "admin", "internal"];
@@ -278,14 +277,17 @@ const RESERVED_SUBDOMAINS: [&str; 3] = ["api", "admin", "internal"];
static SUBDOMAIN_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$").unwrap());
/// Validate and normalize a relay before persistence: enforce the subdomain
/// format and reserved names, require an existing plan that permits any enabled
/// premium features, and coerce the boolean columns to 0/1.
fn prepare_relay(mut relay: Relay) -> Result<Relay, ApiError> {
if !SUBDOMAIN_RE.is_match(&relay.subdomain)
|| RESERVED_SUBDOMAINS.contains(&relay.subdomain.as_str()) {
return Err(unprocessable("invalid-subdomain", "subdomain is invalid"));
}
let plan = query::get_plan(&relay.plan)
.ok_or_else(|| unprocessable("invalid-plan", "plan not found"))?;
let plan = query::get_plan(&relay.plan_id)
.map_err(|_| unprocessable("invalid-plan", "plan not found"))?;
if (!plan.blossom && relay.blossom_enabled == 1) || (!plan.livekit && relay.livekit_enabled == 1) {
return Err(unprocessable("premium-feature", "feature requires a paid plan"));
@@ -302,6 +304,7 @@ fn prepare_relay(mut relay: Relay) -> Result<Relay, ApiError> {
Ok(relay)
}
/// Translate a duplicate-subdomain write into a 422; anything else is a 500.
fn map_relay_write_error(e: anyhow::Error) -> ApiError {
if matches!(map_unique_error(&e), Some("subdomain-exists")) {
unprocessable("subdomain-exists", "subdomain already exists")
+7 -2
View File
@@ -48,6 +48,9 @@ pub async fn list_tenants(
.collect::<Vec<_>>())
}
/// Create the tenant row for the calling pubkey and provision its Stripe
/// customer. Idempotent: an existing tenant (including one created by a
/// concurrent unique-constraint race) is returned as-is.
pub async fn create_tenant(
State(api): State<Arc<Api>>,
AuthedPubkey(pubkey): AuthedPubkey,
@@ -138,7 +141,7 @@ pub async fn list_tenant_relays(
ok(relays)
}
/// List a tenant's invoices, most recent first.
pub async fn list_tenant_invoices(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
@@ -146,7 +149,7 @@ pub async fn list_tenant_invoices(
) -> ApiResult {
api.require_admin_or_tenant(&auth, &pubkey)?;
let invoices = query::list_invoices(&pubkey)
let invoices = query::list_invoices_for_tenant(&pubkey)
.await
.map_err(internal)?;
@@ -158,6 +161,8 @@ pub struct StripeSessionParams {
return_url: Option<String>,
}
/// Create a Stripe billing-portal session for the tenant to manage their saved
/// payment methods, returning the portal URL.
pub async fn create_stripe_session(
State(api): State<Arc<Api>>,
AuthedPubkey(auth): AuthedPubkey,
+10 -2
View File
@@ -12,13 +12,17 @@ use crate::env;
const STRIPE_API: &str = "https://api.stripe.com/v1";
// Stripe struct and impl
#[derive(Clone)]
pub struct Stripe {
http: reqwest::Client,
}
impl Default for Stripe {
fn default() -> Self {
Self::new()
}
}
impl Stripe {
pub fn new() -> Self {
Self {
@@ -54,6 +58,8 @@ impl Stripe {
// --- Customers ---
/// Create a Stripe customer for a tenant and return its id. Idempotent on
/// `tenant_pubkey` so retrying a tenant's creation reuses the same customer.
pub async fn create_customer(&self, tenant_pubkey: &str, name: &str) -> Result<String> {
let body = self
.post("/customers")
@@ -142,6 +148,8 @@ impl Stripe {
// --- Portal ---
/// Open a Stripe billing-portal session for the customer, returning the URL
/// where they can manage their saved payment methods.
pub async fn create_portal_session(
&self,
customer_id: &str,
+3
View File
@@ -4,6 +4,9 @@ use nwc::prelude::{
TransactionState,
};
/// A Nostr Wallet Connect wallet, used both as the service's receiving wallet
/// and as a tenant's paying wallet. Each call spins up and shuts down its own
/// short-lived NWC client; nothing is pooled across calls.
#[derive(Clone)]
pub struct Wallet {
url: NostrWalletConnectURI,
+3 -3
View File
@@ -32,8 +32,8 @@ export default function PaymentDialog(props: PaymentDialogProps) {
const billedRelays = createMemo(() => {
const planById = new Map(plans().map((p) => [p.id, p]))
return (relays() ?? [])
.map((relay) => ({ relay, plan: planById.get(relay.plan) }))
.filter((entry) => entry.plan?.amount > 0)
.map((relay) => ({ relay, plan: planById.get(relay.plan_id) }))
.filter((entry) => Boolean(entry.plan?.amount))
})
async function loadBolt11() {
@@ -147,7 +147,7 @@ export default function PaymentDialog(props: PaymentDialogProps) {
<li class="flex items-center justify-between gap-3 text-sm">
<span class="truncate text-gray-900">{relay.info_name || relay.subdomain}</span>
<span class="flex-shrink-0 text-xs text-gray-500">
{plan?.name ?? relay.plan}
{plan?.name ?? relay.plan_id}
<Show when={plan}> · ${(plan!.amount / 100).toFixed(2)}/mo</Show>
</span>
</li>
+4 -4
View File
@@ -32,9 +32,9 @@ function memberLabel(members: number | null) {
type PricingTableProps = {
selectable?: boolean
selectedPlan?: PlanId
onSelect?: (plan: PlanId) => void
onCta?: (plan: PlanId) => void
selectedPlanId?: PlanId
onSelect?: (planId: PlanId) => void
onCta?: (planId: PlanId) => void
}
export default function PricingTable(props: PricingTableProps) {
@@ -43,7 +43,7 @@ export default function PricingTable(props: PricingTableProps) {
<For each={plans()}>
{(plan) => {
const isPopular = plan.id === "basic"
const isSelected = () => props.selectable && props.selectedPlan === plan.id
const isSelected = () => props.selectable && props.selectedPlanId === plan.id
const card = (
<>
+12 -12
View File
@@ -63,7 +63,7 @@ type RelayDetailCardProps = {
onToggleMediaStorage?: () => void
onToggleLivekitSupport?: () => void
onTogglePushNotifications?: () => void
onUpdatePlan?: (plan: PlanId) => Promise<void>
onUpdatePlan?: (planId: PlanId) => Promise<void>
enforcePlanLimits?: boolean
showPlanActions?: boolean
}
@@ -76,17 +76,17 @@ export default function RelayDetailCard(props: RelayDetailCardProps) {
return fallback
}
const [menuOpen, setMenuOpen] = createSignal(false)
const [plan, setPlan] = createSignal<PlanId>(props.relay.plan)
const [planId, setPlanId] = createSignal<PlanId>(props.relay.plan_id)
const [pendingAction, setPendingAction] = createSignal<"deactivate" | "reactivate" | null>(null)
let menuContainerRef: HTMLDivElement | undefined
const memberLimitLabel = () => {
const p = plans().find(p => p.id === r().plan)
const p = plans().find(p => p.id === r().plan_id)
if (!p) return "?"
return p.members === null ? "∞" : String(p.members)
}
const planLimited = () => (props.enforcePlanLimits ?? true) && r().plan === "free"
const planLimited = () => (props.enforcePlanLimits ?? true) && r().plan_id === "free"
const showPlanActions = () => props.showPlanActions ?? true
const actionBusy = () => pendingAction() === "deactivate" ? !!props.deactivating : pendingAction() === "reactivate" ? !!props.reactivating : false
const relayLabel = () => r().info_name || r().subdomain
@@ -107,11 +107,11 @@ export default function RelayDetailCard(props: RelayDetailCardProps) {
const confirmBusyLabel = () => pendingAction() === "deactivate" ? "Deactivating..." : "Reactivating..."
const confirmTone = () => pendingAction() === "deactivate" ? "danger" : "primary"
async function changePlan(plan: PlanId) {
setPlan(plan)
async function changePlanId(planId: PlanId) {
setPlanId(planId)
try {
await props.onUpdatePlan?.(plan)
setToastMessage(`Plan updated to ${plan}`, "success")
await props.onUpdatePlan?.(planId)
setToastMessage(`Plan updated to ${planId}`, "success")
} catch {
// error is handled by the caller
}
@@ -360,7 +360,7 @@ export default function RelayDetailCard(props: RelayDetailCardProps) {
</Field>
<Show when={props.showTenant}>
<Field label="Tenant">
<span class="font-mono text-xs break-all">{r().tenant}</span>
<span class="font-mono text-xs break-all">{r().tenant_pubkey}</span>
</Field>
</Show>
</MembershipSection>
@@ -373,15 +373,15 @@ export default function RelayDetailCard(props: RelayDetailCardProps) {
when={props.onUpdatePlan}
fallback={
<Field label="Current plan">
<span class="capitalize text-gray-900">{r().plan}</span>
<span class="capitalize text-gray-900">{r().plan_id}</span>
</Field>
}
>
<div class="lg:col-span-2 space-y-4">
<PricingTable
selectable
selectedPlan={plan()}
onSelect={changePlan}
selectedPlanId={planId()}
onSelect={changePlanId}
/>
</div>
</Show>
+8 -8
View File
@@ -5,7 +5,7 @@ import { validateSubdomainLabel } from "@/lib/subdomain"
import { setToastMessage } from "@/components/Toast"
import { plans } from "@/lib/state"
export type RelayFormValues = Pick<Relay, "info_name" | "subdomain" | "info_icon" | "info_description" | "plan">
export type RelayFormValues = Pick<Relay, "info_name" | "subdomain" | "info_icon" | "info_description" | "plan_id">
type RelayFormProps = {
initialValues?: Partial<RelayFormValues>
@@ -16,8 +16,8 @@ type RelayFormProps = {
}
export default function RelayForm(props: RelayFormProps) {
const defaultPlanId = createMemo(() => props.initialValues?.plan ?? plans()[0]?.id ?? "free")
const [plan, setPlan] = createSignal(defaultPlanId())
const defaultPlanId = createMemo(() => props.initialValues?.plan_id ?? plans()[0]?.id ?? "free")
const [planId, setPlanId] = createSignal(defaultPlanId())
const [name, setName] = createSignal(props.initialValues?.info_name ?? "")
const [subdomain, setSubdomain] = createSignal(props.initialValues?.subdomain ?? "")
const [icon, setIcon] = createSignal(props.initialValues?.info_icon ?? "")
@@ -27,7 +27,7 @@ export default function RelayForm(props: RelayFormProps) {
async function handleSubmit(e: Event) {
e.preventDefault()
if (!plan()) {
if (!planId()) {
setToastMessage("Please select a plan")
return
}
@@ -43,7 +43,7 @@ export default function RelayForm(props: RelayFormProps) {
try {
await props.onSubmit({
plan: plan(),
plan_id: planId(),
info_name: name(),
subdomain: subdomain(),
info_icon: icon(),
@@ -56,7 +56,7 @@ export default function RelayForm(props: RelayFormProps) {
}
}
createEffect(() => setPlan(defaultPlanId()))
createEffect(() => setPlanId(defaultPlanId()))
createEffect(() => {
if (props.syncSubdomainWithName) {
@@ -112,8 +112,8 @@ export default function RelayForm(props: RelayFormProps) {
{(p) => (
<button
type="button"
onClick={() => setPlan(p.id)}
class={`border-2 rounded-xl p-4 text-left transition-colors ${plan() === p.id ? "border-blue-600 bg-blue-50" : "border-gray-200 hover:border-gray-300"}`}
onClick={() => setPlanId(p.id)}
class={`border-2 rounded-xl p-4 text-left transition-colors ${planId() === p.id ? "border-blue-600 bg-blue-50" : "border-gray-200 hover:border-gray-300"}`}
>
<div class="font-bold text-gray-900">{p.name}</div>
<div class="text-sm text-gray-500 mt-1">
+1 -1
View File
@@ -17,7 +17,7 @@ export default function RelayListItem(props: RelayListItemProps) {
<p class="font-medium text-gray-900">{props.relay.info_name || props.relay.subdomain}</p>
<p class="text-xs text-gray-500">{props.relay.subdomain}.spaces.coracle.social</p>
{props.showTenant && (
<p class="text-xs text-gray-500 break-all mt-1">Tenant: {props.relay.tenant}</p>
<p class="text-xs text-gray-500 break-all mt-1">Tenant: {props.relay.tenant_pubkey}</p>
)}
</div>
<Show
+6 -6
View File
@@ -44,9 +44,9 @@ export type PlanId = string
export type Relay = {
id: string
tenant: string
tenant_pubkey: string
subdomain: string
plan: PlanId
plan_id: PlanId
status: string
sync_error: string
synced: number
@@ -63,9 +63,9 @@ export type Relay = {
}
export type CreateRelayInput = {
tenant?: string
tenant_pubkey?: string
subdomain: string
plan: string
plan_id: string
info_name?: string
info_icon?: string
info_description?: string
@@ -80,7 +80,7 @@ export type CreateRelayInput = {
export type UpdateRelayInput = {
subdomain?: string
plan?: string
plan_id?: string
info_name?: string
info_icon?: string
info_description?: string
@@ -115,7 +115,7 @@ export type Invoice = {
export type Activity = {
id: string
tenant: string
tenant_pubkey: string
created_at: number
activity_type: string
resource_type: string
+3 -3
View File
@@ -116,8 +116,8 @@ export const createRelayForActiveTenant = (input: CreateRelayInput) => {
const overrides = {
tenant: account()!.pubkey,
blossom_enabled: input.plan === "free" ? 0 : 1,
livekit_enabled: input.plan === "free" ? 0 : 1,
blossom_enabled: input.plan_id === "free" ? 0 : 1,
livekit_enabled: input.plan_id === "free" ? 0 : 1,
}
return createRelay({...defaults, ...input, ...overrides})
@@ -127,7 +127,7 @@ export const updateActiveTenant = (input: { nwc_url?: string }) => updateTenant(
export const updateRelayById = (id: string, input: UpdateRelayInput) => updateRelay(id, input)
export const updateRelayPlanById = (id: string, plan: string) => updateRelay(id, { plan })
export const updateRelayPlanById = (id: string, plan_id: string) => updateRelay(id, { plan_id })
export const deactivateRelayById = (id: string) => deactivateRelay(id)
+7 -7
View File
@@ -77,14 +77,14 @@ export default function useRelayToggles(
}
}
async function handleUpdatePlan(plan: PlanId) {
async function handleUpdatePlan(plan_id: PlanId) {
const current = relay()
if (!current) return
const previous = current
const next = { ...current, plan }
const update: Record<string, unknown> = { plan }
if (plan === "free") {
const next = { ...current, plan_id }
const update: Record<string, unknown> = { plan_id }
if (plan_id === "free") {
next.blossom_enabled = 0
next.livekit_enabled = 0
update.blossom_enabled = 0
@@ -101,7 +101,7 @@ export default function useRelayToggles(
throw e
}
if (plan !== "free") {
if (plan_id !== "free") {
const needsSetup = await tenantNeedsPaymentSetup()
if (needsSetup) {
const invoice = await getLatestOpenInvoice()
@@ -116,9 +116,9 @@ export default function useRelayToggles(
onToggleStripSignatures: () => toggle("policy_strip_signatures", false),
onToggleGroups: () => toggle("groups_enabled", true),
onToggleManagement: () => toggle("management_enabled", true),
onToggleMediaStorage: () => toggle("blossom_enabled", relay()?.plan !== "free"),
onToggleMediaStorage: () => toggle("blossom_enabled", relay()?.plan_id !== "free"),
onTogglePushNotifications: () => toggle("push_enabled", true),
onToggleLivekitSupport: () => toggle("livekit_enabled", relay()?.plan !== "free"),
onToggleLivekitSupport: () => toggle("livekit_enabled", relay()?.plan_id !== "free"),
}
return { busy, handleDeactivate, handleReactivate, handleUpdatePlan, pendingInvoice, clearPendingInvoice: () => setPendingInvoice(undefined), pendingPaymentSetup, clearPendingPaymentSetup: () => setPendingPaymentSetup(false), toggles }
+4 -4
View File
@@ -17,10 +17,10 @@ export default function Home() {
const [showRelayModal, setShowRelayModal] = createSignal(false)
const [showLoginModal, setShowLoginModal] = createSignal(false)
const [draftRelay, setDraftRelay] = createSignal<RelayFormValues>()
const [initialPlan, setInitialPlan] = createSignal<RelayFormValues["plan"]>("free")
const [initialPlanId, setInitialPlanId] = createSignal<RelayFormValues["plan_id"]>("free")
function openRelayModal(plan: RelayFormValues["plan"] = "free") {
setInitialPlan(plan)
function openRelayModal(planId: RelayFormValues["plan_id"] = "free") {
setInitialPlanId(planId)
setShowRelayModal(true)
}
@@ -404,7 +404,7 @@ export default function Home() {
<RelayForm
syncSubdomainWithName
initialValues={{ plan: initialPlan() }}
initialValues={{ plan_id: initialPlanId() }}
onSubmit={onRelayFormSubmit}
submitLabel="Continue"
submittingLabel="Creating..."
+1 -1
View File
@@ -45,7 +45,7 @@ export default function RelayDetail() {
const isPaidRelay = createMemo(() => {
const r = relay()
if (!r) return false
const plan = plans().find(p => p.id === r.plan)
const plan = plans().find(p => p.id === r.plan_id)
return !!(plan && plan.amount > 0)
})
+1 -1
View File
@@ -18,7 +18,7 @@ export default function RelayNew() {
const relay = await createRelayForActiveTenant(values)
createdRelayId = relay.id
if (values.plan !== "free") {
if (values.plan_id !== "free") {
const needsSetup = await tenantNeedsPaymentSetup()
if (needsSetup) {
const invoice = await getLatestOpenInvoice()