From 8c1f7f829d8c3262c51fb1e4cd1563f8f7ae9c21 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 12 Aug 2024 00:26:17 +0200 Subject: [PATCH] feat: add operations endpoint to get topics Signed-off-by: kjuulh --- crates/nodata/src/gen/nodata.rs | 37 +++++-- crates/nodata/src/gen/nodata.tonic.rs | 143 ++++++++++++++++++++++++++ crates/nodata/src/grpc.rs | 26 +++++ crates/nodata/src/main.rs | 38 ++++++- crates/nodata/src/services/staging.rs | 17 +++ proto/nomicon.proto | 13 +++ 6 files changed, 264 insertions(+), 10 deletions(-) diff --git a/crates/nodata/src/gen/nodata.rs b/crates/nodata/src/gen/nodata.rs index 84dde39..4a7329f 100644 --- a/crates/nodata/src/gen/nodata.rs +++ b/crates/nodata/src/gen/nodata.rs @@ -2,19 +2,42 @@ #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PublishEventRequest { - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub topic: ::prost::alloc::string::String, - #[prost(message, optional, tag = "2")] + #[prost(message, optional, tag="2")] pub published: ::core::option::Option<::prost_types::Timestamp>, - #[prost(string, tag = "3")] + #[prost(string, tag="3")] pub key: ::prost::alloc::string::String, - #[prost(bytes = "vec", tag = "4")] + #[prost(bytes="vec", tag="4")] pub value: ::prost::alloc::vec::Vec, - #[prost(string, optional, tag = "5")] + #[prost(string, optional, tag="5")] pub id: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct PublishEventResponse {} +pub struct PublishEventResponse { +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetTopicsRequest { +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetTopicsResponse { + #[prost(string, repeated, tag="1")] + pub topics: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetKeysRequest { + #[prost(string, tag="1")] + pub topic: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetKeysResponse { + #[prost(string, repeated, tag="1")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} include!("nodata.tonic.rs"); -// @@protoc_insertion_point(module) +// @@protoc_insertion_point(module) \ No newline at end of file diff --git a/crates/nodata/src/gen/nodata.tonic.rs b/crates/nodata/src/gen/nodata.tonic.rs index 8c85f58..75710ed 100644 --- a/crates/nodata/src/gen/nodata.tonic.rs +++ b/crates/nodata/src/gen/nodata.tonic.rs @@ -109,6 +109,50 @@ pub mod no_data_client { .insert(GrpcMethod::new("nodata.NoData", "PublishEvent")); self.inner.unary(req, path, codec).await } + pub async fn get_topics( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/nodata.NoData/GetTopics"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("nodata.NoData", "GetTopics")); + self.inner.unary(req, path, codec).await + } + pub async fn get_keys( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/nodata.NoData/GetKeys"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("nodata.NoData", "GetKeys")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -125,6 +169,17 @@ pub mod no_data_server { tonic::Response, tonic::Status, >; + async fn get_topics( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_keys( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct NoDataServer { @@ -251,6 +306,94 @@ pub mod no_data_server { }; Box::pin(fut) } + "/nodata.NoData/GetTopics" => { + #[allow(non_camel_case_types)] + struct GetTopicsSvc(pub Arc); + impl tonic::server::UnaryService + for GetTopicsSvc { + type Response = super::GetTopicsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_topics(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = GetTopicsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/nodata.NoData/GetKeys" => { + #[allow(non_camel_case_types)] + struct GetKeysSvc(pub Arc); + impl tonic::server::UnaryService + for GetKeysSvc { + type Response = super::GetKeysResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_keys(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = GetKeysSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { Ok( diff --git a/crates/nodata/src/grpc.rs b/crates/nodata/src/grpc.rs index 26bf2b5..2809d17 100644 --- a/crates/nodata/src/grpc.rs +++ b/crates/nodata/src/grpc.rs @@ -48,6 +48,32 @@ impl no_data_server::NoData for GrpcServer { Ok(tonic::Response::new(PublishEventResponse {})) } + + async fn get_topics( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let topics = self.state.staging.get_topics().await.map_err(|e| { + tracing::warn!(error = e.to_string(), "failed to get topics"); + tonic::Status::internal(e.to_string()) + })?; + + Ok(tonic::Response::new(GetTopicsResponse { topics })) + } + + async fn get_keys( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let req = request.into_inner(); + + let keys = self.state.staging.get_keys(&req.topic).await.map_err(|e| { + tracing::warn!(error = e.to_string(), "failed to get keys"); + tonic::Status::internal(e.to_string()) + })?; + + Ok(tonic::Response::new(GetKeysResponse { keys })) + } } impl From for StagingEvent { diff --git a/crates/nodata/src/main.rs b/crates/nodata/src/main.rs index 16d1993..836ba42 100644 --- a/crates/nodata/src/main.rs +++ b/crates/nodata/src/main.rs @@ -8,7 +8,7 @@ use std::net::SocketAddr; use chrono::{Datelike, Timelike}; use clap::{Parser, Subcommand, ValueEnum}; -use grpc::{GrpcServer, PublishEventRequest}; +use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest}; use http::HttpServer; use mad::Mad; use state::SharedState; @@ -59,6 +59,11 @@ enum ClientCommands { #[arg(long = "generate-id")] generate_id: bool, }, + GetTopics {}, + GetKeys { + #[arg(long)] + topic: String, + }, } #[tokio::main] @@ -89,8 +94,7 @@ async fn main() -> anyhow::Result<()> { id, generate_id, } => { - let mut client = - crate::grpc::no_data_client::NoDataClient::connect(grpc_host).await?; + let mut client = create_client(grpc_host).await?; let timestamp = chrono::Utc::now(); @@ -122,8 +126,36 @@ async fn main() -> anyhow::Result<()> { }) .await?; } + ClientCommands::GetTopics {} => { + let mut client = create_client(grpc_host).await?; + + println!("Listing topics"); + let topics = client.get_topics(GetTopicsRequest {}).await?; + + for topic in topics.into_inner().topics { + println!("{topic}"); + } + } + ClientCommands::GetKeys { topic } => { + let mut client = create_client(grpc_host).await?; + + println!("Listing keys for topic: {}", topic); + let keys = client.get_keys(GetKeysRequest { topic }).await?; + + for key in keys.into_inner().keys { + println!("{key}"); + } + } }, } Ok(()) } + +async fn create_client( + grpc_host: String, +) -> anyhow::Result> { + let client = crate::grpc::no_data_client::NoDataClient::connect(grpc_host).await?; + + Ok(client) +} diff --git a/crates/nodata/src/services/staging.rs b/crates/nodata/src/services/staging.rs index 49d506a..566ff56 100644 --- a/crates/nodata/src/services/staging.rs +++ b/crates/nodata/src/services/staging.rs @@ -61,6 +61,23 @@ impl Staging { Ok(()) } + + pub async fn get_topics(&self) -> anyhow::Result> { + let store = self.store.read().unwrap(); + + Ok(store.keys().cloned().collect::>()) + } + + pub async fn get_keys(&self, topic: impl Into) -> anyhow::Result> { + let store = self.store.read().unwrap(); + + let items = store + .get(&topic.into()) + .map(|tree| tree.keys()) + .unwrap_or_default(); + + Ok(items.cloned().collect::>()) + } } pub trait StagingState { diff --git a/proto/nomicon.proto b/proto/nomicon.proto index fe0e518..9bf12ca 100644 --- a/proto/nomicon.proto +++ b/proto/nomicon.proto @@ -6,6 +6,8 @@ package nodata; service NoData { rpc PublishEvent(PublishEventRequest) returns (PublishEventResponse) {} + rpc GetTopics(GetTopicsRequest) returns (GetTopicsResponse) {} + rpc GetKeys(GetKeysRequest) returns (GetKeysResponse) {} } message PublishEventRequest { @@ -19,3 +21,14 @@ message PublishEventRequest { message PublishEventResponse { } + +message GetTopicsRequest {} +message GetTopicsResponse { + repeated string topics = 1; +} +message GetKeysRequest { + string topic = 1; +} +message GetKeysResponse { + repeated string keys = 1; +}