diff --git a/Cargo.lock b/Cargo.lock index bd19d8e..4d7cdd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -263,6 +263,7 @@ dependencies = [ "reqwest", "rusqlite", "serde", + "serde_json", "tokio", "tokio-util", "toml", diff --git a/crates/churn/Cargo.toml b/crates/churn/Cargo.toml index ba2bf7c..811d02a 100644 --- a/crates/churn/Cargo.toml +++ b/crates/churn/Cargo.toml @@ -28,3 +28,4 @@ toml = "0.8.19" dirs = "5.0.1" futures = "0.3.31" reqwest = { version = "0.12.9", features = ["json"] } +serde_json = "1.0.133" diff --git a/crates/churn/proto/churn/v1/churn.proto b/crates/churn/proto/churn/v1/churn.proto index 1a9d5b9..9921450 100644 --- a/crates/churn/proto/churn/v1/churn.proto +++ b/crates/churn/proto/churn/v1/churn.proto @@ -30,5 +30,6 @@ message ListenEventsRequest { optional string id = 2; } message ListenEventsResponse { - string value = 1; + string id = 1; + string value = 2; } diff --git a/crates/churn/src/agent.rs b/crates/churn/src/agent.rs index 251175b..d5282db 100644 --- a/crates/churn/src/agent.rs +++ b/crates/churn/src/agent.rs @@ -4,12 +4,20 @@ use refresh::AgentRefresh; pub use config::setup_config; +pub mod models; + mod agent_state; mod config; mod discovery_client; mod event_handler; mod grpc_client; +mod queue; mod refresh; +mod scheduler; + +mod handlers; + +mod actions; pub async fn execute() -> anyhow::Result<()> { let state = AgentState::new().await?; @@ -17,6 +25,7 @@ pub async fn execute() -> anyhow::Result<()> { notmad::Mad::builder() .add(AgentRefresh::new(&state)) .add(EventHandler::new(&state)) + .add(state.queue.clone()) .cancellation(Some(std::time::Duration::from_secs(2))) .run() .await?; diff --git a/crates/churn/src/agent/actions.rs b/crates/churn/src/agent/actions.rs new file mode 100644 index 0000000..f7512a8 --- /dev/null +++ b/crates/churn/src/agent/actions.rs @@ -0,0 +1,59 @@ +use anyhow::Context; + +pub struct Plan {} +impl Plan { + pub fn new() -> Self { + Self {} + } + + pub async fn tasks(&self) -> anyhow::Result> { + Ok(vec![Task::new()]) + } +} + +pub struct Task {} + +impl Task { + pub fn new() -> Self { + Self {} + } + + pub fn id(&self) -> String { + "apt".into() + } + + pub async fn should_run(&self) -> anyhow::Result { + Ok(true) + } + + pub async fn execute(&self) -> anyhow::Result<()> { + let mut cmd = tokio::process::Command::new("apt-get"); + cmd.args(["update", "-q"]); + let output = cmd.output().await.context("failed to run apt update")?; + match output.status.success() { + true => tracing::info!("successfully ran apt update"), + false => { + anyhow::bail!( + "failed to run apt update: {}", + std::str::from_utf8(&output.stderr)? + ); + } + } + + let mut cmd = tokio::process::Command::new("apt-get"); + cmd.env("DEBIAN_FRONTEND", "noninteractive") + .args(["upgrade", "-y"]); + let output = cmd.output().await.context("failed to run apt upgrade")?; + match output.status.success() { + true => tracing::info!("successfully ran apt upgrade"), + false => { + anyhow::bail!( + "failed to run apt upgrade: {}", + std::str::from_utf8(&output.stderr)? + ); + } + } + + Ok(()) + } +} diff --git a/crates/churn/src/agent/agent_state.rs b/crates/churn/src/agent/agent_state.rs index adb1605..4e86669 100644 --- a/crates/churn/src/agent/agent_state.rs +++ b/crates/churn/src/agent/agent_state.rs @@ -2,7 +2,14 @@ use std::{ops::Deref, sync::Arc}; use crate::api::Discovery; -use super::{config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient}; +use super::{ + config::AgentConfig, + discovery_client::DiscoveryClient, + grpc_client::GrpcClient, + handlers::scheduled_tasks::{self, ScheduledTasks}, + queue::AgentQueue, + scheduler::Scheduler, +}; #[derive(Clone)] pub struct AgentState(Arc); @@ -31,20 +38,23 @@ pub struct State { pub grpc: GrpcClient, pub config: AgentConfig, pub discovery: Discovery, + pub queue: AgentQueue, } impl State { pub async fn new() -> anyhow::Result { let config = AgentConfig::new().await?; - let discovery = DiscoveryClient::new(&config.discovery); - let discovery = discovery.discover().await?; - + let discovery = DiscoveryClient::new(&config.discovery).discover().await?; let grpc = GrpcClient::new(&discovery.process_host); + let scheduled_tasks = ScheduledTasks::new(); + let scheduler = Scheduler::new(scheduled_tasks); + let queue = AgentQueue::new(scheduler); Ok(Self { grpc, config, discovery, + queue, }) } } diff --git a/crates/churn/src/agent/event_handler.rs b/crates/churn/src/agent/event_handler.rs index 7109c4d..ec061a9 100644 --- a/crates/churn/src/agent/event_handler.rs +++ b/crates/churn/src/agent/event_handler.rs @@ -1,11 +1,16 @@ use notmad::{Component, MadError}; -use super::{agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient}; +use crate::agent::models::Commands; + +use super::{ + agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient, queue::AgentQueue, +}; #[derive(Clone)] pub struct EventHandler { config: AgentConfig, grpc: GrpcClient, + queue: AgentQueue, } impl EventHandler { @@ -15,6 +20,7 @@ impl EventHandler { Self { config: state.config.clone(), grpc: state.grpc.clone(), + queue: state.queue.clone(), } } } @@ -46,7 +52,11 @@ impl Component for EventHandler { #[async_trait::async_trait] impl super::grpc_client::ListenEventsExecutor for EventHandler { async fn execute(&self, event: crate::grpc::ListenEventsResponse) -> anyhow::Result<()> { - tracing::info!(value = event.value, "received event"); + tracing::info!(value = event.id, "received event"); + + let event: Commands = serde_json::from_str(&event.value)?; + + self.queue.publish(event).await?; Ok(()) } diff --git a/crates/churn/src/agent/handlers.rs b/crates/churn/src/agent/handlers.rs new file mode 100644 index 0000000..7c0969b --- /dev/null +++ b/crates/churn/src/agent/handlers.rs @@ -0,0 +1 @@ +pub mod scheduled_tasks; diff --git a/crates/churn/src/agent/handlers/scheduled_tasks.rs b/crates/churn/src/agent/handlers/scheduled_tasks.rs new file mode 100644 index 0000000..574f46c --- /dev/null +++ b/crates/churn/src/agent/handlers/scheduled_tasks.rs @@ -0,0 +1,36 @@ +use std::collections::BTreeMap; + +use crate::agent::actions::Plan; + +#[derive(Clone)] +pub struct ScheduledTasks {} +impl ScheduledTasks { + pub fn new() -> Self { + Self {} + } + + pub async fn handle( + &self, + task: &str, + _properties: BTreeMap, + ) -> anyhow::Result<()> { + // Get plan + let plan = Plan::new(); + let tasks = plan.tasks().await?; + + // For task + for task in tasks { + // Check idempotency rules + if !task.should_run().await? { + tracing::debug!(task = task.id(), "skipping run"); + continue; + } + + // Run task if not valid + tracing::info!(task = task.id(), "executing task"); + task.execute().await?; + } + + Ok(()) + } +} diff --git a/crates/churn/src/agent/models.rs b/crates/churn/src/agent/models.rs new file mode 100644 index 0000000..a6f8705 --- /dev/null +++ b/crates/churn/src/agent/models.rs @@ -0,0 +1,20 @@ +use std::{collections::BTreeMap, fmt::Display}; + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(tag = "type")] +pub enum Commands { + ScheduleTask { + task: String, + properties: BTreeMap, + }, +} + +impl Display for Commands { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + Commands::ScheduleTask { .. } => "schedule_task", + }) + } +} diff --git a/crates/churn/src/agent/queue.rs b/crates/churn/src/agent/queue.rs new file mode 100644 index 0000000..2ea9e72 --- /dev/null +++ b/crates/churn/src/agent/queue.rs @@ -0,0 +1,67 @@ +use std::sync::Arc; + +use notmad::{Component, MadError}; +use tokio::sync::Mutex; + +use super::{handlers::scheduled_tasks::ScheduledTasks, models::Commands, scheduler::Scheduler}; + +#[derive(Clone)] +pub struct AgentQueue { + sender: Arc>, + receiver: Arc>>, + + scheduler: Scheduler, +} + +impl AgentQueue { + pub fn new(scheduler: Scheduler) -> Self { + let (tx, rx) = tokio::sync::mpsc::channel(5); + Self { + sender: Arc::new(tx), + receiver: Arc::new(Mutex::new(rx)), + + scheduler, + } + } + + pub async fn handler(&self, command: Commands) -> anyhow::Result<()> { + tracing::debug!("handling task"); + + self.scheduler.handle(command).await?; + + Ok(()) + } + + pub async fn publish(&self, command: Commands) -> anyhow::Result<()> { + tracing::debug!("publishing task: {}", command.to_string()); + + self.sender.send(command).await?; + + Ok(()) + } +} + +#[async_trait::async_trait] +impl Component for AgentQueue { + async fn run( + &self, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> Result<(), notmad::MadError> { + loop { + let mut recv = self.receiver.lock().await; + + tokio::select! { + res = recv.recv() => { + if let Some(res) = res { + self.handler(res).await.map_err(MadError::Inner)?; + } + } + _ = cancellation_token.cancelled() => { + break + } + } + } + + Ok(()) + } +} diff --git a/crates/churn/src/agent/refresh.rs b/crates/churn/src/agent/refresh.rs index 1c24dd8..e87d4d7 100644 --- a/crates/churn/src/agent/refresh.rs +++ b/crates/churn/src/agent/refresh.rs @@ -1,10 +1,15 @@ +use std::collections::BTreeMap; + use anyhow::Context; -use super::agent_state::AgentState; +use crate::agent::models::Commands; + +use super::{agent_state::AgentState, queue::AgentQueue}; #[derive(Clone)] pub struct AgentRefresh { process_host: String, + queue: AgentQueue, } impl AgentRefresh { @@ -12,6 +17,7 @@ impl AgentRefresh { let state: AgentState = state.into(); Self { process_host: state.discovery.process_host.clone(), + queue: state.queue.clone(), } } } @@ -44,80 +50,12 @@ impl nodrift::Drifter for AgentRefresh { async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> { tracing::info!(process_host = self.process_host, "refreshing agent"); - // Get plan - let plan = Plan::new(); - let tasks = plan.tasks().await?; - - // For task - for task in tasks { - // Check idempotency rules - if !task.should_run().await? { - tracing::debug!(task = task.id(), "skipping run"); - continue; - } - - // Run task if not valid - tracing::info!(task = task.id(), "executing task"); - task.execute().await?; - } - - Ok(()) - } -} - -pub struct Plan {} -impl Plan { - pub fn new() -> Self { - Self {} - } - - pub async fn tasks(&self) -> anyhow::Result> { - Ok(vec![Task::new()]) - } -} - -pub struct Task {} - -impl Task { - pub fn new() -> Self { - Self {} - } - - pub fn id(&self) -> String { - "apt".into() - } - - pub async fn should_run(&self) -> anyhow::Result { - Ok(true) - } - - pub async fn execute(&self) -> anyhow::Result<()> { - let mut cmd = tokio::process::Command::new("apt-get"); - cmd.args(["update", "-q"]); - let output = cmd.output().await.context("failed to run apt update")?; - match output.status.success() { - true => tracing::info!("successfully ran apt update"), - false => { - anyhow::bail!( - "failed to run apt update: {}", - std::str::from_utf8(&output.stderr)? - ); - } - } - - let mut cmd = tokio::process::Command::new("apt-get"); - cmd.env("DEBIAN_FRONTEND", "noninteractive") - .args(["upgrade", "-y"]); - let output = cmd.output().await.context("failed to run apt upgrade")?; - match output.status.success() { - true => tracing::info!("successfully ran apt upgrade"), - false => { - anyhow::bail!( - "failed to run apt upgrade: {}", - std::str::from_utf8(&output.stderr)? - ); - } - } + self.queue + .publish(Commands::ScheduleTask { + task: "update".into(), + properties: BTreeMap::default(), + }) + .await?; Ok(()) } diff --git a/crates/churn/src/agent/scheduler.rs b/crates/churn/src/agent/scheduler.rs new file mode 100644 index 0000000..5b9a077 --- /dev/null +++ b/crates/churn/src/agent/scheduler.rs @@ -0,0 +1,22 @@ +use super::{handlers::scheduled_tasks::ScheduledTasks, models::Commands}; + +#[derive(Clone)] +pub struct Scheduler { + scheduled_tasks: ScheduledTasks, +} + +impl Scheduler { + pub fn new(scheduled_tasks: ScheduledTasks) -> Self { + Self { scheduled_tasks } + } + + pub async fn handle(&self, command: Commands) -> anyhow::Result<()> { + match command { + Commands::ScheduleTask { task, properties } => { + self.scheduled_tasks.handle(&task, properties).await?; + } + } + + Ok(()) + } +} diff --git a/crates/churn/src/grpc/churn.v1.rs b/crates/churn/src/grpc/churn.v1.rs index de04539..f076fd7 100644 --- a/crates/churn/src/grpc/churn.v1.rs +++ b/crates/churn/src/grpc/churn.v1.rs @@ -44,6 +44,8 @@ pub struct ListenEventsRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct ListenEventsResponse { #[prost(string, tag="1")] + pub id: ::prost::alloc::string::String, + #[prost(string, tag="2")] pub value: ::prost::alloc::string::String, } include!("churn.v1.tonic.rs"); diff --git a/crates/churn/src/server/grpc_server.rs b/crates/churn/src/server/grpc_server.rs index 6022c9c..ad92773 100644 --- a/crates/churn/src/server/grpc_server.rs +++ b/crates/churn/src/server/grpc_server.rs @@ -1,11 +1,11 @@ -use std::{net::SocketAddr, pin::Pin}; +use std::{collections::BTreeMap, net::SocketAddr, pin::Pin}; use anyhow::Context; use futures::Stream; use notmad::{Component, MadError}; use tonic::transport::Server; -use crate::grpc::*; +use crate::{agent::models::Commands, grpc::*}; #[derive(Clone)] pub struct GrpcServer { @@ -70,9 +70,18 @@ impl crate::grpc::churn_server::Churn for GrpcServer { loop { interval.tick().await; + let Ok(schedule_task) = serde_json::to_string(&Commands::ScheduleTask { + task: "refresh".into(), + properties: BTreeMap::default(), + }) else { + tracing::warn!("failed to serialize event"); + continue; + }; + if let Err(e) = tx .send(Ok(ListenEventsResponse { - value: uuid::Uuid::new_v4().to_string(), + id: uuid::Uuid::new_v4().to_string(), + value: schedule_task, })) .await {