diff --git a/crates/churn/src/agent/agent_state.rs b/crates/churn/src/agent/agent_state.rs index 5f7d59a..a9ebecc 100644 --- a/crates/churn/src/agent/agent_state.rs +++ b/crates/churn/src/agent/agent_state.rs @@ -44,7 +44,7 @@ impl State { let config = AgentConfig::new().await?; let discovery = DiscoveryClient::new(&config.discovery).discover().await?; let grpc = GrpcClient::new(&discovery.process_host); - let plugin_store = PluginStore::new()?; + let plugin_store = PluginStore::new(config.clone())?; let scheduled_tasks = ScheduledTasks::new(plugin_store.clone()); let scheduler = Scheduler::new(scheduled_tasks); let queue = AgentQueue::new(scheduler); diff --git a/crates/churn/src/agent/config.rs b/crates/churn/src/agent/config.rs index 88c0966..ae8ef3c 100644 --- a/crates/churn/src/agent/config.rs +++ b/crates/churn/src/agent/config.rs @@ -8,6 +8,8 @@ use uuid::Uuid; pub struct AgentConfig { pub agent_id: String, pub discovery: String, + + pub labels: BTreeMap, } impl AgentConfig { @@ -17,6 +19,7 @@ impl AgentConfig { Ok(Self { agent_id: config.agent_id, discovery: config.discovery, + labels: config.labels.unwrap_or_default(), }) } } diff --git a/crates/churn/src/agent/plugins.rs b/crates/churn/src/agent/plugins.rs index de21e20..5951f03 100644 --- a/crates/churn/src/agent/plugins.rs +++ b/crates/churn/src/agent/plugins.rs @@ -8,6 +8,9 @@ use wasmtime::component::*; use wasmtime::{Config, Engine, Store}; use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView}; +use super::agent_state::State; +use super::config::AgentConfig; + wasmtime::component::bindgen!({ path: "wit/world.wit", //world: "churn", @@ -17,9 +20,14 @@ wasmtime::component::bindgen!({ } }); -#[derive(Default)] -pub struct CustomProcess {} +pub struct CustomProcess { + agent_config: AgentConfig, +} impl CustomProcess { + pub fn new(agent_config: AgentConfig) -> Self { + Self { agent_config } + } + pub fn run(&self, args: Vec) -> String { tracing::info!("calling function"); @@ -42,6 +50,10 @@ impl CustomProcess { } } } + + pub fn get_label(&self, label_key: &str) -> Option { + self.agent_config.labels.get(label_key).cloned() + } } #[derive(Clone)] @@ -50,9 +62,9 @@ pub struct PluginStore { } impl PluginStore { - pub fn new() -> anyhow::Result { + pub fn new(config: AgentConfig) -> anyhow::Result { Ok(Self { - inner: Arc::new(Mutex::new(InnerPluginStore::new()?)), + inner: Arc::new(Mutex::new(InnerPluginStore::new(config)?)), }) } @@ -74,7 +86,7 @@ pub struct InnerPluginStore { } impl InnerPluginStore { - pub fn new() -> anyhow::Result { + pub fn new(agent_config: AgentConfig) -> anyhow::Result { let mut config = Config::default(); config.wasm_component_model(true); config.async_support(true); @@ -89,7 +101,7 @@ impl InnerPluginStore { |state: &mut ServerWasiView| state, )?; - let wasi_view = ServerWasiView::new(); + let wasi_view = ServerWasiView::new(agent_config); let store = Store::new(&engine, wasi_view); Ok(Self { @@ -148,6 +160,12 @@ impl InnerPluginStore { let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?; let mut stream = req.bytes_stream(); + + tracing::info!( + plugin_name = plugin_name, + plugin_path = plugin_path.display().to_string(), + "writing plugin to file" + ); let mut file = tokio::fs::File::create(&plugin_path).await?; while let Some(chunk) = stream.next().await { let chunk = chunk?; @@ -177,10 +195,11 @@ struct ServerWasiView { table: ResourceTable, ctx: WasiCtx, processes: ResourceTable, + agent_config: AgentConfig, } impl ServerWasiView { - fn new() -> Self { + fn new(agent_config: AgentConfig) -> Self { let table = ResourceTable::new(); let ctx = WasiCtxBuilder::new() @@ -197,6 +216,7 @@ impl ServerWasiView { table, ctx, processes: ResourceTable::default(), + agent_config, } } } @@ -218,7 +238,9 @@ impl HostProcess for ServerWasiView { async fn new( &mut self, ) -> wasmtime::component::Resource { - self.processes.push(CustomProcess::default()).unwrap() + self.processes + .push(CustomProcess::new(self.agent_config.clone())) + .unwrap() } async fn run_process( @@ -230,6 +252,15 @@ impl HostProcess for ServerWasiView { process.run(inputs) } + async fn get_variable( + &mut self, + self_: wasmtime::component::Resource, + key: wasmtime::component::__internal::String, + ) -> String { + let process = self.processes.get(&self_).unwrap(); + process.get_label(&key).unwrap() + } + async fn drop( &mut self, rep: wasmtime::component::Resource, diff --git a/crates/churn/wit/world.wit b/crates/churn/wit/world.wit index 86cf72a..240a6fc 100644 --- a/crates/churn/wit/world.wit +++ b/crates/churn/wit/world.wit @@ -4,6 +4,7 @@ interface process { resource process { constructor(); run-process: func(inputs: list) -> string; + get-variable: func(key: string) -> string; } }