feat: enable having get variable from local setup
All checks were successful
continuous-integration/drone/push Build is passing

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2025-01-04 01:28:32 +01:00
parent ceaad75057
commit 83294306a4
4 changed files with 44 additions and 9 deletions

View File

@ -44,7 +44,7 @@ impl State {
let config = AgentConfig::new().await?; let config = AgentConfig::new().await?;
let discovery = DiscoveryClient::new(&config.discovery).discover().await?; let discovery = DiscoveryClient::new(&config.discovery).discover().await?;
let grpc = GrpcClient::new(&discovery.process_host); let grpc = GrpcClient::new(&discovery.process_host);
let plugin_store = PluginStore::new()?; let plugin_store = PluginStore::new(config.clone())?;
let scheduled_tasks = ScheduledTasks::new(plugin_store.clone()); let scheduled_tasks = ScheduledTasks::new(plugin_store.clone());
let scheduler = Scheduler::new(scheduled_tasks); let scheduler = Scheduler::new(scheduled_tasks);
let queue = AgentQueue::new(scheduler); let queue = AgentQueue::new(scheduler);

View File

@ -8,6 +8,8 @@ use uuid::Uuid;
pub struct AgentConfig { pub struct AgentConfig {
pub agent_id: String, pub agent_id: String,
pub discovery: String, pub discovery: String,
pub labels: BTreeMap<String, String>,
} }
impl AgentConfig { impl AgentConfig {
@ -17,6 +19,7 @@ impl AgentConfig {
Ok(Self { Ok(Self {
agent_id: config.agent_id, agent_id: config.agent_id,
discovery: config.discovery, discovery: config.discovery,
labels: config.labels.unwrap_or_default(),
}) })
} }
} }

View File

@ -8,6 +8,9 @@ use wasmtime::component::*;
use wasmtime::{Config, Engine, Store}; use wasmtime::{Config, Engine, Store};
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView}; use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView};
use super::agent_state::State;
use super::config::AgentConfig;
wasmtime::component::bindgen!({ wasmtime::component::bindgen!({
path: "wit/world.wit", path: "wit/world.wit",
//world: "churn", //world: "churn",
@ -17,9 +20,14 @@ wasmtime::component::bindgen!({
} }
}); });
#[derive(Default)] pub struct CustomProcess {
pub struct CustomProcess {} agent_config: AgentConfig,
}
impl CustomProcess { impl CustomProcess {
pub fn new(agent_config: AgentConfig) -> Self {
Self { agent_config }
}
pub fn run(&self, args: Vec<String>) -> String { pub fn run(&self, args: Vec<String>) -> String {
tracing::info!("calling function"); tracing::info!("calling function");
@ -42,6 +50,10 @@ impl CustomProcess {
} }
} }
} }
pub fn get_label(&self, label_key: &str) -> Option<String> {
self.agent_config.labels.get(label_key).cloned()
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -50,9 +62,9 @@ pub struct PluginStore {
} }
impl PluginStore { impl PluginStore {
pub fn new() -> anyhow::Result<Self> { pub fn new(config: AgentConfig) -> anyhow::Result<Self> {
Ok(Self { Ok(Self {
inner: Arc::new(Mutex::new(InnerPluginStore::new()?)), inner: Arc::new(Mutex::new(InnerPluginStore::new(config)?)),
}) })
} }
@ -74,7 +86,7 @@ pub struct InnerPluginStore {
} }
impl InnerPluginStore { impl InnerPluginStore {
pub fn new() -> anyhow::Result<Self> { pub fn new(agent_config: AgentConfig) -> anyhow::Result<Self> {
let mut config = Config::default(); let mut config = Config::default();
config.wasm_component_model(true); config.wasm_component_model(true);
config.async_support(true); config.async_support(true);
@ -89,7 +101,7 @@ impl InnerPluginStore {
|state: &mut ServerWasiView| state, |state: &mut ServerWasiView| state,
)?; )?;
let wasi_view = ServerWasiView::new(); let wasi_view = ServerWasiView::new(agent_config);
let store = Store::new(&engine, wasi_view); let store = Store::new(&engine, wasi_view);
Ok(Self { Ok(Self {
@ -148,6 +160,12 @@ impl InnerPluginStore {
let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?; let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?;
let mut stream = req.bytes_stream(); let mut stream = req.bytes_stream();
tracing::info!(
plugin_name = plugin_name,
plugin_path = plugin_path.display().to_string(),
"writing plugin to file"
);
let mut file = tokio::fs::File::create(&plugin_path).await?; let mut file = tokio::fs::File::create(&plugin_path).await?;
while let Some(chunk) = stream.next().await { while let Some(chunk) = stream.next().await {
let chunk = chunk?; let chunk = chunk?;
@ -177,10 +195,11 @@ struct ServerWasiView {
table: ResourceTable, table: ResourceTable,
ctx: WasiCtx, ctx: WasiCtx,
processes: ResourceTable, processes: ResourceTable,
agent_config: AgentConfig,
} }
impl ServerWasiView { impl ServerWasiView {
fn new() -> Self { fn new(agent_config: AgentConfig) -> Self {
let table = ResourceTable::new(); let table = ResourceTable::new();
let ctx = WasiCtxBuilder::new() let ctx = WasiCtxBuilder::new()
@ -197,6 +216,7 @@ impl ServerWasiView {
table, table,
ctx, ctx,
processes: ResourceTable::default(), processes: ResourceTable::default(),
agent_config,
} }
} }
} }
@ -218,7 +238,9 @@ impl HostProcess for ServerWasiView {
async fn new( async fn new(
&mut self, &mut self,
) -> wasmtime::component::Resource<component::churn_tasks::process::Process> { ) -> wasmtime::component::Resource<component::churn_tasks::process::Process> {
self.processes.push(CustomProcess::default()).unwrap() self.processes
.push(CustomProcess::new(self.agent_config.clone()))
.unwrap()
} }
async fn run_process( async fn run_process(
@ -230,6 +252,15 @@ impl HostProcess for ServerWasiView {
process.run(inputs) process.run(inputs)
} }
async fn get_variable(
&mut self,
self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
key: wasmtime::component::__internal::String,
) -> String {
let process = self.processes.get(&self_).unwrap();
process.get_label(&key).unwrap()
}
async fn drop( async fn drop(
&mut self, &mut self,
rep: wasmtime::component::Resource<component::churn_tasks::process::Process>, rep: wasmtime::component::Resource<component::churn_tasks::process::Process>,

View File

@ -4,6 +4,7 @@ interface process {
resource process { resource process {
constructor(); constructor();
run-process: func(inputs: list<string>) -> string; run-process: func(inputs: list<string>) -> string;
get-variable: func(key: string) -> string;
} }
} }