Simplify relay upsert

This commit is contained in:
Jon Staab
2026-03-03 09:08:54 -08:00
parent 6618025b54
commit 46a270513e
13 changed files with 495 additions and 242 deletions
+55 -48
View File
@@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::auth::verify_nip98;
use crate::models::{NewRelay, NewTenant, Relay, UpdateRelay};
use crate::models::{NewTenant, Relay, RelayConfig};
use crate::provisioning::Provisioner;
use crate::repo::Repo;
@@ -211,6 +211,7 @@ struct CreateRelayRequest {
icon: String,
description: String,
plan: String,
config: Option<RelayConfig>,
}
async fn create_tenant_relay(
@@ -241,7 +242,7 @@ async fn create_tenant_relay(
.into_response();
}
let relay = NewRelay {
let relay = Relay {
id: Uuid::new_v4().to_string(),
tenant: pubkey.clone(),
name: payload.name,
@@ -251,9 +252,10 @@ async fn create_tenant_relay(
description: payload.description,
plan: payload.plan,
status: "pending".to_string(),
config: payload.config,
};
if let Err(err) = state.repo.create_relay(&relay).await {
if let Err(err) = state.repo.upsert_relay(&relay).await {
if is_unique_subdomain_violation(&err) {
return (
StatusCode::CONFLICT,
@@ -273,7 +275,17 @@ async fn create_tenant_relay(
.into_response();
}
spawn_provisioning_worker(state.repo.clone(), state.provisioner.clone(), relay.clone());
if let Err(err) = state.provisioner.sync_relay(&relay, true).await {
tracing::error!(relay_id = relay.id, error = %err, "zooid create failed");
let _ = state.repo.update_relay_status(&relay.id, "provisioning_failed").await;
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError { error: format!("failed to provision relay: {err}") }),
)
.into_response();
}
let _ = state.repo.update_relay_status(&relay.id, "active").await;
(StatusCode::CREATED, Json(relay)).into_response()
}
@@ -311,6 +323,7 @@ struct UpdateRelayRequest {
icon: String,
description: String,
plan: String,
config: Option<RelayConfig>,
}
async fn update_tenant_relay(
@@ -344,8 +357,9 @@ async fn update_tenant_relay(
return forbidden();
}
let updated = UpdateRelay {
let relay = Relay {
id: existing.id,
tenant: existing.tenant,
name: payload.name,
subdomain: payload.subdomain.clone(),
schema: payload.subdomain.replace('-', "_"),
@@ -353,9 +367,10 @@ async fn update_tenant_relay(
description: payload.description,
plan: payload.plan,
status: existing.status,
config: payload.config,
};
if let Err(err) = state.repo.update_relay(&updated).await {
if let Err(err) = state.repo.upsert_relay(&relay).await {
if is_unique_subdomain_violation(&err) {
return (
StatusCode::CONFLICT,
@@ -375,7 +390,16 @@ async fn update_tenant_relay(
.into_response();
}
(StatusCode::OK, Json(updated)).into_response()
if let Err(err) = state.provisioner.sync_relay(&relay, false).await {
tracing::error!(relay_id = relay.id, error = %err, "zooid patch failed");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError { error: format!("failed to update relay config: {err}") }),
)
.into_response();
}
(StatusCode::OK, Json(relay)).into_response()
}
async fn deactivate_tenant_relay(
@@ -408,18 +432,13 @@ async fn deactivate_tenant_relay(
return forbidden();
}
let updated = UpdateRelay {
id: existing.id,
name: existing.name,
subdomain: existing.subdomain.clone(),
schema: existing.subdomain.replace('-', "_"),
icon: existing.icon,
description: existing.description,
plan: existing.plan,
let relay = Relay {
status: "deactivated".to_string(),
config: None,
..existing
};
if let Err(_) = state.repo.update_relay(&updated).await {
if let Err(_) = state.repo.upsert_relay(&relay).await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError {
@@ -429,7 +448,7 @@ async fn deactivate_tenant_relay(
.into_response();
}
(StatusCode::OK, Json(updated)).into_response()
(StatusCode::OK, Json(relay)).into_response()
}
async fn list_tenant_invoices(
@@ -734,8 +753,9 @@ async fn admin_update_relay(
}
};
let updated = UpdateRelay {
let relay = Relay {
id: existing.id,
tenant: existing.tenant,
name: payload.name,
subdomain: payload.subdomain.clone(),
schema: payload.subdomain.replace('-', "_"),
@@ -743,9 +763,10 @@ async fn admin_update_relay(
description: payload.description,
plan: payload.plan,
status: existing.status,
config: payload.config,
};
if let Err(err) = state.repo.update_relay(&updated).await {
if let Err(err) = state.repo.upsert_relay(&relay).await {
if is_unique_subdomain_violation(&err) {
return (
StatusCode::CONFLICT,
@@ -765,7 +786,16 @@ async fn admin_update_relay(
.into_response();
}
(StatusCode::OK, Json(updated)).into_response()
if let Err(err) = state.provisioner.sync_relay(&relay, false).await {
tracing::error!(relay_id = relay.id, error = %err, "zooid patch failed");
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError { error: format!("failed to update relay config: {err}") }),
)
.into_response();
}
(StatusCode::OK, Json(relay)).into_response()
}
async fn admin_deactivate_relay(
@@ -798,18 +828,13 @@ async fn admin_deactivate_relay(
}
};
let updated = UpdateRelay {
id: existing.id,
name: existing.name,
subdomain: existing.subdomain.clone(),
schema: existing.subdomain.replace('-', "_"),
icon: existing.icon,
description: existing.description,
plan: existing.plan,
let relay = Relay {
status: "deactivated".to_string(),
config: None,
..existing
};
if let Err(_) = state.repo.update_relay(&updated).await {
if let Err(_) = state.repo.upsert_relay(&relay).await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError {
@@ -819,25 +844,7 @@ async fn admin_deactivate_relay(
.into_response();
}
(StatusCode::OK, Json(updated)).into_response()
(StatusCode::OK, Json(relay)).into_response()
}
fn spawn_provisioning_worker(repo: Repo, provisioner: Provisioner, relay: NewRelay) {
tokio::spawn(async move {
tracing::info!(relay_id = relay.id, "provisioning worker started");
if let Err(err) = provisioner.provision_relay(&relay).await {
tracing::error!(relay_id = relay.id, error = %err, "provisioning failed");
if let Err(err) = repo
.update_relay_status(&relay.id, "provisioning_failed")
.await
{
tracing::error!(relay_id = relay.id, error = %err, "failed to update relay status");
}
return;
}
if let Err(err) = repo.update_relay_status(&relay.id, "active").await {
tracing::error!(relay_id = relay.id, error = %err, "failed to update relay status");
}
});
}
+11 -26
View File
@@ -1,5 +1,14 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayConfig {
pub policy: Option<serde_json::Value>,
pub groups: Option<serde_json::Value>,
pub management: Option<serde_json::Value>,
pub blossom: Option<serde_json::Value>,
pub push: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct Tenant {
pub pubkey: String,
@@ -14,7 +23,7 @@ pub struct NewTenant {
pub tenant_nwc_url: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Relay {
pub id: String,
pub tenant: String,
@@ -25,31 +34,7 @@ pub struct Relay {
pub description: String,
pub plan: String,
pub status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewRelay {
pub id: String,
pub tenant: String,
pub name: String,
pub subdomain: String,
pub schema: String,
pub icon: String,
pub description: String,
pub plan: String,
pub status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateRelay {
pub id: String,
pub name: String,
pub subdomain: String,
pub schema: String,
pub icon: String,
pub description: String,
pub plan: String,
pub status: String,
pub config: Option<RelayConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
+115 -118
View File
@@ -3,12 +3,13 @@ use rand::RngCore;
use rand::rngs::OsRng;
use reqwest::Client;
use serde::Serialize;
use serde_json::{Value, json};
use nostr_sdk::nostr::Keys;
use nostr_sdk::nostr::nips::nip98::{HttpData, HttpMethod};
use nostr_sdk::nostr::types::url::Url;
use crate::models::NewRelay;
use crate::models::{Relay, RelayConfig};
#[derive(Clone)]
pub struct Provisioner {
@@ -35,31 +36,49 @@ impl Provisioner {
})
}
pub async fn provision_relay(&self, relay: &NewRelay) -> Result<()> {
let host = format!("{}.{}", relay.subdomain, self.relay_domain);
let secret = generate_secret_hex();
let payload = ZooidConfig::new(relay, host, secret);
/// Create or update a relay in zooid.
///
/// On creation, POSTs the full config (including a generated secret and host).
/// On update, PATCHes only the mutable fields (info + config sections).
pub async fn sync_relay(&self, relay: &Relay, is_new: bool) -> Result<()> {
let url = format!("{}/relay/{}", self.base_url.trim_end_matches('/'), relay.id);
let auth = self.build_auth_header(&url, HttpMethod::POST).await?;
let res = self
.client
.post(&url)
.header(reqwest::header::AUTHORIZATION, auth)
.json(&payload)
.send()
.await?;
if is_new {
let host = format!("{}.{}", relay.subdomain, self.relay_domain);
let secret = generate_secret_hex();
let payload = build_full_config(relay, host, secret);
let auth = self.build_auth_header(&url, HttpMethod::POST).await?;
if !res.status().is_success() {
let status = res.status();
let body = res.text().await.unwrap_or_default();
return Err(anyhow!(
"zooid provisioning failed for {}: {} {}",
url,
status,
body
));
let res = self
.client
.post(&url)
.header(reqwest::header::AUTHORIZATION, auth)
.json(&payload)
.send()
.await?;
if !res.status().is_success() {
let status = res.status();
let body = res.text().await.unwrap_or_default();
return Err(anyhow!("zooid create failed: {} {}", status, body));
}
} else {
let patch = build_patch(relay);
let auth = self.build_auth_header(&url, HttpMethod::PATCH).await?;
let res = self
.client
.patch(&url)
.header(reqwest::header::AUTHORIZATION, auth)
.json(&patch)
.send()
.await?;
if !res.status().is_success() {
let status = res.status();
let body = res.text().await.unwrap_or_default();
return Err(anyhow!("zooid patch failed: {} {}", status, body));
}
}
Ok(())
@@ -73,108 +92,86 @@ impl Provisioner {
}
}
#[derive(Debug, Serialize)]
struct ZooidConfig {
host: String,
schema: String,
secret: String,
info: ZooidInfo,
policy: ZooidPolicy,
groups: ZooidGroups,
push: ZooidPush,
management: ZooidManagement,
blossom: ZooidBlossom,
roles: ZooidRoles,
/// Builds the full zooid config payload for relay creation (POST).
fn build_full_config(relay: &Relay, host: String, secret: String) -> Value {
let blossom_default = relay.plan != "free";
let cfg = relay.config.as_ref();
json!({
"host": host,
"schema": relay.schema,
"secret": secret,
"info": {
"name": relay.name,
"icon": relay.icon,
"pubkey": relay.tenant,
"description": relay.description,
},
"policy": {
"public_join": cfg_bool(cfg, |c| &c.policy, "public_join", false),
"strip_signatures": cfg_bool(cfg, |c| &c.policy, "strip_signatures", false),
},
"groups": {
"enabled": cfg_bool(cfg, |c| &c.groups, "enabled", true),
"auto_join": cfg_bool(cfg, |c| &c.groups, "auto_join", true),
},
"push": {
"enabled": cfg_bool(cfg, |c| &c.push, "enabled", true),
},
"management": {
"enabled": cfg_bool(cfg, |c| &c.management, "enabled", true),
},
"blossom": {
"enabled": cfg_bool(cfg, |c| &c.blossom, "enabled", blossom_default),
},
"roles": {
"member": { "pubkeys": [], "can_invite": true, "can_manage": false }
},
})
}
impl ZooidConfig {
fn new(relay: &NewRelay, host: String, secret: String) -> Self {
let blossom_enabled = relay.plan != "free";
/// Builds the partial zooid patch payload for relay updates (PATCH).
fn build_patch(relay: &Relay) -> Value {
let blossom_default = relay.plan != "free";
let cfg = relay.config.as_ref();
Self {
host,
schema: relay.schema.clone(),
secret,
info: ZooidInfo {
name: relay.name.clone(),
icon: relay.icon.clone(),
pubkey: relay.tenant.clone(),
description: relay.description.clone(),
},
policy: ZooidPolicy {
public_join: false,
strip_signatures: false,
},
groups: ZooidGroups {
enabled: true,
auto_join: true,
},
push: ZooidPush { enabled: true },
management: ZooidManagement {
enabled: true,
methods: Vec::new(),
},
blossom: ZooidBlossom {
enabled: blossom_enabled,
},
roles: ZooidRoles {
member: ZooidRole {
pubkeys: Vec::new(),
can_invite: true,
can_manage: false,
},
},
}
}
json!({
"info": {
"name": relay.name,
"icon": relay.icon,
"description": relay.description,
},
"policy": {
"public_join": cfg_bool(cfg, |c| &c.policy, "public_join", false),
"strip_signatures": cfg_bool(cfg, |c| &c.policy, "strip_signatures", false),
},
"groups": {
"enabled": cfg_bool(cfg, |c| &c.groups, "enabled", true),
"auto_join": cfg_bool(cfg, |c| &c.groups, "auto_join", true),
},
"push": {
"enabled": cfg_bool(cfg, |c| &c.push, "enabled", true),
},
"management": {
"enabled": cfg_bool(cfg, |c| &c.management, "enabled", true),
},
"blossom": {
"enabled": cfg_bool(cfg, |c| &c.blossom, "enabled", blossom_default),
},
})
}
#[derive(Debug, Serialize)]
struct ZooidInfo {
name: String,
icon: String,
pubkey: String,
description: String,
fn cfg_bool(
cfg: Option<&RelayConfig>,
section: impl Fn(&RelayConfig) -> &Option<Value>,
key: &str,
default: bool,
) -> bool {
cfg.and_then(|c| section(c).as_ref())
.and_then(|v| v[key].as_bool())
.unwrap_or(default)
}
#[derive(Debug, Serialize)]
struct ZooidPolicy {
public_join: bool,
strip_signatures: bool,
}
#[derive(Debug, Serialize)]
struct ZooidGroups {
enabled: bool,
auto_join: bool,
}
#[derive(Debug, Serialize)]
struct ZooidPush {
enabled: bool,
}
#[derive(Debug, Serialize)]
struct ZooidManagement {
enabled: bool,
methods: Vec<String>,
}
#[derive(Debug, Serialize)]
struct ZooidBlossom {
enabled: bool,
}
#[derive(Debug, Serialize)]
struct ZooidRoles {
member: ZooidRole,
}
#[derive(Debug, Serialize)]
struct ZooidRole {
pubkeys: Vec<String>,
can_invite: bool,
can_manage: bool,
}
fn generate_secret_hex() -> String {
let mut bytes = [0u8; 32];
+42 -33
View File
@@ -1,11 +1,27 @@
use anyhow::Result;
use sqlx::{Sqlite, SqlitePool, Transaction};
use sqlx::{Row, Sqlite, SqlitePool, Transaction};
use crate::models::{
Invoice, InvoiceItem, NewInvoice, NewInvoiceItem, NewRelay, NewTenant, Relay, Tenant,
UpdateRelay,
Invoice, InvoiceItem, NewInvoice, NewInvoiceItem, NewTenant, Relay, Tenant,
};
fn relay_from_row(row: sqlx::sqlite::SqliteRow) -> Relay {
let config_json: Option<String> = row.get("config");
let config = config_json.and_then(|s| serde_json::from_str(&s).ok());
Relay {
id: row.get("id"),
tenant: row.get("tenant"),
name: row.get("name"),
subdomain: row.get("subdomain"),
schema: row.get("schema"),
icon: row.get("icon"),
description: row.get("description"),
plan: row.get("plan"),
status: row.get("status"),
config,
}
}
#[derive(Clone)]
pub struct Repo {
pool: SqlitePool,
@@ -75,10 +91,20 @@ impl Repo {
Ok(())
}
pub async fn create_relay(&self, relay: &NewRelay) -> Result<()> {
pub async fn upsert_relay(&self, relay: &Relay) -> Result<()> {
let config_json = relay.config.as_ref().map(serde_json::to_string).transpose()?;
sqlx::query(
"INSERT INTO relays (id, tenant, name, subdomain, schema, icon, description, plan, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
"INSERT INTO relays (id, tenant, name, subdomain, schema, icon, description, plan, status, config)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
name = excluded.name,
subdomain = excluded.subdomain,
schema = excluded.schema,
icon = excluded.icon,
description = excluded.description,
plan = excluded.plan,
status = excluded.status,
config = excluded.config",
)
.bind(&relay.id)
.bind(&relay.tenant)
@@ -89,24 +115,7 @@ impl Repo {
.bind(&relay.description)
.bind(&relay.plan)
.bind(&relay.status)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn update_relay(&self, relay: &UpdateRelay) -> Result<()> {
sqlx::query(
"UPDATE relays SET name = ?, subdomain = ?, schema = ?, icon = ?, description = ?, plan = ?, status = ?
WHERE id = ?",
)
.bind(&relay.name)
.bind(&relay.subdomain)
.bind(&relay.schema)
.bind(&relay.icon)
.bind(&relay.description)
.bind(&relay.plan)
.bind(&relay.status)
.bind(&relay.id)
.bind(config_json)
.execute(&self.pool)
.await?;
Ok(())
@@ -132,32 +141,32 @@ impl Repo {
}
pub async fn get_relay(&self, id: &str) -> Result<Option<Relay>> {
let relay = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, name, subdomain, schema, icon, description, plan, status FROM relays WHERE id = ?",
let row = sqlx::query(
"SELECT id, tenant, name, subdomain, schema, icon, description, plan, status, config FROM relays WHERE id = ?",
)
.bind(id)
.fetch_optional(&self.pool)
.await?;
Ok(relay)
Ok(row.map(relay_from_row))
}
pub async fn list_relays_by_tenant(&self, tenant: &str) -> Result<Vec<Relay>> {
let relays = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, name, subdomain, schema, icon, description, plan, status FROM relays WHERE tenant = ? ORDER BY name",
let rows = sqlx::query(
"SELECT id, tenant, name, subdomain, schema, icon, description, plan, status, config FROM relays WHERE tenant = ? ORDER BY name",
)
.bind(tenant)
.fetch_all(&self.pool)
.await?;
Ok(relays)
Ok(rows.into_iter().map(relay_from_row).collect())
}
pub async fn list_relays(&self) -> Result<Vec<Relay>> {
let relays = sqlx::query_as::<_, Relay>(
"SELECT id, tenant, name, subdomain, schema, icon, description, plan, status FROM relays ORDER BY name",
let rows = sqlx::query(
"SELECT id, tenant, name, subdomain, schema, icon, description, plan, status, config FROM relays ORDER BY name",
)
.fetch_all(&self.pool)
.await?;
Ok(relays)
Ok(rows.into_iter().map(relay_from_row).collect())
}
pub async fn create_invoice_with_items(