Compare commits
49 Commits
Author | SHA1 | Date | |
---|---|---|---|
c5e5307682 | |||
5fb59ad992 | |||
e21663c8bd | |||
39a01778b2 | |||
f1cdf3ae20 | |||
355587234e | |||
21a13f3444 | |||
5e1b585a2d | |||
94025a02ce | |||
db4cc98643 | |||
2387a70778 | |||
d6fdda0e4e | |||
974e1ee0d6 | |||
717b052a88 | |||
9b52376e7a | |||
7b222af1dd | |||
150c7c3c98 | |||
8b064c2169 | |||
879737eedd | |||
818cc6c671 | |||
e759239243 | |||
b37674987e | |||
1dea5f29ac | |||
38a0a9fba4 | |||
f7c7aef96b | |||
7fefca47c9 | |||
36b1335fe9 | |||
eeabeda036 | |||
569fee52e6 | |||
b0ec41fa93 | |||
77f5ec7475 | |||
f3926d0885 | |||
b48b1ec886 | |||
79a8a34499 | |||
c9bbeb2439 | |||
b19ff0b7e5 | |||
eeaf59ac63 | |||
6f04d0cdda | |||
43c5fa1731 | |||
9badf8e193 | |||
55a0d294c5 | |||
8508d3f640 | |||
dd8ade9798 | |||
f1e6268a2d | |||
6647bb89be | |||
ea5adb2f93 | |||
ee323e99e8 | |||
c4434fd841 | |||
cb340ffb1e |
8
.env
8
.env
@ -1 +1,9 @@
|
||||
EXTERNAL_HOST=http://localhost:3000
|
||||
PROCESS_HOST=http://localhost:7900
|
||||
SERVICE_HOST=127.0.0.1:3000
|
||||
DISCOVERY_HOST=http://127.0.0.1:3000
|
||||
|
||||
#EXTERNAL_HOST=http://localhost:3000
|
||||
#PROCESS_HOST=http://localhost:7900
|
||||
#SERVICE_HOST=127.0.0.1:3000
|
||||
#DISCOVERY_HOST=https://churn.prod.kjuulh.app
|
||||
|
2253
Cargo.lock
generated
2253
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
8
buf.gen.yaml
Normal file
8
buf.gen.yaml
Normal file
@ -0,0 +1,8 @@
|
||||
version: v2
|
||||
plugins:
|
||||
- remote: buf.build/community/neoeinstein-prost
|
||||
out: crates/churn/src/grpc
|
||||
- remote: buf.build/community/neoeinstein-tonic:v0.4.0
|
||||
out: crates/churn/src/grpc
|
||||
inputs:
|
||||
- directory: crates/churn/proto
|
@ -14,8 +14,8 @@ axum.workspace = true
|
||||
|
||||
serde = { version = "1.0.197", features = ["derive"] }
|
||||
uuid = { version = "1.7.0", features = ["v4"] }
|
||||
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
|
||||
notmad = "0.6.0"
|
||||
tower-http = { version = "0.6.0", features = ["cors", "trace"] }
|
||||
notmad = "0.7.1"
|
||||
tokio-util = "0.7.12"
|
||||
async-trait = "0.1.83"
|
||||
nodrift = "0.2.0"
|
||||
@ -24,3 +24,16 @@ prost-types = "0.13.3"
|
||||
prost = "0.13.3"
|
||||
bytes = "1.8.0"
|
||||
tonic = { version = "0.12.3", features = ["tls", "tls-roots"] }
|
||||
toml = "0.8.19"
|
||||
dirs = "5.0.1"
|
||||
futures = "0.3.31"
|
||||
reqwest = { version = "0.12.9", default-features = false, features = [
|
||||
"json",
|
||||
"http2",
|
||||
"charset",
|
||||
"native-tls-vendored",
|
||||
"stream",
|
||||
] }
|
||||
serde_json = "1.0.133"
|
||||
wasmtime = "27.0.0"
|
||||
wasmtime-wasi = "27.0.0"
|
||||
|
35
crates/churn/proto/churn/v1/churn.proto
Normal file
35
crates/churn/proto/churn/v1/churn.proto
Normal file
@ -0,0 +1,35 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package churn.v1;
|
||||
|
||||
service Churn {
|
||||
rpc GetKey(GetKeyRequest) returns (GetKeyResponse);
|
||||
rpc SetKey(SetKeyRequest) returns (SetKeyResponse);
|
||||
rpc ListenEvents(ListenEventsRequest) returns (stream ListenEventsResponse);
|
||||
}
|
||||
|
||||
message GetKeyRequest {
|
||||
string namespace = 1;
|
||||
optional string id = 2;
|
||||
string key = 3;
|
||||
}
|
||||
message GetKeyResponse {
|
||||
optional string value = 1;
|
||||
}
|
||||
|
||||
message SetKeyRequest {
|
||||
string namespace = 1;
|
||||
optional string id = 2;
|
||||
string key = 3;
|
||||
string value = 4;
|
||||
}
|
||||
message SetKeyResponse {}
|
||||
|
||||
message ListenEventsRequest {
|
||||
string namespace = 1;
|
||||
optional string id = 2;
|
||||
}
|
||||
message ListenEventsResponse {
|
||||
string id = 1;
|
||||
string value = 2;
|
||||
}
|
@ -1,15 +1,33 @@
|
||||
use agent_state::AgentState;
|
||||
use event_handler::EventHandler;
|
||||
use refresh::AgentRefresh;
|
||||
|
||||
pub use config::setup_config;
|
||||
|
||||
pub mod models;
|
||||
pub(crate) mod task;
|
||||
|
||||
mod agent_state;
|
||||
|
||||
mod config;
|
||||
mod discovery_client;
|
||||
mod event_handler;
|
||||
mod grpc_client;
|
||||
mod plugins;
|
||||
mod queue;
|
||||
mod refresh;
|
||||
mod scheduler;
|
||||
|
||||
pub async fn execute(host: impl Into<String>) -> anyhow::Result<()> {
|
||||
mod handlers;
|
||||
|
||||
mod actions;
|
||||
|
||||
pub async fn execute() -> anyhow::Result<()> {
|
||||
let state = AgentState::new().await?;
|
||||
|
||||
notmad::Mad::builder()
|
||||
.add(AgentRefresh::new(&state, host))
|
||||
.add(AgentRefresh::new(&state))
|
||||
.add(EventHandler::new(&state))
|
||||
.add(state.queue.clone())
|
||||
.cancellation(Some(std::time::Duration::from_secs(2)))
|
||||
.run()
|
||||
.await?;
|
||||
|
23
crates/churn/src/agent/actions.rs
Normal file
23
crates/churn/src/agent/actions.rs
Normal file
@ -0,0 +1,23 @@
|
||||
use apt::AptTask;
|
||||
use plugin_task::PluginTask;
|
||||
|
||||
use super::{plugins::PluginStore, task::IntoTask};
|
||||
|
||||
pub mod apt;
|
||||
pub mod plugin_task;
|
||||
|
||||
pub struct Plan {
|
||||
store: PluginStore,
|
||||
}
|
||||
impl Plan {
|
||||
pub fn new(store: PluginStore) -> Self {
|
||||
Self { store }
|
||||
}
|
||||
|
||||
pub async fn tasks(&self) -> anyhow::Result<Vec<impl IntoTask>> {
|
||||
Ok(vec![
|
||||
AptTask::new().into_task(),
|
||||
PluginTask::new("alloy@0.1.0", self.store.clone()).into_task(),
|
||||
])
|
||||
}
|
||||
}
|
49
crates/churn/src/agent/actions/apt.rs
Normal file
49
crates/churn/src/agent/actions/apt.rs
Normal file
@ -0,0 +1,49 @@
|
||||
use anyhow::Context;
|
||||
|
||||
use crate::agent::task::Task;
|
||||
|
||||
pub struct AptTask {}
|
||||
|
||||
impl AptTask {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Task for AptTask {
|
||||
async fn id(&self) -> anyhow::Result<String> {
|
||||
Ok("apt".into())
|
||||
}
|
||||
|
||||
async fn execute(&self) -> anyhow::Result<()> {
|
||||
let mut cmd = tokio::process::Command::new("apt-get");
|
||||
cmd.args(["update", "-q"]);
|
||||
let output = cmd.output().await.context("failed to run apt update")?;
|
||||
match output.status.success() {
|
||||
true => tracing::info!("successfully ran apt update"),
|
||||
false => {
|
||||
anyhow::bail!(
|
||||
"failed to run apt update: {}",
|
||||
std::str::from_utf8(&output.stderr)?
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let mut cmd = tokio::process::Command::new("apt-get");
|
||||
cmd.env("DEBIAN_FRONTEND", "noninteractive")
|
||||
.args(["upgrade", "-y"]);
|
||||
let output = cmd.output().await.context("failed to run apt upgrade")?;
|
||||
match output.status.success() {
|
||||
true => tracing::info!("successfully ran apt upgrade"),
|
||||
false => {
|
||||
anyhow::bail!(
|
||||
"failed to run apt upgrade: {}",
|
||||
std::str::from_utf8(&output.stderr)?
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
30
crates/churn/src/agent/actions/plugin_task.rs
Normal file
30
crates/churn/src/agent/actions/plugin_task.rs
Normal file
@ -0,0 +1,30 @@
|
||||
use crate::agent::{plugins::PluginStore, task::Task};
|
||||
|
||||
pub struct PluginTask {
|
||||
plugin: String,
|
||||
store: PluginStore,
|
||||
}
|
||||
|
||||
impl PluginTask {
|
||||
pub fn new(plugin: impl Into<String>, store: PluginStore) -> Self {
|
||||
Self {
|
||||
plugin: plugin.into(),
|
||||
store,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Task for PluginTask {
|
||||
async fn id(&self) -> anyhow::Result<String> {
|
||||
let id = self.store.id(&self.plugin).await?;
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
async fn execute(&self) -> anyhow::Result<()> {
|
||||
self.store.execute(&self.plugin).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,5 +1,13 @@
|
||||
use std::{ops::Deref, sync::Arc};
|
||||
|
||||
use crate::api::Discovery;
|
||||
|
||||
use super::{
|
||||
config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient,
|
||||
handlers::scheduled_tasks::ScheduledTasks, plugins::PluginStore, queue::AgentQueue,
|
||||
scheduler::Scheduler,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AgentState(Arc<State>);
|
||||
|
||||
@ -23,10 +31,30 @@ impl Deref for AgentState {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct State {}
|
||||
pub struct State {
|
||||
pub grpc: GrpcClient,
|
||||
pub config: AgentConfig,
|
||||
pub discovery: Discovery,
|
||||
pub queue: AgentQueue,
|
||||
pub plugin_store: PluginStore,
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub async fn new() -> anyhow::Result<Self> {
|
||||
Ok(Self {})
|
||||
let config = AgentConfig::new().await?;
|
||||
let discovery = DiscoveryClient::new(&config.discovery).discover().await?;
|
||||
let grpc = GrpcClient::new(&discovery.process_host);
|
||||
let plugin_store = PluginStore::new()?;
|
||||
let scheduled_tasks = ScheduledTasks::new(plugin_store.clone());
|
||||
let scheduler = Scheduler::new(scheduled_tasks);
|
||||
let queue = AgentQueue::new(scheduler);
|
||||
|
||||
Ok(Self {
|
||||
grpc,
|
||||
config,
|
||||
discovery,
|
||||
queue,
|
||||
plugin_store,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
93
crates/churn/src/agent/config.rs
Normal file
93
crates/churn/src/agent/config.rs
Normal file
@ -0,0 +1,93 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use anyhow::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AgentConfig {
|
||||
pub agent_id: String,
|
||||
pub discovery: String,
|
||||
}
|
||||
|
||||
impl AgentConfig {
|
||||
pub async fn new() -> anyhow::Result<Self> {
|
||||
let config = ConfigFile::load().await?;
|
||||
|
||||
Ok(Self {
|
||||
agent_id: config.agent_id,
|
||||
discovery: config.discovery,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct ConfigFile {
|
||||
agent_id: String,
|
||||
discovery: String,
|
||||
|
||||
labels: Option<BTreeMap<String, String>>,
|
||||
}
|
||||
|
||||
impl ConfigFile {
|
||||
pub async fn load() -> anyhow::Result<Self> {
|
||||
let directory = dirs::data_dir()
|
||||
.ok_or(anyhow::anyhow!("failed to get data dir"))?
|
||||
.join("io.kjuulh.churn-agent")
|
||||
.join("churn-agent.toml");
|
||||
|
||||
if !directory.exists() {
|
||||
anyhow::bail!(
|
||||
"No churn agent file was setup, run `churn agent setup` to setup the defaults"
|
||||
)
|
||||
}
|
||||
|
||||
let contents = tokio::fs::read_to_string(&directory).await?;
|
||||
|
||||
toml::from_str(&contents).context("failed to parse the contents of the churn agent config")
|
||||
}
|
||||
|
||||
pub async fn write_default(
|
||||
discovery: impl Into<String>,
|
||||
force: bool,
|
||||
labels: impl Into<BTreeMap<String, String>>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let s = Self {
|
||||
agent_id: Uuid::new_v4().to_string(),
|
||||
discovery: discovery.into(),
|
||||
labels: Some(labels.into()),
|
||||
};
|
||||
|
||||
let directory = dirs::data_dir()
|
||||
.ok_or(anyhow::anyhow!("failed to get data dir"))?
|
||||
.join("io.kjuulh.churn-agent")
|
||||
.join("churn-agent.toml");
|
||||
|
||||
if let Some(parent) = directory.parent() {
|
||||
tokio::fs::create_dir_all(&parent).await?;
|
||||
}
|
||||
|
||||
if !force && directory.exists() {
|
||||
anyhow::bail!("config file already exists, consider moving it to a backup before trying again: {}", directory.display());
|
||||
}
|
||||
|
||||
let contents = toml::to_string_pretty(&s)
|
||||
.context("failed to convert default implementation to string")?;
|
||||
|
||||
tokio::fs::write(directory, contents.as_bytes())
|
||||
.await
|
||||
.context("failed to write to agent file")?;
|
||||
|
||||
Ok(s)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn setup_config(
|
||||
discovery: impl Into<String>,
|
||||
force: bool,
|
||||
labels: impl Into<BTreeMap<String, String>>,
|
||||
) -> anyhow::Result<()> {
|
||||
ConfigFile::write_default(discovery, force, labels).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
21
crates/churn/src/agent/discovery_client.rs
Normal file
21
crates/churn/src/agent/discovery_client.rs
Normal file
@ -0,0 +1,21 @@
|
||||
use crate::api::Discovery;
|
||||
|
||||
pub struct DiscoveryClient {
|
||||
host: String,
|
||||
}
|
||||
|
||||
impl DiscoveryClient {
|
||||
pub fn new(discovery_host: impl Into<String>) -> Self {
|
||||
Self {
|
||||
host: discovery_host.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn discover(&self) -> anyhow::Result<Discovery> {
|
||||
tracing::info!(
|
||||
"getting details from discovery endpoint: {}/discovery",
|
||||
self.host.trim_end_matches('/')
|
||||
);
|
||||
crate::api::Discovery::get_from_host(&self.host).await
|
||||
}
|
||||
}
|
63
crates/churn/src/agent/event_handler.rs
Normal file
63
crates/churn/src/agent/event_handler.rs
Normal file
@ -0,0 +1,63 @@
|
||||
use notmad::{Component, MadError};
|
||||
|
||||
use crate::agent::models::Commands;
|
||||
|
||||
use super::{
|
||||
agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient, queue::AgentQueue,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct EventHandler {
|
||||
config: AgentConfig,
|
||||
grpc: GrpcClient,
|
||||
queue: AgentQueue,
|
||||
}
|
||||
|
||||
impl EventHandler {
|
||||
pub fn new(state: impl Into<AgentState>) -> Self {
|
||||
let state: AgentState = state.into();
|
||||
|
||||
Self {
|
||||
config: state.config.clone(),
|
||||
grpc: state.grpc.clone(),
|
||||
queue: state.queue.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Component for EventHandler {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("event_handler".into())
|
||||
}
|
||||
|
||||
async fn run(
|
||||
&self,
|
||||
cancellation_token: tokio_util::sync::CancellationToken,
|
||||
) -> Result<(), notmad::MadError> {
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled() => {},
|
||||
res = self.grpc.listen_events("agents", None::<String>, self.clone()) => {
|
||||
res.map_err(MadError::Inner)?;
|
||||
},
|
||||
res = self.grpc.listen_events("agents", Some(&self.config.agent_id), self.clone()) => {
|
||||
res.map_err(MadError::Inner)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl super::grpc_client::ListenEventsExecutor for EventHandler {
|
||||
async fn execute(&self, event: crate::grpc::ListenEventsResponse) -> anyhow::Result<()> {
|
||||
tracing::info!(value = event.id, "received event");
|
||||
|
||||
let event: Commands = serde_json::from_str(&event.value)?;
|
||||
|
||||
self.queue.publish(event).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
108
crates/churn/src/agent/grpc_client.rs
Normal file
108
crates/churn/src/agent/grpc_client.rs
Normal file
@ -0,0 +1,108 @@
|
||||
use tonic::transport::{Channel, ClientTlsConfig};
|
||||
|
||||
use crate::grpc::{churn_client::ChurnClient, *};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GrpcClient {
|
||||
host: String,
|
||||
}
|
||||
|
||||
impl GrpcClient {
|
||||
pub fn new(host: impl Into<String>) -> Self {
|
||||
Self { host: host.into() }
|
||||
}
|
||||
|
||||
pub async fn get_key(
|
||||
&self,
|
||||
namespace: &str,
|
||||
id: Option<impl Into<String>>,
|
||||
key: &str,
|
||||
) -> anyhow::Result<Option<String>> {
|
||||
let mut client = self.client().await?;
|
||||
|
||||
let resp = client
|
||||
.get_key(GetKeyRequest {
|
||||
key: key.into(),
|
||||
namespace: namespace.into(),
|
||||
id: id.map(|i| i.into()),
|
||||
})
|
||||
.await?;
|
||||
let resp = resp.into_inner();
|
||||
|
||||
Ok(resp.value)
|
||||
}
|
||||
|
||||
pub async fn set_key(
|
||||
&self,
|
||||
namespace: &str,
|
||||
id: Option<impl Into<String>>,
|
||||
key: &str,
|
||||
value: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut client = self.client().await?;
|
||||
|
||||
client
|
||||
.set_key(SetKeyRequest {
|
||||
key: key.into(),
|
||||
value: value.into(),
|
||||
namespace: namespace.into(),
|
||||
id: id.map(|i| i.into()),
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn listen_events(
|
||||
&self,
|
||||
namespace: &str,
|
||||
id: Option<impl Into<String>>,
|
||||
exec: impl ListenEventsExecutor,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut client = self.client().await?;
|
||||
|
||||
tracing::debug!("creating stream for listening to events on: {}", namespace);
|
||||
let resp = client
|
||||
.listen_events(ListenEventsRequest {
|
||||
namespace: namespace.into(),
|
||||
id: id.map(|i| i.into()),
|
||||
})
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!("failed to establish a connection: {}", e))?;
|
||||
|
||||
tracing::debug!("setup stream: {}", namespace);
|
||||
let mut inner = resp.into_inner();
|
||||
while let Ok(Some(message)) = inner.message().await {
|
||||
tracing::debug!("received message: {}", namespace);
|
||||
exec.execute(message)
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!("failed to handle message: {}", e))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn client(&self) -> anyhow::Result<ChurnClient<tonic::transport::Channel>> {
|
||||
tracing::debug!("setting up client");
|
||||
let channel = if self.host.starts_with("https") {
|
||||
Channel::from_shared(self.host.to_owned())?
|
||||
.tls_config(ClientTlsConfig::new().with_native_roots())?
|
||||
.connect_timeout(std::time::Duration::from_secs(5))
|
||||
.connect()
|
||||
.await?
|
||||
} else {
|
||||
Channel::from_shared(self.host.to_owned())?
|
||||
.connect()
|
||||
.await?
|
||||
};
|
||||
|
||||
let client = ChurnClient::new(channel);
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait ListenEventsExecutor {
|
||||
async fn execute(&self, event: ListenEventsResponse) -> anyhow::Result<()>;
|
||||
}
|
1
crates/churn/src/agent/handlers.rs
Normal file
1
crates/churn/src/agent/handlers.rs
Normal file
@ -0,0 +1 @@
|
||||
pub mod scheduled_tasks;
|
46
crates/churn/src/agent/handlers/scheduled_tasks.rs
Normal file
46
crates/churn/src/agent/handlers/scheduled_tasks.rs
Normal file
@ -0,0 +1,46 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use crate::agent::{
|
||||
actions::Plan,
|
||||
plugins::PluginStore,
|
||||
task::{ConcreteTask, IntoTask},
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ScheduledTasks {
|
||||
store: PluginStore,
|
||||
}
|
||||
impl ScheduledTasks {
|
||||
pub fn new(store: PluginStore) -> Self {
|
||||
Self { store }
|
||||
}
|
||||
|
||||
pub async fn handle(
|
||||
&self,
|
||||
task: &str,
|
||||
_properties: BTreeMap<String, String>,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::info!("scheduling: {}", task);
|
||||
|
||||
let plan = Plan::new(self.store.clone());
|
||||
let tasks: Vec<ConcreteTask> = plan
|
||||
.tasks()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|i| i.into_task())
|
||||
.collect();
|
||||
|
||||
for task in tasks {
|
||||
let id = task.id().await?;
|
||||
if !task.should_run().await? {
|
||||
tracing::debug!(task = id, "skipping run");
|
||||
continue;
|
||||
}
|
||||
|
||||
tracing::info!(task = id, "executing task");
|
||||
task.execute().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
20
crates/churn/src/agent/models.rs
Normal file
20
crates/churn/src/agent/models.rs
Normal file
@ -0,0 +1,20 @@
|
||||
use std::{collections::BTreeMap, fmt::Display};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum Commands {
|
||||
ScheduleTask {
|
||||
task: String,
|
||||
properties: BTreeMap<String, String>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Display for Commands {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(match self {
|
||||
Commands::ScheduleTask { .. } => "schedule_task",
|
||||
})
|
||||
}
|
||||
}
|
239
crates/churn/src/agent/plugins.rs
Normal file
239
crates/churn/src/agent/plugins.rs
Normal file
@ -0,0 +1,239 @@
|
||||
use anyhow::Context;
|
||||
use component::churn_tasks::process::HostProcess;
|
||||
use futures::StreamExt;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::sync::Mutex;
|
||||
use wasmtime::component::*;
|
||||
use wasmtime::{Config, Engine, Store};
|
||||
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView};
|
||||
|
||||
wasmtime::component::bindgen!({
|
||||
path: "wit/world.wit",
|
||||
//world: "churn",
|
||||
async: true,
|
||||
with: {
|
||||
"component:churn-tasks/process/process": CustomProcess
|
||||
}
|
||||
});
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct CustomProcess {}
|
||||
impl CustomProcess {
|
||||
pub fn run(&self, args: Vec<String>) -> String {
|
||||
tracing::info!("calling function");
|
||||
|
||||
match args.split_first() {
|
||||
Some((item, rest)) => {
|
||||
let mut cmd = std::process::Command::new(item);
|
||||
match cmd.args(rest).output() {
|
||||
Ok(output) => std::str::from_utf8(&output.stdout)
|
||||
.expect("to be able to parse utf8")
|
||||
.to_string(),
|
||||
Err(e) => {
|
||||
tracing::error!("command failed with output: {e}");
|
||||
e.to_string()
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
tracing::warn!("failed to call function because it is empty");
|
||||
panic!("failed to call function because it is empty")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PluginStore {
|
||||
inner: Arc<Mutex<InnerPluginStore>>,
|
||||
}
|
||||
|
||||
impl PluginStore {
|
||||
pub fn new() -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
inner: Arc::new(Mutex::new(InnerPluginStore::new()?)),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn id(&self, plugin: &str) -> anyhow::Result<String> {
|
||||
let mut inner = self.inner.lock().await;
|
||||
inner.id(plugin).await
|
||||
}
|
||||
|
||||
pub async fn execute(&self, plugin: &str) -> anyhow::Result<()> {
|
||||
let mut inner = self.inner.lock().await;
|
||||
inner.execute(plugin).await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InnerPluginStore {
|
||||
store: wasmtime::Store<ServerWasiView>,
|
||||
linker: wasmtime::component::Linker<ServerWasiView>,
|
||||
engine: wasmtime::Engine,
|
||||
}
|
||||
|
||||
impl InnerPluginStore {
|
||||
pub fn new() -> anyhow::Result<Self> {
|
||||
let mut config = Config::default();
|
||||
config.wasm_component_model(true);
|
||||
config.async_support(true);
|
||||
let engine = Engine::new(&config)?;
|
||||
let mut linker: wasmtime::component::Linker<ServerWasiView> = Linker::new(&engine);
|
||||
|
||||
// Add the command world (aka WASI CLI) to the linker
|
||||
wasmtime_wasi::add_to_linker_async(&mut linker).context("Failed to link command world")?;
|
||||
|
||||
component::churn_tasks::process::add_to_linker(
|
||||
&mut linker,
|
||||
|state: &mut ServerWasiView| state,
|
||||
)?;
|
||||
|
||||
let wasi_view = ServerWasiView::new();
|
||||
let store = Store::new(&engine, wasi_view);
|
||||
|
||||
Ok(Self {
|
||||
store,
|
||||
linker,
|
||||
engine,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn id(&mut self, plugin: &str) -> anyhow::Result<String> {
|
||||
let plugin = self.ensure_plugin(plugin).await?;
|
||||
|
||||
plugin
|
||||
.interface0
|
||||
.call_id(&mut self.store)
|
||||
.await
|
||||
.context("Failed to call add function")
|
||||
}
|
||||
|
||||
pub async fn execute(&mut self, plugin: &str) -> anyhow::Result<()> {
|
||||
let plugin = self.ensure_plugin(plugin).await?;
|
||||
|
||||
plugin
|
||||
.interface0
|
||||
.call_execute(&mut self.store)
|
||||
.await
|
||||
.context("Failed to call add function")
|
||||
}
|
||||
|
||||
async fn ensure_plugin(&mut self, plugin: &str) -> anyhow::Result<Churn> {
|
||||
let cache = dirs::cache_dir()
|
||||
.ok_or(anyhow::anyhow!("failed to find cache dir"))?
|
||||
.join("io.kjuulh.churn");
|
||||
|
||||
let (plugin_name, plugin_version) = plugin.split_once("@").unwrap_or((plugin, "latest"));
|
||||
|
||||
let plugin_path = cache
|
||||
.join("plugins")
|
||||
.join(plugin_name)
|
||||
.join(plugin_version)
|
||||
.join(format!("{plugin_name}.wasm"));
|
||||
|
||||
let no_cache: bool = std::env::var("CHURN_NO_CACHE")
|
||||
.unwrap_or("false".into())
|
||||
.parse()?;
|
||||
|
||||
if !plugin_path.exists() || no_cache {
|
||||
tracing::info!(
|
||||
plugin_name = plugin_name,
|
||||
plugin_version = plugin_version,
|
||||
"downloading plugin"
|
||||
);
|
||||
if let Some(parent) = plugin_path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await?;
|
||||
}
|
||||
|
||||
let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?;
|
||||
let mut stream = req.bytes_stream();
|
||||
let mut file = tokio::fs::File::create(&plugin_path).await?;
|
||||
while let Some(chunk) = stream.next().await {
|
||||
let chunk = chunk?;
|
||||
file.write_all(&chunk).await?;
|
||||
}
|
||||
file.flush().await?;
|
||||
}
|
||||
|
||||
let component =
|
||||
Component::from_file(&self.engine, plugin_path).context("Component file not found")?;
|
||||
|
||||
tracing::debug!(
|
||||
plugin_name = plugin_name,
|
||||
plugin_version = plugin_version,
|
||||
"instantiating plugin"
|
||||
);
|
||||
let instance = Churn::instantiate_async(&mut self.store, &component, &self.linker)
|
||||
.await
|
||||
.context("Failed to instantiate the example world")
|
||||
.unwrap();
|
||||
|
||||
Ok(instance)
|
||||
}
|
||||
}
|
||||
|
||||
struct ServerWasiView {
|
||||
table: ResourceTable,
|
||||
ctx: WasiCtx,
|
||||
processes: ResourceTable,
|
||||
}
|
||||
|
||||
impl ServerWasiView {
|
||||
fn new() -> Self {
|
||||
let table = ResourceTable::new();
|
||||
|
||||
let ctx = WasiCtxBuilder::new()
|
||||
.inherit_stdio()
|
||||
.inherit_env()
|
||||
.inherit_network()
|
||||
.preopened_dir("/", "/", DirPerms::all(), FilePerms::all())
|
||||
.expect("to be able to open root")
|
||||
.build();
|
||||
|
||||
Self {
|
||||
table,
|
||||
ctx,
|
||||
processes: ResourceTable::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WasiView for ServerWasiView {
|
||||
fn table(&mut self) -> &mut ResourceTable {
|
||||
&mut self.table
|
||||
}
|
||||
|
||||
fn ctx(&mut self) -> &mut WasiCtx {
|
||||
&mut self.ctx
|
||||
}
|
||||
}
|
||||
|
||||
impl component::churn_tasks::process::Host for ServerWasiView {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl HostProcess for ServerWasiView {
|
||||
async fn new(
|
||||
&mut self,
|
||||
) -> wasmtime::component::Resource<component::churn_tasks::process::Process> {
|
||||
self.processes.push(CustomProcess::default()).unwrap()
|
||||
}
|
||||
|
||||
async fn run_process(
|
||||
&mut self,
|
||||
self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
||||
inputs: wasmtime::component::__internal::Vec<String>,
|
||||
) -> String {
|
||||
let process = self.processes.get(&self_).unwrap();
|
||||
process.run(inputs)
|
||||
}
|
||||
|
||||
async fn drop(
|
||||
&mut self,
|
||||
rep: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
||||
) -> wasmtime::Result<()> {
|
||||
self.processes.delete(rep)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
67
crates/churn/src/agent/queue.rs
Normal file
67
crates/churn/src/agent/queue.rs
Normal file
@ -0,0 +1,67 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use notmad::{Component, MadError};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use super::{models::Commands, scheduler::Scheduler};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AgentQueue {
|
||||
sender: Arc<tokio::sync::mpsc::Sender<Commands>>,
|
||||
receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<Commands>>>,
|
||||
|
||||
scheduler: Scheduler,
|
||||
}
|
||||
|
||||
impl AgentQueue {
|
||||
pub fn new(scheduler: Scheduler) -> Self {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(5);
|
||||
Self {
|
||||
sender: Arc::new(tx),
|
||||
receiver: Arc::new(Mutex::new(rx)),
|
||||
|
||||
scheduler,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handler(&self, command: Commands) -> anyhow::Result<()> {
|
||||
tracing::debug!("handling task");
|
||||
|
||||
self.scheduler.handle(command).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn publish(&self, command: Commands) -> anyhow::Result<()> {
|
||||
tracing::debug!("publishing task: {}", command.to_string());
|
||||
|
||||
self.sender.send(command).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Component for AgentQueue {
|
||||
async fn run(
|
||||
&self,
|
||||
cancellation_token: tokio_util::sync::CancellationToken,
|
||||
) -> Result<(), notmad::MadError> {
|
||||
loop {
|
||||
let mut recv = self.receiver.lock().await;
|
||||
|
||||
tokio::select! {
|
||||
res = recv.recv() => {
|
||||
if let Some(res) = res {
|
||||
self.handler(res).await.map_err(MadError::Inner)?;
|
||||
}
|
||||
}
|
||||
_ = cancellation_token.cancelled() => {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,29 +1,37 @@
|
||||
use anyhow::Context;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use super::agent_state::AgentState;
|
||||
use crate::agent::models::Commands;
|
||||
|
||||
use super::{agent_state::AgentState, queue::AgentQueue};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AgentRefresh {
|
||||
_state: AgentState,
|
||||
host: String,
|
||||
process_host: String,
|
||||
queue: AgentQueue,
|
||||
}
|
||||
|
||||
impl AgentRefresh {
|
||||
pub fn new(state: impl Into<AgentState>, host: impl Into<String>) -> Self {
|
||||
pub fn new(state: impl Into<AgentState>) -> Self {
|
||||
let state: AgentState = state.into();
|
||||
Self {
|
||||
_state: state.into(),
|
||||
host: host.into(),
|
||||
process_host: state.discovery.process_host.clone(),
|
||||
queue: state.queue.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl notmad::Component for AgentRefresh {
|
||||
fn name(&self) -> Option<String> {
|
||||
Some("agent_refresh".into())
|
||||
}
|
||||
|
||||
async fn run(
|
||||
&self,
|
||||
cancellation_token: tokio_util::sync::CancellationToken,
|
||||
) -> Result<(), notmad::MadError> {
|
||||
let cancel = nodrift::schedule_drifter(std::time::Duration::from_secs(60), self.clone());
|
||||
let cancel =
|
||||
nodrift::schedule_drifter(std::time::Duration::from_secs(60 * 10), self.clone());
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {},
|
||||
_ = cancellation_token.cancelled() => {
|
||||
@ -39,82 +47,14 @@ impl notmad::Component for AgentRefresh {
|
||||
#[async_trait::async_trait]
|
||||
impl nodrift::Drifter for AgentRefresh {
|
||||
async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
|
||||
tracing::info!(host = self.host, "refreshing agent");
|
||||
tracing::info!(process_host = self.process_host, "refreshing agent");
|
||||
|
||||
// Get plan
|
||||
let plan = Plan::new();
|
||||
let tasks = plan.tasks().await?;
|
||||
|
||||
// For task
|
||||
for task in tasks {
|
||||
// Check idempotency rules
|
||||
if !task.should_run().await? {
|
||||
tracing::debug!(task = task.id(), "skipping run");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Run task if not valid
|
||||
tracing::info!(task = task.id(), "executing task");
|
||||
task.execute().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Plan {}
|
||||
impl Plan {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
pub async fn tasks(&self) -> anyhow::Result<Vec<Task>> {
|
||||
Ok(vec![Task::new()])
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Task {}
|
||||
|
||||
impl Task {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
pub fn id(&self) -> String {
|
||||
"apt".into()
|
||||
}
|
||||
|
||||
pub async fn should_run(&self) -> anyhow::Result<bool> {
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub async fn execute(&self) -> anyhow::Result<()> {
|
||||
let mut cmd = tokio::process::Command::new("apt-get");
|
||||
cmd.args(["update", "-q"]);
|
||||
let output = cmd.output().await.context("failed to run apt update")?;
|
||||
match output.status.success() {
|
||||
true => tracing::info!("successfully ran apt update"),
|
||||
false => {
|
||||
anyhow::bail!(
|
||||
"failed to run apt update: {}",
|
||||
std::str::from_utf8(&output.stderr)?
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let mut cmd = tokio::process::Command::new("apt-get");
|
||||
cmd.env("DEBIAN_FRONTEND", "noninteractive")
|
||||
.args(["upgrade", "-y"]);
|
||||
let output = cmd.output().await.context("failed to run apt upgrade")?;
|
||||
match output.status.success() {
|
||||
true => tracing::info!("successfully ran apt upgrade"),
|
||||
false => {
|
||||
anyhow::bail!(
|
||||
"failed to run apt upgrade: {}",
|
||||
std::str::from_utf8(&output.stderr)?
|
||||
);
|
||||
}
|
||||
}
|
||||
self.queue
|
||||
.publish(Commands::ScheduleTask {
|
||||
task: "update".into(),
|
||||
properties: BTreeMap::default(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
22
crates/churn/src/agent/scheduler.rs
Normal file
22
crates/churn/src/agent/scheduler.rs
Normal file
@ -0,0 +1,22 @@
|
||||
use super::{handlers::scheduled_tasks::ScheduledTasks, models::Commands};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Scheduler {
|
||||
scheduled_tasks: ScheduledTasks,
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub fn new(scheduled_tasks: ScheduledTasks) -> Self {
|
||||
Self { scheduled_tasks }
|
||||
}
|
||||
|
||||
pub async fn handle(&self, command: Commands) -> anyhow::Result<()> {
|
||||
match command {
|
||||
Commands::ScheduleTask { task, properties } => {
|
||||
self.scheduled_tasks.handle(&task, properties).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
45
crates/churn/src/agent/task.rs
Normal file
45
crates/churn/src/agent/task.rs
Normal file
@ -0,0 +1,45 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Task {
|
||||
async fn id(&self) -> anyhow::Result<String>;
|
||||
async fn should_run(&self) -> anyhow::Result<bool> {
|
||||
Ok(true)
|
||||
}
|
||||
async fn execute(&self) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
pub trait IntoTask {
|
||||
fn into_task(self) -> ConcreteTask;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ConcreteTask {
|
||||
inner: Arc<dyn Task + Sync + Send + 'static>,
|
||||
}
|
||||
|
||||
impl ConcreteTask {
|
||||
pub fn new<T: Task + Sync + Send + 'static>(t: T) -> Self {
|
||||
Self { inner: Arc::new(t) }
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for ConcreteTask {
|
||||
type Target = Arc<dyn Task + Sync + Send + 'static>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoTask for ConcreteTask {
|
||||
fn into_task(self) -> ConcreteTask {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Task + Sync + Send + 'static> IntoTask for T {
|
||||
fn into_task(self) -> ConcreteTask {
|
||||
ConcreteTask::new(self)
|
||||
}
|
||||
}
|
@ -1,6 +1,12 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use axum::{extract::MatchedPath, http::Request, routing::get, Router};
|
||||
use axum::{
|
||||
extract::{MatchedPath, State},
|
||||
http::Request,
|
||||
routing::get,
|
||||
Json, Router,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
@ -22,6 +28,7 @@ impl Api {
|
||||
pub async fn serve(&self) -> anyhow::Result<()> {
|
||||
let app = Router::new()
|
||||
.route("/", get(root))
|
||||
.route("/discovery", get(discovery))
|
||||
.with_state(self.state.clone())
|
||||
.layer(
|
||||
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
|
||||
@ -55,6 +62,28 @@ async fn root() -> &'static str {
|
||||
"Hello, churn!"
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct Discovery {
|
||||
pub external_host: String,
|
||||
pub process_host: String,
|
||||
}
|
||||
|
||||
impl Discovery {
|
||||
pub async fn get_from_host(host: &str) -> anyhow::Result<Self> {
|
||||
let resp = reqwest::get(format!("{}/discovery", host.trim_end_matches('/'))).await?;
|
||||
let s: Self = resp.json().await?;
|
||||
|
||||
Ok(s)
|
||||
}
|
||||
}
|
||||
|
||||
async fn discovery(State(state): State<SharedState>) -> Json<Discovery> {
|
||||
Json(Discovery {
|
||||
external_host: state.config.external_host.clone(),
|
||||
process_host: state.config.process_host.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl notmad::Component for Api {
|
||||
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {
|
||||
|
@ -1,28 +1,39 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::{collections::BTreeMap, net::SocketAddr};
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
|
||||
use crate::{agent, api, state::SharedState};
|
||||
use crate::{agent, server};
|
||||
|
||||
pub async fn execute() -> anyhow::Result<()> {
|
||||
let state = SharedState::new().await?;
|
||||
|
||||
let cli = Command::parse();
|
||||
match cli.command.expect("to have a subcommand") {
|
||||
Commands::Serve { host } => {
|
||||
Commands::Serve {
|
||||
host,
|
||||
grpc_host,
|
||||
config,
|
||||
} => {
|
||||
tracing::info!("Starting service");
|
||||
|
||||
notmad::Mad::builder()
|
||||
.add(api::Api::new(&state, host))
|
||||
.run()
|
||||
.await?;
|
||||
server::execute(host, grpc_host, config).await?;
|
||||
}
|
||||
Commands::Agent { commands } => match commands {
|
||||
AgentCommands::Start { host } => {
|
||||
AgentCommands::Start {} => {
|
||||
tracing::info!("starting agent");
|
||||
agent::execute(host).await?;
|
||||
agent::execute().await?;
|
||||
tracing::info!("shut down agent");
|
||||
}
|
||||
AgentCommands::Setup {
|
||||
force,
|
||||
discovery,
|
||||
labels,
|
||||
} => {
|
||||
let mut setup_labels = BTreeMap::new();
|
||||
for (k, v) in labels {
|
||||
setup_labels.insert(k, v);
|
||||
}
|
||||
|
||||
agent::setup_config(discovery, force, setup_labels).await?;
|
||||
tracing::info!("wrote default agent config");
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
@ -41,6 +52,12 @@ enum Commands {
|
||||
Serve {
|
||||
#[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
|
||||
host: SocketAddr,
|
||||
|
||||
#[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")]
|
||||
grpc_host: SocketAddr,
|
||||
|
||||
#[clap(flatten)]
|
||||
config: server::config::ServerConfig,
|
||||
},
|
||||
Agent {
|
||||
#[command(subcommand)]
|
||||
@ -50,8 +67,22 @@ enum Commands {
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum AgentCommands {
|
||||
Start {
|
||||
#[arg(env = "SERVICE_HOST", long = "service-host")]
|
||||
host: String,
|
||||
Start {},
|
||||
Setup {
|
||||
#[arg(long, default_value = "false")]
|
||||
force: bool,
|
||||
|
||||
#[arg(env = "DISCOVERY_HOST", long = "discovery")]
|
||||
discovery: String,
|
||||
|
||||
#[arg(long = "label", short = 'l', value_parser = parse_key_val, action = clap::ArgAction::Append)]
|
||||
labels: Vec<(String, String)>,
|
||||
},
|
||||
}
|
||||
|
||||
fn parse_key_val(s: &str) -> Result<(String, String), String> {
|
||||
let (key, value) = s
|
||||
.split_once("=")
|
||||
.ok_or_else(|| format!("invalid key=value: no `=` found in `{s}`"))?;
|
||||
Ok((key.to_string(), value.to_string()))
|
||||
}
|
||||
|
52
crates/churn/src/grpc/churn.v1.rs
Normal file
52
crates/churn/src/grpc/churn.v1.rs
Normal file
@ -0,0 +1,52 @@
|
||||
// @generated
|
||||
// This file is @generated by prost-build.
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct GetKeyRequest {
|
||||
#[prost(string, tag="1")]
|
||||
pub namespace: ::prost::alloc::string::String,
|
||||
#[prost(string, optional, tag="2")]
|
||||
pub id: ::core::option::Option<::prost::alloc::string::String>,
|
||||
#[prost(string, tag="3")]
|
||||
pub key: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct GetKeyResponse {
|
||||
#[prost(string, optional, tag="1")]
|
||||
pub value: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct SetKeyRequest {
|
||||
#[prost(string, tag="1")]
|
||||
pub namespace: ::prost::alloc::string::String,
|
||||
#[prost(string, optional, tag="2")]
|
||||
pub id: ::core::option::Option<::prost::alloc::string::String>,
|
||||
#[prost(string, tag="3")]
|
||||
pub key: ::prost::alloc::string::String,
|
||||
#[prost(string, tag="4")]
|
||||
pub value: ::prost::alloc::string::String,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
|
||||
pub struct SetKeyResponse {
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ListenEventsRequest {
|
||||
#[prost(string, tag="1")]
|
||||
pub namespace: ::prost::alloc::string::String,
|
||||
#[prost(string, optional, tag="2")]
|
||||
pub id: ::core::option::Option<::prost::alloc::string::String>,
|
||||
}
|
||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct ListenEventsResponse {
|
||||
#[prost(string, tag="1")]
|
||||
pub id: ::prost::alloc::string::String,
|
||||
#[prost(string, tag="2")]
|
||||
pub value: ::prost::alloc::string::String,
|
||||
}
|
||||
include!("churn.v1.tonic.rs");
|
||||
// @@protoc_insertion_point(module)
|
435
crates/churn/src/grpc/churn.v1.tonic.rs
Normal file
435
crates/churn/src/grpc/churn.v1.tonic.rs
Normal file
@ -0,0 +1,435 @@
|
||||
// @generated
|
||||
/// Generated client implementations.
|
||||
pub mod churn_client {
|
||||
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
|
||||
use tonic::codegen::*;
|
||||
use tonic::codegen::http::Uri;
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ChurnClient<T> {
|
||||
inner: tonic::client::Grpc<T>,
|
||||
}
|
||||
impl ChurnClient<tonic::transport::Channel> {
|
||||
/// Attempt to create a new client by connecting to a given endpoint.
|
||||
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
|
||||
where
|
||||
D: TryInto<tonic::transport::Endpoint>,
|
||||
D::Error: Into<StdError>,
|
||||
{
|
||||
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
|
||||
Ok(Self::new(conn))
|
||||
}
|
||||
}
|
||||
impl<T> ChurnClient<T>
|
||||
where
|
||||
T: tonic::client::GrpcService<tonic::body::BoxBody>,
|
||||
T::Error: Into<StdError>,
|
||||
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
|
||||
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
|
||||
{
|
||||
pub fn new(inner: T) -> Self {
|
||||
let inner = tonic::client::Grpc::new(inner);
|
||||
Self { inner }
|
||||
}
|
||||
pub fn with_origin(inner: T, origin: Uri) -> Self {
|
||||
let inner = tonic::client::Grpc::with_origin(inner, origin);
|
||||
Self { inner }
|
||||
}
|
||||
pub fn with_interceptor<F>(
|
||||
inner: T,
|
||||
interceptor: F,
|
||||
) -> ChurnClient<InterceptedService<T, F>>
|
||||
where
|
||||
F: tonic::service::Interceptor,
|
||||
T::ResponseBody: Default,
|
||||
T: tonic::codegen::Service<
|
||||
http::Request<tonic::body::BoxBody>,
|
||||
Response = http::Response<
|
||||
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
|
||||
>,
|
||||
>,
|
||||
<T as tonic::codegen::Service<
|
||||
http::Request<tonic::body::BoxBody>,
|
||||
>>::Error: Into<StdError> + Send + Sync,
|
||||
{
|
||||
ChurnClient::new(InterceptedService::new(inner, interceptor))
|
||||
}
|
||||
/// Compress requests with the given encoding.
|
||||
///
|
||||
/// This requires the server to support it otherwise it might respond with an
|
||||
/// error.
|
||||
#[must_use]
|
||||
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.inner = self.inner.send_compressed(encoding);
|
||||
self
|
||||
}
|
||||
/// Enable decompressing responses.
|
||||
#[must_use]
|
||||
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.inner = self.inner.accept_compressed(encoding);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of a decoded message.
|
||||
///
|
||||
/// Default: `4MB`
|
||||
#[must_use]
|
||||
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.inner = self.inner.max_decoding_message_size(limit);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of an encoded message.
|
||||
///
|
||||
/// Default: `usize::MAX`
|
||||
#[must_use]
|
||||
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.inner = self.inner.max_encoding_message_size(limit);
|
||||
self
|
||||
}
|
||||
pub async fn get_key(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::GetKeyRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::GetKeyResponse>, tonic::Status> {
|
||||
self.inner
|
||||
.ready()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tonic::Status::new(
|
||||
tonic::Code::Unknown,
|
||||
format!("Service was not ready: {}", e.into()),
|
||||
)
|
||||
})?;
|
||||
let codec = tonic::codec::ProstCodec::default();
|
||||
let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/GetKey");
|
||||
let mut req = request.into_request();
|
||||
req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "GetKey"));
|
||||
self.inner.unary(req, path, codec).await
|
||||
}
|
||||
pub async fn set_key(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::SetKeyRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::SetKeyResponse>, tonic::Status> {
|
||||
self.inner
|
||||
.ready()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tonic::Status::new(
|
||||
tonic::Code::Unknown,
|
||||
format!("Service was not ready: {}", e.into()),
|
||||
)
|
||||
})?;
|
||||
let codec = tonic::codec::ProstCodec::default();
|
||||
let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/SetKey");
|
||||
let mut req = request.into_request();
|
||||
req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "SetKey"));
|
||||
self.inner.unary(req, path, codec).await
|
||||
}
|
||||
pub async fn listen_events(
|
||||
&mut self,
|
||||
request: impl tonic::IntoRequest<super::ListenEventsRequest>,
|
||||
) -> std::result::Result<
|
||||
tonic::Response<tonic::codec::Streaming<super::ListenEventsResponse>>,
|
||||
tonic::Status,
|
||||
> {
|
||||
self.inner
|
||||
.ready()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tonic::Status::new(
|
||||
tonic::Code::Unknown,
|
||||
format!("Service was not ready: {}", e.into()),
|
||||
)
|
||||
})?;
|
||||
let codec = tonic::codec::ProstCodec::default();
|
||||
let path = http::uri::PathAndQuery::from_static(
|
||||
"/churn.v1.Churn/ListenEvents",
|
||||
);
|
||||
let mut req = request.into_request();
|
||||
req.extensions_mut()
|
||||
.insert(GrpcMethod::new("churn.v1.Churn", "ListenEvents"));
|
||||
self.inner.server_streaming(req, path, codec).await
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Generated server implementations.
|
||||
pub mod churn_server {
|
||||
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
|
||||
use tonic::codegen::*;
|
||||
/// Generated trait containing gRPC methods that should be implemented for use with ChurnServer.
|
||||
#[async_trait]
|
||||
pub trait Churn: Send + Sync + 'static {
|
||||
async fn get_key(
|
||||
&self,
|
||||
request: tonic::Request<super::GetKeyRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::GetKeyResponse>, tonic::Status>;
|
||||
async fn set_key(
|
||||
&self,
|
||||
request: tonic::Request<super::SetKeyRequest>,
|
||||
) -> std::result::Result<tonic::Response<super::SetKeyResponse>, tonic::Status>;
|
||||
/// Server streaming response type for the ListenEvents method.
|
||||
type ListenEventsStream: tonic::codegen::tokio_stream::Stream<
|
||||
Item = std::result::Result<super::ListenEventsResponse, tonic::Status>,
|
||||
>
|
||||
+ Send
|
||||
+ 'static;
|
||||
async fn listen_events(
|
||||
&self,
|
||||
request: tonic::Request<super::ListenEventsRequest>,
|
||||
) -> std::result::Result<
|
||||
tonic::Response<Self::ListenEventsStream>,
|
||||
tonic::Status,
|
||||
>;
|
||||
}
|
||||
#[derive(Debug)]
|
||||
pub struct ChurnServer<T: Churn> {
|
||||
inner: _Inner<T>,
|
||||
accept_compression_encodings: EnabledCompressionEncodings,
|
||||
send_compression_encodings: EnabledCompressionEncodings,
|
||||
max_decoding_message_size: Option<usize>,
|
||||
max_encoding_message_size: Option<usize>,
|
||||
}
|
||||
struct _Inner<T>(Arc<T>);
|
||||
impl<T: Churn> ChurnServer<T> {
|
||||
pub fn new(inner: T) -> Self {
|
||||
Self::from_arc(Arc::new(inner))
|
||||
}
|
||||
pub fn from_arc(inner: Arc<T>) -> Self {
|
||||
let inner = _Inner(inner);
|
||||
Self {
|
||||
inner,
|
||||
accept_compression_encodings: Default::default(),
|
||||
send_compression_encodings: Default::default(),
|
||||
max_decoding_message_size: None,
|
||||
max_encoding_message_size: None,
|
||||
}
|
||||
}
|
||||
pub fn with_interceptor<F>(
|
||||
inner: T,
|
||||
interceptor: F,
|
||||
) -> InterceptedService<Self, F>
|
||||
where
|
||||
F: tonic::service::Interceptor,
|
||||
{
|
||||
InterceptedService::new(Self::new(inner), interceptor)
|
||||
}
|
||||
/// Enable decompressing requests with the given encoding.
|
||||
#[must_use]
|
||||
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.accept_compression_encodings.enable(encoding);
|
||||
self
|
||||
}
|
||||
/// Compress responses with the given encoding, if the client supports it.
|
||||
#[must_use]
|
||||
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||
self.send_compression_encodings.enable(encoding);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of a decoded message.
|
||||
///
|
||||
/// Default: `4MB`
|
||||
#[must_use]
|
||||
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.max_decoding_message_size = Some(limit);
|
||||
self
|
||||
}
|
||||
/// Limits the maximum size of an encoded message.
|
||||
///
|
||||
/// Default: `usize::MAX`
|
||||
#[must_use]
|
||||
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
|
||||
self.max_encoding_message_size = Some(limit);
|
||||
self
|
||||
}
|
||||
}
|
||||
impl<T, B> tonic::codegen::Service<http::Request<B>> for ChurnServer<T>
|
||||
where
|
||||
T: Churn,
|
||||
B: Body + Send + 'static,
|
||||
B::Error: Into<StdError> + Send + 'static,
|
||||
{
|
||||
type Response = http::Response<tonic::body::BoxBody>;
|
||||
type Error = std::convert::Infallible;
|
||||
type Future = BoxFuture<Self::Response, Self::Error>;
|
||||
fn poll_ready(
|
||||
&mut self,
|
||||
_cx: &mut Context<'_>,
|
||||
) -> Poll<std::result::Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
fn call(&mut self, req: http::Request<B>) -> Self::Future {
|
||||
let inner = self.inner.clone();
|
||||
match req.uri().path() {
|
||||
"/churn.v1.Churn/GetKey" => {
|
||||
#[allow(non_camel_case_types)]
|
||||
struct GetKeySvc<T: Churn>(pub Arc<T>);
|
||||
impl<T: Churn> tonic::server::UnaryService<super::GetKeyRequest>
|
||||
for GetKeySvc<T> {
|
||||
type Response = super::GetKeyResponse;
|
||||
type Future = BoxFuture<
|
||||
tonic::Response<Self::Response>,
|
||||
tonic::Status,
|
||||
>;
|
||||
fn call(
|
||||
&mut self,
|
||||
request: tonic::Request<super::GetKeyRequest>,
|
||||
) -> Self::Future {
|
||||
let inner = Arc::clone(&self.0);
|
||||
let fut = async move {
|
||||
<T as Churn>::get_key(&inner, request).await
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
let accept_compression_encodings = self.accept_compression_encodings;
|
||||
let send_compression_encodings = self.send_compression_encodings;
|
||||
let max_decoding_message_size = self.max_decoding_message_size;
|
||||
let max_encoding_message_size = self.max_encoding_message_size;
|
||||
let inner = self.inner.clone();
|
||||
let fut = async move {
|
||||
let inner = inner.0;
|
||||
let method = GetKeySvc(inner);
|
||||
let codec = tonic::codec::ProstCodec::default();
|
||||
let mut grpc = tonic::server::Grpc::new(codec)
|
||||
.apply_compression_config(
|
||||
accept_compression_encodings,
|
||||
send_compression_encodings,
|
||||
)
|
||||
.apply_max_message_size_config(
|
||||
max_decoding_message_size,
|
||||
max_encoding_message_size,
|
||||
);
|
||||
let res = grpc.unary(method, req).await;
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
"/churn.v1.Churn/SetKey" => {
|
||||
#[allow(non_camel_case_types)]
|
||||
struct SetKeySvc<T: Churn>(pub Arc<T>);
|
||||
impl<T: Churn> tonic::server::UnaryService<super::SetKeyRequest>
|
||||
for SetKeySvc<T> {
|
||||
type Response = super::SetKeyResponse;
|
||||
type Future = BoxFuture<
|
||||
tonic::Response<Self::Response>,
|
||||
tonic::Status,
|
||||
>;
|
||||
fn call(
|
||||
&mut self,
|
||||
request: tonic::Request<super::SetKeyRequest>,
|
||||
) -> Self::Future {
|
||||
let inner = Arc::clone(&self.0);
|
||||
let fut = async move {
|
||||
<T as Churn>::set_key(&inner, request).await
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
let accept_compression_encodings = self.accept_compression_encodings;
|
||||
let send_compression_encodings = self.send_compression_encodings;
|
||||
let max_decoding_message_size = self.max_decoding_message_size;
|
||||
let max_encoding_message_size = self.max_encoding_message_size;
|
||||
let inner = self.inner.clone();
|
||||
let fut = async move {
|
||||
let inner = inner.0;
|
||||
let method = SetKeySvc(inner);
|
||||
let codec = tonic::codec::ProstCodec::default();
|
||||
let mut grpc = tonic::server::Grpc::new(codec)
|
||||
.apply_compression_config(
|
||||
accept_compression_encodings,
|
||||
send_compression_encodings,
|
||||
)
|
||||
.apply_max_message_size_config(
|
||||
max_decoding_message_size,
|
||||
max_encoding_message_size,
|
||||
);
|
||||
let res = grpc.unary(method, req).await;
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
"/churn.v1.Churn/ListenEvents" => {
|
||||
#[allow(non_camel_case_types)]
|
||||
struct ListenEventsSvc<T: Churn>(pub Arc<T>);
|
||||
impl<
|
||||
T: Churn,
|
||||
> tonic::server::ServerStreamingService<super::ListenEventsRequest>
|
||||
for ListenEventsSvc<T> {
|
||||
type Response = super::ListenEventsResponse;
|
||||
type ResponseStream = T::ListenEventsStream;
|
||||
type Future = BoxFuture<
|
||||
tonic::Response<Self::ResponseStream>,
|
||||
tonic::Status,
|
||||
>;
|
||||
fn call(
|
||||
&mut self,
|
||||
request: tonic::Request<super::ListenEventsRequest>,
|
||||
) -> Self::Future {
|
||||
let inner = Arc::clone(&self.0);
|
||||
let fut = async move {
|
||||
<T as Churn>::listen_events(&inner, request).await
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
let accept_compression_encodings = self.accept_compression_encodings;
|
||||
let send_compression_encodings = self.send_compression_encodings;
|
||||
let max_decoding_message_size = self.max_decoding_message_size;
|
||||
let max_encoding_message_size = self.max_encoding_message_size;
|
||||
let inner = self.inner.clone();
|
||||
let fut = async move {
|
||||
let inner = inner.0;
|
||||
let method = ListenEventsSvc(inner);
|
||||
let codec = tonic::codec::ProstCodec::default();
|
||||
let mut grpc = tonic::server::Grpc::new(codec)
|
||||
.apply_compression_config(
|
||||
accept_compression_encodings,
|
||||
send_compression_encodings,
|
||||
)
|
||||
.apply_max_message_size_config(
|
||||
max_decoding_message_size,
|
||||
max_encoding_message_size,
|
||||
);
|
||||
let res = grpc.server_streaming(method, req).await;
|
||||
Ok(res)
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
_ => {
|
||||
Box::pin(async move {
|
||||
Ok(
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.header("grpc-status", "12")
|
||||
.header("content-type", "application/grpc")
|
||||
.body(empty_body())
|
||||
.unwrap(),
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<T: Churn> Clone for ChurnServer<T> {
|
||||
fn clone(&self) -> Self {
|
||||
let inner = self.inner.clone();
|
||||
Self {
|
||||
inner,
|
||||
accept_compression_encodings: self.accept_compression_encodings,
|
||||
send_compression_encodings: self.send_compression_encodings,
|
||||
max_decoding_message_size: self.max_decoding_message_size,
|
||||
max_encoding_message_size: self.max_encoding_message_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<T: Churn> Clone for _Inner<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self(Arc::clone(&self.0))
|
||||
}
|
||||
}
|
||||
impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self.0)
|
||||
}
|
||||
}
|
||||
impl<T: Churn> tonic::server::NamedService for ChurnServer<T> {
|
||||
const NAME: &'static str = "churn.v1.Churn";
|
||||
}
|
||||
}
|
@ -1,8 +1,12 @@
|
||||
mod api;
|
||||
mod cli;
|
||||
mod state;
|
||||
mod grpc {
|
||||
include!("grpc/churn.v1.rs");
|
||||
}
|
||||
|
||||
mod agent;
|
||||
mod server;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
|
23
crates/churn/src/server.rs
Normal file
23
crates/churn/src/server.rs
Normal file
@ -0,0 +1,23 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
pub mod config;
|
||||
|
||||
mod grpc_server;
|
||||
|
||||
use crate::{api, state::SharedState};
|
||||
|
||||
pub async fn execute(
|
||||
host: impl Into<SocketAddr>,
|
||||
grpc_host: impl Into<SocketAddr>,
|
||||
config: config::ServerConfig,
|
||||
) -> anyhow::Result<()> {
|
||||
let state = SharedState::new(config).await?;
|
||||
|
||||
notmad::Mad::builder()
|
||||
.add(api::Api::new(&state, host))
|
||||
.add(grpc_server::GrpcServer::new(grpc_host.into()))
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
7
crates/churn/src/server/config.rs
Normal file
7
crates/churn/src/server/config.rs
Normal file
@ -0,0 +1,7 @@
|
||||
#[derive(clap::Args)]
|
||||
pub struct ServerConfig {
|
||||
#[arg(long = "external-host", env = "EXTERNAL_HOST")]
|
||||
pub external_host: String,
|
||||
#[arg(long = "process-host", env = "PROCESS_HOST")]
|
||||
pub process_host: String,
|
||||
}
|
102
crates/churn/src/server/grpc_server.rs
Normal file
102
crates/churn/src/server/grpc_server.rs
Normal file
@ -0,0 +1,102 @@
|
||||
use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::Stream;
|
||||
use notmad::{Component, MadError};
|
||||
use tonic::transport::Server;
|
||||
|
||||
use crate::{agent::models::Commands, grpc::*};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GrpcServer {
|
||||
grpc_host: SocketAddr,
|
||||
}
|
||||
|
||||
impl GrpcServer {
|
||||
pub fn new(grpc_host: SocketAddr) -> Self {
|
||||
Self { grpc_host }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Component for GrpcServer {
|
||||
async fn run(
|
||||
&self,
|
||||
cancellation_token: tokio_util::sync::CancellationToken,
|
||||
) -> Result<(), notmad::MadError> {
|
||||
let task = Server::builder()
|
||||
.add_service(crate::grpc::churn_server::ChurnServer::new(self.clone()))
|
||||
.serve(self.grpc_host);
|
||||
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled() => {},
|
||||
res = task => {
|
||||
res.context("failed to run grpc server").map_err(MadError::Inner)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl crate::grpc::churn_server::Churn for GrpcServer {
|
||||
async fn get_key(
|
||||
&self,
|
||||
request: tonic::Request<GetKeyRequest>,
|
||||
) -> std::result::Result<tonic::Response<GetKeyResponse>, tonic::Status> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn set_key(
|
||||
&self,
|
||||
request: tonic::Request<SetKeyRequest>,
|
||||
) -> std::result::Result<tonic::Response<SetKeyResponse>, tonic::Status> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
#[doc = " Server streaming response type for the ListenEvents method."]
|
||||
type ListenEventsStream =
|
||||
Pin<Box<dyn Stream<Item = Result<ListenEventsResponse, tonic::Status>> + Send>>;
|
||||
|
||||
async fn listen_events(
|
||||
&self,
|
||||
request: tonic::Request<ListenEventsRequest>,
|
||||
) -> std::result::Result<tonic::Response<Self::ListenEventsStream>, tonic::Status> {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(128);
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60 * 10));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let Ok(schedule_task) = serde_json::to_string(&Commands::ScheduleTask {
|
||||
task: "refresh".into(),
|
||||
properties: BTreeMap::default(),
|
||||
}) else {
|
||||
tracing::warn!("failed to serialize event");
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Err(e) = tx
|
||||
.send(Ok(ListenEventsResponse {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
value: schedule_task,
|
||||
}))
|
||||
.await
|
||||
{
|
||||
tracing::warn!("failed to send response: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let stream = futures::stream::unfold(rx, |mut msg| async move {
|
||||
let next = msg.recv().await?;
|
||||
|
||||
Some((next, msg))
|
||||
});
|
||||
|
||||
Ok(tonic::Response::new(Box::pin(stream)))
|
||||
}
|
||||
}
|
@ -1,11 +1,13 @@
|
||||
use std::{ops::Deref, sync::Arc};
|
||||
|
||||
use crate::server::config::ServerConfig;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedState(Arc<State>);
|
||||
|
||||
impl SharedState {
|
||||
pub async fn new() -> anyhow::Result<Self> {
|
||||
Ok(Self(Arc::new(State::new().await?)))
|
||||
pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
|
||||
Ok(Self(Arc::new(State::new(config).await?)))
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,10 +25,12 @@ impl Deref for SharedState {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct State {}
|
||||
pub struct State {
|
||||
pub config: ServerConfig,
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub async fn new() -> anyhow::Result<Self> {
|
||||
Ok(Self {})
|
||||
pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
|
||||
Ok(Self { config })
|
||||
}
|
||||
}
|
||||
|
19
crates/churn/wit/world.wit
Normal file
19
crates/churn/wit/world.wit
Normal file
@ -0,0 +1,19 @@
|
||||
package component:churn-tasks@0.1.0;
|
||||
|
||||
interface process {
|
||||
resource process {
|
||||
constructor();
|
||||
run-process: func(inputs: list<string>) -> string;
|
||||
}
|
||||
}
|
||||
|
||||
interface task {
|
||||
id: func() -> string;
|
||||
should-run: func() -> bool;
|
||||
execute: func();
|
||||
}
|
||||
|
||||
world churn {
|
||||
export task;
|
||||
import process;
|
||||
}
|
@ -12,11 +12,20 @@ vars:
|
||||
ingress:
|
||||
- external: "true"
|
||||
- internal: "true"
|
||||
- internal_grpc: "true"
|
||||
|
||||
cuddle/clusters:
|
||||
dev:
|
||||
env:
|
||||
service.host: "0.0.0.0:3000"
|
||||
service.grpc.host: "0.0.0.0:4001"
|
||||
process.host: "https://grpc.churn.dev.internal.kjuulh.app"
|
||||
external.host: "https://churn.internal.dev.kjuulh.app"
|
||||
rust.log: "h2=warn,debug"
|
||||
prod:
|
||||
env:
|
||||
service.host: "0.0.0.0:3000"
|
||||
service.grpc.host: "0.0.0.0:4001"
|
||||
process.host: "https://grpc.churn.prod.internal.kjuulh.app"
|
||||
external.host: "https://churn.internal.prod.kjuulh.app"
|
||||
rust.log: "h2=warn,debug"
|
||||
|
127
install.sh
127
install.sh
@ -1,97 +1,92 @@
|
||||
#!/bin/bash
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -e
|
||||
|
||||
# Configuration variables
|
||||
GITEA_HOST="https://git.front.kjuulh.io"
|
||||
REPO_OWNER="kjuulh"
|
||||
REPO_NAME="churn-v2"
|
||||
# Configuration
|
||||
APP_NAME="churn"
|
||||
APP_VERSION="latest" # or specify a version
|
||||
S3_BUCKET="rust-artifacts"
|
||||
BINARY_NAME="churn"
|
||||
SERVICE_NAME="churn-agent"
|
||||
SERVICE_USER="churn"
|
||||
RELEASE_TAG="latest" # or specific version like "v1.0.0"
|
||||
SERVICE_NAME="${APP_NAME}.service"
|
||||
INSTALL_DIR="/usr/local/bin"
|
||||
CONFIG_DIR="/etc/${APP_NAME}"
|
||||
CHURN_DISCOVERY="https://churn.prod.kjuulh.app"
|
||||
|
||||
# Colors for output
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
# Check if running as root
|
||||
if [ "$EUID" -ne 0 ]; then
|
||||
echo "Please run as root"
|
||||
echo -e "${RED}Please run as root${NC}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Create service user if it doesn't exist
|
||||
if ! id "$SERVICE_USER" &>/dev/null; then
|
||||
useradd -r -s /bin/false "$SERVICE_USER"
|
||||
# Create necessary directories
|
||||
echo "Creating directories..."
|
||||
mkdir -p "${INSTALL_DIR}"
|
||||
mkdir -p "${CONFIG_DIR}"
|
||||
|
||||
if systemctl is-active --quiet churn.service; then
|
||||
echo "Stopping existing churn service..."
|
||||
systemctl stop churn.service
|
||||
fi
|
||||
|
||||
# Function to get latest release if RELEASE_TAG is "latest"
|
||||
get_latest_release() {
|
||||
curl -s "https://$GITEA_HOST/api/v1/repos/$REPO_OWNER/$REPO_NAME/releases/latest" | \
|
||||
grep '"tag_name":' | \
|
||||
sed -E 's/.*"([^"]+)".*/\1/'
|
||||
}
|
||||
# Download binary from S3
|
||||
echo "Downloading binary..."
|
||||
curl -L -s "https://api-minio.front.kjuulh.io/${S3_BUCKET}/releases/${APP_NAME}/${APP_VERSION}/${BINARY_NAME}" -o "${INSTALL_DIR}/${BINARY_NAME}"
|
||||
|
||||
# Determine the actual release tag
|
||||
if [ "$RELEASE_TAG" = "latest" ]; then
|
||||
RELEASE_TAG=$(get_latest_release)
|
||||
# Make binary executable
|
||||
chmod +x "${INSTALL_DIR}/${BINARY_NAME}"
|
||||
|
||||
echo "Starting churn agent setup..."
|
||||
set +e
|
||||
res=$(churn agent setup --discovery "${CHURN_DISCOVERY}" 2>&1)
|
||||
set -e
|
||||
exit_code=$?
|
||||
|
||||
if [[ $exit_code != 0 ]]; then
|
||||
if [[ "$res" != *"config file already exists"* ]] && [[ "$res" != *"already exists"* ]]; then
|
||||
echo "Error detected: $res"
|
||||
exit 1
|
||||
else
|
||||
echo "Ignoring setup 'agent is already setup'"
|
||||
fi
|
||||
fi
|
||||
|
||||
echo "Installing $BINARY_NAME version $RELEASE_TAG..."
|
||||
|
||||
# Download and install binary
|
||||
TMP_DIR=$(mktemp -d)
|
||||
cd "$TMP_DIR"
|
||||
|
||||
# Download binary from Gitea
|
||||
curl -L -o "$BINARY_NAME" \
|
||||
"https://$GITEA_HOST/$REPO_OWNER/$REPO_NAME/releases/download/$RELEASE_TAG/$BINARY_NAME"
|
||||
|
||||
# Make binary executable and move to appropriate location
|
||||
chmod +x "$BINARY_NAME"
|
||||
mv "$BINARY_NAME" "/usr/local/bin/$BINARY_NAME"
|
||||
|
||||
# Create systemd service file
|
||||
cat > "/etc/systemd/system/$SERVICE_NAME.service" << EOF
|
||||
echo "Creating systemd service..."
|
||||
cat > "/etc/systemd/system/${SERVICE_NAME}" << EOF
|
||||
[Unit]
|
||||
Description=$SERVICE_NAME Service
|
||||
Description=${APP_NAME} service
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User=$SERVICE_USER
|
||||
ExecStart=/usr/local/bin/$BINARY_NAME
|
||||
User=root
|
||||
Group=root
|
||||
ExecStart=${INSTALL_DIR}/${BINARY_NAME} agent start
|
||||
Restart=always
|
||||
RestartSec=5
|
||||
|
||||
# Security hardening options
|
||||
ProtectSystem=strict
|
||||
ProtectHome=true
|
||||
NoNewPrivileges=true
|
||||
ReadWritePaths=/var/log/$SERVICE_NAME
|
||||
RestartSec=10
|
||||
Environment=RUST_LOG=h2=warn,hyper=warn,churn=debug,warn
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
EOF
|
||||
|
||||
# Create log directory if logging is needed
|
||||
mkdir -p "/var/log/$SERVICE_NAME"
|
||||
chown "$SERVICE_USER:$SERVICE_USER" "/var/log/$SERVICE_NAME"
|
||||
|
||||
# Reload systemd and enable service
|
||||
echo "Configuring systemd service..."
|
||||
systemctl daemon-reload
|
||||
systemctl enable "$SERVICE_NAME"
|
||||
systemctl start "$SERVICE_NAME"
|
||||
systemctl enable "${SERVICE_NAME}"
|
||||
systemctl start "${SERVICE_NAME}"
|
||||
|
||||
# Clean up
|
||||
cd
|
||||
rm -rf "$TMP_DIR"
|
||||
# Check service status
|
||||
if systemctl is-active --quiet "${SERVICE_NAME}"; then
|
||||
echo -e "${GREEN}Installation successful! ${APP_NAME} is running.${NC}"
|
||||
echo "You can check the status with: systemctl status ${SERVICE_NAME}"
|
||||
else
|
||||
echo -e "${RED}Installation completed but service failed to start. Check logs with: journalctl -u ${SERVICE_NAME}${NC}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Installation complete! Service status:"
|
||||
systemctl status "$SERVICE_NAME"
|
||||
|
||||
# Provide some helpful commands
|
||||
echo "
|
||||
Useful commands:
|
||||
- Check status: systemctl status $SERVICE_NAME
|
||||
- View logs: journalctl -u $SERVICE_NAME
|
||||
- Restart service: systemctl restart $SERVICE_NAME
|
||||
- Stop service: systemctl stop $SERVICE_NAME
|
||||
"
|
||||
|
Loading…
Reference in New Issue
Block a user