Compare commits

..

No commits in common. "main" and "feat/add-initial-publish" have entirely different histories.

42 changed files with 1031 additions and 3431 deletions

View File

@ -1,2 +1,113 @@
kind: template kind: pipeline
load: cuddle-rust-service-plan.yaml name: default
type: docker
steps:
- name: load_secret
image: debian:buster-slim
volumes:
- name: ssh
path: /root/.ssh/
environment:
SSH_KEY:
from_secret: gitea_id_ed25519
commands:
- mkdir -p $HOME/.ssh/
- echo "$SSH_KEY" | base64 -d > $HOME/.ssh/id_ed25519
- chmod -R 600 ~/.ssh
- |
cat >$HOME/.ssh/config <<EOL
Host git.front.kjuulh.io
IdentityFile $HOME/.ssh/id_ed25519
IdentitiesOnly yes
UserKnownHostsFile=/dev/null
StrictHostKeyChecking no
EOL
- chmod 700 ~/.ssh/config
- name: build pr
image: kasperhermansen/cuddle-rust-service-plan:main-1708174643
pull: always
volumes:
- name: ssh
path: /root/.ssh/
commands:
- eval `ssh-agent`
- ssh-add
- echo "$DOCKER_PASSWORD" | docker login --password-stdin --username="$DOCKER_USERNAME" docker.io
- export RUST_LOG=trace
- cuddle-rust-service-plan pr
environment:
DOCKER_BUILDKIT: 1
DOCKER_PASSWORD:
from_secret: docker_password
DOCKER_USERNAME:
from_secret: docker_username
CUDDLE_SECRETS_PROVIDER: 1password
CUDDLE_ONE_PASSWORD_DOT_ENV: ".env.ci"
CUDDLE_SSH_AGENT: "true"
CUDDLE_PLEASE_TOKEN:
from_secret: cuddle_please_token
OP_SERVICE_ACCOUNT_TOKEN:
from_secret: op_service_account_token
DOCKER_HOST: "tcp://192.168.1.233:2376"
when:
event:
- push
- pull_request
exclude:
- main
- master
depends_on:
- "load_secret"
- name: build main
image: kasperhermansen/cuddle-rust-service-plan:main-1708174643
pull: always
volumes:
- name: ssh
path: /root/.ssh/
commands:
- eval `ssh-agent`
- ssh-add
- echo "$DOCKER_PASSWORD" | docker login --password-stdin --username="$DOCKER_USERNAME" docker.io
- export RUST_LOG=trace
- cuddle-rust-service-plan main
environment:
REGISTRY_CACHE_USERNAME:
from_secret: registry_cache_username
REGISTRY_CACHE_PASSWORD:
from_secret: registry_cache_password
REGISTRY_CACHE_TOKEN:
from_secret: registry_cache_token
REGISTRY_CACHE_url:
from_secret: registry_cache_url
DOCKER_BUILDKIT: 1
DOCKER_PASSWORD:
from_secret: docker_password
DOCKER_USERNAME:
from_secret: docker_username
CUDDLE_SECRETS_PROVIDER: 1password
CUDDLE_ONE_PASSWORD_DOT_ENV: ".env.ci"
CUDDLE_SSH_AGENT: "true"
GIT_PASSWORD:
from_secret: git_password
DOCKER_HOST: "tcp://192.168.1.233:2376"
CUDDLE_PLEASE_TOKEN:
from_secret: cuddle_please_token
OP_SERVICE_ACCOUNT_TOKEN:
from_secret: op_service_account_token
when:
event:
- push
branch:
- main
- master
exclude:
- pull_request
depends_on:
- "load_secret"
volumes:
- name: ssh
temp: {}

2
.env
View File

@ -5,5 +5,3 @@ AWS_SECRET_ACCESS_KEY=minioadminpassword
NATS_URL=127.0.0.1:4222 NATS_URL=127.0.0.1:4222
NATS_USERNAME=user NATS_USERNAME=user
NATS_PASSWORD=secret NATS_PASSWORD=secret
DATABASE_URL=postgres://root@localhost:26257/defaultdb?sslmode=disable

2338
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -11,32 +11,21 @@ tracing-subscriber.workspace = true
clap.workspace = true clap.workspace = true
dotenv.workspace = true dotenv.workspace = true
axum.workspace = true axum.workspace = true
prost = "0.13.2" prost = "0.12.3"
tonic = { version = "0.12.2", features = ["tls", "tls-native-roots"] } tonic = "0.11.0"
uuid = { version = "1.7.0", features = ["v7", "v4"] } uuid = { version = "1.7.0", features = ["v7", "v4"] }
async-trait = "0.1.77" async-trait = "0.1.77"
aws-config = { version = "1.5.5", features = ["behavior-version-latest"] } mockall_double = "0.3.1"
aws-sdk-s3 = { version = "1.48.0", features = ["behavior-version-latest"] } aws-config = { version = "1.1.5", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.15.0", features = ["behavior-version-latest"] }
serde = { version = "1.0.196", features = ["derive"] } serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113" serde_json = "1.0.113"
nats = "0.25.0" nats = "0.24.1"
walkdir = "2.4.0" walkdir = "2.4.0"
tar = "0.4.40" tar = "0.4.40"
tokio-stream = { version = "0.1.15", features = ["full"] }
rand = "0.8.5"
sqlx = { version = "0.8.0", features = [
"postgres",
"runtime-tokio",
"uuid",
"chrono",
] }
chrono = "0.4.34"
git2 = "0.19.0"
rustls = { version = "0.23.12" }
[build-dependencies] [build-dependencies]
tonic-build = "0.12.0" tonic-build = "0.11.0"
[dev-dependencies] [dev-dependencies]
lazy_static = "1.4.0" mockall = "0.12.1"
reqwest = "0.12.0"

View File

@ -1,8 +0,0 @@
-- Add migration script here
CREATE TABLE IF NOT EXISTS artifacts (
app VARCHAR NOT NULL,
branch VARCHAR NOT NULL,
artifact_id UUID NOT NULL PRIMARY KEY,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE (app, artifact_id)
);

View File

@ -2,33 +2,16 @@ syntax = "proto3";
package flux_releaser; package flux_releaser;
service FluxReleaser { service Greeter {
rpc UploadArtifact (stream UploadArtifactRequest) returns (UploadArtifactResponse) {} rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc CommitArtifact (CommitArtifactRequest) returns (CommitArtifactResponse) {}
rpc TriggerRelease (TriggerReleaseRequest) returns (TriggerReleaseResponse) {}
} }
message UploadArtifactRequest { message HelloRequest {
bytes content = 1;
}
message UploadArtifactResponse {
string upload_id = 1;
}
message CommitArtifactRequest {
string app = 1; string app = 1;
string branch = 2; string branch = 2;
string upload_id = 3; string folder = 3; // Change to files instead
} }
message CommitArtifactResponse { message HelloReply {
string artifact_id = 1; string message = 1;
} }
message TriggerReleaseRequest {
string app = 1;
string branch = 2;
}
message TriggerReleaseResponse {}

View File

@ -1,14 +1,11 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use axum::{response::IntoResponse, routing::get, Router}; use axum::{routing::get, Router};
use crate::app::SharedApp; use crate::app::SharedApp;
pub async fn axum_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> { pub async fn axum_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
let app = Router::new() let app = Router::new().route("/", get(root)).with_state(app);
.route("/ping", get(pong))
.route("/", get(root))
.with_state(app);
tracing::info!("listening on {}", host); tracing::info!("listening on {}", host);
let listener = tokio::net::TcpListener::bind(host).await.unwrap(); let listener = tokio::net::TcpListener::bind(host).await.unwrap();
@ -18,10 +15,6 @@ pub async fn axum_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()>
Ok(()) Ok(())
} }
async fn pong() -> impl IntoResponse {
"pong!"
}
async fn root() -> &'static str { async fn root() -> &'static str {
"Hello, flux-releaser!" "Hello, flux-releaser!"
} }

View File

@ -1,17 +1,9 @@
use std::{ops::Deref, sync::Arc}; use std::{ops::Deref, sync::Arc};
use sqlx::{PgPool, Postgres};
use crate::services::{
archive::Archive,
cluster_list::ClusterList,
git::{Git, SharedGit},
};
use self::infra::{
aws_s3::s3_client, use self::infra::aws_s3::s3_client;
grpc::{new_client, FluxReleaserGrpcClient},
};
#[derive(Clone)] #[derive(Clone)]
pub struct SharedApp(Arc<App>); pub struct SharedApp(Arc<App>);
@ -33,8 +25,6 @@ impl SharedApp {
pub struct App { pub struct App {
pub s3_client: aws_sdk_s3::Client, pub s3_client: aws_sdk_s3::Client,
pub nats: infra::nats::Nats, pub nats: infra::nats::Nats,
pub database: PgPool,
pub git: SharedGit,
} }
impl App { impl App {
@ -42,45 +32,8 @@ impl App {
Ok(Self { Ok(Self {
s3_client: s3_client().await?, s3_client: s3_client().await?,
nats: infra::nats::Nats::new().await?, nats: infra::nats::Nats::new().await?,
database: infra::database::get_database().await?,
git: Git::new(
std::env::var("FLUX_RELEASER_GIT_REPOSITORY")
.unwrap_or("ssh://git@git.front.kjuulh.io/kjuulh/clank-clusters.git".into()),
ClusterList::default(),
Archive::default(),
)
.into(),
}) })
} }
} }
pub mod infra; pub mod infra;
#[derive(Clone)]
pub struct SharedLocalApp(Arc<LocalApp>);
impl Deref for SharedLocalApp {
type Target = Arc<LocalApp>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl SharedLocalApp {
pub fn new(app: LocalApp) -> Self {
Self(Arc::new(app))
}
}
pub struct LocalApp {
pub grpc_client: FluxReleaserGrpcClient,
}
impl LocalApp {
pub async fn new(registry: impl Into<String>) -> anyhow::Result<Self> {
Ok(Self {
grpc_client: new_client(registry).await?,
})
}
}

View File

@ -1,4 +1,2 @@
pub mod aws_s3; pub mod aws_s3;
pub mod database;
pub mod grpc;
pub mod nats; pub mod nats;

View File

@ -1,4 +1,3 @@
use anyhow::Context;
use aws_config::{BehaviorVersion, Region}; use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::config::Credentials; use aws_sdk_s3::config::Credentials;
@ -6,16 +5,15 @@ pub async fn s3_client() -> anyhow::Result<aws_sdk_s3::Client> {
let shared_config = aws_config::defaults(BehaviorVersion::latest()) let shared_config = aws_config::defaults(BehaviorVersion::latest())
.region(Region::new("eu-west-1")) .region(Region::new("eu-west-1"))
.credentials_provider(Credentials::new( .credentials_provider(Credentials::new(
std::env::var("AWS_ACCESS_KEY_ID").context("AWS_ACCESS_KEY_ID was not set")?, std::env::var("AWS_ACCESS_KEY_ID")?,
std::env::var("AWS_SECRET_ACCESS_KEY").context("AWS_SECRET_ACCESS_KEY was not set")?, std::env::var("AWS_SECRET_ACCESS_KEY")?,
None, None,
None, None,
"flux_releaser", "flux_releaser",
)); ));
let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await) let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await)
.endpoint_url(std::env::var("AWS_ENDPOINT_URL").context("AWS_ENDPOINT_URL was not set")?) .endpoint_url(std::env::var("AWS_ENDPOINT_URL")?)
.force_path_style(true)
.build(); .build();
let client = aws_sdk_s3::Client::from_conf(config); let client = aws_sdk_s3::Client::from_conf(config);

View File

@ -1,19 +0,0 @@
use anyhow::Context;
use sqlx::{PgPool, Postgres};
pub async fn get_database() -> anyhow::Result<PgPool> {
tracing::trace!("initializing database");
let db =
sqlx::PgPool::connect(&std::env::var("DATABASE_URL").context("DATABASE_URL is not set")?)
.await?;
tracing::trace!("migrating crdb");
sqlx::migrate!("migrations/crdb")
.set_locking(false)
.run(&db)
.await?;
let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?;
Ok(db)
}

View File

@ -1,36 +0,0 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::transport::{Channel, ClientTlsConfig};
use crate::grpc::gen::flux_releaser_client::FluxReleaserClient;
pub type FluxReleaserGrpcClient = Arc<Mutex<FluxReleaserClient<Channel>>>;
pub async fn new_client(registry: impl Into<String>) -> anyhow::Result<FluxReleaserGrpcClient> {
let registry: String = registry.into();
// Workaround: https://github.com/algesten/ureq/issues/765#issuecomment-2282921492
static INIT: std::sync::Once = std::sync::Once::new();
INIT.call_once(|| {
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.unwrap();
});
let channel = if registry.starts_with("https") {
let tls = ClientTlsConfig::new().with_native_roots();
Channel::from_shared(registry)?
.tls_config(tls)?
.connect()
.await?
} else {
Channel::from_shared(registry)?.connect().await?
};
let client = FluxReleaserClient::new(channel);
Ok(Arc::new(Mutex::new(client)))
}

View File

@ -1,10 +1,11 @@
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use std::net::SocketAddr; use std::net::SocketAddr;
use crate::services::flux_local_cluster::extensions::FluxLocalClusterManagerExt; use crate::{
api::axum_serve,
pub mod client; app::{App, SharedApp},
pub mod server; grpc::tonic_serve,
};
#[derive(Parser)] #[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)] #[command(author, version, about, long_about = None, subcommand_required = true)]
@ -21,62 +22,27 @@ pub enum Commands {
#[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")] #[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")]
grpc_host: SocketAddr, grpc_host: SocketAddr,
}, },
Commit {
#[arg(long)]
app: String,
#[arg(long)]
branch: String,
#[arg(long)]
include: String,
#[arg(env = "FLUX_RELEASER_REGISTRY", long)]
registry: String,
},
Release {
#[arg(long)]
app: String,
#[arg(long)]
branch: String,
#[arg(env = "FLUX_RELEASER_REGISTRY", long)]
registry: String,
},
} }
impl Command { impl Command {
pub async fn run() -> anyhow::Result<()> { pub async fn run() -> anyhow::Result<()> {
let cli = Command::parse(); let cli = Command::parse();
match cli.command { if let Some(Commands::Serve { host, grpc_host }) = cli.command {
Some(Commands::Serve { host, grpc_host }) => { tracing_subscriber::fmt::init();
server::run_server(host, grpc_host).await?;
}
Some(Commands::Commit {
app,
branch,
include,
registry,
}) => {
let app = client::get_local_app(registry).await?;
let upload_id = app tracing::info!("Starting service");
.flux_local_cluster_manager()
.package_clusters(include)
.await?;
}
Some(Commands::Release {
app: service_app,
branch,
registry,
}) => {
let app = client::get_local_app(registry).await?;
app.flux_local_cluster_manager() let app = SharedApp::new(App::new().await?);
.trigger_release(service_app, branch)
.await?; tokio::select! {
} res = axum_serve(host, app.clone()) => {
None => (), res?;
},
res = tonic_serve(grpc_host, app.clone()) => {
res?;
},
};
} }
Ok(()) Ok(())

View File

@ -1,11 +0,0 @@
use crate::app::{LocalApp, SharedLocalApp};
pub async fn get_local_app(registry: impl Into<String>) -> anyhow::Result<SharedLocalApp> {
tracing_subscriber::fmt::init();
tracing::debug!("Starting client commit");
let app = SharedLocalApp::new(LocalApp::new(registry).await?);
Ok(app)
}

View File

@ -1,29 +0,0 @@
use std::net::SocketAddr;
use crate::{
api::axum_serve,
app::{App, SharedApp},
grpc::tonic_serve,
};
pub async fn run_server(
host: impl Into<SocketAddr>,
grpc_host: impl Into<SocketAddr>,
) -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
tracing::info!("Starting service");
let app = SharedApp::new(App::new().await?);
tokio::select! {
res = axum_serve(host.into(), app.clone()) => {
res?;
},
res = tonic_serve(grpc_host.into(), app.clone()) => {
res?;
},
};
Ok(())
}

View File

@ -1,25 +1,17 @@
use std::{env::temp_dir, fmt::Display, net::SocketAddr}; use std::net::SocketAddr;
use tokio::io::AsyncWriteExt; use tonic::transport::Server;
use tokio_stream::StreamExt;
use tonic::{service::interceptor, transport::Server};
use uuid::Uuid;
use crate::{ use crate::{
app::SharedApp, app::SharedApp,
services::release_manager::{ services::release_manager::{
extensions::ReleaseManagerExt, extensions::ReleaseManagerExt, models::CommitArtifact, ReleaseManager,
models::{CommitArtifact, Release},
ReleaseManager,
}, },
}; };
use self::gen::{ use self::gen::{greeter_server, HelloReply, HelloRequest};
flux_releaser_server, CommitArtifactRequest, CommitArtifactResponse, TriggerReleaseRequest,
TriggerReleaseResponse, UploadArtifactRequest, UploadArtifactResponse,
};
pub mod gen { mod gen {
tonic::include_proto!("flux_releaser"); tonic::include_proto!("flux_releaser");
} }
@ -35,139 +27,45 @@ impl FluxReleaserGrpc {
} }
} }
impl std::fmt::Debug for FluxReleaserGrpc {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}
#[tonic::async_trait] #[tonic::async_trait]
impl flux_releaser_server::FluxReleaser for FluxReleaserGrpc { impl greeter_server::Greeter for FluxReleaserGrpc {
#[tracing::instrument] async fn say_hello(
async fn upload_artifact(
&self, &self,
request: tonic::Request<tonic::Streaming<UploadArtifactRequest>>, request: tonic::Request<HelloRequest>,
) -> std::result::Result<tonic::Response<UploadArtifactResponse>, tonic::Status> { ) -> std::result::Result<tonic::Response<HelloReply>, tonic::Status> {
let mut stream = request.into_inner();
let file_path = temp_dir()
.join("flux_releaser")
.join("tmp")
.join("upload_artifact")
.join(Uuid::new_v4().to_string());
tokio::fs::create_dir_all(file_path.parent().unwrap()).await?;
let mut file = tokio::fs::File::create(&file_path).await?;
while let Some(item) = stream.next().await {
tracing::trace!("received chunk");
let item = item?;
let _ = file.write(&item.content).await?;
}
tracing::info!("got this far 1a");
file.flush().await?;
tracing::info!("got this far 1");
let upload_id = match self.release_manager.upload_artifact(file_path.into()).await {
Ok(res) => res,
Err(e) => {
tracing::warn!("failed to upload artifact: {}", e);
return Err(tonic::Status::unknown(e.to_string()));
}
};
tracing::info!("got this far 2");
Ok(tonic::Response::new(UploadArtifactResponse {
upload_id: upload_id.to_string(),
}))
}
#[tracing::instrument]
async fn commit_artifact(
&self,
request: tonic::Request<CommitArtifactRequest>,
) -> std::result::Result<tonic::Response<CommitArtifactResponse>, tonic::Status> {
let req = request.into_inner(); let req = request.into_inner();
let artifact = self
.release_manager
.commit_artifact(req.try_into().map_err(|e: anyhow::Error| {
tracing::warn!("failed to parse input body: {}", e);
tonic::Status::invalid_argument(e.to_string())
})?)
.await
.map_err(|e: anyhow::Error| {
tracing::warn!("failed to commit artifact: {}", e);
tonic::Status::internal(e.to_string())
})?;
Ok(tonic::Response::new(CommitArtifactResponse {
artifact_id: artifact.to_string(),
}))
}
#[tracing::instrument]
async fn trigger_release(
&self,
request: tonic::Request<TriggerReleaseRequest>,
) -> std::result::Result<tonic::Response<TriggerReleaseResponse>, tonic::Status> {
let req = request.into_inner();
tracing::info!("some trigger release");
self.release_manager self.release_manager
.release(req.try_into().map_err(|e: anyhow::Error| { .commit_artifact(
tracing::warn!("failed to parse input body: {}", e); req.try_into()
tonic::Status::invalid_argument(e.to_string()) .map_err(|e: anyhow::Error| tonic::Status::invalid_argument(e.to_string()))?,
})?) )
.await .await
.map_err(|e| { .unwrap();
tracing::warn!("failed to release: {}", e);
tonic::Status::internal(e.to_string())
})?;
Ok(tonic::Response::new(TriggerReleaseResponse {})) Ok(tonic::Response::new(HelloReply {
message: "something".into(),
}))
} }
} }
impl TryFrom<CommitArtifactRequest> for CommitArtifact { impl TryFrom<HelloRequest> for CommitArtifact {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(value: CommitArtifactRequest) -> Result<Self, Self::Error> { fn try_from(value: HelloRequest) -> Result<Self, Self::Error> {
if value.app.is_empty() { if value.app.is_empty() {
anyhow::bail!("app cannot be empty") anyhow::bail!("app cannot be empty")
} }
if value.branch.is_empty() { if value.branch.is_empty() {
anyhow::bail!("branch cannot be empty") anyhow::bail!("branch cannot be empty")
} }
if value.upload_id.is_empty() { if value.folder.is_empty() {
anyhow::bail!("folder cannot be empty") anyhow::bail!("folder cannot be empty")
} }
Ok(Self { Ok(Self {
app: value.app, app: value.app,
branch: value.branch, branch: value.branch,
upload_id: value.upload_id.try_into()?, folder: value.folder.into(),
})
}
}
impl TryFrom<TriggerReleaseRequest> for Release {
type Error = anyhow::Error;
fn try_from(value: TriggerReleaseRequest) -> Result<Self, Self::Error> {
if value.app.is_empty() {
anyhow::bail!("app cannot be empty");
}
if value.branch.is_empty() {
anyhow::bail!("branch canot be empty");
}
Ok(Self {
app: value.app,
branch: value.branch,
}) })
} }
} }
@ -175,14 +73,11 @@ impl TryFrom<TriggerReleaseRequest> for Release {
pub async fn tonic_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> { pub async fn tonic_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
tracing::info!("grpc listening on: {}", host); tracing::info!("grpc listening on: {}", host);
Server::builder() Server::builder()
.trace_fn(|_| tracing::info_span!("flux_releaser")) .add_service(greeter_server::GreeterServer::new(FluxReleaserGrpc::new(
.add_service(flux_releaser_server::FluxReleaserServer::new( app,
FluxReleaserGrpc::new(app), )))
))
.serve(host) .serve(host)
.await?; .await?;
Ok(()) Ok(())
} }
pub struct LogLayer {}

View File

@ -1,15 +0,0 @@
use cli::Command;
pub mod api;
pub mod app;
pub mod cli;
pub mod grpc;
pub mod services;
pub async fn run() -> anyhow::Result<()> {
dotenv::dotenv().ok();
Command::run().await?;
Ok(())
}

View File

@ -1,6 +1,19 @@
use cli::Command;
mod cli;
mod api;
mod grpc;
mod app;
mod services;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
flux_releaser::run().await?; dotenv::dotenv().ok();
Command::run().await?;
Ok(()) Ok(())
} }

View File

@ -1,11 +1,5 @@
pub mod archive; mod archive;
pub mod cluster_list; mod domain_events;
pub mod domain_events; mod file_reader;
pub mod file_reader; mod file_store;
pub mod file_store;
pub mod flux_local_cluster;
pub mod flux_releaser_uploader;
pub mod git;
pub mod release_manager; pub mod release_manager;
pub mod artifacts_db;

View File

@ -1,75 +1,7 @@
#[derive(Clone, Default)] #[derive(Clone)]
pub struct Archive {} pub struct Archive {}
use std::{ use std::{io::Cursor, path::Path};
io::{Bytes, Cursor},
path::Path,
};
use anyhow::Context;
use super::file_reader::Files;
impl Archive {
pub fn new() -> Self {
Self {}
}
pub async fn create_archive(&self, prefix: &Path, files: Files) -> anyhow::Result<ArchiveFile> {
tracing::trace!("archiving files: {}", files.len());
let buffer = Vec::new();
let cursor = Cursor::new(buffer);
let mut tar_builder = tar::Builder::new(cursor);
for file in files {
let abs_file_path = file.path;
let mut fd = std::fs::File::open(&abs_file_path)?;
if let Some(rel) = file.relative {
tracing::trace!("archiving rel file: {}", rel.display());
tar_builder.append_file(&rel, &mut fd)?;
} else {
tracing::trace!("archiving file: {}", abs_file_path.display());
tar_builder.append_file(
abs_file_path
.strip_prefix(prefix)
.context("failed to strip prefix from path")?,
&mut fd,
)?;
}
}
tar_builder.finish()?;
let cursor = tar_builder.into_inner()?;
let buffer = cursor.into_inner();
Ok(buffer.into())
}
pub async fn unpack_archive(&self, archive: &ArchiveFile, dest: &Path) -> anyhow::Result<()> {
tracing::trace!("unpacking archive: {}", dest.display());
let cursor = Cursor::new(archive.content.clone());
let mut arc = tar::Archive::new(cursor);
arc.unpack(dest)?;
Ok(())
}
}
pub struct ArchiveFile {
pub content: Vec<u8>,
}
impl From<Vec<u8>> for ArchiveFile {
fn from(value: Vec<u8>) -> Self {
Self { content: value }
}
}
pub mod extensions { pub mod extensions {
@ -79,11 +11,12 @@ pub mod extensions {
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use uuid::Uuid; use uuid::Uuid;
use crate::app::SharedLocalApp;
use crate::{app::SharedApp, services::release_manager::models::ArtifactID}; use crate::{app::SharedApp, services::release_manager::models::ArtifactID};
#[mockall_double::double]
use crate::services::file_store::FileStore; use crate::services::file_store::FileStore;
#[mockall_double::double]
use super::Archive; use super::Archive;
use super::ArchiveFile; use super::ArchiveFile;
@ -97,12 +30,6 @@ pub mod extensions {
} }
} }
impl ArchiveExt for SharedLocalApp {
fn archive(&self) -> Archive {
Archive::new()
}
}
#[async_trait] #[async_trait]
pub trait ArchiveUploadExt { pub trait ArchiveUploadExt {
async fn upload_archive( async fn upload_archive(
@ -164,3 +91,49 @@ pub mod extensions {
} }
} }
} }
#[cfg(test)]
use mockall::{automock, mock, predicate::*};
use super::file_reader::{File, Files};
#[cfg_attr(test, automock)]
impl Archive {
pub fn new() -> Self {
Self {}
}
pub async fn create_archive(&self, files: Files) -> anyhow::Result<ArchiveFile> {
tracing::trace!("archiving files");
let buffer = Vec::new();
let cursor = Cursor::new(buffer);
let mut tar_builder = tar::Builder::new(cursor);
for file in files {
let abs_file_path = file.path;
tracing::trace!("archiving file: {}", abs_file_path.display());
let mut fd = std::fs::File::open(&abs_file_path)?;
tar_builder.append_file(&abs_file_path, &mut fd)?;
}
tar_builder.finish()?;
let cursor = tar_builder.into_inner()?;
let buffer = cursor.into_inner();
Ok(buffer.into())
}
}
pub struct ArchiveFile {
pub content: Vec<u8>,
}
impl From<Vec<u8>> for ArchiveFile {
fn from(value: Vec<u8>) -> Self {
Self { content: value }
}
}

View File

@ -1,132 +0,0 @@
use std::sync::Arc;
use sqlx::{prelude::FromRow, PgPool};
use self::defaults::DefaultArtifactsDB;
#[derive(Clone, Debug)]
pub struct AddCommitArtifact {
pub app: String,
pub branch: String,
pub artifact_id: uuid::Uuid,
}
#[derive(Clone, Debug, FromRow)]
pub struct Artifact {
pub app: String,
pub branch: String,
pub artifact_id: uuid::Uuid,
pub created_at: chrono::NaiveDateTime,
}
#[derive(Clone, Debug)]
pub struct GetLatestArtifact {
pub app: String,
pub branch: String,
}
pub mod traits {
use axum::async_trait;
use super::{AddCommitArtifact, Artifact, GetLatestArtifact};
#[async_trait]
pub trait ArtifactsDB {
async fn commit_artifact(&self, commit_artifact: AddCommitArtifact) -> anyhow::Result<()>;
async fn get_latest_artifact(
&self,
get_latest_artifact: GetLatestArtifact,
) -> anyhow::Result<Artifact>;
}
}
pub mod defaults {
use axum::async_trait;
use sqlx::PgPool;
use super::{traits, AddCommitArtifact, Artifact, GetLatestArtifact};
pub struct DefaultArtifactsDB {
db: PgPool,
}
impl DefaultArtifactsDB {
pub fn new(db: PgPool) -> Self {
Self { db }
}
}
#[async_trait]
impl traits::ArtifactsDB for DefaultArtifactsDB {
async fn commit_artifact(&self, commit_artifact: AddCommitArtifact) -> anyhow::Result<()> {
sqlx::query("INSERT INTO artifacts (app, branch, artifact_id) VALUES ($1, $2, $3)")
.bind(commit_artifact.app)
.bind(commit_artifact.branch)
.bind(commit_artifact.artifact_id)
.execute(&self.db)
.await?;
Ok(())
}
async fn get_latest_artifact(
&self,
get_latest_artifact: GetLatestArtifact,
) -> anyhow::Result<Artifact> {
let artifact = sqlx::query_as::<_, Artifact>(
"SELECT
*
FROM
artifacts
WHERE
app = $1 AND
branch = $2
ORDER BY created_at DESC
LIMIT 1",
)
.bind(get_latest_artifact.app)
.bind(get_latest_artifact.branch)
.fetch_one(&self.db)
.await?;
Ok(artifact)
}
}
}
#[derive(Clone)]
pub struct ArtifactsDB {
inner: Arc<dyn traits::ArtifactsDB + Send + Sync>,
}
impl ArtifactsDB {
pub fn new(db: PgPool) -> Self {
Self {
inner: Arc::new(DefaultArtifactsDB::new(db)),
}
}
}
impl std::ops::Deref for ArtifactsDB {
type Target = Arc<dyn traits::ArtifactsDB + Send + Sync>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub mod extensions {
use crate::app::App;
use super::ArtifactsDB;
pub trait ArtifactsDBExt {
fn artifacts_db(&self) -> ArtifactsDB;
}
impl ArtifactsDBExt for App {
fn artifacts_db(&self) -> ArtifactsDB {
ArtifactsDB::new(self.database.clone())
}
}
}

View File

@ -1,23 +0,0 @@
use std::collections::HashMap;
#[derive(Default)]
pub struct ClusterList {}
impl ClusterList {
pub async fn get_list(&self) -> anyhow::Result<HashMap<String, Vec<String>>> {
Ok(HashMap::from([
("dev".into(), vec!["clank-dev".into()]),
("prod".into(), vec!["clank-prod".into()]),
]))
}
pub async fn get(&self, environment: &str) -> anyhow::Result<Option<Vec<String>>> {
let list = self.get_list().await?;
if let Some(x) = list.get(environment) {
Ok(Some(x.clone()))
} else {
Ok(None)
}
}
}

View File

@ -5,7 +5,12 @@ pub struct DomainEvents {
nats: Nats, nats: Nats,
} }
use crate::app::infra::nats::Nats; #[cfg(test)]
use mockall::{automock, mock, predicate::*};
use crate::app::infra::{nats::Nats};
#[cfg_attr(test, automock)]
impl DomainEvents { impl DomainEvents {
pub fn new(nats: Nats) -> Self { pub fn new(nats: Nats) -> Self {
Self { nats } Self { nats }

View File

@ -1,5 +1,6 @@
use crate::app::SharedApp; use crate::app::SharedApp;
#[mockall_double::double]
use super::DomainEvents; use super::DomainEvents;
pub trait DomainEventsExt { pub trait DomainEventsExt {

View File

@ -1,18 +1,18 @@
#[derive(Clone, Default)] #[derive(Clone)]
pub struct FileReader {} pub struct FileReader {}
use std::{ use std::{collections::BTreeMap, path::PathBuf};
collections::BTreeMap,
path::{Path, PathBuf},
};
pub mod extensions; pub mod extensions;
use anyhow::{anyhow, Context}; use anyhow::anyhow;
#[cfg(test)]
use mockall::{automock, mock, predicate::*};
#[cfg_attr(test, automock)]
impl FileReader { impl FileReader {
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self {}
} }
pub async fn read_files(&self, location: PathBuf) -> anyhow::Result<Files> { pub async fn read_files(&self, location: PathBuf) -> anyhow::Result<Files> {
@ -20,9 +20,7 @@ impl FileReader {
let mut clusters: BTreeMap<String, Vec<File>> = BTreeMap::new(); let mut clusters: BTreeMap<String, Vec<File>> = BTreeMap::new();
let mut dir = tokio::fs::read_dir(&location) let mut dir = tokio::fs::read_dir(&location).await?;
.await
.context(format!("failed to find location: {}", &location.display()))?;
while let Some(dir_entry) = dir.next_entry().await? { while let Some(dir_entry) = dir.next_entry().await? {
if dir_entry.metadata().await?.is_dir() { if dir_entry.metadata().await?.is_dir() {
tracing::trace!("found cluster in: {}", dir_entry.path().display()); tracing::trace!("found cluster in: {}", dir_entry.path().display());
@ -51,13 +49,9 @@ impl FileReader {
cluster_name cluster_name
); );
if file.path().is_absolute() {
files.push((file.path(), file.path().strip_prefix(&location)?).into())
} else {
files.push(file.into_path().into()) files.push(file.into_path().into())
} }
} }
}
Ok(clusters.into()) Ok(clusters.into())
} }
@ -66,32 +60,11 @@ impl FileReader {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct File { pub struct File {
pub path: PathBuf, pub path: PathBuf,
pub relative: Option<PathBuf>,
} }
impl From<PathBuf> for File { impl From<PathBuf> for File {
fn from(value: PathBuf) -> Self { fn from(value: PathBuf) -> Self {
Self { Self { path: value }
path: value,
relative: None,
}
}
}
impl From<(PathBuf, PathBuf)> for File {
fn from(value: (PathBuf, PathBuf)) -> Self {
Self {
path: value.0,
relative: Some(value.1),
}
}
}
impl From<(&Path, &Path)> for File {
fn from(value: (&Path, &Path)) -> Self {
Self {
path: value.0.to_path_buf(),
relative: Some(value.1.to_path_buf()),
}
} }
} }
@ -119,8 +92,14 @@ impl From<Files> for Vec<File> {
value value
.iter() .iter()
.map(|(cluster_name, files)| (PathBuf::from(cluster_name), files)) .map(|(cluster_name, files)| (PathBuf::from(cluster_name), files))
.flat_map(|(_cluster_name, files)| files.to_vec()) .flat_map(|(cluster_name, files)| {
// .map(|f| f.into()) files
.iter()
//.map(|file_path| cluster_name.join(&file_path.path))
.map(|file_path| file_path.path.clone())
.collect::<Vec<_>>()
})
.map(|f| f.into())
.collect::<Vec<_>>() .collect::<Vec<_>>()
} }
} }

View File

@ -1,5 +1,6 @@
use crate::app::{SharedApp, SharedLocalApp}; use crate::app::SharedApp;
#[mockall_double::double]
use super::FileReader; use super::FileReader;
pub trait FileReaderExt { pub trait FileReaderExt {
@ -11,9 +12,3 @@ impl FileReaderExt for SharedApp {
FileReader::new() FileReader::new()
} }
} }
impl FileReaderExt for SharedLocalApp {
fn file_reader(&self) -> FileReader {
FileReader::new()
}
}

View File

@ -1,25 +1,23 @@
use std::{env::temp_dir, path::PathBuf}; use std::path::PathBuf;
use super::release_manager::models::{ArtifactID, UploadArtifactID}; use super::release_manager::models::ArtifactID;
pub mod extensions; pub mod extensions;
#[derive(Clone)] #[derive(Clone)]
pub struct FileStore { pub struct FileStore {
client: aws_sdk_s3::Client, client: aws_sdk_s3::Client,
bucket: String,
} }
use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::primitives::ByteStream;
use tokio::io::BufReader; #[cfg(test)]
use mockall::{automock, mock, predicate::*};
use tokio::io::AsyncReadExt;
#[cfg_attr(test, automock)]
impl FileStore { impl FileStore {
pub fn new(client: aws_sdk_s3::Client) -> Self { pub fn new(client: aws_sdk_s3::Client) -> Self {
Self { Self { client }
client,
bucket: std::env::var("BUCKET_NAME").unwrap_or("flux-releaser".into()),
}
} }
pub async fn upload_file(&self, artifact_id: ArtifactID, file: PathBuf) -> anyhow::Result<()> { pub async fn upload_file(&self, artifact_id: ArtifactID, file: PathBuf) -> anyhow::Result<()> {
@ -27,7 +25,7 @@ impl FileStore {
self.client self.client
.put_object() .put_object()
.bucket(&self.bucket) .bucket("mybucket")
.key(format!("archives/{}.tar", &artifact_id.to_string())) .key(format!("archives/{}.tar", &artifact_id.to_string()))
.body(ByteStream::from_path(file).await?) .body(ByteStream::from_path(file).await?)
.send() .send()
@ -35,74 +33,4 @@ impl FileStore {
Ok(()) Ok(())
} }
pub async fn upload_temp(&self, id: UploadArtifactID, file: PathBuf) -> anyhow::Result<()> {
tracing::trace!("uploading temp files: {}", id.to_string());
self.client
.put_object()
.bucket(&self.bucket)
.key(format!("temp/{}.tar", &id.to_string()))
.body(ByteStream::from_path(file).await?)
.send()
.await?;
Ok(())
}
pub async fn get_archive(&self, artifact_id: ArtifactID) -> anyhow::Result<PathBuf> {
tracing::trace!("getting archive: {}", artifact_id.to_string());
let archive_name = format!("archives/{}.tar", &artifact_id.to_string());
let obj = self
.client
.get_object()
.bucket(&self.bucket)
.key(&archive_name)
.send()
.await?;
let archive_path = temp_dir()
.join("flux_releaser")
.join("cache")
.join(&archive_name);
tokio::fs::create_dir_all(archive_path.parent().unwrap()).await?;
let mut archive_file = tokio::fs::File::create(&archive_path).await?;
let mut buf_reader = BufReader::new(obj.body.into_async_read());
tokio::io::copy(&mut buf_reader, &mut archive_file).await?;
tracing::debug!("created archive: {}", archive_path.display());
Ok(archive_path)
}
pub async fn get_temp(&self, artifact_id: UploadArtifactID) -> anyhow::Result<PathBuf> {
tracing::trace!("getting archive: {}", artifact_id.to_string());
let archive_name = format!("temp/{}.tar", &artifact_id.to_string());
let obj = self
.client
.get_object()
.bucket(&self.bucket)
.key(&archive_name)
.send()
.await?;
let archive_path = temp_dir()
.join("flux_releaser")
.join("downloads/cache")
.join(&archive_name);
tokio::fs::create_dir_all(archive_path.parent().unwrap()).await?;
let mut archive_file = tokio::fs::File::create(&archive_path).await?;
let mut buf_reader = BufReader::new(obj.body.into_async_read());
tokio::io::copy(&mut buf_reader, &mut archive_file).await?;
tracing::debug!("created archive: {}", archive_path.display());
Ok(archive_path)
}
} }

View File

@ -1,5 +1,6 @@
use crate::app::SharedApp; use crate::app::SharedApp;
#[mockall_double::double]
use super::FileStore; use super::FileStore;
pub trait FileStoreExt { pub trait FileStoreExt {

View File

@ -1,138 +0,0 @@
use std::path::PathBuf;
use anyhow::Context;
use crate::{
app::infra::grpc::FluxReleaserGrpcClient,
grpc::gen::{CommitArtifactRequest, TriggerReleaseRequest},
};
use super::{
archive::Archive,
file_reader::FileReader,
flux_releaser_uploader::FluxReleaserUploader,
release_manager::models::{ArtifactID, UploadArtifactID},
};
pub struct FluxLocalClusterManager {
file_reader: FileReader,
archive: Archive,
flux_uploader: FluxReleaserUploader,
flux_releaser_client: FluxReleaserGrpcClient,
}
impl std::fmt::Debug for FluxLocalClusterManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}
impl FluxLocalClusterManager {
pub fn new(
file_reader: FileReader,
archive: Archive,
flux_uploader: FluxReleaserUploader,
flux_releaser_client: FluxReleaserGrpcClient,
) -> Self {
Self {
file_reader,
archive,
flux_uploader,
flux_releaser_client,
}
}
#[tracing::instrument(skip(include))]
pub async fn package_clusters(
&self,
include: impl Into<PathBuf>,
) -> anyhow::Result<UploadArtifactID> {
let include = include.into();
tracing::debug!("reading files");
let files = self.file_reader.read_files(include.clone()).await?;
tracing::debug!("creating archive");
let archive = self.archive.create_archive(&include, files).await?;
tracing::debug!("uploading archive");
let upload_id = self.flux_uploader.upload_archive(archive).await?;
tracing::debug!("done packaging clusters");
Ok(upload_id)
}
pub async fn commit_artifact(
&self,
app: impl Into<String>,
branch: impl Into<String>,
upload_id: UploadArtifactID,
) -> anyhow::Result<ArtifactID> {
let artifact_id = self
.flux_releaser_client
.lock()
.await
.commit_artifact(tonic::Request::new(CommitArtifactRequest {
app: app.into(),
branch: branch.into(),
upload_id: upload_id.to_string(),
}))
.await?;
artifact_id.into_inner().artifact_id.try_into()
}
pub async fn trigger_release(
&self,
app: impl Into<String>,
branch: impl Into<String>,
) -> anyhow::Result<()> {
self.flux_releaser_client
.lock()
.await
.trigger_release(tonic::Request::new(TriggerReleaseRequest {
app: app.into(),
branch: branch.into(),
}))
.await
.context("failed to trigger release")?;
// Send release proto to upstream
// 1. find app by app + branch
// 2. Unpack latest artifact by app + branch
// 3. Unpack by cluster
// 4. Upload
Ok(())
}
}
pub mod extensions {
use crate::{
app::SharedLocalApp,
services::{
archive::extensions::ArchiveExt, file_reader::extensions::FileReaderExt,
flux_releaser_uploader::extensions::FluxReleaserUploaderExt,
},
};
use super::FluxLocalClusterManager;
pub trait FluxLocalClusterManagerExt {
fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager;
}
impl FluxLocalClusterManagerExt for SharedLocalApp {
fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager {
FluxLocalClusterManager::new(
self.file_reader(),
self.archive(),
self.flux_releaser_uploader(),
self.grpc_client.clone(),
)
}
}
}

View File

@ -1,54 +0,0 @@
use tonic::transport::Channel;
use crate::{
app::infra::grpc::FluxReleaserGrpcClient,
grpc::gen::{flux_releaser_client::FluxReleaserClient, UploadArtifactRequest},
};
use super::{archive::ArchiveFile, release_manager::models::UploadArtifactID};
pub struct FluxReleaserUploader {
flux_client: FluxReleaserGrpcClient,
}
impl FluxReleaserUploader {
pub fn new(flux_client: FluxReleaserGrpcClient) -> Self {
Self { flux_client }
}
pub async fn upload_archive(&self, archive: ArchiveFile) -> anyhow::Result<UploadArtifactID> {
let chunks = archive
.content
.chunks(1_000_000) // Slice by about 1MB
.map(|ch| UploadArtifactRequest {
content: ch.to_vec(),
})
.collect::<Vec<_>>();
let iter = tokio_stream::iter(chunks);
let resp = self
.flux_client
.lock()
.await
.upload_artifact(tonic::Request::new(iter))
.await?;
resp.into_inner().upload_id.try_into()
}
}
pub mod extensions {
use crate::app::SharedLocalApp;
use super::FluxReleaserUploader;
pub trait FluxReleaserUploaderExt {
fn flux_releaser_uploader(&self) -> FluxReleaserUploader;
}
impl FluxReleaserUploaderExt for SharedLocalApp {
fn flux_releaser_uploader(&self) -> FluxReleaserUploader {
FluxReleaserUploader::new(self.grpc_client.clone())
}
}
}

View File

@ -1,337 +0,0 @@
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::Context;
use git2::{
build::{CheckoutBuilder, RepoBuilder},
Cred, FetchOptions, IndexAddOption, PushOptions, RemoteCallbacks, Repository, ResetType,
Signature,
};
use tokio::{io::AsyncWriteExt, sync::Mutex};
use super::{
archive::{Archive, ArchiveFile},
cluster_list::ClusterList,
};
#[derive(Clone)]
pub struct SharedGit {
inner: Arc<Git>,
}
impl From<Git> for SharedGit {
fn from(value: Git) -> Self {
Self {
inner: Arc::new(value),
}
}
}
impl std::ops::Deref for SharedGit {
type Target = Git;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub struct Git {
location: Mutex<PathBuf>,
registry: String,
cluster_list: ClusterList,
archive: Archive,
}
impl Git {
pub fn new(registry: String, cluster_list: ClusterList, archive: Archive) -> Self {
Self {
registry,
location: Mutex::new(std::env::temp_dir().join("flux_releaser")),
cluster_list,
archive,
}
}
pub async fn publish(
&self,
archive: &ArchiveFile,
service: &str,
namespace: &str,
environment: &str,
) -> anyhow::Result<()> {
// TODO: implement exponential backoff and 5 attempts
self.publish_attempt(archive, service, namespace, environment)
.await?;
Ok(())
}
async fn publish_attempt(
&self,
archive: &ArchiveFile,
service: &str,
namespace: &str,
environment: &str,
) -> anyhow::Result<()> {
// 1. Clone repo into location (with lock) or update
let location = self.location.lock().await;
let repo_dir = location.join("repo");
let repo = match self.get_repo(&repo_dir).await? {
//TODO: Possible handle error by just recloning the repo
None => self.clone_repo(&repo_dir).await?,
Some(repo) => {
self.update_repo(repo).await?;
self.get_repo(&repo_dir)
.await?
.ok_or(anyhow::anyhow!("failed to open repository"))?
}
};
// 2. Extract from archive
// TODO: maybe pad archive with hash or something
let unpack_dir = location.join("archive");
self.archive.unpack_archive(archive, &unpack_dir).await?;
// 3. Splat tar over application and cluster
// The archive should always be structured like so:
// - <environment>/
// - * manifests
// 3a. prepare git repo for new files
let clusters = self
.cluster_list
.get(environment)
.await?
.ok_or(anyhow::anyhow!(
"environment is not registered: {} in cluster list",
environment
))?;
for cluster in clusters {
let service_entry = repo_dir
.join("deployments")
.join(&cluster)
.join(namespace)
.join(service);
if service_entry.exists() {
if let Err(e) = tokio::fs::remove_dir_all(&service_entry).await {
tracing::warn!("failed to remove existing dir: {}", e);
}
}
tokio::fs::create_dir_all(&service_entry).await?;
let cluster_entry = repo_dir
.join("clusters")
.join(&cluster)
.join(namespace)
.join(service);
if cluster_entry.exists() {
if let Err(e) = tokio::fs::remove_dir_all(&cluster_entry).await {
tracing::warn!("failed to remove existing dir: {}", e);
}
}
tokio::fs::create_dir_all(&cluster_entry).await?;
let archive_dir = unpack_dir.join(environment);
if !archive_dir.exists() {
anyhow::bail!("selected environment is not published for archive");
}
let mut read_dir = tokio::fs::read_dir(archive_dir).await?;
while let Some(entry) = read_dir.next_entry().await? {
if entry.metadata().await?.is_file() {
let entry_path = entry.path();
let dest_path = service_entry.join(entry.file_name());
tokio::fs::copy(entry_path, dest_path).await?;
}
}
let cluster_entry_file = cluster_entry.join(format!("{service}.yaml"));
let mut cluster_entry_file = tokio::fs::File::create(cluster_entry_file).await?;
let file_contents = format!(
r#"
apiVersion: kustomize.toolkit.fluxcd.io/v1beta2
kind: Kustomization
metadata:
name: {service}
namespace: flux-system
spec:
interval: 1m
retryInterval: 30s
path: ./deployments/{cluster}/{namespace}/{service}
prune: true
sourceRef:
kind: GitRepository
name: flux-system
namespace: flux-system
"#
);
cluster_entry_file
.write_all(file_contents.as_bytes())
.await?;
}
// 4. Commit && Push
self.commit_and_push(repo, environment, service).await?;
// 5. Cleanup
tokio::fs::remove_dir_all(unpack_dir).await?;
Ok(())
}
async fn get_repo(&self, location: &Path) -> anyhow::Result<Option<Repository>> {
match Repository::open(location) {
Ok(r) => Ok(Some(r)),
Err(e) => match e.code() {
git2::ErrorCode::NotFound => Ok(None),
_ => Err(e).context("failed to open git repository"),
},
}
}
async fn clone_repo(&self, location: &Path) -> anyhow::Result<Repository> {
let co = CheckoutBuilder::new();
let mut fo = FetchOptions::new();
fo.remote_callbacks(self.get_cred()?);
let repo = RepoBuilder::new()
.fetch_options(fo)
.with_checkout(co)
.clone(&self.registry, location)
.context("failed to clone repository")?;
Ok(repo)
}
async fn update_repo(&self, repository: Repository) -> anyhow::Result<()> {
let mut remote = repository.find_remote("origin")?;
let mut fo = FetchOptions::new();
fo.remote_callbacks(self.get_cred()?);
remote
.fetch(&["main"], Some(&mut fo), None)
.context("failed to update repo")?;
let origin_head = repository.find_reference("refs/remotes/origin/HEAD")?;
let origin_head_commit = origin_head.peel_to_commit()?;
// Perform a hard reset to the origin's HEAD
repository.reset(origin_head_commit.as_object(), ResetType::Hard, None)?;
Ok(())
}
fn get_cred(&self) -> anyhow::Result<RemoteCallbacks> {
let mut cb = RemoteCallbacks::new();
cb.credentials(|_, username, _| {
if let Ok(_sock) = std::env::var("SSH_AUTH_SOCK") {
return Cred::ssh_key_from_agent(username.unwrap_or("git"));
}
let username = std::env::var("GIT_USERNAME").expect("GIT_USERNAME to be set");
let password = std::env::var("GIT_PASSWORD").expect("GIT_PASSWORD to be set");
Cred::userpass_plaintext(&username, &password)
});
cb.certificate_check(|_cert, _| Ok(git2::CertificateCheckStatus::CertificateOk));
Ok(cb)
}
async fn commit_and_push(
&self,
repo: Repository,
environment: &str,
service: &str,
) -> anyhow::Result<()> {
let mut index = repo.index()?;
// Add all files to the index
index.add_all(["*"].iter(), IndexAddOption::DEFAULT, None)?;
index.write()?;
// Create a tree from the index
let oid = index.write_tree()?;
let tree = repo.find_tree(oid)?;
// Get the current HEAD commit
let parent_commit = repo.head()?.peel_to_commit()?;
// Create a signature
let sig = Signature::now("flux_releaser", "operations+flux-releaser@kjuulh.io")?;
// Create the commit
repo.commit(
Some("HEAD"),
&sig,
&sig,
&format!("chore({environment}/{service}): releasing service"),
&tree,
&[&parent_commit],
)?;
let mut remote = repo.find_remote("origin")?;
let mut po = PushOptions::new();
po.remote_callbacks(self.get_cred()?);
remote.push(
&[&format!("refs/heads/{}:refs/heads/{}", "main", "main")],
Some(&mut po),
)?;
Ok(())
}
}
#[cfg(test)]
mod test {
use tokio::io::AsyncReadExt;
use uuid::Uuid;
use crate::services::{
archive::{Archive, ArchiveFile},
cluster_list::ClusterList,
git::Git,
};
#[tokio::test]
async fn can_clone_upstream() -> anyhow::Result<()> {
// FIXME: right now CI doesn't support git
return Ok(());
let random = Uuid::new_v4().to_string();
println!("running test for id: {}", random);
println!("current_dir: {}", std::env::current_dir()?.display());
let mut arch = tokio::fs::File::open("testdata/example.tar").await?;
let mut dest = Vec::new();
arch.read_to_end(&mut dest).await?;
let git = Git::new(
"ssh://git@git.front.kjuulh.io/kjuulh/clank-clusters.git".into(),
ClusterList::default(),
Archive::default(),
);
let mut location = git.location.lock().await;
*location = location.join(random);
println!("into: {}", location.display());
drop(location);
git.publish(
&ArchiveFile { content: dest },
"flux-releaser-test",
"dev",
"dev",
)
.await?;
Ok(())
}
}

View File

@ -1,58 +1,50 @@
use serde::Serialize; use serde::Serialize;
use crate::services::archive::{Archive, ArchiveFile}; use crate::services::archive::extensions::ArchiveUploadExt;
use crate::services::artifacts_db::{AddCommitArtifact, GetLatestArtifact}; #[mockall_double::double]
use crate::services::file_store::FileStore; use crate::services::file_store::FileStore;
use super::artifacts_db::ArtifactsDB; #[mockall_double::double]
use super::archive::Archive;
#[mockall_double::double]
use super::domain_events::DomainEvents; use super::domain_events::DomainEvents;
use super::git::SharedGit; #[mockall_double::double]
use super::file_reader::FileReader;
use self::models::*; use self::models::{ArtifactID, CommitArtifact};
pub struct ReleaseManager { pub struct ReleaseManager {
archive: Archive,
file_reader: FileReader,
file_store: FileStore, file_store: FileStore,
domain_events: DomainEvents, domain_events: DomainEvents,
artifacts_db: ArtifactsDB,
git: SharedGit,
} }
impl ReleaseManager { impl ReleaseManager {
pub fn new( pub fn new(
file_reader: FileReader,
file_store: FileStore, file_store: FileStore,
archive: Archive,
domain_events: DomainEvents, domain_events: DomainEvents,
artifacts_db: ArtifactsDB,
git: SharedGit,
) -> Self { ) -> Self {
Self { Self {
archive,
file_reader,
file_store, file_store,
domain_events, domain_events,
artifacts_db,
git,
} }
} }
pub async fn upload_artifact(
&self,
request: UploadArtifact,
) -> anyhow::Result<UploadArtifactID> {
let upload_id = uuid::Uuid::now_v7();
self.file_store
.upload_temp(upload_id.into(), request.file_path)
.await?;
Ok(upload_id.into())
}
pub async fn commit_artifact(&self, request: CommitArtifact) -> anyhow::Result<ArtifactID> { pub async fn commit_artifact(&self, request: CommitArtifact) -> anyhow::Result<ArtifactID> {
tracing::debug!("committing artifact: {:?}", request); tracing::debug!("committing artifact: {:?}", request);
let artifact_id = ArtifactID::new(); let artifact_id = ArtifactID::new();
let artifact = self.file_store.get_temp(request.upload_id).await?; let files = self.file_reader.read_files(request.folder).await?;
let archive = self.archive.create_archive(files).await?;
self.file_store self.file_store
.upload_file(artifact_id.clone(), artifact) .upload_archive(artifact_id.clone(), archive)
.await?; .await?;
self.domain_events self.domain_events
@ -61,68 +53,8 @@ impl ReleaseManager {
})?) })?)
.await?; .await?;
self.artifacts_db
.commit_artifact(AddCommitArtifact {
app: request.app,
branch: request.branch,
artifact_id: artifact_id.clone().into(),
})
.await?;
Ok(artifact_id) Ok(artifact_id)
} }
pub async fn release(&self, release_req: Release) -> anyhow::Result<()> {
tracing::debug!(
app = release_req.app,
branch = release_req.branch,
"releasing latest commit"
);
let latest_artifact = self
.artifacts_db
.get_latest_artifact(GetLatestArtifact {
app: release_req.app.clone(),
branch: release_req.branch.clone(),
})
.await?;
tracing::trace!("found latest artifact: {:?}", latest_artifact);
let artifact = self
.file_store
.get_archive(latest_artifact.artifact_id.into())
.await?;
tracing::trace!("placed artifact in: {}", artifact.display());
self.domain_events
.publish_event(&serde_json::to_string(&PublishedArtifactEvent {
artifact_id: latest_artifact.artifact_id.to_string(),
app: release_req.app.clone(),
branch: release_req.branch.clone(),
})?)
.await?;
let artifact_contents = tokio::fs::read(artifact).await?;
let env = if release_req.branch == "main" {
// FIXME: select prod instead
"prod"
//"dev"
} else {
"dev"
};
self.git
.publish(
&ArchiveFile::from(artifact_contents),
&release_req.app,
env,
env,
)
.await?;
Ok(())
}
} }
#[derive(Serialize)] #[derive(Serialize)]
@ -130,12 +62,55 @@ pub struct CommittedArtifactEvent {
artifact_id: String, artifact_id: String,
} }
#[derive(Serialize)]
pub struct PublishedArtifactEvent {
app: String,
branch: String,
artifact_id: String,
}
pub mod extensions; pub mod extensions;
pub mod models; pub mod models;
#[cfg(test)]
mod test {
use crate::services::archive::{ArchiveFile, MockArchive};
use crate::services::domain_events::MockDomainEvents;
use crate::services::file_reader::{Files, MockFileReader};
use crate::services::file_store::MockFileStore;
use super::*;
#[tokio::test]
async fn generated_artifact_id() -> anyhow::Result<()> {
let mut file_store = MockFileStore::default();
file_store
.expect_upload_file()
.times(1)
.returning(|_, _| Ok(()));
let mut domain_events = MockDomainEvents::default();
domain_events
.expect_publish_event()
.times(1)
.returning(|_| Ok(()));
let mut file_reader = MockFileReader::default();
file_reader
.expect_read_files()
.times(1)
.returning(|_| Ok(Files::default()));
let mut archive = MockArchive::default();
archive.expect_create_archive().times(1).returning(|_| {
Ok(ArchiveFile {
content: Vec::new(),
})
});
let releaser_manager = ReleaseManager::new(file_reader, file_store, archive, domain_events);
releaser_manager
.commit_artifact(CommitArtifact {
app: "app".into(),
branch: "branch".into(),
folder: "someFolder".into(),
})
.await?;
Ok(())
}
}

View File

@ -1,8 +1,8 @@
use crate::{ use crate::{
app::SharedApp, app::SharedApp,
services::{ services::{
artifacts_db::extensions::ArtifactsDBExt, domain_events::extensions::DomainEventsExt, archive::extensions::ArchiveExt, domain_events::extensions::DomainEventsExt,
file_store::extensions::FileStoreExt, file_reader::extensions::FileReaderExt, file_store::extensions::FileStoreExt,
}, },
}; };
@ -15,10 +15,10 @@ pub trait ReleaseManagerExt {
impl ReleaseManagerExt for SharedApp { impl ReleaseManagerExt for SharedApp {
fn release_manager(&self) -> ReleaseManager { fn release_manager(&self) -> ReleaseManager {
ReleaseManager::new( ReleaseManager::new(
self.file_reader(),
self.file_store(), self.file_store(),
self.archive(),
self.domain_events(), self.domain_events(),
self.artifacts_db(),
self.git.clone(),
) )
} }
} }

View File

@ -1,16 +1,10 @@
use std::{ops::Deref, path::PathBuf}; use std::path::PathBuf;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct CommitArtifact { pub struct CommitArtifact {
pub app: String, pub app: String,
pub branch: String, pub branch: String,
pub upload_id: UploadArtifactID, pub folder: PathBuf,
}
#[derive(Clone, Debug)]
pub struct Release {
pub app: String,
pub branch: String,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -29,61 +23,3 @@ impl std::ops::Deref for ArtifactID {
&self.0 &self.0
} }
} }
impl TryFrom<String> for ArtifactID {
type Error = anyhow::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
let uuid = uuid::Uuid::parse_str(&value)?;
Ok(ArtifactID(uuid))
}
}
impl From<uuid::Uuid> for ArtifactID {
fn from(value: uuid::Uuid) -> Self {
Self(value)
}
}
pub struct UploadArtifact {
pub file_path: PathBuf,
}
impl From<PathBuf> for UploadArtifact {
fn from(value: PathBuf) -> Self {
Self { file_path: value }
}
}
#[derive(Clone, Debug)]
pub struct UploadArtifactID(uuid::Uuid);
impl From<uuid::Uuid> for UploadArtifactID {
fn from(value: uuid::Uuid) -> Self {
Self(value)
}
}
impl From<ArtifactID> for uuid::Uuid {
fn from(value: ArtifactID) -> Self {
value.0
}
}
impl TryFrom<String> for UploadArtifactID {
type Error = anyhow::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
let uuid = uuid::Uuid::parse_str(&value)?;
Ok(Self(uuid))
}
}
impl Deref for UploadArtifactID {
type Target = uuid::Uuid;
fn deref(&self) -> &Self::Target {
&self.0
}
}

Binary file not shown.

View File

@ -1,14 +0,0 @@
use flux_releaser::services::{
archive::Archive, file_reader::FileReader, flux_local_cluster::FluxLocalClusterManager,
};
#[tokio::test]
async fn can_package_files() -> anyhow::Result<()> {
tracing_subscriber::fmt().init();
//let sut = FluxLocalClusterManager::new(FileReader::new(), Archive::new());
//sut.package_clusters("testdata/flux_local_cluster/").await?;
Ok(())
}

View File

@ -1,240 +0,0 @@
use std::{
net::{Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
};
use anyhow::Context;
use flux_releaser::{
app::{LocalApp, SharedApp, SharedLocalApp},
grpc::gen::flux_releaser_client::FluxReleaserClient,
services::{
archive::ArchiveFile, file_store::extensions::FileStoreExt,
flux_local_cluster::extensions::FluxLocalClusterManagerExt,
flux_releaser_uploader::FluxReleaserUploader,
},
};
use tokio::{net::TcpListener, runtime::Runtime, sync::Mutex, time::sleep};
struct Server {
endpoints: Endpoints,
app: SharedApp,
}
#[derive(Clone, Debug)]
struct Endpoints {
http: SocketAddr,
grpc: SocketAddr,
}
impl Server {
pub async fn new() -> anyhow::Result<Self> {
let http_socket = Self::find_free_port().await?;
let grpc_socket = Self::find_free_port().await?;
Ok(Self {
endpoints: Endpoints {
http: http_socket,
grpc: grpc_socket,
},
app: SharedApp::new(flux_releaser::app::App::new().await?),
})
}
pub async fn start(&self) -> anyhow::Result<()> {
flux_releaser::cli::server::run_server(self.endpoints.http, self.endpoints.grpc).await?;
Ok(())
}
pub async fn find_free_port() -> anyhow::Result<SocketAddr> {
let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let listener = TcpListener::bind(socket).await?;
listener.local_addr().context("failed to get local addr")
}
}
static INIT: std::sync::Once = std::sync::Once::new();
async fn perform_task_with_backoff<F, Fut, T, E>(
mut task: F,
max_retries: u32,
base_delay_ms: u64,
) -> Result<T, E>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
let mut retries = 0;
let mut delay = base_delay_ms;
loop {
match task().await {
Ok(result) => return Ok(result),
Err(_e) if retries < max_retries => {
sleep(Duration::from_millis(delay)).await;
delay *= 2; // Exponential backoff
retries += 1;
}
Err(e) => return Err(e),
}
}
}
// Makes sure the setup is ready for execution
async fn is_ready() -> anyhow::Result<()> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
perform_task_with_backoff(
|| async {
let endpoints = unsafe {
if ENDPOINTS.is_none() {
anyhow::bail!("endpoints not set yet");
}
ENDPOINTS.clone().unwrap()
};
let resp = reqwest::get(format!("http://{}/ping", endpoints.http)).await?;
if !resp.status().is_success() {
anyhow::bail!("failed with status: {}", resp.status());
}
Ok::<(), anyhow::Error>(())
},
5,
500,
)
.await?;
Ok(())
}
static mut ENDPOINTS: Option<Endpoints> = None;
static mut APP: Option<SharedApp> = None;
async fn setup() -> anyhow::Result<(Endpoints, SharedApp)> {
INIT.call_once(|| {
std::thread::spawn(|| {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
println!("once was created once");
let server = Server::new().await.unwrap();
unsafe {
ENDPOINTS = Some(server.endpoints.clone());
APP = Some(server.app.clone());
}
server.start().await.unwrap();
})
});
});
is_ready().await?;
Ok(unsafe { (ENDPOINTS.clone().unwrap(), APP.clone().unwrap()) })
}
async fn local_setup(endpoints: Endpoints) -> anyhow::Result<SharedLocalApp> {
Ok(SharedLocalApp::new(
LocalApp::new(format!("http://{}", endpoints.grpc)).await?,
))
}
#[tokio::test]
async fn can_upload_artifact() -> anyhow::Result<()> {
return Ok(());
std::env::set_var("RUST_LOG", "flux_releaser=trace");
let (endpoints, app) = setup().await?;
let client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?;
let client = FluxReleaserUploader::new(Arc::new(Mutex::new(client)));
let bytes: Vec<u8> = vec![0; 10_000_000];
let upload_id = client
.upload_archive(ArchiveFile {
content: bytes.clone(),
})
.await?;
assert!(!upload_id.to_string().is_empty());
let actual_path = app.file_store().get_temp(upload_id).await?;
let actual_content = tokio::fs::read(actual_path).await?;
assert_eq!(&bytes, &actual_content);
Ok(())
}
#[tokio::test]
async fn can_publish_artifact() -> anyhow::Result<()> {
return Ok(());
std::env::set_var("RUST_LOG", "flux_releaser=trace");
let (endpoints, app) = setup().await?;
let local_app = local_setup(endpoints.clone()).await?;
let upload_id = local_app
.flux_local_cluster_manager()
.package_clusters("testdata/flux_local_cluster")
.await?;
let archive = app.file_store().get_temp(upload_id.clone()).await?;
assert!(archive.exists());
let artifact_id = local_app
.flux_local_cluster_manager()
.commit_artifact("some-app", "some-branch", upload_id)
.await?;
let artifact = app.file_store().get_archive(artifact_id).await?;
assert!(artifact.exists());
Ok(())
}
#[tokio::test]
async fn can_trigger_latest_release() -> anyhow::Result<()> {
return Ok(());
let test_id = uuid::Uuid::now_v7();
std::env::set_var("RUST_LOG", "flux_releaser=trace");
let (endpoints, app) = setup().await?;
let local_app = local_setup(endpoints.clone()).await?;
let upload_id = local_app
.flux_local_cluster_manager()
.package_clusters("testdata/flux_local_cluster")
.await?;
let archive = app.file_store().get_temp(upload_id.clone()).await?;
assert!(archive.exists());
let _ = local_app
.flux_local_cluster_manager()
.commit_artifact(test_id, "some-branch", upload_id)
.await?;
local_app
.flux_local_cluster_manager()
.trigger_release(test_id, "some-branch")
.await?;
// 1. Verify that release event has been sent
// 2. Verify that we've splatted the flux cluster over the upstream registry
// 3. Verify database has a release record
Ok(())
}
pub struct TestGreeter {}

View File

@ -5,17 +5,16 @@ base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-service-plan.git"
vars: vars:
service: "flux-releaser" service: "flux-releaser"
registry: kasperhermansen registry: kasperhermansen
database:
crdb: "true" clusters:
ingress: clank-prod:
- external: "true" replicas: "3"
- internal: "true" namespace: prod
cuddle/clusters: deployment:
dev: registry: git@git.front.kjuulh.io:kjuulh/clank-clusters
env: env:
service.host: "0.0.0.0:3000"
prod: prod:
env: clusters:
service.host: "0.0.0.0:3000" - clank-prod

View File

@ -1,3 +0,0 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
}

View File

@ -21,8 +21,8 @@ services:
/bin/sh -c " /bin/sh -c "
/usr/bin/mc alias set myminio http://minio:10000 minioadmin minioadminpassword; /usr/bin/mc alias set myminio http://minio:10000 minioadmin minioadminpassword;
/usr/bin/mc admin info myminio; /usr/bin/mc admin info myminio;
/usr/bin/mc mb myminio/flux-releaser; /usr/bin/mc mb myminio/mybucket;
/usr/bin/mc policy set public myminio/flux-releaser; /usr/bin/mc policy set public myminio/mybucket;
exit 0; exit 0;
" "
@ -34,17 +34,3 @@ services:
- NATS_ENABLE_AUTH=yes - NATS_ENABLE_AUTH=yes
- NATS_USERNAME=user - NATS_USERNAME=user
- NATS_PASSWORD=secret - NATS_PASSWORD=secret
crdb:
restart: 'always'
image: 'cockroachdb/cockroach:latest'
command: 'start-single-node --advertise-addr 0.0.0.0 --insecure'
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health?ready=1"]
interval: '10s'
timeout: '30s'
retries: 5
start_period: '20s'
ports:
- '28080:8080'
- '26257:26257'