feat: add offset to consumers

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-08-14 14:40:58 +02:00
parent 334dd1bde2
commit 242dd2ce76
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
4 changed files with 331 additions and 9 deletions

View File

@ -1 +1,45 @@
pub mod consumers;
pub mod staging; pub mod staging;
pub mod handler {
use std::sync::Arc;
use super::{
consumers::{Consumer, PartitionKey, Topic, TopicOffset},
staging::Staging,
};
#[derive(Clone)]
pub struct Handler {
staging: Arc<Staging>,
}
impl Handler {
pub fn new(staging: Staging) -> Self {
Self {
staging: Arc::new(staging),
}
}
pub fn handle_offset(
&self,
topic: &Topic,
partition_key: &Option<PartitionKey>,
consumer: &Consumer,
offset: TopicOffset,
) -> anyhow::Result<()> {
let events = self.staging.get_topic_offset(
topic,
partition_key.clone(),
consumer.offset,
offset,
)?;
// TODO: handle events
for event in events {
tracing::trace!("handling event: {:?}", event);
}
Ok(())
}
}
}

View File

@ -0,0 +1,198 @@
use std::{
collections::BTreeMap,
sync::{Arc, RwLock},
};
use crate::state::SharedState;
use super::handler::Handler;
pub type ConsumerId = String;
pub type ConsumerIndex = String;
pub type Topic = String;
pub type PartitionKey = String;
pub type TopicOffset = usize;
// Consumer will at some point contain an interface to send events over a channel to a lib
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct Consumer {
pub offset: TopicOffset,
}
#[derive(Clone)]
pub struct Consumers {
storage: Arc<RwLock<BTreeMap<ConsumerId, BTreeMap<ConsumerIndex, Consumer>>>>,
subscriptions: Arc<RwLock<BTreeMap<Topic, Vec<ConsumerId>>>>,
handler: Handler,
}
// message arrives in queue
// message is stored in staging
// after staging, topic and key as a partition is notifing active consumers.
// active consumers now listen and receive events, if they process ok, the offset of a consumer will move ahead in the partition
// so we keep a system of record of each consumer
impl Consumers {
pub fn new(handler: Handler) -> Self {
Self {
storage: Arc::default(),
subscriptions: Arc::default(),
handler,
}
}
pub fn add_consumer(
&self,
id: impl Into<ConsumerId>,
index: impl Into<ConsumerIndex>,
topic: impl Into<Topic>,
) -> anyhow::Result<()> {
let id = id.into();
let index = index.into();
let topic = topic.into();
{
let mut storage = self.storage.write().unwrap();
if !storage.contains_key(&id) {
storage.insert(id.clone(), BTreeMap::default());
}
let consumer_group = storage.get_mut(&id).unwrap();
if !consumer_group.contains_key(&index) {
consumer_group.insert(index.clone(), Consumer::default());
}
}
{
let mut subscriptions = self.subscriptions.write().unwrap();
if !subscriptions.contains_key(&topic) {
subscriptions.insert(topic.clone(), Vec::default());
}
let subscription_consumers = subscriptions.get_mut(&topic).unwrap();
if !subscription_consumers.contains(&id) {
subscription_consumers.push(id.clone());
}
}
Ok(())
}
pub fn get_consumer(
&self,
id: impl Into<ConsumerId>,
index: impl Into<ConsumerIndex>,
) -> Option<Consumer> {
let storage = self.storage.read().unwrap();
let consumer_group = storage.get(&id.into())?;
let consumer = consumer_group.get(&index.into())?;
Some(consumer.to_owned())
}
pub fn notify_update(
&self,
topic: impl Into<Topic>,
key: Option<PartitionKey>,
offset: impl Into<TopicOffset>,
) -> anyhow::Result<()> {
let topic = topic.into();
let offset = offset.into();
let subscriptions = self.subscriptions.read().unwrap();
let subscription = match subscriptions.get(&topic) {
Some(s) => s,
None => {
tracing::debug!(topic = &topic, "no subscription for topic");
return Ok(());
}
};
let mut storage = self.storage.write().unwrap();
for consumer_id in subscription {
match storage.get_mut(consumer_id) {
Some(consumer_groups) => {
for consumer in consumer_groups.values_mut() {
// TODO: Implement retry logic, etc.
self.handler.handle_offset(&topic, &key, consumer, offset)?;
consumer.offset = offset;
}
}
None => {
tracing::trace!(
topic = &topic,
consumer_id = &consumer_id,
"found no consumer"
)
}
}
}
Ok(())
}
}
pub trait ConsumersState {
fn consumers(&self) -> Consumers;
}
impl ConsumersState for SharedState {
fn consumers(&self) -> Consumers {
self.consumers.clone()
}
}
#[cfg(test)]
mod test {
use crate::services::staging::{Staging, StagingEvent};
use super::*;
#[test]
fn can_add_consumer() -> anyhow::Result<()> {
let consumer_id = "some-consumer-id";
let consumer_index = "some-consumer-index";
let topic = "some-topic";
let consumers = Consumers::new(Handler::new(Staging::default()));
consumers.add_consumer(consumer_id, consumer_index, topic)?;
let consumer = consumers.get_consumer(consumer_id, consumer_index);
assert_eq!(Some(Consumer { offset: 0 }), consumer);
Ok(())
}
#[tokio::test]
async fn can_notify_consumer() -> anyhow::Result<()> {
let consumer_id = "some-consumer-id".to_string();
let consumer_index = "some-consumer-index".to_string();
let topic = "some-topic".to_string();
let offset = 10usize;
let staging = Staging::default();
// Publish 10 messages
for _ in 0..10 {
staging
.publish(StagingEvent {
topic: topic.clone(),
key: "".into(),
id: None,
})
.await?;
}
let consumers = Consumers::new(Handler::new(staging));
consumers.add_consumer(&consumer_id, &consumer_index, &topic)?;
let consumer = consumers.get_consumer(&consumer_id, &consumer_index);
assert_eq!(Some(Consumer { offset: 0 }), consumer);
consumers.notify_update(&topic, None, offset)?;
let consumer = consumers.get_consumer(&consumer_id, &consumer_index);
assert_eq!(Some(Consumer { offset: 10 }), consumer);
Ok(())
}
}

View File

@ -5,21 +5,33 @@ use std::{
use crate::state::SharedState; use crate::state::SharedState;
use super::consumers::{Consumers, PartitionKey, Topic, TopicOffset};
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StagingEvent { pub struct StagingEvent {
pub topic: String, pub topic: String,
pub key: String, pub key: String,
pub id: Option<String>, pub id: Option<String>,
} }
#[derive(Default, Clone)] #[derive(Clone, Default)]
pub struct Staging { pub struct Staging {
// Temporary until we've got an actual file disk store // Temporary until we've got an actual file disk store
#[allow(clippy::complexity)] #[allow(clippy::complexity)]
store: Arc<RwLock<BTreeMap<String, BTreeMap<String, Vec<StagingEvent>>>>>, store: Arc<RwLock<BTreeMap<Topic, BTreeMap<PartitionKey, Vec<StagingEvent>>>>>,
} }
impl Staging { impl Staging {
pub async fn publish(&self, staging_event: impl Into<StagingEvent>) -> anyhow::Result<()> { pub fn new() -> Self {
Self {
store: Arc::default(),
}
}
pub async fn publish(
&self,
staging_event: impl Into<StagingEvent>,
) -> anyhow::Result<TopicOffset> {
let staging_event: StagingEvent = staging_event.into(); let staging_event: StagingEvent = staging_event.into();
let mut store = self.store.write().unwrap(); let mut store = self.store.write().unwrap();
tracing::trace!( tracing::trace!(
@ -28,23 +40,27 @@ impl Staging {
"moving event to staging" "moving event to staging"
); );
match store.get_mut(&staging_event.topic) { let offset = match store.get_mut(&staging_event.topic) {
Some(part) => match part.get_mut(&staging_event.key) { Some(part) => match part.get_mut(&staging_event.key) {
Some(existing_key_part) => { Some(existing_key_part) => {
if staging_event.id.is_none() if staging_event.id.is_none()
|| !existing_key_part.iter().any(|p| p.id == staging_event.id) || !existing_key_part.iter().any(|p| p.id == staging_event.id)
{ {
existing_key_part.push(staging_event); existing_key_part.push(staging_event);
existing_key_part.len() - 1
} else { } else {
tracing::debug!( tracing::debug!(
topic = staging_event.topic, topic = staging_event.topic,
id = staging_event.id, id = staging_event.id,
"event already found, skipping" "event already found, skipping"
); );
existing_key_part.len() - 1
} }
} }
None => { None => {
part.insert(staging_event.key.to_owned(), vec![staging_event]); part.insert(staging_event.key.to_owned(), vec![staging_event]);
1
} }
}, },
None => { None => {
@ -57,10 +73,12 @@ impl Staging {
staging_event.topic.to_owned(), staging_event.topic.to_owned(),
BTreeMap::from([(staging_event.key.to_owned(), vec![staging_event])]), BTreeMap::from([(staging_event.key.to_owned(), vec![staging_event])]),
); );
}
}
Ok(()) 1
}
};
Ok(offset)
} }
pub async fn get_topics(&self) -> anyhow::Result<Vec<String>> { pub async fn get_topics(&self) -> anyhow::Result<Vec<String>> {
@ -79,6 +97,61 @@ impl Staging {
Ok(items.cloned().collect::<Vec<_>>()) Ok(items.cloned().collect::<Vec<_>>())
} }
pub fn get_topic_offset(
&self,
topic: impl Into<String>,
partition_key: Option<impl Into<PartitionKey>>,
start: impl Into<TopicOffset>,
end: impl Into<TopicOffset>,
) -> anyhow::Result<Vec<StagingEvent>> {
let topic = topic.into();
let partition_key = partition_key.map(|p| p.into()).unwrap_or_default();
let start = start.into();
let end = end.into();
if start == end {
return Ok(Vec::new());
}
if start > end {
anyhow::bail!(
"start cannot be greater than end, (start={}, end={})",
start,
end
)
}
let store = self.store.read().unwrap();
let partitions = match store.get(&topic) {
Some(partitions) => partitions,
None => {
anyhow::bail!("topic doesn't exist in storage: {}", &topic);
}
};
let partition = match partitions.get(&partition_key) {
Some(partition) => partition,
None => {
anyhow::bail!(
"partition doesn't exist in storage, (topic={}, partition={})",
&topic,
&partition_key
);
}
};
if partition.len() < end {
anyhow::bail!(
"partition len is less than the offset, (partition_len={}, offset={})",
partition.len(),
end
)
}
Ok(partition[start..end].to_vec())
}
} }
pub trait StagingState { pub trait StagingState {

View File

@ -3,7 +3,7 @@ use std::{ops::Deref, sync::Arc};
use anyhow::Context; use anyhow::Context;
use sqlx::{Pool, Postgres}; use sqlx::{Pool, Postgres};
use crate::services::staging::Staging; use crate::services::{consumers::Consumers, handler::Handler, staging::Staging};
#[derive(Clone)] #[derive(Clone)]
pub struct SharedState(Arc<State>); pub struct SharedState(Arc<State>);
@ -25,6 +25,8 @@ impl Deref for SharedState {
pub struct State { pub struct State {
pub db: Pool<Postgres>, pub db: Pool<Postgres>,
pub staging: Staging, pub staging: Staging,
pub consumers: Consumers,
pub handler: Handler,
} }
impl State { impl State {
@ -41,9 +43,14 @@ impl State {
let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?; let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?;
let staging = Staging::default();
let handler = Handler::new(staging.clone());
Ok(Self { Ok(Self {
db, db,
staging: Staging::default(), consumers: Consumers::new(handler.clone()),
staging,
handler,
}) })
} }
} }