Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
@@ -15,6 +15,8 @@ axum.workspace = true
|
||||
serde = { version = "1.0.197", features = ["derive"] }
|
||||
uuid = { version = "1.7.0", features = ["v4"] }
|
||||
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
|
||||
notmad = "0.5.0"
|
||||
notmad = "0.6.0"
|
||||
tokio-util = "0.7.12"
|
||||
async-trait = "0.1.83"
|
||||
nodrift = "0.2.0"
|
||||
rusqlite = { version = "0.32.1", features = ["bundled"] }
|
||||
|
18
crates/churn/src/agent.rs
Normal file
18
crates/churn/src/agent.rs
Normal file
@@ -0,0 +1,18 @@
|
||||
use agent_state::AgentState;
|
||||
use refresh::AgentRefresh;
|
||||
|
||||
mod agent_state;
|
||||
|
||||
mod refresh;
|
||||
|
||||
pub async fn execute(host: impl Into<String>) -> anyhow::Result<()> {
|
||||
let state = AgentState::new().await?;
|
||||
|
||||
notmad::Mad::builder()
|
||||
.add(AgentRefresh::new(&state, host))
|
||||
.cancellation(Some(std::time::Duration::from_secs(2)))
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
32
crates/churn/src/agent/agent_state.rs
Normal file
32
crates/churn/src/agent/agent_state.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
use std::{ops::Deref, sync::Arc};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AgentState(Arc<State>);
|
||||
|
||||
impl AgentState {
|
||||
pub async fn new() -> anyhow::Result<Self> {
|
||||
Ok(Self(Arc::new(State::new().await?)))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&AgentState> for AgentState {
|
||||
fn from(value: &AgentState) -> Self {
|
||||
value.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for AgentState {
|
||||
type Target = Arc<State>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
pub struct State {}
|
||||
|
||||
impl State {
|
||||
pub async fn new() -> anyhow::Result<Self> {
|
||||
Ok(Self {})
|
||||
}
|
||||
}
|
119
crates/churn/src/agent/refresh.rs
Normal file
119
crates/churn/src/agent/refresh.rs
Normal file
@@ -0,0 +1,119 @@
|
||||
use super::agent_state::AgentState;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AgentRefresh {
|
||||
_state: AgentState,
|
||||
host: String,
|
||||
}
|
||||
|
||||
impl AgentRefresh {
|
||||
pub fn new(state: impl Into<AgentState>, host: impl Into<String>) -> Self {
|
||||
Self {
|
||||
_state: state.into(),
|
||||
host: host.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl notmad::Component for AgentRefresh {
|
||||
async fn run(
|
||||
&self,
|
||||
cancellation_token: tokio_util::sync::CancellationToken,
|
||||
) -> Result<(), notmad::MadError> {
|
||||
let cancel = nodrift::schedule_drifter(std::time::Duration::from_secs(5), self.clone());
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {},
|
||||
_ = cancellation_token.cancelled() => {
|
||||
tracing::debug!("cancelling agent refresh");
|
||||
cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl nodrift::Drifter for AgentRefresh {
|
||||
async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
|
||||
tracing::info!(host = self.host, "refreshing agent");
|
||||
|
||||
// Get plan
|
||||
let plan = Plan::new();
|
||||
let tasks = plan.tasks().await?;
|
||||
|
||||
// For task
|
||||
for task in tasks {
|
||||
// Check idempotency rules
|
||||
if !task.should_run().await? {
|
||||
tracing::debug!(task = task.id(), "skipping run");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Run task if not valid
|
||||
tracing::info!(task = task.id(), "executing task");
|
||||
task.execute().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Plan {}
|
||||
impl Plan {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
pub async fn tasks(&self) -> anyhow::Result<Vec<Task>> {
|
||||
Ok(vec![Task::new()])
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Task {}
|
||||
|
||||
impl Task {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
pub fn id(&self) -> String {
|
||||
"apt".into()
|
||||
}
|
||||
|
||||
pub async fn should_run(&self) -> anyhow::Result<bool> {
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub async fn execute(&self) -> anyhow::Result<()> {
|
||||
let mut cmd = tokio::process::Command::new("apt");
|
||||
cmd.args(["apt-get", "update", "-q"]);
|
||||
let output = cmd.output().await?;
|
||||
match output.status.success() {
|
||||
true => tracing::info!("successfully ran apt update"),
|
||||
false => {
|
||||
anyhow::bail!(
|
||||
"failed to run apt update: {}",
|
||||
std::str::from_utf8(&output.stderr)?
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let mut cmd = tokio::process::Command::new("apt");
|
||||
cmd.env("DEBIAN_FRONTEND", "noninteractive")
|
||||
.args(["apt-get", "upgrade", "-y"]);
|
||||
let output = cmd.output().await?;
|
||||
match output.status.success() {
|
||||
true => tracing::info!("successfully ran apt upgrade"),
|
||||
false => {
|
||||
anyhow::bail!(
|
||||
"failed to run apt upgrade: {}",
|
||||
std::str::from_utf8(&output.stderr)?
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@@ -2,19 +2,28 @@ use std::net::SocketAddr;
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
|
||||
use crate::{api, state::SharedState};
|
||||
use crate::{agent, api, state::SharedState};
|
||||
|
||||
pub async fn execute() -> anyhow::Result<()> {
|
||||
let state = SharedState::new().await?;
|
||||
|
||||
let cli = Command::parse();
|
||||
if let Some(Commands::Serve { host }) = cli.command {
|
||||
tracing::info!("Starting service");
|
||||
match cli.command.expect("to have a subcommand") {
|
||||
Commands::Serve { host } => {
|
||||
tracing::info!("Starting service");
|
||||
|
||||
notmad::Mad::builder()
|
||||
.add(api::Api::new(&state, host))
|
||||
.run()
|
||||
.await?;
|
||||
notmad::Mad::builder()
|
||||
.add(api::Api::new(&state, host))
|
||||
.run()
|
||||
.await?;
|
||||
}
|
||||
Commands::Agent { commands } => match commands {
|
||||
AgentCommands::Start { host } => {
|
||||
tracing::info!("starting agent");
|
||||
agent::execute(host).await?;
|
||||
tracing::info!("shut down agent");
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -33,4 +42,16 @@ enum Commands {
|
||||
#[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
|
||||
host: SocketAddr,
|
||||
},
|
||||
Agent {
|
||||
#[command(subcommand)]
|
||||
commands: AgentCommands,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum AgentCommands {
|
||||
Start {
|
||||
#[arg(env = "SERVICE_HOST", long = "service-host")]
|
||||
host: String,
|
||||
},
|
||||
}
|
||||
|
@@ -2,6 +2,8 @@ mod api;
|
||||
mod cli;
|
||||
mod state;
|
||||
|
||||
mod agent;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
dotenv::dotenv().ok();
|
||||
|
Reference in New Issue
Block a user