feat: extract handler
Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
parent
101a266ea2
commit
b6afcc700c
@ -1,45 +1,3 @@
|
|||||||
pub mod consumers;
|
pub mod consumers;
|
||||||
|
pub mod handler;
|
||||||
pub mod staging;
|
pub mod staging;
|
||||||
pub mod handler {
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use super::{
|
|
||||||
consumers::{Consumer, PartitionKey, Topic, TopicOffset},
|
|
||||||
staging::Staging,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Handler {
|
|
||||||
staging: Arc<Staging>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Handler {
|
|
||||||
pub fn new(staging: Staging) -> Self {
|
|
||||||
Self {
|
|
||||||
staging: Arc::new(staging),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn handle_offset(
|
|
||||||
&self,
|
|
||||||
topic: &Topic,
|
|
||||||
partition_key: &Option<PartitionKey>,
|
|
||||||
consumer: &Consumer,
|
|
||||||
offset: TopicOffset,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let events = self.staging.get_topic_offset(
|
|
||||||
topic,
|
|
||||||
partition_key.clone(),
|
|
||||||
consumer.offset,
|
|
||||||
offset,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// TODO: handle events
|
|
||||||
for event in events {
|
|
||||||
tracing::trace!("handling event: {:?}", event);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
38
crates/nodata/src/services/handler.rs
Normal file
38
crates/nodata/src/services/handler.rs
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use super::{
|
||||||
|
consumers::{Consumer, PartitionKey, Topic, TopicOffset},
|
||||||
|
staging::Staging,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Handler {
|
||||||
|
staging: Arc<Staging>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Handler {
|
||||||
|
pub fn new(staging: Staging) -> Self {
|
||||||
|
Self {
|
||||||
|
staging: Arc::new(staging),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn handle_offset(
|
||||||
|
&self,
|
||||||
|
topic: &Topic,
|
||||||
|
partition_key: &Option<PartitionKey>,
|
||||||
|
consumer: &Consumer,
|
||||||
|
offset: TopicOffset,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let events =
|
||||||
|
self.staging
|
||||||
|
.get_topic_offset(topic, partition_key.clone(), consumer.offset, offset)?;
|
||||||
|
|
||||||
|
// TODO: handle events
|
||||||
|
for event in events {
|
||||||
|
tracing::trace!("handling event: {:?}", event);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user