Compare commits
No commits in common. "main" and "v0.1.0" have entirely different histories.
21
CHANGELOG.md
21
CHANGELOG.md
@ -6,27 +6,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
## [0.4.0] - 2024-08-07
|
|
||||||
|
|
||||||
### Added
|
|
||||||
- add correction
|
|
||||||
- add small docs
|
|
||||||
|
|
||||||
## [0.3.0] - 2024-08-07
|
|
||||||
|
|
||||||
### Added
|
|
||||||
- add add_fn to execute immediate lambdas
|
|
||||||
|
|
||||||
## [0.2.1] - 2024-08-07
|
|
||||||
|
|
||||||
### Docs
|
|
||||||
- add a small readme
|
|
||||||
|
|
||||||
## [0.2.0] - 2024-08-07
|
|
||||||
|
|
||||||
### Added
|
|
||||||
- with ctrl-c signal
|
|
||||||
|
|
||||||
## [0.1.0] - 2024-08-07
|
## [0.1.0] - 2024-08-07
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -236,7 +236,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mad"
|
name = "mad"
|
||||||
version = "0.3.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@ -247,7 +247,6 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
|
||||||
"tracing-test",
|
"tracing-test",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -3,7 +3,7 @@ members = ["crates/*"]
|
|||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "0.4.0"
|
version = "0.1.0"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
mad = { path = "crates/mad" }
|
mad = { path = "crates/mad" }
|
||||||
|
51
README.md
51
README.md
@ -1,51 +0,0 @@
|
|||||||
# MAD
|
|
||||||
|
|
||||||
Mad is a life-cycle manager for long running rust operations.
|
|
||||||
|
|
||||||
- Webservers
|
|
||||||
- Queue bindings
|
|
||||||
- gRPC servers etc
|
|
||||||
- Cron runners
|
|
||||||
|
|
||||||
It is supposed to be the main thing the application runs, and everything from it is spawned and managed by it.
|
|
||||||
|
|
||||||
```rust
|
|
||||||
struct WaitServer {}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Component for WaitServer {
|
|
||||||
fn name(&self) -> Option<String> {
|
|
||||||
Some("NeverEndingRun".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
|
||||||
let millis_wait = rand::thread_rng().gen_range(50..1000);
|
|
||||||
|
|
||||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> anyhow::Result<()> {
|
|
||||||
Mad::builder()
|
|
||||||
.add(WaitServer {})
|
|
||||||
.add(WaitServer {})
|
|
||||||
.add(WaitServer {})
|
|
||||||
.run()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## Examples
|
|
||||||
|
|
||||||
Can be found (here)[crates/mad/examples]
|
|
||||||
|
|
||||||
- basic
|
|
||||||
- fn
|
|
||||||
- signals
|
|
||||||
- error_log
|
|
@ -15,5 +15,4 @@ tokio-util = "0.7.11"
|
|||||||
tracing.workspace = true
|
tracing.workspace = true
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tracing-subscriber = "0.3.18"
|
|
||||||
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
|
tracing-test = { version = "0.2.5", features = ["no-env-filter"] }
|
||||||
|
@ -1,40 +0,0 @@
|
|||||||
use async_trait::async_trait;
|
|
||||||
use rand::Rng;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
use tracing::Level;
|
|
||||||
|
|
||||||
struct WaitServer {}
|
|
||||||
#[async_trait]
|
|
||||||
impl mad::Component for WaitServer {
|
|
||||||
fn name(&self) -> Option<String> {
|
|
||||||
Some("WaitServer".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
|
||||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
|
||||||
|
|
||||||
tracing::debug!("waiting: {}ms", millis_wait);
|
|
||||||
|
|
||||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> anyhow::Result<()> {
|
|
||||||
tracing_subscriber::fmt()
|
|
||||||
.with_max_level(Level::TRACE)
|
|
||||||
.init();
|
|
||||||
|
|
||||||
mad::Mad::builder()
|
|
||||||
.add(WaitServer {})
|
|
||||||
.add(WaitServer {})
|
|
||||||
.add(WaitServer {})
|
|
||||||
.add(WaitServer {})
|
|
||||||
.run()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
@ -1,42 +0,0 @@
|
|||||||
use async_trait::async_trait;
|
|
||||||
use rand::Rng;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
use tracing::Level;
|
|
||||||
|
|
||||||
struct ErrorServer {}
|
|
||||||
#[async_trait]
|
|
||||||
impl mad::Component for ErrorServer {
|
|
||||||
fn name(&self) -> Option<String> {
|
|
||||||
Some("ErrorServer".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
|
||||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
|
||||||
|
|
||||||
tracing::debug!("waiting: {}ms", millis_wait);
|
|
||||||
|
|
||||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
|
||||||
|
|
||||||
Err(mad::MadError::Inner(anyhow::anyhow!("expected error")))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> anyhow::Result<()> {
|
|
||||||
tracing_subscriber::fmt()
|
|
||||||
.with_max_level(Level::TRACE)
|
|
||||||
.init();
|
|
||||||
|
|
||||||
// Do note that only the first server which returns an error is guaranteed to be handled. This is because if servers don't respect cancellation, they will be dropped
|
|
||||||
|
|
||||||
mad::Mad::builder()
|
|
||||||
.add(ErrorServer {})
|
|
||||||
.add(ErrorServer {})
|
|
||||||
.add(ErrorServer {})
|
|
||||||
.add(ErrorServer {})
|
|
||||||
.run()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
@ -1,67 +0,0 @@
|
|||||||
use async_trait::async_trait;
|
|
||||||
use rand::Rng;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
use tracing::Level;
|
|
||||||
|
|
||||||
struct WaitServer {}
|
|
||||||
#[async_trait]
|
|
||||||
impl mad::Component for WaitServer {
|
|
||||||
fn name(&self) -> Option<String> {
|
|
||||||
Some("WaitServer".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(&self, _cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
|
||||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
|
||||||
|
|
||||||
tracing::debug!("waiting: {}ms", millis_wait);
|
|
||||||
|
|
||||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> anyhow::Result<()> {
|
|
||||||
tracing_subscriber::fmt()
|
|
||||||
.with_max_level(Level::TRACE)
|
|
||||||
.init();
|
|
||||||
|
|
||||||
let item = "some item".to_string();
|
|
||||||
|
|
||||||
mad::Mad::builder()
|
|
||||||
.add(WaitServer {})
|
|
||||||
.add_fn(|_cancel| async move {
|
|
||||||
let millis_wait = 50;
|
|
||||||
|
|
||||||
tracing::debug!("waiting: {}ms", millis_wait);
|
|
||||||
|
|
||||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.add_fn(move |_cancel| {
|
|
||||||
// I am an actual closure
|
|
||||||
|
|
||||||
let item = item.clone();
|
|
||||||
|
|
||||||
async move {
|
|
||||||
let _item = item;
|
|
||||||
|
|
||||||
let millis_wait = 50;
|
|
||||||
|
|
||||||
tracing::debug!("waiting: {}ms", millis_wait);
|
|
||||||
|
|
||||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.run()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
@ -1,69 +0,0 @@
|
|||||||
use async_trait::async_trait;
|
|
||||||
use rand::Rng;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
use tracing::Level;
|
|
||||||
|
|
||||||
struct WaitServer {}
|
|
||||||
#[async_trait]
|
|
||||||
impl mad::Component for WaitServer {
|
|
||||||
fn name(&self) -> Option<String> {
|
|
||||||
Some("WaitServer".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
|
||||||
let millis_wait = rand::thread_rng().gen_range(500..3000);
|
|
||||||
|
|
||||||
tracing::debug!("waiting: {}ms", millis_wait);
|
|
||||||
|
|
||||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct RespectCancel {}
|
|
||||||
#[async_trait]
|
|
||||||
impl mad::Component for RespectCancel {
|
|
||||||
fn name(&self) -> Option<String> {
|
|
||||||
Some("RespectCancel".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
|
||||||
cancellation.cancelled().await;
|
|
||||||
tracing::debug!("stopping because job is cancelled");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct NeverStopServer {}
|
|
||||||
#[async_trait]
|
|
||||||
impl mad::Component for NeverStopServer {
|
|
||||||
fn name(&self) -> Option<String> {
|
|
||||||
Some("NeverStopServer".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
|
|
||||||
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(999999999)).await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> anyhow::Result<()> {
|
|
||||||
tracing_subscriber::fmt()
|
|
||||||
.with_max_level(Level::TRACE)
|
|
||||||
.init();
|
|
||||||
|
|
||||||
mad::Mad::builder()
|
|
||||||
.add(WaitServer {})
|
|
||||||
.add(NeverStopServer {})
|
|
||||||
.add(RespectCancel {})
|
|
||||||
.run()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
@ -49,11 +49,6 @@ pub struct Mad {
|
|||||||
should_cancel: Option<std::time::Duration>,
|
should_cancel: Option<std::time::Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct CompletionResult {
|
|
||||||
res: Result<(), MadError>,
|
|
||||||
name: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Mad {
|
impl Mad {
|
||||||
pub fn builder() -> Self {
|
pub fn builder() -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -69,16 +64,6 @@ impl Mad {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_fn<F, Fut>(&mut self, f: F) -> &mut Self
|
|
||||||
where
|
|
||||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
|
||||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
|
||||||
{
|
|
||||||
let comp = ClosureComponent { inner: Box::new(f) };
|
|
||||||
|
|
||||||
self.add(comp)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self {
|
pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self {
|
||||||
self.should_cancel = should_cancel;
|
self.should_cancel = should_cancel;
|
||||||
|
|
||||||
@ -94,7 +79,6 @@ impl Mad {
|
|||||||
|
|
||||||
let close_result = self.close_components().await;
|
let close_result = self.close_components().await;
|
||||||
|
|
||||||
tracing::info!("mad is closed down");
|
|
||||||
match (run_result, close_result) {
|
match (run_result, close_result) {
|
||||||
(Err(run), Err(close)) => {
|
(Err(run), Err(close)) => {
|
||||||
return Err(MadError::AggregateError(AggregateError {
|
return Err(MadError::AggregateError(AggregateError {
|
||||||
@ -138,23 +122,18 @@ impl Mad {
|
|||||||
let cancellation_token = cancellation_token.child_token();
|
let cancellation_token = cancellation_token.child_token();
|
||||||
let job_cancellation = job_cancellation.child_token();
|
let job_cancellation = job_cancellation.child_token();
|
||||||
|
|
||||||
let (error_tx, error_rx) = tokio::sync::mpsc::channel::<CompletionResult>(1);
|
let (error_tx, error_rx) = tokio::sync::mpsc::channel::<Result<(), MadError>>(1);
|
||||||
channels.push(error_rx);
|
channels.push(error_rx);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let name = comp.name().clone();
|
tracing::debug!(component = &comp.name(), "mad running");
|
||||||
|
|
||||||
tracing::debug!(component = name, "mad running");
|
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = cancellation_token.cancelled() => {
|
_ = cancellation_token.cancelled() => {
|
||||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
error_tx.send(Ok(())).await
|
||||||
}
|
}
|
||||||
res = comp.run(job_cancellation) => {
|
res = comp.run(job_cancellation) => {
|
||||||
error_tx.send(CompletionResult { res , name }).await
|
error_tx.send(res).await
|
||||||
}
|
|
||||||
_ = tokio::signal::ctrl_c() => {
|
|
||||||
error_tx.send(CompletionResult { res: Ok(()) , name }).await
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -166,24 +145,17 @@ impl Mad {
|
|||||||
}
|
}
|
||||||
|
|
||||||
while let Some(Some(msg)) = futures.next().await {
|
while let Some(Some(msg)) = futures.next().await {
|
||||||
match msg.res {
|
tracing::trace!("received end signal from a component");
|
||||||
Err(e) => {
|
|
||||||
tracing::debug!(
|
|
||||||
error = e.to_string(),
|
|
||||||
component = msg.name,
|
|
||||||
"component ran to completion with error"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Ok(_) => {
|
|
||||||
tracing::debug!(component = msg.name, "component ran to completion");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
job_cancellation.cancel();
|
if let Err(e) = msg {
|
||||||
if let Some(cancel_wait) = self.should_cancel {
|
tracing::debug!(error = e.to_string(), "stopping running components");
|
||||||
tokio::time::sleep(cancel_wait).await;
|
job_cancellation.cancel();
|
||||||
|
|
||||||
cancellation_token.cancel();
|
if let Some(cancel_wait) = self.should_cancel {
|
||||||
|
tokio::time::sleep(cancel_wait).await;
|
||||||
|
|
||||||
|
cancellation_token.cancel();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -235,34 +207,3 @@ impl<T: Component + Send + Sync + 'static> IntoComponent for T {
|
|||||||
Arc::new(self)
|
Arc::new(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ClosureComponent<F, Fut>
|
|
||||||
where
|
|
||||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
|
||||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
|
||||||
{
|
|
||||||
inner: Box<F>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F, Fut> ClosureComponent<F, Fut>
|
|
||||||
where
|
|
||||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
|
||||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
|
||||||
{
|
|
||||||
pub async fn execute(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
|
|
||||||
(*self.inner)(cancellation_token).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl<F, Fut> Component for ClosureComponent<F, Fut>
|
|
||||||
where
|
|
||||||
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
|
|
||||||
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
|
|
||||||
{
|
|
||||||
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
|
|
||||||
self.execute(cancellation_token).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user