feat: do cleanup

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-08-17 13:59:58 +02:00
parent d0ea8019e1
commit 705da45497
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
8 changed files with 37 additions and 54 deletions

View File

@ -34,4 +34,17 @@ Data enteres nodata
3. Data is sent to a topic 3. Data is sent to a topic
4. A broadcast is sent that said topic was updated with a given offset 4. A broadcast is sent that said topic was updated with a given offset
5. A client can consume from said topic, given a topic and id 5. A client can consume from said topic, given a topic and id
// 6. We need a partition in here to separate handling between partitions and consumer groups
6. A queue is running consuming each broadcast message, assigning jobs for each consumer group to delegate messages 6. A queue is running consuming each broadcast message, assigning jobs for each consumer group to delegate messages
## Components
A component is a consumer on a set topic; it will act as either a source, a sink, or a transformation between topics. It can declare topics, use topics, transform data, and much more.
A topic at its most basic is a computational unit implementing a certain interface: source, sink, or transformation.
The simplest are the source and the sink, where we respectively push data to or pull data from the topics.
### What does it look like
As part of nodata, you'll be given `nodata`, the CLI. The CLI can bootstrap a variety of components,

View File

@ -49,7 +49,7 @@ impl Component for Broker {
#[async_trait] #[async_trait]
impl Drifter for Broker { impl Drifter for Broker {
async fn execute(&self, token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> { async fn execute(&self, token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
tracing::debug!("reconciling broker"); tracing::trace!("reconciling broker");
// Execute listen to broker, and execute and register a potential drifter for each consumer group available // Execute listen to broker, and execute and register a potential drifter for each consumer group available
@ -57,7 +57,7 @@ impl Drifter for Broker {
let consumer_groups = self.state.consumers().get_consumer_groups().await; let consumer_groups = self.state.consumers().get_consumer_groups().await;
let mut new_handlers = Vec::new(); let mut new_handlers = Vec::new();
let mut delete_handlers: Vec<ConsumerGroup> = Vec::new(); let delete_handlers: Vec<ConsumerGroup> = Vec::new();
for consumer_group in consumer_groups { for consumer_group in consumer_groups {
if handlers.contains_key(&consumer_group.get_id().await) { if handlers.contains_key(&consumer_group.get_id().await) {
//delete_handlers.push(consumer_group); //delete_handlers.push(consumer_group);
@ -87,13 +87,12 @@ impl Drifter for Broker {
pub struct BrokerHandler { pub struct BrokerHandler {
token: CancellationToken, token: CancellationToken,
state: SharedState,
} }
impl BrokerHandler { impl BrokerHandler {
pub fn new( pub fn new(
state: &SharedState, state: &SharedState,
consumer_group: ConsumerGroup, consumer_group: ConsumerGroup,
parent_token: CancellationToken, _parent_token: CancellationToken,
) -> Self { ) -> Self {
let inner_state = state.clone(); let inner_state = state.clone();
let token = drift::schedule(Duration::from_secs(1), move || { let token = drift::schedule(Duration::from_secs(1), move || {
@ -118,10 +117,7 @@ impl BrokerHandler {
} }
}); });
Self { Self { token }
state: state.clone(),
token,
}
} }
} }

View File

@ -7,12 +7,7 @@ use tonic::Response;
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
services::{ services::{consumers::ConsumersState, ingest::IngestState, staging::StagingEvent},
consumers::{Consumer, ConsumersState},
handler::HandlerState,
ingest::IngestState,
staging::StagingEvent,
},
state::SharedState, state::SharedState,
}; };

View File

@ -29,7 +29,7 @@ impl HttpServer {
#[async_trait] #[async_trait]
impl Component for HttpServer { impl Component for HttpServer {
async fn run(&self, cancellation_token: CancellationToken) -> Result<(), mad::MadError> { async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), mad::MadError> {
let app = Router::new() let app = Router::new()
.route("/", get(root)) .route("/", get(root))
.with_state(self.state.clone()) .with_state(self.state.clone())

View File

@ -1,13 +1,10 @@
use std::{collections::BTreeMap, ops::Deref, sync::Arc}; use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::state::SharedState; use crate::state::SharedState;
use super::{ use super::{handler::HandlerState, staging::StagingEvent};
handler::{Handler, HandlerState},
staging::StagingEvent,
};
pub type ConsumerId = String; pub type ConsumerId = String;
pub type ConsumerIndex = String; pub type ConsumerIndex = String;
@ -71,6 +68,7 @@ impl ConsumerGroup {
Ok(()) Ok(())
} }
#[allow(dead_code)]
pub async fn get_offset(&self) -> TopicOffset { pub async fn get_offset(&self) -> TopicOffset {
let inner = self.inner.read().await; let inner = self.inner.read().await;
@ -160,7 +158,6 @@ impl Consumer {
pub struct Consumers { pub struct Consumers {
storage: Arc<RwLock<BTreeMap<ConsumerId, ConsumerGroup>>>, storage: Arc<RwLock<BTreeMap<ConsumerId, ConsumerGroup>>>,
subscriptions: Arc<RwLock<BTreeMap<Topic, Vec<ConsumerId>>>>, subscriptions: Arc<RwLock<BTreeMap<Topic, Vec<ConsumerId>>>>,
handler: Handler,
} }
// message arrives in queue // message arrives in queue
@ -169,11 +166,10 @@ pub struct Consumers {
// active consumers now listen and receive events, if they process ok, the offset of a consumer will move ahead in the partition // active consumers now listen and receive events, if they process ok, the offset of a consumer will move ahead in the partition
// so we keep a system of record of each consumer // so we keep a system of record of each consumer
impl Consumers { impl Consumers {
pub fn new(handler: Handler) -> Self { pub fn new() -> Self {
Self { Self {
storage: Arc::default(), storage: Arc::default(),
subscriptions: Arc::default(), subscriptions: Arc::default(),
handler,
} }
} }
@ -220,11 +216,8 @@ impl Consumers {
Ok(Some(consumer)) Ok(Some(consumer))
} }
pub async fn get_consumer_group( #[allow(dead_code)]
&self, pub async fn get_consumer_group(&self, id: impl Into<ConsumerId>) -> Option<ConsumerGroup> {
id: impl Into<ConsumerId>,
index: impl Into<ConsumerIndex>,
) -> Option<ConsumerGroup> {
let storage = self.storage.read().await; let storage = self.storage.read().await;
let consumer_group = storage.get(&id.into())?; let consumer_group = storage.get(&id.into())?;
@ -245,7 +238,7 @@ impl Consumers {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let topic = topic.into(); let topic = topic.into();
let offset = offset.into(); let offset = offset.into();
let key = key.map(|k| k.into()); let _key = key.map(|k| k.into());
let subscriptions = self.subscriptions.read().await; let subscriptions = self.subscriptions.read().await;
let subscription = match subscriptions.get(&topic) { let subscription = match subscriptions.get(&topic) {
@ -300,15 +293,12 @@ mod test {
let consumer_index = "some-consumer-index"; let consumer_index = "some-consumer-index";
let topic = "some-topic"; let topic = "some-topic";
let consumers = Consumers::new(Handler::new(Staging::default())); let consumers = Consumers::new();
consumers consumers
.add_consumer(consumer_id, consumer_index, topic) .add_consumer(consumer_id, consumer_index, topic)
.await?; .await?;
let consumer = consumers let consumer = consumers.get_consumer_group(consumer_id).await.unwrap();
.get_consumer_group(consumer_id, consumer_index)
.await
.unwrap();
assert_eq!(0, consumer.get_offset().await); assert_eq!(0, consumer.get_offset().await);
@ -339,24 +329,18 @@ mod test {
tracing::trace!("published offset: {}", offset); tracing::trace!("published offset: {}", offset);
} }
let consumers = Consumers::new(Handler::new(staging)); let consumers = Consumers::new();
consumers consumers
.add_consumer(&consumer_id, &consumer_index, &topic) .add_consumer(&consumer_id, &consumer_index, &topic)
.await?; .await?;
let consumer = consumers let consumer = consumers.get_consumer_group(&consumer_id).await.unwrap();
.get_consumer_group(&consumer_id, &consumer_index)
.await
.unwrap();
assert_eq!(0, consumer.get_offset().await); assert_eq!(0, consumer.get_offset().await);
consumers consumers
.notify_update(&topic, None::<String>, offset) .notify_update(&topic, None::<String>, offset)
.await?; .await?;
let consumer = consumers let consumer = consumers.get_consumer_group(&consumer_id).await.unwrap();
.get_consumer_group(&consumer_id, &consumer_index)
.await
.unwrap();
assert_eq!(9, consumer.get_offset().await); assert_eq!(9, consumer.get_offset().await);

View File

@ -1,7 +1,7 @@
use crate::state::SharedState; use crate::state::SharedState;
use super::{ use super::{
consumers::{Consumer, Consumers}, consumers::Consumers,
staging::{Staging, StagingEvent}, staging::{Staging, StagingEvent},
}; };

View File

@ -4,7 +4,7 @@ use tokio::sync::RwLock;
use crate::state::SharedState; use crate::state::SharedState;
use super::consumers::{Consumers, PartitionKey, Topic, TopicOffset}; use super::consumers::{PartitionKey, Topic, TopicOffset};
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
pub struct StagingEvent { pub struct StagingEvent {
@ -23,12 +23,6 @@ pub struct Staging {
} }
impl Staging { impl Staging {
pub fn new() -> Self {
Self {
store: Arc::default(),
}
}
pub async fn publish( pub async fn publish(
&self, &self,
staging_event: impl Into<StagingEvent>, staging_event: impl Into<StagingEvent>,
@ -155,6 +149,7 @@ impl Staging {
} }
} }
#[allow(dead_code)]
pub trait StagingState { pub trait StagingState {
fn staging(&self) -> Staging; fn staging(&self) -> Staging;
} }

View File

@ -23,7 +23,7 @@ impl Deref for SharedState {
} }
pub struct State { pub struct State {
pub db: Pool<Postgres>, pub _db: Pool<Postgres>,
pub staging: Staging, pub staging: Staging,
pub consumers: Consumers, pub consumers: Consumers,
pub handler: Handler, pub handler: Handler,
@ -47,8 +47,8 @@ impl State {
let handler = Handler::new(staging.clone()); let handler = Handler::new(staging.clone());
Ok(Self { Ok(Self {
db, _db: db,
consumers: Consumers::new(handler.clone()), consumers: Consumers::new(),
staging, staging,
handler, handler,
}) })