Compare commits
5 Commits
Author | SHA1 | Date | |
---|---|---|---|
c334dba445 | |||
aaf3a72d3b | |||
643d87895b | |||
21c1507ebe | |||
4bdb39c39d |
@ -6,6 +6,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.1.2] - 2025-07-04
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- update basic example with a more simple acquire and run function
|
||||||
|
|
||||||
|
### Docs
|
||||||
|
- update master
|
||||||
|
|
||||||
## [0.1.1] - 2025-07-04
|
## [0.1.1] - 2025-07-04
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -645,7 +645,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "noleader"
|
name = "noleader"
|
||||||
version = "0.1.0"
|
version = "0.1.2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-nats",
|
"async-nats",
|
||||||
|
@ -3,7 +3,7 @@ members = ["crates/*"]
|
|||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "0.1.1"
|
version = "0.1.2"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
|
58
README.md
58
README.md
@ -22,9 +22,9 @@ This library is still young and the API is subject to change.
|
|||||||
|
|
||||||
## Intended use-case
|
## Intended use-case
|
||||||
|
|
||||||
Noleader is not built for distributed consensus, or fast re-election produces. It take upwards to a minute to get reelected, state is the users responsibility to handle.
|
Noleader is not built for distributed consensus, or fast re-election procedures. It take upwards to a minute to get re-elected, state is the users responsibility to handle.
|
||||||
|
|
||||||
Noleader is pretty much just a distributed lock, intended for use-cases where the use wants to only have a single node scheduling work etc.
|
Noleader is pretty much just a distributed lock, intended for use-cases where the user wants to only have a single node scheduling work etc.
|
||||||
|
|
||||||
Good alternatives are:
|
Good alternatives are:
|
||||||
|
|
||||||
@ -71,58 +71,40 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
// Ensure the KV bucket exists
|
// Ensure the KV bucket exists
|
||||||
leader.create_bucket().await?;
|
leader.create_bucket().await?;
|
||||||
|
|
||||||
// Spawn the election loop
|
// Attempts to acquire election loop, will call inner function if it wins, if it loses it will retry over again.
|
||||||
tokio::spawn({
|
// Will block until either the inner function returns and error, or the election processes crashes, intended to allow the application to properly restart
|
||||||
let leader = leader.clone();
|
|
||||||
async move {
|
|
||||||
leader
|
leader
|
||||||
.start(CancellationToken::default())
|
.acquire_and_run({
|
||||||
.await
|
move |token| {
|
||||||
.expect("leadership loop failed");
|
let leader_id = leader_id.clone();
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Do work while we are the leader
|
async move {
|
||||||
leader
|
|
||||||
.do_while_leader(|cancel_token| async move {
|
|
||||||
loop {
|
loop {
|
||||||
if cancel_token.is_cancelled() {
|
if token.is_cancelled() {
|
||||||
break;
|
return Ok(());
|
||||||
}
|
}
|
||||||
tracing::info!("🔑 I am the leader—doing work");
|
|
||||||
|
tracing::info!(leader_id, "do work as leader");
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||||
}
|
}
|
||||||
Ok(())
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
## API Overview
|
## Examples
|
||||||
|
|
||||||
* **`Leader::new(bucket: &str, key: &str, client: async_nats::Client) -> Leader`**
|
See the examples folder in ./crates/noleader/examples
|
||||||
Create a new election participant.
|
|
||||||
* **`create_bucket(&self) -> anyhow::Result<()>`**
|
|
||||||
Ensures the KV bucket exists (no-op if already created).
|
|
||||||
* **`start(&self, token: CancellationToken) -> anyhow::Result<()>`**
|
|
||||||
Begins the background leader-election loop; renews TTL on success or retries on failure.
|
|
||||||
* **`do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()>`**
|
|
||||||
Runs your closure as long as you hold leadership; cancels immediately on loss.
|
|
||||||
* **`leader_id(&self) -> Uuid`**
|
|
||||||
Returns your unique candidate ID.
|
|
||||||
* **`is_leader(&self) -> Status`**
|
|
||||||
Returns `Status::Leader` or `Status::Candidate`, taking shutdown into account.
|
|
||||||
|
|
||||||
### Types
|
## Architecture
|
||||||
|
|
||||||
|
Noleader uses a simple election stealing
|
||||||
|
|
||||||
```rust
|
|
||||||
pub enum Status {
|
|
||||||
Leader,
|
|
||||||
Candidate,
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## License
|
## License
|
||||||
|
|
||||||
|
@ -1,7 +1,12 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "noleader"
|
name = "noleader"
|
||||||
version = "0.1.0"
|
edition = "2024"
|
||||||
edition = "2021"
|
readme = "../../README.md"
|
||||||
|
version.workspace = true
|
||||||
|
license.workspace = true
|
||||||
|
repository = "https://git.front.kjuulh.io/kjuulh/noleader"
|
||||||
|
authors = ["kjuulh <contact@kasperhermansen.com>"]
|
||||||
|
description = "A small leader election package using NATS keyvalue store as the distributed locking mechanism. Does not require a min / max set of nodes"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
|
@ -23,29 +23,22 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
tracing::info!("creating bucket");
|
tracing::info!("creating bucket");
|
||||||
leader.create_bucket().await?;
|
leader.create_bucket().await?;
|
||||||
|
|
||||||
tokio::spawn({
|
leader
|
||||||
let leader = leader.clone();
|
.acquire_and_run({
|
||||||
|
move |token| {
|
||||||
let leader_id = leader_id.clone();
|
let leader_id = leader_id.clone();
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
tracing::debug!(leader_id, "starting leader");
|
|
||||||
leader
|
|
||||||
.start(CancellationToken::default())
|
|
||||||
.await
|
|
||||||
.expect("to succeed");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
leader
|
|
||||||
.do_while_leader(move |token| async move {
|
|
||||||
loop {
|
loop {
|
||||||
if token.is_cancelled() {
|
if token.is_cancelled() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!("do work as leader");
|
tracing::info!(leader_id, "do work as leader");
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -31,19 +31,72 @@ impl Leader {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn acquire_and_run<F, Fut>(&self, f: F) -> anyhow::Result<()>
|
||||||
|
where
|
||||||
|
F: Fn(CancellationToken) -> Fut,
|
||||||
|
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||||
|
{
|
||||||
|
let parent_token = CancellationToken::default();
|
||||||
|
let s = self.clone();
|
||||||
|
|
||||||
|
let server_token = parent_token.child_token();
|
||||||
|
|
||||||
|
// Start the server election process in another task, this is because start is blocking
|
||||||
|
let handle = tokio::spawn({
|
||||||
|
let server_token = server_token.child_token();
|
||||||
|
async move {
|
||||||
|
match s.start(server_token).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => tracing::error!("leader election process failed: {}", e),
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!("shutting down noleader");
|
||||||
|
|
||||||
|
parent_token.cancel();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Do the work if we're leader
|
||||||
|
let res = self
|
||||||
|
.do_while_leader_inner(server_token.child_token(), f)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Stop the server election process if our provided functions returns an error;
|
||||||
|
server_token.cancel();
|
||||||
|
// Close down the task as well, it should already be stopped, but this forces the task to close
|
||||||
|
handle.abort();
|
||||||
|
res?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()>
|
pub async fn do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()>
|
||||||
|
where
|
||||||
|
F: Fn(CancellationToken) -> Fut,
|
||||||
|
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||||
|
{
|
||||||
|
self.do_while_leader_inner(CancellationToken::new(), f)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn do_while_leader_inner<F, Fut>(
|
||||||
|
&self,
|
||||||
|
cancellation_token: CancellationToken,
|
||||||
|
f: F,
|
||||||
|
) -> anyhow::Result<()>
|
||||||
where
|
where
|
||||||
F: Fn(CancellationToken) -> Fut,
|
F: Fn(CancellationToken) -> Fut,
|
||||||
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
|
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||||
{
|
{
|
||||||
loop {
|
loop {
|
||||||
|
let cancellation_token = cancellation_token.child_token();
|
||||||
|
|
||||||
let is_leader = self.is_leader.clone();
|
let is_leader = self.is_leader.clone();
|
||||||
if !is_leader.load(Ordering::Relaxed) {
|
if !is_leader.load(Ordering::Relaxed) {
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let cancellation_token = CancellationToken::new();
|
|
||||||
let child_token = cancellation_token.child_token();
|
let child_token = cancellation_token.child_token();
|
||||||
|
|
||||||
let guard = tokio::spawn(async move {
|
let guard = tokio::spawn(async move {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user