forked from coracle/caravel
Implement stripe subscription sync
This commit is contained in:
@@ -10,7 +10,9 @@ CREATE TABLE IF NOT EXISTS activity (
|
|||||||
CREATE TABLE IF NOT EXISTS tenant (
|
CREATE TABLE IF NOT EXISTS tenant (
|
||||||
pubkey TEXT PRIMARY KEY,
|
pubkey TEXT PRIMARY KEY,
|
||||||
nwc_url TEXT NOT NULL DEFAULT '',
|
nwc_url TEXT NOT NULL DEFAULT '',
|
||||||
created_at INTEGER NOT NULL
|
created_at INTEGER NOT NULL,
|
||||||
|
stripe_customer_id TEXT NOT NULL DEFAULT '',
|
||||||
|
stripe_subscription_id TEXT NOT NULL DEFAULT ''
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS relay (
|
CREATE TABLE IF NOT EXISTS relay (
|
||||||
@@ -19,6 +21,7 @@ CREATE TABLE IF NOT EXISTS relay (
|
|||||||
schema TEXT NOT NULL,
|
schema TEXT NOT NULL,
|
||||||
subdomain TEXT NOT NULL UNIQUE,
|
subdomain TEXT NOT NULL UNIQUE,
|
||||||
plan TEXT NOT NULL,
|
plan TEXT NOT NULL,
|
||||||
|
stripe_subscription_item_id TEXT,
|
||||||
status TEXT NOT NULL,
|
status TEXT NOT NULL,
|
||||||
synced INTEGER NOT NULL DEFAULT 0,
|
synced INTEGER NOT NULL DEFAULT 0,
|
||||||
sync_error TEXT NOT NULL DEFAULT '',
|
sync_error TEXT NOT NULL DEFAULT '',
|
||||||
|
|||||||
+28
-17
@@ -6,7 +6,7 @@ use axum::{
|
|||||||
extract::{Path, State},
|
extract::{Path, State},
|
||||||
http::{HeaderMap, StatusCode},
|
http::{HeaderMap, StatusCode},
|
||||||
response::{IntoResponse, Response},
|
response::{IntoResponse, Response},
|
||||||
routing::{get, post, put},
|
routing::{get, post},
|
||||||
};
|
};
|
||||||
use base64::Engine;
|
use base64::Engine;
|
||||||
use nostr_sdk::{Event, JsonUtil, Kind};
|
use nostr_sdk::{Event, JsonUtil, Kind};
|
||||||
@@ -325,23 +325,33 @@ async fn get_identity(
|
|||||||
) -> std::result::Result<Response, ApiError> {
|
) -> std::result::Result<Response, ApiError> {
|
||||||
let pubkey = state.api.extract_auth_pubkey(&headers)?;
|
let pubkey = state.api.extract_auth_pubkey(&headers)?;
|
||||||
let is_admin = state.api.admins.iter().any(|a| a == &pubkey);
|
let is_admin = state.api.admins.iter().any(|a| a == &pubkey);
|
||||||
let tenant = Tenant {
|
|
||||||
pubkey: pubkey.clone(),
|
|
||||||
nwc_url: String::new(),
|
|
||||||
created_at: now_ts(),
|
|
||||||
};
|
|
||||||
|
|
||||||
match state.api.command.create_tenant(&tenant).await {
|
// Only create if tenant doesn't exist yet
|
||||||
Ok(()) => true,
|
if let Ok(None) = state.api.query.get_tenant(&pubkey).await {
|
||||||
Err(e) if matches!(map_unique_error(&e), Some("pubkey-exists")) => true,
|
// TODO: Call Stripe API to create customer and subscription
|
||||||
Err(e) => {
|
let stripe_customer_id = String::new();
|
||||||
return Ok(err(
|
let stripe_subscription_id = String::new();
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
"internal",
|
let tenant = Tenant {
|
||||||
&e.to_string(),
|
pubkey: pubkey.clone(),
|
||||||
));
|
nwc_url: String::new(),
|
||||||
}
|
created_at: now_ts(),
|
||||||
};
|
stripe_customer_id,
|
||||||
|
stripe_subscription_id,
|
||||||
|
};
|
||||||
|
|
||||||
|
match state.api.command.create_tenant(&tenant).await {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(e) if matches!(map_unique_error(&e), Some("pubkey-exists")) => {}
|
||||||
|
Err(e) => {
|
||||||
|
return Ok(err(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
"internal",
|
||||||
|
&e.to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
Ok(ok(
|
Ok(ok(
|
||||||
StatusCode::OK,
|
StatusCode::OK,
|
||||||
@@ -472,6 +482,7 @@ async fn create_relay(
|
|||||||
schema: String::new(),
|
schema: String::new(),
|
||||||
subdomain: payload.subdomain,
|
subdomain: payload.subdomain,
|
||||||
plan: payload.plan,
|
plan: payload.plan,
|
||||||
|
stripe_subscription_item_id: None,
|
||||||
status: "active".to_string(),
|
status: "active".to_string(),
|
||||||
sync_error: String::new(),
|
sync_error: String::new(),
|
||||||
info_name: payload.info_name.unwrap_or_default(),
|
info_name: payload.info_name.unwrap_or_default(),
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
|
||||||
use crate::command::Command;
|
use crate::command::Command;
|
||||||
|
use crate::models::Activity;
|
||||||
use crate::query::Query;
|
use crate::query::Query;
|
||||||
use crate::robot::Robot;
|
use crate::robot::Robot;
|
||||||
|
|
||||||
@@ -23,6 +24,62 @@ impl Billing {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn start(self) {
|
||||||
|
let mut rx = self.command.notify.subscribe();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match rx.recv().await {
|
||||||
|
Ok(activity) => {
|
||||||
|
if let Err(e) = self.handle_activity(&activity).await {
|
||||||
|
tracing::error!(error = %e, "billing handle_activity failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
tracing::warn!(missed = n, "billing lagged");
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_activity(&self, activity: &Activity) -> Result<()> {
|
||||||
|
let needs_billing_sync = matches!(
|
||||||
|
activity.activity_type.as_str(),
|
||||||
|
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay"
|
||||||
|
| "fail_relay_sync" | "complete_relay_sync"
|
||||||
|
);
|
||||||
|
|
||||||
|
if needs_billing_sync {
|
||||||
|
self.sync_relay_subscription_item(activity).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn sync_relay_subscription_item(&self, activity: &Activity) -> Result<()> {
|
||||||
|
let Some(relay) = self.query.get_relay(&activity.resource_id).await? else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
let should_delete = !relay.sync_error.is_empty()
|
||||||
|
|| relay.synced == 0
|
||||||
|
|| relay.plan == "free"
|
||||||
|
|| relay.status == "inactive";
|
||||||
|
|
||||||
|
if should_delete {
|
||||||
|
if relay.stripe_subscription_item_id.is_some() {
|
||||||
|
// TODO: Delete subscription item via Stripe API
|
||||||
|
self.command.delete_relay_subscription_item(&relay.id).await?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// TODO: Create or update subscription item via Stripe API
|
||||||
|
// let stripe_subscription_item_id = ...;
|
||||||
|
// self.command.set_relay_subscription_item(&relay.id, &stripe_subscription_item_id).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn deactivate_relay(&self, relay_id: &str) -> Result<()> {
|
pub async fn deactivate_relay(&self, relay_id: &str) -> Result<()> {
|
||||||
let relay = self
|
let relay = self
|
||||||
.query
|
.query
|
||||||
|
|||||||
+23
-4
@@ -67,12 +67,14 @@ impl Command {
|
|||||||
let mut tx = self.pool.begin().await?;
|
let mut tx = self.pool.begin().await?;
|
||||||
|
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
"INSERT INTO tenant (pubkey, nwc_url, created_at)
|
"INSERT INTO tenant (pubkey, nwc_url, created_at, stripe_customer_id, stripe_subscription_id)
|
||||||
VALUES (?, ?, ?)",
|
VALUES (?, ?, ?, ?, ?)",
|
||||||
)
|
)
|
||||||
.bind(&tenant.pubkey)
|
.bind(&tenant.pubkey)
|
||||||
.bind(&tenant.nwc_url)
|
.bind(&tenant.nwc_url)
|
||||||
.bind(tenant.created_at)
|
.bind(tenant.created_at)
|
||||||
|
.bind(&tenant.stripe_customer_id)
|
||||||
|
.bind(&tenant.stripe_subscription_id)
|
||||||
.execute(&mut *tx)
|
.execute(&mut *tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -209,7 +211,7 @@ impl Command {
|
|||||||
pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> {
|
pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> {
|
||||||
let mut tx = self.pool.begin().await?;
|
let mut tx = self.pool.begin().await?;
|
||||||
|
|
||||||
sqlx::query("UPDATE relay SET status = 'inactive', sync_error = ? WHERE id = ?")
|
sqlx::query("UPDATE relay SET sync_error = ? WHERE id = ?")
|
||||||
.bind(&sync_error)
|
.bind(&sync_error)
|
||||||
.bind(&relay.id)
|
.bind(&relay.id)
|
||||||
.execute(&mut *tx)
|
.execute(&mut *tx)
|
||||||
@@ -225,7 +227,7 @@ impl Command {
|
|||||||
pub async fn complete_relay_sync(&self, relay_id: &str) -> Result<()> {
|
pub async fn complete_relay_sync(&self, relay_id: &str) -> Result<()> {
|
||||||
let mut tx = self.pool.begin().await?;
|
let mut tx = self.pool.begin().await?;
|
||||||
|
|
||||||
sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?")
|
sqlx::query("UPDATE relay SET synced = 1, sync_error = '' WHERE id = ?")
|
||||||
.bind(relay_id)
|
.bind(relay_id)
|
||||||
.execute(&mut *tx)
|
.execute(&mut *tx)
|
||||||
.await?;
|
.await?;
|
||||||
@@ -236,4 +238,21 @@ impl Command {
|
|||||||
self.emit(activity);
|
self.emit(activity);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn delete_relay_subscription_item(&self, relay_id: &str) -> Result<()> {
|
||||||
|
sqlx::query("UPDATE relay SET stripe_subscription_item_id = NULL WHERE id = ?")
|
||||||
|
.bind(relay_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn set_relay_subscription_item(&self, relay_id: &str, stripe_subscription_item_id: &str) -> Result<()> {
|
||||||
|
sqlx::query("UPDATE relay SET stripe_subscription_item_id = ? WHERE id = ?")
|
||||||
|
.bind(stripe_subscription_item_id)
|
||||||
|
.bind(relay_id)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -64,6 +64,10 @@ async fn main() -> Result<()> {
|
|||||||
infra.start().await;
|
infra.start().await;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
billing.start().await;
|
||||||
|
});
|
||||||
|
|
||||||
let listener = tokio::net::TcpListener::bind(format!("{host}:{port}")).await?;
|
let listener = tokio::net::TcpListener::bind(format!("{host}:{port}")).await?;
|
||||||
axum::serve(listener, app).await?;
|
axum::serve(listener, app).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -14,10 +14,11 @@ pub struct Activity {
|
|||||||
pub struct Plan {
|
pub struct Plan {
|
||||||
pub id: String,
|
pub id: String,
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub sats: i64,
|
pub amount: i64,
|
||||||
pub members: Option<i64>,
|
pub members: Option<i64>,
|
||||||
pub blossom: bool,
|
pub blossom: bool,
|
||||||
pub livekit: bool,
|
pub livekit: bool,
|
||||||
|
pub stripe_price_id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
||||||
@@ -26,6 +27,8 @@ pub struct Tenant {
|
|||||||
#[serde(skip_serializing)]
|
#[serde(skip_serializing)]
|
||||||
pub nwc_url: String,
|
pub nwc_url: String,
|
||||||
pub created_at: i64,
|
pub created_at: i64,
|
||||||
|
pub stripe_customer_id: String,
|
||||||
|
pub stripe_subscription_id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
||||||
@@ -35,6 +38,7 @@ pub struct Relay {
|
|||||||
pub schema: String,
|
pub schema: String,
|
||||||
pub subdomain: String,
|
pub subdomain: String,
|
||||||
pub plan: String,
|
pub plan: String,
|
||||||
|
pub stripe_subscription_item_id: Option<String>,
|
||||||
pub status: String,
|
pub status: String,
|
||||||
pub sync_error: String,
|
pub sync_error: String,
|
||||||
pub info_name: String,
|
pub info_name: String,
|
||||||
|
|||||||
+14
-8
@@ -15,7 +15,7 @@ impl Query {
|
|||||||
|
|
||||||
pub async fn list_tenants(&self) -> Result<Vec<Tenant>> {
|
pub async fn list_tenants(&self) -> Result<Vec<Tenant>> {
|
||||||
let rows = sqlx::query_as::<_, Tenant>(
|
let rows = sqlx::query_as::<_, Tenant>(
|
||||||
"SELECT pubkey, nwc_url, created_at
|
"SELECT pubkey, nwc_url, created_at, stripe_customer_id, stripe_subscription_id
|
||||||
FROM tenant
|
FROM tenant
|
||||||
ORDER BY pubkey",
|
ORDER BY pubkey",
|
||||||
)
|
)
|
||||||
@@ -26,7 +26,7 @@ impl Query {
|
|||||||
|
|
||||||
pub async fn get_tenant(&self, pubkey: &str) -> Result<Option<Tenant>> {
|
pub async fn get_tenant(&self, pubkey: &str) -> Result<Option<Tenant>> {
|
||||||
let row = sqlx::query_as::<_, Tenant>(
|
let row = sqlx::query_as::<_, Tenant>(
|
||||||
"SELECT pubkey, nwc_url, created_at
|
"SELECT pubkey, nwc_url, created_at, stripe_customer_id, stripe_subscription_id
|
||||||
FROM tenant
|
FROM tenant
|
||||||
WHERE pubkey = ?",
|
WHERE pubkey = ?",
|
||||||
)
|
)
|
||||||
@@ -41,33 +41,37 @@ impl Query {
|
|||||||
Plan {
|
Plan {
|
||||||
id: "free".to_string(),
|
id: "free".to_string(),
|
||||||
name: "Free".to_string(),
|
name: "Free".to_string(),
|
||||||
sats: 0,
|
amount: 0,
|
||||||
members: Some(10),
|
members: Some(10),
|
||||||
blossom: false,
|
blossom: false,
|
||||||
livekit: false,
|
livekit: false,
|
||||||
|
stripe_price_id: String::new(),
|
||||||
},
|
},
|
||||||
Plan {
|
Plan {
|
||||||
id: "basic".to_string(),
|
id: "basic".to_string(),
|
||||||
name: "Basic".to_string(),
|
name: "Basic".to_string(),
|
||||||
sats: 10_000,
|
amount: 500,
|
||||||
members: Some(100),
|
members: Some(100),
|
||||||
blossom: true,
|
blossom: true,
|
||||||
livekit: true,
|
livekit: true,
|
||||||
|
stripe_price_id: std::env::var("STRIPE_PRICE_BASIC").unwrap_or_default(),
|
||||||
},
|
},
|
||||||
Plan {
|
Plan {
|
||||||
id: "growth".to_string(),
|
id: "growth".to_string(),
|
||||||
name: "Growth".to_string(),
|
name: "Growth".to_string(),
|
||||||
sats: 50_000,
|
amount: 2500,
|
||||||
members: None,
|
members: None,
|
||||||
blossom: true,
|
blossom: true,
|
||||||
livekit: true,
|
livekit: true,
|
||||||
|
stripe_price_id: std::env::var("STRIPE_PRICE_GROWTH").unwrap_or_default(),
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn list_relays(&self) -> Result<Vec<Relay>> {
|
pub async fn list_relays(&self) -> Result<Vec<Relay>> {
|
||||||
let rows = sqlx::query_as::<_, Relay>(
|
let rows = sqlx::query_as::<_, Relay>(
|
||||||
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
|
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id,
|
||||||
|
status, sync_error,
|
||||||
info_name, info_icon, info_description,
|
info_name, info_icon, info_description,
|
||||||
policy_public_join, policy_strip_signatures,
|
policy_public_join, policy_strip_signatures,
|
||||||
groups_enabled, management_enabled, blossom_enabled,
|
groups_enabled, management_enabled, blossom_enabled,
|
||||||
@@ -82,7 +86,8 @@ impl Query {
|
|||||||
|
|
||||||
pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>> {
|
pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>> {
|
||||||
let rows = sqlx::query_as::<_, Relay>(
|
let rows = sqlx::query_as::<_, Relay>(
|
||||||
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
|
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id,
|
||||||
|
status, sync_error,
|
||||||
info_name, info_icon, info_description,
|
info_name, info_icon, info_description,
|
||||||
policy_public_join, policy_strip_signatures,
|
policy_public_join, policy_strip_signatures,
|
||||||
groups_enabled, management_enabled, blossom_enabled,
|
groups_enabled, management_enabled, blossom_enabled,
|
||||||
@@ -99,7 +104,8 @@ impl Query {
|
|||||||
|
|
||||||
pub async fn get_relay(&self, id: &str) -> Result<Option<Relay>> {
|
pub async fn get_relay(&self, id: &str) -> Result<Option<Relay>> {
|
||||||
let row = sqlx::query_as::<_, Relay>(
|
let row = sqlx::query_as::<_, Relay>(
|
||||||
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
|
"SELECT id, tenant, schema, subdomain, plan, stripe_subscription_item_id,
|
||||||
|
status, sync_error,
|
||||||
info_name, info_icon, info_description,
|
info_name, info_icon, info_description,
|
||||||
policy_public_join, policy_strip_signatures,
|
policy_public_join, policy_strip_signatures,
|
||||||
groups_enabled, management_enabled, blossom_enabled,
|
groups_enabled, management_enabled, blossom_enabled,
|
||||||
|
|||||||
Reference in New Issue
Block a user