Compare commits
No commits in common. "main" and "feat/add-initial-publish" have entirely different histories.
main
...
feat/add-i
115
.drone.yml
115
.drone.yml
@ -1,2 +1,113 @@
|
||||
kind: template
|
||||
load: cuddle-rust-service-plan.yaml
|
||||
kind: pipeline
|
||||
name: default
|
||||
type: docker
|
||||
|
||||
steps:
|
||||
- name: load_secret
|
||||
image: debian:buster-slim
|
||||
volumes:
|
||||
- name: ssh
|
||||
path: /root/.ssh/
|
||||
environment:
|
||||
SSH_KEY:
|
||||
from_secret: gitea_id_ed25519
|
||||
commands:
|
||||
- mkdir -p $HOME/.ssh/
|
||||
- echo "$SSH_KEY" | base64 -d > $HOME/.ssh/id_ed25519
|
||||
- chmod -R 600 ~/.ssh
|
||||
- |
|
||||
cat >$HOME/.ssh/config <<EOL
|
||||
Host git.front.kjuulh.io
|
||||
IdentityFile $HOME/.ssh/id_ed25519
|
||||
IdentitiesOnly yes
|
||||
UserKnownHostsFile=/dev/null
|
||||
StrictHostKeyChecking no
|
||||
EOL
|
||||
- chmod 700 ~/.ssh/config
|
||||
|
||||
- name: build pr
|
||||
image: kasperhermansen/cuddle-rust-service-plan:main-1708174643
|
||||
pull: always
|
||||
volumes:
|
||||
- name: ssh
|
||||
path: /root/.ssh/
|
||||
commands:
|
||||
- eval `ssh-agent`
|
||||
- ssh-add
|
||||
- echo "$DOCKER_PASSWORD" | docker login --password-stdin --username="$DOCKER_USERNAME" docker.io
|
||||
- export RUST_LOG=trace
|
||||
- cuddle-rust-service-plan pr
|
||||
environment:
|
||||
DOCKER_BUILDKIT: 1
|
||||
DOCKER_PASSWORD:
|
||||
from_secret: docker_password
|
||||
DOCKER_USERNAME:
|
||||
from_secret: docker_username
|
||||
CUDDLE_SECRETS_PROVIDER: 1password
|
||||
CUDDLE_ONE_PASSWORD_DOT_ENV: ".env.ci"
|
||||
CUDDLE_SSH_AGENT: "true"
|
||||
CUDDLE_PLEASE_TOKEN:
|
||||
from_secret: cuddle_please_token
|
||||
OP_SERVICE_ACCOUNT_TOKEN:
|
||||
from_secret: op_service_account_token
|
||||
DOCKER_HOST: "tcp://192.168.1.233:2376"
|
||||
when:
|
||||
event:
|
||||
- push
|
||||
- pull_request
|
||||
exclude:
|
||||
- main
|
||||
- master
|
||||
depends_on:
|
||||
- "load_secret"
|
||||
|
||||
- name: build main
|
||||
image: kasperhermansen/cuddle-rust-service-plan:main-1708174643
|
||||
pull: always
|
||||
volumes:
|
||||
- name: ssh
|
||||
path: /root/.ssh/
|
||||
commands:
|
||||
- eval `ssh-agent`
|
||||
- ssh-add
|
||||
- echo "$DOCKER_PASSWORD" | docker login --password-stdin --username="$DOCKER_USERNAME" docker.io
|
||||
- export RUST_LOG=trace
|
||||
- cuddle-rust-service-plan main
|
||||
environment:
|
||||
REGISTRY_CACHE_USERNAME:
|
||||
from_secret: registry_cache_username
|
||||
REGISTRY_CACHE_PASSWORD:
|
||||
from_secret: registry_cache_password
|
||||
REGISTRY_CACHE_TOKEN:
|
||||
from_secret: registry_cache_token
|
||||
REGISTRY_CACHE_url:
|
||||
from_secret: registry_cache_url
|
||||
DOCKER_BUILDKIT: 1
|
||||
DOCKER_PASSWORD:
|
||||
from_secret: docker_password
|
||||
DOCKER_USERNAME:
|
||||
from_secret: docker_username
|
||||
CUDDLE_SECRETS_PROVIDER: 1password
|
||||
CUDDLE_ONE_PASSWORD_DOT_ENV: ".env.ci"
|
||||
CUDDLE_SSH_AGENT: "true"
|
||||
GIT_PASSWORD:
|
||||
from_secret: git_password
|
||||
DOCKER_HOST: "tcp://192.168.1.233:2376"
|
||||
CUDDLE_PLEASE_TOKEN:
|
||||
from_secret: cuddle_please_token
|
||||
OP_SERVICE_ACCOUNT_TOKEN:
|
||||
from_secret: op_service_account_token
|
||||
when:
|
||||
event:
|
||||
- push
|
||||
branch:
|
||||
- main
|
||||
- master
|
||||
exclude:
|
||||
- pull_request
|
||||
depends_on:
|
||||
- "load_secret"
|
||||
|
||||
volumes:
|
||||
- name: ssh
|
||||
temp: {}
|
||||
|
2
.env
2
.env
@ -5,5 +5,3 @@ AWS_SECRET_ACCESS_KEY=minioadminpassword
|
||||
NATS_URL=127.0.0.1:4222
|
||||
NATS_USERNAME=user
|
||||
NATS_PASSWORD=secret
|
||||
|
||||
DATABASE_URL=postgres://root@localhost:26257/defaultdb?sslmode=disable
|
||||
|
2338
Cargo.lock
generated
2338
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -11,32 +11,21 @@ tracing-subscriber.workspace = true
|
||||
clap.workspace = true
|
||||
dotenv.workspace = true
|
||||
axum.workspace = true
|
||||
prost = "0.13.2"
|
||||
tonic = { version = "0.12.2", features = ["tls", "tls-native-roots"] }
|
||||
prost = "0.12.3"
|
||||
tonic = "0.11.0"
|
||||
uuid = { version = "1.7.0", features = ["v7", "v4"] }
|
||||
async-trait = "0.1.77"
|
||||
aws-config = { version = "1.5.5", features = ["behavior-version-latest"] }
|
||||
aws-sdk-s3 = { version = "1.48.0", features = ["behavior-version-latest"] }
|
||||
mockall_double = "0.3.1"
|
||||
aws-config = { version = "1.1.5", features = ["behavior-version-latest"] }
|
||||
aws-sdk-s3 = { version = "1.15.0", features = ["behavior-version-latest"] }
|
||||
serde = { version = "1.0.196", features = ["derive"] }
|
||||
serde_json = "1.0.113"
|
||||
nats = "0.25.0"
|
||||
nats = "0.24.1"
|
||||
walkdir = "2.4.0"
|
||||
tar = "0.4.40"
|
||||
tokio-stream = { version = "0.1.15", features = ["full"] }
|
||||
rand = "0.8.5"
|
||||
sqlx = { version = "0.8.0", features = [
|
||||
"postgres",
|
||||
"runtime-tokio",
|
||||
"uuid",
|
||||
"chrono",
|
||||
] }
|
||||
chrono = "0.4.34"
|
||||
git2 = "0.19.0"
|
||||
rustls = { version = "0.23.12" }
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.12.0"
|
||||
tonic-build = "0.11.0"
|
||||
|
||||
[dev-dependencies]
|
||||
lazy_static = "1.4.0"
|
||||
reqwest = "0.12.0"
|
||||
mockall = "0.12.1"
|
||||
|
@ -1,8 +0,0 @@
|
||||
-- Add migration script here
|
||||
CREATE TABLE IF NOT EXISTS artifacts (
|
||||
app VARCHAR NOT NULL,
|
||||
branch VARCHAR NOT NULL,
|
||||
artifact_id UUID NOT NULL PRIMARY KEY,
|
||||
created_at TIMESTAMP DEFAULT NOW(),
|
||||
UNIQUE (app, artifact_id)
|
||||
);
|
@ -2,33 +2,16 @@ syntax = "proto3";
|
||||
|
||||
package flux_releaser;
|
||||
|
||||
service FluxReleaser {
|
||||
rpc UploadArtifact (stream UploadArtifactRequest) returns (UploadArtifactResponse) {}
|
||||
rpc CommitArtifact (CommitArtifactRequest) returns (CommitArtifactResponse) {}
|
||||
rpc TriggerRelease (TriggerReleaseRequest) returns (TriggerReleaseResponse) {}
|
||||
service Greeter {
|
||||
rpc SayHello (HelloRequest) returns (HelloReply) {}
|
||||
}
|
||||
|
||||
message UploadArtifactRequest {
|
||||
bytes content = 1;
|
||||
}
|
||||
|
||||
message UploadArtifactResponse {
|
||||
string upload_id = 1;
|
||||
}
|
||||
|
||||
message CommitArtifactRequest {
|
||||
message HelloRequest {
|
||||
string app = 1;
|
||||
string branch = 2;
|
||||
string upload_id = 3;
|
||||
string folder = 3; // Change to files instead
|
||||
}
|
||||
|
||||
message CommitArtifactResponse {
|
||||
string artifact_id = 1;
|
||||
message HelloReply {
|
||||
string message = 1;
|
||||
}
|
||||
|
||||
message TriggerReleaseRequest {
|
||||
string app = 1;
|
||||
string branch = 2;
|
||||
}
|
||||
|
||||
message TriggerReleaseResponse {}
|
||||
|
@ -1,14 +1,11 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use axum::{response::IntoResponse, routing::get, Router};
|
||||
use axum::{routing::get, Router};
|
||||
|
||||
use crate::app::SharedApp;
|
||||
|
||||
pub async fn axum_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
|
||||
let app = Router::new()
|
||||
.route("/ping", get(pong))
|
||||
.route("/", get(root))
|
||||
.with_state(app);
|
||||
let app = Router::new().route("/", get(root)).with_state(app);
|
||||
|
||||
tracing::info!("listening on {}", host);
|
||||
let listener = tokio::net::TcpListener::bind(host).await.unwrap();
|
||||
@ -18,10 +15,6 @@ pub async fn axum_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn pong() -> impl IntoResponse {
|
||||
"pong!"
|
||||
}
|
||||
|
||||
async fn root() -> &'static str {
|
||||
"Hello, flux-releaser!"
|
||||
}
|
||||
|
@ -1,17 +1,9 @@
|
||||
use std::{ops::Deref, sync::Arc};
|
||||
|
||||
use sqlx::{PgPool, Postgres};
|
||||
|
||||
use crate::services::{
|
||||
archive::Archive,
|
||||
cluster_list::ClusterList,
|
||||
git::{Git, SharedGit},
|
||||
};
|
||||
|
||||
use self::infra::{
|
||||
aws_s3::s3_client,
|
||||
grpc::{new_client, FluxReleaserGrpcClient},
|
||||
};
|
||||
|
||||
use self::infra::aws_s3::s3_client;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedApp(Arc<App>);
|
||||
@ -33,8 +25,6 @@ impl SharedApp {
|
||||
pub struct App {
|
||||
pub s3_client: aws_sdk_s3::Client,
|
||||
pub nats: infra::nats::Nats,
|
||||
pub database: PgPool,
|
||||
pub git: SharedGit,
|
||||
}
|
||||
|
||||
impl App {
|
||||
@ -42,45 +32,8 @@ impl App {
|
||||
Ok(Self {
|
||||
s3_client: s3_client().await?,
|
||||
nats: infra::nats::Nats::new().await?,
|
||||
database: infra::database::get_database().await?,
|
||||
git: Git::new(
|
||||
std::env::var("FLUX_RELEASER_GIT_REPOSITORY")
|
||||
.unwrap_or("ssh://git@git.front.kjuulh.io/kjuulh/clank-clusters.git".into()),
|
||||
ClusterList::default(),
|
||||
Archive::default(),
|
||||
)
|
||||
.into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub mod infra;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedLocalApp(Arc<LocalApp>);
|
||||
|
||||
impl Deref for SharedLocalApp {
|
||||
type Target = Arc<LocalApp>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl SharedLocalApp {
|
||||
pub fn new(app: LocalApp) -> Self {
|
||||
Self(Arc::new(app))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LocalApp {
|
||||
pub grpc_client: FluxReleaserGrpcClient,
|
||||
}
|
||||
|
||||
impl LocalApp {
|
||||
pub async fn new(registry: impl Into<String>) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
grpc_client: new_client(registry).await?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,2 @@
|
||||
pub mod aws_s3;
|
||||
pub mod database;
|
||||
pub mod grpc;
|
||||
pub mod nats;
|
||||
|
@ -1,4 +1,3 @@
|
||||
use anyhow::Context;
|
||||
use aws_config::{BehaviorVersion, Region};
|
||||
use aws_sdk_s3::config::Credentials;
|
||||
|
||||
@ -6,16 +5,15 @@ pub async fn s3_client() -> anyhow::Result<aws_sdk_s3::Client> {
|
||||
let shared_config = aws_config::defaults(BehaviorVersion::latest())
|
||||
.region(Region::new("eu-west-1"))
|
||||
.credentials_provider(Credentials::new(
|
||||
std::env::var("AWS_ACCESS_KEY_ID").context("AWS_ACCESS_KEY_ID was not set")?,
|
||||
std::env::var("AWS_SECRET_ACCESS_KEY").context("AWS_SECRET_ACCESS_KEY was not set")?,
|
||||
std::env::var("AWS_ACCESS_KEY_ID")?,
|
||||
std::env::var("AWS_SECRET_ACCESS_KEY")?,
|
||||
None,
|
||||
None,
|
||||
"flux_releaser",
|
||||
));
|
||||
|
||||
let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await)
|
||||
.endpoint_url(std::env::var("AWS_ENDPOINT_URL").context("AWS_ENDPOINT_URL was not set")?)
|
||||
.force_path_style(true)
|
||||
.endpoint_url(std::env::var("AWS_ENDPOINT_URL")?)
|
||||
.build();
|
||||
|
||||
let client = aws_sdk_s3::Client::from_conf(config);
|
||||
|
@ -1,19 +0,0 @@
|
||||
use anyhow::Context;
|
||||
use sqlx::{PgPool, Postgres};
|
||||
|
||||
pub async fn get_database() -> anyhow::Result<PgPool> {
|
||||
tracing::trace!("initializing database");
|
||||
let db =
|
||||
sqlx::PgPool::connect(&std::env::var("DATABASE_URL").context("DATABASE_URL is not set")?)
|
||||
.await?;
|
||||
|
||||
tracing::trace!("migrating crdb");
|
||||
sqlx::migrate!("migrations/crdb")
|
||||
.set_locking(false)
|
||||
.run(&db)
|
||||
.await?;
|
||||
|
||||
let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?;
|
||||
|
||||
Ok(db)
|
||||
}
|
@ -1,36 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::Mutex;
|
||||
use tonic::transport::{Channel, ClientTlsConfig};
|
||||
|
||||
use crate::grpc::gen::flux_releaser_client::FluxReleaserClient;
|
||||
|
||||
pub type FluxReleaserGrpcClient = Arc<Mutex<FluxReleaserClient<Channel>>>;
|
||||
|
||||
pub async fn new_client(registry: impl Into<String>) -> anyhow::Result<FluxReleaserGrpcClient> {
|
||||
let registry: String = registry.into();
|
||||
|
||||
// Workaround: https://github.com/algesten/ureq/issues/765#issuecomment-2282921492
|
||||
static INIT: std::sync::Once = std::sync::Once::new();
|
||||
|
||||
INIT.call_once(|| {
|
||||
rustls::crypto::aws_lc_rs::default_provider()
|
||||
.install_default()
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let channel = if registry.starts_with("https") {
|
||||
let tls = ClientTlsConfig::new().with_native_roots();
|
||||
|
||||
Channel::from_shared(registry)?
|
||||
.tls_config(tls)?
|
||||
.connect()
|
||||
.await?
|
||||
} else {
|
||||
Channel::from_shared(registry)?.connect().await?
|
||||
};
|
||||
|
||||
let client = FluxReleaserClient::new(channel);
|
||||
|
||||
Ok(Arc::new(Mutex::new(client)))
|
||||
}
|
@ -1,10 +1,11 @@
|
||||
use clap::{Parser, Subcommand};
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use crate::services::flux_local_cluster::extensions::FluxLocalClusterManagerExt;
|
||||
|
||||
pub mod client;
|
||||
pub mod server;
|
||||
use crate::{
|
||||
api::axum_serve,
|
||||
app::{App, SharedApp},
|
||||
grpc::tonic_serve,
|
||||
};
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about, long_about = None, subcommand_required = true)]
|
||||
@ -21,62 +22,27 @@ pub enum Commands {
|
||||
#[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")]
|
||||
grpc_host: SocketAddr,
|
||||
},
|
||||
|
||||
Commit {
|
||||
#[arg(long)]
|
||||
app: String,
|
||||
#[arg(long)]
|
||||
branch: String,
|
||||
#[arg(long)]
|
||||
include: String,
|
||||
#[arg(env = "FLUX_RELEASER_REGISTRY", long)]
|
||||
registry: String,
|
||||
},
|
||||
|
||||
Release {
|
||||
#[arg(long)]
|
||||
app: String,
|
||||
|
||||
#[arg(long)]
|
||||
branch: String,
|
||||
#[arg(env = "FLUX_RELEASER_REGISTRY", long)]
|
||||
registry: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub async fn run() -> anyhow::Result<()> {
|
||||
let cli = Command::parse();
|
||||
|
||||
match cli.command {
|
||||
Some(Commands::Serve { host, grpc_host }) => {
|
||||
server::run_server(host, grpc_host).await?;
|
||||
}
|
||||
Some(Commands::Commit {
|
||||
app,
|
||||
branch,
|
||||
include,
|
||||
registry,
|
||||
}) => {
|
||||
let app = client::get_local_app(registry).await?;
|
||||
if let Some(Commands::Serve { host, grpc_host }) = cli.command {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let upload_id = app
|
||||
.flux_local_cluster_manager()
|
||||
.package_clusters(include)
|
||||
.await?;
|
||||
}
|
||||
Some(Commands::Release {
|
||||
app: service_app,
|
||||
branch,
|
||||
registry,
|
||||
}) => {
|
||||
let app = client::get_local_app(registry).await?;
|
||||
tracing::info!("Starting service");
|
||||
|
||||
app.flux_local_cluster_manager()
|
||||
.trigger_release(service_app, branch)
|
||||
.await?;
|
||||
}
|
||||
None => (),
|
||||
let app = SharedApp::new(App::new().await?);
|
||||
|
||||
tokio::select! {
|
||||
res = axum_serve(host, app.clone()) => {
|
||||
res?;
|
||||
},
|
||||
res = tonic_serve(grpc_host, app.clone()) => {
|
||||
res?;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -1,11 +0,0 @@
|
||||
use crate::app::{LocalApp, SharedLocalApp};
|
||||
|
||||
pub async fn get_local_app(registry: impl Into<String>) -> anyhow::Result<SharedLocalApp> {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
tracing::debug!("Starting client commit");
|
||||
|
||||
let app = SharedLocalApp::new(LocalApp::new(registry).await?);
|
||||
|
||||
Ok(app)
|
||||
}
|
@ -1,29 +0,0 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use crate::{
|
||||
api::axum_serve,
|
||||
app::{App, SharedApp},
|
||||
grpc::tonic_serve,
|
||||
};
|
||||
|
||||
pub async fn run_server(
|
||||
host: impl Into<SocketAddr>,
|
||||
grpc_host: impl Into<SocketAddr>,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
tracing::info!("Starting service");
|
||||
|
||||
let app = SharedApp::new(App::new().await?);
|
||||
|
||||
tokio::select! {
|
||||
res = axum_serve(host.into(), app.clone()) => {
|
||||
res?;
|
||||
},
|
||||
res = tonic_serve(grpc_host.into(), app.clone()) => {
|
||||
res?;
|
||||
},
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
@ -1,25 +1,17 @@
|
||||
use std::{env::temp_dir, fmt::Display, net::SocketAddr};
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_stream::StreamExt;
|
||||
use tonic::{service::interceptor, transport::Server};
|
||||
use uuid::Uuid;
|
||||
use tonic::transport::Server;
|
||||
|
||||
use crate::{
|
||||
app::SharedApp,
|
||||
services::release_manager::{
|
||||
extensions::ReleaseManagerExt,
|
||||
models::{CommitArtifact, Release},
|
||||
ReleaseManager,
|
||||
extensions::ReleaseManagerExt, models::CommitArtifact, ReleaseManager,
|
||||
},
|
||||
};
|
||||
|
||||
use self::gen::{
|
||||
flux_releaser_server, CommitArtifactRequest, CommitArtifactResponse, TriggerReleaseRequest,
|
||||
TriggerReleaseResponse, UploadArtifactRequest, UploadArtifactResponse,
|
||||
};
|
||||
use self::gen::{greeter_server, HelloReply, HelloRequest};
|
||||
|
||||
pub mod gen {
|
||||
mod gen {
|
||||
tonic::include_proto!("flux_releaser");
|
||||
}
|
||||
|
||||
@ -35,139 +27,45 @@ impl FluxReleaserGrpc {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for FluxReleaserGrpc {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl flux_releaser_server::FluxReleaser for FluxReleaserGrpc {
|
||||
#[tracing::instrument]
|
||||
async fn upload_artifact(
|
||||
impl greeter_server::Greeter for FluxReleaserGrpc {
|
||||
async fn say_hello(
|
||||
&self,
|
||||
request: tonic::Request<tonic::Streaming<UploadArtifactRequest>>,
|
||||
) -> std::result::Result<tonic::Response<UploadArtifactResponse>, tonic::Status> {
|
||||
let mut stream = request.into_inner();
|
||||
|
||||
let file_path = temp_dir()
|
||||
.join("flux_releaser")
|
||||
.join("tmp")
|
||||
.join("upload_artifact")
|
||||
.join(Uuid::new_v4().to_string());
|
||||
tokio::fs::create_dir_all(file_path.parent().unwrap()).await?;
|
||||
let mut file = tokio::fs::File::create(&file_path).await?;
|
||||
|
||||
while let Some(item) = stream.next().await {
|
||||
tracing::trace!("received chunk");
|
||||
let item = item?;
|
||||
|
||||
let _ = file.write(&item.content).await?;
|
||||
}
|
||||
tracing::info!("got this far 1a");
|
||||
|
||||
file.flush().await?;
|
||||
tracing::info!("got this far 1");
|
||||
|
||||
let upload_id = match self.release_manager.upload_artifact(file_path.into()).await {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
tracing::warn!("failed to upload artifact: {}", e);
|
||||
return Err(tonic::Status::unknown(e.to_string()));
|
||||
}
|
||||
};
|
||||
|
||||
tracing::info!("got this far 2");
|
||||
|
||||
Ok(tonic::Response::new(UploadArtifactResponse {
|
||||
upload_id: upload_id.to_string(),
|
||||
}))
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
async fn commit_artifact(
|
||||
&self,
|
||||
request: tonic::Request<CommitArtifactRequest>,
|
||||
) -> std::result::Result<tonic::Response<CommitArtifactResponse>, tonic::Status> {
|
||||
request: tonic::Request<HelloRequest>,
|
||||
) -> std::result::Result<tonic::Response<HelloReply>, tonic::Status> {
|
||||
let req = request.into_inner();
|
||||
let artifact = self
|
||||
.release_manager
|
||||
.commit_artifact(req.try_into().map_err(|e: anyhow::Error| {
|
||||
tracing::warn!("failed to parse input body: {}", e);
|
||||
tonic::Status::invalid_argument(e.to_string())
|
||||
})?)
|
||||
.await
|
||||
.map_err(|e: anyhow::Error| {
|
||||
tracing::warn!("failed to commit artifact: {}", e);
|
||||
tonic::Status::internal(e.to_string())
|
||||
})?;
|
||||
|
||||
Ok(tonic::Response::new(CommitArtifactResponse {
|
||||
artifact_id: artifact.to_string(),
|
||||
}))
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
async fn trigger_release(
|
||||
&self,
|
||||
request: tonic::Request<TriggerReleaseRequest>,
|
||||
) -> std::result::Result<tonic::Response<TriggerReleaseResponse>, tonic::Status> {
|
||||
let req = request.into_inner();
|
||||
|
||||
tracing::info!("some trigger release");
|
||||
|
||||
self.release_manager
|
||||
.release(req.try_into().map_err(|e: anyhow::Error| {
|
||||
tracing::warn!("failed to parse input body: {}", e);
|
||||
tonic::Status::invalid_argument(e.to_string())
|
||||
})?)
|
||||
.commit_artifact(
|
||||
req.try_into()
|
||||
.map_err(|e: anyhow::Error| tonic::Status::invalid_argument(e.to_string()))?,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::warn!("failed to release: {}", e);
|
||||
tonic::Status::internal(e.to_string())
|
||||
})?;
|
||||
.unwrap();
|
||||
|
||||
Ok(tonic::Response::new(TriggerReleaseResponse {}))
|
||||
Ok(tonic::Response::new(HelloReply {
|
||||
message: "something".into(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<CommitArtifactRequest> for CommitArtifact {
|
||||
impl TryFrom<HelloRequest> for CommitArtifact {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: CommitArtifactRequest) -> Result<Self, Self::Error> {
|
||||
fn try_from(value: HelloRequest) -> Result<Self, Self::Error> {
|
||||
if value.app.is_empty() {
|
||||
anyhow::bail!("app cannot be empty")
|
||||
}
|
||||
if value.branch.is_empty() {
|
||||
anyhow::bail!("branch cannot be empty")
|
||||
}
|
||||
if value.upload_id.is_empty() {
|
||||
if value.folder.is_empty() {
|
||||
anyhow::bail!("folder cannot be empty")
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
app: value.app,
|
||||
branch: value.branch,
|
||||
upload_id: value.upload_id.try_into()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<TriggerReleaseRequest> for Release {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: TriggerReleaseRequest) -> Result<Self, Self::Error> {
|
||||
if value.app.is_empty() {
|
||||
anyhow::bail!("app cannot be empty");
|
||||
}
|
||||
|
||||
if value.branch.is_empty() {
|
||||
anyhow::bail!("branch canot be empty");
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
app: value.app,
|
||||
branch: value.branch,
|
||||
folder: value.folder.into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -175,14 +73,11 @@ impl TryFrom<TriggerReleaseRequest> for Release {
|
||||
pub async fn tonic_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
|
||||
tracing::info!("grpc listening on: {}", host);
|
||||
Server::builder()
|
||||
.trace_fn(|_| tracing::info_span!("flux_releaser"))
|
||||
.add_service(flux_releaser_server::FluxReleaserServer::new(
|
||||
FluxReleaserGrpc::new(app),
|
||||
))
|
||||
.add_service(greeter_server::GreeterServer::new(FluxReleaserGrpc::new(
|
||||
app,
|
||||
)))
|
||||
.serve(host)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct LogLayer {}
|
||||
|
@ -1,15 +0,0 @@
|
||||
use cli::Command;
|
||||
|
||||
pub mod api;
|
||||
pub mod app;
|
||||
pub mod cli;
|
||||
pub mod grpc;
|
||||
pub mod services;
|
||||
|
||||
pub async fn run() -> anyhow::Result<()> {
|
||||
dotenv::dotenv().ok();
|
||||
|
||||
Command::run().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -1,6 +1,19 @@
|
||||
use cli::Command;
|
||||
|
||||
mod cli;
|
||||
|
||||
mod api;
|
||||
mod grpc;
|
||||
|
||||
mod app;
|
||||
|
||||
mod services;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
flux_releaser::run().await?;
|
||||
dotenv::dotenv().ok();
|
||||
|
||||
Command::run().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -1,11 +1,5 @@
|
||||
pub mod archive;
|
||||
pub mod cluster_list;
|
||||
pub mod domain_events;
|
||||
pub mod file_reader;
|
||||
pub mod file_store;
|
||||
pub mod flux_local_cluster;
|
||||
pub mod flux_releaser_uploader;
|
||||
pub mod git;
|
||||
mod archive;
|
||||
mod domain_events;
|
||||
mod file_reader;
|
||||
mod file_store;
|
||||
pub mod release_manager;
|
||||
|
||||
pub mod artifacts_db;
|
||||
|
@ -1,75 +1,7 @@
|
||||
#[derive(Clone, Default)]
|
||||
#[derive(Clone)]
|
||||
pub struct Archive {}
|
||||
|
||||
use std::{
|
||||
io::{Bytes, Cursor},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
|
||||
use super::file_reader::Files;
|
||||
|
||||
impl Archive {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
pub async fn create_archive(&self, prefix: &Path, files: Files) -> anyhow::Result<ArchiveFile> {
|
||||
tracing::trace!("archiving files: {}", files.len());
|
||||
|
||||
let buffer = Vec::new();
|
||||
let cursor = Cursor::new(buffer);
|
||||
|
||||
let mut tar_builder = tar::Builder::new(cursor);
|
||||
|
||||
for file in files {
|
||||
let abs_file_path = file.path;
|
||||
|
||||
let mut fd = std::fs::File::open(&abs_file_path)?;
|
||||
if let Some(rel) = file.relative {
|
||||
tracing::trace!("archiving rel file: {}", rel.display());
|
||||
tar_builder.append_file(&rel, &mut fd)?;
|
||||
} else {
|
||||
tracing::trace!("archiving file: {}", abs_file_path.display());
|
||||
tar_builder.append_file(
|
||||
abs_file_path
|
||||
.strip_prefix(prefix)
|
||||
.context("failed to strip prefix from path")?,
|
||||
&mut fd,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
tar_builder.finish()?;
|
||||
|
||||
let cursor = tar_builder.into_inner()?;
|
||||
let buffer = cursor.into_inner();
|
||||
|
||||
Ok(buffer.into())
|
||||
}
|
||||
|
||||
pub async fn unpack_archive(&self, archive: &ArchiveFile, dest: &Path) -> anyhow::Result<()> {
|
||||
tracing::trace!("unpacking archive: {}", dest.display());
|
||||
|
||||
let cursor = Cursor::new(archive.content.clone());
|
||||
let mut arc = tar::Archive::new(cursor);
|
||||
|
||||
arc.unpack(dest)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ArchiveFile {
|
||||
pub content: Vec<u8>,
|
||||
}
|
||||
|
||||
impl From<Vec<u8>> for ArchiveFile {
|
||||
fn from(value: Vec<u8>) -> Self {
|
||||
Self { content: value }
|
||||
}
|
||||
}
|
||||
use std::{io::Cursor, path::Path};
|
||||
|
||||
pub mod extensions {
|
||||
|
||||
@ -79,11 +11,12 @@ pub mod extensions {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::app::SharedLocalApp;
|
||||
use crate::{app::SharedApp, services::release_manager::models::ArtifactID};
|
||||
|
||||
#[mockall_double::double]
|
||||
use crate::services::file_store::FileStore;
|
||||
|
||||
#[mockall_double::double]
|
||||
use super::Archive;
|
||||
use super::ArchiveFile;
|
||||
|
||||
@ -97,12 +30,6 @@ pub mod extensions {
|
||||
}
|
||||
}
|
||||
|
||||
impl ArchiveExt for SharedLocalApp {
|
||||
fn archive(&self) -> Archive {
|
||||
Archive::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ArchiveUploadExt {
|
||||
async fn upload_archive(
|
||||
@ -164,3 +91,49 @@ pub mod extensions {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
use mockall::{automock, mock, predicate::*};
|
||||
|
||||
use super::file_reader::{File, Files};
|
||||
|
||||
#[cfg_attr(test, automock)]
|
||||
impl Archive {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
pub async fn create_archive(&self, files: Files) -> anyhow::Result<ArchiveFile> {
|
||||
tracing::trace!("archiving files");
|
||||
|
||||
let buffer = Vec::new();
|
||||
let cursor = Cursor::new(buffer);
|
||||
|
||||
let mut tar_builder = tar::Builder::new(cursor);
|
||||
|
||||
for file in files {
|
||||
let abs_file_path = file.path;
|
||||
|
||||
tracing::trace!("archiving file: {}", abs_file_path.display());
|
||||
let mut fd = std::fs::File::open(&abs_file_path)?;
|
||||
tar_builder.append_file(&abs_file_path, &mut fd)?;
|
||||
}
|
||||
|
||||
tar_builder.finish()?;
|
||||
|
||||
let cursor = tar_builder.into_inner()?;
|
||||
let buffer = cursor.into_inner();
|
||||
|
||||
Ok(buffer.into())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ArchiveFile {
|
||||
pub content: Vec<u8>,
|
||||
}
|
||||
|
||||
impl From<Vec<u8>> for ArchiveFile {
|
||||
fn from(value: Vec<u8>) -> Self {
|
||||
Self { content: value }
|
||||
}
|
||||
}
|
||||
|
@ -1,132 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use sqlx::{prelude::FromRow, PgPool};
|
||||
|
||||
use self::defaults::DefaultArtifactsDB;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AddCommitArtifact {
|
||||
pub app: String,
|
||||
pub branch: String,
|
||||
pub artifact_id: uuid::Uuid,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, FromRow)]
|
||||
pub struct Artifact {
|
||||
pub app: String,
|
||||
pub branch: String,
|
||||
pub artifact_id: uuid::Uuid,
|
||||
pub created_at: chrono::NaiveDateTime,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct GetLatestArtifact {
|
||||
pub app: String,
|
||||
pub branch: String,
|
||||
}
|
||||
|
||||
pub mod traits {
|
||||
use axum::async_trait;
|
||||
|
||||
use super::{AddCommitArtifact, Artifact, GetLatestArtifact};
|
||||
|
||||
#[async_trait]
|
||||
pub trait ArtifactsDB {
|
||||
async fn commit_artifact(&self, commit_artifact: AddCommitArtifact) -> anyhow::Result<()>;
|
||||
async fn get_latest_artifact(
|
||||
&self,
|
||||
get_latest_artifact: GetLatestArtifact,
|
||||
) -> anyhow::Result<Artifact>;
|
||||
}
|
||||
}
|
||||
|
||||
pub mod defaults {
|
||||
use axum::async_trait;
|
||||
use sqlx::PgPool;
|
||||
|
||||
use super::{traits, AddCommitArtifact, Artifact, GetLatestArtifact};
|
||||
|
||||
pub struct DefaultArtifactsDB {
|
||||
db: PgPool,
|
||||
}
|
||||
|
||||
impl DefaultArtifactsDB {
|
||||
pub fn new(db: PgPool) -> Self {
|
||||
Self { db }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl traits::ArtifactsDB for DefaultArtifactsDB {
|
||||
async fn commit_artifact(&self, commit_artifact: AddCommitArtifact) -> anyhow::Result<()> {
|
||||
sqlx::query("INSERT INTO artifacts (app, branch, artifact_id) VALUES ($1, $2, $3)")
|
||||
.bind(commit_artifact.app)
|
||||
.bind(commit_artifact.branch)
|
||||
.bind(commit_artifact.artifact_id)
|
||||
.execute(&self.db)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_latest_artifact(
|
||||
&self,
|
||||
get_latest_artifact: GetLatestArtifact,
|
||||
) -> anyhow::Result<Artifact> {
|
||||
let artifact = sqlx::query_as::<_, Artifact>(
|
||||
"SELECT
|
||||
*
|
||||
FROM
|
||||
artifacts
|
||||
WHERE
|
||||
app = $1 AND
|
||||
branch = $2
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1",
|
||||
)
|
||||
.bind(get_latest_artifact.app)
|
||||
.bind(get_latest_artifact.branch)
|
||||
.fetch_one(&self.db)
|
||||
.await?;
|
||||
|
||||
Ok(artifact)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ArtifactsDB {
|
||||
inner: Arc<dyn traits::ArtifactsDB + Send + Sync>,
|
||||
}
|
||||
|
||||
impl ArtifactsDB {
|
||||
pub fn new(db: PgPool) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(DefaultArtifactsDB::new(db)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for ArtifactsDB {
|
||||
type Target = Arc<dyn traits::ArtifactsDB + Send + Sync>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
pub mod extensions {
|
||||
use crate::app::App;
|
||||
|
||||
use super::ArtifactsDB;
|
||||
|
||||
pub trait ArtifactsDBExt {
|
||||
fn artifacts_db(&self) -> ArtifactsDB;
|
||||
}
|
||||
|
||||
impl ArtifactsDBExt for App {
|
||||
fn artifacts_db(&self) -> ArtifactsDB {
|
||||
ArtifactsDB::new(self.database.clone())
|
||||
}
|
||||
}
|
||||
}
|
@ -1,23 +0,0 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ClusterList {}
|
||||
|
||||
impl ClusterList {
|
||||
pub async fn get_list(&self) -> anyhow::Result<HashMap<String, Vec<String>>> {
|
||||
Ok(HashMap::from([
|
||||
("dev".into(), vec!["clank-dev".into()]),
|
||||
("prod".into(), vec!["clank-prod".into()]),
|
||||
]))
|
||||
}
|
||||
|
||||
pub async fn get(&self, environment: &str) -> anyhow::Result<Option<Vec<String>>> {
|
||||
let list = self.get_list().await?;
|
||||
|
||||
if let Some(x) = list.get(environment) {
|
||||
Ok(Some(x.clone()))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
@ -5,7 +5,12 @@ pub struct DomainEvents {
|
||||
nats: Nats,
|
||||
}
|
||||
|
||||
use crate::app::infra::nats::Nats;
|
||||
#[cfg(test)]
|
||||
use mockall::{automock, mock, predicate::*};
|
||||
|
||||
use crate::app::infra::{nats::Nats};
|
||||
|
||||
#[cfg_attr(test, automock)]
|
||||
impl DomainEvents {
|
||||
pub fn new(nats: Nats) -> Self {
|
||||
Self { nats }
|
||||
|
@ -1,5 +1,6 @@
|
||||
use crate::app::SharedApp;
|
||||
|
||||
#[mockall_double::double]
|
||||
use super::DomainEvents;
|
||||
|
||||
pub trait DomainEventsExt {
|
||||
|
@ -1,18 +1,18 @@
|
||||
#[derive(Clone, Default)]
|
||||
#[derive(Clone)]
|
||||
pub struct FileReader {}
|
||||
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use std::{collections::BTreeMap, path::PathBuf};
|
||||
|
||||
pub mod extensions;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use anyhow::anyhow;
|
||||
#[cfg(test)]
|
||||
use mockall::{automock, mock, predicate::*};
|
||||
|
||||
#[cfg_attr(test, automock)]
|
||||
impl FileReader {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
Self {}
|
||||
}
|
||||
|
||||
pub async fn read_files(&self, location: PathBuf) -> anyhow::Result<Files> {
|
||||
@ -20,9 +20,7 @@ impl FileReader {
|
||||
|
||||
let mut clusters: BTreeMap<String, Vec<File>> = BTreeMap::new();
|
||||
|
||||
let mut dir = tokio::fs::read_dir(&location)
|
||||
.await
|
||||
.context(format!("failed to find location: {}", &location.display()))?;
|
||||
let mut dir = tokio::fs::read_dir(&location).await?;
|
||||
while let Some(dir_entry) = dir.next_entry().await? {
|
||||
if dir_entry.metadata().await?.is_dir() {
|
||||
tracing::trace!("found cluster in: {}", dir_entry.path().display());
|
||||
@ -51,11 +49,7 @@ impl FileReader {
|
||||
cluster_name
|
||||
);
|
||||
|
||||
if file.path().is_absolute() {
|
||||
files.push((file.path(), file.path().strip_prefix(&location)?).into())
|
||||
} else {
|
||||
files.push(file.into_path().into())
|
||||
}
|
||||
files.push(file.into_path().into())
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,32 +60,11 @@ impl FileReader {
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct File {
|
||||
pub path: PathBuf,
|
||||
pub relative: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl From<PathBuf> for File {
|
||||
fn from(value: PathBuf) -> Self {
|
||||
Self {
|
||||
path: value,
|
||||
relative: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<(PathBuf, PathBuf)> for File {
|
||||
fn from(value: (PathBuf, PathBuf)) -> Self {
|
||||
Self {
|
||||
path: value.0,
|
||||
relative: Some(value.1),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl From<(&Path, &Path)> for File {
|
||||
fn from(value: (&Path, &Path)) -> Self {
|
||||
Self {
|
||||
path: value.0.to_path_buf(),
|
||||
relative: Some(value.1.to_path_buf()),
|
||||
}
|
||||
Self { path: value }
|
||||
}
|
||||
}
|
||||
|
||||
@ -119,8 +92,14 @@ impl From<Files> for Vec<File> {
|
||||
value
|
||||
.iter()
|
||||
.map(|(cluster_name, files)| (PathBuf::from(cluster_name), files))
|
||||
.flat_map(|(_cluster_name, files)| files.to_vec())
|
||||
// .map(|f| f.into())
|
||||
.flat_map(|(cluster_name, files)| {
|
||||
files
|
||||
.iter()
|
||||
//.map(|file_path| cluster_name.join(&file_path.path))
|
||||
.map(|file_path| file_path.path.clone())
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.map(|f| f.into())
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
use crate::app::{SharedApp, SharedLocalApp};
|
||||
use crate::app::SharedApp;
|
||||
|
||||
#[mockall_double::double]
|
||||
use super::FileReader;
|
||||
|
||||
pub trait FileReaderExt {
|
||||
@ -11,9 +12,3 @@ impl FileReaderExt for SharedApp {
|
||||
FileReader::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl FileReaderExt for SharedLocalApp {
|
||||
fn file_reader(&self) -> FileReader {
|
||||
FileReader::new()
|
||||
}
|
||||
}
|
||||
|
@ -1,25 +1,23 @@
|
||||
use std::{env::temp_dir, path::PathBuf};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use super::release_manager::models::{ArtifactID, UploadArtifactID};
|
||||
use super::release_manager::models::ArtifactID;
|
||||
|
||||
pub mod extensions;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FileStore {
|
||||
client: aws_sdk_s3::Client,
|
||||
|
||||
bucket: String,
|
||||
}
|
||||
|
||||
use aws_sdk_s3::primitives::ByteStream;
|
||||
use tokio::io::BufReader;
|
||||
#[cfg(test)]
|
||||
use mockall::{automock, mock, predicate::*};
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
#[cfg_attr(test, automock)]
|
||||
impl FileStore {
|
||||
pub fn new(client: aws_sdk_s3::Client) -> Self {
|
||||
Self {
|
||||
client,
|
||||
bucket: std::env::var("BUCKET_NAME").unwrap_or("flux-releaser".into()),
|
||||
}
|
||||
Self { client }
|
||||
}
|
||||
|
||||
pub async fn upload_file(&self, artifact_id: ArtifactID, file: PathBuf) -> anyhow::Result<()> {
|
||||
@ -27,7 +25,7 @@ impl FileStore {
|
||||
|
||||
self.client
|
||||
.put_object()
|
||||
.bucket(&self.bucket)
|
||||
.bucket("mybucket")
|
||||
.key(format!("archives/{}.tar", &artifact_id.to_string()))
|
||||
.body(ByteStream::from_path(file).await?)
|
||||
.send()
|
||||
@ -35,74 +33,4 @@ impl FileStore {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn upload_temp(&self, id: UploadArtifactID, file: PathBuf) -> anyhow::Result<()> {
|
||||
tracing::trace!("uploading temp files: {}", id.to_string());
|
||||
|
||||
self.client
|
||||
.put_object()
|
||||
.bucket(&self.bucket)
|
||||
.key(format!("temp/{}.tar", &id.to_string()))
|
||||
.body(ByteStream::from_path(file).await?)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_archive(&self, artifact_id: ArtifactID) -> anyhow::Result<PathBuf> {
|
||||
tracing::trace!("getting archive: {}", artifact_id.to_string());
|
||||
|
||||
let archive_name = format!("archives/{}.tar", &artifact_id.to_string());
|
||||
|
||||
let obj = self
|
||||
.client
|
||||
.get_object()
|
||||
.bucket(&self.bucket)
|
||||
.key(&archive_name)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let archive_path = temp_dir()
|
||||
.join("flux_releaser")
|
||||
.join("cache")
|
||||
.join(&archive_name);
|
||||
tokio::fs::create_dir_all(archive_path.parent().unwrap()).await?;
|
||||
let mut archive_file = tokio::fs::File::create(&archive_path).await?;
|
||||
let mut buf_reader = BufReader::new(obj.body.into_async_read());
|
||||
|
||||
tokio::io::copy(&mut buf_reader, &mut archive_file).await?;
|
||||
|
||||
tracing::debug!("created archive: {}", archive_path.display());
|
||||
|
||||
Ok(archive_path)
|
||||
}
|
||||
|
||||
pub async fn get_temp(&self, artifact_id: UploadArtifactID) -> anyhow::Result<PathBuf> {
|
||||
tracing::trace!("getting archive: {}", artifact_id.to_string());
|
||||
|
||||
let archive_name = format!("temp/{}.tar", &artifact_id.to_string());
|
||||
|
||||
let obj = self
|
||||
.client
|
||||
.get_object()
|
||||
.bucket(&self.bucket)
|
||||
.key(&archive_name)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let archive_path = temp_dir()
|
||||
.join("flux_releaser")
|
||||
.join("downloads/cache")
|
||||
.join(&archive_name);
|
||||
tokio::fs::create_dir_all(archive_path.parent().unwrap()).await?;
|
||||
let mut archive_file = tokio::fs::File::create(&archive_path).await?;
|
||||
let mut buf_reader = BufReader::new(obj.body.into_async_read());
|
||||
|
||||
tokio::io::copy(&mut buf_reader, &mut archive_file).await?;
|
||||
|
||||
tracing::debug!("created archive: {}", archive_path.display());
|
||||
|
||||
Ok(archive_path)
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
use crate::app::SharedApp;
|
||||
|
||||
#[mockall_double::double]
|
||||
use super::FileStore;
|
||||
|
||||
pub trait FileStoreExt {
|
||||
|
@ -1,138 +0,0 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::Context;
|
||||
|
||||
use crate::{
|
||||
app::infra::grpc::FluxReleaserGrpcClient,
|
||||
grpc::gen::{CommitArtifactRequest, TriggerReleaseRequest},
|
||||
};
|
||||
|
||||
use super::{
|
||||
archive::Archive,
|
||||
file_reader::FileReader,
|
||||
flux_releaser_uploader::FluxReleaserUploader,
|
||||
release_manager::models::{ArtifactID, UploadArtifactID},
|
||||
};
|
||||
|
||||
pub struct FluxLocalClusterManager {
|
||||
file_reader: FileReader,
|
||||
archive: Archive,
|
||||
flux_uploader: FluxReleaserUploader,
|
||||
flux_releaser_client: FluxReleaserGrpcClient,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for FluxLocalClusterManager {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FluxLocalClusterManager {
|
||||
pub fn new(
|
||||
file_reader: FileReader,
|
||||
archive: Archive,
|
||||
flux_uploader: FluxReleaserUploader,
|
||||
flux_releaser_client: FluxReleaserGrpcClient,
|
||||
) -> Self {
|
||||
Self {
|
||||
file_reader,
|
||||
archive,
|
||||
flux_uploader,
|
||||
flux_releaser_client,
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(include))]
|
||||
pub async fn package_clusters(
|
||||
&self,
|
||||
include: impl Into<PathBuf>,
|
||||
) -> anyhow::Result<UploadArtifactID> {
|
||||
let include = include.into();
|
||||
|
||||
tracing::debug!("reading files");
|
||||
let files = self.file_reader.read_files(include.clone()).await?;
|
||||
tracing::debug!("creating archive");
|
||||
let archive = self.archive.create_archive(&include, files).await?;
|
||||
tracing::debug!("uploading archive");
|
||||
let upload_id = self.flux_uploader.upload_archive(archive).await?;
|
||||
|
||||
tracing::debug!("done packaging clusters");
|
||||
|
||||
Ok(upload_id)
|
||||
}
|
||||
|
||||
pub async fn commit_artifact(
|
||||
&self,
|
||||
app: impl Into<String>,
|
||||
branch: impl Into<String>,
|
||||
upload_id: UploadArtifactID,
|
||||
) -> anyhow::Result<ArtifactID> {
|
||||
let artifact_id = self
|
||||
.flux_releaser_client
|
||||
.lock()
|
||||
.await
|
||||
.commit_artifact(tonic::Request::new(CommitArtifactRequest {
|
||||
app: app.into(),
|
||||
branch: branch.into(),
|
||||
upload_id: upload_id.to_string(),
|
||||
}))
|
||||
.await?;
|
||||
|
||||
artifact_id.into_inner().artifact_id.try_into()
|
||||
}
|
||||
|
||||
pub async fn trigger_release(
|
||||
&self,
|
||||
app: impl Into<String>,
|
||||
branch: impl Into<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
self.flux_releaser_client
|
||||
.lock()
|
||||
.await
|
||||
.trigger_release(tonic::Request::new(TriggerReleaseRequest {
|
||||
app: app.into(),
|
||||
branch: branch.into(),
|
||||
}))
|
||||
.await
|
||||
.context("failed to trigger release")?;
|
||||
|
||||
// Send release proto to upstream
|
||||
|
||||
// 1. find app by app + branch
|
||||
|
||||
// 2. Unpack latest artifact by app + branch
|
||||
|
||||
// 3. Unpack by cluster
|
||||
|
||||
// 4. Upload
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub mod extensions {
|
||||
use crate::{
|
||||
app::SharedLocalApp,
|
||||
services::{
|
||||
archive::extensions::ArchiveExt, file_reader::extensions::FileReaderExt,
|
||||
flux_releaser_uploader::extensions::FluxReleaserUploaderExt,
|
||||
},
|
||||
};
|
||||
|
||||
use super::FluxLocalClusterManager;
|
||||
|
||||
pub trait FluxLocalClusterManagerExt {
|
||||
fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager;
|
||||
}
|
||||
|
||||
impl FluxLocalClusterManagerExt for SharedLocalApp {
|
||||
fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager {
|
||||
FluxLocalClusterManager::new(
|
||||
self.file_reader(),
|
||||
self.archive(),
|
||||
self.flux_releaser_uploader(),
|
||||
self.grpc_client.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
@ -1,54 +0,0 @@
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use crate::{
|
||||
app::infra::grpc::FluxReleaserGrpcClient,
|
||||
grpc::gen::{flux_releaser_client::FluxReleaserClient, UploadArtifactRequest},
|
||||
};
|
||||
|
||||
use super::{archive::ArchiveFile, release_manager::models::UploadArtifactID};
|
||||
|
||||
pub struct FluxReleaserUploader {
|
||||
flux_client: FluxReleaserGrpcClient,
|
||||
}
|
||||
|
||||
impl FluxReleaserUploader {
|
||||
pub fn new(flux_client: FluxReleaserGrpcClient) -> Self {
|
||||
Self { flux_client }
|
||||
}
|
||||
|
||||
pub async fn upload_archive(&self, archive: ArchiveFile) -> anyhow::Result<UploadArtifactID> {
|
||||
let chunks = archive
|
||||
.content
|
||||
.chunks(1_000_000) // Slice by about 1MB
|
||||
.map(|ch| UploadArtifactRequest {
|
||||
content: ch.to_vec(),
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let iter = tokio_stream::iter(chunks);
|
||||
let resp = self
|
||||
.flux_client
|
||||
.lock()
|
||||
.await
|
||||
.upload_artifact(tonic::Request::new(iter))
|
||||
.await?;
|
||||
|
||||
resp.into_inner().upload_id.try_into()
|
||||
}
|
||||
}
|
||||
|
||||
pub mod extensions {
|
||||
use crate::app::SharedLocalApp;
|
||||
|
||||
use super::FluxReleaserUploader;
|
||||
|
||||
pub trait FluxReleaserUploaderExt {
|
||||
fn flux_releaser_uploader(&self) -> FluxReleaserUploader;
|
||||
}
|
||||
|
||||
impl FluxReleaserUploaderExt for SharedLocalApp {
|
||||
fn flux_releaser_uploader(&self) -> FluxReleaserUploader {
|
||||
FluxReleaserUploader::new(self.grpc_client.clone())
|
||||
}
|
||||
}
|
||||
}
|
@ -1,337 +0,0 @@
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use git2::{
|
||||
build::{CheckoutBuilder, RepoBuilder},
|
||||
Cred, FetchOptions, IndexAddOption, PushOptions, RemoteCallbacks, Repository, ResetType,
|
||||
Signature,
|
||||
};
|
||||
use tokio::{io::AsyncWriteExt, sync::Mutex};
|
||||
|
||||
use super::{
|
||||
archive::{Archive, ArchiveFile},
|
||||
cluster_list::ClusterList,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedGit {
|
||||
inner: Arc<Git>,
|
||||
}
|
||||
|
||||
impl From<Git> for SharedGit {
|
||||
fn from(value: Git) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(value),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for SharedGit {
|
||||
type Target = Git;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Git {
|
||||
location: Mutex<PathBuf>,
|
||||
|
||||
registry: String,
|
||||
|
||||
cluster_list: ClusterList,
|
||||
archive: Archive,
|
||||
}
|
||||
|
||||
impl Git {
|
||||
pub fn new(registry: String, cluster_list: ClusterList, archive: Archive) -> Self {
|
||||
Self {
|
||||
registry,
|
||||
|
||||
location: Mutex::new(std::env::temp_dir().join("flux_releaser")),
|
||||
|
||||
cluster_list,
|
||||
archive,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn publish(
|
||||
&self,
|
||||
archive: &ArchiveFile,
|
||||
service: &str,
|
||||
namespace: &str,
|
||||
environment: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO: implement exponential backoff and 5 attempts
|
||||
|
||||
self.publish_attempt(archive, service, namespace, environment)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn publish_attempt(
|
||||
&self,
|
||||
archive: &ArchiveFile,
|
||||
service: &str,
|
||||
namespace: &str,
|
||||
environment: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
// 1. Clone repo into location (with lock) or update
|
||||
let location = self.location.lock().await;
|
||||
|
||||
let repo_dir = location.join("repo");
|
||||
let repo = match self.get_repo(&repo_dir).await? {
|
||||
//TODO: Possible handle error by just recloning the repo
|
||||
None => self.clone_repo(&repo_dir).await?,
|
||||
Some(repo) => {
|
||||
self.update_repo(repo).await?;
|
||||
self.get_repo(&repo_dir)
|
||||
.await?
|
||||
.ok_or(anyhow::anyhow!("failed to open repository"))?
|
||||
}
|
||||
};
|
||||
|
||||
// 2. Extract from archive
|
||||
// TODO: maybe pad archive with hash or something
|
||||
let unpack_dir = location.join("archive");
|
||||
self.archive.unpack_archive(archive, &unpack_dir).await?;
|
||||
|
||||
// 3. Splat tar over application and cluster
|
||||
// The archive should always be structured like so:
|
||||
// - <environment>/
|
||||
// - * manifests
|
||||
|
||||
// 3a. prepare git repo for new files
|
||||
let clusters = self
|
||||
.cluster_list
|
||||
.get(environment)
|
||||
.await?
|
||||
.ok_or(anyhow::anyhow!(
|
||||
"environment is not registered: {} in cluster list",
|
||||
environment
|
||||
))?;
|
||||
|
||||
for cluster in clusters {
|
||||
let service_entry = repo_dir
|
||||
.join("deployments")
|
||||
.join(&cluster)
|
||||
.join(namespace)
|
||||
.join(service);
|
||||
|
||||
if service_entry.exists() {
|
||||
if let Err(e) = tokio::fs::remove_dir_all(&service_entry).await {
|
||||
tracing::warn!("failed to remove existing dir: {}", e);
|
||||
}
|
||||
}
|
||||
tokio::fs::create_dir_all(&service_entry).await?;
|
||||
|
||||
let cluster_entry = repo_dir
|
||||
.join("clusters")
|
||||
.join(&cluster)
|
||||
.join(namespace)
|
||||
.join(service);
|
||||
|
||||
if cluster_entry.exists() {
|
||||
if let Err(e) = tokio::fs::remove_dir_all(&cluster_entry).await {
|
||||
tracing::warn!("failed to remove existing dir: {}", e);
|
||||
}
|
||||
}
|
||||
tokio::fs::create_dir_all(&cluster_entry).await?;
|
||||
|
||||
let archive_dir = unpack_dir.join(environment);
|
||||
if !archive_dir.exists() {
|
||||
anyhow::bail!("selected environment is not published for archive");
|
||||
}
|
||||
|
||||
let mut read_dir = tokio::fs::read_dir(archive_dir).await?;
|
||||
while let Some(entry) = read_dir.next_entry().await? {
|
||||
if entry.metadata().await?.is_file() {
|
||||
let entry_path = entry.path();
|
||||
let dest_path = service_entry.join(entry.file_name());
|
||||
tokio::fs::copy(entry_path, dest_path).await?;
|
||||
}
|
||||
}
|
||||
|
||||
let cluster_entry_file = cluster_entry.join(format!("{service}.yaml"));
|
||||
let mut cluster_entry_file = tokio::fs::File::create(cluster_entry_file).await?;
|
||||
let file_contents = format!(
|
||||
r#"
|
||||
apiVersion: kustomize.toolkit.fluxcd.io/v1beta2
|
||||
kind: Kustomization
|
||||
metadata:
|
||||
name: {service}
|
||||
namespace: flux-system
|
||||
spec:
|
||||
interval: 1m
|
||||
retryInterval: 30s
|
||||
path: ./deployments/{cluster}/{namespace}/{service}
|
||||
prune: true
|
||||
sourceRef:
|
||||
kind: GitRepository
|
||||
name: flux-system
|
||||
namespace: flux-system
|
||||
"#
|
||||
);
|
||||
|
||||
cluster_entry_file
|
||||
.write_all(file_contents.as_bytes())
|
||||
.await?;
|
||||
}
|
||||
|
||||
// 4. Commit && Push
|
||||
self.commit_and_push(repo, environment, service).await?;
|
||||
|
||||
// 5. Cleanup
|
||||
tokio::fs::remove_dir_all(unpack_dir).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_repo(&self, location: &Path) -> anyhow::Result<Option<Repository>> {
|
||||
match Repository::open(location) {
|
||||
Ok(r) => Ok(Some(r)),
|
||||
Err(e) => match e.code() {
|
||||
git2::ErrorCode::NotFound => Ok(None),
|
||||
_ => Err(e).context("failed to open git repository"),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn clone_repo(&self, location: &Path) -> anyhow::Result<Repository> {
|
||||
let co = CheckoutBuilder::new();
|
||||
let mut fo = FetchOptions::new();
|
||||
fo.remote_callbacks(self.get_cred()?);
|
||||
let repo = RepoBuilder::new()
|
||||
.fetch_options(fo)
|
||||
.with_checkout(co)
|
||||
.clone(&self.registry, location)
|
||||
.context("failed to clone repository")?;
|
||||
|
||||
Ok(repo)
|
||||
}
|
||||
|
||||
async fn update_repo(&self, repository: Repository) -> anyhow::Result<()> {
|
||||
let mut remote = repository.find_remote("origin")?;
|
||||
let mut fo = FetchOptions::new();
|
||||
fo.remote_callbacks(self.get_cred()?);
|
||||
remote
|
||||
.fetch(&["main"], Some(&mut fo), None)
|
||||
.context("failed to update repo")?;
|
||||
|
||||
let origin_head = repository.find_reference("refs/remotes/origin/HEAD")?;
|
||||
let origin_head_commit = origin_head.peel_to_commit()?;
|
||||
|
||||
// Perform a hard reset to the origin's HEAD
|
||||
repository.reset(origin_head_commit.as_object(), ResetType::Hard, None)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_cred(&self) -> anyhow::Result<RemoteCallbacks> {
|
||||
let mut cb = RemoteCallbacks::new();
|
||||
cb.credentials(|_, username, _| {
|
||||
if let Ok(_sock) = std::env::var("SSH_AUTH_SOCK") {
|
||||
return Cred::ssh_key_from_agent(username.unwrap_or("git"));
|
||||
}
|
||||
let username = std::env::var("GIT_USERNAME").expect("GIT_USERNAME to be set");
|
||||
let password = std::env::var("GIT_PASSWORD").expect("GIT_PASSWORD to be set");
|
||||
Cred::userpass_plaintext(&username, &password)
|
||||
});
|
||||
cb.certificate_check(|_cert, _| Ok(git2::CertificateCheckStatus::CertificateOk));
|
||||
|
||||
Ok(cb)
|
||||
}
|
||||
|
||||
async fn commit_and_push(
|
||||
&self,
|
||||
repo: Repository,
|
||||
environment: &str,
|
||||
service: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut index = repo.index()?;
|
||||
|
||||
// Add all files to the index
|
||||
index.add_all(["*"].iter(), IndexAddOption::DEFAULT, None)?;
|
||||
index.write()?;
|
||||
|
||||
// Create a tree from the index
|
||||
let oid = index.write_tree()?;
|
||||
let tree = repo.find_tree(oid)?;
|
||||
|
||||
// Get the current HEAD commit
|
||||
let parent_commit = repo.head()?.peel_to_commit()?;
|
||||
|
||||
// Create a signature
|
||||
let sig = Signature::now("flux_releaser", "operations+flux-releaser@kjuulh.io")?;
|
||||
|
||||
// Create the commit
|
||||
repo.commit(
|
||||
Some("HEAD"),
|
||||
&sig,
|
||||
&sig,
|
||||
&format!("chore({environment}/{service}): releasing service"),
|
||||
&tree,
|
||||
&[&parent_commit],
|
||||
)?;
|
||||
|
||||
let mut remote = repo.find_remote("origin")?;
|
||||
let mut po = PushOptions::new();
|
||||
po.remote_callbacks(self.get_cred()?);
|
||||
|
||||
remote.push(
|
||||
&[&format!("refs/heads/{}:refs/heads/{}", "main", "main")],
|
||||
Some(&mut po),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use tokio::io::AsyncReadExt;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::services::{
|
||||
archive::{Archive, ArchiveFile},
|
||||
cluster_list::ClusterList,
|
||||
git::Git,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_clone_upstream() -> anyhow::Result<()> {
|
||||
// FIXME: right now CI doesn't support git
|
||||
return Ok(());
|
||||
let random = Uuid::new_v4().to_string();
|
||||
println!("running test for id: {}", random);
|
||||
|
||||
println!("current_dir: {}", std::env::current_dir()?.display());
|
||||
let mut arch = tokio::fs::File::open("testdata/example.tar").await?;
|
||||
let mut dest = Vec::new();
|
||||
arch.read_to_end(&mut dest).await?;
|
||||
|
||||
let git = Git::new(
|
||||
"ssh://git@git.front.kjuulh.io/kjuulh/clank-clusters.git".into(),
|
||||
ClusterList::default(),
|
||||
Archive::default(),
|
||||
);
|
||||
let mut location = git.location.lock().await;
|
||||
*location = location.join(random);
|
||||
println!("into: {}", location.display());
|
||||
drop(location);
|
||||
|
||||
git.publish(
|
||||
&ArchiveFile { content: dest },
|
||||
"flux-releaser-test",
|
||||
"dev",
|
||||
"dev",
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,58 +1,50 @@
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::services::archive::{Archive, ArchiveFile};
|
||||
use crate::services::artifacts_db::{AddCommitArtifact, GetLatestArtifact};
|
||||
use crate::services::archive::extensions::ArchiveUploadExt;
|
||||
#[mockall_double::double]
|
||||
use crate::services::file_store::FileStore;
|
||||
|
||||
use super::artifacts_db::ArtifactsDB;
|
||||
#[mockall_double::double]
|
||||
use super::archive::Archive;
|
||||
#[mockall_double::double]
|
||||
use super::domain_events::DomainEvents;
|
||||
use super::git::SharedGit;
|
||||
#[mockall_double::double]
|
||||
use super::file_reader::FileReader;
|
||||
|
||||
use self::models::*;
|
||||
use self::models::{ArtifactID, CommitArtifact};
|
||||
|
||||
pub struct ReleaseManager {
|
||||
archive: Archive,
|
||||
file_reader: FileReader,
|
||||
file_store: FileStore,
|
||||
domain_events: DomainEvents,
|
||||
artifacts_db: ArtifactsDB,
|
||||
git: SharedGit,
|
||||
}
|
||||
|
||||
impl ReleaseManager {
|
||||
pub fn new(
|
||||
file_reader: FileReader,
|
||||
file_store: FileStore,
|
||||
archive: Archive,
|
||||
domain_events: DomainEvents,
|
||||
artifacts_db: ArtifactsDB,
|
||||
git: SharedGit,
|
||||
) -> Self {
|
||||
Self {
|
||||
archive,
|
||||
file_reader,
|
||||
file_store,
|
||||
domain_events,
|
||||
artifacts_db,
|
||||
git,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upload_artifact(
|
||||
&self,
|
||||
request: UploadArtifact,
|
||||
) -> anyhow::Result<UploadArtifactID> {
|
||||
let upload_id = uuid::Uuid::now_v7();
|
||||
|
||||
self.file_store
|
||||
.upload_temp(upload_id.into(), request.file_path)
|
||||
.await?;
|
||||
|
||||
Ok(upload_id.into())
|
||||
}
|
||||
|
||||
pub async fn commit_artifact(&self, request: CommitArtifact) -> anyhow::Result<ArtifactID> {
|
||||
tracing::debug!("committing artifact: {:?}", request);
|
||||
let artifact_id = ArtifactID::new();
|
||||
|
||||
let artifact = self.file_store.get_temp(request.upload_id).await?;
|
||||
let files = self.file_reader.read_files(request.folder).await?;
|
||||
|
||||
let archive = self.archive.create_archive(files).await?;
|
||||
|
||||
self.file_store
|
||||
.upload_file(artifact_id.clone(), artifact)
|
||||
.upload_archive(artifact_id.clone(), archive)
|
||||
.await?;
|
||||
|
||||
self.domain_events
|
||||
@ -61,68 +53,8 @@ impl ReleaseManager {
|
||||
})?)
|
||||
.await?;
|
||||
|
||||
self.artifacts_db
|
||||
.commit_artifact(AddCommitArtifact {
|
||||
app: request.app,
|
||||
branch: request.branch,
|
||||
artifact_id: artifact_id.clone().into(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(artifact_id)
|
||||
}
|
||||
|
||||
pub async fn release(&self, release_req: Release) -> anyhow::Result<()> {
|
||||
tracing::debug!(
|
||||
app = release_req.app,
|
||||
branch = release_req.branch,
|
||||
"releasing latest commit"
|
||||
);
|
||||
|
||||
let latest_artifact = self
|
||||
.artifacts_db
|
||||
.get_latest_artifact(GetLatestArtifact {
|
||||
app: release_req.app.clone(),
|
||||
branch: release_req.branch.clone(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
tracing::trace!("found latest artifact: {:?}", latest_artifact);
|
||||
|
||||
let artifact = self
|
||||
.file_store
|
||||
.get_archive(latest_artifact.artifact_id.into())
|
||||
.await?;
|
||||
|
||||
tracing::trace!("placed artifact in: {}", artifact.display());
|
||||
|
||||
self.domain_events
|
||||
.publish_event(&serde_json::to_string(&PublishedArtifactEvent {
|
||||
artifact_id: latest_artifact.artifact_id.to_string(),
|
||||
app: release_req.app.clone(),
|
||||
branch: release_req.branch.clone(),
|
||||
})?)
|
||||
.await?;
|
||||
|
||||
let artifact_contents = tokio::fs::read(artifact).await?;
|
||||
let env = if release_req.branch == "main" {
|
||||
// FIXME: select prod instead
|
||||
"prod"
|
||||
//"dev"
|
||||
} else {
|
||||
"dev"
|
||||
};
|
||||
self.git
|
||||
.publish(
|
||||
&ArchiveFile::from(artifact_contents),
|
||||
&release_req.app,
|
||||
env,
|
||||
env,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
@ -130,12 +62,55 @@ pub struct CommittedArtifactEvent {
|
||||
artifact_id: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct PublishedArtifactEvent {
|
||||
app: String,
|
||||
branch: String,
|
||||
artifact_id: String,
|
||||
}
|
||||
|
||||
pub mod extensions;
|
||||
pub mod models;
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::services::archive::{ArchiveFile, MockArchive};
|
||||
use crate::services::domain_events::MockDomainEvents;
|
||||
use crate::services::file_reader::{Files, MockFileReader};
|
||||
use crate::services::file_store::MockFileStore;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn generated_artifact_id() -> anyhow::Result<()> {
|
||||
let mut file_store = MockFileStore::default();
|
||||
file_store
|
||||
.expect_upload_file()
|
||||
.times(1)
|
||||
.returning(|_, _| Ok(()));
|
||||
|
||||
let mut domain_events = MockDomainEvents::default();
|
||||
domain_events
|
||||
.expect_publish_event()
|
||||
.times(1)
|
||||
.returning(|_| Ok(()));
|
||||
|
||||
let mut file_reader = MockFileReader::default();
|
||||
file_reader
|
||||
.expect_read_files()
|
||||
.times(1)
|
||||
.returning(|_| Ok(Files::default()));
|
||||
|
||||
let mut archive = MockArchive::default();
|
||||
archive.expect_create_archive().times(1).returning(|_| {
|
||||
Ok(ArchiveFile {
|
||||
content: Vec::new(),
|
||||
})
|
||||
});
|
||||
|
||||
let releaser_manager = ReleaseManager::new(file_reader, file_store, archive, domain_events);
|
||||
|
||||
releaser_manager
|
||||
.commit_artifact(CommitArtifact {
|
||||
app: "app".into(),
|
||||
branch: "branch".into(),
|
||||
folder: "someFolder".into(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -1,8 +1,8 @@
|
||||
use crate::{
|
||||
app::SharedApp,
|
||||
services::{
|
||||
artifacts_db::extensions::ArtifactsDBExt, domain_events::extensions::DomainEventsExt,
|
||||
file_store::extensions::FileStoreExt,
|
||||
archive::extensions::ArchiveExt, domain_events::extensions::DomainEventsExt,
|
||||
file_reader::extensions::FileReaderExt, file_store::extensions::FileStoreExt,
|
||||
},
|
||||
};
|
||||
|
||||
@ -15,10 +15,10 @@ pub trait ReleaseManagerExt {
|
||||
impl ReleaseManagerExt for SharedApp {
|
||||
fn release_manager(&self) -> ReleaseManager {
|
||||
ReleaseManager::new(
|
||||
self.file_reader(),
|
||||
self.file_store(),
|
||||
self.archive(),
|
||||
self.domain_events(),
|
||||
self.artifacts_db(),
|
||||
self.git.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -1,16 +1,10 @@
|
||||
use std::{ops::Deref, path::PathBuf};
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CommitArtifact {
|
||||
pub app: String,
|
||||
pub branch: String,
|
||||
pub upload_id: UploadArtifactID,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Release {
|
||||
pub app: String,
|
||||
pub branch: String,
|
||||
pub folder: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@ -29,61 +23,3 @@ impl std::ops::Deref for ArtifactID {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<String> for ArtifactID {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: String) -> Result<Self, Self::Error> {
|
||||
let uuid = uuid::Uuid::parse_str(&value)?;
|
||||
|
||||
Ok(ArtifactID(uuid))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<uuid::Uuid> for ArtifactID {
|
||||
fn from(value: uuid::Uuid) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UploadArtifact {
|
||||
pub file_path: PathBuf,
|
||||
}
|
||||
|
||||
impl From<PathBuf> for UploadArtifact {
|
||||
fn from(value: PathBuf) -> Self {
|
||||
Self { file_path: value }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct UploadArtifactID(uuid::Uuid);
|
||||
impl From<uuid::Uuid> for UploadArtifactID {
|
||||
fn from(value: uuid::Uuid) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ArtifactID> for uuid::Uuid {
|
||||
fn from(value: ArtifactID) -> Self {
|
||||
value.0
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<String> for UploadArtifactID {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: String) -> Result<Self, Self::Error> {
|
||||
let uuid = uuid::Uuid::parse_str(&value)?;
|
||||
|
||||
Ok(Self(uuid))
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for UploadArtifactID {
|
||||
type Target = uuid::Uuid;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
BIN
crates/flux-releaser/testdata/example.tar
vendored
BIN
crates/flux-releaser/testdata/example.tar
vendored
Binary file not shown.
@ -1,14 +0,0 @@
|
||||
use flux_releaser::services::{
|
||||
archive::Archive, file_reader::FileReader, flux_local_cluster::FluxLocalClusterManager,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_package_files() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt().init();
|
||||
|
||||
//let sut = FluxLocalClusterManager::new(FileReader::new(), Archive::new());
|
||||
|
||||
//sut.package_clusters("testdata/flux_local_cluster/").await?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -1,240 +0,0 @@
|
||||
use std::{
|
||||
net::{Ipv4Addr, SocketAddr},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use flux_releaser::{
|
||||
app::{LocalApp, SharedApp, SharedLocalApp},
|
||||
grpc::gen::flux_releaser_client::FluxReleaserClient,
|
||||
services::{
|
||||
archive::ArchiveFile, file_store::extensions::FileStoreExt,
|
||||
flux_local_cluster::extensions::FluxLocalClusterManagerExt,
|
||||
flux_releaser_uploader::FluxReleaserUploader,
|
||||
},
|
||||
};
|
||||
use tokio::{net::TcpListener, runtime::Runtime, sync::Mutex, time::sleep};
|
||||
|
||||
struct Server {
|
||||
endpoints: Endpoints,
|
||||
app: SharedApp,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct Endpoints {
|
||||
http: SocketAddr,
|
||||
grpc: SocketAddr,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub async fn new() -> anyhow::Result<Self> {
|
||||
let http_socket = Self::find_free_port().await?;
|
||||
let grpc_socket = Self::find_free_port().await?;
|
||||
|
||||
Ok(Self {
|
||||
endpoints: Endpoints {
|
||||
http: http_socket,
|
||||
grpc: grpc_socket,
|
||||
},
|
||||
app: SharedApp::new(flux_releaser::app::App::new().await?),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> anyhow::Result<()> {
|
||||
flux_releaser::cli::server::run_server(self.endpoints.http, self.endpoints.grpc).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn find_free_port() -> anyhow::Result<SocketAddr> {
|
||||
let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
||||
|
||||
let listener = TcpListener::bind(socket).await?;
|
||||
|
||||
listener.local_addr().context("failed to get local addr")
|
||||
}
|
||||
}
|
||||
|
||||
static INIT: std::sync::Once = std::sync::Once::new();
|
||||
|
||||
async fn perform_task_with_backoff<F, Fut, T, E>(
|
||||
mut task: F,
|
||||
max_retries: u32,
|
||||
base_delay_ms: u64,
|
||||
) -> Result<T, E>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: std::future::Future<Output = Result<T, E>>,
|
||||
{
|
||||
let mut retries = 0;
|
||||
let mut delay = base_delay_ms;
|
||||
|
||||
loop {
|
||||
match task().await {
|
||||
Ok(result) => return Ok(result),
|
||||
Err(_e) if retries < max_retries => {
|
||||
sleep(Duration::from_millis(delay)).await;
|
||||
delay *= 2; // Exponential backoff
|
||||
retries += 1;
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Makes sure the setup is ready for execution
|
||||
async fn is_ready() -> anyhow::Result<()> {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
perform_task_with_backoff(
|
||||
|| async {
|
||||
let endpoints = unsafe {
|
||||
if ENDPOINTS.is_none() {
|
||||
anyhow::bail!("endpoints not set yet");
|
||||
}
|
||||
|
||||
ENDPOINTS.clone().unwrap()
|
||||
};
|
||||
|
||||
let resp = reqwest::get(format!("http://{}/ping", endpoints.http)).await?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("failed with status: {}", resp.status());
|
||||
}
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
},
|
||||
5,
|
||||
500,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
static mut ENDPOINTS: Option<Endpoints> = None;
|
||||
static mut APP: Option<SharedApp> = None;
|
||||
|
||||
async fn setup() -> anyhow::Result<(Endpoints, SharedApp)> {
|
||||
INIT.call_once(|| {
|
||||
std::thread::spawn(|| {
|
||||
let rt = Runtime::new().unwrap();
|
||||
rt.block_on(async move {
|
||||
println!("once was created once");
|
||||
let server = Server::new().await.unwrap();
|
||||
|
||||
unsafe {
|
||||
ENDPOINTS = Some(server.endpoints.clone());
|
||||
APP = Some(server.app.clone());
|
||||
}
|
||||
|
||||
server.start().await.unwrap();
|
||||
})
|
||||
});
|
||||
});
|
||||
|
||||
is_ready().await?;
|
||||
|
||||
Ok(unsafe { (ENDPOINTS.clone().unwrap(), APP.clone().unwrap()) })
|
||||
}
|
||||
|
||||
async fn local_setup(endpoints: Endpoints) -> anyhow::Result<SharedLocalApp> {
|
||||
Ok(SharedLocalApp::new(
|
||||
LocalApp::new(format!("http://{}", endpoints.grpc)).await?,
|
||||
))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_upload_artifact() -> anyhow::Result<()> {
|
||||
return Ok(());
|
||||
std::env::set_var("RUST_LOG", "flux_releaser=trace");
|
||||
let (endpoints, app) = setup().await?;
|
||||
|
||||
let client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?;
|
||||
let client = FluxReleaserUploader::new(Arc::new(Mutex::new(client)));
|
||||
|
||||
let bytes: Vec<u8> = vec![0; 10_000_000];
|
||||
let upload_id = client
|
||||
.upload_archive(ArchiveFile {
|
||||
content: bytes.clone(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
assert!(!upload_id.to_string().is_empty());
|
||||
|
||||
let actual_path = app.file_store().get_temp(upload_id).await?;
|
||||
|
||||
let actual_content = tokio::fs::read(actual_path).await?;
|
||||
|
||||
assert_eq!(&bytes, &actual_content);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_publish_artifact() -> anyhow::Result<()> {
|
||||
return Ok(());
|
||||
std::env::set_var("RUST_LOG", "flux_releaser=trace");
|
||||
|
||||
let (endpoints, app) = setup().await?;
|
||||
let local_app = local_setup(endpoints.clone()).await?;
|
||||
|
||||
let upload_id = local_app
|
||||
.flux_local_cluster_manager()
|
||||
.package_clusters("testdata/flux_local_cluster")
|
||||
.await?;
|
||||
|
||||
let archive = app.file_store().get_temp(upload_id.clone()).await?;
|
||||
|
||||
assert!(archive.exists());
|
||||
|
||||
let artifact_id = local_app
|
||||
.flux_local_cluster_manager()
|
||||
.commit_artifact("some-app", "some-branch", upload_id)
|
||||
.await?;
|
||||
|
||||
let artifact = app.file_store().get_archive(artifact_id).await?;
|
||||
|
||||
assert!(artifact.exists());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_trigger_latest_release() -> anyhow::Result<()> {
|
||||
return Ok(());
|
||||
let test_id = uuid::Uuid::now_v7();
|
||||
|
||||
std::env::set_var("RUST_LOG", "flux_releaser=trace");
|
||||
|
||||
let (endpoints, app) = setup().await?;
|
||||
let local_app = local_setup(endpoints.clone()).await?;
|
||||
|
||||
let upload_id = local_app
|
||||
.flux_local_cluster_manager()
|
||||
.package_clusters("testdata/flux_local_cluster")
|
||||
.await?;
|
||||
|
||||
let archive = app.file_store().get_temp(upload_id.clone()).await?;
|
||||
|
||||
assert!(archive.exists());
|
||||
|
||||
let _ = local_app
|
||||
.flux_local_cluster_manager()
|
||||
.commit_artifact(test_id, "some-branch", upload_id)
|
||||
.await?;
|
||||
|
||||
local_app
|
||||
.flux_local_cluster_manager()
|
||||
.trigger_release(test_id, "some-branch")
|
||||
.await?;
|
||||
|
||||
// 1. Verify that release event has been sent
|
||||
// 2. Verify that we've splatted the flux cluster over the upstream registry
|
||||
// 3. Verify database has a release record
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct TestGreeter {}
|
23
cuddle.yaml
23
cuddle.yaml
@ -5,17 +5,16 @@ base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-service-plan.git"
|
||||
vars:
|
||||
service: "flux-releaser"
|
||||
registry: kasperhermansen
|
||||
database:
|
||||
crdb: "true"
|
||||
ingress:
|
||||
- external: "true"
|
||||
- internal: "true"
|
||||
|
||||
clusters:
|
||||
clank-prod:
|
||||
replicas: "3"
|
||||
namespace: prod
|
||||
|
||||
|
||||
cuddle/clusters:
|
||||
dev:
|
||||
env:
|
||||
service.host: "0.0.0.0:3000"
|
||||
prod:
|
||||
env:
|
||||
service.host: "0.0.0.0:3000"
|
||||
deployment:
|
||||
registry: git@git.front.kjuulh.io:kjuulh/clank-clusters
|
||||
env:
|
||||
prod:
|
||||
clusters:
|
||||
- clank-prod
|
||||
|
@ -1,3 +0,0 @@
|
||||
{
|
||||
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
|
||||
}
|
@ -21,8 +21,8 @@ services:
|
||||
/bin/sh -c "
|
||||
/usr/bin/mc alias set myminio http://minio:10000 minioadmin minioadminpassword;
|
||||
/usr/bin/mc admin info myminio;
|
||||
/usr/bin/mc mb myminio/flux-releaser;
|
||||
/usr/bin/mc policy set public myminio/flux-releaser;
|
||||
/usr/bin/mc mb myminio/mybucket;
|
||||
/usr/bin/mc policy set public myminio/mybucket;
|
||||
exit 0;
|
||||
"
|
||||
|
||||
@ -34,17 +34,3 @@ services:
|
||||
- NATS_ENABLE_AUTH=yes
|
||||
- NATS_USERNAME=user
|
||||
- NATS_PASSWORD=secret
|
||||
|
||||
crdb:
|
||||
restart: 'always'
|
||||
image: 'cockroachdb/cockroach:latest'
|
||||
command: 'start-single-node --advertise-addr 0.0.0.0 --insecure'
|
||||
healthcheck:
|
||||
test: ["CMD", "curl", "-f", "http://localhost:8080/health?ready=1"]
|
||||
interval: '10s'
|
||||
timeout: '30s'
|
||||
retries: 5
|
||||
start_period: '20s'
|
||||
ports:
|
||||
- '28080:8080'
|
||||
- '26257:26257'
|
||||
|
Loading…
Reference in New Issue
Block a user