Compare commits


56 Commits
main-1 ... main

SHA1        Message  CI  Date
(† = Signed-off-by: kjuulh <contact@kjuulh.io>)

dfc45fdb83  fix(deps): update rust crate aws-sdk-s3 to v1.51.0  [CI: passing]  2024-09-19 00:30:52 +00:00
c496a6a1e7  fix(deps): update rust crate aws-sdk-s3 to v1.50.0  [CI: passing]  2024-09-18 00:32:33 +00:00
30888ab5d6  chore(deps): update rust crate anyhow to v1.0.89  [CI: passing]  2024-09-15 04:28:51 +00:00
ead5cce704  chore(deps): update rust crate anyhow to v1.0.88  [CI: passing]  2024-09-12 00:31:15 +00:00
8a479a61b8  fix(deps): update aws-sdk-rust monorepo  [CI: passing]  2024-09-11 02:30:51 +02:00
6ce59c7420  fix(deps): update rust crate rustls to v0.23.13  [CI: passing]  2024-09-11 00:27:37 +00:00
f28035fb28  fix(deps): update all dependencies  [CI: passing]  2024-09-08 13:05:50 +00:00
522db2293b  feat: release dev and prod †  [CI: passing]  2024-09-08 12:49:36 +02:00
1fb24abd39  feat: use upstream †  [CI: failing]  2024-09-08 12:45:20 +02:00
96f3b52c2a  feat: set interval down to a minute †  [CI: failing]  2024-09-08 12:44:05 +02:00
700d1c12ed  feat: with native roots †  [CI: failing]  2024-09-08 00:37:34 +02:00
4ce386975c  feat: add comment †  [CI: failing]  2024-09-07 23:49:04 +02:00
065b0c5847  feat: always install †  [CI: failing]  2024-09-07 23:48:45 +02:00
195d772beb  feat: add install †  [CI: failing]  2024-09-07 23:43:04 +02:00
a4e252ac91  feat: with ring; †  [CI: failing]  2024-09-07 23:39:23 +02:00
23db713cb7  feat: enable stuff †  [CI: failing]  2024-09-07 23:33:21 +02:00
2b250dd4df  feat: update packages †  [CI: failing]  2024-09-07 23:24:17 +02:00
2d60d56a30  feat: update packages †  [CI: failing]  2024-09-07 23:16:11 +02:00
010cb18a0e  fix(deps): update rust crate aws-sdk-s3 to v1.48.0  [CI: failing]  2024-09-05 00:29:52 +00:00
935f708fe8  fix(deps): update tonic monorepo to v0.12.2  [CI: failing]  2024-08-26 20:40:59 +00:00
646f5bf916  fix(deps): update tonic monorepo to 0.12.0  [CI: failing]  2024-08-21 23:28:46 +00:00
be7c89aa5a  fix(deps): update rust crate prost to 0.13.0  [CI: failing; renovate/artifacts update failure]  2024-08-21 22:29:28 +00:00
e51dbaa52c  fix(deps): update aws-sdk-rust monorepo  [CI: failing]  2024-08-21 20:58:44 +00:00
44ae9c2d2b  feat: need cargo updated as well †  [CI: errored]  2024-05-27 22:37:10 +02:00
3ca0a836ca  feat: release to prod †  [CI: errored]  2024-05-26 23:08:26 +02:00
921a7c6fbf  feat: add hyper webroots †  [CI: errored]  2024-05-26 20:46:29 +02:00
616d44c988  feat: tls †  [CI: errored]  2024-05-26 17:23:58 +02:00
9601cfa980  feat: use new tokio stream †  [CI: errored]  2024-05-26 15:45:40 +02:00
b12653b9e9  feat: add more debug †  [CI: errored]  2024-05-26 15:19:58 +02:00
5ce33b379e  feat: add ability to actually publish †  [CI: failing]  2024-05-26 13:37:49 +02:00
0a258829f7  feat: without prefix †  [CI: failing]  2024-05-25 23:35:53 +02:00
fffae453ee  feat: remove unused parameter †  [CI: passing]  2024-05-25 22:56:43 +02:00
0f59f19239  feat: without cluster in trigger release †  [CI: failing]  2024-05-25 22:50:28 +02:00
c345040441  fix(deps): update aws-sdk-rust monorepo  [CI: errored]  2024-05-25 21:14:15 +02:00
a0885c0ae6  fix(deps): update rust crate prost to v0.12.6  [CI: errored]  2024-05-25 19:14:02 +00:00
c5df5959c6  fix(deps): update rust crate serde to v1.0.203  [CI: failing]  2024-05-25 18:40:00 +00:00
05bdd89f50  chore: stuff  [CI: passing]  2024-04-10 21:09:47 +02:00
c8600b54e0  Merge pull request 'Configure Renovate' (#1) from renovate/configure into main (Reviewed-on: #1)  [CI: failing]  2024-04-06 20:07:31 +00:00
594bbf23fa  Add renovate.json  [CI: push passing, pr failing]  2024-03-09 21:48:58 +00:00
0293002c7e  feat: without push †  [CI: passing]  2024-03-09 22:07:27 +01:00
7c114b70f7  feat: update †  [CI: failing]  2024-03-09 22:04:01 +01:00
1d7c056925  feat: download archive †  [CI: failing]  2024-02-22 18:36:44 +01:00
ff6cc941d8  feat: add cockroach db †  [CI: failing]  2024-02-22 18:26:53 +01:00
b4af46aa23  feat: add crdb †  [CI: failing]  2024-02-21 06:51:54 +01:00
da5e6c8bff  feat: add crdb †  [CI: failing]  2024-02-21 06:42:15 +01:00
5780d55587  feat: move client into a client file †  [CI: failing]  2024-02-21 06:33:23 +01:00
d35f5c8f22  feat: add actual upload †  [CI: failing]  2024-02-19 22:50:49 +01:00
a17dd2bd10  feat: with comparing contents †  [CI: failing]  2024-02-18 22:37:54 +01:00
02e70fe268  feat: only write not all †  [CI: failing]  2024-02-18 22:29:44 +01:00
9cc3d80917  feat: add upload chunker †  [CI: failing]  2024-02-18 22:25:43 +01:00
6cf1a23169  feat: with actual archive test †  [CI: failing]  2024-02-18 13:38:16 +01:00
e994df19cf  feat: add tests †  2024-02-18 11:39:02 +01:00
44fbf1f362  feat: fix test †  [CI: failing]  2024-02-18 01:31:41 +01:00
c3739c1bc1  feat: can upload archives †  2024-02-18 01:29:46 +01:00
c54bbaf017  feat: update †  [CI: failing]  2024-02-17 16:11:06 +01:00
185d997694  feat: update image †  [CI: failing]  2024-02-17 16:10:52 +01:00
44 changed files with 3890 additions and 1056 deletions

View File

@@ -1,155 +1,2 @@
kind: pipeline
name: default
type: docker
steps:
- name: load_secret
image: debian:buster-slim
volumes:
- name: ssh
path: /root/.ssh/
environment:
SSH_KEY:
from_secret: gitea_id_ed25519
commands:
- mkdir -p $HOME/.ssh/
- echo "$SSH_KEY" | base64 -d > $HOME/.ssh/id_ed25519
- chmod -R 600 ~/.ssh
- |
cat >$HOME/.ssh/config <<EOL
Host git.front.kjuulh.io
IdentityFile $HOME/.ssh/id_ed25519
IdentitiesOnly yes
UserKnownHostsFile=/dev/null
StrictHostKeyChecking no
EOL
- chmod 700 ~/.ssh/config
- name: build pr
image: kasperhermansen/cuddle:latest
pull: always
volumes:
- name: ssh
path: /root/.ssh/
- name: ci
path: /mnt/ci
commands:
- eval `ssh-agent`
- ssh-add
- echo "$DOCKER_PASSWORD" | docker login --password-stdin --username="$DOCKER_USERNAME" docker.io
- apk add git
- export RUST_LOG=trace
- cuddle-rust-service-plan pr
environment:
DOCKER_BUILDKIT: 1
DOCKER_PASSWORD:
from_secret: docker_password
DOCKER_USERNAME:
from_secret: docker_username
CUDDLE_SECRETS_PROVIDER: 1password
CUDDLE_ONE_PASSWORD_DOT_ENV: ".env.ci"
CUDDLE_SSH_AGENT: "true"
CI_PREFIX: "/mnt/ci/ci"
CUDDLE_PLEASE_TOKEN:
from_secret: cuddle_please_token
OP_SERVICE_ACCOUNT_TOKEN:
from_secret: op_service_account_token
when:
event:
- pull_request
exclude:
- main
- master
depends_on:
- "load_secret"
- name: build main
image: kasperhermansen/cuddle-rust-service-plan:main-1707658158
pull: always
volumes:
- name: ssh
path: /root/.ssh/
commands:
- eval `ssh-agent`
- ssh-add
- echo "$DOCKER_PASSWORD" | docker login --password-stdin --username="$DOCKER_USERNAME" docker.io
- export RUST_LOG=trace
- cuddle-rust-service-plan main
environment:
REGISTRY_CACHE_USERNAME:
from_secret: registry_cache_username
REGISTRY_CACHE_PASSWORD:
from_secret: registry_cache_password
REGISTRY_CACHE_TOKEN:
from_secret: registry_cache_token
REGISTRY_CACHE_url:
from_secret: registry_cache_url
DOCKER_BUILDKIT: 1
DOCKER_PASSWORD:
from_secret: docker_password
DOCKER_USERNAME:
from_secret: docker_username
CUDDLE_SECRETS_PROVIDER: 1password
CUDDLE_ONE_PASSWORD_DOT_ENV: ".env.ci"
CUDDLE_SSH_AGENT: "true"
GIT_PASSWORD:
from_secret: git_password
CI_PREFIX: "/mnt/ci/ci"
DOCKER_HOST: "tcp://192.168.1.233:2376"
CUDDLE_PLEASE_TOKEN:
from_secret: cuddle_please_token
OP_SERVICE_ACCOUNT_TOKEN:
from_secret: op_service_account_token
when:
event:
- push
branch:
- main
- master
exclude:
- pull_request
depends_on:
- "load_secret"
- name: deploy release
image: kasperhermansen/cuddle:latest
pull: always
volumes:
- name: ssh
path: /root/.ssh/
- name: dockersock
path: /var/run
- name: ci
path: /mnt/ci
commands:
- eval `ssh-agent`
- ssh-add
- echo "$DOCKER_PASSWORD" | docker login --password-stdin --username="$DOCKER_USERNAME" docker.io
- apk add git
- cuddle x ci:release
environment:
DOCKER_BUILDKIT: 1
DOCKER_PASSWORD:
from_secret: docker_password
DOCKER_USERNAME:
from_secret: docker_username
CUDDLE_SECRETS_PROVIDER: 1password
CUDDLE_ONE_PASSWORD_DOT_ENV: ".env.ci"
CUDDLE_SSH_AGENT: "true"
CI_PREFIX: "/mnt/ci/ci"
CUDDLE_PLEASE_TOKEN:
from_secret: cuddle_please_token
OP_SERVICE_ACCOUNT_TOKEN:
from_secret: op_service_account_token
when:
event:
- tag
ref:
include:
- refs/tags/v*
depends_on:
- "load_secret"
volumes:
- name: ssh
temp: {}
kind: template
load: cuddle-rust-service-plan.yaml

.env (2 changed lines)
View File

@@ -5,3 +5,5 @@ AWS_SECRET_ACCESS_KEY=minioadminpassword
NATS_URL=127.0.0.1:4222
NATS_USERNAME=user
NATS_PASSWORD=secret
DATABASE_URL=postgres://root@localhost:26257/defaultdb?sslmode=disable

Cargo.lock (generated; 2588 changed lines)

File diff suppressed because it is too large.

View File

@@ -17,6 +17,9 @@ flux-releaser commit \
--registry https://flux-releaser.i.kjuulh.io
```
!!! note
The include path should contain a subfolder for each releasable artifact.
flux releaser can also be used as a library, which is especially useful if integrated with dagger
Now you can release or auto-release to your desired environment and app.

View File

@@ -11,19 +11,32 @@ tracing-subscriber.workspace = true
clap.workspace = true
dotenv.workspace = true
axum.workspace = true
prost = "0.12.3"
tonic = "0.11.0"
prost = "0.13.2"
tonic = { version = "0.12.2", features = ["tls", "tls-native-roots"] }
uuid = { version = "1.7.0", features = ["v7", "v4"] }
async-trait = "0.1.77"
mockall_double = "0.3.1"
aws-config = { version = "1.1.5", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.15.0", features = ["behavior-version-latest"] }
aws-config = { version = "1.5.5", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.48.0", features = ["behavior-version-latest"] }
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
nats = "0.24.1"
nats = "0.25.0"
walkdir = "2.4.0"
tar = "0.4.40"
tokio-stream = { version = "0.1.15", features = ["full"] }
rand = "0.8.5"
sqlx = { version = "0.8.0", features = [
"postgres",
"runtime-tokio",
"uuid",
"chrono",
] }
chrono = "0.4.34"
git2 = "0.19.0"
rustls = { version = "0.23.12" }
[build-dependencies]
tonic-build = "0.11.0"
tonic-build = "0.12.0"
[dev-dependencies]
mockall = "0.12.1"
lazy_static = "1.4.0"
reqwest = "0.12.0"

View File

@@ -0,0 +1,8 @@
-- Add migration script here
CREATE TABLE IF NOT EXISTS artifacts (
app VARCHAR NOT NULL,
branch VARCHAR NOT NULL,
artifact_id UUID NOT NULL PRIMARY KEY,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE (app, artifact_id)
);

View File

@@ -2,17 +2,33 @@ syntax = "proto3";
package flux_releaser;
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
service FluxReleaser {
rpc UploadArtifact (stream UploadArtifactRequest) returns (UploadArtifactResponse) {}
rpc CommitArtifact (CommitArtifactRequest) returns (CommitArtifactResponse) {}
rpc TriggerRelease (TriggerReleaseRequest) returns (TriggerReleaseResponse) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
message UploadArtifactRequest {
bytes content = 1;
}
// The response message containing the greetings.
message HelloReply {
string message = 1;
message UploadArtifactResponse {
string upload_id = 1;
}
message CommitArtifactRequest {
string app = 1;
string branch = 2;
string upload_id = 3;
}
message CommitArtifactResponse {
string artifact_id = 1;
}
message TriggerReleaseRequest {
string app = 1;
string branch = 2;
}
message TriggerReleaseResponse {}

View File

@@ -1,11 +1,14 @@
use std::net::SocketAddr;
use axum::{routing::get, Router};
use axum::{response::IntoResponse, routing::get, Router};
use crate::app::SharedApp;
pub async fn axum_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
let app = Router::new().route("/", get(root)).with_state(app);
let app = Router::new()
.route("/ping", get(pong))
.route("/", get(root))
.with_state(app);
tracing::info!("listening on {}", host);
let listener = tokio::net::TcpListener::bind(host).await.unwrap();
@@ -15,6 +18,10 @@ pub async fn axum_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()>
Ok(())
}
async fn pong() -> impl IntoResponse {
"pong!"
}
async fn root() -> &'static str {
"Hello, flux-releaser!"
}

View File

@@ -1,9 +1,17 @@
use std::{ops::Deref, sync::Arc};
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::config::Credentials;
use sqlx::{PgPool, Postgres};
use self::infra::aws_s3::s3_client;
use crate::services::{
archive::Archive,
cluster_list::ClusterList,
git::{Git, SharedGit},
};
use self::infra::{
aws_s3::s3_client,
grpc::{new_client, FluxReleaserGrpcClient},
};
#[derive(Clone)]
pub struct SharedApp(Arc<App>);
@@ -25,6 +33,8 @@ impl SharedApp {
pub struct App {
pub s3_client: aws_sdk_s3::Client,
pub nats: infra::nats::Nats,
pub database: PgPool,
pub git: SharedGit,
}
impl App {
@@ -32,8 +42,45 @@ impl App {
Ok(Self {
s3_client: s3_client().await?,
nats: infra::nats::Nats::new().await?,
database: infra::database::get_database().await?,
git: Git::new(
std::env::var("FLUX_RELEASER_GIT_REPOSITORY")
.unwrap_or("ssh://git@git.front.kjuulh.io/kjuulh/clank-clusters.git".into()),
ClusterList::default(),
Archive::default(),
)
.into(),
})
}
}
pub mod infra;
#[derive(Clone)]
pub struct SharedLocalApp(Arc<LocalApp>);
impl Deref for SharedLocalApp {
type Target = Arc<LocalApp>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl SharedLocalApp {
pub fn new(app: LocalApp) -> Self {
Self(Arc::new(app))
}
}
pub struct LocalApp {
pub grpc_client: FluxReleaserGrpcClient,
}
impl LocalApp {
pub async fn new(registry: impl Into<String>) -> anyhow::Result<Self> {
Ok(Self {
grpc_client: new_client(registry).await?,
})
}
}

View File

@@ -1,2 +1,4 @@
pub mod aws_s3;
pub mod database;
pub mod grpc;
pub mod nats;

View File

@@ -1,3 +1,4 @@
use anyhow::Context;
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::config::Credentials;
@@ -5,15 +6,16 @@ pub async fn s3_client() -> anyhow::Result<aws_sdk_s3::Client> {
let shared_config = aws_config::defaults(BehaviorVersion::latest())
.region(Region::new("eu-west-1"))
.credentials_provider(Credentials::new(
std::env::var("AWS_ACCESS_KEY_ID")?,
std::env::var("AWS_SECRET_ACCESS_KEY")?,
std::env::var("AWS_ACCESS_KEY_ID").context("AWS_ACCESS_KEY_ID was not set")?,
std::env::var("AWS_SECRET_ACCESS_KEY").context("AWS_SECRET_ACCESS_KEY was not set")?,
None,
None,
"flux_releaser",
));
let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await)
.endpoint_url(std::env::var("AWS_ENDPOINT_URL")?)
.endpoint_url(std::env::var("AWS_ENDPOINT_URL").context("AWS_ENDPOINT_URL was not set")?)
.force_path_style(true)
.build();
let client = aws_sdk_s3::Client::from_conf(config);

View File

@@ -0,0 +1,19 @@
use anyhow::Context;
use sqlx::{PgPool, Postgres};
pub async fn get_database() -> anyhow::Result<PgPool> {
tracing::trace!("initializing database");
let db =
sqlx::PgPool::connect(&std::env::var("DATABASE_URL").context("DATABASE_URL is not set")?)
.await?;
tracing::trace!("migrating crdb");
sqlx::migrate!("migrations/crdb")
.set_locking(false)
.run(&db)
.await?;
let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?;
Ok(db)
}

View File

@@ -0,0 +1,36 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::transport::{Channel, ClientTlsConfig};
use crate::grpc::gen::flux_releaser_client::FluxReleaserClient;
pub type FluxReleaserGrpcClient = Arc<Mutex<FluxReleaserClient<Channel>>>;
pub async fn new_client(registry: impl Into<String>) -> anyhow::Result<FluxReleaserGrpcClient> {
let registry: String = registry.into();
// Workaround: https://github.com/algesten/ureq/issues/765#issuecomment-2282921492
static INIT: std::sync::Once = std::sync::Once::new();
INIT.call_once(|| {
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.unwrap();
});
let channel = if registry.starts_with("https") {
let tls = ClientTlsConfig::new().with_native_roots();
Channel::from_shared(registry)?
.tls_config(tls)?
.connect()
.await?
} else {
Channel::from_shared(registry)?.connect().await?
};
let client = FluxReleaserClient::new(channel);
Ok(Arc::new(Mutex::new(client)))
}

View File

@@ -1,11 +1,10 @@
use clap::{Parser, Subcommand};
use std::net::SocketAddr;
use crate::{
api::axum_serve,
app::{App, SharedApp},
grpc::tonic_serve,
};
use crate::services::flux_local_cluster::extensions::FluxLocalClusterManagerExt;
pub mod client;
pub mod server;
#[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)]
@@ -22,27 +21,62 @@ pub enum Commands {
#[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")]
grpc_host: SocketAddr,
},
Commit {
#[arg(long)]
app: String,
#[arg(long)]
branch: String,
#[arg(long)]
include: String,
#[arg(env = "FLUX_RELEASER_REGISTRY", long)]
registry: String,
},
Release {
#[arg(long)]
app: String,
#[arg(long)]
branch: String,
#[arg(env = "FLUX_RELEASER_REGISTRY", long)]
registry: String,
},
}
impl Command {
pub async fn run() -> anyhow::Result<()> {
let cli = Command::parse();
if let Some(Commands::Serve { host, grpc_host }) = cli.command {
tracing_subscriber::fmt::init();
match cli.command {
Some(Commands::Serve { host, grpc_host }) => {
server::run_server(host, grpc_host).await?;
}
Some(Commands::Commit {
app,
branch,
include,
registry,
}) => {
let app = client::get_local_app(registry).await?;
tracing::info!("Starting service");
let upload_id = app
.flux_local_cluster_manager()
.package_clusters(include)
.await?;
}
Some(Commands::Release {
app: service_app,
branch,
registry,
}) => {
let app = client::get_local_app(registry).await?;
let app = SharedApp::new(App::new().await?);
tokio::select! {
res = axum_serve(host, app.clone()) => {
res?;
},
res = tonic_serve(grpc_host, app.clone()) => {
res?;
},
};
app.flux_local_cluster_manager()
.trigger_release(service_app, branch)
.await?;
}
None => (),
}
Ok(())

View File

@@ -0,0 +1,11 @@
use crate::app::{LocalApp, SharedLocalApp};
pub async fn get_local_app(registry: impl Into<String>) -> anyhow::Result<SharedLocalApp> {
tracing_subscriber::fmt::init();
tracing::debug!("Starting client commit");
let app = SharedLocalApp::new(LocalApp::new(registry).await?);
Ok(app)
}

View File

@@ -0,0 +1,29 @@
use std::net::SocketAddr;
use crate::{
api::axum_serve,
app::{App, SharedApp},
grpc::tonic_serve,
};
pub async fn run_server(
host: impl Into<SocketAddr>,
grpc_host: impl Into<SocketAddr>,
) -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
tracing::info!("Starting service");
let app = SharedApp::new(App::new().await?);
tokio::select! {
res = axum_serve(host.into(), app.clone()) => {
res?;
},
res = tonic_serve(grpc_host.into(), app.clone()) => {
res?;
},
};
Ok(())
}

View File

@@ -1,17 +1,25 @@
use std::net::SocketAddr;
use std::{env::temp_dir, fmt::Display, net::SocketAddr};
use tonic::transport::Server;
use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt;
use tonic::{service::interceptor, transport::Server};
use uuid::Uuid;
use crate::{
app::SharedApp,
services::release_manager::{
extensions::ReleaseManagerExt, models::CommitArtifact, ReleaseManager,
extensions::ReleaseManagerExt,
models::{CommitArtifact, Release},
ReleaseManager,
},
};
use self::gen::{greeter_server, HelloReply, HelloRequest};
use self::gen::{
flux_releaser_server, CommitArtifactRequest, CommitArtifactResponse, TriggerReleaseRequest,
TriggerReleaseResponse, UploadArtifactRequest, UploadArtifactResponse,
};
mod gen {
pub mod gen {
tonic::include_proto!("flux_releaser");
}
@@ -27,34 +35,154 @@ impl FluxReleaserGrpc {
}
}
#[tonic::async_trait]
impl greeter_server::Greeter for FluxReleaserGrpc {
async fn say_hello(
&self,
request: tonic::Request<HelloRequest>,
) -> std::result::Result<tonic::Response<HelloReply>, tonic::Status> {
self.release_manager
.commit_artifact(CommitArtifact {
app: "some-app".into(),
branch: "some-branch".into(),
})
.await
.unwrap();
impl std::fmt::Debug for FluxReleaserGrpc {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}
Ok(tonic::Response::new(HelloReply {
message: "something".into(),
#[tonic::async_trait]
impl flux_releaser_server::FluxReleaser for FluxReleaserGrpc {
#[tracing::instrument]
async fn upload_artifact(
&self,
request: tonic::Request<tonic::Streaming<UploadArtifactRequest>>,
) -> std::result::Result<tonic::Response<UploadArtifactResponse>, tonic::Status> {
let mut stream = request.into_inner();
let file_path = temp_dir()
.join("flux_releaser")
.join("tmp")
.join("upload_artifact")
.join(Uuid::new_v4().to_string());
tokio::fs::create_dir_all(file_path.parent().unwrap()).await?;
let mut file = tokio::fs::File::create(&file_path).await?;
while let Some(item) = stream.next().await {
tracing::trace!("received chunk");
let item = item?;
let _ = file.write(&item.content).await?;
}
tracing::info!("got this far 1a");
file.flush().await?;
tracing::info!("got this far 1");
let upload_id = match self.release_manager.upload_artifact(file_path.into()).await {
Ok(res) => res,
Err(e) => {
tracing::warn!("failed to upload artifact: {}", e);
return Err(tonic::Status::unknown(e.to_string()));
}
};
tracing::info!("got this far 2");
Ok(tonic::Response::new(UploadArtifactResponse {
upload_id: upload_id.to_string(),
}))
}
#[tracing::instrument]
async fn commit_artifact(
&self,
request: tonic::Request<CommitArtifactRequest>,
) -> std::result::Result<tonic::Response<CommitArtifactResponse>, tonic::Status> {
let req = request.into_inner();
let artifact = self
.release_manager
.commit_artifact(req.try_into().map_err(|e: anyhow::Error| {
tracing::warn!("failed to parse input body: {}", e);
tonic::Status::invalid_argument(e.to_string())
})?)
.await
.map_err(|e: anyhow::Error| {
tracing::warn!("failed to commit artifact: {}", e);
tonic::Status::internal(e.to_string())
})?;
Ok(tonic::Response::new(CommitArtifactResponse {
artifact_id: artifact.to_string(),
}))
}
#[tracing::instrument]
async fn trigger_release(
&self,
request: tonic::Request<TriggerReleaseRequest>,
) -> std::result::Result<tonic::Response<TriggerReleaseResponse>, tonic::Status> {
let req = request.into_inner();
tracing::info!("some trigger release");
self.release_manager
.release(req.try_into().map_err(|e: anyhow::Error| {
tracing::warn!("failed to parse input body: {}", e);
tonic::Status::invalid_argument(e.to_string())
})?)
.await
.map_err(|e| {
tracing::warn!("failed to release: {}", e);
tonic::Status::internal(e.to_string())
})?;
Ok(tonic::Response::new(TriggerReleaseResponse {}))
}
}
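The `upload_artifact` handler above buffers streamed chunks into a temp file before handing the path to the release manager. A minimal stdlib-only sketch of that buffering step (the chunk vector is a stand-in for the gRPC stream; `write_all` is used instead of the handler's bare `write`, which may write only part of a buffer):

```rust
use std::fs::File;
use std::io::{Read, Write};

fn main() -> std::io::Result<()> {
    // Stand-in for tonic::Streaming<UploadArtifactRequest>: chunks of bytes.
    let chunks: Vec<Vec<u8>> = vec![b"part-1|".to_vec(), b"part-2".to_vec()];

    let path = std::env::temp_dir().join("flux-releaser-upload-demo.bin");
    {
        let mut file = File::create(&path)?;
        for chunk in &chunks {
            // Append each received chunk; write_all guarantees the full buffer lands.
            file.write_all(chunk)?;
        }
        file.flush()?; // everything on disk before further processing
    }

    let mut content = String::new();
    File::open(&path)?.read_to_string(&mut content)?;
    assert_eq!(content, "part-1|part-2");
    std::fs::remove_file(&path)?;
    Ok(())
}
```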
impl TryFrom<CommitArtifactRequest> for CommitArtifact {
type Error = anyhow::Error;
fn try_from(value: CommitArtifactRequest) -> Result<Self, Self::Error> {
if value.app.is_empty() {
anyhow::bail!("app cannot be empty")
}
if value.branch.is_empty() {
anyhow::bail!("branch cannot be empty")
}
if value.upload_id.is_empty() {
anyhow::bail!("upload_id cannot be empty")
}
Ok(Self {
app: value.app,
branch: value.branch,
upload_id: value.upload_id.try_into()?,
})
}
}
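The empty-field checks in the `TryFrom` conversions above follow one pattern: reject the request as soon as a required field is blank. A stdlib-only sketch of that pattern (hypothetical `Request`/`Commit` types and `String` errors in place of `anyhow`):

```rust
struct Request { app: String, branch: String, upload_id: String }
#[derive(Debug)]
struct Commit { app: String, branch: String, upload_id: String }

impl TryFrom<Request> for Commit {
    type Error = String;
    fn try_from(v: Request) -> Result<Self, Self::Error> {
        // Mirrors the validation order of CommitArtifactRequest -> CommitArtifact.
        if v.app.is_empty() { return Err("app cannot be empty".into()); }
        if v.branch.is_empty() { return Err("branch cannot be empty".into()); }
        if v.upload_id.is_empty() { return Err("upload_id cannot be empty".into()); }
        Ok(Commit { app: v.app, branch: v.branch, upload_id: v.upload_id })
    }
}

fn main() {
    let bad = Request { app: String::new(), branch: "main".into(), upload_id: "u1".into() };
    assert_eq!(Commit::try_from(bad).unwrap_err(), "app cannot be empty");

    let ok = Request { app: "svc".into(), branch: "main".into(), upload_id: "u1".into() };
    assert!(Commit::try_from(ok).is_ok());
}
```

Validating at the conversion boundary keeps the gRPC layer thin: the handlers only map `TryFrom` errors to `tonic::Status::invalid_argument`.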
impl TryFrom<TriggerReleaseRequest> for Release {
type Error = anyhow::Error;
fn try_from(value: TriggerReleaseRequest) -> Result<Self, Self::Error> {
if value.app.is_empty() {
anyhow::bail!("app cannot be empty");
}
if value.branch.is_empty() {
anyhow::bail!("branch cannot be empty");
}
Ok(Self {
app: value.app,
branch: value.branch,
})
}
}
pub async fn tonic_serve(host: SocketAddr, app: SharedApp) -> anyhow::Result<()> {
tracing::info!("grpc listening on: {}", host);
Server::builder()
.add_service(greeter_server::GreeterServer::new(FluxReleaserGrpc::new(
app,
)))
.trace_fn(|_| tracing::info_span!("flux_releaser"))
.add_service(flux_releaser_server::FluxReleaserServer::new(
FluxReleaserGrpc::new(app),
))
.serve(host)
.await?;
Ok(())
}
pub struct LogLayer {}

View File

@@ -0,0 +1,15 @@
use cli::Command;
pub mod api;
pub mod app;
pub mod cli;
pub mod grpc;
pub mod services;
pub async fn run() -> anyhow::Result<()> {
dotenv::dotenv().ok();
Command::run().await?;
Ok(())
}

View File

@@ -1,19 +1,6 @@
use cli::Command;
mod cli;
mod api;
mod grpc;
mod app;
mod services;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
Command::run().await?;
flux_releaser::run().await?;
Ok(())
}

View File

@@ -1,3 +1,11 @@
mod domain_events;
mod file_store;
pub mod archive;
pub mod cluster_list;
pub mod domain_events;
pub mod file_reader;
pub mod file_store;
pub mod flux_local_cluster;
pub mod flux_releaser_uploader;
pub mod git;
pub mod release_manager;
pub mod artifacts_db;

View File

@@ -0,0 +1,166 @@
#[derive(Clone, Default)]
pub struct Archive {}
use std::{
io::{Bytes, Cursor},
path::Path,
};
use anyhow::Context;
use super::file_reader::Files;
impl Archive {
pub fn new() -> Self {
Self {}
}
pub async fn create_archive(&self, prefix: &Path, files: Files) -> anyhow::Result<ArchiveFile> {
tracing::trace!("archiving files: {}", files.len());
let buffer = Vec::new();
let cursor = Cursor::new(buffer);
let mut tar_builder = tar::Builder::new(cursor);
for file in files {
let abs_file_path = file.path;
let mut fd = std::fs::File::open(&abs_file_path)?;
if let Some(rel) = file.relative {
tracing::trace!("archiving rel file: {}", rel.display());
tar_builder.append_file(&rel, &mut fd)?;
} else {
tracing::trace!("archiving file: {}", abs_file_path.display());
tar_builder.append_file(
abs_file_path
.strip_prefix(prefix)
.context("failed to strip prefix from path")?,
&mut fd,
)?;
}
}
tar_builder.finish()?;
let cursor = tar_builder.into_inner()?;
let buffer = cursor.into_inner();
Ok(buffer.into())
}
pub async fn unpack_archive(&self, archive: &ArchiveFile, dest: &Path) -> anyhow::Result<()> {
tracing::trace!("unpacking archive: {}", dest.display());
let cursor = Cursor::new(archive.content.clone());
let mut arc = tar::Archive::new(cursor);
arc.unpack(dest)?;
Ok(())
}
}
pub struct ArchiveFile {
pub content: Vec<u8>,
}
impl From<Vec<u8>> for ArchiveFile {
fn from(value: Vec<u8>) -> Self {
Self { content: value }
}
}
pub mod extensions {
use std::{env::temp_dir, path::PathBuf};
use async_trait::async_trait;
use tokio::io::AsyncWriteExt;
use uuid::Uuid;
use crate::app::SharedLocalApp;
use crate::{app::SharedApp, services::release_manager::models::ArtifactID};
use crate::services::file_store::FileStore;
use super::Archive;
use super::ArchiveFile;
pub trait ArchiveExt {
fn archive(&self) -> Archive;
}
impl ArchiveExt for SharedApp {
fn archive(&self) -> Archive {
Archive::new()
}
}
impl ArchiveExt for SharedLocalApp {
fn archive(&self) -> Archive {
Archive::new()
}
}
#[async_trait]
pub trait ArchiveUploadExt {
async fn upload_archive(
&self,
artifact_id: ArtifactID,
archive: ArchiveFile,
) -> anyhow::Result<()>;
}
#[async_trait]
impl ArchiveUploadExt for FileStore {
async fn upload_archive(
&self,
artifact_id: ArtifactID,
archive: ArchiveFile,
) -> anyhow::Result<()> {
tracing::trace!("uploading archive: {}", artifact_id.to_string());
let suffix = Uuid::new_v4();
let temp_root = temp_dir();
let archive_path = temp_root
.join("flux-releaser")
.join("archives")
.join(format!("{}-{}.tar", &artifact_id.to_string(), suffix));
let archive_file_guard = ArchiveFilePath(archive_path.clone());
tokio::fs::create_dir_all(archive_path.parent().unwrap()).await?;
{
let mut archive_file = tokio::fs::File::create(&archive_path).await?;
archive_file.write_all(&archive.content).await?;
archive_file.flush().await?;
}
self.upload_file(artifact_id, archive_path).await?;
drop(archive_file_guard);
Ok(())
}
}
pub struct ArchiveFilePath(PathBuf);
// Make sure we delete the archive file when we're done with it; we don't want it sticking around longer than necessary.
impl Drop for ArchiveFilePath {
fn drop(&mut self) {
let file_path = self.0.clone();
tokio::spawn(async move {
if file_path.exists() {
tracing::trace!("deleting archive: {}", file_path.display());
if let Err(e) = tokio::fs::remove_file(&file_path).await {
tracing::error!("failed to delete archive: {}", e);
}
}
});
}
}
}
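The `ArchiveFilePath` guard above leans on `Drop` to clean up the temporary archive once the upload finishes. A minimal synchronous sketch of the same scope-guard pattern, using only the standard library (the `TempFileGuard` name is illustrative, not part of this crate):

```rust
use std::fs;
use std::path::PathBuf;

/// Deletes the wrapped file when the guard goes out of scope.
struct TempFileGuard(PathBuf);

impl Drop for TempFileGuard {
    fn drop(&mut self) {
        if self.0.exists() {
            // Best-effort cleanup; errors are ignored on drop.
            let _ = fs::remove_file(&self.0);
        }
    }
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("guard-demo.tmp");
    {
        let _guard = TempFileGuard(path.clone());
        fs::write(&path, b"scratch data")?;
        assert!(path.exists());
    } // guard dropped here, file removed
    assert!(!path.exists());
    Ok(())
}
```

The production code instead spawns a tokio task in `drop`, since `Drop` cannot be async; the synchronous version is simpler when no async runtime is involved.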


@ -0,0 +1,132 @@
use std::sync::Arc;
use sqlx::{prelude::FromRow, PgPool};
use self::defaults::DefaultArtifactsDB;
#[derive(Clone, Debug)]
pub struct AddCommitArtifact {
pub app: String,
pub branch: String,
pub artifact_id: uuid::Uuid,
}
#[derive(Clone, Debug, FromRow)]
pub struct Artifact {
pub app: String,
pub branch: String,
pub artifact_id: uuid::Uuid,
pub created_at: chrono::NaiveDateTime,
}
#[derive(Clone, Debug)]
pub struct GetLatestArtifact {
pub app: String,
pub branch: String,
}
pub mod traits {
use axum::async_trait;
use super::{AddCommitArtifact, Artifact, GetLatestArtifact};
#[async_trait]
pub trait ArtifactsDB {
async fn commit_artifact(&self, commit_artifact: AddCommitArtifact) -> anyhow::Result<()>;
async fn get_latest_artifact(
&self,
get_latest_artifact: GetLatestArtifact,
) -> anyhow::Result<Artifact>;
}
}
pub mod defaults {
use axum::async_trait;
use sqlx::PgPool;
use super::{traits, AddCommitArtifact, Artifact, GetLatestArtifact};
pub struct DefaultArtifactsDB {
db: PgPool,
}
impl DefaultArtifactsDB {
pub fn new(db: PgPool) -> Self {
Self { db }
}
}
#[async_trait]
impl traits::ArtifactsDB for DefaultArtifactsDB {
async fn commit_artifact(&self, commit_artifact: AddCommitArtifact) -> anyhow::Result<()> {
sqlx::query("INSERT INTO artifacts (app, branch, artifact_id) VALUES ($1, $2, $3)")
.bind(commit_artifact.app)
.bind(commit_artifact.branch)
.bind(commit_artifact.artifact_id)
.execute(&self.db)
.await?;
Ok(())
}
async fn get_latest_artifact(
&self,
get_latest_artifact: GetLatestArtifact,
) -> anyhow::Result<Artifact> {
let artifact = sqlx::query_as::<_, Artifact>(
"SELECT
*
FROM
artifacts
WHERE
app = $1 AND
branch = $2
ORDER BY created_at DESC
LIMIT 1",
)
.bind(get_latest_artifact.app)
.bind(get_latest_artifact.branch)
.fetch_one(&self.db)
.await?;
Ok(artifact)
}
}
}
#[derive(Clone)]
pub struct ArtifactsDB {
inner: Arc<dyn traits::ArtifactsDB + Send + Sync>,
}
impl ArtifactsDB {
pub fn new(db: PgPool) -> Self {
Self {
inner: Arc::new(DefaultArtifactsDB::new(db)),
}
}
}
impl std::ops::Deref for ArtifactsDB {
type Target = Arc<dyn traits::ArtifactsDB + Send + Sync>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub mod extensions {
use crate::app::App;
use super::ArtifactsDB;
pub trait ArtifactsDBExt {
fn artifacts_db(&self) -> ArtifactsDB;
}
impl ArtifactsDBExt for App {
fn artifacts_db(&self) -> ArtifactsDB {
ArtifactsDB::new(self.database.clone())
}
}
}


@ -0,0 +1,23 @@
use std::collections::HashMap;
#[derive(Default)]
pub struct ClusterList {}
impl ClusterList {
pub async fn get_list(&self) -> anyhow::Result<HashMap<String, Vec<String>>> {
Ok(HashMap::from([
("dev".into(), vec!["clank-dev".into()]),
("prod".into(), vec!["clank-prod".into()]),
]))
}
pub async fn get(&self, environment: &str) -> anyhow::Result<Option<Vec<String>>> {
let list = self.get_list().await?;
if let Some(x) = list.get(environment) {
Ok(Some(x.clone()))
} else {
Ok(None)
}
}
}
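`ClusterList` is a static environment-to-clusters mapping, and `get` reduces to a plain map lookup. A std-only sketch of the lookup (function name is illustrative):

```rust
use std::collections::HashMap;

// Environment -> clusters lookup, mirroring ClusterList::get.
fn get_clusters<'a>(
    list: &'a HashMap<String, Vec<String>>,
    environment: &str,
) -> Option<&'a Vec<String>> {
    list.get(environment)
}

fn main() {
    let list = HashMap::from([
        ("dev".to_string(), vec!["clank-dev".to_string()]),
        ("prod".to_string(), vec!["clank-prod".to_string()]),
    ]);
    assert_eq!(get_clusters(&list, "dev").unwrap()[0], "clank-dev");
    // Unregistered environments yield None rather than an error.
    assert!(get_clusters(&list, "staging").is_none());
}
```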


@ -5,12 +5,7 @@ pub struct DomainEvents {
nats: Nats,
}
#[cfg(test)]
use mockall::{automock, mock, predicate::*};
use crate::app::infra::{self, nats::Nats};
#[cfg_attr(test, automock)]
use crate::app::infra::nats::Nats;
impl DomainEvents {
pub fn new(nats: Nats) -> Self {
Self { nats }


@ -1,6 +1,5 @@
use crate::app::SharedApp;
#[mockall_double::double]
use super::DomainEvents;
pub trait DomainEventsExt {


@ -0,0 +1,138 @@
#[derive(Clone, Default)]
pub struct FileReader {}
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
};
pub mod extensions;
use anyhow::{anyhow, Context};
impl FileReader {
pub fn new() -> Self {
Self::default()
}
pub async fn read_files(&self, location: PathBuf) -> anyhow::Result<Files> {
tracing::trace!("reading files: {}", location.display());
let mut clusters: BTreeMap<String, Vec<File>> = BTreeMap::new();
let mut dir = tokio::fs::read_dir(&location)
.await
.context(format!("failed to find location: {}", &location.display()))?;
while let Some(dir_entry) = dir.next_entry().await? {
if dir_entry.metadata().await?.is_dir() {
tracing::trace!("found cluster in: {}", dir_entry.path().display());
clusters.insert(
dir_entry
.file_name()
.into_string()
.map_err(|_| anyhow!("failed to convert file name to string"))?,
Vec::new(),
);
}
}
for (cluster_name, files) in clusters.iter_mut() {
for file in walkdir::WalkDir::new(location.join(cluster_name))
.into_iter()
.flatten()
{
if !file.file_type().is_file() {
continue;
}
tracing::trace!(
"adding file: {} to cluster: {}",
file.path().display(),
cluster_name
);
if file.path().is_absolute() {
files.push((file.path(), file.path().strip_prefix(&location)?).into())
} else {
files.push(file.into_path().into())
}
}
}
Ok(clusters.into())
}
}
#[derive(Debug, Clone)]
pub struct File {
pub path: PathBuf,
pub relative: Option<PathBuf>,
}
impl From<PathBuf> for File {
fn from(value: PathBuf) -> Self {
Self {
path: value,
relative: None,
}
}
}
impl From<(PathBuf, PathBuf)> for File {
fn from(value: (PathBuf, PathBuf)) -> Self {
Self {
path: value.0,
relative: Some(value.1),
}
}
}
impl From<(&Path, &Path)> for File {
fn from(value: (&Path, &Path)) -> Self {
Self {
path: value.0.to_path_buf(),
relative: Some(value.1.to_path_buf()),
}
}
}
#[derive(Default, Clone, Debug)]
pub struct Files {
inner: BTreeMap<String, Vec<File>>,
}
impl From<BTreeMap<String, Vec<File>>> for Files {
fn from(value: BTreeMap<String, Vec<File>>) -> Self {
Self { inner: value }
}
}
impl std::ops::Deref for Files {
type Target = BTreeMap<String, Vec<File>>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl From<Files> for Vec<File> {
fn from(value: Files) -> Self {
value.values().flatten().cloned().collect()
}
}
impl IntoIterator for Files {
type Item = File;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
let files: Vec<File> = self.into();
files.into_iter()
}
}
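The `From<Files> for Vec<File>` conversion above flattens the per-cluster map into one file list, discarding the cluster keys. The same shape can be sketched standalone with plain `PathBuf`s (names here are illustrative):

```rust
use std::collections::BTreeMap;
use std::path::PathBuf;

// Flatten a cluster -> files map into a single file list,
// mirroring the From<Files> for Vec<File> conversion.
fn flatten(clusters: &BTreeMap<String, Vec<PathBuf>>) -> Vec<PathBuf> {
    clusters.values().flatten().cloned().collect()
}

fn main() {
    let mut clusters = BTreeMap::new();
    clusters.insert("dev".to_string(), vec![PathBuf::from("dev/app.yaml")]);
    clusters.insert("prod".to_string(), vec![PathBuf::from("prod/app.yaml")]);
    let files = flatten(&clusters);
    assert_eq!(files.len(), 2);
    // BTreeMap iterates in key order, so "dev" files come first.
    assert_eq!(files[0], PathBuf::from("dev/app.yaml"));
}
```

Because `Files` derefs to a `BTreeMap`, iteration order is deterministic (sorted by cluster name), which keeps archive contents stable across runs.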


@ -0,0 +1,19 @@
use crate::app::{SharedApp, SharedLocalApp};
use super::FileReader;
pub trait FileReaderExt {
fn file_reader(&self) -> FileReader;
}
impl FileReaderExt for SharedApp {
fn file_reader(&self) -> FileReader {
FileReader::new()
}
}
impl FileReaderExt for SharedLocalApp {
fn file_reader(&self) -> FileReader {
FileReader::new()
}
}


@ -1,30 +1,108 @@
use std::{path::PathBuf, sync::Arc};
use std::{env::temp_dir, path::PathBuf};
use super::release_manager::models::ArtifactID;
use super::release_manager::models::{ArtifactID, UploadArtifactID};
pub mod extensions;
#[derive(Clone)]
pub struct FileStore {
client: aws_sdk_s3::Client,
bucket: String,
}
#[cfg(test)]
use mockall::{automock, mock, predicate::*};
use aws_sdk_s3::primitives::ByteStream;
use tokio::io::BufReader;
#[cfg_attr(test, automock)]
impl FileStore {
pub fn new(client: aws_sdk_s3::Client) -> Self {
Self { client }
Self {
client,
bucket: std::env::var("BUCKET_NAME").unwrap_or("flux-releaser".into()),
}
}
pub async fn upload_files(
&self,
artifact_id: ArtifactID,
files: Vec<PathBuf>,
) -> anyhow::Result<()> {
pub async fn upload_file(&self, artifact_id: ArtifactID, file: PathBuf) -> anyhow::Result<()> {
tracing::trace!("uploading file: {}", artifact_id.to_string());
self.client
.put_object()
.bucket(&self.bucket)
.key(format!("archives/{}.tar", &artifact_id.to_string()))
.body(ByteStream::from_path(file).await?)
.send()
.await?;
Ok(())
}
pub async fn upload_temp(&self, id: UploadArtifactID, file: PathBuf) -> anyhow::Result<()> {
tracing::trace!("uploading temp files: {}", id.to_string());
self.client
.put_object()
.bucket(&self.bucket)
.key(format!("temp/{}.tar", &id.to_string()))
.body(ByteStream::from_path(file).await?)
.send()
.await?;
Ok(())
}
pub async fn get_archive(&self, artifact_id: ArtifactID) -> anyhow::Result<PathBuf> {
tracing::trace!("getting archive: {}", artifact_id.to_string());
let archive_name = format!("archives/{}.tar", &artifact_id.to_string());
let obj = self
.client
.get_object()
.bucket(&self.bucket)
.key(&archive_name)
.send()
.await?;
let archive_path = temp_dir()
.join("flux_releaser")
.join("cache")
.join(&archive_name);
tokio::fs::create_dir_all(archive_path.parent().unwrap()).await?;
let mut archive_file = tokio::fs::File::create(&archive_path).await?;
let mut buf_reader = BufReader::new(obj.body.into_async_read());
tokio::io::copy(&mut buf_reader, &mut archive_file).await?;
tracing::debug!("created archive: {}", archive_path.display());
Ok(archive_path)
}
pub async fn get_temp(&self, artifact_id: UploadArtifactID) -> anyhow::Result<PathBuf> {
tracing::trace!("getting temp archive: {}", artifact_id.to_string());
let archive_name = format!("temp/{}.tar", &artifact_id.to_string());
let obj = self
.client
.get_object()
.bucket(&self.bucket)
.key(&archive_name)
.send()
.await?;
let archive_path = temp_dir()
.join("flux_releaser")
.join("downloads/cache")
.join(&archive_name);
tokio::fs::create_dir_all(archive_path.parent().unwrap()).await?;
let mut archive_file = tokio::fs::File::create(&archive_path).await?;
let mut buf_reader = BufReader::new(obj.body.into_async_read());
tokio::io::copy(&mut buf_reader, &mut archive_file).await?;
tracing::debug!("created archive: {}", archive_path.display());
Ok(archive_path)
}
}


@ -1,6 +1,5 @@
use crate::app::SharedApp;
#[mockall_double::double]
use super::FileStore;
pub trait FileStoreExt {


@ -0,0 +1,138 @@
use std::path::PathBuf;
use anyhow::Context;
use crate::{
app::infra::grpc::FluxReleaserGrpcClient,
grpc::gen::{CommitArtifactRequest, TriggerReleaseRequest},
};
use super::{
archive::Archive,
file_reader::FileReader,
flux_releaser_uploader::FluxReleaserUploader,
release_manager::models::{ArtifactID, UploadArtifactID},
};
pub struct FluxLocalClusterManager {
file_reader: FileReader,
archive: Archive,
flux_uploader: FluxReleaserUploader,
flux_releaser_client: FluxReleaserGrpcClient,
}
impl std::fmt::Debug for FluxLocalClusterManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FluxLocalClusterManager").finish()
}
}
impl FluxLocalClusterManager {
pub fn new(
file_reader: FileReader,
archive: Archive,
flux_uploader: FluxReleaserUploader,
flux_releaser_client: FluxReleaserGrpcClient,
) -> Self {
Self {
file_reader,
archive,
flux_uploader,
flux_releaser_client,
}
}
#[tracing::instrument(skip(include))]
pub async fn package_clusters(
&self,
include: impl Into<PathBuf>,
) -> anyhow::Result<UploadArtifactID> {
let include = include.into();
tracing::debug!("reading files");
let files = self.file_reader.read_files(include.clone()).await?;
tracing::debug!("creating archive");
let archive = self.archive.create_archive(&include, files).await?;
tracing::debug!("uploading archive");
let upload_id = self.flux_uploader.upload_archive(archive).await?;
tracing::debug!("done packaging clusters");
Ok(upload_id)
}
pub async fn commit_artifact(
&self,
app: impl Into<String>,
branch: impl Into<String>,
upload_id: UploadArtifactID,
) -> anyhow::Result<ArtifactID> {
let artifact_id = self
.flux_releaser_client
.lock()
.await
.commit_artifact(tonic::Request::new(CommitArtifactRequest {
app: app.into(),
branch: branch.into(),
upload_id: upload_id.to_string(),
}))
.await?;
artifact_id.into_inner().artifact_id.try_into()
}
pub async fn trigger_release(
&self,
app: impl Into<String>,
branch: impl Into<String>,
) -> anyhow::Result<()> {
self.flux_releaser_client
.lock()
.await
.trigger_release(tonic::Request::new(TriggerReleaseRequest {
app: app.into(),
branch: branch.into(),
}))
.await
.context("failed to trigger release")?;
// Send release proto to upstream
// 1. find app by app + branch
// 2. Unpack latest artifact by app + branch
// 3. Unpack by cluster
// 4. Upload
Ok(())
}
}
pub mod extensions {
use crate::{
app::SharedLocalApp,
services::{
archive::extensions::ArchiveExt, file_reader::extensions::FileReaderExt,
flux_releaser_uploader::extensions::FluxReleaserUploaderExt,
},
};
use super::FluxLocalClusterManager;
pub trait FluxLocalClusterManagerExt {
fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager;
}
impl FluxLocalClusterManagerExt for SharedLocalApp {
fn flux_local_cluster_manager(&self) -> FluxLocalClusterManager {
FluxLocalClusterManager::new(
self.file_reader(),
self.archive(),
self.flux_releaser_uploader(),
self.grpc_client.clone(),
)
}
}
}


@ -0,0 +1,54 @@
use tonic::transport::Channel;
use crate::{
app::infra::grpc::FluxReleaserGrpcClient,
grpc::gen::{flux_releaser_client::FluxReleaserClient, UploadArtifactRequest},
};
use super::{archive::ArchiveFile, release_manager::models::UploadArtifactID};
pub struct FluxReleaserUploader {
flux_client: FluxReleaserGrpcClient,
}
impl FluxReleaserUploader {
pub fn new(flux_client: FluxReleaserGrpcClient) -> Self {
Self { flux_client }
}
pub async fn upload_archive(&self, archive: ArchiveFile) -> anyhow::Result<UploadArtifactID> {
let chunks = archive
.content
.chunks(1_000_000) // Slice by about 1MB
.map(|ch| UploadArtifactRequest {
content: ch.to_vec(),
})
.collect::<Vec<_>>();
let iter = tokio_stream::iter(chunks);
let resp = self
.flux_client
.lock()
.await
.upload_artifact(tonic::Request::new(iter))
.await?;
resp.into_inner().upload_id.try_into()
}
}
pub mod extensions {
use crate::app::SharedLocalApp;
use super::FluxReleaserUploader;
pub trait FluxReleaserUploaderExt {
fn flux_releaser_uploader(&self) -> FluxReleaserUploader;
}
impl FluxReleaserUploaderExt for SharedLocalApp {
fn flux_releaser_uploader(&self) -> FluxReleaserUploader {
FluxReleaserUploader::new(self.grpc_client.clone())
}
}
}
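`upload_archive` above streams the payload as roughly 1 MB gRPC messages. The slicing step can be sketched in isolation with std only (the request wrapper is omitted; chunk size matches the code above):

```rust
// Split a payload into chunks of at most `size` bytes, as done
// before streaming UploadArtifactRequest messages.
fn chunk_payload(content: &[u8], size: usize) -> Vec<Vec<u8>> {
    content.chunks(size).map(|c| c.to_vec()).collect()
}

fn main() {
    let payload = vec![0u8; 2_500_000];
    let chunks = chunk_payload(&payload, 1_000_000);
    // 2.5 MB -> two full chunks plus a 0.5 MB remainder.
    assert_eq!(chunks.len(), 3);
    assert_eq!(chunks[2].len(), 500_000);
}
```

Chunking keeps each message comfortably under the default tonic/gRPC message-size limit while letting the server reassemble the archive from the stream.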


@ -0,0 +1,337 @@
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::Context;
use git2::{
build::{CheckoutBuilder, RepoBuilder},
Cred, FetchOptions, IndexAddOption, PushOptions, RemoteCallbacks, Repository, ResetType,
Signature,
};
use tokio::{io::AsyncWriteExt, sync::Mutex};
use super::{
archive::{Archive, ArchiveFile},
cluster_list::ClusterList,
};
#[derive(Clone)]
pub struct SharedGit {
inner: Arc<Git>,
}
impl From<Git> for SharedGit {
fn from(value: Git) -> Self {
Self {
inner: Arc::new(value),
}
}
}
impl std::ops::Deref for SharedGit {
type Target = Git;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub struct Git {
location: Mutex<PathBuf>,
registry: String,
cluster_list: ClusterList,
archive: Archive,
}
impl Git {
pub fn new(registry: String, cluster_list: ClusterList, archive: Archive) -> Self {
Self {
registry,
location: Mutex::new(std::env::temp_dir().join("flux_releaser")),
cluster_list,
archive,
}
}
pub async fn publish(
&self,
archive: &ArchiveFile,
service: &str,
namespace: &str,
environment: &str,
) -> anyhow::Result<()> {
// TODO: implement exponential backoff and 5 attempts
self.publish_attempt(archive, service, namespace, environment)
.await?;
Ok(())
}
async fn publish_attempt(
&self,
archive: &ArchiveFile,
service: &str,
namespace: &str,
environment: &str,
) -> anyhow::Result<()> {
// 1. Clone repo into location (with lock) or update
let location = self.location.lock().await;
let repo_dir = location.join("repo");
let repo = match self.get_repo(&repo_dir).await? {
// TODO: possibly handle errors by just recloning the repo
None => self.clone_repo(&repo_dir).await?,
Some(repo) => {
self.update_repo(repo).await?;
self.get_repo(&repo_dir)
.await?
.ok_or(anyhow::anyhow!("failed to open repository"))?
}
};
// 2. Extract from archive
// TODO: maybe pad archive with hash or something
let unpack_dir = location.join("archive");
self.archive.unpack_archive(archive, &unpack_dir).await?;
// 3. Splat tar over application and cluster
// The archive should always be structured like so:
// - <environment>/
// - * manifests
// 3a. prepare git repo for new files
let clusters = self
.cluster_list
.get(environment)
.await?
.ok_or(anyhow::anyhow!(
"environment is not registered: {} in cluster list",
environment
))?;
for cluster in clusters {
let service_entry = repo_dir
.join("deployments")
.join(&cluster)
.join(namespace)
.join(service);
if service_entry.exists() {
if let Err(e) = tokio::fs::remove_dir_all(&service_entry).await {
tracing::warn!("failed to remove existing dir: {}", e);
}
}
tokio::fs::create_dir_all(&service_entry).await?;
let cluster_entry = repo_dir
.join("clusters")
.join(&cluster)
.join(namespace)
.join(service);
if cluster_entry.exists() {
if let Err(e) = tokio::fs::remove_dir_all(&cluster_entry).await {
tracing::warn!("failed to remove existing dir: {}", e);
}
}
tokio::fs::create_dir_all(&cluster_entry).await?;
let archive_dir = unpack_dir.join(environment);
if !archive_dir.exists() {
anyhow::bail!("selected environment is not published for archive");
}
let mut read_dir = tokio::fs::read_dir(archive_dir).await?;
while let Some(entry) = read_dir.next_entry().await? {
if entry.metadata().await?.is_file() {
let entry_path = entry.path();
let dest_path = service_entry.join(entry.file_name());
tokio::fs::copy(entry_path, dest_path).await?;
}
}
let cluster_entry_file = cluster_entry.join(format!("{service}.yaml"));
let mut cluster_entry_file = tokio::fs::File::create(cluster_entry_file).await?;
let file_contents = format!(
r#"
apiVersion: kustomize.toolkit.fluxcd.io/v1beta2
kind: Kustomization
metadata:
name: {service}
namespace: flux-system
spec:
interval: 1m
retryInterval: 30s
path: ./deployments/{cluster}/{namespace}/{service}
prune: true
sourceRef:
kind: GitRepository
name: flux-system
namespace: flux-system
"#
);
cluster_entry_file
.write_all(file_contents.as_bytes())
.await?;
}
// 4. Commit && Push
self.commit_and_push(repo, environment, service).await?;
// 5. Cleanup
tokio::fs::remove_dir_all(unpack_dir).await?;
Ok(())
}
async fn get_repo(&self, location: &Path) -> anyhow::Result<Option<Repository>> {
match Repository::open(location) {
Ok(r) => Ok(Some(r)),
Err(e) => match e.code() {
git2::ErrorCode::NotFound => Ok(None),
_ => Err(e).context("failed to open git repository"),
},
}
}
async fn clone_repo(&self, location: &Path) -> anyhow::Result<Repository> {
let co = CheckoutBuilder::new();
let mut fo = FetchOptions::new();
fo.remote_callbacks(self.get_cred()?);
let repo = RepoBuilder::new()
.fetch_options(fo)
.with_checkout(co)
.clone(&self.registry, location)
.context("failed to clone repository")?;
Ok(repo)
}
async fn update_repo(&self, repository: Repository) -> anyhow::Result<()> {
let mut remote = repository.find_remote("origin")?;
let mut fo = FetchOptions::new();
fo.remote_callbacks(self.get_cred()?);
remote
.fetch(&["main"], Some(&mut fo), None)
.context("failed to update repo")?;
let origin_head = repository.find_reference("refs/remotes/origin/HEAD")?;
let origin_head_commit = origin_head.peel_to_commit()?;
// Perform a hard reset to the origin's HEAD
repository.reset(origin_head_commit.as_object(), ResetType::Hard, None)?;
Ok(())
}
fn get_cred(&self) -> anyhow::Result<RemoteCallbacks> {
let mut cb = RemoteCallbacks::new();
cb.credentials(|_, username, _| {
if let Ok(_sock) = std::env::var("SSH_AUTH_SOCK") {
return Cred::ssh_key_from_agent(username.unwrap_or("git"));
}
let username = std::env::var("GIT_USERNAME").expect("GIT_USERNAME to be set");
let password = std::env::var("GIT_PASSWORD").expect("GIT_PASSWORD to be set");
Cred::userpass_plaintext(&username, &password)
});
cb.certificate_check(|_cert, _| Ok(git2::CertificateCheckStatus::CertificateOk));
Ok(cb)
}
async fn commit_and_push(
&self,
repo: Repository,
environment: &str,
service: &str,
) -> anyhow::Result<()> {
let mut index = repo.index()?;
// Add all files to the index
index.add_all(["*"].iter(), IndexAddOption::DEFAULT, None)?;
index.write()?;
// Create a tree from the index
let oid = index.write_tree()?;
let tree = repo.find_tree(oid)?;
// Get the current HEAD commit
let parent_commit = repo.head()?.peel_to_commit()?;
// Create a signature
let sig = Signature::now("flux_releaser", "operations+flux-releaser@kjuulh.io")?;
// Create the commit
repo.commit(
Some("HEAD"),
&sig,
&sig,
&format!("chore({environment}/{service}): releasing service"),
&tree,
&[&parent_commit],
)?;
let mut remote = repo.find_remote("origin")?;
let mut po = PushOptions::new();
po.remote_callbacks(self.get_cred()?);
remote.push(
&[&format!("refs/heads/{}:refs/heads/{}", "main", "main")],
Some(&mut po),
)?;
Ok(())
}
}
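The cluster loop in `publish_attempt` renders a Flux `Kustomization` manifest per service via `format!` with captured identifiers. That templating step can be sketched on its own (field values mirror the code above; the function name is illustrative):

```rust
// Render the Flux Kustomization manifest for a service, as done
// inside Git::publish_attempt.
fn kustomization(service: &str, namespace: &str, cluster: &str) -> String {
    format!(
        r#"apiVersion: kustomize.toolkit.fluxcd.io/v1beta2
kind: Kustomization
metadata:
  name: {service}
  namespace: flux-system
spec:
  interval: 1m
  retryInterval: 30s
  path: ./deployments/{cluster}/{namespace}/{service}
  prune: true
  sourceRef:
    kind: GitRepository
    name: flux-system
    namespace: flux-system
"#
    )
}

fn main() {
    let manifest = kustomization("my-svc", "dev", "clank-dev");
    assert!(manifest.contains("name: my-svc"));
    assert!(manifest.contains("path: ./deployments/clank-dev/dev/my-svc"));
}
```

Note that `format!` substitutes `{service}`-style placeholders even inside a raw string literal, which keeps the YAML template readable without escaping.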
#[cfg(test)]
mod test {
use tokio::io::AsyncReadExt;
use uuid::Uuid;
use crate::services::{
archive::{Archive, ArchiveFile},
cluster_list::ClusterList,
git::Git,
};
#[tokio::test]
async fn can_clone_upstream() -> anyhow::Result<()> {
// FIXME: right now CI doesn't support git
return Ok(());
let random = Uuid::new_v4().to_string();
println!("running test for id: {}", random);
println!("current_dir: {}", std::env::current_dir()?.display());
let mut arch = tokio::fs::File::open("testdata/example.tar").await?;
let mut dest = Vec::new();
arch.read_to_end(&mut dest).await?;
let git = Git::new(
"ssh://git@git.front.kjuulh.io/kjuulh/clank-clusters.git".into(),
ClusterList::default(),
Archive::default(),
);
let mut location = git.location.lock().await;
*location = location.join(random);
println!("into: {}", location.display());
drop(location);
git.publish(
&ArchiveFile { content: dest },
"flux-releaser-test",
"dev",
"dev",
)
.await?;
Ok(())
}
}


@ -1,32 +1,58 @@
use serde::Serialize;
#[mockall_double::double]
use crate::services::archive::{Archive, ArchiveFile};
use crate::services::artifacts_db::{AddCommitArtifact, GetLatestArtifact};
use crate::services::file_store::FileStore;
#[mockall_double::double]
use super::artifacts_db::ArtifactsDB;
use super::domain_events::DomainEvents;
use super::git::SharedGit;
use self::models::{ArtifactID, CommitArtifact};
use self::models::*;
pub struct ReleaseManager {
file_store: FileStore,
domain_events: DomainEvents,
artifacts_db: ArtifactsDB,
git: SharedGit,
}
impl ReleaseManager {
pub fn new(file_store: FileStore, domain_events: DomainEvents) -> Self {
pub fn new(
file_store: FileStore,
domain_events: DomainEvents,
artifacts_db: ArtifactsDB,
git: SharedGit,
) -> Self {
Self {
file_store,
domain_events,
artifacts_db,
git,
}
}
pub async fn upload_artifact(
&self,
request: UploadArtifact,
) -> anyhow::Result<UploadArtifactID> {
let upload_id = uuid::Uuid::now_v7();
self.file_store
.upload_temp(upload_id.into(), request.file_path)
.await?;
Ok(upload_id.into())
}
pub async fn commit_artifact(&self, request: CommitArtifact) -> anyhow::Result<ArtifactID> {
tracing::debug!("committing artifact: {:?}", request);
let artifact_id = ArtifactID::new();
let artifact = self.file_store.get_temp(request.upload_id).await?;
self.file_store
.upload_files(artifact_id.clone(), Vec::new())
.upload_file(artifact_id.clone(), artifact)
.await?;
self.domain_events
@ -35,8 +61,68 @@ impl ReleaseManager {
})?)
.await?;
self.artifacts_db
.commit_artifact(AddCommitArtifact {
app: request.app,
branch: request.branch,
artifact_id: artifact_id.clone().into(),
})
.await?;
Ok(artifact_id)
}
pub async fn release(&self, release_req: Release) -> anyhow::Result<()> {
tracing::debug!(
app = release_req.app,
branch = release_req.branch,
"releasing latest commit"
);
let latest_artifact = self
.artifacts_db
.get_latest_artifact(GetLatestArtifact {
app: release_req.app.clone(),
branch: release_req.branch.clone(),
})
.await?;
tracing::trace!("found latest artifact: {:?}", latest_artifact);
let artifact = self
.file_store
.get_archive(latest_artifact.artifact_id.into())
.await?;
tracing::trace!("placed artifact in: {}", artifact.display());
self.domain_events
.publish_event(&serde_json::to_string(&PublishedArtifactEvent {
artifact_id: latest_artifact.artifact_id.to_string(),
app: release_req.app.clone(),
branch: release_req.branch.clone(),
})?)
.await?;
let artifact_contents = tokio::fs::read(artifact).await?;
let env = if release_req.branch == "main" {
"prod"
} else {
"dev"
};
self.git
.publish(
&ArchiveFile::from(artifact_contents),
&release_req.app,
env,
env,
)
.await?;
Ok(())
}
}
#[derive(Serialize)]
@ -44,39 +130,12 @@ pub struct CommittedArtifactEvent {
artifact_id: String,
}
#[derive(Serialize)]
pub struct PublishedArtifactEvent {
app: String,
branch: String,
artifact_id: String,
}
pub mod extensions;
pub mod models;
#[cfg(test)]
mod test {
use crate::services::domain_events::MockDomainEvents;
use crate::services::file_store::MockFileStore;
use super::*;
#[tokio::test]
async fn generated_artifact_id() -> anyhow::Result<()> {
let mut file_store = MockFileStore::default();
file_store
.expect_upload_files()
.times(1)
.returning(|_, _| Ok(()));
let mut domain_events = MockDomainEvents::default();
domain_events
.expect_publish_event()
.times(1)
.returning(|_| Ok(()));
let releaser_manager = ReleaseManager::new(file_store, domain_events);
releaser_manager
.commit_artifact(CommitArtifact {
app: "app".into(),
branch: "branch".into(),
})
.await?;
Ok(())
}
}


@ -1,6 +1,9 @@
use crate::{
app::SharedApp,
services::{domain_events::extensions::DomainEventsExt, file_store::extensions::FileStoreExt},
services::{
artifacts_db::extensions::ArtifactsDBExt, domain_events::extensions::DomainEventsExt,
file_store::extensions::FileStoreExt,
},
};
use super::ReleaseManager;
@ -11,6 +14,11 @@ pub trait ReleaseManagerExt {
impl ReleaseManagerExt for SharedApp {
fn release_manager(&self) -> ReleaseManager {
ReleaseManager::new(self.file_store(), self.domain_events())
ReleaseManager::new(
self.file_store(),
self.domain_events(),
self.artifacts_db(),
self.git.clone(),
)
}
}


@ -1,7 +1,16 @@
use std::{ops::Deref, path::PathBuf};
#[derive(Clone, Debug)]
pub struct CommitArtifact {
pub app: String,
pub branch: String,
pub upload_id: UploadArtifactID,
}
#[derive(Clone, Debug)]
pub struct Release {
pub app: String,
pub branch: String,
}
#[derive(Debug, Clone)]
@ -20,3 +29,61 @@ impl std::ops::Deref for ArtifactID {
&self.0
}
}
impl TryFrom<String> for ArtifactID {
type Error = anyhow::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
let uuid = uuid::Uuid::parse_str(&value)?;
Ok(ArtifactID(uuid))
}
}
impl From<uuid::Uuid> for ArtifactID {
fn from(value: uuid::Uuid) -> Self {
Self(value)
}
}
pub struct UploadArtifact {
pub file_path: PathBuf,
}
impl From<PathBuf> for UploadArtifact {
fn from(value: PathBuf) -> Self {
Self { file_path: value }
}
}
#[derive(Clone, Debug)]
pub struct UploadArtifactID(uuid::Uuid);
impl From<uuid::Uuid> for UploadArtifactID {
fn from(value: uuid::Uuid) -> Self {
Self(value)
}
}
impl From<ArtifactID> for uuid::Uuid {
fn from(value: ArtifactID) -> Self {
value.0
}
}
impl TryFrom<String> for UploadArtifactID {
type Error = anyhow::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
let uuid = uuid::Uuid::parse_str(&value)?;
Ok(Self(uuid))
}
}
impl Deref for UploadArtifactID {
type Target = uuid::Uuid;
fn deref(&self) -> &Self::Target {
&self.0
}
}
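`ArtifactID` and `UploadArtifactID` both follow the same newtype pattern: a fallible `TryFrom<String>` for parsing and `Deref` for read-only access to the inner value. A self-contained sketch of that pattern with a numeric inner type (standing in for `uuid::Uuid`, which is not pulled in here):

```rust
use std::ops::Deref;

// Newtype ID: parse with TryFrom, read through Deref,
// mirroring ArtifactID / UploadArtifactID above.
#[derive(Clone, Debug, PartialEq)]
struct ArtifactId(u64);

impl TryFrom<String> for ArtifactId {
    type Error = std::num::ParseIntError;
    fn try_from(value: String) -> Result<Self, Self::Error> {
        Ok(ArtifactId(value.parse()?))
    }
}

impl Deref for ArtifactId {
    type Target = u64;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

fn main() {
    let id = ArtifactId::try_from("42".to_string()).unwrap();
    assert_eq!(*id, 42); // Deref exposes the inner value
    assert!(ArtifactId::try_from("not-a-number".to_string()).is_err());
}
```

The payoff of the newtype is that an upload ID can never be passed where an artifact ID is expected, even though both wrap the same underlying type.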

Binary file not shown.


@ -0,0 +1,14 @@
use flux_releaser::services::{
archive::Archive, file_reader::FileReader, flux_local_cluster::FluxLocalClusterManager,
};
#[tokio::test]
async fn can_package_files() -> anyhow::Result<()> {
tracing_subscriber::fmt().init();
//let sut = FluxLocalClusterManager::new(FileReader::new(), Archive::new());
//sut.package_clusters("testdata/flux_local_cluster/").await?;
Ok(())
}


@ -0,0 +1,240 @@
use std::{
net::{Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
};
use anyhow::Context;
use flux_releaser::{
app::{LocalApp, SharedApp, SharedLocalApp},
grpc::gen::flux_releaser_client::FluxReleaserClient,
services::{
archive::ArchiveFile, file_store::extensions::FileStoreExt,
flux_local_cluster::extensions::FluxLocalClusterManagerExt,
flux_releaser_uploader::FluxReleaserUploader,
},
};
use tokio::{net::TcpListener, runtime::Runtime, sync::Mutex, time::sleep};
struct Server {
endpoints: Endpoints,
app: SharedApp,
}
#[derive(Clone, Debug)]
struct Endpoints {
http: SocketAddr,
grpc: SocketAddr,
}
impl Server {
pub async fn new() -> anyhow::Result<Self> {
let http_socket = Self::find_free_port().await?;
let grpc_socket = Self::find_free_port().await?;
Ok(Self {
endpoints: Endpoints {
http: http_socket,
grpc: grpc_socket,
},
app: SharedApp::new(flux_releaser::app::App::new().await?),
})
}
pub async fn start(&self) -> anyhow::Result<()> {
flux_releaser::cli::server::run_server(self.endpoints.http, self.endpoints.grpc).await?;
Ok(())
}
pub async fn find_free_port() -> anyhow::Result<SocketAddr> {
let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let listener = TcpListener::bind(socket).await?;
listener.local_addr().context("failed to get local addr")
}
}
static INIT: std::sync::Once = std::sync::Once::new();
async fn perform_task_with_backoff<F, Fut, T, E>(
mut task: F,
max_retries: u32,
base_delay_ms: u64,
) -> Result<T, E>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
let mut retries = 0;
let mut delay = base_delay_ms;
loop {
match task().await {
Ok(result) => return Ok(result),
Err(_e) if retries < max_retries => {
sleep(Duration::from_millis(delay)).await;
delay *= 2; // Exponential backoff
retries += 1;
}
Err(e) => return Err(e),
}
}
}
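`perform_task_with_backoff` retries a fallible async task, doubling the delay after each failure. The same exponential-backoff loop can be sketched synchronously with std only (no tokio; names are illustrative):

```rust
use std::thread::sleep;
use std::time::Duration;

// Retry `task` up to `max_retries` extra times, doubling the
// delay after each failure (exponential backoff).
fn with_backoff<T, E>(
    mut task: impl FnMut() -> Result<T, E>,
    max_retries: u32,
    base_delay_ms: u64,
) -> Result<T, E> {
    let mut retries = 0;
    let mut delay = base_delay_ms;
    loop {
        match task() {
            Ok(v) => return Ok(v),
            Err(_) if retries < max_retries => {
                sleep(Duration::from_millis(delay));
                delay *= 2;
                retries += 1;
            }
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let mut attempts = 0;
    // Fails twice, then succeeds on the third attempt.
    let result = with_backoff(
        || {
            attempts += 1;
            if attempts < 3 { Err("not ready") } else { Ok(attempts) }
        },
        5,
        1, // 1 ms base delay keeps the demo fast
    );
    assert_eq!(result, Ok(3));
}
```

With 5 retries and a 500 ms base delay, as used by `is_ready`, the waits are 500 ms, 1 s, 2 s, 4 s, and 8 s before the final error is returned.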
// Makes sure the setup is ready for execution
async fn is_ready() -> anyhow::Result<()> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
perform_task_with_backoff(
|| async {
let endpoints = unsafe {
if ENDPOINTS.is_none() {
anyhow::bail!("endpoints not set yet");
}
ENDPOINTS.clone().unwrap()
};
let resp = reqwest::get(format!("http://{}/ping", endpoints.http)).await?;
if !resp.status().is_success() {
anyhow::bail!("failed with status: {}", resp.status());
}
Ok::<(), anyhow::Error>(())
},
5,
500,
)
.await?;
Ok(())
}
static mut ENDPOINTS: Option<Endpoints> = None;
static mut APP: Option<SharedApp> = None;
async fn setup() -> anyhow::Result<(Endpoints, SharedApp)> {
INIT.call_once(|| {
std::thread::spawn(|| {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
println!("once was created once");
let server = Server::new().await.unwrap();
unsafe {
ENDPOINTS = Some(server.endpoints.clone());
APP = Some(server.app.clone());
}
server.start().await.unwrap();
})
});
});
is_ready().await?;
Ok(unsafe { (ENDPOINTS.clone().unwrap(), APP.clone().unwrap()) })
}
async fn local_setup(endpoints: Endpoints) -> anyhow::Result<SharedLocalApp> {
Ok(SharedLocalApp::new(
LocalApp::new(format!("http://{}", endpoints.grpc)).await?,
))
}
#[tokio::test]
async fn can_upload_artifact() -> anyhow::Result<()> {
return Ok(());
std::env::set_var("RUST_LOG", "flux_releaser=trace");
let (endpoints, app) = setup().await?;
let client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?;
let client = FluxReleaserUploader::new(Arc::new(Mutex::new(client)));
let bytes: Vec<u8> = vec![0; 10_000_000];
let upload_id = client
.upload_archive(ArchiveFile {
content: bytes.clone(),
})
.await?;
assert!(!upload_id.to_string().is_empty());
let actual_path = app.file_store().get_temp(upload_id).await?;
let actual_content = tokio::fs::read(actual_path).await?;
assert_eq!(&bytes, &actual_content);
Ok(())
}
#[tokio::test]
async fn can_publish_artifact() -> anyhow::Result<()> {
    // NOTE: test is currently disabled; remove this early return to re-enable it.
    return Ok(());
std::env::set_var("RUST_LOG", "flux_releaser=trace");
let (endpoints, app) = setup().await?;
let local_app = local_setup(endpoints.clone()).await?;
let upload_id = local_app
.flux_local_cluster_manager()
.package_clusters("testdata/flux_local_cluster")
.await?;
let archive = app.file_store().get_temp(upload_id.clone()).await?;
assert!(archive.exists());
let artifact_id = local_app
.flux_local_cluster_manager()
.commit_artifact("some-app", "some-branch", upload_id)
.await?;
let artifact = app.file_store().get_archive(artifact_id).await?;
assert!(artifact.exists());
Ok(())
}
#[tokio::test]
async fn can_trigger_latest_release() -> anyhow::Result<()> {
    // NOTE: test is currently disabled; remove this early return to re-enable it.
    return Ok(());
let test_id = uuid::Uuid::now_v7();
std::env::set_var("RUST_LOG", "flux_releaser=trace");
let (endpoints, app) = setup().await?;
let local_app = local_setup(endpoints.clone()).await?;
let upload_id = local_app
.flux_local_cluster_manager()
.package_clusters("testdata/flux_local_cluster")
.await?;
let archive = app.file_store().get_temp(upload_id.clone()).await?;
assert!(archive.exists());
let _ = local_app
.flux_local_cluster_manager()
.commit_artifact(test_id, "some-branch", upload_id)
.await?;
local_app
.flux_local_cluster_manager()
.trigger_release(test_id, "some-branch")
.await?;
// 1. Verify that release event has been sent
// 2. Verify that we've splatted the flux cluster over the upstream registry
// 3. Verify database has a release record
Ok(())
}
pub struct TestGreeter {}


@@ -5,16 +5,17 @@ base: "git@git.front.kjuulh.io:kjuulh/cuddle-rust-service-plan.git"
vars:
service: "flux-releaser"
registry: kasperhermansen
clusters:
clank-prod:
replicas: "3"
namespace: prod
database:
crdb: "true"
ingress:
- external: "true"
- internal: "true"
deployment:
registry: git@git.front.kjuulh.io:kjuulh/clank-clusters
env:
prod:
clusters:
- clank-prod
cuddle/clusters:
dev:
env:
service.host: "0.0.0.0:3000"
prod:
env:
service.host: "0.0.0.0:3000"

docs/architecture.md Normal file

@@ -0,0 +1,40 @@
# Architecture
Flux releaser follows the structure that flux2 recommends for a flux project. This means we end up with a layout like the following:
```
clusters/<cluster_names>/kustomizations
<cluster_names>/kubernetes manifests
```
Flux releaser goes a small step further and is opinionated about its folder layout.
First of all, each entry in flux-releaser is required to have a unique app and namespace. This is important for guaranteeing uniqueness; otherwise flux-releaser could replace services it didn't intend to.
Flux releaser currently makes no guarantee that a namespace and app name cannot be taken over by another party; it is up to you to control access to the flux gitops repository.
As such, flux-releaser places its folders in:
```
clusters/<cluster_names>/<namespaces>/<apps>/kustomizations
<cluster_names>/<namespaces>/<apps>/kubernetes manifests
```
This means that each app can target one or more `clusters` and `namespaces`, but only one `namespace` per `cluster`.
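The path convention above can be sketched in Rust. This is purely illustrative: `release_paths` is a hypothetical helper, not part of flux-releaser's actual API; the real logic lives inside the service.

```rust
use std::path::PathBuf;

/// Builds the two gitops paths a release touches for a single app:
/// one under clusters/ for kustomizations, one at the top level for
/// kubernetes manifests. Hypothetical helper for illustration only.
fn release_paths(clusters: &[&str], namespace: &str, app: &str) -> Vec<PathBuf> {
    clusters
        .iter()
        .copied()
        .flat_map(|cluster| {
            [
                // kustomizations: clusters/<cluster>/<namespace>/<app>
                ["clusters", cluster, namespace, app].iter().collect::<PathBuf>(),
                // kubernetes manifests: <cluster>/<namespace>/<app>
                [cluster, namespace, app].iter().collect::<PathBuf>(),
            ]
        })
        .collect()
}

fn main() {
    for path in release_paths(&["clank-prod"], "prod", "flux-releaser") {
        println!("{}", path.display());
    }
}
```

For `clusters = ["clank-prod"]`, `namespace = "prod"`, and `app = "flux-releaser"`, this yields `clusters/clank-prod/prod/flux-releaser` and `clank-prod/prod/flux-releaser`.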
To stage a commit, the producer (usually CI) prepares a setup folder that mirrors the structure above:
```
<tmp_dir>/
clusters/
<cluster_names>/
<namespace>/
<kubernetes_manifests>
```
All of this is then bundled into a tarball and staged under a specific artifact id, along with some metadata about the release (when it happened, who the committer was, which branch it originated from, which service it belongs to, etc.).
When a flux release is processed, it is triggered via a branch and the app name; the bundled artifact is then applied on top of the gitops registry and finally released.
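The producer-side staging step can be sketched with only the Rust standard library. The function name, file names, and metadata keys below are illustrative assumptions, not flux-releaser's actual wire format:

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Stages kubernetes manifests for one cluster under a temp directory,
/// mirroring the `<tmp_dir>/clusters/<cluster>/<namespace>/` layout, and
/// records some release metadata next to them. Illustrative only.
fn stage_release(
    tmp_dir: &Path,
    cluster: &str,
    namespace: &str,
    service: &str,
    manifest: &str,
) -> io::Result<()> {
    let target = tmp_dir.join("clusters").join(cluster).join(namespace);
    fs::create_dir_all(&target)?;
    fs::write(target.join("deployment.yaml"), manifest)?;
    // In the real system this metadata travels with the tarball under an
    // artifact id; a flat key=value file stands in for it here.
    fs::write(
        tmp_dir.join("metadata"),
        format!("service={service}\nbranch=some-branch\ncommitter=ci\n"),
    )?;
    Ok(())
}

fn main() -> io::Result<()> {
    let tmp = std::env::temp_dir().join("flux-releaser-staging-example");
    stage_release(&tmp, "clank-prod", "prod", "flux-releaser", "kind: Deployment\n")?;
    println!("staged at {}", tmp.display());
    Ok(())
}
```

From here the real producer would tar up `tmp_dir` and upload it, which is what the `package_clusters` / `commit_artifact` calls in the tests above exercise.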

renovate.json Normal file

@@ -0,0 +1,3 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json"
}


@@ -21,8 +21,8 @@ services:
/bin/sh -c "
/usr/bin/mc alias set myminio http://minio:10000 minioadmin minioadminpassword;
/usr/bin/mc admin info myminio;
/usr/bin/mc mb myminio/mybucket;
/usr/bin/mc policy set public myminio/mybucket;
/usr/bin/mc mb myminio/flux-releaser;
/usr/bin/mc policy set public myminio/flux-releaser;
exit 0;
"
@@ -34,3 +34,17 @@ services:
- NATS_ENABLE_AUTH=yes
- NATS_USERNAME=user
- NATS_PASSWORD=secret
crdb:
restart: 'always'
image: 'cockroachdb/cockroach:latest'
command: 'start-single-node --advertise-addr 0.0.0.0 --insecure'
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health?ready=1"]
interval: '10s'
timeout: '30s'
retries: 5
start_period: '20s'
ports:
- '28080:8080'
- '26257:26257'