Compare commits
49 Commits
Author | SHA1 | Date | |
---|---|---|---|
c5e5307682 | |||
5fb59ad992 | |||
e21663c8bd | |||
39a01778b2 | |||
f1cdf3ae20 | |||
355587234e | |||
21a13f3444 | |||
5e1b585a2d | |||
94025a02ce | |||
db4cc98643 | |||
2387a70778 | |||
d6fdda0e4e | |||
974e1ee0d6 | |||
717b052a88 | |||
9b52376e7a | |||
7b222af1dd | |||
150c7c3c98 | |||
8b064c2169 | |||
879737eedd | |||
818cc6c671 | |||
e759239243 | |||
b37674987e | |||
1dea5f29ac | |||
38a0a9fba4 | |||
f7c7aef96b | |||
7fefca47c9 | |||
36b1335fe9 | |||
eeabeda036 | |||
569fee52e6 | |||
b0ec41fa93 | |||
77f5ec7475 | |||
f3926d0885 | |||
b48b1ec886 | |||
79a8a34499 | |||
c9bbeb2439 | |||
b19ff0b7e5 | |||
eeaf59ac63 | |||
6f04d0cdda | |||
43c5fa1731 | |||
9badf8e193 | |||
55a0d294c5 | |||
8508d3f640 | |||
dd8ade9798 | |||
f1e6268a2d | |||
6647bb89be | |||
ea5adb2f93 | |||
ee323e99e8 | |||
c4434fd841 | |||
cb340ffb1e |
8
.env
8
.env
@ -1 +1,9 @@
|
|||||||
|
EXTERNAL_HOST=http://localhost:3000
|
||||||
|
PROCESS_HOST=http://localhost:7900
|
||||||
|
SERVICE_HOST=127.0.0.1:3000
|
||||||
|
DISCOVERY_HOST=http://127.0.0.1:3000
|
||||||
|
|
||||||
|
#EXTERNAL_HOST=http://localhost:3000
|
||||||
|
#PROCESS_HOST=http://localhost:7900
|
||||||
|
#SERVICE_HOST=127.0.0.1:3000
|
||||||
|
#DISCOVERY_HOST=https://churn.prod.kjuulh.app
|
||||||
|
2253
Cargo.lock
generated
2253
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
8
buf.gen.yaml
Normal file
8
buf.gen.yaml
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
version: v2
|
||||||
|
plugins:
|
||||||
|
- remote: buf.build/community/neoeinstein-prost
|
||||||
|
out: crates/churn/src/grpc
|
||||||
|
- remote: buf.build/community/neoeinstein-tonic:v0.4.0
|
||||||
|
out: crates/churn/src/grpc
|
||||||
|
inputs:
|
||||||
|
- directory: crates/churn/proto
|
@ -14,8 +14,8 @@ axum.workspace = true
|
|||||||
|
|
||||||
serde = { version = "1.0.197", features = ["derive"] }
|
serde = { version = "1.0.197", features = ["derive"] }
|
||||||
uuid = { version = "1.7.0", features = ["v4"] }
|
uuid = { version = "1.7.0", features = ["v4"] }
|
||||||
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
|
tower-http = { version = "0.6.0", features = ["cors", "trace"] }
|
||||||
notmad = "0.6.0"
|
notmad = "0.7.1"
|
||||||
tokio-util = "0.7.12"
|
tokio-util = "0.7.12"
|
||||||
async-trait = "0.1.83"
|
async-trait = "0.1.83"
|
||||||
nodrift = "0.2.0"
|
nodrift = "0.2.0"
|
||||||
@ -24,3 +24,16 @@ prost-types = "0.13.3"
|
|||||||
prost = "0.13.3"
|
prost = "0.13.3"
|
||||||
bytes = "1.8.0"
|
bytes = "1.8.0"
|
||||||
tonic = { version = "0.12.3", features = ["tls", "tls-roots"] }
|
tonic = { version = "0.12.3", features = ["tls", "tls-roots"] }
|
||||||
|
toml = "0.8.19"
|
||||||
|
dirs = "5.0.1"
|
||||||
|
futures = "0.3.31"
|
||||||
|
reqwest = { version = "0.12.9", default-features = false, features = [
|
||||||
|
"json",
|
||||||
|
"http2",
|
||||||
|
"charset",
|
||||||
|
"native-tls-vendored",
|
||||||
|
"stream",
|
||||||
|
] }
|
||||||
|
serde_json = "1.0.133"
|
||||||
|
wasmtime = "27.0.0"
|
||||||
|
wasmtime-wasi = "27.0.0"
|
||||||
|
35
crates/churn/proto/churn/v1/churn.proto
Normal file
35
crates/churn/proto/churn/v1/churn.proto
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package churn.v1;
|
||||||
|
|
||||||
|
service Churn {
|
||||||
|
rpc GetKey(GetKeyRequest) returns (GetKeyResponse);
|
||||||
|
rpc SetKey(SetKeyRequest) returns (SetKeyResponse);
|
||||||
|
rpc ListenEvents(ListenEventsRequest) returns (stream ListenEventsResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetKeyRequest {
|
||||||
|
string namespace = 1;
|
||||||
|
optional string id = 2;
|
||||||
|
string key = 3;
|
||||||
|
}
|
||||||
|
message GetKeyResponse {
|
||||||
|
optional string value = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SetKeyRequest {
|
||||||
|
string namespace = 1;
|
||||||
|
optional string id = 2;
|
||||||
|
string key = 3;
|
||||||
|
string value = 4;
|
||||||
|
}
|
||||||
|
message SetKeyResponse {}
|
||||||
|
|
||||||
|
message ListenEventsRequest {
|
||||||
|
string namespace = 1;
|
||||||
|
optional string id = 2;
|
||||||
|
}
|
||||||
|
message ListenEventsResponse {
|
||||||
|
string id = 1;
|
||||||
|
string value = 2;
|
||||||
|
}
|
@ -1,15 +1,33 @@
|
|||||||
use agent_state::AgentState;
|
use agent_state::AgentState;
|
||||||
|
use event_handler::EventHandler;
|
||||||
use refresh::AgentRefresh;
|
use refresh::AgentRefresh;
|
||||||
|
|
||||||
|
pub use config::setup_config;
|
||||||
|
|
||||||
|
pub mod models;
|
||||||
|
pub(crate) mod task;
|
||||||
|
|
||||||
mod agent_state;
|
mod agent_state;
|
||||||
|
mod config;
|
||||||
|
mod discovery_client;
|
||||||
|
mod event_handler;
|
||||||
|
mod grpc_client;
|
||||||
|
mod plugins;
|
||||||
|
mod queue;
|
||||||
mod refresh;
|
mod refresh;
|
||||||
|
mod scheduler;
|
||||||
|
|
||||||
pub async fn execute(host: impl Into<String>) -> anyhow::Result<()> {
|
mod handlers;
|
||||||
|
|
||||||
|
mod actions;
|
||||||
|
|
||||||
|
pub async fn execute() -> anyhow::Result<()> {
|
||||||
let state = AgentState::new().await?;
|
let state = AgentState::new().await?;
|
||||||
|
|
||||||
notmad::Mad::builder()
|
notmad::Mad::builder()
|
||||||
.add(AgentRefresh::new(&state, host))
|
.add(AgentRefresh::new(&state))
|
||||||
|
.add(EventHandler::new(&state))
|
||||||
|
.add(state.queue.clone())
|
||||||
.cancellation(Some(std::time::Duration::from_secs(2)))
|
.cancellation(Some(std::time::Duration::from_secs(2)))
|
||||||
.run()
|
.run()
|
||||||
.await?;
|
.await?;
|
||||||
|
23
crates/churn/src/agent/actions.rs
Normal file
23
crates/churn/src/agent/actions.rs
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
use apt::AptTask;
|
||||||
|
use plugin_task::PluginTask;
|
||||||
|
|
||||||
|
use super::{plugins::PluginStore, task::IntoTask};
|
||||||
|
|
||||||
|
pub mod apt;
|
||||||
|
pub mod plugin_task;
|
||||||
|
|
||||||
|
pub struct Plan {
|
||||||
|
store: PluginStore,
|
||||||
|
}
|
||||||
|
impl Plan {
|
||||||
|
pub fn new(store: PluginStore) -> Self {
|
||||||
|
Self { store }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn tasks(&self) -> anyhow::Result<Vec<impl IntoTask>> {
|
||||||
|
Ok(vec![
|
||||||
|
AptTask::new().into_task(),
|
||||||
|
PluginTask::new("alloy@0.1.0", self.store.clone()).into_task(),
|
||||||
|
])
|
||||||
|
}
|
||||||
|
}
|
49
crates/churn/src/agent/actions/apt.rs
Normal file
49
crates/churn/src/agent/actions/apt.rs
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
use anyhow::Context;
|
||||||
|
|
||||||
|
use crate::agent::task::Task;
|
||||||
|
|
||||||
|
pub struct AptTask {}
|
||||||
|
|
||||||
|
impl AptTask {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl Task for AptTask {
|
||||||
|
async fn id(&self) -> anyhow::Result<String> {
|
||||||
|
Ok("apt".into())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute(&self) -> anyhow::Result<()> {
|
||||||
|
let mut cmd = tokio::process::Command::new("apt-get");
|
||||||
|
cmd.args(["update", "-q"]);
|
||||||
|
let output = cmd.output().await.context("failed to run apt update")?;
|
||||||
|
match output.status.success() {
|
||||||
|
true => tracing::info!("successfully ran apt update"),
|
||||||
|
false => {
|
||||||
|
anyhow::bail!(
|
||||||
|
"failed to run apt update: {}",
|
||||||
|
std::str::from_utf8(&output.stderr)?
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut cmd = tokio::process::Command::new("apt-get");
|
||||||
|
cmd.env("DEBIAN_FRONTEND", "noninteractive")
|
||||||
|
.args(["upgrade", "-y"]);
|
||||||
|
let output = cmd.output().await.context("failed to run apt upgrade")?;
|
||||||
|
match output.status.success() {
|
||||||
|
true => tracing::info!("successfully ran apt upgrade"),
|
||||||
|
false => {
|
||||||
|
anyhow::bail!(
|
||||||
|
"failed to run apt upgrade: {}",
|
||||||
|
std::str::from_utf8(&output.stderr)?
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
30
crates/churn/src/agent/actions/plugin_task.rs
Normal file
30
crates/churn/src/agent/actions/plugin_task.rs
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
use crate::agent::{plugins::PluginStore, task::Task};
|
||||||
|
|
||||||
|
pub struct PluginTask {
|
||||||
|
plugin: String,
|
||||||
|
store: PluginStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PluginTask {
|
||||||
|
pub fn new(plugin: impl Into<String>, store: PluginStore) -> Self {
|
||||||
|
Self {
|
||||||
|
plugin: plugin.into(),
|
||||||
|
store,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl Task for PluginTask {
|
||||||
|
async fn id(&self) -> anyhow::Result<String> {
|
||||||
|
let id = self.store.id(&self.plugin).await?;
|
||||||
|
|
||||||
|
Ok(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute(&self) -> anyhow::Result<()> {
|
||||||
|
self.store.execute(&self.plugin).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
@ -1,5 +1,13 @@
|
|||||||
use std::{ops::Deref, sync::Arc};
|
use std::{ops::Deref, sync::Arc};
|
||||||
|
|
||||||
|
use crate::api::Discovery;
|
||||||
|
|
||||||
|
use super::{
|
||||||
|
config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient,
|
||||||
|
handlers::scheduled_tasks::ScheduledTasks, plugins::PluginStore, queue::AgentQueue,
|
||||||
|
scheduler::Scheduler,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct AgentState(Arc<State>);
|
pub struct AgentState(Arc<State>);
|
||||||
|
|
||||||
@ -23,10 +31,30 @@ impl Deref for AgentState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct State {}
|
pub struct State {
|
||||||
|
pub grpc: GrpcClient,
|
||||||
|
pub config: AgentConfig,
|
||||||
|
pub discovery: Discovery,
|
||||||
|
pub queue: AgentQueue,
|
||||||
|
pub plugin_store: PluginStore,
|
||||||
|
}
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
pub async fn new() -> anyhow::Result<Self> {
|
pub async fn new() -> anyhow::Result<Self> {
|
||||||
Ok(Self {})
|
let config = AgentConfig::new().await?;
|
||||||
|
let discovery = DiscoveryClient::new(&config.discovery).discover().await?;
|
||||||
|
let grpc = GrpcClient::new(&discovery.process_host);
|
||||||
|
let plugin_store = PluginStore::new()?;
|
||||||
|
let scheduled_tasks = ScheduledTasks::new(plugin_store.clone());
|
||||||
|
let scheduler = Scheduler::new(scheduled_tasks);
|
||||||
|
let queue = AgentQueue::new(scheduler);
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
grpc,
|
||||||
|
config,
|
||||||
|
discovery,
|
||||||
|
queue,
|
||||||
|
plugin_store,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
93
crates/churn/src/agent/config.rs
Normal file
93
crates/churn/src/agent/config.rs
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
use anyhow::Context;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct AgentConfig {
|
||||||
|
pub agent_id: String,
|
||||||
|
pub discovery: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AgentConfig {
|
||||||
|
pub async fn new() -> anyhow::Result<Self> {
|
||||||
|
let config = ConfigFile::load().await?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
agent_id: config.agent_id,
|
||||||
|
discovery: config.discovery,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct ConfigFile {
|
||||||
|
agent_id: String,
|
||||||
|
discovery: String,
|
||||||
|
|
||||||
|
labels: Option<BTreeMap<String, String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConfigFile {
|
||||||
|
pub async fn load() -> anyhow::Result<Self> {
|
||||||
|
let directory = dirs::data_dir()
|
||||||
|
.ok_or(anyhow::anyhow!("failed to get data dir"))?
|
||||||
|
.join("io.kjuulh.churn-agent")
|
||||||
|
.join("churn-agent.toml");
|
||||||
|
|
||||||
|
if !directory.exists() {
|
||||||
|
anyhow::bail!(
|
||||||
|
"No churn agent file was setup, run `churn agent setup` to setup the defaults"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
let contents = tokio::fs::read_to_string(&directory).await?;
|
||||||
|
|
||||||
|
toml::from_str(&contents).context("failed to parse the contents of the churn agent config")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn write_default(
|
||||||
|
discovery: impl Into<String>,
|
||||||
|
force: bool,
|
||||||
|
labels: impl Into<BTreeMap<String, String>>,
|
||||||
|
) -> anyhow::Result<Self> {
|
||||||
|
let s = Self {
|
||||||
|
agent_id: Uuid::new_v4().to_string(),
|
||||||
|
discovery: discovery.into(),
|
||||||
|
labels: Some(labels.into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let directory = dirs::data_dir()
|
||||||
|
.ok_or(anyhow::anyhow!("failed to get data dir"))?
|
||||||
|
.join("io.kjuulh.churn-agent")
|
||||||
|
.join("churn-agent.toml");
|
||||||
|
|
||||||
|
if let Some(parent) = directory.parent() {
|
||||||
|
tokio::fs::create_dir_all(&parent).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !force && directory.exists() {
|
||||||
|
anyhow::bail!("config file already exists, consider moving it to a backup before trying again: {}", directory.display());
|
||||||
|
}
|
||||||
|
|
||||||
|
let contents = toml::to_string_pretty(&s)
|
||||||
|
.context("failed to convert default implementation to string")?;
|
||||||
|
|
||||||
|
tokio::fs::write(directory, contents.as_bytes())
|
||||||
|
.await
|
||||||
|
.context("failed to write to agent file")?;
|
||||||
|
|
||||||
|
Ok(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn setup_config(
|
||||||
|
discovery: impl Into<String>,
|
||||||
|
force: bool,
|
||||||
|
labels: impl Into<BTreeMap<String, String>>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
ConfigFile::write_default(discovery, force, labels).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
21
crates/churn/src/agent/discovery_client.rs
Normal file
21
crates/churn/src/agent/discovery_client.rs
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
use crate::api::Discovery;
|
||||||
|
|
||||||
|
pub struct DiscoveryClient {
|
||||||
|
host: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DiscoveryClient {
|
||||||
|
pub fn new(discovery_host: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
host: discovery_host.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn discover(&self) -> anyhow::Result<Discovery> {
|
||||||
|
tracing::info!(
|
||||||
|
"getting details from discovery endpoint: {}/discovery",
|
||||||
|
self.host.trim_end_matches('/')
|
||||||
|
);
|
||||||
|
crate::api::Discovery::get_from_host(&self.host).await
|
||||||
|
}
|
||||||
|
}
|
63
crates/churn/src/agent/event_handler.rs
Normal file
63
crates/churn/src/agent/event_handler.rs
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
use notmad::{Component, MadError};
|
||||||
|
|
||||||
|
use crate::agent::models::Commands;
|
||||||
|
|
||||||
|
use super::{
|
||||||
|
agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient, queue::AgentQueue,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct EventHandler {
|
||||||
|
config: AgentConfig,
|
||||||
|
grpc: GrpcClient,
|
||||||
|
queue: AgentQueue,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventHandler {
|
||||||
|
pub fn new(state: impl Into<AgentState>) -> Self {
|
||||||
|
let state: AgentState = state.into();
|
||||||
|
|
||||||
|
Self {
|
||||||
|
config: state.config.clone(),
|
||||||
|
grpc: state.grpc.clone(),
|
||||||
|
queue: state.queue.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl Component for EventHandler {
|
||||||
|
fn name(&self) -> Option<String> {
|
||||||
|
Some("event_handler".into())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(
|
||||||
|
&self,
|
||||||
|
cancellation_token: tokio_util::sync::CancellationToken,
|
||||||
|
) -> Result<(), notmad::MadError> {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation_token.cancelled() => {},
|
||||||
|
res = self.grpc.listen_events("agents", None::<String>, self.clone()) => {
|
||||||
|
res.map_err(MadError::Inner)?;
|
||||||
|
},
|
||||||
|
res = self.grpc.listen_events("agents", Some(&self.config.agent_id), self.clone()) => {
|
||||||
|
res.map_err(MadError::Inner)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl super::grpc_client::ListenEventsExecutor for EventHandler {
|
||||||
|
async fn execute(&self, event: crate::grpc::ListenEventsResponse) -> anyhow::Result<()> {
|
||||||
|
tracing::info!(value = event.id, "received event");
|
||||||
|
|
||||||
|
let event: Commands = serde_json::from_str(&event.value)?;
|
||||||
|
|
||||||
|
self.queue.publish(event).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
108
crates/churn/src/agent/grpc_client.rs
Normal file
108
crates/churn/src/agent/grpc_client.rs
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
use tonic::transport::{Channel, ClientTlsConfig};
|
||||||
|
|
||||||
|
use crate::grpc::{churn_client::ChurnClient, *};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct GrpcClient {
|
||||||
|
host: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GrpcClient {
|
||||||
|
pub fn new(host: impl Into<String>) -> Self {
|
||||||
|
Self { host: host.into() }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_key(
|
||||||
|
&self,
|
||||||
|
namespace: &str,
|
||||||
|
id: Option<impl Into<String>>,
|
||||||
|
key: &str,
|
||||||
|
) -> anyhow::Result<Option<String>> {
|
||||||
|
let mut client = self.client().await?;
|
||||||
|
|
||||||
|
let resp = client
|
||||||
|
.get_key(GetKeyRequest {
|
||||||
|
key: key.into(),
|
||||||
|
namespace: namespace.into(),
|
||||||
|
id: id.map(|i| i.into()),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let resp = resp.into_inner();
|
||||||
|
|
||||||
|
Ok(resp.value)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn set_key(
|
||||||
|
&self,
|
||||||
|
namespace: &str,
|
||||||
|
id: Option<impl Into<String>>,
|
||||||
|
key: &str,
|
||||||
|
value: &str,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let mut client = self.client().await?;
|
||||||
|
|
||||||
|
client
|
||||||
|
.set_key(SetKeyRequest {
|
||||||
|
key: key.into(),
|
||||||
|
value: value.into(),
|
||||||
|
namespace: namespace.into(),
|
||||||
|
id: id.map(|i| i.into()),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn listen_events(
|
||||||
|
&self,
|
||||||
|
namespace: &str,
|
||||||
|
id: Option<impl Into<String>>,
|
||||||
|
exec: impl ListenEventsExecutor,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let mut client = self.client().await?;
|
||||||
|
|
||||||
|
tracing::debug!("creating stream for listening to events on: {}", namespace);
|
||||||
|
let resp = client
|
||||||
|
.listen_events(ListenEventsRequest {
|
||||||
|
namespace: namespace.into(),
|
||||||
|
id: id.map(|i| i.into()),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.inspect_err(|e| tracing::warn!("failed to establish a connection: {}", e))?;
|
||||||
|
|
||||||
|
tracing::debug!("setup stream: {}", namespace);
|
||||||
|
let mut inner = resp.into_inner();
|
||||||
|
while let Ok(Some(message)) = inner.message().await {
|
||||||
|
tracing::debug!("received message: {}", namespace);
|
||||||
|
exec.execute(message)
|
||||||
|
.await
|
||||||
|
.inspect_err(|e| tracing::warn!("failed to handle message: {}", e))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn client(&self) -> anyhow::Result<ChurnClient<tonic::transport::Channel>> {
|
||||||
|
tracing::debug!("setting up client");
|
||||||
|
let channel = if self.host.starts_with("https") {
|
||||||
|
Channel::from_shared(self.host.to_owned())?
|
||||||
|
.tls_config(ClientTlsConfig::new().with_native_roots())?
|
||||||
|
.connect_timeout(std::time::Duration::from_secs(5))
|
||||||
|
.connect()
|
||||||
|
.await?
|
||||||
|
} else {
|
||||||
|
Channel::from_shared(self.host.to_owned())?
|
||||||
|
.connect()
|
||||||
|
.await?
|
||||||
|
};
|
||||||
|
|
||||||
|
let client = ChurnClient::new(channel);
|
||||||
|
|
||||||
|
Ok(client)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait ListenEventsExecutor {
|
||||||
|
async fn execute(&self, event: ListenEventsResponse) -> anyhow::Result<()>;
|
||||||
|
}
|
1
crates/churn/src/agent/handlers.rs
Normal file
1
crates/churn/src/agent/handlers.rs
Normal file
@ -0,0 +1 @@
|
|||||||
|
pub mod scheduled_tasks;
|
46
crates/churn/src/agent/handlers/scheduled_tasks.rs
Normal file
46
crates/churn/src/agent/handlers/scheduled_tasks.rs
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
use crate::agent::{
|
||||||
|
actions::Plan,
|
||||||
|
plugins::PluginStore,
|
||||||
|
task::{ConcreteTask, IntoTask},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ScheduledTasks {
|
||||||
|
store: PluginStore,
|
||||||
|
}
|
||||||
|
impl ScheduledTasks {
|
||||||
|
pub fn new(store: PluginStore) -> Self {
|
||||||
|
Self { store }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle(
|
||||||
|
&self,
|
||||||
|
task: &str,
|
||||||
|
_properties: BTreeMap<String, String>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
tracing::info!("scheduling: {}", task);
|
||||||
|
|
||||||
|
let plan = Plan::new(self.store.clone());
|
||||||
|
let tasks: Vec<ConcreteTask> = plan
|
||||||
|
.tasks()
|
||||||
|
.await?
|
||||||
|
.into_iter()
|
||||||
|
.map(|i| i.into_task())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for task in tasks {
|
||||||
|
let id = task.id().await?;
|
||||||
|
if !task.should_run().await? {
|
||||||
|
tracing::debug!(task = id, "skipping run");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!(task = id, "executing task");
|
||||||
|
task.execute().await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
20
crates/churn/src/agent/models.rs
Normal file
20
crates/churn/src/agent/models.rs
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
use std::{collections::BTreeMap, fmt::Display};
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
|
#[serde(tag = "type")]
|
||||||
|
pub enum Commands {
|
||||||
|
ScheduleTask {
|
||||||
|
task: String,
|
||||||
|
properties: BTreeMap<String, String>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for Commands {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.write_str(match self {
|
||||||
|
Commands::ScheduleTask { .. } => "schedule_task",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
239
crates/churn/src/agent/plugins.rs
Normal file
239
crates/churn/src/agent/plugins.rs
Normal file
@ -0,0 +1,239 @@
|
|||||||
|
use anyhow::Context;
|
||||||
|
use component::churn_tasks::process::HostProcess;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use wasmtime::component::*;
|
||||||
|
use wasmtime::{Config, Engine, Store};
|
||||||
|
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView};
|
||||||
|
|
||||||
|
wasmtime::component::bindgen!({
|
||||||
|
path: "wit/world.wit",
|
||||||
|
//world: "churn",
|
||||||
|
async: true,
|
||||||
|
with: {
|
||||||
|
"component:churn-tasks/process/process": CustomProcess
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct CustomProcess {}
|
||||||
|
impl CustomProcess {
|
||||||
|
pub fn run(&self, args: Vec<String>) -> String {
|
||||||
|
tracing::info!("calling function");
|
||||||
|
|
||||||
|
match args.split_first() {
|
||||||
|
Some((item, rest)) => {
|
||||||
|
let mut cmd = std::process::Command::new(item);
|
||||||
|
match cmd.args(rest).output() {
|
||||||
|
Ok(output) => std::str::from_utf8(&output.stdout)
|
||||||
|
.expect("to be able to parse utf8")
|
||||||
|
.to_string(),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("command failed with output: {e}");
|
||||||
|
e.to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
tracing::warn!("failed to call function because it is empty");
|
||||||
|
panic!("failed to call function because it is empty")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct PluginStore {
|
||||||
|
inner: Arc<Mutex<InnerPluginStore>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PluginStore {
|
||||||
|
pub fn new() -> anyhow::Result<Self> {
|
||||||
|
Ok(Self {
|
||||||
|
inner: Arc::new(Mutex::new(InnerPluginStore::new()?)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn id(&self, plugin: &str) -> anyhow::Result<String> {
|
||||||
|
let mut inner = self.inner.lock().await;
|
||||||
|
inner.id(plugin).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self, plugin: &str) -> anyhow::Result<()> {
|
||||||
|
let mut inner = self.inner.lock().await;
|
||||||
|
inner.execute(plugin).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct InnerPluginStore {
|
||||||
|
store: wasmtime::Store<ServerWasiView>,
|
||||||
|
linker: wasmtime::component::Linker<ServerWasiView>,
|
||||||
|
engine: wasmtime::Engine,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InnerPluginStore {
|
||||||
|
pub fn new() -> anyhow::Result<Self> {
|
||||||
|
let mut config = Config::default();
|
||||||
|
config.wasm_component_model(true);
|
||||||
|
config.async_support(true);
|
||||||
|
let engine = Engine::new(&config)?;
|
||||||
|
let mut linker: wasmtime::component::Linker<ServerWasiView> = Linker::new(&engine);
|
||||||
|
|
||||||
|
// Add the command world (aka WASI CLI) to the linker
|
||||||
|
wasmtime_wasi::add_to_linker_async(&mut linker).context("Failed to link command world")?;
|
||||||
|
|
||||||
|
component::churn_tasks::process::add_to_linker(
|
||||||
|
&mut linker,
|
||||||
|
|state: &mut ServerWasiView| state,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let wasi_view = ServerWasiView::new();
|
||||||
|
let store = Store::new(&engine, wasi_view);
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
store,
|
||||||
|
linker,
|
||||||
|
engine,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn id(&mut self, plugin: &str) -> anyhow::Result<String> {
|
||||||
|
let plugin = self.ensure_plugin(plugin).await?;
|
||||||
|
|
||||||
|
plugin
|
||||||
|
.interface0
|
||||||
|
.call_id(&mut self.store)
|
||||||
|
.await
|
||||||
|
.context("Failed to call add function")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&mut self, plugin: &str) -> anyhow::Result<()> {
|
||||||
|
let plugin = self.ensure_plugin(plugin).await?;
|
||||||
|
|
||||||
|
plugin
|
||||||
|
.interface0
|
||||||
|
.call_execute(&mut self.store)
|
||||||
|
.await
|
||||||
|
.context("Failed to call add function")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ensure_plugin(&mut self, plugin: &str) -> anyhow::Result<Churn> {
|
||||||
|
let cache = dirs::cache_dir()
|
||||||
|
.ok_or(anyhow::anyhow!("failed to find cache dir"))?
|
||||||
|
.join("io.kjuulh.churn");
|
||||||
|
|
||||||
|
let (plugin_name, plugin_version) = plugin.split_once("@").unwrap_or((plugin, "latest"));
|
||||||
|
|
||||||
|
let plugin_path = cache
|
||||||
|
.join("plugins")
|
||||||
|
.join(plugin_name)
|
||||||
|
.join(plugin_version)
|
||||||
|
.join(format!("{plugin_name}.wasm"));
|
||||||
|
|
||||||
|
let no_cache: bool = std::env::var("CHURN_NO_CACHE")
|
||||||
|
.unwrap_or("false".into())
|
||||||
|
.parse()?;
|
||||||
|
|
||||||
|
if !plugin_path.exists() || no_cache {
|
||||||
|
tracing::info!(
|
||||||
|
plugin_name = plugin_name,
|
||||||
|
plugin_version = plugin_version,
|
||||||
|
"downloading plugin"
|
||||||
|
);
|
||||||
|
if let Some(parent) = plugin_path.parent() {
|
||||||
|
tokio::fs::create_dir_all(parent).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?;
|
||||||
|
let mut stream = req.bytes_stream();
|
||||||
|
let mut file = tokio::fs::File::create(&plugin_path).await?;
|
||||||
|
while let Some(chunk) = stream.next().await {
|
||||||
|
let chunk = chunk?;
|
||||||
|
file.write_all(&chunk).await?;
|
||||||
|
}
|
||||||
|
file.flush().await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let component =
|
||||||
|
Component::from_file(&self.engine, plugin_path).context("Component file not found")?;
|
||||||
|
|
||||||
|
tracing::debug!(
|
||||||
|
plugin_name = plugin_name,
|
||||||
|
plugin_version = plugin_version,
|
||||||
|
"instantiating plugin"
|
||||||
|
);
|
||||||
|
let instance = Churn::instantiate_async(&mut self.store, &component, &self.linker)
|
||||||
|
.await
|
||||||
|
.context("Failed to instantiate the example world")
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Ok(instance)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ServerWasiView {
|
||||||
|
table: ResourceTable,
|
||||||
|
ctx: WasiCtx,
|
||||||
|
processes: ResourceTable,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ServerWasiView {
|
||||||
|
fn new() -> Self {
|
||||||
|
let table = ResourceTable::new();
|
||||||
|
|
||||||
|
let ctx = WasiCtxBuilder::new()
|
||||||
|
.inherit_stdio()
|
||||||
|
.inherit_env()
|
||||||
|
.inherit_network()
|
||||||
|
.preopened_dir("/", "/", DirPerms::all(), FilePerms::all())
|
||||||
|
.expect("to be able to open root")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Self {
|
||||||
|
table,
|
||||||
|
ctx,
|
||||||
|
processes: ResourceTable::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WasiView for ServerWasiView {
|
||||||
|
fn table(&mut self) -> &mut ResourceTable {
|
||||||
|
&mut self.table
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ctx(&mut self) -> &mut WasiCtx {
|
||||||
|
&mut self.ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl component::churn_tasks::process::Host for ServerWasiView {}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl HostProcess for ServerWasiView {
|
||||||
|
async fn new(
|
||||||
|
&mut self,
|
||||||
|
) -> wasmtime::component::Resource<component::churn_tasks::process::Process> {
|
||||||
|
self.processes.push(CustomProcess::default()).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_process(
|
||||||
|
&mut self,
|
||||||
|
self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
||||||
|
inputs: wasmtime::component::__internal::Vec<String>,
|
||||||
|
) -> String {
|
||||||
|
let process = self.processes.get(&self_).unwrap();
|
||||||
|
process.run(inputs)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn drop(
|
||||||
|
&mut self,
|
||||||
|
rep: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
||||||
|
) -> wasmtime::Result<()> {
|
||||||
|
self.processes.delete(rep)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
67
crates/churn/src/agent/queue.rs
Normal file
67
crates/churn/src/agent/queue.rs
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use notmad::{Component, MadError};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
use super::{models::Commands, scheduler::Scheduler};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct AgentQueue {
|
||||||
|
sender: Arc<tokio::sync::mpsc::Sender<Commands>>,
|
||||||
|
receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<Commands>>>,
|
||||||
|
|
||||||
|
scheduler: Scheduler,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AgentQueue {
|
||||||
|
pub fn new(scheduler: Scheduler) -> Self {
|
||||||
|
let (tx, rx) = tokio::sync::mpsc::channel(5);
|
||||||
|
Self {
|
||||||
|
sender: Arc::new(tx),
|
||||||
|
receiver: Arc::new(Mutex::new(rx)),
|
||||||
|
|
||||||
|
scheduler,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handler(&self, command: Commands) -> anyhow::Result<()> {
|
||||||
|
tracing::debug!("handling task");
|
||||||
|
|
||||||
|
self.scheduler.handle(command).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn publish(&self, command: Commands) -> anyhow::Result<()> {
|
||||||
|
tracing::debug!("publishing task: {}", command.to_string());
|
||||||
|
|
||||||
|
self.sender.send(command).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl Component for AgentQueue {
|
||||||
|
async fn run(
|
||||||
|
&self,
|
||||||
|
cancellation_token: tokio_util::sync::CancellationToken,
|
||||||
|
) -> Result<(), notmad::MadError> {
|
||||||
|
loop {
|
||||||
|
let mut recv = self.receiver.lock().await;
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
res = recv.recv() => {
|
||||||
|
if let Some(res) = res {
|
||||||
|
self.handler(res).await.map_err(MadError::Inner)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = cancellation_token.cancelled() => {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
@ -1,29 +1,37 @@
|
|||||||
use anyhow::Context;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use super::agent_state::AgentState;
|
use crate::agent::models::Commands;
|
||||||
|
|
||||||
|
use super::{agent_state::AgentState, queue::AgentQueue};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct AgentRefresh {
|
pub struct AgentRefresh {
|
||||||
_state: AgentState,
|
process_host: String,
|
||||||
host: String,
|
queue: AgentQueue,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AgentRefresh {
|
impl AgentRefresh {
|
||||||
pub fn new(state: impl Into<AgentState>, host: impl Into<String>) -> Self {
|
pub fn new(state: impl Into<AgentState>) -> Self {
|
||||||
|
let state: AgentState = state.into();
|
||||||
Self {
|
Self {
|
||||||
_state: state.into(),
|
process_host: state.discovery.process_host.clone(),
|
||||||
host: host.into(),
|
queue: state.queue.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl notmad::Component for AgentRefresh {
|
impl notmad::Component for AgentRefresh {
|
||||||
|
fn name(&self) -> Option<String> {
|
||||||
|
Some("agent_refresh".into())
|
||||||
|
}
|
||||||
|
|
||||||
async fn run(
|
async fn run(
|
||||||
&self,
|
&self,
|
||||||
cancellation_token: tokio_util::sync::CancellationToken,
|
cancellation_token: tokio_util::sync::CancellationToken,
|
||||||
) -> Result<(), notmad::MadError> {
|
) -> Result<(), notmad::MadError> {
|
||||||
let cancel = nodrift::schedule_drifter(std::time::Duration::from_secs(60), self.clone());
|
let cancel =
|
||||||
|
nodrift::schedule_drifter(std::time::Duration::from_secs(60 * 10), self.clone());
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = cancel.cancelled() => {},
|
_ = cancel.cancelled() => {},
|
||||||
_ = cancellation_token.cancelled() => {
|
_ = cancellation_token.cancelled() => {
|
||||||
@ -39,82 +47,14 @@ impl notmad::Component for AgentRefresh {
|
|||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl nodrift::Drifter for AgentRefresh {
|
impl nodrift::Drifter for AgentRefresh {
|
||||||
async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
|
async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
|
||||||
tracing::info!(host = self.host, "refreshing agent");
|
tracing::info!(process_host = self.process_host, "refreshing agent");
|
||||||
|
|
||||||
// Get plan
|
self.queue
|
||||||
let plan = Plan::new();
|
.publish(Commands::ScheduleTask {
|
||||||
let tasks = plan.tasks().await?;
|
task: "update".into(),
|
||||||
|
properties: BTreeMap::default(),
|
||||||
// For task
|
})
|
||||||
for task in tasks {
|
.await?;
|
||||||
// Check idempotency rules
|
|
||||||
if !task.should_run().await? {
|
|
||||||
tracing::debug!(task = task.id(), "skipping run");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run task if not valid
|
|
||||||
tracing::info!(task = task.id(), "executing task");
|
|
||||||
task.execute().await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Plan {}
|
|
||||||
impl Plan {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn tasks(&self) -> anyhow::Result<Vec<Task>> {
|
|
||||||
Ok(vec![Task::new()])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Task {}
|
|
||||||
|
|
||||||
impl Task {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn id(&self) -> String {
|
|
||||||
"apt".into()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn should_run(&self) -> anyhow::Result<bool> {
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn execute(&self) -> anyhow::Result<()> {
|
|
||||||
let mut cmd = tokio::process::Command::new("apt-get");
|
|
||||||
cmd.args(["update", "-q"]);
|
|
||||||
let output = cmd.output().await.context("failed to run apt update")?;
|
|
||||||
match output.status.success() {
|
|
||||||
true => tracing::info!("successfully ran apt update"),
|
|
||||||
false => {
|
|
||||||
anyhow::bail!(
|
|
||||||
"failed to run apt update: {}",
|
|
||||||
std::str::from_utf8(&output.stderr)?
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut cmd = tokio::process::Command::new("apt-get");
|
|
||||||
cmd.env("DEBIAN_FRONTEND", "noninteractive")
|
|
||||||
.args(["upgrade", "-y"]);
|
|
||||||
let output = cmd.output().await.context("failed to run apt upgrade")?;
|
|
||||||
match output.status.success() {
|
|
||||||
true => tracing::info!("successfully ran apt upgrade"),
|
|
||||||
false => {
|
|
||||||
anyhow::bail!(
|
|
||||||
"failed to run apt upgrade: {}",
|
|
||||||
std::str::from_utf8(&output.stderr)?
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
22
crates/churn/src/agent/scheduler.rs
Normal file
22
crates/churn/src/agent/scheduler.rs
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
use super::{handlers::scheduled_tasks::ScheduledTasks, models::Commands};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Scheduler {
|
||||||
|
scheduled_tasks: ScheduledTasks,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Scheduler {
|
||||||
|
pub fn new(scheduled_tasks: ScheduledTasks) -> Self {
|
||||||
|
Self { scheduled_tasks }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle(&self, command: Commands) -> anyhow::Result<()> {
|
||||||
|
match command {
|
||||||
|
Commands::ScheduleTask { task, properties } => {
|
||||||
|
self.scheduled_tasks.handle(&task, properties).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
45
crates/churn/src/agent/task.rs
Normal file
45
crates/churn/src/agent/task.rs
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait Task {
|
||||||
|
async fn id(&self) -> anyhow::Result<String>;
|
||||||
|
async fn should_run(&self) -> anyhow::Result<bool> {
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
async fn execute(&self) -> anyhow::Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait IntoTask {
|
||||||
|
fn into_task(self) -> ConcreteTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ConcreteTask {
|
||||||
|
inner: Arc<dyn Task + Sync + Send + 'static>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConcreteTask {
|
||||||
|
pub fn new<T: Task + Sync + Send + 'static>(t: T) -> Self {
|
||||||
|
Self { inner: Arc::new(t) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::ops::Deref for ConcreteTask {
|
||||||
|
type Target = Arc<dyn Task + Sync + Send + 'static>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.inner
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoTask for ConcreteTask {
|
||||||
|
fn into_task(self) -> ConcreteTask {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Task + Sync + Send + 'static> IntoTask for T {
|
||||||
|
fn into_task(self) -> ConcreteTask {
|
||||||
|
ConcreteTask::new(self)
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,12 @@
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
use axum::{extract::MatchedPath, http::Request, routing::get, Router};
|
use axum::{
|
||||||
|
extract::{MatchedPath, State},
|
||||||
|
http::Request,
|
||||||
|
routing::get,
|
||||||
|
Json, Router,
|
||||||
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tower_http::trace::TraceLayer;
|
use tower_http::trace::TraceLayer;
|
||||||
|
|
||||||
@ -22,6 +28,7 @@ impl Api {
|
|||||||
pub async fn serve(&self) -> anyhow::Result<()> {
|
pub async fn serve(&self) -> anyhow::Result<()> {
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.route("/", get(root))
|
.route("/", get(root))
|
||||||
|
.route("/discovery", get(discovery))
|
||||||
.with_state(self.state.clone())
|
.with_state(self.state.clone())
|
||||||
.layer(
|
.layer(
|
||||||
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
|
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
|
||||||
@ -55,6 +62,28 @@ async fn root() -> &'static str {
|
|||||||
"Hello, churn!"
|
"Hello, churn!"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub struct Discovery {
|
||||||
|
pub external_host: String,
|
||||||
|
pub process_host: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Discovery {
|
||||||
|
pub async fn get_from_host(host: &str) -> anyhow::Result<Self> {
|
||||||
|
let resp = reqwest::get(format!("{}/discovery", host.trim_end_matches('/'))).await?;
|
||||||
|
let s: Self = resp.json().await?;
|
||||||
|
|
||||||
|
Ok(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn discovery(State(state): State<SharedState>) -> Json<Discovery> {
|
||||||
|
Json(Discovery {
|
||||||
|
external_host: state.config.external_host.clone(),
|
||||||
|
process_host: state.config.process_host.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl notmad::Component for Api {
|
impl notmad::Component for Api {
|
||||||
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {
|
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
@ -1,28 +1,39 @@
|
|||||||
use std::net::SocketAddr;
|
use std::{collections::BTreeMap, net::SocketAddr};
|
||||||
|
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
|
|
||||||
use crate::{agent, api, state::SharedState};
|
use crate::{agent, server};
|
||||||
|
|
||||||
pub async fn execute() -> anyhow::Result<()> {
|
pub async fn execute() -> anyhow::Result<()> {
|
||||||
let state = SharedState::new().await?;
|
|
||||||
|
|
||||||
let cli = Command::parse();
|
let cli = Command::parse();
|
||||||
match cli.command.expect("to have a subcommand") {
|
match cli.command.expect("to have a subcommand") {
|
||||||
Commands::Serve { host } => {
|
Commands::Serve {
|
||||||
|
host,
|
||||||
|
grpc_host,
|
||||||
|
config,
|
||||||
|
} => {
|
||||||
tracing::info!("Starting service");
|
tracing::info!("Starting service");
|
||||||
|
server::execute(host, grpc_host, config).await?;
|
||||||
notmad::Mad::builder()
|
|
||||||
.add(api::Api::new(&state, host))
|
|
||||||
.run()
|
|
||||||
.await?;
|
|
||||||
}
|
}
|
||||||
Commands::Agent { commands } => match commands {
|
Commands::Agent { commands } => match commands {
|
||||||
AgentCommands::Start { host } => {
|
AgentCommands::Start {} => {
|
||||||
tracing::info!("starting agent");
|
tracing::info!("starting agent");
|
||||||
agent::execute(host).await?;
|
agent::execute().await?;
|
||||||
tracing::info!("shut down agent");
|
tracing::info!("shut down agent");
|
||||||
}
|
}
|
||||||
|
AgentCommands::Setup {
|
||||||
|
force,
|
||||||
|
discovery,
|
||||||
|
labels,
|
||||||
|
} => {
|
||||||
|
let mut setup_labels = BTreeMap::new();
|
||||||
|
for (k, v) in labels {
|
||||||
|
setup_labels.insert(k, v);
|
||||||
|
}
|
||||||
|
|
||||||
|
agent::setup_config(discovery, force, setup_labels).await?;
|
||||||
|
tracing::info!("wrote default agent config");
|
||||||
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -41,6 +52,12 @@ enum Commands {
|
|||||||
Serve {
|
Serve {
|
||||||
#[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
|
#[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
|
||||||
host: SocketAddr,
|
host: SocketAddr,
|
||||||
|
|
||||||
|
#[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")]
|
||||||
|
grpc_host: SocketAddr,
|
||||||
|
|
||||||
|
#[clap(flatten)]
|
||||||
|
config: server::config::ServerConfig,
|
||||||
},
|
},
|
||||||
Agent {
|
Agent {
|
||||||
#[command(subcommand)]
|
#[command(subcommand)]
|
||||||
@ -50,8 +67,22 @@ enum Commands {
|
|||||||
|
|
||||||
#[derive(Subcommand)]
|
#[derive(Subcommand)]
|
||||||
enum AgentCommands {
|
enum AgentCommands {
|
||||||
Start {
|
Start {},
|
||||||
#[arg(env = "SERVICE_HOST", long = "service-host")]
|
Setup {
|
||||||
host: String,
|
#[arg(long, default_value = "false")]
|
||||||
|
force: bool,
|
||||||
|
|
||||||
|
#[arg(env = "DISCOVERY_HOST", long = "discovery")]
|
||||||
|
discovery: String,
|
||||||
|
|
||||||
|
#[arg(long = "label", short = 'l', value_parser = parse_key_val, action = clap::ArgAction::Append)]
|
||||||
|
labels: Vec<(String, String)>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn parse_key_val(s: &str) -> Result<(String, String), String> {
|
||||||
|
let (key, value) = s
|
||||||
|
.split_once("=")
|
||||||
|
.ok_or_else(|| format!("invalid key=value: no `=` found in `{s}`"))?;
|
||||||
|
Ok((key.to_string(), value.to_string()))
|
||||||
|
}
|
||||||
|
52
crates/churn/src/grpc/churn.v1.rs
Normal file
52
crates/churn/src/grpc/churn.v1.rs
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
// @generated
|
||||||
|
// This file is @generated by prost-build.
|
||||||
|
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||||
|
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||||
|
pub struct GetKeyRequest {
|
||||||
|
#[prost(string, tag="1")]
|
||||||
|
pub namespace: ::prost::alloc::string::String,
|
||||||
|
#[prost(string, optional, tag="2")]
|
||||||
|
pub id: ::core::option::Option<::prost::alloc::string::String>,
|
||||||
|
#[prost(string, tag="3")]
|
||||||
|
pub key: ::prost::alloc::string::String,
|
||||||
|
}
|
||||||
|
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||||
|
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||||
|
pub struct GetKeyResponse {
|
||||||
|
#[prost(string, optional, tag="1")]
|
||||||
|
pub value: ::core::option::Option<::prost::alloc::string::String>,
|
||||||
|
}
|
||||||
|
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||||
|
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||||
|
pub struct SetKeyRequest {
|
||||||
|
#[prost(string, tag="1")]
|
||||||
|
pub namespace: ::prost::alloc::string::String,
|
||||||
|
#[prost(string, optional, tag="2")]
|
||||||
|
pub id: ::core::option::Option<::prost::alloc::string::String>,
|
||||||
|
#[prost(string, tag="3")]
|
||||||
|
pub key: ::prost::alloc::string::String,
|
||||||
|
#[prost(string, tag="4")]
|
||||||
|
pub value: ::prost::alloc::string::String,
|
||||||
|
}
|
||||||
|
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||||
|
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
|
||||||
|
pub struct SetKeyResponse {
|
||||||
|
}
|
||||||
|
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||||
|
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||||
|
pub struct ListenEventsRequest {
|
||||||
|
#[prost(string, tag="1")]
|
||||||
|
pub namespace: ::prost::alloc::string::String,
|
||||||
|
#[prost(string, optional, tag="2")]
|
||||||
|
pub id: ::core::option::Option<::prost::alloc::string::String>,
|
||||||
|
}
|
||||||
|
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||||
|
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||||
|
pub struct ListenEventsResponse {
|
||||||
|
#[prost(string, tag="1")]
|
||||||
|
pub id: ::prost::alloc::string::String,
|
||||||
|
#[prost(string, tag="2")]
|
||||||
|
pub value: ::prost::alloc::string::String,
|
||||||
|
}
|
||||||
|
include!("churn.v1.tonic.rs");
|
||||||
|
// @@protoc_insertion_point(module)
|
435
crates/churn/src/grpc/churn.v1.tonic.rs
Normal file
435
crates/churn/src/grpc/churn.v1.tonic.rs
Normal file
@ -0,0 +1,435 @@
|
|||||||
|
// @generated
|
||||||
|
/// Generated client implementations.
|
||||||
|
pub mod churn_client {
|
||||||
|
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
|
||||||
|
use tonic::codegen::*;
|
||||||
|
use tonic::codegen::http::Uri;
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ChurnClient<T> {
|
||||||
|
inner: tonic::client::Grpc<T>,
|
||||||
|
}
|
||||||
|
impl ChurnClient<tonic::transport::Channel> {
|
||||||
|
/// Attempt to create a new client by connecting to a given endpoint.
|
||||||
|
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
|
||||||
|
where
|
||||||
|
D: TryInto<tonic::transport::Endpoint>,
|
||||||
|
D::Error: Into<StdError>,
|
||||||
|
{
|
||||||
|
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
|
||||||
|
Ok(Self::new(conn))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T> ChurnClient<T>
|
||||||
|
where
|
||||||
|
T: tonic::client::GrpcService<tonic::body::BoxBody>,
|
||||||
|
T::Error: Into<StdError>,
|
||||||
|
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
|
||||||
|
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
|
||||||
|
{
|
||||||
|
pub fn new(inner: T) -> Self {
|
||||||
|
let inner = tonic::client::Grpc::new(inner);
|
||||||
|
Self { inner }
|
||||||
|
}
|
||||||
|
pub fn with_origin(inner: T, origin: Uri) -> Self {
|
||||||
|
let inner = tonic::client::Grpc::with_origin(inner, origin);
|
||||||
|
Self { inner }
|
||||||
|
}
|
||||||
|
pub fn with_interceptor<F>(
|
||||||
|
inner: T,
|
||||||
|
interceptor: F,
|
||||||
|
) -> ChurnClient<InterceptedService<T, F>>
|
||||||
|
where
|
||||||
|
F: tonic::service::Interceptor,
|
||||||
|
T::ResponseBody: Default,
|
||||||
|
T: tonic::codegen::Service<
|
||||||
|
http::Request<tonic::body::BoxBody>,
|
||||||
|
Response = http::Response<
|
||||||
|
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
|
||||||
|
>,
|
||||||
|
>,
|
||||||
|
<T as tonic::codegen::Service<
|
||||||
|
http::Request<tonic::body::BoxBody>,
|
||||||
|
>>::Error: Into<StdError> + Send + Sync,
|
||||||
|
{
|
||||||
|
ChurnClient::new(InterceptedService::new(inner, interceptor))
|
||||||
|
}
|
||||||
|
/// Compress requests with the given encoding.
|
||||||
|
///
|
||||||
|
/// This requires the server to support it otherwise it might respond with an
|
||||||
|
/// error.
|
||||||
|
#[must_use]
|
||||||
|
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||||
|
self.inner = self.inner.send_compressed(encoding);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
/// Enable decompressing responses.
|
||||||
|
#[must_use]
|
||||||
|
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||||
|
self.inner = self.inner.accept_compressed(encoding);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
/// Limits the maximum size of a decoded message.
|
||||||
|
///
|
||||||
|
/// Default: `4MB`
|
||||||
|
#[must_use]
|
||||||
|
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
|
||||||
|
self.inner = self.inner.max_decoding_message_size(limit);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
/// Limits the maximum size of an encoded message.
|
||||||
|
///
|
||||||
|
/// Default: `usize::MAX`
|
||||||
|
#[must_use]
|
||||||
|
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
|
||||||
|
self.inner = self.inner.max_encoding_message_size(limit);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
pub async fn get_key(
|
||||||
|
&mut self,
|
||||||
|
request: impl tonic::IntoRequest<super::GetKeyRequest>,
|
||||||
|
) -> std::result::Result<tonic::Response<super::GetKeyResponse>, tonic::Status> {
|
||||||
|
self.inner
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
tonic::Status::new(
|
||||||
|
tonic::Code::Unknown,
|
||||||
|
format!("Service was not ready: {}", e.into()),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
let codec = tonic::codec::ProstCodec::default();
|
||||||
|
let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/GetKey");
|
||||||
|
let mut req = request.into_request();
|
||||||
|
req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "GetKey"));
|
||||||
|
self.inner.unary(req, path, codec).await
|
||||||
|
}
|
||||||
|
pub async fn set_key(
|
||||||
|
&mut self,
|
||||||
|
request: impl tonic::IntoRequest<super::SetKeyRequest>,
|
||||||
|
) -> std::result::Result<tonic::Response<super::SetKeyResponse>, tonic::Status> {
|
||||||
|
self.inner
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
tonic::Status::new(
|
||||||
|
tonic::Code::Unknown,
|
||||||
|
format!("Service was not ready: {}", e.into()),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
let codec = tonic::codec::ProstCodec::default();
|
||||||
|
let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/SetKey");
|
||||||
|
let mut req = request.into_request();
|
||||||
|
req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "SetKey"));
|
||||||
|
self.inner.unary(req, path, codec).await
|
||||||
|
}
|
||||||
|
pub async fn listen_events(
|
||||||
|
&mut self,
|
||||||
|
request: impl tonic::IntoRequest<super::ListenEventsRequest>,
|
||||||
|
) -> std::result::Result<
|
||||||
|
tonic::Response<tonic::codec::Streaming<super::ListenEventsResponse>>,
|
||||||
|
tonic::Status,
|
||||||
|
> {
|
||||||
|
self.inner
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
tonic::Status::new(
|
||||||
|
tonic::Code::Unknown,
|
||||||
|
format!("Service was not ready: {}", e.into()),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
let codec = tonic::codec::ProstCodec::default();
|
||||||
|
let path = http::uri::PathAndQuery::from_static(
|
||||||
|
"/churn.v1.Churn/ListenEvents",
|
||||||
|
);
|
||||||
|
let mut req = request.into_request();
|
||||||
|
req.extensions_mut()
|
||||||
|
.insert(GrpcMethod::new("churn.v1.Churn", "ListenEvents"));
|
||||||
|
self.inner.server_streaming(req, path, codec).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// Generated server implementations.
|
||||||
|
pub mod churn_server {
|
||||||
|
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
|
||||||
|
use tonic::codegen::*;
|
||||||
|
/// Generated trait containing gRPC methods that should be implemented for use with ChurnServer.
|
||||||
|
#[async_trait]
|
||||||
|
pub trait Churn: Send + Sync + 'static {
|
||||||
|
async fn get_key(
|
||||||
|
&self,
|
||||||
|
request: tonic::Request<super::GetKeyRequest>,
|
||||||
|
) -> std::result::Result<tonic::Response<super::GetKeyResponse>, tonic::Status>;
|
||||||
|
async fn set_key(
|
||||||
|
&self,
|
||||||
|
request: tonic::Request<super::SetKeyRequest>,
|
||||||
|
) -> std::result::Result<tonic::Response<super::SetKeyResponse>, tonic::Status>;
|
||||||
|
/// Server streaming response type for the ListenEvents method.
|
||||||
|
type ListenEventsStream: tonic::codegen::tokio_stream::Stream<
|
||||||
|
Item = std::result::Result<super::ListenEventsResponse, tonic::Status>,
|
||||||
|
>
|
||||||
|
+ Send
|
||||||
|
+ 'static;
|
||||||
|
async fn listen_events(
|
||||||
|
&self,
|
||||||
|
request: tonic::Request<super::ListenEventsRequest>,
|
||||||
|
) -> std::result::Result<
|
||||||
|
tonic::Response<Self::ListenEventsStream>,
|
||||||
|
tonic::Status,
|
||||||
|
>;
|
||||||
|
}
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ChurnServer<T: Churn> {
|
||||||
|
inner: _Inner<T>,
|
||||||
|
accept_compression_encodings: EnabledCompressionEncodings,
|
||||||
|
send_compression_encodings: EnabledCompressionEncodings,
|
||||||
|
max_decoding_message_size: Option<usize>,
|
||||||
|
max_encoding_message_size: Option<usize>,
|
||||||
|
}
|
||||||
|
struct _Inner<T>(Arc<T>);
|
||||||
|
impl<T: Churn> ChurnServer<T> {
|
||||||
|
pub fn new(inner: T) -> Self {
|
||||||
|
Self::from_arc(Arc::new(inner))
|
||||||
|
}
|
||||||
|
pub fn from_arc(inner: Arc<T>) -> Self {
|
||||||
|
let inner = _Inner(inner);
|
||||||
|
Self {
|
||||||
|
inner,
|
||||||
|
accept_compression_encodings: Default::default(),
|
||||||
|
send_compression_encodings: Default::default(),
|
||||||
|
max_decoding_message_size: None,
|
||||||
|
max_encoding_message_size: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn with_interceptor<F>(
|
||||||
|
inner: T,
|
||||||
|
interceptor: F,
|
||||||
|
) -> InterceptedService<Self, F>
|
||||||
|
where
|
||||||
|
F: tonic::service::Interceptor,
|
||||||
|
{
|
||||||
|
InterceptedService::new(Self::new(inner), interceptor)
|
||||||
|
}
|
||||||
|
/// Enable decompressing requests with the given encoding.
|
||||||
|
#[must_use]
|
||||||
|
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||||
|
self.accept_compression_encodings.enable(encoding);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
/// Compress responses with the given encoding, if the client supports it.
|
||||||
|
#[must_use]
|
||||||
|
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
||||||
|
self.send_compression_encodings.enable(encoding);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
/// Limits the maximum size of a decoded message.
|
||||||
|
///
|
||||||
|
/// Default: `4MB`
|
||||||
|
#[must_use]
|
||||||
|
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
|
||||||
|
self.max_decoding_message_size = Some(limit);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
/// Limits the maximum size of an encoded message.
|
||||||
|
///
|
||||||
|
/// Default: `usize::MAX`
|
||||||
|
#[must_use]
|
||||||
|
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
|
||||||
|
self.max_encoding_message_size = Some(limit);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T, B> tonic::codegen::Service<http::Request<B>> for ChurnServer<T>
|
||||||
|
where
|
||||||
|
T: Churn,
|
||||||
|
B: Body + Send + 'static,
|
||||||
|
B::Error: Into<StdError> + Send + 'static,
|
||||||
|
{
|
||||||
|
type Response = http::Response<tonic::body::BoxBody>;
|
||||||
|
type Error = std::convert::Infallible;
|
||||||
|
type Future = BoxFuture<Self::Response, Self::Error>;
|
||||||
|
fn poll_ready(
|
||||||
|
&mut self,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
) -> Poll<std::result::Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
fn call(&mut self, req: http::Request<B>) -> Self::Future {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
match req.uri().path() {
|
||||||
|
"/churn.v1.Churn/GetKey" => {
|
||||||
|
#[allow(non_camel_case_types)]
|
||||||
|
struct GetKeySvc<T: Churn>(pub Arc<T>);
|
||||||
|
impl<T: Churn> tonic::server::UnaryService<super::GetKeyRequest>
|
||||||
|
for GetKeySvc<T> {
|
||||||
|
type Response = super::GetKeyResponse;
|
||||||
|
type Future = BoxFuture<
|
||||||
|
tonic::Response<Self::Response>,
|
||||||
|
tonic::Status,
|
||||||
|
>;
|
||||||
|
fn call(
|
||||||
|
&mut self,
|
||||||
|
request: tonic::Request<super::GetKeyRequest>,
|
||||||
|
) -> Self::Future {
|
||||||
|
let inner = Arc::clone(&self.0);
|
||||||
|
let fut = async move {
|
||||||
|
<T as Churn>::get_key(&inner, request).await
|
||||||
|
};
|
||||||
|
Box::pin(fut)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let accept_compression_encodings = self.accept_compression_encodings;
|
||||||
|
let send_compression_encodings = self.send_compression_encodings;
|
||||||
|
let max_decoding_message_size = self.max_decoding_message_size;
|
||||||
|
let max_encoding_message_size = self.max_encoding_message_size;
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let fut = async move {
|
||||||
|
let inner = inner.0;
|
||||||
|
let method = GetKeySvc(inner);
|
||||||
|
let codec = tonic::codec::ProstCodec::default();
|
||||||
|
let mut grpc = tonic::server::Grpc::new(codec)
|
||||||
|
.apply_compression_config(
|
||||||
|
accept_compression_encodings,
|
||||||
|
send_compression_encodings,
|
||||||
|
)
|
||||||
|
.apply_max_message_size_config(
|
||||||
|
max_decoding_message_size,
|
||||||
|
max_encoding_message_size,
|
||||||
|
);
|
||||||
|
let res = grpc.unary(method, req).await;
|
||||||
|
Ok(res)
|
||||||
|
};
|
||||||
|
Box::pin(fut)
|
||||||
|
}
|
||||||
|
"/churn.v1.Churn/SetKey" => {
|
||||||
|
#[allow(non_camel_case_types)]
|
||||||
|
struct SetKeySvc<T: Churn>(pub Arc<T>);
|
||||||
|
impl<T: Churn> tonic::server::UnaryService<super::SetKeyRequest>
|
||||||
|
for SetKeySvc<T> {
|
||||||
|
type Response = super::SetKeyResponse;
|
||||||
|
type Future = BoxFuture<
|
||||||
|
tonic::Response<Self::Response>,
|
||||||
|
tonic::Status,
|
||||||
|
>;
|
||||||
|
fn call(
|
||||||
|
&mut self,
|
||||||
|
request: tonic::Request<super::SetKeyRequest>,
|
||||||
|
) -> Self::Future {
|
||||||
|
let inner = Arc::clone(&self.0);
|
||||||
|
let fut = async move {
|
||||||
|
<T as Churn>::set_key(&inner, request).await
|
||||||
|
};
|
||||||
|
Box::pin(fut)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let accept_compression_encodings = self.accept_compression_encodings;
|
||||||
|
let send_compression_encodings = self.send_compression_encodings;
|
||||||
|
let max_decoding_message_size = self.max_decoding_message_size;
|
||||||
|
let max_encoding_message_size = self.max_encoding_message_size;
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let fut = async move {
|
||||||
|
let inner = inner.0;
|
||||||
|
let method = SetKeySvc(inner);
|
||||||
|
let codec = tonic::codec::ProstCodec::default();
|
||||||
|
let mut grpc = tonic::server::Grpc::new(codec)
|
||||||
|
.apply_compression_config(
|
||||||
|
accept_compression_encodings,
|
||||||
|
send_compression_encodings,
|
||||||
|
)
|
||||||
|
.apply_max_message_size_config(
|
||||||
|
max_decoding_message_size,
|
||||||
|
max_encoding_message_size,
|
||||||
|
);
|
||||||
|
let res = grpc.unary(method, req).await;
|
||||||
|
Ok(res)
|
||||||
|
};
|
||||||
|
Box::pin(fut)
|
||||||
|
}
|
||||||
|
"/churn.v1.Churn/ListenEvents" => {
|
||||||
|
#[allow(non_camel_case_types)]
|
||||||
|
struct ListenEventsSvc<T: Churn>(pub Arc<T>);
|
||||||
|
impl<
|
||||||
|
T: Churn,
|
||||||
|
> tonic::server::ServerStreamingService<super::ListenEventsRequest>
|
||||||
|
for ListenEventsSvc<T> {
|
||||||
|
type Response = super::ListenEventsResponse;
|
||||||
|
type ResponseStream = T::ListenEventsStream;
|
||||||
|
type Future = BoxFuture<
|
||||||
|
tonic::Response<Self::ResponseStream>,
|
||||||
|
tonic::Status,
|
||||||
|
>;
|
||||||
|
fn call(
|
||||||
|
&mut self,
|
||||||
|
request: tonic::Request<super::ListenEventsRequest>,
|
||||||
|
) -> Self::Future {
|
||||||
|
let inner = Arc::clone(&self.0);
|
||||||
|
let fut = async move {
|
||||||
|
<T as Churn>::listen_events(&inner, request).await
|
||||||
|
};
|
||||||
|
Box::pin(fut)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let accept_compression_encodings = self.accept_compression_encodings;
|
||||||
|
let send_compression_encodings = self.send_compression_encodings;
|
||||||
|
let max_decoding_message_size = self.max_decoding_message_size;
|
||||||
|
let max_encoding_message_size = self.max_encoding_message_size;
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let fut = async move {
|
||||||
|
let inner = inner.0;
|
||||||
|
let method = ListenEventsSvc(inner);
|
||||||
|
let codec = tonic::codec::ProstCodec::default();
|
||||||
|
let mut grpc = tonic::server::Grpc::new(codec)
|
||||||
|
.apply_compression_config(
|
||||||
|
accept_compression_encodings,
|
||||||
|
send_compression_encodings,
|
||||||
|
)
|
||||||
|
.apply_max_message_size_config(
|
||||||
|
max_decoding_message_size,
|
||||||
|
max_encoding_message_size,
|
||||||
|
);
|
||||||
|
let res = grpc.server_streaming(method, req).await;
|
||||||
|
Ok(res)
|
||||||
|
};
|
||||||
|
Box::pin(fut)
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
Box::pin(async move {
|
||||||
|
Ok(
|
||||||
|
http::Response::builder()
|
||||||
|
.status(200)
|
||||||
|
.header("grpc-status", "12")
|
||||||
|
.header("content-type", "application/grpc")
|
||||||
|
.body(empty_body())
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T: Churn> Clone for ChurnServer<T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
Self {
|
||||||
|
inner,
|
||||||
|
accept_compression_encodings: self.accept_compression_encodings,
|
||||||
|
send_compression_encodings: self.send_compression_encodings,
|
||||||
|
max_decoding_message_size: self.max_decoding_message_size,
|
||||||
|
max_encoding_message_size: self.max_encoding_message_size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T: Churn> Clone for _Inner<T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self(Arc::clone(&self.0))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "{:?}", self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl<T: Churn> tonic::server::NamedService for ChurnServer<T> {
|
||||||
|
const NAME: &'static str = "churn.v1.Churn";
|
||||||
|
}
|
||||||
|
}
|
@ -1,8 +1,12 @@
|
|||||||
mod api;
|
mod api;
|
||||||
mod cli;
|
mod cli;
|
||||||
mod state;
|
mod state;
|
||||||
|
mod grpc {
|
||||||
|
include!("grpc/churn.v1.rs");
|
||||||
|
}
|
||||||
|
|
||||||
mod agent;
|
mod agent;
|
||||||
|
mod server;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
23
crates/churn/src/server.rs
Normal file
23
crates/churn/src/server.rs
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
pub mod config;
|
||||||
|
|
||||||
|
mod grpc_server;
|
||||||
|
|
||||||
|
use crate::{api, state::SharedState};
|
||||||
|
|
||||||
|
pub async fn execute(
|
||||||
|
host: impl Into<SocketAddr>,
|
||||||
|
grpc_host: impl Into<SocketAddr>,
|
||||||
|
config: config::ServerConfig,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let state = SharedState::new(config).await?;
|
||||||
|
|
||||||
|
notmad::Mad::builder()
|
||||||
|
.add(api::Api::new(&state, host))
|
||||||
|
.add(grpc_server::GrpcServer::new(grpc_host.into()))
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
7
crates/churn/src/server/config.rs
Normal file
7
crates/churn/src/server/config.rs
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
#[derive(clap::Args)]
|
||||||
|
pub struct ServerConfig {
|
||||||
|
#[arg(long = "external-host", env = "EXTERNAL_HOST")]
|
||||||
|
pub external_host: String,
|
||||||
|
#[arg(long = "process-host", env = "PROCESS_HOST")]
|
||||||
|
pub process_host: String,
|
||||||
|
}
|
102
crates/churn/src/server/grpc_server.rs
Normal file
102
crates/churn/src/server/grpc_server.rs
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};
|
||||||
|
|
||||||
|
use anyhow::Context;
|
||||||
|
use futures::Stream;
|
||||||
|
use notmad::{Component, MadError};
|
||||||
|
use tonic::transport::Server;
|
||||||
|
|
||||||
|
use crate::{agent::models::Commands, grpc::*};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct GrpcServer {
|
||||||
|
grpc_host: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GrpcServer {
|
||||||
|
pub fn new(grpc_host: SocketAddr) -> Self {
|
||||||
|
Self { grpc_host }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl Component for GrpcServer {
|
||||||
|
async fn run(
|
||||||
|
&self,
|
||||||
|
cancellation_token: tokio_util::sync::CancellationToken,
|
||||||
|
) -> Result<(), notmad::MadError> {
|
||||||
|
let task = Server::builder()
|
||||||
|
.add_service(crate::grpc::churn_server::ChurnServer::new(self.clone()))
|
||||||
|
.serve(self.grpc_host);
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation_token.cancelled() => {},
|
||||||
|
res = task => {
|
||||||
|
res.context("failed to run grpc server").map_err(MadError::Inner)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl crate::grpc::churn_server::Churn for GrpcServer {
|
||||||
|
async fn get_key(
|
||||||
|
&self,
|
||||||
|
request: tonic::Request<GetKeyRequest>,
|
||||||
|
) -> std::result::Result<tonic::Response<GetKeyResponse>, tonic::Status> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn set_key(
|
||||||
|
&self,
|
||||||
|
request: tonic::Request<SetKeyRequest>,
|
||||||
|
) -> std::result::Result<tonic::Response<SetKeyResponse>, tonic::Status> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[doc = " Server streaming response type for the ListenEvents method."]
|
||||||
|
type ListenEventsStream =
|
||||||
|
Pin<Box<dyn Stream<Item = Result<ListenEventsResponse, tonic::Status>> + Send>>;
|
||||||
|
|
||||||
|
async fn listen_events(
|
||||||
|
&self,
|
||||||
|
request: tonic::Request<ListenEventsRequest>,
|
||||||
|
) -> std::result::Result<tonic::Response<Self::ListenEventsStream>, tonic::Status> {
|
||||||
|
let (tx, rx) = tokio::sync::mpsc::channel(128);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60 * 10));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
|
||||||
|
let Ok(schedule_task) = serde_json::to_string(&Commands::ScheduleTask {
|
||||||
|
task: "refresh".into(),
|
||||||
|
properties: BTreeMap::default(),
|
||||||
|
}) else {
|
||||||
|
tracing::warn!("failed to serialize event");
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = tx
|
||||||
|
.send(Ok(ListenEventsResponse {
|
||||||
|
id: uuid::Uuid::new_v4().to_string(),
|
||||||
|
value: schedule_task,
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!("failed to send response: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let stream = futures::stream::unfold(rx, |mut msg| async move {
|
||||||
|
let next = msg.recv().await?;
|
||||||
|
|
||||||
|
Some((next, msg))
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(tonic::Response::new(Box::pin(stream)))
|
||||||
|
}
|
||||||
|
}
|
@ -1,11 +1,13 @@
|
|||||||
use std::{ops::Deref, sync::Arc};
|
use std::{ops::Deref, sync::Arc};
|
||||||
|
|
||||||
|
use crate::server::config::ServerConfig;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct SharedState(Arc<State>);
|
pub struct SharedState(Arc<State>);
|
||||||
|
|
||||||
impl SharedState {
|
impl SharedState {
|
||||||
pub async fn new() -> anyhow::Result<Self> {
|
pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
|
||||||
Ok(Self(Arc::new(State::new().await?)))
|
Ok(Self(Arc::new(State::new(config).await?)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -23,10 +25,12 @@ impl Deref for SharedState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct State {}
|
pub struct State {
|
||||||
|
pub config: ServerConfig,
|
||||||
|
}
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
pub async fn new() -> anyhow::Result<Self> {
|
pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
|
||||||
Ok(Self {})
|
Ok(Self { config })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
19
crates/churn/wit/world.wit
Normal file
19
crates/churn/wit/world.wit
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package component:churn-tasks@0.1.0;
|
||||||
|
|
||||||
|
interface process {
|
||||||
|
resource process {
|
||||||
|
constructor();
|
||||||
|
run-process: func(inputs: list<string>) -> string;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface task {
|
||||||
|
id: func() -> string;
|
||||||
|
should-run: func() -> bool;
|
||||||
|
execute: func();
|
||||||
|
}
|
||||||
|
|
||||||
|
world churn {
|
||||||
|
export task;
|
||||||
|
import process;
|
||||||
|
}
|
@ -12,11 +12,20 @@ vars:
|
|||||||
ingress:
|
ingress:
|
||||||
- external: "true"
|
- external: "true"
|
||||||
- internal: "true"
|
- internal: "true"
|
||||||
|
- internal_grpc: "true"
|
||||||
|
|
||||||
cuddle/clusters:
|
cuddle/clusters:
|
||||||
dev:
|
dev:
|
||||||
env:
|
env:
|
||||||
service.host: "0.0.0.0:3000"
|
service.host: "0.0.0.0:3000"
|
||||||
|
service.grpc.host: "0.0.0.0:4001"
|
||||||
|
process.host: "https://grpc.churn.dev.internal.kjuulh.app"
|
||||||
|
external.host: "https://churn.internal.dev.kjuulh.app"
|
||||||
|
rust.log: "h2=warn,debug"
|
||||||
prod:
|
prod:
|
||||||
env:
|
env:
|
||||||
service.host: "0.0.0.0:3000"
|
service.host: "0.0.0.0:3000"
|
||||||
|
service.grpc.host: "0.0.0.0:4001"
|
||||||
|
process.host: "https://grpc.churn.prod.internal.kjuulh.app"
|
||||||
|
external.host: "https://churn.internal.prod.kjuulh.app"
|
||||||
|
rust.log: "h2=warn,debug"
|
||||||
|
127
install.sh
127
install.sh
@ -1,97 +1,92 @@
|
|||||||
#!/bin/bash
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
set -e
|
set -e
|
||||||
|
|
||||||
# Configuration variables
|
# Configuration
|
||||||
GITEA_HOST="https://git.front.kjuulh.io"
|
APP_NAME="churn"
|
||||||
REPO_OWNER="kjuulh"
|
APP_VERSION="latest" # or specify a version
|
||||||
REPO_NAME="churn-v2"
|
S3_BUCKET="rust-artifacts"
|
||||||
BINARY_NAME="churn"
|
BINARY_NAME="churn"
|
||||||
SERVICE_NAME="churn-agent"
|
SERVICE_NAME="${APP_NAME}.service"
|
||||||
SERVICE_USER="churn"
|
INSTALL_DIR="/usr/local/bin"
|
||||||
RELEASE_TAG="latest" # or specific version like "v1.0.0"
|
CONFIG_DIR="/etc/${APP_NAME}"
|
||||||
|
CHURN_DISCOVERY="https://churn.prod.kjuulh.app"
|
||||||
|
|
||||||
|
# Colors for output
|
||||||
|
RED='\033[0;31m'
|
||||||
|
GREEN='\033[0;32m'
|
||||||
|
NC='\033[0m' # No Color
|
||||||
|
|
||||||
# Check if running as root
|
# Check if running as root
|
||||||
if [ "$EUID" -ne 0 ]; then
|
if [ "$EUID" -ne 0 ]; then
|
||||||
echo "Please run as root"
|
echo -e "${RED}Please run as root${NC}"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# Create service user if it doesn't exist
|
# Create necessary directories
|
||||||
if ! id "$SERVICE_USER" &>/dev/null; then
|
echo "Creating directories..."
|
||||||
useradd -r -s /bin/false "$SERVICE_USER"
|
mkdir -p "${INSTALL_DIR}"
|
||||||
|
mkdir -p "${CONFIG_DIR}"
|
||||||
|
|
||||||
|
if systemctl is-active --quiet churn.service; then
|
||||||
|
echo "Stopping existing churn service..."
|
||||||
|
systemctl stop churn.service
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# Function to get latest release if RELEASE_TAG is "latest"
|
# Download binary from S3
|
||||||
get_latest_release() {
|
echo "Downloading binary..."
|
||||||
curl -s "https://$GITEA_HOST/api/v1/repos/$REPO_OWNER/$REPO_NAME/releases/latest" | \
|
curl -L -s "https://api-minio.front.kjuulh.io/${S3_BUCKET}/releases/${APP_NAME}/${APP_VERSION}/${BINARY_NAME}" -o "${INSTALL_DIR}/${BINARY_NAME}"
|
||||||
grep '"tag_name":' | \
|
|
||||||
sed -E 's/.*"([^"]+)".*/\1/'
|
|
||||||
}
|
|
||||||
|
|
||||||
# Determine the actual release tag
|
# Make binary executable
|
||||||
if [ "$RELEASE_TAG" = "latest" ]; then
|
chmod +x "${INSTALL_DIR}/${BINARY_NAME}"
|
||||||
RELEASE_TAG=$(get_latest_release)
|
|
||||||
|
echo "Starting churn agent setup..."
|
||||||
|
set +e
|
||||||
|
res=$(churn agent setup --discovery "${CHURN_DISCOVERY}" 2>&1)
|
||||||
|
set -e
|
||||||
|
exit_code=$?
|
||||||
|
|
||||||
|
if [[ $exit_code != 0 ]]; then
|
||||||
|
if [[ "$res" != *"config file already exists"* ]] && [[ "$res" != *"already exists"* ]]; then
|
||||||
|
echo "Error detected: $res"
|
||||||
|
exit 1
|
||||||
|
else
|
||||||
|
echo "Ignoring setup 'agent is already setup'"
|
||||||
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo "Installing $BINARY_NAME version $RELEASE_TAG..."
|
|
||||||
|
|
||||||
# Download and install binary
|
|
||||||
TMP_DIR=$(mktemp -d)
|
|
||||||
cd "$TMP_DIR"
|
|
||||||
|
|
||||||
# Download binary from Gitea
|
|
||||||
curl -L -o "$BINARY_NAME" \
|
|
||||||
"https://$GITEA_HOST/$REPO_OWNER/$REPO_NAME/releases/download/$RELEASE_TAG/$BINARY_NAME"
|
|
||||||
|
|
||||||
# Make binary executable and move to appropriate location
|
|
||||||
chmod +x "$BINARY_NAME"
|
|
||||||
mv "$BINARY_NAME" "/usr/local/bin/$BINARY_NAME"
|
|
||||||
|
|
||||||
# Create systemd service file
|
# Create systemd service file
|
||||||
cat > "/etc/systemd/system/$SERVICE_NAME.service" << EOF
|
echo "Creating systemd service..."
|
||||||
|
cat > "/etc/systemd/system/${SERVICE_NAME}" << EOF
|
||||||
[Unit]
|
[Unit]
|
||||||
Description=$SERVICE_NAME Service
|
Description=${APP_NAME} service
|
||||||
After=network.target
|
After=network.target
|
||||||
|
|
||||||
[Service]
|
[Service]
|
||||||
Type=simple
|
Type=simple
|
||||||
User=$SERVICE_USER
|
User=root
|
||||||
ExecStart=/usr/local/bin/$BINARY_NAME
|
Group=root
|
||||||
|
ExecStart=${INSTALL_DIR}/${BINARY_NAME} agent start
|
||||||
Restart=always
|
Restart=always
|
||||||
RestartSec=5
|
RestartSec=10
|
||||||
|
Environment=RUST_LOG=h2=warn,hyper=warn,churn=debug,warn
|
||||||
# Security hardening options
|
|
||||||
ProtectSystem=strict
|
|
||||||
ProtectHome=true
|
|
||||||
NoNewPrivileges=true
|
|
||||||
ReadWritePaths=/var/log/$SERVICE_NAME
|
|
||||||
|
|
||||||
[Install]
|
[Install]
|
||||||
WantedBy=multi-user.target
|
WantedBy=multi-user.target
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
# Create log directory if logging is needed
|
|
||||||
mkdir -p "/var/log/$SERVICE_NAME"
|
|
||||||
chown "$SERVICE_USER:$SERVICE_USER" "/var/log/$SERVICE_NAME"
|
|
||||||
|
|
||||||
# Reload systemd and enable service
|
# Reload systemd and enable service
|
||||||
|
echo "Configuring systemd service..."
|
||||||
systemctl daemon-reload
|
systemctl daemon-reload
|
||||||
systemctl enable "$SERVICE_NAME"
|
systemctl enable "${SERVICE_NAME}"
|
||||||
systemctl start "$SERVICE_NAME"
|
systemctl start "${SERVICE_NAME}"
|
||||||
|
|
||||||
# Clean up
|
# Check service status
|
||||||
cd
|
if systemctl is-active --quiet "${SERVICE_NAME}"; then
|
||||||
rm -rf "$TMP_DIR"
|
echo -e "${GREEN}Installation successful! ${APP_NAME} is running.${NC}"
|
||||||
|
echo "You can check the status with: systemctl status ${SERVICE_NAME}"
|
||||||
|
else
|
||||||
|
echo -e "${RED}Installation completed but service failed to start. Check logs with: journalctl -u ${SERVICE_NAME}${NC}"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
echo "Installation complete! Service status:"
|
|
||||||
systemctl status "$SERVICE_NAME"
|
|
||||||
|
|
||||||
# Provide some helpful commands
|
|
||||||
echo "
|
|
||||||
Useful commands:
|
|
||||||
- Check status: systemctl status $SERVICE_NAME
|
|
||||||
- View logs: journalctl -u $SERVICE_NAME
|
|
||||||
- Restart service: systemctl restart $SERVICE_NAME
|
|
||||||
- Stop service: systemctl stop $SERVICE_NAME
|
|
||||||
"
|
|
||||||
|
Loading…
Reference in New Issue
Block a user