fix: silent relay state drift when activity bus drops events (#53)
Co-authored-by: userAdityaa <aditya.chaudhary1558@gmail.com> Co-committed-by: userAdityaa <aditya.chaudhary1558@gmail.com>
This commit was merged in pull request #53.
This commit is contained in:
@@ -120,6 +120,10 @@ impl Billing {
|
||||
pub async fn start(self) {
|
||||
let mut rx = self.command.notify.subscribe();
|
||||
|
||||
if let Err(error) = self.reconcile_relay_subscriptions("startup").await {
|
||||
tracing::error!(error = %error, "failed to reconcile relay billing state on startup");
|
||||
}
|
||||
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(activity) => {
|
||||
@@ -129,12 +133,39 @@ impl Billing {
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
tracing::warn!(missed = n, "billing lagged");
|
||||
|
||||
if let Err(error) = self.reconcile_relay_subscriptions("lagged").await {
|
||||
tracing::error!(error = %error, "failed to reconcile relay billing state after lag");
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn reconcile_relay_subscriptions(&self, source: &str) -> Result<()> {
|
||||
let relays = self.query.list_relays().await?;
|
||||
|
||||
if relays.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tracing::info!(source, relay_count = relays.len(), "reconciling relay billing state");
|
||||
|
||||
for relay in relays {
|
||||
if let Err(error) = self.sync_relay_subscription_for_relay(&relay).await {
|
||||
tracing::error!(
|
||||
source,
|
||||
relay = %relay.id,
|
||||
error = %error,
|
||||
"failed to reconcile relay billing state"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_activity(&self, activity: &Activity) -> Result<()> {
|
||||
let needs_billing_sync = matches!(
|
||||
activity.activity_type.as_str(),
|
||||
@@ -158,6 +189,10 @@ impl Billing {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
self.sync_relay_subscription_for_relay(&relay).await
|
||||
}
|
||||
|
||||
async fn sync_relay_subscription_for_relay(&self, relay: &Relay) -> Result<()> {
|
||||
let Some(tenant) = self.query.get_tenant(&relay.tenant).await? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user