Allow infra to listen to activity actively

This commit is contained in:
Jon Staab
2026-04-01 16:01:10 -07:00
parent 07dfe86210
commit 3e131b6a1b
6 changed files with 131 additions and 68 deletions
+1
View File
@@ -12,3 +12,4 @@ Members:
## `pub fn new(query: Query, command: Command, robot: Robot) -> Self`
- Reads environment and populates members
- Subscribes to `command.notify.notified`
+6 -2
View File
@@ -5,15 +5,19 @@ Command writes to the database.
Members:
- `pool: SqlitePool` - a sqlite connection pool
- `pub notify: broadcast::Sender<Activity>` - callers can subscribe via `command.notify.subscribe()`
Notes:
- All public write methods should be atomic
- All writes should be accompanied by an activity log entry of `(tenant, activity_type, resource_type, resource_id)`
- `insert_activity` builds and returns the `Activity` struct (using `chrono::Utc::now()` for `created_at`)
- After each successful commit, sends the `Activity` on the broadcast channel
## `pub fn new(&self, pool: SqlitePool) -> Self`
- Assigns pool to self
- Creates the broadcast channel
## `pub fn create_tenant(&self, tenant: &Tenant) -> Result<()>`
@@ -51,7 +55,7 @@ Notes:
- Sets relay status to `inactive`, sets `sync_error`
- Logs activity as `(fail_relay_sync, relay_id)`
## `pub fn mark_relay_synced(&self, relay_id: &str) -> Result<()>`
## `pub fn complete_relay_sync(&self, relay_id: &str) -> Result<()>`
- Sets `synced = 1`, `status = 'active'`, clears `sync_error`
- No activity log (called by infra after successful sync)
- Logs activity as `(complete_relay_sync, relay_id)`
+16 -10
View File
@@ -1,6 +1,6 @@
# `pub struct Infra`
Infra is a service which polls the database and synchronizes updates to relays to a remote zooid instance via `api_url`.
Infra is a service which listens for activity and synchronizes relay updates to a remote zooid instance via `api_url`.
Members:
@@ -14,18 +14,24 @@ Members:
## `pub async fn start(self)`
- Initializes `last_activity_at` from `query.max_activity_at()` so historical activities are not replayed on restart.
- Calls `self.tick` in a loop every 10 seconds.
- Subscribes to `command.notify` before doing anything else so no activities are missed.
- Calls `catch_up` to sync any relays that need it from before this process started.
- Loops on `rx.recv()`, calling `handle_activity` for each received `Activity`.
- On `Lagged`, logs a warning and runs `catch_up` to recover.
## `pub async fn tick(self)`
## `async fn catch_up(&self)`
Iterates over `query.list_activity` since last run and does the following:
- Lists all relays via `query.list_relays()` and syncs any that have `status = "new"` or a non-empty `sync_error`.
- For `create_relay`, `update_relay`, or `deactivate_relay` activity, sync the relay to zooid.
- Uses `relay.synced` to decide POST vs PUT (not the activity type), so already-synced relays always use PUT even on restart.
- On success, calls `command.mark_relay_synced` to set `synced = 1`, `status = 'active'`, and clear `sync_error`.
- On failure, calls `command.fail_relay_sync`.
- All other activity types are ignored (e.g. `fail_relay_sync` must not trigger another sync).
## `async fn handle_activity(&self, activity: &Activity)`
- For `create_relay`, `update_relay`, or `deactivate_relay` activity, calls `sync_and_report`.
- All other activity types are ignored (e.g. `fail_relay_sync`, `complete_relay_sync`).
## `async fn sync_and_report(&self, relay: &Relay, is_new: bool)`
- Calls `sync_relay` and on success calls `command.complete_relay_sync`.
- On failure calls `command.fail_relay_sync`.
## `async fn sync_relay(&self, relay: &Relay, is_new: bool)`
+49 -16
View File
@@ -1,16 +1,19 @@
use anyhow::Result;
use sqlx::{Sqlite, SqlitePool, Transaction};
use tokio::sync::broadcast;
use crate::models::{Relay, Tenant};
use crate::models::{Activity, Relay, Tenant};
#[derive(Clone)]
pub struct Command {
pool: SqlitePool,
pub notify: broadcast::Sender<Activity>,
}
impl Command {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
let (notify, _) = broadcast::channel(64);
Self { pool, notify }
}
async fn insert_activity(
@@ -18,7 +21,7 @@ impl Command {
activity_type: &str,
resource_type: &str,
resource_id: &str,
) -> Result<()> {
) -> Result<Activity> {
let tenant = match resource_type {
"tenant" => resource_id.to_string(),
"relay" => {
@@ -30,18 +33,34 @@ impl Command {
_ => anyhow::bail!("unknown resource_type: {}", resource_type),
};
let id = uuid::Uuid::new_v4().to_string();
let created_at = chrono::Utc::now().timestamp();
sqlx::query(
"INSERT INTO activity (id, tenant, created_at, activity_type, resource_type, resource_id)
VALUES (?, ?, strftime('%s','now'), ?, ?, ?)",
VALUES (?, ?, ?, ?, ?, ?)",
)
.bind(uuid::Uuid::new_v4().to_string())
.bind(tenant)
.bind(&id)
.bind(&tenant)
.bind(created_at)
.bind(activity_type)
.bind(resource_type)
.bind(resource_id)
.execute(&mut **tx)
.await?;
Ok(())
Ok(Activity {
id,
tenant,
created_at,
activity_type: activity_type.to_string(),
resource_type: resource_type.to_string(),
resource_id: resource_id.to_string(),
})
}
fn emit(&self, activity: Activity) {
let _ = self.notify.send(activity);
}
pub async fn create_tenant(&self, tenant: &Tenant) -> Result<()> {
@@ -57,9 +76,10 @@ impl Command {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?;
let activity = Self::insert_activity(&mut tx, "create_tenant", "tenant", &tenant.pubkey).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
@@ -72,9 +92,10 @@ impl Command {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?;
let activity = Self::insert_activity(&mut tx, "update_tenant", "tenant", &tenant.pubkey).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
@@ -109,9 +130,10 @@ impl Command {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?;
let activity = Self::insert_activity(&mut tx, "create_relay", "relay", &relay.id).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
@@ -147,9 +169,10 @@ impl Command {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?;
let activity = Self::insert_activity(&mut tx, "update_relay", "relay", &relay.id).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
@@ -161,9 +184,10 @@ impl Command {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?;
let activity = Self::insert_activity(&mut tx, "deactivate_relay", "relay", &relay.id).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
@@ -175,9 +199,10 @@ impl Command {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "activate_relay", "relay", &relay.id).await?;
let activity = Self::insert_activity(&mut tx, "activate_relay", "relay", &relay.id).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
@@ -190,17 +215,25 @@ impl Command {
.execute(&mut *tx)
.await?;
Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?;
let activity = Self::insert_activity(&mut tx, "fail_relay_sync", "relay", &relay.id).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
pub async fn mark_relay_synced(&self, relay_id: &str) -> Result<()> {
pub async fn complete_relay_sync(&self, relay_id: &str) -> Result<()> {
let mut tx = self.pool.begin().await?;
sqlx::query("UPDATE relay SET synced = 1, status = 'active', sync_error = '' WHERE id = ?")
.bind(relay_id)
.execute(&self.pool)
.execute(&mut *tx)
.await?;
let activity = Self::insert_activity(&mut tx, "complete_relay_sync", "relay", relay_id).await?;
tx.commit().await?;
self.emit(activity);
Ok(())
}
}
+59 -39
View File
@@ -1,8 +1,8 @@
use anyhow::Result;
use nostr_sdk::prelude::*;
use tokio::sync::Mutex;
use crate::command::Command;
use crate::models::Activity;
use crate::query::Query;
#[derive(Clone)]
@@ -15,7 +15,6 @@ pub struct Infra {
api_secret: String,
query: Query,
command: Command,
last_activity_at: std::sync::Arc<Mutex<i64>>,
}
impl Infra {
@@ -35,62 +34,83 @@ impl Infra {
api_secret,
query,
command,
last_activity_at: std::sync::Arc::new(Mutex::new(0)),
}
}
pub async fn start(self) {
// Initialize from DB so we don't replay historical activities on restart
match self.query.max_activity_at().await {
Ok(ts) => *self.last_activity_at.lock().await = ts,
Err(e) => tracing::error!(error = %e, "failed to read max activity timestamp"),
// Subscribe before catch-up so no activities are missed between query and listen
let mut rx = self.command.notify.subscribe();
// Catch up on any unsynced relays from before this process started
match self.catch_up().await {
Ok(()) => {}
Err(e) => tracing::error!(error = %e, "infra catch-up failed"),
}
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
loop {
interval.tick().await;
if let Err(e) = self.tick().await {
tracing::error!(error = %e, "infra tick failed");
match rx.recv().await {
Ok(activity) => {
if let Err(e) = self.handle_activity(&activity).await {
tracing::error!(error = %e, "infra handle_activity failed");
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(missed = n, "infra lagged, running catch-up");
if let Err(e) = self.catch_up().await {
tracing::error!(error = %e, "infra catch-up failed");
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
}
pub async fn tick(&self) -> Result<()> {
let mut since_guard = self.last_activity_at.lock().await;
let since = *since_guard;
let activity = self.query.list_activity(&since).await?;
for a in activity {
let needs_sync = matches!(
a.activity_type.as_str(),
"create_relay" | "update_relay" | "deactivate_relay"
);
if needs_sync {
let Some(relay) = self.query.get_relay(&a.resource_id).await? else {
continue;
};
async fn catch_up(&self) -> Result<()> {
let relays = self.query.list_relays().await?;
for relay in relays {
if relay.status == "new" || (relay.sync_error != "" && relay.status != "inactive") {
let is_new = relay.synced == 0;
match self.sync_relay(&relay, is_new).await {
Ok(()) => {
tracing::info!(relay = %relay.id, "relay sync succeeded");
self.command.mark_relay_synced(&relay.id).await?
}
Err(e) => {
tracing::warn!(relay = %relay.id, error = %e, "relay sync failed");
self.command.fail_relay_sync(&relay, e.to_string()).await?;
}
}
self.sync_and_report(&relay, is_new).await;
}
}
Ok(())
}
*since_guard = (*since_guard).max(a.created_at);
async fn handle_activity(&self, activity: &Activity) -> Result<()> {
let needs_sync = matches!(
activity.activity_type.as_str(),
"create_relay" | "update_relay" | "deactivate_relay"
);
if needs_sync {
let Some(relay) = self.query.get_relay(&activity.resource_id).await? else {
return Ok(());
};
let is_new = relay.synced == 0;
self.sync_and_report(&relay, is_new).await;
}
Ok(())
}
async fn sync_and_report(&self, relay: &crate::models::Relay, is_new: bool) {
match self.sync_relay(relay, is_new).await {
Ok(()) => {
tracing::info!(relay = %relay.id, "relay sync succeeded");
if let Err(e) = self.command.complete_relay_sync(&relay.id).await {
tracing::error!(relay = %relay.id, error = %e, "failed to mark sync complete");
}
}
Err(e) => {
tracing::warn!(relay = %relay.id, error = %e, "relay sync failed");
if let Err(e2) = self.command.fail_relay_sync(relay, e.to_string()).await {
tracing::error!(relay = %relay.id, error = %e2, "failed to record sync failure");
}
}
}
}
async fn nip98_auth(&self, url: &str, method: HttpMethod) -> Result<String> {
let keys = Keys::parse(&self.api_secret)?;
let server_url = Url::parse(url)?;
-1
View File
@@ -1,3 +1,2 @@
- [ ] Update infra to listen to sqlite
- [ ] Fix billing by using stripe as a backend to do proration, then mark invoices paid manually when using bitcoin.
- [ ] Send a payment link instead of an invoice so we can generate/pay on the fly