From c51cd4dfc1e34d8cfda1568877aeb84220bdc648 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sun, 18 Aug 2024 01:13:27 +0200 Subject: [PATCH] feat: with dagger engine Signed-off-by: kjuulh --- Cargo.lock | 619 +++++++++++++++++- README.md | 10 +- crates/nodata/Cargo.toml | 1 + crates/nodata/src/component.rs | 24 + crates/nodata/src/dagger_engine.rs | 260 ++++++++ crates/nodata/src/gen/nodata.v1.rs | 52 +- crates/nodata/src/gen/nodata.v1.tonic.rs | 370 +++++++++++ crates/nodata/src/grpc_component.rs | 26 + crates/nodata/src/main.rs | 3 + .../nodata/v1/{nomicon.proto => nodata.proto} | 0 proto/nodata/v1/nodata_component.proto | 16 + 11 files changed, 1339 insertions(+), 42 deletions(-) create mode 100644 crates/nodata/src/component.rs create mode 100644 crates/nodata/src/dagger_engine.rs create mode 100644 crates/nodata/src/grpc_component.rs rename proto/nodata/v1/{nomicon.proto => nodata.proto} (100%) create mode 100644 proto/nodata/v1/nodata_component.proto diff --git a/Cargo.lock b/Cargo.lock index 7bbb31d..626c99a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,6 +115,12 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +[[package]] +name = "ascii" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab1c04a571841102f5345a8fc0f6bb3d31c315dec879b5c6e42e40ce7ffa34e" + [[package]] name = "async-stream" version = "0.3.5" @@ -179,10 +185,10 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.4.1", "hyper-util", "itoa", "matchit", @@ -212,8 +218,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", @@ -348,7 +354,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.11.1", ] [[package]] @@ -375,12 +381,35 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" +[[package]] +name = "combine" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da3da6baa321ec19e1cc41d31bf599f00c783d0517095cdaf0332e3fe8d20680" +dependencies = [ + "ascii", + "byteorder", + "either", + "memchr", + "unreachable", +] + [[package]] name = "const-oid" version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -411,6 +440,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -436,6 +474,71 @@ dependencies = [ "typenum", ] +[[package]] +name = "dagger-sdk" +version = "0.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f34f5a056235b371e8aabc7a3ccd90632e89316aefd081067cf0e304445e3c1d" +dependencies = [ + "async-trait", + "base64 0.21.7", + "derive_builder", + "dirs", + "eyre", + "flate2", + "futures", + "graphql_client", + "hex", + "hex-literal", + "platform-info", + "reqwest", + "serde", + "serde_graphql_input", + "serde_json", + "sha2", + "tar", + "tempfile", + "thiserror", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "darling" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.10.0", + "syn 1.0.109", +] + +[[package]] +name = "darling_macro" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" +dependencies = [ + "darling_core", + "quote", + "syn 1.0.109", +] + [[package]] name = "der" version = "0.7.9" @@ -456,6 +559,37 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "derive_builder" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "derive_builder_macro" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e" +dependencies = [ + "derive_builder_core", + "syn 1.0.109", +] + [[package]] name = "digest" version = "0.10.7" @@ -468,6 +602,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "dotenv" version = "0.15.0" @@ -502,6 +657,15 @@ dependencies = [ "serde", ] +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -535,12 +699,44 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "eyre" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd915d99f24784cdc19fd37ef22b97e3ff0ae756c7e492e9fbfe897d61e2aec" +dependencies = [ + "indenter", + "once_cell", +] + [[package]] name = "fastrand" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +[[package]] +name = "filetime" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf401df4a4e3872c4fe8151134cf483738e74b67fc934d6532c882b3d24a4550" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.59.0", +] + +[[package]] +name = "flate2" +version = "1.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f211bbe8e69bbd0cfdea405084f128ae8b4aaa6b0b522fc8f2b009084797920" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.11.0" @@ -694,6 +890,84 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +[[package]] +name = "graphql-introspection-query" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f2a4732cf5140bd6c082434494f785a19cfb566ab07d1382c3671f5812fed6d" +dependencies = [ + "serde", +] + +[[package]] +name = "graphql-parser" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ebc8013b4426d5b81a4364c419a95ed0b404af2b82e2457de52d9348f0e474" +dependencies = [ + "combine", + "thiserror", +] + +[[package]] +name = "graphql_client" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cdf7b487d864c2939b23902291a5041bc4a84418268f25fda1c8d4e15ad8fa" +dependencies = [ + "graphql_query_derive", + "reqwest", + "serde", + "serde_json", +] + +[[package]] +name = "graphql_client_codegen" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a40f793251171991c4eb75bd84bc640afa8b68ff6907bc89d3b712a22f700506" +dependencies = [ + "graphql-introspection-query", + "graphql-parser", + "heck 0.4.1", + "lazy_static", + "proc-macro2", + "quote", + "serde", + "serde_json", + "syn 1.0.109", +] + +[[package]] +name = "graphql_query_derive" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00bda454f3d313f909298f626115092d348bc231025699f557b27e248475f48c" +dependencies = [ + "graphql_client_codegen", + "proc-macro2", + "syn 1.0.109", +] + +[[package]] +name = "h2" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap 2.4.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.5" @@ -705,7 +979,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http", + "http 1.1.0", "indexmap 2.4.0", "slab", "tokio", @@ -765,6 +1039,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hex-literal" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" + [[package]] name = "hkdf" version = "0.12.4" @@ -792,6 +1072,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.1.0" @@ -803,6 +1094,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -810,7 +1112,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.1.0", ] [[package]] @@ -821,8 +1123,8 @@ checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -838,6 +1140,30 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "hyper" +version = "0.14.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.4.1" @@ -847,9 +1173,9 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", - "http", - "http-body", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -859,13 +1185,27 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.30", + "rustls", + "tokio", + "tokio-rustls", +] + [[package]] name = "hyper-timeout" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper", + "hyper 1.4.1", "hyper-util", "pin-project-lite", "tokio", @@ -881,9 +1221,9 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http", - "http-body", - "hyper", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.4.1", "pin-project-lite", "socket2", "tokio", @@ -915,6 +1255,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.5.0" @@ -925,6 +1271,12 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indenter" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" + [[package]] name = "indexmap" version = "1.9.3" @@ -945,6 +1297,12 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "ipnet" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -996,6 +1354,17 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.6.0", + "libc", + "redox_syscall 0.5.3", +] + [[package]] name = "libsqlite3-sys" version = "0.27.0" @@ -1118,6 +1487,7 @@ dependencies = [ "bytes", "chrono", "clap", + "dagger-sdk", "dotenv", "drift", "mad", @@ -1224,6 +1594,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "overload" version = "0.1.1" @@ -1333,6 +1709,16 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "platform-info" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5ff316b9c4642feda973c18f0decd6c8b0919d4722566f6e4337cce0dd88217" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -1446,6 +1832,17 @@ dependencies = [ "bitflags 2.6.0", ] +[[package]] +name = "redox_users" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd283d9651eeda4b2a83a43c1c91b266c40fd76ecd39a50a8c630ae69dc72891" +dependencies = [ + "getrandom", + "libredox", + "thiserror", +] + [[package]] name = "regex" version = "1.10.6" @@ -1490,6 +1887,49 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "reqwest" +version = "0.11.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +dependencies = [ + "base64 0.21.7", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.30", + "hyper-rustls", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 0.1.2", + "system-configuration", + "tokio", + "tokio-rustls", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "webpki-roots", + "winreg", +] + [[package]] name = "ring" version = "0.17.8" @@ -1550,6 +1990,7 @@ version = "0.21.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ + "log", "ring", "rustls-webpki", "sct", @@ -1622,6 +2063,19 @@ dependencies = [ "syn 2.0.74", ] +[[package]] +name = "serde_graphql_input" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7b3ed302fb48549bd1b0df59d180655f0eb621d71a3924c68e1af9aed4f6a6a" +dependencies = [ + "anyhow", + "itoa", + "serde", + "tokio", + "tracing", +] + [[package]] name = "serde_json" version = "1.0.125" @@ -1982,6 +2436,12 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "strsim" version = "0.11.1" @@ -2028,6 +2488,38 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "tar" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb797dad5fb5b76fcf519e702f4a589483b5ef06567f160c392832c1f5e44909" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "tempfile" version = "3.12.0" @@ -2146,6 +2638,16 @@ dependencies = [ "syn 2.0.74", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.15" @@ -2181,11 +2683,11 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", - "h2", - "http", - "http-body", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.4.1", "hyper-timeout", "hyper-util", "percent-encoding", @@ -2228,8 +2730,8 @@ checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ "bitflags 2.6.0", "bytes", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "pin-project-lite", "tower-layer", @@ -2383,6 +2885,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "unreachable" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" +dependencies = [ + "void", +] + [[package]] name = "untrusted" version = "0.9.0" @@ -2439,6 +2950,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "want" version = "0.3.1" @@ -2486,6 +3003,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.93" @@ -2515,6 +3044,29 @@ version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" +[[package]] +name = "wasm-streams" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + +[[package]] +name = "web-sys" +version = "0.3.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.4" @@ -2710,6 +3262,27 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + +[[package]] +name = "xattr" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" +dependencies = [ + "libc", + "linux-raw-sys", + "rustix", +] + [[package]] name = "zerocopy" version = "0.7.35" diff --git a/README.md b/README.md index ea63212..6791d34 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,12 @@ A topic at its most basic is a computational unit implementing a certain interfa The most simple is a source and sink, where we respectively push or pull data from the topics. -### What does it look like +A component implements either or all of 3 sdk interfaces -As part of nodata, you'll be given nodata the cli. The cli can bootstrap a variety of components, +1. Create a new sample rust application +2. Add dependency nodata-component +3. In the main functin use: nodata_component::component +4. Implement the interfaces you want to use +5. Build the application into a dockerfile, optionally use the nodata cli to build the app +6. Register the application as a component +7. nodata client add-component --image docker.io/kjuulh/nodata-example-transform:latest --tranform `:` diff --git a/crates/nodata/Cargo.toml b/crates/nodata/Cargo.toml index fe1b564..a92a264 100644 --- a/crates/nodata/Cargo.toml +++ b/crates/nodata/Cargo.toml @@ -31,6 +31,7 @@ prost = "0.13.1" prost-types = "0.13.1" chrono = { version = "0.4.38", features = ["serde"] } tokio-stream = "0.1.15" +dagger-sdk = "0.11.10" [dev-dependencies] tracing-test = "0.2.5" diff --git a/crates/nodata/src/component.rs b/crates/nodata/src/component.rs new file mode 100644 index 0000000..c61f1b3 --- /dev/null +++ b/crates/nodata/src/component.rs @@ -0,0 +1,24 @@ +use crate::dagger_engine::DaggerEngine; + +pub struct Component { + dagger_engine: DaggerEngine, +} + +impl Component { + pub fn new(dagger_engine: DaggerEngine) -> Self { + Self { dagger_engine } + } + pub async fn start_component(&mut self) -> anyhow::Result<()> { + self.dagger_engine.start().await?; + + Ok(()) + } + + pub async fn close_component(&mut self) -> anyhow::Result<()> { + Ok(()) + } + + pub async fn handle_transform_message(&mut self) -> anyhow::Result<()> { + Ok(()) + } +} diff --git a/crates/nodata/src/dagger_engine.rs b/crates/nodata/src/dagger_engine.rs new file mode 100644 index 0000000..f078963 --- /dev/null +++ b/crates/nodata/src/dagger_engine.rs @@ -0,0 +1,260 @@ +use std::sync::Arc; + +use anyhow::Context; +use dagger_sdk::{PortForward, ServiceUpOptsBuilder}; +use tokio::net::TcpListener; +use tokio_util::sync::CancellationToken; + +use crate::grpc_component::GrpcComponentClient; + +struct DaggerConn { + client: dagger_sdk::Query, + cancellation_token: CancellationToken, +} + +impl DaggerConn { + pub fn new(client: &dagger_sdk::Query) -> Self { + Self { + client: client.clone(), + cancellation_token: CancellationToken::new(), + } + } + + pub async fn start_container( + &self, + name: &str, + image: &str, + cancellation_token: CancellationToken, + ) -> anyhow::Result { + let client = self.client.clone(); + + // Bind to the os, and let it select a random port above > 30000 + let component_listener = match TcpListener::bind("127.0.0.1:0").await { + Ok(listener) => listener, + Err(e) => { + tracing::warn!(error = e.to_string(), "failed to allocate port"); + anyhow::bail!(e); + } + }; + + let port = component_listener + .local_addr() + .context("failed to find a valid random port, you may've run out")? + .port(); + + // Let the blocking container run in the background, maintained by the cancellation token handle in the dagger container + let container_name = name.to_string(); + let container_token = cancellation_token.child_token(); + let container_image = image.to_string(); + + tokio::spawn(async move { + tokio::select! { + _ = container_token.cancelled() => {}, + res = spawn_container(&client, &container_image, port) => { + if let Err(e) = res { + tracing::warn!(error=e.to_string(), "container {} failed", container_name); + } + } + } + }); + + let grpc = match GrpcComponentClient::new(format!("127.0.0.1:{}", port)).await { + Ok(grpc) => grpc, + Err(e) => { + tracing::warn!( + error = e.to_string(), + "failed to bootstrap grpc component, service may not be up yet." + ); + + anyhow::bail!(e); + } + }; + + match grpc.ping().await { + Ok(_) => { + // TODO: Finally send something back to the caller + } + Err(e) => { + tracing::warn!( + error = e.to_string(), + "failed to ping grpc server, service may not be up yet." + ); + + anyhow::bail!("failed to ping container"); + } + } + + Ok(DaggerContainer { + name: name.into(), + image: image.into(), + handle: cancellation_token, + url: format!("127.0.0.1:{}", port), + }) + } +} + +pub struct DaggerEngine { + cancellation: CancellationToken, + + dagger_conn: Arc>>, +} + +impl DaggerEngine { + pub fn new() -> Self { + Self { + cancellation: CancellationToken::default(), + dagger_conn: Arc::default(), + } + } + + pub async fn start(&mut self) -> anyhow::Result<()> { + let cancellation = self.cancellation.child_token(); + let dagger_conn = self.dagger_conn.clone(); + + tokio::spawn(async move { + let mut dagger_conn_handle = dagger_conn.lock().await; + + if dagger_conn_handle.is_none() { + let mut dagger_conn = dagger_client(cancellation.child_token()).await; + + if let Some(dagger_conn) = dagger_conn.recv().await { + *dagger_conn_handle = Some(dagger_conn); + } + } + }); + + Ok(()) + } + + pub async fn stop(&mut self) -> anyhow::Result<()> { + self.cancellation.cancel(); + + Ok(()) + } + + pub async fn start_container( + &self, + name: impl Into, + image: impl Into, + ) -> anyhow::Result { + let name = name.into(); + let image = image.into(); + + for i in 0..5 { + let channel = self.dagger_conn.lock().await; + if channel.is_some() { + // TODO: fill out + + match channel + .as_ref() + .unwrap() + .start_container(&name, &image, self.cancellation.child_token()) + .await + { + Ok(container) => return Ok(container), + Err(e) => { + tracing::info!( + container_name = name, + error = e.to_string(), + "failed to get container" + ); + } + } + } + + tokio::time::sleep(std::time::Duration::from_secs(i)).await + } + + anyhow::bail!("failed to find a valid channel, aborting") + } +} + +pub struct DaggerContainer { + name: String, + handle: CancellationToken, + image: String, + url: String, +} + +impl DaggerContainer { + pub async fn grpc_handle(&self) -> anyhow::Result { + let client = GrpcComponentClient::new(&self.url).await?; + + Ok(client) + } +} + +async fn dagger_client(cancellation: CancellationToken) -> tokio::sync::mpsc::Receiver { + let (tx, rx) = tokio::sync::mpsc::channel::(1); + + tokio::spawn(async move { + if let Err(e) = dagger_sdk::connect(|client| async move { + tx.send(DaggerConn::new(&client)).await?; + + cancellation.cancelled().await; + + Ok(()) + }) + .await + { + tracing::warn!( + error = e.to_string(), + "failed to handle dagger connect, components may not be executed as they should " + ); + } + }); + + rx +} + +async fn spawn_container( + client: &dagger_sdk::Query, + image: &str, + outer_port: u16, +) -> anyhow::Result<()> { + let service = client + .container() + .from(image) + .with_exposed_port(80) + .as_service(); + + service + .up_opts( + ServiceUpOptsBuilder::default() + .ports(vec![PortForward { + backend: 80, + frontend: outer_port as isize, + protocol: dagger_sdk::NetworkProtocol::Tcp, + }]) + .build()?, + ) + .await?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use tracing_test::traced_test; + + use super::*; + + #[tokio::test] + #[traced_test] + async fn test_can_use_dagger_engine() -> anyhow::Result<()> { + let mut dagger_engine = DaggerEngine::new(); + + tracing::info!("starting dagger engine"); + dagger_engine.start().await?; + + tracing::info!("starting dagger container"); + let container = dagger_engine + .start_container("some_name", "nginx:latest") + .await?; + + tracing::info!("getting grpc handle"); + let _ = container.grpc_handle().await?; + + Ok(()) + } +} diff --git a/crates/nodata/src/gen/nodata.v1.rs b/crates/nodata/src/gen/nodata.v1.rs index 8624c76..94757cc 100644 --- a/crates/nodata/src/gen/nodata.v1.rs +++ b/crates/nodata/src/gen/nodata.v1.rs @@ -2,60 +2,78 @@ #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PublishEventRequest { - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub topic: ::prost::alloc::string::String, - #[prost(message, optional, tag = "2")] + #[prost(message, optional, tag="2")] pub published: ::core::option::Option<::prost_types::Timestamp>, - #[prost(string, tag = "3")] + #[prost(string, tag="3")] pub key: ::prost::alloc::string::String, - #[prost(bytes = "vec", tag = "4")] + #[prost(bytes="vec", tag="4")] pub value: ::prost::alloc::vec::Vec, - #[prost(string, optional, tag = "5")] + #[prost(string, optional, tag="5")] pub id: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct PublishEventResponse {} +pub struct PublishEventResponse { +} #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct GetTopicsRequest {} +pub struct GetTopicsRequest { +} #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetTopicsResponse { - #[prost(string, repeated, tag = "1")] + #[prost(string, repeated, tag="1")] pub topics: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetKeysRequest { - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub topic: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetKeysResponse { - #[prost(string, repeated, tag = "1")] + #[prost(string, repeated, tag="1")] pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SubscribeRequest { - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub topic: ::prost::alloc::string::String, - #[prost(string, tag = "2")] + #[prost(string, tag="2")] pub key: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SubscribeResponse { - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub id: ::prost::alloc::string::String, - #[prost(message, optional, tag = "2")] + #[prost(message, optional, tag="2")] pub published: ::core::option::Option<::prost_types::Timestamp>, - #[prost(uint64, tag = "3")] + #[prost(uint64, tag="3")] pub offset: u64, - #[prost(bytes = "vec", tag = "4")] + #[prost(bytes="vec", tag="4")] pub value: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HandleMsgRequest { +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HandleMsgResponse { +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PingRequest { +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PingResponse { +} include!("nodata.v1.tonic.rs"); -// @@protoc_insertion_point(module) +// @@protoc_insertion_point(module) \ No newline at end of file diff --git a/crates/nodata/src/gen/nodata.v1.tonic.rs b/crates/nodata/src/gen/nodata.v1.tonic.rs index 7546240..6fac069 100644 --- a/crates/nodata/src/gen/nodata.v1.tonic.rs +++ b/crates/nodata/src/gen/nodata.v1.tonic.rs @@ -527,3 +527,373 @@ pub mod no_data_service_server { const NAME: &'static str = "nodata.v1.NoDataService"; } } +/// Generated client implementations. +pub mod no_data_component_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// + #[derive(Debug, Clone)] + pub struct NoDataComponentClient { + inner: tonic::client::Grpc, + } + impl NoDataComponentClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl NoDataComponentClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> NoDataComponentClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + NoDataComponentClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// + pub async fn transform_msg( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/nodata.v1.NoDataComponent/TransformMsg", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("nodata.v1.NoDataComponent", "TransformMsg")); + self.inner.unary(req, path, codec).await + } + /// + pub async fn ping( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/nodata.v1.NoDataComponent/Ping", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("nodata.v1.NoDataComponent", "Ping")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod no_data_component_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with NoDataComponentServer. + #[async_trait] + pub trait NoDataComponent: Send + Sync + 'static { + /// + async fn transform_msg( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// + async fn ping( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + /// + #[derive(Debug)] + pub struct NoDataComponentServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl NoDataComponentServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for NoDataComponentServer + where + T: NoDataComponent, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/nodata.v1.NoDataComponent/TransformMsg" => { + #[allow(non_camel_case_types)] + struct TransformMsgSvc(pub Arc); + impl< + T: NoDataComponent, + > tonic::server::UnaryService + for TransformMsgSvc { + type Response = super::HandleMsgResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::transform_msg(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = TransformMsgSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/nodata.v1.NoDataComponent/Ping" => { + #[allow(non_camel_case_types)] + struct PingSvc(pub Arc); + impl< + T: NoDataComponent, + > tonic::server::UnaryService for PingSvc { + type Response = super::PingResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::ping(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = PingSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for NoDataComponentServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for NoDataComponentServer { + const NAME: &'static str = "nodata.v1.NoDataComponent"; + } +} diff --git a/crates/nodata/src/grpc_component.rs b/crates/nodata/src/grpc_component.rs new file mode 100644 index 0000000..f149d0f --- /dev/null +++ b/crates/nodata/src/grpc_component.rs @@ -0,0 +1,26 @@ +use no_data_component_client::NoDataComponentClient; +use tonic::transport::Channel; + +include!("gen/nodata.v1.rs"); + +pub struct GrpcComponentClient { + host_name: String, +} + +impl GrpcComponentClient { + pub async fn new(host_name: impl Into) -> anyhow::Result { + Ok(Self { + host_name: host_name.into(), + }) + } + + pub async fn ping(&self) -> anyhow::Result> { + self.create_client().await?.ping(PingRequest {}).await?; + + Ok(Some(())) + } + + pub(crate) async fn create_client(&self) -> anyhow::Result> { + Ok(NoDataComponentClient::connect(self.host_name.clone()).await?) + } +} diff --git a/crates/nodata/src/main.rs b/crates/nodata/src/main.rs index 21bb649..062d796 100644 --- a/crates/nodata/src/main.rs +++ b/crates/nodata/src/main.rs @@ -1,8 +1,11 @@ mod broker; +mod dagger_engine; mod grpc; +mod grpc_component; mod http; mod state; +mod component; mod services; use std::net::SocketAddr; diff --git a/proto/nodata/v1/nomicon.proto b/proto/nodata/v1/nodata.proto similarity index 100% rename from proto/nodata/v1/nomicon.proto rename to proto/nodata/v1/nodata.proto diff --git a/proto/nodata/v1/nodata_component.proto b/proto/nodata/v1/nodata_component.proto new file mode 100644 index 0000000..af4df18 --- /dev/null +++ b/proto/nodata/v1/nodata_component.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +package nodata.v1; + +service NoDataComponent { + rpc TransformMsg(HandleMsgRequest) returns (HandleMsgResponse) {} + rpc Ping(PingRequest) returns (PingResponse) {} +} + +message HandleMsgRequest {} +message HandleMsgResponse {} + +message PingRequest {} +message PingResponse {}