#![feature(slice_pattern)] mod agent; mod db; mod event; mod lease; use std::net::SocketAddr; use std::path::PathBuf; use agent::AgentService; use anyhow::Error; use axum::extract::{Query, State}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::routing::{get, post}; use axum::{Json, Router}; use churn_domain::{Agent, LeaseResp, LogEvent, ServerEnrollReq, ServerMonitorResp}; use clap::{Args, Parser, Subcommand, ValueEnum}; use event::EventService; use lease::LeaseService; use serde::Deserialize; use serde_json::json; use tokio::net::TcpListener; use crate::db::Db; #[derive(Parser)] #[command(author, version, about, long_about = None, subcommand_required = true)] struct Command { #[command(subcommand)] command: Option, #[clap(flatten)] global: GlobalArgs, } #[derive(Args)] struct GlobalArgs { #[arg(env = "CHURN_DATABASE", long, default_value = "sled")] database: DatabaseType, #[arg(env = "CHURN_SLED_PATH", long, default_value = "churn-server.sled")] sled_path: PathBuf, } #[derive(ValueEnum, Clone)] enum DatabaseType { Sled, } #[derive(Subcommand)] enum Commands { Serve { #[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")] host: SocketAddr, }, } #[derive(Clone)] struct AppState { agent: AgentService, leases: LeaseService, events: EventService, } #[tokio::main] async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); tracing_subscriber::fmt::init(); let cli = Command::parse(); if let Some(Commands::Serve { host }) = cli.command { tracing::info!("Starting churn server"); let db = match cli.global.database { DatabaseType::Sled => Db::new_sled(&cli.global.sled_path), }; let app = Router::new() .route("/ping", get(ping)) .route("/logs", get(logs)) .nest( "/agent", Router::new() .route("/enroll", post(enroll)) .route("/ping", post(agent_ping)) .route("/events", post(get_tasks)) .route("/lease", post(agent_lease)), ) .with_state(AppState { agent: AgentService::new(db.clone()), leases: LeaseService::new(db.clone()), events: EventService::new(db.clone()), }); tracing::info!("churn server listening on {}", host); let listener = TcpListener::bind(&host).await?; axum::serve(listener, app.into_make_service()) .await .unwrap(); } Ok(()) } enum AppError { Internal(Error), } impl IntoResponse for AppError { fn into_response(self) -> Response { let (status, error_message) = match self { AppError::Internal(e) => { tracing::error!("failed with error: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, "failed with internal error", ) } }; let body = Json(json!({ "error": error_message, })); (status, body).into_response() } } async fn enroll( State(state): State, Json(req): Json, ) -> Result, AppError> { state .events .append(LogEvent::new(&req.agent_name, "attempting to enroll agent")) .await .map_err(AppError::Internal)?; let name = state.agent.enroll(req).await.map_err(AppError::Internal)?; state .events .append(LogEvent::new(&name, "enrolled agent")) .await .map_err(AppError::Internal)?; Ok(Json(Agent { name })) } async fn agent_lease(State(state): State) -> Result, AppError> { let lease = state .leases .create_lease() .await .map_err(AppError::Internal)?; Ok(Json(LeaseResp { token: lease })) } async fn agent_ping() -> impl IntoResponse { todo!() } async fn get_tasks() -> impl IntoResponse { todo!() } async fn ping() -> impl IntoResponse { "pong!" } #[derive(Clone, Deserialize)] struct LogsQuery { cursor: Option, } async fn logs( State(state): State, Query(cursor): Query, ) -> Result, AppError> { state .events .append(LogEvent::new( "author", format!( "logs called: {}", cursor .cursor .as_ref() .map(|c| format!("(cursor={c})")) .unwrap_or("".to_string()) ), )) .await .map_err(AppError::Internal)?; match cursor.cursor { Some(cursor) => { tracing::debug!("finding logs from cursor: {}", cursor); } None => { tracing::debug!("finding logs from beginning"); } } match cursor.cursor { Some(c) => { let events = state .events .get_from_cursor(c) .await .map_err(AppError::Internal)?; Ok(Json(ServerMonitorResp { cursor: events.last().map(|e| e.id), logs: events .iter() .map(|e| format!("{}: {}", e.author, e.content)) .collect(), })) } None => { let cursor = state .events .get_latest_cursor() .await .map_err(AppError::Internal)?; Ok(Json(ServerMonitorResp { cursor: Some(cursor), logs: Vec::new(), })) } } }