Compare commits

...

2 Commits

5 changed files with 108 additions and 20 deletions
+1 -1
View File
@@ -19,7 +19,7 @@ Members:
## `async fn handle_activity(&self, activity: &Activity)` ## `async fn handle_activity(&self, activity: &Activity)`
- For `create_relay`, `update_relay`, or `deactivate_relay` activity, calls `sync_and_report`. - For `create_relay`, `update_relay`, `activate_relay`, or `deactivate_relay` activity, calls `sync_and_report`.
- All other activity types are ignored (e.g. `fail_relay_sync`, `complete_relay_sync`). - All other activity types are ignored (e.g. `fail_relay_sync`, `complete_relay_sync`).
## `async fn sync_and_report(&self, relay: &Relay, is_new: bool)` ## `async fn sync_and_report(&self, relay: &Relay, is_new: bool)`
+33 -3
View File
@@ -12,7 +12,7 @@ use base64::Engine;
use nostr_sdk::{Event, JsonUtil, Kind}; use nostr_sdk::{Event, JsonUtil, Kind};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::billing::Billing; use crate::billing::{Billing, InvoiceLookupError};
use crate::command::Command; use crate::command::Command;
use crate::models::{ use crate::models::{
RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay, Tenant, RELAY_STATUS_ACTIVE, RELAY_STATUS_DELINQUENT, RELAY_STATUS_INACTIVE, Relay, Tenant,
@@ -72,6 +72,11 @@ enum ApiError {
Unauthorized(anyhow::Error), Unauthorized(anyhow::Error),
Forbidden(&'static str), Forbidden(&'static str),
NotFound(&'static str), NotFound(&'static str),
Client {
status: StatusCode,
code: &'static str,
message: &'static str,
},
Internal(String), Internal(String),
} }
@@ -81,11 +86,36 @@ impl IntoResponse for ApiError {
Self::Unauthorized(e) => err(StatusCode::UNAUTHORIZED, "unauthorized", &e.to_string()), Self::Unauthorized(e) => err(StatusCode::UNAUTHORIZED, "unauthorized", &e.to_string()),
Self::Forbidden(message) => err(StatusCode::FORBIDDEN, "forbidden", message), Self::Forbidden(message) => err(StatusCode::FORBIDDEN, "forbidden", message),
Self::NotFound(message) => err(StatusCode::NOT_FOUND, "not-found", message), Self::NotFound(message) => err(StatusCode::NOT_FOUND, "not-found", message),
Self::Client {
status,
code,
message,
} => err(status, code, message),
Self::Internal(message) => err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &message), Self::Internal(message) => err(StatusCode::INTERNAL_SERVER_ERROR, "internal", &message),
} }
} }
} }
fn map_invoice_lookup_error(error: InvoiceLookupError) -> ApiError {
match error {
InvoiceLookupError::StripeClient { status } => {
let status = StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::BAD_REQUEST);
match status {
StatusCode::NOT_FOUND => ApiError::NotFound("invoice not found"),
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
ApiError::Forbidden("invoice access denied")
}
_ => ApiError::Client {
status,
code: "invoice-request-rejected",
message: "invoice request rejected",
},
}
}
InvoiceLookupError::Internal(error) => ApiError::Internal(error.to_string()),
}
}
impl Api { impl Api {
pub fn new(query: Query, command: Command, billing: Billing) -> Self { pub fn new(query: Query, command: Command, billing: Billing) -> Self {
let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
@@ -808,7 +838,7 @@ async fn get_invoice(
.billing .billing
.get_invoice_with_tenant(&id) .get_invoice_with_tenant(&id)
.await .await
.map_err(|e| ApiError::Internal(e.to_string()))?; .map_err(map_invoice_lookup_error)?;
state.api.require_admin_or_tenant(&auth, &tenant.pubkey)?; state.api.require_admin_or_tenant(&auth, &tenant.pubkey)?;
Ok(ok(StatusCode::OK, invoice)) Ok(ok(StatusCode::OK, invoice))
@@ -825,7 +855,7 @@ async fn get_invoice_bolt11(
.billing .billing
.get_invoice_with_tenant(&id) .get_invoice_with_tenant(&id)
.await .await
.map_err(|e| ApiError::Internal(e.to_string()))?; .map_err(map_invoice_lookup_error)?;
state.api.require_admin_or_tenant(&auth, &tenant.pubkey)?; state.api.require_admin_or_tenant(&auth, &tenant.pubkey)?;
let status = invoice["status"].as_str().unwrap_or_default(); let status = invoice["status"].as_str().unwrap_or_default();
+50 -7
View File
@@ -18,6 +18,41 @@ const STRIPE_API: &str = "https://api.stripe.com/v1";
const COINBASE_SPOT_API: &str = "https://api.coinbase.com/v2/prices"; const COINBASE_SPOT_API: &str = "https://api.coinbase.com/v2/prices";
const WEBHOOK_TOLERANCE_SECS: i64 = 300; const WEBHOOK_TOLERANCE_SECS: i64 = 300;
#[derive(Debug)]
pub enum InvoiceLookupError {
StripeClient { status: reqwest::StatusCode },
Internal(anyhow::Error),
}
impl std::fmt::Display for InvoiceLookupError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::StripeClient { status } => {
write!(
f,
"stripe invoice lookup failed with status {}",
status.as_u16()
)
}
Self::Internal(error) => write!(f, "{error}"),
}
}
}
impl std::error::Error for InvoiceLookupError {}
impl From<anyhow::Error> for InvoiceLookupError {
fn from(value: anyhow::Error) -> Self {
Self::Internal(value)
}
}
impl From<reqwest::Error> for InvoiceLookupError {
fn from(value: reqwest::Error) -> Self {
Self::Internal(value.into())
}
}
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
struct StripeEvent { struct StripeEvent {
#[serde(rename = "type")] #[serde(rename = "type")]
@@ -462,16 +497,18 @@ impl Billing {
pub async fn get_invoice_with_tenant( pub async fn get_invoice_with_tenant(
&self, &self,
invoice_id: &str, invoice_id: &str,
) -> Result<(serde_json::Value, crate::models::Tenant)> { ) -> std::result::Result<(serde_json::Value, crate::models::Tenant), InvoiceLookupError> {
let invoice = self.stripe_get_invoice(invoice_id).await?; let invoice = self.stripe_get_invoice(invoice_id).await?;
let customer_id = invoice["customer"] let customer_id = invoice["customer"]
.as_str() .as_str()
.ok_or_else(|| anyhow!("invoice missing customer"))?; .ok_or_else(|| InvoiceLookupError::Internal(anyhow!("invoice missing customer")))?;
let tenant = self let tenant = self
.query .query
.get_tenant_by_stripe_customer_id(customer_id) .get_tenant_by_stripe_customer_id(customer_id)
.await? .await?
.ok_or_else(|| anyhow!("tenant not found for customer"))?; .ok_or_else(|| {
InvoiceLookupError::Internal(anyhow!("tenant not found for customer"))
})?;
Ok((invoice, tenant)) Ok((invoice, tenant))
} }
@@ -515,7 +552,10 @@ impl Billing {
Ok(body["data"].clone()) Ok(body["data"].clone())
} }
pub async fn stripe_get_invoice(&self, invoice_id: &str) -> Result<serde_json::Value> { pub async fn stripe_get_invoice(
&self,
invoice_id: &str,
) -> std::result::Result<serde_json::Value, InvoiceLookupError> {
let resp = self let resp = self
.http .http
.get(format!("{STRIPE_API}/invoices/{invoice_id}")) .get(format!("{STRIPE_API}/invoices/{invoice_id}"))
@@ -523,6 +563,12 @@ impl Billing {
.send() .send()
.await?; .await?;
if resp.status().is_client_error() {
return Err(InvoiceLookupError::StripeClient {
status: resp.status(),
});
}
let body: serde_json::Value = resp.error_for_status()?.json().await?; let body: serde_json::Value = resp.error_for_status()?.json().await?;
Ok(body) Ok(body)
} }
@@ -901,10 +947,7 @@ mod tests {
&unknown_status_paid &unknown_status_paid
)); ));
} }
}
#[cfg(test)]
mod tests {
use super::*; use super::*;
use sqlx::SqlitePool; use sqlx::SqlitePool;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
+1 -2
View File
@@ -209,8 +209,7 @@ impl Command {
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;
let activity = let activity = Self::insert_activity(&mut tx, activity_type, "relay", relay_id).await?;
Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay_id).await?;
tx.commit().await?; tx.commit().await?;
self.emit(activity); self.emit(activity);
+23 -7
View File
@@ -56,10 +56,7 @@ impl Infra {
} }
async fn handle_activity(&self, activity: &Activity) -> Result<()> { async fn handle_activity(&self, activity: &Activity) -> Result<()> {
let needs_sync = matches!( let needs_sync = should_sync_relay_activity(activity.activity_type.as_str());
activity.activity_type.as_str(),
"create_relay" | "update_relay" | "deactivate_relay"
);
if needs_sync { if needs_sync {
let Some(relay) = self.query.get_relay(&activity.resource_id).await? else { let Some(relay) = self.query.get_relay(&activity.resource_id).await? else {
@@ -93,7 +90,9 @@ impl Infra {
async fn nip98_auth(&self, url: &str, method: HttpMethod) -> Result<String> { async fn nip98_auth(&self, url: &str, method: HttpMethod) -> Result<String> {
let keys = Keys::parse(&self.api_secret)?; let keys = Keys::parse(&self.api_secret)?;
let server_url = Url::parse(url)?; let server_url = Url::parse(url)?;
let auth = HttpData::new(server_url, method).to_authorization(&keys).await?; let auth = HttpData::new(server_url, method)
.to_authorization(&keys)
.await?;
Ok(auth) Ok(auth)
} }
@@ -150,11 +149,21 @@ impl Infra {
let response = if is_new { let response = if is_new {
let url = format!("{}/relay/{}", base, relay.id); let url = format!("{}/relay/{}", base, relay.id);
let auth = self.nip98_auth(&url, HttpMethod::POST).await?; let auth = self.nip98_auth(&url, HttpMethod::POST).await?;
client.post(&url).header("Authorization", auth).json(&body).send().await? client
.post(&url)
.header("Authorization", auth)
.json(&body)
.send()
.await?
} else { } else {
let url = format!("{}/relay/{}", base, relay.id); let url = format!("{}/relay/{}", base, relay.id);
let auth = self.nip98_auth(&url, HttpMethod::PUT).await?; let auth = self.nip98_auth(&url, HttpMethod::PUT).await?;
client.put(&url).header("Authorization", auth).json(&body).send().await? client
.put(&url)
.header("Authorization", auth)
.json(&body)
.send()
.await?
}; };
if !response.status().is_success() { if !response.status().is_success() {
@@ -165,3 +174,10 @@ impl Infra {
Ok(()) Ok(())
} }
} }
fn should_sync_relay_activity(activity_type: &str) -> bool {
matches!(
activity_type,
"create_relay" | "update_relay" | "activate_relay" | "deactivate_relay"
)
}