From 7c1b317d0884b0e4098cbcbbd90c81a95e26d83f Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sun, 24 Nov 2024 00:25:02 +0100 Subject: [PATCH] feat: adding test to make sure we can gracefully shutdown Signed-off-by: kjuulh --- crates/mad/src/lib.rs | 8 +++++++ crates/mad/tests/mod.rs | 51 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/crates/mad/src/lib.rs b/crates/mad/src/lib.rs index b73c7a4..01f44ab 100644 --- a/crates/mad/src/lib.rs +++ b/crates/mad/src/lib.rs @@ -133,6 +133,7 @@ impl Mad { let mut channels = Vec::new(); let cancellation_token = CancellationToken::new(); let job_cancellation = CancellationToken::new(); + let job_done = CancellationToken::new(); for comp in &self.components { let comp = comp.clone(); @@ -160,6 +161,8 @@ impl Mad { tokio::spawn({ let cancellation_token = cancellation_token.child_token(); + let job_done = job_done.child_token(); + let wait_cancel = self.should_cancel; async move { @@ -176,6 +179,9 @@ impl Mad { _ = cancellation_token.cancelled() => { job_cancellation.cancel(); } + _ = job_done.cancelled() => { + should_cancel(job_cancellation, wait_cancel).await; + } _ = tokio::signal::ctrl_c() => { should_cancel(job_cancellation, wait_cancel).await; } @@ -204,6 +210,8 @@ impl Mad { tracing::debug!(component = msg.name, "component ran to completion"); } } + + job_done.cancel(); } tracing::debug!("ran components"); diff --git a/crates/mad/tests/mod.rs b/crates/mad/tests/mod.rs index aa54fbf..81bef33 100644 --- a/crates/mad/tests/mod.rs +++ b/crates/mad/tests/mod.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; + use async_trait::async_trait; use notmad::{Component, Mad}; use rand::Rng; +use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use tracing_test::traced_test; @@ -86,3 +89,51 @@ async fn test_can_run_components() -> anyhow::Result<()> { Ok(()) } + +#[tokio::test] +#[traced_test] +async fn test_can_shutdown_gracefully() -> anyhow::Result<()> { + let check = Arc::new(Mutex::new(None)); + + Mad::builder() + .add_fn({ + let check = check.clone(); + + move |cancel| { + let check = check.clone(); + + async move { + let start = std::time::SystemTime::now(); + tracing::info!("waiting for cancel"); + cancel.cancelled().await; + tracing::info!("submitting check"); + let mut check = check.lock().await; + let elapsed = start.elapsed().expect("to be able to get elapsed"); + *check = Some(elapsed); + tracing::info!("check submitted"); + + Ok(()) + } + } + }) + .add_fn(|_| async move { + tracing::info!("starting sleep"); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tracing::info!("sleep ended"); + + Ok(()) + }) + .run() + .await?; + + let check = check + .lock() + .await + .expect("to be able to get a duration from cancel"); + + // We default wait 100 ms for graceful shutdown, and we explicitly wait 100ms in the sleep routine + tracing::info!("check millis: {}", check.as_millis()); + assert!(check.as_millis() < 250); + + Ok(()) +}