From 9cc3d809175c792892f2328deb9f1f4e6b17b9c1 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sun, 18 Feb 2024 22:25:43 +0100 Subject: [PATCH] feat: add upload chunker Signed-off-by: kjuulh --- Cargo.lock | 2 + crates/flux-releaser/Cargo.toml | 1 + .../schemas/proto/flux_releaser.proto | 19 ++++-- crates/flux-releaser/src/grpc.rs | 63 +++++++++++++++---- crates/flux-releaser/src/services/archive.rs | 4 +- .../flux-releaser/src/services/file_store.rs | 16 ++++- .../src/services/release_manager.rs | 15 ++++- .../src/services/release_manager/models.rs | 27 +++++++- .../flux-releaser/tests/publish_artifacts.rs | 40 ++++++++---- 9 files changed, 153 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d824b80..15008fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1189,6 +1189,7 @@ dependencies = [ "serde_json", "tar", "tokio", + "tokio-stream", "tonic", "tonic-build", "tracing", @@ -2891,6 +2892,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/crates/flux-releaser/Cargo.toml b/crates/flux-releaser/Cargo.toml index 2543236..24e4a76 100644 --- a/crates/flux-releaser/Cargo.toml +++ b/crates/flux-releaser/Cargo.toml @@ -22,6 +22,7 @@ serde_json = "1.0.113" nats = "0.24.1" walkdir = "2.4.0" tar = "0.4.40" +tokio-stream = { version = "0.1.14", features = ["full"] } [build-dependencies] tonic-build = "0.11.0" diff --git a/crates/flux-releaser/schemas/proto/flux_releaser.proto b/crates/flux-releaser/schemas/proto/flux_releaser.proto index 570f596..c158b9c 100644 --- a/crates/flux-releaser/schemas/proto/flux_releaser.proto +++ b/crates/flux-releaser/schemas/proto/flux_releaser.proto @@ -2,16 +2,25 @@ syntax = "proto3"; package flux_releaser; -service Greeter { - rpc SayHello (HelloRequest) returns (HelloReply) {} +service FluxReleaser { + rpc UploadArtifact (stream UploadArtifactRequest) returns (UploadArtifactResponse) {} + rpc CommitArtifact (CommitArtifactRequest) returns (CommitArtifactResponse) {} } -message HelloRequest { +message UploadArtifactRequest { + bytes content = 1; +} + +message UploadArtifactResponse { + string upload_id = 1; +} + +message CommitArtifactRequest { string app = 1; string branch = 2; - string folder = 3; // Change to files instead + string folder = 3; } -message HelloReply { +message CommitArtifactResponse { string artifact_id = 1; } diff --git a/crates/flux-releaser/src/grpc.rs b/crates/flux-releaser/src/grpc.rs index 2fedad3..dff7327 100644 --- a/crates/flux-releaser/src/grpc.rs +++ b/crates/flux-releaser/src/grpc.rs @@ -1,6 +1,9 @@ -use std::net::SocketAddr; +use std::{env::temp_dir, net::SocketAddr}; +use tokio::io::AsyncWriteExt; +use tokio_stream::StreamExt; use tonic::transport::Server; +use uuid::Uuid; use crate::{ app::SharedApp, @@ -9,7 +12,10 @@ use crate::{ }, }; -use self::gen::{greeter_server, HelloReply, HelloRequest}; +use self::gen::{ + flux_releaser_server, CommitArtifactRequest, CommitArtifactResponse, UploadArtifactRequest, + UploadArtifactResponse, +}; pub mod gen { tonic::include_proto!("flux_releaser"); @@ -28,11 +34,44 @@ impl FluxReleaserGrpc { } #[tonic::async_trait] -impl greeter_server::Greeter for FluxReleaserGrpc { - async fn say_hello( +impl flux_releaser_server::FluxReleaser for FluxReleaserGrpc { + async fn upload_artifact( &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status> { + request: tonic::Request>, + ) -> std::result::Result, tonic::Status> { + let mut stream = request.into_inner(); + + let file_path = temp_dir() + .join("flux_releaser") + .join("tmp") + .join("upload_artifact") + .join(Uuid::new_v4().to_string()); + tokio::fs::create_dir_all(file_path.parent().unwrap()).await?; + let mut file = tokio::fs::File::create(&file_path).await?; + + while let Some(item) = stream.next().await { + tracing::trace!("received chunk"); + let item = item?; + + file.write_all(&item.content).await?; + } + + file.flush().await?; + + let upload_id = match self.release_manager.upload_artifact(file_path.into()).await { + Ok(res) => res, + Err(e) => return Err(tonic::Status::unknown(e.to_string())), + }; + + Ok(tonic::Response::new(UploadArtifactResponse { + upload_id: upload_id.to_string(), + })) + } + + async fn commit_artifact( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { let req = request.into_inner(); let artifact = self .release_manager @@ -43,16 +82,16 @@ impl greeter_server::Greeter for FluxReleaserGrpc { .await .unwrap(); - Ok(tonic::Response::new(HelloReply { + Ok(tonic::Response::new(CommitArtifactResponse { artifact_id: artifact.to_string(), })) } } -impl TryFrom for CommitArtifact { +impl TryFrom for CommitArtifact { type Error = anyhow::Error; - fn try_from(value: HelloRequest) -> Result { + fn try_from(value: CommitArtifactRequest) -> Result { if value.app.is_empty() { anyhow::bail!("app cannot be empty") } @@ -74,9 +113,9 @@ impl TryFrom for CommitArtifact { pub async fn tonic_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> { tracing::info!("grpc listening on: {}", host); Server::builder() - .add_service(greeter_server::GreeterServer::new(FluxReleaserGrpc::new( - app, - ))) + .add_service(flux_releaser_server::FluxReleaserServer::new( + FluxReleaserGrpc::new(app), + )) .serve(host) .await?; diff --git a/crates/flux-releaser/src/services/archive.rs b/crates/flux-releaser/src/services/archive.rs index cd50691..0b50a19 100644 --- a/crates/flux-releaser/src/services/archive.rs +++ b/crates/flux-releaser/src/services/archive.rs @@ -1,7 +1,7 @@ #[derive(Clone)] pub struct Archive {} -use std::{io::Cursor, path::Path}; +use std::io::Cursor; pub mod extensions { @@ -90,7 +90,7 @@ pub mod extensions { } } -use super::file_reader::{File, Files}; +use super::file_reader::Files; impl Archive { pub fn new() -> Self { diff --git a/crates/flux-releaser/src/services/file_store.rs b/crates/flux-releaser/src/services/file_store.rs index b9e6e2b..79f39fc 100644 --- a/crates/flux-releaser/src/services/file_store.rs +++ b/crates/flux-releaser/src/services/file_store.rs @@ -1,6 +1,6 @@ use std::{env::temp_dir, path::PathBuf}; -use super::release_manager::models::ArtifactID; +use super::release_manager::models::{ArtifactID, UploadArtifactID}; pub mod extensions; @@ -31,6 +31,20 @@ impl FileStore { Ok(()) } + pub async fn upload_temp(&self, id: UploadArtifactID, file: PathBuf) -> anyhow::Result<()> { + tracing::trace!("uploading temp files: {}", id.to_string()); + + self.client + .put_object() + .bucket("mybucket") + .key(format!("temp/{}.tar", &id.to_string())) + .body(ByteStream::from_path(file).await?) + .send() + .await?; + + Ok(()) + } + pub async fn get_archive(&self, artifact_id: ArtifactID) -> anyhow::Result { tracing::trace!("getting archive: {}", artifact_id.to_string()); diff --git a/crates/flux-releaser/src/services/release_manager.rs b/crates/flux-releaser/src/services/release_manager.rs index 1a3894b..c947fe4 100644 --- a/crates/flux-releaser/src/services/release_manager.rs +++ b/crates/flux-releaser/src/services/release_manager.rs @@ -7,7 +7,7 @@ use super::archive::Archive; use super::domain_events::DomainEvents; use super::file_reader::FileReader; -use self::models::{ArtifactID, CommitArtifact}; +use self::models::{ArtifactID, CommitArtifact, UploadArtifact, UploadArtifactID}; pub struct ReleaseManager { archive: Archive, @@ -31,6 +31,19 @@ impl ReleaseManager { } } + pub async fn upload_artifact( + &self, + request: UploadArtifact, + ) -> anyhow::Result { + let upload_id = uuid::Uuid::now_v7(); + + self.file_store + .upload_temp(upload_id.into(), request.file_path) + .await?; + + Ok(upload_id.into()) + } + pub async fn commit_artifact(&self, request: CommitArtifact) -> anyhow::Result { tracing::debug!("committing artifact: {:?}", request); let artifact_id = ArtifactID::new(); diff --git a/crates/flux-releaser/src/services/release_manager/models.rs b/crates/flux-releaser/src/services/release_manager/models.rs index 689355e..47c09f5 100644 --- a/crates/flux-releaser/src/services/release_manager/models.rs +++ b/crates/flux-releaser/src/services/release_manager/models.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::{ops::Deref, path::PathBuf}; #[derive(Clone, Debug)] pub struct CommitArtifact { @@ -33,3 +33,28 @@ impl TryFrom for ArtifactID { Ok(ArtifactID(uuid)) } } + +pub struct UploadArtifact { + pub file_path: PathBuf, +} + +impl From for UploadArtifact { + fn from(value: PathBuf) -> Self { + Self { file_path: value } + } +} + +pub struct UploadArtifactID(uuid::Uuid); +impl From for UploadArtifactID { + fn from(value: uuid::Uuid) -> Self { + Self(value) + } +} + +impl Deref for UploadArtifactID { + type Target = uuid::Uuid; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/crates/flux-releaser/tests/publish_artifacts.rs b/crates/flux-releaser/tests/publish_artifacts.rs index 3f119ab..acadb6c 100644 --- a/crates/flux-releaser/tests/publish_artifacts.rs +++ b/crates/flux-releaser/tests/publish_artifacts.rs @@ -6,10 +6,13 @@ use std::{ use anyhow::Context; use flux_releaser::{ - app::SharedApp, grpc::gen::HelloRequest, services::file_store::extensions::FileStoreExt, + app::SharedApp, + grpc::gen::{ + flux_releaser_client::FluxReleaserClient, CommitArtifactRequest, UploadArtifactRequest, + }, + services::file_store::extensions::FileStoreExt, }; use tokio::{net::TcpListener, runtime::Runtime, time::sleep}; -use tonic::transport::Channel; use uuid::Uuid; struct Server { @@ -69,7 +72,7 @@ where loop { match task().await { Ok(result) => return Ok(result), - Err(e) if retries < max_retries => { + Err(_e) if retries < max_retries => { sleep(Duration::from_millis(delay)).await; delay *= 2; // Exponential backoff retries += 1; @@ -136,23 +139,36 @@ async fn setup() -> anyhow::Result<(Endpoints, SharedApp)> { } #[tokio::test] -async fn can_create_artifact() -> anyhow::Result<()> { - setup().await?; +async fn can_upload_artifact() -> anyhow::Result<()> { + let (endpoints, app) = setup().await?; + + let mut client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?; + + let bytes: [u8; 10000] = [0; 10000]; + + let chunks = bytes + .chunks(bytes.len() / 100) + .map(|ch| UploadArtifactRequest { + content: ch.to_vec(), + }) + .collect::>(); + + let req = tonic::Request::new(tokio_stream::iter(chunks)); + + let resp = client.upload_artifact(req).await?; + + assert!(!resp.into_inner().upload_id.is_empty()); Ok(()) } #[tokio::test] -async fn can_more_create_artifact() -> anyhow::Result<()> { +async fn can_publish_artifact() -> anyhow::Result<()> { std::env::set_var("RUST_LOG", "flux_releaser=trace"); let (endpoints, app) = setup().await?; - let mut client = flux_releaser::grpc::gen::greeter_client::GreeterClient::connect(format!( - "http://{}", - endpoints.grpc - )) - .await?; + let mut client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?; let test_id = Uuid::new_v4(); @@ -169,7 +185,7 @@ async fn can_more_create_artifact() -> anyhow::Result<()> { let _ = tokio::fs::File::create(file_path).await?; let resp = client - .say_hello(HelloRequest { + .commit_artifact(CommitArtifactRequest { app: "some-app".into(), branch: "some-branch".into(), folder: temp.to_string_lossy().to_string(),