feat: remove more partitions

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-11-09 16:05:26 +01:00
parent 6dfd7d7011
commit ecc6d785e7
No known key found for this signature in database

View File

@ -28,7 +28,6 @@ pub struct ConsumerGroup {
pub struct InnerConsumerGroup {
pub id: ConsumerId,
topic: Topic,
partition_key: Option<PartitionKey>,
pub(crate) offset: TopicOffset,
pub(crate) cur_offset: TopicOffset,
@ -36,17 +35,9 @@ pub struct InnerConsumerGroup {
}
impl ConsumerGroup {
pub fn new(
id: impl Into<ConsumerId>,
topic: impl Into<Topic>,
partition_key: Option<impl Into<PartitionKey>>,
) -> Self {
pub fn new(id: impl Into<ConsumerId>, topic: impl Into<Topic>) -> Self {
Self {
inner: Arc::new(RwLock::new(InnerConsumerGroup::new(
id,
topic,
partition_key,
))),
inner: Arc::new(RwLock::new(InnerConsumerGroup::new(id, topic))),
}
}
@ -83,15 +74,10 @@ impl ConsumerGroup {
}
impl InnerConsumerGroup {
pub fn new(
id: impl Into<ConsumerId>,
topic: impl Into<Topic>,
partition_key: Option<impl Into<PartitionKey>>,
) -> Self {
pub fn new(id: impl Into<ConsumerId>, topic: impl Into<Topic>) -> Self {
Self {
id: id.into(),
topic: topic.into(),
partition_key: partition_key.map(|pk| pk.into()),
offset: 0,
cur_offset: 0,
members: BTreeMap::default(),
@ -181,10 +167,7 @@ impl Consumers {
let mut storage = self.storage.write().await;
if !storage.contains_key(&id) {
storage.insert(
id.clone(),
ConsumerGroup::new(&id, &topic, None::<PartitionKey>),
);
storage.insert(id.clone(), ConsumerGroup::new(&id, &topic));
}
let consumer_group = storage.get_mut(&id).unwrap();