From e0545c726c44dccfb8ea179266c1da93389c07e4 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sun, 27 Aug 2023 00:07:56 +0200 Subject: [PATCH] feat: with monitoring Signed-off-by: kjuulh --- Cargo.lock | 3 ++ Cargo.toml | 2 +- crates/churn-domain/Cargo.toml | 3 +- crates/churn-domain/src/lib.rs | 2 +- crates/churn-server/src/event.rs | 78 ++++++++++++++++++++++++++++++++ crates/churn-server/src/main.rs | 59 ++++++++++++++++++++++-- crates/churn/Cargo.toml | 1 + crates/churn/src/main.rs | 8 ++-- 8 files changed, 145 insertions(+), 11 deletions(-) create mode 100644 crates/churn-server/src/event.rs diff --git a/Cargo.lock b/Cargo.lock index 8e91e75..46b4170 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -242,6 +242,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -274,6 +275,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -2000,6 +2002,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ "getrandom", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index defc137..a935d3f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,4 +19,4 @@ async-trait = "*" serde = {version = "1", features = ["derive"]} serde_json = "1" reqwest = {version = "0.11.20", features = ["json"]} -uuid = {version = "1.4.1", features = ["v4"]} +uuid = {version = "1.4.1", features = ["v4", "serde"]} diff --git a/crates/churn-domain/Cargo.toml b/crates/churn-domain/Cargo.toml index d765bcb..60e52be 100644 --- a/crates/churn-domain/Cargo.toml +++ b/crates/churn-domain/Cargo.toml @@ -14,4 +14,5 @@ clap.workspace = true dotenv.workspace = true axum.workspace = true reqwest.workspace = true -serde.workspace = true \ No newline at end of file +serde.workspace = true +uuid.workspace = true \ No newline at end of file diff --git a/crates/churn-domain/src/lib.rs b/crates/churn-domain/src/lib.rs index 45cf361..0d90a17 100644 --- a/crates/churn-domain/src/lib.rs +++ b/crates/churn-domain/src/lib.rs @@ -20,6 +20,6 @@ pub struct ServerEnrollReq { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct ServerMonitorResp { - pub cursor: String, + pub cursor: Option, pub logs: Vec, } diff --git a/crates/churn-server/src/event.rs b/crates/churn-server/src/event.rs new file mode 100644 index 0000000..13ea61c --- /dev/null +++ b/crates/churn-server/src/event.rs @@ -0,0 +1,78 @@ +use std::collections::HashMap; + +use std::sync::Arc; + +use axum::async_trait; + +use churn_domain::ServerEnrollReq; +use serde::{ser::SerializeStruct, Deserialize, Serialize}; +use tokio::sync::{Mutex, RwLock}; + +#[derive(Clone)] +pub struct EventService(Arc); + +impl std::ops::Deref for EventService { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Default for EventService { + fn default() -> Self { + Self(Arc::new(DefaultEventService::default())) + } +} + +#[derive(Default)] +struct DefaultEventService { + agents: Arc>>, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct LogEvent { + pub id: uuid::Uuid, + pub author: String, + pub content: String, +} + +impl LogEvent { + pub fn new(author: impl Into, content: impl Into) -> Self { + Self { + id: uuid::Uuid::new_v4(), + author: author.into(), + content: content.into(), + } + } +} + +#[async_trait] +pub trait EventServiceTrait { + async fn append(&self, req: LogEvent) -> anyhow::Result<()>; + async fn get_from_cursor(&self, cursor: uuid::Uuid) -> anyhow::Result>; + async fn get_from_beginning(&self) -> anyhow::Result>; +} + +#[async_trait] +impl EventServiceTrait for DefaultEventService { + async fn append(&self, req: LogEvent) -> anyhow::Result<()> { + let mut events = self.agents.write().await; + events.push(req); + Ok(()) + } + async fn get_from_cursor(&self, cursor: uuid::Uuid) -> anyhow::Result> { + let events = self.agents.read().await; + let items = events + .iter() + .skip_while(|item| item.id != cursor) + .skip(1) + .cloned() + .collect::>(); + Ok(items) + } + async fn get_from_beginning(&self) -> anyhow::Result> { + let events = self.agents.read().await; + Ok(events.iter().cloned().collect()) + } +} diff --git a/crates/churn-server/src/main.rs b/crates/churn-server/src/main.rs index a94891c..505fc64 100644 --- a/crates/churn-server/src/main.rs +++ b/crates/churn-server/src/main.rs @@ -1,4 +1,5 @@ mod agent; +mod event; mod lease; use std::net::SocketAddr; @@ -12,6 +13,7 @@ use axum::routing::{get, post}; use axum::{Json, Router}; use churn_domain::{LeaseResp, ServerEnrollReq, ServerMonitorResp}; use clap::{Parser, Subcommand}; +use event::{EventService, LogEvent}; use lease::LeaseService; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -40,6 +42,7 @@ struct Agent { struct AppState { agent: AgentService, leases: LeaseService, + events: EventService, } #[tokio::main] @@ -67,6 +70,7 @@ async fn main() -> anyhow::Result<()> { .with_state(AppState { agent: AgentService::default(), leases: LeaseService::default(), + events: EventService::default(), }); tracing::info!("churn server listening on {}", host); @@ -110,8 +114,20 @@ async fn enroll( State(state): State, Json(req): Json, ) -> Result, AppError> { + state + .events + .append(LogEvent::new(&req.agent_name, "attempting to enroll agent")) + .await + .map_err(AppError::Internal)?; + let name = state.agent.enroll(req).await.map_err(AppError::Internal)?; + state + .events + .append(LogEvent::new(&name, "enrolled agent")) + .await + .map_err(AppError::Internal)?; + Ok(Json(Agent { name })) } @@ -139,10 +155,29 @@ async fn ping() -> impl IntoResponse { #[derive(Clone, Deserialize)] struct LogsQuery { - cursor: Option, + cursor: Option, } -async fn logs(Query(cursor): Query) -> Result, AppError> { +async fn logs( + State(state): State, + Query(cursor): Query, +) -> Result, AppError> { + state + .events + .append(LogEvent::new( + "author", + format!( + "logs called: {}", + cursor + .cursor + .as_ref() + .map(|c| format!("(cursor={c})")) + .unwrap_or("".to_string()) + ), + )) + .await + .map_err(AppError::Internal)?; + match cursor.cursor { Some(cursor) => { tracing::trace!("finding logs from cursor: {}", cursor); @@ -152,8 +187,24 @@ async fn logs(Query(cursor): Query) -> Result } } + let events = match cursor.cursor { + Some(c) => state.events.get_from_cursor(c).await, + None => state.events.get_from_beginning().await, + } + .map_err(AppError::Internal)?; + + if events.is_empty() { + return Ok(Json(ServerMonitorResp { + cursor: cursor.cursor.clone(), + logs: Vec::new(), + })); + } + Ok(Json(ServerMonitorResp { - cursor: uuid::Uuid::new_v4().to_string(), - logs: vec!["something".to_string()], + cursor: events.last().map(|e| e.id), + logs: events + .iter() + .map(|e| format!("{}: {}", e.author, e.content)) + .collect(), })) } diff --git a/crates/churn/Cargo.toml b/crates/churn/Cargo.toml index d8ed1a2..071dc89 100644 --- a/crates/churn/Cargo.toml +++ b/crates/churn/Cargo.toml @@ -14,3 +14,4 @@ clap.workspace = true dotenv.workspace = true axum.workspace = true reqwest.workspace = true +uuid.workspace = true diff --git a/crates/churn/src/main.rs b/crates/churn/src/main.rs index 0b31cf2..114eb4c 100644 --- a/crates/churn/src/main.rs +++ b/crates/churn/src/main.rs @@ -113,9 +113,9 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> { } => { tracing::info!("monitoring server: {}", server); - let mut cursor: Option = None; + let mut cursor: Option = None; loop { - tracing::info!("reading logs from server: {}", server); + tracing::debug!("reading logs from server: {}", server); let resp = reqwest::get(format!( "{server}/logs{}", @@ -136,9 +136,9 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> { match resp.json::().await { Ok(resp) => { for line in resp.logs { - tracing::info!("server: {}", line); + tracing::info!("event: {}", line); } - cursor = Some(resp.cursor); + cursor = resp.cursor; } Err(e) => { tracing::warn!("failed to call server (error={})", e);