From ecc6d785e7c71f9a53a15cc9f548010852da2106 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sat, 9 Nov 2024 16:05:26 +0100 Subject: [PATCH] feat: remove more partitions Signed-off-by: kjuulh --- crates/nodata/src/services/consumers.rs | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/crates/nodata/src/services/consumers.rs b/crates/nodata/src/services/consumers.rs index f0ddb76..608e73e 100644 --- a/crates/nodata/src/services/consumers.rs +++ b/crates/nodata/src/services/consumers.rs @@ -28,7 +28,6 @@ pub struct ConsumerGroup { pub struct InnerConsumerGroup { pub id: ConsumerId, topic: Topic, - partition_key: Option, pub(crate) offset: TopicOffset, pub(crate) cur_offset: TopicOffset, @@ -36,17 +35,9 @@ pub struct InnerConsumerGroup { } impl ConsumerGroup { - pub fn new( - id: impl Into, - topic: impl Into, - partition_key: Option>, - ) -> Self { + pub fn new(id: impl Into, topic: impl Into) -> Self { Self { - inner: Arc::new(RwLock::new(InnerConsumerGroup::new( - id, - topic, - partition_key, - ))), + inner: Arc::new(RwLock::new(InnerConsumerGroup::new(id, topic))), } } @@ -83,15 +74,10 @@ impl ConsumerGroup { } impl InnerConsumerGroup { - pub fn new( - id: impl Into, - topic: impl Into, - partition_key: Option>, - ) -> Self { + pub fn new(id: impl Into, topic: impl Into) -> Self { Self { id: id.into(), topic: topic.into(), - partition_key: partition_key.map(|pk| pk.into()), offset: 0, cur_offset: 0, members: BTreeMap::default(), @@ -181,10 +167,7 @@ impl Consumers { let mut storage = self.storage.write().await; if !storage.contains_key(&id) { - storage.insert( - id.clone(), - ConsumerGroup::new(&id, &topic, None::), - ); + storage.insert(id.clone(), ConsumerGroup::new(&id, &topic)); } let consumer_group = storage.get_mut(&id).unwrap();