feat: add ability to actually publish
Some checks failed
continuous-integration/drone/push Build is failing

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-05-26 13:37:49 +02:00
parent 0a258829f7
commit 5ce33b379e
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
12 changed files with 487 additions and 4 deletions

70
Cargo.lock generated
View File

@ -807,6 +807,11 @@ name = "cc"
version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f"
dependencies = [
"jobserver",
"libc",
"once_cell",
]
[[package]]
name = "cfg-if"
@ -1310,6 +1315,7 @@ dependencies = [
"chrono",
"clap",
"dotenv",
"git2",
"lazy_static",
"nats",
"prost",
@ -1468,6 +1474,21 @@ version = "0.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
[[package]]
name = "git2"
version = "0.18.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232e6a7bfe35766bf715e55a88b39a700596c0ccfd88cd3680b4cdb40d66ef70"
dependencies = [
"bitflags 2.5.0",
"libc",
"libgit2-sys",
"log",
"openssl-probe",
"openssl-sys",
"url",
]
[[package]]
name = "group"
version = "0.12.1"
@ -1833,6 +1854,15 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "jobserver"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2b099aaa34a9751c5bf0878add70444e1ed2dd73f347be99003d4577277de6e"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.69"
@ -1863,6 +1893,20 @@ version = "0.2.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
[[package]]
name = "libgit2-sys"
version = "0.16.2+1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee4126d8b4ee5c9d9ea891dd875cfdc1e9d0950437179104b183d7d8a74d24e8"
dependencies = [
"cc",
"libc",
"libssh2-sys",
"libz-sys",
"openssl-sys",
"pkg-config",
]
[[package]]
name = "libm"
version = "0.2.8"
@ -1880,6 +1924,32 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "libssh2-sys"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dc8a030b787e2119a731f1951d6a773e2280c660f8ec4b0f5e1505a386e71ee"
dependencies = [
"cc",
"libc",
"libz-sys",
"openssl-sys",
"pkg-config",
"vcpkg",
]
[[package]]
name = "libz-sys"
version = "1.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c15da26e5af7e25c90b37a2d75cdbf940cf4a55316de9d84c679c9b8bfabf82e"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.14"

View File

@ -26,6 +26,7 @@ tokio-stream = { version = "0.1.14", features = ["full"] }
rand = "0.8.5"
sqlx = { version = "0.7.3", features = ["postgres", "runtime-tokio", "uuid", "chrono"] }
chrono = "0.4.34"
git2 = "0.18.3"
[build-dependencies]
tonic-build = "0.11.0"

View File

@ -2,6 +2,12 @@ use std::{ops::Deref, sync::Arc};
use sqlx::{PgPool, Postgres};
use crate::services::{
archive::Archive,
cluster_list::ClusterList,
git::{Git, SharedGit},
};
use self::infra::{
aws_s3::s3_client,
grpc::{new_client, FluxReleaserGrpcClient},
@ -28,6 +34,7 @@ pub struct App {
pub s3_client: aws_sdk_s3::Client,
pub nats: infra::nats::Nats,
pub database: PgPool,
pub git: SharedGit,
}
impl App {
@ -36,6 +43,12 @@ impl App {
s3_client: s3_client().await?,
nats: infra::nats::Nats::new().await?,
database: infra::database::get_database().await?,
git: Git::new(
"ssh://git@git.front.kjuulh.io/kjuulh/clank-clusters.git".into(),
ClusterList::default(),
Archive::default(),
)
.into(),
})
}
}

View File

@ -1,9 +1,11 @@
pub mod archive;
pub mod cluster_list;
pub mod domain_events;
pub mod file_reader;
pub mod file_store;
pub mod flux_local_cluster;
pub mod flux_releaser_uploader;
pub mod git;
pub mod release_manager;
pub mod artifacts_db;

View File

@ -1,7 +1,10 @@
#[derive(Clone, Default)]
pub struct Archive {}
use std::{io::Cursor, path::Path};
use std::{
io::{Bytes, Cursor},
path::Path,
};
use anyhow::Context;
@ -45,6 +48,17 @@ impl Archive {
Ok(buffer.into())
}
pub async fn unpack_archive(&self, archive: &ArchiveFile, dest: &Path) -> anyhow::Result<()> {
tracing::trace!("unpacking archive: {}", dest.display());
let cursor = Cursor::new(archive.content.clone());
let mut arc = tar::Archive::new(cursor);
arc.unpack(dest)?;
Ok(())
}
}
pub struct ArchiveFile {

View File

@ -0,0 +1,23 @@
use std::collections::HashMap;
#[derive(Default)]
pub struct ClusterList {}
impl ClusterList {
pub async fn get_list(&self) -> anyhow::Result<HashMap<String, Vec<String>>> {
Ok(HashMap::from([
("dev".into(), vec!["clank-dev".into()]),
("prod".into(), vec!["clank-prod".into()]),
]))
}
pub async fn get(&self, environment: &str) -> anyhow::Result<Option<Vec<String>>> {
let list = self.get_list().await?;
if let Some(x) = list.get(environment) {
Ok(Some(x.clone()))
} else {
Ok(None)
}
}
}

View File

@ -0,0 +1,335 @@
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::Context;
use git2::{
build::{CheckoutBuilder, RepoBuilder},
Cred, FetchOptions, IndexAddOption, PushOptions, RemoteCallbacks, Repository, ResetType,
Signature,
};
use tokio::{io::AsyncWriteExt, sync::Mutex};
use super::{
archive::{Archive, ArchiveFile},
cluster_list::ClusterList,
};
#[derive(Clone)]
pub struct SharedGit {
inner: Arc<Git>,
}
impl From<Git> for SharedGit {
fn from(value: Git) -> Self {
Self {
inner: Arc::new(value),
}
}
}
impl std::ops::Deref for SharedGit {
type Target = Git;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub struct Git {
location: Mutex<PathBuf>,
registry: String,
cluster_list: ClusterList,
archive: Archive,
}
impl Git {
pub fn new(registry: String, cluster_list: ClusterList, archive: Archive) -> Self {
Self {
registry,
location: Mutex::new(std::env::temp_dir().join("flux_releaser")),
cluster_list,
archive,
}
}
pub async fn publish(
&self,
archive: &ArchiveFile,
service: &str,
namespace: &str,
environment: &str,
) -> anyhow::Result<()> {
// TODO: implement exponential backoff and 5 attempts
self.publish_attempt(archive, service, namespace, environment)
.await?;
Ok(())
}
async fn publish_attempt(
&self,
archive: &ArchiveFile,
service: &str,
namespace: &str,
environment: &str,
) -> anyhow::Result<()> {
// 1. Clone repo into location (with lock) or update
let location = self.location.lock().await;
let repo_dir = location.join("repo");
let repo = match self.get_repo(&repo_dir).await? {
//TODO: Possible handle error by just recloning the repo
None => self.clone_repo(&repo_dir).await?,
Some(repo) => {
self.update_repo(repo).await?;
self.get_repo(&repo_dir)
.await?
.ok_or(anyhow::anyhow!("failed to open repository"))?
}
};
// 2. Extract from archive
// TODO: maybe pad archive with hash or something
let unpack_dir = location.join("archive");
self.archive.unpack_archive(archive, &unpack_dir).await?;
// 3. Splat tar over application and cluster
// The archive should always be structured like so:
// - <environment>/
// - * manifests
// 3a. prepare git repo for new files
let clusters = self
.cluster_list
.get(environment)
.await?
.ok_or(anyhow::anyhow!(
"environment is not registered: {} in cluster list",
environment
))?;
for cluster in clusters {
let service_entry = repo_dir
.join("deployments")
.join(&cluster)
.join(namespace)
.join(service);
if service_entry.exists() {
if let Err(e) = tokio::fs::remove_dir_all(&service_entry).await {
tracing::warn!("failed to remove existing dir: {}", e);
}
}
tokio::fs::create_dir_all(&service_entry).await?;
let cluster_entry = repo_dir
.join("clusters")
.join(&cluster)
.join(namespace)
.join(service);
if cluster_entry.exists() {
if let Err(e) = tokio::fs::remove_dir_all(&cluster_entry).await {
tracing::warn!("failed to remove existing dir: {}", e);
}
}
tokio::fs::create_dir_all(&cluster_entry).await?;
let archive_dir = unpack_dir.join(environment);
if !archive_dir.exists() {
anyhow::bail!("selected environment is not published for archive");
}
let mut read_dir = tokio::fs::read_dir(archive_dir).await?;
while let Some(entry) = read_dir.next_entry().await? {
if entry.metadata().await?.is_file() {
let entry_path = entry.path();
let dest_path = service_entry.join(entry.file_name());
tokio::fs::copy(entry_path, dest_path).await?;
}
}
let cluster_entry_file = cluster_entry.join(format!("{service}.yaml"));
let mut cluster_entry_file = tokio::fs::File::create(cluster_entry_file).await?;
let file_contents = format!(
r#"
apiVersion: kustomize.toolkit.fluxcd.io/v1beta2
kind: Kustomization
metadata:
name: {service}
namespace: flux-system
spec:
interval: 1h
retryInterval: 30s
path: ./deployments/{cluster}/{namespace}/{service}
prune: true
sourceRef:
kind: GitRepository
name: flux-system
namespace: flux-system
"#
);
cluster_entry_file
.write_all(file_contents.as_bytes())
.await?;
}
// 4. Commit && Push
self.commit_and_push(repo, environment, service).await?;
// 5. Cleanup
tokio::fs::remove_dir_all(unpack_dir).await?;
Ok(())
}
async fn get_repo(&self, location: &Path) -> anyhow::Result<Option<Repository>> {
match Repository::open(location) {
Ok(r) => Ok(Some(r)),
Err(e) => match e.code() {
git2::ErrorCode::NotFound => Ok(None),
_ => Err(e).context("failed to open git repository"),
},
}
}
async fn clone_repo(&self, location: &Path) -> anyhow::Result<Repository> {
let co = CheckoutBuilder::new();
let mut fo = FetchOptions::new();
fo.remote_callbacks(self.get_cred()?);
let repo = RepoBuilder::new()
.fetch_options(fo)
.with_checkout(co)
.clone(&self.registry, location)
.context("failed to clone repository")?;
Ok(repo)
}
async fn update_repo(&self, repository: Repository) -> anyhow::Result<()> {
let mut remote = repository.find_remote("origin")?;
let mut fo = FetchOptions::new();
fo.remote_callbacks(self.get_cred()?);
remote
.fetch(&["main"], Some(&mut fo), None)
.context("failed to update repo")?;
let origin_head = repository.find_reference("refs/remotes/origin/HEAD")?;
let origin_head_commit = origin_head.peel_to_commit()?;
// Perform a hard reset to the origin's HEAD
repository.reset(origin_head_commit.as_object(), ResetType::Hard, None)?;
Ok(())
}
fn get_cred(&self) -> anyhow::Result<RemoteCallbacks> {
let mut cb = RemoteCallbacks::new();
cb.credentials(|_, username, _| {
if let Ok(_sock) = std::env::var("SSH_AUTH_SOCK") {
return Cred::ssh_key_from_agent(username.unwrap_or("git"));
}
let username = std::env::var("GIT_USERNAME").expect("GIT_USERNAME to be set");
let password = std::env::var("GIT_PASSWORD").expect("GIT_PASSWORD to be set");
Cred::userpass_plaintext(&username, &password)
});
cb.certificate_check(|_cert, _| Ok(git2::CertificateCheckStatus::CertificateOk));
Ok(cb)
}
async fn commit_and_push(
&self,
repo: Repository,
environment: &str,
service: &str,
) -> anyhow::Result<()> {
let mut index = repo.index()?;
// Add all files to the index
index.add_all(["*"].iter(), IndexAddOption::DEFAULT, None)?;
index.write()?;
// Create a tree from the index
let oid = index.write_tree()?;
let tree = repo.find_tree(oid)?;
// Get the current HEAD commit
let parent_commit = repo.head()?.peel_to_commit()?;
// Create a signature
let sig = Signature::now("flux_releaser", "operations+flux-releaser@kjuulh.io")?;
// Create the commit
repo.commit(
Some("HEAD"),
&sig,
&sig,
&format!("chore({environment}/{service}): releasing service"),
&tree,
&[&parent_commit],
)?;
let mut remote = repo.find_remote("origin")?;
let mut po = PushOptions::new();
po.remote_callbacks(self.get_cred()?);
remote.push(
&[&format!("refs/heads/{}:refs/heads/{}", "main", "main")],
Some(&mut po),
)?;
Ok(())
}
}
#[cfg(test)]
mod test {
use tokio::io::AsyncReadExt;
use uuid::Uuid;
use crate::services::{
archive::{Archive, ArchiveFile},
cluster_list::ClusterList,
git::Git,
};
#[tokio::test]
async fn can_clone_upstream() -> anyhow::Result<()> {
let random = Uuid::new_v4().to_string();
println!("running test for id: {}", random);
println!("current_dir: {}", std::env::current_dir()?.display());
let mut arch = tokio::fs::File::open("testdata/example.tar").await?;
let mut dest = Vec::new();
arch.read_to_end(&mut dest).await?;
let git = Git::new(
"ssh://git@git.front.kjuulh.io/kjuulh/clank-clusters.git".into(),
ClusterList::default(),
Archive::default(),
);
let mut location = git.location.lock().await;
*location = location.join(random);
println!("into: {}", location.display());
drop(location);
git.publish(
&ArchiveFile { content: dest },
"flux-releaser-test",
"dev",
"dev",
)
.await?;
Ok(())
}
}

View File

@ -1,10 +1,12 @@
use serde::Serialize;
use crate::services::archive::{Archive, ArchiveFile};
use crate::services::artifacts_db::{AddCommitArtifact, GetLatestArtifact};
use crate::services::file_store::FileStore;
use super::artifacts_db::ArtifactsDB;
use super::domain_events::DomainEvents;
use super::git::SharedGit;
use self::models::*;
@ -12,6 +14,7 @@ pub struct ReleaseManager {
file_store: FileStore,
domain_events: DomainEvents,
artifacts_db: ArtifactsDB,
git: SharedGit,
}
impl ReleaseManager {
@ -19,11 +22,13 @@ impl ReleaseManager {
file_store: FileStore,
domain_events: DomainEvents,
artifacts_db: ArtifactsDB,
git: SharedGit,
) -> Self {
Self {
file_store,
domain_events,
artifacts_db,
git,
}
}
@ -94,11 +99,26 @@ impl ReleaseManager {
self.domain_events
.publish_event(&serde_json::to_string(&PublishedArtifactEvent {
artifact_id: latest_artifact.artifact_id.to_string(),
app: release_req.app,
branch: release_req.branch,
app: release_req.app.clone(),
branch: release_req.branch.clone(),
})?)
.await?;
let artifact_contents = tokio::fs::read(artifact).await?;
let env = if release_req.branch == "main" {
"prod"
} else {
"dev"
};
self.git
.publish(
&ArchiveFile::from(artifact_contents),
&release_req.app,
env,
env,
)
.await?;
Ok(())
}
}

View File

@ -14,6 +14,11 @@ pub trait ReleaseManagerExt {
impl ReleaseManagerExt for SharedApp {
fn release_manager(&self) -> ReleaseManager {
ReleaseManager::new(self.file_store(), self.domain_events(), self.artifacts_db())
ReleaseManager::new(
self.file_store(),
self.domain_events(),
self.artifacts_db(),
self.git.clone(),
)
}
}

Binary file not shown.