Compare commits

...

4 Commits

Author SHA1 Message Date
e0bb98e855 fix(deps): update all dependencies
Some checks failed
renovate/artifacts Artifact file update failure
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2025-01-04 01:10:01 +00:00
03e23c7d9d feat: enable checking if it should actually run
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-01-04 01:52:05 +01:00
83294306a4 feat: enable having get variable from local setup
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-01-04 01:28:32 +01:00
ceaad75057 feat: inherit output as well
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-01-04 00:35:18 +01:00
8 changed files with 152 additions and 35 deletions

106
Cargo.lock generated
View File

@ -135,9 +135,9 @@ dependencies = [
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.83" version = "0.1.84"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" checksum = "1b1244b10dcd56c92219da4e14caa97e312079e185f04ba3eea25061561dc0a0"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -163,16 +163,43 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"axum-core", "axum-core 0.4.5",
"bytes", "bytes",
"futures-util", "futures-util",
"http", "http",
"http-body", "http-body",
"http-body-util", "http-body-util",
"itoa",
"matchit 0.7.3",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8"
dependencies = [
"axum-core 0.5.0",
"bytes",
"form_urlencoded",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper", "hyper",
"hyper-util", "hyper-util",
"itoa", "itoa",
"matchit", "matchit 0.8.4",
"memchr", "memchr",
"mime", "mime",
"percent-encoding", "percent-encoding",
@ -182,9 +209,9 @@ dependencies = [
"serde_json", "serde_json",
"serde_path_to_error", "serde_path_to_error",
"serde_urlencoded", "serde_urlencoded",
"sync_wrapper 1.0.2", "sync_wrapper",
"tokio", "tokio",
"tower 0.5.1", "tower 0.5.2",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
"tracing", "tracing",
@ -205,7 +232,26 @@ dependencies = [
"mime", "mime",
"pin-project-lite", "pin-project-lite",
"rustversion", "rustversion",
"sync_wrapper 1.0.2", "sync_wrapper",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df1362f362fd16024ae199c1970ce98f9661bf5ef94b9808fee734bc3698b733"
dependencies = [
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
"tracing", "tracing",
@ -374,7 +420,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"axum", "axum 0.8.1",
"bytes", "bytes",
"clap", "clap",
"dirs 5.0.1", "dirs 5.0.1",
@ -382,6 +428,7 @@ dependencies = [
"futures", "futures",
"nodrift", "nodrift",
"notmad", "notmad",
"petname",
"prost", "prost",
"prost-types", "prost-types",
"reqwest", "reqwest",
@ -1557,6 +1604,12 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]] [[package]]
name = "maybe-owned" name = "maybe-owned"
version = "0.3.4" version = "0.3.4"
@ -1781,6 +1834,20 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "petname"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cd31dcfdbbd7431a807ef4df6edd6473228e94d5c805e8cf671227a21bad068"
dependencies = [
"anyhow",
"clap",
"itertools",
"proc-macro2",
"quote",
"rand",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.1.7" version = "1.1.7"
@ -1996,9 +2063,9 @@ dependencies = [
[[package]] [[package]]
name = "reqwest" name = "reqwest"
version = "0.12.9" version = "0.12.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"bytes", "bytes",
@ -2025,10 +2092,11 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
"sync_wrapper 1.0.2", "sync_wrapper",
"tokio", "tokio",
"tokio-native-tls", "tokio-native-tls",
"tokio-util", "tokio-util",
"tower 0.5.2",
"tower-service", "tower-service",
"url", "url",
"wasm-bindgen", "wasm-bindgen",
@ -2395,12 +2463,6 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "sync_wrapper"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]] [[package]]
name = "sync_wrapper" name = "sync_wrapper"
version = "1.0.2" version = "1.0.2"
@ -2641,7 +2703,7 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
dependencies = [ dependencies = [
"async-stream", "async-stream",
"async-trait", "async-trait",
"axum", "axum 0.7.9",
"base64 0.22.1", "base64 0.22.1",
"bytes", "bytes",
"h2", "h2",
@ -2688,14 +2750,14 @@ dependencies = [
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.5.1" version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"pin-project-lite", "pin-project-lite",
"sync_wrapper 0.1.2", "sync_wrapper",
"tokio", "tokio",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
@ -3409,7 +3471,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]

View File

@ -11,4 +11,4 @@ tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3.18" } tracing-subscriber = { version = "0.3.18" }
clap = { version = "4", features = ["derive", "env"] } clap = { version = "4", features = ["derive", "env"] }
dotenv = { version = "0.15" } dotenv = { version = "0.15" }
axum = { version = "0.7" } axum = { version = "0.8" }

View File

@ -37,3 +37,4 @@ reqwest = { version = "0.12.9", default-features = false, features = [
serde_json = "1.0.133" serde_json = "1.0.133"
wasmtime = "28.0.0" wasmtime = "28.0.0"
wasmtime-wasi = "28.0.0" wasmtime-wasi = "28.0.0"
petname = "2.0.2"

View File

@ -44,7 +44,7 @@ impl State {
let config = AgentConfig::new().await?; let config = AgentConfig::new().await?;
let discovery = DiscoveryClient::new(&config.discovery).discover().await?; let discovery = DiscoveryClient::new(&config.discovery).discover().await?;
let grpc = GrpcClient::new(&discovery.process_host); let grpc = GrpcClient::new(&discovery.process_host);
let plugin_store = PluginStore::new()?; let plugin_store = PluginStore::new(config.clone())?;
let scheduled_tasks = ScheduledTasks::new(plugin_store.clone()); let scheduled_tasks = ScheduledTasks::new(plugin_store.clone());
let scheduler = Scheduler::new(scheduled_tasks); let scheduler = Scheduler::new(scheduled_tasks);
let queue = AgentQueue::new(scheduler); let queue = AgentQueue::new(scheduler);

View File

@ -8,6 +8,8 @@ use uuid::Uuid;
pub struct AgentConfig { pub struct AgentConfig {
pub agent_id: String, pub agent_id: String,
pub discovery: String, pub discovery: String,
pub labels: BTreeMap<String, String>,
} }
impl AgentConfig { impl AgentConfig {
@ -17,6 +19,7 @@ impl AgentConfig {
Ok(Self { Ok(Self {
agent_id: config.agent_id, agent_id: config.agent_id,
discovery: config.discovery, discovery: config.discovery,
labels: config.labels.unwrap_or_default(),
}) })
} }
} }

View File

@ -8,6 +8,9 @@ use wasmtime::component::*;
use wasmtime::{Config, Engine, Store}; use wasmtime::{Config, Engine, Store};
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView}; use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView};
use super::agent_state::State;
use super::config::AgentConfig;
wasmtime::component::bindgen!({ wasmtime::component::bindgen!({
path: "wit/world.wit", path: "wit/world.wit",
//world: "churn", //world: "churn",
@ -17,9 +20,14 @@ wasmtime::component::bindgen!({
} }
}); });
#[derive(Default)] pub struct CustomProcess {
pub struct CustomProcess {} agent_config: AgentConfig,
}
impl CustomProcess { impl CustomProcess {
pub fn new(agent_config: AgentConfig) -> Self {
Self { agent_config }
}
pub fn run(&self, args: Vec<String>) -> String { pub fn run(&self, args: Vec<String>) -> String {
tracing::info!("calling function"); tracing::info!("calling function");
@ -42,6 +50,10 @@ impl CustomProcess {
} }
} }
} }
pub fn get_label(&self, label_key: &str) -> Option<String> {
self.agent_config.labels.get(label_key).cloned()
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -50,9 +62,9 @@ pub struct PluginStore {
} }
impl PluginStore { impl PluginStore {
pub fn new() -> anyhow::Result<Self> { pub fn new(config: AgentConfig) -> anyhow::Result<Self> {
Ok(Self { Ok(Self {
inner: Arc::new(Mutex::new(InnerPluginStore::new()?)), inner: Arc::new(Mutex::new(InnerPluginStore::new(config)?)),
}) })
} }
@ -74,7 +86,7 @@ pub struct InnerPluginStore {
} }
impl InnerPluginStore { impl InnerPluginStore {
pub fn new() -> anyhow::Result<Self> { pub fn new(agent_config: AgentConfig) -> anyhow::Result<Self> {
let mut config = Config::default(); let mut config = Config::default();
config.wasm_component_model(true); config.wasm_component_model(true);
config.async_support(true); config.async_support(true);
@ -89,7 +101,7 @@ impl InnerPluginStore {
|state: &mut ServerWasiView| state, |state: &mut ServerWasiView| state,
)?; )?;
let wasi_view = ServerWasiView::new(); let wasi_view = ServerWasiView::new(agent_config);
let store = Store::new(&engine, wasi_view); let store = Store::new(&engine, wasi_view);
Ok(Self { Ok(Self {
@ -112,11 +124,21 @@ impl InnerPluginStore {
pub async fn execute(&mut self, plugin: &str) -> anyhow::Result<()> { pub async fn execute(&mut self, plugin: &str) -> anyhow::Result<()> {
let plugin = self.ensure_plugin(plugin).await?; let plugin = self.ensure_plugin(plugin).await?;
plugin if plugin
.interface0 .interface0
.call_execute(&mut self.store) .call_should_run(&mut self.store)
.await .await
.context("Failed to call add function") .context("Failed to call should run")?
{
tracing::info!("job was marked as required to run");
return plugin
.interface0
.call_execute(&mut self.store)
.await
.context("Failed to call add function");
}
Ok(())
} }
async fn ensure_plugin(&mut self, plugin: &str) -> anyhow::Result<Churn> { async fn ensure_plugin(&mut self, plugin: &str) -> anyhow::Result<Churn> {
@ -148,6 +170,12 @@ impl InnerPluginStore {
let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?; let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?;
let mut stream = req.bytes_stream(); let mut stream = req.bytes_stream();
tracing::info!(
plugin_name = plugin_name,
plugin_path = plugin_path.display().to_string(),
"writing plugin to file"
);
let mut file = tokio::fs::File::create(&plugin_path).await?; let mut file = tokio::fs::File::create(&plugin_path).await?;
while let Some(chunk) = stream.next().await { while let Some(chunk) = stream.next().await {
let chunk = chunk?; let chunk = chunk?;
@ -177,14 +205,17 @@ struct ServerWasiView {
table: ResourceTable, table: ResourceTable,
ctx: WasiCtx, ctx: WasiCtx,
processes: ResourceTable, processes: ResourceTable,
agent_config: AgentConfig,
} }
impl ServerWasiView { impl ServerWasiView {
fn new() -> Self { fn new(agent_config: AgentConfig) -> Self {
let table = ResourceTable::new(); let table = ResourceTable::new();
let ctx = WasiCtxBuilder::new() let ctx = WasiCtxBuilder::new()
.inherit_stdio() .inherit_stdio()
.inherit_stdout()
.inherit_stderr()
.inherit_env() .inherit_env()
.inherit_network() .inherit_network()
.preopened_dir("/", "/", DirPerms::all(), FilePerms::all()) .preopened_dir("/", "/", DirPerms::all(), FilePerms::all())
@ -195,6 +226,7 @@ impl ServerWasiView {
table, table,
ctx, ctx,
processes: ResourceTable::default(), processes: ResourceTable::default(),
agent_config,
} }
} }
} }
@ -216,7 +248,9 @@ impl HostProcess for ServerWasiView {
async fn new( async fn new(
&mut self, &mut self,
) -> wasmtime::component::Resource<component::churn_tasks::process::Process> { ) -> wasmtime::component::Resource<component::churn_tasks::process::Process> {
self.processes.push(CustomProcess::default()).unwrap() self.processes
.push(CustomProcess::new(self.agent_config.clone()))
.unwrap()
} }
async fn run_process( async fn run_process(
@ -228,6 +262,15 @@ impl HostProcess for ServerWasiView {
process.run(inputs) process.run(inputs)
} }
async fn get_variable(
&mut self,
self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
key: wasmtime::component::__internal::String,
) -> String {
let process = self.processes.get(&self_).unwrap();
process.get_label(&key).unwrap()
}
async fn drop( async fn drop(
&mut self, &mut self,
rep: wasmtime::component::Resource<component::churn_tasks::process::Process>, rep: wasmtime::component::Resource<component::churn_tasks::process::Process>,

View File

@ -31,6 +31,13 @@ pub async fn execute() -> anyhow::Result<()> {
setup_labels.insert(k, v); setup_labels.insert(k, v);
} }
if !setup_labels.contains_key("node_name") {
setup_labels.insert(
"node_name".into(),
petname::petname(2, "-").expect("to be able to generate a valid petname"),
);
}
agent::setup_config(discovery, force, setup_labels).await?; agent::setup_config(discovery, force, setup_labels).await?;
tracing::info!("wrote default agent config"); tracing::info!("wrote default agent config");
} }

View File

@ -4,6 +4,7 @@ interface process {
resource process { resource process {
constructor(); constructor();
run-process: func(inputs: list<string>) -> string; run-process: func(inputs: list<string>) -> string;
get-variable: func(key: string) -> string;
} }
} }