feat: update basic example with a more simple acquire and run function
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
57295a41c2
commit
4bdb39c39d
@ -23,28 +23,21 @@ async fn main() -> anyhow::Result<()> {
|
||||
tracing::info!("creating bucket");
|
||||
leader.create_bucket().await?;
|
||||
|
||||
tokio::spawn({
|
||||
let leader = leader.clone();
|
||||
let leader_id = leader_id.clone();
|
||||
|
||||
async move {
|
||||
tracing::debug!(leader_id, "starting leader");
|
||||
leader
|
||||
.start(CancellationToken::default())
|
||||
.await
|
||||
.expect("to succeed");
|
||||
}
|
||||
});
|
||||
|
||||
leader
|
||||
.do_while_leader(move |token| async move {
|
||||
loop {
|
||||
if token.is_cancelled() {
|
||||
return Ok(());
|
||||
}
|
||||
.acquire_and_run({
|
||||
move |token| {
|
||||
let leader_id = leader_id.clone();
|
||||
|
||||
tracing::info!("do work as leader");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
async move {
|
||||
loop {
|
||||
if token.is_cancelled() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tracing::info!(leader_id, "do work as leader");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
|
@ -31,19 +31,72 @@ impl Leader {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn acquire_and_run<F, Fut>(&self, f: F) -> anyhow::Result<()>
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut,
|
||||
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||
{
|
||||
let parent_token = CancellationToken::default();
|
||||
let s = self.clone();
|
||||
|
||||
let server_token = parent_token.child_token();
|
||||
|
||||
// Start the server election process in another task, this is because start is blocking
|
||||
let handle = tokio::spawn({
|
||||
let server_token = server_token.child_token();
|
||||
async move {
|
||||
match s.start(server_token).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => tracing::error!("leader election process failed: {}", e),
|
||||
}
|
||||
|
||||
tracing::info!("shutting down noleader");
|
||||
|
||||
parent_token.cancel();
|
||||
}
|
||||
});
|
||||
|
||||
// Do the work if we're leader
|
||||
let res = self
|
||||
.do_while_leader_inner(server_token.child_token(), f)
|
||||
.await;
|
||||
|
||||
// Stop the server election process if our provided functions returns an error;
|
||||
server_token.cancel();
|
||||
// Close down the task as well, it should already be stopped, but this forces the task to close
|
||||
handle.abort();
|
||||
res?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn do_while_leader<F, Fut>(&self, f: F) -> anyhow::Result<()>
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut,
|
||||
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||
{
|
||||
self.do_while_leader_inner(CancellationToken::new(), f)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn do_while_leader_inner<F, Fut>(
|
||||
&self,
|
||||
cancellation_token: CancellationToken,
|
||||
f: F,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
F: Fn(CancellationToken) -> Fut,
|
||||
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||
{
|
||||
loop {
|
||||
let cancellation_token = cancellation_token.child_token();
|
||||
|
||||
let is_leader = self.is_leader.clone();
|
||||
if !is_leader.load(Ordering::Relaxed) {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let cancellation_token = CancellationToken::new();
|
||||
let child_token = cancellation_token.child_token();
|
||||
|
||||
let guard = tokio::spawn(async move {
|
||||
|
Loading…
x
Reference in New Issue
Block a user