commit fedf7598c9a3f2fe7bfe64cd1f3626d42e846980 Author: kjuulh Date: Tue Jul 1 15:24:59 2025 +0200 feat: add basic worker pool diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..7d36638 --- /dev/null +++ b/.drone.yml @@ -0,0 +1,2 @@ +kind: template +load: cuddle-rust-lib-plan.yaml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9c4c004 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +target/ +.cuddle/ diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..42277d2 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,387 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "anyhow" +version = "1.0.71" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + +[[package]] +name = "libc" +version = "0.2.146" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b" + +[[package]] +name = "lock_api" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" + +[[package]] +name = "mio" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "noworkers" +version = "0.1.0" +dependencies = [ + "anyhow", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "num_cpus" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "proc-macro2" +version = "1.0.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags", +] + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" + +[[package]] +name = "socket2" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "syn" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.28.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +dependencies = [ + "autocfg", + "bytes", + "libc", + "mio", + "num_cpus", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-util" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if", + "log", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +dependencies = [ + "once_cell", +] + +[[package]] +name = "unicode-ident" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..f509ca1 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,10 @@ +[workspace] +members = ["crates/*"] +resolver = "2" + +[workspace.dependencies] +noworkers = { path = "crates/noworkers" } + +anyhow = { version = "1.0.71" } +tokio = { version = "1", features = ["full"] } +tracing = { version = "0.1", features = ["log"] } diff --git a/crates/noworkers/Cargo.toml b/crates/noworkers/Cargo.toml new file mode 100644 index 0000000..c3e383f --- /dev/null +++ b/crates/noworkers/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "noworkers" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow.workspace = true +tokio.workspace = true +tokio-util = "0.7.15" +tracing.workspace = true diff --git a/crates/noworkers/src/lib.rs b/crates/noworkers/src/lib.rs new file mode 100644 index 0000000..3ac2a26 --- /dev/null +++ b/crates/noworkers/src/lib.rs @@ -0,0 +1,598 @@ +use std::{future::Future, sync::Arc}; + +use tokio::{sync::Mutex, task::JoinHandle}; +use tokio_util::sync::CancellationToken; + +type ErrChan = Arc< + Mutex<( + Option>, + tokio::sync::oneshot::Receiver, + )>, +>; + +type JoinHandles = Arc>>>; + +#[derive(Clone)] +pub struct Workers { + limit: WorkerLimit, + + once: ErrChan, + cancellation: CancellationToken, + handles: JoinHandles, +} + +impl Default for Workers { + fn default() -> Self { + Self::new() + } +} + +#[derive(Default, Clone)] +enum WorkerLimit { + #[default] + NoLimit, + Amount { + queue: tokio::sync::mpsc::Sender<()>, + done: Arc>>, + }, +} + +impl WorkerLimit { + pub async fn queue_worker(&self) -> WorkerGuard { + match self { + WorkerLimit::NoLimit => {} + WorkerLimit::Amount { queue, .. } => { + // Queue work, if the channel is limited, we will block until there is enough room + queue + .send(()) + .await + .expect("tried to queue work on a closed worker channel"); + } + } + + WorkerGuard { + limit: self.clone(), + } + } +} + +pub struct WorkerGuard { + limit: WorkerLimit, +} + +impl Drop for WorkerGuard { + fn drop(&mut self) { + match &self.limit { + WorkerLimit::NoLimit => { /* no limit on dequeue */ } + WorkerLimit::Amount { done, .. } => { + let done = done.clone(); + tokio::spawn(async move { + let mut done = done.lock().await; + + // dequeue an item, leave room for the next + done.recv().await + }); + } + } + } +} + +impl Workers { + pub fn new() -> Self { + let once = tokio::sync::oneshot::channel(); + + Self { + once: Arc::new(Mutex::new((Some(once.0), once.1))), + limit: WorkerLimit::default(), + cancellation: CancellationToken::default(), + handles: Arc::default(), + } + } + + /// respects an external cancellation token, it is undefined behavior to use this with with_cancel_task, we will always only cancel a child token, i.e. never the external token + pub fn with_cancel(&mut self, cancel: &CancellationToken) -> &mut Self { + self.cancellation = cancel.child_token(); + self + } + + /// with_cancel and with_cancel_task used together is considered undefined behavior, as we will cancel the external cancellation token on cancel_task completion + pub fn with_cancel_task(&mut self, f: T) -> &mut Self + where + T: Future + Send + 'static, + { + let cancel = self.cancellation.clone(); + + tokio::spawn(async move { + f.await; + cancel.cancel(); + }); + self + } + + /// with_limit can be dangerous if used with an external cancel, because we still queue work after the cancel, it doesn't guarantee that everyone respects said channel, and closes down in a timely manner. Work will still be queued after the cancel, it is up to the provided worker function to respect when cancel is called + pub fn with_limit(&mut self, limit: usize) -> &mut Self { + let (tx, rx) = tokio::sync::mpsc::channel(limit); + + self.limit = WorkerLimit::Amount { + queue: tx, + done: Arc::new(Mutex::new(rx)), + }; + self + } + + /// Add + /// Note: Add is immediate, this means your future will be polled immediately, no matter if wait has been called or not. Wait is just for waiting for completion, as well as receiving errors + pub async fn add(&self, f: T) -> anyhow::Result<()> + where + T: FnOnce(CancellationToken) -> TFut + Send + 'static, + TFut: Future> + Send + 'static, + { + let s = self.clone(); + + let handle = tokio::spawn(async move { + let queue_guard = s.limit.queue_worker().await; + if let Err(err) = f(s.cancellation.child_token()).await { + if let Ok(mut erronce) = s.once.try_lock() { + if let Some(tx) = erronce.0.take() { + // oneshot works as a once channel, we don't care about subsequent errors + let _ = tx.send(err); + s.cancellation.cancel(); + } + } + } + + // ensure it survives the scope, it isn't required that we call it here though + drop(queue_guard) + }); + + { + let mut handles = self.handles.lock().await; + handles.push(handle); + } + + Ok(()) + } + + pub async fn wait(self) -> anyhow::Result<()> { + { + let mut handles = self.handles.lock().await; + + for handle in handles.iter_mut() { + handle.await?; + } + } + + self.cancellation.cancel(); + + { + let mut once = self.once.lock().await; + if let Ok(e) = once.1.try_recv() { + anyhow::bail!("{}", e) + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use std::time::{Duration, SystemTime}; + + use crate::*; + + #[tokio::test] + async fn test_can_start_worker_group() -> anyhow::Result<()> { + let workers = Workers::new(); + + workers + .add(|_cancel| async move { + println!("starting worker"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + println!("worker finished"); + Ok(()) + }) + .await?; + + workers.wait().await?; + + Ok(()) + } + + #[tokio::test] + async fn test_worker_can_return_error() -> anyhow::Result<()> { + let workers = Workers::new(); + + workers + .add(|_cancel| async move { + println!("starting worker"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + println!("worker finished"); + anyhow::bail!("worker should fail") + }) + .await?; + + workers.wait().await.expect_err("Error: worker should fail"); + + Ok(()) + } + + #[tokio::test] + async fn test_group_waits() -> anyhow::Result<()> { + let workers = Workers::new(); + + workers + .add(|_cancel| async move { + println!("starting worker"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + println!("worker finished"); + Ok(()) + }) + .await?; + + let (called_tx, called_rx) = tokio::sync::oneshot::channel(); + + workers + .add(|_cancel| async move { + println!("starting worker (immediate)"); + println!("worker finished (immediate)"); + println!("worker send finish (immediate)"); + let _ = called_tx.send(()); + + Ok(()) + }) + .await?; + + workers.wait().await?; + + called_rx.await.expect("to receive called"); + + Ok(()) + } + + #[tokio::test] + async fn test_group_waits_are_cancelled_on_error() -> anyhow::Result<()> { + let workers = Workers::new(); + + workers + .add(|_cancel| async move { + println!("starting worker"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + println!("worker finished"); + Err(anyhow::anyhow!("expected error")) + }) + .await?; + + for i in 0..10 { + workers + .add(move |cancel| async move { + println!("starting worker (waits) id: {i}"); + + cancel.cancelled().await; + + println!("worker finished (waits) id: {i}"); + + Ok(()) + }) + .await?; + } + + workers.wait().await.expect_err("expected error"); + + Ok(()) + } + + #[tokio::test] + async fn test_are_concurrent() -> anyhow::Result<()> { + let workers = Workers::new(); + + let (initial_tx, initial_rx) = tokio::sync::oneshot::channel(); + let (reply_tx, reply_rx) = tokio::sync::oneshot::channel(); + let (ok_tx, ok_rx) = tokio::sync::oneshot::channel(); + + // Having two workers swap between tasks should illustrate that we're concurrent, and not just waiting in line for each component + + workers + .add(move |_cancel| async move { + println!("starting worker b"); + + println!("waiting for initial request"); + initial_rx.await?; + println!("sending reply"); + reply_tx.send(()).unwrap(); + + println!("worker finished"); + Err(anyhow::anyhow!("expected error")) + }) + .await?; + + workers + .add(move |_cancel| async move { + println!("starting worker a"); + + println!("sending initial"); + initial_tx.send(()).unwrap(); + println!("received reply"); + reply_rx.await?; + println!("sending ok"); + ok_tx.send(()).unwrap(); + + println!("worker finished"); + Err(anyhow::anyhow!("expected error")) + }) + .await?; + + workers.wait().await.expect_err("expected error"); + + ok_rx.await?; + + Ok(()) + } + + #[tokio::test] + async fn test_multiple_errors() -> anyhow::Result<()> { + let workers = Workers::new(); + + let now = std::time::SystemTime::now(); + for _ in 0..100 { + workers + .add(move |_cancel| async move { + println!("starting worker a"); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + println!("worker finished"); + Err(anyhow::anyhow!("unexpected")) + }) + .await?; + } + workers + .add(move |_cancel| async move { + println!("starting worker b"); + + println!("worker finished"); + Err(anyhow::anyhow!("expected error")) + }) + .await?; + + let err = workers.wait().await.unwrap_err(); + if !err.to_string().contains("expected error") { + panic!("'{err}' error it should not have been this one"); + } + + let after = now.elapsed()?; + println!( + "it took {} seconds and {} total ms, {} total nanos to add 100 workers", + after.as_secs(), + after.as_millis(), + after.as_nanos() + ); + + Ok(()) + } + + #[tokio::test] + async fn test_wait_is_optional() -> anyhow::Result<()> { + let workers = Workers::new(); + + let (done_tx, done_rx) = tokio::sync::oneshot::channel(); + + workers + .add(move |_cancel| async move { + println!("starting worker a"); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + done_tx.send(()).unwrap(); + + println!("worker finished"); + Ok(()) + }) + .await?; + + done_rx.await?; + + Ok(()) + } + + #[tokio::test] + async fn test_wait_is_optional_err() -> anyhow::Result<()> { + let workers = Workers::new(); + + let (done_tx, done_rx) = tokio::sync::oneshot::channel(); + + workers + .add(move |_cancel| async move { + println!("starting worker a"); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + done_tx.send(()).unwrap(); + + println!("worker finished"); + anyhow::bail!("expected failure"); + }) + .await?; + + done_rx.await?; + + workers + .wait() + .await + .expect_err("there should be an error here"); + + Ok(()) + } + + #[tokio::test] + async fn test_wait_called_twice() -> anyhow::Result<()> { + let workers = Workers::new(); + + workers + .add(move |_cancel| async move { + println!("starting worker"); + + println!("worker finished"); + Ok(()) + }) + .await?; + + workers.wait().await?; + + Ok(()) + } + + #[tokio::test] + async fn test_limits_work() -> anyhow::Result<()> { + let mut workers = Workers::new(); + + // sets how many tasks we can queue at once + workers.with_limit(40); + + let start = SystemTime::now(); + + for i in 0..1000 { + workers + .add(move |_cancel| async move { + println!( + "starting worker: {i}: {} millis", + start.elapsed().unwrap().as_millis() + ); + + tokio::time::sleep(Duration::from_millis(10)).await; + + // println!( + // "worker finished: {i}: {} millis", + // start.elapsed().unwrap().as_millis() + // ); + Ok(()) + }) + .await?; + } + + workers.wait().await?; + + assert!(start.elapsed().unwrap() > Duration::from_millis(100)); + + Ok(()) + } + + #[tokio::test] + async fn test_blocking() -> anyhow::Result<()> { + let mut workers = Workers::new(); + + // sets how many tasks we can queue at once + workers.with_limit(40); + + let start = SystemTime::now(); + + let lock = Arc::new(Mutex::new(())); + + for i in 0..1000 { + let lock = lock.clone(); + workers + .add(move |_cancel| async move { + // println!( + // "starting worker: {i}: {} millis", + // start.elapsed().unwrap().as_millis() + // ); + + let guard = lock.lock().await; + + println!( + "worker finished: {i}: {} millis", + start.elapsed().unwrap().as_millis() + ); + + drop(guard); + + Ok(()) + }) + .await?; + } + + workers.wait().await?; + + // we compile, and there is no deadlock + + Ok(()) + } + + #[tokio::test] + async fn test_cancellation() -> anyhow::Result<()> { + let mut workers = Workers::new(); + + let cancel = CancellationToken::new(); + + workers.with_cancel(&cancel.child_token()); + + // sets how many tasks we can queue at once + workers.with_limit(5); + + let start = SystemTime::now(); + + for i in 0..10 { + workers + .add(move |cancel| async move { + println!("worker: {i} waiting for cancellation"); + + cancel.cancelled().await; + + println!("worker: {i} received cancellation"); + + Ok(()) + }) + .await?; + } + + tokio::spawn(async move { + println!("queuing cancellation (waiting 300ms)"); + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + println!("sending external cancellation"); + cancel.cancel(); + println!("cancellation sent"); + }); + + // No deadlocks + workers.wait().await?; + + assert!(start.elapsed().unwrap() >= Duration::from_millis(300)); + + Ok(()) + } + + #[tokio::test] + async fn test_cancellation_task() -> anyhow::Result<()> { + let mut workers = Workers::new(); + + workers.with_cancel_task(async move { + println!("queuing cancellation (waiting 300ms)"); + tokio::time::sleep(Duration::from_millis(300)).await; + println!("cancellation sent"); + }); + + let start = SystemTime::now(); + + for i in 0..10 { + workers + .add(move |cancel| async move { + println!("worker: {i} waiting for cancellation"); + + cancel.cancelled().await; + + println!("worker: {i} received cancellation"); + + Ok(()) + }) + .await?; + } + + // No deadlocks + workers.wait().await?; + + assert!(start.elapsed().unwrap() >= Duration::from_millis(300)); + + Ok(()) + } +} diff --git a/cuddle.yaml b/cuddle.yaml new file mode 100644 index 0000000..a62c8b8 --- /dev/null +++ b/cuddle.yaml @@ -0,0 +1,17 @@ +# yaml-language-server: $schema=https://git.front.kjuulh.io/kjuulh/cuddle/raw/branch/main/schemas/base.json + +base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-lib-plan.git" + +vars: + service: "noworkers" + registry: kasperhermansen + +please: + project: + owner: kjuulh + repository: "noworkers" + branch: main + settings: + api_url: "https://git.front.kjuulh.io" + actions: + rust: