From cb340ffb1e5cd27bf59314addd45d37b61a02a37 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sun, 24 Nov 2024 01:34:06 +0100 Subject: [PATCH] feat: add tonic Signed-off-by: kjuulh --- buf.gen.yaml | 8 + crates/churn/proto/churn/v1/churn.proto | 25 ++ crates/churn/src/agent.rs | 69 ++++- crates/churn/src/grpc/churn.v1.rs | 36 +++ crates/churn/src/grpc/churn.v1.tonic.rs | 350 ++++++++++++++++++++++++ crates/churn/src/main.rs | 3 + 6 files changed, 489 insertions(+), 2 deletions(-) create mode 100644 buf.gen.yaml create mode 100644 crates/churn/proto/churn/v1/churn.proto create mode 100644 crates/churn/src/grpc/churn.v1.rs create mode 100644 crates/churn/src/grpc/churn.v1.tonic.rs diff --git a/buf.gen.yaml b/buf.gen.yaml new file mode 100644 index 0000000..ce04c1c --- /dev/null +++ b/buf.gen.yaml @@ -0,0 +1,8 @@ +version: v2 +plugins: +- remote: buf.build/community/neoeinstein-prost + out: crates/churn/src/grpc +- remote: buf.build/community/neoeinstein-tonic:v0.4.0 + out: crates/churn/src/grpc +inputs: + - directory: crates/churn/proto diff --git a/crates/churn/proto/churn/v1/churn.proto b/crates/churn/proto/churn/v1/churn.proto new file mode 100644 index 0000000..77ef798 --- /dev/null +++ b/crates/churn/proto/churn/v1/churn.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package churn.v1; + +service Churn { + rpc GetKey(GetKeyRequest) returns (GetKeyResponse); + rpc SetKey(SetKeyRequest) returns (SetKeyResponse); +} + +message GetKeyRequest { + string namespace = 1; + optional string id = 2; + string key = 3; +} +message GetKeyResponse { + optional string value = 1; +} + +message SetKeyRequest { + string namespace = 1; + optional string id = 2; + string key = 3; + string value = 4; +} +message SetKeyResponse {} diff --git a/crates/churn/src/agent.rs b/crates/churn/src/agent.rs index ed5d900..4303461 100644 --- a/crates/churn/src/agent.rs +++ b/crates/churn/src/agent.rs @@ -1,9 +1,74 @@ use agent_state::AgentState; use refresh::AgentRefresh; - mod agent_state; - mod refresh; +mod grpc_client { + use tonic::transport::{Channel, ClientTlsConfig}; + + use crate::grpc::{churn_client::ChurnClient, *}; + + pub struct GrpcClient { + host: String, + } + + impl GrpcClient { + pub fn new(host: impl Into) -> Self { + Self { host: host.into() } + } + + pub async fn get_key( + &self, + namespace: &str, + id: Option>, + key: &str, + ) -> anyhow::Result> { + let mut client = self.client().await?; + + let resp = client + .get_key(GetKeyRequest { + key: key.into(), + namespace: namespace.into(), + id: id.map(|i| i.into()), + }) + .await?; + let resp = resp.into_inner(); + + Ok(resp.value) + } + + pub async fn set_key( + &self, + namespace: &str, + id: Option>, + key: &str, + value: &str, + ) -> anyhow::Result<()> { + let mut client = self.client().await?; + + client + .set_key(SetKeyRequest { + key: key.into(), + value: value.into(), + namespace: namespace.into(), + id: id.map(|i| i.into()), + }) + .await?; + + Ok(()) + } + + async fn client(&self) -> anyhow::Result> { + let channel = Channel::from_shared(self.host.to_owned())? + .tls_config(ClientTlsConfig::new().with_native_roots())? + .connect() + .await?; + + let client = ChurnClient::new(channel); + + Ok(client) + } + } +} pub async fn execute(host: impl Into) -> anyhow::Result<()> { let state = AgentState::new().await?; diff --git a/crates/churn/src/grpc/churn.v1.rs b/crates/churn/src/grpc/churn.v1.rs new file mode 100644 index 0000000..2717f7c --- /dev/null +++ b/crates/churn/src/grpc/churn.v1.rs @@ -0,0 +1,36 @@ +// @generated +// This file is @generated by prost-build. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetKeyRequest { + #[prost(string, tag="1")] + pub namespace: ::prost::alloc::string::String, + #[prost(string, optional, tag="2")] + pub id: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, tag="3")] + pub key: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetKeyResponse { + #[prost(string, optional, tag="1")] + pub value: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SetKeyRequest { + #[prost(string, tag="1")] + pub namespace: ::prost::alloc::string::String, + #[prost(string, optional, tag="2")] + pub id: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, tag="3")] + pub key: ::prost::alloc::string::String, + #[prost(string, tag="4")] + pub value: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct SetKeyResponse { +} +include!("churn.v1.tonic.rs"); +// @@protoc_insertion_point(module) \ No newline at end of file diff --git a/crates/churn/src/grpc/churn.v1.tonic.rs b/crates/churn/src/grpc/churn.v1.tonic.rs new file mode 100644 index 0000000..8b748e3 --- /dev/null +++ b/crates/churn/src/grpc/churn.v1.tonic.rs @@ -0,0 +1,350 @@ +// @generated +/// Generated client implementations. +pub mod churn_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct ChurnClient { + inner: tonic::client::Grpc, + } + impl ChurnClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl ChurnClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> ChurnClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + ChurnClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn get_key( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/GetKey"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "GetKey")); + self.inner.unary(req, path, codec).await + } + pub async fn set_key( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/SetKey"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "SetKey")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod churn_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with ChurnServer. + #[async_trait] + pub trait Churn: Send + Sync + 'static { + async fn get_key( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + async fn set_key( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct ChurnServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl ChurnServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for ChurnServer + where + T: Churn, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/churn.v1.Churn/GetKey" => { + #[allow(non_camel_case_types)] + struct GetKeySvc(pub Arc); + impl tonic::server::UnaryService + for GetKeySvc { + type Response = super::GetKeyResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_key(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = GetKeySvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/churn.v1.Churn/SetKey" => { + #[allow(non_camel_case_types)] + struct SetKeySvc(pub Arc); + impl tonic::server::UnaryService + for SetKeySvc { + type Response = super::SetKeyResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::set_key(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = SetKeySvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for ChurnServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for ChurnServer { + const NAME: &'static str = "churn.v1.Churn"; + } +} diff --git a/crates/churn/src/main.rs b/crates/churn/src/main.rs index ace2eb0..32526de 100644 --- a/crates/churn/src/main.rs +++ b/crates/churn/src/main.rs @@ -1,6 +1,9 @@ mod api; mod cli; mod state; +mod grpc { + include!("grpc/churn.v1.rs"); +} mod agent;