Separate command and query

This commit is contained in:
Jon Staab
2026-04-01 15:33:03 -07:00
parent baae65b8b2
commit 07dfe86210
18 changed files with 615 additions and 549 deletions
+24 -22
View File
@@ -13,14 +13,16 @@ use nostr_sdk::{Event, JsonUtil, Kind};
use serde::{Deserialize, Serialize};
use crate::billing::Billing;
use crate::command::Command;
use crate::models::{Relay, Tenant};
use crate::repo::Repo;
use crate::query::Query;
#[derive(Clone)]
pub struct Api {
host: String,
admins: Vec<String>,
repo: Repo,
query: Query,
command: Command,
billing: Billing,
}
@@ -57,7 +59,7 @@ impl IntoResponse for ApiError {
}
impl Api {
pub fn new(repo: Repo, billing: Billing) -> Self {
pub fn new(query: Query, command: Command, billing: Billing) -> Self {
let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let admins = std::env::var("ADMINS")
.unwrap_or_default()
@@ -68,7 +70,8 @@ impl Api {
Self {
host,
admins,
repo,
query,
command,
billing,
}
}
@@ -302,7 +305,7 @@ async fn list_tenants(
let pubkey = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin(&pubkey)?;
match state.api.repo.list_tenants().await {
match state.api.query.list_tenants().await {
Ok(tenants) => Ok(ok(StatusCode::OK, tenants)),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
@@ -313,7 +316,7 @@ async fn list_tenants(
}
async fn list_plans() -> Response {
ok(StatusCode::OK, Repo::list_plans())
ok(StatusCode::OK, Query::list_plans())
}
async fn get_identity(
@@ -328,7 +331,7 @@ async fn get_identity(
created_at: now_ts(),
};
match state.api.repo.create_tenant(&tenant).await {
match state.api.command.create_tenant(&tenant).await {
Ok(()) => true,
Err(e) if matches!(map_unique_error(&e), Some("pubkey-exists")) => true,
Err(e) => {
@@ -350,7 +353,7 @@ async fn get_identity(
}
async fn get_plan(Path(id): Path<String>) -> Response {
match Repo::list_plans().into_iter().find(|p| p.id == id) {
match Query::list_plans().into_iter().find(|p| p.id == id) {
Some(plan) => ok(StatusCode::OK, plan),
None => err(StatusCode::NOT_FOUND, "not-found", "plan not found"),
}
@@ -364,7 +367,7 @@ async fn get_tenant(
let auth = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin_or_tenant(&auth, &pubkey)?;
match state.api.repo.get_tenant(&pubkey).await {
match state.api.query.get_tenant(&pubkey).await {
Ok(Some(tenant)) => Ok(ok(StatusCode::OK, tenant)),
Ok(None) => Ok(err(StatusCode::NOT_FOUND, "not-found", "tenant not found")),
Err(e) => Ok(err(
@@ -382,7 +385,7 @@ async fn list_relays(
let pubkey = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin(&pubkey)?;
match state.api.repo.list_relays().await {
match state.api.query.list_relays().await {
Ok(relays) => Ok(ok(StatusCode::OK, relays)),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
@@ -400,7 +403,7 @@ async fn list_tenant_relays(
let auth = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin_or_tenant(&auth, &pubkey)?;
match state.api.repo.list_relays_for_tenant(&pubkey).await {
match state.api.query.list_relays_for_tenant(&pubkey).await {
Ok(relays) => Ok(ok(StatusCode::OK, relays)),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
@@ -417,7 +420,7 @@ async fn get_relay(
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
let relay = match state.api.repo.get_relay(&id).await {
let relay = match state.api.query.get_relay(&id).await {
Ok(Some(r)) => r,
Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")),
Err(e) => {
@@ -441,7 +444,7 @@ async fn list_relay_activity(
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
let relay = match state.api.repo.get_relay(&id).await {
let relay = match state.api.query.get_relay(&id).await {
Ok(Some(r)) => r,
Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")),
Err(e) => return Ok(err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string())),
@@ -449,7 +452,7 @@ async fn list_relay_activity(
state.api.require_admin_or_tenant(&auth, &relay.tenant)?;
match state.api.repo.list_activity_for_relay(&id).await {
match state.api.query.list_activity_for_relay(&id).await {
Ok(activity) => Ok(ok(StatusCode::OK, serde_json::json!({ "activity": activity }))),
Err(e) => Ok(err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &e.to_string())),
}
@@ -502,7 +505,7 @@ async fn create_relay(
}
};
match state.api.repo.create_relay(&relay).await {
match state.api.command.create_relay(&relay).await {
Ok(()) => Ok(ok(StatusCode::CREATED, relay)),
Err(e) => {
if matches!(map_unique_error(&e), Some("subdomain-exists")) {
@@ -530,7 +533,7 @@ async fn update_relay(
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
let mut relay = match state.api.repo.get_relay(&id).await {
let mut relay = match state.api.query.get_relay(&id).await {
Ok(Some(r)) => r,
Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")),
Err(e) => {
@@ -599,7 +602,7 @@ async fn update_relay(
}
};
match state.api.repo.update_relay(&relay).await {
match state.api.command.update_relay(&relay).await {
Ok(()) => Ok(ok(StatusCode::OK, relay)),
Err(e) => {
if matches!(map_unique_error(&e), Some("subdomain-exists")) {
@@ -626,7 +629,7 @@ async fn deactivate_relay(
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
let relay = match state.api.repo.get_relay(&id).await {
let relay = match state.api.query.get_relay(&id).await {
Ok(Some(r)) => r,
Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")),
Err(e) => {
@@ -665,7 +668,7 @@ async fn reactivate_relay(
) -> std::result::Result<Response, ApiError> {
let auth = state.api.extract_auth_pubkey(&headers)?;
let relay = match state.api.repo.get_relay(&id).await {
let relay = match state.api.query.get_relay(&id).await {
Ok(Some(r)) => r,
Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")),
Err(e) => {
@@ -706,7 +709,7 @@ async fn update_tenant(
let auth = state.api.extract_auth_pubkey(&headers)?;
state.api.require_admin_or_tenant(&auth, &pubkey)?;
let mut tenant = match state.api.repo.get_tenant(&pubkey).await {
let mut tenant = match state.api.query.get_tenant(&pubkey).await {
Ok(Some(t)) => t,
Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "tenant not found")),
Err(e) => {
@@ -722,7 +725,7 @@ async fn update_tenant(
tenant.nwc_url = nwc_url;
}
match state.api.repo.update_tenant(&tenant).await {
match state.api.command.update_tenant(&tenant).await {
Ok(()) => Ok(ok(StatusCode::OK, tenant)),
Err(e) => Ok(err(
StatusCode::INTERNAL_SERVER_ERROR,
@@ -731,4 +734,3 @@ async fn update_tenant(
)),
}
}
+11 -8
View File
@@ -1,40 +1,43 @@
use anyhow::Result;
use crate::repo::Repo;
use crate::command::Command;
use crate::query::Query;
use crate::robot::Robot;
#[derive(Clone)]
pub struct Billing {
nwc_url: String,
repo: Repo,
query: Query,
command: Command,
robot: Robot,
}
impl Billing {
pub fn new(repo: Repo, robot: Robot) -> Self {
pub fn new(query: Query, command: Command, robot: Robot) -> Self {
let nwc_url = std::env::var("NWC_URL").unwrap_or_default();
Self {
nwc_url,
repo,
query,
command,
robot,
}
}
pub async fn deactivate_relay(&self, relay_id: &str) -> Result<()> {
let relay = self
.repo
.query
.get_relay(relay_id)
.await?
.ok_or_else(|| anyhow::anyhow!("relay not found"))?;
self.repo.deactivate_relay(&relay).await
self.command.deactivate_relay(&relay).await
}
pub async fn reactivate_relay(&self, relay_id: &str) -> Result<()> {
let relay = self
.repo
.query
.get_relay(relay_id)
.await?
.ok_or_else(|| anyhow::anyhow!("relay not found"))?;
self.repo.activate_relay(&relay).await
self.command.activate_relay(&relay).await
}
}
+206
View File
@@ -0,0 +1,206 @@
use anyhow::Result;
use sqlx::{Sqlite, SqlitePool, Transaction};
use crate::models::{Relay, Tenant};
#[derive(Clone)]
pub struct Command {
pool: SqlitePool,
}
impl Command {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
async fn insert_activity(
tx: &mut Transaction<'_, Sqlite>,
activity_type: &str,
resource_type: &str,
resource_id: &str,
) -> Result<()> {
let tenant = match resource_type {
"tenant" => resource_id.to_string(),
"relay" => {
sqlx::query_scalar::<_, String>("SELECT tenant FROM relay WHERE id = ?")
.bind(resource_id)
.fetch_one(&mut **tx)
.await?
}
_ => anyhow::bail!("unknown resource_type: {}", resource_type),
};
sqlx::query(
"INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id)
VALUES (?, ?, strftime('%s','now'), ?, ?, ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(tenant)
.bind(activity_type)
.bind(resource_type)
.bind(resource_id)
.execute(&mut **tx)
.await?;
Ok(())
}
pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"INSERT INTO tenant (pubkey, nwc_url, created_at)
VALUES (?, ?, ?)",
)
.bind(&tenant.pubkey)
.bind(&tenant.nwc_url)
.bind(tenant.created_at)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?;
tx.commit().await?;
Ok(())
}
pub async fn update_tenant(&self, tenant: &Tenant) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
.bind(&tenant.nwc_url)
.bind(&tenant.pubkey)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?;
tx.commit().await?;
Ok(())
}
pub async fn create_relay(&self, relay: &Relay) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"INSERT INTO relay (
id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled
) VALUES (?, ?, ?, ?, ?, 'new', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&relay.id)
.bind(&relay.tenant)
.bind(&relay.schema)
.bind(&relay.subdomain)
.bind(&relay.plan)
.bind(&relay.sync_error)
.bind(&relay.info_name)
.bind(&relay.info_icon)
.bind(&relay.info_description)
.bind(relay.policy_public_join)
.bind(relay.policy_strip_signatures)
.bind(relay.groups_enabled)
.bind(relay.management_enabled)
.bind(relay.blossom_enabled)
.bind(relay.livekit_enabled)
.bind(relay.push_enabled)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
}
pub async fn update_relay(&self, relay: &Relay) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"UPDATE relay
SET tenant = ?, schema = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?,
info_name = ?, info_icon = ?, info_description = ?,
policy_public_join = ?, policy_strip_signatures = ?,
groups_enabled = ?, management_enabled = ?, blossom_enabled = ?,
livekit_enabled = ?, push_enabled = ?
WHERE id = ?",
)
.bind(&relay.tenant)
.bind(&relay.schema)
.bind(&relay.subdomain)
.bind(&relay.plan)
.bind(&relay.status)
.bind(&relay.sync_error)
.bind(&relay.info_name)
.bind(&relay.info_icon)
.bind(&relay.info_description)
.bind(relay.policy_public_join)
.bind(relay.policy_strip_signatures)
.bind(relay.groups_enabled)
.bind(relay.management_enabled)
.bind(relay.blossom_enabled)
.bind(relay.livekit_enabled)
.bind(relay.push_enabled)
.bind(&relay.id)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
}
pub async fn deactivate_relay(&self, relay: &Relay) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE relay SET status = 'inactive' WHERE id = ?")
.bind(&relay.id)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
}
pub async fn activate_relay(&self, relay: &Relay) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE relay SET status = 'active' WHERE id = ?")
.bind(&relay.id)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "activate_relay", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
}
pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE relay SET status = 'inactive', sync_error = ? WHERE id = ?")
.bind(&sync_error)
.bind(&relay.id)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
}
pub async fn mark_relay_synced(&self, relay_id: &str) -> Result<()> {
sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?")
.bind(relay_id)
.execute(&self.pool)
.await?;
Ok(())
}
}
+12 -9
View File
@@ -2,7 +2,8 @@ use anyhow::Result;
use nostr_sdk::prelude::*;
use tokio::sync::Mutex;
use crate::repo::Repo;
use crate::command::Command;
use crate::query::Query;
#[derive(Clone)]
pub struct Infra {
@@ -12,12 +13,13 @@ pub struct Infra {
livekit_api_key: String,
livekit_api_secret: String,
api_secret: String,
repo: Repo,
query: Query,
command: Command,
last_activity_at: std::sync::Arc<Mutex<i64>>,
}
impl Infra {
pub fn new(repo: Repo) -> Self {
pub fn new(query: Query, command: Command) -> Self {
let api_url = std::env::var("ZOOID_API_URL").unwrap_or_default();
let relay_domain = std::env::var("RELAY_DOMAIN").unwrap_or_default();
let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
@@ -31,14 +33,15 @@ impl Infra {
livekit_api_key,
livekit_api_secret,
api_secret,
repo,
query,
command,
last_activity_at: std::sync::Arc::new(Mutex::new(0)),
}
}
pub async fn start(self) {
// Initialize from DB so we don't replay historical activities on restart
match self.repo.max_activity_at().await {
match self.query.max_activity_at().await {
Ok(ts) => *self.last_activity_at.lock().await = ts,
Err(e) => tracing::error!(error = %e, "failed to read max activity timestamp"),
}
@@ -55,7 +58,7 @@ impl Infra {
pub async fn tick(&self) -> Result<()> {
let mut since_guard = self.last_activity_at.lock().await;
let since = *since_guard;
let activity = self.repo.list_activity(&since).await?;
let activity = self.query.list_activity(&since).await?;
for a in activity {
let needs_sync = matches!(
@@ -64,7 +67,7 @@ impl Infra {
);
if needs_sync {
let Some(relay) = self.repo.get_relay(&a.resource_id).await? else {
let Some(relay) = self.query.get_relay(&a.resource_id).await? else {
continue;
};
@@ -73,11 +76,11 @@ impl Infra {
match self.sync_relay(&relay, is_new).await {
Ok(()) => {
tracing::info!(relay = %relay.id, "relay sync succeeded");
self.repo.mark_relay_synced(&relay.id).await?
self.command.mark_relay_synced(&relay.id).await?
}
Err(e) => {
tracing::warn!(relay = %relay.id, error = %e, "relay sync failed");
self.repo.fail_relay_sync(&relay, e.to_string()).await?;
self.command.fail_relay_sync(&relay, e.to_string()).await?;
}
}
}
+3 -1
View File
@@ -1,6 +1,8 @@
pub mod api;
pub mod billing;
pub mod command;
pub mod infra;
pub mod models;
pub mod repo;
pub mod pool;
pub mod query;
pub mod robot;
+11 -6
View File
@@ -1,8 +1,10 @@
mod api;
mod billing;
mod command;
mod infra;
mod models;
mod repo;
mod query;
mod pool;
mod robot;
use anyhow::Result;
@@ -12,8 +14,9 @@ use tower_http::cors::{AllowOrigin, CorsLayer};
use crate::api::Api;
use crate::billing::Billing;
use crate::command::Command;
use crate::infra::Infra;
use crate::repo::Repo;
use crate::query::Query;
use crate::robot::Robot;
#[tokio::main]
@@ -25,11 +28,13 @@ async fn main() -> Result<()> {
.with(tracing_subscriber::fmt::layer())
.init();
let repo = Repo::new().await?;
let pool = pool::create_pool().await?;
let robot = Robot::new().await?;
let billing = Billing::new(repo.clone(), robot.clone());
let infra = Infra::new(repo.clone());
let api = Api::new(repo, billing.clone());
let query = Query::new(pool.clone());
let command = Command::new(pool);
let billing = Billing::new(query.clone(), command.clone(), robot.clone());
let infra = Infra::new(query.clone(), command.clone());
let api = Api::new(query, command, billing.clone());
let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let port: u16 = std::env::var("PORT")
+51
View File
@@ -0,0 +1,51 @@
use std::path::Path;
use std::str::FromStr;
use anyhow::Result;
use sqlx::{
SqlitePool,
sqlite::{SqliteConnectOptions, SqlitePoolOptions},
};
pub async fn create_pool() -> Result<SqlitePool> {
let raw_database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| format!("sqlite://{}/data/caravel.db", env!("CARGO_MANIFEST_DIR")));
let database_url = normalize_sqlite_url(&raw_database_url);
if let Some(path) = database_url.strip_prefix("sqlite://")
&& !path.is_empty()
&& path != ":memory:"
&& let Some(parent) = Path::new(path).parent()
&& !parent.as_os_str().is_empty()
{
std::fs::create_dir_all(parent)?;
}
let connect_options =
SqliteConnectOptions::from_str(&database_url)?.create_if_missing(true);
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(connect_options)
.await?;
sqlx::query("PRAGMA journal_mode = WAL;")
.execute(&pool)
.await?;
sqlx::migrate!("./migrations").run(&pool).await?;
Ok(pool)
}
fn normalize_sqlite_url(url: &str) -> String {
let Some(path) = url.strip_prefix("sqlite://") else {
return url.to_string();
};
if path.is_empty() || path == ":memory:" || Path::new(path).is_absolute() {
return url.to_string();
}
format!("sqlite://{}/{}", env!("CARGO_MANIFEST_DIR"), path)
}
+150
View File
@@ -0,0 +1,150 @@
use anyhow::Result;
use sqlx::SqlitePool;
use crate::models::{Activity, Plan, Relay, Tenant};
#[derive(Clone)]
pub struct Query {
pool: SqlitePool,
}
impl Query {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
pub async fn list_tenants(&self) -> Result<Vec<Tenant>> {
let rows = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, created_at
FROM tenant
ORDER BY pubkey",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_tenant(&self, pubkey: &str) -> Result<Option<Tenant>> {
let row = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, created_at
FROM tenant
WHERE pubkey = ?",
)
.bind(pubkey)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
pub fn list_plans() -> Vec<Plan> {
vec![
Plan {
id: "free".to_string(),
name: "Free".to_string(),
sats: 0,
members: Some(10),
blossom: false,
livekit: false,
},
Plan {
id: "basic".to_string(),
name: "Basic".to_string(),
sats: 10_000,
members: Some(100),
blossom: true,
livekit: true,
},
Plan {
id: "growth".to_string(),
name: "Growth".to_string(),
sats: 50_000,
members: None,
blossom: true,
livekit: true,
},
]
}
pub async fn list_relays(&self) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
ORDER BY id",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE tenant = ?
ORDER BY id",
)
.bind(tenant_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_relay(&self, id: &str) -> Result<Option<Relay>> {
let row = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE id = ?",
)
.bind(id)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
pub async fn max_activity_at(&self) -> Result<i64> {
let val = sqlx::query_scalar::<_, Option<i64>>(
"SELECT MAX(created_at) FROM activity",
)
.fetch_one(&self.pool)
.await?;
Ok(val.unwrap_or(0))
}
pub async fn list_activity(&self, since: &i64) -> Result<Vec<Activity>> {
let rows = sqlx::query_as::<_, Activity>(
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
FROM activity
WHERE created_at > ?
ORDER BY created_at, id",
)
.bind(since)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>> {
let rows = sqlx::query_as::<_, Activity>(
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
FROM activity
WHERE resource_type = 'relay' AND resource_id = ?
ORDER BY created_at DESC, id DESC",
)
.bind(relay_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
}
-372
View File
@@ -1,372 +0,0 @@
use std::path::Path;
use std::str::FromStr;
use anyhow::Result;
use sqlx::{
Sqlite, SqlitePool, Transaction,
sqlite::{SqliteConnectOptions, SqlitePoolOptions},
};
use crate::models::{Activity, Plan, Relay, Tenant};
#[derive(Clone)]
pub struct Repo {
pub pool: SqlitePool,
}
impl Repo {
pub async fn new() -> Result<Self> {
let raw_database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| format!("sqlite://{}/data/caravel.db", env!("CARGO_MANIFEST_DIR")));
let database_url = normalize_sqlite_url(&raw_database_url);
if let Some(path) = database_url.strip_prefix("sqlite://")
&& !path.is_empty()
&& path != ":memory:"
&& let Some(parent) = Path::new(path).parent()
&& !parent.as_os_str().is_empty()
{
std::fs::create_dir_all(parent)?;
}
let connect_options =
SqliteConnectOptions::from_str(&database_url)?.create_if_missing(true);
let pool = SqlitePoolOptions::new()
.max_connections(5)
.connect_with(connect_options)
.await?;
sqlx::query("PRAGMA journal_mode = WAL;")
.execute(&pool)
.await?;
sqlx::migrate!("./migrations").run(&pool).await?;
Ok(Self { pool })
}
async fn insert_activity(
tx: &mut Transaction<'_, Sqlite>,
activity_type: &str,
resource_type: &str,
resource_id: &str,
) -> Result<()> {
let tenant = match resource_type {
"tenant" => resource_id.to_string(),
"relay" => {
sqlx::query_scalar::<_, String>("SELECT tenant FROM relay WHERE id = ?")
.bind(resource_id)
.fetch_one(&mut **tx)
.await?
}
_ => anyhow::bail!("unknown resource_type: {}", resource_type),
};
sqlx::query(
"INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id)
VALUES (?, ?, strftime('%s','now'), ?, ?, ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(tenant)
.bind(activity_type)
.bind(resource_type)
.bind(resource_id)
.execute(&mut **tx)
.await?;
Ok(())
}
pub async fn list_tenants(&self) -> Result<Vec<Tenant>> {
let rows = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, created_at
FROM tenant
ORDER BY pubkey",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_tenant(&self, pubkey: &str) -> Result<Option<Tenant>> {
let row = sqlx::query_as::<_, Tenant>(
"SELECT pubkey, nwc_url, created_at
FROM tenant
WHERE pubkey = ?",
)
.bind(pubkey)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"INSERT INTO tenant (pubkey, nwc_url, created_at)
VALUES (?, ?, ?)",
)
.bind(&tenant.pubkey)
.bind(&tenant.nwc_url)
.bind(tenant.created_at)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?;
tx.commit().await?;
Ok(())
}
pub async fn update_tenant(&self, tenant: &Tenant) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE tenant SET nwc_url = ? WHERE pubkey = ?")
.bind(&tenant.nwc_url)
.bind(&tenant.pubkey)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?;
tx.commit().await?;
Ok(())
}
pub async fn list_relays(&self) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
ORDER BY id",
)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_relays_for_tenant(&self, tenant_id: &str) -> Result<Vec<Relay>> {
let rows = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE tenant = ?
ORDER BY id",
)
.bind(tenant_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn get_relay(&self, id: &str) -> Result<Option<Relay>> {
let row = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled, synced
FROM relay
WHERE id = ?",
)
.bind(id)
.fetch_optional(&self.pool)
.await?;
Ok(row)
}
pub async fn create_relay(&self, relay: &Relay) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"INSERT INTO relay (
id, tenant, schema, subdomain, plan, status, sync_error,
info_name, info_icon, info_description,
policy_public_join, policy_strip_signatures,
groups_enabled, management_enabled, blossom_enabled,
livekit_enabled, push_enabled
) VALUES (?, ?, ?, ?, ?, 'new', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)
.bind(&relay.id)
.bind(&relay.tenant)
.bind(&relay.schema)
.bind(&relay.subdomain)
.bind(&relay.plan)
.bind(&relay.sync_error)
.bind(&relay.info_name)
.bind(&relay.info_icon)
.bind(&relay.info_description)
.bind(relay.policy_public_join)
.bind(relay.policy_strip_signatures)
.bind(relay.groups_enabled)
.bind(relay.management_enabled)
.bind(relay.blossom_enabled)
.bind(relay.livekit_enabled)
.bind(relay.push_enabled)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
}
pub async fn update_relay(&self, relay: &Relay) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query(
"UPDATE relay
SET tenant = ?, schema = ?, subdomain = ?, plan = ?, status = ?, sync_error = ?,
info_name = ?, info_icon = ?, info_description = ?,
policy_public_join = ?, policy_strip_signatures = ?,
groups_enabled = ?, management_enabled = ?, blossom_enabled = ?,
livekit_enabled = ?, push_enabled = ?
WHERE id = ?",
)
.bind(&relay.tenant)
.bind(&relay.schema)
.bind(&relay.subdomain)
.bind(&relay.plan)
.bind(&relay.status)
.bind(&relay.sync_error)
.bind(&relay.info_name)
.bind(&relay.info_icon)
.bind(&relay.info_description)
.bind(relay.policy_public_join)
.bind(relay.policy_strip_signatures)
.bind(relay.groups_enabled)
.bind(relay.management_enabled)
.bind(relay.blossom_enabled)
.bind(relay.livekit_enabled)
.bind(relay.push_enabled)
.bind(&relay.id)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
}
pub async fn deactivate_relay(&self, relay: &Relay) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE relay SET status = 'inactive' WHERE id = ?")
.bind(&relay.id)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
}
pub async fn fail_relay_sync(&self, relay: &Relay, sync_error: String) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE relay SET status = 'inactive', sync_error = ? WHERE id = ?")
.bind(&sync_error)
.bind(&relay.id)
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?;
tx.commit().await?;
Ok(())
}
pub async fn mark_relay_synced(&self, relay_id: &str) -> Result<()> {
sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?")
.bind(relay_id)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn max_activity_at(&self) -> Result<i64> {
let val = sqlx::query_scalar::<_, Option<i64>>(
"SELECT MAX(created_at) FROM activity",
)
.fetch_one(&self.pool)
.await?;
Ok(val.unwrap_or(0))
}
pub async fn list_activity(&self, since: &i64) -> Result<Vec<Activity>> {
let rows = sqlx::query_as::<_, Activity>(
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
FROM activity
WHERE created_at > ?
ORDER BY created_at, id",
)
.bind(since)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub async fn list_activity_for_relay(&self, relay_id: &str) -> Result<Vec<Activity>> {
let rows = sqlx::query_as::<_, Activity>(
"SELECT id, tenant, created_at, activity_type, resource_type, resource_id
FROM activity
WHERE resource_type = 'relay' AND resource_id = ?
ORDER BY created_at DESC, id DESC",
)
.bind(relay_id)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
pub fn list_plans() -> Vec<Plan> {
vec![
Plan {
id: "free".to_string(),
name: "Free".to_string(),
sats: 0,
members: Some(10),
blossom: false,
livekit: false,
},
Plan {
id: "basic".to_string(),
name: "Basic".to_string(),
sats: 10_000,
members: Some(100),
blossom: true,
livekit: true,
},
Plan {
id: "growth".to_string(),
name: "Growth".to_string(),
sats: 50_000,
members: None,
blossom: true,
livekit: true,
},
]
}
}
fn normalize_sqlite_url(url: &str) -> String {
let Some(path) = url.strip_prefix("sqlite://") else {
return url.to_string();
};
if path.is_empty() || path == ":memory:" || Path::new(path).is_absolute() {
return url.to_string();
}
format!("sqlite://{}/{}", env!("CARGO_MANIFEST_DIR"), path)
}