diff --git a/crates/nodata/src/services.rs b/crates/nodata/src/services.rs index 5d9d396..aa927fb 100644 --- a/crates/nodata/src/services.rs +++ b/crates/nodata/src/services.rs @@ -1 +1,45 @@ +pub mod consumers; pub mod staging; +pub mod handler { + use std::sync::Arc; + + use super::{ + consumers::{Consumer, PartitionKey, Topic, TopicOffset}, + staging::Staging, + }; + + #[derive(Clone)] + pub struct Handler { + staging: Arc, + } + + impl Handler { + pub fn new(staging: Staging) -> Self { + Self { + staging: Arc::new(staging), + } + } + + pub fn handle_offset( + &self, + topic: &Topic, + partition_key: &Option, + consumer: &Consumer, + offset: TopicOffset, + ) -> anyhow::Result<()> { + let events = self.staging.get_topic_offset( + topic, + partition_key.clone(), + consumer.offset, + offset, + )?; + + // TODO: handle events + for event in events { + tracing::trace!("handling event: {:?}", event); + } + + Ok(()) + } + } +} diff --git a/crates/nodata/src/services/consumers.rs b/crates/nodata/src/services/consumers.rs new file mode 100644 index 0000000..311374f --- /dev/null +++ b/crates/nodata/src/services/consumers.rs @@ -0,0 +1,198 @@ +use std::{ + collections::BTreeMap, + sync::{Arc, RwLock}, +}; + +use crate::state::SharedState; + +use super::handler::Handler; + +pub type ConsumerId = String; +pub type ConsumerIndex = String; +pub type Topic = String; +pub type PartitionKey = String; +pub type TopicOffset = usize; + +// Consumer will at some point contain an interface to send events over a channel to a lib +#[derive(Default, Clone, Debug, PartialEq, Eq)] +pub struct Consumer { + pub offset: TopicOffset, +} + +#[derive(Clone)] +pub struct Consumers { + storage: Arc>>>, + subscriptions: Arc>>>, + handler: Handler, +} + +// message arrives in queue +// message is stored in staging +// after staging, topic and key as a partition is notifing active consumers. +// active consumers now listen and receive events, if they process ok, the offset of a consumer will move ahead in the partition +// so we keep a system of record of each consumer +impl Consumers { + pub fn new(handler: Handler) -> Self { + Self { + storage: Arc::default(), + subscriptions: Arc::default(), + handler, + } + } + + pub fn add_consumer( + &self, + id: impl Into, + index: impl Into, + topic: impl Into, + ) -> anyhow::Result<()> { + let id = id.into(); + let index = index.into(); + let topic = topic.into(); + + { + let mut storage = self.storage.write().unwrap(); + + if !storage.contains_key(&id) { + storage.insert(id.clone(), BTreeMap::default()); + } + + let consumer_group = storage.get_mut(&id).unwrap(); + if !consumer_group.contains_key(&index) { + consumer_group.insert(index.clone(), Consumer::default()); + } + } + + { + let mut subscriptions = self.subscriptions.write().unwrap(); + if !subscriptions.contains_key(&topic) { + subscriptions.insert(topic.clone(), Vec::default()); + } + + let subscription_consumers = subscriptions.get_mut(&topic).unwrap(); + if !subscription_consumers.contains(&id) { + subscription_consumers.push(id.clone()); + } + } + + Ok(()) + } + + pub fn get_consumer( + &self, + id: impl Into, + index: impl Into, + ) -> Option { + let storage = self.storage.read().unwrap(); + let consumer_group = storage.get(&id.into())?; + let consumer = consumer_group.get(&index.into())?; + + Some(consumer.to_owned()) + } + + pub fn notify_update( + &self, + topic: impl Into, + key: Option, + offset: impl Into, + ) -> anyhow::Result<()> { + let topic = topic.into(); + let offset = offset.into(); + + let subscriptions = self.subscriptions.read().unwrap(); + let subscription = match subscriptions.get(&topic) { + Some(s) => s, + None => { + tracing::debug!(topic = &topic, "no subscription for topic"); + return Ok(()); + } + }; + + let mut storage = self.storage.write().unwrap(); + for consumer_id in subscription { + match storage.get_mut(consumer_id) { + Some(consumer_groups) => { + for consumer in consumer_groups.values_mut() { + // TODO: Implement retry logic, etc. + self.handler.handle_offset(&topic, &key, consumer, offset)?; + consumer.offset = offset; + } + } + None => { + tracing::trace!( + topic = &topic, + consumer_id = &consumer_id, + "found no consumer" + ) + } + } + } + + Ok(()) + } +} + +pub trait ConsumersState { + fn consumers(&self) -> Consumers; +} + +impl ConsumersState for SharedState { + fn consumers(&self) -> Consumers { + self.consumers.clone() + } +} + +#[cfg(test)] +mod test { + use crate::services::staging::{Staging, StagingEvent}; + + use super::*; + + #[test] + fn can_add_consumer() -> anyhow::Result<()> { + let consumer_id = "some-consumer-id"; + let consumer_index = "some-consumer-index"; + let topic = "some-topic"; + + let consumers = Consumers::new(Handler::new(Staging::default())); + + consumers.add_consumer(consumer_id, consumer_index, topic)?; + let consumer = consumers.get_consumer(consumer_id, consumer_index); + + assert_eq!(Some(Consumer { offset: 0 }), consumer); + + Ok(()) + } + + #[tokio::test] + async fn can_notify_consumer() -> anyhow::Result<()> { + let consumer_id = "some-consumer-id".to_string(); + let consumer_index = "some-consumer-index".to_string(); + let topic = "some-topic".to_string(); + let offset = 10usize; + + let staging = Staging::default(); + // Publish 10 messages + for _ in 0..10 { + staging + .publish(StagingEvent { + topic: topic.clone(), + key: "".into(), + id: None, + }) + .await?; + } + + let consumers = Consumers::new(Handler::new(staging)); + + consumers.add_consumer(&consumer_id, &consumer_index, &topic)?; + let consumer = consumers.get_consumer(&consumer_id, &consumer_index); + assert_eq!(Some(Consumer { offset: 0 }), consumer); + + consumers.notify_update(&topic, None, offset)?; + let consumer = consumers.get_consumer(&consumer_id, &consumer_index); + assert_eq!(Some(Consumer { offset: 10 }), consumer); + + Ok(()) + } +} diff --git a/crates/nodata/src/services/staging.rs b/crates/nodata/src/services/staging.rs index 6bd9d0c..7816279 100644 --- a/crates/nodata/src/services/staging.rs +++ b/crates/nodata/src/services/staging.rs @@ -5,21 +5,33 @@ use std::{ use crate::state::SharedState; +use super::consumers::{Consumers, PartitionKey, Topic, TopicOffset}; + +#[derive(Clone, Debug, PartialEq, Eq)] pub struct StagingEvent { pub topic: String, pub key: String, pub id: Option, } -#[derive(Default, Clone)] +#[derive(Clone, Default)] pub struct Staging { // Temporary until we've got an actual file disk store #[allow(clippy::complexity)] - store: Arc>>>>, + store: Arc>>>>, } impl Staging { - pub async fn publish(&self, staging_event: impl Into) -> anyhow::Result<()> { + pub fn new() -> Self { + Self { + store: Arc::default(), + } + } + + pub async fn publish( + &self, + staging_event: impl Into, + ) -> anyhow::Result { let staging_event: StagingEvent = staging_event.into(); let mut store = self.store.write().unwrap(); tracing::trace!( @@ -28,23 +40,27 @@ impl Staging { "moving event to staging" ); - match store.get_mut(&staging_event.topic) { + let offset = match store.get_mut(&staging_event.topic) { Some(part) => match part.get_mut(&staging_event.key) { Some(existing_key_part) => { if staging_event.id.is_none() || !existing_key_part.iter().any(|p| p.id == staging_event.id) { existing_key_part.push(staging_event); + existing_key_part.len() - 1 } else { tracing::debug!( topic = staging_event.topic, id = staging_event.id, "event already found, skipping" ); + + existing_key_part.len() - 1 } } None => { part.insert(staging_event.key.to_owned(), vec![staging_event]); + 1 } }, None => { @@ -57,10 +73,12 @@ impl Staging { staging_event.topic.to_owned(), BTreeMap::from([(staging_event.key.to_owned(), vec![staging_event])]), ); - } - } - Ok(()) + 1 + } + }; + + Ok(offset) } pub async fn get_topics(&self) -> anyhow::Result> { @@ -79,6 +97,61 @@ impl Staging { Ok(items.cloned().collect::>()) } + + pub fn get_topic_offset( + &self, + topic: impl Into, + partition_key: Option>, + start: impl Into, + end: impl Into, + ) -> anyhow::Result> { + let topic = topic.into(); + let partition_key = partition_key.map(|p| p.into()).unwrap_or_default(); + let start = start.into(); + let end = end.into(); + + if start == end { + return Ok(Vec::new()); + } + + if start > end { + anyhow::bail!( + "start cannot be greater than end, (start={}, end={})", + start, + end + ) + } + + let store = self.store.read().unwrap(); + + let partitions = match store.get(&topic) { + Some(partitions) => partitions, + None => { + anyhow::bail!("topic doesn't exist in storage: {}", &topic); + } + }; + + let partition = match partitions.get(&partition_key) { + Some(partition) => partition, + None => { + anyhow::bail!( + "partition doesn't exist in storage, (topic={}, partition={})", + &topic, + &partition_key + ); + } + }; + + if partition.len() < end { + anyhow::bail!( + "partition len is less than the offset, (partition_len={}, offset={})", + partition.len(), + end + ) + } + + Ok(partition[start..end].to_vec()) + } } pub trait StagingState { diff --git a/crates/nodata/src/state.rs b/crates/nodata/src/state.rs index c62f778..ef655df 100644 --- a/crates/nodata/src/state.rs +++ b/crates/nodata/src/state.rs @@ -3,7 +3,7 @@ use std::{ops::Deref, sync::Arc}; use anyhow::Context; use sqlx::{Pool, Postgres}; -use crate::services::staging::Staging; +use crate::services::{consumers::Consumers, handler::Handler, staging::Staging}; #[derive(Clone)] pub struct SharedState(Arc); @@ -25,6 +25,8 @@ impl Deref for SharedState { pub struct State { pub db: Pool, pub staging: Staging, + pub consumers: Consumers, + pub handler: Handler, } impl State { @@ -41,9 +43,14 @@ impl State { let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?; + let staging = Staging::default(); + let handler = Handler::new(staging.clone()); + Ok(Self { db, - staging: Staging::default(), + consumers: Consumers::new(handler.clone()), + staging, + handler, }) } }