Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
b78423377c | |||
1446f4c3cf | |||
8a80480d94 | |||
b7b2992730 | |||
61cbec0477 | |||
10e2739b6e | |||
ffa3efd99c | |||
7c08d7a5df |
16
CHANGELOG.md
16
CHANGELOG.md
@ -6,6 +6,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.4.0] - 2024-08-07
|
||||
|
||||
### Added
|
||||
- add correction
|
||||
- add small docs
|
||||
|
||||
## [0.3.0] - 2024-08-07
|
||||
|
||||
### Added
|
||||
- add add_fn to execute immediate lambdas
|
||||
|
||||
## [0.2.1] - 2024-08-07
|
||||
|
||||
### Docs
|
||||
- add a small readme
|
||||
|
||||
## [0.2.0] - 2024-08-07
|
||||
|
||||
### Added
|
||||
|
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -236,7 +236,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
||||
|
||||
[[package]]
|
||||
name = "mad"
|
||||
version = "0.1.0"
|
||||
version = "0.3.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@ -247,6 +247,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"tracing-test",
|
||||
]
|
||||
|
||||
|
@ -3,7 +3,7 @@ members = ["crates/*"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.2.0"
|
||||
version = "0.4.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
mad = { path = "crates/mad" }
|
||||
|
51
README.md
Normal file
51
README.md
Normal file
@ -0,0 +1,51 @@
|
||||
# MAD
|
||||
|
||||
Mad is a life-cycle manager for long running rust operations.
|
||||
|
||||
- Webservers
|
||||
- Queue bindings
|
||||
- gRPC servers etc
|
||||
- Cron runners
|
||||
|
||||
It is supposed to be the main thing the application runs, and everything from it is spawned and managed by it.
|
||||
|
||||
```rust
|
||||
struct WaitServer {}
|
||||
|
||||
#[async_trait]
|
||||
impl Component for WaitServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("NeverEndingRun".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(50..1000);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
Mad::builder()
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
## Examples
|
||||
|
||||
Can be found (here)[crates/mad/examples]
|
||||
|
||||
- basic
|
||||
- fn
|
||||
- signals
|
||||
- error_log
|
@ -15,4 +15,5 @@ tokio-util = "0.7.11"
|
||||
tracing.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tracing-subscriber = "0.3.18"
|
||||
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
|
||||
|
40
crates/mad/examples/basic/main.rs
Normal file
40
crates/mad/examples/basic/main.rs
Normal file
@ -0,0 +1,40 @@
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct WaitServer {}
|
||||
#[async_trait]
|
||||
impl mad::Component for WaitServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("WaitServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(Level::TRACE)
|
||||
.init();
|
||||
|
||||
mad::Mad::builder()
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
42
crates/mad/examples/error_log/main.rs
Normal file
42
crates/mad/examples/error_log/main.rs
Normal file
@ -0,0 +1,42 @@
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct ErrorServer {}
|
||||
#[async_trait]
|
||||
impl mad::Component for ErrorServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("ErrorServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Err(mad::MadError::Inner(anyhow::anyhow!("expected error")))
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(Level::TRACE)
|
||||
.init();
|
||||
|
||||
// Do note that only the first server which returns an error is guaranteed to be handled. This is because if servers don't respect cancellation, they will be dropped
|
||||
|
||||
mad::Mad::builder()
|
||||
.add(ErrorServer {})
|
||||
.add(ErrorServer {})
|
||||
.add(ErrorServer {})
|
||||
.add(ErrorServer {})
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
67
crates/mad/examples/fn/main.rs
Normal file
67
crates/mad/examples/fn/main.rs
Normal file
@ -0,0 +1,67 @@
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct WaitServer {}
|
||||
#[async_trait]
|
||||
impl mad::Component for WaitServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("WaitServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, _cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(Level::TRACE)
|
||||
.init();
|
||||
|
||||
let item = "some item".to_string();
|
||||
|
||||
mad::Mad::builder()
|
||||
.add(WaitServer {})
|
||||
.add_fn(|_cancel| async move {
|
||||
let millis_wait = 50;
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.add_fn(move |_cancel| {
|
||||
// I am an actual closure
|
||||
|
||||
let item = item.clone();
|
||||
|
||||
async move {
|
||||
let _item = item;
|
||||
|
||||
let millis_wait = 50;
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
69
crates/mad/examples/signals/main.rs
Normal file
69
crates/mad/examples/signals/main.rs
Normal file
@ -0,0 +1,69 @@
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct WaitServer {}
|
||||
#[async_trait]
|
||||
impl mad::Component for WaitServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("WaitServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct RespectCancel {}
|
||||
#[async_trait]
|
||||
impl mad::Component for RespectCancel {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("RespectCancel".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
cancellation.cancelled().await;
|
||||
tracing::debug!("stopping because job is cancelled");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct NeverStopServer {}
|
||||
#[async_trait]
|
||||
impl mad::Component for NeverStopServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("NeverStopServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(999999999)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(Level::TRACE)
|
||||
.init();
|
||||
|
||||
mad::Mad::builder()
|
||||
.add(WaitServer {})
|
||||
.add(NeverStopServer {})
|
||||
.add(RespectCancel {})
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -49,6 +49,11 @@ pub struct Mad {
|
||||
should_cancel: Option<std::time::Duration>,
|
||||
}
|
||||
|
||||
struct CompletionResult {
|
||||
res: Result<(), MadError>,
|
||||
name: Option<String>,
|
||||
}
|
||||
|
||||
impl Mad {
|
||||
pub fn builder() -> Self {
|
||||
Self {
|
||||
@ -64,6 +69,16 @@ impl Mad {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn add_fn<F, Fut>(&mut self, f: F) -> &mut Self
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
||||
{
|
||||
let comp = ClosureComponent { inner: Box::new(f) };
|
||||
|
||||
self.add(comp)
|
||||
}
|
||||
|
||||
pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self {
|
||||
self.should_cancel = should_cancel;
|
||||
|
||||
@ -79,6 +94,7 @@ impl Mad {
|
||||
|
||||
let close_result = self.close_components().await;
|
||||
|
||||
tracing::info!("mad is closed down");
|
||||
match (run_result, close_result) {
|
||||
(Err(run), Err(close)) => {
|
||||
return Err(MadError::AggregateError(AggregateError {
|
||||
@ -122,22 +138,23 @@ impl Mad {
|
||||
let cancellation_token = cancellation_token.child_token();
|
||||
let job_cancellation = job_cancellation.child_token();
|
||||
|
||||
let (error_tx, error_rx) = tokio::sync::mpsc::channel::<Result<(), MadError>>(1);
|
||||
let (error_tx, error_rx) = tokio::sync::mpsc::channel::<CompletionResult>(1);
|
||||
channels.push(error_rx);
|
||||
|
||||
tokio::spawn(async move {
|
||||
tracing::debug!(component = &comp.name(), "mad running");
|
||||
let name = comp.name().clone();
|
||||
|
||||
tracing::debug!(component = name, "mad running");
|
||||
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled() => {
|
||||
error_tx.send(Ok(())).await
|
||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
||||
}
|
||||
res = comp.run(job_cancellation) => {
|
||||
error_tx.send(res).await
|
||||
|
||||
error_tx.send(CompletionResult { res , name }).await
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
error_tx.send(Ok(())).await
|
||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -149,17 +166,24 @@ impl Mad {
|
||||
}
|
||||
|
||||
while let Some(Some(msg)) = futures.next().await {
|
||||
tracing::trace!("received end signal from a component");
|
||||
|
||||
if let Err(e) = msg {
|
||||
tracing::debug!(error = e.to_string(), "stopping running components");
|
||||
job_cancellation.cancel();
|
||||
|
||||
if let Some(cancel_wait) = self.should_cancel {
|
||||
tokio::time::sleep(cancel_wait).await;
|
||||
|
||||
cancellation_token.cancel();
|
||||
match msg.res {
|
||||
Err(e) => {
|
||||
tracing::debug!(
|
||||
error = e.to_string(),
|
||||
component = msg.name,
|
||||
"component ran to completion with error"
|
||||
);
|
||||
}
|
||||
Ok(_) => {
|
||||
tracing::debug!(component = msg.name, "component ran to completion");
|
||||
}
|
||||
}
|
||||
|
||||
job_cancellation.cancel();
|
||||
if let Some(cancel_wait) = self.should_cancel {
|
||||
tokio::time::sleep(cancel_wait).await;
|
||||
|
||||
cancellation_token.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
@ -211,3 +235,34 @@ impl<T: Component + Send + Sync + 'static> IntoComponent for T {
|
||||
Arc::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
struct ClosureComponent<F, Fut>
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
||||
{
|
||||
inner: Box<F>,
|
||||
}
|
||||
|
||||
impl<F, Fut> ClosureComponent<F, Fut>
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
||||
{
|
||||
pub async fn execute(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
|
||||
(*self.inner)(cancellation_token).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<F, Fut> Component for ClosureComponent<F, Fut>
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
||||
{
|
||||
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
|
||||
self.execute(cancellation_token).await
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user