feat: add nodata

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-11-17 22:25:23 +01:00
parent 4d150febc7
commit 58c27e429b
No known key found for this signature in database
13 changed files with 2657 additions and 810 deletions

2376
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -14,6 +14,7 @@ crunch-nats = { path = "crates/crunch-nats" }
crunch-file = { path = "crates/crunch-file" }
crunch-codegen = { path = "crates/crunch-codegen" }
crunch-postgres = { path = "crates/crunch-postgres" }
crunch-nodata = { path = "crates/crunch-nodata" }
anyhow = { version = "1.0.75" }
tokio = { version = "1", features = ["full"] }
@ -29,8 +30,8 @@ clap = {version = "4.4.5", features = ["derive"]}
toml_edit = { version = "0.20.0", features = ["serde"] }
serde = { version = "1.0.188", features = ["derive"] }
serde_json = { version = "1.0.107" }
prost = {version = "0.12"}
prost-types = {version = "0.12"}
prost = { version = "0.13" }
prost-types = { version = "0.13" }
prost-build = "0.12"
bytes = { version = "1.5" }
tempfile = { version = "3.8.0" }
@ -38,7 +39,18 @@ genco = {version = "0.17.6"}
walkdir = { version = "2.4.0" }
regex = { version = "1.9.5" }
inquire = { version = "0.6.2" }
sqlx = {version = "0.7.2", default-features = false, features = ["migrate", "macros", "postgres", "runtime-tokio", "tls-rustls", "chrono", "json", "uuid" ]}
sqlx = { version = "0.7.2", default-features = false, features = [
"migrate",
"macros",
"postgres",
"runtime-tokio",
"tls-rustls",
"chrono",
"json",
"uuid",
] }
chrono = { version = "0.4.31", features = ["serde"] }
nodata = { version = "0.1.0" }
tonic = { version = "0.12.3", features = ["tls", "tls-roots"] }
pretty_assertions = "1.4.0"

View File

@ -1,3 +1,4 @@
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Envelope {

View File

@ -32,8 +32,8 @@ pub mod capnp {
envelope.set_content(content);
let mut metadata = envelope.init_metadata();
metadata.set_domain(domain);
metadata.set_entity(entity);
metadata.set_domain(domain.into());
metadata.set_entity(entity.into());
serialize::write_message_to_words(&builder)
}
@ -56,11 +56,13 @@ pub mod capnp {
domain: metadata
.get_domain()
.map_err(EnvelopeError::CapnpError)?
.to_string(),
.to_string()
.expect("message to be utf8"),
entity: metadata
.get_entity()
.map_err(EnvelopeError::CapnpError)?
.to_string(),
.to_string()
.expect("message to be utf8"),
},
))
}

View File

@ -0,0 +1,18 @@
[package]
name = "crunch-nodata"
version = "0.1.0"
edition = "2021"
[dependencies]
crunch-traits.workspace = true
prost.workspace = true
prost-types.workspace = true
bytes.workspace = true
tonic.workspace = true
anyhow.workspace = true
tracing.workspace = true
tokio.workspace = true
thiserror.workspace = true
async-trait.workspace = true
futures.workspace = true

View File

@ -0,0 +1,13 @@
version: v2
managed:
enabled: true
plugins:
# dependencies
- remote: buf.build/community/neoeinstein-prost
out: src/gen
- remote: buf.build/community/neoeinstein-tonic
out: src/gen
inputs:
- module: buf.build/noschemaplz/nodata:main

View File

@ -0,0 +1,62 @@
// @generated
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PublishEventRequest {
#[prost(string, tag="1")]
pub topic: ::prost::alloc::string::String,
#[prost(bytes="vec", tag="2")]
pub value: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct PublishEventResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct GetTopicsRequest {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetTopicsResponse {
#[prost(string, repeated, tag="1")]
pub topics: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SubscribeRequest {
#[prost(string, tag="1")]
pub topic: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SubscribeResponse {
#[prost(message, optional, tag="2")]
pub published: ::core::option::Option<::prost_types::Timestamp>,
#[prost(bytes="vec", tag="4")]
pub value: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct HandleMsgRequest {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct HandleMsgResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct PingRequest {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct PingResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Log {
#[prost(bytes="vec", repeated, tag="1")]
pub messages: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
}
include!("nodata.v1.tonic.rs");
// @@protoc_insertion_point(module)

View File

@ -0,0 +1,799 @@
// @generated
/// Generated client implementations.
pub mod no_data_service_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
use tonic::codegen::http::Uri;
#[derive(Debug, Clone)]
pub struct NoDataServiceClient<T> {
inner: tonic::client::Grpc<T>,
}
impl NoDataServiceClient<tonic::transport::Channel> {
/// Attempt to create a new client by connecting to a given endpoint.
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
Ok(Self::new(conn))
}
}
impl<T> NoDataServiceClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Self { inner }
}
pub fn with_origin(inner: T, origin: Uri) -> Self {
let inner = tonic::client::Grpc::with_origin(inner, origin);
Self { inner }
}
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> NoDataServiceClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
>,
>,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
>>::Error: Into<StdError> + Send + Sync,
{
NoDataServiceClient::new(InterceptedService::new(inner, interceptor))
}
/// Compress requests with the given encoding.
///
/// This requires the server to support it otherwise it might respond with an
/// error.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.send_compressed(encoding);
self
}
/// Enable decompressing responses.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.accept_compressed(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_decoding_message_size(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
pub async fn publish_event(
&mut self,
request: impl tonic::IntoRequest<super::PublishEventRequest>,
) -> std::result::Result<
tonic::Response<super::PublishEventResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/nodata.v1.NoDataService/PublishEvent",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("nodata.v1.NoDataService", "PublishEvent"));
self.inner.unary(req, path, codec).await
}
pub async fn get_topics(
&mut self,
request: impl tonic::IntoRequest<super::GetTopicsRequest>,
) -> std::result::Result<
tonic::Response<super::GetTopicsResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/nodata.v1.NoDataService/GetTopics",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("nodata.v1.NoDataService", "GetTopics"));
self.inner.unary(req, path, codec).await
}
pub async fn subscribe(
&mut self,
request: impl tonic::IntoRequest<super::SubscribeRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::SubscribeResponse>>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/nodata.v1.NoDataService/Subscribe",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("nodata.v1.NoDataService", "Subscribe"));
self.inner.server_streaming(req, path, codec).await
}
}
}
/// Generated server implementations.
pub mod no_data_service_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with NoDataServiceServer.
#[async_trait]
pub trait NoDataService: Send + Sync + 'static {
async fn publish_event(
&self,
request: tonic::Request<super::PublishEventRequest>,
) -> std::result::Result<
tonic::Response<super::PublishEventResponse>,
tonic::Status,
>;
async fn get_topics(
&self,
request: tonic::Request<super::GetTopicsRequest>,
) -> std::result::Result<
tonic::Response<super::GetTopicsResponse>,
tonic::Status,
>;
/// Server streaming response type for the Subscribe method.
type SubscribeStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::SubscribeResponse, tonic::Status>,
>
+ Send
+ 'static;
async fn subscribe(
&self,
request: tonic::Request<super::SubscribeRequest>,
) -> std::result::Result<tonic::Response<Self::SubscribeStream>, tonic::Status>;
}
#[derive(Debug)]
pub struct NoDataServiceServer<T: NoDataService> {
inner: Arc<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
}
impl<T: NoDataService> NoDataServiceServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(Arc::new(inner))
}
pub fn from_arc(inner: Arc<T>) -> Self {
Self {
inner,
accept_compression_encodings: Default::default(),
send_compression_encodings: Default::default(),
max_decoding_message_size: None,
max_encoding_message_size: None,
}
}
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
{
InterceptedService::new(Self::new(inner), interceptor)
}
/// Enable decompressing requests with the given encoding.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.accept_compression_encodings.enable(encoding);
self
}
/// Compress responses with the given encoding, if the client supports it.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.send_compression_encodings.enable(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.max_decoding_message_size = Some(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.max_encoding_message_size = Some(limit);
self
}
}
impl<T, B> tonic::codegen::Service<http::Request<B>> for NoDataServiceServer<T>
where
T: NoDataService,
B: Body + Send + 'static,
B::Error: Into<StdError> + Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
match req.uri().path() {
"/nodata.v1.NoDataService/PublishEvent" => {
#[allow(non_camel_case_types)]
struct PublishEventSvc<T: NoDataService>(pub Arc<T>);
impl<
T: NoDataService,
> tonic::server::UnaryService<super::PublishEventRequest>
for PublishEventSvc<T> {
type Response = super::PublishEventResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::PublishEventRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NoDataService>::publish_event(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = PublishEventSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/nodata.v1.NoDataService/GetTopics" => {
#[allow(non_camel_case_types)]
struct GetTopicsSvc<T: NoDataService>(pub Arc<T>);
impl<
T: NoDataService,
> tonic::server::UnaryService<super::GetTopicsRequest>
for GetTopicsSvc<T> {
type Response = super::GetTopicsResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GetTopicsRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NoDataService>::get_topics(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = GetTopicsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/nodata.v1.NoDataService/Subscribe" => {
#[allow(non_camel_case_types)]
struct SubscribeSvc<T: NoDataService>(pub Arc<T>);
impl<
T: NoDataService,
> tonic::server::ServerStreamingService<super::SubscribeRequest>
for SubscribeSvc<T> {
type Response = super::SubscribeResponse;
type ResponseStream = T::SubscribeStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::SubscribeRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NoDataService>::subscribe(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = SubscribeSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", tonic::Code::Unimplemented as i32)
.header(
http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE,
)
.body(empty_body())
.unwrap(),
)
})
}
}
}
}
impl<T: NoDataService> Clone for NoDataServiceServer<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
Self {
inner,
accept_compression_encodings: self.accept_compression_encodings,
send_compression_encodings: self.send_compression_encodings,
max_decoding_message_size: self.max_decoding_message_size,
max_encoding_message_size: self.max_encoding_message_size,
}
}
}
impl<T: NoDataService> tonic::server::NamedService for NoDataServiceServer<T> {
const NAME: &'static str = "nodata.v1.NoDataService";
}
}
/// Generated client implementations.
pub mod no_data_component_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
use tonic::codegen::http::Uri;
///
#[derive(Debug, Clone)]
pub struct NoDataComponentClient<T> {
inner: tonic::client::Grpc<T>,
}
impl NoDataComponentClient<tonic::transport::Channel> {
/// Attempt to create a new client by connecting to a given endpoint.
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
Ok(Self::new(conn))
}
}
impl<T> NoDataComponentClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Self { inner }
}
pub fn with_origin(inner: T, origin: Uri) -> Self {
let inner = tonic::client::Grpc::with_origin(inner, origin);
Self { inner }
}
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> NoDataComponentClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
>,
>,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
>>::Error: Into<StdError> + Send + Sync,
{
NoDataComponentClient::new(InterceptedService::new(inner, interceptor))
}
/// Compress requests with the given encoding.
///
/// This requires the server to support it otherwise it might respond with an
/// error.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.send_compressed(encoding);
self
}
/// Enable decompressing responses.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.accept_compressed(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_decoding_message_size(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
///
pub async fn transform_msg(
&mut self,
request: impl tonic::IntoRequest<super::HandleMsgRequest>,
) -> std::result::Result<
tonic::Response<super::HandleMsgResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/nodata.v1.NoDataComponent/TransformMsg",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("nodata.v1.NoDataComponent", "TransformMsg"));
self.inner.unary(req, path, codec).await
}
///
pub async fn ping(
&mut self,
request: impl tonic::IntoRequest<super::PingRequest>,
) -> std::result::Result<tonic::Response<super::PingResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/nodata.v1.NoDataComponent/Ping",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("nodata.v1.NoDataComponent", "Ping"));
self.inner.unary(req, path, codec).await
}
}
}
/// Generated server implementations.
pub mod no_data_component_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with NoDataComponentServer.
#[async_trait]
pub trait NoDataComponent: Send + Sync + 'static {
///
async fn transform_msg(
&self,
request: tonic::Request<super::HandleMsgRequest>,
) -> std::result::Result<
tonic::Response<super::HandleMsgResponse>,
tonic::Status,
>;
///
async fn ping(
&self,
request: tonic::Request<super::PingRequest>,
) -> std::result::Result<tonic::Response<super::PingResponse>, tonic::Status>;
}
///
#[derive(Debug)]
pub struct NoDataComponentServer<T: NoDataComponent> {
inner: Arc<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
}
impl<T: NoDataComponent> NoDataComponentServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(Arc::new(inner))
}
pub fn from_arc(inner: Arc<T>) -> Self {
Self {
inner,
accept_compression_encodings: Default::default(),
send_compression_encodings: Default::default(),
max_decoding_message_size: None,
max_encoding_message_size: None,
}
}
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
{
InterceptedService::new(Self::new(inner), interceptor)
}
/// Enable decompressing requests with the given encoding.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.accept_compression_encodings.enable(encoding);
self
}
/// Compress responses with the given encoding, if the client supports it.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.send_compression_encodings.enable(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.max_decoding_message_size = Some(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.max_encoding_message_size = Some(limit);
self
}
}
impl<T, B> tonic::codegen::Service<http::Request<B>> for NoDataComponentServer<T>
where
T: NoDataComponent,
B: Body + Send + 'static,
B::Error: Into<StdError> + Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
match req.uri().path() {
"/nodata.v1.NoDataComponent/TransformMsg" => {
#[allow(non_camel_case_types)]
struct TransformMsgSvc<T: NoDataComponent>(pub Arc<T>);
impl<
T: NoDataComponent,
> tonic::server::UnaryService<super::HandleMsgRequest>
for TransformMsgSvc<T> {
type Response = super::HandleMsgResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::HandleMsgRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NoDataComponent>::transform_msg(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = TransformMsgSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/nodata.v1.NoDataComponent/Ping" => {
#[allow(non_camel_case_types)]
struct PingSvc<T: NoDataComponent>(pub Arc<T>);
impl<
T: NoDataComponent,
> tonic::server::UnaryService<super::PingRequest> for PingSvc<T> {
type Response = super::PingResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::PingRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NoDataComponent>::ping(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = PingSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", tonic::Code::Unimplemented as i32)
.header(
http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE,
)
.body(empty_body())
.unwrap(),
)
})
}
}
}
}
impl<T: NoDataComponent> Clone for NoDataComponentServer<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
Self {
inner,
accept_compression_encodings: self.accept_compression_encodings,
send_compression_encodings: self.send_compression_encodings,
max_decoding_message_size: self.max_decoding_message_size,
max_encoding_message_size: self.max_encoding_message_size,
}
}
}
impl<T: NoDataComponent> tonic::server::NamedService for NoDataComponentServer<T> {
const NAME: &'static str = "nodata.v1.NoDataComponent";
}
}

View File

@ -0,0 +1 @@
include!("gen/nodata.v1.rs");

View File

@ -0,0 +1,109 @@
use std::pin::Pin;
use anyhow::Context;
use async_trait::async_trait;
use crunch_traits::{errors::TransportError, EventInfo, Transport};
use futures::{Stream, StreamExt};
use grpc::{no_data_service_client::NoDataServiceClient, PublishEventRequest, SubscribeRequest};
use tonic::transport::{Channel, ClientTlsConfig};
mod grpc;
pub struct NoDataTransport {
host: String,
}
impl NoDataTransport {
pub fn new(host: impl Into<String>) -> Self {
Self { host: host.into() }
}
async fn client(&self) -> anyhow::Result<NoDataServiceClient<tonic::transport::Channel>> {
let channel = if self.host.starts_with("https") {
Channel::from_shared(self.host.to_owned())
.context(format!("failed to connect to: {}", &self.host))?
.tls_config(ClientTlsConfig::new().with_native_roots())?
.connect()
.await
.context(format!("failed to connect to: {}", &self.host))?
} else {
Channel::from_shared(self.host.to_owned())
.context(format!("failed to connect to: {}", &self.host))?
.connect()
.await
.context(format!("failed to connect to: {}", &self.host))?
};
let client = NoDataServiceClient::new(channel);
Ok(client)
}
}
#[async_trait]
impl Transport for NoDataTransport {
type Stream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
async fn publish(
&self,
event_info: &EventInfo,
content: Vec<u8>,
) -> Result<(), TransportError> {
let mut client = self.client().await.map_err(TransportError::Err)?;
client
.publish_event(PublishEventRequest {
topic: event_info.transport_name(),
value: content,
})
.await
.context("failed to send crunch(nodata) message")
.map_err(TransportError::Err)?;
Ok(())
}
async fn subscriber(
&self,
event_info: &EventInfo,
) -> Result<Option<Self::Stream>, TransportError> {
let mut client = self.client().await.map_err(TransportError::Err)?;
let resp_stream = client
.subscribe(SubscribeRequest {
topic: event_info.transport_name(),
})
.await
.context("failed to establish connection to nodata")
.map_err(TransportError::Err)?;
let sub = resp_stream.into_inner();
let stream = futures::stream::unfold(sub, |mut sub| async move {
tracing::trace!("got event from nodata");
let next = sub.next().await?;
match next {
Ok(next) => Some((next.value, sub)),
Err(e) => {
tracing::error!("failed to receive event from nodata: {e}");
None
}
}
});
Ok(Some(Box::pin(stream)))
}
}
trait EventInfoExt {
fn transport_name(&self) -> String;
}
impl EventInfoExt for EventInfo {
fn transport_name(&self) -> String {
format!(
"crunch.{}.{}.{}",
self.domain, self.entity_type, self.event_name
)
}
}

View File

@ -8,6 +8,7 @@ crunch-envelope.workspace = true
crunch-in-memory = { workspace = true, optional = true }
crunch-traits.workspace = true
crunch-nats = { workspace = true, optional = true }
crunch-nodata = { workspace = true, optional = true }
anyhow.workspace = true
tracing.workspace = true
@ -22,7 +23,8 @@ futures.workspace = true
tracing-subscriber.workspace = true
[features]
default = ["in-memory", "traits", "nats"]
default = ["in-memory", "traits"]
traits = []
in-memory = ["dep:crunch-in-memory"]
nats = ["dep:crunch-nats"]
nodata = ["dep:crunch-nodata"]

View File

@ -89,6 +89,15 @@ mod builder {
Ok(self)
}
#[cfg(feature = "nodata")]
pub async fn with_nodata_transport(
&mut self,
host: &str,
) -> Result<&mut Self, crunch_traits::errors::TransportError> {
self.transport = Some(Transport::nodata(host)?);
Ok(self)
}
pub fn with_outbox(&mut self, enabled: bool) -> &mut Self {
self.outbox_enabled = enabled;
self

View File

@ -23,6 +23,13 @@ impl Transport {
crunch_nats::NatsTransport::new(options).await?,
)))
}
#[cfg(feature = "nodata")]
pub fn nodata(host: &str) -> Result<Self, crunch_traits::errors::TransportError> {
Ok(Self(std::sync::Arc::new(
crunch_nodata::NoDataTransport::new(host),
)))
}
}
impl From<DynTransport> for Transport {