feat: add upload chunker
Some checks failed
continuous-integration/drone/push Build is failing

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-02-18 22:25:43 +01:00
parent 6cf1a23169
commit 9cc3d80917
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
9 changed files with 153 additions and 34 deletions

2
Cargo.lock generated
View File

@ -1189,6 +1189,7 @@ dependencies = [
"serde_json",
"tar",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"tracing",
@ -2891,6 +2892,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]

View File

@ -22,6 +22,7 @@ serde_json = "1.0.113"
nats = "0.24.1"
walkdir = "2.4.0"
tar = "0.4.40"
tokio-stream = { version = "0.1.14", features = ["full"] }
[build-dependencies]
tonic-build = "0.11.0"

View File

@ -2,16 +2,25 @@ syntax = "proto3";
package flux_releaser;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
service FluxReleaser {
rpc UploadArtifact (stream UploadArtifactRequest) returns (UploadArtifactResponse) {}
rpc CommitArtifact (CommitArtifactRequest) returns (CommitArtifactResponse) {}
}
message HelloRequest {
message UploadArtifactRequest {
bytes content = 1;
}
message UploadArtifactResponse {
string upload_id = 1;
}
message CommitArtifactRequest {
string app = 1;
string branch = 2;
string folder = 3; // Change to files instead
string folder = 3;
}
message HelloReply {
message CommitArtifactResponse {
string artifact_id = 1;
}

View File

@ -1,6 +1,9 @@
use std::net::SocketAddr;
use std::{env::temp_dir, net::SocketAddr};
use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt;
use tonic::transport::Server;
use uuid::Uuid;
use crate::{
app::SharedApp,
@ -9,7 +12,10 @@ use crate::{
},
};
use self::gen::{greeter_server, HelloReply, HelloRequest};
use self::gen::{
flux_releaser_server, CommitArtifactRequest, CommitArtifactResponse, UploadArtifactRequest,
UploadArtifactResponse,
};
pub mod gen {
tonic::include_proto!("flux_releaser");
@ -28,11 +34,44 @@ impl FluxReleaserGrpc {
}
#[tonic::async_trait]
impl greeter_server::Greeter for FluxReleaserGrpc {
async fn say_hello(
impl flux_releaser_server::FluxReleaser for FluxReleaserGrpc {
async fn upload_artifact(
&self,
request: tonic::Request<HelloRequest>,
) -> std::result::Result<tonic::Response<HelloReply>, tonic::Status> {
request: tonic::Request<tonic::Streaming<UploadArtifactRequest>>,
) -> std::result::Result<tonic::Response<UploadArtifactResponse>, tonic::Status> {
let mut stream = request.into_inner();
let file_path = temp_dir()
.join("flux_releaser")
.join("tmp")
.join("upload_artifact")
.join(Uuid::new_v4().to_string());
tokio::fs::create_dir_all(file_path.parent().unwrap()).await?;
let mut file = tokio::fs::File::create(&file_path).await?;
while let Some(item) = stream.next().await {
tracing::trace!("received chunk");
let item = item?;
file.write_all(&item.content).await?;
}
file.flush().await?;
let upload_id = match self.release_manager.upload_artifact(file_path.into()).await {
Ok(res) => res,
Err(e) => return Err(tonic::Status::unknown(e.to_string())),
};
Ok(tonic::Response::new(UploadArtifactResponse {
upload_id: upload_id.to_string(),
}))
}
async fn commit_artifact(
&self,
request: tonic::Request<CommitArtifactRequest>,
) -> std::result::Result<tonic::Response<CommitArtifactResponse>, tonic::Status> {
let req = request.into_inner();
let artifact = self
.release_manager
@ -43,16 +82,16 @@ impl greeter_server::Greeter for FluxReleaserGrpc {
.await
.unwrap();
Ok(tonic::Response::new(HelloReply {
Ok(tonic::Response::new(CommitArtifactResponse {
artifact_id: artifact.to_string(),
}))
}
}
impl TryFrom<HelloRequest> for CommitArtifact {
impl TryFrom<CommitArtifactRequest> for CommitArtifact {
type Error = anyhow::Error;
fn try_from(value: HelloRequest) -> Result<Self, Self::Error> {
fn try_from(value: CommitArtifactRequest) -> Result<Self, Self::Error> {
if value.app.is_empty() {
anyhow::bail!("app cannot be empty")
}
@ -74,9 +113,9 @@ impl TryFrom<HelloRequest> for CommitArtifact {
pub async fn tonic_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
tracing::info!("grpc listening on: {}", host);
Server::builder()
.add_service(greeter_server::GreeterServer::new(FluxReleaserGrpc::new(
app,
)))
.add_service(flux_releaser_server::FluxReleaserServer::new(
FluxReleaserGrpc::new(app),
))
.serve(host)
.await?;

View File

@ -1,7 +1,7 @@
#[derive(Clone)]
pub struct Archive {}
use std::{io::Cursor, path::Path};
use std::io::Cursor;
pub mod extensions {
@ -90,7 +90,7 @@ pub mod extensions {
}
}
use super::file_reader::{File, Files};
use super::file_reader::Files;
impl Archive {
pub fn new() -> Self {

View File

@ -1,6 +1,6 @@
use std::{env::temp_dir, path::PathBuf};
use super::release_manager::models::ArtifactID;
use super::release_manager::models::{ArtifactID, UploadArtifactID};
pub mod extensions;
@ -31,6 +31,20 @@ impl FileStore {
Ok(())
}
pub async fn upload_temp(&self, id: UploadArtifactID, file: PathBuf) -> anyhow::Result<()> {
tracing::trace!("uploading temp files: {}", id.to_string());
self.client
.put_object()
.bucket("mybucket")
.key(format!("temp/{}.tar", &id.to_string()))
.body(ByteStream::from_path(file).await?)
.send()
.await?;
Ok(())
}
pub async fn get_archive(&self, artifact_id: ArtifactID) -> anyhow::Result<PathBuf> {
tracing::trace!("getting archive: {}", artifact_id.to_string());

View File

@ -7,7 +7,7 @@ use super::archive::Archive;
use super::domain_events::DomainEvents;
use super::file_reader::FileReader;
use self::models::{ArtifactID, CommitArtifact};
use self::models::{ArtifactID, CommitArtifact, UploadArtifact, UploadArtifactID};
pub struct ReleaseManager {
archive: Archive,
@ -31,6 +31,19 @@ impl ReleaseManager {
}
}
pub async fn upload_artifact(
&self,
request: UploadArtifact,
) -> anyhow::Result<UploadArtifactID> {
let upload_id = uuid::Uuid::now_v7();
self.file_store
.upload_temp(upload_id.into(), request.file_path)
.await?;
Ok(upload_id.into())
}
pub async fn commit_artifact(&self, request: CommitArtifact) -> anyhow::Result<ArtifactID> {
tracing::debug!("committing artifact: {:?}", request);
let artifact_id = ArtifactID::new();

View File

@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::{ops::Deref, path::PathBuf};
#[derive(Clone, Debug)]
pub struct CommitArtifact {
@ -33,3 +33,28 @@ impl TryFrom<String> for ArtifactID {
Ok(ArtifactID(uuid))
}
}
pub struct UploadArtifact {
pub file_path: PathBuf,
}
impl From<PathBuf> for UploadArtifact {
fn from(value: PathBuf) -> Self {
Self { file_path: value }
}
}
pub struct UploadArtifactID(uuid::Uuid);
impl From<uuid::Uuid> for UploadArtifactID {
fn from(value: uuid::Uuid) -> Self {
Self(value)
}
}
impl Deref for UploadArtifactID {
type Target = uuid::Uuid;
fn deref(&self) -> &Self::Target {
&self.0
}
}

View File

@ -6,10 +6,13 @@ use std::{
use anyhow::Context;
use flux_releaser::{
app::SharedApp, grpc::gen::HelloRequest, services::file_store::extensions::FileStoreExt,
app::SharedApp,
grpc::gen::{
flux_releaser_client::FluxReleaserClient, CommitArtifactRequest, UploadArtifactRequest,
},
services::file_store::extensions::FileStoreExt,
};
use tokio::{net::TcpListener, runtime::Runtime, time::sleep};
use tonic::transport::Channel;
use uuid::Uuid;
struct Server {
@ -69,7 +72,7 @@ where
loop {
match task().await {
Ok(result) => return Ok(result),
Err(e) if retries < max_retries => {
Err(_e) if retries < max_retries => {
sleep(Duration::from_millis(delay)).await;
delay *= 2; // Exponential backoff
retries += 1;
@ -136,23 +139,36 @@ async fn setup() -> anyhow::Result<(Endpoints, SharedApp)> {
}
#[tokio::test]
async fn can_create_artifact() -> anyhow::Result<()> {
setup().await?;
async fn can_upload_artifact() -> anyhow::Result<()> {
let (endpoints, app) = setup().await?;
let mut client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?;
let bytes: [u8; 10000] = [0; 10000];
let chunks = bytes
.chunks(bytes.len() / 100)
.map(|ch| UploadArtifactRequest {
content: ch.to_vec(),
})
.collect::<Vec<_>>();
let req = tonic::Request::new(tokio_stream::iter(chunks));
let resp = client.upload_artifact(req).await?;
assert!(!resp.into_inner().upload_id.is_empty());
Ok(())
}
#[tokio::test]
async fn can_more_create_artifact() -> anyhow::Result<()> {
async fn can_publish_artifact() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "flux_releaser=trace");
let (endpoints, app) = setup().await?;
let mut client = flux_releaser::grpc::gen::greeter_client::GreeterClient::connect(format!(
"http://{}",
endpoints.grpc
))
.await?;
let mut client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?;
let test_id = Uuid::new_v4();
@ -169,7 +185,7 @@ async fn can_more_create_artifact() -> anyhow::Result<()> {
let _ = tokio::fs::File::create(file_path).await?;
let resp = client
.say_hello(HelloRequest {
.commit_artifact(CommitArtifactRequest {
app: "some-app".into(),
branch: "some-branch".into(),
folder: temp.to_string_lossy().to_string(),