feat: update with web assembly components
All checks were successful
continuous-integration/drone/push Build is passing

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-12-01 22:21:17 +01:00
parent 2387a70778
commit db4cc98643
Signed by: kjuulh
GPG Key ID: D85D7535F18F35FA
14 changed files with 1500 additions and 25 deletions

5
.env
View File

@ -2,3 +2,8 @@ EXTERNAL_HOST=http://localhost:3000
PROCESS_HOST=http://localhost:7900 PROCESS_HOST=http://localhost:7900
SERVICE_HOST=127.0.0.1:3000 SERVICE_HOST=127.0.0.1:3000
DISCOVERY_HOST=http://127.0.0.1:3000 DISCOVERY_HOST=http://127.0.0.1:3000
#EXTERNAL_HOST=http://localhost:3000
#PROCESS_HOST=http://localhost:7900
#SERVICE_HOST=127.0.0.1:3000
#DISCOVERY_HOST=https://churn.prod.kjuulh.app

1258
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -32,5 +32,8 @@ reqwest = { version = "0.12.9", default-features = false, features = [
"http2", "http2",
"charset", "charset",
"native-tls-vendored", "native-tls-vendored",
"stream",
] } ] }
serde_json = "1.0.133" serde_json = "1.0.133"
wasmtime = "27.0.0"
wasmtime-wasi = "27.0.0"

View File

@ -12,6 +12,7 @@ mod config;
mod discovery_client; mod discovery_client;
mod event_handler; mod event_handler;
mod grpc_client; mod grpc_client;
mod plugins;
mod queue; mod queue;
mod refresh; mod refresh;
mod scheduler; mod scheduler;

View File

@ -1,16 +1,23 @@
use apt::AptTask; use apt::AptTask;
use plugin_task::PluginTask;
use super::task::IntoTask; use super::{plugins::PluginStore, task::IntoTask};
pub struct Plan {} pub mod apt;
pub mod plugin_task;
pub struct Plan {
store: PluginStore,
}
impl Plan { impl Plan {
pub fn new() -> Self { pub fn new(store: PluginStore) -> Self {
Self {} Self { store }
} }
pub async fn tasks(&self) -> anyhow::Result<Vec<impl IntoTask>> { pub async fn tasks(&self) -> anyhow::Result<Vec<impl IntoTask>> {
Ok(vec![AptTask::new()]) Ok(vec![
AptTask::new().into_task(),
PluginTask::new("alloy@0.1.0", self.store.clone()).into_task(),
])
} }
} }
pub mod apt;

View File

@ -12,8 +12,8 @@ impl AptTask {
#[async_trait::async_trait] #[async_trait::async_trait]
impl Task for AptTask { impl Task for AptTask {
fn id(&self) -> String { async fn id(&self) -> anyhow::Result<String> {
"apt".into() Ok("apt".into())
} }
async fn execute(&self) -> anyhow::Result<()> { async fn execute(&self) -> anyhow::Result<()> {

View File

@ -0,0 +1,30 @@
use crate::agent::{plugins::PluginStore, task::Task};
pub struct PluginTask {
plugin: String,
store: PluginStore,
}
impl PluginTask {
pub fn new(plugin: impl Into<String>, store: PluginStore) -> Self {
Self {
plugin: plugin.into(),
store,
}
}
}
#[async_trait::async_trait]
impl Task for PluginTask {
async fn id(&self) -> anyhow::Result<String> {
let id = self.store.id(&self.plugin).await?;
Ok(id)
}
async fn execute(&self) -> anyhow::Result<()> {
self.store.execute(&self.plugin).await?;
Ok(())
}
}

View File

@ -4,7 +4,8 @@ use crate::api::Discovery;
use super::{ use super::{
config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient, config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient,
handlers::scheduled_tasks::ScheduledTasks, queue::AgentQueue, scheduler::Scheduler, handlers::scheduled_tasks::ScheduledTasks, plugins::PluginStore, queue::AgentQueue,
scheduler::Scheduler,
}; };
#[derive(Clone)] #[derive(Clone)]
@ -35,6 +36,7 @@ pub struct State {
pub config: AgentConfig, pub config: AgentConfig,
pub discovery: Discovery, pub discovery: Discovery,
pub queue: AgentQueue, pub queue: AgentQueue,
pub plugin_store: PluginStore,
} }
impl State { impl State {
@ -42,7 +44,8 @@ impl State {
let config = AgentConfig::new().await?; let config = AgentConfig::new().await?;
let discovery = DiscoveryClient::new(&config.discovery).discover().await?; let discovery = DiscoveryClient::new(&config.discovery).discover().await?;
let grpc = GrpcClient::new(&discovery.process_host); let grpc = GrpcClient::new(&discovery.process_host);
let scheduled_tasks = ScheduledTasks::new(); let plugin_store = PluginStore::new()?;
let scheduled_tasks = ScheduledTasks::new(plugin_store.clone());
let scheduler = Scheduler::new(scheduled_tasks); let scheduler = Scheduler::new(scheduled_tasks);
let queue = AgentQueue::new(scheduler); let queue = AgentQueue::new(scheduler);
@ -51,6 +54,7 @@ impl State {
config, config,
discovery, discovery,
queue, queue,
plugin_store,
}) })
} }
} }

View File

@ -2,14 +2,17 @@ use std::collections::BTreeMap;
use crate::agent::{ use crate::agent::{
actions::Plan, actions::Plan,
plugins::PluginStore,
task::{ConcreteTask, IntoTask}, task::{ConcreteTask, IntoTask},
}; };
#[derive(Clone)] #[derive(Clone)]
pub struct ScheduledTasks {} pub struct ScheduledTasks {
store: PluginStore,
}
impl ScheduledTasks { impl ScheduledTasks {
pub fn new() -> Self { pub fn new(store: PluginStore) -> Self {
Self {} Self { store }
} }
pub async fn handle( pub async fn handle(
@ -19,7 +22,7 @@ impl ScheduledTasks {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
tracing::info!("scheduling: {}", task); tracing::info!("scheduling: {}", task);
let plan = Plan::new(); let plan = Plan::new(self.store.clone());
let tasks: Vec<ConcreteTask> = plan let tasks: Vec<ConcreteTask> = plan
.tasks() .tasks()
.await? .await?
@ -28,12 +31,13 @@ impl ScheduledTasks {
.collect(); .collect();
for task in tasks { for task in tasks {
let id = task.id().await?;
if !task.should_run().await? { if !task.should_run().await? {
tracing::debug!(task = task.id(), "skipping run"); tracing::debug!(task = id, "skipping run");
continue; continue;
} }
tracing::info!(task = task.id(), "executing task"); tracing::info!(task = id, "executing task");
task.execute().await?; task.execute().await?;
} }

View File

@ -0,0 +1,161 @@
use anyhow::Context;
use futures::StreamExt;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use wasmtime::component::*;
use wasmtime::{Config, Engine, Store};
use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiView};
wasmtime::component::bindgen!({
path: "wit/world.wit",
world: "churn",
async: true
});
#[derive(Clone)]
pub struct PluginStore {
inner: Arc<Mutex<InnerPluginStore>>,
}
impl PluginStore {
pub fn new() -> anyhow::Result<Self> {
Ok(Self {
inner: Arc::new(Mutex::new(InnerPluginStore::new()?)),
})
}
pub async fn id(&self, plugin: &str) -> anyhow::Result<String> {
let mut inner = self.inner.lock().await;
inner.id(plugin).await
}
pub async fn execute(&self, plugin: &str) -> anyhow::Result<()> {
let mut inner = self.inner.lock().await;
inner.execute(plugin).await
}
}
pub struct InnerPluginStore {
store: wasmtime::Store<ServerWasiView>,
linker: wasmtime::component::Linker<ServerWasiView>,
engine: wasmtime::Engine,
}
impl InnerPluginStore {
pub fn new() -> anyhow::Result<Self> {
let mut config = Config::default();
config.wasm_component_model(true);
config.async_support(true);
let engine = Engine::new(&config)?;
let mut linker: wasmtime::component::Linker<ServerWasiView> = Linker::new(&engine);
// Add the command world (aka WASI CLI) to the linker
wasmtime_wasi::add_to_linker_async(&mut linker).context("Failed to link command world")?;
let wasi_view = ServerWasiView::new();
let store = Store::new(&engine, wasi_view);
Ok(Self {
store,
linker,
engine,
})
}
pub async fn id(&mut self, plugin: &str) -> anyhow::Result<String> {
let plugin = self.ensure_plugin(plugin).await?;
plugin
.interface0
.call_id(&mut self.store)
.await
.context("Failed to call add function")
}
pub async fn execute(&mut self, plugin: &str) -> anyhow::Result<()> {
let plugin = self.ensure_plugin(plugin).await?;
plugin
.interface0
.call_execute(&mut self.store)
.await
.context("Failed to call add function")
}
async fn ensure_plugin(&mut self, plugin: &str) -> anyhow::Result<Churn> {
let cache = dirs::cache_dir()
.ok_or(anyhow::anyhow!("failed to find cache dir"))?
.join("io.kjuulh.churn");
let (plugin_name, plugin_version) = plugin.split_once("@").unwrap_or((plugin, "latest"));
let plugin_path = cache
.join("plugins")
.join(plugin_name)
.join(plugin_version)
.join(format!("{plugin_name}.wasm"));
let no_cache: bool = std::env::var("CHURN_NO_CACHE")
.unwrap_or("false".into())
.parse()?;
if !plugin_path.exists() || no_cache {
tracing::info!(
plugin_name = plugin_name,
plugin_version = plugin_version,
"downloading plugin"
);
if let Some(parent) = plugin_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?;
let mut stream = req.bytes_stream();
let mut file = tokio::fs::File::create(&plugin_path).await?;
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
file.write_all(&chunk).await?;
}
file.flush().await?;
}
let component =
Component::from_file(&self.engine, plugin_path).context("Component file not found")?;
tracing::debug!(
plugin_name = plugin_name,
plugin_version = plugin_version,
"instantiating plugin"
);
let instance = Churn::instantiate_async(&mut self.store, &component, &self.linker)
.await
.context("Failed to instantiate the example world")?;
Ok(instance)
}
}
struct ServerWasiView {
table: ResourceTable,
ctx: WasiCtx,
}
impl ServerWasiView {
fn new() -> Self {
let table = ResourceTable::new();
let ctx = WasiCtxBuilder::new().inherit_stdio().build();
Self { table, ctx }
}
}
impl WasiView for ServerWasiView {
fn table(&mut self) -> &mut ResourceTable {
&mut self.table
}
fn ctx(&mut self) -> &mut WasiCtx {
&mut self.ctx
}
}

View File

@ -30,7 +30,8 @@ impl notmad::Component for AgentRefresh {
&self, &self,
cancellation_token: tokio_util::sync::CancellationToken, cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> { ) -> Result<(), notmad::MadError> {
let cancel = nodrift::schedule_drifter(std::time::Duration::from_secs(60), self.clone()); let cancel =
nodrift::schedule_drifter(std::time::Duration::from_secs(60 * 10), self.clone());
tokio::select! { tokio::select! {
_ = cancel.cancelled() => {}, _ = cancel.cancelled() => {},
_ = cancellation_token.cancelled() => { _ = cancellation_token.cancelled() => {

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Task { pub trait Task {
fn id(&self) -> String; async fn id(&self) -> anyhow::Result<String>;
async fn should_run(&self) -> anyhow::Result<bool> { async fn should_run(&self) -> anyhow::Result<bool> {
Ok(true) Ok(true)
} }

View File

@ -65,7 +65,7 @@ impl crate::grpc::churn_server::Churn for GrpcServer {
) -> std::result::Result<tonic::Response<Self::ListenEventsStream>, tonic::Status> { ) -> std::result::Result<tonic::Response<Self::ListenEventsStream>, tonic::Status> {
let (tx, rx) = tokio::sync::mpsc::channel(128); let (tx, rx) = tokio::sync::mpsc::channel(128);
tokio::spawn(async move { tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10)); let mut interval = tokio::time::interval(std::time::Duration::from_secs(60 * 10));
loop { loop {
interval.tick().await; interval.tick().await;

View File

@ -0,0 +1,11 @@
package component:churn-tasks@0.1.0;
interface task {
id: func() -> string;
should-run: func() -> bool;
execute: func();
}
world churn {
export task;
}