feat: make sure to close down properly
All checks were successful
continuous-integration/drone/push Build is passing

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-11-24 00:08:03 +01:00
parent 0eb24aa937
commit 1fec4e3708
Signed by: kjuulh
GPG Key ID: D85D7535F18F35FA
2 changed files with 29 additions and 14 deletions

2
Cargo.lock generated
View File

@ -266,7 +266,7 @@ dependencies = [
[[package]]
name = "notmad"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"anyhow",
"async-trait",

View File

@ -154,16 +154,38 @@ impl Mad {
res = comp.run(job_cancellation) => {
error_tx.send(CompletionResult { res , name }).await
}
_ = tokio::signal::ctrl_c() => {
error_tx.send(CompletionResult { res: Ok(()) , name }).await
}
_ = signal_unix_terminate() => {
error_tx.send(CompletionResult { res: Ok(()) , name }).await
}
}
});
}
tokio::spawn({
let cancellation_token = cancellation_token.child_token();
let wait_cancel = self.should_cancel;
async move {
let should_cancel =
|cancel: CancellationToken, wait: Option<std::time::Duration>| async move {
if let Some(cancel_wait) = wait {
tokio::time::sleep(cancel_wait).await;
cancel.cancel();
}
};
tokio::select! {
_ = cancellation_token.cancelled() => {
job_cancellation.cancel();
}
_ = tokio::signal::ctrl_c() => {
should_cancel(job_cancellation, wait_cancel).await;
}
_ = signal_unix_terminate() => {
should_cancel(job_cancellation, wait_cancel).await;
}
}
}
});
let mut futures = FuturesUnordered::new();
for channel in channels.iter_mut() {
futures.push(channel.recv());
@ -182,13 +204,6 @@ impl Mad {
tracing::debug!(component = msg.name, "component ran to completion");
}
}
job_cancellation.cancel();
if let Some(cancel_wait) = self.should_cancel {
tokio::time::sleep(cancel_wait).await;
cancellation_token.cancel();
}
}
tracing::debug!("ran components");