Compare commits

...

7 Commits
v0.3.2 ... main

Author SHA1 Message Date
8342822e81 chore(deps): update rust crate anyhow to v1.0.98
Some checks failed
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is failing
2025-04-14 03:24:47 +00:00
0608fbbfe6 chore(deps): update rust crate tokio to v1.44.2
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2025-04-06 00:24:21 +00:00
a7b5394d08 chore(release): v0.3.4 (#23)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.3.4

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #23
2025-03-27 15:26:54 +01:00
af10f48d8d feat: add cron
All checks were successful
continuous-integration/drone/push Build is passing
2025-03-27 15:16:57 +01:00
9c39c0daa2 chore(release): v0.3.3 (#22)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.3.3

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #22
2025-03-27 14:11:17 +01:00
ca233208e2 feat: don't stop on error
All checks were successful
continuous-integration/drone/push Build is passing
2025-03-27 14:10:11 +01:00
3c9d15779d feat: redo main part
All checks were successful
continuous-integration/drone/push Build is passing
2025-03-27 14:08:26 +01:00
5 changed files with 190 additions and 27 deletions

View File

@ -6,6 +6,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.3.4] - 2025-03-27
### Added
- add cron
## [0.3.3] - 2025-03-27
### Added
- don't stop on error
- redo main part
## [0.3.2] - 2025-03-27 ## [0.3.2] - 2025-03-27
### Fixed ### Fixed

31
Cargo.lock generated
View File

@ -43,9 +43,9 @@ dependencies = [
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.97" version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]] [[package]]
name = "async-trait" name = "async-trait"
@ -132,6 +132,17 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cron"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740"
dependencies = [
"chrono",
"once_cell",
"winnow",
]
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.31" version = "0.3.31"
@ -249,11 +260,12 @@ dependencies = [
[[package]] [[package]]
name = "nodrift" name = "nodrift"
version = "0.3.1" version = "0.3.4"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"chrono", "chrono",
"cron",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-util", "tokio-util",
@ -502,9 +514,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.44.1" version = "1.44.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48"
dependencies = [ dependencies = [
"backtrace", "backtrace",
"bytes", "bytes",
@ -810,3 +822,12 @@ name = "windows_x86_64_msvc"
version = "0.52.6" version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "winnow"
version = "0.6.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28"
dependencies = [
"memchr",
]

View File

@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2" resolver = "2"
[workspace.package] [workspace.package]
version = "0.3.2" version = "0.3.4"
[workspace.dependencies] [workspace.dependencies]
drift = { path = "crates/drift" } drift = { path = "crates/drift" }

View File

@ -9,6 +9,7 @@ edition = "2021"
anyhow.workspace = true anyhow.workspace = true
async-trait = "0.1.81" async-trait = "0.1.81"
chrono = "0.4.40" chrono = "0.4.40"
cron = "0.15.0"
thiserror = "2.0.0" thiserror = "2.0.0"
tokio.workspace = true tokio.workspace = true
tokio-util = "0.7.11" tokio-util = "0.7.11"

View File

@ -1,8 +1,7 @@
use std::{sync::Arc, time::Duration}; use std::{str::FromStr, sync::Arc, time::Duration};
use anyhow::Context;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Local, TimeDelta}; use chrono::{DateTime, Local, TimeDelta, Utc};
use std::future::Future; use std::future::Future;
use tokio::time; use tokio::time;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -23,6 +22,86 @@ where
schedule_drifter(interval, drifter) schedule_drifter(interval, drifter)
} }
pub fn schedule_cron<F, Fut>(cron: &str, func: F) -> anyhow::Result<CancellationToken>
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), DriftError>> + Send + 'static,
{
let drifter = FuncDrifter::new(func);
schedule_drifter_cron(cron, drifter)
}
pub fn schedule_drifter_cron<FDrifter>(
cron: &str,
drifter: FDrifter,
) -> anyhow::Result<CancellationToken>
where
FDrifter: Drifter + Send + 'static,
FDrifter: Clone,
{
let schedule = ::cron::Schedule::from_str(cron)?;
let cancellation_token = CancellationToken::new();
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let drifter = drifter.clone();
async move {
let upcoming = schedule.upcoming(Utc {});
let child_token = cancellation_token.child_token();
for datetime in upcoming {
let now = Utc::now();
let diff = datetime - now;
if diff <= TimeDelta::zero() {
tracing::info!(
"job schedule for {} was in the past: {}, skipping iteration",
datetime.to_string(),
now.to_string()
);
continue;
}
let diff = diff.to_std().expect("to be able to get diff time");
let sleep = time::sleep(diff);
tokio::pin!(sleep);
tracing::debug!(
"schedule job: {}, waiting: {}s for execution",
datetime.to_string(),
diff.as_secs()
);
tokio::select! {
_ = cancellation_token.cancelled() => {
tracing::trace!("stopping drift job");
break
}
_ = &mut sleep => {
let start = std::time::Instant::now();
tracing::debug!("running job");
if let Err(e) = drifter.execute(child_token.child_token()).await {
tracing::error!("drift job failed with error: {}", e);
continue
}
let elapsed = start.elapsed();
tracing::debug!("job took: {}ms ", elapsed.as_millis());
}
}
}
}
});
Ok(cancellation_token)
}
pub fn schedule_drifter<FDrifter>(interval: Duration, drifter: FDrifter) -> CancellationToken pub fn schedule_drifter<FDrifter>(interval: Duration, drifter: FDrifter) -> CancellationToken
where where
FDrifter: Drifter + Send + 'static, FDrifter: Drifter + Send + 'static,
@ -35,22 +114,7 @@ where
let drifter = drifter.clone(); let drifter = drifter.clone();
async move { async move {
let start = std::time::Instant::now(); let mut wait = Duration::default();
tracing::debug!("running job");
let child_token = cancellation_token.child_token();
if let Err(e) = drifter.execute(child_token).await {
tracing::error!("drift job failed with error: {}, stopping routine", e);
cancellation_token.cancel();
}
let elapsed = start.elapsed();
let mut wait = interval.saturating_sub(elapsed);
tracing::debug!(
"job took: {}ms, waiting: {}ms for next run",
elapsed.as_millis(),
wait.as_millis()
);
loop { loop {
let child_token = cancellation_token.child_token(); let child_token = cancellation_token.child_token();
@ -68,8 +132,7 @@ where
tracing::debug!("running job"); tracing::debug!("running job");
if let Err(e) = drifter.execute(child_token).await { if let Err(e) = drifter.execute(child_token).await {
tracing::error!("drift job failed with error: {}, stopping routine", e); tracing::error!("drift job failed with error: {}", e);
cancellation_token.cancel();
continue continue
} }
@ -228,6 +291,73 @@ mod tests {
assert!(logs_contain("running job")); assert!(logs_contain("running job"));
assert!(logs_contain("job took:")); assert!(logs_contain("job took:"));
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_calls_trace_on_start_and_end_long() -> anyhow::Result<()> {
let token = schedule(Duration::from_millis(100), || async {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
Ok(())
});
tokio::time::sleep(Duration::from_millis(500)).await;
assert!(!token.is_cancelled());
assert!(logs_contain("running job"));
assert!(logs_contain("job took:"));
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_cron() -> anyhow::Result<()> {
let token = schedule_cron("* * * * * *", || async {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
Ok(())
})?;
tokio::time::sleep(Duration::from_secs(5)).await;
assert!(!token.is_cancelled());
assert!(logs_contain("running job"));
assert!(logs_contain("job took:"));
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_cron_no_wait() -> anyhow::Result<()> {
let token = schedule_cron("* * * * * *", || async { Ok(()) })?;
tokio::time::sleep(Duration::from_secs(5)).await;
assert!(!token.is_cancelled());
assert!(logs_contain("running job"));
assert!(logs_contain("job took:"));
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_cron_job_taking_longer_than_cycle() -> anyhow::Result<()> {
let token = schedule_cron("* * * * * *", || async {
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
Ok(())
})?;
tokio::time::sleep(Duration::from_secs(5)).await;
assert!(!token.is_cancelled());
Ok(()) Ok(())
} }
} }