diff --git a/Cargo.lock b/Cargo.lock index 626c99a..8ecc8ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1506,6 +1506,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "nodata-sdk" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "prost", + "prost-types", + "tonic", + "tracing", +] + [[package]] name = "nom" version = "7.1.3" diff --git a/Cargo.toml b/Cargo.toml index 9cd866f..bd78e78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,3 +12,8 @@ clap = { version = "4", features = ["derive", "env"] } dotenv = { version = "0.15" } axum = { version = "0.7" } drift = { git = "https://github.com/kjuulh/drift", branch = "main" } +async-trait = "0.1" +tonic = "0.12.1" +bytes = "1.7.1" +prost = "0.13.1" +prost-types = "0.13.1" diff --git a/buf.gen.yaml b/buf.gen.yaml index e69238f..2ecafa4 100644 --- a/buf.gen.yaml +++ b/buf.gen.yaml @@ -7,5 +7,11 @@ plugins: out: crates/nodata/src/gen - remote: buf.build/community/neoeinstein-tonic:v0.4.0 out: crates/nodata/src/gen + +- remote: buf.build/community/neoeinstein-prost + out: crates/nodata-sdk/src/gen +- remote: buf.build/community/neoeinstein-tonic:v0.4.0 + out: crates/nodata-sdk/src/gen + inputs: - directory: proto diff --git a/crates/nodata-sdk/Cargo.toml b/crates/nodata-sdk/Cargo.toml new file mode 100644 index 0000000..526216b --- /dev/null +++ b/crates/nodata-sdk/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "nodata-sdk" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow.workspace = true +async-trait.workspace = true +tonic.workspace = true +tracing.workspace = true +bytes.workspace = true +prost.workspace = true +prost-types.workspace = true diff --git a/crates/nodata-sdk/src/gen/nodata.v1.rs b/crates/nodata-sdk/src/gen/nodata.v1.rs new file mode 100644 index 0000000..94757cc --- /dev/null +++ b/crates/nodata-sdk/src/gen/nodata.v1.rs @@ -0,0 +1,79 @@ +// @generated +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PublishEventRequest { + #[prost(string, tag="1")] + pub topic: ::prost::alloc::string::String, + #[prost(message, optional, tag="2")] + pub published: ::core::option::Option<::prost_types::Timestamp>, + #[prost(string, tag="3")] + pub key: ::prost::alloc::string::String, + #[prost(bytes="vec", tag="4")] + pub value: ::prost::alloc::vec::Vec, + #[prost(string, optional, tag="5")] + pub id: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PublishEventResponse { +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetTopicsRequest { +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetTopicsResponse { + #[prost(string, repeated, tag="1")] + pub topics: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetKeysRequest { + #[prost(string, tag="1")] + pub topic: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetKeysResponse { + #[prost(string, repeated, tag="1")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SubscribeRequest { + #[prost(string, tag="1")] + pub topic: ::prost::alloc::string::String, + #[prost(string, tag="2")] + pub key: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SubscribeResponse { + #[prost(string, tag="1")] + pub id: ::prost::alloc::string::String, + #[prost(message, optional, tag="2")] + pub published: ::core::option::Option<::prost_types::Timestamp>, + #[prost(uint64, tag="3")] + pub offset: u64, + #[prost(bytes="vec", tag="4")] + pub value: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HandleMsgRequest { +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HandleMsgResponse { +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PingRequest { +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PingResponse { +} +include!("nodata.v1.tonic.rs"); +// @@protoc_insertion_point(module) \ No newline at end of file diff --git a/crates/nodata-sdk/src/gen/nodata.v1.tonic.rs b/crates/nodata-sdk/src/gen/nodata.v1.tonic.rs new file mode 100644 index 0000000..6fac069 --- /dev/null +++ b/crates/nodata-sdk/src/gen/nodata.v1.tonic.rs @@ -0,0 +1,899 @@ +// @generated +/// Generated client implementations. +pub mod no_data_service_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct NoDataServiceClient { + inner: tonic::client::Grpc, + } + impl NoDataServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl NoDataServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> NoDataServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + NoDataServiceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn publish_event( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/nodata.v1.NoDataService/PublishEvent", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("nodata.v1.NoDataService", "PublishEvent")); + self.inner.unary(req, path, codec).await + } + pub async fn get_topics( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/nodata.v1.NoDataService/GetTopics", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("nodata.v1.NoDataService", "GetTopics")); + self.inner.unary(req, path, codec).await + } + pub async fn get_keys( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/nodata.v1.NoDataService/GetKeys", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("nodata.v1.NoDataService", "GetKeys")); + self.inner.unary(req, path, codec).await + } + pub async fn subscribe( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/nodata.v1.NoDataService/Subscribe", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("nodata.v1.NoDataService", "Subscribe")); + self.inner.server_streaming(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod no_data_service_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with NoDataServiceServer. + #[async_trait] + pub trait NoDataService: Send + Sync + 'static { + async fn publish_event( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_topics( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_keys( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the Subscribe method. + type SubscribeStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + Send + + 'static; + async fn subscribe( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct NoDataServiceServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl NoDataServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for NoDataServiceServer + where + T: NoDataService, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/nodata.v1.NoDataService/PublishEvent" => { + #[allow(non_camel_case_types)] + struct PublishEventSvc(pub Arc); + impl< + T: NoDataService, + > tonic::server::UnaryService + for PublishEventSvc { + type Response = super::PublishEventResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::publish_event(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PublishEventSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/nodata.v1.NoDataService/GetTopics" => { + #[allow(non_camel_case_types)] + struct GetTopicsSvc(pub Arc); + impl< + T: NoDataService, + > tonic::server::UnaryService + for GetTopicsSvc { + type Response = super::GetTopicsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_topics(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = GetTopicsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/nodata.v1.NoDataService/GetKeys" => { + #[allow(non_camel_case_types)] + struct GetKeysSvc(pub Arc); + impl< + T: NoDataService, + > tonic::server::UnaryService + for GetKeysSvc { + type Response = super::GetKeysResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_keys(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = GetKeysSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/nodata.v1.NoDataService/Subscribe" => { + #[allow(non_camel_case_types)] + struct SubscribeSvc(pub Arc); + impl< + T: NoDataService, + > tonic::server::ServerStreamingService + for SubscribeSvc { + type Response = super::SubscribeResponse; + type ResponseStream = T::SubscribeStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::subscribe(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = SubscribeSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for NoDataServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for NoDataServiceServer { + const NAME: &'static str = "nodata.v1.NoDataService"; + } +} +/// Generated client implementations. +pub mod no_data_component_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// + #[derive(Debug, Clone)] + pub struct NoDataComponentClient { + inner: tonic::client::Grpc, + } + impl NoDataComponentClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl NoDataComponentClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> NoDataComponentClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + NoDataComponentClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// + pub async fn transform_msg( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/nodata.v1.NoDataComponent/TransformMsg", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("nodata.v1.NoDataComponent", "TransformMsg")); + self.inner.unary(req, path, codec).await + } + /// + pub async fn ping( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/nodata.v1.NoDataComponent/Ping", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("nodata.v1.NoDataComponent", "Ping")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod no_data_component_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with NoDataComponentServer. + #[async_trait] + pub trait NoDataComponent: Send + Sync + 'static { + /// + async fn transform_msg( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// + async fn ping( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + /// + #[derive(Debug)] + pub struct NoDataComponentServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl NoDataComponentServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for NoDataComponentServer + where + T: NoDataComponent, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/nodata.v1.NoDataComponent/TransformMsg" => { + #[allow(non_camel_case_types)] + struct TransformMsgSvc(pub Arc); + impl< + T: NoDataComponent, + > tonic::server::UnaryService + for TransformMsgSvc { + type Response = super::HandleMsgResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::transform_msg(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = TransformMsgSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/nodata.v1.NoDataComponent/Ping" => { + #[allow(non_camel_case_types)] + struct PingSvc(pub Arc); + impl< + T: NoDataComponent, + > tonic::server::UnaryService for PingSvc { + type Response = super::PingResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::ping(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PingSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for NoDataComponentServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for NoDataComponentServer { + const NAME: &'static str = "nodata.v1.NoDataComponent"; + } +} diff --git a/crates/nodata-sdk/src/lib.rs b/crates/nodata-sdk/src/lib.rs new file mode 100644 index 0000000..4bbea36 --- /dev/null +++ b/crates/nodata-sdk/src/lib.rs @@ -0,0 +1,67 @@ +use anyhow::Context; +use async_trait::async_trait; +use grpc::PingResponse; + +mod grpc { + include!("gen/nodata.v1.rs"); +} + +#[derive(Default)] +pub struct NoData { + transformers: Option>, +} + +impl NoData { + pub fn new() -> Self { + Self { + transformers: Option::default(), + } + } + + pub fn with_transformer( + &mut self, + transformer: T, + ) -> &mut Self { + self.transformers = Some(Box::new(transformer)); + + self + } + + pub async fn serve(&mut self) -> anyhow::Result<()> { + tracing::info!("starting no data component"); + let grpc_server = grpc::no_data_component_server::NoDataComponentServer::new(NoDataGrpc {}); + + tracing::info!("component grpc listening on: 127.0.0.1:7900"); + tonic::transport::Server::builder() + .add_service(grpc_server) + .serve("127.0.0.1:7900".parse()?) + .await + .context("nodata component grpc server failed")?; + + Ok(()) + } +} + +struct NoDataGrpc {} + +#[async_trait] +impl grpc::no_data_component_server::NoDataComponent for NoDataGrpc { + async fn transform_msg( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + todo!() + } + + async fn ping( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + Ok(tonic::Response::new(PingResponse {})) + } +} + +#[async_trait] +pub trait NoDataTransformers { + async fn transform_msg(&self) -> anyhow::Result<()>; +} diff --git a/crates/nodata/Cargo.toml b/crates/nodata/Cargo.toml index a92a264..d1ec9e1 100644 --- a/crates/nodata/Cargo.toml +++ b/crates/nodata/Cargo.toml @@ -25,10 +25,10 @@ uuid = { version = "1.7.0", features = ["v4"] } tower-http = { version = "0.5.2", features = ["cors", "trace"] } mad = { git = "https://github.com/kjuulh/mad", branch = "main" } tokio-util = "0.7.11" -tonic = "0.12.1" -bytes = "1.7.1" -prost = "0.13.1" -prost-types = "0.13.1" +tonic.workspace = true +bytes.workspace = true +prost.workspace = true +prost-types.workspace = true chrono = { version = "0.4.38", features = ["serde"] } tokio-stream = "0.1.15" dagger-sdk = "0.11.10"