Compare commits
No commits in common. "main" and "v0.1.0" have entirely different histories.
21
CHANGELOG.md
21
CHANGELOG.md
@ -6,27 +6,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.4.0] - 2024-08-07
|
||||
|
||||
### Added
|
||||
- add correction
|
||||
- add small docs
|
||||
|
||||
## [0.3.0] - 2024-08-07
|
||||
|
||||
### Added
|
||||
- add add_fn to execute immediate lambdas
|
||||
|
||||
## [0.2.1] - 2024-08-07
|
||||
|
||||
### Docs
|
||||
- add a small readme
|
||||
|
||||
## [0.2.0] - 2024-08-07
|
||||
|
||||
### Added
|
||||
- with ctrl-c signal
|
||||
|
||||
## [0.1.0] - 2024-08-07
|
||||
|
||||
### Added
|
||||
|
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -236,7 +236,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
||||
|
||||
[[package]]
|
||||
name = "mad"
|
||||
version = "0.3.0"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@ -247,7 +247,6 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"tracing-test",
|
||||
]
|
||||
|
||||
|
@ -3,7 +3,7 @@ members = ["crates/*"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.4.0"
|
||||
version = "0.1.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
mad = { path = "crates/mad" }
|
||||
|
51
README.md
51
README.md
@ -1,51 +0,0 @@
|
||||
# MAD
|
||||
|
||||
Mad is a life-cycle manager for long running rust operations.
|
||||
|
||||
- Webservers
|
||||
- Queue bindings
|
||||
- gRPC servers etc
|
||||
- Cron runners
|
||||
|
||||
It is supposed to be the main thing the application runs, and everything from it is spawned and managed by it.
|
||||
|
||||
```rust
|
||||
struct WaitServer {}
|
||||
|
||||
#[async_trait]
|
||||
impl Component for WaitServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("NeverEndingRun".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(50..1000);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
Mad::builder()
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
## Examples
|
||||
|
||||
Can be found (here)[crates/mad/examples]
|
||||
|
||||
- basic
|
||||
- fn
|
||||
- signals
|
||||
- error_log
|
@ -15,5 +15,4 @@ tokio-util = "0.7.11"
|
||||
tracing.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tracing-subscriber = "0.3.18"
|
||||
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
|
||||
|
@ -1,40 +0,0 @@
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct WaitServer {}
|
||||
#[async_trait]
|
||||
impl mad::Component for WaitServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("WaitServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(Level::TRACE)
|
||||
.init();
|
||||
|
||||
mad::Mad::builder()
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.add(WaitServer {})
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -1,42 +0,0 @@
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct ErrorServer {}
|
||||
#[async_trait]
|
||||
impl mad::Component for ErrorServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("ErrorServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Err(mad::MadError::Inner(anyhow::anyhow!("expected error")))
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(Level::TRACE)
|
||||
.init();
|
||||
|
||||
// Do note that only the first server which returns an error is guaranteed to be handled. This is because if servers don't respect cancellation, they will be dropped
|
||||
|
||||
mad::Mad::builder()
|
||||
.add(ErrorServer {})
|
||||
.add(ErrorServer {})
|
||||
.add(ErrorServer {})
|
||||
.add(ErrorServer {})
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -1,67 +0,0 @@
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct WaitServer {}
|
||||
#[async_trait]
|
||||
impl mad::Component for WaitServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("WaitServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, _cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(Level::TRACE)
|
||||
.init();
|
||||
|
||||
let item = "some item".to_string();
|
||||
|
||||
mad::Mad::builder()
|
||||
.add(WaitServer {})
|
||||
.add_fn(|_cancel| async move {
|
||||
let millis_wait = 50;
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.add_fn(move |_cancel| {
|
||||
// I am an actual closure
|
||||
|
||||
let item = item.clone();
|
||||
|
||||
async move {
|
||||
let _item = item;
|
||||
|
||||
let millis_wait = 50;
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -1,69 +0,0 @@
|
||||
use async_trait::async_trait;
|
||||
use rand::Rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Level;
|
||||
|
||||
struct WaitServer {}
|
||||
#[async_trait]
|
||||
impl mad::Component for WaitServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("WaitServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
||||
|
||||
tracing::debug!("waiting: {}ms", millis_wait);
|
||||
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct RespectCancel {}
|
||||
#[async_trait]
|
||||
impl mad::Component for RespectCancel {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("RespectCancel".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
cancellation.cancelled().await;
|
||||
tracing::debug!("stopping because job is cancelled");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct NeverStopServer {}
|
||||
#[async_trait]
|
||||
impl mad::Component for NeverStopServer {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("NeverStopServer".into())
|
||||
}
|
||||
|
||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
||||
tokio::time::sleep(std::time::Duration::from_millis(999999999)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(Level::TRACE)
|
||||
.init();
|
||||
|
||||
mad::Mad::builder()
|
||||
.add(WaitServer {})
|
||||
.add(NeverStopServer {})
|
||||
.add(RespectCancel {})
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -49,11 +49,6 @@ pub struct Mad {
|
||||
should_cancel: Option<std::time::Duration>,
|
||||
}
|
||||
|
||||
struct CompletionResult {
|
||||
res: Result<(), MadError>,
|
||||
name: Option<String>,
|
||||
}
|
||||
|
||||
impl Mad {
|
||||
pub fn builder() -> Self {
|
||||
Self {
|
||||
@ -69,16 +64,6 @@ impl Mad {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn add_fn<F, Fut>(&mut self, f: F) -> &mut Self
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
||||
{
|
||||
let comp = ClosureComponent { inner: Box::new(f) };
|
||||
|
||||
self.add(comp)
|
||||
}
|
||||
|
||||
pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self {
|
||||
self.should_cancel = should_cancel;
|
||||
|
||||
@ -94,7 +79,6 @@ impl Mad {
|
||||
|
||||
let close_result = self.close_components().await;
|
||||
|
||||
tracing::info!("mad is closed down");
|
||||
match (run_result, close_result) {
|
||||
(Err(run), Err(close)) => {
|
||||
return Err(MadError::AggregateError(AggregateError {
|
||||
@ -138,23 +122,18 @@ impl Mad {
|
||||
let cancellation_token = cancellation_token.child_token();
|
||||
let job_cancellation = job_cancellation.child_token();
|
||||
|
||||
let (error_tx, error_rx) = tokio::sync::mpsc::channel::<CompletionResult>(1);
|
||||
let (error_tx, error_rx) = tokio::sync::mpsc::channel::<Result<(), MadError>>(1);
|
||||
channels.push(error_rx);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let name = comp.name().clone();
|
||||
|
||||
tracing::debug!(component = name, "mad running");
|
||||
tracing::debug!(component = &comp.name(), "mad running");
|
||||
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled() => {
|
||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
||||
error_tx.send(Ok(())).await
|
||||
}
|
||||
res = comp.run(job_cancellation) => {
|
||||
error_tx.send(CompletionResult { res , name }).await
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
||||
error_tx.send(res).await
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -166,24 +145,17 @@ impl Mad {
|
||||
}
|
||||
|
||||
while let Some(Some(msg)) = futures.next().await {
|
||||
match msg.res {
|
||||
Err(e) => {
|
||||
tracing::debug!(
|
||||
error = e.to_string(),
|
||||
component = msg.name,
|
||||
"component ran to completion with error"
|
||||
);
|
||||
}
|
||||
Ok(_) => {
|
||||
tracing::debug!(component = msg.name, "component ran to completion");
|
||||
}
|
||||
}
|
||||
tracing::trace!("received end signal from a component");
|
||||
|
||||
job_cancellation.cancel();
|
||||
if let Some(cancel_wait) = self.should_cancel {
|
||||
tokio::time::sleep(cancel_wait).await;
|
||||
if let Err(e) = msg {
|
||||
tracing::debug!(error = e.to_string(), "stopping running components");
|
||||
job_cancellation.cancel();
|
||||
|
||||
cancellation_token.cancel();
|
||||
if let Some(cancel_wait) = self.should_cancel {
|
||||
tokio::time::sleep(cancel_wait).await;
|
||||
|
||||
cancellation_token.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -235,34 +207,3 @@ impl<T: Component + Send + Sync + 'static> IntoComponent for T {
|
||||
Arc::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
struct ClosureComponent<F, Fut>
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
||||
{
|
||||
inner: Box<F>,
|
||||
}
|
||||
|
||||
impl<F, Fut> ClosureComponent<F, Fut>
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
||||
{
|
||||
pub async fn execute(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
|
||||
(*self.inner)(cancellation_token).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<F, Fut> Component for ClosureComponent<F, Fut>
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
||||
{
|
||||
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
|
||||
self.execute(cancellation_token).await
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user