From c3739c1bc1a939ddaeba409c7759d9b3733c4f6f Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sun, 18 Feb 2024 01:29:46 +0100 Subject: [PATCH] feat: can upload archives Signed-off-by: kjuulh --- Cargo.lock | 64 ++++++++ README.md | 3 + crates/flux-releaser/Cargo.toml | 2 + .../schemas/proto/flux_releaser.proto | 7 +- crates/flux-releaser/src/app.rs | 4 +- crates/flux-releaser/src/grpc.rs | 31 +++- crates/flux-releaser/src/services.rs | 2 + crates/flux-releaser/src/services/archive.rs | 139 ++++++++++++++++++ .../src/services/domain_events.rs | 2 +- .../flux-releaser/src/services/file_reader.rs | 117 +++++++++++++++ .../src/services/file_reader/extensions.rs | 14 ++ .../flux-releaser/src/services/file_store.rs | 18 ++- .../src/services/release_manager.rs | 37 ++++- .../services/release_manager/extensions.rs | 12 +- .../src/services/release_manager/models.rs | 3 + docs/architecture.md | 40 +++++ 16 files changed, 472 insertions(+), 23 deletions(-) create mode 100644 crates/flux-releaser/src/services/archive.rs create mode 100644 crates/flux-releaser/src/services/file_reader.rs create mode 100644 crates/flux-releaser/src/services/file_reader/extensions.rs create mode 100644 docs/architecture.md diff --git a/Cargo.lock b/Cargo.lock index ad5f80a..7c2ccec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1149,6 +1149,18 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1676f435fc1dadde4d03e43f5d62b259e1ce5f40bd4ffb21db2b42ebe59c1382" +[[package]] +name = "filetime" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "windows-sys 0.52.0", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -1172,12 +1184,14 @@ dependencies = [ "prost", "serde", "serde_json", + "tar", "tokio", "tonic", "tonic-build", "tracing", "tracing-subscriber", "uuid", + "walkdir", ] [[package]] @@ -2331,6 +2345,15 @@ version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.23" @@ -2630,6 +2653,17 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "tar" +version = "0.4.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b16afcea1f22891c49a00c751c7b63b2233284064f11a200fc624137c51e2ddb" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "tempfile" version = "3.10.0" @@ -3005,6 +3039,16 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "walkdir" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -3112,6 +3156,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -3250,6 +3303,17 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "xattr" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" +dependencies = [ + "libc", + "linux-raw-sys", + "rustix", +] + [[package]] name = "xmlparser" version = "0.13.6" diff --git a/README.md b/README.md index 4cf0137..b3a9167 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,9 @@ flux-releaser commit \ --registry https://flux-releaser.i.kjuulh.io ``` +!!! note + include path should include a sub folder for each releaseable artifact + flux releaser can also be used as a library, which is especially useful if integrated with dagger Now you can release or auto-release this your desired environment and app diff --git a/crates/flux-releaser/Cargo.toml b/crates/flux-releaser/Cargo.toml index d471367..2950429 100644 --- a/crates/flux-releaser/Cargo.toml +++ b/crates/flux-releaser/Cargo.toml @@ -21,6 +21,8 @@ aws-sdk-s3 = { version = "1.15.0", features = ["behavior-version-latest"] } serde = { version = "1.0.196", features = ["derive"] } serde_json = "1.0.113" nats = "0.24.1" +walkdir = "2.4.0" +tar = "0.4.40" [build-dependencies] tonic-build = "0.11.0" diff --git a/crates/flux-releaser/schemas/proto/flux_releaser.proto b/crates/flux-releaser/schemas/proto/flux_releaser.proto index 59fccc3..c79d931 100644 --- a/crates/flux-releaser/schemas/proto/flux_releaser.proto +++ b/crates/flux-releaser/schemas/proto/flux_releaser.proto @@ -3,16 +3,15 @@ syntax = "proto3"; package flux_releaser; service Greeter { - // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {} } -// The request message containing the user's name. message HelloRequest { - string name = 1; + string app = 1; + string branch = 2; + string folder = 3; // Change to files instead } -// The response message containing the greetings. message HelloReply { string message = 1; } diff --git a/crates/flux-releaser/src/app.rs b/crates/flux-releaser/src/app.rs index fd62f1c..882184c 100644 --- a/crates/flux-releaser/src/app.rs +++ b/crates/flux-releaser/src/app.rs @@ -1,7 +1,7 @@ use std::{ops::Deref, sync::Arc}; -use aws_config::{BehaviorVersion, Region}; -use aws_sdk_s3::config::Credentials; + + use self::infra::aws_s3::s3_client; diff --git a/crates/flux-releaser/src/grpc.rs b/crates/flux-releaser/src/grpc.rs index d37a4fb..06ad02d 100644 --- a/crates/flux-releaser/src/grpc.rs +++ b/crates/flux-releaser/src/grpc.rs @@ -33,11 +33,12 @@ impl greeter_server::Greeter for FluxReleaserGrpc { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + let req = request.into_inner(); self.release_manager - .commit_artifact(CommitArtifact { - app: "some-app".into(), - branch: "some-branch".into(), - }) + .commit_artifact( + req.try_into() + .map_err(|e: anyhow::Error| tonic::Status::invalid_argument(e.to_string()))?, + ) .await .unwrap(); @@ -47,6 +48,28 @@ impl greeter_server::Greeter for FluxReleaserGrpc { } } +impl TryFrom for CommitArtifact { + type Error = anyhow::Error; + + fn try_from(value: HelloRequest) -> Result { + if value.app.is_empty() { + anyhow::bail!("app cannot be empty") + } + if value.branch.is_empty() { + anyhow::bail!("branch cannot be empty") + } + if value.folder.is_empty() { + anyhow::bail!("folder cannot be empty") + } + + Ok(Self { + app: value.app, + branch: value.branch, + folder: value.folder.into(), + }) + } +} + pub async fn tonic_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> { tracing::info!("grpc listening on: {}", host); Server::builder() diff --git a/crates/flux-releaser/src/services.rs b/crates/flux-releaser/src/services.rs index 5338f9c..267f293 100644 --- a/crates/flux-releaser/src/services.rs +++ b/crates/flux-releaser/src/services.rs @@ -1,3 +1,5 @@ +mod archive; mod domain_events; +mod file_reader; mod file_store; pub mod release_manager; diff --git a/crates/flux-releaser/src/services/archive.rs b/crates/flux-releaser/src/services/archive.rs new file mode 100644 index 0000000..3ea37e9 --- /dev/null +++ b/crates/flux-releaser/src/services/archive.rs @@ -0,0 +1,139 @@ +#[derive(Clone)] +pub struct Archive {} + +use std::{io::Cursor, path::Path}; + +pub mod extensions { + + use std::{env::temp_dir, path::PathBuf}; + + use async_trait::async_trait; + use tokio::io::AsyncWriteExt; + use uuid::Uuid; + + use crate::{app::SharedApp, services::release_manager::models::ArtifactID}; + + #[mockall_double::double] + use crate::services::file_store::FileStore; + + #[mockall_double::double] + use super::Archive; + use super::ArchiveFile; + + pub trait ArchiveExt { + fn archive(&self) -> Archive; + } + + impl ArchiveExt for SharedApp { + fn archive(&self) -> Archive { + Archive::new() + } + } + + #[async_trait] + pub trait ArchiveUploadExt { + async fn upload_archive( + &self, + artifact_id: ArtifactID, + archive: ArchiveFile, + ) -> anyhow::Result<()>; + } + + #[async_trait] + impl ArchiveUploadExt for FileStore { + async fn upload_archive( + &self, + artifact_id: ArtifactID, + archive: ArchiveFile, + ) -> anyhow::Result<()> { + tracing::trace!("uploading archive: {}", artifact_id.to_string()); + let suffix = Uuid::new_v4(); + let temp_root = temp_dir(); + + let archive_path = temp_root + .join("flux-releaser") + .join("archives") + .join(format!("{}-{}.tar", &artifact_id.to_string(), suffix)); + + let archive_file_guard = ArchiveFilePath(archive_path.clone()); + + tokio::fs::create_dir_all(archive_path.parent().unwrap()).await?; + + { + let mut archive_file = tokio::fs::File::create(&archive_path).await?; + + archive_file.write_all(&archive.content).await?; + archive_file.flush().await?; + } + + self.upload_file(artifact_id, archive_path).await?; + + drop(archive_file_guard); + + Ok(()) + } + } + + pub struct ArchiveFilePath(PathBuf); + + // make sure we delete the archive file when we're done with it, we don't want it sticking around longer than it needs to + impl Drop for ArchiveFilePath { + fn drop(&mut self) { + let file_path = self.0.clone(); + tokio::spawn(async move { + if file_path.exists() { + tracing::trace!("deleting archive: {}", file_path.display()); + if let Err(e) = tokio::fs::remove_file(&file_path).await { + tracing::error!("failed to delete archive: {}", e); + } + } + }); + } + } +} + +#[cfg(test)] +use mockall::{automock, mock, predicate::*}; + +use super::file_reader::{File, Files}; + +#[cfg_attr(test, automock)] +impl Archive { + pub fn new() -> Self { + Self {} + } + + pub async fn create_archive(&self, files: Files) -> anyhow::Result { + tracing::trace!("archiving files"); + + let buffer = Vec::new(); + let cursor = Cursor::new(buffer); + + let mut tar_builder = tar::Builder::new(cursor); + + for file in files { + let abs_file_path = file.path; + + tracing::trace!("archiving file: {}", abs_file_path.display()); + let mut fd = std::fs::File::open(&abs_file_path)?; + tar_builder.append_file(&abs_file_path, &mut fd)?; + } + + tar_builder.finish()?; + + let cursor = tar_builder.into_inner()?; + let buffer = cursor.into_inner(); + + Ok(buffer.into()) + } +} + +pub struct ArchiveFile { + content: Vec, +} + +impl From> for ArchiveFile { + fn from(value: Vec) -> Self { + Self { content: value } + } +} diff --git a/crates/flux-releaser/src/services/domain_events.rs b/crates/flux-releaser/src/services/domain_events.rs index a2e2094..c264f1e 100644 --- a/crates/flux-releaser/src/services/domain_events.rs +++ b/crates/flux-releaser/src/services/domain_events.rs @@ -8,7 +8,7 @@ pub struct DomainEvents { #[cfg(test)] use mockall::{automock, mock, predicate::*}; -use crate::app::infra::{self, nats::Nats}; +use crate::app::infra::{nats::Nats}; #[cfg_attr(test, automock)] impl DomainEvents { diff --git a/crates/flux-releaser/src/services/file_reader.rs b/crates/flux-releaser/src/services/file_reader.rs new file mode 100644 index 0000000..6f4bfff --- /dev/null +++ b/crates/flux-releaser/src/services/file_reader.rs @@ -0,0 +1,117 @@ +#[derive(Clone)] +pub struct FileReader {} + +use std::{collections::BTreeMap, path::PathBuf}; + +pub mod extensions; + +use anyhow::anyhow; +#[cfg(test)] +use mockall::{automock, mock, predicate::*}; + +#[cfg_attr(test, automock)] +impl FileReader { + pub fn new() -> Self { + Self {} + } + + pub async fn read_files(&self, location: PathBuf) -> anyhow::Result { + tracing::trace!("reading files: {}", location.display()); + + let mut clusters: BTreeMap> = BTreeMap::new(); + + let mut dir = tokio::fs::read_dir(&location).await?; + while let Some(dir_entry) = dir.next_entry().await? { + if dir_entry.metadata().await?.is_dir() { + tracing::trace!("found cluster in: {}", dir_entry.path().display()); + clusters.insert( + dir_entry + .file_name() + .into_string() + .map_err(|_| anyhow!("failed to convert file name to string"))?, + Vec::new(), + ); + } + } + + for (cluster_name, files) in clusters.iter_mut() { + for file in walkdir::WalkDir::new(location.join(cluster_name)) + .into_iter() + .flatten() + { + if !file.file_type().is_file() { + continue; + } + + tracing::trace!( + "adding file: {} to cluster: {}", + file.path().display(), + cluster_name + ); + + files.push(file.into_path().into()) + } + } + + Ok(clusters.into()) + } +} + +#[derive(Debug, Clone)] +pub struct File { + pub path: PathBuf, +} + +impl From for File { + fn from(value: PathBuf) -> Self { + Self { path: value } + } +} + +#[derive(Default, Clone, Debug)] +pub struct Files { + inner: BTreeMap>, +} + +impl From>> for Files { + fn from(value: BTreeMap>) -> Self { + Self { inner: value } + } +} + +impl std::ops::Deref for Files { + type Target = BTreeMap>; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl From for Vec { + fn from(value: Files) -> Self { + value + .iter() + .map(|(cluster_name, files)| (PathBuf::from(cluster_name), files)) + .flat_map(|(cluster_name, files)| { + files + .iter() + //.map(|file_path| cluster_name.join(&file_path.path)) + .map(|file_path| file_path.path.clone()) + .collect::>() + }) + .map(|f| f.into()) + .collect::>() + } +} + +impl IntoIterator for Files { + type Item = File; + + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + let files: Vec = self.into(); + + files.into_iter() + } +} diff --git a/crates/flux-releaser/src/services/file_reader/extensions.rs b/crates/flux-releaser/src/services/file_reader/extensions.rs new file mode 100644 index 0000000..d3a1192 --- /dev/null +++ b/crates/flux-releaser/src/services/file_reader/extensions.rs @@ -0,0 +1,14 @@ +use crate::app::SharedApp; + +#[mockall_double::double] +use super::FileReader; + +pub trait FileReaderExt { + fn file_reader(&self) -> FileReader; +} + +impl FileReaderExt for SharedApp { + fn file_reader(&self) -> FileReader { + FileReader::new() + } +} diff --git a/crates/flux-releaser/src/services/file_store.rs b/crates/flux-releaser/src/services/file_store.rs index b642e05..68ece0f 100644 --- a/crates/flux-releaser/src/services/file_store.rs +++ b/crates/flux-releaser/src/services/file_store.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, sync::Arc}; +use std::path::PathBuf; use super::release_manager::models::ArtifactID; @@ -9,8 +9,10 @@ pub struct FileStore { client: aws_sdk_s3::Client, } +use aws_sdk_s3::primitives::ByteStream; #[cfg(test)] use mockall::{automock, mock, predicate::*}; +use tokio::io::AsyncReadExt; #[cfg_attr(test, automock)] impl FileStore { @@ -18,13 +20,17 @@ impl FileStore { Self { client } } - pub async fn upload_files( - &self, - artifact_id: ArtifactID, - files: Vec, - ) -> anyhow::Result<()> { + pub async fn upload_file(&self, artifact_id: ArtifactID, file: PathBuf) -> anyhow::Result<()> { tracing::trace!("uploading files: {}", artifact_id.to_string()); + self.client + .put_object() + .bucket("mybucket") + .key(format!("archives/{}.tar", &artifact_id.to_string())) + .body(ByteStream::from_path(file).await?) + .send() + .await?; + Ok(()) } } diff --git a/crates/flux-releaser/src/services/release_manager.rs b/crates/flux-releaser/src/services/release_manager.rs index c98e930..8795457 100644 --- a/crates/flux-releaser/src/services/release_manager.rs +++ b/crates/flux-releaser/src/services/release_manager.rs @@ -1,21 +1,35 @@ use serde::Serialize; +use crate::services::archive::extensions::ArchiveUploadExt; #[mockall_double::double] use crate::services::file_store::FileStore; +#[mockall_double::double] +use super::archive::Archive; #[mockall_double::double] use super::domain_events::DomainEvents; +#[mockall_double::double] +use super::file_reader::FileReader; use self::models::{ArtifactID, CommitArtifact}; pub struct ReleaseManager { + archive: Archive, + file_reader: FileReader, file_store: FileStore, domain_events: DomainEvents, } impl ReleaseManager { - pub fn new(file_store: FileStore, domain_events: DomainEvents) -> Self { + pub fn new( + file_reader: FileReader, + file_store: FileStore, + archive: Archive, + domain_events: DomainEvents, + ) -> Self { Self { + archive, + file_reader, file_store, domain_events, } @@ -25,8 +39,12 @@ impl ReleaseManager { tracing::debug!("committing artifact: {:?}", request); let artifact_id = ArtifactID::new(); + let files = self.file_reader.read_files(request.folder).await?; + + let archive = self.archive.create_archive(files).await?; + self.file_store - .upload_files(artifact_id.clone(), Vec::new()) + .upload_archive(artifact_id.clone(), archive) .await?; self.domain_events @@ -49,7 +67,9 @@ pub mod models; #[cfg(test)] mod test { + use crate::services::archive::MockArchive; use crate::services::domain_events::MockDomainEvents; + use crate::services::file_reader::{Files, MockFileReader}; use crate::services::file_store::MockFileStore; use super::*; @@ -58,7 +78,7 @@ mod test { async fn generated_artifact_id() -> anyhow::Result<()> { let mut file_store = MockFileStore::default(); file_store - .expect_upload_files() + .expect_upload_file() .times(1) .returning(|_, _| Ok(())); @@ -68,12 +88,21 @@ mod test { .times(1) .returning(|_| Ok(())); - let releaser_manager = ReleaseManager::new(file_store, domain_events); + let mut file_reader = MockFileReader::default(); + file_reader + .expect_read_files() + .times(1) + .returning(|_| Ok(Files::default())); + + let mut archive = MockArchive::default(); + + let releaser_manager = ReleaseManager::new(file_reader, file_store, archive, domain_events); releaser_manager .commit_artifact(CommitArtifact { app: "app".into(), branch: "branch".into(), + folder: "someFolder".into(), }) .await?; diff --git a/crates/flux-releaser/src/services/release_manager/extensions.rs b/crates/flux-releaser/src/services/release_manager/extensions.rs index 0030be5..43ff52d 100644 --- a/crates/flux-releaser/src/services/release_manager/extensions.rs +++ b/crates/flux-releaser/src/services/release_manager/extensions.rs @@ -1,6 +1,9 @@ use crate::{ app::SharedApp, - services::{domain_events::extensions::DomainEventsExt, file_store::extensions::FileStoreExt}, + services::{ + archive::extensions::ArchiveExt, domain_events::extensions::DomainEventsExt, + file_reader::extensions::FileReaderExt, file_store::extensions::FileStoreExt, + }, }; use super::ReleaseManager; @@ -11,6 +14,11 @@ pub trait ReleaseManagerExt { impl ReleaseManagerExt for SharedApp { fn release_manager(&self) -> ReleaseManager { - ReleaseManager::new(self.file_store(), self.domain_events()) + ReleaseManager::new( + self.file_reader(), + self.file_store(), + self.archive(), + self.domain_events(), + ) } } diff --git a/crates/flux-releaser/src/services/release_manager/models.rs b/crates/flux-releaser/src/services/release_manager/models.rs index 6cd69cc..83c25fa 100644 --- a/crates/flux-releaser/src/services/release_manager/models.rs +++ b/crates/flux-releaser/src/services/release_manager/models.rs @@ -1,7 +1,10 @@ +use std::path::PathBuf; + #[derive(Clone, Debug)] pub struct CommitArtifact { pub app: String, pub branch: String, + pub folder: PathBuf, } #[derive(Debug, Clone)] diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 0000000..12f78ee --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,40 @@ +# Architecture + +Flux releaser is structured to support how flux2 recommends structuring a flux project. This means that we will end up with a structure like so + +``` +clusters//kustomizations +/kubernetes manifests +``` + +Flux releaser goes a small step further in how it opinionates its folders + +First of all each entry in flux-releaser is required to have a unique app and namespace, this is important for guaranteeing unique ness, otherwise flux-releaser can replace services it didn't intend to. + +Flux releaser currently makes no guarantee that a namespace and app name is not taken over by another party, this is up to you to control access to the flux gitops repository + +As such this means that flux-releaser will place folders in + +``` +clusters////kustomizations +///kubernetes manifests +``` + +This means that each app can have one or more `clusters`, `namespaces` but only one `app`, but only one `namespace` pr `cluster`. + +The way flux-releaser stages its commits, is that the producer (usually CI) prepares a setup folder. This mirrors the structure above + + +``` +/ + clusters/ + / + / + +``` + +This will then all be bundled up in a tar ball and staged under a specific artifact id, along with some metadata about the release, (when it happened, whom the committer was, which branch it originated from, which service it belongs to etc). + +When a flux release is processed it will be triggered via. a branch, the app name, the bundled artifact will then be applied on top of the gitops registry and finally released. + +