Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
8342822e81 | |||
0608fbbfe6 | |||
a7b5394d08 | |||
af10f48d8d | |||
9c39c0daa2 | |||
ca233208e2 | |||
3c9d15779d |
11
CHANGELOG.md
11
CHANGELOG.md
@ -6,6 +6,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.3.4] - 2025-03-27
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- add cron
|
||||||
|
|
||||||
|
## [0.3.3] - 2025-03-27
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- don't stop on error
|
||||||
|
- redo main part
|
||||||
|
|
||||||
## [0.3.2] - 2025-03-27
|
## [0.3.2] - 2025-03-27
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
31
Cargo.lock
generated
31
Cargo.lock
generated
@ -43,9 +43,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "anyhow"
|
name = "anyhow"
|
||||||
version = "1.0.97"
|
version = "1.0.98"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
|
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-trait"
|
name = "async-trait"
|
||||||
@ -132,6 +132,17 @@ version = "0.8.7"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
|
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cron"
|
||||||
|
version = "0.15.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740"
|
||||||
|
dependencies = [
|
||||||
|
"chrono",
|
||||||
|
"once_cell",
|
||||||
|
"winnow",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-core"
|
name = "futures-core"
|
||||||
version = "0.3.31"
|
version = "0.3.31"
|
||||||
@ -249,11 +260,12 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nodrift"
|
name = "nodrift"
|
||||||
version = "0.3.1"
|
version = "0.3.4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"chrono",
|
"chrono",
|
||||||
|
"cron",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
@ -502,9 +514,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio"
|
name = "tokio"
|
||||||
version = "1.44.1"
|
version = "1.44.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a"
|
checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"backtrace",
|
"backtrace",
|
||||||
"bytes",
|
"bytes",
|
||||||
@ -810,3 +822,12 @@ name = "windows_x86_64_msvc"
|
|||||||
version = "0.52.6"
|
version = "0.52.6"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
|
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "winnow"
|
||||||
|
version = "0.6.26"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28"
|
||||||
|
dependencies = [
|
||||||
|
"memchr",
|
||||||
|
]
|
||||||
|
@ -3,7 +3,7 @@ members = ["crates/*"]
|
|||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "0.3.2"
|
version = "0.3.4"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
drift = { path = "crates/drift" }
|
drift = { path = "crates/drift" }
|
||||||
|
@ -9,6 +9,7 @@ edition = "2021"
|
|||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
async-trait = "0.1.81"
|
async-trait = "0.1.81"
|
||||||
chrono = "0.4.40"
|
chrono = "0.4.40"
|
||||||
|
cron = "0.15.0"
|
||||||
thiserror = "2.0.0"
|
thiserror = "2.0.0"
|
||||||
tokio.workspace = true
|
tokio.workspace = true
|
||||||
tokio-util = "0.7.11"
|
tokio-util = "0.7.11"
|
||||||
|
@ -1,8 +1,7 @@
|
|||||||
use std::{sync::Arc, time::Duration};
|
use std::{str::FromStr, sync::Arc, time::Duration};
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::{DateTime, Local, TimeDelta};
|
use chrono::{DateTime, Local, TimeDelta, Utc};
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
@ -23,6 +22,86 @@ where
|
|||||||
schedule_drifter(interval, drifter)
|
schedule_drifter(interval, drifter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn schedule_cron<F, Fut>(cron: &str, func: F) -> anyhow::Result<CancellationToken>
|
||||||
|
where
|
||||||
|
F: Fn() -> Fut + Send + Sync + 'static,
|
||||||
|
Fut: Future<Output = Result<(), DriftError>> + Send + 'static,
|
||||||
|
{
|
||||||
|
let drifter = FuncDrifter::new(func);
|
||||||
|
|
||||||
|
schedule_drifter_cron(cron, drifter)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn schedule_drifter_cron<FDrifter>(
|
||||||
|
cron: &str,
|
||||||
|
drifter: FDrifter,
|
||||||
|
) -> anyhow::Result<CancellationToken>
|
||||||
|
where
|
||||||
|
FDrifter: Drifter + Send + 'static,
|
||||||
|
FDrifter: Clone,
|
||||||
|
{
|
||||||
|
let schedule = ::cron::Schedule::from_str(cron)?;
|
||||||
|
|
||||||
|
let cancellation_token = CancellationToken::new();
|
||||||
|
|
||||||
|
tokio::spawn({
|
||||||
|
let cancellation_token = cancellation_token.clone();
|
||||||
|
let drifter = drifter.clone();
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let upcoming = schedule.upcoming(Utc {});
|
||||||
|
|
||||||
|
let child_token = cancellation_token.child_token();
|
||||||
|
for datetime in upcoming {
|
||||||
|
let now = Utc::now();
|
||||||
|
|
||||||
|
let diff = datetime - now;
|
||||||
|
if diff <= TimeDelta::zero() {
|
||||||
|
tracing::info!(
|
||||||
|
"job schedule for {} was in the past: {}, skipping iteration",
|
||||||
|
datetime.to_string(),
|
||||||
|
now.to_string()
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let diff = diff.to_std().expect("to be able to get diff time");
|
||||||
|
let sleep = time::sleep(diff);
|
||||||
|
tokio::pin!(sleep);
|
||||||
|
|
||||||
|
tracing::debug!(
|
||||||
|
"schedule job: {}, waiting: {}s for execution",
|
||||||
|
datetime.to_string(),
|
||||||
|
diff.as_secs()
|
||||||
|
);
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation_token.cancelled() => {
|
||||||
|
tracing::trace!("stopping drift job");
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
_ = &mut sleep => {
|
||||||
|
let start = std::time::Instant::now();
|
||||||
|
|
||||||
|
tracing::debug!("running job");
|
||||||
|
if let Err(e) = drifter.execute(child_token.child_token()).await {
|
||||||
|
tracing::error!("drift job failed with error: {}", e);
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
let elapsed = start.elapsed();
|
||||||
|
|
||||||
|
tracing::debug!("job took: {}ms ", elapsed.as_millis());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(cancellation_token)
|
||||||
|
}
|
||||||
pub fn schedule_drifter<FDrifter>(interval: Duration, drifter: FDrifter) -> CancellationToken
|
pub fn schedule_drifter<FDrifter>(interval: Duration, drifter: FDrifter) -> CancellationToken
|
||||||
where
|
where
|
||||||
FDrifter: Drifter + Send + 'static,
|
FDrifter: Drifter + Send + 'static,
|
||||||
@ -35,22 +114,7 @@ where
|
|||||||
let drifter = drifter.clone();
|
let drifter = drifter.clone();
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
let start = std::time::Instant::now();
|
let mut wait = Duration::default();
|
||||||
|
|
||||||
tracing::debug!("running job");
|
|
||||||
let child_token = cancellation_token.child_token();
|
|
||||||
if let Err(e) = drifter.execute(child_token).await {
|
|
||||||
tracing::error!("drift job failed with error: {}, stopping routine", e);
|
|
||||||
cancellation_token.cancel();
|
|
||||||
}
|
|
||||||
|
|
||||||
let elapsed = start.elapsed();
|
|
||||||
let mut wait = interval.saturating_sub(elapsed);
|
|
||||||
tracing::debug!(
|
|
||||||
"job took: {}ms, waiting: {}ms for next run",
|
|
||||||
elapsed.as_millis(),
|
|
||||||
wait.as_millis()
|
|
||||||
);
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let child_token = cancellation_token.child_token();
|
let child_token = cancellation_token.child_token();
|
||||||
@ -68,8 +132,7 @@ where
|
|||||||
|
|
||||||
tracing::debug!("running job");
|
tracing::debug!("running job");
|
||||||
if let Err(e) = drifter.execute(child_token).await {
|
if let Err(e) = drifter.execute(child_token).await {
|
||||||
tracing::error!("drift job failed with error: {}, stopping routine", e);
|
tracing::error!("drift job failed with error: {}", e);
|
||||||
cancellation_token.cancel();
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -228,6 +291,73 @@ mod tests {
|
|||||||
assert!(logs_contain("running job"));
|
assert!(logs_contain("running job"));
|
||||||
assert!(logs_contain("job took:"));
|
assert!(logs_contain("job took:"));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
#[tokio::test]
|
||||||
|
#[traced_test]
|
||||||
|
async fn test_calls_trace_on_start_and_end_long() -> anyhow::Result<()> {
|
||||||
|
let token = schedule(Duration::from_millis(100), || async {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
|
|
||||||
|
assert!(!token.is_cancelled());
|
||||||
|
|
||||||
|
assert!(logs_contain("running job"));
|
||||||
|
assert!(logs_contain("job took:"));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[traced_test]
|
||||||
|
async fn test_cron() -> anyhow::Result<()> {
|
||||||
|
let token = schedule_cron("* * * * * *", || async {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
|
||||||
|
assert!(!token.is_cancelled());
|
||||||
|
|
||||||
|
assert!(logs_contain("running job"));
|
||||||
|
assert!(logs_contain("job took:"));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[traced_test]
|
||||||
|
async fn test_cron_no_wait() -> anyhow::Result<()> {
|
||||||
|
let token = schedule_cron("* * * * * *", || async { Ok(()) })?;
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
|
||||||
|
assert!(!token.is_cancelled());
|
||||||
|
|
||||||
|
assert!(logs_contain("running job"));
|
||||||
|
assert!(logs_contain("job took:"));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[traced_test]
|
||||||
|
async fn test_cron_job_taking_longer_than_cycle() -> anyhow::Result<()> {
|
||||||
|
let token = schedule_cron("* * * * * *", || async {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
|
||||||
|
assert!(!token.is_cancelled());
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user