diff --git a/backend/spec/billing.md b/backend/spec/billing.md index 40b111e..51fd577 100644 --- a/backend/spec/billing.md +++ b/backend/spec/billing.md @@ -12,3 +12,4 @@ Members: ## `pub fn new(query: Query, command: Command, robot: Robot) -> Self` - Reads environment and populates members +- Subscribes to `command.notify.notified` diff --git a/backend/spec/command.md b/backend/spec/command.md index 19b8a7c..220c0e1 100644 --- a/backend/spec/command.md +++ b/backend/spec/command.md @@ -5,15 +5,19 @@ Command writes to the database. Members: - `pool: SqlitePool` - a sqlite connection pool +- `pub notify: broadcast::Sender` - callers can subscribe via `command.notify.subscribe()` Notes: - All public write methods should be atomic - All writes should be accompanied by an activity log entry of `(tenant, activity_type, resource_type, resource_id)` +- `insert_activity` builds and returns the `Activity` struct (using `chrono::Utc::now()` for `created_at`) +- After each successful commit, sends the `Activity` on the broadcast channel ## `pub fn new(&self, pool: SqlitePool) -> Self` - Assigns pool to self +- Creates the broadcast channel ## `pub fn create_tenant(&self, tenant: &Tenant) -> Result<()>` @@ -51,7 +55,7 @@ Notes: - Sets relay status to `inactive`, sets `sync_error` - Logs activity as `(fail_relay_sync, relay_id)` -## `pub fn mark_relay_synced(&self, relay_id: &str) -> Result<()>` +## `pub fn complete_relay_sync(&self, relay_id: &str) -> Result<()>` - Sets `synced = 1`, `status = 'active'`, clears `sync_error` -- No activity log (called by infra after successful sync) +- Logs activity as `(complete_relay_sync, relay_id)` diff --git a/backend/spec/infra.md b/backend/spec/infra.md index a1dd60f..2f66ffc 100644 --- a/backend/spec/infra.md +++ b/backend/spec/infra.md @@ -1,6 +1,6 @@ # `pub struct Infra` -Infra is a service which polls the database and synchronizes updates to relays to a remote zooid instance via `api_url`. +Infra is a service which listens for activity and synchronizes relay updates to a remote zooid instance via `api_url`. Members: @@ -14,18 +14,24 @@ Members: ## `pub async fn start(self)` -- Initializes `last_activity_at` from `query.max_activity_at()` so historical activities are not replayed on restart. -- Calls `self.tick` in a loop every 10 seconds. +- Subscribes to `command.notify` before doing anything else so no activities are missed. +- Calls `catch_up` to sync any relays that need it from before this process started. +- Loops on `rx.recv()`, calling `handle_activity` for each received `Activity`. +- On `Lagged`, logs a warning and runs `catch_up` to recover. -## `pub async fn tick(self)` +## `async fn catch_up(&self)` -Iterates over `query.list_activity` since last run and does the following: +- Lists all relays via `query.list_relays()` and syncs any that have `status = "new"` or a non-empty `sync_error`. -- For `create_relay`, `update_relay`, or `deactivate_relay` activity, sync the relay to zooid. - - Uses `relay.synced` to decide POST vs PUT (not the activity type), so already-synced relays always use PUT even on restart. - - On success, calls `command.mark_relay_synced` to set `synced = 1`, `status = 'active'`, and clear `sync_error`. - - On failure, calls `command.fail_relay_sync`. -- All other activity types are ignored (e.g. `fail_relay_sync` must not trigger another sync). +## `async fn handle_activity(&self, activity: &Activity)` + +- For `create_relay`, `update_relay`, or `deactivate_relay` activity, calls `sync_and_report`. +- All other activity types are ignored (e.g. `fail_relay_sync`, `complete_relay_sync`). + +## `async fn sync_and_report(&self, relay: &Relay, is_new: bool)` + +- Calls `sync_relay` and on success calls `command.complete_relay_sync`. +- On failure calls `command.fail_relay_sync`. ## `async fn sync_relay(&self, relay: &Relay, is_new: bool)` diff --git a/backend/src/command.rs b/backend/src/command.rs index 790b57e..ecad4b7 100644 --- a/backend/src/command.rs +++ b/backend/src/command.rs @@ -1,16 +1,19 @@ use anyhow::Result; use sqlx::{Sqlite, SqlitePool, Transaction}; +use tokio::sync::broadcast; -use crate::models::{Relay, Tenant}; +use crate::models::{Activity, Relay, Tenant}; #[derive(Clone)] pub struct Command { pool: SqlitePool, + pub notify: broadcast::Sender, } impl Command { pub fn new(pool: SqlitePool) -> Self { - Self { pool } + let (notify, _) = broadcast::channel(64); + Self { pool, notify } } async fn insert_activity( @@ -18,7 +21,7 @@ impl Command { activity_type: &str, resource_type: &str, resource_id: &str, - ) -> Result<()> { + ) -> Result { let tenant = match resource_type { "tenant" => resource_id.to_string(), "relay" => { @@ -30,18 +33,34 @@ impl Command { _ => anyhow::bail!("unknown resource_type: {}", resource_type), }; + let id = uuid::Uuid::new_v4().to_string(); + let created_at = chrono::Utc::now().timestamp(); + sqlx::query( "INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id) - VALUES (?, ?, strftime('%s','now'), ?, ?, ?)", + VALUES (?, ?, ?, ?, ?, ?)", ) - .bind(uuid::Uuid::new_v4().to_string()) - .bind(tenant) + .bind(&id) + .bind(&tenant) + .bind(created_at) .bind(activity_type) .bind(resource_type) .bind(resource_id) .execute(&mut **tx) .await?; - Ok(()) + + Ok(Activity { + id, + tenant, + created_at, + activity_type: activity_type.to_string(), + resource_type: resource_type.to_string(), + resource_id: resource_id.to_string(), + }) + } + + fn emit(&self, activity: Activity) { + let _ = self.notify.send(activity); } pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> { @@ -57,9 +76,10 @@ impl Command { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?; + let activity = Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?; tx.commit().await?; + self.emit(activity); Ok(()) } @@ -72,9 +92,10 @@ impl Command { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?; + let activity = Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?; tx.commit().await?; + self.emit(activity); Ok(()) } @@ -109,9 +130,10 @@ impl Command { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?; + let activity = Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?; tx.commit().await?; + self.emit(activity); Ok(()) } @@ -147,9 +169,10 @@ impl Command { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?; + let activity = Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?; tx.commit().await?; + self.emit(activity); Ok(()) } @@ -161,9 +184,10 @@ impl Command { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?; + let activity = Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?; tx.commit().await?; + self.emit(activity); Ok(()) } @@ -175,9 +199,10 @@ impl Command { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "activate_relay", "relay", &relay.id).await?; + let activity = Self::insert_activity(&mut tx, "activate_relay", "relay", &relay.id).await?; tx.commit().await?; + self.emit(activity); Ok(()) } @@ -190,17 +215,25 @@ impl Command { .execute(&mut *tx) .await?; - Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?; + let activity = Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?; tx.commit().await?; + self.emit(activity); Ok(()) } - pub async fn mark_relay_synced(&self, relay_id: &str) -> Result<()> { + pub async fn complete_relay_sync(&self, relay_id: &str) -> Result<()> { + let mut tx = self.pool.begin().await?; + sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?") .bind(relay_id) - .execute(&self.pool) + .execute(&mut *tx) .await?; + + let activity = Self::insert_activity(&mut tx, "complete_relay_sync", "relay", relay_id).await?; + + tx.commit().await?; + self.emit(activity); Ok(()) } } diff --git a/backend/src/infra.rs b/backend/src/infra.rs index f57c90a..71eaa07 100644 --- a/backend/src/infra.rs +++ b/backend/src/infra.rs @@ -1,8 +1,8 @@ use anyhow::Result; use nostr_sdk::prelude::*; -use tokio::sync::Mutex; use crate::command::Command; +use crate::models::Activity; use crate::query::Query; #[derive(Clone)] @@ -15,7 +15,6 @@ pub struct Infra { api_secret: String, query: Query, command: Command, - last_activity_at: std::sync::Arc>, } impl Infra { @@ -35,62 +34,83 @@ impl Infra { api_secret, query, command, - last_activity_at: std::sync::Arc::new(Mutex::new(0)), } } pub async fn start(self) { - // Initialize from DB so we don't replay historical activities on restart - match self.query.max_activity_at().await { - Ok(ts) => *self.last_activity_at.lock().await = ts, - Err(e) => tracing::error!(error = %e, "failed to read max activity timestamp"), + // Subscribe before catch-up so no activities are missed between query and listen + let mut rx = self.command.notify.subscribe(); + + // Catch up on any unsynced relays from before this process started + match self.catch_up().await { + Ok(()) => {} + Err(e) => tracing::error!(error = %e, "infra catch-up failed"), } - let mut interval = tokio::time::interval(std::time::Duration::from_secs(10)); loop { - interval.tick().await; - if let Err(e) = self.tick().await { - tracing::error!(error = %e, "infra tick failed"); + match rx.recv().await { + Ok(activity) => { + if let Err(e) = self.handle_activity(&activity).await { + tracing::error!(error = %e, "infra handle_activity failed"); + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!(missed = n, "infra lagged, running catch-up"); + if let Err(e) = self.catch_up().await { + tracing::error!(error = %e, "infra catch-up failed"); + } + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } } - pub async fn tick(&self) -> Result<()> { - let mut since_guard = self.last_activity_at.lock().await; - let since = *since_guard; - let activity = self.query.list_activity(&since).await?; - - for a in activity { - let needs_sync = matches!( - a.activity_type.as_str(), - "create_relay" | "update_relay" | "deactivate_relay" - ); - - if needs_sync { - let Some(relay) = self.query.get_relay(&a.resource_id).await? else { - continue; - }; - + async fn catch_up(&self) -> Result<()> { + let relays = self.query.list_relays().await?; + for relay in relays { + if relay.status == "new" || (relay.sync_error != "" && relay.status != "inactive") { let is_new = relay.synced == 0; - - match self.sync_relay(&relay, is_new).await { - Ok(()) => { - tracing::info!(relay = %relay.id, "relay sync succeeded"); - self.command.mark_relay_synced(&relay.id).await? - } - Err(e) => { - tracing::warn!(relay = %relay.id, error = %e, "relay sync failed"); - self.command.fail_relay_sync(&relay, e.to_string()).await?; - } - } + self.sync_and_report(&relay, is_new).await; } + } + Ok(()) + } - *since_guard = (*since_guard).max(a.created_at); + async fn handle_activity(&self, activity: &Activity) -> Result<()> { + let needs_sync = matches!( + activity.activity_type.as_str(), + "create_relay" | "update_relay" | "deactivate_relay" + ); + + if needs_sync { + let Some(relay) = self.query.get_relay(&activity.resource_id).await? else { + return Ok(()); + }; + + let is_new = relay.synced == 0; + self.sync_and_report(&relay, is_new).await; } Ok(()) } + async fn sync_and_report(&self, relay: &crate::models::Relay, is_new: bool) { + match self.sync_relay(relay, is_new).await { + Ok(()) => { + tracing::info!(relay = %relay.id, "relay sync succeeded"); + if let Err(e) = self.command.complete_relay_sync(&relay.id).await { + tracing::error!(relay = %relay.id, error = %e, "failed to mark sync complete"); + } + } + Err(e) => { + tracing::warn!(relay = %relay.id, error = %e, "relay sync failed"); + if let Err(e2) = self.command.fail_relay_sync(relay, e.to_string()).await { + tracing::error!(relay = %relay.id, error = %e2, "failed to record sync failure"); + } + } + } + } + async fn nip98_auth(&self, url: &str, method: HttpMethod) -> Result { let keys = Keys::parse(&self.api_secret)?; let server_url = Url::parse(url)?; diff --git a/todo.md b/todo.md index 73d56ef..7715444 100644 --- a/todo.md +++ b/todo.md @@ -1,3 +1,2 @@ -- [ ] Update infra to listen to sqlite - [ ] Fix billing by using stripe as a backend to do proration, then mark invoices paid manually when using bitcoin. - [ ] Send a payment link instead of an invoice so we can generate/pay on the fly