feat: add broker setup

This is mainly to decouple the actual sending of events from the ingest.

We now ingest the data and update consumer groups with the new offset. Consumer groups then continuously send out data from the update in the background. They tick 1 second between checks, but if a run takes longer than a second, the next run simply continues from where we left off.
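A minimal sketch of that tick-and-continue pattern, using only the `drift` APIs that appear in this diff (`Drifter`, `schedule_drifter`, and the returned cancellation token); the `OffsetPump` type, its `cursor`, and `fetch_head_offset` are hypothetical stand-ins, not part of the crate:

```rust
use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    time::Duration,
};

use drift::Drifter;

// Hypothetical drifter: `drift` invokes `execute` each tick, and a run that
// overshoots the interval resumes from the stored cursor on the next tick.
#[derive(Clone)]
struct OffsetPump {
    cursor: Arc<AtomicUsize>, // last offset we finished sending (assumed state)
}

#[async_trait::async_trait]
impl Drifter for OffsetPump {
    async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
        let from = self.cursor.load(Ordering::SeqCst);
        let head = fetch_head_offset().await; // stand-in for reading the topic head
        tracing::debug!(from, head, "pumping offsets");
        // ... send events in from..head to the consumer group's members ...
        self.cursor.store(head, Ordering::SeqCst); // the next run continues from here
        Ok(())
    }
}

// Placeholder for "what is the newest offset on the topic?".
async fn fetch_head_offset() -> usize {
    42
}

fn start(pump: OffsetPump) -> tokio_util::sync::CancellationToken {
    // One-second tick, the same interval the Broker component below uses.
    drift::schedule_drifter(Duration::from_secs(1), pump)
}
```

Storing the cursor after each batch is what lets a run that overshoots its one-second slot hand off cleanly to the next tick.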

Signed-off-by: kjuulh <contact@kjuulh.io>
Kasper Juul Hermansen 2024-08-16 00:25:43 +02:00
parent f818a18d65
commit d0ea8019e1
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
11 changed files with 391 additions and 107 deletions

Cargo.lock (generated, 18 lines changed)

@@ -480,6 +480,19 @@ version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
[[package]]
name = "drift"
version = "0.2.0"
source = "git+https://github.com/kjuulh/drift?branch=main#fb4bd51d7106bf2ebffa851dbce172c06ed12ff7"
dependencies = [
"anyhow",
"async-trait",
"thiserror",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "either"
version = "1.13.0"
@@ -973,9 +986,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.155"
version = "0.2.156"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
checksum = "a5f43f184355eefb8d17fc948dbecf6c13be3c141f20d834ae842193a448c72a"
[[package]]
name = "libm"
@@ -1106,6 +1119,7 @@ dependencies = [
"chrono",
"clap",
"dotenv",
"drift",
"mad",
"prost",
"prost-types",


@@ -3,7 +3,6 @@ members = ["crates/*"]
resolver = "2"
[workspace.dependencies]
nodata = { path = "crates/nodata" }
anyhow = { version = "1" }
tokio = { version = "1", features = ["full"] }
@@ -12,3 +11,4 @@ tracing-subscriber = { version = "0.3.18" }
clap = { version = "4", features = ["derive", "env"] }
dotenv = { version = "0.15" }
axum = { version = "0.7" }
drift = { git = "https://github.com/kjuulh/drift", branch = "main" }


@@ -22,3 +22,16 @@ Nodata accepts wasm routines for running aggregations over data to be processed
## Data Egress
Nodata exposes aggregations as APIs, or as events streamed to a service over gRPC.
# Architecture
## Data flow
Data enters nodata as follows (a client-side sketch follows the list):
1. An application uses the SDK to publish data
2. The data is sent over gRPC with a topic, an id, and the data
3. The data is appended to the topic
4. A broadcast announces that the topic was updated to a given offset
5. A client can consume from the topic, given a topic and id
6. A background queue consumes each broadcast message, assigning jobs for each consumer group to delegate messages to its members
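A hedged client-side sketch of step 5, assuming the tonic-generated client module (`no_data_service_client`) that would accompany the `no_data_service_server` used in this repo's grpc.rs, and assuming `SubscribeRequest` carries at least a `topic` field (the only one visible in this diff); the import paths are illustrative:

```rust
// Assumed paths: the generated client mirrors `no_data_service_server` below.
use nodata::grpc::{no_data_service_client::NoDataServiceClient, SubscribeRequest};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Default host matches the CLI's GRPC_SERVICE_HOST default.
    let mut client = NoDataServiceClient::connect("http://127.0.0.1:7900").await?;

    // Subscribing registers a consumer for the topic (step 5); the broker then
    // delegates newly staged events to it on each tick (step 6).
    let mut stream = client
        .subscribe(SubscribeRequest {
            topic: "my.topic".into(),
            ..Default::default() // any other request fields are not shown in this diff
        })
        .await?
        .into_inner();

    while let Some(event) = stream.message().await? {
        // Events arrive in offset order as the consumer group catches up.
        println!("{event:?}");
    }

    Ok(())
}
```

Note what the client does not do: it never polls for offsets. The broker's per-group drifter watches the lag and pushes pending events to one member of the group.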


@@ -11,6 +11,7 @@ tracing-subscriber.workspace = true
clap.workspace = true
dotenv.workspace = true
axum.workspace = true
drift.workspace = true
serde = { version = "1.0.197", features = ["derive"] }
sqlx = { version = "0.7.3", features = [

crates/nodata/src/broker.rs (new file, 132 lines)

@@ -0,0 +1,132 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};

use axum::async_trait;
use drift::Drifter;
use mad::Component;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

use crate::{
    services::consumers::{ConsumerGroup, ConsumerId, ConsumersState},
    state::SharedState,
};

#[derive(Clone)]
pub struct Broker {
    state: SharedState,
    handlers: Arc<RwLock<BTreeMap<ConsumerId, BrokerHandler>>>,
}

impl Broker {
    pub fn new(state: &SharedState) -> Self {
        Self {
            state: state.clone(),
            handlers: Arc::default(),
        }
    }
}

#[async_trait]
impl Component for Broker {
    async fn run(
        &self,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Result<(), mad::MadError> {
        let token = drift::schedule_drifter(Duration::from_secs(1), self.clone());

        tokio::select! {
            _ = token.cancelled() => {},
            _ = cancellation_token.cancelled() => {
                token.cancel()
            },
        }

        Ok(())
    }
}

#[async_trait]
impl Drifter for Broker {
    async fn execute(&self, token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
        tracing::debug!("reconciling broker");

        // Listen to the broker state and register a drifter for each available consumer group
        let mut handlers = self.handlers.write().await;
        let consumer_groups = self.state.consumers().get_consumer_groups().await;

        let mut new_handlers = Vec::new();
        let mut delete_handlers: Vec<ConsumerGroup> = Vec::new();

        for consumer_group in consumer_groups {
            if handlers.contains_key(&consumer_group.get_id().await) {
                //delete_handlers.push(consumer_group);
            } else {
                new_handlers.push(consumer_group);
            }
        }

        for new_handler in new_handlers {
            let consumer_id = new_handler.get_id().await;
            tracing::debug!(consumer_id = consumer_id, "creating new handler");
            handlers.insert(
                new_handler.get_id().await,
                BrokerHandler::new(&self.state, new_handler, token.child_token()),
            );
        }

        for delete_handler in delete_handlers {
            let consumer_id = delete_handler.get_id().await;
            tracing::debug!(consumer_id = consumer_id, "deleting consumer");
            handlers.remove(&delete_handler.get_id().await);
        }

        Ok(())
    }
}

pub struct BrokerHandler {
    token: CancellationToken,
    state: SharedState,
}

impl BrokerHandler {
    pub fn new(
        state: &SharedState,
        consumer_group: ConsumerGroup,
        parent_token: CancellationToken,
    ) -> Self {
        let inner_state = state.clone();
        let token = drift::schedule(Duration::from_secs(1), move || {
            let consumer_group = consumer_group.clone();
            let state = inner_state.clone();

            async move {
                let consumer_group = consumer_group;
                if let Err(e) = consumer_group.reconcile_lag(&state).await {
                    tracing::warn!(
                        error = e.to_string(),
                        "failed to reconcile lag for consumer_group"
                    )
                }

                // Look at the offset diff between current and lag; if they
                // differ, pick a member and execute the update for that member
                Ok(())
            }
        });

        Self {
            state: state.clone(),
            token,
        }
    }
}

impl Drop for BrokerHandler {
    fn drop(&mut self) {
        self.token.cancel();
    }
}


@@ -8,7 +8,9 @@ use uuid::Uuid;
use crate::{
services::{
consumers::ConsumersState, handler::HandlerState, ingest::IngestState,
consumers::{Consumer, ConsumersState},
handler::HandlerState,
ingest::IngestState,
staging::StagingEvent,
},
state::SharedState,
@@ -95,23 +97,17 @@ impl no_data_service_server::NoDataService for GrpcServer {
let id = Uuid::new_v4().to_string();
let index = Uuid::new_v4().to_string();
self.state
let consumer = self
.state
.consumers()
.add_consumer(&id, &index, req.topic)
.await
.map_err(|e| {
tracing::warn!(error = e.to_string(), "failed to add consumer");
tonic::Status::internal(e.to_string())
})?;
let consumer = self
.state
.consumers
.get_consumer(&id, &index)
.await
})?
.unwrap();
let handler = self.state.handler();
let (tx, rx) = tokio::sync::mpsc::channel(128);
tokio::spawn(async move {
let mut event_stream = consumer.rx.lock().await;


@@ -1,3 +1,4 @@
mod broker;
mod grpc;
mod http;
mod state;
@@ -6,6 +7,7 @@ mod services;
use std::net::SocketAddr;
use broker::Broker;
use chrono::{Datelike, Timelike};
use clap::{Parser, Subcommand};
use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest};
@@ -32,12 +34,12 @@ enum Commands {
Client {
// #[arg(env = "SERVICE_HOST", long, default_value = "http://127.0.0.1:3000")]
// host: String,
// #[arg(
// env = "GRPC_SERVICE_HOST",
// long,
// default_value = "http://127.0.0.1:7900"
// )]
//host: String,
#[arg(
env = "GRPC_SERVICE_HOST",
long,
default_value = "http://127.0.0.1:7900"
)]
grpc_host: String,
#[command(subcommand)]
@@ -83,6 +85,7 @@ async fn main() -> anyhow::Result<()> {
tracing::info!("Starting service");
let state = SharedState::new().await?;
Mad::builder()
.add(Broker::new(&state))
.add(HttpServer::new(&state, host))
.add(GrpcServer::new(&state, grpc_host))
.run()


@@ -1,50 +1,4 @@
pub mod consumers;
pub mod handler;
pub mod ingest;
pub mod staging;
pub mod ingest {
use crate::state::SharedState;
use super::{
consumers::{Consumer, Consumers},
staging::{Staging, StagingEvent},
};
#[derive(Clone)]
pub struct Ingest {
staging: Staging,
consumers: Consumers,
}
impl Ingest {
pub fn new(staging: Staging, consumers: Consumers) -> Self {
Self { staging, consumers }
}
pub async fn publish(&self, event: impl Into<StagingEvent>) -> anyhow::Result<()> {
let event: StagingEvent = event.into();
let topic = event.topic.clone();
let key = {
if event.key.is_empty() {
None
} else {
Some(event.key.clone())
}
};
let offset = self.staging.publish(event).await?;
self.consumers.notify_update(topic, key, offset).await?;
Ok(())
}
}
pub trait IngestState {
fn ingest(&self) -> Ingest;
}
impl IngestState for SharedState {
fn ingest(&self) -> Ingest {
Ingest::new(self.staging.clone(), self.consumers.clone())
}
}
}


@@ -1,10 +1,13 @@
use std::{collections::BTreeMap, sync::Arc};
use std::{collections::BTreeMap, ops::Deref, sync::Arc};
use tokio::sync::RwLock;
use crate::state::SharedState;
use super::{handler::Handler, staging::StagingEvent};
use super::{
handler::{Handler, HandlerState},
staging::StagingEvent,
};
pub type ConsumerId = String;
pub type ConsumerIndex = String;
@@ -15,39 +18,147 @@ pub type TopicOffset = usize;
// Consumer will at some point contain an interface to send events over a channel to a lib
#[derive(Debug, Clone)]
pub struct Consumer {
pub offset: TopicOffset,
pub tx: tokio::sync::mpsc::Sender<StagingEvent>,
pub rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<StagingEvent>>>,
}
#[derive(Default, Clone, Debug)]
pub struct ConsumerGroup {
inner: Arc<RwLock<InnerConsumerGroup>>,
}
#[derive(Default, Debug)]
pub struct InnerConsumerGroup {
pub id: ConsumerId,
topic: Topic,
partition_key: Option<PartitionKey>,
pub(crate) offset: TopicOffset,
pub(crate) cur_offset: TopicOffset,
members: BTreeMap<ConsumerIndex, Consumer>,
}
impl ConsumerGroup {
pub fn new(
id: impl Into<ConsumerId>,
topic: impl Into<Topic>,
partition_key: Option<impl Into<PartitionKey>>,
) -> Self {
Self {
inner: Arc::new(RwLock::new(InnerConsumerGroup::new(
id,
topic,
partition_key,
))),
}
}
pub async fn add_consumer(&self, index: ConsumerIndex, consumer: Consumer) {
let mut inner = self.inner.write().await;
inner.add_consumer(index, consumer);
}
pub async fn update_offset(&self, offset: TopicOffset) {
let mut inner = self.inner.write().await;
inner.update_offset(offset);
}
pub async fn reconcile_lag(&self, state: &SharedState) -> anyhow::Result<()> {
let mut inner = self.inner.write().await;
inner.reconcile_lag(state).await?;
Ok(())
}
pub async fn get_offset(&self) -> TopicOffset {
let inner = self.inner.read().await;
inner.offset
}
pub async fn get_id(&self) -> ConsumerId {
let inner = self.inner.read().await;
inner.id.clone()
}
}
impl InnerConsumerGroup {
pub fn new(
id: impl Into<ConsumerId>,
topic: impl Into<Topic>,
partition_key: Option<impl Into<PartitionKey>>,
) -> Self {
Self {
id: id.into(),
topic: topic.into(),
partition_key: partition_key.map(|pk| pk.into()),
offset: 0,
cur_offset: 0,
members: BTreeMap::default(),
}
}
pub fn add_consumer(&mut self, index: ConsumerIndex, consumer: Consumer) {
if self.members.insert(index.to_owned(), consumer).is_some() {
tracing::warn!(index = index, "consumer replaced");
}
}
pub fn update_offset(&mut self, offset: TopicOffset) {
self.offset = offset;
}
pub async fn reconcile_lag(&mut self, state: &SharedState) -> anyhow::Result<()> {
if self.offset <= self.cur_offset {
return Ok(());
}
// TODO: replace with round robin or something;
let consumer = match self.members.first_key_value() {
Some((_, consumer)) => consumer,
None => return Ok(()),
};
tracing::debug!(consumer_id = self.id, "sending update for consumer");
state
.handler()
.handle_offset(
&self.topic,
self.partition_key.as_ref(),
consumer,
self.cur_offset,
self.offset,
)
.await?;
self.cur_offset = self.offset;
Ok(())
}
}
impl Default for Consumer {
fn default() -> Self {
Self::new(0)
Self::new()
}
}
impl Consumer {
pub fn new(offset: usize) -> Self {
pub fn new() -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(128);
Self {
offset,
tx,
rx: Arc::new(tokio::sync::Mutex::new(rx)),
}
}
}
impl PartialEq for Consumer {
fn eq(&self, other: &Self) -> bool {
self.offset == other.offset
}
}
#[derive(Clone)]
pub struct Consumers {
storage: Arc<RwLock<BTreeMap<ConsumerId, BTreeMap<ConsumerIndex, Consumer>>>>,
storage: Arc<RwLock<BTreeMap<ConsumerId, ConsumerGroup>>>,
subscriptions: Arc<RwLock<BTreeMap<Topic, Vec<ConsumerId>>>>,
handler: Handler,
}
@@ -71,23 +182,28 @@ impl Consumers {
id: impl Into<ConsumerId>,
index: impl Into<ConsumerIndex>,
topic: impl Into<Topic>,
) -> anyhow::Result<()> {
) -> anyhow::Result<Option<Consumer>> {
let id = id.into();
let index = index.into();
let topic = topic.into();
{
let consumer = {
let mut storage = self.storage.write().await;
if !storage.contains_key(&id) {
storage.insert(id.clone(), BTreeMap::default());
storage.insert(
id.clone(),
ConsumerGroup::new(&id, &topic, None::<PartitionKey>),
);
}
let consumer_group = storage.get_mut(&id).unwrap();
if !consumer_group.contains_key(&index) {
consumer_group.insert(index.clone(), Consumer::default());
}
}
let consumer = Consumer::default();
consumer_group
.add_consumer(index.clone(), consumer.clone())
.await;
consumer
};
{
let mut subscriptions = self.subscriptions.write().await;
@@ -101,19 +217,24 @@ impl Consumers {
}
}
Ok(())
Ok(Some(consumer))
}
pub async fn get_consumer(
pub async fn get_consumer_group(
&self,
id: impl Into<ConsumerId>,
index: impl Into<ConsumerIndex>,
) -> Option<Consumer> {
) -> Option<ConsumerGroup> {
let storage = self.storage.read().await;
let consumer_group = storage.get(&id.into())?;
let consumer = consumer_group.get(&index.into())?;
Some(consumer.to_owned())
Some(consumer_group.to_owned())
}
pub async fn get_consumer_groups(&self) -> Vec<ConsumerGroup> {
let storage = self.storage.read().await;
storage.iter().map(|(_, v)| v).cloned().collect()
}
pub async fn notify_update(
@@ -139,13 +260,7 @@ impl Consumers {
for consumer_id in subscription {
match storage.get_mut(consumer_id) {
Some(consumer_groups) => {
for consumer in consumer_groups.values_mut() {
// TODO: Implement retry logic, etc.
self.handler
.handle_offset(&topic, key.as_ref(), consumer, offset)
.await?;
consumer.offset = offset;
}
consumer_groups.update_offset(offset).await;
}
None => {
tracing::trace!(
@@ -190,9 +305,12 @@ mod test {
consumers
.add_consumer(consumer_id, consumer_index, topic)
.await?;
let consumer = consumers.get_consumer(consumer_id, consumer_index).await;
let consumer = consumers
.get_consumer_group(consumer_id, consumer_index)
.await
.unwrap();
assert_eq!(Some(Consumer::default()), consumer);
assert_eq!(0, consumer.get_offset().await);
Ok(())
}
@@ -226,14 +344,21 @@ mod test {
consumers
.add_consumer(&consumer_id, &consumer_index, &topic)
.await?;
let consumer = consumers.get_consumer(&consumer_id, &consumer_index).await;
assert_eq!(Some(Consumer::default()), consumer);
let consumer = consumers
.get_consumer_group(&consumer_id, &consumer_index)
.await
.unwrap();
assert_eq!(0, consumer.get_offset().await);
consumers
.notify_update(&topic, None::<String>, offset)
.await?;
let consumer = consumers.get_consumer(&consumer_id, &consumer_index).await;
assert_eq!(Some(Consumer::new(9)), consumer);
let consumer = consumers
.get_consumer_group(&consumer_id, &consumer_index)
.await
.unwrap();
assert_eq!(9, consumer.get_offset().await);
Ok(())
}


@@ -20,11 +20,12 @@ impl Handler {
topic: &Topic,
partition_key: Option<&PartitionKey>,
consumer: &Consumer,
offset: TopicOffset,
start_offset: TopicOffset,
end_offset: TopicOffset,
) -> anyhow::Result<()> {
let events = self
.staging
.get_topic_offset(topic, partition_key, consumer.offset, offset)
.get_topic_offset(topic, partition_key, start_offset, end_offset)
.await?;
// TODO: handle events


@@ -0,0 +1,45 @@
use crate::state::SharedState;

use super::{
    consumers::{Consumer, Consumers},
    staging::{Staging, StagingEvent},
};

#[derive(Clone)]
pub struct Ingest {
    staging: Staging,
    consumers: Consumers,
}

impl Ingest {
    pub fn new(staging: Staging, consumers: Consumers) -> Self {
        Self { staging, consumers }
    }

    pub async fn publish(&self, event: impl Into<StagingEvent>) -> anyhow::Result<()> {
        let event: StagingEvent = event.into();
        let topic = event.topic.clone();
        let key = {
            if event.key.is_empty() {
                None
            } else {
                Some(event.key.clone())
            }
        };

        // Stage the event first; staging returns the topic's new offset.
        let offset = self.staging.publish(event).await?;

        // Then notify consumer groups of the new offset; the broker picks the
        // update up on its next tick instead of sending events inline.
        self.consumers.notify_update(topic, key, offset).await?;

        Ok(())
    }
}

pub trait IngestState {
    fn ingest(&self) -> Ingest;
}

impl IngestState for SharedState {
    fn ingest(&self) -> Ingest {
        Ingest::new(self.staging.clone(), self.consumers.clone())
    }
}