diff --git a/Cargo.lock b/Cargo.lock index 4500689..e36d17b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,6 +30,15 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "allocator-api2" version = "0.2.18" @@ -1020,6 +1029,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1096,6 +1114,7 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", + "tracing-test", "uuid", ] @@ -1409,6 +1428,50 @@ dependencies = [ "bitflags 2.6.0", ] +[[package]] +name = "regex" +version = "1.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata 0.4.7", + "regex-syntax 0.8.4", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-automata" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax 0.8.4", +] + +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + +[[package]] +name = "regex-syntax" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" + [[package]] name = "ring" version = "0.17.8" @@ -2212,14 +2275,39 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.72", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/crates/nodata/Cargo.toml b/crates/nodata/Cargo.toml index cba8d2f..f037736 100644 --- a/crates/nodata/Cargo.toml +++ b/crates/nodata/Cargo.toml @@ -29,3 +29,6 @@ bytes = "1.7.1" prost = "0.13.1" prost-types = "0.13.1" chrono = { version = "0.4.38", features = ["serde"] } + +[dev-dependencies] +tracing-test = "0.2.5" diff --git a/crates/nodata/src/services/consumers.rs b/crates/nodata/src/services/consumers.rs index 311374f..69a70f4 100644 --- a/crates/nodata/src/services/consumers.rs +++ b/crates/nodata/src/services/consumers.rs @@ -144,6 +144,8 @@ impl ConsumersState for SharedState { #[cfg(test)] mod test { + use tracing_test::traced_test; + use crate::services::staging::{Staging, StagingEvent}; use super::*; @@ -165,22 +167,25 @@ mod test { } #[tokio::test] + #[traced_test] async fn can_notify_consumer() -> anyhow::Result<()> { let consumer_id = "some-consumer-id".to_string(); let consumer_index = "some-consumer-index".to_string(); let topic = "some-topic".to_string(); - let offset = 10usize; + let offset = 9usize; let staging = Staging::default(); // Publish 10 messages for _ in 0..10 { - staging + let offset = staging .publish(StagingEvent { topic: topic.clone(), key: "".into(), id: None, }) .await?; + + tracing::trace!("published offset: {}", offset); } let consumers = Consumers::new(Handler::new(staging)); @@ -191,7 +196,7 @@ mod test { consumers.notify_update(&topic, None, offset)?; let consumer = consumers.get_consumer(&consumer_id, &consumer_index); - assert_eq!(Some(Consumer { offset: 10 }), consumer); + assert_eq!(Some(Consumer { offset: 9 }), consumer); Ok(()) } diff --git a/crates/nodata/src/services/staging.rs b/crates/nodata/src/services/staging.rs index 7816279..5ff6568 100644 --- a/crates/nodata/src/services/staging.rs +++ b/crates/nodata/src/services/staging.rs @@ -60,7 +60,7 @@ impl Staging { } None => { part.insert(staging_event.key.to_owned(), vec![staging_event]); - 1 + 0 } }, None => { @@ -74,7 +74,7 @@ impl Staging { BTreeMap::from([(staging_event.key.to_owned(), vec![staging_event])]), ); - 1 + 0 } }; @@ -150,7 +150,7 @@ impl Staging { ) } - Ok(partition[start..end].to_vec()) + Ok(partition[start..=end].to_vec()) } }