diff --git a/Cargo.lock b/Cargo.lock index 44efd02..7bbb31d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -480,6 +480,19 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "drift" +version = "0.2.0" +source = "git+https://github.com/kjuulh/drift?branch=main#fb4bd51d7106bf2ebffa851dbce172c06ed12ff7" +dependencies = [ + "anyhow", + "async-trait", + "thiserror", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "either" version = "1.13.0" @@ -973,9 +986,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.156" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "a5f43f184355eefb8d17fc948dbecf6c13be3c141f20d834ae842193a448c72a" [[package]] name = "libm" @@ -1106,6 +1119,7 @@ dependencies = [ "chrono", "clap", "dotenv", + "drift", "mad", "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index b0b0bfa..9cd866f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,6 @@ members = ["crates/*"] resolver = "2" [workspace.dependencies] -nodata = { path = "crates/nodata" } anyhow = { version = "1" } tokio = { version = "1", features = ["full"] } @@ -12,3 +11,4 @@ tracing-subscriber = { version = "0.3.18" } clap = { version = "4", features = ["derive", "env"] } dotenv = { version = "0.15" } axum = { version = "0.7" } +drift = { git = "https://github.com/kjuulh/drift", branch = "main" } diff --git a/README.md b/README.md index 683dcdf..66c6847 100644 --- a/README.md +++ b/README.md @@ -22,3 +22,16 @@ Nodata accepts wasm routines for running aggregations over data to be processed ## Data Egress Nodata exposes aggregations as apis, or events to be sent as grpc streamed apis to a service. + +# Architecture + +## Data flow + +Data enters nodata + +1. Application uses SDK to publish data +2. 
Data is sent over grpc using a topic, id and data +3. Data is sent to a topic +4. A broadcast is sent that said topic was updated with a given offset +5. A client can consume from said topic, given a topic and id +6. A queue is running consuming each broadcast message, assigning jobs for each consumer group to delegate messages diff --git a/crates/nodata/Cargo.toml b/crates/nodata/Cargo.toml index e3449bb..fe1b564 100644 --- a/crates/nodata/Cargo.toml +++ b/crates/nodata/Cargo.toml @@ -11,6 +11,7 @@ tracing-subscriber.workspace = true clap.workspace = true dotenv.workspace = true axum.workspace = true +drift.workspace = true serde = { version = "1.0.197", features = ["derive"] } sqlx = { version = "0.7.3", features = [ diff --git a/crates/nodata/src/broker.rs b/crates/nodata/src/broker.rs new file mode 100644 index 0000000..666de67 --- /dev/null +++ b/crates/nodata/src/broker.rs @@ -0,0 +1,132 @@ +use std::{collections::BTreeMap, sync::Arc, time::Duration}; + +use axum::async_trait; +use drift::Drifter; +use mad::Component; +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; + +use crate::{ + services::consumers::{ConsumerGroup, ConsumerId, ConsumersState}, + state::SharedState, +}; + +#[derive(Clone)] +pub struct Broker { + state: SharedState, + + handlers: Arc>>, +} + +impl Broker { + pub fn new(state: &SharedState) -> Self { + Self { + state: state.clone(), + handlers: Arc::default(), + } + } +} + +#[async_trait] +impl Component for Broker { + async fn run( + &self, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> Result<(), mad::MadError> { + let token = drift::schedule_drifter(Duration::from_secs(1), self.clone()); + + tokio::select! 
{ + _ = token.cancelled() => {}, + _ = cancellation_token.cancelled() => { + token.cancel() + }, + } + + Ok(()) + } +} + +#[async_trait] +impl Drifter for Broker { + async fn execute(&self, token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> { + tracing::debug!("reconciling broker"); + + // Execute listen to broker, and execute and register a potential drifter for each consumer group available + + let mut handlers = self.handlers.write().await; + let consumer_groups = self.state.consumers().get_consumer_groups().await; + + let mut new_handlers = Vec::new(); + let mut delete_handlers: Vec = Vec::new(); + for consumer_group in consumer_groups { + if handlers.contains_key(&consumer_group.get_id().await) { + //delete_handlers.push(consumer_group); + } else { + new_handlers.push(consumer_group); + } + } + + for new_handler in new_handlers { + let consumer_id = new_handler.get_id().await; + tracing::debug!(consumer_id = consumer_id, "creating new handler"); + handlers.insert( + new_handler.get_id().await, + BrokerHandler::new(&self.state, new_handler, token.child_token()), + ); + } + + for delete_handler in delete_handlers { + let consumer_id = delete_handler.get_id().await; + tracing::debug!(consumer_id = consumer_id, "deleting consumer"); + handlers.remove(&delete_handler.get_id().await); + } + + Ok(()) + } +} + +pub struct BrokerHandler { + token: CancellationToken, + state: SharedState, +} +impl BrokerHandler { + pub fn new( + state: &SharedState, + consumer_group: ConsumerGroup, + parent_token: CancellationToken, + ) -> Self { + let inner_state = state.clone(); + let token = drift::schedule(Duration::from_secs(1), move || { + let consumer_group = consumer_group.clone(); + let state = inner_state.clone(); + + async move { + let consumer_group = consumer_group; + + if let Err(e) = consumer_group.reconcile_lag(&state).await { + tracing::warn!( + error = e.to_string(), + "failed to reconcile lag for consumer_group" + ) + } + + // Look at offset diffs 
between, current and lag + // if diff, pick a member + // execute update for member + + Ok(()) + } + }); + + Self { + state: state.clone(), + token, + } + } +} + +impl Drop for BrokerHandler { + fn drop(&mut self) { + self.token.cancel(); + } +} diff --git a/crates/nodata/src/grpc.rs b/crates/nodata/src/grpc.rs index e4a1449..6248c26 100644 --- a/crates/nodata/src/grpc.rs +++ b/crates/nodata/src/grpc.rs @@ -8,7 +8,9 @@ use uuid::Uuid; use crate::{ services::{ - consumers::ConsumersState, handler::HandlerState, ingest::IngestState, + consumers::{Consumer, ConsumersState}, + handler::HandlerState, + ingest::IngestState, staging::StagingEvent, }, state::SharedState, @@ -95,23 +97,17 @@ impl no_data_service_server::NoDataService for GrpcServer { let id = Uuid::new_v4().to_string(); let index = Uuid::new_v4().to_string(); - self.state + let consumer = self + .state .consumers() .add_consumer(&id, &index, req.topic) .await .map_err(|e| { tracing::warn!(error = e.to_string(), "failed to add consumer"); tonic::Status::internal(e.to_string()) - })?; - - let consumer = self - .state - .consumers - .get_consumer(&id, &index) - .await + })? 
.unwrap(); - let handler = self.state.handler(); let (tx, rx) = tokio::sync::mpsc::channel(128); tokio::spawn(async move { let mut event_stream = consumer.rx.lock().await; diff --git a/crates/nodata/src/main.rs b/crates/nodata/src/main.rs index ed18bf9..21bb649 100644 --- a/crates/nodata/src/main.rs +++ b/crates/nodata/src/main.rs @@ -1,3 +1,4 @@ +mod broker; mod grpc; mod http; mod state; @@ -6,6 +7,7 @@ mod services; use std::net::SocketAddr; +use broker::Broker; use chrono::{Datelike, Timelike}; use clap::{Parser, Subcommand}; use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest}; @@ -32,12 +34,12 @@ enum Commands { Client { // #[arg(env = "SERVICE_HOST", long, default_value = "http://127.0.0.1:3000")] - // host: String, - // #[arg( - // env = "GRPC_SERVICE_HOST", - // long, - // default_value = "http://127.0.0.1:7900" - // )] + //host: String, + #[arg( + env = "GRPC_SERVICE_HOST", + long, + default_value = "http://127.0.0.1:7900" + )] grpc_host: String, #[command(subcommand)] @@ -83,6 +85,7 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Starting service"); let state = SharedState::new().await?; Mad::builder() + .add(Broker::new(&state)) .add(HttpServer::new(&state, host)) .add(GrpcServer::new(&state, grpc_host)) .run() diff --git a/crates/nodata/src/services.rs b/crates/nodata/src/services.rs index 997d5ec..5368bb5 100644 --- a/crates/nodata/src/services.rs +++ b/crates/nodata/src/services.rs @@ -1,50 +1,4 @@ pub mod consumers; pub mod handler; +pub mod ingest; pub mod staging; -pub mod ingest { - use crate::state::SharedState; - - use super::{ - consumers::{Consumer, Consumers}, - staging::{Staging, StagingEvent}, - }; - - #[derive(Clone)] - pub struct Ingest { - staging: Staging, - consumers: Consumers, - } - - impl Ingest { - pub fn new(staging: Staging, consumers: Consumers) -> Self { - Self { staging, consumers } - } - - pub async fn publish(&self, event: impl Into) -> anyhow::Result<()> { - let event: 
StagingEvent = event.into(); - let topic = event.topic.clone(); - let key = { - if event.key.is_empty() { - None - } else { - Some(event.key.clone()) - } - }; - - let offset = self.staging.publish(event).await?; - self.consumers.notify_update(topic, key, offset).await?; - - Ok(()) - } - } - - pub trait IngestState { - fn ingest(&self) -> Ingest; - } - - impl IngestState for SharedState { - fn ingest(&self) -> Ingest { - Ingest::new(self.staging.clone(), self.consumers.clone()) - } - } -} diff --git a/crates/nodata/src/services/consumers.rs b/crates/nodata/src/services/consumers.rs index 3b783c2..2a03b15 100644 --- a/crates/nodata/src/services/consumers.rs +++ b/crates/nodata/src/services/consumers.rs @@ -1,10 +1,13 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, ops::Deref, sync::Arc}; use tokio::sync::RwLock; use crate::state::SharedState; -use super::{handler::Handler, staging::StagingEvent}; +use super::{ + handler::{Handler, HandlerState}, + staging::StagingEvent, +}; pub type ConsumerId = String; pub type ConsumerIndex = String; @@ -15,39 +18,147 @@ pub type TopicOffset = usize; // Consumer will at some point contain an interface to send events over a channel to a lib #[derive(Debug, Clone)] pub struct Consumer { - pub offset: TopicOffset, - pub tx: tokio::sync::mpsc::Sender, pub rx: Arc>>, } +#[derive(Default, Clone, Debug)] +pub struct ConsumerGroup { + inner: Arc>, +} + +#[derive(Default, Debug)] +pub struct InnerConsumerGroup { + pub id: ConsumerId, + topic: Topic, + partition_key: Option, + pub(crate) offset: TopicOffset, + pub(crate) cur_offset: TopicOffset, + + members: BTreeMap, +} + +impl ConsumerGroup { + pub fn new( + id: impl Into, + topic: impl Into, + partition_key: Option>, + ) -> Self { + Self { + inner: Arc::new(RwLock::new(InnerConsumerGroup::new( + id, + topic, + partition_key, + ))), + } + } + + pub async fn add_consumer(&self, index: ConsumerIndex, consumer: Consumer) { + let mut inner = 
self.inner.write().await; + + inner.add_consumer(index, consumer); + } + + pub async fn update_offset(&self, offset: TopicOffset) { + let mut inner = self.inner.write().await; + inner.update_offset(offset); + } + + pub async fn reconcile_lag(&self, state: &SharedState) -> anyhow::Result<()> { + let mut inner = self.inner.write().await; + inner.reconcile_lag(state).await?; + + Ok(()) + } + + pub async fn get_offset(&self) -> TopicOffset { + let inner = self.inner.read().await; + + inner.offset + } + + pub async fn get_id(&self) -> ConsumerId { + let inner = self.inner.read().await; + + inner.id.clone() + } +} + +impl InnerConsumerGroup { + pub fn new( + id: impl Into, + topic: impl Into, + partition_key: Option>, + ) -> Self { + Self { + id: id.into(), + topic: topic.into(), + partition_key: partition_key.map(|pk| pk.into()), + offset: 0, + cur_offset: 0, + members: BTreeMap::default(), + } + } + + pub fn add_consumer(&mut self, index: ConsumerIndex, consumer: Consumer) { + if self.members.insert(index.to_owned(), consumer).is_some() { + tracing::warn!(index = index, "consumer replaced"); + } + } + + pub fn update_offset(&mut self, offset: TopicOffset) { + self.offset = offset; + } + + pub async fn reconcile_lag(&mut self, state: &SharedState) -> anyhow::Result<()> { + if self.offset <= self.cur_offset { + return Ok(()); + } + + // TODO: replace with round robin or something; + let consumer = match self.members.first_key_value() { + Some((_, consumer)) => consumer, + None => return Ok(()), + }; + + tracing::debug!(consumer_id = self.id, "sending update for consumer"); + state + .handler() + .handle_offset( + &self.topic, + self.partition_key.as_ref(), + consumer, + self.cur_offset, + self.offset, + ) + .await?; + + self.cur_offset = self.offset; + + Ok(()) + } +} + impl Default for Consumer { fn default() -> Self { - Self::new(0) + Self::new() } } impl Consumer { - pub fn new(offset: usize) -> Self { + pub fn new() -> Self { let (tx, rx) = 
tokio::sync::mpsc::channel(128); Self { - offset, tx, rx: Arc::new(tokio::sync::Mutex::new(rx)), } } } -impl PartialEq for Consumer { - fn eq(&self, other: &Self) -> bool { - self.offset == other.offset - } -} - #[derive(Clone)] pub struct Consumers { - storage: Arc>>>, + storage: Arc>>, subscriptions: Arc>>>, handler: Handler, } @@ -71,23 +182,28 @@ impl Consumers { id: impl Into, index: impl Into, topic: impl Into, - ) -> anyhow::Result<()> { + ) -> anyhow::Result> { let id = id.into(); let index = index.into(); let topic = topic.into(); - { + let consumer = { let mut storage = self.storage.write().await; if !storage.contains_key(&id) { - storage.insert(id.clone(), BTreeMap::default()); + storage.insert( + id.clone(), + ConsumerGroup::new(&id, &topic, None::), + ); } let consumer_group = storage.get_mut(&id).unwrap(); - if !consumer_group.contains_key(&index) { - consumer_group.insert(index.clone(), Consumer::default()); - } - } + let consumer = Consumer::default(); + consumer_group + .add_consumer(index.clone(), consumer.clone()) + .await; + consumer + }; { let mut subscriptions = self.subscriptions.write().await; @@ -101,19 +217,24 @@ impl Consumers { } } - Ok(()) + Ok(Some(consumer)) } - pub async fn get_consumer( + pub async fn get_consumer_group( &self, id: impl Into, index: impl Into, - ) -> Option { + ) -> Option { let storage = self.storage.read().await; let consumer_group = storage.get(&id.into())?; - let consumer = consumer_group.get(&index.into())?; - Some(consumer.to_owned()) + Some(consumer_group.to_owned()) + } + + pub async fn get_consumer_groups(&self) -> Vec { + let storage = self.storage.read().await; + + storage.iter().map(|(_, v)| v).cloned().collect() } pub async fn notify_update( @@ -139,13 +260,7 @@ impl Consumers { for consumer_id in subscription { match storage.get_mut(consumer_id) { Some(consumer_groups) => { - for consumer in consumer_groups.values_mut() { - // TODO: Implement retry logic, etc. 
- self.handler - .handle_offset(&topic, key.as_ref(), consumer, offset) - .await?; - consumer.offset = offset; - } + consumer_groups.update_offset(offset).await; } None => { tracing::trace!( @@ -190,9 +305,12 @@ mod test { consumers .add_consumer(consumer_id, consumer_index, topic) .await?; - let consumer = consumers.get_consumer(consumer_id, consumer_index).await; + let consumer = consumers + .get_consumer_group(consumer_id, consumer_index) + .await + .unwrap(); - assert_eq!(Some(Consumer::default()), consumer); + assert_eq!(0, consumer.get_offset().await); Ok(()) } @@ -226,14 +344,21 @@ mod test { consumers .add_consumer(&consumer_id, &consumer_index, &topic) .await?; - let consumer = consumers.get_consumer(&consumer_id, &consumer_index).await; - assert_eq!(Some(Consumer::default()), consumer); + let consumer = consumers + .get_consumer_group(&consumer_id, &consumer_index) + .await + .unwrap(); + assert_eq!(0, consumer.get_offset().await); consumers .notify_update(&topic, None::, offset) .await?; - let consumer = consumers.get_consumer(&consumer_id, &consumer_index).await; - assert_eq!(Some(Consumer::new(9)), consumer); + let consumer = consumers + .get_consumer_group(&consumer_id, &consumer_index) + .await + .unwrap(); + + assert_eq!(9, consumer.get_offset().await); Ok(()) } diff --git a/crates/nodata/src/services/handler.rs b/crates/nodata/src/services/handler.rs index 6c1f28a..3ede37d 100644 --- a/crates/nodata/src/services/handler.rs +++ b/crates/nodata/src/services/handler.rs @@ -20,11 +20,12 @@ impl Handler { topic: &Topic, partition_key: Option<&PartitionKey>, consumer: &Consumer, - offset: TopicOffset, + start_offset: TopicOffset, + end_offset: TopicOffset, ) -> anyhow::Result<()> { let events = self .staging - .get_topic_offset(topic, partition_key, consumer.offset, offset) + .get_topic_offset(topic, partition_key, start_offset, end_offset) .await?; // TODO: handle events diff --git a/crates/nodata/src/services/ingest.rs 
b/crates/nodata/src/services/ingest.rs new file mode 100644 index 0000000..2169822 --- /dev/null +++ b/crates/nodata/src/services/ingest.rs @@ -0,0 +1,45 @@ +use crate::state::SharedState; + +use super::{ + consumers::{Consumer, Consumers}, + staging::{Staging, StagingEvent}, +}; + +#[derive(Clone)] +pub struct Ingest { + staging: Staging, + consumers: Consumers, +} + +impl Ingest { + pub fn new(staging: Staging, consumers: Consumers) -> Self { + Self { staging, consumers } + } + + pub async fn publish(&self, event: impl Into) -> anyhow::Result<()> { + let event: StagingEvent = event.into(); + let topic = event.topic.clone(); + let key = { + if event.key.is_empty() { + None + } else { + Some(event.key.clone()) + } + }; + + let offset = self.staging.publish(event).await?; + self.consumers.notify_update(topic, key, offset).await?; + + Ok(()) + } +} + +pub trait IngestState { + fn ingest(&self) -> Ingest; +} + +impl IngestState for SharedState { + fn ingest(&self) -> Ingest { + Ingest::new(self.staging.clone(), self.consumers.clone()) + } +}