Compare commits

No commits in common. "main" and "feat/add-initial-publish" have entirely different histories.

Comparing: main...feat/add-initial-publish
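Because the two branches share no merge base, the same comparison can be reproduced locally with plain git; a minimal sketch (the remote name "origin" is an assumption):

    # No common ancestor exists, so merge-base prints nothing and exits non-zero,
    # but diffing the two branch tips directly still works.
    git fetch origin main feat/add-initial-publish
    git merge-base origin/main origin/feat/add-initial-publish || echo "no merge base"
    git diff origin/main origin/feat/add-initial-publish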
.drone.yml | 115
@@ -1,2 +1,113 @@
-kind: template
-load: cuddle-rust-service-plan.yaml
+kind: pipeline
+name: default
+type: docker
+
+steps:
+  - name: load_secret
+    image: debian:buster-slim
+    volumes:
+      - name: ssh
+        path: /root/.ssh/
+    environment:
+      SSH_KEY:
+        from_secret: gitea_id_ed25519
+    commands:
+      - mkdir -p $HOME/.ssh/
+      - echo "$SSH_KEY" | base64 -d > $HOME/.ssh/id_ed25519
+      - chmod -R 600 ~/.ssh
+      - |
+        cat >$HOME/.ssh/config <<EOL
+        Host git.front.kjuulh.io
+          IdentityFile $HOME/.ssh/id_ed25519
+          IdentitiesOnly yes
+          UserKnownHostsFile=/dev/null
+          StrictHostKeyChecking no
+        EOL
+      - chmod 700 ~/.ssh/config
+
+  - name: build pr
+    image: kasperhermansen/cuddle-rust-service-plan:main-1708174643
+    pull: always
+    volumes:
+      - name: ssh
+        path: /root/.ssh/
+    commands:
+      - eval `ssh-agent`
+      - ssh-add
+      - echo "$DOCKER_PASSWORD" | docker login --password-stdin --username="$DOCKER_USERNAME" docker.io
+      - export RUST_LOG=trace
+      - cuddle-rust-service-plan pr
+    environment:
+      DOCKER_BUILDKIT: 1
+      DOCKER_PASSWORD:
+        from_secret: docker_password
+      DOCKER_USERNAME:
+        from_secret: docker_username
+      CUDDLE_SECRETS_PROVIDER: 1password
+      CUDDLE_ONE_PASSWORD_DOT_ENV: ".env.ci"
+      CUDDLE_SSH_AGENT: "true"
+      CUDDLE_PLEASE_TOKEN:
+        from_secret: cuddle_please_token
+      OP_SERVICE_ACCOUNT_TOKEN:
+        from_secret: op_service_account_token
+      DOCKER_HOST: "tcp://192.168.1.233:2376"
+    when:
+      event:
+        - push
+        - pull_request
+      exclude:
+        - main
+        - master
+    depends_on:
+      - "load_secret"
+
+  - name: build main
+    image: kasperhermansen/cuddle-rust-service-plan:main-1708174643
+    pull: always
+    volumes:
+      - name: ssh
+        path: /root/.ssh/
+    commands:
+      - eval `ssh-agent`
+      - ssh-add
+      - echo "$DOCKER_PASSWORD" | docker login --password-stdin --username="$DOCKER_USERNAME" docker.io
+      - export RUST_LOG=trace
+      - cuddle-rust-service-plan main
+    environment:
+      REGISTRY_CACHE_USERNAME:
+        from_secret: registry_cache_username
+      REGISTRY_CACHE_PASSWORD:
+        from_secret: registry_cache_password
+      REGISTRY_CACHE_TOKEN:
+        from_secret: registry_cache_token
+      REGISTRY_CACHE_url:
+        from_secret: registry_cache_url
+      DOCKER_BUILDKIT: 1
+      DOCKER_PASSWORD:
+        from_secret: docker_password
+      DOCKER_USERNAME:
+        from_secret: docker_username
+      CUDDLE_SECRETS_PROVIDER: 1password
+      CUDDLE_ONE_PASSWORD_DOT_ENV: ".env.ci"
+      CUDDLE_SSH_AGENT: "true"
+      GIT_PASSWORD:
+        from_secret: git_password
+      DOCKER_HOST: "tcp://192.168.1.233:2376"
+      CUDDLE_PLEASE_TOKEN:
+        from_secret: cuddle_please_token
+      OP_SERVICE_ACCOUNT_TOKEN:
+        from_secret: op_service_account_token
+    when:
+      event:
+        - push
+      branch:
+        - main
+        - master
+      exclude:
+        - pull_request
+    depends_on:
+      - "load_secret"
+
+volumes:
+  - name: ssh
+    temp: {}
.env | 2

@@ -5,5 +5,3 @@ AWS_SECRET_ACCESS_KEY=minioadminpassword
 NATS_URL=127.0.0.1:4222
 NATS_USERNAME=user
 NATS_PASSWORD=secret
-
-DATABASE_URL=postgres://root@localhost:26257/defaultdb?sslmode=disable
Cargo.lock (generated) | 2338

File diff suppressed because it is too large
Cargo.toml

@@ -11,32 +11,21 @@ tracing-subscriber.workspace = true
 clap.workspace = true
 dotenv.workspace = true
 axum.workspace = true
-prost = "0.13.2"
-tonic = { version = "0.12.2", features = ["tls", "tls-native-roots"] }
+prost = "0.12.3"
+tonic = "0.11.0"
 uuid = { version = "1.7.0", features = ["v7", "v4"] }
 async-trait = "0.1.77"
-aws-config = { version = "1.5.5", features = ["behavior-version-latest"] }
-aws-sdk-s3 = { version = "1.48.0", features = ["behavior-version-latest"] }
+mockall_double = "0.3.1"
+aws-config = { version = "1.1.5", features = ["behavior-version-latest"] }
+aws-sdk-s3 = { version = "1.15.0", features = ["behavior-version-latest"] }
 serde = { version = "1.0.196", features = ["derive"] }
 serde_json = "1.0.113"
-nats = "0.25.0"
+nats = "0.24.1"
 walkdir = "2.4.0"
 tar = "0.4.40"
-tokio-stream = { version = "0.1.15", features = ["full"] }
-rand = "0.8.5"
-sqlx = { version = "0.8.0", features = [
-    "postgres",
-    "runtime-tokio",
-    "uuid",
-    "chrono",
-] }
-chrono = "0.4.34"
-git2 = "0.19.0"
-rustls = { version = "0.23.12" }
 
 [build-dependencies]
-tonic-build = "0.12.0"
+tonic-build = "0.11.0"
 
 [dev-dependencies]
-lazy_static = "1.4.0"
-reqwest = "0.12.0"
+mockall = "0.12.1"
@@ -1,8 +0,0 @@
--- Add migration script here
-CREATE TABLE IF NOT EXISTS artifacts (
-    app VARCHAR NOT NULL,
-    branch VARCHAR NOT NULL,
-    artifact_id UUID NOT NULL PRIMARY KEY,
-    created_at TIMESTAMP DEFAULT NOW(),
-    UNIQUE (app, artifact_id)
-);
@@ -2,33 +2,16 @@ syntax = "proto3";
 
 package flux_releaser;
 
-service FluxReleaser {
-  rpc UploadArtifact (stream UploadArtifactRequest) returns (UploadArtifactResponse) {}
-  rpc CommitArtifact (CommitArtifactRequest) returns (CommitArtifactResponse) {}
-  rpc TriggerRelease (TriggerReleaseRequest) returns (TriggerReleaseResponse) {}
+service Greeter {
+  rpc SayHello (HelloRequest) returns (HelloReply) {}
 }
 
-message UploadArtifactRequest {
-  bytes content = 1;
-}
-
-message UploadArtifactResponse {
-  string upload_id = 1;
-}
-
-message CommitArtifactRequest {
+message HelloRequest {
   string app = 1;
   string branch = 2;
-  string upload_id = 3;
+  string folder = 3; // Change to files instead
 }
 
-message CommitArtifactResponse {
-  string artifact_id = 1;
+message HelloReply {
+  string message = 1;
 }
-
-message TriggerReleaseRequest {
-  string app = 1;
-  string branch = 2;
-}
-
-message TriggerReleaseResponse {}
@@ -1,14 +1,11 @@
 use std::net::SocketAddr;
 
-use axum::{response::IntoResponse, routing::get, Router};
+use axum::{routing::get, Router};
 
 use crate::app::SharedApp;
 
 pub async fn axum_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
-    let app = Router::new()
-        .route("/ping", get(pong))
-        .route("/", get(root))
-        .with_state(app);
+    let app = Router::new().route("/", get(root)).with_state(app);
 
     tracing::info!("listening on {}", host);
     let listener = tokio::net::TcpListener::bind(host).await.unwrap();
@@ -18,10 +15,6 @@ pub async fn axum_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()>
     Ok(())
 }
 
-async fn pong() -> impl IntoResponse {
-    "pong!"
-}
-
 async fn root() -> &'static str {
     "Hello, flux-releaser!"
 }
@@ -1,17 +1,9 @@
 use std::{ops::Deref, sync::Arc};
 
-use sqlx::{PgPool, Postgres};
-
-use crate::services::{
-    archive::Archive,
-    cluster_list::ClusterList,
-    git::{Git, SharedGit},
-};
-
-use self::infra::{
-    aws_s3::s3_client,
-    grpc::{new_client, FluxReleaserGrpcClient},
-};
+use self::infra::aws_s3::s3_client;
 
 #[derive(Clone)]
 pub struct SharedApp(Arc<App>);
@@ -33,8 +25,6 @@ impl SharedApp {
 pub struct App {
     pub s3_client: aws_sdk_s3::Client,
     pub nats: infra::nats::Nats,
-    pub database: PgPool,
-    pub git: SharedGit,
 }
 
 impl App {
@@ -42,45 +32,8 @@ impl App {
         Ok(Self {
             s3_client: s3_client().await?,
             nats: infra::nats::Nats::new().await?,
-            database: infra::database::get_database().await?,
-            git: Git::new(
-                std::env::var("FLUX_RELEASER_GIT_REPOSITORY")
-                    .unwrap_or("ssh://git@git.front.kjuulh.io/kjuulh/clank-clusters.git".into()),
-                ClusterList::default(),
-                Archive::default(),
-            )
-            .into(),
         })
     }
 }
 
 pub mod infra;
-
-#[derive(Clone)]
-pub struct SharedLocalApp(Arc<LocalApp>);
-
-impl Deref for SharedLocalApp {
-    type Target = Arc<LocalApp>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl SharedLocalApp {
-    pub fn new(app: LocalApp) -> Self {
-        Self(Arc::new(app))
-    }
-}
-
-pub struct LocalApp {
-    pub grpc_client: FluxReleaserGrpcClient,
-}
-
-impl LocalApp {
-    pub async fn new(registry: impl Into<String>) -> anyhow::Result<Self> {
-        Ok(Self {
-            grpc_client: new_client(registry).await?,
-        })
-    }
-}
@@ -1,4 +1,2 @@
 pub mod aws_s3;
-pub mod database;
-pub mod grpc;
 pub mod nats;
@@ -1,4 +1,3 @@
-use anyhow::Context;
 use aws_config::{BehaviorVersion, Region};
 use aws_sdk_s3::config::Credentials;
 
@@ -6,16 +5,15 @@ pub async fn s3_client() -> anyhow::Result<aws_sdk_s3::Client> {
     let shared_config = aws_config::defaults(BehaviorVersion::latest())
         .region(Region::new("eu-west-1"))
         .credentials_provider(Credentials::new(
-            std::env::var("AWS_ACCESS_KEY_ID").context("AWS_ACCESS_KEY_ID was not set")?,
-            std::env::var("AWS_SECRET_ACCESS_KEY").context("AWS_SECRET_ACCESS_KEY was not set")?,
+            std::env::var("AWS_ACCESS_KEY_ID")?,
+            std::env::var("AWS_SECRET_ACCESS_KEY")?,
             None,
             None,
             "flux_releaser",
         ));
 
     let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await)
-        .endpoint_url(std::env::var("AWS_ENDPOINT_URL").context("AWS_ENDPOINT_URL was not set")?)
-        .force_path_style(true)
+        .endpoint_url(std::env::var("AWS_ENDPOINT_URL")?)
         .build();
 
     let client = aws_sdk_s3::Client::from_conf(config);
@@ -1,19 +0,0 @@
-use anyhow::Context;
-use sqlx::{PgPool, Postgres};
-
-pub async fn get_database() -> anyhow::Result<PgPool> {
-    tracing::trace!("initializing database");
-    let db =
-        sqlx::PgPool::connect(&std::env::var("DATABASE_URL").context("DATABASE_URL is not set")?)
-            .await?;
-
-    tracing::trace!("migrating crdb");
-    sqlx::migrate!("migrations/crdb")
-        .set_locking(false)
-        .run(&db)
-        .await?;
-
-    let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?;
-
-    Ok(db)
-}
@@ -1,36 +0,0 @@
-use std::sync::Arc;
-
-use tokio::sync::Mutex;
-use tonic::transport::{Channel, ClientTlsConfig};
-
-use crate::grpc::gen::flux_releaser_client::FluxReleaserClient;
-
-pub type FluxReleaserGrpcClient = Arc<Mutex<FluxReleaserClient<Channel>>>;
-
-pub async fn new_client(registry: impl Into<String>) -> anyhow::Result<FluxReleaserGrpcClient> {
-    let registry: String = registry.into();
-
-    // Workaround: https://github.com/algesten/ureq/issues/765#issuecomment-2282921492
-    static INIT: std::sync::Once = std::sync::Once::new();
-
-    INIT.call_once(|| {
-        rustls::crypto::aws_lc_rs::default_provider()
-            .install_default()
-            .unwrap();
-    });
-
-    let channel = if registry.starts_with("https") {
-        let tls = ClientTlsConfig::new().with_native_roots();
-
-        Channel::from_shared(registry)?
-            .tls_config(tls)?
-            .connect()
-            .await?
-    } else {
-        Channel::from_shared(registry)?.connect().await?
-    };
-
-    let client = FluxReleaserClient::new(channel);
-
-    Ok(Arc::new(Mutex::new(client)))
-}
@@ -1,10 +1,11 @@
 use clap::{Parser, Subcommand};
 use std::net::SocketAddr;
 
-use crate::services::flux_local_cluster::extensions::FluxLocalClusterManagerExt;
-
-pub mod client;
-pub mod server;
+use crate::{
+    api::axum_serve,
+    app::{App, SharedApp},
+    grpc::tonic_serve,
+};
 
 #[derive(Parser)]
 #[command(author, version, about, long_about = None, subcommand_required = true)]
@@ -21,62 +22,27 @@ pub enum Commands {
         #[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")]
         grpc_host: SocketAddr,
     },
-
-    Commit {
-        #[arg(long)]
-        app: String,
-        #[arg(long)]
-        branch: String,
-        #[arg(long)]
-        include: String,
-        #[arg(env = "FLUX_RELEASER_REGISTRY", long)]
-        registry: String,
-    },
-
-    Release {
-        #[arg(long)]
-        app: String,
-
-        #[arg(long)]
-        branch: String,
-        #[arg(env = "FLUX_RELEASER_REGISTRY", long)]
-        registry: String,
-    },
 }
 
 impl Command {
     pub async fn run() -> anyhow::Result<()> {
         let cli = Command::parse();
 
-        match cli.command {
-            Some(Commands::Serve { host, grpc_host }) => {
-                server::run_server(host, grpc_host).await?;
-            }
-            Some(Commands::Commit {
-                app,
-                branch,
-                include,
-                registry,
-            }) => {
-                let app = client::get_local_app(registry).await?;
-
-                let upload_id = app
-                    .flux_local_cluster_manager()
-                    .package_clusters(include)
-                    .await?;
-            }
-            Some(Commands::Release {
-                app: service_app,
-                branch,
-                registry,
-            }) => {
-                let app = client::get_local_app(registry).await?;
-
-                app.flux_local_cluster_manager()
-                    .trigger_release(service_app, branch)
-                    .await?;
-            }
-            None => (),
-        }
+        if let Some(Commands::Serve { host, grpc_host }) = cli.command {
+            tracing_subscriber::fmt::init();
+
+            tracing::info!("Starting service");
+
+            let app = SharedApp::new(App::new().await?);
+
+            tokio::select! {
+                res = axum_serve(host, app.clone()) => {
+                    res?;
+                },
+                res = tonic_serve(grpc_host, app.clone()) => {
+                    res?;
+                },
+            };
+        }
 
         Ok(())
@@ -1,11 +0,0 @@
-use crate::app::{LocalApp, SharedLocalApp};
-
-pub async fn get_local_app(registry: impl Into<String>) -> anyhow::Result<SharedLocalApp> {
-    tracing_subscriber::fmt::init();
-
-    tracing::debug!("Starting client commit");
-
-    let app = SharedLocalApp::new(LocalApp::new(registry).await?);
-
-    Ok(app)
-}
@@ -1,29 +0,0 @@
-use std::net::SocketAddr;
-
-use crate::{
-    api::axum_serve,
-    app::{App, SharedApp},
-    grpc::tonic_serve,
-};
-
-pub async fn run_server(
-    host: impl Into<SocketAddr>,
-    grpc_host: impl Into<SocketAddr>,
-) -> anyhow::Result<()> {
-    tracing_subscriber::fmt::init();
-
-    tracing::info!("Starting service");
-
-    let app = SharedApp::new(App::new().await?);
-
-    tokio::select! {
-        res = axum_serve(host.into(), app.clone()) => {
-            res?;
-        },
-        res = tonic_serve(grpc_host.into(), app.clone()) => {
-            res?;
-        },
-    };
-
-    Ok(())
-}
@@ -1,25 +1,17 @@
-use std::{env::temp_dir, fmt::Display, net::SocketAddr};
+use std::net::SocketAddr;
 
-use tokio::io::AsyncWriteExt;
-use tokio_stream::StreamExt;
-use tonic::{service::interceptor, transport::Server};
-use uuid::Uuid;
+use tonic::transport::Server;
 
 use crate::{
     app::SharedApp,
     services::release_manager::{
-        extensions::ReleaseManagerExt,
-        models::{CommitArtifact, Release},
-        ReleaseManager,
+        extensions::ReleaseManagerExt, models::CommitArtifact, ReleaseManager,
     },
 };
 
-use self::gen::{
-    flux_releaser_server, CommitArtifactRequest, CommitArtifactResponse, TriggerReleaseRequest,
-    TriggerReleaseResponse, UploadArtifactRequest, UploadArtifactResponse,
-};
+use self::gen::{greeter_server, HelloReply, HelloRequest};
 
-pub mod gen {
+mod gen {
     tonic::include_proto!("flux_releaser");
 }
 
@@ -35,139 +27,45 @@ impl FluxReleaserGrpc {
     }
 }
 
-impl std::fmt::Debug for FluxReleaserGrpc {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        Ok(())
-    }
-}
-
 #[tonic::async_trait]
-impl flux_releaser_server::FluxReleaser for FluxReleaserGrpc {
-    #[tracing::instrument]
-    async fn upload_artifact(
+impl greeter_server::Greeter for FluxReleaserGrpc {
+    async fn say_hello(
         &self,
-        request: tonic::Request<tonic::Streaming<UploadArtifactRequest>>,
-    ) -> std::result::Result<tonic::Response<UploadArtifactResponse>, tonic::Status> {
-        let mut stream = request.into_inner();
-
-        let file_path = temp_dir()
-            .join("flux_releaser")
-            .join("tmp")
-            .join("upload_artifact")
-            .join(Uuid::new_v4().to_string());
-        tokio::fs::create_dir_all(file_path.parent().unwrap()).await?;
-        let mut file = tokio::fs::File::create(&file_path).await?;
-
-        while let Some(item) = stream.next().await {
-            tracing::trace!("received chunk");
-            let item = item?;
-
-            let _ = file.write(&item.content).await?;
-        }
-        tracing::info!("got this far 1a");
-
-        file.flush().await?;
-        tracing::info!("got this far 1");
-
-        let upload_id = match self.release_manager.upload_artifact(file_path.into()).await {
-            Ok(res) => res,
-            Err(e) => {
-                tracing::warn!("failed to upload artifact: {}", e);
-                return Err(tonic::Status::unknown(e.to_string()));
-            }
-        };
-
-        tracing::info!("got this far 2");
-
-        Ok(tonic::Response::new(UploadArtifactResponse {
-            upload_id: upload_id.to_string(),
-        }))
-    }
-
-    #[tracing::instrument]
-    async fn commit_artifact(
-        &self,
-        request: tonic::Request<CommitArtifactRequest>,
-    ) -> std::result::Result<tonic::Response<CommitArtifactResponse>, tonic::Status> {
+        request: tonic::Request<HelloRequest>,
+    ) -> std::result::Result<tonic::Response<HelloReply>, tonic::Status> {
         let req = request.into_inner();
-        let artifact = self
-            .release_manager
-            .commit_artifact(req.try_into().map_err(|e: anyhow::Error| {
-                tracing::warn!("failed to parse input body: {}", e);
-                tonic::Status::invalid_argument(e.to_string())
-            })?)
-            .await
-            .map_err(|e: anyhow::Error| {
-                tracing::warn!("failed to commit artifact: {}", e);
-                tonic::Status::internal(e.to_string())
-            })?;
-
-        Ok(tonic::Response::new(CommitArtifactResponse {
-            artifact_id: artifact.to_string(),
-        }))
-    }
-
-    #[tracing::instrument]
-    async fn trigger_release(
-        &self,
-        request: tonic::Request<TriggerReleaseRequest>,
-    ) -> std::result::Result<tonic::Response<TriggerReleaseResponse>, tonic::Status> {
-        let req = request.into_inner();
-
-        tracing::info!("some trigger release");
-
         self.release_manager
-            .release(req.try_into().map_err(|e: anyhow::Error| {
-                tracing::warn!("failed to parse input body: {}", e);
-                tonic::Status::invalid_argument(e.to_string())
-            })?)
+            .commit_artifact(
+                req.try_into()
+                    .map_err(|e: anyhow::Error| tonic::Status::invalid_argument(e.to_string()))?,
+            )
             .await
-            .map_err(|e| {
-                tracing::warn!("failed to release: {}", e);
-                tonic::Status::internal(e.to_string())
-            })?;
-
-        Ok(tonic::Response::new(TriggerReleaseResponse {}))
+            .unwrap();
+        Ok(tonic::Response::new(HelloReply {
+            message: "something".into(),
+        }))
     }
 }
 
-impl TryFrom<CommitArtifactRequest> for CommitArtifact {
+impl TryFrom<HelloRequest> for CommitArtifact {
     type Error = anyhow::Error;
 
-    fn try_from(value: CommitArtifactRequest) -> Result<Self, Self::Error> {
+    fn try_from(value: HelloRequest) -> Result<Self, Self::Error> {
         if value.app.is_empty() {
             anyhow::bail!("app cannot be empty")
         }
         if value.branch.is_empty() {
             anyhow::bail!("branch cannot be empty")
         }
-        if value.upload_id.is_empty() {
+        if value.folder.is_empty() {
             anyhow::bail!("folder cannot be empty")
         }
 
         Ok(Self {
             app: value.app,
             branch: value.branch,
-            upload_id: value.upload_id.try_into()?,
+            folder: value.folder.into(),
         })
     }
 }
-
-impl TryFrom<TriggerReleaseRequest> for Release {
-    type Error = anyhow::Error;
-
-    fn try_from(value: TriggerReleaseRequest) -> Result<Self, Self::Error> {
-        if value.app.is_empty() {
-            anyhow::bail!("app cannot be empty");
-        }
-
-        if value.branch.is_empty() {
-            anyhow::bail!("branch canot be empty");
-        }
-
-        Ok(Self {
-            app: value.app,
-            branch: value.branch,
-        })
-    }
-}
@@ -175,14 +73,11 @@ impl TryFrom<TriggerReleaseRequest> for Release {
 pub async fn tonic_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
     tracing::info!("grpc listening on: {}", host);
     Server::builder()
-        .trace_fn(|_| tracing::info_span!("flux_releaser"))
-        .add_service(flux_releaser_server::FluxReleaserServer::new(
-            FluxReleaserGrpc::new(app),
-        ))
+        .add_service(greeter_server::GreeterServer::new(FluxReleaserGrpc::new(
+            app,
+        )))
         .serve(host)
         .await?;
 
     Ok(())
 }
-
-pub struct LogLayer {}
@@ -1,15 +0,0 @@
-use cli::Command;
-
-pub mod api;
-pub mod app;
-pub mod cli;
-pub mod grpc;
-pub mod services;
-
-pub async fn run() -> anyhow::Result<()> {
-    dotenv::dotenv().ok();
-
-    Command::run().await?;
-
-    Ok(())
-}
@@ -1,6 +1,19 @@
+use cli::Command;
+
+mod cli;
+
+mod api;
+mod grpc;
+
+mod app;
+
+mod services;
+
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    flux_releaser::run().await?;
+    dotenv::dotenv().ok();
+
+    Command::run().await?;
 
     Ok(())
 }
@@ -1,11 +1,5 @@
-pub mod archive;
-pub mod cluster_list;
-pub mod domain_events;
-pub mod file_reader;
-pub mod file_store;
-pub mod flux_local_cluster;
-pub mod flux_releaser_uploader;
-pub mod git;
+mod archive;
+mod domain_events;
+mod file_reader;
+mod file_store;
 pub mod release_manager;
-
-pub mod artifacts_db;
@@ -1,75 +1,7 @@
-#[derive(Clone, Default)]
+#[derive(Clone)]
 pub struct Archive {}
 
-use std::{
-    io::{Bytes, Cursor},
-    path::Path,
-};
-
-use anyhow::Context;
-
-use super::file_reader::Files;
-
-impl Archive {
-    pub fn new() -> Self {
-        Self {}
-    }
-
-    pub async fn create_archive(&self, prefix: &Path, files: Files) -> anyhow::Result<ArchiveFile> {
-        tracing::trace!("archiving files: {}", files.len());
-
-        let buffer = Vec::new();
-        let cursor = Cursor::new(buffer);
-
-        let mut tar_builder = tar::Builder::new(cursor);
-
-        for file in files {
-            let abs_file_path = file.path;
-
-            let mut fd = std::fs::File::open(&abs_file_path)?;
-            if let Some(rel) = file.relative {
-                tracing::trace!("archiving rel file: {}", rel.display());
-                tar_builder.append_file(&rel, &mut fd)?;
-            } else {
-                tracing::trace!("archiving file: {}", abs_file_path.display());
-                tar_builder.append_file(
-                    abs_file_path
-                        .strip_prefix(prefix)
-                        .context("failed to strip prefix from path")?,
-                    &mut fd,
-                )?;
-            }
-        }
-
-        tar_builder.finish()?;
-
-        let cursor = tar_builder.into_inner()?;
-        let buffer = cursor.into_inner();
-
-        Ok(buffer.into())
-    }
-
-    pub async fn unpack_archive(&self, archive: &ArchiveFile, dest: &Path) -> anyhow::Result<()> {
-        tracing::trace!("unpacking archive: {}", dest.display());
-
-        let cursor = Cursor::new(archive.content.clone());
-        let mut arc = tar::Archive::new(cursor);
-
-        arc.unpack(dest)?;
-
-        Ok(())
-    }
-}
-
-pub struct ArchiveFile {
-    pub content: Vec<u8>,
-}
-
-impl From<Vec<u8>> for ArchiveFile {
-    fn from(value: Vec<u8>) -> Self {
-        Self { content: value }
-    }
-}
+use std::{io::Cursor, path::Path};
 
 pub mod extensions {
@@ -79,11 +11,12 @@ pub mod extensions {
     use tokio::io::AsyncWriteExt;
     use uuid::Uuid;
 
-    use crate::app::SharedLocalApp;
     use crate::{app::SharedApp, services::release_manager::models::ArtifactID};
 
+    #[mockall_double::double]
     use crate::services::file_store::FileStore;
 
+    #[mockall_double::double]
     use super::Archive;
     use super::ArchiveFile;
 
@@ -97,12 +30,6 @@ pub mod extensions {
         }
     }
 
-    impl ArchiveExt for SharedLocalApp {
-        fn archive(&self) -> Archive {
-            Archive::new()
-        }
-    }
-
     #[async_trait]
     pub trait ArchiveUploadExt {
         async fn upload_archive(
@@ -164,3 +91,49 @@ pub mod extensions {
         }
     }
 }
+
+#[cfg(test)]
+use mockall::{automock, mock, predicate::*};
+
+use super::file_reader::{File, Files};
+
+#[cfg_attr(test, automock)]
+impl Archive {
+    pub fn new() -> Self {
+        Self {}
+    }
+
+    pub async fn create_archive(&self, files: Files) -> anyhow::Result<ArchiveFile> {
+        tracing::trace!("archiving files");
+
+        let buffer = Vec::new();
+        let cursor = Cursor::new(buffer);
+
+        let mut tar_builder = tar::Builder::new(cursor);
+
+        for file in files {
+            let abs_file_path = file.path;
+
+            tracing::trace!("archiving file: {}", abs_file_path.display());
+            let mut fd = std::fs::File::open(&abs_file_path)?;
+            tar_builder.append_file(&abs_file_path, &mut fd)?;
+        }
+
+        tar_builder.finish()?;
+
+        let cursor = tar_builder.into_inner()?;
+        let buffer = cursor.into_inner();
+
+        Ok(buffer.into())
+    }
+}
+
+pub struct ArchiveFile {
+    pub content: Vec<u8>,
+}
+
+impl From<Vec<u8>> for ArchiveFile {
+    fn from(value: Vec<u8>) -> Self {
+        Self { content: value }
+    }
+}
@@ -1,132 +0,0 @@
-use std::sync::Arc;
-
-use sqlx::{prelude::FromRow, PgPool};
-
-use self::defaults::DefaultArtifactsDB;
-
-#[derive(Clone, Debug)]
-pub struct AddCommitArtifact {
-    pub app: String,
-    pub branch: String,
-    pub artifact_id: uuid::Uuid,
-}
-
-#[derive(Clone, Debug, FromRow)]
-pub struct Artifact {
-    pub app: String,
-    pub branch: String,
-    pub artifact_id: uuid::Uuid,
-    pub created_at: chrono::NaiveDateTime,
-}
-
-#[derive(Clone, Debug)]
-pub struct GetLatestArtifact {
-    pub app: String,
-    pub branch: String,
-}
-
-pub mod traits {
-    use axum::async_trait;
-
-    use super::{AddCommitArtifact, Artifact, GetLatestArtifact};
-
-    #[async_trait]
-    pub trait ArtifactsDB {
-        async fn commit_artifact(&self, commit_artifact: AddCommitArtifact) -> anyhow::Result<()>;
-        async fn get_latest_artifact(
-            &self,
-            get_latest_artifact: GetLatestArtifact,
-        ) -> anyhow::Result<Artifact>;
-    }
-}
-
-pub mod defaults {
-    use axum::async_trait;
-    use sqlx::PgPool;
-
-    use super::{traits, AddCommitArtifact, Artifact, GetLatestArtifact};
-
-    pub struct DefaultArtifactsDB {
-        db: PgPool,
-    }
-
-    impl DefaultArtifactsDB {
-        pub fn new(db: PgPool) -> Self {
-            Self { db }
-        }
-    }
-
-    #[async_trait]
-    impl traits::ArtifactsDB for DefaultArtifactsDB {
-        async fn commit_artifact(&self, commit_artifact: AddCommitArtifact) -> anyhow::Result<()> {
-            sqlx::query("INSERT INTO artifacts (app, branch, artifact_id) VALUES ($1, $2, $3)")
-                .bind(commit_artifact.app)
-                .bind(commit_artifact.branch)
-                .bind(commit_artifact.artifact_id)
-                .execute(&self.db)
-                .await?;
-
-            Ok(())
-        }
-
-        async fn get_latest_artifact(
-            &self,
-            get_latest_artifact: GetLatestArtifact,
-        ) -> anyhow::Result<Artifact> {
-            let artifact = sqlx::query_as::<_, Artifact>(
-                "SELECT
-                    *
-                FROM
-                    artifacts
-                WHERE
-                    app = $1 AND
-                    branch = $2
-                ORDER BY created_at DESC
-                LIMIT 1",
-            )
-            .bind(get_latest_artifact.app)
-            .bind(get_latest_artifact.branch)
-            .fetch_one(&self.db)
-            .await?;
-
-            Ok(artifact)
-        }
-    }
-}
-
-#[derive(Clone)]
-pub struct ArtifactsDB {
-    inner: Arc<dyn traits::ArtifactsDB + Send + Sync>,
-}
-
-impl ArtifactsDB {
-    pub fn new(db: PgPool) -> Self {
-        Self {
-            inner: Arc::new(DefaultArtifactsDB::new(db)),
-        }
-    }
-}
-
-impl std::ops::Deref for ArtifactsDB {
-    type Target = Arc<dyn traits::ArtifactsDB + Send + Sync>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.inner
-    }
-}
-
-pub mod extensions {
-    use crate::app::App;
-
-    use super::ArtifactsDB;
-
-    pub trait ArtifactsDBExt {
-        fn artifacts_db(&self) -> ArtifactsDB;
-    }
-
-    impl ArtifactsDBExt for App {
-        fn artifacts_db(&self) -> ArtifactsDB {
-            ArtifactsDB::new(self.database.clone())
-        }
-    }
-}
@@ -1,23 +0,0 @@
-use std::collections::HashMap;
-
-#[derive(Default)]
-pub struct ClusterList {}
-
-impl ClusterList {
-    pub async fn get_list(&self) -> anyhow::Result<HashMap<String, Vec<String>>> {
-        Ok(HashMap::from([
-            ("dev".into(), vec!["clank-dev".into()]),
-            ("prod".into(), vec!["clank-prod".into()]),
-        ]))
-    }
-
-    pub async fn get(&self, environment: &str) -> anyhow::Result<Option<Vec<String>>> {
-        let list = self.get_list().await?;
-
-        if let Some(x) = list.get(environment) {
-            Ok(Some(x.clone()))
-        } else {
-            Ok(None)
-        }
-    }
-}
@@ -5,7 +5,12 @@ pub struct DomainEvents {
     nats: Nats,
 }
 
-use crate::app::infra::nats::Nats;
+#[cfg(test)]
+use mockall::{automock, mock, predicate::*};
 
+use crate::app::infra::{nats::Nats};
+
+#[cfg_attr(test, automock)]
 impl DomainEvents {
     pub fn new(nats: Nats) -> Self {
         Self { nats }
|
@ -1,5 +1,6 @@
|
|||||||
use crate::app::SharedApp;
|
use crate::app::SharedApp;
|
||||||
|
|
||||||
|
#[mockall_double::double]
|
||||||
use super::DomainEvents;
|
use super::DomainEvents;
|
||||||
|
|
||||||
pub trait DomainEventsExt {
|
pub trait DomainEventsExt {
|
||||||
|
@@ -1,18 +1,18 @@
-#[derive(Clone, Default)]
+#[derive(Clone)]
 pub struct FileReader {}
 
-use std::{
-    collections::BTreeMap,
-    path::{Path, PathBuf},
-};
+use std::{collections::BTreeMap, path::PathBuf};
 
 pub mod extensions;
 
-use anyhow::{anyhow, Context};
+use anyhow::anyhow;
+#[cfg(test)]
+use mockall::{automock, mock, predicate::*};
 
+#[cfg_attr(test, automock)]
 impl FileReader {
     pub fn new() -> Self {
-        Self::default()
+        Self {}
     }
 
     pub async fn read_files(&self, location: PathBuf) -> anyhow::Result<Files> {
@@ -20,9 +20,7 @@ impl FileReader {
 
         let mut clusters: BTreeMap<String, Vec<File>> = BTreeMap::new();
 
-        let mut dir = tokio::fs::read_dir(&location)
-            .await
-            .context(format!("failed to find location: {}", &location.display()))?;
+        let mut dir = tokio::fs::read_dir(&location).await?;
         while let Some(dir_entry) = dir.next_entry().await? {
             if dir_entry.metadata().await?.is_dir() {
                 tracing::trace!("found cluster in: {}", dir_entry.path().display());
@@ -51,13 +49,9 @@ impl FileReader {
                     cluster_name
                 );
 
-                if file.path().is_absolute() {
-                    files.push((file.path(), file.path().strip_prefix(&location)?).into())
-                } else {
-                    files.push(file.into_path().into())
-                }
+                files.push(file.into_path().into())
             }
         }
 
         Ok(clusters.into())
     }
@@ -66,32 +60,11 @@ impl FileReader {
 #[derive(Debug, Clone)]
 pub struct File {
     pub path: PathBuf,
-    pub relative: Option<PathBuf>,
 }
 
 impl From<PathBuf> for File {
     fn from(value: PathBuf) -> Self {
-        Self {
-            path: value,
-            relative: None,
-        }
-    }
-}
-
-impl From<(PathBuf, PathBuf)> for File {
-    fn from(value: (PathBuf, PathBuf)) -> Self {
-        Self {
-            path: value.0,
-            relative: Some(value.1),
-        }
-    }
-}
-
-impl From<(&Path, &Path)> for File {
-    fn from(value: (&Path, &Path)) -> Self {
-        Self {
-            path: value.0.to_path_buf(),
-            relative: Some(value.1.to_path_buf()),
-        }
+        Self { path: value }
     }
 }
@@ -119,8 +92,14 @@ impl From<Files> for Vec<File> {
         value
             .iter()
             .map(|(cluster_name, files)| (PathBuf::from(cluster_name), files))
-            .flat_map(|(_cluster_name, files)| files.to_vec())
-            // .map(|f| f.into())
+            .flat_map(|(cluster_name, files)| {
+                files
+                    .iter()
+                    //.map(|file_path| cluster_name.join(&file_path.path))
+                    .map(|file_path| file_path.path.clone())
+                    .collect::<Vec<_>>()
+            })
+            .map(|f| f.into())
             .collect::<Vec<_>>()
     }
 }
@@ -1,5 +1,6 @@
-use crate::app::{SharedApp, SharedLocalApp};
+use crate::app::SharedApp;
 
+#[mockall_double::double]
 use super::FileReader;
 
 pub trait FileReaderExt {
@@ -11,9 +12,3 @@ impl FileReaderExt for SharedApp {
         FileReader::new()
     }
 }
-
-impl FileReaderExt for SharedLocalApp {
-    fn file_reader(&self) -> FileReader {
-        FileReader::new()
-    }
-}
@@ -1,25 +1,23 @@
-use std::{env::temp_dir, path::PathBuf};
+use std::path::PathBuf;
 
-use super::release_manager::models::{ArtifactID, UploadArtifactID};
+use super::release_manager::models::ArtifactID;
 
 pub mod extensions;
 
 #[derive(Clone)]
 pub struct FileStore {
     client: aws_sdk_s3::Client,
-
-    bucket: String,
 }
 
 use aws_sdk_s3::primitives::ByteStream;
-use tokio::io::BufReader;
+#[cfg(test)]
+use mockall::{automock, mock, predicate::*};
+use tokio::io::AsyncReadExt;
 
+#[cfg_attr(test, automock)]
 impl FileStore {
     pub fn new(client: aws_sdk_s3::Client) -> Self {
-        Self {
-            client,
-            bucket: std::env::var("BUCKET_NAME").unwrap_or("flux-releaser".into()),
-        }
+        Self { client }
     }
 
     pub async fn upload_file(&self, artifact_id: ArtifactID, file: PathBuf) -> anyhow::Result<()> {
@@ -27,7 +25,7 @@ impl FileStore {
 
         self.client
            .put_object()
-            .bucket(&self.bucket)
+            .bucket("mybucket")
             .key(format!("archives/{}.tar", &artifact_id.to_string()))
             .body(ByteStream::from_path(file).await?)
             .send()
@@ -35,74 +33,4 @@ impl FileStore {
 
         Ok(())
     }
-
-    pub async fn upload_temp(&self, id: UploadArtifactID, file: PathBuf) -> anyhow::Result<()> {
-        tracing::trace!("uploading temp files: {}", id.to_string());
-
-        self.client
-            .put_object()
-            .bucket(&self.bucket)
-            .key(format!("temp/{}.tar", &id.to_string()))
-            .body(ByteStream::from_path(file).await?)
-            .send()
-            .await?;
-
-        Ok(())
-    }
-
-    pub async fn get_archive(&self, artifact_id: ArtifactID) -> anyhow::Result<PathBuf> {
-        tracing::trace!("getting archive: {}", artifact_id.to_string());
-
-        let archive_name = format!("archives/{}.tar", &artifact_id.to_string());
-
-        let obj = self
-            .client
-            .get_object()
-            .bucket(&self.bucket)
-            .key(&archive_name)
-            .send()
-            .await?;
-
-        let archive_path = temp_dir()
-            .join("flux_releaser")
-            .join("cache")
-            .join(&archive_name);
-        tokio::fs::create_dir_all(archive_path.parent().unwrap()).await?;
-        let mut archive_file = tokio::fs::File::create(&archive_path).await?;
-        let mut buf_reader = BufReader::new(obj.body.into_async_read());
-
-        tokio::io::copy(&mut buf_reader, &mut archive_file).await?;
-
-        tracing::debug!("created archive: {}", archive_path.display());
-
-        Ok(archive_path)
-    }
-
-    pub async fn get_temp(&self, artifact_id: UploadArtifactID) -> anyhow::Result<PathBuf> {
-        tracing::trace!("getting archive: {}", artifact_id.to_string());
-
-        let archive_name = format!("temp/{}.tar", &artifact_id.to_string());
-
-        let obj = self
-            .client
-            .get_object()
-            .bucket(&self.bucket)
-            .key(&archive_name)
-            .send()
-            .await?;
-
-        let archive_path = temp_dir()
-            .join("flux_releaser")
-            .join("downloads/cache")
-            .join(&archive_name);
-        tokio::fs::create_dir_all(archive_path.parent().unwrap()).await?;
-        let mut archive_file = tokio::fs::File::create(&archive_path).await?;
-        let mut buf_reader = BufReader::new(obj.body.into_async_read());
-
-        tokio::io::copy(&mut buf_reader, &mut archive_file).await?;
-
-        tracing::debug!("created archive: {}", archive_path.display());
-
-        Ok(archive_path)
-    }
 }
|
@ -1,5 +1,6 @@
|
|||||||
use crate::app::SharedApp;
|
use crate::app::SharedApp;
|
||||||
|
|
||||||
|
#[mockall_double::double]
|
||||||
use super::FileStore;
|
use super::FileStore;
|
||||||
|
|
||||||
pub trait FileStoreExt {
|
pub trait FileStoreExt {
|
||||||
|
@@ -1,138 +0,0 @@
-use std::path::PathBuf;
-
-use anyhow::Context;
-
-use crate::{
-    app::infra::grpc::FluxReleaserGrpcClient,
-    grpc::gen::{CommitArtifactRequest, TriggerReleaseRequest},
-};
-
-use super::{
-    archive::Archive,
-    file_reader::FileReader,
-    flux_releaser_uploader::FluxReleaserUploader,
-    release_manager::models::{ArtifactID, UploadArtifactID},
-};
-
-pub struct FluxLocalClusterManager {
-    file_reader: FileReader,
-    archive: Archive,
-    flux_uploader: FluxReleaserUploader,
-    flux_releaser_client: FluxReleaserGrpcClient,
-}
-
-impl std::fmt::Debug for FluxLocalClusterManager {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        Ok(())
-    }
-}
-
-impl FluxLocalClusterManager {
-    pub fn new(
-        file_reader: FileReader,
-        archive: Archive,
-        flux_uploader: FluxReleaserUploader,
-        flux_releaser_client: FluxReleaserGrpcClient,
-    ) -> Self {
-        Self {
-            file_reader,
-            archive,
-            flux_uploader,
-            flux_releaser_client,
-        }
-    }
-
-    #[tracing::instrument(skip(include))]
-    pub async fn package_clusters(
-        &self,
-        include: impl Into<PathBuf>,
-    ) -> anyhow::Result<UploadArtifactID> {
-        let include = include.into();
-
-        tracing::debug!("reading files");
-        let files = self.file_reader.read_files(include.clone()).await?;
-        tracing::debug!("creating archive");
-        let archive = self.archive.create_archive(&include, files).await?;
-        tracing::debug!("uploading archive");
-        let upload_id = self.flux_uploader.upload_archive(archive).await?;
-
-        tracing::debug!("done packaging clusters");
-
-        Ok(upload_id)
-    }
-
-    pub async fn commit_artifact(
-        &self,
-        app: impl Into<String>,
-        branch: impl Into<String>,
-        upload_id: UploadArtifactID,
-    ) -> anyhow::Result<ArtifactID> {
-        let artifact_id = self
-            .flux_releaser_client
-            .lock()
-            .await
-            .commit_artifact(tonic::Request::new(CommitArtifactRequest {
-                app: app.into(),
-                branch: branch.into(),
-                upload_id: upload_id.to_string(),
-            }))
-            .await?;
-
-        artifact_id.into_inner().artifact_id.try_into()
-    }
-
-    pub async fn trigger_release(
-        &self,
-        app: impl Into<String>,
-        branch: impl Into<String>,
-    ) -> anyhow::Result<()> {
-        self.flux_releaser_client
-            .lock()
-            .await
-            .trigger_release(tonic::Request::new(TriggerReleaseRequest {
-                app: app.into(),
-                branch: branch.into(),
-            }))
-            .await
-            .context("failed to trigger release")?;
-
-        // Send release proto to upstream
-
-        // 1. find app by app + branch
-
-        // 2. Unpack latest artifact by app + branch
-
-        // 3. Unpack by cluster
-
-        // 4. Upload
-
-        Ok(())
-    }
-}
-
-pub mod extensions {
-    use crate::{
-        app::SharedLocalApp,
-        services::{
-            archive::extensions::ArchiveExt, file_reader::extensions::FileReaderExt,
-            flux_releaser_uploader::extensions::FluxReleaserUploaderExt,
-        },
-    };
-
-    use super::FluxLocalClusterManager;
-
-    pub trait FluxLocalClusterManagerExt {
-        fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager;
-    }
-
-    impl FluxLocalClusterManagerExt for SharedLocalApp {
-        fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager {
-            FluxLocalClusterManager::new(
-                self.file_reader(),
-                self.archive(),
-                self.flux_releaser_uploader(),
-                self.grpc_client.clone(),
-            )
-        }
-    }
-}
@@ -1,54 +0,0 @@
-use tonic::transport::Channel;
-
-use crate::{
-    app::infra::grpc::FluxReleaserGrpcClient,
-    grpc::gen::{flux_releaser_client::FluxReleaserClient, UploadArtifactRequest},
-};
-
-use super::{archive::ArchiveFile, release_manager::models::UploadArtifactID};
-
-pub struct FluxReleaserUploader {
-    flux_client: FluxReleaserGrpcClient,
-}
-
-impl FluxReleaserUploader {
-    pub fn new(flux_client: FluxReleaserGrpcClient) -> Self {
-        Self { flux_client }
-    }
-
-    pub async fn upload_archive(&self, archive: ArchiveFile) -> anyhow::Result<UploadArtifactID> {
-        let chunks = archive
-            .content
-            .chunks(1_000_000) // Slice by about 1MB
-            .map(|ch| UploadArtifactRequest {
-                content: ch.to_vec(),
-            })
-            .collect::<Vec<_>>();
-
-        let iter = tokio_stream::iter(chunks);
-        let resp = self
-            .flux_client
-            .lock()
-            .await
-            .upload_artifact(tonic::Request::new(iter))
-            .await?;
-
-        resp.into_inner().upload_id.try_into()
-    }
-}
-
-pub mod extensions {
-    use crate::app::SharedLocalApp;
-
-    use super::FluxReleaserUploader;
-
-    pub trait FluxReleaserUploaderExt {
-        fn flux_releaser_uploader(&self) -> FluxReleaserUploader;
-    }
-
-    impl FluxReleaserUploaderExt for SharedLocalApp {
-        fn flux_releaser_uploader(&self) -> FluxReleaserUploader {
-            FluxReleaserUploader::new(self.grpc_client.clone())
-        }
-    }
-}
@ -1,337 +0,0 @@
-use std::{
-    path::{Path, PathBuf},
-    sync::Arc,
-};
-
-use anyhow::Context;
-use git2::{
-    build::{CheckoutBuilder, RepoBuilder},
-    Cred, FetchOptions, IndexAddOption, PushOptions, RemoteCallbacks, Repository, ResetType,
-    Signature,
-};
-use tokio::{io::AsyncWriteExt, sync::Mutex};
-
-use super::{
-    archive::{Archive, ArchiveFile},
-    cluster_list::ClusterList,
-};
-
-#[derive(Clone)]
-pub struct SharedGit {
-    inner: Arc<Git>,
-}
-
-impl From<Git> for SharedGit {
-    fn from(value: Git) -> Self {
-        Self {
-            inner: Arc::new(value),
-        }
-    }
-}
-
-impl std::ops::Deref for SharedGit {
-    type Target = Git;
-
-    fn deref(&self) -> &Self::Target {
-        &self.inner
-    }
-}
-
-pub struct Git {
-    location: Mutex<PathBuf>,
-
-    registry: String,
-
-    cluster_list: ClusterList,
-    archive: Archive,
-}
-
-impl Git {
-    pub fn new(registry: String, cluster_list: ClusterList, archive: Archive) -> Self {
-        Self {
-            registry,
-
-            location: Mutex::new(std::env::temp_dir().join("flux_releaser")),
-
-            cluster_list,
-            archive,
-        }
-    }
-
-    pub async fn publish(
-        &self,
-        archive: &ArchiveFile,
-        service: &str,
-        namespace: &str,
-        environment: &str,
-    ) -> anyhow::Result<()> {
-        // TODO: implement exponential backoff and 5 attempts
-
-        self.publish_attempt(archive, service, namespace, environment)
-            .await?;
-
-        Ok(())
-    }
-
-    async fn publish_attempt(
-        &self,
-        archive: &ArchiveFile,
-        service: &str,
-        namespace: &str,
-        environment: &str,
-    ) -> anyhow::Result<()> {
-        // 1. Clone repo into location (with lock) or update
-        let location = self.location.lock().await;
-
-        let repo_dir = location.join("repo");
-        let repo = match self.get_repo(&repo_dir).await? {
-            // TODO: possibly handle the error by just recloning the repo
-            None => self.clone_repo(&repo_dir).await?,
-            Some(repo) => {
-                self.update_repo(repo).await?;
-                self.get_repo(&repo_dir)
-                    .await?
-                    .ok_or(anyhow::anyhow!("failed to open repository"))?
-            }
-        };
-
-        // 2. Extract from archive
-        // TODO: maybe pad archive with hash or something
-        let unpack_dir = location.join("archive");
-        self.archive.unpack_archive(archive, &unpack_dir).await?;
-
-        // 3. Splat tar over application and cluster
-        // The archive should always be structured like so:
-        // - <environment>/
-        //   - * manifests
-
-        // 3a. prepare git repo for new files
-        let clusters = self
-            .cluster_list
-            .get(environment)
-            .await?
-            .ok_or(anyhow::anyhow!(
-                "environment is not registered: {} in cluster list",
-                environment
-            ))?;
-
-        for cluster in clusters {
-            let service_entry = repo_dir
-                .join("deployments")
-                .join(&cluster)
-                .join(namespace)
-                .join(service);
-
-            if service_entry.exists() {
-                if let Err(e) = tokio::fs::remove_dir_all(&service_entry).await {
-                    tracing::warn!("failed to remove existing dir: {}", e);
-                }
-            }
-            tokio::fs::create_dir_all(&service_entry).await?;
-
-            let cluster_entry = repo_dir
-                .join("clusters")
-                .join(&cluster)
-                .join(namespace)
-                .join(service);
-
-            if cluster_entry.exists() {
-                if let Err(e) = tokio::fs::remove_dir_all(&cluster_entry).await {
-                    tracing::warn!("failed to remove existing dir: {}", e);
-                }
-            }
-            tokio::fs::create_dir_all(&cluster_entry).await?;
-
-            let archive_dir = unpack_dir.join(environment);
-            if !archive_dir.exists() {
-                anyhow::bail!("selected environment is not published for archive");
-            }
-
-            let mut read_dir = tokio::fs::read_dir(archive_dir).await?;
-            while let Some(entry) = read_dir.next_entry().await? {
-                if entry.metadata().await?.is_file() {
-                    let entry_path = entry.path();
-                    let dest_path = service_entry.join(entry.file_name());
-                    tokio::fs::copy(entry_path, dest_path).await?;
-                }
-            }
-
-            let cluster_entry_file = cluster_entry.join(format!("{service}.yaml"));
-            let mut cluster_entry_file = tokio::fs::File::create(cluster_entry_file).await?;
-            let file_contents = format!(
-                r#"
-apiVersion: kustomize.toolkit.fluxcd.io/v1beta2
-kind: Kustomization
-metadata:
-  name: {service}
-  namespace: flux-system
-spec:
-  interval: 1m
-  retryInterval: 30s
-  path: ./deployments/{cluster}/{namespace}/{service}
-  prune: true
-  sourceRef:
-    kind: GitRepository
-    name: flux-system
-    namespace: flux-system
-"#
-            );
-
-            cluster_entry_file
-                .write_all(file_contents.as_bytes())
-                .await?;
-        }
-
-        // 4. Commit && push
-        self.commit_and_push(repo, environment, service).await?;
-
-        // 5. Cleanup
-        tokio::fs::remove_dir_all(unpack_dir).await?;
-
-        Ok(())
-    }
-
-    async fn get_repo(&self, location: &Path) -> anyhow::Result<Option<Repository>> {
-        match Repository::open(location) {
-            Ok(r) => Ok(Some(r)),
-            Err(e) => match e.code() {
-                git2::ErrorCode::NotFound => Ok(None),
-                _ => Err(e).context("failed to open git repository"),
-            },
-        }
-    }
-
-    async fn clone_repo(&self, location: &Path) -> anyhow::Result<Repository> {
-        let co = CheckoutBuilder::new();
-        let mut fo = FetchOptions::new();
-        fo.remote_callbacks(self.get_cred()?);
-        let repo = RepoBuilder::new()
-            .fetch_options(fo)
-            .with_checkout(co)
-            .clone(&self.registry, location)
-            .context("failed to clone repository")?;
-
-        Ok(repo)
-    }
-
-    async fn update_repo(&self, repository: Repository) -> anyhow::Result<()> {
-        let mut remote = repository.find_remote("origin")?;
-        let mut fo = FetchOptions::new();
-        fo.remote_callbacks(self.get_cred()?);
-        remote
-            .fetch(&["main"], Some(&mut fo), None)
-            .context("failed to update repo")?;
-
-        let origin_head = repository.find_reference("refs/remotes/origin/HEAD")?;
-        let origin_head_commit = origin_head.peel_to_commit()?;
-
-        // Perform a hard reset to the origin's HEAD
-        repository.reset(origin_head_commit.as_object(), ResetType::Hard, None)?;
-
-        Ok(())
-    }
-
-    fn get_cred(&self) -> anyhow::Result<RemoteCallbacks> {
-        let mut cb = RemoteCallbacks::new();
-        cb.credentials(|_, username, _| {
-            if let Ok(_sock) = std::env::var("SSH_AUTH_SOCK") {
-                return Cred::ssh_key_from_agent(username.unwrap_or("git"));
-            }
-
-            let username = std::env::var("GIT_USERNAME").expect("GIT_USERNAME to be set");
-            let password = std::env::var("GIT_PASSWORD").expect("GIT_PASSWORD to be set");
-            Cred::userpass_plaintext(&username, &password)
-        });
-        cb.certificate_check(|_cert, _| Ok(git2::CertificateCheckStatus::CertificateOk));
-
-        Ok(cb)
-    }
-
-    async fn commit_and_push(
-        &self,
-        repo: Repository,
-        environment: &str,
-        service: &str,
-    ) -> anyhow::Result<()> {
-        let mut index = repo.index()?;
-
-        // Add all files to the index
-        index.add_all(["*"].iter(), IndexAddOption::DEFAULT, None)?;
-        index.write()?;
-
-        // Create a tree from the index
-        let oid = index.write_tree()?;
-        let tree = repo.find_tree(oid)?;
-
-        // Get the current HEAD commit
-        let parent_commit = repo.head()?.peel_to_commit()?;
-
-        // Create a signature
-        let sig = Signature::now("flux_releaser", "operations+flux-releaser@kjuulh.io")?;
-
-        // Create the commit
-        repo.commit(
-            Some("HEAD"),
-            &sig,
-            &sig,
-            &format!("chore({environment}/{service}): releasing service"),
-            &tree,
-            &[&parent_commit],
-        )?;
-
-        let mut remote = repo.find_remote("origin")?;
-        let mut po = PushOptions::new();
-        po.remote_callbacks(self.get_cred()?);
-
-        remote.push(
-            &[&format!("refs/heads/{}:refs/heads/{}", "main", "main")],
-            Some(&mut po),
-        )?;
-
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use tokio::io::AsyncReadExt;
-    use uuid::Uuid;
-
-    use crate::services::{
-        archive::{Archive, ArchiveFile},
-        cluster_list::ClusterList,
-        git::Git,
-    };
-
-    #[tokio::test]
-    async fn can_clone_upstream() -> anyhow::Result<()> {
-        // FIXME: right now CI doesn't support git, so the test body is skipped
-        return Ok(());
-
-        let random = Uuid::new_v4().to_string();
-        println!("running test for id: {}", random);
-
-        println!("current_dir: {}", std::env::current_dir()?.display());
-        let mut arch = tokio::fs::File::open("testdata/example.tar").await?;
-        let mut dest = Vec::new();
-        arch.read_to_end(&mut dest).await?;
-
-        let git = Git::new(
-            "ssh://git@git.front.kjuulh.io/kjuulh/clank-clusters.git".into(),
-            ClusterList::default(),
-            Archive::default(),
-        );
-        let mut location = git.location.lock().await;
-        *location = location.join(random);
-        println!("into: {}", location.display());
-        drop(location);
-
-        git.publish(
-            &ArchiveFile { content: dest },
-            "flux-releaser-test",
-            "dev",
-            "dev",
-        )
-        .await?;
-
-        Ok(())
-    }
-}
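`publish` above leaves its retry TODO open. A sketch of what that could look like, reusing the shape of the `perform_task_with_backoff` helper from the integration tests further down in this diff (5 attempts, doubling delay); this is illustrative, not the author's implementation:

// Hypothetical body for Git::publish, folding in the backoff TODO.
pub async fn publish(
    &self,
    archive: &ArchiveFile,
    service: &str,
    namespace: &str,
    environment: &str,
) -> anyhow::Result<()> {
    let mut delay = std::time::Duration::from_millis(500);
    for attempt in 1..=5 {
        match self
            .publish_attempt(archive, service, namespace, environment)
            .await
        {
            Ok(()) => return Ok(()),
            Err(e) if attempt < 5 => {
                tracing::warn!("publish attempt {attempt} failed: {e}");
                tokio::time::sleep(delay).await;
                delay *= 2; // Exponential backoff, as the TODO suggests
            }
            Err(e) => return Err(e),
        }
    }
    unreachable!("the loop always returns by attempt 5")
}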
@ -1,58 +1,50 @@
 use serde::Serialize;
 
-use crate::services::archive::{Archive, ArchiveFile};
-use crate::services::artifacts_db::{AddCommitArtifact, GetLatestArtifact};
+use crate::services::archive::extensions::ArchiveUploadExt;
+#[mockall_double::double]
 use crate::services::file_store::FileStore;
 
-use super::artifacts_db::ArtifactsDB;
+#[mockall_double::double]
+use super::archive::Archive;
+#[mockall_double::double]
 use super::domain_events::DomainEvents;
-use super::git::SharedGit;
+#[mockall_double::double]
+use super::file_reader::FileReader;
 
-use self::models::*;
+use self::models::{ArtifactID, CommitArtifact};
 
 pub struct ReleaseManager {
+    archive: Archive,
+    file_reader: FileReader,
     file_store: FileStore,
     domain_events: DomainEvents,
-    artifacts_db: ArtifactsDB,
-    git: SharedGit,
 }
 
 impl ReleaseManager {
     pub fn new(
+        file_reader: FileReader,
         file_store: FileStore,
+        archive: Archive,
         domain_events: DomainEvents,
-        artifacts_db: ArtifactsDB,
-        git: SharedGit,
     ) -> Self {
         Self {
+            archive,
+            file_reader,
             file_store,
             domain_events,
-            artifacts_db,
-            git,
         }
     }
 
-    pub async fn upload_artifact(
-        &self,
-        request: UploadArtifact,
-    ) -> anyhow::Result<UploadArtifactID> {
-        let upload_id = uuid::Uuid::now_v7();
-
-        self.file_store
-            .upload_temp(upload_id.into(), request.file_path)
-            .await?;
-
-        Ok(upload_id.into())
-    }
-
     pub async fn commit_artifact(&self, request: CommitArtifact) -> anyhow::Result<ArtifactID> {
         tracing::debug!("committing artifact: {:?}", request);
         let artifact_id = ArtifactID::new();
 
-        let artifact = self.file_store.get_temp(request.upload_id).await?;
+        let files = self.file_reader.read_files(request.folder).await?;
 
+        let archive = self.archive.create_archive(files).await?;
 
         self.file_store
-            .upload_file(artifact_id.clone(), artifact)
+            .upload_archive(artifact_id.clone(), archive)
             .await?;
 
         self.domain_events
@ -61,68 +53,8 @@ impl ReleaseManager {
             })?)
             .await?;
 
-        self.artifacts_db
-            .commit_artifact(AddCommitArtifact {
-                app: request.app,
-                branch: request.branch,
-                artifact_id: artifact_id.clone().into(),
-            })
-            .await?;
-
         Ok(artifact_id)
     }
 
-    pub async fn release(&self, release_req: Release) -> anyhow::Result<()> {
-        tracing::debug!(
-            app = release_req.app,
-            branch = release_req.branch,
-            "releasing latest commit"
-        );
-
-        let latest_artifact = self
-            .artifacts_db
-            .get_latest_artifact(GetLatestArtifact {
-                app: release_req.app.clone(),
-                branch: release_req.branch.clone(),
-            })
-            .await?;
-
-        tracing::trace!("found latest artifact: {:?}", latest_artifact);
-
-        let artifact = self
-            .file_store
-            .get_archive(latest_artifact.artifact_id.into())
-            .await?;
-
-        tracing::trace!("placed artifact in: {}", artifact.display());
-
-        self.domain_events
-            .publish_event(&serde_json::to_string(&PublishedArtifactEvent {
-                artifact_id: latest_artifact.artifact_id.to_string(),
-                app: release_req.app.clone(),
-                branch: release_req.branch.clone(),
-            })?)
-            .await?;
-
-        let artifact_contents = tokio::fs::read(artifact).await?;
-        let env = if release_req.branch == "main" {
-            // FIXME: select prod instead
-            "prod"
-            //"dev"
-        } else {
-            "dev"
-        };
-        self.git
-            .publish(
-                &ArchiveFile::from(artifact_contents),
-                &release_req.app,
-                env,
-                env,
-            )
-            .await?;
-
-        Ok(())
-    }
 }
 
 #[derive(Serialize)]
@ -130,12 +62,55 @@ pub struct CommittedArtifactEvent {
     artifact_id: String,
 }
 
-#[derive(Serialize)]
-pub struct PublishedArtifactEvent {
-    app: String,
-    branch: String,
-    artifact_id: String,
-}
-
 pub mod extensions;
 pub mod models;
+
+#[cfg(test)]
+mod test {
+    use crate::services::archive::{ArchiveFile, MockArchive};
+    use crate::services::domain_events::MockDomainEvents;
+    use crate::services::file_reader::{Files, MockFileReader};
+    use crate::services::file_store::MockFileStore;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn generated_artifact_id() -> anyhow::Result<()> {
+        let mut file_store = MockFileStore::default();
+        file_store
+            .expect_upload_file()
+            .times(1)
+            .returning(|_, _| Ok(()));
+
+        let mut domain_events = MockDomainEvents::default();
+        domain_events
+            .expect_publish_event()
+            .times(1)
+            .returning(|_| Ok(()));
+
+        let mut file_reader = MockFileReader::default();
+        file_reader
+            .expect_read_files()
+            .times(1)
+            .returning(|_| Ok(Files::default()));
+
+        let mut archive = MockArchive::default();
+        archive.expect_create_archive().times(1).returning(|_| {
+            Ok(ArchiveFile {
+                content: Vec::new(),
+            })
+        });
+
+        let releaser_manager = ReleaseManager::new(file_reader, file_store, archive, domain_events);
+
+        releaser_manager
+            .commit_artifact(CommitArtifact {
+                app: "app".into(),
+                branch: "branch".into(),
+                folder: "someFolder".into(),
+            })
+            .await?;
+
+        Ok(())
+    }
+}
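The test added on the right-hand side relies on `mockall_double`: under `cfg(test)`, `#[mockall_double::double]` rewrites each annotated import to the generated `Mock*` type, so `ReleaseManager` is exercised against mocks without changing its signatures. A minimal sketch of the pattern, with illustrative names:

// Sketch of the mockall_double pattern; FileStore/upload are illustrative.
mod file_store {
    pub struct FileStore;

    // Generates MockFileStore in test builds only.
    #[cfg_attr(test, mockall::automock)]
    impl FileStore {
        pub fn upload(&self, bytes: &[u8]) -> usize {
            bytes.len()
        }
    }
}

// Resolves to file_store::FileStore normally, and to
// file_store::MockFileStore (aliased as FileStore) under cfg(test).
#[mockall_double::double]
use file_store::FileStore;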
@ -1,8 +1,8 @@
 use crate::{
     app::SharedApp,
     services::{
-        artifacts_db::extensions::ArtifactsDBExt, domain_events::extensions::DomainEventsExt,
-        file_store::extensions::FileStoreExt,
+        archive::extensions::ArchiveExt, domain_events::extensions::DomainEventsExt,
+        file_reader::extensions::FileReaderExt, file_store::extensions::FileStoreExt,
     },
 };
 
@ -15,10 +15,10 @@ pub trait ReleaseManagerExt {
 impl ReleaseManagerExt for SharedApp {
     fn release_manager(&self) -> ReleaseManager {
         ReleaseManager::new(
+            self.file_reader(),
             self.file_store(),
+            self.archive(),
             self.domain_events(),
-            self.artifacts_db(),
-            self.git.clone(),
         )
     }
 }
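The `*Ext` traits here exist so call sites can ask a `SharedApp` for a fully wired service instead of threading dependencies by hand. A hypothetical call site, assuming the types above are in scope:

// Hypothetical caller; the Ext trait assembles ReleaseManager from the
// app's shared services behind the scenes.
async fn commit(app: &SharedApp, req: CommitArtifact) -> anyhow::Result<ArtifactID> {
    app.release_manager().commit_artifact(req).await
}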
@ -1,16 +1,10 @@
-use std::{ops::Deref, path::PathBuf};
+use std::path::PathBuf;
 
 #[derive(Clone, Debug)]
 pub struct CommitArtifact {
     pub app: String,
     pub branch: String,
-    pub upload_id: UploadArtifactID,
-}
-
-#[derive(Clone, Debug)]
-pub struct Release {
-    pub app: String,
-    pub branch: String,
+    pub folder: PathBuf,
 }
 
 #[derive(Debug, Clone)]
@ -29,61 +23,3 @@ impl std::ops::Deref for ArtifactID {
         &self.0
     }
 }
 
-impl TryFrom<String> for ArtifactID {
-    type Error = anyhow::Error;
-
-    fn try_from(value: String) -> Result<Self, Self::Error> {
-        let uuid = uuid::Uuid::parse_str(&value)?;
-
-        Ok(ArtifactID(uuid))
-    }
-}
-
-impl From<uuid::Uuid> for ArtifactID {
-    fn from(value: uuid::Uuid) -> Self {
-        Self(value)
-    }
-}
-
-pub struct UploadArtifact {
-    pub file_path: PathBuf,
-}
-
-impl From<PathBuf> for UploadArtifact {
-    fn from(value: PathBuf) -> Self {
-        Self { file_path: value }
-    }
-}
-
-#[derive(Clone, Debug)]
-pub struct UploadArtifactID(uuid::Uuid);
-impl From<uuid::Uuid> for UploadArtifactID {
-    fn from(value: uuid::Uuid) -> Self {
-        Self(value)
-    }
-}
-
-impl From<ArtifactID> for uuid::Uuid {
-    fn from(value: ArtifactID) -> Self {
-        value.0
-    }
-}
-
-impl TryFrom<String> for UploadArtifactID {
-    type Error = anyhow::Error;
-
-    fn try_from(value: String) -> Result<Self, Self::Error> {
-        let uuid = uuid::Uuid::parse_str(&value)?;
-
-        Ok(Self(uuid))
-    }
-}
-
-impl Deref for UploadArtifactID {
-    type Target = uuid::Uuid;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
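The ID newtypes on the old side convert to and from strings via `TryFrom`, which is what `resp.into_inner().upload_id.try_into()` near the top of this diff relies on. A small illustrative round-trip, assuming those impls:

// Hypothetical round-trip between the wire format (String) and the typed ID.
fn id_roundtrip() -> anyhow::Result<()> {
    let id = uuid::Uuid::now_v7();
    let upload_id = UploadArtifactID::from(id);

    let wire: String = upload_id.to_string(); // Display via Deref to uuid::Uuid
    let parsed = UploadArtifactID::try_from(wire)?;

    assert_eq!(*parsed, id);
    Ok(())
}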
BIN crates/flux-releaser/testdata/example.tar (vendored)
Binary file not shown.
@ -1,14 +0,0 @@
-use flux_releaser::services::{
-    archive::Archive, file_reader::FileReader, flux_local_cluster::FluxLocalClusterManager,
-};
-
-#[tokio::test]
-async fn can_package_files() -> anyhow::Result<()> {
-    tracing_subscriber::fmt().init();
-
-    //let sut = FluxLocalClusterManager::new(FileReader::new(), Archive::new());
-
-    //sut.package_clusters("testdata/flux_local_cluster/").await?;
-
-    Ok(())
-}
@ -1,240 +0,0 @@
-use std::{
-    net::{Ipv4Addr, SocketAddr},
-    sync::Arc,
-    time::Duration,
-};
-
-use anyhow::Context;
-use flux_releaser::{
-    app::{LocalApp, SharedApp, SharedLocalApp},
-    grpc::gen::flux_releaser_client::FluxReleaserClient,
-    services::{
-        archive::ArchiveFile, file_store::extensions::FileStoreExt,
-        flux_local_cluster::extensions::FluxLocalClusterManagerExt,
-        flux_releaser_uploader::FluxReleaserUploader,
-    },
-};
-use tokio::{net::TcpListener, runtime::Runtime, sync::Mutex, time::sleep};
-
-struct Server {
-    endpoints: Endpoints,
-    app: SharedApp,
-}
-
-#[derive(Clone, Debug)]
-struct Endpoints {
-    http: SocketAddr,
-    grpc: SocketAddr,
-}
-
-impl Server {
-    pub async fn new() -> anyhow::Result<Self> {
-        let http_socket = Self::find_free_port().await?;
-        let grpc_socket = Self::find_free_port().await?;
-
-        Ok(Self {
-            endpoints: Endpoints {
-                http: http_socket,
-                grpc: grpc_socket,
-            },
-            app: SharedApp::new(flux_releaser::app::App::new().await?),
-        })
-    }
-
-    pub async fn start(&self) -> anyhow::Result<()> {
-        flux_releaser::cli::server::run_server(self.endpoints.http, self.endpoints.grpc).await?;
-
-        Ok(())
-    }
-
-    pub async fn find_free_port() -> anyhow::Result<SocketAddr> {
-        let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
-
-        let listener = TcpListener::bind(socket).await?;
-
-        listener.local_addr().context("failed to get local addr")
-    }
-}
-
-static INIT: std::sync::Once = std::sync::Once::new();
-
-async fn perform_task_with_backoff<F, Fut, T, E>(
-    mut task: F,
-    max_retries: u32,
-    base_delay_ms: u64,
-) -> Result<T, E>
-where
-    F: FnMut() -> Fut,
-    Fut: std::future::Future<Output = Result<T, E>>,
-{
-    let mut retries = 0;
-    let mut delay = base_delay_ms;
-
-    loop {
-        match task().await {
-            Ok(result) => return Ok(result),
-            Err(_e) if retries < max_retries => {
-                sleep(Duration::from_millis(delay)).await;
-                delay *= 2; // Exponential backoff
-                retries += 1;
-            }
-            Err(e) => return Err(e),
-        }
-    }
-}
-
-// Makes sure the setup is ready for execution
-async fn is_ready() -> anyhow::Result<()> {
-    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-
-    perform_task_with_backoff(
-        || async {
-            let endpoints = unsafe {
-                if ENDPOINTS.is_none() {
-                    anyhow::bail!("endpoints not set yet");
-                }
-
-                ENDPOINTS.clone().unwrap()
-            };
-
-            let resp = reqwest::get(format!("http://{}/ping", endpoints.http)).await?;
-
-            if !resp.status().is_success() {
-                anyhow::bail!("failed with status: {}", resp.status());
-            }
-
-            Ok::<(), anyhow::Error>(())
-        },
-        5,
-        500,
-    )
-    .await?;
-
-    Ok(())
-}
-
-static mut ENDPOINTS: Option<Endpoints> = None;
-static mut APP: Option<SharedApp> = None;
-
-async fn setup() -> anyhow::Result<(Endpoints, SharedApp)> {
-    INIT.call_once(|| {
-        std::thread::spawn(|| {
-            let rt = Runtime::new().unwrap();
-            rt.block_on(async move {
-                println!("once was created once");
-                let server = Server::new().await.unwrap();
-
-                unsafe {
-                    ENDPOINTS = Some(server.endpoints.clone());
-                    APP = Some(server.app.clone());
-                }
-
-                server.start().await.unwrap();
-            })
-        });
-    });
-
-    is_ready().await?;
-
-    Ok(unsafe { (ENDPOINTS.clone().unwrap(), APP.clone().unwrap()) })
-}
-
-async fn local_setup(endpoints: Endpoints) -> anyhow::Result<SharedLocalApp> {
-    Ok(SharedLocalApp::new(
-        LocalApp::new(format!("http://{}", endpoints.grpc)).await?,
-    ))
-}
-
-#[tokio::test]
-async fn can_upload_artifact() -> anyhow::Result<()> {
-    return Ok(());
-
-    std::env::set_var("RUST_LOG", "flux_releaser=trace");
-    let (endpoints, app) = setup().await?;
-
-    let client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?;
-    let client = FluxReleaserUploader::new(Arc::new(Mutex::new(client)));
-
-    let bytes: Vec<u8> = vec![0; 10_000_000];
-    let upload_id = client
-        .upload_archive(ArchiveFile {
-            content: bytes.clone(),
-        })
-        .await?;
-
-    assert!(!upload_id.to_string().is_empty());
-
-    let actual_path = app.file_store().get_temp(upload_id).await?;
-
-    let actual_content = tokio::fs::read(actual_path).await?;
-
-    assert_eq!(&bytes, &actual_content);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn can_publish_artifact() -> anyhow::Result<()> {
-    return Ok(());
-
-    std::env::set_var("RUST_LOG", "flux_releaser=trace");
-
-    let (endpoints, app) = setup().await?;
-    let local_app = local_setup(endpoints.clone()).await?;
-
-    let upload_id = local_app
-        .flux_local_cluster_manager()
-        .package_clusters("testdata/flux_local_cluster")
-        .await?;
-
-    let archive = app.file_store().get_temp(upload_id.clone()).await?;
-
-    assert!(archive.exists());
-
-    let artifact_id = local_app
-        .flux_local_cluster_manager()
-        .commit_artifact("some-app", "some-branch", upload_id)
-        .await?;
-
-    let artifact = app.file_store().get_archive(artifact_id).await?;
-
-    assert!(artifact.exists());
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn can_trigger_latest_release() -> anyhow::Result<()> {
-    return Ok(());
-
-    let test_id = uuid::Uuid::now_v7();
-
-    std::env::set_var("RUST_LOG", "flux_releaser=trace");
-
-    let (endpoints, app) = setup().await?;
-    let local_app = local_setup(endpoints.clone()).await?;
-
-    let upload_id = local_app
-        .flux_local_cluster_manager()
-        .package_clusters("testdata/flux_local_cluster")
-        .await?;
-
-    let archive = app.file_store().get_temp(upload_id.clone()).await?;
-
-    assert!(archive.exists());
-
-    let _ = local_app
-        .flux_local_cluster_manager()
-        .commit_artifact(test_id, "some-branch", upload_id)
-        .await?;
-
-    local_app
-        .flux_local_cluster_manager()
-        .trigger_release(test_id, "some-branch")
-        .await?;
-
-    // 1. Verify that release event has been sent
-    // 2. Verify that we've splatted the flux cluster over the upstream registry
-    // 3. Verify database has a release record
-
-    Ok(())
-}
-
-pub struct TestGreeter {}
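The test bootstrap above shares one server across tests through `static mut` globals guarded by `std::sync::Once`, which forces `unsafe` at every read. A sketch of the same idea with `std::sync::OnceLock` (stable since Rust 1.70) that would drop the `unsafe` blocks; names are illustrative:

use std::sync::OnceLock;

// Hypothetical replacement for the `static mut` globals above: the spawned
// server thread calls `set` once, readers call `get` without any `unsafe`.
static SERVER_STATE: OnceLock<(Endpoints, SharedApp)> = OnceLock::new();

fn try_endpoints() -> anyhow::Result<Endpoints> {
    SERVER_STATE
        .get()
        .map(|(endpoints, _)| endpoints.clone())
        .ok_or_else(|| anyhow::anyhow!("endpoints not set yet"))
}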
19 cuddle.yaml
@ -5,17 +5,16 @@ base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-service-plan.git"
 vars:
   service: "flux-releaser"
   registry: kasperhermansen
-  database:
-    crdb: "true"
-  ingress:
-    - external: "true"
-    - internal: "true"
+  clusters:
+    clank-prod:
+      replicas: "3"
+      namespace: prod
 
 
-cuddle/clusters:
-  dev:
-    env:
-      service.host: "0.0.0.0:3000"
-  prod:
-    env:
-      service.host: "0.0.0.0:3000"
+deployment:
+  registry: git@git.front.kjuulh.io:kjuulh/clank-clusters
+  env:
+  prod:
+    clusters:
+      - clank-prod
@ -1,3 +0,0 @@
-{
-  "$schema": "https://docs.renovatebot.com/renovate-schema.json"
-}
@ -21,8 +21,8 @@ services:
       /bin/sh -c "
       /usr/bin/mc alias set myminio http://minio:10000 minioadmin minioadminpassword;
       /usr/bin/mc admin info myminio;
-      /usr/bin/mc mb myminio/flux-releaser;
-      /usr/bin/mc policy set public myminio/flux-releaser;
+      /usr/bin/mc mb myminio/mybucket;
+      /usr/bin/mc policy set public myminio/mybucket;
       exit 0;
       "
 
@ -34,17 +34,3 @@ services:
       - NATS_ENABLE_AUTH=yes
       - NATS_USERNAME=user
       - NATS_PASSWORD=secret
-
-  crdb:
-    restart: 'always'
-    image: 'cockroachdb/cockroach:latest'
-    command: 'start-single-node --advertise-addr 0.0.0.0 --insecure'
-    healthcheck:
-      test: ["CMD", "curl", "-f", "http://localhost:8080/health?ready=1"]
-      interval: '10s'
-      timeout: '30s'
-      retries: 5
-      start_period: '20s'
-    ports:
-      - '28080:8080'
-      - '26257:26257'