From 705da454975e5f576a32bd68791c446cbb4eb915 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sat, 17 Aug 2024 13:59:58 +0200 Subject: [PATCH] feat: do cleanup Signed-off-by: kjuulh --- README.md | 13 ++++++++ crates/nodata/src/broker.rs | 12 +++----- crates/nodata/src/grpc.rs | 7 +---- crates/nodata/src/http.rs | 2 +- crates/nodata/src/services/consumers.rs | 40 ++++++++----------------- crates/nodata/src/services/ingest.rs | 2 +- crates/nodata/src/services/staging.rs | 9 ++---- crates/nodata/src/state.rs | 6 ++-- 8 files changed, 37 insertions(+), 54 deletions(-) diff --git a/README.md b/README.md index 66c6847..ea63212 100644 --- a/README.md +++ b/README.md @@ -34,4 +34,17 @@ Data enteres nodata 3. Data is sent to a topic 4. A broadcast is sent that said topic was updated with a given offset 5. A client can consume from said topic, given a topic and id +// 6. We need a partition in here to separate handling between partitions and consumer groups 6. A queue is running consuming each broadcast message, assigning jobs for each consumer group to delegate messages + +## Components + +A component is a consumer on a set topic, it will either act as a source, sink or a tranformation between topics. It can declare topics, use topics, transform data and much more. + +A topic at its most basic is a computational unit implementing a certain interface, source, sink, transformation. + +The most simple is a source and sink, where we respectively push or pull data from the topics. + +### What does it look like + +As part of nodata, you'll be given nodata the cli. The cli can bootstrap a variety of components, diff --git a/crates/nodata/src/broker.rs b/crates/nodata/src/broker.rs index 666de67..27b3295 100644 --- a/crates/nodata/src/broker.rs +++ b/crates/nodata/src/broker.rs @@ -49,7 +49,7 @@ impl Component for Broker { #[async_trait] impl Drifter for Broker { async fn execute(&self, token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> { - tracing::debug!("reconciling broker"); + tracing::trace!("reconciling broker"); // Execute listen to broker, and execute and register a potential drifter for each consumer group available @@ -57,7 +57,7 @@ impl Drifter for Broker { let consumer_groups = self.state.consumers().get_consumer_groups().await; let mut new_handlers = Vec::new(); - let mut delete_handlers: Vec = Vec::new(); + let delete_handlers: Vec = Vec::new(); for consumer_group in consumer_groups { if handlers.contains_key(&consumer_group.get_id().await) { //delete_handlers.push(consumer_group); @@ -87,13 +87,12 @@ impl Drifter for Broker { pub struct BrokerHandler { token: CancellationToken, - state: SharedState, } impl BrokerHandler { pub fn new( state: &SharedState, consumer_group: ConsumerGroup, - parent_token: CancellationToken, + _parent_token: CancellationToken, ) -> Self { let inner_state = state.clone(); let token = drift::schedule(Duration::from_secs(1), move || { @@ -118,10 +117,7 @@ impl BrokerHandler { } }); - Self { - state: state.clone(), - token, - } + Self { token } } } diff --git a/crates/nodata/src/grpc.rs b/crates/nodata/src/grpc.rs index 6248c26..bc7cb2a 100644 --- a/crates/nodata/src/grpc.rs +++ b/crates/nodata/src/grpc.rs @@ -7,12 +7,7 @@ use tonic::Response; use uuid::Uuid; use crate::{ - services::{ - consumers::{Consumer, ConsumersState}, - handler::HandlerState, - ingest::IngestState, - staging::StagingEvent, - }, + services::{consumers::ConsumersState, ingest::IngestState, staging::StagingEvent}, state::SharedState, }; diff --git a/crates/nodata/src/http.rs b/crates/nodata/src/http.rs index 9d8d987..5e7a354 100644 --- a/crates/nodata/src/http.rs +++ b/crates/nodata/src/http.rs @@ -29,7 +29,7 @@ impl HttpServer { #[async_trait] impl Component for HttpServer { - async fn run(&self, cancellation_token: CancellationToken) -> Result<(), mad::MadError> { + async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), mad::MadError> { let app = Router::new() .route("/", get(root)) .with_state(self.state.clone()) diff --git a/crates/nodata/src/services/consumers.rs b/crates/nodata/src/services/consumers.rs index 2a03b15..6d2c937 100644 --- a/crates/nodata/src/services/consumers.rs +++ b/crates/nodata/src/services/consumers.rs @@ -1,13 +1,10 @@ -use std::{collections::BTreeMap, ops::Deref, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc}; use tokio::sync::RwLock; use crate::state::SharedState; -use super::{ - handler::{Handler, HandlerState}, - staging::StagingEvent, -}; +use super::{handler::HandlerState, staging::StagingEvent}; pub type ConsumerId = String; pub type ConsumerIndex = String; @@ -71,6 +68,7 @@ impl ConsumerGroup { Ok(()) } + #[allow(dead_code)] pub async fn get_offset(&self) -> TopicOffset { let inner = self.inner.read().await; @@ -160,7 +158,6 @@ impl Consumer { pub struct Consumers { storage: Arc>>, subscriptions: Arc>>>, - handler: Handler, } // message arrives in queue @@ -169,11 +166,10 @@ pub struct Consumers { // active consumers now listen and receive events, if they process ok, the offset of a consumer will move ahead in the partition // so we keep a system of record of each consumer impl Consumers { - pub fn new(handler: Handler) -> Self { + pub fn new() -> Self { Self { storage: Arc::default(), subscriptions: Arc::default(), - handler, } } @@ -220,11 +216,8 @@ impl Consumers { Ok(Some(consumer)) } - pub async fn get_consumer_group( - &self, - id: impl Into, - index: impl Into, - ) -> Option { + #[allow(dead_code)] + pub async fn get_consumer_group(&self, id: impl Into) -> Option { let storage = self.storage.read().await; let consumer_group = storage.get(&id.into())?; @@ -245,7 +238,7 @@ impl Consumers { ) -> anyhow::Result<()> { let topic = topic.into(); let offset = offset.into(); - let key = key.map(|k| k.into()); + let _key = key.map(|k| k.into()); let subscriptions = self.subscriptions.read().await; let subscription = match subscriptions.get(&topic) { @@ -300,15 +293,12 @@ mod test { let consumer_index = "some-consumer-index"; let topic = "some-topic"; - let consumers = Consumers::new(Handler::new(Staging::default())); + let consumers = Consumers::new(); consumers .add_consumer(consumer_id, consumer_index, topic) .await?; - let consumer = consumers - .get_consumer_group(consumer_id, consumer_index) - .await - .unwrap(); + let consumer = consumers.get_consumer_group(consumer_id).await.unwrap(); assert_eq!(0, consumer.get_offset().await); @@ -339,24 +329,18 @@ mod test { tracing::trace!("published offset: {}", offset); } - let consumers = Consumers::new(Handler::new(staging)); + let consumers = Consumers::new(); consumers .add_consumer(&consumer_id, &consumer_index, &topic) .await?; - let consumer = consumers - .get_consumer_group(&consumer_id, &consumer_index) - .await - .unwrap(); + let consumer = consumers.get_consumer_group(&consumer_id).await.unwrap(); assert_eq!(0, consumer.get_offset().await); consumers .notify_update(&topic, None::, offset) .await?; - let consumer = consumers - .get_consumer_group(&consumer_id, &consumer_index) - .await - .unwrap(); + let consumer = consumers.get_consumer_group(&consumer_id).await.unwrap(); assert_eq!(9, consumer.get_offset().await); diff --git a/crates/nodata/src/services/ingest.rs b/crates/nodata/src/services/ingest.rs index 2169822..95bc90c 100644 --- a/crates/nodata/src/services/ingest.rs +++ b/crates/nodata/src/services/ingest.rs @@ -1,7 +1,7 @@ use crate::state::SharedState; use super::{ - consumers::{Consumer, Consumers}, + consumers::Consumers, staging::{Staging, StagingEvent}, }; diff --git a/crates/nodata/src/services/staging.rs b/crates/nodata/src/services/staging.rs index bd5a10b..ffd9613 100644 --- a/crates/nodata/src/services/staging.rs +++ b/crates/nodata/src/services/staging.rs @@ -4,7 +4,7 @@ use tokio::sync::RwLock; use crate::state::SharedState; -use super::consumers::{Consumers, PartitionKey, Topic, TopicOffset}; +use super::consumers::{PartitionKey, Topic, TopicOffset}; #[derive(Clone, Debug, PartialEq, Eq)] pub struct StagingEvent { @@ -23,12 +23,6 @@ pub struct Staging { } impl Staging { - pub fn new() -> Self { - Self { - store: Arc::default(), - } - } - pub async fn publish( &self, staging_event: impl Into, @@ -155,6 +149,7 @@ impl Staging { } } +#[allow(dead_code)] pub trait StagingState { fn staging(&self) -> Staging; } diff --git a/crates/nodata/src/state.rs b/crates/nodata/src/state.rs index ef655df..db63f9f 100644 --- a/crates/nodata/src/state.rs +++ b/crates/nodata/src/state.rs @@ -23,7 +23,7 @@ impl Deref for SharedState { } pub struct State { - pub db: Pool, + pub _db: Pool, pub staging: Staging, pub consumers: Consumers, pub handler: Handler, @@ -47,8 +47,8 @@ impl State { let handler = Handler::new(staging.clone()); Ok(Self { - db, - consumers: Consumers::new(handler.clone()), + _db: db, + consumers: Consumers::new(), staging, handler, })