feat: add actual upload
Some checks failed
continuous-integration/drone/push Build is failing

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-02-19 22:50:49 +01:00
parent a17dd2bd10
commit d35f5c8f22
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
18 changed files with 400 additions and 149 deletions

View File

@ -18,7 +18,7 @@ message UploadArtifactResponse {
message CommitArtifactRequest { message CommitArtifactRequest {
string app = 1; string app = 1;
string branch = 2; string branch = 2;
string folder = 3; string upload_id = 3;
} }
message CommitArtifactResponse { message CommitArtifactResponse {

View File

@ -1,9 +1,9 @@
use std::{ops::Deref, sync::Arc}; use std::{ops::Deref, sync::Arc};
use self::infra::{
aws_s3::s3_client,
grpc::{new_client, FluxReleaserGrpcClient},
use self::infra::aws_s3::s3_client; };
#[derive(Clone)] #[derive(Clone)]
pub struct SharedApp(Arc<App>); pub struct SharedApp(Arc<App>);
@ -37,3 +37,32 @@ impl App {
} }
pub mod infra; pub mod infra;
#[derive(Clone)]
pub struct SharedLocalApp(Arc<LocalApp>);
impl Deref for SharedLocalApp {
type Target = Arc<LocalApp>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl SharedLocalApp {
pub fn new(app: LocalApp) -> Self {
Self(Arc::new(app))
}
}
pub struct LocalApp {
pub grpc_client: FluxReleaserGrpcClient,
}
impl LocalApp {
pub async fn new(registry: impl Into<String>) -> anyhow::Result<Self> {
Ok(Self {
grpc_client: new_client(registry).await?,
})
}
}

View File

@ -1,2 +1,23 @@
pub mod aws_s3; pub mod aws_s3;
pub mod nats; pub mod nats;
pub mod grpc {
use std::sync::Arc;
use anyhow::Context;
use tokio::sync::Mutex;
use tonic::transport::Channel;
use crate::grpc::gen::flux_releaser_client::FluxReleaserClient;
pub type FluxReleaserGrpcClient = Arc<Mutex<FluxReleaserClient<Channel>>>;
pub async fn new_client(registry: impl Into<String>) -> anyhow::Result<FluxReleaserGrpcClient> {
let registry = registry.into();
let client = FluxReleaserClient::connect(registry)
.await
.context("failed to connect to flux_releaser registry")?;
Ok(Arc::new(Mutex::new(client)))
}
}

View File

@ -1,6 +1,23 @@
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use std::net::SocketAddr; use std::net::SocketAddr;
use crate::services::flux_local_cluster::extensions::FluxLocalClusterManagerExt;
pub mod server;
pub mod client {
use crate::app::{LocalApp, SharedLocalApp};
pub async fn get_local_app(registry: impl Into<String>) -> anyhow::Result<SharedLocalApp> {
tracing_subscriber::fmt::init();
tracing::debug!("Starting client commit");
let app = SharedLocalApp::new(LocalApp::new(registry).await?);
Ok(app)
}
}
#[derive(Parser)] #[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)] #[command(author, version, about, long_about = None, subcommand_required = true)]
pub struct Command { pub struct Command {
@ -16,47 +33,42 @@ pub enum Commands {
#[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")] #[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")]
grpc_host: SocketAddr, grpc_host: SocketAddr,
}, },
Commit {
#[arg(long)]
app: String,
#[arg(long)]
branch: String,
#[arg(long)]
include: String,
#[arg(env = "FLUX_RELEASER_REGISTRY", long)]
registry: String,
},
} }
impl Command { impl Command {
pub async fn run() -> anyhow::Result<()> { pub async fn run() -> anyhow::Result<()> {
let cli = Command::parse(); let cli = Command::parse();
if let Some(Commands::Serve { host, grpc_host }) = cli.command { match cli.command {
Some(Commands::Serve { host, grpc_host }) => {
server::run_server(host, grpc_host).await?; server::run_server(host, grpc_host).await?;
} }
Some(Commands::Commit {
Ok(()) app,
} branch,
} include,
registry,
pub mod server { }) => {
use std::net::SocketAddr; let app = client::get_local_app(registry).await?;
use crate::{ let upload_id = app
api::axum_serve, .flux_local_cluster_manager()
app::{App, SharedApp}, .package_clusters(include)
grpc::tonic_serve, .await?;
}; }
_ => (),
pub async fn run_server( }
host: impl Into<SocketAddr>,
grpc_host: impl Into<SocketAddr>,
) -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
tracing::info!("Starting service");
let app = SharedApp::new(App::new().await?);
tokio::select! {
res = axum_serve(host.into(), app.clone()) => {
res?;
},
res = tonic_serve(grpc_host.into(), app.clone()) => {
res?;
},
};
Ok(()) Ok(())
} }

View File

@ -0,0 +1,29 @@
use std::net::SocketAddr;
use crate::{
api::axum_serve,
app::{App, SharedApp},
grpc::tonic_serve,
};
pub async fn run_server(
host: impl Into<SocketAddr>,
grpc_host: impl Into<SocketAddr>,
) -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
tracing::info!("Starting service");
let app = SharedApp::new(App::new().await?);
tokio::select! {
res = axum_serve(host.into(), app.clone()) => {
res?;
},
res = tonic_serve(grpc_host.into(), app.clone()) => {
res?;
},
};
Ok(())
}

View File

@ -98,14 +98,14 @@ impl TryFrom<CommitArtifactRequest> for CommitArtifact {
if value.branch.is_empty() { if value.branch.is_empty() {
anyhow::bail!("branch cannot be empty") anyhow::bail!("branch cannot be empty")
} }
if value.folder.is_empty() { if value.upload_id.is_empty() {
anyhow::bail!("folder cannot be empty") anyhow::bail!("folder cannot be empty")
} }
Ok(Self { Ok(Self {
app: value.app, app: value.app,
branch: value.branch, branch: value.branch,
folder: value.folder.into(), upload_id: value.upload_id.try_into()?,
}) })
} }
} }

View File

@ -2,4 +2,6 @@ pub mod archive;
pub mod domain_events; pub mod domain_events;
pub mod file_reader; pub mod file_reader;
pub mod file_store; pub mod file_store;
pub mod flux_local_cluster;
pub mod flux_releaser_uploader;
pub mod release_manager; pub mod release_manager;

View File

@ -3,6 +3,53 @@ pub struct Archive {}
use std::io::Cursor; use std::io::Cursor;
use super::file_reader::Files;
impl Archive {
pub fn new() -> Self {
Self {}
}
pub async fn create_archive(&self, files: Files) -> anyhow::Result<ArchiveFile> {
tracing::trace!("archiving files: {}", files.len());
let buffer = Vec::new();
let cursor = Cursor::new(buffer);
let mut tar_builder = tar::Builder::new(cursor);
for file in files {
let abs_file_path = file.path;
let mut fd = std::fs::File::open(&abs_file_path)?;
if let Some(rel) = file.relative {
tracing::trace!("archiving rel file: {}", rel.display());
tar_builder.append_file(&rel, &mut fd)?;
} else {
tracing::trace!("archiving file: {}", abs_file_path.display());
tar_builder.append_file(&abs_file_path, &mut fd)?;
}
}
tar_builder.finish()?;
let cursor = tar_builder.into_inner()?;
let buffer = cursor.into_inner();
Ok(buffer.into())
}
}
pub struct ArchiveFile {
pub content: Vec<u8>,
}
impl From<Vec<u8>> for ArchiveFile {
fn from(value: Vec<u8>) -> Self {
Self { content: value }
}
}
pub mod extensions { pub mod extensions {
use std::{env::temp_dir, path::PathBuf}; use std::{env::temp_dir, path::PathBuf};
@ -11,6 +58,7 @@ pub mod extensions {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use uuid::Uuid; use uuid::Uuid;
use crate::app::SharedLocalApp;
use crate::{app::SharedApp, services::release_manager::models::ArtifactID}; use crate::{app::SharedApp, services::release_manager::models::ArtifactID};
use crate::services::file_store::FileStore; use crate::services::file_store::FileStore;
@ -28,6 +76,12 @@ pub mod extensions {
} }
} }
impl ArchiveExt for SharedLocalApp {
fn archive(&self) -> Archive {
Archive::new()
}
}
#[async_trait] #[async_trait]
pub trait ArchiveUploadExt { pub trait ArchiveUploadExt {
async fn upload_archive( async fn upload_archive(
@ -89,50 +143,3 @@ pub mod extensions {
} }
} }
} }
use super::file_reader::Files;
impl Archive {
pub fn new() -> Self {
Self {}
}
pub async fn create_archive(&self, files: Files) -> anyhow::Result<ArchiveFile> {
tracing::trace!("archiving files: {}", files.len());
let buffer = Vec::new();
let cursor = Cursor::new(buffer);
let mut tar_builder = tar::Builder::new(cursor);
for file in files {
let abs_file_path = file.path;
let mut fd = std::fs::File::open(&abs_file_path)?;
if let Some(rel) = file.relative {
tracing::trace!("archiving rel file: {}", rel.display());
tar_builder.append_file(&rel, &mut fd)?;
} else {
tracing::trace!("archiving file: {}", abs_file_path.display());
tar_builder.append_file(&abs_file_path, &mut fd)?;
}
}
tar_builder.finish()?;
let cursor = tar_builder.into_inner()?;
let buffer = cursor.into_inner();
Ok(buffer.into())
}
}
pub struct ArchiveFile {
pub content: Vec<u8>,
}
impl From<Vec<u8>> for ArchiveFile {
fn from(value: Vec<u8>) -> Self {
Self { content: value }
}
}

View File

@ -8,7 +8,7 @@ use std::{
pub mod extensions; pub mod extensions;
use anyhow::anyhow; use anyhow::{anyhow, Context};
impl FileReader { impl FileReader {
pub fn new() -> Self { pub fn new() -> Self {
@ -20,7 +20,9 @@ impl FileReader {
let mut clusters: BTreeMap<String, Vec<File>> = BTreeMap::new(); let mut clusters: BTreeMap<String, Vec<File>> = BTreeMap::new();
let mut dir = tokio::fs::read_dir(&location).await?; let mut dir = tokio::fs::read_dir(&location)
.await
.context(format!("failed to find location: {}", &location.display()))?;
while let Some(dir_entry) = dir.next_entry().await? { while let Some(dir_entry) = dir.next_entry().await? {
if dir_entry.metadata().await?.is_dir() { if dir_entry.metadata().await?.is_dir() {
tracing::trace!("found cluster in: {}", dir_entry.path().display()); tracing::trace!("found cluster in: {}", dir_entry.path().display());

View File

@ -1,4 +1,4 @@
use crate::app::SharedApp; use crate::app::{SharedApp, SharedLocalApp};
use super::FileReader; use super::FileReader;
@ -11,3 +11,9 @@ impl FileReaderExt for SharedApp {
FileReader::new() FileReader::new()
} }
} }
impl FileReaderExt for SharedLocalApp {
fn file_reader(&self) -> FileReader {
FileReader::new()
}
}

View File

@ -0,0 +1,91 @@
use std::path::PathBuf;
use crate::{app::infra::grpc::FluxReleaserGrpcClient, grpc::gen::CommitArtifactRequest};
use super::{
archive::Archive,
file_reader::FileReader,
flux_releaser_uploader::FluxReleaserUploader,
release_manager::models::{ArtifactID, UploadArtifactID},
};
pub struct FluxLocalClusterManager {
file_reader: FileReader,
archive: Archive,
flux_uploader: FluxReleaserUploader,
flux_releaser_client: FluxReleaserGrpcClient,
}
impl FluxLocalClusterManager {
pub fn new(
file_reader: FileReader,
archive: Archive,
flux_uploader: FluxReleaserUploader,
flux_releaser_client: FluxReleaserGrpcClient,
) -> Self {
Self {
file_reader,
archive,
flux_uploader,
flux_releaser_client,
}
}
pub async fn package_clusters(
&self,
include: impl Into<PathBuf>,
) -> anyhow::Result<UploadArtifactID> {
let files = self.file_reader.read_files(include.into()).await?;
let archive = self.archive.create_archive(files).await?;
let upload_id = self.flux_uploader.upload_archive(archive).await?;
Ok(upload_id)
}
pub async fn commit_artifact(
&self,
app: impl Into<String>,
branch: impl Into<String>,
upload_id: UploadArtifactID,
) -> anyhow::Result<ArtifactID> {
let artifact_id = self
.flux_releaser_client
.lock()
.await
.commit_artifact(tonic::Request::new(CommitArtifactRequest {
app: app.into(),
branch: branch.into(),
upload_id: upload_id.to_string(),
}))
.await?;
artifact_id.into_inner().artifact_id.try_into()
}
}
pub mod extensions {
use crate::{
app::SharedLocalApp,
services::{
archive::extensions::ArchiveExt, file_reader::extensions::FileReaderExt,
flux_releaser_uploader::extensions::FluxReleaserUploaderExt,
},
};
use super::FluxLocalClusterManager;
pub trait FluxLocalClusterManagerExt {
fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager;
}
impl FluxLocalClusterManagerExt for SharedLocalApp {
fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager {
FluxLocalClusterManager::new(
self.file_reader(),
self.archive(),
self.flux_releaser_uploader(),
self.grpc_client.clone(),
)
}
}
}

View File

@ -0,0 +1,53 @@
use tonic::transport::Channel;
use crate::{
app::infra::grpc::FluxReleaserGrpcClient,
grpc::gen::{flux_releaser_client::FluxReleaserClient, UploadArtifactRequest},
};
use super::{archive::ArchiveFile, release_manager::models::UploadArtifactID};
pub struct FluxReleaserUploader {
flux_client: FluxReleaserGrpcClient,
}
impl FluxReleaserUploader {
pub fn new(flux_client: FluxReleaserGrpcClient) -> Self {
Self { flux_client }
}
pub async fn upload_archive(&self, archive: ArchiveFile) -> anyhow::Result<UploadArtifactID> {
let chunks = archive
.content
.chunks(1_000_000) // Slice by about 1MB
.map(|ch| UploadArtifactRequest {
content: ch.to_vec(),
})
.collect::<Vec<_>>();
let resp = self
.flux_client
.lock()
.await
.upload_artifact(tonic::Request::new(tokio_stream::iter(chunks)))
.await?;
resp.into_inner().upload_id.try_into()
}
}
pub mod extensions {
use crate::app::SharedLocalApp;
use super::FluxReleaserUploader;
pub trait FluxReleaserUploaderExt {
fn flux_releaser_uploader(&self) -> FluxReleaserUploader;
}
impl FluxReleaserUploaderExt for SharedLocalApp {
fn flux_releaser_uploader(&self) -> FluxReleaserUploader {
FluxReleaserUploader::new(self.grpc_client.clone())
}
}
}

View File

@ -48,12 +48,10 @@ impl ReleaseManager {
tracing::debug!("committing artifact: {:?}", request); tracing::debug!("committing artifact: {:?}", request);
let artifact_id = ArtifactID::new(); let artifact_id = ArtifactID::new();
let files = self.file_reader.read_files(request.folder).await?; let artifact = self.file_store.get_temp(request.upload_id).await?;
let archive = self.archive.create_archive(files).await?;
self.file_store self.file_store
.upload_archive(artifact_id.clone(), archive) .upload_file(artifact_id.clone(), artifact)
.await?; .await?;
self.domain_events self.domain_events

View File

@ -4,7 +4,7 @@ use std::{ops::Deref, path::PathBuf};
pub struct CommitArtifact { pub struct CommitArtifact {
pub app: String, pub app: String,
pub branch: String, pub branch: String,
pub folder: PathBuf, pub upload_id: UploadArtifactID,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -44,6 +44,7 @@ impl From<PathBuf> for UploadArtifact {
} }
} }
#[derive(Clone, Debug)]
pub struct UploadArtifactID(uuid::Uuid); pub struct UploadArtifactID(uuid::Uuid);
impl From<uuid::Uuid> for UploadArtifactID { impl From<uuid::Uuid> for UploadArtifactID {
fn from(value: uuid::Uuid) -> Self { fn from(value: uuid::Uuid) -> Self {

View File

@ -0,0 +1,14 @@
use flux_releaser::services::{
archive::Archive, file_reader::FileReader, flux_local_cluster::FluxLocalClusterManager,
};
#[tokio::test]
async fn can_package_files() -> anyhow::Result<()> {
tracing_subscriber::fmt().init();
//let sut = FluxLocalClusterManager::new(FileReader::new(), Archive::new());
//sut.package_clusters("testdata/flux_local_cluster/").await?;
Ok(())
}

View File

@ -1,19 +1,20 @@
use std::{ use std::{
env::temp_dir,
net::{Ipv4Addr, SocketAddr}, net::{Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration, time::Duration,
}; };
use anyhow::Context; use anyhow::Context;
use flux_releaser::{ use flux_releaser::{
app::SharedApp, app::{LocalApp, SharedApp, SharedLocalApp},
grpc::gen::{ grpc::gen::flux_releaser_client::FluxReleaserClient,
flux_releaser_client::FluxReleaserClient, CommitArtifactRequest, UploadArtifactRequest, services::{
archive::ArchiveFile, file_store::extensions::FileStoreExt,
flux_local_cluster::extensions::FluxLocalClusterManagerExt,
flux_releaser_uploader::FluxReleaserUploader,
}, },
services::file_store::extensions::FileStoreExt,
}; };
use rand::{thread_rng, Rng}; use tokio::{net::TcpListener, runtime::Runtime, sync::Mutex, time::sleep};
use tokio::{net::TcpListener, runtime::Runtime, time::sleep};
use uuid::Uuid; use uuid::Uuid;
struct Server { struct Server {
@ -139,35 +140,34 @@ async fn setup() -> anyhow::Result<(Endpoints, SharedApp)> {
Ok(unsafe { (ENDPOINTS.clone().unwrap(), APP.clone().unwrap()) }) Ok(unsafe { (ENDPOINTS.clone().unwrap(), APP.clone().unwrap()) })
} }
async fn local_setup(endpoints: Endpoints) -> anyhow::Result<SharedLocalApp> {
Ok(SharedLocalApp::new(
LocalApp::new(format!("http://{}", endpoints.grpc)).await?,
))
}
#[tokio::test] #[tokio::test]
async fn can_upload_artifact() -> anyhow::Result<()> { async fn can_upload_artifact() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "flux_releaser=trace");
let (endpoints, app) = setup().await?; let (endpoints, app) = setup().await?;
let mut client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?; let client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?;
let client = FluxReleaserUploader::new(Arc::new(Mutex::new(client)));
let mut bytes: [u8; 10000] = [0; 10000]; let bytes: Vec<u8> = vec![0; 10_000_000];
let upload_id = client
thread_rng().fill(&mut bytes[..]); .upload_archive(ArchiveFile {
content: bytes.clone(),
let chunks = bytes
.chunks(bytes.len() / 100)
.map(|ch| UploadArtifactRequest {
content: ch.to_vec(),
}) })
.collect::<Vec<_>>(); .await?;
let req = tonic::Request::new(tokio_stream::iter(chunks)); assert!(!upload_id.to_string().is_empty());
let resp = client.upload_artifact(req).await?; let actual_path = app.file_store().get_temp(upload_id).await?;
let upload_id = resp.into_inner().upload_id;
assert!(!upload_id.is_empty());
let actual_path = app.file_store().get_temp(upload_id.try_into()?).await?;
let actual_content = tokio::fs::read(actual_path).await?; let actual_content = tokio::fs::read(actual_path).await?;
assert_eq!(&bytes[..], actual_content.as_slice()); assert_eq!(&bytes, &actual_content);
Ok(()) Ok(())
} }
@ -177,40 +177,26 @@ async fn can_publish_artifact() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "flux_releaser=trace"); std::env::set_var("RUST_LOG", "flux_releaser=trace");
let (endpoints, app) = setup().await?; let (endpoints, app) = setup().await?;
let local_app = local_setup(endpoints.clone()).await?;
let mut client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?; let upload_id = local_app
.flux_local_cluster_manager()
let test_id = Uuid::new_v4(); .package_clusters("testdata/flux_local_cluster")
let temp = temp_dir()
.join("flux_releaser")
.join("tests")
.join(test_id.to_string());
let file_path = temp
.join("clusters")
.join(Uuid::new_v4().to_string())
.join("some-file.yaml");
tokio::fs::create_dir_all(file_path.parent().unwrap()).await?;
let _ = tokio::fs::File::create(file_path).await?;
let resp = client
.commit_artifact(CommitArtifactRequest {
app: "some-app".into(),
branch: "some-branch".into(),
folder: temp.to_string_lossy().to_string(),
})
.await?; .await?;
let artifact = resp.into_inner(); let archive = app.file_store().get_temp(upload_id.clone()).await?;
let archive = app
.file_store()
.get_archive(artifact.artifact_id.try_into()?)
.await?;
assert!(archive.exists()); assert!(archive.exists());
let artifact_id = local_app
.flux_local_cluster_manager()
.commit_artifact("some-app", "some-branch", upload_id)
.await?;
let artifact = app.file_store().get_archive(artifact_id).await?;
assert!(artifact.exists());
Ok(()) Ok(())
} }