feat: add common queue
All checks were successful
continuous-integration/drone/push Build is passing

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-11-24 21:08:37 +01:00
parent ee323e99e8
commit ea5adb2f93
Signed by: kjuulh
GPG Key ID: D85D7535F18F35FA
15 changed files with 271 additions and 85 deletions

1
Cargo.lock generated
View File

@ -263,6 +263,7 @@ dependencies = [
"reqwest",
"rusqlite",
"serde",
"serde_json",
"tokio",
"tokio-util",
"toml",

View File

@ -28,3 +28,4 @@ toml = "0.8.19"
dirs = "5.0.1"
futures = "0.3.31"
reqwest = { version = "0.12.9", features = ["json"] }
serde_json = "1.0.133"

View File

@ -30,5 +30,6 @@ message ListenEventsRequest {
optional string id = 2;
}
message ListenEventsResponse {
string value = 1;
string id = 1;
string value = 2;
}

View File

@ -4,12 +4,20 @@ use refresh::AgentRefresh;
pub use config::setup_config;
pub mod models;
mod agent_state;
mod config;
mod discovery_client;
mod event_handler;
mod grpc_client;
mod queue;
mod refresh;
mod scheduler;
mod handlers;
mod actions;
pub async fn execute() -> anyhow::Result<()> {
let state = AgentState::new().await?;
@ -17,6 +25,7 @@ pub async fn execute() -> anyhow::Result<()> {
notmad::Mad::builder()
.add(AgentRefresh::new(&state))
.add(EventHandler::new(&state))
.add(state.queue.clone())
.cancellation(Some(std::time::Duration::from_secs(2)))
.run()
.await?;

View File

@ -0,0 +1,59 @@
use anyhow::Context;
pub struct Plan {}
impl Plan {
pub fn new() -> Self {
Self {}
}
pub async fn tasks(&self) -> anyhow::Result<Vec<Task>> {
Ok(vec![Task::new()])
}
}
pub struct Task {}
impl Task {
pub fn new() -> Self {
Self {}
}
pub fn id(&self) -> String {
"apt".into()
}
pub async fn should_run(&self) -> anyhow::Result<bool> {
Ok(true)
}
pub async fn execute(&self) -> anyhow::Result<()> {
let mut cmd = tokio::process::Command::new("apt-get");
cmd.args(["update", "-q"]);
let output = cmd.output().await.context("failed to run apt update")?;
match output.status.success() {
true => tracing::info!("successfully ran apt update"),
false => {
anyhow::bail!(
"failed to run apt update: {}",
std::str::from_utf8(&output.stderr)?
);
}
}
let mut cmd = tokio::process::Command::new("apt-get");
cmd.env("DEBIAN_FRONTEND", "noninteractive")
.args(["upgrade", "-y"]);
let output = cmd.output().await.context("failed to run apt upgrade")?;
match output.status.success() {
true => tracing::info!("successfully ran apt upgrade"),
false => {
anyhow::bail!(
"failed to run apt upgrade: {}",
std::str::from_utf8(&output.stderr)?
);
}
}
Ok(())
}
}

View File

@ -2,7 +2,14 @@ use std::{ops::Deref, sync::Arc};
use crate::api::Discovery;
use super::{config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient};
use super::{
config::AgentConfig,
discovery_client::DiscoveryClient,
grpc_client::GrpcClient,
handlers::scheduled_tasks::{self, ScheduledTasks},
queue::AgentQueue,
scheduler::Scheduler,
};
#[derive(Clone)]
pub struct AgentState(Arc<State>);
@ -31,20 +38,23 @@ pub struct State {
pub grpc: GrpcClient,
pub config: AgentConfig,
pub discovery: Discovery,
pub queue: AgentQueue,
}
impl State {
pub async fn new() -> anyhow::Result<Self> {
let config = AgentConfig::new().await?;
let discovery = DiscoveryClient::new(&config.discovery);
let discovery = discovery.discover().await?;
let discovery = DiscoveryClient::new(&config.discovery).discover().await?;
let grpc = GrpcClient::new(&discovery.process_host);
let scheduled_tasks = ScheduledTasks::new();
let scheduler = Scheduler::new(scheduled_tasks);
let queue = AgentQueue::new(scheduler);
Ok(Self {
grpc,
config,
discovery,
queue,
})
}
}

View File

@ -1,11 +1,16 @@
use notmad::{Component, MadError};
use super::{agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient};
use crate::agent::models::Commands;
use super::{
agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient, queue::AgentQueue,
};
#[derive(Clone)]
pub struct EventHandler {
config: AgentConfig,
grpc: GrpcClient,
queue: AgentQueue,
}
impl EventHandler {
@ -15,6 +20,7 @@ impl EventHandler {
Self {
config: state.config.clone(),
grpc: state.grpc.clone(),
queue: state.queue.clone(),
}
}
}
@ -46,7 +52,11 @@ impl Component for EventHandler {
#[async_trait::async_trait]
impl super::grpc_client::ListenEventsExecutor for EventHandler {
async fn execute(&self, event: crate::grpc::ListenEventsResponse) -> anyhow::Result<()> {
tracing::info!(value = event.value, "received event");
tracing::info!(value = event.id, "received event");
let event: Commands = serde_json::from_str(&event.value)?;
self.queue.publish(event).await?;
Ok(())
}

View File

@ -0,0 +1 @@
pub mod scheduled_tasks;

View File

@ -0,0 +1,36 @@
use std::collections::BTreeMap;
use crate::agent::actions::Plan;
#[derive(Clone)]
pub struct ScheduledTasks {}
impl ScheduledTasks {
pub fn new() -> Self {
Self {}
}
pub async fn handle(
&self,
task: &str,
_properties: BTreeMap<String, String>,
) -> anyhow::Result<()> {
// Get plan
let plan = Plan::new();
let tasks = plan.tasks().await?;
// For task
for task in tasks {
// Check idempotency rules
if !task.should_run().await? {
tracing::debug!(task = task.id(), "skipping run");
continue;
}
// Run task if not valid
tracing::info!(task = task.id(), "executing task");
task.execute().await?;
}
Ok(())
}
}

View File

@ -0,0 +1,20 @@
use std::{collections::BTreeMap, fmt::Display};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type")]
pub enum Commands {
ScheduleTask {
task: String,
properties: BTreeMap<String, String>,
},
}
impl Display for Commands {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
Commands::ScheduleTask { .. } => "schedule_task",
})
}
}

View File

@ -0,0 +1,67 @@
use std::sync::Arc;
use notmad::{Component, MadError};
use tokio::sync::Mutex;
use super::{handlers::scheduled_tasks::ScheduledTasks, models::Commands, scheduler::Scheduler};
#[derive(Clone)]
pub struct AgentQueue {
sender: Arc<tokio::sync::mpsc::Sender<Commands>>,
receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<Commands>>>,
scheduler: Scheduler,
}
impl AgentQueue {
pub fn new(scheduler: Scheduler) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(5);
Self {
sender: Arc::new(tx),
receiver: Arc::new(Mutex::new(rx)),
scheduler,
}
}
pub async fn handler(&self, command: Commands) -> anyhow::Result<()> {
tracing::debug!("handling task");
self.scheduler.handle(command).await?;
Ok(())
}
pub async fn publish(&self, command: Commands) -> anyhow::Result<()> {
tracing::debug!("publishing task: {}", command.to_string());
self.sender.send(command).await?;
Ok(())
}
}
#[async_trait::async_trait]
impl Component for AgentQueue {
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> {
loop {
let mut recv = self.receiver.lock().await;
tokio::select! {
res = recv.recv() => {
if let Some(res) = res {
self.handler(res).await.map_err(MadError::Inner)?;
}
}
_ = cancellation_token.cancelled() => {
break
}
}
}
Ok(())
}
}

View File

@ -1,10 +1,15 @@
use std::collections::BTreeMap;
use anyhow::Context;
use super::agent_state::AgentState;
use crate::agent::models::Commands;
use super::{agent_state::AgentState, queue::AgentQueue};
#[derive(Clone)]
pub struct AgentRefresh {
process_host: String,
queue: AgentQueue,
}
impl AgentRefresh {
@ -12,6 +17,7 @@ impl AgentRefresh {
let state: AgentState = state.into();
Self {
process_host: state.discovery.process_host.clone(),
queue: state.queue.clone(),
}
}
}
@ -44,80 +50,12 @@ impl nodrift::Drifter for AgentRefresh {
async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
tracing::info!(process_host = self.process_host, "refreshing agent");
// Get plan
let plan = Plan::new();
let tasks = plan.tasks().await?;
// For task
for task in tasks {
// Check idempotency rules
if !task.should_run().await? {
tracing::debug!(task = task.id(), "skipping run");
continue;
}
// Run task if not valid
tracing::info!(task = task.id(), "executing task");
task.execute().await?;
}
Ok(())
}
}
pub struct Plan {}
impl Plan {
pub fn new() -> Self {
Self {}
}
pub async fn tasks(&self) -> anyhow::Result<Vec<Task>> {
Ok(vec![Task::new()])
}
}
pub struct Task {}
impl Task {
pub fn new() -> Self {
Self {}
}
pub fn id(&self) -> String {
"apt".into()
}
pub async fn should_run(&self) -> anyhow::Result<bool> {
Ok(true)
}
pub async fn execute(&self) -> anyhow::Result<()> {
let mut cmd = tokio::process::Command::new("apt-get");
cmd.args(["update", "-q"]);
let output = cmd.output().await.context("failed to run apt update")?;
match output.status.success() {
true => tracing::info!("successfully ran apt update"),
false => {
anyhow::bail!(
"failed to run apt update: {}",
std::str::from_utf8(&output.stderr)?
);
}
}
let mut cmd = tokio::process::Command::new("apt-get");
cmd.env("DEBIAN_FRONTEND", "noninteractive")
.args(["upgrade", "-y"]);
let output = cmd.output().await.context("failed to run apt upgrade")?;
match output.status.success() {
true => tracing::info!("successfully ran apt upgrade"),
false => {
anyhow::bail!(
"failed to run apt upgrade: {}",
std::str::from_utf8(&output.stderr)?
);
}
}
self.queue
.publish(Commands::ScheduleTask {
task: "update".into(),
properties: BTreeMap::default(),
})
.await?;
Ok(())
}

View File

@ -0,0 +1,22 @@
use super::{handlers::scheduled_tasks::ScheduledTasks, models::Commands};
#[derive(Clone)]
pub struct Scheduler {
scheduled_tasks: ScheduledTasks,
}
impl Scheduler {
pub fn new(scheduled_tasks: ScheduledTasks) -> Self {
Self { scheduled_tasks }
}
pub async fn handle(&self, command: Commands) -> anyhow::Result<()> {
match command {
Commands::ScheduleTask { task, properties } => {
self.scheduled_tasks.handle(&task, properties).await?;
}
}
Ok(())
}
}

View File

@ -44,6 +44,8 @@ pub struct ListenEventsRequest {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListenEventsResponse {
#[prost(string, tag="1")]
pub id: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub value: ::prost::alloc::string::String,
}
include!("churn.v1.tonic.rs");

View File

@ -1,11 +1,11 @@
use std::{net::SocketAddr, pin::Pin};
use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};
use anyhow::Context;
use futures::Stream;
use notmad::{Component, MadError};
use tonic::transport::Server;
use crate::grpc::*;
use crate::{agent::models::Commands, grpc::*};
#[derive(Clone)]
pub struct GrpcServer {
@ -70,9 +70,18 @@ impl crate::grpc::churn_server::Churn for GrpcServer {
loop {
interval.tick().await;
let Ok(schedule_task) = serde_json::to_string(&Commands::ScheduleTask {
task: "refresh".into(),
properties: BTreeMap::default(),
}) else {
tracing::warn!("failed to serialize event");
continue;
};
if let Err(e) = tx
.send(Ok(ListenEventsResponse {
value: uuid::Uuid::new_v4().to_string(),
id: uuid::Uuid::new_v4().to_string(),
value: schedule_task,
}))
.await
{