From af10f48d8d48348c5db7e077826439ac181f53d0 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Thu, 27 Mar 2025 14:33:56 +0100 Subject: [PATCH] feat: add cron --- Cargo.lock | 23 ++++++- crates/drift/Cargo.toml | 1 + crates/drift/src/lib.rs | 135 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 155 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b314221..3ed8cfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -132,6 +132,17 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cron" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740" +dependencies = [ + "chrono", + "once_cell", + "winnow", +] + [[package]] name = "futures-core" version = "0.3.31" @@ -249,11 +260,12 @@ dependencies = [ [[package]] name = "nodrift" -version = "0.3.2" +version = "0.3.3" dependencies = [ "anyhow", "async-trait", "chrono", + "cron", "thiserror", "tokio", "tokio-util", @@ -810,3 +822,12 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "winnow" +version = "0.6.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28" +dependencies = [ + "memchr", +] diff --git a/crates/drift/Cargo.toml b/crates/drift/Cargo.toml index a8e40a3..7bc9f4d 100644 --- a/crates/drift/Cargo.toml +++ b/crates/drift/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" anyhow.workspace = true async-trait = "0.1.81" chrono = "0.4.40" +cron = "0.15.0" thiserror = "2.0.0" tokio.workspace = true tokio-util = "0.7.11" diff --git a/crates/drift/src/lib.rs b/crates/drift/src/lib.rs index 7b12e9d..6917dfc 100644 --- a/crates/drift/src/lib.rs +++ b/crates/drift/src/lib.rs @@ -1,8 +1,7 @@ -use std::{sync::Arc, time::Duration}; +use std::{str::FromStr, sync::Arc, time::Duration}; -use anyhow::Context; use async_trait::async_trait; -use chrono::{DateTime, Local, TimeDelta}; +use chrono::{DateTime, Local, TimeDelta, Utc}; use std::future::Future; use tokio::time; use tokio_util::sync::CancellationToken; @@ -23,6 +22,86 @@ where schedule_drifter(interval, drifter) } +pub fn schedule_cron(cron: &str, func: F) -> anyhow::Result +where + F: Fn() -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, +{ + let drifter = FuncDrifter::new(func); + + schedule_drifter_cron(cron, drifter) +} + +pub fn schedule_drifter_cron( + cron: &str, + drifter: FDrifter, +) -> anyhow::Result +where + FDrifter: Drifter + Send + 'static, + FDrifter: Clone, +{ + let schedule = ::cron::Schedule::from_str(cron)?; + + let cancellation_token = CancellationToken::new(); + + tokio::spawn({ + let cancellation_token = cancellation_token.clone(); + let drifter = drifter.clone(); + + async move { + let upcoming = schedule.upcoming(Utc {}); + + let child_token = cancellation_token.child_token(); + for datetime in upcoming { + let now = Utc::now(); + + let diff = datetime - now; + if diff <= TimeDelta::zero() { + tracing::info!( + "job schedule for {} was in the past: {}, skipping iteration", + datetime.to_string(), + now.to_string() + ); + continue; + } + + let diff = diff.to_std().expect("to be able to get diff time"); + let sleep = time::sleep(diff); + tokio::pin!(sleep); + + tracing::debug!( + "schedule job: {}, waiting: {}s for execution", + datetime.to_string(), + diff.as_secs() + ); + + tokio::select! { + _ = cancellation_token.cancelled() => { + tracing::trace!("stopping drift job"); + + break + } + _ = &mut sleep => { + let start = std::time::Instant::now(); + + tracing::debug!("running job"); + if let Err(e) = drifter.execute(child_token.child_token()).await { + tracing::error!("drift job failed with error: {}", e); + continue + } + + let elapsed = start.elapsed(); + + tracing::debug!("job took: {}ms ", elapsed.as_millis()); + } + + } + } + } + }); + + Ok(cancellation_token) +} pub fn schedule_drifter(interval: Duration, drifter: FDrifter) -> CancellationToken where FDrifter: Drifter + Send + 'static, @@ -231,4 +310,54 @@ mod tests { Ok(()) } + + #[tokio::test] + #[traced_test] + async fn test_cron() -> anyhow::Result<()> { + let token = schedule_cron("* * * * * *", || async { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + Ok(()) + })?; + + tokio::time::sleep(Duration::from_secs(5)).await; + + assert!(!token.is_cancelled()); + + assert!(logs_contain("running job")); + assert!(logs_contain("job took:")); + + Ok(()) + } + + #[tokio::test] + #[traced_test] + async fn test_cron_no_wait() -> anyhow::Result<()> { + let token = schedule_cron("* * * * * *", || async { Ok(()) })?; + + tokio::time::sleep(Duration::from_secs(5)).await; + + assert!(!token.is_cancelled()); + + assert!(logs_contain("running job")); + assert!(logs_contain("job took:")); + + Ok(()) + } + + #[tokio::test] + #[traced_test] + async fn test_cron_job_taking_longer_than_cycle() -> anyhow::Result<()> { + let token = schedule_cron("* * * * * *", || async { + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + + Ok(()) + })?; + + tokio::time::sleep(Duration::from_secs(5)).await; + + assert!(!token.is_cancelled()); + + Ok(()) + } }