Compare commits

...

5 Commits
v0.1.1 ... main

Author SHA1 Message Date
c334dba445 feat: do publish
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-06 20:56:42 +02:00
aaf3a72d3b feat: allow readme
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-04 13:28:28 +02:00
643d87895b chore(release): v0.1.2 (#2)
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
chore(release): 0.1.2

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #2
2025-07-04 13:25:21 +02:00
21c1507ebe docs: update master
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-04 13:24:24 +02:00
4bdb39c39d feat: update basic example with a more simple acquire and run function
All checks were successful
continuous-integration/drone/push Build is passing
2025-07-04 13:22:04 +02:00
7 changed files with 107 additions and 66 deletions

View File

@ -6,6 +6,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.1.2] - 2025-07-04
### Added
- update basic example with a more simple acquire and run function
### Docs
- update master
## [0.1.1] - 2025-07-04 ## [0.1.1] - 2025-07-04
### Added ### Added

2
Cargo.lock generated
View File

@ -645,7 +645,7 @@ dependencies = [
[[package]] [[package]]
name = "noleader" name = "noleader"
version = "0.1.0" version = "0.1.2"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-nats", "async-nats",

View File

@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2" resolver = "2"
[workspace.package] [workspace.package]
version = "0.1.1" version = "0.1.2"
license = "MIT" license = "MIT"
[workspace.dependencies] [workspace.dependencies]

View File

@ -22,9 +22,9 @@ This library is still young and the API is subject to change.
## Intended use-case ## Intended use-case
Noleader is not built for distributed consensus, or fast re-election produces. It take upwards to a minute to get reelected, state is the users responsibility to handle. Noleader is not built for distributed consensus, or fast re-election procedures. It take upwards to a minute to get re-elected, state is the users responsibility to handle.
Noleader is pretty much just a distributed lock, intended for use-cases where the use wants to only have a single node scheduling work etc. Noleader is pretty much just a distributed lock, intended for use-cases where the user wants to only have a single node scheduling work etc.
Good alternatives are: Good alternatives are:
@ -71,58 +71,40 @@ async fn main() -> anyhow::Result<()> {
// Ensure the KV bucket exists // Ensure the KV bucket exists
leader.create_bucket().await?; leader.create_bucket().await?;
// Spawn the election loop // Attempts to acquire election loop, will call inner function if it wins, if it loses it will retry over again.
tokio::spawn({ // Will block until either the inner function returns and error, or the election processes crashes, intended to allow the application to properly restart
let leader = leader.clone();
async move {
leader leader
.start(CancellationToken::default()) .acquire_and_run({
.await move |token| {
.expect("leadership loop failed"); let leader_id = leader_id.clone();
}
});
// Do work while we are the leader async move {
leader
.do_while_leader(|cancel_token| async move {
loop { loop {
if cancel_token.is_cancelled() { if token.is_cancelled() {
break; return Ok(());
} }
tracing::info!("🔑 I am the leader—doing work");
tracing::info!(leader_id, "do work as leader");
tokio::time::sleep(std::time::Duration::from_secs(1)).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await;
} }
Ok(()) }
}
}) })
.await?; .await?;
Ok(()) Ok(())
} }
``` ```
## API Overview ## Examples
* **`Leader::new(bucket: &str, key: &str, client: async_nats::Client) -> Leader`** See the examples folder in ./crates/noleader/examples
Create a new election participant.
* **`create_bucket(&self) -> anyhow::Result<()>`**
Ensures the KV bucket exists (no-op if already created).
* **`start(&self, token: CancellationToken) -> anyhow::Result<()>`**
Begins the background leader-election loop; renews TTL on success or retries on failure.
* **`do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()>`**
Runs your closure as long as you hold leadership; cancels immediately on loss.
* **`leader_id(&self) -> Uuid`**
Returns your unique candidate ID.
* **`is_leader(&self) -> Status`**
Returns `Status::Leader` or `Status::Candidate`, taking shutdown into account.
### Types ## Architecture
Noleader uses a simple election stealing
```rust
pub enum Status {
Leader,
Candidate,
}
```
## License ## License

View File

@ -1,7 +1,12 @@
[package] [package]
name = "noleader" name = "noleader"
version = "0.1.0" edition = "2024"
edition = "2021" readme = "../../README.md"
version.workspace = true
license.workspace = true
repository = "https://git.front.kjuulh.io/kjuulh/noleader"
authors = ["kjuulh <contact@kasperhermansen.com>"]
description = "A small leader election package using NATS keyvalue store as the distributed locking mechanism. Does not require a min / max set of nodes"
[dependencies] [dependencies]
anyhow.workspace = true anyhow.workspace = true

View File

@ -23,29 +23,22 @@ async fn main() -> anyhow::Result<()> {
tracing::info!("creating bucket"); tracing::info!("creating bucket");
leader.create_bucket().await?; leader.create_bucket().await?;
tokio::spawn({ leader
let leader = leader.clone(); .acquire_and_run({
move |token| {
let leader_id = leader_id.clone(); let leader_id = leader_id.clone();
async move { async move {
tracing::debug!(leader_id, "starting leader");
leader
.start(CancellationToken::default())
.await
.expect("to succeed");
}
});
leader
.do_while_leader(move |token| async move {
loop { loop {
if token.is_cancelled() { if token.is_cancelled() {
return Ok(()); return Ok(());
} }
tracing::info!("do work as leader"); tracing::info!(leader_id, "do work as leader");
tokio::time::sleep(std::time::Duration::from_secs(1)).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await;
} }
}
}
}) })
.await?; .await?;

View File

@ -31,19 +31,72 @@ impl Leader {
} }
} }
pub async fn acquire_and_run<F, Fut>(&self, f: F) -> anyhow::Result<()>
where
F: Fn(CancellationToken) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let parent_token = CancellationToken::default();
let s = self.clone();
let server_token = parent_token.child_token();
// Start the server election process in another task, this is because start is blocking
let handle = tokio::spawn({
let server_token = server_token.child_token();
async move {
match s.start(server_token).await {
Ok(_) => {}
Err(e) => tracing::error!("leader election process failed: {}", e),
}
tracing::info!("shutting down noleader");
parent_token.cancel();
}
});
// Do the work if we're leader
let res = self
.do_while_leader_inner(server_token.child_token(), f)
.await;
// Stop the server election process if our provided functions returns an error;
server_token.cancel();
// Close down the task as well, it should already be stopped, but this forces the task to close
handle.abort();
res?;
Ok(())
}
pub async fn do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()> pub async fn do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()>
where
F: Fn(CancellationToken) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
self.do_while_leader_inner(CancellationToken::new(), f)
.await
}
async fn do_while_leader_inner<F, Fut>(
&self,
cancellation_token: CancellationToken,
f: F,
) -> anyhow::Result<()>
where where
F: Fn(CancellationToken) -> Fut, F: Fn(CancellationToken) -> Fut,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static, Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{ {
loop { loop {
let cancellation_token = cancellation_token.child_token();
let is_leader = self.is_leader.clone(); let is_leader = self.is_leader.clone();
if !is_leader.load(Ordering::Relaxed) { if !is_leader.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_secs(1)).await;
continue; continue;
} }
let cancellation_token = CancellationToken::new();
let child_token = cancellation_token.child_token(); let child_token = cancellation_token.child_token();
let guard = tokio::spawn(async move { let guard = tokio::spawn(async move {