From cce1400a34616c4b8a0724a985725a71ff439ceb Mon Sep 17 00:00:00 2001 From: kjuulh Date: Wed, 14 Aug 2024 14:59:26 +0200 Subject: [PATCH] feat: add ingest layer Signed-off-by: kjuulh --- crates/nodata/src/grpc.rs | 4 +- crates/nodata/src/services.rs | 50 +++++++++++++++++++++++++ crates/nodata/src/services/consumers.rs | 8 ++-- crates/nodata/src/services/handler.rs | 2 +- 4 files changed, 58 insertions(+), 6 deletions(-) diff --git a/crates/nodata/src/grpc.rs b/crates/nodata/src/grpc.rs index 9727710..11ec0c3 100644 --- a/crates/nodata/src/grpc.rs +++ b/crates/nodata/src/grpc.rs @@ -4,7 +4,7 @@ use anyhow::Context; use mad::Component; use crate::{ - services::staging::{StagingEvent, StagingState}, + services::{ingest::IngestState, staging::StagingEvent}, state::SharedState, }; @@ -41,7 +41,7 @@ impl no_data_service_server::NoDataService for GrpcServer { "handling event" ); - self.state.staging().publish(req).await.map_err(|e| { + self.state.ingest().publish(req).await.map_err(|e| { tracing::warn!(error = e.to_string(), "failed to handle ingest of data"); tonic::Status::internal(e.to_string()) })?; diff --git a/crates/nodata/src/services.rs b/crates/nodata/src/services.rs index 5c74901..2379cf5 100644 --- a/crates/nodata/src/services.rs +++ b/crates/nodata/src/services.rs @@ -1,3 +1,53 @@ pub mod consumers; pub mod handler; pub mod staging; +pub mod ingest { + use crate::state::SharedState; + + use super::{ + consumers::{Consumer, Consumers}, + staging::{Staging, StagingEvent}, + }; + + #[derive(Clone)] + pub struct Ingest { + staging: Staging, + consumers: Consumers, + } + + impl Ingest { + pub fn new(staging: Staging, consumer: Consumers) -> Self { + Self { + staging, + consumers: consumer, + } + } + + pub async fn publish(&self, event: impl Into) -> anyhow::Result<()> { + let event: StagingEvent = event.into(); + let topic = event.topic.clone(); + let key = { + if event.key.is_empty() { + None + } else { + Some(event.key.clone()) + } + }; + + let offset = self.staging.publish(event).await?; + self.consumers.notify_update(topic, key, offset)?; + + Ok(()) + } + } + + pub trait IngestState { + fn ingest(&self) -> Ingest; + } + + impl IngestState for SharedState { + fn ingest(&self) -> Ingest { + Ingest::new(self.staging.clone(), self.consumers.clone()) + } + } +} diff --git a/crates/nodata/src/services/consumers.rs b/crates/nodata/src/services/consumers.rs index 69a70f4..1a3cb26 100644 --- a/crates/nodata/src/services/consumers.rs +++ b/crates/nodata/src/services/consumers.rs @@ -93,11 +93,12 @@ impl Consumers { pub fn notify_update( &self, topic: impl Into, - key: Option, + key: Option>, offset: impl Into, ) -> anyhow::Result<()> { let topic = topic.into(); let offset = offset.into(); + let key = key.and_then(|k| Some(k.into())); let subscriptions = self.subscriptions.read().unwrap(); let subscription = match subscriptions.get(&topic) { @@ -114,7 +115,8 @@ impl Consumers { Some(consumer_groups) => { for consumer in consumer_groups.values_mut() { // TODO: Implement retry logic, etc. - self.handler.handle_offset(&topic, &key, consumer, offset)?; + self.handler + .handle_offset(&topic, key.as_ref(), consumer, offset)?; consumer.offset = offset; } } @@ -194,7 +196,7 @@ mod test { let consumer = consumers.get_consumer(&consumer_id, &consumer_index); assert_eq!(Some(Consumer { offset: 0 }), consumer); - consumers.notify_update(&topic, None, offset)?; + consumers.notify_update(&topic, None::, offset)?; let consumer = consumers.get_consumer(&consumer_id, &consumer_index); assert_eq!(Some(Consumer { offset: 9 }), consumer); diff --git a/crates/nodata/src/services/handler.rs b/crates/nodata/src/services/handler.rs index 269f1d4..94f948b 100644 --- a/crates/nodata/src/services/handler.rs +++ b/crates/nodata/src/services/handler.rs @@ -20,7 +20,7 @@ impl Handler { pub fn handle_offset( &self, topic: &Topic, - partition_key: &Option, + partition_key: Option<&PartitionKey>, consumer: &Consumer, offset: TopicOffset, ) -> anyhow::Result<()> {