feat: with monitor

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-08-26 22:32:38 +02:00
parent 10eae9b36c
commit 569f5272e6
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
14 changed files with 526 additions and 95 deletions

31
Cargo.lock generated
View File

@ -235,6 +235,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"churn-domain",
"clap",
"dotenv",
"reqwest",
@ -249,8 +250,27 @@ version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"churn-domain",
"clap",
"dotenv",
"reqwest",
"serde",
"serde_json",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "churn-domain"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"clap",
"dotenv",
"reqwest",
"serde",
"tokio",
"tracing",
"tracing-subscriber",
@ -262,6 +282,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"churn-domain",
"clap",
"dotenv",
"serde",
@ -269,6 +290,7 @@ dependencies = [
"tokio",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]]
@ -1971,6 +1993,15 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
dependencies = [
"getrandom",
]
[[package]]
name = "valuable"
version = "0.1.0"

View File

@ -6,6 +6,7 @@ resolver = "2"
churn = { path = "crates/churn" }
churn-agent = { path = "crates/churn-agent" }
churn-server = { path = "crates/churn-server" }
churn-domain = { path = "crates/churn-domain" }
anyhow = { version = "1.0.71" }
tokio = { version = "1", features = ["full"] }
@ -17,5 +18,5 @@ axum = { version = "0.6.18", features = ["macros"] }
async-trait = "*"
serde = {version = "1", features = ["derive"]}
serde_json = "1"
reqwest = {version = "0.11.20", features = ["json"]}
uuid = {version = "1.4.1", features = ["v4"]}

View File

@ -6,6 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
churn-domain.workspace = true
anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true
@ -13,3 +15,6 @@ tracing-subscriber.workspace = true
clap.workspace = true
dotenv.workspace = true
axum.workspace = true
serde.workspace = true
serde_json.workspace = true
reqwest.workspace = true

View File

@ -0,0 +1,85 @@
use std::sync::Arc;
use axum::{async_trait};
use churn_domain::{ServerEnrollReq};
use tokio::sync::Mutex;
#[derive(Clone)]
pub struct AgentService(Arc<dyn AgentServiceTrait + Send + Sync + 'static>);
impl std::ops::Deref for AgentService {
type Target = Arc<dyn AgentServiceTrait + Send + Sync + 'static>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Default for AgentService {
fn default() -> Self {
Self(Arc::new(DefaultAgentService::default()))
}
}
#[derive(Default)]
struct DefaultAgentService {
server: Arc<Mutex<String>>,
leases: Arc<Mutex<Vec<String>>>,
}
#[async_trait]
pub trait AgentServiceTrait {
async fn enroll(&self, agent_name: &str, server: &str, lease: &str) -> anyhow::Result<()>;
}
#[async_trait]
impl AgentServiceTrait for DefaultAgentService {
async fn enroll(&self, agent_name: &str, server: &str, lease: &str) -> anyhow::Result<()> {
let mut cur_server = self.server.lock().await;
let mut leases = self.leases.lock().await;
let client = reqwest::Client::new();
let req = client
.post(format!("{server}/agent/enroll"))
.json(&ServerEnrollReq {
lease: lease.into(),
agent_name: agent_name.into(),
})
.build()?;
let resp = client.execute(req).await?;
if !resp.status().is_success() {
if let Ok(text) = resp.text().await {
anyhow::bail!(
"could not enroll agent: {} at server: {}, error: {}",
agent_name,
server,
text
)
}
anyhow::bail!(
"could not enroll agent: {} at server: {}",
agent_name,
server
)
}
*cur_server = server.to_string();
leases.push(lease.to_string());
Ok(())
}
}

View File

@ -1,7 +1,19 @@
mod agent;
use std::net::SocketAddr;
use axum::{response::IntoResponse, routing::get, Router};
use agent::AgentService;
use anyhow::Error;
use axum::{
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
routing::{get, post},
Json, Router,
};
use churn_domain::AgentEnrollReq;
use clap::{Parser, Subcommand};
use serde_json::json;
#[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)]
@ -30,6 +42,14 @@ enum Commands {
},
}
#[derive(Clone)]
#[derive(Default)]
struct AppState {
agent: AgentService,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
@ -47,7 +67,10 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> {
Some(Commands::Daemon { host }) => {
tracing::info!("Starting churn server");
let app = Router::new().route("/ping", get(ping));
let app = Router::new()
.route("/enroll", post(enroll))
.route("/ping", get(ping))
.with_state(AppState::default());
tracing::info!("churn server listening on {}", host);
axum::Server::bind(&host)
@ -58,14 +81,52 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> {
Ok(())
}
Some(Commands::Connect {
host,
token,
agent_name,
host: _,
token: _,
agent_name: _,
}) => todo!(),
None => todo!(),
}
}
enum AppError {
Internal(Error),
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, error_message) = match self {
AppError::Internal(e) => {
tracing::error!("failed with error: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed with internal error",
)
}
};
let body = Json(json!({
"error": error_message,
}));
(status, body).into_response()
}
}
async fn ping() -> impl IntoResponse {
"pong!"
}
async fn enroll(
State(state): State<AppState>,
Json(req): Json<AgentEnrollReq>,
) -> Result<(), AppError> {
state
.agent
.enroll(&req.agent_name, &req.server, &req.lease)
.await
.map_err(AppError::Internal)?;
Ok(())
}

View File

@ -0,0 +1,17 @@
[package]
name = "churn-domain"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
clap.workspace = true
dotenv.workspace = true
axum.workspace = true
reqwest.workspace = true
serde.workspace = true

View File

@ -0,0 +1,25 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LeaseResp {
pub token: String,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AgentEnrollReq {
pub lease: String,
pub server: String,
pub agent_name: String,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ServerEnrollReq {
pub lease: String,
pub agent_name: String,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ServerMonitorResp {
pub cursor: String,
pub logs: Vec<String>,
}

View File

@ -6,6 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
churn-domain.workspace = true
anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true
@ -14,4 +16,5 @@ clap.workspace = true
dotenv.workspace = true
axum.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_json.workspace = true
uuid.workspace = true

View File

@ -0,0 +1,64 @@
use std::collections::HashMap;
use std::sync::Arc;
use axum::async_trait;
use churn_domain::ServerEnrollReq;
use tokio::sync::Mutex;
use crate::Agent;
#[derive(Clone)]
pub struct AgentService(Arc<dyn AgentServiceTrait + Send + Sync + 'static>);
impl std::ops::Deref for AgentService {
type Target = Arc<dyn AgentServiceTrait + Send + Sync + 'static>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Default for AgentService {
fn default() -> Self {
Self(Arc::new(DefaultAgentService::default()))
}
}
#[derive(Default)]
struct DefaultAgentService {
agents: Arc<Mutex<HashMap<String, Agent>>>,
}
#[async_trait]
pub trait AgentServiceTrait {
async fn enroll(&self, req: ServerEnrollReq) -> anyhow::Result<String>;
}
#[async_trait]
impl AgentServiceTrait for DefaultAgentService {
async fn enroll(&self, req: ServerEnrollReq) -> anyhow::Result<String> {
let agent_name = req.agent_name;
let mut agents = self.agents.lock().await;
match agents.insert(
agent_name.clone(),
Agent {
name: agent_name.clone(),
},
) {
Some(_) => {
tracing::debug!("agents store already contained agent, replaced existing");
Ok(agent_name)
}
None => {
tracing::debug!("agents store didn't contain agent, inserted");
Ok(agent_name)
}
}
}
}

View File

@ -0,0 +1,57 @@
use std::sync::Arc;
use axum::{async_trait};
use tokio::sync::Mutex;
#[derive(Clone)]
pub struct LeaseService(Arc<dyn LeaseServiceTrait + Send + Sync + 'static>);
impl std::ops::Deref for LeaseService {
type Target = Arc<dyn LeaseServiceTrait + Send + Sync + 'static>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Default for LeaseService {
fn default() -> Self {
Self(Arc::new(DefaultLeaseService::default()))
}
}
#[derive(Default)]
struct DefaultLeaseService {
leases: Arc<Mutex<Vec<String>>>,
}
#[async_trait]
pub trait LeaseServiceTrait {
async fn create_lease(&self) -> anyhow::Result<String>;
}
#[async_trait]
impl LeaseServiceTrait for DefaultLeaseService {
async fn create_lease(&self) -> anyhow::Result<String> {
let mut leases = self.leases.lock().await;
let lease = uuid::Uuid::new_v4().to_string();
leases.push(lease.clone());
Ok(lease)
}
}

View File

@ -1,17 +1,20 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
mod agent;
mod lease;
use std::net::SocketAddr;
use agent::AgentService;
use anyhow::Error;
use axum::extract::State;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{async_trait, Json, Router};
use axum::{Json, Router};
use churn_domain::{LeaseResp, ServerEnrollReq, ServerMonitorResp};
use clap::{Parser, Subcommand};
use lease::LeaseService;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::sync::Mutex;
#[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)]
@ -28,60 +31,6 @@ enum Commands {
},
}
#[derive(Clone)]
struct AgentService(Arc<dyn AgentServiceTrait + Send + Sync + 'static>);
impl std::ops::Deref for AgentService {
type Target = Arc<dyn AgentServiceTrait + Send + Sync + 'static>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Default for AgentService {
fn default() -> Self {
Self(Arc::new(DefaultAgentService::default()))
}
}
struct DefaultAgentService {
agents: Arc<Mutex<HashMap<String, Agent>>>,
}
impl Default for DefaultAgentService {
fn default() -> Self {
Self {
agents: Arc::default(),
}
}
}
#[async_trait]
trait AgentServiceTrait {
async fn enroll(&self, agent: Agent) -> anyhow::Result<String>;
}
#[async_trait]
impl AgentServiceTrait for DefaultAgentService {
async fn enroll(&self, agent: Agent) -> anyhow::Result<String> {
let mut agents = self.agents.lock().await;
match agents.insert(agent.name.clone(), agent.clone()) {
Some(_) => {
tracing::debug!("agents store already contained agent, replaced existing");
Ok(agent.name)
}
None => {
tracing::debug!("agents store didn't contain agent, inserted");
Ok(agent.name)
}
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Agent {
pub name: String,
@ -90,6 +39,7 @@ struct Agent {
#[derive(Clone)]
struct AppState {
agent: AgentService,
leases: LeaseService,
}
#[tokio::main]
@ -105,15 +55,18 @@ async fn main() -> anyhow::Result<()> {
let app = Router::new()
.route("/ping", get(ping))
.route("/logs", get(logs))
.nest(
"/agent",
Router::new()
.route("/enroll", post(enroll))
.route("/ping", post(agent_ping))
.route("/events", post(get_tasks)),
.route("/events", post(get_tasks))
.route("/lease", post(agent_lease)),
)
.with_state(AppState {
agent: AgentService::default(),
leases: LeaseService::default(),
});
tracing::info!("churn server listening on {}", host);
@ -155,15 +108,21 @@ impl IntoResponse for AppError {
async fn enroll(
State(state): State<AppState>,
Json(agent): Json<Agent>,
Json(req): Json<ServerEnrollReq>,
) -> Result<Json<Agent>, AppError> {
let _ = state
.agent
.enroll(agent.clone())
.await
.map_err(|e| AppError::Internal(e));
let name = state.agent.enroll(req).await.map_err(AppError::Internal)?;
Ok(Json(agent))
Ok(Json(Agent { name }))
}
async fn agent_lease(State(state): State<AppState>) -> Result<Json<LeaseResp>, AppError> {
let lease = state
.leases
.create_lease()
.await
.map_err(AppError::Internal)?;
Ok(Json(LeaseResp { token: lease }))
}
async fn agent_ping() -> impl IntoResponse {
@ -177,3 +136,24 @@ async fn get_tasks() -> impl IntoResponse {
async fn ping() -> impl IntoResponse {
"pong!"
}
#[derive(Clone, Deserialize)]
struct LogsQuery {
cursor: Option<String>,
}
async fn logs(Query(cursor): Query<LogsQuery>) -> Result<Json<ServerMonitorResp>, AppError> {
match cursor.cursor {
Some(cursor) => {
tracing::trace!("finding logs from cursor: {}", cursor);
}
None => {
tracing::trace!("finding logs from beginning");
}
}
Ok(Json(ServerMonitorResp {
cursor: uuid::Uuid::new_v4().to_string(),
logs: vec!["something".to_string()],
}))
}

View File

@ -4,6 +4,8 @@ version = "0.1.0"
edition = "2021"
[dependencies]
churn-domain.workspace = true
anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true

View File

@ -1,5 +1,4 @@
use std::net::SocketAddr;
use churn_domain::{AgentEnrollReq, LeaseResp, ServerMonitorResp};
use clap::{Parser, Subcommand};
#[derive(Parser)]
@ -11,10 +10,20 @@ struct Command {
#[derive(Subcommand)]
enum Commands {
Auth {
#[arg(env = "CHURN_SERVER", long)]
server: String,
#[arg(env = "CHURN_SERVER_TOKEN", long)]
server_token: String,
},
Bootstrap {
#[arg(env = "CHURN_AGENT", long)]
agent: String,
#[arg(env = "CHURN_AGENT_NAME", long)]
agent_name: String,
#[arg(env = "CHURN_SERVER", long)]
server: String,
@ -27,6 +36,13 @@ enum Commands {
#[arg(env = "CHURN_AGENT", long)]
agent: String,
},
Monitor {
#[arg(env = "CHURN_SERVER", long)]
server: String,
#[arg(env = "CHURN_SERVER_TOKEN", long)]
server_token: String,
},
}
#[tokio::main]
@ -46,9 +62,38 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> {
match cmd {
Commands::Bootstrap {
agent,
agent_name,
server,
server_token,
} => todo!(),
server_token: _,
} => {
tracing::info!("enrolling agent: {} for server: {}", agent, server);
let client = reqwest::Client::new();
let req = client.post(format!("{server}/agent/lease")).build()?;
let lease_resp = client.execute(req).await?;
let lease = lease_resp.json::<LeaseResp>().await?;
let req = client
.post(format!("{agent}/enroll"))
.json(&AgentEnrollReq {
lease: lease.token,
server,
agent_name,
})
.build()?;
let lease_resp = client.execute(req).await?;
if !lease_resp.status().is_success() {
if let Ok(text) = lease_resp.text().await {
tracing::warn!(
"could not enroll because agent server encoutered error: {}",
text
);
anyhow::bail!("encountered error: {}", text);
}
anyhow::bail!("encountered error");
}
Ok(())
}
Commands::Health { server, agent } => {
tracing::info!("connecting to server: {}", server);
reqwest::get(format!("{server}/ping")).await?;
@ -58,6 +103,52 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> {
tracing::info!("connected to agent successfully");
Ok(())
}
Commands::Auth {
server: _,
server_token: _,
} => todo!(),
Commands::Monitor {
server,
server_token,
} => {
tracing::info!("monitoring server: {}", server);
let mut cursor: Option<String> = None;
loop {
tracing::info!("reading logs from server: {}", server);
let resp = reqwest::get(format!(
"{server}/logs{}",
match &cursor {
None => "".to_string(),
Some(cursor) => format!("?cursor={}", cursor),
}
))
.await?;
if !resp.status().is_success() {
if let Ok(text) = resp.text().await {
anyhow::bail!("encountered error: {}", text);
}
anyhow::bail!("encountered error");
}
match resp.json::<ServerMonitorResp>().await {
Ok(resp) => {
for line in resp.logs {
tracing::info!("server: {}", line);
}
cursor = Some(resp.cursor);
}
Err(e) => {
tracing::warn!("failed to call server (error={})", e);
}
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(())
}
}
} else {
panic!("no command supplied")

View File

@ -11,21 +11,30 @@ async fn main() -> eyre::Result<()> {
println!("Building churning...");
let client = dagger_sdk::connect_opts(config).await?;
//let client = dagger_sdk::connect_opts(config).await?;
let client = dagger_sdk::connect().await?;
let agent = build_container(client.clone(), "churn-agent").await?;
let agent = agent
.with_exec(vec!["churn-agent", "daemon", "--host", "0.0.0.0:3000"])
.with_exposed_port(3000);
let cli = build_container(client.clone(), "churn").await?;
let server = build_container(client.clone(), "churn-server").await?;
let server = server
.with_exec(vec!["churn-server", "serve", "--host", "0.0.0.0:3000"])
.with_exposed_port(3000);
let server_id = server.id().await?;
let agent = build_container(client.clone(), "churn-agent").await?;
let agent = agent
.with_service_binding("churn-server", server_id.clone())
.with_exec(vec!["churn-agent", "daemon", "--host", "0.0.0.0:3000"])
.with_exposed_port(3000);
let churning = cli
.with_service_binding("churn-agent", agent.id().await?)
.with_service_binding("churn-server", server.id().await?)
.with_service_binding("churn-server", server_id)
.with_env_variable("CHURN_SERVER", "http://churn-server:3000")
.with_env_variable("CHURN_SERVER_TOKEN", "something")
.with_env_variable("CHURN_AGENT", "http://churn-agent:3000")
.with_env_variable("CHURN_AGENT_NAME", "churn-agent")
.with_exec(vec![
"churn",
"health",
@ -52,7 +61,7 @@ async fn repl(container: dagger_sdk::Container) -> eyre::Result<()> {
let mut container = container;
loop {
let mut stdin = tokio::io::stdin();
let stdin = tokio::io::stdin();
let mut stdout = tokio::io::stdout();
stdout.write_all(b"> ").await?;
@ -79,14 +88,14 @@ async fn repl(container: dagger_sdk::Container) -> eyre::Result<()> {
eprintln!("{}", e);
}
}
match container.stderr().await {
Ok(stderr) => {
println!("{stderr}");
}
Err(e) => {
eprintln!("{}", e);
}
}
// match container.stderr().await {
// Ok(stderr) => {
// println!("{stderr}");
// }
// Err(e) => {
// eprintln!("{}", e);
// }
// }
match container.exit_code().await {
Ok(_) => {}
@ -121,7 +130,7 @@ async fn build_container(
.collect::<Vec<_>>(),
architecture: dagger_rust::build::BuildArchitecture::Amd64,
}],
&bin_name,
bin_name,
)
.await?;