fix: enforce relay member capacity limits from plan definitions (#43)
Co-authored-by: userAdityaa <aditya.chaudhary1558@gmail.com> Co-committed-by: userAdityaa <aditya.chaudhary1558@gmail.com>
This commit was merged in pull request #43.
This commit is contained in:
+84
-3
@@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use anyhow::{Result, anyhow};
|
||||
use axum::{
|
||||
Json, Router,
|
||||
extract::{Path, State},
|
||||
@@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::billing::{Billing, InvoiceLookupError};
|
||||
use crate::command::Command;
|
||||
use crate::infra::Infra;
|
||||
use crate::models::{
|
||||
RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay, Tenant,
|
||||
};
|
||||
@@ -27,6 +28,7 @@ pub struct Api {
|
||||
query: Query,
|
||||
command: Command,
|
||||
billing: Billing,
|
||||
infra: Infra,
|
||||
}
|
||||
|
||||
async fn stripe_webhook(
|
||||
@@ -117,7 +119,7 @@ fn map_invoice_lookup_error(error: InvoiceLookupError) -> ApiError {
|
||||
}
|
||||
|
||||
impl Api {
|
||||
pub fn new(query: Query, command: Command, billing: Billing) -> Self {
|
||||
pub fn new(query: Query, command: Command, billing: Billing, infra: Infra) -> Self {
|
||||
let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let admins = std::env::var("ADMINS")
|
||||
.unwrap_or_default()
|
||||
@@ -131,6 +133,7 @@ impl Api {
|
||||
query,
|
||||
command,
|
||||
billing,
|
||||
infra,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,6 +151,7 @@ impl Api {
|
||||
.route("/tenants/:pubkey/relays", get(list_tenant_relays))
|
||||
.route("/relays", get(list_relays).post(create_relay))
|
||||
.route("/relays/:id", get(get_relay).put(update_relay))
|
||||
.route("/relays/:id/members", get(list_relay_members))
|
||||
.route("/relays/:id/activity", get(list_relay_activity))
|
||||
.route("/relays/:id/deactivate", post(deactivate_relay))
|
||||
.route("/relays/:id/reactivate", post(reactivate_relay))
|
||||
@@ -251,6 +255,14 @@ impl Api {
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_relay_members(&self, relay: &Relay) -> Result<Vec<String>> {
|
||||
if relay.synced == 0 {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
self.infra.list_relay_members(&relay.id).await
|
||||
}
|
||||
|
||||
fn prepare_relay(&self, mut relay: Relay) -> std::result::Result<Relay, RelayValidationError> {
|
||||
validate_subdomain_label(&relay.subdomain)?;
|
||||
|
||||
@@ -669,6 +681,40 @@ async fn list_relay_activity(
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_relay_members(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
Path(id): Path<String>,
|
||||
) -> std::result::Result<Response, ApiError> {
|
||||
let auth = state.api.extract_auth_pubkey(&headers)?;
|
||||
|
||||
let relay = match state.api.query.get_relay(&id).await {
|
||||
Ok(Some(r)) => r,
|
||||
Ok(None) => return Ok(err(StatusCode::NOT_FOUND, "not-found", "relay not found")),
|
||||
Err(e) => {
|
||||
return Ok(err(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"internal",
|
||||
&e.to_string(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
state.api.require_admin_or_tenant(&auth, &relay.tenant)?;
|
||||
|
||||
match state.api.fetch_relay_members(&relay).await {
|
||||
Ok(members) => Ok(ok(
|
||||
StatusCode::OK,
|
||||
serde_json::json!({ "members": members }),
|
||||
)),
|
||||
Err(e) => Ok(err(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"internal",
|
||||
&e.to_string(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_relay(
|
||||
State(state): State<AppState>,
|
||||
headers: HeaderMap,
|
||||
@@ -748,10 +794,13 @@ async fn update_relay(
|
||||
|
||||
state.api.require_admin_or_tenant(&auth, &relay.tenant)?;
|
||||
|
||||
let current_plan = relay.plan.clone();
|
||||
let requested_plan = payload.plan.clone();
|
||||
|
||||
if let Some(v) = payload.subdomain {
|
||||
relay.subdomain = v;
|
||||
}
|
||||
if let Some(v) = payload.plan {
|
||||
if let Some(v) = requested_plan.clone() {
|
||||
relay.plan = v;
|
||||
}
|
||||
if let Some(v) = payload.info_name {
|
||||
@@ -792,6 +841,38 @@ async fn update_relay(
|
||||
}
|
||||
};
|
||||
|
||||
let plan_changed = requested_plan
|
||||
.as_deref()
|
||||
.is_some_and(|requested| requested != current_plan);
|
||||
|
||||
if plan_changed {
|
||||
let selected_plan = Query::get_plan(&relay.plan).expect("validated plan must exist");
|
||||
if let Some(limit) = selected_plan.members {
|
||||
let current_members = match state.api.fetch_relay_members(&relay).await {
|
||||
Ok(members) => members.len() as i64,
|
||||
Err(e) => {
|
||||
return Ok(err(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"internal",
|
||||
&e.to_string(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if current_members > limit {
|
||||
let message = format!(
|
||||
"relay has {current_members} members, which exceeds the {} plan limit of {limit}",
|
||||
selected_plan.name.to_lowercase()
|
||||
);
|
||||
return Ok(err(
|
||||
StatusCode::UNPROCESSABLE_ENTITY,
|
||||
"member-limit-exceeded",
|
||||
&message,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match state.api.command.update_relay(&relay).await {
|
||||
Ok(()) => Ok(ok(StatusCode::OK, relay)),
|
||||
Err(e) => {
|
||||
|
||||
+65
-5
@@ -2,7 +2,7 @@ use anyhow::Result;
|
||||
use nostr_sdk::prelude::*;
|
||||
|
||||
use crate::command::Command;
|
||||
use crate::models::{Activity, Relay, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE};
|
||||
use crate::models::{Activity, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay};
|
||||
use crate::query::Query;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -18,14 +18,22 @@ pub struct Infra {
|
||||
}
|
||||
|
||||
impl Infra {
|
||||
pub fn new(query: Query, command: Command) -> Self {
|
||||
pub fn new(query: Query, command: Command) -> Result<Self> {
|
||||
let api_url = std::env::var("ZOOID_API_URL").unwrap_or_default();
|
||||
let relay_domain = std::env::var("RELAY_DOMAIN").unwrap_or_default();
|
||||
let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
|
||||
let livekit_api_key = std::env::var("LIVEKIT_API_KEY").unwrap_or_default();
|
||||
let livekit_api_secret = std::env::var("LIVEKIT_API_SECRET").unwrap_or_default();
|
||||
let api_secret = std::env::var("ZOOID_API_SECRET").unwrap_or_default();
|
||||
Self {
|
||||
|
||||
if api_url.trim().is_empty() {
|
||||
anyhow::bail!("missing ZOOID_API_URL");
|
||||
}
|
||||
if api_secret.trim().is_empty() {
|
||||
anyhow::bail!("missing ZOOID_API_SECRET");
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
api_url,
|
||||
relay_domain,
|
||||
livekit_url,
|
||||
@@ -34,7 +42,7 @@ impl Infra {
|
||||
api_secret,
|
||||
query,
|
||||
command,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn start(self) {
|
||||
@@ -96,6 +104,28 @@ impl Infra {
|
||||
Ok(auth)
|
||||
}
|
||||
|
||||
pub async fn list_relay_members(&self, relay_id: &str) -> Result<Vec<String>> {
|
||||
let client = reqwest::Client::new();
|
||||
let base = self.api_url.trim_end_matches('/');
|
||||
let url = format!("{base}/relay/{relay_id}/members");
|
||||
let auth = self.nip98_auth(&url, HttpMethod::GET).await?;
|
||||
|
||||
let response = client
|
||||
.get(&url)
|
||||
.header("Authorization", auth)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
anyhow::bail!("zooid members returned {status}: {body}");
|
||||
}
|
||||
|
||||
let body = response.text().await?;
|
||||
parse_relay_members_response(&body)
|
||||
}
|
||||
|
||||
async fn sync_relay(&self, relay: &Relay, is_new: bool) -> Result<()> {
|
||||
let client = reqwest::Client::new();
|
||||
let base = self.api_url.trim_end_matches('/');
|
||||
@@ -125,7 +155,9 @@ impl Infra {
|
||||
);
|
||||
|
||||
let url = format!("{}/relay/{}", base, relay.id);
|
||||
let auth = self.nip98_auth(&url, zooid_sync_http_method(is_new)).await?;
|
||||
let auth = self
|
||||
.nip98_auth(&url, zooid_sync_http_method(is_new))
|
||||
.await?;
|
||||
|
||||
let request = if is_new {
|
||||
client.post(&url)
|
||||
@@ -156,6 +188,34 @@ fn zooid_sync_http_method(is_new: bool) -> HttpMethod {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_relay_members_response(body: &str) -> Result<Vec<String>> {
|
||||
let value: serde_json::Value = serde_json::from_str(body)?;
|
||||
|
||||
if let Some(members) = members_from_value(&value) {
|
||||
return Ok(members);
|
||||
}
|
||||
if let Some(members) = value.get("members").and_then(members_from_value) {
|
||||
return Ok(members);
|
||||
}
|
||||
if let Some(members) = value
|
||||
.get("data")
|
||||
.and_then(|data| data.get("members"))
|
||||
.and_then(members_from_value)
|
||||
{
|
||||
return Ok(members);
|
||||
}
|
||||
|
||||
anyhow::bail!("zooid members response missing members array")
|
||||
}
|
||||
|
||||
fn members_from_value(value: &serde_json::Value) -> Option<Vec<String>> {
|
||||
let values = value.as_array()?;
|
||||
values
|
||||
.iter()
|
||||
.map(|value| value.as_str().map(ToString::to_string))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn relay_sync_body(
|
||||
relay: &Relay,
|
||||
host: String,
|
||||
|
||||
+2
-2
@@ -33,8 +33,8 @@ async fn main() -> Result<()> {
|
||||
let query = Query::new(pool.clone());
|
||||
let command = Command::new(pool);
|
||||
let billing = Billing::new(query.clone(), command.clone(), robot.clone());
|
||||
let infra = Infra::new(query.clone(), command.clone());
|
||||
let api = Api::new(query, command, billing.clone());
|
||||
let infra = Infra::new(query.clone(), command.clone())?;
|
||||
let api = Api::new(query, command, billing.clone(), infra.clone());
|
||||
|
||||
let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let port: u16 = std::env::var("PORT")
|
||||
|
||||
Reference in New Issue
Block a user