fix: offset be inclusive end

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2024-08-14 14:46:53 +02:00
parent 242dd2ce76
commit 101a266ea2
4 changed files with 102 additions and 6 deletions

View File

@@ -29,3 +29,6 @@ bytes = "1.7.1"
prost = "0.13.1"
prost-types = "0.13.1"
chrono = { version = "0.4.38", features = ["serde"] }
[dev-dependencies]
tracing-test = "0.2.5"

View File

@@ -144,6 +144,8 @@ impl ConsumersState for SharedState {
#[cfg(test)]
mod test {
use tracing_test::traced_test;
use crate::services::staging::{Staging, StagingEvent};
use super::*;
@@ -165,22 +167,25 @@ mod test {
}
#[tokio::test]
#[traced_test]
async fn can_notify_consumer() -> anyhow::Result<()> {
let consumer_id = "some-consumer-id".to_string();
let consumer_index = "some-consumer-index".to_string();
let topic = "some-topic".to_string();
let offset = 10usize;
let offset = 9usize;
let staging = Staging::default();
// Publish 10 messages
for _ in 0..10 {
staging
let offset = staging
.publish(StagingEvent {
topic: topic.clone(),
key: "".into(),
id: None,
})
.await?;
tracing::trace!("published offset: {}", offset);
}
let consumers = Consumers::new(Handler::new(staging));
@@ -191,7 +196,7 @@ mod test {
consumers.notify_update(&topic, None, offset)?;
let consumer = consumers.get_consumer(&consumer_id, &consumer_index);
assert_eq!(Some(Consumer { offset: 10 }), consumer);
assert_eq!(Some(Consumer { offset: 9 }), consumer);
Ok(())
}

View File

@@ -60,7 +60,7 @@ impl Staging {
}
None => {
part.insert(staging_event.key.to_owned(), vec![staging_event]);
1
0
}
},
None => {
@@ -74,7 +74,7 @@ impl Staging {
BTreeMap::from([(staging_event.key.to_owned(), vec![staging_event])]),
);
1
0
}
};
@@ -150,7 +150,7 @@ impl Staging {
)
}
Ok(partition[start..end].to_vec())
Ok(partition[start..=end].to_vec())
}
}