From 569f5272e667deeef9f269db5eaf3dec57e2df1c Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sat, 26 Aug 2023 22:32:38 +0200 Subject: [PATCH] feat: with monitor Signed-off-by: kjuulh --- Cargo.lock | 31 +++++++++ Cargo.toml | 3 +- crates/churn-agent/Cargo.toml | 5 ++ crates/churn-agent/src/agent.rs | 85 ++++++++++++++++++++++ crates/churn-agent/src/main.rs | 71 +++++++++++++++++-- crates/churn-domain/Cargo.toml | 17 +++++ crates/churn-domain/src/lib.rs | 25 +++++++ crates/churn-server/Cargo.toml | 5 +- crates/churn-server/src/agent.rs | 64 +++++++++++++++++ crates/churn-server/src/lease.rs | 57 +++++++++++++++ crates/churn-server/src/main.rs | 116 +++++++++++++------------------ crates/churn/Cargo.toml | 2 + crates/churn/src/main.rs | 99 ++++++++++++++++++++++++-- crates/churning/src/main.rs | 41 ++++++----- 14 files changed, 526 insertions(+), 95 deletions(-) create mode 100644 crates/churn-agent/src/agent.rs create mode 100644 crates/churn-domain/Cargo.toml create mode 100644 crates/churn-domain/src/lib.rs create mode 100644 crates/churn-server/src/agent.rs create mode 100644 crates/churn-server/src/lease.rs diff --git a/Cargo.lock b/Cargo.lock index 5304f94..8e91e75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -235,6 +235,7 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "churn-domain", "clap", "dotenv", "reqwest", @@ -249,8 +250,27 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "churn-domain", "clap", "dotenv", + "reqwest", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "churn-domain" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "clap", + "dotenv", + "reqwest", + "serde", "tokio", "tracing", "tracing-subscriber", @@ -262,6 +282,7 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "churn-domain", "clap", "dotenv", "serde", @@ -269,6 +290,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -1971,6 +1993,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 5c72ced..defc137 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ resolver = "2" churn = { path = "crates/churn" } churn-agent = { path = "crates/churn-agent" } churn-server = { path = "crates/churn-server" } +churn-domain = { path = "crates/churn-domain" } anyhow = { version = "1.0.71" } tokio = { version = "1", features = ["full"] } @@ -17,5 +18,5 @@ axum = { version = "0.6.18", features = ["macros"] } async-trait = "*" serde = {version = "1", features = ["derive"]} serde_json = "1" - reqwest = {version = "0.11.20", features = ["json"]} +uuid = {version = "1.4.1", features = ["v4"]} diff --git a/crates/churn-agent/Cargo.toml b/crates/churn-agent/Cargo.toml index cbf8d77..a19b71e 100644 --- a/crates/churn-agent/Cargo.toml +++ b/crates/churn-agent/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +churn-domain.workspace = true + anyhow.workspace = true tokio.workspace = true tracing.workspace = true @@ -13,3 +15,6 @@ tracing-subscriber.workspace = true clap.workspace = true dotenv.workspace = true axum.workspace = true +serde.workspace = true +serde_json.workspace = true +reqwest.workspace = true diff --git a/crates/churn-agent/src/agent.rs b/crates/churn-agent/src/agent.rs new file mode 100644 index 0000000..88f8f52 --- /dev/null +++ b/crates/churn-agent/src/agent.rs @@ -0,0 +1,85 @@ + + +use std::sync::Arc; + + + + + + +use axum::{async_trait}; +use churn_domain::{ServerEnrollReq}; + + + +use tokio::sync::Mutex; + +#[derive(Clone)] +pub struct AgentService(Arc); + +impl std::ops::Deref for AgentService { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Default for AgentService { + fn default() -> Self { + Self(Arc::new(DefaultAgentService::default())) + } +} + +#[derive(Default)] +struct DefaultAgentService { + server: Arc>, + leases: Arc>>, +} + + + +#[async_trait] +pub trait AgentServiceTrait { + async fn enroll(&self, agent_name: &str, server: &str, lease: &str) -> anyhow::Result<()>; +} + +#[async_trait] +impl AgentServiceTrait for DefaultAgentService { + async fn enroll(&self, agent_name: &str, server: &str, lease: &str) -> anyhow::Result<()> { + let mut cur_server = self.server.lock().await; + let mut leases = self.leases.lock().await; + + let client = reqwest::Client::new(); + let req = client + .post(format!("{server}/agent/enroll")) + .json(&ServerEnrollReq { + lease: lease.into(), + agent_name: agent_name.into(), + }) + .build()?; + + let resp = client.execute(req).await?; + if !resp.status().is_success() { + if let Ok(text) = resp.text().await { + anyhow::bail!( + "could not enroll agent: {} at server: {}, error: {}", + agent_name, + server, + text + ) + } + + anyhow::bail!( + "could not enroll agent: {} at server: {}", + agent_name, + server + ) + } + + *cur_server = server.to_string(); + leases.push(lease.to_string()); + + Ok(()) + } +} diff --git a/crates/churn-agent/src/main.rs b/crates/churn-agent/src/main.rs index 0304f1d..9adcf70 100644 --- a/crates/churn-agent/src/main.rs +++ b/crates/churn-agent/src/main.rs @@ -1,7 +1,19 @@ +mod agent; + use std::net::SocketAddr; -use axum::{response::IntoResponse, routing::get, Router}; +use agent::AgentService; +use anyhow::Error; +use axum::{ + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, + routing::{get, post}, + Json, Router, +}; +use churn_domain::AgentEnrollReq; use clap::{Parser, Subcommand}; +use serde_json::json; #[derive(Parser)] #[command(author, version, about, long_about = None, subcommand_required = true)] @@ -30,6 +42,14 @@ enum Commands { }, } +#[derive(Clone)] +#[derive(Default)] +struct AppState { + agent: AgentService, +} + + + #[tokio::main] async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); @@ -47,7 +67,10 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> { Some(Commands::Daemon { host }) => { tracing::info!("Starting churn server"); - let app = Router::new().route("/ping", get(ping)); + let app = Router::new() + .route("/enroll", post(enroll)) + .route("/ping", get(ping)) + .with_state(AppState::default()); tracing::info!("churn server listening on {}", host); axum::Server::bind(&host) @@ -58,14 +81,52 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> { Ok(()) } Some(Commands::Connect { - host, - token, - agent_name, + host: _, + token: _, + agent_name: _, }) => todo!(), None => todo!(), } } +enum AppError { + Internal(Error), +} + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + let (status, error_message) = match self { + AppError::Internal(e) => { + tracing::error!("failed with error: {}", e); + + ( + StatusCode::INTERNAL_SERVER_ERROR, + "failed with internal error", + ) + } + }; + + let body = Json(json!({ + "error": error_message, + })); + + (status, body).into_response() + } +} + async fn ping() -> impl IntoResponse { "pong!" } + +async fn enroll( + State(state): State, + Json(req): Json, +) -> Result<(), AppError> { + state + .agent + .enroll(&req.agent_name, &req.server, &req.lease) + .await + .map_err(AppError::Internal)?; + + Ok(()) +} diff --git a/crates/churn-domain/Cargo.toml b/crates/churn-domain/Cargo.toml new file mode 100644 index 0000000..d765bcb --- /dev/null +++ b/crates/churn-domain/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "churn-domain" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow.workspace = true +tokio.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +clap.workspace = true +dotenv.workspace = true +axum.workspace = true +reqwest.workspace = true +serde.workspace = true \ No newline at end of file diff --git a/crates/churn-domain/src/lib.rs b/crates/churn-domain/src/lib.rs new file mode 100644 index 0000000..45cf361 --- /dev/null +++ b/crates/churn-domain/src/lib.rs @@ -0,0 +1,25 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct LeaseResp { + pub token: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct AgentEnrollReq { + pub lease: String, + pub server: String, + pub agent_name: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ServerEnrollReq { + pub lease: String, + pub agent_name: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ServerMonitorResp { + pub cursor: String, + pub logs: Vec, +} diff --git a/crates/churn-server/Cargo.toml b/crates/churn-server/Cargo.toml index 22e7753..0ed716c 100644 --- a/crates/churn-server/Cargo.toml +++ b/crates/churn-server/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +churn-domain.workspace = true + anyhow.workspace = true tokio.workspace = true tracing.workspace = true @@ -14,4 +16,5 @@ clap.workspace = true dotenv.workspace = true axum.workspace = true serde.workspace = true -serde_json.workspace = true \ No newline at end of file +serde_json.workspace = true +uuid.workspace = true \ No newline at end of file diff --git a/crates/churn-server/src/agent.rs b/crates/churn-server/src/agent.rs new file mode 100644 index 0000000..a8891b9 --- /dev/null +++ b/crates/churn-server/src/agent.rs @@ -0,0 +1,64 @@ +use std::collections::HashMap; + +use std::sync::Arc; + +use axum::async_trait; + +use churn_domain::ServerEnrollReq; +use tokio::sync::Mutex; + +use crate::Agent; + +#[derive(Clone)] +pub struct AgentService(Arc); + +impl std::ops::Deref for AgentService { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Default for AgentService { + fn default() -> Self { + Self(Arc::new(DefaultAgentService::default())) + } +} + +#[derive(Default)] +struct DefaultAgentService { + agents: Arc>>, +} + +#[async_trait] +pub trait AgentServiceTrait { + async fn enroll(&self, req: ServerEnrollReq) -> anyhow::Result; +} + +#[async_trait] +impl AgentServiceTrait for DefaultAgentService { + async fn enroll(&self, req: ServerEnrollReq) -> anyhow::Result { + let agent_name = req.agent_name; + + let mut agents = self.agents.lock().await; + + match agents.insert( + agent_name.clone(), + Agent { + name: agent_name.clone(), + }, + ) { + Some(_) => { + tracing::debug!("agents store already contained agent, replaced existing"); + + Ok(agent_name) + } + None => { + tracing::debug!("agents store didn't contain agent, inserted"); + + Ok(agent_name) + } + } + } +} diff --git a/crates/churn-server/src/lease.rs b/crates/churn-server/src/lease.rs new file mode 100644 index 0000000..34ba128 --- /dev/null +++ b/crates/churn-server/src/lease.rs @@ -0,0 +1,57 @@ + + +use std::sync::Arc; + + + + + + +use axum::{async_trait}; + + + + +use tokio::sync::Mutex; + +#[derive(Clone)] +pub struct LeaseService(Arc); + +impl std::ops::Deref for LeaseService { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Default for LeaseService { + fn default() -> Self { + Self(Arc::new(DefaultLeaseService::default())) + } +} + +#[derive(Default)] +struct DefaultLeaseService { + leases: Arc>>, +} + + + +#[async_trait] +pub trait LeaseServiceTrait { + async fn create_lease(&self) -> anyhow::Result; +} + +#[async_trait] +impl LeaseServiceTrait for DefaultLeaseService { + async fn create_lease(&self) -> anyhow::Result { + let mut leases = self.leases.lock().await; + + let lease = uuid::Uuid::new_v4().to_string(); + + leases.push(lease.clone()); + + Ok(lease) + } +} diff --git a/crates/churn-server/src/main.rs b/crates/churn-server/src/main.rs index 0de245e..a94891c 100644 --- a/crates/churn-server/src/main.rs +++ b/crates/churn-server/src/main.rs @@ -1,17 +1,20 @@ -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::Arc; +mod agent; +mod lease; +use std::net::SocketAddr; + +use agent::AgentService; use anyhow::Error; -use axum::extract::State; +use axum::extract::{Query, State}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::routing::{get, post}; -use axum::{async_trait, Json, Router}; +use axum::{Json, Router}; +use churn_domain::{LeaseResp, ServerEnrollReq, ServerMonitorResp}; use clap::{Parser, Subcommand}; +use lease::LeaseService; use serde::{Deserialize, Serialize}; use serde_json::json; -use tokio::sync::Mutex; #[derive(Parser)] #[command(author, version, about, long_about = None, subcommand_required = true)] @@ -28,60 +31,6 @@ enum Commands { }, } -#[derive(Clone)] -struct AgentService(Arc); - -impl std::ops::Deref for AgentService { - type Target = Arc; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl Default for AgentService { - fn default() -> Self { - Self(Arc::new(DefaultAgentService::default())) - } -} - -struct DefaultAgentService { - agents: Arc>>, -} - -impl Default for DefaultAgentService { - fn default() -> Self { - Self { - agents: Arc::default(), - } - } -} - -#[async_trait] -trait AgentServiceTrait { - async fn enroll(&self, agent: Agent) -> anyhow::Result; -} - -#[async_trait] -impl AgentServiceTrait for DefaultAgentService { - async fn enroll(&self, agent: Agent) -> anyhow::Result { - let mut agents = self.agents.lock().await; - - match agents.insert(agent.name.clone(), agent.clone()) { - Some(_) => { - tracing::debug!("agents store already contained agent, replaced existing"); - - Ok(agent.name) - } - None => { - tracing::debug!("agents store didn't contain agent, inserted"); - - Ok(agent.name) - } - } - } -} - #[derive(Clone, Debug, Deserialize, Serialize)] struct Agent { pub name: String, @@ -90,6 +39,7 @@ struct Agent { #[derive(Clone)] struct AppState { agent: AgentService, + leases: LeaseService, } #[tokio::main] @@ -105,15 +55,18 @@ async fn main() -> anyhow::Result<()> { let app = Router::new() .route("/ping", get(ping)) + .route("/logs", get(logs)) .nest( "/agent", Router::new() .route("/enroll", post(enroll)) .route("/ping", post(agent_ping)) - .route("/events", post(get_tasks)), + .route("/events", post(get_tasks)) + .route("/lease", post(agent_lease)), ) .with_state(AppState { agent: AgentService::default(), + leases: LeaseService::default(), }); tracing::info!("churn server listening on {}", host); @@ -155,15 +108,21 @@ impl IntoResponse for AppError { async fn enroll( State(state): State, - Json(agent): Json, + Json(req): Json, ) -> Result, AppError> { - let _ = state - .agent - .enroll(agent.clone()) - .await - .map_err(|e| AppError::Internal(e)); + let name = state.agent.enroll(req).await.map_err(AppError::Internal)?; - Ok(Json(agent)) + Ok(Json(Agent { name })) +} + +async fn agent_lease(State(state): State) -> Result, AppError> { + let lease = state + .leases + .create_lease() + .await + .map_err(AppError::Internal)?; + + Ok(Json(LeaseResp { token: lease })) } async fn agent_ping() -> impl IntoResponse { @@ -177,3 +136,24 @@ async fn get_tasks() -> impl IntoResponse { async fn ping() -> impl IntoResponse { "pong!" } + +#[derive(Clone, Deserialize)] +struct LogsQuery { + cursor: Option, +} + +async fn logs(Query(cursor): Query) -> Result, AppError> { + match cursor.cursor { + Some(cursor) => { + tracing::trace!("finding logs from cursor: {}", cursor); + } + None => { + tracing::trace!("finding logs from beginning"); + } + } + + Ok(Json(ServerMonitorResp { + cursor: uuid::Uuid::new_v4().to_string(), + logs: vec!["something".to_string()], + })) +} diff --git a/crates/churn/Cargo.toml b/crates/churn/Cargo.toml index 7c32059..d8ed1a2 100644 --- a/crates/churn/Cargo.toml +++ b/crates/churn/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] +churn-domain.workspace = true + anyhow.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/crates/churn/src/main.rs b/crates/churn/src/main.rs index 9e67468..0b31cf2 100644 --- a/crates/churn/src/main.rs +++ b/crates/churn/src/main.rs @@ -1,5 +1,4 @@ -use std::net::SocketAddr; - +use churn_domain::{AgentEnrollReq, LeaseResp, ServerMonitorResp}; use clap::{Parser, Subcommand}; #[derive(Parser)] @@ -11,10 +10,20 @@ struct Command { #[derive(Subcommand)] enum Commands { + Auth { + #[arg(env = "CHURN_SERVER", long)] + server: String, + + #[arg(env = "CHURN_SERVER_TOKEN", long)] + server_token: String, + }, Bootstrap { #[arg(env = "CHURN_AGENT", long)] agent: String, + #[arg(env = "CHURN_AGENT_NAME", long)] + agent_name: String, + #[arg(env = "CHURN_SERVER", long)] server: String, @@ -27,6 +36,13 @@ enum Commands { #[arg(env = "CHURN_AGENT", long)] agent: String, }, + Monitor { + #[arg(env = "CHURN_SERVER", long)] + server: String, + + #[arg(env = "CHURN_SERVER_TOKEN", long)] + server_token: String, + }, } #[tokio::main] @@ -46,9 +62,38 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> { match cmd { Commands::Bootstrap { agent, + agent_name, server, - server_token, - } => todo!(), + server_token: _, + } => { + tracing::info!("enrolling agent: {} for server: {}", agent, server); + let client = reqwest::Client::new(); + let req = client.post(format!("{server}/agent/lease")).build()?; + let lease_resp = client.execute(req).await?; + let lease = lease_resp.json::().await?; + + let req = client + .post(format!("{agent}/enroll")) + .json(&AgentEnrollReq { + lease: lease.token, + server, + agent_name, + }) + .build()?; + let lease_resp = client.execute(req).await?; + if !lease_resp.status().is_success() { + if let Ok(text) = lease_resp.text().await { + tracing::warn!( + "could not enroll because agent server encoutered error: {}", + text + ); + anyhow::bail!("encountered error: {}", text); + } + anyhow::bail!("encountered error"); + } + + Ok(()) + } Commands::Health { server, agent } => { tracing::info!("connecting to server: {}", server); reqwest::get(format!("{server}/ping")).await?; @@ -58,6 +103,52 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> { tracing::info!("connected to agent successfully"); Ok(()) } + Commands::Auth { + server: _, + server_token: _, + } => todo!(), + Commands::Monitor { + server, + server_token, + } => { + tracing::info!("monitoring server: {}", server); + + let mut cursor: Option = None; + loop { + tracing::info!("reading logs from server: {}", server); + + let resp = reqwest::get(format!( + "{server}/logs{}", + match &cursor { + None => "".to_string(), + Some(cursor) => format!("?cursor={}", cursor), + } + )) + .await?; + + if !resp.status().is_success() { + if let Ok(text) = resp.text().await { + anyhow::bail!("encountered error: {}", text); + } + anyhow::bail!("encountered error"); + } + + match resp.json::().await { + Ok(resp) => { + for line in resp.logs { + tracing::info!("server: {}", line); + } + cursor = Some(resp.cursor); + } + Err(e) => { + tracing::warn!("failed to call server (error={})", e); + } + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + + Ok(()) + } } } else { panic!("no command supplied") diff --git a/crates/churning/src/main.rs b/crates/churning/src/main.rs index 5087b20..f8cf350 100644 --- a/crates/churning/src/main.rs +++ b/crates/churning/src/main.rs @@ -11,21 +11,30 @@ async fn main() -> eyre::Result<()> { println!("Building churning..."); - let client = dagger_sdk::connect_opts(config).await?; + //let client = dagger_sdk::connect_opts(config).await?; + let client = dagger_sdk::connect().await?; - let agent = build_container(client.clone(), "churn-agent").await?; - let agent = agent - .with_exec(vec!["churn-agent", "daemon", "--host", "0.0.0.0:3000"]) - .with_exposed_port(3000); let cli = build_container(client.clone(), "churn").await?; let server = build_container(client.clone(), "churn-server").await?; let server = server .with_exec(vec!["churn-server", "serve", "--host", "0.0.0.0:3000"]) .with_exposed_port(3000); + let server_id = server.id().await?; + + let agent = build_container(client.clone(), "churn-agent").await?; + let agent = agent + .with_service_binding("churn-server", server_id.clone()) + .with_exec(vec!["churn-agent", "daemon", "--host", "0.0.0.0:3000"]) + .with_exposed_port(3000); + let churning = cli .with_service_binding("churn-agent", agent.id().await?) - .with_service_binding("churn-server", server.id().await?) + .with_service_binding("churn-server", server_id) + .with_env_variable("CHURN_SERVER", "http://churn-server:3000") + .with_env_variable("CHURN_SERVER_TOKEN", "something") + .with_env_variable("CHURN_AGENT", "http://churn-agent:3000") + .with_env_variable("CHURN_AGENT_NAME", "churn-agent") .with_exec(vec![ "churn", "health", @@ -52,7 +61,7 @@ async fn repl(container: dagger_sdk::Container) -> eyre::Result<()> { let mut container = container; loop { - let mut stdin = tokio::io::stdin(); + let stdin = tokio::io::stdin(); let mut stdout = tokio::io::stdout(); stdout.write_all(b"> ").await?; @@ -79,14 +88,14 @@ async fn repl(container: dagger_sdk::Container) -> eyre::Result<()> { eprintln!("{}", e); } } - match container.stderr().await { - Ok(stderr) => { - println!("{stderr}"); - } - Err(e) => { - eprintln!("{}", e); - } - } + // match container.stderr().await { + // Ok(stderr) => { + // println!("{stderr}"); + // } + // Err(e) => { + // eprintln!("{}", e); + // } + // } match container.exit_code().await { Ok(_) => {} @@ -121,7 +130,7 @@ async fn build_container( .collect::>(), architecture: dagger_rust::build::BuildArchitecture::Amd64, }], - &bin_name, + bin_name, ) .await?;