From ee655d02ef9670b2c0cbaa600c3e2af9f54dbf65 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Fri, 27 Jan 2023 21:57:39 +0100 Subject: [PATCH] with engine --- Cargo.lock | 16 +++++++ Cargo.toml | 2 + src/cli_session.rs | 103 ++++++++++++++++++++++++++++++++++++++++++ src/config.rs | 24 ++++++++++ src/connect_params.rs | 20 ++++++++ src/downloader.rs | 5 +- src/engine.rs | 46 +++++++++++++++++++ src/lib.rs | 4 ++ 8 files changed, 217 insertions(+), 3 deletions(-) create mode 100644 src/cli_session.rs create mode 100644 src/config.rs create mode 100644 src/connect_params.rs create mode 100644 src/engine.rs diff --git a/Cargo.lock b/Cargo.lock index 09008e3..cfdc244 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -150,6 +150,8 @@ dependencies = [ "hex-literal", "platform-info", "reqwest", + "serde", + "serde_json", "sha2", "tar", "tempfile", @@ -898,6 +900,20 @@ name = "serde" version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.152" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "serde_json" diff --git a/Cargo.toml b/Cargo.toml index 6c79191..aa59e79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,8 @@ hex = "0.4.3" hex-literal = "0.3.4" platform-info = "1.0.2" reqwest = { version = "0.11.14", features = ["stream", "blocking", "deflate"] } +serde = { version = "1.0.152", features = ["derive"] } +serde_json = "1.0.91" sha2 = "0.10.6" tar = "0.4.38" tempfile = "3.3.0" diff --git a/src/cli_session.rs b/src/cli_session.rs new file mode 100644 index 0000000..50c57f4 --- /dev/null +++ b/src/cli_session.rs @@ -0,0 +1,103 @@ +use std::{ + fs::canonicalize, + io::{BufRead, BufReader}, + path::PathBuf, + process::Stdio, + sync::Arc, +}; + +use crate::{config::Config, connect_params::ConnectParams}; + +#[derive(Clone, Debug)] +pub struct CliSession { + inner: Arc, +} + +impl CliSession { + pub fn new() -> Self { + Self { + inner: Arc::new(InnerCliSession {}), + } + } + + pub fn connect(&self, config: &Config, cli_path: &PathBuf) -> eyre::Result { + self.inner.connect(config, cli_path) + } +} + +#[derive(Debug)] +struct InnerCliSession {} + +impl InnerCliSession { + pub fn connect(&self, config: &Config, cli_path: &PathBuf) -> eyre::Result { + let mut proc = self.start(config, cli_path)?; + let params = self.get_conn(&mut proc)?; + Ok(params) + } + + fn start(&self, config: &Config, cli_path: &PathBuf) -> eyre::Result { + let mut args: Vec = vec!["session".into()]; + if let Some(workspace) = &config.workdir_path { + let abs_path = canonicalize(workspace)?; + args.extend(["--workdir".into(), abs_path.to_string_lossy().to_string()]) + } + if let Some(config_path) = &config.config_path { + let abs_path = canonicalize(config_path)?; + args.extend(["--project".into(), abs_path.to_string_lossy().to_string()]) + } + + let proc = std::process::Command::new( + cli_path + .to_str() + .ok_or(eyre::anyhow!("could not get string from path"))?, + ) + .args(args.as_slice()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + //TODO: Add retry mechanism + + return Ok(proc); + } + + fn get_conn(&self, proc: &mut std::process::Child) -> eyre::Result { + let stdout = proc + .stdout + .take() + .ok_or(eyre::anyhow!("could not acquire stdout from child process"))?; + + let stderr = proc + .stderr + .take() + .ok_or(eyre::anyhow!("could not acquire stderr from child process"))?; + + let mut conn: Option = None; + + std::thread::scope(|s| { + s.spawn(|| { + let stdout_bufr = BufReader::new(stdout); + let mut res_conn: Option = None; + for line in stdout_bufr.lines() { + let out = line.unwrap(); + let conn: ConnectParams = serde_json::from_str(&out).unwrap(); + res_conn = Some(conn); + break; + } + + conn = res_conn; + }); + + //s.spawn(|| { + // let stderr_bufr = BufReader::new(stderr); + // for line in stderr_bufr.lines() { + // let out = line.unwrap(); + // panic!("could not start dagger session: {}", out) + // } + //}); + }); + + Ok(conn.ok_or(eyre::anyhow!("could not connect to dagger"))?) + } +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..34acb96 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,24 @@ +use std::path::PathBuf; + +pub struct Config { + pub workdir_path: Option, + pub config_path: Option, + pub timeout_ms: u64, + pub execute_timeout_ms: Option, +} + +impl Config { + pub fn new( + workdir_path: Option, + config_path: Option, + timeout_ms: Option, + execute_timeout_ms: Option, + ) -> Self { + Self { + workdir_path, + config_path, + timeout_ms: timeout_ms.unwrap_or(10 * 1000), + execute_timeout_ms, + } + } +} diff --git a/src/connect_params.rs b/src/connect_params.rs new file mode 100644 index 0000000..cdfe604 --- /dev/null +++ b/src/connect_params.rs @@ -0,0 +1,20 @@ +use serde::Deserialize; + +#[derive(Clone, Debug, Deserialize, PartialEq)] +pub struct ConnectParams { + pub port: u64, + pub session_token: String, +} + +impl ConnectParams { + pub fn new(port: u64, session_token: &str) -> Self { + Self { + port, + session_token: session_token.to_string(), + } + } + + pub fn url(&self) -> String { + format!("http://127.0.0.1:{}/query", self.port) + } +} diff --git a/src/downloader.rs b/src/downloader.rs index 5c1d815..7e5d1a8 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -1,17 +1,16 @@ use std::{ fs::File, - io::{copy, BufReader, BufWriter, Read, Write}, + io::{copy, Read, Write}, os::unix::prelude::PermissionsExt, path::PathBuf, }; use eyre::Context; use flate2::read::GzDecoder; -use hex_literal::hex; use platform_info::Uname; use sha2::Digest; use tar::Archive; -use tempfile::{tempfile, NamedTempFile}; +use tempfile::tempfile; #[allow(dead_code)] #[derive(Clone)] diff --git a/src/engine.rs b/src/engine.rs new file mode 100644 index 0000000..2401b98 --- /dev/null +++ b/src/engine.rs @@ -0,0 +1,46 @@ +use crate::{ + cli_session::CliSession, config::Config, connect_params::ConnectParams, downloader::Downloader, +}; + +pub struct Engine {} + +impl Engine { + pub fn new() -> Self { + Self {} + } + + fn from_cli(&self, cfg: &Config) -> eyre::Result { + let cli = Downloader::new("0.3.10".into())?.get_cli()?; + + let cli_session = CliSession::new(); + + Ok(cli_session.connect(cfg, &cli)?) + } + + pub fn start(&self, cfg: &Config) -> eyre::Result { + // TODO: Add from existing session as well + self.from_cli(cfg) + } +} + +#[cfg(test)] +mod tests { + use crate::{config::Config, connect_params::ConnectParams}; + + use super::Engine; + + // TODO: these tests potentially have a race condition + #[test] + fn engine_can_start() { + let engine = Engine::new(); + let params = engine.start(&Config::new(None, None, None, None)).unwrap(); + + assert_ne!( + params, + ConnectParams { + port: 123, + session_token: "123".into() + } + ) + } +} diff --git a/src/lib.rs b/src/lib.rs index 3c163e4..efbfbfb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,8 @@ pub mod cli; +mod cli_session; +mod config; +mod connect_params; pub mod dagger; mod downloader; +mod engine; mod schema;