From d0ea8019e1ac2daf9718a20a4a17e1d33c485dd4 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Fri, 16 Aug 2024 00:25:43 +0200 Subject: [PATCH] feat: add broker setup this is mainly to decouple the actual sending of events, from the ingest. we now ingest the data, and update consumer groups with the new offset. Consumer groups now in the background continuously send out data from the update. they tick 1 second between checks, but if something takes longer than a second, the next run just continues from where we left off Signed-off-by: kjuulh --- Cargo.lock | 18 ++- Cargo.toml | 2 +- README.md | 13 ++ crates/nodata/Cargo.toml | 1 + crates/nodata/src/broker.rs | 132 +++++++++++++++ crates/nodata/src/grpc.rs | 16 +- crates/nodata/src/main.rs | 15 +- crates/nodata/src/services.rs | 48 +----- crates/nodata/src/services/consumers.rs | 203 +++++++++++++++++++----- crates/nodata/src/services/handler.rs | 5 +- crates/nodata/src/services/ingest.rs | 45 ++++++ 11 files changed, 391 insertions(+), 107 deletions(-) create mode 100644 crates/nodata/src/broker.rs create mode 100644 crates/nodata/src/services/ingest.rs diff --git a/Cargo.lock b/Cargo.lock index 44efd02..7bbb31d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -480,6 +480,19 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "drift" +version = "0.2.0" +source = "git+https://github.com/kjuulh/drift?branch=main#fb4bd51d7106bf2ebffa851dbce172c06ed12ff7" +dependencies = [ + "anyhow", + "async-trait", + "thiserror", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "either" version = "1.13.0" @@ -973,9 +986,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.156" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = 
"a5f43f184355eefb8d17fc948dbecf6c13be3c141f20d834ae842193a448c72a" [[package]] name = "libm" @@ -1106,6 +1119,7 @@ dependencies = [ "chrono", "clap", "dotenv", + "drift", "mad", "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index b0b0bfa..9cd866f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,6 @@ members = ["crates/*"] resolver = "2" [workspace.dependencies] -nodata = { path = "crates/nodata" } anyhow = { version = "1" } tokio = { version = "1", features = ["full"] } @@ -12,3 +11,4 @@ tracing-subscriber = { version = "0.3.18" } clap = { version = "4", features = ["derive", "env"] } dotenv = { version = "0.15" } axum = { version = "0.7" } +drift = { git = "https://github.com/kjuulh/drift", branch = "main" } diff --git a/README.md b/README.md index 683dcdf..66c6847 100644 --- a/README.md +++ b/README.md @@ -22,3 +22,16 @@ Nodata accepts wasm routines for running aggregations over data to be processed ## Data Egress Nodata exposes aggregations as apis, or events to be sent as grpc streamed apis to a service. + +# Architecture + +## Data flow + +Data enters nodata + +1. Application uses SDK to publish data +2. Data is sent over grpc using a topic, id and data +3. Data is sent to a topic +4. A broadcast is sent that said topic was updated with a given offset +5. A client can consume from said topic, given a topic and id +6. 
A queue is running consuming each broadcast message, assigning jobs for each consumer group to delegate messages diff --git a/crates/nodata/Cargo.toml b/crates/nodata/Cargo.toml index e3449bb..fe1b564 100644 --- a/crates/nodata/Cargo.toml +++ b/crates/nodata/Cargo.toml @@ -11,6 +11,7 @@ tracing-subscriber.workspace = true clap.workspace = true dotenv.workspace = true axum.workspace = true +drift.workspace = true serde = { version = "1.0.197", features = ["derive"] } sqlx = { version = "0.7.3", features = [ diff --git a/crates/nodata/src/broker.rs b/crates/nodata/src/broker.rs new file mode 100644 index 0000000..666de67 --- /dev/null +++ b/crates/nodata/src/broker.rs @@ -0,0 +1,132 @@ +use std::{collections::BTreeMap, sync::Arc, time::Duration}; + +use axum::async_trait; +use drift::Drifter; +use mad::Component; +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; + +use crate::{ + services::consumers::{ConsumerGroup, ConsumerId, ConsumersState}, + state::SharedState, +}; + +#[derive(Clone)] +pub struct Broker { + state: SharedState, + + handlers: Arc>>, +} + +impl Broker { + pub fn new(state: &SharedState) -> Self { + Self { + state: state.clone(), + handlers: Arc::default(), + } + } +} + +#[async_trait] +impl Component for Broker { + async fn run( + &self, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> Result<(), mad::MadError> { + let token = drift::schedule_drifter(Duration::from_secs(1), self.clone()); + + tokio::select! 
{ + _ = token.cancelled() => {}, + _ = cancellation_token.cancelled() => { + token.cancel() + }, + } + + Ok(()) + } +} + +#[async_trait] +impl Drifter for Broker { + async fn execute(&self, token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> { + tracing::debug!("reconciling broker"); + + // Execute listen to broker, and execute and register a potential drifter for each consumer group available + + let mut handlers = self.handlers.write().await; + let consumer_groups = self.state.consumers().get_consumer_groups().await; + + let mut new_handlers = Vec::new(); + let mut delete_handlers: Vec = Vec::new(); + for consumer_group in consumer_groups { + if handlers.contains_key(&consumer_group.get_id().await) { + //delete_handlers.push(consumer_group); + } else { + new_handlers.push(consumer_group); + } + } + + for new_handler in new_handlers { + let consumer_id = new_handler.get_id().await; + tracing::debug!(consumer_id = consumer_id, "creating new handler"); + handlers.insert( + new_handler.get_id().await, + BrokerHandler::new(&self.state, new_handler, token.child_token()), + ); + } + + for delete_handler in delete_handlers { + let consumer_id = delete_handler.get_id().await; + tracing::debug!(consumer_id = consumer_id, "deleting consumer"); + handlers.remove(&delete_handler.get_id().await); + } + + Ok(()) + } +} + +pub struct BrokerHandler { + token: CancellationToken, + state: SharedState, +} +impl BrokerHandler { + pub fn new( + state: &SharedState, + consumer_group: ConsumerGroup, + parent_token: CancellationToken, + ) -> Self { + let inner_state = state.clone(); + let token = drift::schedule(Duration::from_secs(1), move || { + let consumer_group = consumer_group.clone(); + let state = inner_state.clone(); + + async move { + let consumer_group = consumer_group; + + if let Err(e) = consumer_group.reconcile_lag(&state).await { + tracing::warn!( + error = e.to_string(), + "failed to reconcile lag for consumer_group" + ) + } + + // Look at offset diffs 
between, current and lag + // if diff, pick a member + // execute update for member + + Ok(()) + } + }); + + Self { + state: state.clone(), + token, + } + } +} + +impl Drop for BrokerHandler { + fn drop(&mut self) { + self.token.cancel(); + } +} diff --git a/crates/nodata/src/grpc.rs b/crates/nodata/src/grpc.rs index e4a1449..6248c26 100644 --- a/crates/nodata/src/grpc.rs +++ b/crates/nodata/src/grpc.rs @@ -8,7 +8,9 @@ use uuid::Uuid; use crate::{ services::{ - consumers::ConsumersState, handler::HandlerState, ingest::IngestState, + consumers::{Consumer, ConsumersState}, + handler::HandlerState, + ingest::IngestState, staging::StagingEvent, }, state::SharedState, @@ -95,23 +97,17 @@ impl no_data_service_server::NoDataService for GrpcServer { let id = Uuid::new_v4().to_string(); let index = Uuid::new_v4().to_string(); - self.state + let consumer = self + .state .consumers() .add_consumer(&id, &index, req.topic) .await .map_err(|e| { tracing::warn!(error = e.to_string(), "failed to add consumer"); tonic::Status::internal(e.to_string()) - })?; - - let consumer = self - .state - .consumers - .get_consumer(&id, &index) - .await + })? 
.unwrap(); - let handler = self.state.handler(); let (tx, rx) = tokio::sync::mpsc::channel(128); tokio::spawn(async move { let mut event_stream = consumer.rx.lock().await; diff --git a/crates/nodata/src/main.rs b/crates/nodata/src/main.rs index ed18bf9..21bb649 100644 --- a/crates/nodata/src/main.rs +++ b/crates/nodata/src/main.rs @@ -1,3 +1,4 @@ +mod broker; mod grpc; mod http; mod state; @@ -6,6 +7,7 @@ mod services; use std::net::SocketAddr; +use broker::Broker; use chrono::{Datelike, Timelike}; use clap::{Parser, Subcommand}; use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest}; @@ -32,12 +34,12 @@ enum Commands { Client { // #[arg(env = "SERVICE_HOST", long, default_value = "http://127.0.0.1:3000")] - // host: String, - // #[arg( - // env = "GRPC_SERVICE_HOST", - // long, - // default_value = "http://127.0.0.1:7900" - // )] + //host: String, + #[arg( + env = "GRPC_SERVICE_HOST", + long, + default_value = "http://127.0.0.1:7900" + )] grpc_host: String, #[command(subcommand)] @@ -83,6 +85,7 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Starting service"); let state = SharedState::new().await?; Mad::builder() + .add(Broker::new(&state)) .add(HttpServer::new(&state, host)) .add(GrpcServer::new(&state, grpc_host)) .run() diff --git a/crates/nodata/src/services.rs b/crates/nodata/src/services.rs index 997d5ec..5368bb5 100644 --- a/crates/nodata/src/services.rs +++ b/crates/nodata/src/services.rs @@ -1,50 +1,4 @@ pub mod consumers; pub mod handler; +pub mod ingest; pub mod staging; -pub mod ingest { - use crate::state::SharedState; - - use super::{ - consumers::{Consumer, Consumers}, - staging::{Staging, StagingEvent}, - }; - - #[derive(Clone)] - pub struct Ingest { - staging: Staging, - consumers: Consumers, - } - - impl Ingest { - pub fn new(staging: Staging, consumers: Consumers) -> Self { - Self { staging, consumers } - } - - pub async fn publish(&self, event: impl Into) -> anyhow::Result<()> { - let event: 
StagingEvent = event.into(); - let topic = event.topic.clone(); - let key = { - if event.key.is_empty() { - None - } else { - Some(event.key.clone()) - } - }; - - let offset = self.staging.publish(event).await?; - self.consumers.notify_update(topic, key, offset).await?; - - Ok(()) - } - } - - pub trait IngestState { - fn ingest(&self) -> Ingest; - } - - impl IngestState for SharedState { - fn ingest(&self) -> Ingest { - Ingest::new(self.staging.clone(), self.consumers.clone()) - } - } -} diff --git a/crates/nodata/src/services/consumers.rs b/crates/nodata/src/services/consumers.rs index 3b783c2..2a03b15 100644 --- a/crates/nodata/src/services/consumers.rs +++ b/crates/nodata/src/services/consumers.rs @@ -1,10 +1,13 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, ops::Deref, sync::Arc}; use tokio::sync::RwLock; use crate::state::SharedState; -use super::{handler::Handler, staging::StagingEvent}; +use super::{ + handler::{Handler, HandlerState}, + staging::StagingEvent, +}; pub type ConsumerId = String; pub type ConsumerIndex = String; @@ -15,39 +18,147 @@ pub type TopicOffset = usize; // Consumer will at some point contain an interface to send events over a channel to a lib #[derive(Debug, Clone)] pub struct Consumer { - pub offset: TopicOffset, - pub tx: tokio::sync::mpsc::Sender, pub rx: Arc>>, } +#[derive(Default, Clone, Debug)] +pub struct ConsumerGroup { + inner: Arc>, +} + +#[derive(Default, Debug)] +pub struct InnerConsumerGroup { + pub id: ConsumerId, + topic: Topic, + partition_key: Option, + pub(crate) offset: TopicOffset, + pub(crate) cur_offset: TopicOffset, + + members: BTreeMap, +} + +impl ConsumerGroup { + pub fn new( + id: impl Into, + topic: impl Into, + partition_key: Option>, + ) -> Self { + Self { + inner: Arc::new(RwLock::new(InnerConsumerGroup::new( + id, + topic, + partition_key, + ))), + } + } + + pub async fn add_consumer(&self, index: ConsumerIndex, consumer: Consumer) { + let mut inner = 
self.inner.write().await; + + inner.add_consumer(index, consumer); + } + + pub async fn update_offset(&self, offset: TopicOffset) { + let mut inner = self.inner.write().await; + inner.update_offset(offset); + } + + pub async fn reconcile_lag(&self, state: &SharedState) -> anyhow::Result<()> { + let mut inner = self.inner.write().await; + inner.reconcile_lag(state).await?; + + Ok(()) + } + + pub async fn get_offset(&self) -> TopicOffset { + let inner = self.inner.read().await; + + inner.offset + } + + pub async fn get_id(&self) -> ConsumerId { + let inner = self.inner.read().await; + + inner.id.clone() + } +} + +impl InnerConsumerGroup { + pub fn new( + id: impl Into, + topic: impl Into, + partition_key: Option>, + ) -> Self { + Self { + id: id.into(), + topic: topic.into(), + partition_key: partition_key.map(|pk| pk.into()), + offset: 0, + cur_offset: 0, + members: BTreeMap::default(), + } + } + + pub fn add_consumer(&mut self, index: ConsumerIndex, consumer: Consumer) { + if self.members.insert(index.to_owned(), consumer).is_some() { + tracing::warn!(index = index, "consumer replaced"); + } + } + + pub fn update_offset(&mut self, offset: TopicOffset) { + self.offset = offset; + } + + pub async fn reconcile_lag(&mut self, state: &SharedState) -> anyhow::Result<()> { + if self.offset <= self.cur_offset { + return Ok(()); + } + + // TODO: replace with round robin or something; + let consumer = match self.members.first_key_value() { + Some((_, consumer)) => consumer, + None => return Ok(()), + }; + + tracing::debug!(consumer_id = self.id, "sending update for consumer"); + state + .handler() + .handle_offset( + &self.topic, + self.partition_key.as_ref(), + consumer, + self.cur_offset, + self.offset, + ) + .await?; + + self.cur_offset = self.offset; + + Ok(()) + } +} + impl Default for Consumer { fn default() -> Self { - Self::new(0) + Self::new() } } impl Consumer { - pub fn new(offset: usize) -> Self { + pub fn new() -> Self { let (tx, rx) = 
tokio::sync::mpsc::channel(128); Self { - offset, tx, rx: Arc::new(tokio::sync::Mutex::new(rx)), } } } -impl PartialEq for Consumer { - fn eq(&self, other: &Self) -> bool { - self.offset == other.offset - } -} - #[derive(Clone)] pub struct Consumers { - storage: Arc>>>, + storage: Arc>>, subscriptions: Arc>>>, handler: Handler, } @@ -71,23 +182,28 @@ impl Consumers { id: impl Into, index: impl Into, topic: impl Into, - ) -> anyhow::Result<()> { + ) -> anyhow::Result> { let id = id.into(); let index = index.into(); let topic = topic.into(); - { + let consumer = { let mut storage = self.storage.write().await; if !storage.contains_key(&id) { - storage.insert(id.clone(), BTreeMap::default()); + storage.insert( + id.clone(), + ConsumerGroup::new(&id, &topic, None::), + ); } let consumer_group = storage.get_mut(&id).unwrap(); - if !consumer_group.contains_key(&index) { - consumer_group.insert(index.clone(), Consumer::default()); - } - } + let consumer = Consumer::default(); + consumer_group + .add_consumer(index.clone(), consumer.clone()) + .await; + consumer + }; { let mut subscriptions = self.subscriptions.write().await; @@ -101,19 +217,24 @@ impl Consumers { } } - Ok(()) + Ok(Some(consumer)) } - pub async fn get_consumer( + pub async fn get_consumer_group( &self, id: impl Into, index: impl Into, - ) -> Option { + ) -> Option { let storage = self.storage.read().await; let consumer_group = storage.get(&id.into())?; - let consumer = consumer_group.get(&index.into())?; - Some(consumer.to_owned()) + Some(consumer_group.to_owned()) + } + + pub async fn get_consumer_groups(&self) -> Vec { + let storage = self.storage.read().await; + + storage.iter().map(|(_, v)| v).cloned().collect() } pub async fn notify_update( @@ -139,13 +260,7 @@ impl Consumers { for consumer_id in subscription { match storage.get_mut(consumer_id) { Some(consumer_groups) => { - for consumer in consumer_groups.values_mut() { - // TODO: Implement retry logic, etc. 
- self.handler - .handle_offset(&topic, key.as_ref(), consumer, offset) - .await?; - consumer.offset = offset; - } + consumer_groups.update_offset(offset).await; } None => { tracing::trace!( @@ -190,9 +305,12 @@ mod test { consumers .add_consumer(consumer_id, consumer_index, topic) .await?; - let consumer = consumers.get_consumer(consumer_id, consumer_index).await; + let consumer = consumers + .get_consumer_group(consumer_id, consumer_index) + .await + .unwrap(); - assert_eq!(Some(Consumer::default()), consumer); + assert_eq!(0, consumer.get_offset().await); Ok(()) } @@ -226,14 +344,21 @@ mod test { consumers .add_consumer(&consumer_id, &consumer_index, &topic) .await?; - let consumer = consumers.get_consumer(&consumer_id, &consumer_index).await; - assert_eq!(Some(Consumer::default()), consumer); + let consumer = consumers + .get_consumer_group(&consumer_id, &consumer_index) + .await + .unwrap(); + assert_eq!(0, consumer.get_offset().await); consumers .notify_update(&topic, None::, offset) .await?; - let consumer = consumers.get_consumer(&consumer_id, &consumer_index).await; - assert_eq!(Some(Consumer::new(9)), consumer); + let consumer = consumers + .get_consumer_group(&consumer_id, &consumer_index) + .await + .unwrap(); + + assert_eq!(9, consumer.get_offset().await); Ok(()) } diff --git a/crates/nodata/src/services/handler.rs b/crates/nodata/src/services/handler.rs index 6c1f28a..3ede37d 100644 --- a/crates/nodata/src/services/handler.rs +++ b/crates/nodata/src/services/handler.rs @@ -20,11 +20,12 @@ impl Handler { topic: &Topic, partition_key: Option<&PartitionKey>, consumer: &Consumer, - offset: TopicOffset, + start_offset: TopicOffset, + end_offset: TopicOffset, ) -> anyhow::Result<()> { let events = self .staging - .get_topic_offset(topic, partition_key, consumer.offset, offset) + .get_topic_offset(topic, partition_key, start_offset, end_offset) .await?; // TODO: handle events diff --git a/crates/nodata/src/services/ingest.rs 
b/crates/nodata/src/services/ingest.rs new file mode 100644 index 0000000..2169822 --- /dev/null +++ b/crates/nodata/src/services/ingest.rs @@ -0,0 +1,45 @@ +use crate::state::SharedState; + +use super::{ + consumers::{Consumer, Consumers}, + staging::{Staging, StagingEvent}, +}; + +#[derive(Clone)] +pub struct Ingest { + staging: Staging, + consumers: Consumers, +} + +impl Ingest { + pub fn new(staging: Staging, consumers: Consumers) -> Self { + Self { staging, consumers } + } + + pub async fn publish(&self, event: impl Into) -> anyhow::Result<()> { + let event: StagingEvent = event.into(); + let topic = event.topic.clone(); + let key = { + if event.key.is_empty() { + None + } else { + Some(event.key.clone()) + } + }; + + let offset = self.staging.publish(event).await?; + self.consumers.notify_update(topic, key, offset).await?; + + Ok(()) + } +} + +pub trait IngestState { + fn ingest(&self) -> Ingest; +} + +impl IngestState for SharedState { + fn ingest(&self) -> Ingest { + Ingest::new(self.staging.clone(), self.consumers.clone()) + } +}