Compare commits


49 Commits
v2...main

Author SHA1 Message Date
c5e5307682 fix(deps): update rust crate serde to v1.0.216
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-12-11 05:09:41 +00:00
5fb59ad992 fix(deps): update tokio-prost monorepo to v0.13.4
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-12-07 01:13:20 +00:00
e21663c8bd chore(deps): update rust crate clap to v4.5.23
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-12-06 01:14:09 +00:00
39a01778b2 fix(deps): update rust crate tokio-util to v0.7.13
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-12-05 01:11:27 +00:00
f1cdf3ae20 chore(deps): update all dependencies
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-12-04 01:13:13 +00:00
355587234e
feat: allow process from external code
Some checks failed
continuous-integration/drone/push Build is failing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-02 23:12:37 +01:00
21a13f3444
feat: add inherit
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-02 21:00:20 +01:00
5e1b585a2d
feat: add default no labels
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-01 22:46:22 +01:00
94025a02ce
feat: warn all targets
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-01 22:30:18 +01:00
db4cc98643
feat: update with web assembly components
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-01 22:21:17 +01:00
2387a70778
feat: add labels to config
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-01 14:38:10 +01:00
d6fdda0e4e
feat: add abstraction around task
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-01 14:25:24 +01:00
974e1ee0d6
feat: enable webpki roots
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:31:46 +01:00
717b052a88
feat: add short connect timeout
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:30:16 +01:00
9b52376e7a
feat: more error logging
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:22:44 +01:00
7b222af1dd
feat: stop the service if running
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:20:51 +01:00
150c7c3c98
feat: setup stream logging
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:16:19 +01:00
8b064c2169
feat: update script with warn
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:02:52 +01:00
879737eedd
feat: disable force again
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:52:50 +01:00
818cc6c671
feat: make curl silent"
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:49:23 +01:00
e759239243
feat: force update
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:48:40 +01:00
b37674987e
feat: use public prod
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:48:26 +01:00
1dea5f29ac
feat: run as root
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:45:49 +01:00
38a0a9fba4
feat: agent is already setup
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:44:51 +01:00
f7c7aef96b
feat: allow errors
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:43:45 +01:00
7fefca47c9
feat: some more debugging
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:42:46 +01:00
36b1335fe9
feat: some more debugging
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:40:05 +01:00
eeabeda036
feat: stderr to stdout as well
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:37:28 +01:00
569fee52e6
feat: this should work
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:36:10 +01:00
b0ec41fa93
feat: when config has already been setup
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:35:24 +01:00
77f5ec7475
feat: add agent start as well
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:33:21 +01:00
f3926d0885
feat: update with agent setup
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:32:41 +01:00
b48b1ec886
feat: add install script
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:28:03 +01:00
79a8a34499 chore(deps): update rust crate tracing-subscriber to v0.3.19
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-11-30 01:19:48 +00:00
c9bbeb2439 fix(deps): update rust crate bytes to v1.9.0
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-11-29 01:10:19 +00:00
b19ff0b7e5 chore(deps): update rust crate tracing to v0.1.41
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-11-28 01:11:13 +00:00
eeaf59ac63
feat: add comments
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 22:04:07 +01:00
6f04d0cdda
feat: use actual internal
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:44:46 +01:00
43c5fa1731
feat: reqwest as native build
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:42:41 +01:00
9badf8e193
feat: use internal
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:41:56 +01:00
55a0d294c5
feat: add external service host
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:37:08 +01:00
8508d3f640
feat: add grpc host
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:34:10 +01:00
dd8ade9798
feat: add external vars
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:29:58 +01:00
f1e6268a2d
feat: add grpc and env
Some checks failed
continuous-integration/drone/push Build is failing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:28:06 +01:00
6647bb89be
feat: add queue
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:24:50 +01:00
ea5adb2f93
feat: add common queue
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:08:37 +01:00
ee323e99e8
feat: add discovery
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 17:12:15 +01:00
c4434fd841 fix(deps): update rust crate tower-http to 0.6.0
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-11-24 01:15:41 +00:00
cb340ffb1e
feat: add tonic
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 01:34:06 +01:00
35 changed files with 3940 additions and 261 deletions

8
.env
View File

@@ -1 +1,9 @@
EXTERNAL_HOST=http://localhost:3000
PROCESS_HOST=http://localhost:7900
SERVICE_HOST=127.0.0.1:3000
DISCOVERY_HOST=http://127.0.0.1:3000
#EXTERNAL_HOST=http://localhost:3000
#PROCESS_HOST=http://localhost:7900
#SERVICE_HOST=127.0.0.1:3000
#DISCOVERY_HOST=https://churn.prod.kjuulh.app

2253
Cargo.lock generated

File diff suppressed because it is too large.

8
buf.gen.yaml Normal file
View File

@@ -0,0 +1,8 @@
version: v2
plugins:
- remote: buf.build/community/neoeinstein-prost
out: crates/churn/src/grpc
- remote: buf.build/community/neoeinstein-tonic:v0.4.0
out: crates/churn/src/grpc
inputs:
- directory: crates/churn/proto
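
buf consumes this file to regenerate the stubs under crates/churn/src/grpc (churn.v1.rs and the tonic client/server shown later in this diff). A minimal regeneration step, assuming the buf CLI is installed locally:

    buf generate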

View File

@@ -14,8 +14,8 @@ axum.workspace = true
serde = { version = "1.0.197", features = ["derive"] }
uuid = { version = "1.7.0", features = ["v4"] }
-tower-http = { version = "0.5.2", features = ["cors", "trace"] }
+tower-http = { version = "0.6.0", features = ["cors", "trace"] }
-notmad = "0.6.0"
+notmad = "0.7.1"
tokio-util = "0.7.12"
async-trait = "0.1.83"
nodrift = "0.2.0"
@@ -24,3 +24,16 @@ prost-types = "0.13.3"
prost = "0.13.3"
bytes = "1.8.0"
tonic = { version = "0.12.3", features = ["tls", "tls-roots"] }
+toml = "0.8.19"
+dirs = "5.0.1"
+futures = "0.3.31"
+reqwest = { version = "0.12.9", default-features = false, features = [
+    "json",
+    "http2",
+    "charset",
+    "native-tls-vendored",
+    "stream",
+] }
+serde_json = "1.0.133"
+wasmtime = "27.0.0"
+wasmtime-wasi = "27.0.0"

View File

@@ -0,0 +1,35 @@
syntax = "proto3";
package churn.v1;
service Churn {
rpc GetKey(GetKeyRequest) returns (GetKeyResponse);
rpc SetKey(SetKeyRequest) returns (SetKeyResponse);
rpc ListenEvents(ListenEventsRequest) returns (stream ListenEventsResponse);
}
message GetKeyRequest {
string namespace = 1;
optional string id = 2;
string key = 3;
}
message GetKeyResponse {
optional string value = 1;
}
message SetKeyRequest {
string namespace = 1;
optional string id = 2;
string key = 3;
string value = 4;
}
message SetKeyResponse {}
message ListenEventsRequest {
string namespace = 1;
optional string id = 2;
}
message ListenEventsResponse {
string id = 1;
string value = 2;
}

View File

@@ -1,15 +1,33 @@
use agent_state::AgentState;
+use event_handler::EventHandler;
use refresh::AgentRefresh;
+pub use config::setup_config;
+pub mod models;
+pub(crate) mod task;
mod agent_state;
+mod config;
+mod discovery_client;
+mod event_handler;
+mod grpc_client;
+mod plugins;
+mod queue;
mod refresh;
+mod scheduler;
+mod handlers;
+mod actions;
-pub async fn execute(host: impl Into<String>) -> anyhow::Result<()> {
+pub async fn execute() -> anyhow::Result<()> {
    let state = AgentState::new().await?;
    notmad::Mad::builder()
-        .add(AgentRefresh::new(&state, host))
+        .add(AgentRefresh::new(&state))
+        .add(EventHandler::new(&state))
+        .add(state.queue.clone())
        .cancellation(Some(std::time::Duration::from_secs(2)))
        .run()
        .await?;

View File

@@ -0,0 +1,23 @@
use apt::AptTask;
use plugin_task::PluginTask;
use super::{plugins::PluginStore, task::IntoTask};
pub mod apt;
pub mod plugin_task;
pub struct Plan {
store: PluginStore,
}
impl Plan {
pub fn new(store: PluginStore) -> Self {
Self { store }
}
pub async fn tasks(&self) -> anyhow::Result<Vec<impl IntoTask>> {
Ok(vec![
AptTask::new().into_task(),
PluginTask::new("alloy@0.1.0", self.store.clone()).into_task(),
])
}
}

View File

@@ -0,0 +1,49 @@
use anyhow::Context;
use crate::agent::task::Task;
pub struct AptTask {}
impl AptTask {
pub fn new() -> Self {
Self {}
}
}
#[async_trait::async_trait]
impl Task for AptTask {
async fn id(&self) -> anyhow::Result<String> {
Ok("apt".into())
}
async fn execute(&self) -> anyhow::Result<()> {
let mut cmd = tokio::process::Command::new("apt-get");
cmd.args(["update", "-q"]);
let output = cmd.output().await.context("failed to run apt update")?;
match output.status.success() {
true => tracing::info!("successfully ran apt update"),
false => {
anyhow::bail!(
"failed to run apt update: {}",
std::str::from_utf8(&output.stderr)?
);
}
}
let mut cmd = tokio::process::Command::new("apt-get");
cmd.env("DEBIAN_FRONTEND", "noninteractive")
.args(["upgrade", "-y"]);
let output = cmd.output().await.context("failed to run apt upgrade")?;
match output.status.success() {
true => tracing::info!("successfully ran apt upgrade"),
false => {
anyhow::bail!(
"failed to run apt upgrade: {}",
std::str::from_utf8(&output.stderr)?
);
}
}
Ok(())
}
}

View File

@@ -0,0 +1,30 @@
use crate::agent::{plugins::PluginStore, task::Task};
pub struct PluginTask {
plugin: String,
store: PluginStore,
}
impl PluginTask {
pub fn new(plugin: impl Into<String>, store: PluginStore) -> Self {
Self {
plugin: plugin.into(),
store,
}
}
}
#[async_trait::async_trait]
impl Task for PluginTask {
async fn id(&self) -> anyhow::Result<String> {
let id = self.store.id(&self.plugin).await?;
Ok(id)
}
async fn execute(&self) -> anyhow::Result<()> {
self.store.execute(&self.plugin).await?;
Ok(())
}
}

View File

@@ -1,5 +1,13 @@
use std::{ops::Deref, sync::Arc};
+use crate::api::Discovery;
+use super::{
+    config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient,
+    handlers::scheduled_tasks::ScheduledTasks, plugins::PluginStore, queue::AgentQueue,
+    scheduler::Scheduler,
+};
#[derive(Clone)]
pub struct AgentState(Arc<State>);
@@ -23,10 +31,30 @@ impl Deref for AgentState {
    }
}
-pub struct State {}
+pub struct State {
+    pub grpc: GrpcClient,
+    pub config: AgentConfig,
+    pub discovery: Discovery,
+    pub queue: AgentQueue,
+    pub plugin_store: PluginStore,
+}
impl State {
    pub async fn new() -> anyhow::Result<Self> {
-        Ok(Self {})
+        let config = AgentConfig::new().await?;
+        let discovery = DiscoveryClient::new(&config.discovery).discover().await?;
+        let grpc = GrpcClient::new(&discovery.process_host);
+        let plugin_store = PluginStore::new()?;
+        let scheduled_tasks = ScheduledTasks::new(plugin_store.clone());
+        let scheduler = Scheduler::new(scheduled_tasks);
+        let queue = AgentQueue::new(scheduler);
+        Ok(Self {
+            grpc,
+            config,
+            discovery,
+            queue,
+            plugin_store,
+        })
    }
}

View File

@@ -0,0 +1,93 @@
use std::collections::BTreeMap;
use anyhow::Context;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Clone)]
pub struct AgentConfig {
pub agent_id: String,
pub discovery: String,
}
impl AgentConfig {
pub async fn new() -> anyhow::Result<Self> {
let config = ConfigFile::load().await?;
Ok(Self {
agent_id: config.agent_id,
discovery: config.discovery,
})
}
}
#[derive(Serialize, Deserialize)]
struct ConfigFile {
agent_id: String,
discovery: String,
labels: Option<BTreeMap<String, String>>,
}
impl ConfigFile {
pub async fn load() -> anyhow::Result<Self> {
let directory = dirs::data_dir()
.ok_or(anyhow::anyhow!("failed to get data dir"))?
.join("io.kjuulh.churn-agent")
.join("churn-agent.toml");
if !directory.exists() {
anyhow::bail!(
"No churn agent file was setup, run `churn agent setup` to setup the defaults"
)
}
let contents = tokio::fs::read_to_string(&directory).await?;
toml::from_str(&contents).context("failed to parse the contents of the churn agent config")
}
pub async fn write_default(
discovery: impl Into<String>,
force: bool,
labels: impl Into<BTreeMap<String, String>>,
) -> anyhow::Result<Self> {
let s = Self {
agent_id: Uuid::new_v4().to_string(),
discovery: discovery.into(),
labels: Some(labels.into()),
};
let directory = dirs::data_dir()
.ok_or(anyhow::anyhow!("failed to get data dir"))?
.join("io.kjuulh.churn-agent")
.join("churn-agent.toml");
if let Some(parent) = directory.parent() {
tokio::fs::create_dir_all(&parent).await?;
}
if !force && directory.exists() {
anyhow::bail!("config file already exists, consider moving it to a backup before trying again: {}", directory.display());
}
let contents = toml::to_string_pretty(&s)
.context("failed to convert default implementation to string")?;
tokio::fs::write(directory, contents.as_bytes())
.await
.context("failed to write to agent file")?;
Ok(s)
}
}
pub async fn setup_config(
discovery: impl Into<String>,
force: bool,
labels: impl Into<BTreeMap<String, String>>,
) -> anyhow::Result<()> {
ConfigFile::write_default(discovery, force, labels).await?;
Ok(())
}
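
For reference, the file written by write_default (churn-agent.toml under the platform data dir) looks roughly like this; the UUID and the label are placeholders, not values from the diff:

    agent_id = "00000000-0000-4000-8000-000000000000"
    discovery = "https://churn.prod.kjuulh.app"

    [labels]
    env = "prod"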

View File

@@ -0,0 +1,21 @@
use crate::api::Discovery;
pub struct DiscoveryClient {
host: String,
}
impl DiscoveryClient {
pub fn new(discovery_host: impl Into<String>) -> Self {
Self {
host: discovery_host.into(),
}
}
pub async fn discover(&self) -> anyhow::Result<Discovery> {
tracing::info!(
"getting details from discovery endpoint: {}/discovery",
self.host.trim_end_matches('/')
);
crate::api::Discovery::get_from_host(&self.host).await
}
}

View File

@@ -0,0 +1,63 @@
use notmad::{Component, MadError};
use crate::agent::models::Commands;
use super::{
agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient, queue::AgentQueue,
};
#[derive(Clone)]
pub struct EventHandler {
config: AgentConfig,
grpc: GrpcClient,
queue: AgentQueue,
}
impl EventHandler {
pub fn new(state: impl Into<AgentState>) -> Self {
let state: AgentState = state.into();
Self {
config: state.config.clone(),
grpc: state.grpc.clone(),
queue: state.queue.clone(),
}
}
}
#[async_trait::async_trait]
impl Component for EventHandler {
fn name(&self) -> Option<String> {
Some("event_handler".into())
}
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> {
tokio::select! {
_ = cancellation_token.cancelled() => {},
res = self.grpc.listen_events("agents", None::<String>, self.clone()) => {
res.map_err(MadError::Inner)?;
},
res = self.grpc.listen_events("agents", Some(&self.config.agent_id), self.clone()) => {
res.map_err(MadError::Inner)?;
}
}
Ok(())
}
}
#[async_trait::async_trait]
impl super::grpc_client::ListenEventsExecutor for EventHandler {
async fn execute(&self, event: crate::grpc::ListenEventsResponse) -> anyhow::Result<()> {
tracing::info!(value = event.id, "received event");
let event: Commands = serde_json::from_str(&event.value)?;
self.queue.publish(event).await?;
Ok(())
}
}

View File

@@ -0,0 +1,108 @@
use tonic::transport::{Channel, ClientTlsConfig};
use crate::grpc::{churn_client::ChurnClient, *};
#[derive(Clone)]
pub struct GrpcClient {
host: String,
}
impl GrpcClient {
pub fn new(host: impl Into<String>) -> Self {
Self { host: host.into() }
}
pub async fn get_key(
&self,
namespace: &str,
id: Option<impl Into<String>>,
key: &str,
) -> anyhow::Result<Option<String>> {
let mut client = self.client().await?;
let resp = client
.get_key(GetKeyRequest {
key: key.into(),
namespace: namespace.into(),
id: id.map(|i| i.into()),
})
.await?;
let resp = resp.into_inner();
Ok(resp.value)
}
pub async fn set_key(
&self,
namespace: &str,
id: Option<impl Into<String>>,
key: &str,
value: &str,
) -> anyhow::Result<()> {
let mut client = self.client().await?;
client
.set_key(SetKeyRequest {
key: key.into(),
value: value.into(),
namespace: namespace.into(),
id: id.map(|i| i.into()),
})
.await?;
Ok(())
}
pub async fn listen_events(
&self,
namespace: &str,
id: Option<impl Into<String>>,
exec: impl ListenEventsExecutor,
) -> anyhow::Result<()> {
let mut client = self.client().await?;
tracing::debug!("creating stream for listening to events on: {}", namespace);
let resp = client
.listen_events(ListenEventsRequest {
namespace: namespace.into(),
id: id.map(|i| i.into()),
})
.await
.inspect_err(|e| tracing::warn!("failed to establish a connection: {}", e))?;
tracing::debug!("setup stream: {}", namespace);
let mut inner = resp.into_inner();
while let Ok(Some(message)) = inner.message().await {
tracing::debug!("received message: {}", namespace);
exec.execute(message)
.await
.inspect_err(|e| tracing::warn!("failed to handle message: {}", e))?;
}
Ok(())
}
async fn client(&self) -> anyhow::Result<ChurnClient<tonic::transport::Channel>> {
tracing::debug!("setting up client");
let channel = if self.host.starts_with("https") {
Channel::from_shared(self.host.to_owned())?
.tls_config(ClientTlsConfig::new().with_native_roots())?
.connect_timeout(std::time::Duration::from_secs(5))
.connect()
.await?
} else {
Channel::from_shared(self.host.to_owned())?
.connect()
.await?
};
let client = ChurnClient::new(channel);
Ok(client)
}
}
#[async_trait::async_trait]
pub trait ListenEventsExecutor {
async fn execute(&self, event: ListenEventsResponse) -> anyhow::Result<()>;
}
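
A minimal usage sketch of this client; the key and value strings are illustrative only, while the host echoes the prod values in cuddle.yaml further down:

    let client = GrpcClient::new("https://grpc.churn.prod.internal.kjuulh.app");
    client
        .set_key("agents", Some("agent-id"), "last_seen", "2024-12-01")
        .await?;
    let value = client.get_key("agents", Some("agent-id"), "last_seen").await?;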

View File

@@ -0,0 +1 @@
pub mod scheduled_tasks;

View File

@@ -0,0 +1,46 @@
use std::collections::BTreeMap;
use crate::agent::{
actions::Plan,
plugins::PluginStore,
task::{ConcreteTask, IntoTask},
};
#[derive(Clone)]
pub struct ScheduledTasks {
store: PluginStore,
}
impl ScheduledTasks {
pub fn new(store: PluginStore) -> Self {
Self { store }
}
pub async fn handle(
&self,
task: &str,
_properties: BTreeMap<String, String>,
) -> anyhow::Result<()> {
tracing::info!("scheduling: {}", task);
let plan = Plan::new(self.store.clone());
let tasks: Vec<ConcreteTask> = plan
.tasks()
.await?
.into_iter()
.map(|i| i.into_task())
.collect();
for task in tasks {
let id = task.id().await?;
if !task.should_run().await? {
tracing::debug!(task = id, "skipping run");
continue;
}
tracing::info!(task = id, "executing task");
task.execute().await?;
}
Ok(())
}
}

View File

@@ -0,0 +1,20 @@
use std::{collections::BTreeMap, fmt::Display};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type")]
pub enum Commands {
ScheduleTask {
task: String,
properties: BTreeMap<String, String>,
},
}
impl Display for Commands {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
Commands::ScheduleTask { .. } => "schedule_task",
})
}
}
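
With the internally tagged representation above (#[serde(tag = "type")]), a ScheduleTask command serializes to JSON along these lines:

    {"type":"ScheduleTask","task":"update","properties":{}}

This is the same shape the gRPC server later pushes over ListenEvents (with task = "refresh") and the refresh loop publishes locally (task = "update").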

View File

@@ -0,0 +1,239 @@
use anyhow::Context;
use component::churn_tasks::process::HostProcess;
use futures::StreamExt;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use wasmtime::component::*;
use wasmtime::{Config, Engine, Store};
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView};
wasmtime::component::bindgen!({
path: "wit/world.wit",
//world: "churn",
async: true,
with: {
"component:churn-tasks/process/process": CustomProcess
}
});
#[derive(Default)]
pub struct CustomProcess {}
impl CustomProcess {
pub fn run(&self, args: Vec<String>) -> String {
tracing::info!("calling function");
match args.split_first() {
Some((item, rest)) => {
let mut cmd = std::process::Command::new(item);
match cmd.args(rest).output() {
Ok(output) => std::str::from_utf8(&output.stdout)
.expect("to be able to parse utf8")
.to_string(),
Err(e) => {
tracing::error!("command failed with output: {e}");
e.to_string()
}
}
}
None => {
tracing::warn!("failed to call function because it is empty");
panic!("failed to call function because it is empty")
}
}
}
}
#[derive(Clone)]
pub struct PluginStore {
inner: Arc<Mutex<InnerPluginStore>>,
}
impl PluginStore {
pub fn new() -> anyhow::Result<Self> {
Ok(Self {
inner: Arc::new(Mutex::new(InnerPluginStore::new()?)),
})
}
pub async fn id(&self, plugin: &str) -> anyhow::Result<String> {
let mut inner = self.inner.lock().await;
inner.id(plugin).await
}
pub async fn execute(&self, plugin: &str) -> anyhow::Result<()> {
let mut inner = self.inner.lock().await;
inner.execute(plugin).await
}
}
pub struct InnerPluginStore {
store: wasmtime::Store<ServerWasiView>,
linker: wasmtime::component::Linker<ServerWasiView>,
engine: wasmtime::Engine,
}
impl InnerPluginStore {
pub fn new() -> anyhow::Result<Self> {
let mut config = Config::default();
config.wasm_component_model(true);
config.async_support(true);
let engine = Engine::new(&config)?;
let mut linker: wasmtime::component::Linker<ServerWasiView> = Linker::new(&engine);
// Add the command world (aka WASI CLI) to the linker
wasmtime_wasi::add_to_linker_async(&mut linker).context("Failed to link command world")?;
component::churn_tasks::process::add_to_linker(
&mut linker,
|state: &mut ServerWasiView| state,
)?;
let wasi_view = ServerWasiView::new();
let store = Store::new(&engine, wasi_view);
Ok(Self {
store,
linker,
engine,
})
}
pub async fn id(&mut self, plugin: &str) -> anyhow::Result<String> {
let plugin = self.ensure_plugin(plugin).await?;
plugin
.interface0
.call_id(&mut self.store)
.await
.context("Failed to call add function")
}
pub async fn execute(&mut self, plugin: &str) -> anyhow::Result<()> {
let plugin = self.ensure_plugin(plugin).await?;
plugin
.interface0
.call_execute(&mut self.store)
.await
.context("Failed to call add function")
}
async fn ensure_plugin(&mut self, plugin: &str) -> anyhow::Result<Churn> {
let cache = dirs::cache_dir()
.ok_or(anyhow::anyhow!("failed to find cache dir"))?
.join("io.kjuulh.churn");
let (plugin_name, plugin_version) = plugin.split_once("@").unwrap_or((plugin, "latest"));
let plugin_path = cache
.join("plugins")
.join(plugin_name)
.join(plugin_version)
.join(format!("{plugin_name}.wasm"));
let no_cache: bool = std::env::var("CHURN_NO_CACHE")
.unwrap_or("false".into())
.parse()?;
if !plugin_path.exists() || no_cache {
tracing::info!(
plugin_name = plugin_name,
plugin_version = plugin_version,
"downloading plugin"
);
if let Some(parent) = plugin_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?;
let mut stream = req.bytes_stream();
let mut file = tokio::fs::File::create(&plugin_path).await?;
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
file.write_all(&chunk).await?;
}
file.flush().await?;
}
let component =
Component::from_file(&self.engine, plugin_path).context("Component file not found")?;
tracing::debug!(
plugin_name = plugin_name,
plugin_version = plugin_version,
"instantiating plugin"
);
let instance = Churn::instantiate_async(&mut self.store, &component, &self.linker)
.await
.context("Failed to instantiate the example world")
.unwrap();
Ok(instance)
}
}
struct ServerWasiView {
table: ResourceTable,
ctx: WasiCtx,
processes: ResourceTable,
}
impl ServerWasiView {
fn new() -> Self {
let table = ResourceTable::new();
let ctx = WasiCtxBuilder::new()
.inherit_stdio()
.inherit_env()
.inherit_network()
.preopened_dir("/", "/", DirPerms::all(), FilePerms::all())
.expect("to be able to open root")
.build();
Self {
table,
ctx,
processes: ResourceTable::default(),
}
}
}
impl WasiView for ServerWasiView {
fn table(&mut self) -> &mut ResourceTable {
&mut self.table
}
fn ctx(&mut self) -> &mut WasiCtx {
&mut self.ctx
}
}
impl component::churn_tasks::process::Host for ServerWasiView {}
#[async_trait::async_trait]
impl HostProcess for ServerWasiView {
async fn new(
&mut self,
) -> wasmtime::component::Resource<component::churn_tasks::process::Process> {
self.processes.push(CustomProcess::default()).unwrap()
}
async fn run_process(
&mut self,
self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
inputs: wasmtime::component::__internal::Vec<String>,
) -> String {
let process = self.processes.get(&self_).unwrap();
process.run(inputs)
}
async fn drop(
&mut self,
rep: wasmtime::component::Resource<component::churn_tasks::process::Process>,
) -> wasmtime::Result<()> {
self.processes.delete(rep)?;
Ok(())
}
}
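
Plugins are cached under the user cache dir as io.kjuulh.churn/plugins/&lt;name&gt;/&lt;version&gt;/&lt;name&gt;.wasm; setting CHURN_NO_CACHE forces a re-download on the next run. A sketch, assuming the churn binary is on PATH:

    CHURN_NO_CACHE=true churn agent start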

View File

@@ -0,0 +1,67 @@
use std::sync::Arc;
use notmad::{Component, MadError};
use tokio::sync::Mutex;
use super::{models::Commands, scheduler::Scheduler};
#[derive(Clone)]
pub struct AgentQueue {
sender: Arc<tokio::sync::mpsc::Sender<Commands>>,
receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<Commands>>>,
scheduler: Scheduler,
}
impl AgentQueue {
pub fn new(scheduler: Scheduler) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(5);
Self {
sender: Arc::new(tx),
receiver: Arc::new(Mutex::new(rx)),
scheduler,
}
}
pub async fn handler(&self, command: Commands) -> anyhow::Result<()> {
tracing::debug!("handling task");
self.scheduler.handle(command).await?;
Ok(())
}
pub async fn publish(&self, command: Commands) -> anyhow::Result<()> {
tracing::debug!("publishing task: {}", command.to_string());
self.sender.send(command).await?;
Ok(())
}
}
#[async_trait::async_trait]
impl Component for AgentQueue {
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> {
loop {
let mut recv = self.receiver.lock().await;
tokio::select! {
res = recv.recv() => {
if let Some(res) = res {
self.handler(res).await.map_err(MadError::Inner)?;
}
}
_ = cancellation_token.cancelled() => {
break
}
}
}
Ok(())
}
}

View File

@@ -1,29 +1,37 @@
-use anyhow::Context;
-use super::agent_state::AgentState;
+use std::collections::BTreeMap;
+use crate::agent::models::Commands;
+use super::{agent_state::AgentState, queue::AgentQueue};
#[derive(Clone)]
pub struct AgentRefresh {
-    _state: AgentState,
-    host: String,
+    process_host: String,
+    queue: AgentQueue,
}
impl AgentRefresh {
-    pub fn new(state: impl Into<AgentState>, host: impl Into<String>) -> Self {
+    pub fn new(state: impl Into<AgentState>) -> Self {
+        let state: AgentState = state.into();
        Self {
-            _state: state.into(),
-            host: host.into(),
+            process_host: state.discovery.process_host.clone(),
+            queue: state.queue.clone(),
        }
    }
}
#[async_trait::async_trait]
impl notmad::Component for AgentRefresh {
+    fn name(&self) -> Option<String> {
+        Some("agent_refresh".into())
+    }
    async fn run(
        &self,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Result<(), notmad::MadError> {
-        let cancel = nodrift::schedule_drifter(std::time::Duration::from_secs(60), self.clone());
+        let cancel =
+            nodrift::schedule_drifter(std::time::Duration::from_secs(60 * 10), self.clone());
        tokio::select! {
            _ = cancel.cancelled() => {},
            _ = cancellation_token.cancelled() => {
@@ -39,82 +47,14 @@ impl notmad::Component for AgentRefresh {
#[async_trait::async_trait]
impl nodrift::Drifter for AgentRefresh {
    async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
-        tracing::info!(host = self.host, "refreshing agent");
-        // Get plan
-        let plan = Plan::new();
-        let tasks = plan.tasks().await?;
-        // For task
-        for task in tasks {
-            // Check idempotency rules
-            if !task.should_run().await? {
-                tracing::debug!(task = task.id(), "skipping run");
-                continue;
-            }
-            // Run task if not valid
-            tracing::info!(task = task.id(), "executing task");
-            task.execute().await?;
-        }
-        Ok(())
-    }
-}
-pub struct Plan {}
-impl Plan {
-    pub fn new() -> Self {
-        Self {}
-    }
-    pub async fn tasks(&self) -> anyhow::Result<Vec<Task>> {
-        Ok(vec![Task::new()])
-    }
-}
-pub struct Task {}
-impl Task {
-    pub fn new() -> Self {
-        Self {}
-    }
-    pub fn id(&self) -> String {
-        "apt".into()
-    }
-    pub async fn should_run(&self) -> anyhow::Result<bool> {
-        Ok(true)
-    }
-    pub async fn execute(&self) -> anyhow::Result<()> {
-        let mut cmd = tokio::process::Command::new("apt-get");
-        cmd.args(["update", "-q"]);
-        let output = cmd.output().await.context("failed to run apt update")?;
-        match output.status.success() {
-            true => tracing::info!("successfully ran apt update"),
-            false => {
-                anyhow::bail!(
-                    "failed to run apt update: {}",
-                    std::str::from_utf8(&output.stderr)?
-                );
-            }
-        }
-        let mut cmd = tokio::process::Command::new("apt-get");
-        cmd.env("DEBIAN_FRONTEND", "noninteractive")
-            .args(["upgrade", "-y"]);
-        let output = cmd.output().await.context("failed to run apt upgrade")?;
-        match output.status.success() {
-            true => tracing::info!("successfully ran apt upgrade"),
-            false => {
-                anyhow::bail!(
-                    "failed to run apt upgrade: {}",
-                    std::str::from_utf8(&output.stderr)?
-                );
-            }
-        }
+        tracing::info!(process_host = self.process_host, "refreshing agent");
+        self.queue
+            .publish(Commands::ScheduleTask {
+                task: "update".into(),
+                properties: BTreeMap::default(),
+            })
+            .await?;
        Ok(())
    }

View File

@@ -0,0 +1,22 @@
use super::{handlers::scheduled_tasks::ScheduledTasks, models::Commands};
#[derive(Clone)]
pub struct Scheduler {
scheduled_tasks: ScheduledTasks,
}
impl Scheduler {
pub fn new(scheduled_tasks: ScheduledTasks) -> Self {
Self { scheduled_tasks }
}
pub async fn handle(&self, command: Commands) -> anyhow::Result<()> {
match command {
Commands::ScheduleTask { task, properties } => {
self.scheduled_tasks.handle(&task, properties).await?;
}
}
Ok(())
}
}

View File

@@ -0,0 +1,45 @@
use std::sync::Arc;
#[async_trait::async_trait]
pub trait Task {
async fn id(&self) -> anyhow::Result<String>;
async fn should_run(&self) -> anyhow::Result<bool> {
Ok(true)
}
async fn execute(&self) -> anyhow::Result<()>;
}
pub trait IntoTask {
fn into_task(self) -> ConcreteTask;
}
#[derive(Clone)]
pub struct ConcreteTask {
inner: Arc<dyn Task + Sync + Send + 'static>,
}
impl ConcreteTask {
pub fn new<T: Task + Sync + Send + 'static>(t: T) -> Self {
Self { inner: Arc::new(t) }
}
}
impl std::ops::Deref for ConcreteTask {
type Target = Arc<dyn Task + Sync + Send + 'static>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl IntoTask for ConcreteTask {
fn into_task(self) -> ConcreteTask {
self
}
}
impl<T: Task + Sync + Send + 'static> IntoTask for T {
fn into_task(self) -> ConcreteTask {
ConcreteTask::new(self)
}
}
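
For illustration, a hypothetical task only has to implement the Task trait; the blanket IntoTask impl then wraps it into a ConcreteTask for use in a Plan:

    struct RebootTask;

    #[async_trait::async_trait]
    impl Task for RebootTask {
        async fn id(&self) -> anyhow::Result<String> {
            Ok("reboot".into())
        }
        // should_run() falls back to the default Ok(true)
        async fn execute(&self) -> anyhow::Result<()> {
            Ok(())
        }
    }

    let task: ConcreteTask = RebootTask.into_task();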

View File

@@ -1,6 +1,12 @@
use std::net::SocketAddr;
-use axum::{extract::MatchedPath, http::Request, routing::get, Router};
+use axum::{
+    extract::{MatchedPath, State},
+    http::Request,
+    routing::get,
+    Json, Router,
+};
+use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tower_http::trace::TraceLayer;
@@ -22,6 +28,7 @@ impl Api {
    pub async fn serve(&self) -> anyhow::Result<()> {
        let app = Router::new()
            .route("/", get(root))
+            .route("/discovery", get(discovery))
            .with_state(self.state.clone())
            .layer(
                TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
@@ -55,6 +62,28 @@ async fn root() -> &'static str {
    "Hello, churn!"
}
+#[derive(Serialize, Deserialize)]
+pub struct Discovery {
+    pub external_host: String,
+    pub process_host: String,
+}
+impl Discovery {
+    pub async fn get_from_host(host: &str) -> anyhow::Result<Self> {
+        let resp = reqwest::get(format!("{}/discovery", host.trim_end_matches('/'))).await?;
+        let s: Self = resp.json().await?;
+        Ok(s)
+    }
+}
+async fn discovery(State(state): State<SharedState>) -> Json<Discovery> {
+    Json(Discovery {
+        external_host: state.config.external_host.clone(),
+        process_host: state.config.process_host.clone(),
+    })
+}
#[async_trait::async_trait]
impl notmad::Component for Api {
    async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {
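
Assuming the cuddle prod env keys further down map onto EXTERNAL_HOST and PROCESS_HOST, a GET /discovery response would look roughly like:

    {"external_host":"https://churn.internal.prod.kjuulh.app","process_host":"https://grpc.churn.prod.internal.kjuulh.app"}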

View File

@@ -1,28 +1,39 @@
-use std::net::SocketAddr;
+use std::{collections::BTreeMap, net::SocketAddr};
use clap::{Parser, Subcommand};
-use crate::{agent, api, state::SharedState};
+use crate::{agent, server};
pub async fn execute() -> anyhow::Result<()> {
-    let state = SharedState::new().await?;
    let cli = Command::parse();
    match cli.command.expect("to have a subcommand") {
-        Commands::Serve { host } => {
+        Commands::Serve {
+            host,
+            grpc_host,
+            config,
+        } => {
            tracing::info!("Starting service");
-            notmad::Mad::builder()
-                .add(api::Api::new(&state, host))
-                .run()
-                .await?;
+            server::execute(host, grpc_host, config).await?;
        }
        Commands::Agent { commands } => match commands {
-            AgentCommands::Start { host } => {
+            AgentCommands::Start {} => {
                tracing::info!("starting agent");
-                agent::execute(host).await?;
+                agent::execute().await?;
                tracing::info!("shut down agent");
            }
+            AgentCommands::Setup {
+                force,
+                discovery,
+                labels,
+            } => {
+                let mut setup_labels = BTreeMap::new();
+                for (k, v) in labels {
+                    setup_labels.insert(k, v);
+                }
+                agent::setup_config(discovery, force, setup_labels).await?;
+                tracing::info!("wrote default agent config");
+            }
        },
    }
@@ -41,6 +52,12 @@ enum Commands {
    Serve {
        #[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
        host: SocketAddr,
+        #[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")]
+        grpc_host: SocketAddr,
+        #[clap(flatten)]
+        config: server::config::ServerConfig,
    },
    Agent {
        #[command(subcommand)]
@@ -50,8 +67,22 @@ enum Commands {
#[derive(Subcommand)]
enum AgentCommands {
-    Start {
-        #[arg(env = "SERVICE_HOST", long = "service-host")]
-        host: String,
+    Start {},
+    Setup {
+        #[arg(long, default_value = "false")]
+        force: bool,
+        #[arg(env = "DISCOVERY_HOST", long = "discovery")]
+        discovery: String,
+        #[arg(long = "label", short = 'l', value_parser = parse_key_val, action = clap::ArgAction::Append)]
+        labels: Vec<(String, String)>,
    },
}
+fn parse_key_val(s: &str) -> Result<(String, String), String> {
+    let (key, value) = s
+        .split_once("=")
+        .ok_or_else(|| format!("invalid key=value: no `=` found in `{s}`"))?;
+    Ok((key.to_string(), value.to_string()))
+}
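
Based on the flags above, typical invocations look like this; the hosts echo deployment values elsewhere in the diff and the label is only an example:

    churn serve --external-host https://churn.internal.prod.kjuulh.app --process-host https://grpc.churn.prod.internal.kjuulh.app
    churn agent setup --discovery https://churn.prod.kjuulh.app --label env=prod
    churn agent start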

View File

@@ -0,0 +1,52 @@
// @generated
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKeyRequest {
#[prost(string, tag="1")]
pub namespace: ::prost::alloc::string::String,
#[prost(string, optional, tag="2")]
pub id: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, tag="3")]
pub key: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKeyResponse {
#[prost(string, optional, tag="1")]
pub value: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetKeyRequest {
#[prost(string, tag="1")]
pub namespace: ::prost::alloc::string::String,
#[prost(string, optional, tag="2")]
pub id: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, tag="3")]
pub key: ::prost::alloc::string::String,
#[prost(string, tag="4")]
pub value: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct SetKeyResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListenEventsRequest {
#[prost(string, tag="1")]
pub namespace: ::prost::alloc::string::String,
#[prost(string, optional, tag="2")]
pub id: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListenEventsResponse {
#[prost(string, tag="1")]
pub id: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub value: ::prost::alloc::string::String,
}
include!("churn.v1.tonic.rs");
// @@protoc_insertion_point(module)

View File

@@ -0,0 +1,435 @@
// @generated
/// Generated client implementations.
pub mod churn_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
use tonic::codegen::http::Uri;
#[derive(Debug, Clone)]
pub struct ChurnClient<T> {
inner: tonic::client::Grpc<T>,
}
impl ChurnClient<tonic::transport::Channel> {
/// Attempt to create a new client by connecting to a given endpoint.
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
Ok(Self::new(conn))
}
}
impl<T> ChurnClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Self { inner }
}
pub fn with_origin(inner: T, origin: Uri) -> Self {
let inner = tonic::client::Grpc::with_origin(inner, origin);
Self { inner }
}
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> ChurnClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
>,
>,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
>>::Error: Into<StdError> + Send + Sync,
{
ChurnClient::new(InterceptedService::new(inner, interceptor))
}
/// Compress requests with the given encoding.
///
/// This requires the server to support it otherwise it might respond with an
/// error.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.send_compressed(encoding);
self
}
/// Enable decompressing responses.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.accept_compressed(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_decoding_message_size(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
pub async fn get_key(
&mut self,
request: impl tonic::IntoRequest<super::GetKeyRequest>,
) -> std::result::Result<tonic::Response<super::GetKeyResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/GetKey");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "GetKey"));
self.inner.unary(req, path, codec).await
}
pub async fn set_key(
&mut self,
request: impl tonic::IntoRequest<super::SetKeyRequest>,
) -> std::result::Result<tonic::Response<super::SetKeyResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/SetKey");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "SetKey"));
self.inner.unary(req, path, codec).await
}
pub async fn listen_events(
&mut self,
request: impl tonic::IntoRequest<super::ListenEventsRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::ListenEventsResponse>>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/churn.v1.Churn/ListenEvents",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("churn.v1.Churn", "ListenEvents"));
self.inner.server_streaming(req, path, codec).await
}
}
}
/// Generated server implementations.
pub mod churn_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with ChurnServer.
#[async_trait]
pub trait Churn: Send + Sync + 'static {
async fn get_key(
&self,
request: tonic::Request<super::GetKeyRequest>,
) -> std::result::Result<tonic::Response<super::GetKeyResponse>, tonic::Status>;
async fn set_key(
&self,
request: tonic::Request<super::SetKeyRequest>,
) -> std::result::Result<tonic::Response<super::SetKeyResponse>, tonic::Status>;
/// Server streaming response type for the ListenEvents method.
type ListenEventsStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::ListenEventsResponse, tonic::Status>,
>
+ Send
+ 'static;
async fn listen_events(
&self,
request: tonic::Request<super::ListenEventsRequest>,
) -> std::result::Result<
tonic::Response<Self::ListenEventsStream>,
tonic::Status,
>;
}
#[derive(Debug)]
pub struct ChurnServer<T: Churn> {
inner: _Inner<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
}
struct _Inner<T>(Arc<T>);
impl<T: Churn> ChurnServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(Arc::new(inner))
}
pub fn from_arc(inner: Arc<T>) -> Self {
let inner = _Inner(inner);
Self {
inner,
accept_compression_encodings: Default::default(),
send_compression_encodings: Default::default(),
max_decoding_message_size: None,
max_encoding_message_size: None,
}
}
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
{
InterceptedService::new(Self::new(inner), interceptor)
}
/// Enable decompressing requests with the given encoding.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.accept_compression_encodings.enable(encoding);
self
}
/// Compress responses with the given encoding, if the client supports it.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.send_compression_encodings.enable(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.max_decoding_message_size = Some(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.max_encoding_message_size = Some(limit);
self
}
}
impl<T, B> tonic::codegen::Service<http::Request<B>> for ChurnServer<T>
where
T: Churn,
B: Body + Send + 'static,
B::Error: Into<StdError> + Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
let inner = self.inner.clone();
match req.uri().path() {
"/churn.v1.Churn/GetKey" => {
#[allow(non_camel_case_types)]
struct GetKeySvc<T: Churn>(pub Arc<T>);
impl<T: Churn> tonic::server::UnaryService<super::GetKeyRequest>
for GetKeySvc<T> {
type Response = super::GetKeyResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GetKeyRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Churn>::get_key(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = GetKeySvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/churn.v1.Churn/SetKey" => {
#[allow(non_camel_case_types)]
struct SetKeySvc<T: Churn>(pub Arc<T>);
impl<T: Churn> tonic::server::UnaryService<super::SetKeyRequest>
for SetKeySvc<T> {
type Response = super::SetKeyResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::SetKeyRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Churn>::set_key(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = SetKeySvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/churn.v1.Churn/ListenEvents" => {
#[allow(non_camel_case_types)]
struct ListenEventsSvc<T: Churn>(pub Arc<T>);
impl<
T: Churn,
> tonic::server::ServerStreamingService<super::ListenEventsRequest>
for ListenEventsSvc<T> {
type Response = super::ListenEventsResponse;
type ResponseStream = T::ListenEventsStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ListenEventsRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Churn>::listen_events(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = ListenEventsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap(),
)
})
}
}
}
}
impl<T: Churn> Clone for ChurnServer<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
Self {
inner,
accept_compression_encodings: self.accept_compression_encodings,
send_compression_encodings: self.send_compression_encodings,
max_decoding_message_size: self.max_decoding_message_size,
max_encoding_message_size: self.max_encoding_message_size,
}
}
}
impl<T: Churn> Clone for _Inner<T> {
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.0)
}
}
impl<T: Churn> tonic::server::NamedService for ChurnServer<T> {
const NAME: &'static str = "churn.v1.Churn";
}
}

View File

@@ -1,8 +1,12 @@
mod api;
mod cli;
mod state;
+mod grpc {
+    include!("grpc/churn.v1.rs");
+}
mod agent;
+mod server;
#[tokio::main]
async fn main() -> anyhow::Result<()> {

View File

@@ -0,0 +1,23 @@
use std::net::SocketAddr;
pub mod config;
mod grpc_server;
use crate::{api, state::SharedState};
pub async fn execute(
host: impl Into<SocketAddr>,
grpc_host: impl Into<SocketAddr>,
config: config::ServerConfig,
) -> anyhow::Result<()> {
let state = SharedState::new(config).await?;
notmad::Mad::builder()
.add(api::Api::new(&state, host))
.add(grpc_server::GrpcServer::new(grpc_host.into()))
.run()
.await?;
Ok(())
}

View File

@@ -0,0 +1,7 @@
#[derive(clap::Args)]
pub struct ServerConfig {
#[arg(long = "external-host", env = "EXTERNAL_HOST")]
pub external_host: String,
#[arg(long = "process-host", env = "PROCESS_HOST")]
pub process_host: String,
}

View File

@@ -0,0 +1,102 @@
use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};
use anyhow::Context;
use futures::Stream;
use notmad::{Component, MadError};
use tonic::transport::Server;
use crate::{agent::models::Commands, grpc::*};
#[derive(Clone)]
pub struct GrpcServer {
grpc_host: SocketAddr,
}
impl GrpcServer {
pub fn new(grpc_host: SocketAddr) -> Self {
Self { grpc_host }
}
}
#[async_trait::async_trait]
impl Component for GrpcServer {
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> {
let task = Server::builder()
.add_service(crate::grpc::churn_server::ChurnServer::new(self.clone()))
.serve(self.grpc_host);
tokio::select! {
_ = cancellation_token.cancelled() => {},
res = task => {
res.context("failed to run grpc server").map_err(MadError::Inner)?;
}
}
Ok(())
}
}
#[async_trait::async_trait]
impl crate::grpc::churn_server::Churn for GrpcServer {
async fn get_key(
&self,
request: tonic::Request<GetKeyRequest>,
) -> std::result::Result<tonic::Response<GetKeyResponse>, tonic::Status> {
todo!()
}
async fn set_key(
&self,
request: tonic::Request<SetKeyRequest>,
) -> std::result::Result<tonic::Response<SetKeyResponse>, tonic::Status> {
todo!()
}
#[doc = " Server streaming response type for the ListenEvents method."]
type ListenEventsStream =
Pin<Box<dyn Stream<Item = Result<ListenEventsResponse, tonic::Status>> + Send>>;
async fn listen_events(
&self,
request: tonic::Request<ListenEventsRequest>,
) -> std::result::Result<tonic::Response<Self::ListenEventsStream>, tonic::Status> {
let (tx, rx) = tokio::sync::mpsc::channel(128);
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60 * 10));
loop {
interval.tick().await;
let Ok(schedule_task) = serde_json::to_string(&Commands::ScheduleTask {
task: "refresh".into(),
properties: BTreeMap::default(),
}) else {
tracing::warn!("failed to serialize event");
continue;
};
if let Err(e) = tx
.send(Ok(ListenEventsResponse {
id: uuid::Uuid::new_v4().to_string(),
value: schedule_task,
}))
.await
{
tracing::warn!("failed to send response: {}", e);
break;
}
}
});
let stream = futures::stream::unfold(rx, |mut msg| async move {
let next = msg.recv().await?;
Some((next, msg))
});
Ok(tonic::Response::new(Box::pin(stream)))
}
}

View File

@@ -1,11 +1,13 @@
use std::{ops::Deref, sync::Arc};
+use crate::server::config::ServerConfig;
#[derive(Clone)]
pub struct SharedState(Arc<State>);
impl SharedState {
-    pub async fn new() -> anyhow::Result<Self> {
-        Ok(Self(Arc::new(State::new().await?)))
+    pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
+        Ok(Self(Arc::new(State::new(config).await?)))
    }
}
@@ -23,10 +25,12 @@ impl Deref for SharedState {
    }
}
-pub struct State {}
+pub struct State {
+    pub config: ServerConfig,
+}
impl State {
-    pub async fn new() -> anyhow::Result<Self> {
-        Ok(Self {})
+    pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
+        Ok(Self { config })
    }
}

View File

@@ -0,0 +1,19 @@
package component:churn-tasks@0.1.0;
interface process {
resource process {
constructor();
run-process: func(inputs: list<string>) -> string;
}
}
interface task {
id: func() -> string;
should-run: func() -> bool;
execute: func();
}
world churn {
export task;
import process;
}

View File

@@ -12,11 +12,20 @@ vars:
  ingress:
    - external: "true"
    - internal: "true"
+    - internal_grpc: "true"
cuddle/clusters:
  dev:
    env:
      service.host: "0.0.0.0:3000"
+      service.grpc.host: "0.0.0.0:4001"
+      process.host: "https://grpc.churn.dev.internal.kjuulh.app"
+      external.host: "https://churn.internal.dev.kjuulh.app"
+      rust.log: "h2=warn,debug"
  prod:
    env:
      service.host: "0.0.0.0:3000"
+      service.grpc.host: "0.0.0.0:4001"
+      process.host: "https://grpc.churn.prod.internal.kjuulh.app"
+      external.host: "https://churn.internal.prod.kjuulh.app"
+      rust.log: "h2=warn,debug"

View File

@@ -1,97 +1,92 @@
-#!/bin/bash
+#!/usr/bin/env bash
set -e
-# Configuration variables
-GITEA_HOST="https://git.front.kjuulh.io"
-REPO_OWNER="kjuulh"
-REPO_NAME="churn-v2"
+# Configuration
+APP_NAME="churn"
+APP_VERSION="latest" # or specify a version
+S3_BUCKET="rust-artifacts"
BINARY_NAME="churn"
-SERVICE_NAME="churn-agent"
-SERVICE_USER="churn"
-RELEASE_TAG="latest" # or specific version like "v1.0.0"
+SERVICE_NAME="${APP_NAME}.service"
+INSTALL_DIR="/usr/local/bin"
+CONFIG_DIR="/etc/${APP_NAME}"
+CHURN_DISCOVERY="https://churn.prod.kjuulh.app"
+# Colors for output
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+NC='\033[0m' # No Color
# Check if running as root
if [ "$EUID" -ne 0 ]; then
-    echo "Please run as root"
+    echo -e "${RED}Please run as root${NC}"
    exit 1
fi
-# Create service user if it doesn't exist
-if ! id "$SERVICE_USER" &>/dev/null; then
-    useradd -r -s /bin/false "$SERVICE_USER"
+# Create necessary directories
+echo "Creating directories..."
+mkdir -p "${INSTALL_DIR}"
+mkdir -p "${CONFIG_DIR}"
+if systemctl is-active --quiet churn.service; then
+    echo "Stopping existing churn service..."
+    systemctl stop churn.service
fi
-# Function to get latest release if RELEASE_TAG is "latest"
-get_latest_release() {
-    curl -s "https://$GITEA_HOST/api/v1/repos/$REPO_OWNER/$REPO_NAME/releases/latest" | \
-        grep '"tag_name":' | \
-        sed -E 's/.*"([^"]+)".*/\1/'
-}
-# Determine the actual release tag
-if [ "$RELEASE_TAG" = "latest" ]; then
-    RELEASE_TAG=$(get_latest_release)
+# Download binary from S3
+echo "Downloading binary..."
+curl -L -s "https://api-minio.front.kjuulh.io/${S3_BUCKET}/releases/${APP_NAME}/${APP_VERSION}/${BINARY_NAME}" -o "${INSTALL_DIR}/${BINARY_NAME}"
+# Make binary executable
+chmod +x "${INSTALL_DIR}/${BINARY_NAME}"
+echo "Starting churn agent setup..."
+set +e
+res=$(churn agent setup --discovery "${CHURN_DISCOVERY}" 2>&1)
+set -e
+exit_code=$?
+if [[ $exit_code != 0 ]]; then
+    if [[ "$res" != *"config file already exists"* ]] && [[ "$res" != *"already exists"* ]]; then
+        echo "Error detected: $res"
+        exit 1
+    else
+        echo "Ignoring setup 'agent is already setup'"
+    fi
fi
-echo "Installing $BINARY_NAME version $RELEASE_TAG..."
-# Download and install binary
-TMP_DIR=$(mktemp -d)
-cd "$TMP_DIR"
-# Download binary from Gitea
-curl -L -o "$BINARY_NAME" \
-    "https://$GITEA_HOST/$REPO_OWNER/$REPO_NAME/releases/download/$RELEASE_TAG/$BINARY_NAME"
-# Make binary executable and move to appropriate location
-chmod +x "$BINARY_NAME"
-mv "$BINARY_NAME" "/usr/local/bin/$BINARY_NAME"
# Create systemd service file
-cat > "/etc/systemd/system/$SERVICE_NAME.service" << EOF
+echo "Creating systemd service..."
+cat > "/etc/systemd/system/${SERVICE_NAME}" << EOF
[Unit]
-Description=$SERVICE_NAME Service
+Description=${APP_NAME} service
After=network.target
[Service]
Type=simple
-User=$SERVICE_USER
-ExecStart=/usr/local/bin/$BINARY_NAME
+User=root
+Group=root
+ExecStart=${INSTALL_DIR}/${BINARY_NAME} agent start
Restart=always
-RestartSec=5
+RestartSec=10
+Environment=RUST_LOG=h2=warn,hyper=warn,churn=debug,warn
-# Security hardening options
-ProtectSystem=strict
-ProtectHome=true
-NoNewPrivileges=true
-ReadWritePaths=/var/log/$SERVICE_NAME
[Install]
WantedBy=multi-user.target
EOF
-# Create log directory if logging is needed
-mkdir -p "/var/log/$SERVICE_NAME"
-chown "$SERVICE_USER:$SERVICE_USER" "/var/log/$SERVICE_NAME"
# Reload systemd and enable service
+echo "Configuring systemd service..."
systemctl daemon-reload
-systemctl enable "$SERVICE_NAME"
-systemctl start "$SERVICE_NAME"
+systemctl enable "${SERVICE_NAME}"
+systemctl start "${SERVICE_NAME}"
-# Clean up
-cd
-rm -rf "$TMP_DIR"
+# Check service status
+if systemctl is-active --quiet "${SERVICE_NAME}"; then
+    echo -e "${GREEN}Installation successful! ${APP_NAME} is running.${NC}"
+    echo "You can check the status with: systemctl status ${SERVICE_NAME}"
+else
+    echo -e "${RED}Installation completed but service failed to start. Check logs with: journalctl -u ${SERVICE_NAME}${NC}"
+    exit 1
+fi
-echo "Installation complete! Service status:"
-systemctl status "$SERVICE_NAME"
-# Provide some helpful commands
-echo "
-Useful commands:
-- Check status: systemctl status $SERVICE_NAME
-- View logs: journalctl -u $SERVICE_NAME
-- Restart service: systemctl restart $SERVICE_NAME
-- Stop service: systemctl stop $SERVICE_NAME
-"

7
upload.sh Executable file
View File

@@ -0,0 +1,7 @@
#!/usr/bin/env zsh
set -e
cargo build --release --target x86_64-unknown-linux-musl
aws s3 cp target/x86_64-unknown-linux-musl/release/churn s3://rust-artifacts/releases/churn/latest/churn --endpoint-url https://api-minio.front.kjuulh.io