diff --git a/.env b/.env index 8b13789..e402c0c 100644 --- a/.env +++ b/.env @@ -1 +1,5 @@ - +EXTERNAL_HOST=http://localhost:3000 +PROCESS_HOST=http://localhost:7900 +SERVICE_HOST=127.0.0.1:3000 +DISCOVERY_HOST=http://127.0.0.1:3000 +RUST_LOG=h2=warn,debug diff --git a/Cargo.lock b/Cargo.lock index a4a6c30..bd19d8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,7 +196,7 @@ dependencies = [ "miniz_oxide", "object", "rustc-demangle", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -211,6 +211,12 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + [[package]] name = "byteorder" version = "1.5.0" @@ -247,15 +253,19 @@ dependencies = [ "axum", "bytes", "clap", + "dirs", "dotenv", + "futures", "nodrift", "notmad", "prost", "prost-types", + "reqwest", "rusqlite", "serde", "tokio", "tokio-util", + "toml", "tonic", "tower-http", "tracing", @@ -309,6 +319,16 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation" version = "0.10.0" @@ -325,6 +345,38 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dotenv" version = "0.15.0" @@ -337,12 +389,31 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "fallible-iterator" version = "0.3.0" @@ -355,12 +426,33 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fastrand" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" + [[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -604,6 +696,23 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-timeout" version = "0.5.2" @@ -617,6 +726,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -636,6 +761,145 @@ dependencies = [ "tracing", ] +[[package]] +name = "icu_collections" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_locid_transform" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_locid_transform_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_locid_transform_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" + +[[package]] +name = "icu_normalizer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "utf16_iter", + "utf8_iter", + "write16", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" + +[[package]] +name = "icu_properties" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locid_transform", + "icu_properties_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" + +[[package]] +name = "icu_provider" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_provider_macros", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_provider_macros" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "idna" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -656,6 +920,12 @@ dependencies = [ "hashbrown 0.15.1", ] +[[package]] +name = "ipnet" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -677,6 +947,15 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2" +[[package]] +name = "js-sys" +version = "0.3.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -689,6 +968,16 @@ version = "0.2.164" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "libsqlite3-sys" version = "0.30.1" @@ -700,6 +989,18 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" + +[[package]] +name = "litemap" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" + [[package]] name = "lock_api" version = "0.4.12" @@ -755,6 +1056,23 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nodrift" version = "0.2.0" @@ -763,7 +1081,7 @@ checksum = "154be26c4796e549cab55b834bb8bf6cbcd24e759ecaa6f91155464520c616ba" dependencies = [ "anyhow", "async-trait", - "thiserror", + "thiserror 1.0.69", "tokio", "tokio-util", "tracing", @@ -771,16 +1089,16 @@ dependencies = [ [[package]] name = "notmad" -version = "0.6.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "590b2938244df27d8e6b9989dd320db6499bdad8c0a2381b4d748134998f5515" +checksum = "84a91fcd59e5177fb66e5ee7009fd4d4822a73364b848ca7083345ccf1c2dab1" dependencies = [ "anyhow", "async-trait", "futures", "futures-util", "rand", - "thiserror", + "thiserror 2.0.3", "tokio", "tokio-util", "tracing", @@ -811,12 +1129,56 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "openssl" +version = "0.10.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "overload" version = "0.1.1" @@ -843,7 +1205,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -988,6 +1350,60 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_users" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" +dependencies = [ + "getrandom", + "libredox", + "thiserror 1.0.69", +] + +[[package]] +name = "reqwest" +version = "0.12.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-tls", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.2", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "windows-registry", +] + [[package]] name = "ring" version = "0.17.8" @@ -1023,6 +1439,19 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustix" +version = "0.38.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "rustls" version = "0.23.18" @@ -1047,7 +1476,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 3.0.1", ] [[package]] @@ -1103,6 +1532,19 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + [[package]] name = "security-framework" version = "3.0.1" @@ -1110,7 +1552,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1415a607e92bec364ea2cf9264646dcce0f91e6d65281bd6f2819cca3bf39c8" dependencies = [ "bitflags", - "core-foundation", + "core-foundation 0.10.0", "core-foundation-sys", "libc", "security-framework-sys", @@ -1168,6 +1610,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1235,6 +1686,12 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.11.1" @@ -1269,6 +1726,54 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags", + "core-foundation 0.9.4", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "tempfile" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" +dependencies = [ + "cfg-if", + "fastrand", + "once_cell", + "rustix", + "windows-sys 0.59.0", +] [[package]] name = "thiserror" @@ -1276,7 +1781,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" +dependencies = [ + "thiserror-impl 2.0.3", ] [[package]] @@ -1290,6 +1804,17 @@ dependencies = [ "syn", ] +[[package]] +name = "thiserror-impl" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "thread_local" version = "1.1.8" @@ -1300,6 +1825,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "tinystr" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tokio" version = "1.41.1" @@ -1329,6 +1864,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" @@ -1364,6 +1909,40 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" +dependencies = [ + "indexmap 2.6.0", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "tonic" version = "0.12.3" @@ -1537,6 +2116,29 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "url" +version = "2.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf16_iter" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -1585,6 +2187,83 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" + +[[package]] +name = "web-sys" +version = "0.3.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" @@ -1607,13 +2286,52 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -1622,7 +2340,22 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", ] [[package]] @@ -1631,28 +2364,46 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -1665,30 +2416,99 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" +dependencies = [ + "memchr", +] + +[[package]] +name = "write16" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" + +[[package]] +name = "writeable" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" + +[[package]] +name = "yoke" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -1710,8 +2530,51 @@ dependencies = [ "syn", ] +[[package]] +name = "zerofrom" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + +[[package]] +name = "zerovec" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/crates/churn/Cargo.toml b/crates/churn/Cargo.toml index 9b7808a..ba2bf7c 100644 --- a/crates/churn/Cargo.toml +++ b/crates/churn/Cargo.toml @@ -15,7 +15,7 @@ axum.workspace = true serde = { version = "1.0.197", features = ["derive"] } uuid = { version = "1.7.0", features = ["v4"] } tower-http = { version = "0.6.0", features = ["cors", "trace"] } -notmad = "0.6.0" +notmad = "0.7.1" tokio-util = "0.7.12" async-trait = "0.1.83" nodrift = "0.2.0" @@ -24,3 +24,7 @@ prost-types = "0.13.3" prost = "0.13.3" bytes = "1.8.0" tonic = { version = "0.12.3", features = ["tls", "tls-roots"] } +toml = "0.8.19" +dirs = "5.0.1" +futures = "0.3.31" +reqwest = { version = "0.12.9", features = ["json"] } diff --git a/crates/churn/proto/churn/v1/churn.proto b/crates/churn/proto/churn/v1/churn.proto index 77ef798..1a9d5b9 100644 --- a/crates/churn/proto/churn/v1/churn.proto +++ b/crates/churn/proto/churn/v1/churn.proto @@ -5,6 +5,7 @@ package churn.v1; service Churn { rpc GetKey(GetKeyRequest) returns (GetKeyResponse); rpc SetKey(SetKeyRequest) returns (SetKeyResponse); + rpc ListenEvents(ListenEventsRequest) returns (stream ListenEventsResponse); } message GetKeyRequest { @@ -23,3 +24,11 @@ message SetKeyRequest { string value = 4; } message SetKeyResponse {} + +message ListenEventsRequest { + string namespace = 1; + optional string id = 2; +} +message ListenEventsResponse { + string value = 1; +} diff --git a/crates/churn/src/agent.rs b/crates/churn/src/agent.rs index 4303461..251175b 100644 --- a/crates/churn/src/agent.rs +++ b/crates/churn/src/agent.rs @@ -1,80 +1,22 @@ use agent_state::AgentState; +use event_handler::EventHandler; use refresh::AgentRefresh; + +pub use config::setup_config; + mod agent_state; +mod config; +mod discovery_client; +mod event_handler; +mod grpc_client; mod refresh; -mod grpc_client { - use tonic::transport::{Channel, ClientTlsConfig}; - use crate::grpc::{churn_client::ChurnClient, *}; - - pub struct GrpcClient { - host: String, - } - - impl GrpcClient { - pub fn new(host: impl Into) -> Self { - Self { host: host.into() } - } - - pub async fn get_key( - &self, - namespace: &str, - id: Option>, - key: &str, - ) -> anyhow::Result> { - let mut client = self.client().await?; - - let resp = client - .get_key(GetKeyRequest { - key: key.into(), - namespace: namespace.into(), - id: id.map(|i| i.into()), - }) - .await?; - let resp = resp.into_inner(); - - Ok(resp.value) - } - - pub async fn set_key( - &self, - namespace: &str, - id: Option>, - key: &str, - value: &str, - ) -> anyhow::Result<()> { - let mut client = self.client().await?; - - client - .set_key(SetKeyRequest { - key: key.into(), - value: value.into(), - namespace: namespace.into(), - id: id.map(|i| i.into()), - }) - .await?; - - Ok(()) - } - - async fn client(&self) -> anyhow::Result> { - let channel = Channel::from_shared(self.host.to_owned())? - .tls_config(ClientTlsConfig::new().with_native_roots())? - .connect() - .await?; - - let client = ChurnClient::new(channel); - - Ok(client) - } - } -} - -pub async fn execute(host: impl Into) -> anyhow::Result<()> { +pub async fn execute() -> anyhow::Result<()> { let state = AgentState::new().await?; notmad::Mad::builder() - .add(AgentRefresh::new(&state, host)) + .add(AgentRefresh::new(&state)) + .add(EventHandler::new(&state)) .cancellation(Some(std::time::Duration::from_secs(2))) .run() .await?; diff --git a/crates/churn/src/agent/agent_state.rs b/crates/churn/src/agent/agent_state.rs index 9941f33..adb1605 100644 --- a/crates/churn/src/agent/agent_state.rs +++ b/crates/churn/src/agent/agent_state.rs @@ -1,5 +1,9 @@ use std::{ops::Deref, sync::Arc}; +use crate::api::Discovery; + +use super::{config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient}; + #[derive(Clone)] pub struct AgentState(Arc); @@ -23,10 +27,24 @@ impl Deref for AgentState { } } -pub struct State {} +pub struct State { + pub grpc: GrpcClient, + pub config: AgentConfig, + pub discovery: Discovery, +} impl State { pub async fn new() -> anyhow::Result { - Ok(Self {}) + let config = AgentConfig::new().await?; + let discovery = DiscoveryClient::new(&config.discovery); + let discovery = discovery.discover().await?; + + let grpc = GrpcClient::new(&discovery.process_host); + + Ok(Self { + grpc, + config, + discovery, + }) } } diff --git a/crates/churn/src/agent/config.rs b/crates/churn/src/agent/config.rs new file mode 100644 index 0000000..56d3a18 --- /dev/null +++ b/crates/churn/src/agent/config.rs @@ -0,0 +1,80 @@ +use anyhow::Context; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Clone)] +pub struct AgentConfig { + pub agent_id: String, + pub discovery: String, +} + +impl AgentConfig { + pub async fn new() -> anyhow::Result { + let config = ConfigFile::load().await?; + + Ok(Self { + agent_id: config.agent_id, + discovery: config.discovery, + }) + } +} + +#[derive(Serialize, Deserialize)] +struct ConfigFile { + agent_id: String, + discovery: String, +} + +impl ConfigFile { + pub async fn load() -> anyhow::Result { + let directory = dirs::data_dir() + .ok_or(anyhow::anyhow!("failed to get data dir"))? + .join("io.kjuulh.churn-agent") + .join("churn-agent.toml"); + + if !directory.exists() { + anyhow::bail!( + "No churn agent file was setup, run `churn agent setup` to setup the defaults" + ) + } + + let contents = tokio::fs::read_to_string(&directory).await?; + + toml::from_str(&contents).context("failed to parse the contents of the churn agent config") + } + + pub async fn write_default(discovery: impl Into, force: bool) -> anyhow::Result { + let s = Self { + agent_id: Uuid::new_v4().to_string(), + discovery: discovery.into(), + }; + + let directory = dirs::data_dir() + .ok_or(anyhow::anyhow!("failed to get data dir"))? + .join("io.kjuulh.churn-agent") + .join("churn-agent.toml"); + + if let Some(parent) = directory.parent() { + tokio::fs::create_dir_all(&parent).await?; + } + + if !force && directory.exists() { + anyhow::bail!("config file already exists, consider moving it to a backup before trying again: {}", directory.display()); + } + + let contents = toml::to_string_pretty(&s) + .context("failed to convert default implementation to string")?; + + tokio::fs::write(directory, contents.as_bytes()) + .await + .context("failed to write to agent file")?; + + Ok(s) + } +} + +pub async fn setup_config(discovery: impl Into, force: bool) -> anyhow::Result<()> { + ConfigFile::write_default(discovery, force).await?; + + Ok(()) +} diff --git a/crates/churn/src/agent/discovery_client.rs b/crates/churn/src/agent/discovery_client.rs new file mode 100644 index 0000000..67ac370 --- /dev/null +++ b/crates/churn/src/agent/discovery_client.rs @@ -0,0 +1,21 @@ +use crate::api::Discovery; + +pub struct DiscoveryClient { + host: String, +} + +impl DiscoveryClient { + pub fn new(discovery_host: impl Into) -> Self { + Self { + host: discovery_host.into(), + } + } + + pub async fn discover(&self) -> anyhow::Result { + tracing::info!( + "getting details from discovery endpoint: {}/discovery", + self.host.trim_end_matches('/') + ); + crate::api::Discovery::get_from_host(&self.host).await + } +} diff --git a/crates/churn/src/agent/event_handler.rs b/crates/churn/src/agent/event_handler.rs new file mode 100644 index 0000000..7109c4d --- /dev/null +++ b/crates/churn/src/agent/event_handler.rs @@ -0,0 +1,53 @@ +use notmad::{Component, MadError}; + +use super::{agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient}; + +#[derive(Clone)] +pub struct EventHandler { + config: AgentConfig, + grpc: GrpcClient, +} + +impl EventHandler { + pub fn new(state: impl Into) -> Self { + let state: AgentState = state.into(); + + Self { + config: state.config.clone(), + grpc: state.grpc.clone(), + } + } +} + +#[async_trait::async_trait] +impl Component for EventHandler { + fn name(&self) -> Option { + Some("event_handler".into()) + } + + async fn run( + &self, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> Result<(), notmad::MadError> { + tokio::select! { + _ = cancellation_token.cancelled() => {}, + res = self.grpc.listen_events("agents", None::, self.clone()) => { + res.map_err(MadError::Inner)?; + }, + res = self.grpc.listen_events("agents", Some(&self.config.agent_id), self.clone()) => { + res.map_err(MadError::Inner)?; + } + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl super::grpc_client::ListenEventsExecutor for EventHandler { + async fn execute(&self, event: crate::grpc::ListenEventsResponse) -> anyhow::Result<()> { + tracing::info!(value = event.value, "received event"); + + Ok(()) + } +} diff --git a/crates/churn/src/agent/grpc_client.rs b/crates/churn/src/agent/grpc_client.rs new file mode 100644 index 0000000..234b99b --- /dev/null +++ b/crates/churn/src/agent/grpc_client.rs @@ -0,0 +1,100 @@ +use tonic::transport::{Channel, ClientTlsConfig}; + +use crate::grpc::{churn_client::ChurnClient, *}; + +#[derive(Clone)] +pub struct GrpcClient { + host: String, +} + +impl GrpcClient { + pub fn new(host: impl Into) -> Self { + Self { host: host.into() } + } + + pub async fn get_key( + &self, + namespace: &str, + id: Option>, + key: &str, + ) -> anyhow::Result> { + let mut client = self.client().await?; + + let resp = client + .get_key(GetKeyRequest { + key: key.into(), + namespace: namespace.into(), + id: id.map(|i| i.into()), + }) + .await?; + let resp = resp.into_inner(); + + Ok(resp.value) + } + + pub async fn set_key( + &self, + namespace: &str, + id: Option>, + key: &str, + value: &str, + ) -> anyhow::Result<()> { + let mut client = self.client().await?; + + client + .set_key(SetKeyRequest { + key: key.into(), + value: value.into(), + namespace: namespace.into(), + id: id.map(|i| i.into()), + }) + .await?; + + Ok(()) + } + + pub async fn listen_events( + &self, + namespace: &str, + id: Option>, + exec: impl ListenEventsExecutor, + ) -> anyhow::Result<()> { + let mut client = self.client().await?; + + let resp = client + .listen_events(ListenEventsRequest { + namespace: namespace.into(), + id: id.map(|i| i.into()), + }) + .await?; + + let mut inner = resp.into_inner(); + while let Ok(Some(message)) = inner.message().await { + exec.execute(message).await?; + } + + Ok(()) + } + + async fn client(&self) -> anyhow::Result> { + let channel = if self.host.starts_with("https") { + Channel::from_shared(self.host.to_owned())? + .tls_config(ClientTlsConfig::new().with_native_roots())? + .connect() + .await? + } else { + Channel::from_shared(self.host.to_owned())? + .connect() + .await? + }; + + let client = ChurnClient::new(channel); + + Ok(client) + } +} + +#[async_trait::async_trait] +pub trait ListenEventsExecutor { + async fn execute(&self, event: ListenEventsResponse) -> anyhow::Result<()>; +} diff --git a/crates/churn/src/agent/refresh.rs b/crates/churn/src/agent/refresh.rs index a9b3d4f..1c24dd8 100644 --- a/crates/churn/src/agent/refresh.rs +++ b/crates/churn/src/agent/refresh.rs @@ -4,21 +4,24 @@ use super::agent_state::AgentState; #[derive(Clone)] pub struct AgentRefresh { - _state: AgentState, - host: String, + process_host: String, } impl AgentRefresh { - pub fn new(state: impl Into, host: impl Into) -> Self { + pub fn new(state: impl Into) -> Self { + let state: AgentState = state.into(); Self { - _state: state.into(), - host: host.into(), + process_host: state.discovery.process_host.clone(), } } } #[async_trait::async_trait] impl notmad::Component for AgentRefresh { + fn name(&self) -> Option { + Some("agent_refresh".into()) + } + async fn run( &self, cancellation_token: tokio_util::sync::CancellationToken, @@ -39,7 +42,7 @@ impl notmad::Component for AgentRefresh { #[async_trait::async_trait] impl nodrift::Drifter for AgentRefresh { async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> { - tracing::info!(host = self.host, "refreshing agent"); + tracing::info!(process_host = self.process_host, "refreshing agent"); // Get plan let plan = Plan::new(); diff --git a/crates/churn/src/api.rs b/crates/churn/src/api.rs index 76f0159..aa3ba4c 100644 --- a/crates/churn/src/api.rs +++ b/crates/churn/src/api.rs @@ -1,6 +1,12 @@ use std::net::SocketAddr; -use axum::{extract::MatchedPath, http::Request, routing::get, Router}; +use axum::{ + extract::{MatchedPath, State}, + http::Request, + routing::get, + Json, Router, +}; +use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; use tower_http::trace::TraceLayer; @@ -22,6 +28,7 @@ impl Api { pub async fn serve(&self) -> anyhow::Result<()> { let app = Router::new() .route("/", get(root)) + .route("/discovery", get(discovery)) .with_state(self.state.clone()) .layer( TraceLayer::new_for_http().make_span_with(|request: &Request<_>| { @@ -55,6 +62,28 @@ async fn root() -> &'static str { "Hello, churn!" } +#[derive(Serialize, Deserialize)] +pub struct Discovery { + pub external_host: String, + pub process_host: String, +} + +impl Discovery { + pub async fn get_from_host(host: &str) -> anyhow::Result { + let resp = reqwest::get(format!("{}/discovery", host.trim_end_matches('/'))).await?; + let s: Self = resp.json().await?; + + Ok(s) + } +} + +async fn discovery(State(state): State) -> Json { + Json(Discovery { + external_host: state.config.external_host.clone(), + process_host: state.config.process_host.clone(), + }) +} + #[async_trait::async_trait] impl notmad::Component for Api { async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> { diff --git a/crates/churn/src/cli.rs b/crates/churn/src/cli.rs index 72ab6dd..fd3e9b9 100644 --- a/crates/churn/src/cli.rs +++ b/crates/churn/src/cli.rs @@ -2,27 +2,29 @@ use std::net::SocketAddr; use clap::{Parser, Subcommand}; -use crate::{agent, api, state::SharedState}; +use crate::{agent, server}; pub async fn execute() -> anyhow::Result<()> { - let state = SharedState::new().await?; - let cli = Command::parse(); match cli.command.expect("to have a subcommand") { - Commands::Serve { host } => { + Commands::Serve { + host, + grpc_host, + config, + } => { tracing::info!("Starting service"); - - notmad::Mad::builder() - .add(api::Api::new(&state, host)) - .run() - .await?; + server::execute(host, grpc_host, config).await?; } Commands::Agent { commands } => match commands { - AgentCommands::Start { host } => { + AgentCommands::Start {} => { tracing::info!("starting agent"); - agent::execute(host).await?; + agent::execute().await?; tracing::info!("shut down agent"); } + AgentCommands::Setup { force, discovery } => { + agent::setup_config(discovery, force).await?; + tracing::info!("wrote default agent config"); + } }, } @@ -41,6 +43,12 @@ enum Commands { Serve { #[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")] host: SocketAddr, + + #[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")] + grpc_host: SocketAddr, + + #[clap(flatten)] + config: server::config::ServerConfig, }, Agent { #[command(subcommand)] @@ -50,8 +58,12 @@ enum Commands { #[derive(Subcommand)] enum AgentCommands { - Start { - #[arg(env = "SERVICE_HOST", long = "service-host")] - host: String, + Start {}, + Setup { + #[arg(long, default_value = "false")] + force: bool, + + #[arg(env = "DISCOVERY_HOST", long = "discovery")] + discovery: String, }, } diff --git a/crates/churn/src/grpc/churn.v1.rs b/crates/churn/src/grpc/churn.v1.rs index 2717f7c..de04539 100644 --- a/crates/churn/src/grpc/churn.v1.rs +++ b/crates/churn/src/grpc/churn.v1.rs @@ -32,5 +32,19 @@ pub struct SetKeyRequest { #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct SetKeyResponse { } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListenEventsRequest { + #[prost(string, tag="1")] + pub namespace: ::prost::alloc::string::String, + #[prost(string, optional, tag="2")] + pub id: ::core::option::Option<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListenEventsResponse { + #[prost(string, tag="1")] + pub value: ::prost::alloc::string::String, +} include!("churn.v1.tonic.rs"); // @@protoc_insertion_point(module) \ No newline at end of file diff --git a/crates/churn/src/grpc/churn.v1.tonic.rs b/crates/churn/src/grpc/churn.v1.tonic.rs index 8b748e3..37eb61d 100644 --- a/crates/churn/src/grpc/churn.v1.tonic.rs +++ b/crates/churn/src/grpc/churn.v1.tonic.rs @@ -122,6 +122,31 @@ pub mod churn_client { req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "SetKey")); self.inner.unary(req, path, codec).await } + pub async fn listen_events( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/churn.v1.Churn/ListenEvents", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("churn.v1.Churn", "ListenEvents")); + self.inner.server_streaming(req, path, codec).await + } } } /// Generated server implementations. @@ -139,6 +164,19 @@ pub mod churn_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the ListenEvents method. + type ListenEventsStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + Send + + 'static; + async fn listen_events( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct ChurnServer { @@ -307,6 +345,53 @@ pub mod churn_server { }; Box::pin(fut) } + "/churn.v1.Churn/ListenEvents" => { + #[allow(non_camel_case_types)] + struct ListenEventsSvc(pub Arc); + impl< + T: Churn, + > tonic::server::ServerStreamingService + for ListenEventsSvc { + type Response = super::ListenEventsResponse; + type ResponseStream = T::ListenEventsStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::listen_events(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = ListenEventsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { Ok( diff --git a/crates/churn/src/main.rs b/crates/churn/src/main.rs index 32526de..b275895 100644 --- a/crates/churn/src/main.rs +++ b/crates/churn/src/main.rs @@ -6,6 +6,7 @@ mod grpc { } mod agent; +mod server; #[tokio::main] async fn main() -> anyhow::Result<()> { diff --git a/crates/churn/src/server.rs b/crates/churn/src/server.rs new file mode 100644 index 0000000..90a3f7f --- /dev/null +++ b/crates/churn/src/server.rs @@ -0,0 +1,23 @@ +use std::net::SocketAddr; + +pub mod config; + +mod grpc_server; + +use crate::{api, state::SharedState}; + +pub async fn execute( + host: impl Into, + grpc_host: impl Into, + config: config::ServerConfig, +) -> anyhow::Result<()> { + let state = SharedState::new(config).await?; + + notmad::Mad::builder() + .add(api::Api::new(&state, host)) + .add(grpc_server::GrpcServer::new(grpc_host.into())) + .run() + .await?; + + Ok(()) +} diff --git a/crates/churn/src/server/config.rs b/crates/churn/src/server/config.rs new file mode 100644 index 0000000..3482907 --- /dev/null +++ b/crates/churn/src/server/config.rs @@ -0,0 +1,7 @@ +#[derive(clap::Args)] +pub struct ServerConfig { + #[arg(long = "external-host", env = "EXTERNAL_HOST")] + pub external_host: String, + #[arg(long = "process-host", env = "PROCESS_HOST")] + pub process_host: String, +} diff --git a/crates/churn/src/server/grpc_server.rs b/crates/churn/src/server/grpc_server.rs new file mode 100644 index 0000000..6022c9c --- /dev/null +++ b/crates/churn/src/server/grpc_server.rs @@ -0,0 +1,93 @@ +use std::{net::SocketAddr, pin::Pin}; + +use anyhow::Context; +use futures::Stream; +use notmad::{Component, MadError}; +use tonic::transport::Server; + +use crate::grpc::*; + +#[derive(Clone)] +pub struct GrpcServer { + grpc_host: SocketAddr, +} + +impl GrpcServer { + pub fn new(grpc_host: SocketAddr) -> Self { + Self { grpc_host } + } +} + +#[async_trait::async_trait] +impl Component for GrpcServer { + async fn run( + &self, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> Result<(), notmad::MadError> { + let task = Server::builder() + .add_service(crate::grpc::churn_server::ChurnServer::new(self.clone())) + .serve(self.grpc_host); + + tokio::select! { + _ = cancellation_token.cancelled() => {}, + res = task => { + res.context("failed to run grpc server").map_err(MadError::Inner)?; + } + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl crate::grpc::churn_server::Churn for GrpcServer { + async fn get_key( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + todo!() + } + + async fn set_key( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + todo!() + } + + #[doc = " Server streaming response type for the ListenEvents method."] + type ListenEventsStream = + Pin> + Send>>; + + async fn listen_events( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + let (tx, rx) = tokio::sync::mpsc::channel(128); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(10)); + + loop { + interval.tick().await; + + if let Err(e) = tx + .send(Ok(ListenEventsResponse { + value: uuid::Uuid::new_v4().to_string(), + })) + .await + { + tracing::warn!("failed to send response: {}", e); + break; + } + } + }); + + let stream = futures::stream::unfold(rx, |mut msg| async move { + let next = msg.recv().await?; + + Some((next, msg)) + }); + + Ok(tonic::Response::new(Box::pin(stream))) + } +} diff --git a/crates/churn/src/state.rs b/crates/churn/src/state.rs index 02a3bc9..c69bb65 100644 --- a/crates/churn/src/state.rs +++ b/crates/churn/src/state.rs @@ -1,11 +1,13 @@ use std::{ops::Deref, sync::Arc}; +use crate::server::config::ServerConfig; + #[derive(Clone)] pub struct SharedState(Arc); impl SharedState { - pub async fn new() -> anyhow::Result { - Ok(Self(Arc::new(State::new().await?))) + pub async fn new(config: ServerConfig) -> anyhow::Result { + Ok(Self(Arc::new(State::new(config).await?))) } } @@ -23,10 +25,12 @@ impl Deref for SharedState { } } -pub struct State {} +pub struct State { + pub config: ServerConfig, +} impl State { - pub async fn new() -> anyhow::Result { - Ok(Self {}) + pub async fn new(config: ServerConfig) -> anyhow::Result { + Ok(Self { config }) } }