From c8f4bae1f2696a958d1145c85b7b2aa802b60281 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sun, 17 Nov 2024 15:43:37 +0100 Subject: [PATCH] feat: add s3 and deployment Signed-off-by: kjuulh --- .env | 10 +- Cargo.lock | 768 +++++++++++++++++++-- crates/nodata-storage/Cargo.toml | 12 + crates/nodata-storage/src/backend.rs | 86 +-- crates/nodata-storage/src/backend/local.rs | 97 +++ crates/nodata-storage/src/backend/s3.rs | 171 +++++ crates/nodata-storage/src/lib.rs | 32 +- crates/nodata/src/grpc.rs | 7 +- crates/nodata/src/services/consumers.rs | 2 +- crates/nodata/src/services/staging.rs | 9 +- crates/nodata/src/state.rs | 18 +- cuddle.yaml | 39 +- 12 files changed, 1093 insertions(+), 158 deletions(-) create mode 100644 crates/nodata-storage/src/backend/local.rs create mode 100644 crates/nodata-storage/src/backend/s3.rs diff --git a/.env b/.env index 119ed0d..4503af0 100644 --- a/.env +++ b/.env @@ -1 +1,9 @@ - DATABASE_URL="postgres://root@localhost:26257/defaultdb?sslmode=disable" +DATABASE_URL="postgres://root@localhost:26257/defaultdb?sslmode=disable" +#STORAGE_BACKEND=local +#LOCAL_STORAGE_LOCATION=/tmp/nodata/local + +STORAGE_BACKEND=s3 +AWS_ACCESS_KEY_ID=OgAfuzefQRBHq4up2eYr +AWS_SECRET_ACCESS_KEY=nW85rHFOlZeMg7v6kkCikpYbyE3Pw28RS2O5FNZu +AWS_ENDPOINT_URL="https://api.minio.i.kjuulh.io" +AWS_BUCKET="nodata-dev" diff --git a/Cargo.lock b/Cargo.lock index 36e6a6f..b10ac86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,9 +40,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.19" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "611cc2ae7d2e242c457e4be7f97036b8ad9ca152b499f53faf99b1ed8fc2553f" +checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" [[package]] name = "android-tzdata" @@ -175,10 +175,384 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] -name = "axum" -version = "0.7.7" +name = "aws-config" +version = "1.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 0.2.12", + "ring", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-runtime" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-s3" +version = "1.61.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e531658a0397d22365dfe26c3e1c0c8448bf6a3a2d8a098ded802f2b1261615" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "fastrand", + "hex", + "hmac", + "http 0.2.12", + "http-body 0.4.6", + "lru", + "once_cell", + "percent-encoding", + "regex-lite", + "sha2", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.49.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09677244a9da92172c8dc60109b4a9658597d4d298b188dd0018b6a66b410ca4" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fea2f3a8bb3bd10932ae7ad59cc59f65f270fc9183a7e91f501dc5efbef7ee" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ada54e5f26ac246dc79727def52f7f8ed38915cb47781e2a72213957dc3a7d5" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1" +dependencies = [ + "aws-credential-types", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "crypto-bigint 0.5.5", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.1.0", + "once_cell", + "p256", + "percent-encoding", + "ring", + "sha2", + "subtle", + "time", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.60.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba1a71073fca26775c8b5189175ea8863afb1c9ea2cceb02a5de5ad9dfbaa795" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "crc32c", + "crc32fast", + "hex", + "http 0.2.12", + "http-body 0.4.6", + "md-5", + "pin-project-lite", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.60.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.60.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be28bd063fa91fd871d131fc8b68d7cd4c5fa0869bea68daca50dcb1cbd76be2" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "http-body 1.0.1", + "httparse", + "hyper 0.14.31", + "hyper-rustls", + "once_cell", + "pin-project-lite", + "pin-utils", + "rustls 0.21.12", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.1.0", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.1.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", "axum-core", @@ -244,6 +618,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base16ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" + [[package]] name = "base64" version = "0.21.7" @@ -256,6 +636,16 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -305,10 +695,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" [[package]] -name = "cc" -version = "1.1.37" +name = "bytes-utils" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40545c26d092346d8a8dab71ee48e7685a7a9cba76e634790c215b41a4a7b4cf" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + +[[package]] +name = "cc" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" dependencies = [ "shlex", ] @@ -336,9 +736,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97f376d85a664d5837dbae44bf546e6477a679ff6610010f17276f686d867e8" +checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" dependencies = [ "clap_builder", "clap_derive", @@ -346,9 +746,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19bc80abd44e4bed93ca373a0704ccbd1b710dc5749406201bb018272808dc54" +checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" dependencies = [ "anstream", "anstyle", @@ -370,9 +770,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" +checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" [[package]] name = "colorchoice" @@ -426,9 +826,9 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpufeatures" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608697df725056feaccfa42cffdaeeec3fccc4ffc38358ecd19b243e716a78e0" +checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6" dependencies = [ "libc", ] @@ -448,6 +848,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32c" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -472,6 +881,28 @@ version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +[[package]] +name = "crypto-bigint" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" +dependencies = [ + "generic-array", + "rand_core", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto-bigint" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" +dependencies = [ + "rand_core", + "subtle", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -547,6 +978,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "der" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" +dependencies = [ + "const-oid", + "zeroize", +] + [[package]] name = "der" version = "0.7.9" @@ -667,6 +1108,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "ecdsa" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" +dependencies = [ + "der 0.6.1", + "elliptic-curve", + "rfc6979", + "signature 1.6.4", +] + [[package]] name = "either" version = "1.13.0" @@ -676,6 +1129,26 @@ dependencies = [ "serde", ] +[[package]] +name = "elliptic-curve" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" +dependencies = [ + "base16ct", + "crypto-bigint 0.4.9", + "der 0.6.1", + "digest", + "ff", + "generic-array", + "group", + "pkcs8 0.9.0", + "rand_core", + "sec1", + "subtle", + "zeroize", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -739,6 +1212,16 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" +[[package]] +name = "ff" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" +dependencies = [ + "rand_core", + "subtle", +] + [[package]] name = "filetime" version = "0.2.25" @@ -753,9 +1236,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.34" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" +checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" dependencies = [ "crc32fast", "miniz_oxide", @@ -778,6 +1261,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -973,6 +1462,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "group" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" +dependencies = [ + "ff", + "rand_core", + "subtle", +] + [[package]] name = "h2" version = "0.3.26" @@ -1032,6 +1532,11 @@ name = "hashbrown" version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "hashlink" @@ -1221,7 +1726,9 @@ dependencies = [ "futures-util", "http 0.2.12", "hyper 0.14.31", + "log", "rustls 0.21.12", + "rustls-native-certs", "tokio", "tokio-rustls", ] @@ -1499,9 +2006,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.162" +version = "0.2.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" +checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" [[package]] name = "libm" @@ -1559,6 +2066,15 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.1", +] + [[package]] name = "matchers" version = "0.1.0" @@ -1672,6 +2188,9 @@ name = "nodata-storage" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", + "aws-config", + "aws-sdk-s3", "bytes", "hex", "prost", @@ -1787,18 +2306,41 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "option-ext" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + [[package]] name = "overload" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "p256" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" +dependencies = [ + "ecdsa", + "elliptic-curve", + "sha2", +] + [[package]] name = "parking" version = "2.2.1" @@ -1887,9 +2429,19 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" dependencies = [ - "der", - "pkcs8", - "spki", + "der 0.7.9", + "pkcs8 0.10.2", + "spki 0.7.3", +] + +[[package]] +name = "pkcs8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" +dependencies = [ + "der 0.6.1", + "spki 0.6.0", ] [[package]] @@ -1898,8 +2450,8 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ - "der", - "spki", + "der 0.7.9", + "spki 0.7.3", ] [[package]] @@ -2062,7 +2614,7 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.8", + "regex-automata 0.4.9", "regex-syntax 0.8.5", ] @@ -2077,15 +2629,21 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", "regex-syntax 0.8.5", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -2141,6 +2699,17 @@ dependencies = [ "winreg", ] +[[package]] +name = "rfc6979" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb" +dependencies = [ + "crypto-bigint 0.4.9", + "hmac", + "zeroize", +] + [[package]] name = "ring" version = "0.17.8" @@ -2168,10 +2737,10 @@ dependencies = [ "num-integer", "num-traits", "pkcs1", - "pkcs8", + "pkcs8 0.10.2", "rand_core", - "signature", - "spki", + "signature 2.2.0", + "spki 0.7.3", "subtle", "zeroize", ] @@ -2183,10 +2752,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] -name = "rustix" -version = "0.38.39" +name = "rustc_version" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "375116bee2be9ed569afe2154ea6a99dfdffd257f533f187498c2a8f5feaf4ee" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + +[[package]] +name = "rustix" +version = "0.38.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" dependencies = [ "bitflags 2.6.0", "errno", @@ -2209,9 +2787,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.16" +version = "0.23.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eee87ff5d9b36712a58574e12e9f0ea80f915a5b0ac518d322b24a465617925e" +checksum = "7f1a745511c54ba6d4465e8d5dfbd81b45791756de28d4981af70d6dca128f1e" dependencies = [ "once_cell", "ring", @@ -2221,6 +2799,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -2278,6 +2868,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "schannel" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01227be5826fa0690321a2ba6c5cd57a19cf3f6a09e76973b58e61de6ab9d1c1" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -2294,6 +2893,49 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sec1" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" +dependencies = [ + "base16ct", + "der 0.6.1", + "generic-array", + "pkcs8 0.9.0", + "subtle", + "zeroize", +] + +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.6.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.215" @@ -2329,9 +2971,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "itoa", "memchr", @@ -2407,6 +3049,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "1.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +dependencies = [ + "digest", + "rand_core", +] + [[package]] name = "signature" version = "2.2.0" @@ -2454,6 +3106,16 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spki" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" +dependencies = [ + "base64ct", + "der 0.6.1", +] + [[package]] name = "spki" version = "0.7.3" @@ -2461,7 +3123,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" dependencies = [ "base64ct", - "der", + "der 0.7.9", ] [[package]] @@ -2514,7 +3176,7 @@ dependencies = [ "once_cell", "paste", "percent-encoding", - "rustls 0.23.16", + "rustls 0.23.17", "rustls-pemfile 2.2.0", "serde", "serde_json", @@ -2806,18 +3468,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02dd99dc800bbb97186339685293e1cc5d9df1f8fae2d0aecd9ff1c77efea892" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7c61ec9a6f64d2793d8a45faba21efbe3ced62a886d44c36a009b2b519b4c7e" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", @@ -3201,6 +3863,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf16_iter" version = "1.0.5" @@ -3252,6 +3920,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "want" version = "0.3.1" @@ -3600,6 +4274,12 @@ dependencies = [ "rustix", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yoke" version = "0.7.4" diff --git a/crates/nodata-storage/Cargo.toml b/crates/nodata-storage/Cargo.toml index 895861d..8ff27f3 100644 --- a/crates/nodata-storage/Cargo.toml +++ b/crates/nodata-storage/Cargo.toml @@ -11,6 +11,18 @@ tracing.workspace = true prost.workspace = true prost-types.workspace = true bytes.workspace = true +async-trait.workspace = true hex = "0.4.3" sha2 = "0.10.8" + +aws-config = { version = "1.5.10", features = [ + "behavior-version-latest", +], optional = true } +aws-sdk-s3 = { version = "1.61.0", features = [ + "behavior-version-latest", +], optional = true } + +[features] +default = ["s3"] +s3 = ["dep:aws-config", "dep:aws-sdk-s3"] diff --git a/crates/nodata-storage/src/backend.rs b/crates/nodata-storage/src/backend.rs index 05b9062..464b575 100644 --- a/crates/nodata-storage/src/backend.rs +++ b/crates/nodata-storage/src/backend.rs @@ -1,84 +1,18 @@ -use std::{ - env::temp_dir, - path::{Path, PathBuf}, - time::{SystemTime, UNIX_EPOCH}, -}; +use std::time::SystemTime; -use anyhow::Context; -use tokio::io::AsyncWriteExt; +use async_trait::async_trait; -pub struct StorageBackend { - location: PathBuf, -} +pub mod local; +#[cfg(feature = "s3")] +pub mod s3; -impl StorageBackend { - pub fn new(location: &Path) -> Self { - Self { - location: location.into(), - } - } - - pub fn temp() -> Self { - Self::new(&temp_dir().join("nodata")) - } - - pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result { - let segment_key = uuid::Uuid::now_v7(); - let segment_path = PathBuf::from("logs") - .join(topic) - .join(segment_key.to_string()); - tracing::trace!("writing segment file: {}", segment_path.display()); - let file_location = self.location.join(&segment_path); - if let Some(parent) = file_location.parent() { - tokio::fs::create_dir_all(parent) - .await - .context("failed to create storage backend dir")?; - } - - let mut segment_file = tokio::fs::File::create(&file_location).await?; - segment_file.write_all(buffer).await?; - segment_file.flush().await?; - - Ok(segment_key.to_string()) - } - - pub async fn append_index( +#[async_trait] +pub trait StorageBackend { + async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result; + async fn append_index( &self, topic: &str, segment_file: &str, time: SystemTime, - ) -> anyhow::Result<()> { - let index_path = PathBuf::from("indexes").join(topic); - tracing::trace!("writing index file: {}", index_path.display()); - let file_location = self.location.join(&index_path); - if let Some(parent) = file_location.parent() { - tokio::fs::create_dir_all(parent) - .await - .context("failed to create storage backend dir, index")?; - } - - if !file_location.exists() { - tokio::fs::File::create(&file_location).await?; - } - - let mut index_file = tokio::fs::File::options() - .append(true) - .open(&file_location) - .await?; - index_file - .write_all( - format!( - "{},{}\n", - time.duration_since(UNIX_EPOCH) - .expect("to be able to get time") - .as_secs(), - segment_file - ) - .as_bytes(), - ) - .await?; - index_file.flush().await?; - - Ok(()) - } + ) -> anyhow::Result<()>; } diff --git a/crates/nodata-storage/src/backend/local.rs b/crates/nodata-storage/src/backend/local.rs new file mode 100644 index 0000000..a7f5b27 --- /dev/null +++ b/crates/nodata-storage/src/backend/local.rs @@ -0,0 +1,97 @@ +use std::{ + env::temp_dir, + path::{Path, PathBuf}, + time::{SystemTime, UNIX_EPOCH}, +}; + +use anyhow::Context; +use async_trait::async_trait; +use tokio::io::AsyncWriteExt; + +use super::StorageBackend; + +pub struct LocalStorageBackend { + location: PathBuf, +} + +impl LocalStorageBackend { + pub fn new(location: &Path) -> Self { + Self { + location: location.into(), + } + } + + pub fn new_from_env() -> anyhow::Result { + Ok(Self::new(&PathBuf::from( + std::env::var("LOCAL_STORAGE_LOCATION") + .context("LOCAL_STORAGE_LOCATION was not found in env")?, + ))) + } + + pub fn temp() -> Self { + Self::new(&temp_dir().join("nodata")) + } +} + +#[async_trait] +impl StorageBackend for LocalStorageBackend { + async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result { + let segment_key = uuid::Uuid::now_v7(); + let segment_path = PathBuf::from("logs") + .join(topic) + .join(segment_key.to_string()); + tracing::trace!("writing segment file: {}", segment_path.display()); + let file_location = self.location.join(&segment_path); + if let Some(parent) = file_location.parent() { + tokio::fs::create_dir_all(parent) + .await + .context("failed to create storage backend dir")?; + } + + let mut segment_file = tokio::fs::File::create(&file_location).await?; + segment_file.write_all(buffer).await?; + segment_file.flush().await?; + + Ok(segment_key.to_string()) + } + + async fn append_index( + &self, + topic: &str, + segment_file: &str, + time: SystemTime, + ) -> anyhow::Result<()> { + let index_path = PathBuf::from("indexes").join(topic); + tracing::trace!("writing index file: {}", index_path.display()); + let file_location = self.location.join(&index_path); + if let Some(parent) = file_location.parent() { + tokio::fs::create_dir_all(parent) + .await + .context("failed to create storage backend dir, index")?; + } + + if !file_location.exists() { + tokio::fs::File::create(&file_location).await?; + } + + let mut index_file = tokio::fs::File::options() + .append(true) + .open(&file_location) + .await?; + index_file + .write_all( + format!( + "{},{}\n", + time.duration_since(UNIX_EPOCH) + .expect("to be able to get time") + .as_secs(), + segment_file + ) + .as_bytes(), + ) + .await?; + index_file.flush().await?; + + Ok(()) + } +} diff --git a/crates/nodata-storage/src/backend/s3.rs b/crates/nodata-storage/src/backend/s3.rs new file mode 100644 index 0000000..57a6c00 --- /dev/null +++ b/crates/nodata-storage/src/backend/s3.rs @@ -0,0 +1,171 @@ +use std::{ + collections::BTreeMap, + time::{SystemTime, UNIX_EPOCH}, +}; + +use anyhow::Context; +use async_trait::async_trait; +use aws_config::{BehaviorVersion, Region}; +use aws_sdk_s3::{ + config::Credentials, + primitives::{ByteStream, SdkBody}, +}; +use tokio::{ + io::{AsyncReadExt, BufReader}, + sync::RwLock, +}; + +use super::StorageBackend; + +pub struct S3StorageBackend { + client: aws_sdk_s3::Client, + bucket: String, + + index_lock: RwLock>>, +} + +impl S3StorageBackend { + pub async fn upload_file(&self, path: &str, buffer: &[u8]) -> anyhow::Result<()> { + tracing::trace!("committing file: {}", &path); + + self.client + .put_object() + .bucket(&self.bucket) + .key(path) + .body(ByteStream::new(SdkBody::from(buffer))) + .send() + .await?; + + Ok(()) + } + + pub async fn get_file(&self, path: &str) -> anyhow::Result>> { + tracing::trace!("getting file: {}", path); + + let obj = match self + .client + .get_object() + .bucket(&self.bucket) + .key(path) + .send() + .await + { + Ok(ok) => ok, + Err(err) => match err.into_service_error() { + aws_sdk_s3::operation::get_object::GetObjectError::NoSuchKey(_) => return Ok(None), + e => anyhow::bail!(e.to_string()), + }, + }; + + let mut buf_reader = BufReader::new(obj.body.into_async_read()); + + let mut output = Vec::new(); + buf_reader.read_buf(&mut output).await?; + + Ok(Some(output)) + } + + pub async fn append_file(&self, path: &str, buffer: &[u8]) -> anyhow::Result<()> { + tracing::trace!("appending file: {}", &path); + { + let mut index_lock = self.index_lock.write().await; + let item = index_lock.get(path); + if item.is_none() { + index_lock.insert(path.to_string(), RwLock::default()); + } + } + let index_lock = self.index_lock.read().await; + let item = index_lock.get(path).expect("to find a path lock"); + let lock = item.write().await; + + let file = self.get_file(path).await?; + match file { + Some(mut file_contents) => { + file_contents.extend_from_slice(buffer); + self.upload_file(path, &file_contents).await? + } + None => self.upload_file(path, buffer).await?, + } + + drop(lock); + + Ok(()) + } +} + +impl S3StorageBackend { + pub async fn new( + key_id: impl Into, + key: impl Into, + endpint_url: impl Into, + bucket: impl Into, + ) -> anyhow::Result { + let shared_config = aws_config::defaults(BehaviorVersion::latest()) + .region(Region::new("eu-west-1")) + .credentials_provider(Credentials::new( + key_id, + key, + None, + None, + env!("CARGO_PKG_NAME"), + )); + + let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await) + .endpoint_url(endpint_url) + .force_path_style(true) + .build(); + + let client = aws_sdk_s3::Client::from_conf(config); + + Ok(Self { + client, + bucket: bucket.into(), + index_lock: RwLock::default(), + }) + } + + pub async fn new_from_env() -> anyhow::Result { + let key_id = std::env::var("AWS_ACCESS_KEY_ID").context("AWS_ACCESS_KEY_ID was not set")?; + let access_key = + std::env::var("AWS_SECRET_ACCESS_KEY").context("AWS_SECRET_ACCESS_KEY was not set")?; + let endpoint_url = + std::env::var("AWS_ENDPOINT_URL").context("AWS_ENDPOINT_URL was not set")?; + let bucket = std::env::var("AWS_BUCKET").context("AWS_BUCKET was not set")?; + + Self::new(key_id, access_key, endpoint_url, bucket).await + } +} + +#[async_trait] +impl StorageBackend for S3StorageBackend { + async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result { + let segment_key = uuid::Uuid::now_v7(); + + self.upload_file( + &format!("nodata/logs/{}/{}.pb", topic, &segment_key.to_string()), + buffer, + ) + .await?; + + Ok(segment_key.to_string()) + } + async fn append_index( + &self, + topic: &str, + segment_file: &str, + time: SystemTime, + ) -> anyhow::Result<()> { + self.append_file( + &format!("nodata/indexes/{}", topic), + format!( + "{},{}\n", + time.duration_since(UNIX_EPOCH) + .expect("to be able to get time") + .as_secs(), + segment_file + ) + .as_bytes(), + ) + .await + } +} diff --git a/crates/nodata-storage/src/lib.rs b/crates/nodata-storage/src/lib.rs index cbea281..c137592 100644 --- a/crates/nodata-storage/src/lib.rs +++ b/crates/nodata-storage/src/lib.rs @@ -8,7 +8,7 @@ use std::{collections::BTreeMap, sync::Arc, time::SystemTime}; use anyhow::Context; -use backend::StorageBackend; +use backend::{local::LocalStorageBackend, StorageBackend}; use proto::ProtoStorage; use sha2::{Digest, Sha256}; use tokio::sync::Mutex; @@ -21,19 +21,43 @@ pub mod backend; pub struct Storage { segment_size_bytes: usize, buffer: Arc>>>>, - backend: Arc, + backend: Arc, codec: ProtoStorage, } impl Storage { - pub fn new(backend: StorageBackend) -> Self { + pub fn new(backend: LocalStorageBackend) -> Self { Self { segment_size_bytes: 4096 * 1000, // 4MB buffer: Arc::default(), + codec: ProtoStorage::default(), backend: Arc::new(backend), - codec: ProtoStorage::default(), + } + } + + pub async fn new_from_env() -> anyhow::Result { + match std::env::var("STORAGE_BACKEND") + .context("failed to find STORAGE_BACKEND in env")? + .as_str() + { + "local" => Ok(Self { + segment_size_bytes: 4096 * 1000, // 4MB + buffer: Arc::default(), + codec: ProtoStorage::default(), + + backend: Arc::new(LocalStorageBackend::new_from_env()?), + }), + #[cfg(feature = "s3")] + "s3" => Ok(Self { + segment_size_bytes: 4 * 1024 * 1000, // 4MB + buffer: Arc::default(), + codec: ProtoStorage::default(), + + backend: Arc::new(backend::s3::S3StorageBackend::new_from_env().await?), + }), + backend => anyhow::bail!("backend is not supported: {}", backend), } } diff --git a/crates/nodata/src/grpc.rs b/crates/nodata/src/grpc.rs index 1eb669b..44d24ce 100644 --- a/crates/nodata/src/grpc.rs +++ b/crates/nodata/src/grpc.rs @@ -61,7 +61,12 @@ impl no_data_service_server::NoDataService for GrpcServer { self.counter.inc(); self.state.ingest().publish(req).await.map_err(|e| { - tracing::warn!(error = e.to_string(), "failed to handle ingest of data"); + let caused_by = e + .chain() + .map(|e| e.to_string()) + .collect::>() + .join(", "); + tracing::warn!("failed to handle ingest of data: {}: {}", e, caused_by); tonic::Status::internal(e.to_string()) })?; diff --git a/crates/nodata/src/services/consumers.rs b/crates/nodata/src/services/consumers.rs index 5d6a151..b6cc35c 100644 --- a/crates/nodata/src/services/consumers.rs +++ b/crates/nodata/src/services/consumers.rs @@ -288,7 +288,7 @@ mod test { let topic = "some-topic".to_string(); let offset = 9usize; - let staging = Staging::new(); + let staging = Staging::new().await?; // Publish 10 messages for _ in 0..10 { let offset = staging diff --git a/crates/nodata/src/services/staging.rs b/crates/nodata/src/services/staging.rs index 7b5ebb7..3d1062a 100644 --- a/crates/nodata/src/services/staging.rs +++ b/crates/nodata/src/services/staging.rs @@ -1,5 +1,6 @@ use std::{collections::BTreeMap, sync::Arc}; +use nodata_storage::backend::local::LocalStorageBackend; use tokio::sync::RwLock; use crate::state::SharedState; @@ -23,11 +24,11 @@ pub struct Staging { } impl Staging { - pub fn new() -> Self { - Self { + pub async fn new() -> anyhow::Result { + Ok(Self { store: Arc::default(), - storage: nodata_storage::Storage::new(nodata_storage::backend::StorageBackend::temp()), - } + storage: nodata_storage::Storage::new_from_env().await?, + }) } pub async fn publish( diff --git a/crates/nodata/src/state.rs b/crates/nodata/src/state.rs index a14b18f..51906f7 100644 --- a/crates/nodata/src/state.rs +++ b/crates/nodata/src/state.rs @@ -1,8 +1,6 @@ use std::{ops::Deref, sync::Arc}; -use anyhow::Context; use prometheus::Registry; -use sqlx::{Pool, Postgres}; use crate::services::{consumers::Consumers, handler::Handler, staging::Staging}; @@ -24,7 +22,6 @@ impl Deref for SharedState { } pub struct State { - pub _db: Pool, pub staging: Staging, pub consumers: Consumers, pub handler: Handler, @@ -33,23 +30,10 @@ pub struct State { impl State { pub async fn new() -> anyhow::Result { - let db = sqlx::PgPool::connect( - &std::env::var("DATABASE_URL").context("DATABASE_URL is not set")?, - ) - .await?; - - sqlx::migrate!("migrations/crdb") - .set_locking(false) - .run(&db) - .await?; - - let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?; - - let staging = Staging::new(); + let staging = Staging::new().await?; let handler = Handler::new(staging.clone()); Ok(Self { - _db: db, consumers: Consumers::new(), staging, handler, diff --git a/cuddle.yaml b/cuddle.yaml index f14955f..57bc89c 100644 --- a/cuddle.yaml +++ b/cuddle.yaml @@ -6,16 +6,35 @@ vars: service: "nodata" registry: kasperhermansen - clusters: - clank-prod: - replicas: "3" - namespace: prod + database: + crdb: "false" + + ingress: + - internal: "true" + - internal_grpc: "true" -deployment: - registry: git@git.front.kjuulh.io:kjuulh/clank-clusters - env: - prod: - clusters: - - clank-prod +cuddle/clusters: + dev: + env: + service.host: "0.0.0.0:3001" + service.grpc.host: "0.0.0.0:4001" + storage.backend: "s3" + aws.endpoint.url: "https://api.minio.i.kjuulh.io" + aws.bucket: "nodata-dev" + aws.access.key.id: + vault: true + aws.secret.access.key: + vault: true + prod: + env: + service.host: "0.0.0.0:3001" + service.grpc.host: "0.0.0.0:4001" + storage.backend: "s3" + aws.endpoint.url: "https://api.minio.i.kjuulh.io" + aws.bucket: "nodata-prod" + aws.access.key.id: + vault: true + aws.secret.access.key: + vault: true