Compare commits
No commits in common. "main" and "v2" have entirely different histories.
8
.env
8
.env
@ -1,9 +1 @@
|
|||||||
EXTERNAL_HOST=http://localhost:3000
|
|
||||||
PROCESS_HOST=http://localhost:7900
|
|
||||||
SERVICE_HOST=127.0.0.1:3000
|
|
||||||
DISCOVERY_HOST=http://127.0.0.1:3000
|
|
||||||
|
|
||||||
#EXTERNAL_HOST=http://localhost:3000
|
|
||||||
#PROCESS_HOST=http://localhost:7900
|
|
||||||
#SERVICE_HOST=127.0.0.1:3000
|
|
||||||
#DISCOVERY_HOST=https://churn.prod.kjuulh.app
|
|
||||||
|
2208
Cargo.lock
generated
2208
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1,8 +0,0 @@
|
|||||||
version: v2
|
|
||||||
plugins:
|
|
||||||
- remote: buf.build/community/neoeinstein-prost
|
|
||||||
out: crates/churn/src/grpc
|
|
||||||
- remote: buf.build/community/neoeinstein-tonic:v0.4.0
|
|
||||||
out: crates/churn/src/grpc
|
|
||||||
inputs:
|
|
||||||
- directory: crates/churn/proto
|
|
@ -14,8 +14,8 @@ axum.workspace = true
|
|||||||
|
|
||||||
serde = { version = "1.0.197", features = ["derive"] }
|
serde = { version = "1.0.197", features = ["derive"] }
|
||||||
uuid = { version = "1.7.0", features = ["v4"] }
|
uuid = { version = "1.7.0", features = ["v4"] }
|
||||||
tower-http = { version = "0.6.0", features = ["cors", "trace"] }
|
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
|
||||||
notmad = "0.7.1"
|
notmad = "0.6.0"
|
||||||
tokio-util = "0.7.12"
|
tokio-util = "0.7.12"
|
||||||
async-trait = "0.1.83"
|
async-trait = "0.1.83"
|
||||||
nodrift = "0.2.0"
|
nodrift = "0.2.0"
|
||||||
@ -24,16 +24,3 @@ prost-types = "0.13.3"
|
|||||||
prost = "0.13.3"
|
prost = "0.13.3"
|
||||||
bytes = "1.8.0"
|
bytes = "1.8.0"
|
||||||
tonic = { version = "0.12.3", features = ["tls", "tls-roots"] }
|
tonic = { version = "0.12.3", features = ["tls", "tls-roots"] }
|
||||||
toml = "0.8.19"
|
|
||||||
dirs = "5.0.1"
|
|
||||||
futures = "0.3.31"
|
|
||||||
reqwest = { version = "0.12.9", default-features = false, features = [
|
|
||||||
"json",
|
|
||||||
"http2",
|
|
||||||
"charset",
|
|
||||||
"native-tls-vendored",
|
|
||||||
"stream",
|
|
||||||
] }
|
|
||||||
serde_json = "1.0.133"
|
|
||||||
wasmtime = "27.0.0"
|
|
||||||
wasmtime-wasi = "27.0.0"
|
|
||||||
|
@ -1,35 +0,0 @@
|
|||||||
syntax = "proto3";
|
|
||||||
|
|
||||||
package churn.v1;
|
|
||||||
|
|
||||||
service Churn {
|
|
||||||
rpc GetKey(GetKeyRequest) returns (GetKeyResponse);
|
|
||||||
rpc SetKey(SetKeyRequest) returns (SetKeyResponse);
|
|
||||||
rpc ListenEvents(ListenEventsRequest) returns (stream ListenEventsResponse);
|
|
||||||
}
|
|
||||||
|
|
||||||
message GetKeyRequest {
|
|
||||||
string namespace = 1;
|
|
||||||
optional string id = 2;
|
|
||||||
string key = 3;
|
|
||||||
}
|
|
||||||
message GetKeyResponse {
|
|
||||||
optional string value = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message SetKeyRequest {
|
|
||||||
string namespace = 1;
|
|
||||||
optional string id = 2;
|
|
||||||
string key = 3;
|
|
||||||
string value = 4;
|
|
||||||
}
|
|
||||||
message SetKeyResponse {}
|
|
||||||
|
|
||||||
message ListenEventsRequest {
|
|
||||||
string namespace = 1;
|
|
||||||
optional string id = 2;
|
|
||||||
}
|
|
||||||
message ListenEventsResponse {
|
|
||||||
string id = 1;
|
|
||||||
string value = 2;
|
|
||||||
}
|
|
@ -1,33 +1,15 @@
|
|||||||
use agent_state::AgentState;
|
use agent_state::AgentState;
|
||||||
use event_handler::EventHandler;
|
|
||||||
use refresh::AgentRefresh;
|
use refresh::AgentRefresh;
|
||||||
|
|
||||||
pub use config::setup_config;
|
|
||||||
|
|
||||||
pub mod models;
|
|
||||||
pub(crate) mod task;
|
|
||||||
|
|
||||||
mod agent_state;
|
mod agent_state;
|
||||||
mod config;
|
|
||||||
mod discovery_client;
|
|
||||||
mod event_handler;
|
|
||||||
mod grpc_client;
|
|
||||||
mod plugins;
|
|
||||||
mod queue;
|
|
||||||
mod refresh;
|
mod refresh;
|
||||||
mod scheduler;
|
|
||||||
|
|
||||||
mod handlers;
|
pub async fn execute(host: impl Into<String>) -> anyhow::Result<()> {
|
||||||
|
|
||||||
mod actions;
|
|
||||||
|
|
||||||
pub async fn execute() -> anyhow::Result<()> {
|
|
||||||
let state = AgentState::new().await?;
|
let state = AgentState::new().await?;
|
||||||
|
|
||||||
notmad::Mad::builder()
|
notmad::Mad::builder()
|
||||||
.add(AgentRefresh::new(&state))
|
.add(AgentRefresh::new(&state, host))
|
||||||
.add(EventHandler::new(&state))
|
|
||||||
.add(state.queue.clone())
|
|
||||||
.cancellation(Some(std::time::Duration::from_secs(2)))
|
.cancellation(Some(std::time::Duration::from_secs(2)))
|
||||||
.run()
|
.run()
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -1,23 +0,0 @@
|
|||||||
use apt::AptTask;
|
|
||||||
use plugin_task::PluginTask;
|
|
||||||
|
|
||||||
use super::{plugins::PluginStore, task::IntoTask};
|
|
||||||
|
|
||||||
pub mod apt;
|
|
||||||
pub mod plugin_task;
|
|
||||||
|
|
||||||
pub struct Plan {
|
|
||||||
store: PluginStore,
|
|
||||||
}
|
|
||||||
impl Plan {
|
|
||||||
pub fn new(store: PluginStore) -> Self {
|
|
||||||
Self { store }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn tasks(&self) -> anyhow::Result<Vec<impl IntoTask>> {
|
|
||||||
Ok(vec![
|
|
||||||
AptTask::new().into_task(),
|
|
||||||
PluginTask::new("alloy@0.1.0", self.store.clone()).into_task(),
|
|
||||||
])
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,49 +0,0 @@
|
|||||||
use anyhow::Context;
|
|
||||||
|
|
||||||
use crate::agent::task::Task;
|
|
||||||
|
|
||||||
pub struct AptTask {}
|
|
||||||
|
|
||||||
impl AptTask {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl Task for AptTask {
|
|
||||||
async fn id(&self) -> anyhow::Result<String> {
|
|
||||||
Ok("apt".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn execute(&self) -> anyhow::Result<()> {
|
|
||||||
let mut cmd = tokio::process::Command::new("apt-get");
|
|
||||||
cmd.args(["update", "-q"]);
|
|
||||||
let output = cmd.output().await.context("failed to run apt update")?;
|
|
||||||
match output.status.success() {
|
|
||||||
true => tracing::info!("successfully ran apt update"),
|
|
||||||
false => {
|
|
||||||
anyhow::bail!(
|
|
||||||
"failed to run apt update: {}",
|
|
||||||
std::str::from_utf8(&output.stderr)?
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut cmd = tokio::process::Command::new("apt-get");
|
|
||||||
cmd.env("DEBIAN_FRONTEND", "noninteractive")
|
|
||||||
.args(["upgrade", "-y"]);
|
|
||||||
let output = cmd.output().await.context("failed to run apt upgrade")?;
|
|
||||||
match output.status.success() {
|
|
||||||
true => tracing::info!("successfully ran apt upgrade"),
|
|
||||||
false => {
|
|
||||||
anyhow::bail!(
|
|
||||||
"failed to run apt upgrade: {}",
|
|
||||||
std::str::from_utf8(&output.stderr)?
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,30 +0,0 @@
|
|||||||
use crate::agent::{plugins::PluginStore, task::Task};
|
|
||||||
|
|
||||||
pub struct PluginTask {
|
|
||||||
plugin: String,
|
|
||||||
store: PluginStore,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PluginTask {
|
|
||||||
pub fn new(plugin: impl Into<String>, store: PluginStore) -> Self {
|
|
||||||
Self {
|
|
||||||
plugin: plugin.into(),
|
|
||||||
store,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl Task for PluginTask {
|
|
||||||
async fn id(&self) -> anyhow::Result<String> {
|
|
||||||
let id = self.store.id(&self.plugin).await?;
|
|
||||||
|
|
||||||
Ok(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn execute(&self) -> anyhow::Result<()> {
|
|
||||||
self.store.execute(&self.plugin).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,13 +1,5 @@
|
|||||||
use std::{ops::Deref, sync::Arc};
|
use std::{ops::Deref, sync::Arc};
|
||||||
|
|
||||||
use crate::api::Discovery;
|
|
||||||
|
|
||||||
use super::{
|
|
||||||
config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient,
|
|
||||||
handlers::scheduled_tasks::ScheduledTasks, plugins::PluginStore, queue::AgentQueue,
|
|
||||||
scheduler::Scheduler,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct AgentState(Arc<State>);
|
pub struct AgentState(Arc<State>);
|
||||||
|
|
||||||
@ -31,30 +23,10 @@ impl Deref for AgentState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct State {
|
pub struct State {}
|
||||||
pub grpc: GrpcClient,
|
|
||||||
pub config: AgentConfig,
|
|
||||||
pub discovery: Discovery,
|
|
||||||
pub queue: AgentQueue,
|
|
||||||
pub plugin_store: PluginStore,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
pub async fn new() -> anyhow::Result<Self> {
|
pub async fn new() -> anyhow::Result<Self> {
|
||||||
let config = AgentConfig::new().await?;
|
Ok(Self {})
|
||||||
let discovery = DiscoveryClient::new(&config.discovery).discover().await?;
|
|
||||||
let grpc = GrpcClient::new(&discovery.process_host);
|
|
||||||
let plugin_store = PluginStore::new()?;
|
|
||||||
let scheduled_tasks = ScheduledTasks::new(plugin_store.clone());
|
|
||||||
let scheduler = Scheduler::new(scheduled_tasks);
|
|
||||||
let queue = AgentQueue::new(scheduler);
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
grpc,
|
|
||||||
config,
|
|
||||||
discovery,
|
|
||||||
queue,
|
|
||||||
plugin_store,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,93 +0,0 @@
|
|||||||
use std::collections::BTreeMap;
|
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct AgentConfig {
|
|
||||||
pub agent_id: String,
|
|
||||||
pub discovery: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AgentConfig {
|
|
||||||
pub async fn new() -> anyhow::Result<Self> {
|
|
||||||
let config = ConfigFile::load().await?;
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
agent_id: config.agent_id,
|
|
||||||
discovery: config.discovery,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct ConfigFile {
|
|
||||||
agent_id: String,
|
|
||||||
discovery: String,
|
|
||||||
|
|
||||||
labels: Option<BTreeMap<String, String>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConfigFile {
|
|
||||||
pub async fn load() -> anyhow::Result<Self> {
|
|
||||||
let directory = dirs::data_dir()
|
|
||||||
.ok_or(anyhow::anyhow!("failed to get data dir"))?
|
|
||||||
.join("io.kjuulh.churn-agent")
|
|
||||||
.join("churn-agent.toml");
|
|
||||||
|
|
||||||
if !directory.exists() {
|
|
||||||
anyhow::bail!(
|
|
||||||
"No churn agent file was setup, run `churn agent setup` to setup the defaults"
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
let contents = tokio::fs::read_to_string(&directory).await?;
|
|
||||||
|
|
||||||
toml::from_str(&contents).context("failed to parse the contents of the churn agent config")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn write_default(
|
|
||||||
discovery: impl Into<String>,
|
|
||||||
force: bool,
|
|
||||||
labels: impl Into<BTreeMap<String, String>>,
|
|
||||||
) -> anyhow::Result<Self> {
|
|
||||||
let s = Self {
|
|
||||||
agent_id: Uuid::new_v4().to_string(),
|
|
||||||
discovery: discovery.into(),
|
|
||||||
labels: Some(labels.into()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let directory = dirs::data_dir()
|
|
||||||
.ok_or(anyhow::anyhow!("failed to get data dir"))?
|
|
||||||
.join("io.kjuulh.churn-agent")
|
|
||||||
.join("churn-agent.toml");
|
|
||||||
|
|
||||||
if let Some(parent) = directory.parent() {
|
|
||||||
tokio::fs::create_dir_all(&parent).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if !force && directory.exists() {
|
|
||||||
anyhow::bail!("config file already exists, consider moving it to a backup before trying again: {}", directory.display());
|
|
||||||
}
|
|
||||||
|
|
||||||
let contents = toml::to_string_pretty(&s)
|
|
||||||
.context("failed to convert default implementation to string")?;
|
|
||||||
|
|
||||||
tokio::fs::write(directory, contents.as_bytes())
|
|
||||||
.await
|
|
||||||
.context("failed to write to agent file")?;
|
|
||||||
|
|
||||||
Ok(s)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn setup_config(
|
|
||||||
discovery: impl Into<String>,
|
|
||||||
force: bool,
|
|
||||||
labels: impl Into<BTreeMap<String, String>>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
ConfigFile::write_default(discovery, force, labels).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
@ -1,21 +0,0 @@
|
|||||||
use crate::api::Discovery;
|
|
||||||
|
|
||||||
pub struct DiscoveryClient {
|
|
||||||
host: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DiscoveryClient {
|
|
||||||
pub fn new(discovery_host: impl Into<String>) -> Self {
|
|
||||||
Self {
|
|
||||||
host: discovery_host.into(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn discover(&self) -> anyhow::Result<Discovery> {
|
|
||||||
tracing::info!(
|
|
||||||
"getting details from discovery endpoint: {}/discovery",
|
|
||||||
self.host.trim_end_matches('/')
|
|
||||||
);
|
|
||||||
crate::api::Discovery::get_from_host(&self.host).await
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,63 +0,0 @@
|
|||||||
use notmad::{Component, MadError};
|
|
||||||
|
|
||||||
use crate::agent::models::Commands;
|
|
||||||
|
|
||||||
use super::{
|
|
||||||
agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient, queue::AgentQueue,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct EventHandler {
|
|
||||||
config: AgentConfig,
|
|
||||||
grpc: GrpcClient,
|
|
||||||
queue: AgentQueue,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EventHandler {
|
|
||||||
pub fn new(state: impl Into<AgentState>) -> Self {
|
|
||||||
let state: AgentState = state.into();
|
|
||||||
|
|
||||||
Self {
|
|
||||||
config: state.config.clone(),
|
|
||||||
grpc: state.grpc.clone(),
|
|
||||||
queue: state.queue.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl Component for EventHandler {
|
|
||||||
fn name(&self) -> Option<String> {
|
|
||||||
Some("event_handler".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(
|
|
||||||
&self,
|
|
||||||
cancellation_token: tokio_util::sync::CancellationToken,
|
|
||||||
) -> Result<(), notmad::MadError> {
|
|
||||||
tokio::select! {
|
|
||||||
_ = cancellation_token.cancelled() => {},
|
|
||||||
res = self.grpc.listen_events("agents", None::<String>, self.clone()) => {
|
|
||||||
res.map_err(MadError::Inner)?;
|
|
||||||
},
|
|
||||||
res = self.grpc.listen_events("agents", Some(&self.config.agent_id), self.clone()) => {
|
|
||||||
res.map_err(MadError::Inner)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl super::grpc_client::ListenEventsExecutor for EventHandler {
|
|
||||||
async fn execute(&self, event: crate::grpc::ListenEventsResponse) -> anyhow::Result<()> {
|
|
||||||
tracing::info!(value = event.id, "received event");
|
|
||||||
|
|
||||||
let event: Commands = serde_json::from_str(&event.value)?;
|
|
||||||
|
|
||||||
self.queue.publish(event).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,108 +0,0 @@
|
|||||||
use tonic::transport::{Channel, ClientTlsConfig};
|
|
||||||
|
|
||||||
use crate::grpc::{churn_client::ChurnClient, *};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct GrpcClient {
|
|
||||||
host: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl GrpcClient {
|
|
||||||
pub fn new(host: impl Into<String>) -> Self {
|
|
||||||
Self { host: host.into() }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_key(
|
|
||||||
&self,
|
|
||||||
namespace: &str,
|
|
||||||
id: Option<impl Into<String>>,
|
|
||||||
key: &str,
|
|
||||||
) -> anyhow::Result<Option<String>> {
|
|
||||||
let mut client = self.client().await?;
|
|
||||||
|
|
||||||
let resp = client
|
|
||||||
.get_key(GetKeyRequest {
|
|
||||||
key: key.into(),
|
|
||||||
namespace: namespace.into(),
|
|
||||||
id: id.map(|i| i.into()),
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
let resp = resp.into_inner();
|
|
||||||
|
|
||||||
Ok(resp.value)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn set_key(
|
|
||||||
&self,
|
|
||||||
namespace: &str,
|
|
||||||
id: Option<impl Into<String>>,
|
|
||||||
key: &str,
|
|
||||||
value: &str,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let mut client = self.client().await?;
|
|
||||||
|
|
||||||
client
|
|
||||||
.set_key(SetKeyRequest {
|
|
||||||
key: key.into(),
|
|
||||||
value: value.into(),
|
|
||||||
namespace: namespace.into(),
|
|
||||||
id: id.map(|i| i.into()),
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn listen_events(
|
|
||||||
&self,
|
|
||||||
namespace: &str,
|
|
||||||
id: Option<impl Into<String>>,
|
|
||||||
exec: impl ListenEventsExecutor,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let mut client = self.client().await?;
|
|
||||||
|
|
||||||
tracing::debug!("creating stream for listening to events on: {}", namespace);
|
|
||||||
let resp = client
|
|
||||||
.listen_events(ListenEventsRequest {
|
|
||||||
namespace: namespace.into(),
|
|
||||||
id: id.map(|i| i.into()),
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.inspect_err(|e| tracing::warn!("failed to establish a connection: {}", e))?;
|
|
||||||
|
|
||||||
tracing::debug!("setup stream: {}", namespace);
|
|
||||||
let mut inner = resp.into_inner();
|
|
||||||
while let Ok(Some(message)) = inner.message().await {
|
|
||||||
tracing::debug!("received message: {}", namespace);
|
|
||||||
exec.execute(message)
|
|
||||||
.await
|
|
||||||
.inspect_err(|e| tracing::warn!("failed to handle message: {}", e))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn client(&self) -> anyhow::Result<ChurnClient<tonic::transport::Channel>> {
|
|
||||||
tracing::debug!("setting up client");
|
|
||||||
let channel = if self.host.starts_with("https") {
|
|
||||||
Channel::from_shared(self.host.to_owned())?
|
|
||||||
.tls_config(ClientTlsConfig::new().with_native_roots())?
|
|
||||||
.connect_timeout(std::time::Duration::from_secs(5))
|
|
||||||
.connect()
|
|
||||||
.await?
|
|
||||||
} else {
|
|
||||||
Channel::from_shared(self.host.to_owned())?
|
|
||||||
.connect()
|
|
||||||
.await?
|
|
||||||
};
|
|
||||||
|
|
||||||
let client = ChurnClient::new(channel);
|
|
||||||
|
|
||||||
Ok(client)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
pub trait ListenEventsExecutor {
|
|
||||||
async fn execute(&self, event: ListenEventsResponse) -> anyhow::Result<()>;
|
|
||||||
}
|
|
@ -1 +0,0 @@
|
|||||||
pub mod scheduled_tasks;
|
|
@ -1,46 +0,0 @@
|
|||||||
use std::collections::BTreeMap;
|
|
||||||
|
|
||||||
use crate::agent::{
|
|
||||||
actions::Plan,
|
|
||||||
plugins::PluginStore,
|
|
||||||
task::{ConcreteTask, IntoTask},
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct ScheduledTasks {
|
|
||||||
store: PluginStore,
|
|
||||||
}
|
|
||||||
impl ScheduledTasks {
|
|
||||||
pub fn new(store: PluginStore) -> Self {
|
|
||||||
Self { store }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn handle(
|
|
||||||
&self,
|
|
||||||
task: &str,
|
|
||||||
_properties: BTreeMap<String, String>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
tracing::info!("scheduling: {}", task);
|
|
||||||
|
|
||||||
let plan = Plan::new(self.store.clone());
|
|
||||||
let tasks: Vec<ConcreteTask> = plan
|
|
||||||
.tasks()
|
|
||||||
.await?
|
|
||||||
.into_iter()
|
|
||||||
.map(|i| i.into_task())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
for task in tasks {
|
|
||||||
let id = task.id().await?;
|
|
||||||
if !task.should_run().await? {
|
|
||||||
tracing::debug!(task = id, "skipping run");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
tracing::info!(task = id, "executing task");
|
|
||||||
task.execute().await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,20 +0,0 @@
|
|||||||
use std::{collections::BTreeMap, fmt::Display};
|
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
|
||||||
#[serde(tag = "type")]
|
|
||||||
pub enum Commands {
|
|
||||||
ScheduleTask {
|
|
||||||
task: String,
|
|
||||||
properties: BTreeMap<String, String>,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Display for Commands {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
f.write_str(match self {
|
|
||||||
Commands::ScheduleTask { .. } => "schedule_task",
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,239 +0,0 @@
|
|||||||
use anyhow::Context;
|
|
||||||
use component::churn_tasks::process::HostProcess;
|
|
||||||
use futures::StreamExt;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio::io::AsyncWriteExt;
|
|
||||||
use tokio::sync::Mutex;
|
|
||||||
use wasmtime::component::*;
|
|
||||||
use wasmtime::{Config, Engine, Store};
|
|
||||||
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView};
|
|
||||||
|
|
||||||
wasmtime::component::bindgen!({
|
|
||||||
path: "wit/world.wit",
|
|
||||||
//world: "churn",
|
|
||||||
async: true,
|
|
||||||
with: {
|
|
||||||
"component:churn-tasks/process/process": CustomProcess
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct CustomProcess {}
|
|
||||||
impl CustomProcess {
|
|
||||||
pub fn run(&self, args: Vec<String>) -> String {
|
|
||||||
tracing::info!("calling function");
|
|
||||||
|
|
||||||
match args.split_first() {
|
|
||||||
Some((item, rest)) => {
|
|
||||||
let mut cmd = std::process::Command::new(item);
|
|
||||||
match cmd.args(rest).output() {
|
|
||||||
Ok(output) => std::str::from_utf8(&output.stdout)
|
|
||||||
.expect("to be able to parse utf8")
|
|
||||||
.to_string(),
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("command failed with output: {e}");
|
|
||||||
e.to_string()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
tracing::warn!("failed to call function because it is empty");
|
|
||||||
panic!("failed to call function because it is empty")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct PluginStore {
|
|
||||||
inner: Arc<Mutex<InnerPluginStore>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PluginStore {
|
|
||||||
pub fn new() -> anyhow::Result<Self> {
|
|
||||||
Ok(Self {
|
|
||||||
inner: Arc::new(Mutex::new(InnerPluginStore::new()?)),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn id(&self, plugin: &str) -> anyhow::Result<String> {
|
|
||||||
let mut inner = self.inner.lock().await;
|
|
||||||
inner.id(plugin).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn execute(&self, plugin: &str) -> anyhow::Result<()> {
|
|
||||||
let mut inner = self.inner.lock().await;
|
|
||||||
inner.execute(plugin).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct InnerPluginStore {
|
|
||||||
store: wasmtime::Store<ServerWasiView>,
|
|
||||||
linker: wasmtime::component::Linker<ServerWasiView>,
|
|
||||||
engine: wasmtime::Engine,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl InnerPluginStore {
|
|
||||||
pub fn new() -> anyhow::Result<Self> {
|
|
||||||
let mut config = Config::default();
|
|
||||||
config.wasm_component_model(true);
|
|
||||||
config.async_support(true);
|
|
||||||
let engine = Engine::new(&config)?;
|
|
||||||
let mut linker: wasmtime::component::Linker<ServerWasiView> = Linker::new(&engine);
|
|
||||||
|
|
||||||
// Add the command world (aka WASI CLI) to the linker
|
|
||||||
wasmtime_wasi::add_to_linker_async(&mut linker).context("Failed to link command world")?;
|
|
||||||
|
|
||||||
component::churn_tasks::process::add_to_linker(
|
|
||||||
&mut linker,
|
|
||||||
|state: &mut ServerWasiView| state,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let wasi_view = ServerWasiView::new();
|
|
||||||
let store = Store::new(&engine, wasi_view);
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
store,
|
|
||||||
linker,
|
|
||||||
engine,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn id(&mut self, plugin: &str) -> anyhow::Result<String> {
|
|
||||||
let plugin = self.ensure_plugin(plugin).await?;
|
|
||||||
|
|
||||||
plugin
|
|
||||||
.interface0
|
|
||||||
.call_id(&mut self.store)
|
|
||||||
.await
|
|
||||||
.context("Failed to call add function")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn execute(&mut self, plugin: &str) -> anyhow::Result<()> {
|
|
||||||
let plugin = self.ensure_plugin(plugin).await?;
|
|
||||||
|
|
||||||
plugin
|
|
||||||
.interface0
|
|
||||||
.call_execute(&mut self.store)
|
|
||||||
.await
|
|
||||||
.context("Failed to call add function")
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn ensure_plugin(&mut self, plugin: &str) -> anyhow::Result<Churn> {
|
|
||||||
let cache = dirs::cache_dir()
|
|
||||||
.ok_or(anyhow::anyhow!("failed to find cache dir"))?
|
|
||||||
.join("io.kjuulh.churn");
|
|
||||||
|
|
||||||
let (plugin_name, plugin_version) = plugin.split_once("@").unwrap_or((plugin, "latest"));
|
|
||||||
|
|
||||||
let plugin_path = cache
|
|
||||||
.join("plugins")
|
|
||||||
.join(plugin_name)
|
|
||||||
.join(plugin_version)
|
|
||||||
.join(format!("{plugin_name}.wasm"));
|
|
||||||
|
|
||||||
let no_cache: bool = std::env::var("CHURN_NO_CACHE")
|
|
||||||
.unwrap_or("false".into())
|
|
||||||
.parse()?;
|
|
||||||
|
|
||||||
if !plugin_path.exists() || no_cache {
|
|
||||||
tracing::info!(
|
|
||||||
plugin_name = plugin_name,
|
|
||||||
plugin_version = plugin_version,
|
|
||||||
"downloading plugin"
|
|
||||||
);
|
|
||||||
if let Some(parent) = plugin_path.parent() {
|
|
||||||
tokio::fs::create_dir_all(parent).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?;
|
|
||||||
let mut stream = req.bytes_stream();
|
|
||||||
let mut file = tokio::fs::File::create(&plugin_path).await?;
|
|
||||||
while let Some(chunk) = stream.next().await {
|
|
||||||
let chunk = chunk?;
|
|
||||||
file.write_all(&chunk).await?;
|
|
||||||
}
|
|
||||||
file.flush().await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let component =
|
|
||||||
Component::from_file(&self.engine, plugin_path).context("Component file not found")?;
|
|
||||||
|
|
||||||
tracing::debug!(
|
|
||||||
plugin_name = plugin_name,
|
|
||||||
plugin_version = plugin_version,
|
|
||||||
"instantiating plugin"
|
|
||||||
);
|
|
||||||
let instance = Churn::instantiate_async(&mut self.store, &component, &self.linker)
|
|
||||||
.await
|
|
||||||
.context("Failed to instantiate the example world")
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
Ok(instance)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ServerWasiView {
|
|
||||||
table: ResourceTable,
|
|
||||||
ctx: WasiCtx,
|
|
||||||
processes: ResourceTable,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServerWasiView {
|
|
||||||
fn new() -> Self {
|
|
||||||
let table = ResourceTable::new();
|
|
||||||
|
|
||||||
let ctx = WasiCtxBuilder::new()
|
|
||||||
.inherit_stdio()
|
|
||||||
.inherit_env()
|
|
||||||
.inherit_network()
|
|
||||||
.preopened_dir("/", "/", DirPerms::all(), FilePerms::all())
|
|
||||||
.expect("to be able to open root")
|
|
||||||
.build();
|
|
||||||
|
|
||||||
Self {
|
|
||||||
table,
|
|
||||||
ctx,
|
|
||||||
processes: ResourceTable::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl WasiView for ServerWasiView {
|
|
||||||
fn table(&mut self) -> &mut ResourceTable {
|
|
||||||
&mut self.table
|
|
||||||
}
|
|
||||||
|
|
||||||
fn ctx(&mut self) -> &mut WasiCtx {
|
|
||||||
&mut self.ctx
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl component::churn_tasks::process::Host for ServerWasiView {}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl HostProcess for ServerWasiView {
|
|
||||||
async fn new(
|
|
||||||
&mut self,
|
|
||||||
) -> wasmtime::component::Resource<component::churn_tasks::process::Process> {
|
|
||||||
self.processes.push(CustomProcess::default()).unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_process(
|
|
||||||
&mut self,
|
|
||||||
self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
|
||||||
inputs: wasmtime::component::__internal::Vec<String>,
|
|
||||||
) -> String {
|
|
||||||
let process = self.processes.get(&self_).unwrap();
|
|
||||||
process.run(inputs)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn drop(
|
|
||||||
&mut self,
|
|
||||||
rep: wasmtime::component::Resource<component::churn_tasks::process::Process>,
|
|
||||||
) -> wasmtime::Result<()> {
|
|
||||||
self.processes.delete(rep)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,67 +0,0 @@
|
|||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use notmad::{Component, MadError};
|
|
||||||
use tokio::sync::Mutex;
|
|
||||||
|
|
||||||
use super::{models::Commands, scheduler::Scheduler};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct AgentQueue {
|
|
||||||
sender: Arc<tokio::sync::mpsc::Sender<Commands>>,
|
|
||||||
receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<Commands>>>,
|
|
||||||
|
|
||||||
scheduler: Scheduler,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AgentQueue {
|
|
||||||
pub fn new(scheduler: Scheduler) -> Self {
|
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel(5);
|
|
||||||
Self {
|
|
||||||
sender: Arc::new(tx),
|
|
||||||
receiver: Arc::new(Mutex::new(rx)),
|
|
||||||
|
|
||||||
scheduler,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn handler(&self, command: Commands) -> anyhow::Result<()> {
|
|
||||||
tracing::debug!("handling task");
|
|
||||||
|
|
||||||
self.scheduler.handle(command).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn publish(&self, command: Commands) -> anyhow::Result<()> {
|
|
||||||
tracing::debug!("publishing task: {}", command.to_string());
|
|
||||||
|
|
||||||
self.sender.send(command).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl Component for AgentQueue {
|
|
||||||
async fn run(
|
|
||||||
&self,
|
|
||||||
cancellation_token: tokio_util::sync::CancellationToken,
|
|
||||||
) -> Result<(), notmad::MadError> {
|
|
||||||
loop {
|
|
||||||
let mut recv = self.receiver.lock().await;
|
|
||||||
|
|
||||||
tokio::select! {
|
|
||||||
res = recv.recv() => {
|
|
||||||
if let Some(res) = res {
|
|
||||||
self.handler(res).await.map_err(MadError::Inner)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ = cancellation_token.cancelled() => {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,37 +1,29 @@
|
|||||||
use std::collections::BTreeMap;
|
use anyhow::Context;
|
||||||
|
|
||||||
use crate::agent::models::Commands;
|
use super::agent_state::AgentState;
|
||||||
|
|
||||||
use super::{agent_state::AgentState, queue::AgentQueue};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct AgentRefresh {
|
pub struct AgentRefresh {
|
||||||
process_host: String,
|
_state: AgentState,
|
||||||
queue: AgentQueue,
|
host: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AgentRefresh {
|
impl AgentRefresh {
|
||||||
pub fn new(state: impl Into<AgentState>) -> Self {
|
pub fn new(state: impl Into<AgentState>, host: impl Into<String>) -> Self {
|
||||||
let state: AgentState = state.into();
|
|
||||||
Self {
|
Self {
|
||||||
process_host: state.discovery.process_host.clone(),
|
_state: state.into(),
|
||||||
queue: state.queue.clone(),
|
host: host.into(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl notmad::Component for AgentRefresh {
|
impl notmad::Component for AgentRefresh {
|
||||||
fn name(&self) -> Option<String> {
|
|
||||||
Some("agent_refresh".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run(
|
async fn run(
|
||||||
&self,
|
&self,
|
||||||
cancellation_token: tokio_util::sync::CancellationToken,
|
cancellation_token: tokio_util::sync::CancellationToken,
|
||||||
) -> Result<(), notmad::MadError> {
|
) -> Result<(), notmad::MadError> {
|
||||||
let cancel =
|
let cancel = nodrift::schedule_drifter(std::time::Duration::from_secs(60), self.clone());
|
||||||
nodrift::schedule_drifter(std::time::Duration::from_secs(60 * 10), self.clone());
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = cancel.cancelled() => {},
|
_ = cancel.cancelled() => {},
|
||||||
_ = cancellation_token.cancelled() => {
|
_ = cancellation_token.cancelled() => {
|
||||||
@ -47,14 +39,82 @@ impl notmad::Component for AgentRefresh {
|
|||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl nodrift::Drifter for AgentRefresh {
|
impl nodrift::Drifter for AgentRefresh {
|
||||||
async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
|
async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
|
||||||
tracing::info!(process_host = self.process_host, "refreshing agent");
|
tracing::info!(host = self.host, "refreshing agent");
|
||||||
|
|
||||||
self.queue
|
// Get plan
|
||||||
.publish(Commands::ScheduleTask {
|
let plan = Plan::new();
|
||||||
task: "update".into(),
|
let tasks = plan.tasks().await?;
|
||||||
properties: BTreeMap::default(),
|
|
||||||
})
|
// For task
|
||||||
.await?;
|
for task in tasks {
|
||||||
|
// Check idempotency rules
|
||||||
|
if !task.should_run().await? {
|
||||||
|
tracing::debug!(task = task.id(), "skipping run");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run task if not valid
|
||||||
|
tracing::info!(task = task.id(), "executing task");
|
||||||
|
task.execute().await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Plan {}
|
||||||
|
impl Plan {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn tasks(&self) -> anyhow::Result<Vec<Task>> {
|
||||||
|
Ok(vec![Task::new()])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Task {}
|
||||||
|
|
||||||
|
impl Task {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn id(&self) -> String {
|
||||||
|
"apt".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn should_run(&self) -> anyhow::Result<bool> {
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(&self) -> anyhow::Result<()> {
|
||||||
|
let mut cmd = tokio::process::Command::new("apt-get");
|
||||||
|
cmd.args(["update", "-q"]);
|
||||||
|
let output = cmd.output().await.context("failed to run apt update")?;
|
||||||
|
match output.status.success() {
|
||||||
|
true => tracing::info!("successfully ran apt update"),
|
||||||
|
false => {
|
||||||
|
anyhow::bail!(
|
||||||
|
"failed to run apt update: {}",
|
||||||
|
std::str::from_utf8(&output.stderr)?
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut cmd = tokio::process::Command::new("apt-get");
|
||||||
|
cmd.env("DEBIAN_FRONTEND", "noninteractive")
|
||||||
|
.args(["upgrade", "-y"]);
|
||||||
|
let output = cmd.output().await.context("failed to run apt upgrade")?;
|
||||||
|
match output.status.success() {
|
||||||
|
true => tracing::info!("successfully ran apt upgrade"),
|
||||||
|
false => {
|
||||||
|
anyhow::bail!(
|
||||||
|
"failed to run apt upgrade: {}",
|
||||||
|
std::str::from_utf8(&output.stderr)?
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -1,22 +0,0 @@
|
|||||||
use super::{handlers::scheduled_tasks::ScheduledTasks, models::Commands};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Scheduler {
|
|
||||||
scheduled_tasks: ScheduledTasks,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Scheduler {
|
|
||||||
pub fn new(scheduled_tasks: ScheduledTasks) -> Self {
|
|
||||||
Self { scheduled_tasks }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn handle(&self, command: Commands) -> anyhow::Result<()> {
|
|
||||||
match command {
|
|
||||||
Commands::ScheduleTask { task, properties } => {
|
|
||||||
self.scheduled_tasks.handle(&task, properties).await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,45 +0,0 @@
|
|||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
pub trait Task {
|
|
||||||
async fn id(&self) -> anyhow::Result<String>;
|
|
||||||
async fn should_run(&self) -> anyhow::Result<bool> {
|
|
||||||
Ok(true)
|
|
||||||
}
|
|
||||||
async fn execute(&self) -> anyhow::Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait IntoTask {
|
|
||||||
fn into_task(self) -> ConcreteTask;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct ConcreteTask {
|
|
||||||
inner: Arc<dyn Task + Sync + Send + 'static>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConcreteTask {
|
|
||||||
pub fn new<T: Task + Sync + Send + 'static>(t: T) -> Self {
|
|
||||||
Self { inner: Arc::new(t) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::ops::Deref for ConcreteTask {
|
|
||||||
type Target = Arc<dyn Task + Sync + Send + 'static>;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.inner
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IntoTask for ConcreteTask {
|
|
||||||
fn into_task(self) -> ConcreteTask {
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Task + Sync + Send + 'static> IntoTask for T {
|
|
||||||
fn into_task(self) -> ConcreteTask {
|
|
||||||
ConcreteTask::new(self)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,12 +1,6 @@
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
use axum::{
|
use axum::{extract::MatchedPath, http::Request, routing::get, Router};
|
||||||
extract::{MatchedPath, State},
|
|
||||||
http::Request,
|
|
||||||
routing::get,
|
|
||||||
Json, Router,
|
|
||||||
};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tower_http::trace::TraceLayer;
|
use tower_http::trace::TraceLayer;
|
||||||
|
|
||||||
@ -28,7 +22,6 @@ impl Api {
|
|||||||
pub async fn serve(&self) -> anyhow::Result<()> {
|
pub async fn serve(&self) -> anyhow::Result<()> {
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.route("/", get(root))
|
.route("/", get(root))
|
||||||
.route("/discovery", get(discovery))
|
|
||||||
.with_state(self.state.clone())
|
.with_state(self.state.clone())
|
||||||
.layer(
|
.layer(
|
||||||
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
|
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
|
||||||
@ -62,28 +55,6 @@ async fn root() -> &'static str {
|
|||||||
"Hello, churn!"
|
"Hello, churn!"
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
pub struct Discovery {
|
|
||||||
pub external_host: String,
|
|
||||||
pub process_host: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Discovery {
|
|
||||||
pub async fn get_from_host(host: &str) -> anyhow::Result<Self> {
|
|
||||||
let resp = reqwest::get(format!("{}/discovery", host.trim_end_matches('/'))).await?;
|
|
||||||
let s: Self = resp.json().await?;
|
|
||||||
|
|
||||||
Ok(s)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn discovery(State(state): State<SharedState>) -> Json<Discovery> {
|
|
||||||
Json(Discovery {
|
|
||||||
external_host: state.config.external_host.clone(),
|
|
||||||
process_host: state.config.process_host.clone(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl notmad::Component for Api {
|
impl notmad::Component for Api {
|
||||||
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {
|
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {
|
||||||
|
@ -1,39 +1,28 @@
|
|||||||
use std::{collections::BTreeMap, net::SocketAddr};
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
|
|
||||||
use crate::{agent, server};
|
use crate::{agent, api, state::SharedState};
|
||||||
|
|
||||||
pub async fn execute() -> anyhow::Result<()> {
|
pub async fn execute() -> anyhow::Result<()> {
|
||||||
|
let state = SharedState::new().await?;
|
||||||
|
|
||||||
let cli = Command::parse();
|
let cli = Command::parse();
|
||||||
match cli.command.expect("to have a subcommand") {
|
match cli.command.expect("to have a subcommand") {
|
||||||
Commands::Serve {
|
Commands::Serve { host } => {
|
||||||
host,
|
|
||||||
grpc_host,
|
|
||||||
config,
|
|
||||||
} => {
|
|
||||||
tracing::info!("Starting service");
|
tracing::info!("Starting service");
|
||||||
server::execute(host, grpc_host, config).await?;
|
|
||||||
|
notmad::Mad::builder()
|
||||||
|
.add(api::Api::new(&state, host))
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
Commands::Agent { commands } => match commands {
|
Commands::Agent { commands } => match commands {
|
||||||
AgentCommands::Start {} => {
|
AgentCommands::Start { host } => {
|
||||||
tracing::info!("starting agent");
|
tracing::info!("starting agent");
|
||||||
agent::execute().await?;
|
agent::execute(host).await?;
|
||||||
tracing::info!("shut down agent");
|
tracing::info!("shut down agent");
|
||||||
}
|
}
|
||||||
AgentCommands::Setup {
|
|
||||||
force,
|
|
||||||
discovery,
|
|
||||||
labels,
|
|
||||||
} => {
|
|
||||||
let mut setup_labels = BTreeMap::new();
|
|
||||||
for (k, v) in labels {
|
|
||||||
setup_labels.insert(k, v);
|
|
||||||
}
|
|
||||||
|
|
||||||
agent::setup_config(discovery, force, setup_labels).await?;
|
|
||||||
tracing::info!("wrote default agent config");
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -52,12 +41,6 @@ enum Commands {
|
|||||||
Serve {
|
Serve {
|
||||||
#[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
|
#[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
|
||||||
host: SocketAddr,
|
host: SocketAddr,
|
||||||
|
|
||||||
#[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")]
|
|
||||||
grpc_host: SocketAddr,
|
|
||||||
|
|
||||||
#[clap(flatten)]
|
|
||||||
config: server::config::ServerConfig,
|
|
||||||
},
|
},
|
||||||
Agent {
|
Agent {
|
||||||
#[command(subcommand)]
|
#[command(subcommand)]
|
||||||
@ -67,22 +50,8 @@ enum Commands {
|
|||||||
|
|
||||||
#[derive(Subcommand)]
|
#[derive(Subcommand)]
|
||||||
enum AgentCommands {
|
enum AgentCommands {
|
||||||
Start {},
|
Start {
|
||||||
Setup {
|
#[arg(env = "SERVICE_HOST", long = "service-host")]
|
||||||
#[arg(long, default_value = "false")]
|
host: String,
|
||||||
force: bool,
|
|
||||||
|
|
||||||
#[arg(env = "DISCOVERY_HOST", long = "discovery")]
|
|
||||||
discovery: String,
|
|
||||||
|
|
||||||
#[arg(long = "label", short = 'l', value_parser = parse_key_val, action = clap::ArgAction::Append)]
|
|
||||||
labels: Vec<(String, String)>,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_key_val(s: &str) -> Result<(String, String), String> {
|
|
||||||
let (key, value) = s
|
|
||||||
.split_once("=")
|
|
||||||
.ok_or_else(|| format!("invalid key=value: no `=` found in `{s}`"))?;
|
|
||||||
Ok((key.to_string(), value.to_string()))
|
|
||||||
}
|
|
||||||
|
@ -1,52 +0,0 @@
|
|||||||
// @generated
|
|
||||||
// This file is @generated by prost-build.
|
|
||||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
|
||||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
|
||||||
pub struct GetKeyRequest {
|
|
||||||
#[prost(string, tag="1")]
|
|
||||||
pub namespace: ::prost::alloc::string::String,
|
|
||||||
#[prost(string, optional, tag="2")]
|
|
||||||
pub id: ::core::option::Option<::prost::alloc::string::String>,
|
|
||||||
#[prost(string, tag="3")]
|
|
||||||
pub key: ::prost::alloc::string::String,
|
|
||||||
}
|
|
||||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
|
||||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
|
||||||
pub struct GetKeyResponse {
|
|
||||||
#[prost(string, optional, tag="1")]
|
|
||||||
pub value: ::core::option::Option<::prost::alloc::string::String>,
|
|
||||||
}
|
|
||||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
|
||||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
|
||||||
pub struct SetKeyRequest {
|
|
||||||
#[prost(string, tag="1")]
|
|
||||||
pub namespace: ::prost::alloc::string::String,
|
|
||||||
#[prost(string, optional, tag="2")]
|
|
||||||
pub id: ::core::option::Option<::prost::alloc::string::String>,
|
|
||||||
#[prost(string, tag="3")]
|
|
||||||
pub key: ::prost::alloc::string::String,
|
|
||||||
#[prost(string, tag="4")]
|
|
||||||
pub value: ::prost::alloc::string::String,
|
|
||||||
}
|
|
||||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
|
||||||
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
|
|
||||||
pub struct SetKeyResponse {
|
|
||||||
}
|
|
||||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
|
||||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
|
||||||
pub struct ListenEventsRequest {
|
|
||||||
#[prost(string, tag="1")]
|
|
||||||
pub namespace: ::prost::alloc::string::String,
|
|
||||||
#[prost(string, optional, tag="2")]
|
|
||||||
pub id: ::core::option::Option<::prost::alloc::string::String>,
|
|
||||||
}
|
|
||||||
#[allow(clippy::derive_partial_eq_without_eq)]
|
|
||||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
|
||||||
pub struct ListenEventsResponse {
|
|
||||||
#[prost(string, tag="1")]
|
|
||||||
pub id: ::prost::alloc::string::String,
|
|
||||||
#[prost(string, tag="2")]
|
|
||||||
pub value: ::prost::alloc::string::String,
|
|
||||||
}
|
|
||||||
include!("churn.v1.tonic.rs");
|
|
||||||
// @@protoc_insertion_point(module)
|
|
@ -1,435 +0,0 @@
|
|||||||
// @generated
|
|
||||||
/// Generated client implementations.
|
|
||||||
pub mod churn_client {
|
|
||||||
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
|
|
||||||
use tonic::codegen::*;
|
|
||||||
use tonic::codegen::http::Uri;
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct ChurnClient<T> {
|
|
||||||
inner: tonic::client::Grpc<T>,
|
|
||||||
}
|
|
||||||
impl ChurnClient<tonic::transport::Channel> {
|
|
||||||
/// Attempt to create a new client by connecting to a given endpoint.
|
|
||||||
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
|
|
||||||
where
|
|
||||||
D: TryInto<tonic::transport::Endpoint>,
|
|
||||||
D::Error: Into<StdError>,
|
|
||||||
{
|
|
||||||
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
|
|
||||||
Ok(Self::new(conn))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl<T> ChurnClient<T>
|
|
||||||
where
|
|
||||||
T: tonic::client::GrpcService<tonic::body::BoxBody>,
|
|
||||||
T::Error: Into<StdError>,
|
|
||||||
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
|
|
||||||
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
|
|
||||||
{
|
|
||||||
pub fn new(inner: T) -> Self {
|
|
||||||
let inner = tonic::client::Grpc::new(inner);
|
|
||||||
Self { inner }
|
|
||||||
}
|
|
||||||
pub fn with_origin(inner: T, origin: Uri) -> Self {
|
|
||||||
let inner = tonic::client::Grpc::with_origin(inner, origin);
|
|
||||||
Self { inner }
|
|
||||||
}
|
|
||||||
pub fn with_interceptor<F>(
|
|
||||||
inner: T,
|
|
||||||
interceptor: F,
|
|
||||||
) -> ChurnClient<InterceptedService<T, F>>
|
|
||||||
where
|
|
||||||
F: tonic::service::Interceptor,
|
|
||||||
T::ResponseBody: Default,
|
|
||||||
T: tonic::codegen::Service<
|
|
||||||
http::Request<tonic::body::BoxBody>,
|
|
||||||
Response = http::Response<
|
|
||||||
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
|
|
||||||
>,
|
|
||||||
>,
|
|
||||||
<T as tonic::codegen::Service<
|
|
||||||
http::Request<tonic::body::BoxBody>,
|
|
||||||
>>::Error: Into<StdError> + Send + Sync,
|
|
||||||
{
|
|
||||||
ChurnClient::new(InterceptedService::new(inner, interceptor))
|
|
||||||
}
|
|
||||||
/// Compress requests with the given encoding.
|
|
||||||
///
|
|
||||||
/// This requires the server to support it otherwise it might respond with an
|
|
||||||
/// error.
|
|
||||||
#[must_use]
|
|
||||||
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
|
||||||
self.inner = self.inner.send_compressed(encoding);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
/// Enable decompressing responses.
|
|
||||||
#[must_use]
|
|
||||||
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
|
||||||
self.inner = self.inner.accept_compressed(encoding);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
/// Limits the maximum size of a decoded message.
|
|
||||||
///
|
|
||||||
/// Default: `4MB`
|
|
||||||
#[must_use]
|
|
||||||
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
|
|
||||||
self.inner = self.inner.max_decoding_message_size(limit);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
/// Limits the maximum size of an encoded message.
|
|
||||||
///
|
|
||||||
/// Default: `usize::MAX`
|
|
||||||
#[must_use]
|
|
||||||
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
|
|
||||||
self.inner = self.inner.max_encoding_message_size(limit);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
pub async fn get_key(
|
|
||||||
&mut self,
|
|
||||||
request: impl tonic::IntoRequest<super::GetKeyRequest>,
|
|
||||||
) -> std::result::Result<tonic::Response<super::GetKeyResponse>, tonic::Status> {
|
|
||||||
self.inner
|
|
||||||
.ready()
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
tonic::Status::new(
|
|
||||||
tonic::Code::Unknown,
|
|
||||||
format!("Service was not ready: {}", e.into()),
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
let codec = tonic::codec::ProstCodec::default();
|
|
||||||
let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/GetKey");
|
|
||||||
let mut req = request.into_request();
|
|
||||||
req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "GetKey"));
|
|
||||||
self.inner.unary(req, path, codec).await
|
|
||||||
}
|
|
||||||
pub async fn set_key(
|
|
||||||
&mut self,
|
|
||||||
request: impl tonic::IntoRequest<super::SetKeyRequest>,
|
|
||||||
) -> std::result::Result<tonic::Response<super::SetKeyResponse>, tonic::Status> {
|
|
||||||
self.inner
|
|
||||||
.ready()
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
tonic::Status::new(
|
|
||||||
tonic::Code::Unknown,
|
|
||||||
format!("Service was not ready: {}", e.into()),
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
let codec = tonic::codec::ProstCodec::default();
|
|
||||||
let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/SetKey");
|
|
||||||
let mut req = request.into_request();
|
|
||||||
req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "SetKey"));
|
|
||||||
self.inner.unary(req, path, codec).await
|
|
||||||
}
|
|
||||||
pub async fn listen_events(
|
|
||||||
&mut self,
|
|
||||||
request: impl tonic::IntoRequest<super::ListenEventsRequest>,
|
|
||||||
) -> std::result::Result<
|
|
||||||
tonic::Response<tonic::codec::Streaming<super::ListenEventsResponse>>,
|
|
||||||
tonic::Status,
|
|
||||||
> {
|
|
||||||
self.inner
|
|
||||||
.ready()
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
tonic::Status::new(
|
|
||||||
tonic::Code::Unknown,
|
|
||||||
format!("Service was not ready: {}", e.into()),
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
let codec = tonic::codec::ProstCodec::default();
|
|
||||||
let path = http::uri::PathAndQuery::from_static(
|
|
||||||
"/churn.v1.Churn/ListenEvents",
|
|
||||||
);
|
|
||||||
let mut req = request.into_request();
|
|
||||||
req.extensions_mut()
|
|
||||||
.insert(GrpcMethod::new("churn.v1.Churn", "ListenEvents"));
|
|
||||||
self.inner.server_streaming(req, path, codec).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/// Generated server implementations.
|
|
||||||
pub mod churn_server {
|
|
||||||
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
|
|
||||||
use tonic::codegen::*;
|
|
||||||
/// Generated trait containing gRPC methods that should be implemented for use with ChurnServer.
|
|
||||||
#[async_trait]
|
|
||||||
pub trait Churn: Send + Sync + 'static {
|
|
||||||
async fn get_key(
|
|
||||||
&self,
|
|
||||||
request: tonic::Request<super::GetKeyRequest>,
|
|
||||||
) -> std::result::Result<tonic::Response<super::GetKeyResponse>, tonic::Status>;
|
|
||||||
async fn set_key(
|
|
||||||
&self,
|
|
||||||
request: tonic::Request<super::SetKeyRequest>,
|
|
||||||
) -> std::result::Result<tonic::Response<super::SetKeyResponse>, tonic::Status>;
|
|
||||||
/// Server streaming response type for the ListenEvents method.
|
|
||||||
type ListenEventsStream: tonic::codegen::tokio_stream::Stream<
|
|
||||||
Item = std::result::Result<super::ListenEventsResponse, tonic::Status>,
|
|
||||||
>
|
|
||||||
+ Send
|
|
||||||
+ 'static;
|
|
||||||
async fn listen_events(
|
|
||||||
&self,
|
|
||||||
request: tonic::Request<super::ListenEventsRequest>,
|
|
||||||
) -> std::result::Result<
|
|
||||||
tonic::Response<Self::ListenEventsStream>,
|
|
||||||
tonic::Status,
|
|
||||||
>;
|
|
||||||
}
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ChurnServer<T: Churn> {
|
|
||||||
inner: _Inner<T>,
|
|
||||||
accept_compression_encodings: EnabledCompressionEncodings,
|
|
||||||
send_compression_encodings: EnabledCompressionEncodings,
|
|
||||||
max_decoding_message_size: Option<usize>,
|
|
||||||
max_encoding_message_size: Option<usize>,
|
|
||||||
}
|
|
||||||
struct _Inner<T>(Arc<T>);
|
|
||||||
impl<T: Churn> ChurnServer<T> {
|
|
||||||
pub fn new(inner: T) -> Self {
|
|
||||||
Self::from_arc(Arc::new(inner))
|
|
||||||
}
|
|
||||||
pub fn from_arc(inner: Arc<T>) -> Self {
|
|
||||||
let inner = _Inner(inner);
|
|
||||||
Self {
|
|
||||||
inner,
|
|
||||||
accept_compression_encodings: Default::default(),
|
|
||||||
send_compression_encodings: Default::default(),
|
|
||||||
max_decoding_message_size: None,
|
|
||||||
max_encoding_message_size: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub fn with_interceptor<F>(
|
|
||||||
inner: T,
|
|
||||||
interceptor: F,
|
|
||||||
) -> InterceptedService<Self, F>
|
|
||||||
where
|
|
||||||
F: tonic::service::Interceptor,
|
|
||||||
{
|
|
||||||
InterceptedService::new(Self::new(inner), interceptor)
|
|
||||||
}
|
|
||||||
/// Enable decompressing requests with the given encoding.
|
|
||||||
#[must_use]
|
|
||||||
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
|
||||||
self.accept_compression_encodings.enable(encoding);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
/// Compress responses with the given encoding, if the client supports it.
|
|
||||||
#[must_use]
|
|
||||||
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
|
|
||||||
self.send_compression_encodings.enable(encoding);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
/// Limits the maximum size of a decoded message.
|
|
||||||
///
|
|
||||||
/// Default: `4MB`
|
|
||||||
#[must_use]
|
|
||||||
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
|
|
||||||
self.max_decoding_message_size = Some(limit);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
/// Limits the maximum size of an encoded message.
|
|
||||||
///
|
|
||||||
/// Default: `usize::MAX`
|
|
||||||
#[must_use]
|
|
||||||
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
|
|
||||||
self.max_encoding_message_size = Some(limit);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl<T, B> tonic::codegen::Service<http::Request<B>> for ChurnServer<T>
|
|
||||||
where
|
|
||||||
T: Churn,
|
|
||||||
B: Body + Send + 'static,
|
|
||||||
B::Error: Into<StdError> + Send + 'static,
|
|
||||||
{
|
|
||||||
type Response = http::Response<tonic::body::BoxBody>;
|
|
||||||
type Error = std::convert::Infallible;
|
|
||||||
type Future = BoxFuture<Self::Response, Self::Error>;
|
|
||||||
fn poll_ready(
|
|
||||||
&mut self,
|
|
||||||
_cx: &mut Context<'_>,
|
|
||||||
) -> Poll<std::result::Result<(), Self::Error>> {
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
fn call(&mut self, req: http::Request<B>) -> Self::Future {
|
|
||||||
let inner = self.inner.clone();
|
|
||||||
match req.uri().path() {
|
|
||||||
"/churn.v1.Churn/GetKey" => {
|
|
||||||
#[allow(non_camel_case_types)]
|
|
||||||
struct GetKeySvc<T: Churn>(pub Arc<T>);
|
|
||||||
impl<T: Churn> tonic::server::UnaryService<super::GetKeyRequest>
|
|
||||||
for GetKeySvc<T> {
|
|
||||||
type Response = super::GetKeyResponse;
|
|
||||||
type Future = BoxFuture<
|
|
||||||
tonic::Response<Self::Response>,
|
|
||||||
tonic::Status,
|
|
||||||
>;
|
|
||||||
fn call(
|
|
||||||
&mut self,
|
|
||||||
request: tonic::Request<super::GetKeyRequest>,
|
|
||||||
) -> Self::Future {
|
|
||||||
let inner = Arc::clone(&self.0);
|
|
||||||
let fut = async move {
|
|
||||||
<T as Churn>::get_key(&inner, request).await
|
|
||||||
};
|
|
||||||
Box::pin(fut)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let accept_compression_encodings = self.accept_compression_encodings;
|
|
||||||
let send_compression_encodings = self.send_compression_encodings;
|
|
||||||
let max_decoding_message_size = self.max_decoding_message_size;
|
|
||||||
let max_encoding_message_size = self.max_encoding_message_size;
|
|
||||||
let inner = self.inner.clone();
|
|
||||||
let fut = async move {
|
|
||||||
let inner = inner.0;
|
|
||||||
let method = GetKeySvc(inner);
|
|
||||||
let codec = tonic::codec::ProstCodec::default();
|
|
||||||
let mut grpc = tonic::server::Grpc::new(codec)
|
|
||||||
.apply_compression_config(
|
|
||||||
accept_compression_encodings,
|
|
||||||
send_compression_encodings,
|
|
||||||
)
|
|
||||||
.apply_max_message_size_config(
|
|
||||||
max_decoding_message_size,
|
|
||||||
max_encoding_message_size,
|
|
||||||
);
|
|
||||||
let res = grpc.unary(method, req).await;
|
|
||||||
Ok(res)
|
|
||||||
};
|
|
||||||
Box::pin(fut)
|
|
||||||
}
|
|
||||||
"/churn.v1.Churn/SetKey" => {
|
|
||||||
#[allow(non_camel_case_types)]
|
|
||||||
struct SetKeySvc<T: Churn>(pub Arc<T>);
|
|
||||||
impl<T: Churn> tonic::server::UnaryService<super::SetKeyRequest>
|
|
||||||
for SetKeySvc<T> {
|
|
||||||
type Response = super::SetKeyResponse;
|
|
||||||
type Future = BoxFuture<
|
|
||||||
tonic::Response<Self::Response>,
|
|
||||||
tonic::Status,
|
|
||||||
>;
|
|
||||||
fn call(
|
|
||||||
&mut self,
|
|
||||||
request: tonic::Request<super::SetKeyRequest>,
|
|
||||||
) -> Self::Future {
|
|
||||||
let inner = Arc::clone(&self.0);
|
|
||||||
let fut = async move {
|
|
||||||
<T as Churn>::set_key(&inner, request).await
|
|
||||||
};
|
|
||||||
Box::pin(fut)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let accept_compression_encodings = self.accept_compression_encodings;
|
|
||||||
let send_compression_encodings = self.send_compression_encodings;
|
|
||||||
let max_decoding_message_size = self.max_decoding_message_size;
|
|
||||||
let max_encoding_message_size = self.max_encoding_message_size;
|
|
||||||
let inner = self.inner.clone();
|
|
||||||
let fut = async move {
|
|
||||||
let inner = inner.0;
|
|
||||||
let method = SetKeySvc(inner);
|
|
||||||
let codec = tonic::codec::ProstCodec::default();
|
|
||||||
let mut grpc = tonic::server::Grpc::new(codec)
|
|
||||||
.apply_compression_config(
|
|
||||||
accept_compression_encodings,
|
|
||||||
send_compression_encodings,
|
|
||||||
)
|
|
||||||
.apply_max_message_size_config(
|
|
||||||
max_decoding_message_size,
|
|
||||||
max_encoding_message_size,
|
|
||||||
);
|
|
||||||
let res = grpc.unary(method, req).await;
|
|
||||||
Ok(res)
|
|
||||||
};
|
|
||||||
Box::pin(fut)
|
|
||||||
}
|
|
||||||
"/churn.v1.Churn/ListenEvents" => {
|
|
||||||
#[allow(non_camel_case_types)]
|
|
||||||
struct ListenEventsSvc<T: Churn>(pub Arc<T>);
|
|
||||||
impl<
|
|
||||||
T: Churn,
|
|
||||||
> tonic::server::ServerStreamingService<super::ListenEventsRequest>
|
|
||||||
for ListenEventsSvc<T> {
|
|
||||||
type Response = super::ListenEventsResponse;
|
|
||||||
type ResponseStream = T::ListenEventsStream;
|
|
||||||
type Future = BoxFuture<
|
|
||||||
tonic::Response<Self::ResponseStream>,
|
|
||||||
tonic::Status,
|
|
||||||
>;
|
|
||||||
fn call(
|
|
||||||
&mut self,
|
|
||||||
request: tonic::Request<super::ListenEventsRequest>,
|
|
||||||
) -> Self::Future {
|
|
||||||
let inner = Arc::clone(&self.0);
|
|
||||||
let fut = async move {
|
|
||||||
<T as Churn>::listen_events(&inner, request).await
|
|
||||||
};
|
|
||||||
Box::pin(fut)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let accept_compression_encodings = self.accept_compression_encodings;
|
|
||||||
let send_compression_encodings = self.send_compression_encodings;
|
|
||||||
let max_decoding_message_size = self.max_decoding_message_size;
|
|
||||||
let max_encoding_message_size = self.max_encoding_message_size;
|
|
||||||
let inner = self.inner.clone();
|
|
||||||
let fut = async move {
|
|
||||||
let inner = inner.0;
|
|
||||||
let method = ListenEventsSvc(inner);
|
|
||||||
let codec = tonic::codec::ProstCodec::default();
|
|
||||||
let mut grpc = tonic::server::Grpc::new(codec)
|
|
||||||
.apply_compression_config(
|
|
||||||
accept_compression_encodings,
|
|
||||||
send_compression_encodings,
|
|
||||||
)
|
|
||||||
.apply_max_message_size_config(
|
|
||||||
max_decoding_message_size,
|
|
||||||
max_encoding_message_size,
|
|
||||||
);
|
|
||||||
let res = grpc.server_streaming(method, req).await;
|
|
||||||
Ok(res)
|
|
||||||
};
|
|
||||||
Box::pin(fut)
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
Box::pin(async move {
|
|
||||||
Ok(
|
|
||||||
http::Response::builder()
|
|
||||||
.status(200)
|
|
||||||
.header("grpc-status", "12")
|
|
||||||
.header("content-type", "application/grpc")
|
|
||||||
.body(empty_body())
|
|
||||||
.unwrap(),
|
|
||||||
)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl<T: Churn> Clone for ChurnServer<T> {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
let inner = self.inner.clone();
|
|
||||||
Self {
|
|
||||||
inner,
|
|
||||||
accept_compression_encodings: self.accept_compression_encodings,
|
|
||||||
send_compression_encodings: self.send_compression_encodings,
|
|
||||||
max_decoding_message_size: self.max_decoding_message_size,
|
|
||||||
max_encoding_message_size: self.max_encoding_message_size,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl<T: Churn> Clone for _Inner<T> {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
Self(Arc::clone(&self.0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
write!(f, "{:?}", self.0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl<T: Churn> tonic::server::NamedService for ChurnServer<T> {
|
|
||||||
const NAME: &'static str = "churn.v1.Churn";
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,12 +1,8 @@
|
|||||||
mod api;
|
mod api;
|
||||||
mod cli;
|
mod cli;
|
||||||
mod state;
|
mod state;
|
||||||
mod grpc {
|
|
||||||
include!("grpc/churn.v1.rs");
|
|
||||||
}
|
|
||||||
|
|
||||||
mod agent;
|
mod agent;
|
||||||
mod server;
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
@ -1,23 +0,0 @@
|
|||||||
use std::net::SocketAddr;
|
|
||||||
|
|
||||||
pub mod config;
|
|
||||||
|
|
||||||
mod grpc_server;
|
|
||||||
|
|
||||||
use crate::{api, state::SharedState};
|
|
||||||
|
|
||||||
pub async fn execute(
|
|
||||||
host: impl Into<SocketAddr>,
|
|
||||||
grpc_host: impl Into<SocketAddr>,
|
|
||||||
config: config::ServerConfig,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let state = SharedState::new(config).await?;
|
|
||||||
|
|
||||||
notmad::Mad::builder()
|
|
||||||
.add(api::Api::new(&state, host))
|
|
||||||
.add(grpc_server::GrpcServer::new(grpc_host.into()))
|
|
||||||
.run()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
@ -1,7 +0,0 @@
|
|||||||
#[derive(clap::Args)]
|
|
||||||
pub struct ServerConfig {
|
|
||||||
#[arg(long = "external-host", env = "EXTERNAL_HOST")]
|
|
||||||
pub external_host: String,
|
|
||||||
#[arg(long = "process-host", env = "PROCESS_HOST")]
|
|
||||||
pub process_host: String,
|
|
||||||
}
|
|
@ -1,102 +0,0 @@
|
|||||||
use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};
|
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use futures::Stream;
|
|
||||||
use notmad::{Component, MadError};
|
|
||||||
use tonic::transport::Server;
|
|
||||||
|
|
||||||
use crate::{agent::models::Commands, grpc::*};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct GrpcServer {
|
|
||||||
grpc_host: SocketAddr,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl GrpcServer {
|
|
||||||
pub fn new(grpc_host: SocketAddr) -> Self {
|
|
||||||
Self { grpc_host }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl Component for GrpcServer {
|
|
||||||
async fn run(
|
|
||||||
&self,
|
|
||||||
cancellation_token: tokio_util::sync::CancellationToken,
|
|
||||||
) -> Result<(), notmad::MadError> {
|
|
||||||
let task = Server::builder()
|
|
||||||
.add_service(crate::grpc::churn_server::ChurnServer::new(self.clone()))
|
|
||||||
.serve(self.grpc_host);
|
|
||||||
|
|
||||||
tokio::select! {
|
|
||||||
_ = cancellation_token.cancelled() => {},
|
|
||||||
res = task => {
|
|
||||||
res.context("failed to run grpc server").map_err(MadError::Inner)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl crate::grpc::churn_server::Churn for GrpcServer {
|
|
||||||
async fn get_key(
|
|
||||||
&self,
|
|
||||||
request: tonic::Request<GetKeyRequest>,
|
|
||||||
) -> std::result::Result<tonic::Response<GetKeyResponse>, tonic::Status> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn set_key(
|
|
||||||
&self,
|
|
||||||
request: tonic::Request<SetKeyRequest>,
|
|
||||||
) -> std::result::Result<tonic::Response<SetKeyResponse>, tonic::Status> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[doc = " Server streaming response type for the ListenEvents method."]
|
|
||||||
type ListenEventsStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<ListenEventsResponse, tonic::Status>> + Send>>;
|
|
||||||
|
|
||||||
async fn listen_events(
|
|
||||||
&self,
|
|
||||||
request: tonic::Request<ListenEventsRequest>,
|
|
||||||
) -> std::result::Result<tonic::Response<Self::ListenEventsStream>, tonic::Status> {
|
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel(128);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60 * 10));
|
|
||||||
|
|
||||||
loop {
|
|
||||||
interval.tick().await;
|
|
||||||
|
|
||||||
let Ok(schedule_task) = serde_json::to_string(&Commands::ScheduleTask {
|
|
||||||
task: "refresh".into(),
|
|
||||||
properties: BTreeMap::default(),
|
|
||||||
}) else {
|
|
||||||
tracing::warn!("failed to serialize event");
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) = tx
|
|
||||||
.send(Ok(ListenEventsResponse {
|
|
||||||
id: uuid::Uuid::new_v4().to_string(),
|
|
||||||
value: schedule_task,
|
|
||||||
}))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
tracing::warn!("failed to send response: {}", e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let stream = futures::stream::unfold(rx, |mut msg| async move {
|
|
||||||
let next = msg.recv().await?;
|
|
||||||
|
|
||||||
Some((next, msg))
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(tonic::Response::new(Box::pin(stream)))
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,13 +1,11 @@
|
|||||||
use std::{ops::Deref, sync::Arc};
|
use std::{ops::Deref, sync::Arc};
|
||||||
|
|
||||||
use crate::server::config::ServerConfig;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct SharedState(Arc<State>);
|
pub struct SharedState(Arc<State>);
|
||||||
|
|
||||||
impl SharedState {
|
impl SharedState {
|
||||||
pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
|
pub async fn new() -> anyhow::Result<Self> {
|
||||||
Ok(Self(Arc::new(State::new(config).await?)))
|
Ok(Self(Arc::new(State::new().await?)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -25,12 +23,10 @@ impl Deref for SharedState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct State {
|
pub struct State {}
|
||||||
pub config: ServerConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
|
pub async fn new() -> anyhow::Result<Self> {
|
||||||
Ok(Self { config })
|
Ok(Self {})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,19 +0,0 @@
|
|||||||
package component:churn-tasks@0.1.0;
|
|
||||||
|
|
||||||
interface process {
|
|
||||||
resource process {
|
|
||||||
constructor();
|
|
||||||
run-process: func(inputs: list<string>) -> string;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
interface task {
|
|
||||||
id: func() -> string;
|
|
||||||
should-run: func() -> bool;
|
|
||||||
execute: func();
|
|
||||||
}
|
|
||||||
|
|
||||||
world churn {
|
|
||||||
export task;
|
|
||||||
import process;
|
|
||||||
}
|
|
@ -12,20 +12,11 @@ vars:
|
|||||||
ingress:
|
ingress:
|
||||||
- external: "true"
|
- external: "true"
|
||||||
- internal: "true"
|
- internal: "true"
|
||||||
- internal_grpc: "true"
|
|
||||||
|
|
||||||
cuddle/clusters:
|
cuddle/clusters:
|
||||||
dev:
|
dev:
|
||||||
env:
|
env:
|
||||||
service.host: "0.0.0.0:3000"
|
service.host: "0.0.0.0:3000"
|
||||||
service.grpc.host: "0.0.0.0:4001"
|
|
||||||
process.host: "https://grpc.churn.dev.internal.kjuulh.app"
|
|
||||||
external.host: "https://churn.internal.dev.kjuulh.app"
|
|
||||||
rust.log: "h2=warn,debug"
|
|
||||||
prod:
|
prod:
|
||||||
env:
|
env:
|
||||||
service.host: "0.0.0.0:3000"
|
service.host: "0.0.0.0:3000"
|
||||||
service.grpc.host: "0.0.0.0:4001"
|
|
||||||
process.host: "https://grpc.churn.prod.internal.kjuulh.app"
|
|
||||||
external.host: "https://churn.internal.prod.kjuulh.app"
|
|
||||||
rust.log: "h2=warn,debug"
|
|
||||||
|
127
install.sh
127
install.sh
@ -1,92 +1,97 @@
|
|||||||
#!/usr/bin/env bash
|
#!/bin/bash
|
||||||
|
|
||||||
set -e
|
set -e
|
||||||
|
|
||||||
# Configuration
|
# Configuration variables
|
||||||
APP_NAME="churn"
|
GITEA_HOST="https://git.front.kjuulh.io"
|
||||||
APP_VERSION="latest" # or specify a version
|
REPO_OWNER="kjuulh"
|
||||||
S3_BUCKET="rust-artifacts"
|
REPO_NAME="churn-v2"
|
||||||
BINARY_NAME="churn"
|
BINARY_NAME="churn"
|
||||||
SERVICE_NAME="${APP_NAME}.service"
|
SERVICE_NAME="churn-agent"
|
||||||
INSTALL_DIR="/usr/local/bin"
|
SERVICE_USER="churn"
|
||||||
CONFIG_DIR="/etc/${APP_NAME}"
|
RELEASE_TAG="latest" # or specific version like "v1.0.0"
|
||||||
CHURN_DISCOVERY="https://churn.prod.kjuulh.app"
|
|
||||||
|
|
||||||
# Colors for output
|
|
||||||
RED='\033[0;31m'
|
|
||||||
GREEN='\033[0;32m'
|
|
||||||
NC='\033[0m' # No Color
|
|
||||||
|
|
||||||
# Check if running as root
|
# Check if running as root
|
||||||
if [ "$EUID" -ne 0 ]; then
|
if [ "$EUID" -ne 0 ]; then
|
||||||
echo -e "${RED}Please run as root${NC}"
|
echo "Please run as root"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# Create necessary directories
|
# Create service user if it doesn't exist
|
||||||
echo "Creating directories..."
|
if ! id "$SERVICE_USER" &>/dev/null; then
|
||||||
mkdir -p "${INSTALL_DIR}"
|
useradd -r -s /bin/false "$SERVICE_USER"
|
||||||
mkdir -p "${CONFIG_DIR}"
|
|
||||||
|
|
||||||
if systemctl is-active --quiet churn.service; then
|
|
||||||
echo "Stopping existing churn service..."
|
|
||||||
systemctl stop churn.service
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# Download binary from S3
|
# Function to get latest release if RELEASE_TAG is "latest"
|
||||||
echo "Downloading binary..."
|
get_latest_release() {
|
||||||
curl -L -s "https://api-minio.front.kjuulh.io/${S3_BUCKET}/releases/${APP_NAME}/${APP_VERSION}/${BINARY_NAME}" -o "${INSTALL_DIR}/${BINARY_NAME}"
|
curl -s "https://$GITEA_HOST/api/v1/repos/$REPO_OWNER/$REPO_NAME/releases/latest" | \
|
||||||
|
grep '"tag_name":' | \
|
||||||
|
sed -E 's/.*"([^"]+)".*/\1/'
|
||||||
|
}
|
||||||
|
|
||||||
# Make binary executable
|
# Determine the actual release tag
|
||||||
chmod +x "${INSTALL_DIR}/${BINARY_NAME}"
|
if [ "$RELEASE_TAG" = "latest" ]; then
|
||||||
|
RELEASE_TAG=$(get_latest_release)
|
||||||
echo "Starting churn agent setup..."
|
|
||||||
set +e
|
|
||||||
res=$(churn agent setup --discovery "${CHURN_DISCOVERY}" 2>&1)
|
|
||||||
set -e
|
|
||||||
exit_code=$?
|
|
||||||
|
|
||||||
if [[ $exit_code != 0 ]]; then
|
|
||||||
if [[ "$res" != *"config file already exists"* ]] && [[ "$res" != *"already exists"* ]]; then
|
|
||||||
echo "Error detected: $res"
|
|
||||||
exit 1
|
|
||||||
else
|
|
||||||
echo "Ignoring setup 'agent is already setup'"
|
|
||||||
fi
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
echo "Installing $BINARY_NAME version $RELEASE_TAG..."
|
||||||
|
|
||||||
|
# Download and install binary
|
||||||
|
TMP_DIR=$(mktemp -d)
|
||||||
|
cd "$TMP_DIR"
|
||||||
|
|
||||||
|
# Download binary from Gitea
|
||||||
|
curl -L -o "$BINARY_NAME" \
|
||||||
|
"https://$GITEA_HOST/$REPO_OWNER/$REPO_NAME/releases/download/$RELEASE_TAG/$BINARY_NAME"
|
||||||
|
|
||||||
|
# Make binary executable and move to appropriate location
|
||||||
|
chmod +x "$BINARY_NAME"
|
||||||
|
mv "$BINARY_NAME" "/usr/local/bin/$BINARY_NAME"
|
||||||
|
|
||||||
# Create systemd service file
|
# Create systemd service file
|
||||||
echo "Creating systemd service..."
|
cat > "/etc/systemd/system/$SERVICE_NAME.service" << EOF
|
||||||
cat > "/etc/systemd/system/${SERVICE_NAME}" << EOF
|
|
||||||
[Unit]
|
[Unit]
|
||||||
Description=${APP_NAME} service
|
Description=$SERVICE_NAME Service
|
||||||
After=network.target
|
After=network.target
|
||||||
|
|
||||||
[Service]
|
[Service]
|
||||||
Type=simple
|
Type=simple
|
||||||
User=root
|
User=$SERVICE_USER
|
||||||
Group=root
|
ExecStart=/usr/local/bin/$BINARY_NAME
|
||||||
ExecStart=${INSTALL_DIR}/${BINARY_NAME} agent start
|
|
||||||
Restart=always
|
Restart=always
|
||||||
RestartSec=10
|
RestartSec=5
|
||||||
Environment=RUST_LOG=h2=warn,hyper=warn,churn=debug,warn
|
|
||||||
|
# Security hardening options
|
||||||
|
ProtectSystem=strict
|
||||||
|
ProtectHome=true
|
||||||
|
NoNewPrivileges=true
|
||||||
|
ReadWritePaths=/var/log/$SERVICE_NAME
|
||||||
|
|
||||||
[Install]
|
[Install]
|
||||||
WantedBy=multi-user.target
|
WantedBy=multi-user.target
|
||||||
EOF
|
EOF
|
||||||
|
|
||||||
|
# Create log directory if logging is needed
|
||||||
|
mkdir -p "/var/log/$SERVICE_NAME"
|
||||||
|
chown "$SERVICE_USER:$SERVICE_USER" "/var/log/$SERVICE_NAME"
|
||||||
|
|
||||||
# Reload systemd and enable service
|
# Reload systemd and enable service
|
||||||
echo "Configuring systemd service..."
|
|
||||||
systemctl daemon-reload
|
systemctl daemon-reload
|
||||||
systemctl enable "${SERVICE_NAME}"
|
systemctl enable "$SERVICE_NAME"
|
||||||
systemctl start "${SERVICE_NAME}"
|
systemctl start "$SERVICE_NAME"
|
||||||
|
|
||||||
# Check service status
|
# Clean up
|
||||||
if systemctl is-active --quiet "${SERVICE_NAME}"; then
|
cd
|
||||||
echo -e "${GREEN}Installation successful! ${APP_NAME} is running.${NC}"
|
rm -rf "$TMP_DIR"
|
||||||
echo "You can check the status with: systemctl status ${SERVICE_NAME}"
|
|
||||||
else
|
|
||||||
echo -e "${RED}Installation completed but service failed to start. Check logs with: journalctl -u ${SERVICE_NAME}${NC}"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
|
echo "Installation complete! Service status:"
|
||||||
|
systemctl status "$SERVICE_NAME"
|
||||||
|
|
||||||
|
# Provide some helpful commands
|
||||||
|
echo "
|
||||||
|
Useful commands:
|
||||||
|
- Check status: systemctl status $SERVICE_NAME
|
||||||
|
- View logs: journalctl -u $SERVICE_NAME
|
||||||
|
- Restart service: systemctl restart $SERVICE_NAME
|
||||||
|
- Stop service: systemctl stop $SERVICE_NAME
|
||||||
|
"
|
||||||
|
Loading…
Reference in New Issue
Block a user