feat: with monitoring

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-08-27 00:07:56 +02:00
parent 569f5272e6
commit e0545c726c
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
8 changed files with 145 additions and 11 deletions

3
Cargo.lock generated
View File

@ -242,6 +242,7 @@ dependencies = [
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"uuid",
] ]
[[package]] [[package]]
@ -274,6 +275,7 @@ dependencies = [
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"uuid",
] ]
[[package]] [[package]]
@ -2000,6 +2002,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
dependencies = [ dependencies = [
"getrandom", "getrandom",
"serde",
] ]
[[package]] [[package]]

View File

@ -19,4 +19,4 @@ async-trait = "*"
serde = {version = "1", features = ["derive"]} serde = {version = "1", features = ["derive"]}
serde_json = "1" serde_json = "1"
reqwest = {version = "0.11.20", features = ["json"]} reqwest = {version = "0.11.20", features = ["json"]}
uuid = {version = "1.4.1", features = ["v4"]} uuid = {version = "1.4.1", features = ["v4", "serde"]}

View File

@ -14,4 +14,5 @@ clap.workspace = true
dotenv.workspace = true dotenv.workspace = true
axum.workspace = true axum.workspace = true
reqwest.workspace = true reqwest.workspace = true
serde.workspace = true serde.workspace = true
uuid.workspace = true

View File

@ -20,6 +20,6 @@ pub struct ServerEnrollReq {
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ServerMonitorResp { pub struct ServerMonitorResp {
pub cursor: String, pub cursor: Option<uuid::Uuid>,
pub logs: Vec<String>, pub logs: Vec<String>,
} }

View File

@ -0,0 +1,78 @@
use std::collections::HashMap;
use std::sync::Arc;
use axum::async_trait;
use churn_domain::ServerEnrollReq;
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use tokio::sync::{Mutex, RwLock};
#[derive(Clone)]
pub struct EventService(Arc<dyn EventServiceTrait + Send + Sync + 'static>);
impl std::ops::Deref for EventService {
type Target = Arc<dyn EventServiceTrait + Send + Sync + 'static>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Default for EventService {
fn default() -> Self {
Self(Arc::new(DefaultEventService::default()))
}
}
#[derive(Default)]
struct DefaultEventService {
agents: Arc<RwLock<Vec<LogEvent>>>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogEvent {
pub id: uuid::Uuid,
pub author: String,
pub content: String,
}
impl LogEvent {
pub fn new(author: impl Into<String>, content: impl Into<String>) -> Self {
Self {
id: uuid::Uuid::new_v4(),
author: author.into(),
content: content.into(),
}
}
}
#[async_trait]
pub trait EventServiceTrait {
async fn append(&self, req: LogEvent) -> anyhow::Result<()>;
async fn get_from_cursor(&self, cursor: uuid::Uuid) -> anyhow::Result<Vec<LogEvent>>;
async fn get_from_beginning(&self) -> anyhow::Result<Vec<LogEvent>>;
}
#[async_trait]
impl EventServiceTrait for DefaultEventService {
async fn append(&self, req: LogEvent) -> anyhow::Result<()> {
let mut events = self.agents.write().await;
events.push(req);
Ok(())
}
async fn get_from_cursor(&self, cursor: uuid::Uuid) -> anyhow::Result<Vec<LogEvent>> {
let events = self.agents.read().await;
let items = events
.iter()
.skip_while(|item| item.id != cursor)
.skip(1)
.cloned()
.collect::<Vec<_>>();
Ok(items)
}
async fn get_from_beginning(&self) -> anyhow::Result<Vec<LogEvent>> {
let events = self.agents.read().await;
Ok(events.iter().cloned().collect())
}
}

View File

@ -1,4 +1,5 @@
mod agent; mod agent;
mod event;
mod lease; mod lease;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -12,6 +13,7 @@ use axum::routing::{get, post};
use axum::{Json, Router}; use axum::{Json, Router};
use churn_domain::{LeaseResp, ServerEnrollReq, ServerMonitorResp}; use churn_domain::{LeaseResp, ServerEnrollReq, ServerMonitorResp};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use event::{EventService, LogEvent};
use lease::LeaseService; use lease::LeaseService;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
@ -40,6 +42,7 @@ struct Agent {
struct AppState { struct AppState {
agent: AgentService, agent: AgentService,
leases: LeaseService, leases: LeaseService,
events: EventService,
} }
#[tokio::main] #[tokio::main]
@ -67,6 +70,7 @@ async fn main() -> anyhow::Result<()> {
.with_state(AppState { .with_state(AppState {
agent: AgentService::default(), agent: AgentService::default(),
leases: LeaseService::default(), leases: LeaseService::default(),
events: EventService::default(),
}); });
tracing::info!("churn server listening on {}", host); tracing::info!("churn server listening on {}", host);
@ -110,8 +114,20 @@ async fn enroll(
State(state): State<AppState>, State(state): State<AppState>,
Json(req): Json<ServerEnrollReq>, Json(req): Json<ServerEnrollReq>,
) -> Result<Json<Agent>, AppError> { ) -> Result<Json<Agent>, AppError> {
state
.events
.append(LogEvent::new(&req.agent_name, "attempting to enroll agent"))
.await
.map_err(AppError::Internal)?;
let name = state.agent.enroll(req).await.map_err(AppError::Internal)?; let name = state.agent.enroll(req).await.map_err(AppError::Internal)?;
state
.events
.append(LogEvent::new(&name, "enrolled agent"))
.await
.map_err(AppError::Internal)?;
Ok(Json(Agent { name })) Ok(Json(Agent { name }))
} }
@ -139,10 +155,29 @@ async fn ping() -> impl IntoResponse {
#[derive(Clone, Deserialize)] #[derive(Clone, Deserialize)]
struct LogsQuery { struct LogsQuery {
cursor: Option<String>, cursor: Option<uuid::Uuid>,
} }
async fn logs(Query(cursor): Query<LogsQuery>) -> Result<Json<ServerMonitorResp>, AppError> { async fn logs(
State(state): State<AppState>,
Query(cursor): Query<LogsQuery>,
) -> Result<Json<ServerMonitorResp>, AppError> {
state
.events
.append(LogEvent::new(
"author",
format!(
"logs called: {}",
cursor
.cursor
.as_ref()
.map(|c| format!("(cursor={c})"))
.unwrap_or("".to_string())
),
))
.await
.map_err(AppError::Internal)?;
match cursor.cursor { match cursor.cursor {
Some(cursor) => { Some(cursor) => {
tracing::trace!("finding logs from cursor: {}", cursor); tracing::trace!("finding logs from cursor: {}", cursor);
@ -152,8 +187,24 @@ async fn logs(Query(cursor): Query<LogsQuery>) -> Result<Json<ServerMonitorResp>
} }
} }
let events = match cursor.cursor {
Some(c) => state.events.get_from_cursor(c).await,
None => state.events.get_from_beginning().await,
}
.map_err(AppError::Internal)?;
if events.is_empty() {
return Ok(Json(ServerMonitorResp {
cursor: cursor.cursor.clone(),
logs: Vec::new(),
}));
}
Ok(Json(ServerMonitorResp { Ok(Json(ServerMonitorResp {
cursor: uuid::Uuid::new_v4().to_string(), cursor: events.last().map(|e| e.id),
logs: vec!["something".to_string()], logs: events
.iter()
.map(|e| format!("{}: {}", e.author, e.content))
.collect(),
})) }))
} }

View File

@ -14,3 +14,4 @@ clap.workspace = true
dotenv.workspace = true dotenv.workspace = true
axum.workspace = true axum.workspace = true
reqwest.workspace = true reqwest.workspace = true
uuid.workspace = true

View File

@ -113,9 +113,9 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> {
} => { } => {
tracing::info!("monitoring server: {}", server); tracing::info!("monitoring server: {}", server);
let mut cursor: Option<String> = None; let mut cursor: Option<uuid::Uuid> = None;
loop { loop {
tracing::info!("reading logs from server: {}", server); tracing::debug!("reading logs from server: {}", server);
let resp = reqwest::get(format!( let resp = reqwest::get(format!(
"{server}/logs{}", "{server}/logs{}",
@ -136,9 +136,9 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> {
match resp.json::<ServerMonitorResp>().await { match resp.json::<ServerMonitorResp>().await {
Ok(resp) => { Ok(resp) => {
for line in resp.logs { for line in resp.logs {
tracing::info!("server: {}", line); tracing::info!("event: {}", line);
} }
cursor = Some(resp.cursor); cursor = resp.cursor;
} }
Err(e) => { Err(e) => {
tracing::warn!("failed to call server (error={})", e); tracing::warn!("failed to call server (error={})", e);