From 4bdb39c39d699ae45e487fbbbb61d6c93f04397b Mon Sep 17 00:00:00 2001 From: kjuulh Date: Fri, 4 Jul 2025 13:21:10 +0200 Subject: [PATCH] feat: update basic example with a more simple acquire and run function --- crates/noleader/examples/basic/main.rs | 33 ++++++---------- crates/noleader/src/lib.rs | 55 +++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 21 deletions(-) diff --git a/crates/noleader/examples/basic/main.rs b/crates/noleader/examples/basic/main.rs index 6baacb5..732c1fd 100644 --- a/crates/noleader/examples/basic/main.rs +++ b/crates/noleader/examples/basic/main.rs @@ -23,28 +23,21 @@ async fn main() -> anyhow::Result<()> { tracing::info!("creating bucket"); leader.create_bucket().await?; - tokio::spawn({ - let leader = leader.clone(); - let leader_id = leader_id.clone(); - - async move { - tracing::debug!(leader_id, "starting leader"); - leader - .start(CancellationToken::default()) - .await - .expect("to succeed"); - } - }); - leader - .do_while_leader(move |token| async move { - loop { - if token.is_cancelled() { - return Ok(()); - } + .acquire_and_run({ + move |token| { + let leader_id = leader_id.clone(); - tracing::info!("do work as leader"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + async move { + loop { + if token.is_cancelled() { + return Ok(()); + } + + tracing::info!(leader_id, "do work as leader"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } } }) .await?; diff --git a/crates/noleader/src/lib.rs b/crates/noleader/src/lib.rs index 4f761fd..296c87e 100644 --- a/crates/noleader/src/lib.rs +++ b/crates/noleader/src/lib.rs @@ -31,19 +31,72 @@ impl Leader { } } + pub async fn acquire_and_run(&self, f: F) -> anyhow::Result<()> + where + F: Fn(CancellationToken) -> Fut, + Fut: Future> + Send + 'static, + { + let parent_token = CancellationToken::default(); + let s = self.clone(); + + let server_token = parent_token.child_token(); + + // Start the server election process in another task, this is because start is blocking + let handle = tokio::spawn({ + let server_token = server_token.child_token(); + async move { + match s.start(server_token).await { + Ok(_) => {} + Err(e) => tracing::error!("leader election process failed: {}", e), + } + + tracing::info!("shutting down noleader"); + + parent_token.cancel(); + } + }); + + // Do the work if we're leader + let res = self + .do_while_leader_inner(server_token.child_token(), f) + .await; + + // Stop the server election process if our provided functions returns an error; + server_token.cancel(); + // Close down the task as well, it should already be stopped, but this forces the task to close + handle.abort(); + res?; + + Ok(()) + } + pub async fn do_while_leader(&self, f: F) -> anyhow::Result<()> + where + F: Fn(CancellationToken) -> Fut, + Fut: Future> + Send + 'static, + { + self.do_while_leader_inner(CancellationToken::new(), f) + .await + } + + async fn do_while_leader_inner( + &self, + cancellation_token: CancellationToken, + f: F, + ) -> anyhow::Result<()> where F: Fn(CancellationToken) -> Fut, Fut: Future> + Send + 'static, { loop { + let cancellation_token = cancellation_token.child_token(); + let is_leader = self.is_leader.clone(); if !is_leader.load(Ordering::Relaxed) { tokio::time::sleep(Duration::from_secs(1)).await; continue; } - let cancellation_token = CancellationToken::new(); let child_token = cancellation_token.child_token(); let guard = tokio::spawn(async move {