From 6789e3f14a6848abccc4a4a87f1e4fa5cdb21dee Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sat, 13 Apr 2024 13:27:20 +0200 Subject: [PATCH] feat: add backoff Signed-off-by: kjuulh --- Cargo.lock | 13 +++ crates/contractor/Cargo.toml | 1 + crates/contractor/src/services/gitea.rs | 97 ++++++++++++++------ crates/contractor/src/services/reconciler.rs | 1 + 4 files changed, 83 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 76f4322..ab8b344 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -186,6 +186,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "backon" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d67782c3f868daa71d3533538e98a8e13713231969def7536e8039606fc46bf0" +dependencies = [ + "fastrand", + "futures-core", + "pin-project", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -344,6 +356,7 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "backon", "clap", "dagger-sdk", "dotenv", diff --git a/crates/contractor/Cargo.toml b/crates/contractor/Cargo.toml index 8485f4f..f27ec4e 100644 --- a/crates/contractor/Cargo.toml +++ b/crates/contractor/Cargo.toml @@ -22,3 +22,4 @@ itertools = "0.12.1" regex = "1.10.4" serde_json = "1.0.115" dagger-sdk = "0.9.8" +backon = "0.4.4" diff --git a/crates/contractor/src/services/gitea.rs b/crates/contractor/src/services/gitea.rs index 15a1ea9..319b997 100644 --- a/crates/contractor/src/services/gitea.rs +++ b/crates/contractor/src/services/gitea.rs @@ -241,18 +241,35 @@ impl DefaultGiteaClient { tracing::trace!("calling url: {}", &url); - let response = client - .get(&url) - .header("Content-Type", "application/json") - .header("Authorization", format!("token {}", self.token)) - .send() - .await?; + let response = (|| async { + client + .get(&url) + .header("Content-Type", "application/json") + .header("Authorization", format!("token {}", self.token)) + .send() + .await + }) + .retry(&ExponentialBuilder::default()) + .notify(|err, dur| { + tracing::debug!("retrying job: {err}, in: {} seconds", dur.as_secs()); + }) + .await?; match response.error_for_status() { Ok(_) => Ok(Some(())), Err(e) => match e.status() { Some(StatusCode::NOT_FOUND) => Ok(None), - _ => anyhow::bail!(e), + Some(status) => { + tracing::warn!( + "failed to call fetch renovate for: {}, with error: {}", + &repo, + status + ); + anyhow::bail!(e) + } + _ => { + anyhow::bail!(e) + } }, } } @@ -267,12 +284,19 @@ impl DefaultGiteaClient { tracing::trace!("calling url: {}", &url); - let response = client - .get(&url) - .header("Content-Type", "application/json") - .header("Authorization", format!("token {}", self.token)) - .send() - .await?; + let response = (|| async { + client + .get(&url) + .header("Content-Type", "application/json") + .header("Authorization", format!("token {}", self.token)) + .send() + .await + }) + .retry(&ExponentialBuilder::default()) + .notify(|err, dur| { + tracing::debug!("retrying job: {err}, in: {} seconds", dur.as_secs()); + }) + .await?; let webhooks = response.json::>().await?; @@ -301,14 +325,21 @@ impl DefaultGiteaClient { serde_json::to_string(&val)? ); - let response = client - .post(&url) - .header("Content-Type", "application/json") - .header("Accept", "application/json") - .header("Authorization", format!("token {}", self.token)) - .json(&val) - .send() - .await?; + let response = (|| async { + client + .post(&url) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .header("Authorization", format!("token {}", self.token)) + .json(&val) + .send() + .await + }) + .retry(&ExponentialBuilder::default()) + .notify(|err, dur| { + tracing::debug!("retrying job: {err}, in: {} seconds", dur.as_secs()); + }) + .await?; if let Err(e) = response.error_for_status_ref() { if let Ok(ok) = response.text().await { @@ -351,14 +382,21 @@ impl DefaultGiteaClient { serde_json::to_string(&val)? ); - let response = client - .patch(&url) - .header("Content-Type", "application/json") - .header("Accept", "application/json") - .header("Authorization", format!("token {}", self.token)) - .json(&val) - .send() - .await?; + let response = (|| async { + client + .patch(&url) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .header("Authorization", format!("token {}", self.token)) + .json(&val) + .send() + .await + }) + .retry(&ExponentialBuilder::default()) + .notify(|err, dur| { + tracing::debug!("retrying job: {err}, in: {} seconds", dur.as_secs()); + }) + .await?; if let Err(e) = response.error_for_status_ref() { if let Ok(ok) = response.text().await { @@ -463,6 +501,7 @@ mod extensions; pub mod traits; use anyhow::Context; +use backon::{ExponentialBuilder, Retryable}; pub use extensions::*; use futures::{stream::FuturesUnordered, TryStreamExt}; use reqwest::{StatusCode, Url}; diff --git a/crates/contractor/src/services/reconciler.rs b/crates/contractor/src/services/reconciler.rs index cc1558b..51ac9c0 100644 --- a/crates/contractor/src/services/reconciler.rs +++ b/crates/contractor/src/services/reconciler.rs @@ -103,6 +103,7 @@ impl Reconciler { } let mut enabled = Vec::new(); + while let Some(res) = futures.next().await { let res = res?;