From 3c5c5759ca643bb1a7cda316a7f314c8d73fcbd8 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sat, 10 Aug 2024 00:54:10 +0200 Subject: [PATCH] feat: add data ingest Signed-off-by: kjuulh --- Cargo.lock | 333 ++++++++++++++++++++++++- buf.gen.yaml | 11 + buf.yaml | 4 + crates/nodata/Cargo.toml | 5 + crates/nodata/src/gen/nodata.rs | 19 ++ crates/nodata/src/gen/nodata.tonic.rs | 294 ++++++++++++++++++++++ crates/nodata/src/gen/nomicon.rs | 19 ++ crates/nodata/src/gen/nomicon.tonic.rs | 294 ++++++++++++++++++++++ crates/nodata/src/grpc.rs | 65 +++++ crates/nodata/src/main.rs | 75 +++++- proto/nomicon.proto | 20 ++ 11 files changed, 1126 insertions(+), 13 deletions(-) create mode 100644 buf.gen.yaml create mode 100644 buf.yaml create mode 100644 crates/nodata/src/gen/nodata.rs create mode 100644 crates/nodata/src/gen/nodata.tonic.rs create mode 100644 crates/nodata/src/gen/nomicon.rs create mode 100644 crates/nodata/src/gen/nomicon.tonic.rs create mode 100644 crates/nodata/src/grpc.rs create mode 100644 proto/nomicon.proto diff --git a/Cargo.lock b/Cargo.lock index 52036d3..4500689 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,6 +36,21 @@ version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.15" @@ -91,6 +106,28 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", +] + [[package]] name = "async-trait" version = "0.1.81" @@ -111,6 +148,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.3.0" @@ -193,6 +236,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64ct" version = "1.6.0" @@ -223,6 +272,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + [[package]] name = "byteorder" version = "1.5.0" @@ -247,6 +302,21 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "serde", + "wasm-bindgen", + "windows-targets 0.52.6", +] + [[package]] name = "clap" version = "4.5.14" @@ -299,6 +369,12 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "cpufeatures" version = "0.2.12" @@ -593,6 +669,31 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap 2.3.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.5" @@ -609,7 +710,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown", + "hashbrown 0.14.5", ] [[package]] @@ -721,6 +822,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", + "h2", "http", "http-body", "httparse", @@ -729,6 +831,20 @@ dependencies = [ "pin-project-lite", "smallvec", "tokio", + "want", +] + +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", ] [[package]] @@ -738,12 +854,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" dependencies = [ "bytes", + "futures-channel", "futures-util", "http", "http-body", "hyper", "pin-project-lite", + "socket2", "tokio", + "tower", + "tower-service", + "tracing", +] + +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", ] [[package]] @@ -756,6 +900,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.3.0" @@ -763,7 +917,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.5", ] [[package]] @@ -772,12 +926,30 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "js-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -909,13 +1081,18 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "bytes", + "chrono", "clap", "dotenv", "mad", + "prost", + "prost-types", "serde", "sqlx", "tokio", "tokio-util", + "tonic", "tower-http", "tracing", "tracing-subscriber", @@ -1143,6 +1320,38 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.72", +] + +[[package]] +name = "prost-types" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" +dependencies = [ + "prost", +] + [[package]] name = "quote" version = "1.0.36" @@ -1271,7 +1480,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64", + "base64 0.21.7", ] [[package]] @@ -1504,7 +1713,7 @@ dependencies = [ "futures-util", "hashlink", "hex", - "indexmap", + "indexmap 2.3.0", "log", "memchr", "once_cell", @@ -1573,7 +1782,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ed31390216d20e538e447a7a9b959e06ed9fc51c37b514b46eb758016ecd418" dependencies = [ "atoi", - "base64", + "base64 0.21.7", "bitflags 2.6.0", "byteorder", "bytes", @@ -1617,7 +1826,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c824eb80b894f926f89a0b9da0c7f435d27cdd35b8c655b114e58223918577e" dependencies = [ "atoi", - "base64", + "base64 0.21.7", "bitflags 2.6.0", "byteorder", "crc", @@ -1874,6 +2083,36 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -1882,9 +2121,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -1977,6 +2220,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "typenum" version = "1.17.0" @@ -2078,6 +2327,15 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -2090,6 +2348,60 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" +[[package]] +name = "wasm-bindgen" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.72", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" + [[package]] name = "webpki-roots" version = "0.25.4" @@ -2128,6 +2440,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/buf.gen.yaml b/buf.gen.yaml new file mode 100644 index 0000000..e69238f --- /dev/null +++ b/buf.gen.yaml @@ -0,0 +1,11 @@ +version: v2 +managed: + enabled: true +plugins: + # dependencies +- remote: buf.build/community/neoeinstein-prost + out: crates/nodata/src/gen +- remote: buf.build/community/neoeinstein-tonic:v0.4.0 + out: crates/nodata/src/gen +inputs: + - directory: proto diff --git a/buf.yaml b/buf.yaml new file mode 100644 index 0000000..87fb918 --- /dev/null +++ b/buf.yaml @@ -0,0 +1,4 @@ +version: v2 +modules: + - path: proto + name: buf.build/noschemaplz/nodata diff --git a/crates/nodata/Cargo.toml b/crates/nodata/Cargo.toml index 9e03fcb..cba8d2f 100644 --- a/crates/nodata/Cargo.toml +++ b/crates/nodata/Cargo.toml @@ -24,3 +24,8 @@ uuid = { version = "1.7.0", features = ["v4"] } tower-http = { version = "0.5.2", features = ["cors", "trace"] } mad = { git = "https://github.com/kjuulh/mad", branch = "main" } tokio-util = "0.7.11" +tonic = "0.12.1" +bytes = "1.7.1" +prost = "0.13.1" +prost-types = "0.13.1" +chrono = { version = "0.4.38", features = ["serde"] } diff --git a/crates/nodata/src/gen/nodata.rs b/crates/nodata/src/gen/nodata.rs new file mode 100644 index 0000000..6eda830 --- /dev/null +++ b/crates/nodata/src/gen/nodata.rs @@ -0,0 +1,19 @@ +// @generated +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PublishEventRequest { + #[prost(string, tag="1")] + pub topic: ::prost::alloc::string::String, + #[prost(message, optional, tag="2")] + pub published: ::core::option::Option<::prost_types::Timestamp>, + #[prost(string, tag="3")] + pub key: ::prost::alloc::string::String, + #[prost(bytes="vec", tag="4")] + pub value: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PublishEventResponse { +} +include!("nodata.tonic.rs"); +// @@protoc_insertion_point(module) \ No newline at end of file diff --git a/crates/nodata/src/gen/nodata.tonic.rs b/crates/nodata/src/gen/nodata.tonic.rs new file mode 100644 index 0000000..8c85f58 --- /dev/null +++ b/crates/nodata/src/gen/nodata.tonic.rs @@ -0,0 +1,294 @@ +// @generated +/// Generated client implementations. +pub mod no_data_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct NoDataClient { + inner: tonic::client::Grpc, + } + impl NoDataClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl NoDataClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> NoDataClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + NoDataClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn publish_event( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/nodata.NoData/PublishEvent", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("nodata.NoData", "PublishEvent")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod no_data_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with NoDataServer. + #[async_trait] + pub trait NoData: Send + Sync + 'static { + async fn publish_event( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct NoDataServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl NoDataServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for NoDataServer + where + T: NoData, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/nodata.NoData/PublishEvent" => { + #[allow(non_camel_case_types)] + struct PublishEventSvc(pub Arc); + impl< + T: NoData, + > tonic::server::UnaryService + for PublishEventSvc { + type Response = super::PublishEventResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::publish_event(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PublishEventSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for NoDataServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for NoDataServer { + const NAME: &'static str = "nodata.NoData"; + } +} diff --git a/crates/nodata/src/gen/nomicon.rs b/crates/nodata/src/gen/nomicon.rs new file mode 100644 index 0000000..3a69fca --- /dev/null +++ b/crates/nodata/src/gen/nomicon.rs @@ -0,0 +1,19 @@ +// @generated +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PublishEventRequest { + #[prost(string, tag="1")] + pub topic: ::prost::alloc::string::String, + #[prost(message, optional, tag="2")] + pub published: ::core::option::Option<::prost_types::Timestamp>, + #[prost(string, tag="3")] + pub key: ::prost::alloc::string::String, + #[prost(bytes="vec", tag="4")] + pub value: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PublishEventResponse { +} +include!("nomicon.tonic.rs"); +// @@protoc_insertion_point(module) \ No newline at end of file diff --git a/crates/nodata/src/gen/nomicon.tonic.rs b/crates/nodata/src/gen/nomicon.tonic.rs new file mode 100644 index 0000000..84e8c88 --- /dev/null +++ b/crates/nodata/src/gen/nomicon.tonic.rs @@ -0,0 +1,294 @@ +// @generated +/// Generated client implementations. +pub mod no_data_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct NoDataClient { + inner: tonic::client::Grpc, + } + impl NoDataClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl NoDataClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> NoDataClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + NoDataClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn publish_event( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/nomicon.NoData/PublishEvent", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("nomicon.NoData", "PublishEvent")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod no_data_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with NoDataServer. + #[async_trait] + pub trait NoData: Send + Sync + 'static { + async fn publish_event( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct NoDataServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl NoDataServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for NoDataServer + where + T: NoData, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/nomicon.NoData/PublishEvent" => { + #[allow(non_camel_case_types)] + struct PublishEventSvc(pub Arc); + impl< + T: NoData, + > tonic::server::UnaryService + for PublishEventSvc { + type Response = super::PublishEventResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::publish_event(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PublishEventSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for NoDataServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for NoDataServer { + const NAME: &'static str = "nomicon.NoData"; + } +} diff --git a/crates/nodata/src/grpc.rs b/crates/nodata/src/grpc.rs new file mode 100644 index 0000000..680ecb4 --- /dev/null +++ b/crates/nodata/src/grpc.rs @@ -0,0 +1,65 @@ +use std::net::SocketAddr; + +use anyhow::Context; +use mad::Component; + +use crate::state::SharedState; + +include!("gen/nodata.rs"); + +#[derive(Clone)] +pub struct GrpcServer { + state: SharedState, + host: SocketAddr, +} + +impl GrpcServer { + pub fn new(state: &SharedState, host: SocketAddr) -> Self { + Self { + state: state.clone(), + host, + } + } +} + +#[tonic::async_trait] +impl no_data_server::NoData for GrpcServer { + async fn publish_event( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let req = request.into_inner(); + + tracing::debug!( + topic = req.topic, + key = req.key, + value = std::str::from_utf8(&req.value).ok(), + "handling event" + ); + + Ok(tonic::Response::new(PublishEventResponse {})) + } +} + +#[axum::async_trait] +impl Component for GrpcServer { + fn name(&self) -> Option { + Some("grpc_server".into()) + } + + async fn run( + &self, + _cancellation_token: tokio_util::sync::CancellationToken, + ) -> Result<(), mad::MadError> { + tracing::info!("grpc listening on: {}", self.host); + + tonic::transport::Server::builder() + .add_service(no_data_server::NoDataServer::new(self.clone())) + .serve(self.host) + .await + .context("grpc server failed") + .map_err(mad::MadError::Inner)?; + + Ok(()) + } +} diff --git a/crates/nodata/src/main.rs b/crates/nodata/src/main.rs index 0809784..30df818 100644 --- a/crates/nodata/src/main.rs +++ b/crates/nodata/src/main.rs @@ -1,9 +1,12 @@ +mod grpc; mod http; mod state; use std::net::SocketAddr; +use chrono::{Datelike, Timelike}; use clap::{Parser, Subcommand}; +use grpc::{GrpcServer, PublishEventRequest}; use http::HttpServer; use mad::Mad; use state::SharedState; @@ -20,6 +23,34 @@ enum Commands { Serve { #[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")] host: SocketAddr, + #[arg(env = "GRPC_SERVICE_HOST", long, default_value = "127.0.0.1:7900")] + grpc_host: SocketAddr, + }, + + Client { + #[arg(env = "SERVICE_HOST", long, default_value = "http://127.0.0.1:3000")] + host: String, + #[arg( + env = "GRPC_SERVICE_HOST", + long, + default_value = "http://127.0.0.1:7900" + )] + grpc_host: String, + + #[command(subcommand)] + commands: ClientCommands, + }, +} + +#[derive(Subcommand)] +enum ClientCommands { + PublishEvent { + #[arg(long)] + topic: String, + #[arg(long)] + key: String, + #[arg(long)] + value: String, }, } @@ -30,15 +61,45 @@ async fn main() -> anyhow::Result<()> { let cli = Command::parse(); - if let Some(Commands::Serve { host }) = cli.command { - tracing::info!("Starting service"); + match cli.command.unwrap() { + Commands::Serve { host, grpc_host } => { + tracing::info!("Starting service"); + let state = SharedState::new().await?; + Mad::builder() + .add(HttpServer::new(&state, host)) + .add(GrpcServer::new(&state, grpc_host)) + .run() + .await?; + } + Commands::Client { + commands, + host, + grpc_host, + } => match commands { + ClientCommands::PublishEvent { topic, key, value } => { + let mut client = + crate::grpc::no_data_client::NoDataClient::connect(grpc_host).await?; - let state = SharedState::new().await?; + let timestamp = chrono::Utc::now(); - Mad::builder() - .add(HttpServer::new(&state, host)) - .run() - .await?; + let res = client + .publish_event(PublishEventRequest { + topic, + published: Some(prost_types::Timestamp::date_time_nanos( + timestamp.year() as i64, + timestamp.month() as u8, + timestamp.day() as u8, + timestamp.hour() as u8, + timestamp.minute() as u8, + timestamp.second() as u8, + timestamp.nanosecond(), + )?), + key, + value: value.into_bytes(), + }) + .await?; + } + }, } Ok(()) diff --git a/proto/nomicon.proto b/proto/nomicon.proto new file mode 100644 index 0000000..a389002 --- /dev/null +++ b/proto/nomicon.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +package nodata; + +service NoData { + rpc PublishEvent(PublishEventRequest) returns (PublishEventResponse) {} +} + +message PublishEventRequest { + string topic = 1; + google.protobuf.Timestamp published = 2; + string key = 3; + bytes value = 4; +} + +message PublishEventResponse { + +}