Compare commits

...

3 Commits

Author SHA1 Message Date
cuddle-please
eda50d290a chore(release): 0.1.2
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2025-07-04 11:22:55 +00:00
4bdb39c39d feat: update basic example with a more simple acquire and run function
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-04 13:22:04 +02:00
57295a41c2 chore(release): v0.1.1 (#1)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.1.1

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #1
2025-07-04 12:53:41 +02:00
4 changed files with 85 additions and 22 deletions

17
CHANGELOG.md Normal file
View File

@ -0,0 +1,17 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.1.2] - 2025-07-04
### Added
- update basic example with a more simple acquire and run function
## [0.1.1] - 2025-07-04
### Added
- empty commit

View File

@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"
[workspace.package]
version = "0.0.1"
version = "0.1.2"
license = "MIT"
[workspace.dependencies]

View File

@ -23,28 +23,21 @@ async fn main() -> anyhow::Result<()> {
tracing::info!("creating bucket");
leader.create_bucket().await?;
tokio::spawn({
let leader = leader.clone();
let leader_id = leader_id.clone();
async move {
tracing::debug!(leader_id, "starting leader");
leader
.start(CancellationToken::default())
.await
.expect("to succeed");
}
});
leader
.do_while_leader(move |token| async move {
loop {
if token.is_cancelled() {
return Ok(());
}
.acquire_and_run({
move |token| {
let leader_id = leader_id.clone();
tracing::info!("do work as leader");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
async move {
loop {
if token.is_cancelled() {
return Ok(());
}
tracing::info!(leader_id, "do work as leader");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
}
})
.await?;

View File

@ -31,19 +31,72 @@ impl Leader {
}
}
pub async fn acquire_and_run<F, Fut>(&self, f: F) -> anyhow::Result<()>
where
F: Fn(CancellationToken) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let parent_token = CancellationToken::default();
let s = self.clone();
let server_token = parent_token.child_token();
// Start the server election process in another task, this is because start is blocking
let handle = tokio::spawn({
let server_token = server_token.child_token();
async move {
match s.start(server_token).await {
Ok(_) => {}
Err(e) => tracing::error!("leader election process failed: {}", e),
}
tracing::info!("shutting down noleader");
parent_token.cancel();
}
});
// Do the work if we're leader
let res = self
.do_while_leader_inner(server_token.child_token(), f)
.await;
// Stop the server election process if our provided functions returns an error;
server_token.cancel();
// Close down the task as well, it should already be stopped, but this forces the task to close
handle.abort();
res?;
Ok(())
}
pub async fn do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()>
where
F: Fn(CancellationToken) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
self.do_while_leader_inner(CancellationToken::new(), f)
.await
}
async fn do_while_leader_inner<F, Fut>(
&self,
cancellation_token: CancellationToken,
f: F,
) -> anyhow::Result<()>
where
F: Fn(CancellationToken) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
loop {
let cancellation_token = cancellation_token.child_token();
let is_leader = self.is_leader.clone();
if !is_leader.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
let cancellation_token = CancellationToken::new();
let child_token = cancellation_token.child_token();
let guard = tokio::spawn(async move {