feat: remove keys and ids

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2024-11-09 11:47:50 +01:00
parent b9be9a3ef1
commit a327cdb02e
15 changed files with 596 additions and 680 deletions

View File

@@ -1,24 +1,19 @@
// @generated
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PublishEventRequest {
#[prost(string, tag="1")]
pub topic: ::prost::alloc::string::String,
#[prost(message, optional, tag="2")]
pub published: ::core::option::Option<::prost_types::Timestamp>,
#[prost(string, tag="3")]
pub key: ::prost::alloc::string::String,
#[prost(bytes="vec", tag="4")]
#[prost(bytes="vec", tag="2")]
pub value: ::prost::alloc::vec::Vec<u8>,
#[prost(string, optional, tag="5")]
pub id: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct PublishEventResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct GetTopicsRequest {
}
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -29,50 +24,32 @@ pub struct GetTopicsResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKeysRequest {
#[prost(string, tag="1")]
pub topic: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKeysResponse {
#[prost(string, repeated, tag="1")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SubscribeRequest {
#[prost(string, tag="1")]
pub topic: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub key: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SubscribeResponse {
#[prost(string, tag="1")]
pub id: ::prost::alloc::string::String,
#[prost(message, optional, tag="2")]
pub published: ::core::option::Option<::prost_types::Timestamp>,
#[prost(uint64, tag="3")]
pub offset: u64,
#[prost(bytes="vec", tag="4")]
pub value: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct HandleMsgRequest {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct HandleMsgResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct PingRequest {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct PingResponse {
}
include!("nodata.v1.tonic.rs");

View File

@@ -134,31 +134,6 @@ pub mod no_data_service_client {
.insert(GrpcMethod::new("nodata.v1.NoDataService", "GetTopics"));
self.inner.unary(req, path, codec).await
}
pub async fn get_keys(
&mut self,
request: impl tonic::IntoRequest<super::GetKeysRequest>,
) -> std::result::Result<
tonic::Response<super::GetKeysResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/nodata.v1.NoDataService/GetKeys",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("nodata.v1.NoDataService", "GetKeys"));
self.inner.unary(req, path, codec).await
}
pub async fn subscribe(
&mut self,
request: impl tonic::IntoRequest<super::SubscribeRequest>,
@@ -207,10 +182,6 @@ pub mod no_data_service_server {
tonic::Response<super::GetTopicsResponse>,
tonic::Status,
>;
async fn get_keys(
&self,
request: tonic::Request<super::GetKeysRequest>,
) -> std::result::Result<tonic::Response<super::GetKeysResponse>, tonic::Status>;
/// Server streaming response type for the Subscribe method.
type SubscribeStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::SubscribeResponse, tonic::Status>,
@@ -393,52 +364,6 @@ pub mod no_data_service_server {
};
Box::pin(fut)
}
"/nodata.v1.NoDataService/GetKeys" => {
#[allow(non_camel_case_types)]
struct GetKeysSvc<T: NoDataService>(pub Arc<T>);
impl<
T: NoDataService,
> tonic::server::UnaryService<super::GetKeysRequest>
for GetKeysSvc<T> {
type Response = super::GetKeysResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GetKeysRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NoDataService>::get_keys(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = GetKeysSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/nodata.v1.NoDataService/Subscribe" => {
#[allow(non_camel_case_types)]
struct SubscribeSvc<T: NoDataService>(pub Arc<T>);

View File

@@ -23,7 +23,6 @@ sqlx = { version = "0.7.3", features = [
] }
uuid = { version = "1.7.0", features = ["v4"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
mad = { git = "https://github.com/kjuulh/mad", branch = "main" }
tokio-util = "0.7.11"
tonic.workspace = true
bytes.workspace = true
@@ -33,6 +32,7 @@ chrono = { version = "0.4.38", features = ["serde"] }
tokio-stream = "0.1.15"
dagger-sdk = "0.11.10"
rand = "0.8.5"
notmad = "0.4.0"
[dev-dependencies]
tracing-test = { version = "0.2.5" } #, features = ["no-env-filter"] }

View File

@@ -2,7 +2,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};
use axum::async_trait;
use drift::Drifter;
use mad::Component;
use notmad::Component;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
@@ -32,7 +32,7 @@ impl Component for Broker {
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), mad::MadError> {
) -> Result<(), notmad::MadError> {
let token = drift::schedule_drifter(Duration::from_secs(1), self.clone());
tokio::select! {

View File

@@ -1,24 +1,19 @@
// @generated
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PublishEventRequest {
#[prost(string, tag="1")]
pub topic: ::prost::alloc::string::String,
#[prost(message, optional, tag="2")]
pub published: ::core::option::Option<::prost_types::Timestamp>,
#[prost(string, tag="3")]
pub key: ::prost::alloc::string::String,
#[prost(bytes="vec", tag="4")]
#[prost(bytes="vec", tag="2")]
pub value: ::prost::alloc::vec::Vec<u8>,
#[prost(string, optional, tag="5")]
pub id: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct PublishEventResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct GetTopicsRequest {
}
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -29,50 +24,32 @@ pub struct GetTopicsResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKeysRequest {
#[prost(string, tag="1")]
pub topic: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKeysResponse {
#[prost(string, repeated, tag="1")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SubscribeRequest {
#[prost(string, tag="1")]
pub topic: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub key: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SubscribeResponse {
#[prost(string, tag="1")]
pub id: ::prost::alloc::string::String,
#[prost(message, optional, tag="2")]
pub published: ::core::option::Option<::prost_types::Timestamp>,
#[prost(uint64, tag="3")]
pub offset: u64,
#[prost(bytes="vec", tag="4")]
pub value: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct HandleMsgRequest {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct HandleMsgResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct PingRequest {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct PingResponse {
}
include!("nodata.v1.tonic.rs");

View File

@@ -134,31 +134,6 @@ pub mod no_data_service_client {
.insert(GrpcMethod::new("nodata.v1.NoDataService", "GetTopics"));
self.inner.unary(req, path, codec).await
}
pub async fn get_keys(
&mut self,
request: impl tonic::IntoRequest<super::GetKeysRequest>,
) -> std::result::Result<
tonic::Response<super::GetKeysResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/nodata.v1.NoDataService/GetKeys",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("nodata.v1.NoDataService", "GetKeys"));
self.inner.unary(req, path, codec).await
}
pub async fn subscribe(
&mut self,
request: impl tonic::IntoRequest<super::SubscribeRequest>,
@@ -207,10 +182,6 @@ pub mod no_data_service_server {
tonic::Response<super::GetTopicsResponse>,
tonic::Status,
>;
async fn get_keys(
&self,
request: tonic::Request<super::GetKeysRequest>,
) -> std::result::Result<tonic::Response<super::GetKeysResponse>, tonic::Status>;
/// Server streaming response type for the Subscribe method.
type SubscribeStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::SubscribeResponse, tonic::Status>,
@@ -393,52 +364,6 @@ pub mod no_data_service_server {
};
Box::pin(fut)
}
"/nodata.v1.NoDataService/GetKeys" => {
#[allow(non_camel_case_types)]
struct GetKeysSvc<T: NoDataService>(pub Arc<T>);
impl<
T: NoDataService,
> tonic::server::UnaryService<super::GetKeysRequest>
for GetKeysSvc<T> {
type Response = super::GetKeysResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GetKeysRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NoDataService>::get_keys(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = GetKeysSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/nodata.v1.NoDataService/Subscribe" => {
#[allow(non_camel_case_types)]
struct SubscribeSvc<T: NoDataService>(pub Arc<T>);

View File

@@ -1,7 +1,7 @@
use std::{net::SocketAddr, pin::Pin};
use anyhow::Context;
use mad::Component;
use notmad::Component;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Response;
use uuid::Uuid;
@@ -41,9 +41,7 @@ impl no_data_service_server::NoDataService for GrpcServer {
tracing::debug!(
topic = req.topic,
key = req.key,
value = std::str::from_utf8(&req.value).ok(),
id = req.id,
"handling event"
);
@@ -67,20 +65,6 @@ impl no_data_service_server::NoDataService for GrpcServer {
Ok(tonic::Response::new(GetTopicsResponse { topics }))
}
async fn get_keys(
&self,
request: tonic::Request<GetKeysRequest>,
) -> std::result::Result<tonic::Response<GetKeysResponse>, tonic::Status> {
let req = request.into_inner();
let keys = self.state.staging.get_keys(&req.topic).await.map_err(|e| {
tracing::warn!(error = e.to_string(), "failed to get keys");
tonic::Status::internal(e.to_string())
})?;
Ok(tonic::Response::new(GetKeysResponse { keys }))
}
type SubscribeStream = ResponseStream;
async fn subscribe(
@@ -109,9 +93,7 @@ impl no_data_service_server::NoDataService for GrpcServer {
while let Some(msg) = event_stream.recv().await {
if let Err(e) = tx
.send(Ok(SubscribeResponse {
id: msg.id.unwrap_or_default(),
published: Some(chrono_to_prost_timestamp(msg.published)),
offset: 0,
value: msg.value,
}))
.await
@@ -130,12 +112,7 @@ impl From<PublishEventRequest> for StagingEvent {
fn from(value: PublishEventRequest) -> Self {
Self {
topic: value.topic,
id: value.id,
key: value.key,
published: value
.published
.and_then(prost_timestamp_to_chrono)
.unwrap_or_default(),
published: chrono::Utc::now(),
value: value.value,
}
}
@@ -161,7 +138,7 @@ impl Component for GrpcServer {
async fn run(
&self,
_cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), mad::MadError> {
) -> Result<(), notmad::MadError> {
tracing::info!("grpc listening on: {}", self.host);
tonic::transport::Server::builder()
@@ -171,7 +148,7 @@ impl Component for GrpcServer {
.serve(self.host)
.await
.context("grpc server failed")
.map_err(mad::MadError::Inner)?;
.map_err(notmad::MadError::Inner)?;
Ok(())
}

View File

@@ -6,8 +6,8 @@ use axum::extract::MatchedPath;
use axum::http::Request;
use axum::routing::get;
use axum::Router;
use mad::Component;
use mad::MadError;
use notmad::Component;
use notmad::MadError;
use tokio_util::sync::CancellationToken;
use tower_http::trace::TraceLayer;
@@ -29,7 +29,7 @@ impl HttpServer {
#[async_trait]
impl Component for HttpServer {
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), mad::MadError> {
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {
let app = Router::new()
.route("/", get(root))
.with_state(self.state.clone())

View File

@@ -11,14 +11,12 @@ mod services;
use std::net::SocketAddr;
use broker::Broker;
use chrono::{Datelike, Timelike};
use clap::{Parser, Subcommand};
use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest};
use grpc::{GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest};
use grpc_component::GrpcComponentClient;
use http::HttpServer;
use mad::Mad;
use notmad::Mad;
use state::SharedState;
use uuid::Uuid;
#[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)]
@@ -57,24 +55,12 @@ enum ClientCommands {
#[arg(long)]
topic: String,
#[arg(long)]
key: String,
#[arg(long)]
value: String,
#[arg(long)]
id: Option<String>,
#[arg(long = "generate-id")]
generate_id: bool,
},
GetTopics {},
GetKeys {
#[arg(long)]
topic: String,
},
SubscribeTopic {
#[arg(long)]
topic: String,
#[arg(long)]
key: String,
},
ComponentPing {
#[arg(long)]
@@ -103,13 +89,7 @@ async fn main() -> anyhow::Result<()> {
commands,
grpc_host,
} => match commands {
ClientCommands::PublishEvent {
topic,
key,
value,
id,
generate_id,
} => {
ClientCommands::PublishEvent { topic, value } => {
let mut client = create_client(grpc_host).await?;
let timestamp = chrono::Utc::now();
@@ -117,28 +97,7 @@ async fn main() -> anyhow::Result<()> {
let _ = client
.publish_event(PublishEventRequest {
topic,
published: Some(prost_types::Timestamp::date_time_nanos(
timestamp.year() as i64,
timestamp.month() as u8,
timestamp.day() as u8,
timestamp.hour() as u8,
timestamp.minute() as u8,
timestamp.second() as u8,
timestamp.nanosecond(),
)?),
key,
value: value.into_bytes(),
id: {
if id.is_none() {
if generate_id {
Some(Uuid::new_v4().to_string())
} else {
None
}
} else {
id
}
},
})
.await?;
}
@@ -152,28 +111,17 @@ async fn main() -> anyhow::Result<()> {
println!("{topic}");
}
}
ClientCommands::GetKeys { topic } => {
let mut client = create_client(grpc_host).await?;
println!("Listing keys for topic: {}", topic);
let keys = client.get_keys(GetKeysRequest { topic }).await?;
for key in keys.into_inner().keys {
println!("{key}");
}
}
ClientCommands::SubscribeTopic { topic, key } => {
ClientCommands::SubscribeTopic { topic } => {
let mut client = create_client(grpc_host).await?;
println!("listening for events in topic: {}", topic);
let resp = client.subscribe(SubscribeRequest { topic, key }).await?;
let resp = client.subscribe(SubscribeRequest { topic }).await?;
let mut stream = resp.into_inner();
while let Some(msg) = stream.message().await? {
println!(
"msg (id={}, published={}): {}",
msg.id,
"msg (published={}): {}",
msg.published.unwrap_or_default(),
std::str::from_utf8(&msg.value).unwrap_or_default(),
)

View File

@@ -122,13 +122,7 @@ impl InnerConsumerGroup {
tracing::debug!(consumer_id = self.id, "sending update for consumer");
state
.handler()
.handle_offset(
&self.topic,
self.partition_key.as_ref(),
consumer,
self.cur_offset,
self.offset,
)
.handle_offset(&self.topic, consumer, self.cur_offset, self.offset)
.await?;
self.cur_offset = self.offset;
@@ -233,12 +227,10 @@ impl Consumers {
pub async fn notify_update(
&self,
topic: impl Into<Topic>,
key: Option<impl Into<PartitionKey>>,
offset: impl Into<TopicOffset>,
) -> anyhow::Result<()> {
let topic = topic.into();
let offset = offset.into();
let _key = key.map(|k| k.into());
let subscriptions = self.subscriptions.read().await;
let subscription = match subscriptions.get(&topic) {
@@ -319,8 +311,6 @@ mod test {
let offset = staging
.publish(StagingEvent {
topic: topic.clone(),
key: "".into(),
id: None,
published: chrono::Utc::now(),
value: Vec::new(),
})
@@ -337,9 +327,7 @@ mod test {
let consumer = consumers.get_consumer_group(&consumer_id).await.unwrap();
assert_eq!(0, consumer.get_offset().await);
consumers
.notify_update(&topic, None::<String>, offset)
.await?;
consumers.notify_update(&topic, offset).await?;
let consumer = consumers.get_consumer_group(&consumer_id).await.unwrap();
assert_eq!(9, consumer.get_offset().await);

View File

@@ -18,14 +18,13 @@ impl Handler {
pub async fn handle_offset(
&self,
topic: &Topic,
partition_key: Option<&PartitionKey>,
consumer: &Consumer,
start_offset: TopicOffset,
end_offset: TopicOffset,
) -> anyhow::Result<()> {
let events = self
.staging
.get_topic_offset(topic, partition_key, start_offset, end_offset)
.get_topic_offset(topic, start_offset, end_offset)
.await?;
// TODO: handle events

View File

@@ -19,16 +19,9 @@ impl Ingest {
pub async fn publish(&self, event: impl Into<StagingEvent>) -> anyhow::Result<()> {
let event: StagingEvent = event.into();
let topic = event.topic.clone();
let key = {
if event.key.is_empty() {
None
} else {
Some(event.key.clone())
}
};
let offset = self.staging.publish(event).await?;
self.consumers.notify_update(topic, key, offset).await?;
self.consumers.notify_update(topic, offset).await?;
Ok(())
}

View File

@@ -9,8 +9,6 @@ use super::consumers::{PartitionKey, Topic, TopicOffset};
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StagingEvent {
pub topic: String,
pub key: String,
pub id: Option<String>,
pub published: chrono::DateTime<chrono::Utc>,
pub value: Vec<u8>,
}
@@ -19,7 +17,7 @@ pub struct StagingEvent {
pub struct Staging {
// Temporary until we've got an actual file disk store
#[allow(clippy::complexity)]
store: Arc<RwLock<BTreeMap<Topic, BTreeMap<PartitionKey, Vec<StagingEvent>>>>>,
store: Arc<RwLock<BTreeMap<Topic, Vec<StagingEvent>>>>,
}
impl Staging {
@@ -29,45 +27,16 @@ impl Staging {
) -> anyhow::Result<TopicOffset> {
let staging_event: StagingEvent = staging_event.into();
let mut store = self.store.write().await;
tracing::trace!(
topic = staging_event.topic,
id = staging_event.id,
"moving event to staging"
);
tracing::trace!(topic = staging_event.topic, "moving event to staging");
let offset = match store.get_mut(&staging_event.topic) {
Some(part) => match part.get_mut(&staging_event.key) {
Some(existing_key_part) => {
if staging_event.id.is_none()
|| !existing_key_part.iter().any(|p| p.id == staging_event.id)
{
existing_key_part.push(staging_event);
existing_key_part.len() - 1
} else {
tracing::debug!(
topic = staging_event.topic,
id = staging_event.id,
"event already found, skipping"
);
existing_key_part.len() - 1
}
}
None => {
part.insert(staging_event.key.to_owned(), vec![staging_event]);
0
}
},
Some(part) => {
part.push(staging_event);
part.len() - 1
}
None => {
tracing::debug!(
topic = staging_event.topic,
id = staging_event.id,
"new topic, creating partition"
);
store.insert(
staging_event.topic.to_owned(),
BTreeMap::from([(staging_event.key.to_owned(), vec![staging_event])]),
);
tracing::debug!(topic = staging_event.topic, "new topic, creating partition");
store.insert(staging_event.topic.to_owned(), vec![staging_event]);
0
}
@@ -82,26 +51,13 @@ impl Staging {
Ok(store.keys().cloned().collect::<Vec<_>>())
}
pub async fn get_keys(&self, topic: impl Into<String>) -> anyhow::Result<Vec<String>> {
let store = self.store.read().await;
let items = store
.get(&topic.into())
.map(|tree| tree.keys())
.unwrap_or_default();
Ok(items.cloned().collect::<Vec<_>>())
}
pub async fn get_topic_offset(
&self,
topic: impl Into<String>,
partition_key: Option<impl Into<PartitionKey>>,
start: impl Into<TopicOffset>,
end: impl Into<TopicOffset>,
) -> anyhow::Result<Vec<StagingEvent>> {
let topic = topic.into();
let partition_key = partition_key.map(|p| p.into()).unwrap_or_default();
let start = start.into();
let end = end.into();
@@ -126,26 +82,15 @@ impl Staging {
}
};
let partition = match partitions.get(&partition_key) {
Some(partition) => partition,
None => {
anyhow::bail!(
"partition doesn't exist in storage, (topic={}, partition={})",
&topic,
&partition_key
);
}
};
if partition.len() < end {
if partitions.len() < end {
anyhow::bail!(
"partition len is less than the offset, (partition_len={}, offset={})",
partition.len(),
partitions.len(),
end
)
}
Ok(partition[start + 1..=end].to_vec())
Ok(partitions[start + 1..=end].to_vec())
}
}