Compare commits
7 Commits
cd70ca6654
...
f7bd3e53fe
| Author | SHA1 | Date | |
|---|---|---|---|
| f7bd3e53fe | |||
| eb0123abef | |||
| 9f599d66be | |||
| 72b30489b9 | |||
| b11fb5dc25 | |||
| 35d9aab02a | |||
| 0f47b483aa |
@@ -1,14 +1,3 @@
|
||||
CREATE TABLE IF NOT EXISTS activity (
|
||||
id TEXT PRIMARY KEY,
|
||||
tenant TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL,
|
||||
activity_type TEXT NOT NULL,
|
||||
resource_type TEXT NOT NULL,
|
||||
resource_id TEXT NOT NULL,
|
||||
billed_at INTEGER,
|
||||
plan_id TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tenant (
|
||||
pubkey TEXT PRIMARY KEY,
|
||||
nwc_url TEXT NOT NULL DEFAULT '',
|
||||
@@ -19,11 +8,24 @@ CREATE TABLE IF NOT EXISTS tenant (
|
||||
renewed_at INTEGER
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS activity (
|
||||
id TEXT PRIMARY KEY,
|
||||
tenant_pubkey TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL,
|
||||
activity_type TEXT NOT NULL,
|
||||
resource_type TEXT NOT NULL,
|
||||
resource_id TEXT NOT NULL,
|
||||
billed_at INTEGER,
|
||||
snapshot TEXT NOT NULL,
|
||||
FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey)
|
||||
);
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS relay (
|
||||
id TEXT PRIMARY KEY,
|
||||
tenant TEXT NOT NULL,
|
||||
tenant_pubkey TEXT NOT NULL,
|
||||
subdomain TEXT NOT NULL UNIQUE,
|
||||
plan TEXT NOT NULL,
|
||||
plan_id TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
synced INTEGER NOT NULL DEFAULT 0,
|
||||
sync_error TEXT NOT NULL DEFAULT '',
|
||||
@@ -37,7 +39,7 @@ CREATE TABLE IF NOT EXISTS relay (
|
||||
blossom_enabled INTEGER NOT NULL DEFAULT 0,
|
||||
livekit_enabled INTEGER NOT NULL DEFAULT 0,
|
||||
push_enabled INTEGER NOT NULL DEFAULT 1,
|
||||
FOREIGN KEY (tenant) REFERENCES tenant(pubkey)
|
||||
FOREIGN KEY (tenant_pubkey) REFERENCES tenant(pubkey)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS invoice (
|
||||
@@ -58,7 +60,7 @@ CREATE TABLE IF NOT EXISTS invoice_item (
|
||||
activity_id TEXT,
|
||||
tenant_pubkey TEXT NOT NULL,
|
||||
relay_id TEXT NOT NULL,
|
||||
plan TEXT NOT NULL,
|
||||
plan_id TEXT NOT NULL,
|
||||
amount INTEGER NOT NULL,
|
||||
description TEXT NOT NULL DEFAULT '',
|
||||
created_at INTEGER NOT NULL,
|
||||
@@ -84,13 +86,13 @@ CREATE TABLE IF NOT EXISTS intent (
|
||||
FOREIGN KEY (invoice_id) REFERENCES invoice(id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_relay_tenant ON relay (tenant);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_activity_tenant_created ON activity (tenant, created_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_activity_tenant_created ON activity (tenant_pubkey, created_at);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_activity_resource_created ON activity (resource_id, created_at);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_activity_unbilled ON activity (tenant, created_at) WHERE billed_at IS NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_activity_unbilled ON activity (tenant_pubkey, created_at) WHERE billed_at IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_relay_tenant_pubkey ON relay (tenant_pubkey);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_invoice_tenant_created ON invoice (tenant_pubkey, created_at);
|
||||
|
||||
@@ -98,6 +100,9 @@ CREATE INDEX IF NOT EXISTS idx_invoice_item_invoice ON invoice_item (invoice_id)
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_invoice_item_outstanding ON invoice_item (tenant_pubkey) WHERE invoice_id IS NULL;
|
||||
|
||||
-- At most one line item per billable activity to ensure no double-billing.
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_invoice_item_activity ON invoice_item (activity_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_bolt11_invoice_created ON bolt11 (invoice_id, created_at);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_intent_invoice ON intent (invoice_id);
|
||||
|
||||
+1
-4
@@ -29,7 +29,6 @@ use nostr_sdk::{Event, JsonUtil, Kind};
|
||||
|
||||
use crate::billing::Billing;
|
||||
use crate::env;
|
||||
use crate::infra::Infra;
|
||||
use crate::models::{Relay, Tenant};
|
||||
use crate::query;
|
||||
use crate::robot::Robot;
|
||||
@@ -52,16 +51,14 @@ pub struct Api {
|
||||
pub billing: Billing,
|
||||
pub stripe: Stripe,
|
||||
pub robot: Robot,
|
||||
pub infra: Infra,
|
||||
}
|
||||
|
||||
impl Api {
|
||||
pub fn new(billing: Billing, stripe: Stripe, robot: Robot, infra: Infra) -> Self {
|
||||
pub fn new(billing: Billing, stripe: Stripe, robot: Robot) -> Self {
|
||||
Self {
|
||||
billing,
|
||||
stripe,
|
||||
robot,
|
||||
infra,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+142
-287
@@ -1,17 +1,18 @@
|
||||
use anyhow::{Result, anyhow};
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::bitcoin;
|
||||
use crate::command;
|
||||
use crate::db;
|
||||
use crate::env;
|
||||
use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, Tenant};
|
||||
use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, RELAY_STATUS_ACTIVE, Snapshot, Tenant};
|
||||
use crate::query;
|
||||
use crate::robot::Robot;
|
||||
use crate::stripe::Stripe;
|
||||
use crate::wallet::Wallet;
|
||||
|
||||
/// Owns subscription billing: it reconciles tenant activity into invoice items,
|
||||
/// renews subscriptions each period, and collects payment (Lightning, then a
|
||||
/// card on file, then a manual DM link).
|
||||
#[derive(Clone)]
|
||||
pub struct Billing {
|
||||
stripe: Stripe,
|
||||
@@ -31,107 +32,25 @@ impl Billing {
|
||||
// --- lifecycle methods ---
|
||||
|
||||
pub async fn start(self) {
|
||||
let mut rx = db::subscribe();
|
||||
|
||||
tokio::spawn({
|
||||
let billing = self.clone();
|
||||
async move { billing.poll().await }
|
||||
});
|
||||
|
||||
if let Err(error) = self.reconcile_subscriptions("startup").await {
|
||||
tracing::error!(error = %error, "failed to reconcile subscriptions on startup");
|
||||
}
|
||||
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(activity) => {
|
||||
if let Err(e) = self.handle_activity(&activity).await {
|
||||
tracing::error!(error = %e, "billing handle_activity failed");
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
tracing::warn!(missed = n, "billing lagged, reconciling all subscriptions");
|
||||
|
||||
if let Err(error) = self.reconcile_subscriptions("lagged").await {
|
||||
tracing::error!(error = %error, "failed to reconcile after lag");
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn poll(&self) {
|
||||
let mut interval = tokio::time::interval(POLL_INTERVAL);
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
if let Err(error) = self.autogenerate_invoices().await {
|
||||
if let Err(error) = self.reconcile_subscriptions().await {
|
||||
tracing::error!(error = %error, "billing poll failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn autogenerate_invoices(&self) -> Result<()> {
|
||||
async fn reconcile_subscriptions(&self) -> Result<()> {
|
||||
let tenants = query::list_tenants().await?;
|
||||
|
||||
tracing::info!(
|
||||
tenant_count = tenants.len(),
|
||||
"polling tenants for subscription renewal"
|
||||
);
|
||||
|
||||
for tenant in tenants {
|
||||
if let Err(error) = self.autogenerate_invoice(&tenant).await {
|
||||
tracing::error!(
|
||||
tenant = %tenant.pubkey,
|
||||
error = ?error,
|
||||
"failed to autogenerate invoice"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Poll entry point: generate the tenant's invoice for the current period
|
||||
/// (adding any due renewals) and, if one results, collect payment.
|
||||
async fn autogenerate_invoice(&self, tenant: &Tenant) -> Result<()> {
|
||||
if let Some(invoice) = self.generate_invoice(tenant).await? {
|
||||
self.attempt_payment(tenant, &invoice).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_activity(&self, activity: &Activity) -> Result<()> {
|
||||
let should_reconcile = matches!(
|
||||
activity.activity_type.as_str(),
|
||||
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay"
|
||||
);
|
||||
|
||||
if should_reconcile
|
||||
&& let Some(tenant) = query::get_tenant(&activity.tenant).await?
|
||||
{
|
||||
self.reconcile_subscription(&tenant).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn reconcile_subscriptions(&self, source: &str) -> Result<()> {
|
||||
let tenants = query::list_tenants().await?;
|
||||
|
||||
tracing::info!(
|
||||
source,
|
||||
tenant_count = tenants.len(),
|
||||
"reconciling all subscriptions"
|
||||
);
|
||||
tracing::info!(tenant_count = tenants.len(), "reconciling all subscriptions");
|
||||
|
||||
for tenant in tenants {
|
||||
if let Err(error) = self.reconcile_subscription(&tenant).await {
|
||||
tracing::error!(
|
||||
source,
|
||||
tenant = %tenant.pubkey,
|
||||
error = ?error,
|
||||
"failed to reconcile subscription"
|
||||
@@ -142,11 +61,17 @@ impl Billing {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// --- Reconciliation, renewal, and on-demand billing ---
|
||||
// --- Reconciliation of activity/renewals ---
|
||||
|
||||
async fn reconcile_subscription(&self, tenant: &Tenant) -> Result<()> {
|
||||
/// Lists billable activity, setting the tenant's billing anchor to the first
|
||||
/// activity in the process. Generates an invoice for the current period if due
|
||||
/// for renewal or any billable activities have occurred. Attempts payment if
|
||||
/// an invoice is generated.
|
||||
pub async fn reconcile_subscription(&self, tenant: &Tenant) -> Result<()> {
|
||||
let mut tenant = tenant.clone();
|
||||
|
||||
// Reconcile all activity, setting the tenant's billing anchor on the first
|
||||
// positive-balance line item if not already set.
|
||||
for activity in query::list_billable_activity_for_tenant(&tenant.pubkey).await? {
|
||||
if tenant.billing_anchor.is_none() {
|
||||
tenant.billing_anchor = Some(activity.created_at);
|
||||
@@ -156,6 +81,21 @@ impl Billing {
|
||||
self.reconcile_activity(&tenant, &activity).await?;
|
||||
}
|
||||
|
||||
// If the tenant has no billing anchor, they have nothing to bill
|
||||
let Some(period) = BillingPeriod::current(&tenant) else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// If tenant is due for renewal, bill any active relays.
|
||||
if tenant.renewed_at.is_none_or(|at| at < period.start) {
|
||||
self.reconcile_renewal(&tenant, &period).await?;
|
||||
}
|
||||
|
||||
// Create the invoice, but only if non-zero and attempt payment
|
||||
if let Some(invoice) = command::create_invoice(&tenant, &period).await? {
|
||||
self.attempt_payment(&tenant, &invoice).await?;
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -182,14 +122,14 @@ impl Billing {
|
||||
};
|
||||
|
||||
match invoice_item {
|
||||
Some(item) => command::insert_invoice_item_for_activity(&item, &activity.id).await,
|
||||
Some(ref item) => command::insert_invoice_item_for_activity(&item, &activity.id).await,
|
||||
None => command::mark_activity_billed(&activity.id).await,
|
||||
}
|
||||
}
|
||||
|
||||
/// A prorated charge (or credit, with `sign` = -1) for the relay's current
|
||||
/// plan. `None` for a missing relay or a free plan. Mid-period items don't
|
||||
/// stamp `period_start` — the renewal decides coverage from activity history.
|
||||
/// plan, covering the fraction of the period remaining at the activity.
|
||||
/// `None` for a missing relay or a free plan.
|
||||
async fn make_prorated_item(
|
||||
&self,
|
||||
tenant: &Tenant,
|
||||
@@ -198,22 +138,28 @@ impl Billing {
|
||||
description: &str,
|
||||
) -> Result<Option<InvoiceItem>> {
|
||||
let Some(relay) = query::get_relay(&activity.resource_id).await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let Some(plan) = query::get_plan(&relay.plan) else {
|
||||
return Ok(None);
|
||||
return Err(anyhow!("activity resource was not a valid relay"));
|
||||
};
|
||||
let plan = query::get_plan(&relay.plan_id)?;
|
||||
if plan.amount <= 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let anchor = tenant
|
||||
.billing_anchor
|
||||
let period = BillingPeriod::at(tenant, activity.created_at)
|
||||
.ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?;
|
||||
let fraction = period_fraction_remaining(anchor, activity.created_at);
|
||||
let amount = sign * prorate(plan.amount, fraction);
|
||||
let amount = sign * period.prorate(plan.amount, activity.created_at);
|
||||
|
||||
Ok(Some(line_item(activity, &relay.id, plan.id, amount, description)))
|
||||
Ok(Some(InvoiceItem {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
invoice_id: None,
|
||||
activity_id: Some(activity.id.clone()),
|
||||
tenant_pubkey: activity.tenant_pubkey.clone(),
|
||||
relay_id: activity.resource_id.clone(),
|
||||
plan_id: plan.id,
|
||||
amount,
|
||||
description: description.to_string(),
|
||||
created_at: activity.created_at,
|
||||
}))
|
||||
}
|
||||
|
||||
/// The prorated delta for a plan change, read straight from the activity log:
|
||||
@@ -226,128 +172,86 @@ impl Billing {
|
||||
tenant: &Tenant,
|
||||
activity: &Activity,
|
||||
) -> Result<Option<InvoiceItem>> {
|
||||
let Some(new_plan_id) = activity.plan_id.as_deref() else {
|
||||
return Ok(None);
|
||||
let new_plan_id = match &*activity.snapshot {
|
||||
Snapshot::Relay { plan, .. } => plan,
|
||||
};
|
||||
let Some(old_plan_id) =
|
||||
query::get_relay_plan_before(&activity.resource_id, activity.created_at).await?
|
||||
else {
|
||||
return Ok(None);
|
||||
return Err(anyhow!("no previous plan found for relay update activity"));
|
||||
};
|
||||
if old_plan_id == new_plan_id {
|
||||
if &old_plan_id == new_plan_id {
|
||||
return Ok(None);
|
||||
}
|
||||
let (Some(new_plan), Some(old_plan)) =
|
||||
(query::get_plan(new_plan_id), query::get_plan(&old_plan_id))
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
let new_plan = query::get_plan(new_plan_id)?;
|
||||
let old_plan = query::get_plan(&old_plan_id)?;
|
||||
|
||||
let anchor = tenant
|
||||
.billing_anchor
|
||||
let period = BillingPeriod::at(tenant, activity.created_at)
|
||||
.ok_or_else(|| anyhow!("billing anchor must be set before building prorated items"))?;
|
||||
let fraction = period_fraction_remaining(anchor, activity.created_at);
|
||||
let amount = prorate(new_plan.amount, fraction) - prorate(old_plan.amount, fraction);
|
||||
let amount = period.prorate(new_plan.amount, activity.created_at)
|
||||
- period.prorate(old_plan.amount, activity.created_at);
|
||||
if amount == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let description = format!("Plan changed from {} to {}", old_plan.name, new_plan.name);
|
||||
Ok(Some(line_item(
|
||||
activity,
|
||||
&activity.resource_id,
|
||||
new_plan.id,
|
||||
Ok(Some(InvoiceItem {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
invoice_id: None,
|
||||
activity_id: Some(activity.id.clone()),
|
||||
tenant_pubkey: activity.tenant_pubkey.clone(),
|
||||
relay_id: activity.resource_id.clone(),
|
||||
plan_id: new_plan.id,
|
||||
amount,
|
||||
&description,
|
||||
)))
|
||||
}
|
||||
|
||||
/// Reconcile pending activity, add this period's renewals for any relay due,
|
||||
/// and claim everything outstanding onto an invoice. Shared by the poll and
|
||||
/// the on-demand invoice endpoint — safe to call either way: renewals are
|
||||
/// per-relay idempotent. No payment is attempted here; callers that want
|
||||
/// auto-pay do it on the returned invoice. `None` when nothing is owed.
|
||||
pub async fn generate_invoice(&self, tenant: &Tenant) -> Result<Option<Invoice>> {
|
||||
self.reconcile_subscription(tenant).await?;
|
||||
|
||||
// reconcile may have just set the anchor (first activity); re-read it.
|
||||
let Some(tenant) = query::get_tenant(&tenant.pubkey).await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let Some(anchor) = tenant.billing_anchor else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let period_start = period_start_at(anchor, now);
|
||||
let period_end = add_one_month(period_start);
|
||||
|
||||
// Short-circuit the renewal scan once this period is already renewed — the
|
||||
// common case on all but the first poll of a period (saving ~720 scans a
|
||||
// month per tenant). renew_tenant re-checks this in-tx as the real guard.
|
||||
if tenant.renewed_at.is_none_or(|at| at < period_start) {
|
||||
self.renew_period(&tenant, period_start).await?;
|
||||
}
|
||||
|
||||
self.claim_outstanding(&tenant, period_start, period_end).await
|
||||
description,
|
||||
created_at: activity.created_at,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Charge a full-period renewal for every relay that was active on a paid plan
|
||||
/// as of `period_start`, reconstructing that state from the activity log
|
||||
/// (status from create/activate/deactivate, plan from create/update). Per-relay
|
||||
/// idempotent via `period_start`, so calling it on every generation can't
|
||||
/// renew a relay twice; a relay created/activated *within* the period isn't
|
||||
/// active before the boundary, so it's covered by its own prorated charge.
|
||||
async fn renew_period(&self, tenant: &Tenant, period_start: i64) -> Result<()> {
|
||||
let activities = query::list_relay_activity_before(&tenant.pubkey, period_start).await?;
|
||||
/// as of `period.start`, reading that state from each relay's most recent
|
||||
/// activity snapshot before the boundary (relays with no prior activity didn't
|
||||
/// exist yet and are skipped). Idempotent per period via the tenant's
|
||||
/// `renewed_at` marker, so calling it on every generation can't renew twice;
|
||||
/// a relay created/activated *within* the period isn't active before the
|
||||
/// boundary, so it's covered by its own prorated charge instead.
|
||||
async fn reconcile_renewal(&self, tenant: &Tenant, period: &BillingPeriod) -> Result<()> {
|
||||
let relays = query::list_relays_for_tenant(&tenant.pubkey).await?;
|
||||
|
||||
let mut renewal_items = Vec::new();
|
||||
for (relay_id, state) in relay_states(&activities) {
|
||||
if !state.active {
|
||||
continue;
|
||||
}
|
||||
let Some(plan) = state.plan.and_then(|id| query::get_plan(&id)) else {
|
||||
let mut line_items = Vec::new();
|
||||
for relay in relays {
|
||||
let Some(activity) =
|
||||
query::get_latest_relay_activity_before(&relay.id, period.start).await?
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
let Snapshot::Relay { plan: plan_id, status, .. } = &*activity.snapshot;
|
||||
if status != RELAY_STATUS_ACTIVE {
|
||||
continue;
|
||||
}
|
||||
let plan = query::get_plan(plan_id)?;
|
||||
if plan.amount <= 0 {
|
||||
continue;
|
||||
}
|
||||
renewal_items.push(InvoiceItem {
|
||||
line_items.push(InvoiceItem {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
invoice_id: None,
|
||||
activity_id: None,
|
||||
tenant_pubkey: tenant.pubkey.clone(),
|
||||
relay_id,
|
||||
plan: plan.id,
|
||||
relay_id: relay.id,
|
||||
plan_id: plan.id,
|
||||
amount: plan.amount,
|
||||
description: "Subscription renewal".to_string(),
|
||||
created_at: period_start,
|
||||
created_at: period.start,
|
||||
});
|
||||
}
|
||||
|
||||
// Inserts the items and advances `renewed_at` to `period_start` in one
|
||||
// Inserts the items and advances `renewed_at` to `period.start` in one
|
||||
// transaction (idempotent via an in-tx guard), so a re-tick is a no-op.
|
||||
command::renew_tenant(&tenant.pubkey, period_start, &renewal_items).await
|
||||
command::insert_invoice_items_for_renewal(&line_items, period).await
|
||||
}
|
||||
|
||||
/// Claim the tenant's outstanding items onto a fresh invoice if they net
|
||||
/// positive; `None` when nothing is owed (a net credit stays outstanding and
|
||||
/// carries to the next positive invoice).
|
||||
async fn claim_outstanding(
|
||||
&self,
|
||||
tenant: &Tenant,
|
||||
period_start: i64,
|
||||
period_end: i64,
|
||||
) -> Result<Option<Invoice>> {
|
||||
let invoice_id = uuid::Uuid::new_v4().to_string();
|
||||
command::claim_outstanding_into_invoice(
|
||||
&invoice_id,
|
||||
&tenant.pubkey,
|
||||
period_start,
|
||||
period_end,
|
||||
)
|
||||
.await
|
||||
}
|
||||
// --- Payments ---
|
||||
|
||||
pub async fn attempt_payment(&self, tenant: &Tenant, invoice: &Invoice) -> Result<()> {
|
||||
let mut error_message: Option<String> = None;
|
||||
@@ -508,114 +412,65 @@ const MANUAL_PAYMENT_DM: &str = "Payment is due for your relay subscription. Ope
|
||||
const USER_ERROR_PREFIX: &str = "NWC auto-payment failed:";
|
||||
const USER_ERROR_MAX_CHARS: usize = 240;
|
||||
|
||||
/// The start of the billing period containing `now`, for monthly periods
|
||||
/// anchored at `anchor`. Steps forward in whole calendar months so boundaries
|
||||
/// track months (28–31 days) rather than a fixed span of seconds.
|
||||
fn period_start_at(anchor: i64, now: i64) -> i64 {
|
||||
use chrono::{DateTime, Months, Utc};
|
||||
/// One tenant's monthly billing period containing some timestamp, anchored at
|
||||
/// the tenant's `billing_anchor`. Half-open `[start, end)` so a moment at
|
||||
/// exactly `end` belongs to the next period.
|
||||
pub struct BillingPeriod {
|
||||
pub start: i64,
|
||||
pub end: i64,
|
||||
}
|
||||
|
||||
let anchor_dt = DateTime::<Utc>::from_timestamp(anchor, 0).unwrap_or_default();
|
||||
|
||||
let mut start = anchor_dt;
|
||||
let mut months = 1u32;
|
||||
while let Some(next) = anchor_dt.checked_add_months(Months::new(months)) {
|
||||
if next.timestamp() > now {
|
||||
break;
|
||||
}
|
||||
start = next;
|
||||
months += 1;
|
||||
impl BillingPeriod {
|
||||
/// The period containing `chrono::Utc::now()` for `tenant`. `None` when the
|
||||
/// tenant has no `billing_anchor` yet — i.e. no billable activity has been seen.
|
||||
fn current(tenant: &Tenant) -> Option<Self> {
|
||||
Self::at(tenant, chrono::Utc::now().timestamp())
|
||||
}
|
||||
|
||||
start.timestamp()
|
||||
}
|
||||
/// The period containing `at` for `tenant`. `None` when the tenant has no
|
||||
/// `billing_anchor` yet — i.e. no billable activity has been seen.
|
||||
fn at(tenant: &Tenant, at: i64) -> Option<Self> {
|
||||
use chrono::{DateTime, Months, Utc};
|
||||
|
||||
/// One calendar month after `ts` (a unix timestamp), falling back to `ts` if the
|
||||
/// shifted date can't be represented.
|
||||
fn add_one_month(ts: i64) -> i64 {
|
||||
use chrono::{DateTime, Months, Utc};
|
||||
let anchor = tenant.billing_anchor?;
|
||||
let anchor_dt = DateTime::<Utc>::from_timestamp(anchor, 0).unwrap_or_default();
|
||||
|
||||
DateTime::<Utc>::from_timestamp(ts, 0)
|
||||
.and_then(|dt| dt.checked_add_months(Months::new(1)))
|
||||
.map(|dt| dt.timestamp())
|
||||
.unwrap_or(ts)
|
||||
}
|
||||
|
||||
/// Fraction of the current billing period still unused at `at`, in `[0.0, 1.0]`,
|
||||
/// for prorating a mid-period charge or credit. With no billing anchor yet the
|
||||
/// period is only just beginning, so the whole period remains (full price).
|
||||
fn period_fraction_remaining(billing_anchor: i64, at: i64) -> f64 {
|
||||
let period_start = period_start_at(billing_anchor, at);
|
||||
let period_end = add_one_month(period_start);
|
||||
let period_len = (period_end - period_start) as f64;
|
||||
if period_len <= 0.0 {
|
||||
return 1.0;
|
||||
}
|
||||
|
||||
(((period_end - at) as f64) / period_len).clamp(0.0, 1.0)
|
||||
}
|
||||
|
||||
/// Prorate a minor-unit `amount` by `fraction`, rounded to the nearest unit.
|
||||
fn prorate(amount: i64, fraction: f64) -> i64 {
|
||||
(amount as f64 * fraction).round() as i64
|
||||
}
|
||||
|
||||
/// Build an outstanding (unassigned, `invoice_id = None`) line item from a
|
||||
/// reconciled activity. `period_start` is `Some` only for coverage charges
|
||||
/// (creation/activation), which mark the relay-period as paid.
|
||||
fn line_item(
|
||||
activity: &Activity,
|
||||
relay_id: &str,
|
||||
plan: String,
|
||||
amount: i64,
|
||||
description: &str,
|
||||
) -> InvoiceItem {
|
||||
InvoiceItem {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
invoice_id: None,
|
||||
activity_id: Some(activity.id.clone()),
|
||||
tenant_pubkey: activity.tenant.clone(),
|
||||
relay_id: relay_id.to_string(),
|
||||
plan,
|
||||
amount,
|
||||
description: description.to_string(),
|
||||
created_at: activity.created_at,
|
||||
}
|
||||
}
|
||||
|
||||
/// A relay's billing-relevant state at a point in time, reconstructed by folding
|
||||
/// its activity log.
|
||||
#[derive(Default)]
|
||||
struct RelayState {
|
||||
active: bool,
|
||||
plan: Option<String>,
|
||||
}
|
||||
|
||||
/// Fold relay activities (which must be oldest-first) into each relay's
|
||||
/// `(active, plan)` state. `create`/`activate`/`deactivate` drive status;
|
||||
/// `create`/`update` carry the plan via `plan_id`. Feed it activities up to a
|
||||
/// cutoff to get each relay's state as of that moment (e.g. the period boundary).
|
||||
fn relay_states(activities: &[Activity]) -> HashMap<String, RelayState> {
|
||||
let mut states: HashMap<String, RelayState> = HashMap::new();
|
||||
|
||||
for activity in activities {
|
||||
let state = states.entry(activity.resource_id.clone()).or_default();
|
||||
match activity.activity_type.as_str() {
|
||||
"create_relay" => {
|
||||
state.active = true;
|
||||
state.plan = activity.plan_id.clone();
|
||||
// Walk forward in whole calendar months from the anchor until the next
|
||||
// step would pass `at`, so boundaries track months (28–31 days) rather
|
||||
// than a fixed span of seconds.
|
||||
let mut start = anchor_dt;
|
||||
let mut months = 1u32;
|
||||
while let Some(next) = anchor_dt.checked_add_months(Months::new(months)) {
|
||||
if next.timestamp() > at {
|
||||
break;
|
||||
}
|
||||
"update_relay" => {
|
||||
if activity.plan_id.is_some() {
|
||||
state.plan = activity.plan_id.clone();
|
||||
}
|
||||
}
|
||||
"activate_relay" => state.active = true,
|
||||
"deactivate_relay" => state.active = false,
|
||||
_ => {}
|
||||
start = next;
|
||||
months += 1;
|
||||
}
|
||||
|
||||
let end = start.checked_add_months(Months::new(1)).unwrap_or(start);
|
||||
|
||||
Some(Self {
|
||||
start: start.timestamp(),
|
||||
end: end.timestamp(),
|
||||
})
|
||||
}
|
||||
|
||||
states
|
||||
/// Fraction of this period still unused at `at`, in `[0.0, 1.0]`, for
|
||||
/// prorating a mid-period charge or credit.
|
||||
fn fraction_remaining(&self, at: i64) -> f64 {
|
||||
let len = (self.end - self.start) as f64;
|
||||
if len <= 0.0 {
|
||||
return 1.0;
|
||||
}
|
||||
(((self.end - at) as f64) / len).clamp(0.0, 1.0)
|
||||
}
|
||||
|
||||
/// Prorate a minor-unit `amount` by the fraction of this period remaining
|
||||
/// at `at`, rounded to the nearest unit.
|
||||
fn prorate(&self, amount: i64, at: i64) -> i64 {
|
||||
(amount as f64 * self.fraction_remaining(at)).round() as i64
|
||||
}
|
||||
}
|
||||
|
||||
fn summarize_error_message(error: &str) -> Option<String> {
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use anyhow::{Result, anyhow};
|
||||
|
||||
/// Convert a fiat amount in minor units (e.g. USD cents) to millisatoshis at the
|
||||
/// current spot price, for pricing a Lightning invoice from an invoice total.
|
||||
pub async fn fiat_to_msats(amount_fiat_minor: i64, currency: &str) -> Result<u64> {
|
||||
let price = get_bitcoin_price(¤cy.to_uppercase()).await?;
|
||||
let divisor = 10_f64.powi(currency_minor_exponent(currency)? as i32);
|
||||
@@ -18,6 +20,7 @@ struct CoinbaseSpotPriceData {
|
||||
amount: String,
|
||||
}
|
||||
|
||||
/// The current Bitcoin spot price in `currency`, from Coinbase.
|
||||
pub async fn get_bitcoin_price(currency: &str) -> Result<f64> {
|
||||
let http = reqwest::Client::new();
|
||||
let url = format!("https://api.coinbase.com/v2/prices/BTC-{currency}/spot");
|
||||
|
||||
+147
-138
@@ -1,44 +1,36 @@
|
||||
use anyhow::Result;
|
||||
use sqlx::types::Json;
|
||||
use sqlx::{Sqlite, Transaction};
|
||||
|
||||
use crate::billing::BillingPeriod;
|
||||
use crate::db::{pool, publish, with_tx};
|
||||
use crate::models::{
|
||||
Activity, Bolt11, Invoice, InvoiceItem, RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT,
|
||||
RELAY_STATUS_INACTIVE, Relay, Tenant,
|
||||
RELAY_STATUS_INACTIVE, Relay, Snapshot, Tenant,
|
||||
};
|
||||
|
||||
// --- Tenants ---
|
||||
|
||||
pub async fn create_tenant(tenant: &Tenant) -> Result<()> {
|
||||
let activity = with_tx(async |tx| {
|
||||
sqlx::query(
|
||||
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id)
|
||||
VALUES (?, ?, ?, ?)",
|
||||
)
|
||||
.bind(&tenant.pubkey)
|
||||
.bind(&tenant.nwc_url)
|
||||
.bind(tenant.created_at)
|
||||
.bind(&tenant.stripe_customer_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
insert_activity_tx(tx, "create_tenant", "tenant", &tenant.pubkey, None).await
|
||||
})
|
||||
sqlx::query(
|
||||
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id)
|
||||
VALUES (?, ?, ?, ?)",
|
||||
)
|
||||
.bind(&tenant.pubkey)
|
||||
.bind(&tenant.nwc_url)
|
||||
.bind(tenant.created_at)
|
||||
.bind(&tenant.stripe_customer_id)
|
||||
.execute(pool())
|
||||
.await?;
|
||||
publish(activity);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_tenant(tenant: &Tenant) -> Result<()> {
|
||||
let activity = with_tx(async |tx| {
|
||||
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
|
||||
.bind(&tenant.nwc_url)
|
||||
.bind(&tenant.pubkey)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
insert_activity_tx(tx, "update_tenant", "tenant", &tenant.pubkey, None).await
|
||||
})
|
||||
.await?;
|
||||
publish(activity);
|
||||
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
|
||||
.bind(&tenant.nwc_url)
|
||||
.bind(&tenant.pubkey)
|
||||
.execute(pool())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -65,7 +57,7 @@ pub async fn create_relay(relay: &Relay) -> Result<()> {
|
||||
let activity = with_tx(async |tx| {
|
||||
sqlx::query(
|
||||
"INSERT INTO relay (
|
||||
id, tenant, subdomain, plan, status, synced, sync_error,
|
||||
id, tenant_pubkey, subdomain, plan_id, status, synced, sync_error,
|
||||
info_name, info_icon, info_description,
|
||||
policy_public_join, policy_strip_signatures,
|
||||
groups_enabled, management_enabled, blossom_enabled,
|
||||
@@ -73,9 +65,9 @@ pub async fn create_relay(relay: &Relay) -> Result<()> {
|
||||
) VALUES (?, ?, ?, ?, 'active', 0, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
)
|
||||
.bind(&relay.id)
|
||||
.bind(&relay.tenant)
|
||||
.bind(&relay.tenant_pubkey)
|
||||
.bind(&relay.subdomain)
|
||||
.bind(&relay.plan)
|
||||
.bind(&relay.plan_id)
|
||||
.bind(&relay.sync_error)
|
||||
.bind(&relay.info_name)
|
||||
.bind(&relay.info_icon)
|
||||
@@ -89,7 +81,11 @@ pub async fn create_relay(relay: &Relay) -> Result<()> {
|
||||
.bind(relay.push_enabled)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
insert_activity_tx(tx, "create_relay", "relay", &relay.id, Some(&relay.plan)).await
|
||||
let snapshot = Snapshot::Relay {
|
||||
plan: relay.plan_id.clone(),
|
||||
status: RELAY_STATUS_ACTIVE.to_string(),
|
||||
};
|
||||
insert_activity_tx(tx, "create_relay", &relay.id, snapshot).await
|
||||
})
|
||||
.await?;
|
||||
publish(activity);
|
||||
@@ -100,16 +96,16 @@ pub async fn update_relay(relay: &Relay) -> Result<()> {
|
||||
let activity = with_tx(async |tx| {
|
||||
sqlx::query(
|
||||
"UPDATE relay
|
||||
SET tenant = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?, synced = 0,
|
||||
SET tenant_pubkey = ?, subdomain = ?, plan_id = ?, status = ?, sync_error = ?, synced = 0,
|
||||
info_name = ?, info_icon = ?, info_description = ?,
|
||||
policy_public_join = ?, policy_strip_signatures = ?,
|
||||
groups_enabled = ?, management_enabled = ?, blossom_enabled = ?,
|
||||
livekit_enabled = ?, push_enabled = ?
|
||||
WHERE id = ?",
|
||||
)
|
||||
.bind(&relay.tenant)
|
||||
.bind(&relay.tenant_pubkey)
|
||||
.bind(&relay.subdomain)
|
||||
.bind(&relay.plan)
|
||||
.bind(&relay.plan_id)
|
||||
.bind(&relay.status)
|
||||
.bind(&relay.sync_error)
|
||||
.bind(&relay.info_name)
|
||||
@@ -125,34 +121,42 @@ pub async fn update_relay(relay: &Relay) -> Result<()> {
|
||||
.bind(&relay.id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
insert_activity_tx(tx, "update_relay", "relay", &relay.id, Some(&relay.plan)).await
|
||||
let snapshot = Snapshot::Relay {
|
||||
plan: relay.plan_id.clone(),
|
||||
status: relay.status.clone(),
|
||||
};
|
||||
insert_activity_tx(tx, "update_relay", &relay.id, snapshot).await
|
||||
})
|
||||
.await?;
|
||||
publish(activity);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn activate_relay(relay: &Relay) -> Result<()> {
|
||||
set_relay_status(relay, RELAY_STATUS_ACTIVE, "activate_relay").await
|
||||
}
|
||||
|
||||
pub async fn deactivate_relay(relay: &Relay) -> Result<()> {
|
||||
set_relay_status(&relay.id, RELAY_STATUS_INACTIVE, "deactivate_relay").await
|
||||
set_relay_status(relay, RELAY_STATUS_INACTIVE, "deactivate_relay").await
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // wired up by the delinquency flow (not yet implemented)
|
||||
pub async fn mark_relay_delinquent(relay: &Relay) -> Result<()> {
|
||||
set_relay_status(&relay.id, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent").await
|
||||
set_relay_status(relay, RELAY_STATUS_DELINQUENT, "mark_relay_delinquent").await
|
||||
}
|
||||
|
||||
pub async fn activate_relay(relay: &Relay) -> Result<()> {
|
||||
set_relay_status(&relay.id, RELAY_STATUS_ACTIVE, "activate_relay").await
|
||||
}
|
||||
|
||||
async fn set_relay_status(relay_id: &str, status: &str, activity_type: &str) -> Result<()> {
|
||||
async fn set_relay_status(relay: &Relay, status: &str, activity_type: &str) -> Result<()> {
|
||||
let activity = with_tx(async |tx| {
|
||||
sqlx::query("UPDATE relay SET status = ?, synced = 0 WHERE id = ?")
|
||||
.bind(status)
|
||||
.bind(relay_id)
|
||||
.bind(&relay.id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
insert_activity_tx(tx, activity_type, "relay", relay_id, None).await
|
||||
let snapshot = Snapshot::Relay {
|
||||
plan: relay.plan_id.clone(),
|
||||
status: status.to_string(),
|
||||
};
|
||||
insert_activity_tx(tx, activity_type, &relay.id, snapshot).await
|
||||
})
|
||||
.await?;
|
||||
publish(activity);
|
||||
@@ -166,34 +170,88 @@ pub async fn fail_relay_sync(relay: &Relay, sync_error: String) -> Result<()> {
|
||||
.bind(&relay.id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
insert_activity_tx(tx, "fail_relay_sync", "relay", &relay.id, None).await
|
||||
let snapshot = Snapshot::Relay {
|
||||
plan: relay.plan_id.clone(),
|
||||
status: relay.status.clone(),
|
||||
};
|
||||
insert_activity_tx(tx, "fail_relay_sync", &relay.id, snapshot).await
|
||||
})
|
||||
.await?;
|
||||
publish(activity);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn complete_relay_sync(relay_id: &str) -> Result<()> {
|
||||
pub async fn complete_relay_sync(relay: &Relay) -> Result<()> {
|
||||
let activity = with_tx(async |tx| {
|
||||
sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?")
|
||||
.bind(relay_id)
|
||||
.bind(&relay.id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
insert_activity_tx(tx, "complete_relay_sync", "relay", relay_id, None).await
|
||||
let snapshot = Snapshot::Relay {
|
||||
plan: relay.plan_id.clone(),
|
||||
status: relay.status.clone(),
|
||||
};
|
||||
insert_activity_tx(tx, "complete_relay_sync", &relay.id, snapshot).await
|
||||
})
|
||||
.await?;
|
||||
publish(activity);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// --- Invoice items (the outstanding-charge ledger) ---
|
||||
// --- Invoice items ---
|
||||
|
||||
/// Persist a reconciled activity's line item and mark the activity billed in one
|
||||
/// transaction, so a recovery pass never re-bills it.
|
||||
pub async fn insert_invoice_item_for_activity(invoice_item: &InvoiceItem, activity_id: &str) -> Result<()> {
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
|
||||
with_tx(async |tx| {
|
||||
insert_invoice_item_tx(tx, invoice_item).await?;
|
||||
mark_activity_billed_tx(tx, activity_id, now).await?;
|
||||
// Claim the activity first. If a concurrent reconcile pass already billed
|
||||
// it, the claim no-ops and we skip the item rather than duplicating it.
|
||||
if mark_activity_billed_tx(tx, activity_id, now).await? {
|
||||
insert_invoice_item_tx(tx, invoice_item).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Insert this period's renewal items and advance the tenant's `renewed_at`
|
||||
/// marker to `period.start`, atomically and idempotently. Empty `items` is a
|
||||
/// no-op — a tenant with no active paid relays has nothing to renew.
|
||||
pub async fn insert_invoice_items_for_renewal(
|
||||
items: &[InvoiceItem],
|
||||
period: &BillingPeriod,
|
||||
) -> Result<()> {
|
||||
let Some(first) = items.first() else {
|
||||
return Ok(());
|
||||
};
|
||||
let tenant_pubkey = &first.tenant_pubkey;
|
||||
|
||||
with_tx(async |tx| {
|
||||
// Re-read the marker inside the transaction so the guard and the writes
|
||||
// commit together — this ensures idempotency so we don't double-invoice.
|
||||
let renewed_at = sqlx::query_scalar::<_, Option<i64>>(
|
||||
"SELECT renewed_at FROM tenant WHERE pubkey = ?",
|
||||
)
|
||||
.bind(tenant_pubkey)
|
||||
.fetch_one(&mut **tx)
|
||||
.await?;
|
||||
|
||||
if renewed_at.is_some_and(|at| at >= period.start) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for item in items {
|
||||
insert_invoice_item_tx(tx, item).await?;
|
||||
}
|
||||
|
||||
sqlx::query("UPDATE tenant SET renewed_at = ? WHERE pubkey = ?")
|
||||
.bind(period.start)
|
||||
.bind(tenant_pubkey)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
@@ -205,45 +263,8 @@ pub async fn insert_invoice_item_for_activity(invoice_item: &InvoiceItem, activi
|
||||
pub async fn mark_activity_billed(activity_id: &str) -> Result<()> {
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
|
||||
with_tx(async |tx| mark_activity_billed_tx(tx, activity_id, now).await).await
|
||||
}
|
||||
|
||||
/// Insert renewal line items, skipping any relay already covered for the item's
|
||||
/// `period_start`. The per-relay existence check and insert are a single
|
||||
/// statement, so neither a re-tick nor a relay's own creation/activation charge
|
||||
/// (which also stamps `period_start`) can bill the same relay-period twice.
|
||||
pub async fn renew_tenant(
|
||||
tenant_pubkey: &str,
|
||||
period_start: i64,
|
||||
items: &[InvoiceItem],
|
||||
) -> Result<()> {
|
||||
with_tx(async |tx| {
|
||||
// In-tx guard: bail if this tenant has already been renewed for this
|
||||
// period (or later). This is the correctness backstop — it keeps renewal
|
||||
// idempotent under a crash mid-renewal or a poll racing the eager
|
||||
// endpoint, since the item inserts and the `renewed_at` write commit
|
||||
// together.
|
||||
let renewed_at = sqlx::query_scalar::<_, Option<i64>>(
|
||||
"SELECT renewed_at FROM tenant WHERE pubkey = ?",
|
||||
)
|
||||
.bind(tenant_pubkey)
|
||||
.fetch_one(&mut **tx)
|
||||
.await?;
|
||||
|
||||
if renewed_at.is_some_and(|at| at >= period_start) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for item in items {
|
||||
insert_invoice_item_tx(tx, item).await?;
|
||||
}
|
||||
|
||||
sqlx::query("UPDATE tenant SET renewed_at = ? WHERE pubkey = ?")
|
||||
.bind(period_start)
|
||||
.bind(tenant_pubkey)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
mark_activity_billed_tx(tx, activity_id, now).await?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
@@ -251,23 +272,16 @@ pub async fn renew_tenant(
|
||||
|
||||
// --- Invoices ---
|
||||
|
||||
/// Claim all of a tenant's outstanding items onto a new invoice — but only if
|
||||
/// they sum to a positive amount. A non-positive balance (net credit or nothing
|
||||
/// owed) leaves the items outstanding so the credit carries to the next positive
|
||||
/// invoice. The sum, insert, and claim run in one transaction. Returns the
|
||||
/// invoice, or `None` when there's nothing to bill.
|
||||
pub async fn claim_outstanding_into_invoice(
|
||||
invoice_id: &str,
|
||||
tenant_pubkey: &str,
|
||||
period_start: i64,
|
||||
period_end: i64,
|
||||
) -> Result<Option<Invoice>> {
|
||||
/// Claim all of a tenant's outstanding items onto a new invoice. A non-positive
|
||||
/// balance leaves the items outstanding so the credit carries to the next positive
|
||||
/// invoice. Returns the invoice, or `None` when there's nothing to bill.
|
||||
pub async fn create_invoice(tenant: &Tenant, period: &BillingPeriod) -> Result<Option<Invoice>> {
|
||||
with_tx(async |tx| {
|
||||
let total = sqlx::query_scalar::<_, i64>(
|
||||
"SELECT COALESCE(SUM(amount), 0) FROM invoice_item
|
||||
WHERE tenant_pubkey = ? AND invoice_id IS NULL",
|
||||
)
|
||||
.bind(tenant_pubkey)
|
||||
.bind(&tenant.pubkey)
|
||||
.fetch_one(&mut **tx)
|
||||
.await?;
|
||||
|
||||
@@ -275,15 +289,14 @@ pub async fn claim_outstanding_into_invoice(
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let invoice =
|
||||
insert_invoice_tx(tx, invoice_id, tenant_pubkey, period_start, period_end).await?;
|
||||
let invoice = insert_invoice_tx(tx, &tenant, &period).await?;
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE invoice_item SET invoice_id = ?
|
||||
WHERE tenant_pubkey = ? AND invoice_id IS NULL",
|
||||
)
|
||||
.bind(invoice_id)
|
||||
.bind(tenant_pubkey)
|
||||
.bind(&invoice.id)
|
||||
.bind(&tenant.pubkey)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
@@ -295,17 +308,12 @@ pub async fn claim_outstanding_into_invoice(
|
||||
pub async fn mark_invoice_paid(invoice_id: &str, method: &str) -> Result<()> {
|
||||
let updated_at = chrono::Utc::now().timestamp();
|
||||
|
||||
let activity = with_tx(async |tx| {
|
||||
sqlx::query("UPDATE invoice SET status = 'paid', method = ?, updated_at = ? WHERE id = ?")
|
||||
.bind(method)
|
||||
.bind(updated_at)
|
||||
.bind(invoice_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
insert_activity_tx(tx, "invoice_paid", "invoice", invoice_id, None).await
|
||||
})
|
||||
.await?;
|
||||
publish(activity);
|
||||
sqlx::query("UPDATE invoice SET status = 'paid', method = ?, updated_at = ? WHERE id = ?")
|
||||
.bind(method)
|
||||
.bind(updated_at)
|
||||
.bind(invoice_id)
|
||||
.execute(pool())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -370,67 +378,65 @@ pub async fn insert_intent(intent_id: &str, invoice_id: &str) -> Result<()> {
|
||||
async fn insert_activity_tx(
|
||||
tx: &mut Transaction<'_, Sqlite>,
|
||||
activity_type: &str,
|
||||
resource_type: &str,
|
||||
resource_id: &str,
|
||||
plan_id: Option<&str>,
|
||||
snapshot: Snapshot,
|
||||
) -> Result<Activity> {
|
||||
let tenant = match resource_type {
|
||||
"tenant" => resource_id.to_string(),
|
||||
"relay" => {
|
||||
sqlx::query_scalar::<_, String>("SELECT tenant FROM relay WHERE id = ?")
|
||||
let resource_type = snapshot.resource_type();
|
||||
let tenant_pubkey = match &snapshot {
|
||||
Snapshot::Relay { .. } => {
|
||||
sqlx::query_scalar::<_, String>("SELECT tenant_pubkey FROM relay WHERE id = ?")
|
||||
.bind(resource_id)
|
||||
.fetch_one(&mut **tx)
|
||||
.await?
|
||||
}
|
||||
_ => anyhow::bail!("unknown resource_type: {resource_type}"),
|
||||
};
|
||||
|
||||
let id = uuid::Uuid::new_v4().to_string();
|
||||
let created_at = chrono::Utc::now().timestamp();
|
||||
let snapshot = Json(snapshot);
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id, plan_id)
|
||||
"INSERT INTO activity (id, tenant_pubkey, created_at, activity_type, resource_type, resource_id, snapshot)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
)
|
||||
.bind(&id)
|
||||
.bind(&tenant)
|
||||
.bind(&tenant_pubkey)
|
||||
.bind(created_at)
|
||||
.bind(activity_type)
|
||||
.bind(resource_type)
|
||||
.bind(resource_id)
|
||||
.bind(plan_id)
|
||||
.bind(&snapshot)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(Activity {
|
||||
id,
|
||||
tenant,
|
||||
tenant_pubkey,
|
||||
created_at,
|
||||
activity_type: activity_type.to_string(),
|
||||
resource_type: resource_type.to_string(),
|
||||
resource_id: resource_id.to_string(),
|
||||
billed_at: None,
|
||||
plan_id: plan_id.map(str::to_string),
|
||||
snapshot,
|
||||
})
|
||||
}
|
||||
|
||||
async fn insert_invoice_tx(
|
||||
tx: &mut Transaction<'_, Sqlite>,
|
||||
invoice_id: &str,
|
||||
tenant_pubkey: &str,
|
||||
period_start: i64,
|
||||
period_end: i64,
|
||||
tenant: &Tenant,
|
||||
period: &BillingPeriod,
|
||||
) -> Result<Invoice> {
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let invoice_id = uuid::Uuid::new_v4().to_string();
|
||||
|
||||
Ok(sqlx::query_as::<_, Invoice>(
|
||||
"INSERT INTO invoice (id, tenant_pubkey, status, period_start, period_end, created_at, updated_at)
|
||||
VALUES (?, ?, 'open', ?, ?, ?, ?) RETURNING *",
|
||||
)
|
||||
.bind(invoice_id)
|
||||
.bind(tenant_pubkey)
|
||||
.bind(period_start)
|
||||
.bind(period_end)
|
||||
.bind(&tenant.pubkey)
|
||||
.bind(&period.start)
|
||||
.bind(period.end)
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.fetch_one(&mut **tx)
|
||||
@@ -440,7 +446,7 @@ async fn insert_invoice_tx(
|
||||
async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &InvoiceItem) -> Result<()> {
|
||||
sqlx::query(
|
||||
"INSERT INTO invoice_item
|
||||
(id, invoice_id, activity_id, tenant_pubkey, relay_id, plan, amount, description, created_at)
|
||||
(id, invoice_id, activity_id, tenant_pubkey, relay_id, plan_id, amount, description, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
)
|
||||
.bind(&item.id)
|
||||
@@ -448,7 +454,7 @@ async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &Invoice
|
||||
.bind(&item.activity_id)
|
||||
.bind(&item.tenant_pubkey)
|
||||
.bind(&item.relay_id)
|
||||
.bind(&item.plan)
|
||||
.bind(&item.plan_id)
|
||||
.bind(item.amount)
|
||||
.bind(&item.description)
|
||||
.bind(item.created_at)
|
||||
@@ -457,15 +463,18 @@ async fn insert_invoice_item_tx(tx: &mut Transaction<'_, Sqlite>, item: &Invoice
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Claim an activity as billed. Returns `true` if this call set the marker, and
|
||||
/// `false` if it was already set — e.g. a concurrent reconcile pass won the race —
|
||||
/// so callers can skip work that would otherwise double-bill.
|
||||
async fn mark_activity_billed_tx(
|
||||
tx: &mut Transaction<'_, Sqlite>,
|
||||
activity_id: &str,
|
||||
billed_at: i64,
|
||||
) -> Result<()> {
|
||||
sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ?")
|
||||
) -> Result<bool> {
|
||||
let result = sqlx::query("UPDATE activity SET billed_at = ? WHERE id = ? AND billed_at IS NULL")
|
||||
.bind(billed_at)
|
||||
.bind(activity_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
+261
-271
@@ -1,3 +1,7 @@
|
||||
//! The relay-provisioning reactor: it keeps the external relay backend (the
|
||||
//! zooid API) in sync with our relay rows, reacting to relay activity and
|
||||
//! retrying failed syncs with backoff.
|
||||
|
||||
use anyhow::Result;
|
||||
use nostr_sdk::prelude::*;
|
||||
use std::time::Duration;
|
||||
@@ -12,286 +16,272 @@ const RELAY_SYNC_RETRY_BASE_DELAY_SECS: u64 = 30;
|
||||
const RELAY_SYNC_RETRY_MAX_DELAY_SECS: u64 = 15 * 60;
|
||||
const RELAY_SYNC_RETRY_MAX_ATTEMPTS: usize = 6;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Infra;
|
||||
/// Run the reactor for the life of the process: reconcile any relays left
|
||||
/// unsynced from a previous run, then sync each relay as its activity arrives.
|
||||
pub async fn start() {
|
||||
let mut rx = db::subscribe();
|
||||
|
||||
impl Infra {
|
||||
pub fn new() -> Self {
|
||||
Self
|
||||
if let Err(error) = reconcile_relay_state("startup").await {
|
||||
tracing::error!(error = %error, "failed to reconcile relay state on startup");
|
||||
}
|
||||
|
||||
pub async fn start(self) {
|
||||
let mut rx = db::subscribe();
|
||||
|
||||
if let Err(error) = self.reconcile_relay_state("startup").await {
|
||||
tracing::error!(error = %error, "failed to reconcile relay state on startup");
|
||||
}
|
||||
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(activity) => {
|
||||
if let Err(e) = self.handle_activity(&activity).await {
|
||||
tracing::error!(error = %e, "infra handle_activity failed");
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
tracing::warn!(missed = n, "infra lagged");
|
||||
|
||||
if let Err(error) = self.reconcile_relay_state("lagged").await {
|
||||
tracing::error!(error = %error, "failed to reconcile relay state after lag");
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_activity(&self, activity: &Activity) -> Result<()> {
|
||||
let needs_sync = matches!(
|
||||
activity.activity_type.as_str(),
|
||||
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" | "fail_relay_sync"
|
||||
);
|
||||
|
||||
if activity.resource_type != "relay" || !needs_sync {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if activity.activity_type == "fail_relay_sync" {
|
||||
self.schedule_relay_sync_retry(&activity.resource_id, "activity").await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Some(relay) = query::get_relay(&activity.resource_id).await? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.sync_relay(&relay).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn reconcile_relay_state(&self, source: &str) -> Result<()> {
|
||||
let relays = query::list_relays_pending_sync().await?;
|
||||
|
||||
if relays.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tracing::info!(source, relay_count = relays.len(), "reconciling pending relay state");
|
||||
|
||||
for relay in relays {
|
||||
if relay.sync_error.trim().is_empty() {
|
||||
self.sync_relay(&relay).await;
|
||||
} else {
|
||||
self.schedule_relay_sync_retry(&relay.id, source).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn schedule_relay_sync_retry(&self, relay_id: &str, source: &str) -> Result<()> {
|
||||
fn get_retry_delay(consecutive_failures: usize) -> Option<Duration> {
|
||||
let retry_attempt = consecutive_failures.max(1);
|
||||
if retry_attempt > RELAY_SYNC_RETRY_MAX_ATTEMPTS {
|
||||
return None;
|
||||
}
|
||||
|
||||
let exponent = (retry_attempt - 1).min(31);
|
||||
let multiplier = 1u64 << exponent;
|
||||
let delay_secs = RELAY_SYNC_RETRY_BASE_DELAY_SECS
|
||||
.saturating_mul(multiplier)
|
||||
.min(RELAY_SYNC_RETRY_MAX_DELAY_SECS);
|
||||
|
||||
Some(Duration::from_secs(delay_secs))
|
||||
}
|
||||
|
||||
let activities = query::list_activity_for_resource(relay_id).await?;
|
||||
let consecutive_failures = activities
|
||||
.iter()
|
||||
.take_while(|activity| activity.activity_type == "fail_relay_sync")
|
||||
.count();
|
||||
|
||||
let Some(delay) = get_retry_delay(consecutive_failures) else {
|
||||
tracing::warn!(
|
||||
relay = relay_id,
|
||||
consecutive_failures,
|
||||
max_attempts = RELAY_SYNC_RETRY_MAX_ATTEMPTS,
|
||||
"relay sync retries exhausted; awaiting manual intervention"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
relay = relay_id,
|
||||
source,
|
||||
consecutive_failures,
|
||||
delay_secs = delay.as_secs(),
|
||||
"scheduled relay sync retry"
|
||||
);
|
||||
|
||||
let relay_id = relay_id.to_string();
|
||||
let infra = self.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(delay).await;
|
||||
|
||||
match query::get_relay(&relay_id).await {
|
||||
Ok(Some(relay)) => infra.sync_relay(&relay).await,
|
||||
Ok(None) => {}
|
||||
Err(e) => {
|
||||
tracing::error!(relay = %relay_id, error = %e, "relay sync retry task failed");
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(activity) => {
|
||||
if let Err(e) = handle_activity(&activity).await {
|
||||
tracing::error!(error = %e, "infra handle_activity failed");
|
||||
}
|
||||
}
|
||||
});
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
tracing::warn!(missed = n, "infra lagged");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn sync_relay(&self, relay: &Relay) {
|
||||
match self.try_sync_relay(relay).await {
|
||||
Ok(()) => {
|
||||
tracing::info!(relay = %relay.id, "relay sync succeeded");
|
||||
if let Err(e) = command::complete_relay_sync(&relay.id).await {
|
||||
tracing::error!(relay = %relay.id, error = %e, "failed to mark sync complete");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(relay = %relay.id, error = %e, "relay sync failed");
|
||||
if let Err(e2) = command::fail_relay_sync(relay, e.to_string()).await {
|
||||
tracing::error!(relay = %relay.id, error = %e2, "failed to record sync failure");
|
||||
if let Err(error) = reconcile_relay_state("lagged").await {
|
||||
tracing::error!(error = %error, "failed to reconcile relay state after lag");
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_sync_relay(&self, relay: &Relay) -> Result<()> {
|
||||
// A relay is "new" (POST with a freshly generated secret) only if it has
|
||||
// never completed a sync. `synced == 1` short-circuits the activity lookup;
|
||||
// otherwise check the activity history so that a re-sync after an update
|
||||
// (which resets `synced` to 0) PATCHes instead of clobbering the secret.
|
||||
let is_new = relay.synced != 1
|
||||
&& query::get_latest_activity_for_resource_and_type(&relay.id, "complete_relay_sync")
|
||||
.await?
|
||||
.is_none();
|
||||
|
||||
let mut body = serde_json::json!({
|
||||
"host": format!("{}.{}", relay.subdomain, env::get().relay_domain),
|
||||
"schema": relay.id,
|
||||
"inactive": relay.status == RELAY_STATUS_INACTIVE
|
||||
|| relay.status == RELAY_STATUS_DELINQUENT,
|
||||
"info": {
|
||||
"name": relay.info_name,
|
||||
"icon": relay.info_icon,
|
||||
"description": relay.info_description,
|
||||
"pubkey": relay.tenant,
|
||||
},
|
||||
"policy": {
|
||||
"public_join": relay.policy_public_join == 1,
|
||||
"strip_signatures": relay.policy_strip_signatures == 1,
|
||||
},
|
||||
"groups": { "enabled": relay.groups_enabled == 1 },
|
||||
"management": { "enabled": relay.management_enabled == 1 },
|
||||
"blossom": if relay.blossom_enabled == 1 {
|
||||
serde_json::json!({
|
||||
"enabled": true,
|
||||
"adapter": "s3",
|
||||
"s3": {
|
||||
"endpoint": env::get().blossom_s3_endpoint,
|
||||
"region": env::get().blossom_s3_region,
|
||||
"bucket": env::get().blossom_s3_bucket,
|
||||
"access_key": env::get().blossom_s3_access_key,
|
||||
"secret_key": env::get().blossom_s3_secret_key,
|
||||
"key_prefix": relay.id,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
serde_json::json!({ "enabled": false })
|
||||
},
|
||||
"livekit": if relay.livekit_enabled == 1 {
|
||||
serde_json::json!({
|
||||
"enabled": true,
|
||||
"server_url": env::get().livekit_url,
|
||||
"api_key": env::get().livekit_api_key,
|
||||
"api_secret": env::get().livekit_api_secret,
|
||||
})
|
||||
} else {
|
||||
serde_json::json!({ "enabled": false })
|
||||
},
|
||||
"push": { "enabled": relay.push_enabled == 1 },
|
||||
"roles": {
|
||||
"admin": { "can_manage": true, "can_invite": true },
|
||||
"member": { "can_invite": true },
|
||||
},
|
||||
});
|
||||
|
||||
// Only provide a secret if the relay is new. This allows us to not store the relay secrets on our side.
|
||||
if is_new && let Some(obj) = body.as_object_mut() {
|
||||
obj.insert(
|
||||
"secret".to_string(),
|
||||
serde_json::Value::String(Keys::generate().secret_key().to_secret_hex()),
|
||||
);
|
||||
}
|
||||
|
||||
let method = if is_new { HttpMethod::POST } else { HttpMethod::PATCH };
|
||||
self.request(method, &format!("relay/{}", relay.id), Some(&body))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn list_relay_members(&self, relay_id: &str) -> Result<Vec<String>> {
|
||||
#[derive(serde::Deserialize)]
|
||||
struct MembersResponse {
|
||||
members: Vec<String>,
|
||||
}
|
||||
|
||||
let response = self
|
||||
.request(HttpMethod::GET, &format!("relay/{relay_id}/members"), None)
|
||||
.await?;
|
||||
let parsed: MembersResponse = response.json().await?;
|
||||
Ok(parsed.members)
|
||||
}
|
||||
|
||||
// Internal utilities
|
||||
|
||||
/// Sends an authenticated request to the zooid API at `path` (relative to
|
||||
/// `env.zooid_api_url`). Returns the response on 2xx; bails with the body
|
||||
/// text otherwise.
|
||||
async fn request(
|
||||
&self,
|
||||
method: HttpMethod,
|
||||
path: &str,
|
||||
body: Option<&serde_json::Value>,
|
||||
) -> Result<reqwest::Response> {
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(5))
|
||||
.build()?;
|
||||
let base = env::get().zooid_api_url.trim_end_matches('/');
|
||||
let path = path.trim_start_matches('/');
|
||||
let url = format!("{base}/{path}");
|
||||
let auth = env::get().make_auth(&url, method).await?;
|
||||
|
||||
let reqwest_method = match method {
|
||||
HttpMethod::GET => reqwest::Method::GET,
|
||||
HttpMethod::POST => reqwest::Method::POST,
|
||||
HttpMethod::PUT => reqwest::Method::PUT,
|
||||
HttpMethod::PATCH => reqwest::Method::PATCH,
|
||||
};
|
||||
|
||||
let mut req = client
|
||||
.request(reqwest_method, &url)
|
||||
.header("Authorization", auth);
|
||||
if let Some(body) = body {
|
||||
req = req.json(body);
|
||||
}
|
||||
|
||||
let response = req.send().await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let text = response.text().await.unwrap_or_default();
|
||||
anyhow::bail!("zooid {method} {path} returned {status}: {text}");
|
||||
}
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_activity(activity: &Activity) -> Result<()> {
|
||||
let needs_sync = matches!(
|
||||
activity.activity_type.as_str(),
|
||||
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay" | "fail_relay_sync"
|
||||
);
|
||||
|
||||
if activity.resource_type != "relay" || !needs_sync {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if activity.activity_type == "fail_relay_sync" {
|
||||
schedule_relay_sync_retry(&activity.resource_id, "activity").await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Some(relay) = query::get_relay(&activity.resource_id).await? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
sync_relay(&relay).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn reconcile_relay_state(source: &str) -> Result<()> {
|
||||
let relays = query::list_relays_pending_sync().await?;
|
||||
|
||||
if relays.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tracing::info!(source, relay_count = relays.len(), "reconciling pending relay state");
|
||||
|
||||
for relay in relays {
|
||||
if relay.sync_error.trim().is_empty() {
|
||||
sync_relay(&relay).await;
|
||||
} else {
|
||||
schedule_relay_sync_retry(&relay.id, source).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn schedule_relay_sync_retry(relay_id: &str, source: &str) -> Result<()> {
|
||||
fn get_retry_delay(consecutive_failures: usize) -> Option<Duration> {
|
||||
let retry_attempt = consecutive_failures.max(1);
|
||||
if retry_attempt > RELAY_SYNC_RETRY_MAX_ATTEMPTS {
|
||||
return None;
|
||||
}
|
||||
|
||||
let exponent = (retry_attempt - 1).min(31);
|
||||
let multiplier = 1u64 << exponent;
|
||||
let delay_secs = RELAY_SYNC_RETRY_BASE_DELAY_SECS
|
||||
.saturating_mul(multiplier)
|
||||
.min(RELAY_SYNC_RETRY_MAX_DELAY_SECS);
|
||||
|
||||
Some(Duration::from_secs(delay_secs))
|
||||
}
|
||||
|
||||
let activities = query::list_activity_for_resource(relay_id).await?;
|
||||
let consecutive_failures = activities
|
||||
.iter()
|
||||
.take_while(|activity| activity.activity_type == "fail_relay_sync")
|
||||
.count();
|
||||
|
||||
let Some(delay) = get_retry_delay(consecutive_failures) else {
|
||||
tracing::warn!(
|
||||
relay = relay_id,
|
||||
consecutive_failures,
|
||||
max_attempts = RELAY_SYNC_RETRY_MAX_ATTEMPTS,
|
||||
"relay sync retries exhausted; awaiting manual intervention"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
relay = relay_id,
|
||||
source,
|
||||
consecutive_failures,
|
||||
delay_secs = delay.as_secs(),
|
||||
"scheduled relay sync retry"
|
||||
);
|
||||
|
||||
let relay_id = relay_id.to_string();
|
||||
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(delay).await;
|
||||
|
||||
match query::get_relay(&relay_id).await {
|
||||
Ok(Some(relay)) => sync_relay(&relay).await,
|
||||
Ok(None) => {}
|
||||
Err(e) => {
|
||||
tracing::error!(relay = %relay_id, error = %e, "relay sync retry task failed");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn sync_relay(relay: &Relay) {
|
||||
match try_sync_relay(relay).await {
|
||||
Ok(()) => {
|
||||
tracing::info!(relay = %relay.id, "relay sync succeeded");
|
||||
if let Err(e) = command::complete_relay_sync(relay).await {
|
||||
tracing::error!(relay = %relay.id, error = %e, "failed to mark sync complete");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(relay = %relay.id, error = %e, "relay sync failed");
|
||||
if let Err(e2) = command::fail_relay_sync(relay, e.to_string()).await {
|
||||
tracing::error!(relay = %relay.id, error = %e2, "failed to record sync failure");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_sync_relay(relay: &Relay) -> Result<()> {
|
||||
// A relay is "new" (POST with a freshly generated secret) only if it has
|
||||
// never completed a sync. `synced == 1` short-circuits the activity lookup;
|
||||
// otherwise check the activity history so that a re-sync after an update
|
||||
// (which resets `synced` to 0) PATCHes instead of clobbering the secret.
|
||||
let is_new = relay.synced != 1
|
||||
&& query::get_latest_activity_for_resource_and_type(&relay.id, "complete_relay_sync")
|
||||
.await?
|
||||
.is_none();
|
||||
|
||||
let mut body = serde_json::json!({
|
||||
"host": format!("{}.{}", relay.subdomain, env::get().relay_domain),
|
||||
"schema": relay.id,
|
||||
"inactive": relay.status == RELAY_STATUS_INACTIVE
|
||||
|| relay.status == RELAY_STATUS_DELINQUENT,
|
||||
"info": {
|
||||
"name": relay.info_name,
|
||||
"icon": relay.info_icon,
|
||||
"description": relay.info_description,
|
||||
"pubkey": relay.tenant_pubkey,
|
||||
},
|
||||
"policy": {
|
||||
"public_join": relay.policy_public_join == 1,
|
||||
"strip_signatures": relay.policy_strip_signatures == 1,
|
||||
},
|
||||
"groups": { "enabled": relay.groups_enabled == 1 },
|
||||
"management": { "enabled": relay.management_enabled == 1 },
|
||||
"blossom": if relay.blossom_enabled == 1 {
|
||||
serde_json::json!({
|
||||
"enabled": true,
|
||||
"adapter": "s3",
|
||||
"s3": {
|
||||
"endpoint": env::get().blossom_s3_endpoint,
|
||||
"region": env::get().blossom_s3_region,
|
||||
"bucket": env::get().blossom_s3_bucket,
|
||||
"access_key": env::get().blossom_s3_access_key,
|
||||
"secret_key": env::get().blossom_s3_secret_key,
|
||||
"key_prefix": relay.id,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
serde_json::json!({ "enabled": false })
|
||||
},
|
||||
"livekit": if relay.livekit_enabled == 1 {
|
||||
serde_json::json!({
|
||||
"enabled": true,
|
||||
"server_url": env::get().livekit_url,
|
||||
"api_key": env::get().livekit_api_key,
|
||||
"api_secret": env::get().livekit_api_secret,
|
||||
})
|
||||
} else {
|
||||
serde_json::json!({ "enabled": false })
|
||||
},
|
||||
"push": { "enabled": relay.push_enabled == 1 },
|
||||
"roles": {
|
||||
"admin": { "can_manage": true, "can_invite": true },
|
||||
"member": { "can_invite": true },
|
||||
},
|
||||
});
|
||||
|
||||
// Only provide a secret if the relay is new. This allows us to not store the relay secrets on our side.
|
||||
if is_new && let Some(obj) = body.as_object_mut() {
|
||||
obj.insert(
|
||||
"secret".to_string(),
|
||||
serde_json::Value::String(Keys::generate().secret_key().to_secret_hex()),
|
||||
);
|
||||
}
|
||||
|
||||
let method = if is_new { HttpMethod::POST } else { HttpMethod::PATCH };
|
||||
request(method, &format!("relay/{}", relay.id), Some(&body)).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch the member pubkeys of a relay from the zooid API.
|
||||
pub async fn list_relay_members(relay_id: &str) -> Result<Vec<String>> {
|
||||
#[derive(serde::Deserialize)]
|
||||
struct MembersResponse {
|
||||
members: Vec<String>,
|
||||
}
|
||||
|
||||
let response = request(HttpMethod::GET, &format!("relay/{relay_id}/members"), None).await?;
|
||||
let parsed: MembersResponse = response.json().await?;
|
||||
Ok(parsed.members)
|
||||
}
|
||||
|
||||
/// Sends an authenticated request to the zooid API at `path` (relative to
|
||||
/// `env.zooid_api_url`). Returns the response on 2xx; bails with the body
|
||||
/// text otherwise.
|
||||
async fn request(
|
||||
method: HttpMethod,
|
||||
path: &str,
|
||||
body: Option<&serde_json::Value>,
|
||||
) -> Result<reqwest::Response> {
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(5))
|
||||
.build()?;
|
||||
let base = env::get().zooid_api_url.trim_end_matches('/');
|
||||
let path = path.trim_start_matches('/');
|
||||
let url = format!("{base}/{path}");
|
||||
let auth = env::get().make_auth(&url, method).await?;
|
||||
|
||||
let reqwest_method = match method {
|
||||
HttpMethod::GET => reqwest::Method::GET,
|
||||
HttpMethod::POST => reqwest::Method::POST,
|
||||
HttpMethod::PUT => reqwest::Method::PUT,
|
||||
HttpMethod::PATCH => reqwest::Method::PATCH,
|
||||
};
|
||||
|
||||
let mut req = client
|
||||
.request(reqwest_method, &url)
|
||||
.header("Authorization", auth);
|
||||
if let Some(body) = body {
|
||||
req = req.json(body);
|
||||
}
|
||||
|
||||
let response = req.send().await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let text = response.text().await.unwrap_or_default();
|
||||
anyhow::bail!("zooid {method} {path} returned {status}: {text}");
|
||||
}
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
+3
-5
@@ -20,7 +20,6 @@ use tower_http::cors::{AllowOrigin, CorsLayer, Any};
|
||||
|
||||
use crate::api::Api;
|
||||
use crate::billing::Billing;
|
||||
use crate::infra::Infra;
|
||||
use crate::robot::Robot;
|
||||
use crate::stripe::Stripe;
|
||||
|
||||
@@ -39,9 +38,8 @@ async fn main() -> Result<()> {
|
||||
|
||||
let robot = Robot::new().await?;
|
||||
let stripe = Stripe::new();
|
||||
let infra = Infra::new();
|
||||
let billing = Billing::new(robot.clone());
|
||||
let api = Api::new(billing.clone(), stripe, robot, infra.clone());
|
||||
let api = Api::new(billing.clone(), stripe, robot);
|
||||
|
||||
let parsed = env::get()
|
||||
.server_allow_origins
|
||||
@@ -55,8 +53,8 @@ async fn main() -> Result<()> {
|
||||
|
||||
let app = api.router().layer(cors);
|
||||
|
||||
tokio::spawn(async move {
|
||||
infra.start().await;
|
||||
tokio::spawn(async {
|
||||
infra::start().await;
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
|
||||
+35
-18
@@ -1,21 +1,26 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::types::Json;
|
||||
|
||||
pub const RELAY_STATUS_ACTIVE: &str = "active";
|
||||
pub const RELAY_STATUS_INACTIVE: &str = "inactive";
|
||||
pub const RELAY_STATUS_DELINQUENT: &str = "delinquent";
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
||||
pub struct Activity {
|
||||
pub id: String,
|
||||
pub tenant: String,
|
||||
pub created_at: i64,
|
||||
pub activity_type: String,
|
||||
pub resource_type: String,
|
||||
pub resource_id: String,
|
||||
pub billed_at: Option<i64>,
|
||||
/// The relay's plan at the time of a `create_relay`/`update_relay` activity;
|
||||
/// `None` for all other activity types.
|
||||
pub plan_id: Option<String>,
|
||||
/// Per-resource_type snapshot of a resource's state captured on each activity,
|
||||
/// stored as JSON in `activity.snapshot`. Tagged on `resource_type` so the JSON
|
||||
/// is self-describing and the variant matches the activity row's column. Add a
|
||||
/// variant per resource type that needs state preserved on the activity log.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "resource_type", rename_all = "snake_case")]
|
||||
pub enum Snapshot {
|
||||
Relay { plan: String, status: String },
|
||||
}
|
||||
|
||||
impl Snapshot {
|
||||
pub fn resource_type(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Relay { .. } => "relay",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@@ -41,12 +46,24 @@ pub struct Tenant {
|
||||
pub renewed_at: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
||||
pub struct Activity {
|
||||
pub id: String,
|
||||
pub tenant_pubkey: String,
|
||||
pub created_at: i64,
|
||||
pub activity_type: String,
|
||||
pub resource_type: String,
|
||||
pub resource_id: String,
|
||||
pub billed_at: Option<i64>,
|
||||
pub snapshot: Json<Snapshot>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
||||
pub struct Relay {
|
||||
pub id: String,
|
||||
pub tenant: String,
|
||||
pub tenant_pubkey: String,
|
||||
pub subdomain: String,
|
||||
pub plan: String,
|
||||
pub plan_id: String,
|
||||
pub status: String,
|
||||
pub sync_error: String,
|
||||
pub info_name: String,
|
||||
@@ -66,9 +83,9 @@ impl Default for Relay {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
id: String::new(),
|
||||
tenant: String::new(),
|
||||
tenant_pubkey: String::new(),
|
||||
subdomain: String::new(),
|
||||
plan: String::new(),
|
||||
plan_id: String::new(),
|
||||
status: RELAY_STATUS_ACTIVE.to_string(),
|
||||
sync_error: String::new(),
|
||||
info_name: String::new(),
|
||||
@@ -106,7 +123,7 @@ pub struct InvoiceItem {
|
||||
pub activity_id: Option<String>,
|
||||
pub tenant_pubkey: String,
|
||||
pub relay_id: String,
|
||||
pub plan: String,
|
||||
pub plan_id: String,
|
||||
pub amount: i64,
|
||||
pub description: String,
|
||||
pub created_at: i64,
|
||||
@@ -123,7 +140,7 @@ pub struct Bolt11 {
|
||||
pub settled_at: Option<i64>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // backs the `intent` table for the (not yet implemented) Stripe intent flow
|
||||
#[allow(dead_code)] // mirrors the `intent` table; rows record paid Stripe PaymentIntents but aren't read back into this struct yet
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
||||
pub struct Intent {
|
||||
pub id: String,
|
||||
|
||||
+47
-44
@@ -1,4 +1,4 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::{Result, anyhow};
|
||||
|
||||
use crate::models::{Activity, Bolt11, Invoice, InvoiceItem, Plan, Relay, Tenant};
|
||||
use crate::db::pool;
|
||||
@@ -15,7 +15,7 @@ fn select_activity(tail: &str) -> String {
|
||||
format!("SELECT * FROM activity {tail}")
|
||||
}
|
||||
|
||||
// Plans
|
||||
// --- Plans ---
|
||||
|
||||
pub fn list_plans() -> Vec<Plan> {
|
||||
vec![
|
||||
@@ -46,11 +46,14 @@ pub fn list_plans() -> Vec<Plan> {
|
||||
]
|
||||
}
|
||||
|
||||
pub fn get_plan(plan_id: &str) -> Option<Plan> {
|
||||
list_plans().into_iter().find(|p| p.id == plan_id)
|
||||
pub fn get_plan(plan_id: &str) -> Result<Plan> {
|
||||
list_plans()
|
||||
.into_iter()
|
||||
.find(|p| p.id == plan_id)
|
||||
.ok_or_else(|| anyhow!("plan not found: {plan_id}"))
|
||||
}
|
||||
|
||||
// Tenants
|
||||
// --- Tenants ---
|
||||
|
||||
pub async fn list_tenants() -> Result<Vec<Tenant>> {
|
||||
Ok(sqlx::query_as::<_, Tenant>(&select_tenant(""))
|
||||
@@ -65,7 +68,7 @@ pub async fn get_tenant(pubkey: &str) -> Result<Option<Tenant>> {
|
||||
.await?)
|
||||
}
|
||||
|
||||
// Relays
|
||||
// --- Relays ---
|
||||
|
||||
pub async fn list_relays() -> Result<Vec<Relay>> {
|
||||
Ok(sqlx::query_as::<_, Relay>(&select_relay(""))
|
||||
@@ -81,9 +84,9 @@ pub async fn list_relays_pending_sync() -> Result<Vec<Relay>> {
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn list_relays_for_tenant(tenant_id: &str) -> Result<Vec<Relay>> {
|
||||
Ok(sqlx::query_as::<_, Relay>(&select_relay("WHERE tenant = ?"))
|
||||
.bind(tenant_id)
|
||||
pub async fn list_relays_for_tenant(tenant_pubkey: &str) -> Result<Vec<Relay>> {
|
||||
Ok(sqlx::query_as::<_, Relay>(&select_relay("WHERE tenant_pubkey = ?"))
|
||||
.bind(tenant_pubkey)
|
||||
.fetch_all(pool())
|
||||
.await?)
|
||||
}
|
||||
@@ -95,7 +98,25 @@ pub async fn get_relay(id: &str) -> Result<Option<Relay>> {
|
||||
.await?)
|
||||
}
|
||||
|
||||
// Invoices
|
||||
/// The relay's plan immediately before `before`, read from the most recent
|
||||
/// relay-activity snapshot with `created_at < before`. Billing uses this as
|
||||
/// the `old` side of a plan-change delta.
|
||||
pub async fn get_relay_plan_before(relay_id: &str, before: i64) -> Result<Option<String>> {
|
||||
Ok(sqlx::query_scalar::<_, String>(
|
||||
"SELECT json_extract(snapshot, '$.plan') FROM activity
|
||||
WHERE resource_id = ?
|
||||
AND resource_type = 'relay'
|
||||
AND created_at < ?
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1",
|
||||
)
|
||||
.bind(relay_id)
|
||||
.bind(before)
|
||||
.fetch_optional(pool())
|
||||
.await?)
|
||||
}
|
||||
|
||||
// --- Invoices ---
|
||||
|
||||
pub async fn get_invoice(invoice_id: &str) -> Result<Option<Invoice>> {
|
||||
Ok(sqlx::query_as::<_, Invoice>("SELECT * FROM invoice WHERE id = ?")
|
||||
@@ -104,7 +125,7 @@ pub async fn get_invoice(invoice_id: &str) -> Result<Option<Invoice>> {
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn list_invoices(tenant_pubkey: &str) -> Result<Vec<Invoice>> {
|
||||
pub async fn list_invoices_for_tenant(tenant_pubkey: &str) -> Result<Vec<Invoice>> {
|
||||
Ok(sqlx::query_as::<_, Invoice>(
|
||||
"SELECT * FROM invoice WHERE tenant_pubkey = ? ORDER BY created_at DESC",
|
||||
)
|
||||
@@ -113,7 +134,7 @@ pub async fn list_invoices(tenant_pubkey: &str) -> Result<Vec<Invoice>> {
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn get_latest_invoice(tenant_pubkey: &str) -> Result<Option<Invoice>> {
|
||||
pub async fn get_latest_invoice_for_tenant(tenant_pubkey: &str) -> Result<Option<Invoice>> {
|
||||
Ok(sqlx::query_as::<_, Invoice>(
|
||||
"SELECT * FROM invoice WHERE tenant_pubkey = ? ORDER BY created_at DESC LIMIT 1",
|
||||
)
|
||||
@@ -131,24 +152,7 @@ pub async fn get_invoice_items_for_invoice(invoice_id: &str) -> Result<Vec<Invoi
|
||||
)
|
||||
}
|
||||
|
||||
/// The relay's plan immediately before `before`, read from the activity log
|
||||
/// (the most recent `create_relay`/`update_relay` with `created_at < before`).
|
||||
/// Billing uses this as the `old` side of a plan-change delta.
|
||||
pub async fn get_relay_plan_before(relay_id: &str, before: i64) -> Result<Option<String>> {
|
||||
Ok(sqlx::query_scalar::<_, String>(
|
||||
"SELECT plan_id FROM activity
|
||||
WHERE resource_id = ?
|
||||
AND created_at < ?
|
||||
AND activity_type IN ('create_relay', 'update_relay')
|
||||
AND plan_id IS NOT NULL
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1",
|
||||
)
|
||||
.bind(relay_id)
|
||||
.bind(before)
|
||||
.fetch_optional(pool())
|
||||
.await?)
|
||||
}
|
||||
// --- Bolt11 ---
|
||||
|
||||
pub async fn get_bolt11(bolt11_id: &str) -> Result<Option<Bolt11>> {
|
||||
Ok(sqlx::query_as::<_, Bolt11>("SELECT * FROM bolt11 WHERE id = ?")
|
||||
@@ -166,7 +170,7 @@ pub async fn get_bolt11_for_invoice(invoice_id: &str) -> Result<Option<Bolt11>>
|
||||
.await?)
|
||||
}
|
||||
|
||||
// Activity
|
||||
// --- Activity ---
|
||||
|
||||
/// Billable activity for a tenant not yet folded into an invoice. The
|
||||
/// activity-type filter and the `billed_at IS NULL` guard live here so the
|
||||
@@ -174,7 +178,7 @@ pub async fn get_bolt11_for_invoice(invoice_id: &str) -> Result<Option<Bolt11>>
|
||||
/// Ordered oldest-first so line items and proration apply in event order.
|
||||
pub async fn list_billable_activity_for_tenant(tenant_pubkey: &str) -> Result<Vec<Activity>> {
|
||||
Ok(sqlx::query_as::<_, Activity>(&select_activity(
|
||||
"WHERE tenant = ?
|
||||
"WHERE tenant_pubkey = ?
|
||||
AND billed_at IS NULL
|
||||
AND activity_type IN (
|
||||
'create_relay', 'update_relay', 'activate_relay', 'deactivate_relay'
|
||||
@@ -186,26 +190,25 @@ pub async fn list_billable_activity_for_tenant(tenant_pubkey: &str) -> Result<Ve
|
||||
.await?)
|
||||
}
|
||||
|
||||
/// A tenant's relay status/plan activity strictly before `before`, oldest-first
|
||||
/// — folded by billing to reconstruct each relay's state as of a period boundary.
|
||||
/// The relay's most recent activity strictly before `before`, or `None` if it
|
||||
/// had no activity yet — i.e. the relay didn't exist at that point. Billing
|
||||
/// reads its snapshot to recover the relay's state as of a period boundary.
|
||||
/// Strict `<` so a relay created exactly at the boundary isn't counted active
|
||||
/// there (its own creation charge covers that period).
|
||||
pub async fn list_relay_activity_before(
|
||||
tenant_pubkey: &str,
|
||||
pub async fn get_latest_relay_activity_before(
|
||||
relay_id: &str,
|
||||
before: i64,
|
||||
) -> Result<Vec<Activity>> {
|
||||
) -> Result<Option<Activity>> {
|
||||
Ok(sqlx::query_as::<_, Activity>(&select_activity(
|
||||
"WHERE tenant = ?
|
||||
"WHERE resource_id = ?
|
||||
AND resource_type = 'relay'
|
||||
AND activity_type IN (
|
||||
'create_relay', 'update_relay', 'activate_relay', 'deactivate_relay'
|
||||
)
|
||||
AND created_at < ?
|
||||
ORDER BY created_at ASC",
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1",
|
||||
))
|
||||
.bind(tenant_pubkey)
|
||||
.bind(relay_id)
|
||||
.bind(before)
|
||||
.fetch_all(pool())
|
||||
.fetch_optional(pool())
|
||||
.await?)
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,9 @@ use tokio::sync::Mutex;
|
||||
|
||||
use crate::env;
|
||||
|
||||
/// The service's Nostr identity: it publishes the robot's profile and relay
|
||||
/// lists and sends encrypted direct messages to tenants, caching recipients'
|
||||
/// relay lists between sends.
|
||||
#[derive(Clone)]
|
||||
pub struct Robot {
|
||||
outbox_cache: std::sync::Arc<Mutex<HashMap<String, CacheEntry>>>,
|
||||
@@ -20,6 +23,7 @@ struct CacheEntry {
|
||||
}
|
||||
|
||||
impl Robot {
|
||||
/// Build the robot and publish its Nostr identity (profile and relay lists).
|
||||
pub async fn new() -> Result<Self> {
|
||||
let robot = Self {
|
||||
outbox_cache: std::sync::Arc::new(Mutex::new(HashMap::new())),
|
||||
@@ -80,6 +84,7 @@ impl Robot {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send an encrypted direct message to a recipient over their messaging relays.
|
||||
pub async fn send_dm(&self, recipient: &str, message: &str) -> Result<()> {
|
||||
let outbox = self.fetch_outbox_relays(recipient).await?;
|
||||
if outbox.is_empty() {
|
||||
@@ -123,6 +128,7 @@ impl Robot {
|
||||
Ok(relays)
|
||||
}
|
||||
|
||||
/// The recipient's display name from their Nostr profile, if they have one.
|
||||
pub async fn fetch_nostr_name(&self, pubkey: &str) -> Option<String> {
|
||||
let pubkey = PublicKey::parse(pubkey).ok()?;
|
||||
let filter = Filter::new().author(pubkey).kind(Kind::Metadata).limit(1);
|
||||
|
||||
@@ -19,14 +19,9 @@ pub async fn get_tenant_latest_invoice(
|
||||
api.require_admin_or_tenant(&auth, &pubkey)?;
|
||||
let tenant = api.get_tenant_or_404(&pubkey).await?;
|
||||
|
||||
// Roll any outstanding charges (and due renewals) into an invoice, then
|
||||
// return the latest.
|
||||
api.billing
|
||||
.generate_invoice(&tenant)
|
||||
.await
|
||||
.map_err(internal)?;
|
||||
api.billing.reconcile_subscription(&tenant).await.map_err(internal)?;
|
||||
|
||||
let invoice = query::get_latest_invoice(&pubkey).await.map_err(internal)?;
|
||||
let invoice = query::get_latest_invoice_for_tenant(&pubkey).await.map_err(internal)?;
|
||||
|
||||
ok(invoice)
|
||||
}
|
||||
@@ -46,6 +41,8 @@ pub async fn get_invoice(
|
||||
ok(invoice)
|
||||
}
|
||||
|
||||
/// Return a payable Lightning invoice (bolt11) for an invoice, minting one if
|
||||
/// needed and first settling it if it was already paid out of band.
|
||||
pub async fn get_invoice_bolt11(
|
||||
State(api): State<Arc<Api>>,
|
||||
AuthedPubkey(auth): AuthedPubkey,
|
||||
|
||||
@@ -11,8 +11,6 @@ pub async fn list_plans(State(_api): State<Arc<Api>>) -> ApiResult {
|
||||
}
|
||||
|
||||
pub async fn get_plan(State(_api): State<Arc<Api>>, Path(id): Path<String>) -> ApiResult {
|
||||
match query::get_plan(&id) {
|
||||
Some(plan) => ok(plan),
|
||||
None => Err(not_found("plan not found")),
|
||||
}
|
||||
let plan = query::get_plan(&id).map_err(|_| not_found("plan not found"))?;
|
||||
ok(plan)
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ use regex::Regex;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::api::{Api, AuthedPubkey};
|
||||
use crate::{command, query};
|
||||
use crate::{command, infra, query};
|
||||
use crate::models::{
|
||||
RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay,
|
||||
};
|
||||
@@ -34,7 +34,7 @@ pub async fn get_relay(
|
||||
Path(id): Path<String>,
|
||||
) -> ApiResult {
|
||||
let relay = api.get_relay_or_404(&id).await?;
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant)?;
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
|
||||
ok(relay)
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ pub async fn list_relay_activity(
|
||||
Path(id): Path<String>,
|
||||
) -> ApiResult {
|
||||
let relay = api.get_relay_or_404(&id).await?;
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant)?;
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
|
||||
|
||||
let activity = query::list_activity_for_resource(&id)
|
||||
.await
|
||||
@@ -58,17 +58,17 @@ pub async fn list_relay_members(
|
||||
Path(id): Path<String>,
|
||||
) -> ApiResult {
|
||||
let relay = api.get_relay_or_404(&id).await?;
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant)?;
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
|
||||
|
||||
let members = fetch_relay_members(&api, &relay).await.map_err(internal)?;
|
||||
let members = fetch_relay_members(&relay).await.map_err(internal)?;
|
||||
ok(serde_json::json!({ "members": members }))
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct CreateRelayRequest {
|
||||
pub tenant: String,
|
||||
pub tenant_pubkey: String,
|
||||
pub subdomain: String,
|
||||
pub plan: String,
|
||||
pub plan_id: String,
|
||||
pub info_name: String,
|
||||
pub info_icon: String,
|
||||
pub info_description: String,
|
||||
@@ -86,7 +86,7 @@ pub async fn create_relay(
|
||||
AuthedPubkey(auth): AuthedPubkey,
|
||||
Json(payload): Json<CreateRelayRequest>,
|
||||
) -> ApiResult {
|
||||
api.require_admin_or_tenant(&auth, &payload.tenant)?;
|
||||
api.require_admin_or_tenant(&auth, &payload.tenant_pubkey)?;
|
||||
|
||||
let relay_id = format!(
|
||||
"{}_{}",
|
||||
@@ -96,9 +96,9 @@ pub async fn create_relay(
|
||||
|
||||
let relay = Relay {
|
||||
id: relay_id.clone(),
|
||||
tenant: payload.tenant,
|
||||
tenant_pubkey: payload.tenant_pubkey,
|
||||
subdomain: payload.subdomain,
|
||||
plan: payload.plan,
|
||||
plan_id: payload.plan_id,
|
||||
info_name: payload.info_name,
|
||||
info_icon: payload.info_icon,
|
||||
info_description: payload.info_description,
|
||||
@@ -124,7 +124,7 @@ pub async fn create_relay(
|
||||
#[derive(Deserialize)]
|
||||
pub struct UpdateRelayRequest {
|
||||
pub subdomain: Option<String>,
|
||||
pub plan: Option<String>,
|
||||
pub plan_id: Option<String>,
|
||||
pub info_name: Option<String>,
|
||||
pub info_icon: Option<String>,
|
||||
pub info_description: Option<String>,
|
||||
@@ -145,16 +145,16 @@ pub async fn update_relay(
|
||||
) -> ApiResult {
|
||||
let mut relay = api.get_relay_or_404(&id).await?;
|
||||
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant)?;
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
|
||||
|
||||
let current_plan = relay.plan.clone();
|
||||
let requested_plan = payload.plan.clone();
|
||||
let current_plan = relay.plan_id.clone();
|
||||
let requested_plan = payload.plan_id.clone();
|
||||
|
||||
if let Some(v) = payload.subdomain {
|
||||
relay.subdomain = v;
|
||||
}
|
||||
if let Some(v) = requested_plan.clone() {
|
||||
relay.plan = v;
|
||||
relay.plan_id = v;
|
||||
}
|
||||
if let Some(v) = payload.info_name {
|
||||
relay.info_name = v;
|
||||
@@ -194,10 +194,9 @@ pub async fn update_relay(
|
||||
.is_some_and(|requested| requested != current_plan);
|
||||
|
||||
if plan_changed {
|
||||
let selected_plan =
|
||||
query::get_plan(&relay.plan).expect("validated plan must exist");
|
||||
let selected_plan = query::get_plan(&relay.plan_id).map_err(internal)?;
|
||||
if let Some(limit) = selected_plan.members {
|
||||
let current_members = fetch_relay_members(&api, &relay)
|
||||
let current_members = fetch_relay_members(&relay)
|
||||
.await
|
||||
.map_err(internal)?
|
||||
.len() as i64;
|
||||
@@ -225,7 +224,7 @@ pub async fn deactivate_relay(
|
||||
Path(id): Path<String>,
|
||||
) -> ApiResult {
|
||||
let relay = api.get_relay_or_404(&id).await?;
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant)?;
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
|
||||
|
||||
if relay.status == RELAY_STATUS_DELINQUENT {
|
||||
return Err(bad_request("relay-is-delinquent", "relay is delinquent"));
|
||||
@@ -248,7 +247,7 @@ pub async fn reactivate_relay(
|
||||
Path(id): Path<String>,
|
||||
) -> ApiResult {
|
||||
let relay = api.get_relay_or_404(&id).await?;
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant)?;
|
||||
api.require_admin_or_tenant(&auth, &relay.tenant_pubkey)?;
|
||||
|
||||
if relay.status == RELAY_STATUS_DELINQUENT {
|
||||
return Err(bad_request("relay-is-delinquent", "relay is delinquent"));
|
||||
@@ -265,12 +264,12 @@ pub async fn reactivate_relay(
|
||||
|
||||
// --- helpers ----------------------------------------------------------------
|
||||
|
||||
async fn fetch_relay_members(api: &Api, relay: &Relay) -> Result<Vec<String>> {
|
||||
async fn fetch_relay_members(relay: &Relay) -> Result<Vec<String>> {
|
||||
if relay.synced == 0 {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
api.infra.list_relay_members(&relay.id).await
|
||||
infra::list_relay_members(&relay.id).await
|
||||
}
|
||||
|
||||
const RESERVED_SUBDOMAINS: [&str; 3] = ["api", "admin", "internal"];
|
||||
@@ -278,14 +277,17 @@ const RESERVED_SUBDOMAINS: [&str; 3] = ["api", "admin", "internal"];
|
||||
static SUBDOMAIN_RE: LazyLock<Regex> =
|
||||
LazyLock::new(|| Regex::new(r"^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$").unwrap());
|
||||
|
||||
/// Validate and normalize a relay before persistence: enforce the subdomain
|
||||
/// format and reserved names, require an existing plan that permits any enabled
|
||||
/// premium features, and coerce the boolean columns to 0/1.
|
||||
fn prepare_relay(mut relay: Relay) -> Result<Relay, ApiError> {
|
||||
if !SUBDOMAIN_RE.is_match(&relay.subdomain)
|
||||
|| RESERVED_SUBDOMAINS.contains(&relay.subdomain.as_str()) {
|
||||
return Err(unprocessable("invalid-subdomain", "subdomain is invalid"));
|
||||
}
|
||||
|
||||
let plan = query::get_plan(&relay.plan)
|
||||
.ok_or_else(|| unprocessable("invalid-plan", "plan not found"))?;
|
||||
let plan = query::get_plan(&relay.plan_id)
|
||||
.map_err(|_| unprocessable("invalid-plan", "plan not found"))?;
|
||||
|
||||
if (!plan.blossom && relay.blossom_enabled == 1) || (!plan.livekit && relay.livekit_enabled == 1) {
|
||||
return Err(unprocessable("premium-feature", "feature requires a paid plan"));
|
||||
@@ -302,6 +304,7 @@ fn prepare_relay(mut relay: Relay) -> Result<Relay, ApiError> {
|
||||
Ok(relay)
|
||||
}
|
||||
|
||||
/// Translate a duplicate-subdomain write into a 422; anything else is a 500.
|
||||
fn map_relay_write_error(e: anyhow::Error) -> ApiError {
|
||||
if matches!(map_unique_error(&e), Some("subdomain-exists")) {
|
||||
unprocessable("subdomain-exists", "subdomain already exists")
|
||||
|
||||
@@ -48,6 +48,9 @@ pub async fn list_tenants(
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
/// Create the tenant row for the calling pubkey and provision its Stripe
|
||||
/// customer. Idempotent: an existing tenant (including one created by a
|
||||
/// concurrent unique-constraint race) is returned as-is.
|
||||
pub async fn create_tenant(
|
||||
State(api): State<Arc<Api>>,
|
||||
AuthedPubkey(pubkey): AuthedPubkey,
|
||||
@@ -138,7 +141,7 @@ pub async fn list_tenant_relays(
|
||||
ok(relays)
|
||||
}
|
||||
|
||||
|
||||
/// List a tenant's invoices, most recent first.
|
||||
pub async fn list_tenant_invoices(
|
||||
State(api): State<Arc<Api>>,
|
||||
AuthedPubkey(auth): AuthedPubkey,
|
||||
@@ -146,7 +149,7 @@ pub async fn list_tenant_invoices(
|
||||
) -> ApiResult {
|
||||
api.require_admin_or_tenant(&auth, &pubkey)?;
|
||||
|
||||
let invoices = query::list_invoices(&pubkey)
|
||||
let invoices = query::list_invoices_for_tenant(&pubkey)
|
||||
.await
|
||||
.map_err(internal)?;
|
||||
|
||||
@@ -158,6 +161,8 @@ pub struct StripeSessionParams {
|
||||
return_url: Option<String>,
|
||||
}
|
||||
|
||||
/// Create a Stripe billing-portal session for the tenant to manage their saved
|
||||
/// payment methods, returning the portal URL.
|
||||
pub async fn create_stripe_session(
|
||||
State(api): State<Arc<Api>>,
|
||||
AuthedPubkey(auth): AuthedPubkey,
|
||||
|
||||
+10
-2
@@ -12,13 +12,17 @@ use crate::env;
|
||||
|
||||
const STRIPE_API: &str = "https://api.stripe.com/v1";
|
||||
|
||||
// Stripe struct and impl
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Stripe {
|
||||
http: reqwest::Client,
|
||||
}
|
||||
|
||||
impl Default for Stripe {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Stripe {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
@@ -54,6 +58,8 @@ impl Stripe {
|
||||
|
||||
// --- Customers ---
|
||||
|
||||
/// Create a Stripe customer for a tenant and return its id. Idempotent on
|
||||
/// `tenant_pubkey` so retrying a tenant's creation reuses the same customer.
|
||||
pub async fn create_customer(&self, tenant_pubkey: &str, name: &str) -> Result<String> {
|
||||
let body = self
|
||||
.post("/customers")
|
||||
@@ -142,6 +148,8 @@ impl Stripe {
|
||||
|
||||
// --- Portal ---
|
||||
|
||||
/// Open a Stripe billing-portal session for the customer, returning the URL
|
||||
/// where they can manage their saved payment methods.
|
||||
pub async fn create_portal_session(
|
||||
&self,
|
||||
customer_id: &str,
|
||||
|
||||
@@ -4,6 +4,9 @@ use nwc::prelude::{
|
||||
TransactionState,
|
||||
};
|
||||
|
||||
/// A Nostr Wallet Connect wallet, used both as the service's receiving wallet
|
||||
/// and as a tenant's paying wallet. Each call spins up and shuts down its own
|
||||
/// short-lived NWC client; nothing is pooled across calls.
|
||||
#[derive(Clone)]
|
||||
pub struct Wallet {
|
||||
url: NostrWalletConnectURI,
|
||||
|
||||
@@ -32,8 +32,8 @@ export default function PaymentDialog(props: PaymentDialogProps) {
|
||||
const billedRelays = createMemo(() => {
|
||||
const planById = new Map(plans().map((p) => [p.id, p]))
|
||||
return (relays() ?? [])
|
||||
.map((relay) => ({ relay, plan: planById.get(relay.plan) }))
|
||||
.filter((entry) => entry.plan?.amount > 0)
|
||||
.map((relay) => ({ relay, plan: planById.get(relay.plan_id) }))
|
||||
.filter((entry) => Boolean(entry.plan?.amount))
|
||||
})
|
||||
|
||||
async function loadBolt11() {
|
||||
@@ -147,7 +147,7 @@ export default function PaymentDialog(props: PaymentDialogProps) {
|
||||
<li class="flex items-center justify-between gap-3 text-sm">
|
||||
<span class="truncate text-gray-900">{relay.info_name || relay.subdomain}</span>
|
||||
<span class="flex-shrink-0 text-xs text-gray-500">
|
||||
{plan?.name ?? relay.plan}
|
||||
{plan?.name ?? relay.plan_id}
|
||||
<Show when={plan}> · ${(plan!.amount / 100).toFixed(2)}/mo</Show>
|
||||
</span>
|
||||
</li>
|
||||
|
||||
@@ -32,9 +32,9 @@ function memberLabel(members: number | null) {
|
||||
|
||||
type PricingTableProps = {
|
||||
selectable?: boolean
|
||||
selectedPlan?: PlanId
|
||||
onSelect?: (plan: PlanId) => void
|
||||
onCta?: (plan: PlanId) => void
|
||||
selectedPlanId?: PlanId
|
||||
onSelect?: (planId: PlanId) => void
|
||||
onCta?: (planId: PlanId) => void
|
||||
}
|
||||
|
||||
export default function PricingTable(props: PricingTableProps) {
|
||||
@@ -43,7 +43,7 @@ export default function PricingTable(props: PricingTableProps) {
|
||||
<For each={plans()}>
|
||||
{(plan) => {
|
||||
const isPopular = plan.id === "basic"
|
||||
const isSelected = () => props.selectable && props.selectedPlan === plan.id
|
||||
const isSelected = () => props.selectable && props.selectedPlanId === plan.id
|
||||
|
||||
const card = (
|
||||
<>
|
||||
|
||||
@@ -63,7 +63,7 @@ type RelayDetailCardProps = {
|
||||
onToggleMediaStorage?: () => void
|
||||
onToggleLivekitSupport?: () => void
|
||||
onTogglePushNotifications?: () => void
|
||||
onUpdatePlan?: (plan: PlanId) => Promise<void>
|
||||
onUpdatePlan?: (planId: PlanId) => Promise<void>
|
||||
enforcePlanLimits?: boolean
|
||||
showPlanActions?: boolean
|
||||
}
|
||||
@@ -76,17 +76,17 @@ export default function RelayDetailCard(props: RelayDetailCardProps) {
|
||||
return fallback
|
||||
}
|
||||
const [menuOpen, setMenuOpen] = createSignal(false)
|
||||
const [plan, setPlan] = createSignal<PlanId>(props.relay.plan)
|
||||
const [planId, setPlanId] = createSignal<PlanId>(props.relay.plan_id)
|
||||
const [pendingAction, setPendingAction] = createSignal<"deactivate" | "reactivate" | null>(null)
|
||||
|
||||
let menuContainerRef: HTMLDivElement | undefined
|
||||
|
||||
const memberLimitLabel = () => {
|
||||
const p = plans().find(p => p.id === r().plan)
|
||||
const p = plans().find(p => p.id === r().plan_id)
|
||||
if (!p) return "?"
|
||||
return p.members === null ? "∞" : String(p.members)
|
||||
}
|
||||
const planLimited = () => (props.enforcePlanLimits ?? true) && r().plan === "free"
|
||||
const planLimited = () => (props.enforcePlanLimits ?? true) && r().plan_id === "free"
|
||||
const showPlanActions = () => props.showPlanActions ?? true
|
||||
const actionBusy = () => pendingAction() === "deactivate" ? !!props.deactivating : pendingAction() === "reactivate" ? !!props.reactivating : false
|
||||
const relayLabel = () => r().info_name || r().subdomain
|
||||
@@ -107,11 +107,11 @@ export default function RelayDetailCard(props: RelayDetailCardProps) {
|
||||
const confirmBusyLabel = () => pendingAction() === "deactivate" ? "Deactivating..." : "Reactivating..."
|
||||
const confirmTone = () => pendingAction() === "deactivate" ? "danger" : "primary"
|
||||
|
||||
async function changePlan(plan: PlanId) {
|
||||
setPlan(plan)
|
||||
async function changePlanId(planId: PlanId) {
|
||||
setPlanId(planId)
|
||||
try {
|
||||
await props.onUpdatePlan?.(plan)
|
||||
setToastMessage(`Plan updated to ${plan}`, "success")
|
||||
await props.onUpdatePlan?.(planId)
|
||||
setToastMessage(`Plan updated to ${planId}`, "success")
|
||||
} catch {
|
||||
// error is handled by the caller
|
||||
}
|
||||
@@ -360,7 +360,7 @@ export default function RelayDetailCard(props: RelayDetailCardProps) {
|
||||
</Field>
|
||||
<Show when={props.showTenant}>
|
||||
<Field label="Tenant">
|
||||
<span class="font-mono text-xs break-all">{r().tenant}</span>
|
||||
<span class="font-mono text-xs break-all">{r().tenant_pubkey}</span>
|
||||
</Field>
|
||||
</Show>
|
||||
</MembershipSection>
|
||||
@@ -373,15 +373,15 @@ export default function RelayDetailCard(props: RelayDetailCardProps) {
|
||||
when={props.onUpdatePlan}
|
||||
fallback={
|
||||
<Field label="Current plan">
|
||||
<span class="capitalize text-gray-900">{r().plan}</span>
|
||||
<span class="capitalize text-gray-900">{r().plan_id}</span>
|
||||
</Field>
|
||||
}
|
||||
>
|
||||
<div class="lg:col-span-2 space-y-4">
|
||||
<PricingTable
|
||||
selectable
|
||||
selectedPlan={plan()}
|
||||
onSelect={changePlan}
|
||||
selectedPlanId={planId()}
|
||||
onSelect={changePlanId}
|
||||
/>
|
||||
</div>
|
||||
</Show>
|
||||
|
||||
@@ -5,7 +5,7 @@ import { validateSubdomainLabel } from "@/lib/subdomain"
|
||||
import { setToastMessage } from "@/components/Toast"
|
||||
import { plans } from "@/lib/state"
|
||||
|
||||
export type RelayFormValues = Pick<Relay, "info_name" | "subdomain" | "info_icon" | "info_description" | "plan">
|
||||
export type RelayFormValues = Pick<Relay, "info_name" | "subdomain" | "info_icon" | "info_description" | "plan_id">
|
||||
|
||||
type RelayFormProps = {
|
||||
initialValues?: Partial<RelayFormValues>
|
||||
@@ -16,8 +16,8 @@ type RelayFormProps = {
|
||||
}
|
||||
|
||||
export default function RelayForm(props: RelayFormProps) {
|
||||
const defaultPlanId = createMemo(() => props.initialValues?.plan ?? plans()[0]?.id ?? "free")
|
||||
const [plan, setPlan] = createSignal(defaultPlanId())
|
||||
const defaultPlanId = createMemo(() => props.initialValues?.plan_id ?? plans()[0]?.id ?? "free")
|
||||
const [planId, setPlanId] = createSignal(defaultPlanId())
|
||||
const [name, setName] = createSignal(props.initialValues?.info_name ?? "")
|
||||
const [subdomain, setSubdomain] = createSignal(props.initialValues?.subdomain ?? "")
|
||||
const [icon, setIcon] = createSignal(props.initialValues?.info_icon ?? "")
|
||||
@@ -27,7 +27,7 @@ export default function RelayForm(props: RelayFormProps) {
|
||||
async function handleSubmit(e: Event) {
|
||||
e.preventDefault()
|
||||
|
||||
if (!plan()) {
|
||||
if (!planId()) {
|
||||
setToastMessage("Please select a plan")
|
||||
return
|
||||
}
|
||||
@@ -43,7 +43,7 @@ export default function RelayForm(props: RelayFormProps) {
|
||||
|
||||
try {
|
||||
await props.onSubmit({
|
||||
plan: plan(),
|
||||
plan_id: planId(),
|
||||
info_name: name(),
|
||||
subdomain: subdomain(),
|
||||
info_icon: icon(),
|
||||
@@ -56,7 +56,7 @@ export default function RelayForm(props: RelayFormProps) {
|
||||
}
|
||||
}
|
||||
|
||||
createEffect(() => setPlan(defaultPlanId()))
|
||||
createEffect(() => setPlanId(defaultPlanId()))
|
||||
|
||||
createEffect(() => {
|
||||
if (props.syncSubdomainWithName) {
|
||||
@@ -112,8 +112,8 @@ export default function RelayForm(props: RelayFormProps) {
|
||||
{(p) => (
|
||||
<button
|
||||
type="button"
|
||||
onClick={() => setPlan(p.id)}
|
||||
class={`border-2 rounded-xl p-4 text-left transition-colors ${plan() === p.id ? "border-blue-600 bg-blue-50" : "border-gray-200 hover:border-gray-300"}`}
|
||||
onClick={() => setPlanId(p.id)}
|
||||
class={`border-2 rounded-xl p-4 text-left transition-colors ${planId() === p.id ? "border-blue-600 bg-blue-50" : "border-gray-200 hover:border-gray-300"}`}
|
||||
>
|
||||
<div class="font-bold text-gray-900">{p.name}</div>
|
||||
<div class="text-sm text-gray-500 mt-1">
|
||||
|
||||
@@ -17,7 +17,7 @@ export default function RelayListItem(props: RelayListItemProps) {
|
||||
<p class="font-medium text-gray-900">{props.relay.info_name || props.relay.subdomain}</p>
|
||||
<p class="text-xs text-gray-500">{props.relay.subdomain}.spaces.coracle.social</p>
|
||||
{props.showTenant && (
|
||||
<p class="text-xs text-gray-500 break-all mt-1">Tenant: {props.relay.tenant}</p>
|
||||
<p class="text-xs text-gray-500 break-all mt-1">Tenant: {props.relay.tenant_pubkey}</p>
|
||||
)}
|
||||
</div>
|
||||
<Show
|
||||
|
||||
@@ -44,9 +44,9 @@ export type PlanId = string
|
||||
|
||||
export type Relay = {
|
||||
id: string
|
||||
tenant: string
|
||||
tenant_pubkey: string
|
||||
subdomain: string
|
||||
plan: PlanId
|
||||
plan_id: PlanId
|
||||
status: string
|
||||
sync_error: string
|
||||
synced: number
|
||||
@@ -63,9 +63,9 @@ export type Relay = {
|
||||
}
|
||||
|
||||
export type CreateRelayInput = {
|
||||
tenant?: string
|
||||
tenant_pubkey?: string
|
||||
subdomain: string
|
||||
plan: string
|
||||
plan_id: string
|
||||
info_name?: string
|
||||
info_icon?: string
|
||||
info_description?: string
|
||||
@@ -80,7 +80,7 @@ export type CreateRelayInput = {
|
||||
|
||||
export type UpdateRelayInput = {
|
||||
subdomain?: string
|
||||
plan?: string
|
||||
plan_id?: string
|
||||
info_name?: string
|
||||
info_icon?: string
|
||||
info_description?: string
|
||||
@@ -115,7 +115,7 @@ export type Invoice = {
|
||||
|
||||
export type Activity = {
|
||||
id: string
|
||||
tenant: string
|
||||
tenant_pubkey: string
|
||||
created_at: number
|
||||
activity_type: string
|
||||
resource_type: string
|
||||
|
||||
@@ -116,8 +116,8 @@ export const createRelayForActiveTenant = (input: CreateRelayInput) => {
|
||||
|
||||
const overrides = {
|
||||
tenant: account()!.pubkey,
|
||||
blossom_enabled: input.plan === "free" ? 0 : 1,
|
||||
livekit_enabled: input.plan === "free" ? 0 : 1,
|
||||
blossom_enabled: input.plan_id === "free" ? 0 : 1,
|
||||
livekit_enabled: input.plan_id === "free" ? 0 : 1,
|
||||
}
|
||||
|
||||
return createRelay({...defaults, ...input, ...overrides})
|
||||
@@ -127,7 +127,7 @@ export const updateActiveTenant = (input: { nwc_url?: string }) => updateTenant(
|
||||
|
||||
export const updateRelayById = (id: string, input: UpdateRelayInput) => updateRelay(id, input)
|
||||
|
||||
export const updateRelayPlanById = (id: string, plan: string) => updateRelay(id, { plan })
|
||||
export const updateRelayPlanById = (id: string, plan_id: string) => updateRelay(id, { plan_id })
|
||||
|
||||
export const deactivateRelayById = (id: string) => deactivateRelay(id)
|
||||
|
||||
|
||||
@@ -77,14 +77,14 @@ export default function useRelayToggles(
|
||||
}
|
||||
}
|
||||
|
||||
async function handleUpdatePlan(plan: PlanId) {
|
||||
async function handleUpdatePlan(plan_id: PlanId) {
|
||||
const current = relay()
|
||||
if (!current) return
|
||||
|
||||
const previous = current
|
||||
const next = { ...current, plan }
|
||||
const update: Record<string, unknown> = { plan }
|
||||
if (plan === "free") {
|
||||
const next = { ...current, plan_id }
|
||||
const update: Record<string, unknown> = { plan_id }
|
||||
if (plan_id === "free") {
|
||||
next.blossom_enabled = 0
|
||||
next.livekit_enabled = 0
|
||||
update.blossom_enabled = 0
|
||||
@@ -101,7 +101,7 @@ export default function useRelayToggles(
|
||||
throw e
|
||||
}
|
||||
|
||||
if (plan !== "free") {
|
||||
if (plan_id !== "free") {
|
||||
const needsSetup = await tenantNeedsPaymentSetup()
|
||||
if (needsSetup) {
|
||||
const invoice = await getLatestOpenInvoice()
|
||||
@@ -116,9 +116,9 @@ export default function useRelayToggles(
|
||||
onToggleStripSignatures: () => toggle("policy_strip_signatures", false),
|
||||
onToggleGroups: () => toggle("groups_enabled", true),
|
||||
onToggleManagement: () => toggle("management_enabled", true),
|
||||
onToggleMediaStorage: () => toggle("blossom_enabled", relay()?.plan !== "free"),
|
||||
onToggleMediaStorage: () => toggle("blossom_enabled", relay()?.plan_id !== "free"),
|
||||
onTogglePushNotifications: () => toggle("push_enabled", true),
|
||||
onToggleLivekitSupport: () => toggle("livekit_enabled", relay()?.plan !== "free"),
|
||||
onToggleLivekitSupport: () => toggle("livekit_enabled", relay()?.plan_id !== "free"),
|
||||
}
|
||||
|
||||
return { busy, handleDeactivate, handleReactivate, handleUpdatePlan, pendingInvoice, clearPendingInvoice: () => setPendingInvoice(undefined), pendingPaymentSetup, clearPendingPaymentSetup: () => setPendingPaymentSetup(false), toggles }
|
||||
|
||||
@@ -17,10 +17,10 @@ export default function Home() {
|
||||
const [showRelayModal, setShowRelayModal] = createSignal(false)
|
||||
const [showLoginModal, setShowLoginModal] = createSignal(false)
|
||||
const [draftRelay, setDraftRelay] = createSignal<RelayFormValues>()
|
||||
const [initialPlan, setInitialPlan] = createSignal<RelayFormValues["plan"]>("free")
|
||||
const [initialPlanId, setInitialPlanId] = createSignal<RelayFormValues["plan_id"]>("free")
|
||||
|
||||
function openRelayModal(plan: RelayFormValues["plan"] = "free") {
|
||||
setInitialPlan(plan)
|
||||
function openRelayModal(planId: RelayFormValues["plan_id"] = "free") {
|
||||
setInitialPlanId(planId)
|
||||
setShowRelayModal(true)
|
||||
}
|
||||
|
||||
@@ -404,7 +404,7 @@ export default function Home() {
|
||||
|
||||
<RelayForm
|
||||
syncSubdomainWithName
|
||||
initialValues={{ plan: initialPlan() }}
|
||||
initialValues={{ plan_id: initialPlanId() }}
|
||||
onSubmit={onRelayFormSubmit}
|
||||
submitLabel="Continue"
|
||||
submittingLabel="Creating..."
|
||||
|
||||
@@ -45,7 +45,7 @@ export default function RelayDetail() {
|
||||
const isPaidRelay = createMemo(() => {
|
||||
const r = relay()
|
||||
if (!r) return false
|
||||
const plan = plans().find(p => p.id === r.plan)
|
||||
const plan = plans().find(p => p.id === r.plan_id)
|
||||
return !!(plan && plan.amount > 0)
|
||||
})
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ export default function RelayNew() {
|
||||
const relay = await createRelayForActiveTenant(values)
|
||||
createdRelayId = relay.id
|
||||
|
||||
if (values.plan !== "free") {
|
||||
if (values.plan_id !== "free") {
|
||||
const needsSetup = await tenantNeedsPaymentSetup()
|
||||
if (needsSetup) {
|
||||
const invoice = await getLatestOpenInvoice()
|
||||
|
||||
Reference in New Issue
Block a user