diff --git a/crates/flux-releaser/schemas/proto/flux_releaser.proto b/crates/flux-releaser/schemas/proto/flux_releaser.proto index c158b9c..4fde587 100644 --- a/crates/flux-releaser/schemas/proto/flux_releaser.proto +++ b/crates/flux-releaser/schemas/proto/flux_releaser.proto @@ -18,7 +18,7 @@ message UploadArtifactResponse { message CommitArtifactRequest { string app = 1; string branch = 2; - string folder = 3; + string upload_id = 3; } message CommitArtifactResponse { diff --git a/crates/flux-releaser/src/app.rs b/crates/flux-releaser/src/app.rs index 882184c..80bd6f6 100644 --- a/crates/flux-releaser/src/app.rs +++ b/crates/flux-releaser/src/app.rs @@ -1,9 +1,9 @@ use std::{ops::Deref, sync::Arc}; - - - -use self::infra::aws_s3::s3_client; +use self::infra::{ + aws_s3::s3_client, + grpc::{new_client, FluxReleaserGrpcClient}, +}; #[derive(Clone)] pub struct SharedApp(Arc); @@ -37,3 +37,32 @@ impl App { } pub mod infra; + +#[derive(Clone)] +pub struct SharedLocalApp(Arc); + +impl Deref for SharedLocalApp { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl SharedLocalApp { + pub fn new(app: LocalApp) -> Self { + Self(Arc::new(app)) + } +} + +pub struct LocalApp { + pub grpc_client: FluxReleaserGrpcClient, +} + +impl LocalApp { + pub async fn new(registry: impl Into) -> anyhow::Result { + Ok(Self { + grpc_client: new_client(registry).await?, + }) + } +} diff --git a/crates/flux-releaser/src/app/infra.rs b/crates/flux-releaser/src/app/infra.rs index d8acfad..14204a3 100644 --- a/crates/flux-releaser/src/app/infra.rs +++ b/crates/flux-releaser/src/app/infra.rs @@ -1,2 +1,23 @@ pub mod aws_s3; pub mod nats; +pub mod grpc { + use std::sync::Arc; + + use anyhow::Context; + use tokio::sync::Mutex; + use tonic::transport::Channel; + + use crate::grpc::gen::flux_releaser_client::FluxReleaserClient; + + pub type FluxReleaserGrpcClient = Arc>>; + + pub async fn new_client(registry: impl Into) -> anyhow::Result { + let registry = registry.into(); + + let client = FluxReleaserClient::connect(registry) + .await + .context("failed to connect to flux_releaser registry")?; + + Ok(Arc::new(Mutex::new(client))) + } +} diff --git a/crates/flux-releaser/src/cli.rs b/crates/flux-releaser/src/cli.rs index ec880cf..d3ff164 100644 --- a/crates/flux-releaser/src/cli.rs +++ b/crates/flux-releaser/src/cli.rs @@ -1,6 +1,23 @@ use clap::{Parser, Subcommand}; use std::net::SocketAddr; +use crate::services::flux_local_cluster::extensions::FluxLocalClusterManagerExt; + +pub mod server; +pub mod client { + use crate::app::{LocalApp, SharedLocalApp}; + + pub async fn get_local_app(registry: impl Into) -> anyhow::Result { + tracing_subscriber::fmt::init(); + + tracing::debug!("Starting client commit"); + + let app = SharedLocalApp::new(LocalApp::new(registry).await?); + + Ok(app) + } +} + #[derive(Parser)] #[command(author, version, about, long_about = None, subcommand_required = true)] pub struct Command { @@ -16,48 +33,43 @@ pub enum Commands { #[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")] grpc_host: SocketAddr, }, + + Commit { + #[arg(long)] + app: String, + #[arg(long)] + branch: String, + #[arg(long)] + include: String, + #[arg(env = "FLUX_RELEASER_REGISTRY", long)] + registry: String, + }, } impl Command { pub async fn run() -> anyhow::Result<()> { let cli = Command::parse(); - if let Some(Commands::Serve { host, grpc_host }) = cli.command { - server::run_server(host, grpc_host).await?; + match cli.command { + Some(Commands::Serve { host, grpc_host }) => { + server::run_server(host, grpc_host).await?; + } + Some(Commands::Commit { + app, + branch, + include, + registry, + }) => { + let app = client::get_local_app(registry).await?; + + let upload_id = app + .flux_local_cluster_manager() + .package_clusters(include) + .await?; + } + _ => (), } Ok(()) } } - -pub mod server { - use std::net::SocketAddr; - - use crate::{ - api::axum_serve, - app::{App, SharedApp}, - grpc::tonic_serve, - }; - - pub async fn run_server( - host: impl Into, - grpc_host: impl Into, - ) -> anyhow::Result<()> { - tracing_subscriber::fmt::init(); - - tracing::info!("Starting service"); - - let app = SharedApp::new(App::new().await?); - - tokio::select! { - res = axum_serve(host.into(), app.clone()) => { - res?; - }, - res = tonic_serve(grpc_host.into(), app.clone()) => { - res?; - }, - }; - - Ok(()) - } -} diff --git a/crates/flux-releaser/src/cli/server.rs b/crates/flux-releaser/src/cli/server.rs new file mode 100644 index 0000000..90132f8 --- /dev/null +++ b/crates/flux-releaser/src/cli/server.rs @@ -0,0 +1,29 @@ +use std::net::SocketAddr; + +use crate::{ + api::axum_serve, + app::{App, SharedApp}, + grpc::tonic_serve, +}; + +pub async fn run_server( + host: impl Into, + grpc_host: impl Into, +) -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + tracing::info!("Starting service"); + + let app = SharedApp::new(App::new().await?); + + tokio::select! { + res = axum_serve(host.into(), app.clone()) => { + res?; + }, + res = tonic_serve(grpc_host.into(), app.clone()) => { + res?; + }, + }; + + Ok(()) +} diff --git a/crates/flux-releaser/src/grpc.rs b/crates/flux-releaser/src/grpc.rs index c3d3f7c..70e3629 100644 --- a/crates/flux-releaser/src/grpc.rs +++ b/crates/flux-releaser/src/grpc.rs @@ -98,14 +98,14 @@ impl TryFrom for CommitArtifact { if value.branch.is_empty() { anyhow::bail!("branch cannot be empty") } - if value.folder.is_empty() { + if value.upload_id.is_empty() { anyhow::bail!("folder cannot be empty") } Ok(Self { app: value.app, branch: value.branch, - folder: value.folder.into(), + upload_id: value.upload_id.try_into()?, }) } } diff --git a/crates/flux-releaser/src/services.rs b/crates/flux-releaser/src/services.rs index ea1b36e..4ebf58d 100644 --- a/crates/flux-releaser/src/services.rs +++ b/crates/flux-releaser/src/services.rs @@ -2,4 +2,6 @@ pub mod archive; pub mod domain_events; pub mod file_reader; pub mod file_store; +pub mod flux_local_cluster; +pub mod flux_releaser_uploader; pub mod release_manager; diff --git a/crates/flux-releaser/src/services/archive.rs b/crates/flux-releaser/src/services/archive.rs index 0b50a19..4a42c8e 100644 --- a/crates/flux-releaser/src/services/archive.rs +++ b/crates/flux-releaser/src/services/archive.rs @@ -3,6 +3,53 @@ pub struct Archive {} use std::io::Cursor; +use super::file_reader::Files; + +impl Archive { + pub fn new() -> Self { + Self {} + } + + pub async fn create_archive(&self, files: Files) -> anyhow::Result { + tracing::trace!("archiving files: {}", files.len()); + + let buffer = Vec::new(); + let cursor = Cursor::new(buffer); + + let mut tar_builder = tar::Builder::new(cursor); + + for file in files { + let abs_file_path = file.path; + + let mut fd = std::fs::File::open(&abs_file_path)?; + if let Some(rel) = file.relative { + tracing::trace!("archiving rel file: {}", rel.display()); + tar_builder.append_file(&rel, &mut fd)?; + } else { + tracing::trace!("archiving file: {}", abs_file_path.display()); + tar_builder.append_file(&abs_file_path, &mut fd)?; + } + } + + tar_builder.finish()?; + + let cursor = tar_builder.into_inner()?; + let buffer = cursor.into_inner(); + + Ok(buffer.into()) + } +} + +pub struct ArchiveFile { + pub content: Vec, +} + +impl From> for ArchiveFile { + fn from(value: Vec) -> Self { + Self { content: value } + } +} + pub mod extensions { use std::{env::temp_dir, path::PathBuf}; @@ -11,6 +58,7 @@ pub mod extensions { use tokio::io::AsyncWriteExt; use uuid::Uuid; + use crate::app::SharedLocalApp; use crate::{app::SharedApp, services::release_manager::models::ArtifactID}; use crate::services::file_store::FileStore; @@ -28,6 +76,12 @@ pub mod extensions { } } + impl ArchiveExt for SharedLocalApp { + fn archive(&self) -> Archive { + Archive::new() + } + } + #[async_trait] pub trait ArchiveUploadExt { async fn upload_archive( @@ -89,50 +143,3 @@ pub mod extensions { } } } - -use super::file_reader::Files; - -impl Archive { - pub fn new() -> Self { - Self {} - } - - pub async fn create_archive(&self, files: Files) -> anyhow::Result { - tracing::trace!("archiving files: {}", files.len()); - - let buffer = Vec::new(); - let cursor = Cursor::new(buffer); - - let mut tar_builder = tar::Builder::new(cursor); - - for file in files { - let abs_file_path = file.path; - - let mut fd = std::fs::File::open(&abs_file_path)?; - if let Some(rel) = file.relative { - tracing::trace!("archiving rel file: {}", rel.display()); - tar_builder.append_file(&rel, &mut fd)?; - } else { - tracing::trace!("archiving file: {}", abs_file_path.display()); - tar_builder.append_file(&abs_file_path, &mut fd)?; - } - } - - tar_builder.finish()?; - - let cursor = tar_builder.into_inner()?; - let buffer = cursor.into_inner(); - - Ok(buffer.into()) - } -} - -pub struct ArchiveFile { - pub content: Vec, -} - -impl From> for ArchiveFile { - fn from(value: Vec) -> Self { - Self { content: value } - } -} diff --git a/crates/flux-releaser/src/services/file_reader.rs b/crates/flux-releaser/src/services/file_reader.rs index 0614896..cc09f64 100644 --- a/crates/flux-releaser/src/services/file_reader.rs +++ b/crates/flux-releaser/src/services/file_reader.rs @@ -8,7 +8,7 @@ use std::{ pub mod extensions; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; impl FileReader { pub fn new() -> Self { @@ -20,7 +20,9 @@ impl FileReader { let mut clusters: BTreeMap> = BTreeMap::new(); - let mut dir = tokio::fs::read_dir(&location).await?; + let mut dir = tokio::fs::read_dir(&location) + .await + .context(format!("failed to find location: {}", &location.display()))?; while let Some(dir_entry) = dir.next_entry().await? { if dir_entry.metadata().await?.is_dir() { tracing::trace!("found cluster in: {}", dir_entry.path().display()); diff --git a/crates/flux-releaser/src/services/file_reader/extensions.rs b/crates/flux-releaser/src/services/file_reader/extensions.rs index 7052b6a..63b51e4 100644 --- a/crates/flux-releaser/src/services/file_reader/extensions.rs +++ b/crates/flux-releaser/src/services/file_reader/extensions.rs @@ -1,4 +1,4 @@ -use crate::app::SharedApp; +use crate::app::{SharedApp, SharedLocalApp}; use super::FileReader; @@ -11,3 +11,9 @@ impl FileReaderExt for SharedApp { FileReader::new() } } + +impl FileReaderExt for SharedLocalApp { + fn file_reader(&self) -> FileReader { + FileReader::new() + } +} diff --git a/crates/flux-releaser/src/services/flux_local_cluster.rs b/crates/flux-releaser/src/services/flux_local_cluster.rs new file mode 100644 index 0000000..f9fab03 --- /dev/null +++ b/crates/flux-releaser/src/services/flux_local_cluster.rs @@ -0,0 +1,91 @@ +use std::path::PathBuf; + +use crate::{app::infra::grpc::FluxReleaserGrpcClient, grpc::gen::CommitArtifactRequest}; + +use super::{ + archive::Archive, + file_reader::FileReader, + flux_releaser_uploader::FluxReleaserUploader, + release_manager::models::{ArtifactID, UploadArtifactID}, +}; + +pub struct FluxLocalClusterManager { + file_reader: FileReader, + archive: Archive, + flux_uploader: FluxReleaserUploader, + flux_releaser_client: FluxReleaserGrpcClient, +} + +impl FluxLocalClusterManager { + pub fn new( + file_reader: FileReader, + archive: Archive, + flux_uploader: FluxReleaserUploader, + flux_releaser_client: FluxReleaserGrpcClient, + ) -> Self { + Self { + file_reader, + archive, + flux_uploader, + flux_releaser_client, + } + } + + pub async fn package_clusters( + &self, + include: impl Into, + ) -> anyhow::Result { + let files = self.file_reader.read_files(include.into()).await?; + let archive = self.archive.create_archive(files).await?; + let upload_id = self.flux_uploader.upload_archive(archive).await?; + + Ok(upload_id) + } + + pub async fn commit_artifact( + &self, + app: impl Into, + branch: impl Into, + upload_id: UploadArtifactID, + ) -> anyhow::Result { + let artifact_id = self + .flux_releaser_client + .lock() + .await + .commit_artifact(tonic::Request::new(CommitArtifactRequest { + app: app.into(), + branch: branch.into(), + upload_id: upload_id.to_string(), + })) + .await?; + + artifact_id.into_inner().artifact_id.try_into() + } +} + +pub mod extensions { + use crate::{ + app::SharedLocalApp, + services::{ + archive::extensions::ArchiveExt, file_reader::extensions::FileReaderExt, + flux_releaser_uploader::extensions::FluxReleaserUploaderExt, + }, + }; + + use super::FluxLocalClusterManager; + + pub trait FluxLocalClusterManagerExt { + fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager; + } + + impl FluxLocalClusterManagerExt for SharedLocalApp { + fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager { + FluxLocalClusterManager::new( + self.file_reader(), + self.archive(), + self.flux_releaser_uploader(), + self.grpc_client.clone(), + ) + } + } +} diff --git a/crates/flux-releaser/src/services/flux_releaser_uploader.rs b/crates/flux-releaser/src/services/flux_releaser_uploader.rs new file mode 100644 index 0000000..e7b8a64 --- /dev/null +++ b/crates/flux-releaser/src/services/flux_releaser_uploader.rs @@ -0,0 +1,53 @@ +use tonic::transport::Channel; + +use crate::{ + app::infra::grpc::FluxReleaserGrpcClient, + grpc::gen::{flux_releaser_client::FluxReleaserClient, UploadArtifactRequest}, +}; + +use super::{archive::ArchiveFile, release_manager::models::UploadArtifactID}; + +pub struct FluxReleaserUploader { + flux_client: FluxReleaserGrpcClient, +} + +impl FluxReleaserUploader { + pub fn new(flux_client: FluxReleaserGrpcClient) -> Self { + Self { flux_client } + } + + pub async fn upload_archive(&self, archive: ArchiveFile) -> anyhow::Result { + let chunks = archive + .content + .chunks(1_000_000) // Slice by about 1MB + .map(|ch| UploadArtifactRequest { + content: ch.to_vec(), + }) + .collect::>(); + + let resp = self + .flux_client + .lock() + .await + .upload_artifact(tonic::Request::new(tokio_stream::iter(chunks))) + .await?; + + resp.into_inner().upload_id.try_into() + } +} + +pub mod extensions { + use crate::app::SharedLocalApp; + + use super::FluxReleaserUploader; + + pub trait FluxReleaserUploaderExt { + fn flux_releaser_uploader(&self) -> FluxReleaserUploader; + } + + impl FluxReleaserUploaderExt for SharedLocalApp { + fn flux_releaser_uploader(&self) -> FluxReleaserUploader { + FluxReleaserUploader::new(self.grpc_client.clone()) + } + } +} diff --git a/crates/flux-releaser/src/services/release_manager.rs b/crates/flux-releaser/src/services/release_manager.rs index c947fe4..c55bc96 100644 --- a/crates/flux-releaser/src/services/release_manager.rs +++ b/crates/flux-releaser/src/services/release_manager.rs @@ -48,12 +48,10 @@ impl ReleaseManager { tracing::debug!("committing artifact: {:?}", request); let artifact_id = ArtifactID::new(); - let files = self.file_reader.read_files(request.folder).await?; - - let archive = self.archive.create_archive(files).await?; + let artifact = self.file_store.get_temp(request.upload_id).await?; self.file_store - .upload_archive(artifact_id.clone(), archive) + .upload_file(artifact_id.clone(), artifact) .await?; self.domain_events diff --git a/crates/flux-releaser/src/services/release_manager/models.rs b/crates/flux-releaser/src/services/release_manager/models.rs index 0f6f480..4fba5c3 100644 --- a/crates/flux-releaser/src/services/release_manager/models.rs +++ b/crates/flux-releaser/src/services/release_manager/models.rs @@ -4,7 +4,7 @@ use std::{ops::Deref, path::PathBuf}; pub struct CommitArtifact { pub app: String, pub branch: String, - pub folder: PathBuf, + pub upload_id: UploadArtifactID, } #[derive(Debug, Clone)] @@ -44,6 +44,7 @@ impl From for UploadArtifact { } } +#[derive(Clone, Debug)] pub struct UploadArtifactID(uuid::Uuid); impl From for UploadArtifactID { fn from(value: uuid::Uuid) -> Self { diff --git a/crates/flux-releaser/testdata/flux_local_cluster/clusters/some_cluster/some_file.yaml b/crates/flux-releaser/testdata/flux_local_cluster/clusters/some_cluster/some_file.yaml new file mode 100644 index 0000000..e69de29 diff --git a/crates/flux-releaser/testdata/flux_local_cluster/clusters/some_other_cluster/some_other_file.yaml b/crates/flux-releaser/testdata/flux_local_cluster/clusters/some_other_cluster/some_other_file.yaml new file mode 100644 index 0000000..e69de29 diff --git a/crates/flux-releaser/tests/flux_local_cluster_manager_tests.rs b/crates/flux-releaser/tests/flux_local_cluster_manager_tests.rs new file mode 100644 index 0000000..ec760a1 --- /dev/null +++ b/crates/flux-releaser/tests/flux_local_cluster_manager_tests.rs @@ -0,0 +1,14 @@ +use flux_releaser::services::{ + archive::Archive, file_reader::FileReader, flux_local_cluster::FluxLocalClusterManager, +}; + +#[tokio::test] +async fn can_package_files() -> anyhow::Result<()> { + tracing_subscriber::fmt().init(); + + //let sut = FluxLocalClusterManager::new(FileReader::new(), Archive::new()); + + //sut.package_clusters("testdata/flux_local_cluster/").await?; + + Ok(()) +} diff --git a/crates/flux-releaser/tests/publish_artifacts.rs b/crates/flux-releaser/tests/publish_artifacts.rs index 0375446..293a772 100644 --- a/crates/flux-releaser/tests/publish_artifacts.rs +++ b/crates/flux-releaser/tests/publish_artifacts.rs @@ -1,19 +1,20 @@ use std::{ - env::temp_dir, net::{Ipv4Addr, SocketAddr}, + sync::Arc, time::Duration, }; use anyhow::Context; use flux_releaser::{ - app::SharedApp, - grpc::gen::{ - flux_releaser_client::FluxReleaserClient, CommitArtifactRequest, UploadArtifactRequest, + app::{LocalApp, SharedApp, SharedLocalApp}, + grpc::gen::flux_releaser_client::FluxReleaserClient, + services::{ + archive::ArchiveFile, file_store::extensions::FileStoreExt, + flux_local_cluster::extensions::FluxLocalClusterManagerExt, + flux_releaser_uploader::FluxReleaserUploader, }, - services::file_store::extensions::FileStoreExt, }; -use rand::{thread_rng, Rng}; -use tokio::{net::TcpListener, runtime::Runtime, time::sleep}; +use tokio::{net::TcpListener, runtime::Runtime, sync::Mutex, time::sleep}; use uuid::Uuid; struct Server { @@ -139,35 +140,34 @@ async fn setup() -> anyhow::Result<(Endpoints, SharedApp)> { Ok(unsafe { (ENDPOINTS.clone().unwrap(), APP.clone().unwrap()) }) } +async fn local_setup(endpoints: Endpoints) -> anyhow::Result { + Ok(SharedLocalApp::new( + LocalApp::new(format!("http://{}", endpoints.grpc)).await?, + )) +} + #[tokio::test] async fn can_upload_artifact() -> anyhow::Result<()> { + std::env::set_var("RUST_LOG", "flux_releaser=trace"); let (endpoints, app) = setup().await?; - let mut client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?; + let client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?; + let client = FluxReleaserUploader::new(Arc::new(Mutex::new(client))); - let mut bytes: [u8; 10000] = [0; 10000]; - - thread_rng().fill(&mut bytes[..]); - - let chunks = bytes - .chunks(bytes.len() / 100) - .map(|ch| UploadArtifactRequest { - content: ch.to_vec(), + let bytes: Vec = vec![0; 10_000_000]; + let upload_id = client + .upload_archive(ArchiveFile { + content: bytes.clone(), }) - .collect::>(); + .await?; - let req = tonic::Request::new(tokio_stream::iter(chunks)); + assert!(!upload_id.to_string().is_empty()); - let resp = client.upload_artifact(req).await?; - - let upload_id = resp.into_inner().upload_id; - assert!(!upload_id.is_empty()); - - let actual_path = app.file_store().get_temp(upload_id.try_into()?).await?; + let actual_path = app.file_store().get_temp(upload_id).await?; let actual_content = tokio::fs::read(actual_path).await?; - assert_eq!(&bytes[..], actual_content.as_slice()); + assert_eq!(&bytes, &actual_content); Ok(()) } @@ -177,40 +177,26 @@ async fn can_publish_artifact() -> anyhow::Result<()> { std::env::set_var("RUST_LOG", "flux_releaser=trace"); let (endpoints, app) = setup().await?; + let local_app = local_setup(endpoints.clone()).await?; - let mut client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?; - - let test_id = Uuid::new_v4(); - - let temp = temp_dir() - .join("flux_releaser") - .join("tests") - .join(test_id.to_string()); - - let file_path = temp - .join("clusters") - .join(Uuid::new_v4().to_string()) - .join("some-file.yaml"); - tokio::fs::create_dir_all(file_path.parent().unwrap()).await?; - let _ = tokio::fs::File::create(file_path).await?; - - let resp = client - .commit_artifact(CommitArtifactRequest { - app: "some-app".into(), - branch: "some-branch".into(), - folder: temp.to_string_lossy().to_string(), - }) + let upload_id = local_app + .flux_local_cluster_manager() + .package_clusters("testdata/flux_local_cluster") .await?; - let artifact = resp.into_inner(); - - let archive = app - .file_store() - .get_archive(artifact.artifact_id.try_into()?) - .await?; + let archive = app.file_store().get_temp(upload_id.clone()).await?; assert!(archive.exists()); + let artifact_id = local_app + .flux_local_cluster_manager() + .commit_artifact("some-app", "some-branch", upload_id) + .await?; + + let artifact = app.file_store().get_archive(artifact_id).await?; + + assert!(artifact.exists()); + Ok(()) }