From b6afcc700c81934edb7421a1333b9006a6e028fd Mon Sep 17 00:00:00 2001 From: kjuulh Date: Wed, 14 Aug 2024 14:47:19 +0200 Subject: [PATCH] feat: extract handler Signed-off-by: kjuulh --- crates/nodata/src/services.rs | 44 +-------------------------- crates/nodata/src/services/handler.rs | 38 +++++++++++++++++++++++ 2 files changed, 39 insertions(+), 43 deletions(-) create mode 100644 crates/nodata/src/services/handler.rs diff --git a/crates/nodata/src/services.rs b/crates/nodata/src/services.rs index aa927fb..5c74901 100644 --- a/crates/nodata/src/services.rs +++ b/crates/nodata/src/services.rs @@ -1,45 +1,3 @@ pub mod consumers; +pub mod handler; pub mod staging; -pub mod handler { - use std::sync::Arc; - - use super::{ - consumers::{Consumer, PartitionKey, Topic, TopicOffset}, - staging::Staging, - }; - - #[derive(Clone)] - pub struct Handler { - staging: Arc, - } - - impl Handler { - pub fn new(staging: Staging) -> Self { - Self { - staging: Arc::new(staging), - } - } - - pub fn handle_offset( - &self, - topic: &Topic, - partition_key: &Option, - consumer: &Consumer, - offset: TopicOffset, - ) -> anyhow::Result<()> { - let events = self.staging.get_topic_offset( - topic, - partition_key.clone(), - consumer.offset, - offset, - )?; - - // TODO: handle events - for event in events { - tracing::trace!("handling event: {:?}", event); - } - - Ok(()) - } - } -} diff --git a/crates/nodata/src/services/handler.rs b/crates/nodata/src/services/handler.rs new file mode 100644 index 0000000..269f1d4 --- /dev/null +++ b/crates/nodata/src/services/handler.rs @@ -0,0 +1,38 @@ +use std::sync::Arc; + +use super::{ + consumers::{Consumer, PartitionKey, Topic, TopicOffset}, + staging::Staging, +}; + +#[derive(Clone)] +pub struct Handler { + staging: Arc, +} + +impl Handler { + pub fn new(staging: Staging) -> Self { + Self { + staging: Arc::new(staging), + } + } + + pub fn handle_offset( + &self, + topic: &Topic, + partition_key: &Option, + consumer: &Consumer, + offset: TopicOffset, + ) -> anyhow::Result<()> { + let events = + self.staging + .get_topic_offset(topic, partition_key.clone(), consumer.offset, offset)?; + + // TODO: handle events + for event in events { + tracing::trace!("handling event: {:?}", event); + } + + Ok(()) + } +}