Compare commits

...

6 Commits
v0.2.1 ... main

Author SHA1 Message Date
b78423377c
docs: add examples
All checks were successful
continuous-integration/drone/push Build is passing
this is to show how we can use closures in the mad component context. It isn't super pretty because of the async closure, so we need to show the slighly complicated syntax

Signed-off-by: kjuulh <contact@kjuulh.io>
2024-08-07 17:09:52 +02:00
1446f4c3cf chore(release): v0.4.0 (#5)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
### Added
- add correction
- add small docs

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #5
2024-08-07 16:51:15 +02:00
8a80480d94
feat: add correction
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-08-07 16:44:23 +02:00
b7b2992730
feat: add small docs
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-08-07 16:44:12 +02:00
61cbec0477 chore(release): v0.3.0 (#4)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
### Added
- add add_fn to execute immediate lambdas

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #4
2024-08-07 15:57:09 +02:00
10e2739b6e
feat: add add_fn to execute immediate lambdas
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-08-07 15:53:04 +02:00
10 changed files with 312 additions and 18 deletions

View File

@ -6,6 +6,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.4.0] - 2024-08-07
### Added
- add correction
- add small docs
## [0.3.0] - 2024-08-07
### Added
- add add_fn to execute immediate lambdas
## [0.2.1] - 2024-08-07 ## [0.2.1] - 2024-08-07
### Docs ### Docs

3
Cargo.lock generated
View File

@ -236,7 +236,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]] [[package]]
name = "mad" name = "mad"
version = "0.1.0" version = "0.3.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@ -247,6 +247,7 @@ dependencies = [
"tokio", "tokio",
"tokio-util", "tokio-util",
"tracing", "tracing",
"tracing-subscriber",
"tracing-test", "tracing-test",
] ]

View File

@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2" resolver = "2"
[workspace.package] [workspace.package]
version = "0.2.1" version = "0.4.0"
[workspace.dependencies] [workspace.dependencies]
mad = { path = "crates/mad" } mad = { path = "crates/mad" }

View File

@ -41,3 +41,11 @@ async fn main() -> anyhow::Result<()> {
} }
``` ```
## Examples
Can be found (here)[crates/mad/examples]
- basic
- fn
- signals
- error_log

View File

@ -15,4 +15,5 @@ tokio-util = "0.7.11"
tracing.workspace = true tracing.workspace = true
[dev-dependencies] [dev-dependencies]
tracing-subscriber = "0.3.18"
tracing-test = { version = "0.2.5", features = ["no-env-filter"] } tracing-test = { version = "0.2.5", features = ["no-env-filter"] }

View File

@ -0,0 +1,40 @@
use async_trait::async_trait;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl mad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
mad::Mad::builder()
.add(WaitServer {})
.add(WaitServer {})
.add(WaitServer {})
.add(WaitServer {})
.run()
.await?;
Ok(())
}

View File

@ -0,0 +1,42 @@
use async_trait::async_trait;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct ErrorServer {}
#[async_trait]
impl mad::Component for ErrorServer {
fn name(&self) -> Option<String> {
Some("ErrorServer".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Err(mad::MadError::Inner(anyhow::anyhow!("expected error")))
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
// Do note that only the first server which returns an error is guaranteed to be handled. This is because if servers don't respect cancellation, they will be dropped
mad::Mad::builder()
.add(ErrorServer {})
.add(ErrorServer {})
.add(ErrorServer {})
.add(ErrorServer {})
.run()
.await?;
Ok(())
}

View File

@ -0,0 +1,67 @@
use async_trait::async_trait;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl mad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
}
async fn run(&self, _cancellation: CancellationToken) -> Result<(), mad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
let item = "some item".to_string();
mad::Mad::builder()
.add(WaitServer {})
.add_fn(|_cancel| async move {
let millis_wait = 50;
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
})
.add_fn(move |_cancel| {
// I am an actual closure
let item = item.clone();
async move {
let _item = item;
let millis_wait = 50;
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
})
.run()
.await?;
Ok(())
}

View File

@ -0,0 +1,69 @@
use async_trait::async_trait;
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::Level;
struct WaitServer {}
#[async_trait]
impl mad::Component for WaitServer {
fn name(&self) -> Option<String> {
Some("WaitServer".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
let millis_wait = rand::thread_rng().gen_range(500..3000);
tracing::debug!("waiting: {}ms", millis_wait);
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(millis_wait)).await;
Ok(())
}
}
struct RespectCancel {}
#[async_trait]
impl mad::Component for RespectCancel {
fn name(&self) -> Option<String> {
Some("RespectCancel".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
cancellation.cancelled().await;
tracing::debug!("stopping because job is cancelled");
Ok(())
}
}
struct NeverStopServer {}
#[async_trait]
impl mad::Component for NeverStopServer {
fn name(&self) -> Option<String> {
Some("NeverStopServer".into())
}
async fn run(&self, cancellation: CancellationToken) -> Result<(), mad::MadError> {
// Simulates a server running for some time. Is normally supposed to be futures blocking indefinitely
tokio::time::sleep(std::time::Duration::from_millis(999999999)).await;
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.init();
mad::Mad::builder()
.add(WaitServer {})
.add(NeverStopServer {})
.add(RespectCancel {})
.run()
.await?;
Ok(())
}

View File

@ -49,6 +49,11 @@ pub struct Mad {
should_cancel: Option<std::time::Duration>, should_cancel: Option<std::time::Duration>,
} }
struct CompletionResult {
res: Result<(), MadError>,
name: Option<String>,
}
impl Mad { impl Mad {
pub fn builder() -> Self { pub fn builder() -> Self {
Self { Self {
@ -64,6 +69,16 @@ impl Mad {
self self
} }
pub fn add_fn<F, Fut>(&mut self, f: F) -> &mut Self
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
{
let comp = ClosureComponent { inner: Box::new(f) };
self.add(comp)
}
pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self { pub fn cancellation(&mut self, should_cancel: Option<std::time::Duration>) -> &mut Self {
self.should_cancel = should_cancel; self.should_cancel = should_cancel;
@ -79,6 +94,7 @@ impl Mad {
let close_result = self.close_components().await; let close_result = self.close_components().await;
tracing::info!("mad is closed down");
match (run_result, close_result) { match (run_result, close_result) {
(Err(run), Err(close)) => { (Err(run), Err(close)) => {
return Err(MadError::AggregateError(AggregateError { return Err(MadError::AggregateError(AggregateError {
@ -122,22 +138,23 @@ impl Mad {
let cancellation_token = cancellation_token.child_token(); let cancellation_token = cancellation_token.child_token();
let job_cancellation = job_cancellation.child_token(); let job_cancellation = job_cancellation.child_token();
let (error_tx, error_rx) = tokio::sync::mpsc::channel::<Result<(), MadError>>(1); let (error_tx, error_rx) = tokio::sync::mpsc::channel::<CompletionResult>(1);
channels.push(error_rx); channels.push(error_rx);
tokio::spawn(async move { tokio::spawn(async move {
tracing::debug!(component = &comp.name(), "mad running"); let name = comp.name().clone();
tracing::debug!(component = name, "mad running");
tokio::select! { tokio::select! {
_ = cancellation_token.cancelled() => { _ = cancellation_token.cancelled() => {
error_tx.send(Ok(())).await error_tx.send(CompletionResult { res: Ok(()) , name }).await
} }
res = comp.run(job_cancellation) => { res = comp.run(job_cancellation) => {
error_tx.send(res).await error_tx.send(CompletionResult { res , name }).await
} }
_ = tokio::signal::ctrl_c() => { _ = tokio::signal::ctrl_c() => {
error_tx.send(Ok(())).await error_tx.send(CompletionResult { res: Ok(()) , name }).await
} }
} }
}); });
@ -149,19 +166,26 @@ impl Mad {
} }
while let Some(Some(msg)) = futures.next().await { while let Some(Some(msg)) = futures.next().await {
tracing::trace!("received end signal from a component"); match msg.res {
Err(e) => {
tracing::debug!(
error = e.to_string(),
component = msg.name,
"component ran to completion with error"
);
}
Ok(_) => {
tracing::debug!(component = msg.name, "component ran to completion");
}
}
if let Err(e) = msg {
tracing::debug!(error = e.to_string(), "stopping running components");
job_cancellation.cancel(); job_cancellation.cancel();
if let Some(cancel_wait) = self.should_cancel { if let Some(cancel_wait) = self.should_cancel {
tokio::time::sleep(cancel_wait).await; tokio::time::sleep(cancel_wait).await;
cancellation_token.cancel(); cancellation_token.cancel();
} }
} }
}
tracing::debug!("ran components"); tracing::debug!("ran components");
@ -211,3 +235,34 @@ impl<T: Component + Send + Sync + 'static> IntoComponent for T {
Arc::new(self) Arc::new(self)
} }
} }
struct ClosureComponent<F, Fut>
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
{
inner: Box<F>,
}
impl<F, Fut> ClosureComponent<F, Fut>
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
{
pub async fn execute(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
(*self.inner)(cancellation_token).await?;
Ok(())
}
}
#[async_trait::async_trait]
impl<F, Fut> Component for ClosureComponent<F, Fut>
where
F: Fn(CancellationToken) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), MadError>> + Send + 'static,
{
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), MadError> {
self.execute(cancellation_token).await
}
}