kjuulh
b12653b9e9
Some checks reported errors
continuous-integration/drone/push Build encountered an error
Signed-off-by: kjuulh <contact@kjuulh.io>
241 lines
6.3 KiB
Rust
241 lines
6.3 KiB
Rust
use std::{
|
|
net::{Ipv4Addr, SocketAddr},
|
|
sync::Arc,
|
|
time::Duration,
|
|
};
|
|
|
|
use anyhow::Context;
|
|
use flux_releaser::{
|
|
app::{LocalApp, SharedApp, SharedLocalApp},
|
|
grpc::gen::flux_releaser_client::FluxReleaserClient,
|
|
services::{
|
|
archive::ArchiveFile, file_store::extensions::FileStoreExt,
|
|
flux_local_cluster::extensions::FluxLocalClusterManagerExt,
|
|
flux_releaser_uploader::FluxReleaserUploader,
|
|
},
|
|
};
|
|
use tokio::{net::TcpListener, runtime::Runtime, sync::Mutex, time::sleep};
|
|
|
|
struct Server {
|
|
endpoints: Endpoints,
|
|
app: SharedApp,
|
|
}
|
|
|
|
/// Pair of socket addresses used by the test server.
#[derive(Clone, Debug)]
struct Endpoints {
    /// Address of the HTTP listener (probed via `/ping` by `is_ready`).
    http: SocketAddr,
    /// Address of the gRPC listener (used to build `http://{grpc}` client URLs).
    grpc: SocketAddr,
}
|
|
|
|
impl Server {
|
|
pub async fn new() -> anyhow::Result<Self> {
|
|
let http_socket = Self::find_free_port().await?;
|
|
let grpc_socket = Self::find_free_port().await?;
|
|
|
|
Ok(Self {
|
|
endpoints: Endpoints {
|
|
http: http_socket,
|
|
grpc: grpc_socket,
|
|
},
|
|
app: SharedApp::new(flux_releaser::app::App::new().await?),
|
|
})
|
|
}
|
|
|
|
pub async fn start(&self) -> anyhow::Result<()> {
|
|
flux_releaser::cli::server::run_server(self.endpoints.http, self.endpoints.grpc).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn find_free_port() -> anyhow::Result<SocketAddr> {
|
|
let socket = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
|
|
|
|
let listener = TcpListener::bind(socket).await?;
|
|
|
|
listener.local_addr().context("failed to get local addr")
|
|
}
|
|
}
|
|
|
|
/// One-time guard: `setup` passes its server start-up closure to
/// `INIT.call_once`, so that closure runs at most once per process.
static INIT: std::sync::Once = std::sync::Once::new();
|
|
|
|
async fn perform_task_with_backoff<F, Fut, T, E>(
|
|
mut task: F,
|
|
max_retries: u32,
|
|
base_delay_ms: u64,
|
|
) -> Result<T, E>
|
|
where
|
|
F: FnMut() -> Fut,
|
|
Fut: std::future::Future<Output = Result<T, E>>,
|
|
{
|
|
let mut retries = 0;
|
|
let mut delay = base_delay_ms;
|
|
|
|
loop {
|
|
match task().await {
|
|
Ok(result) => return Ok(result),
|
|
Err(_e) if retries < max_retries => {
|
|
sleep(Duration::from_millis(delay)).await;
|
|
delay *= 2; // Exponential backoff
|
|
retries += 1;
|
|
}
|
|
Err(e) => return Err(e),
|
|
}
|
|
}
|
|
}
|
|
|
|
// Makes sure the setup is ready for execution
|
|
async fn is_ready() -> anyhow::Result<()> {
|
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
|
|
perform_task_with_backoff(
|
|
|| async {
|
|
let endpoints = unsafe {
|
|
if ENDPOINTS.is_none() {
|
|
anyhow::bail!("endpoints not set yet");
|
|
}
|
|
|
|
ENDPOINTS.clone().unwrap()
|
|
};
|
|
|
|
let resp = reqwest::get(format!("http://{}/ping", endpoints.http)).await?;
|
|
|
|
if !resp.status().is_success() {
|
|
anyhow::bail!("failed with status: {}", resp.status());
|
|
}
|
|
|
|
Ok::<(), anyhow::Error>(())
|
|
},
|
|
5,
|
|
500,
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
static mut ENDPOINTS: Option<Endpoints> = None;
|
|
static mut APP: Option<SharedApp> = None;
|
|
|
|
async fn setup() -> anyhow::Result<(Endpoints, SharedApp)> {
|
|
INIT.call_once(|| {
|
|
std::thread::spawn(|| {
|
|
let rt = Runtime::new().unwrap();
|
|
rt.block_on(async move {
|
|
println!("once was created once");
|
|
let server = Server::new().await.unwrap();
|
|
|
|
unsafe {
|
|
ENDPOINTS = Some(server.endpoints.clone());
|
|
APP = Some(server.app.clone());
|
|
}
|
|
|
|
server.start().await.unwrap();
|
|
})
|
|
});
|
|
});
|
|
|
|
is_ready().await?;
|
|
|
|
Ok(unsafe { (ENDPOINTS.clone().unwrap(), APP.clone().unwrap()) })
|
|
}
|
|
|
|
async fn local_setup(endpoints: Endpoints) -> anyhow::Result<SharedLocalApp> {
|
|
Ok(SharedLocalApp::new(
|
|
LocalApp::new(format!("http://{}", endpoints.grpc)).await?,
|
|
))
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn can_upload_artifact() -> anyhow::Result<()> {
|
|
return Ok(());
|
|
std::env::set_var("RUST_LOG", "flux_releaser=trace");
|
|
let (endpoints, app) = setup().await?;
|
|
|
|
let client = FluxReleaserClient::connect(format!("http://{}", endpoints.grpc)).await?;
|
|
let client = FluxReleaserUploader::new(Arc::new(Mutex::new(client)));
|
|
|
|
let bytes: Vec<u8> = vec![0; 10_000_000];
|
|
let upload_id = client
|
|
.upload_archive(ArchiveFile {
|
|
content: bytes.clone(),
|
|
})
|
|
.await?;
|
|
|
|
assert!(!upload_id.to_string().is_empty());
|
|
|
|
let actual_path = app.file_store().get_temp(upload_id).await?;
|
|
|
|
let actual_content = tokio::fs::read(actual_path).await?;
|
|
|
|
assert_eq!(&bytes, &actual_content);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn can_publish_artifact() -> anyhow::Result<()> {
|
|
return Ok(());
|
|
std::env::set_var("RUST_LOG", "flux_releaser=trace");
|
|
|
|
let (endpoints, app) = setup().await?;
|
|
let local_app = local_setup(endpoints.clone()).await?;
|
|
|
|
let upload_id = local_app
|
|
.flux_local_cluster_manager()
|
|
.package_clusters("testdata/flux_local_cluster")
|
|
.await?;
|
|
|
|
let archive = app.file_store().get_temp(upload_id.clone()).await?;
|
|
|
|
assert!(archive.exists());
|
|
|
|
let artifact_id = local_app
|
|
.flux_local_cluster_manager()
|
|
.commit_artifact("some-app", "some-branch", upload_id)
|
|
.await?;
|
|
|
|
let artifact = app.file_store().get_archive(artifact_id).await?;
|
|
|
|
assert!(artifact.exists());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn can_trigger_latest_release() -> anyhow::Result<()> {
|
|
return Ok(());
|
|
let test_id = uuid::Uuid::now_v7();
|
|
|
|
std::env::set_var("RUST_LOG", "flux_releaser=trace");
|
|
|
|
let (endpoints, app) = setup().await?;
|
|
let local_app = local_setup(endpoints.clone()).await?;
|
|
|
|
let upload_id = local_app
|
|
.flux_local_cluster_manager()
|
|
.package_clusters("testdata/flux_local_cluster")
|
|
.await?;
|
|
|
|
let archive = app.file_store().get_temp(upload_id.clone()).await?;
|
|
|
|
assert!(archive.exists());
|
|
|
|
let _ = local_app
|
|
.flux_local_cluster_manager()
|
|
.commit_artifact(test_id, "some-branch", upload_id)
|
|
.await?;
|
|
|
|
local_app
|
|
.flux_local_cluster_manager()
|
|
.trigger_release(test_id, "some-branch")
|
|
.await?;
|
|
|
|
// 1. Verify that release event has been sent
|
|
// 2. Verify that we've splatted the flux cluster over the upstream registry
|
|
// 3. Verify database has a release record
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Empty public struct with no fields; nothing in the visible part of this
/// file references it.
/// NOTE(review): presumably a fixture used further down or elsewhere —
/// confirm it is still needed.
pub struct TestGreeter {}
|