feat: add ingest layer

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-08-14 14:59:26 +02:00
parent b6afcc700c
commit cce1400a34
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
4 changed files with 58 additions and 6 deletions

View File

@ -4,7 +4,7 @@ use anyhow::Context;
use mad::Component; use mad::Component;
use crate::{ use crate::{
services::staging::{StagingEvent, StagingState}, services::{ingest::IngestState, staging::StagingEvent},
state::SharedState, state::SharedState,
}; };
@ -41,7 +41,7 @@ impl no_data_service_server::NoDataService for GrpcServer {
"handling event" "handling event"
); );
self.state.staging().publish(req).await.map_err(|e| { self.state.ingest().publish(req).await.map_err(|e| {
tracing::warn!(error = e.to_string(), "failed to handle ingest of data"); tracing::warn!(error = e.to_string(), "failed to handle ingest of data");
tonic::Status::internal(e.to_string()) tonic::Status::internal(e.to_string())
})?; })?;

View File

@ -1,3 +1,53 @@
pub mod consumers; pub mod consumers;
pub mod handler; pub mod handler;
pub mod staging; pub mod staging;
pub mod ingest {
use crate::state::SharedState;
use super::{
consumers::{Consumer, Consumers},
staging::{Staging, StagingEvent},
};
#[derive(Clone)]
pub struct Ingest {
staging: Staging,
consumers: Consumers,
}
impl Ingest {
pub fn new(staging: Staging, consumer: Consumers) -> Self {
Self {
staging,
consumers: consumer,
}
}
pub async fn publish(&self, event: impl Into<StagingEvent>) -> anyhow::Result<()> {
let event: StagingEvent = event.into();
let topic = event.topic.clone();
let key = {
if event.key.is_empty() {
None
} else {
Some(event.key.clone())
}
};
let offset = self.staging.publish(event).await?;
self.consumers.notify_update(topic, key, offset)?;
Ok(())
}
}
pub trait IngestState {
fn ingest(&self) -> Ingest;
}
impl IngestState for SharedState {
fn ingest(&self) -> Ingest {
Ingest::new(self.staging.clone(), self.consumers.clone())
}
}
}

View File

@ -93,11 +93,12 @@ impl Consumers {
pub fn notify_update( pub fn notify_update(
&self, &self,
topic: impl Into<Topic>, topic: impl Into<Topic>,
key: Option<PartitionKey>, key: Option<impl Into<PartitionKey>>,
offset: impl Into<TopicOffset>, offset: impl Into<TopicOffset>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let topic = topic.into(); let topic = topic.into();
let offset = offset.into(); let offset = offset.into();
let key = key.and_then(|k| Some(k.into()));
let subscriptions = self.subscriptions.read().unwrap(); let subscriptions = self.subscriptions.read().unwrap();
let subscription = match subscriptions.get(&topic) { let subscription = match subscriptions.get(&topic) {
@ -114,7 +115,8 @@ impl Consumers {
Some(consumer_groups) => { Some(consumer_groups) => {
for consumer in consumer_groups.values_mut() { for consumer in consumer_groups.values_mut() {
// TODO: Implement retry logic, etc. // TODO: Implement retry logic, etc.
self.handler.handle_offset(&topic, &key, consumer, offset)?; self.handler
.handle_offset(&topic, key.as_ref(), consumer, offset)?;
consumer.offset = offset; consumer.offset = offset;
} }
} }
@ -194,7 +196,7 @@ mod test {
let consumer = consumers.get_consumer(&consumer_id, &consumer_index); let consumer = consumers.get_consumer(&consumer_id, &consumer_index);
assert_eq!(Some(Consumer { offset: 0 }), consumer); assert_eq!(Some(Consumer { offset: 0 }), consumer);
consumers.notify_update(&topic, None, offset)?; consumers.notify_update(&topic, None::<String>, offset)?;
let consumer = consumers.get_consumer(&consumer_id, &consumer_index); let consumer = consumers.get_consumer(&consumer_id, &consumer_index);
assert_eq!(Some(Consumer { offset: 9 }), consumer); assert_eq!(Some(Consumer { offset: 9 }), consumer);

View File

@ -20,7 +20,7 @@ impl Handler {
pub fn handle_offset( pub fn handle_offset(
&self, &self,
topic: &Topic, topic: &Topic,
partition_key: &Option<PartitionKey>, partition_key: Option<&PartitionKey>,
consumer: &Consumer, consumer: &Consumer,
offset: TopicOffset, offset: TopicOffset,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {