Compare commits

...

71 Commits
v2 ... main

Author SHA1 Message Date
06a9dd10e1 fix(deps): update tokio-prost monorepo to v0.13.5
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing
2025-02-13 01:11:41 +00:00
55befef95b chore(release): v0.1.0 (#17)
All checks were successful
continuous-integration/drone/tag Build is passing
continuous-integration/drone/push Build is passing
chore(release): 0.1.0

Co-authored-by: cuddle-please <bot@cuddle.sh>
Reviewed-on: #17
2025-01-11 15:26:44 +01:00
53cc689dc4
docs: update readme
All checks were successful
continuous-integration/drone/push Build is passing
next up is differentiating the different agents, such that we can execute commands from the cli to for example update dependencies on all machines, restart machines etc.
2025-01-11 15:22:38 +01:00
1c20383de6
chore: update final repo
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-11 15:11:30 +01:00
53c15a653f
feat: add cuddle please
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-11 15:10:59 +01:00
9c5cb6667e
chore: update lock"
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-11 15:09:23 +01:00
b0c40196b6
docs: add installation docs
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-11 14:11:02 +01:00
a28a5ca6ee
fix: use actual names for files
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-11 13:08:04 +01:00
ea6bfc9c04
feat: enable churn update service
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-11 13:04:54 +01:00
844f8519d5
feat: add updater to install script 2025-01-11 13:03:11 +01:00
1508fbb2bf
feat: add updater to install script
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-11 13:02:57 +01:00
ef6ae3f2b1
chore: update default schedule
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-10 21:46:57 +01:00
8923c60d9e
feat: add http client
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-10 21:42:35 +01:00
efec76d28c feat: run more often
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-01-05 20:50:49 +01:00
03e23c7d9d feat: enable checking if it should actually run
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-01-04 01:52:05 +01:00
83294306a4 feat: enable having get variable from local setup
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-01-04 01:28:32 +01:00
ceaad75057 feat: inherit output as well
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2025-01-04 00:35:18 +01:00
b86fa54671 fix(deps): update rust crate serde to v1.0.217
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-12-28 01:13:18 +00:00
aecdace4ee chore(deps): update rust crate anyhow to v1.0.95
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-12-23 01:10:59 +00:00
c98761622d fix(deps): update rust crate serde_json to v1.0.134
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-12-22 01:09:42 +00:00
825f612aea fix(deps): update all dependencies to v28
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-12-21 01:11:47 +00:00
c12b02ba92 fix(deps): update rust crate nodrift to 0.3.0
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-12-14 01:09:13 +00:00
c5e5307682 fix(deps): update rust crate serde to v1.0.216
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-12-11 05:09:41 +00:00
5fb59ad992 fix(deps): update tokio-prost monorepo to v0.13.4
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-12-07 01:13:20 +00:00
e21663c8bd chore(deps): update rust crate clap to v4.5.23
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-12-06 01:14:09 +00:00
39a01778b2 fix(deps): update rust crate tokio-util to v0.7.13
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-12-05 01:11:27 +00:00
f1cdf3ae20 chore(deps): update all dependencies
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-12-04 01:13:13 +00:00
355587234e
feat: allow process from external code
Some checks failed
continuous-integration/drone/push Build is failing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-02 23:12:37 +01:00
21a13f3444
feat: add inherit
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-02 21:00:20 +01:00
5e1b585a2d
feat: add default no labels
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-01 22:46:22 +01:00
94025a02ce
feat: warn all targets
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-01 22:30:18 +01:00
db4cc98643
feat: update with web assembly components
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-01 22:21:17 +01:00
2387a70778
feat: add labels to config
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-01 14:38:10 +01:00
d6fdda0e4e
feat: add abstraction around task
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-01 14:25:24 +01:00
974e1ee0d6
feat: enable webpki roots
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:31:46 +01:00
717b052a88
feat: add short connect timeout
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:30:16 +01:00
9b52376e7a
feat: more error logging
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:22:44 +01:00
7b222af1dd
feat: stop the service if running
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:20:51 +01:00
150c7c3c98
feat: setup stream logging
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:16:19 +01:00
8b064c2169
feat: update script with warn
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 12:02:52 +01:00
879737eedd
feat: disable force again
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:52:50 +01:00
818cc6c671
feat: make curl silent"
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:49:23 +01:00
e759239243
feat: force update
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:48:40 +01:00
b37674987e
feat: use public prod
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:48:26 +01:00
1dea5f29ac
feat: run as root
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:45:49 +01:00
38a0a9fba4
feat: agent is already setup
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:44:51 +01:00
f7c7aef96b
feat: allow errors
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:43:45 +01:00
7fefca47c9
feat: some more debugging
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:42:46 +01:00
36b1335fe9
feat: some more debugging
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:40:05 +01:00
eeabeda036
feat: stderr to stdout as well
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:37:28 +01:00
569fee52e6
feat: this should work
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:36:10 +01:00
b0ec41fa93
feat: when config has already been setup
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:35:24 +01:00
77f5ec7475
feat: add agent start as well
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:33:21 +01:00
f3926d0885
feat: update with agent setup
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:32:41 +01:00
b48b1ec886
feat: add install script
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-30 11:28:03 +01:00
79a8a34499 chore(deps): update rust crate tracing-subscriber to v0.3.19
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-11-30 01:19:48 +00:00
c9bbeb2439 fix(deps): update rust crate bytes to v1.9.0
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-11-29 01:10:19 +00:00
b19ff0b7e5 chore(deps): update rust crate tracing to v0.1.41
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-11-28 01:11:13 +00:00
eeaf59ac63
feat: add comments
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 22:04:07 +01:00
6f04d0cdda
feat: use actual internal
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:44:46 +01:00
43c5fa1731
feat: reqwest as native build
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:42:41 +01:00
9badf8e193
feat: use internal
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:41:56 +01:00
55a0d294c5
feat: add external service host
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:37:08 +01:00
8508d3f640
feat: add grpc host
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:34:10 +01:00
dd8ade9798
feat: add external vars
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:29:58 +01:00
f1e6268a2d
feat: add grpc and env
Some checks failed
continuous-integration/drone/push Build is failing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:28:06 +01:00
6647bb89be
feat: add queue
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:24:50 +01:00
ea5adb2f93
feat: add common queue
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 21:08:37 +01:00
ee323e99e8
feat: add discovery
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 17:12:15 +01:00
c4434fd841 fix(deps): update rust crate tower-http to 0.6.0
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2024-11-24 01:15:41 +00:00
cb340ffb1e
feat: add tonic
All checks were successful
continuous-integration/drone/push Build is passing
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-11-24 01:34:06 +01:00
39 changed files with 4544 additions and 333 deletions

8
.env
View File

@ -1 +1,9 @@
EXTERNAL_HOST=http://localhost:3000
PROCESS_HOST=http://localhost:7900
SERVICE_HOST=127.0.0.1:3000
DISCOVERY_HOST=http://127.0.0.1:3000
#EXTERNAL_HOST=http://localhost:3000
#PROCESS_HOST=http://localhost:7900
#SERVICE_HOST=127.0.0.1:3000
#DISCOVERY_HOST=https://churn.prod.kjuulh.app

165
CHANGELOG.md Normal file
View File

@ -0,0 +1,165 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.1.0] - 2025-01-11
### Added
- add cuddle please
- enable churn update service
- add updater to install script
- add updater to install script
- add http client
- run more often
- enable checking if it should actually run
- enable having get variable from local setup
- inherit output as well
- allow process from external code
- add inherit
- add default no labels
- warn all targets
- update with web assembly components
- add labels to config
- add abstraction around task
- enable webpki roots
- add short connect timeout
- more error logging
- stop the service if running
- setup stream logging
- update script with warn
- disable force again
- make curl silent"
- force update
- use public prod
- run as root
- agent is already setup
- allow errors
- some more debugging
- some more debugging
- stderr to stdout as well
- this should work
- when config has already been setup
- add agent start as well
- update with agent setup
- add install script
- add comments
- use actual internal
- reqwest as native build
- use internal
- add external service host
- add grpc host
- add external vars
- add grpc and env
- add queue
- add common queue
- add discovery
- add tonic
- added tonic
- added longer timer
- fix error message
- add agent
- add churn v2
- initial v2 commit
- reset
- update
- update
- update stuff
- update
- with drone
- with agent db
- with sled db and capnp
- with sled db
- with basic changelog
- with basic package
- with publish
- with monitoring
- with monitor
- with extra churning repl thingy
- with enroll
- add initial churn
- add simple health check
### Docs
- update readme
next up is differentiating the different agents, such that we can execute commands from the cli to for example update dependencies on all machines, restart machines etc.
- add installation docs
- add notes
### Fixed
- use actual names for files
- *(deps)* update rust crate serde to v1.0.217
- *(deps)* update rust crate serde_json to v1.0.134
- *(deps)* update all dependencies to v28
- *(deps)* update rust crate nodrift to 0.3.0
- *(deps)* update rust crate serde to v1.0.216
- *(deps)* update tokio-prost monorepo to v0.13.4
- *(deps)* update rust crate tokio-util to v0.7.13
- *(deps)* update rust crate bytes to v1.9.0
- *(deps)* update rust crate tower-http to 0.6.0
- *(deps)* update all dependencies
- *(deps)* update rust crate capnp to 0.19.5
- *(deps)* update rust crate capnp to 0.19.4
### Other
- update final repo
- update lock"
- update default schedule
- *(deps)* update rust crate anyhow to v1.0.95
- *(deps)* update rust crate clap to v4.5.23
- *(deps)* update all dependencies
- *(deps)* update rust crate tracing-subscriber to v0.3.19
- *(deps)* update rust crate tracing to v0.1.41
- *(deps)* update rust crate serde to v1.0.215
- *(deps)* update rust crate serde to v1.0.214
- *(deps)* update rust crate serde to v1.0.213
- *(deps)* update rust crate serde to v1.0.210
- *(deps)* update rust crate serde to v1.0.209
- *(deps)* update rust crate serde_json to v1.0.126
- *(deps)* update all dependencies
- *(deps)* update rust crate serde to v1.0.208
- *(deps)* update all dependencies
- *(deps)* update rust crate serde to v1.0.203
- *(deps)* update rust crate anyhow to 1.0.86
- *(deps)* update rust crate anyhow to 1.0.85
- *(deps)* update rust crate anyhow to 1.0.84
- *(deps)* update rust crate itertools to 0.13.0
- *(deps)* update rust crate anyhow to 1.0.83
- *(deps)* update rust crate reqwest to 0.12.4
- *(deps)* update rust crate chrono to 0.4.38
- *(deps)* update rust crate anyhow to 1.0.82
- Merge pull request 'chore(release): v0.1.0' (#4) from cuddle-please/release into main
Reviewed-on: https://git.front.kjuulh.io/kjuulh/churn/pulls/4
- *(release)* 0.1.0
- *(test)* test commit
- *(test)* test commit
- *(test)* test commit
- *(test)* test commit
- Merge pull request 'chore(deps): update all dependencies' (#2) from renovate/all into main
Reviewed-on: https://git.front.kjuulh.io/kjuulh/churn/pulls/2
- *(deps)* update all dependencies
- change to byte slice
- fmt
- fmt
- Add renovate.json
- Release churn-server v0.1.0
- Release churn-agent v0.1.0
- Release churn v0.1.0
- Release churn v0.1.0
- Release churn-domain v0.1.0, churn v0.1.0
- with changelog
- Release churn-domain v0.1.0, churn v0.1.0

2560
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,6 @@ members = ["crates/*"]
resolver = "2"
[workspace.dependencies]
churn = { path = "crates/churn" }
anyhow = { version = "1" }
tokio = { version = "1", features = ["full"] }
@ -12,3 +11,6 @@ tracing-subscriber = { version = "0.3.18" }
clap = { version = "4", features = ["derive", "env"] }
dotenv = { version = "0.15" }
axum = { version = "0.7" }
[workspace.package]
version = "0.1.0"

View File

@ -1 +1,27 @@
# churn
## Installation
To install churn, you need first of all a server and agents.
Servers can be run via Docker.
```shell
docker run docker.io/kjuulh/churn-v2:latest
```
To install an agent run the following script
```shell
curl https://git.front.kjuulh.io/kjuulh/churn-v2/raw/branch/main/install.sh | bash
```
Configure `~/.local/share/io.kjuulh.churn-agent/churn-agent.toml` using an editor of your choice. The Churn agent will generate a random-ish name for the specific agent; consider giving it something more semantically meaningful to you.
## CLI (TBD)
Using the churn cli allows sending specific commands to a set of agents
```
```

8
buf.gen.yaml Normal file
View File

@ -0,0 +1,8 @@
version: v2
plugins:
- remote: buf.build/community/neoeinstein-prost
out: crates/churn/src/grpc
- remote: buf.build/community/neoeinstein-tonic:v0.4.0
out: crates/churn/src/grpc
inputs:
- directory: crates/churn/proto

View File

@ -14,13 +14,27 @@ axum.workspace = true
serde = { version = "1.0.197", features = ["derive"] }
uuid = { version = "1.7.0", features = ["v4"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
notmad = "0.6.0"
tower-http = { version = "0.6.0", features = ["cors", "trace"] }
notmad = "0.7.1"
tokio-util = "0.7.12"
async-trait = "0.1.83"
nodrift = "0.2.0"
nodrift = "0.3.0"
rusqlite = { version = "0.32.1", features = ["bundled"] }
prost-types = "0.13.3"
prost = "0.13.3"
bytes = "1.8.0"
tonic = { version = "0.12.3", features = ["tls", "tls-roots"] }
toml = "0.8.19"
dirs = "5.0.1"
futures = "0.3.31"
reqwest = { version = "0.12.9", default-features = false, features = [
"json",
"http2",
"charset",
"native-tls-vendored",
"stream",
] }
serde_json = "1.0.133"
wasmtime = "28.0.0"
wasmtime-wasi = "28.0.0"
petname = "2.0.2"

View File

@ -0,0 +1,35 @@
syntax = "proto3";
package churn.v1;
// Churn is the control-plane RPC surface shared by the server and agents:
// a small namespaced key/value store plus a server-to-agent event stream.
service Churn {
// GetKey fetches a single value from the key/value store.
rpc GetKey(GetKeyRequest) returns (GetKeyResponse);
// SetKey writes a single value to the key/value store.
rpc SetKey(SetKeyRequest) returns (SetKeyResponse);
// ListenEvents subscribes to a server-pushed stream of events for a
// namespace, optionally narrowed to a single id within it.
rpc ListenEvents(ListenEventsRequest) returns (stream ListenEventsResponse);
}
message GetKeyRequest {
// Logical grouping for keys, e.g. "agents".
string namespace = 1;
// Optional entity id within the namespace; omit for namespace-wide keys.
optional string id = 2;
string key = 3;
}
message GetKeyResponse {
// Unset when the key does not exist.
optional string value = 1;
}
message SetKeyRequest {
string namespace = 1;
optional string id = 2;
string key = 3;
string value = 4;
}
message SetKeyResponse {}
message ListenEventsRequest {
string namespace = 1;
optional string id = 2;
}
message ListenEventsResponse {
string id = 1;
// Opaque payload; the agent decodes this as a JSON-serialized command.
string value = 2;
}

View File

@ -1,15 +1,33 @@
use agent_state::AgentState;
use event_handler::EventHandler;
use refresh::AgentRefresh;
pub use config::setup_config;
pub mod models;
pub(crate) mod task;
mod agent_state;
mod config;
mod discovery_client;
mod event_handler;
mod grpc_client;
mod plugins;
mod queue;
mod refresh;
mod scheduler;
pub async fn execute(host: impl Into<String>) -> anyhow::Result<()> {
mod handlers;
mod actions;
pub async fn execute() -> anyhow::Result<()> {
let state = AgentState::new().await?;
notmad::Mad::builder()
.add(AgentRefresh::new(&state, host))
.add(AgentRefresh::new(&state))
.add(EventHandler::new(&state))
.add(state.queue.clone())
.cancellation(Some(std::time::Duration::from_secs(2)))
.run()
.await?;

View File

@ -0,0 +1,24 @@
use apt::AptTask;
use plugin_task::PluginTask;
use super::{plugins::PluginStore, task::IntoTask};
pub mod apt;
pub mod plugin_task;
/// Builds the fixed set of tasks an agent executes on a scheduled run.
pub struct Plan {
store: PluginStore,
}
impl Plan {
pub fn new(store: PluginStore) -> Self {
Self { store }
}
/// Returns the tasks to run, in order: the base apt task followed by the
/// pinned plugins.
// NOTE(review): plugin names/versions are hard-coded here — consider
// sourcing them from agent configuration instead.
pub async fn tasks(&self) -> anyhow::Result<Vec<impl IntoTask>> {
Ok(vec![
AptTask::new().into_task(),
PluginTask::new("alloy@0.1.0", self.store.clone()).into_task(),
PluginTask::new("dev_packages@0.1.0", self.store.clone()).into_task(),
])
}
}

View File

@ -0,0 +1,49 @@
use anyhow::Context;
use crate::agent::task::Task;
/// Task that keeps the host's apt packages up to date by running
/// `apt-get update` followed by `apt-get upgrade -y`.
pub struct AptTask {}

impl AptTask {
    pub fn new() -> Self {
        Self {}
    }

    /// Runs a single `apt-get` subcommand, logging on success and failing
    /// with the command's stderr on a non-zero exit status.
    ///
    /// `description` is the human-readable step name ("update"/"upgrade")
    /// used in log and error messages.
    async fn run_apt(args: &[&str], description: &str) -> anyhow::Result<()> {
        let mut cmd = tokio::process::Command::new("apt-get");
        // Keep apt from prompting; harmless for `update`, required for
        // unattended `upgrade`. (Previously only set for upgrade.)
        cmd.env("DEBIAN_FRONTEND", "noninteractive").args(args);

        let output = cmd
            .output()
            .await
            .with_context(|| format!("failed to run apt {description}"))?;

        if output.status.success() {
            tracing::info!("successfully ran apt {}", description);
            Ok(())
        } else {
            anyhow::bail!(
                "failed to run apt {}: {}",
                description,
                std::str::from_utf8(&output.stderr)?
            );
        }
    }
}

#[async_trait::async_trait]
impl Task for AptTask {
    /// Stable task identifier used for logging and scheduling.
    async fn id(&self) -> anyhow::Result<String> {
        Ok("apt".into())
    }

    /// Refreshes the package index, then upgrades all packages.
    async fn execute(&self) -> anyhow::Result<()> {
        Self::run_apt(&["update", "-q"], "update").await?;
        Self::run_apt(&["upgrade", "-y"], "upgrade").await?;
        Ok(())
    }
}

View File

@ -0,0 +1,30 @@
use crate::agent::{plugins::PluginStore, task::Task};
/// Task that delegates both identification and execution of a single named
/// plugin to the shared [`PluginStore`].
pub struct PluginTask {
    plugin: String,
    store: PluginStore,
}

impl PluginTask {
    /// Wraps the given plugin name (e.g. "alloy@0.1.0") and store handle.
    pub fn new(plugin: impl Into<String>, store: PluginStore) -> Self {
        let plugin = plugin.into();
        Self { plugin, store }
    }
}

#[async_trait::async_trait]
impl Task for PluginTask {
    /// The task id is derived by the store from the plugin name.
    async fn id(&self) -> anyhow::Result<String> {
        let plugin_id = self.store.id(&self.plugin).await?;
        Ok(plugin_id)
    }

    /// Runs the plugin through the store.
    async fn execute(&self) -> anyhow::Result<()> {
        self.store.execute(&self.plugin).await?;
        Ok(())
    }
}

View File

@ -1,5 +1,13 @@
use std::{ops::Deref, sync::Arc};
use crate::api::Discovery;
use super::{
config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient,
handlers::scheduled_tasks::ScheduledTasks, plugins::PluginStore, queue::AgentQueue,
scheduler::Scheduler,
};
#[derive(Clone)]
pub struct AgentState(Arc<State>);
@ -23,10 +31,30 @@ impl Deref for AgentState {
}
}
pub struct State {}
pub struct State {
pub grpc: GrpcClient,
pub config: AgentConfig,
pub discovery: Discovery,
pub queue: AgentQueue,
pub plugin_store: PluginStore,
}
impl State {
pub async fn new() -> anyhow::Result<Self> {
Ok(Self {})
let config = AgentConfig::new().await?;
let discovery = DiscoveryClient::new(&config.discovery).discover().await?;
let grpc = GrpcClient::new(&discovery.process_host);
let plugin_store = PluginStore::new(config.clone())?;
let scheduled_tasks = ScheduledTasks::new(plugin_store.clone());
let scheduler = Scheduler::new(scheduled_tasks);
let queue = AgentQueue::new(scheduler);
Ok(Self {
grpc,
config,
discovery,
queue,
plugin_store,
})
}
}

View File

@ -0,0 +1,96 @@
use std::collections::BTreeMap;
use anyhow::Context;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Clone)]
/// In-memory agent configuration, resolved from the on-disk config file.
pub struct AgentConfig {
// Unique identifier for this agent, generated at setup time.
pub agent_id: String,
// Base URL of the discovery service this agent talks to.
pub discovery: String,
// Free-form labels; presumably used to target subsets of agents — TODO confirm.
pub labels: BTreeMap<String, String>,
}
impl AgentConfig {
/// Loads the agent config from disk; fails if `churn agent setup` has
/// not been run yet (see `ConfigFile::load`).
pub async fn new() -> anyhow::Result<Self> {
let config = ConfigFile::load().await?;
Ok(Self {
agent_id: config.agent_id,
discovery: config.discovery,
// Labels are optional in the file; older configs may omit them.
labels: config.labels.unwrap_or_default(),
})
}
}
#[derive(Serialize, Deserialize)]
/// On-disk representation of the agent config, stored at
/// `<data_dir>/io.kjuulh.churn-agent/churn-agent.toml`.
struct ConfigFile {
    agent_id: String,
    discovery: String,
    labels: Option<BTreeMap<String, String>>,
}

impl ConfigFile {
    /// Absolute path of the agent config file. Shared by `load` and
    /// `write_default` so the two can never disagree on the location.
    fn config_path() -> anyhow::Result<std::path::PathBuf> {
        Ok(dirs::data_dir()
            .ok_or(anyhow::anyhow!("failed to get data dir"))?
            .join("io.kjuulh.churn-agent")
            .join("churn-agent.toml"))
    }

    /// Reads and parses the config file, failing with a setup hint when the
    /// file does not exist yet.
    pub async fn load() -> anyhow::Result<Self> {
        let path = Self::config_path()?;
        if !path.exists() {
            anyhow::bail!(
                "No churn agent file was setup, run `churn agent setup` to setup the defaults"
            )
        }

        let contents = tokio::fs::read_to_string(&path).await?;
        toml::from_str(&contents).context("failed to parse the contents of the churn agent config")
    }

    /// Writes a fresh default config with a random agent id. Refuses to
    /// overwrite an existing file unless `force` is set.
    pub async fn write_default(
        discovery: impl Into<String>,
        force: bool,
        labels: impl Into<BTreeMap<String, String>>,
    ) -> anyhow::Result<Self> {
        let s = Self {
            agent_id: Uuid::new_v4().to_string(),
            discovery: discovery.into(),
            labels: Some(labels.into()),
        };

        let path = Self::config_path()?;
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(&parent).await?;
        }
        if !force && path.exists() {
            anyhow::bail!("config file already exists, consider moving it to a backup before trying again: {}", path.display());
        }

        let contents = toml::to_string_pretty(&s)
            .context("failed to convert default implementation to string")?;
        tokio::fs::write(path, contents.as_bytes())
            .await
            .context("failed to write to agent file")?;

        Ok(s)
    }
}
/// CLI entry point: writes a default agent config file to disk.
/// `force` overwrites an existing file; `labels` seeds the agent's labels.
pub async fn setup_config(
    discovery: impl Into<String>,
    force: bool,
    labels: impl Into<BTreeMap<String, String>>,
) -> anyhow::Result<()> {
    // The generated config itself is not needed by callers; only success matters.
    let _ = ConfigFile::write_default(discovery, force, labels).await?;
    Ok(())
}

View File

@ -0,0 +1,21 @@
use crate::api::Discovery;
/// Thin client for the discovery endpoint; resolves the hosts the agent
/// should connect to.
pub struct DiscoveryClient {
    host: String,
}

impl DiscoveryClient {
    pub fn new(discovery_host: impl Into<String>) -> Self {
        let host = discovery_host.into();
        Self { host }
    }

    /// Fetches connection details from the discovery service at
    /// `{host}/discovery`.
    pub async fn discover(&self) -> anyhow::Result<Discovery> {
        let endpoint = self.host.trim_end_matches('/');
        tracing::info!("getting details from discovery endpoint: {}/discovery", endpoint);
        crate::api::Discovery::get_from_host(&self.host).await
    }
}

View File

@ -0,0 +1,63 @@
use notmad::{Component, MadError};
use crate::agent::models::Commands;
use super::{
agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient, queue::AgentQueue,
};
#[derive(Clone)]
/// Subscribes to the server's event stream and forwards decoded commands
/// onto the local agent queue.
pub struct EventHandler {
config: AgentConfig,
grpc: GrpcClient,
queue: AgentQueue,
}
impl EventHandler {
/// Clones the handles it needs out of the shared agent state.
pub fn new(state: impl Into<AgentState>) -> Self {
let state: AgentState = state.into();
Self {
config: state.config.clone(),
grpc: state.grpc.clone(),
queue: state.queue.clone(),
}
}
}
#[async_trait::async_trait]
impl Component for EventHandler {
fn name(&self) -> Option<String> {
Some("event_handler".into())
}
/// Listens concurrently on two streams: broadcast events for all agents
/// ("agents" namespace, no id) and events targeted at this agent's id.
/// Returns when either stream ends/fails or when cancellation fires.
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> {
// NOTE(review): select! completes as soon as ONE branch finishes, so
// if one stream drops, the other stops being polled too — confirm the
// notmad supervisor restarts this component.
tokio::select! {
_ = cancellation_token.cancelled() => {},
res = self.grpc.listen_events("agents", None::<String>, self.clone()) => {
res.map_err(MadError::Inner)?;
},
res = self.grpc.listen_events("agents", Some(&self.config.agent_id), self.clone()) => {
res.map_err(MadError::Inner)?;
}
}
Ok(())
}
}
#[async_trait::async_trait]
impl super::grpc_client::ListenEventsExecutor for EventHandler {
/// Decodes the event payload as a JSON `Commands` value and enqueues it.
async fn execute(&self, event: crate::grpc::ListenEventsResponse) -> anyhow::Result<()> {
tracing::info!(value = event.id, "received event");
let event: Commands = serde_json::from_str(&event.value)?;
self.queue.publish(event).await?;
Ok(())
}
}

View File

@ -0,0 +1,108 @@
use tonic::transport::{Channel, ClientTlsConfig};
use crate::grpc::{churn_client::ChurnClient, *};
#[derive(Clone)]
/// gRPC client for the Churn service. Holds only the host string; a fresh
/// channel is established per call (see `client`).
pub struct GrpcClient {
host: String,
}
impl GrpcClient {
pub fn new(host: impl Into<String>) -> Self {
Self { host: host.into() }
}
/// Fetches a single value from the server-side key/value store.
/// Returns `None` when the key is absent.
pub async fn get_key(
&self,
namespace: &str,
id: Option<impl Into<String>>,
key: &str,
) -> anyhow::Result<Option<String>> {
let mut client = self.client().await?;
let resp = client
.get_key(GetKeyRequest {
key: key.into(),
namespace: namespace.into(),
id: id.map(|i| i.into()),
})
.await?;
let resp = resp.into_inner();
Ok(resp.value)
}
/// Writes a single value to the server-side key/value store.
pub async fn set_key(
&self,
namespace: &str,
id: Option<impl Into<String>>,
key: &str,
value: &str,
) -> anyhow::Result<()> {
let mut client = self.client().await?;
client
.set_key(SetKeyRequest {
key: key.into(),
value: value.into(),
namespace: namespace.into(),
id: id.map(|i| i.into()),
})
.await?;
Ok(())
}
/// Opens a server-streaming subscription and invokes `exec` for each
/// received event. Blocks until the stream ends or a handler fails.
pub async fn listen_events(
&self,
namespace: &str,
id: Option<impl Into<String>>,
exec: impl ListenEventsExecutor,
) -> anyhow::Result<()> {
let mut client = self.client().await?;
tracing::debug!("creating stream for listening to events on: {}", namespace);
let resp = client
.listen_events(ListenEventsRequest {
namespace: namespace.into(),
id: id.map(|i| i.into()),
})
.await
.inspect_err(|e| tracing::warn!("failed to establish a connection: {}", e))?;
tracing::debug!("setup stream: {}", namespace);
let mut inner = resp.into_inner();
// NOTE(review): a transport error from `message()` ends this loop and the
// function returns Ok(()), silently swallowing the error — confirm the
// caller treats a returned Ok as "stream closed, reconnect".
while let Ok(Some(message)) = inner.message().await {
tracing::debug!("received message: {}", namespace);
exec.execute(message)
.await
.inspect_err(|e| tracing::warn!("failed to handle message: {}", e))?;
}
Ok(())
}
/// Builds a fresh channel + client. TLS (with native roots) is enabled
/// only for `https` hosts.
async fn client(&self) -> anyhow::Result<ChurnClient<tonic::transport::Channel>> {
tracing::debug!("setting up client");
let channel = if self.host.starts_with("https") {
Channel::from_shared(self.host.to_owned())?
.tls_config(ClientTlsConfig::new().with_native_roots())?
.connect_timeout(std::time::Duration::from_secs(5))
.connect()
.await?
} else {
// NOTE(review): the plaintext path has no connect_timeout, unlike the
// TLS path above — confirm whether that asymmetry is intentional.
Channel::from_shared(self.host.to_owned())?
.connect()
.await?
};
let client = ChurnClient::new(channel);
Ok(client)
}
}
#[async_trait::async_trait]
/// Callback invoked by `GrpcClient::listen_events` for each received event.
/// Returning an error aborts the listening loop.
pub trait ListenEventsExecutor {
async fn execute(&self, event: ListenEventsResponse) -> anyhow::Result<()>;
}

View File

@ -0,0 +1 @@
pub mod scheduled_tasks;

View File

@ -0,0 +1,46 @@
use std::collections::BTreeMap;
use crate::agent::{
actions::Plan,
plugins::PluginStore,
task::{ConcreteTask, IntoTask},
};
#[derive(Clone)]
/// Handles a "schedule task" command by planning and running the agent's
/// full task list.
pub struct ScheduledTasks {
    store: PluginStore,
}

impl ScheduledTasks {
    pub fn new(store: PluginStore) -> Self {
        Self { store }
    }

    /// Executes every planned task in order, skipping those whose
    /// `should_run` check declines.
    pub async fn handle(
        &self,
        task: &str,
        _properties: BTreeMap<String, String>,
    ) -> anyhow::Result<()> {
        tracing::info!("scheduling: {}", task);

        // Build the full plan; note that every planned task runs, not only
        // the one named by `task`.
        let planned: Vec<ConcreteTask> = Plan::new(self.store.clone())
            .tasks()
            .await?
            .into_iter()
            .map(|t| t.into_task())
            .collect();

        for planned_task in planned {
            let id = planned_task.id().await?;

            if !planned_task.should_run().await? {
                tracing::debug!(task = id, "skipping run");
                continue;
            }

            tracing::info!(task = id, "executing task");
            planned_task.execute().await?;
        }

        Ok(())
    }
}

View File

@ -0,0 +1,20 @@
use std::{collections::BTreeMap, fmt::Display};
use serde::{Deserialize, Serialize};
/// Commands that can be published onto the agent queue.
///
/// Internally tagged: serialized with a `type` field naming the variant.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type")]
pub enum Commands {
    /// Run the named task with the supplied key/value properties.
    ScheduleTask {
        task: String,
        properties: BTreeMap<String, String>,
    },
}
impl Display for Commands {
    /// Renders the command as its stable, lowercase wire/log name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Commands::ScheduleTask { .. } => "schedule_task",
        };
        f.write_str(name)
    }
}

View File

@ -0,0 +1,328 @@
use anyhow::Context;
use component::churn_tasks::process::HostProcess;
use futures::StreamExt;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use wasmtime::component::*;
use wasmtime::{Config, Engine, Store};
use wasmtime_wasi::{DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView};
use super::config::AgentConfig;
// Generates Rust bindings for the WIT world in wit/world.wit.
// `async: true` makes host/guest calls async; the `with` entries map WIT
// resource types onto the concrete host implementations in this module.
wasmtime::component::bindgen!({
    path: "wit/world.wit",
    //world: "churn",
    async: true,
    with: {
        "component:churn-tasks/process/process": CustomProcess,
        "component:churn-tasks/http/client": http::HttpClient
    }
});
mod http;
/// Host-side process runner handed to plugins as a WIT resource.
pub struct CustomProcess {
    // Agent configuration; only `labels` is read here (see `get_label`).
    agent_config: AgentConfig,
}
impl CustomProcess {
    pub fn new(agent_config: AgentConfig) -> Self {
        Self { agent_config }
    }

    /// Runs `args[0]` with the remaining args and returns its stdout.
    /// On spawn failure the error message is returned instead.
    ///
    /// Panics when `args` is empty (no program to run).
    /// NOTE(review): a non-zero exit status is not detected — stdout is
    /// returned as-is and stderr is discarded.
    pub fn run(&self, args: Vec<String>) -> String {
        tracing::info!("calling function");
        match args.split_first() {
            Some((program, rest)) => {
                let mut cmd = std::process::Command::new(program);
                match cmd.args(rest).output() {
                    // Lossy conversion instead of the original
                    // `expect("to be able to parse utf8")`, which panicked
                    // the host on any non-UTF-8 command output.
                    Ok(output) => String::from_utf8_lossy(&output.stdout).to_string(),
                    Err(e) => {
                        tracing::error!("command failed with output: {e}");
                        e.to_string()
                    }
                }
            }
            None => {
                tracing::warn!("failed to call function because it is empty");
                panic!("failed to call function because it is empty")
            }
        }
    }

    /// Looks up an agent label by key.
    pub fn get_label(&self, label_key: &str) -> Option<String> {
        self.agent_config.labels.get(label_key).cloned()
    }
}
/// Cheaply-clonable, mutex-guarded handle to the inner plugin store.
#[derive(Clone)]
pub struct PluginStore {
    // Single shared wasmtime store/linker; serialized behind a tokio Mutex.
    inner: Arc<Mutex<InnerPluginStore>>,
}
impl PluginStore {
    pub fn new(config: AgentConfig) -> anyhow::Result<Self> {
        let store = InnerPluginStore::new(config)?;
        Ok(Self {
            inner: Arc::new(Mutex::new(store)),
        })
    }

    /// Returns the id reported by the named plugin.
    pub async fn id(&self, plugin: &str) -> anyhow::Result<String> {
        let mut guard = self.inner.lock().await;
        guard.id(plugin).await
    }

    /// Executes the named plugin on a freshly rebuilt inner store.
    pub async fn execute(&self, plugin: &str) -> anyhow::Result<()> {
        let mut guard = self.inner.lock().await;
        // FIXME: hack to avoid memory leak issues from instantiating plugins
        let fresh = InnerPluginStore::new(guard.agent_config.clone())?;
        *guard = fresh;
        guard.execute(plugin).await
    }
}
/// Owns the wasmtime engine/linker/store used to load and run plugins.
pub struct InnerPluginStore {
    // Store parameterized over the WASI view that backs host calls.
    store: wasmtime::Store<ServerWasiView>,
    linker: wasmtime::component::Linker<ServerWasiView>,
    engine: wasmtime::Engine,
    // Kept so the outer store can rebuild this struct (see PluginStore::execute).
    agent_config: AgentConfig,
}
impl InnerPluginStore {
pub fn new(agent_config: AgentConfig) -> anyhow::Result<Self> {
let mut config = Config::default();
config.wasm_component_model(true);
config.async_support(true);
let engine = Engine::new(&config)?;
let mut linker: wasmtime::component::Linker<ServerWasiView> = Linker::new(&engine);
// Add the command world (aka WASI CLI) to the linker
wasmtime_wasi::add_to_linker_async(&mut linker).context("Failed to link command world")?;
component::churn_tasks::process::add_to_linker(
&mut linker,
|state: &mut ServerWasiView| state,
)?;
component::churn_tasks::http::add_to_linker(&mut linker, |state: &mut ServerWasiView| {
state
})?;
let wasi_view = ServerWasiView::new(agent_config.clone());
let store = Store::new(&engine, wasi_view);
Ok(Self {
store,
linker,
engine,
agent_config,
})
}
pub async fn id(&mut self, plugin: &str) -> anyhow::Result<String> {
let plugin = self.ensure_plugin(plugin).await?;
plugin
.interface0
.call_id(&mut self.store)
.await
.context("Failed to call add function")
}
pub async fn execute(&mut self, plugin: &str) -> anyhow::Result<()> {
let plugin = self.ensure_plugin(plugin).await?;
self.store.gc_async().await;
if plugin
.interface0
.call_should_run(&mut self.store)
.await
.context("Failed to call should run")?
{
tracing::info!("job was marked as required to run");
return plugin
.interface0
.call_execute(&mut self.store)
.await
.context("Failed to call add function");
}
Ok(())
}
async fn ensure_plugin(&mut self, plugin: &str) -> anyhow::Result<Churn> {
let cache = dirs::cache_dir()
.ok_or(anyhow::anyhow!("failed to find cache dir"))?
.join("io.kjuulh.churn");
let (plugin_name, plugin_version) = plugin.split_once("@").unwrap_or((plugin, "latest"));
let plugin_path = cache
.join("plugins")
.join(plugin_name)
.join(plugin_version)
.join(format!("{plugin_name}.wasm"));
let no_cache: bool = std::env::var("CHURN_NO_CACHE")
.unwrap_or("false".into())
.parse()?;
if !plugin_path.exists() || no_cache {
tracing::info!(
plugin_name = plugin_name,
plugin_version = plugin_version,
"downloading plugin"
);
if let Some(parent) = plugin_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let req = reqwest::get(format!("https://api-minio.front.kjuulh.io/churn-registry/{plugin_name}/{plugin_version}/{plugin_name}.wasm")).await.context("failed to get plugin from registry")?;
let mut stream = req.bytes_stream();
tracing::info!(
plugin_name = plugin_name,
plugin_path = plugin_path.display().to_string(),
"writing plugin to file"
);
let mut file = tokio::fs::File::create(&plugin_path).await?;
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
file.write_all(&chunk).await?;
}
file.flush().await?;
}
let component =
Component::from_file(&self.engine, plugin_path).context("Component file not found")?;
tracing::debug!(
plugin_name = plugin_name,
plugin_version = plugin_version,
"instantiating plugin"
);
let instance = Churn::instantiate_async(&mut self.store, &component, &self.linker)
.await
.context("Failed to instantiate the example world")
.unwrap();
Ok(instance)
}
}
/// WASI view backing the plugin store: WASI context plus resource tables
/// for the host-provided process and http-client resources.
struct ServerWasiView {
    // Table required by the WasiView trait.
    table: ResourceTable,
    ctx: WasiCtx,
    // Guest-visible CustomProcess resources.
    processes: ResourceTable,
    // Guest-visible HttpClient resources.
    clients: ResourceTable,
    agent_config: AgentConfig,
}
impl ServerWasiView {
    /// Builds a WASI context that inherits the host's stdio, env, and
    /// network, and preopens the whole filesystem root.
    ///
    /// NOTE(review): granting "/" with full dir/file permissions gives
    /// plugins unrestricted filesystem access — confirm this is intended.
    fn new(agent_config: AgentConfig) -> Self {
        let table = ResourceTable::new();
        let ctx = WasiCtxBuilder::new()
            .inherit_stdio()
            .inherit_stdout()
            .inherit_env()
            .inherit_stderr()
            .inherit_network()
            .preopened_dir("/", "/", DirPerms::all(), FilePerms::all())
            .expect("to be able to open root")
            .build();
        Self {
            table,
            ctx,
            processes: ResourceTable::default(),
            clients: ResourceTable::default(),
            agent_config,
        }
    }
}
// Exposes the resource table and WASI context to the wasmtime-wasi runtime.
impl WasiView for ServerWasiView {
    fn table(&mut self) -> &mut ResourceTable {
        &mut self.table
    }
    fn ctx(&mut self) -> &mut WasiCtx {
        &mut self.ctx
    }
}
impl component::churn_tasks::process::Host for ServerWasiView {}
#[async_trait::async_trait]
impl HostProcess for ServerWasiView {
    // Allocates a new guest-visible process resource backed by CustomProcess.
    // NOTE(review): `unwrap` panics if the resource table rejects the push —
    // confirm that aborting the host is acceptable here.
    async fn new(
        &mut self,
    ) -> wasmtime::component::Resource<component::churn_tasks::process::Process> {
        self.processes
            .push(CustomProcess::new(self.agent_config.clone()))
            .unwrap()
    }
    // Runs `inputs[0]` with the remaining args via CustomProcess::run and
    // returns its stdout (or a spawn-error message) as a string.
    async fn run_process(
        &mut self,
        self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
        inputs: wasmtime::component::__internal::Vec<String>,
    ) -> String {
        let process = self.processes.get(&self_).unwrap();
        process.run(inputs)
    }
    // Looks up an agent label by key.
    // NOTE(review): `unwrap` panics when the label is missing, trapping the
    // calling plugin — consider an option/empty-string in the WIT interface.
    async fn get_variable(
        &mut self,
        self_: wasmtime::component::Resource<component::churn_tasks::process::Process>,
        key: wasmtime::component::__internal::String,
    ) -> String {
        let process = self.processes.get(&self_).unwrap();
        process.get_label(&key).unwrap()
    }
    // Releases the resource when the guest drops its handle.
    async fn drop(
        &mut self,
        rep: wasmtime::component::Resource<component::churn_tasks::process::Process>,
    ) -> wasmtime::Result<()> {
        self.processes.delete(rep)?;
        Ok(())
    }
}
impl component::churn_tasks::http::Host for ServerWasiView {}
#[async_trait::async_trait]
impl component::churn_tasks::http::HostClient for ServerWasiView {
    // Allocates a new guest-visible HTTP client resource.
    // NOTE(review): `unwrap` panics if the resource table rejects the push.
    async fn new(&mut self) -> wasmtime::component::Resource<component::churn_tasks::http::Client> {
        self.clients.push(http::HttpClient::new()).unwrap()
    }
    // Performs a GET and returns the raw body bytes.
    // NOTE(review): `expect` turns any network/HTTP failure into a host
    // panic that traps the plugin — confirm this is intended.
    async fn get(
        &mut self,
        self_: wasmtime::component::Resource<component::churn_tasks::http::Client>,
        url: wasmtime::component::__internal::String,
    ) -> Vec<u8> {
        let process = self.clients.get(&self_).unwrap();
        process
            .get(&url)
            .await
            .expect("to be able to make http call")
    }
    // Releases the resource when the guest drops its handle.
    async fn drop(
        &mut self,
        rep: wasmtime::component::Resource<component::churn_tasks::http::Client>,
    ) -> wasmtime::Result<()> {
        self.clients.delete(rep)?;
        Ok(())
    }
}

View File

@ -0,0 +1,12 @@
/// Thin wrapper around reqwest for plugin-initiated HTTP GETs.
pub struct HttpClient {}

impl HttpClient {
    pub fn new() -> Self {
        Self {}
    }

    /// Fetches `url` and returns the raw response body bytes.
    pub async fn get(&self, url: &str) -> anyhow::Result<Vec<u8>> {
        let response = reqwest::get(url).await?;
        let body = response.bytes().await?;
        Ok(body.to_vec())
    }
}

View File

@ -0,0 +1,67 @@
use std::sync::Arc;
use notmad::{Component, MadError};
use tokio::sync::Mutex;
use super::{models::Commands, scheduler::Scheduler};
/// In-process command queue: producers publish via the shared sender,
/// the notmad component loop consumes via the mutex-guarded receiver.
#[derive(Clone)]
pub struct AgentQueue {
    sender: Arc<tokio::sync::mpsc::Sender<Commands>>,
    // Mutex so the Clone-able queue still has exactly one active consumer.
    receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<Commands>>>,
    scheduler: Scheduler,
}
impl AgentQueue {
    /// Creates a queue with a small (5-slot) bounded channel.
    pub fn new(scheduler: Scheduler) -> Self {
        let (sender, receiver) = tokio::sync::mpsc::channel(5);
        Self {
            sender: Arc::new(sender),
            receiver: Arc::new(Mutex::new(receiver)),
            scheduler,
        }
    }

    /// Dispatches one command to the scheduler.
    pub async fn handler(&self, command: Commands) -> anyhow::Result<()> {
        tracing::debug!("handling task");
        self.scheduler.handle(command).await?;
        Ok(())
    }

    /// Enqueues a command; awaits when the channel is full.
    pub async fn publish(&self, command: Commands) -> anyhow::Result<()> {
        tracing::debug!("publishing task: {}", command);
        self.sender.send(command).await?;
        Ok(())
    }
}
#[async_trait::async_trait]
impl Component for AgentQueue {
    /// Consumes queued commands until shutdown is requested or the channel
    /// closes.
    async fn run(
        &self,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Result<(), notmad::MadError> {
        // Lock once for the whole run: there is only one consumer, and the
        // original per-iteration re-lock was wasted work.
        let mut recv = self.receiver.lock().await;
        loop {
            tokio::select! {
                res = recv.recv() => {
                    match res {
                        Some(command) => {
                            self.handler(command).await.map_err(MadError::Inner)?;
                        }
                        // All senders dropped: the original looped straight
                        // back into recv(), busy-spinning on the closed
                        // channel forever.
                        None => break,
                    }
                }
                _ = cancellation_token.cancelled() => {
                    break
                }
            }
        }
        Ok(())
    }
}

View File

@ -1,29 +1,39 @@
use anyhow::Context;
use std::collections::BTreeMap;
use super::agent_state::AgentState;
use crate::agent::models::Commands;
use super::{agent_state::AgentState, queue::AgentQueue};
#[derive(Clone)]
pub struct AgentRefresh {
_state: AgentState,
host: String,
process_host: String,
queue: AgentQueue,
}
impl AgentRefresh {
pub fn new(state: impl Into<AgentState>, host: impl Into<String>) -> Self {
pub fn new(state: impl Into<AgentState>) -> Self {
let state: AgentState = state.into();
Self {
_state: state.into(),
host: host.into(),
process_host: state.discovery.process_host.clone(),
queue: state.queue.clone(),
}
}
}
#[async_trait::async_trait]
impl notmad::Component for AgentRefresh {
fn name(&self) -> Option<String> {
Some("agent_refresh".into())
}
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> {
let cancel = nodrift::schedule_drifter(std::time::Duration::from_secs(60), self.clone());
// let cancel =
// nodrift::schedule_drifter(std::time::Duration::from_secs(60 * 10), self.clone());
let cancel =
nodrift::schedule_drifter(std::time::Duration::from_secs(60 * 5), self.clone());
tokio::select! {
_ = cancel.cancelled() => {},
_ = cancellation_token.cancelled() => {
@ -39,82 +49,14 @@ impl notmad::Component for AgentRefresh {
#[async_trait::async_trait]
impl nodrift::Drifter for AgentRefresh {
async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
tracing::info!(host = self.host, "refreshing agent");
tracing::info!(process_host = self.process_host, "refreshing agent");
// Get plan
let plan = Plan::new();
let tasks = plan.tasks().await?;
// For task
for task in tasks {
// Check idempotency rules
if !task.should_run().await? {
tracing::debug!(task = task.id(), "skipping run");
continue;
}
// Run task if not valid
tracing::info!(task = task.id(), "executing task");
task.execute().await?;
}
Ok(())
}
}
pub struct Plan {}
impl Plan {
pub fn new() -> Self {
Self {}
}
pub async fn tasks(&self) -> anyhow::Result<Vec<Task>> {
Ok(vec![Task::new()])
}
}
pub struct Task {}
impl Task {
pub fn new() -> Self {
Self {}
}
pub fn id(&self) -> String {
"apt".into()
}
pub async fn should_run(&self) -> anyhow::Result<bool> {
Ok(true)
}
pub async fn execute(&self) -> anyhow::Result<()> {
let mut cmd = tokio::process::Command::new("apt-get");
cmd.args(["update", "-q"]);
let output = cmd.output().await.context("failed to run apt update")?;
match output.status.success() {
true => tracing::info!("successfully ran apt update"),
false => {
anyhow::bail!(
"failed to run apt update: {}",
std::str::from_utf8(&output.stderr)?
);
}
}
let mut cmd = tokio::process::Command::new("apt-get");
cmd.env("DEBIAN_FRONTEND", "noninteractive")
.args(["upgrade", "-y"]);
let output = cmd.output().await.context("failed to run apt upgrade")?;
match output.status.success() {
true => tracing::info!("successfully ran apt upgrade"),
false => {
anyhow::bail!(
"failed to run apt upgrade: {}",
std::str::from_utf8(&output.stderr)?
);
}
}
self.queue
.publish(Commands::ScheduleTask {
task: "update".into(),
properties: BTreeMap::default(),
})
.await?;
Ok(())
}

View File

@ -0,0 +1,22 @@
use super::{handlers::scheduled_tasks::ScheduledTasks, models::Commands};
/// Routes queue commands to their handler implementations.
#[derive(Clone)]
pub struct Scheduler {
    // Currently the only handler; more will arrive as Commands grows.
    scheduled_tasks: ScheduledTasks,
}
impl Scheduler {
    pub fn new(scheduled_tasks: ScheduledTasks) -> Self {
        Self { scheduled_tasks }
    }

    /// Dispatches a single command to the matching handler.
    pub async fn handle(&self, command: Commands) -> anyhow::Result<()> {
        match command {
            Commands::ScheduleTask { task, properties } => {
                self.scheduled_tasks.handle(&task, properties).await
            }
        }
    }
}

View File

@ -0,0 +1,45 @@
use std::sync::Arc;
/// A unit of agent work: an id, an optional pre-flight check, and an action.
#[async_trait::async_trait]
pub trait Task {
    /// Stable identifier used for logging.
    async fn id(&self) -> anyhow::Result<String>;
    /// Whether the task needs to run now; defaults to always-run.
    async fn should_run(&self) -> anyhow::Result<bool> {
        Ok(true)
    }
    /// Performs the work.
    async fn execute(&self) -> anyhow::Result<()>;
}
/// Conversion into the type-erased `ConcreteTask` wrapper.
pub trait IntoTask {
    fn into_task(self) -> ConcreteTask;
}
/// Cheaply-clonable, type-erased task handle.
#[derive(Clone)]
pub struct ConcreteTask {
    inner: Arc<dyn Task + Sync + Send + 'static>,
}
impl ConcreteTask {
    /// Wraps any `Task` implementation behind an `Arc<dyn Task>`.
    pub fn new<T: Task + Sync + Send + 'static>(t: T) -> Self {
        Self { inner: Arc::new(t) }
    }
}
// Lets callers invoke Task methods directly on a ConcreteTask.
impl std::ops::Deref for ConcreteTask {
    type Target = Arc<dyn Task + Sync + Send + 'static>;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
// Identity conversion so already-wrapped tasks pass through unchanged.
impl IntoTask for ConcreteTask {
    fn into_task(self) -> ConcreteTask {
        self
    }
}
// Blanket impl: every concrete Task converts by wrapping itself.
impl<T: Task + Sync + Send + 'static> IntoTask for T {
    fn into_task(self) -> ConcreteTask {
        ConcreteTask::new(self)
    }
}

View File

@ -1,6 +1,12 @@
use std::net::SocketAddr;
use axum::{extract::MatchedPath, http::Request, routing::get, Router};
use axum::{
extract::{MatchedPath, State},
http::Request,
routing::get,
Json, Router,
};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tower_http::trace::TraceLayer;
@ -22,6 +28,7 @@ impl Api {
pub async fn serve(&self) -> anyhow::Result<()> {
let app = Router::new()
.route("/", get(root))
.route("/discovery", get(discovery))
.with_state(self.state.clone())
.layer(
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
@ -55,6 +62,28 @@ async fn root() -> &'static str {
"Hello, churn!"
}
/// Payload of the `/discovery` endpoint: the hosts agents should use.
#[derive(Serialize, Deserialize)]
pub struct Discovery {
    pub external_host: String,
    pub process_host: String,
}
impl Discovery {
    /// Fetches the discovery document from `{host}/discovery`.
    ///
    /// Now fails fast on non-2xx responses; the original went straight to
    /// JSON decoding, so an HTML error page surfaced as a confusing
    /// deserialization error.
    pub async fn get_from_host(host: &str) -> anyhow::Result<Self> {
        let resp = reqwest::get(format!("{}/discovery", host.trim_end_matches('/')))
            .await?
            .error_for_status()?;
        let discovery: Self = resp.json().await?;
        Ok(discovery)
    }
}
// Axum handler: serves the discovery document from server config.
async fn discovery(State(state): State<SharedState>) -> Json<Discovery> {
    Json(Discovery {
        external_host: state.config.external_host.clone(),
        process_host: state.config.process_host.clone(),
    })
}
#[async_trait::async_trait]
impl notmad::Component for Api {
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {

View File

@ -1,28 +1,46 @@
use std::net::SocketAddr;
use std::{collections::BTreeMap, net::SocketAddr};
use clap::{Parser, Subcommand};
use crate::{agent, api, state::SharedState};
use crate::{agent, server};
pub async fn execute() -> anyhow::Result<()> {
let state = SharedState::new().await?;
let cli = Command::parse();
match cli.command.expect("to have a subcommand") {
Commands::Serve { host } => {
Commands::Serve {
host,
grpc_host,
config,
} => {
tracing::info!("Starting service");
notmad::Mad::builder()
.add(api::Api::new(&state, host))
.run()
.await?;
server::execute(host, grpc_host, config).await?;
}
Commands::Agent { commands } => match commands {
AgentCommands::Start { host } => {
AgentCommands::Start {} => {
tracing::info!("starting agent");
agent::execute(host).await?;
agent::execute().await?;
tracing::info!("shut down agent");
}
AgentCommands::Setup {
force,
discovery,
labels,
} => {
let mut setup_labels = BTreeMap::new();
for (k, v) in labels {
setup_labels.insert(k, v);
}
if !setup_labels.contains_key("node_name") {
setup_labels.insert(
"node_name".into(),
petname::petname(2, "-").expect("to be able to generate a valid petname"),
);
}
agent::setup_config(discovery, force, setup_labels).await?;
tracing::info!("wrote default agent config");
}
},
}
@ -41,6 +59,12 @@ enum Commands {
Serve {
#[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
host: SocketAddr,
#[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")]
grpc_host: SocketAddr,
#[clap(flatten)]
config: server::config::ServerConfig,
},
Agent {
#[command(subcommand)]
@ -50,8 +74,22 @@ enum Commands {
#[derive(Subcommand)]
enum AgentCommands {
Start {
#[arg(env = "SERVICE_HOST", long = "service-host")]
host: String,
Start {},
Setup {
#[arg(long, default_value = "false")]
force: bool,
#[arg(env = "DISCOVERY_HOST", long = "discovery")]
discovery: String,
#[arg(long = "label", short = 'l', value_parser = parse_key_val, action = clap::ArgAction::Append)]
labels: Vec<(String, String)>,
},
}
/// Parses one `key=value` CLI argument, splitting at the FIRST `=` so the
/// value itself may contain `=` characters.
fn parse_key_val(s: &str) -> Result<(String, String), String> {
    match s.split_once('=') {
        Some((key, value)) => Ok((key.to_owned(), value.to_owned())),
        None => Err(format!("invalid key=value: no `=` found in `{s}`")),
    }
}

View File

@ -0,0 +1,52 @@
// @generated
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKeyRequest {
#[prost(string, tag="1")]
pub namespace: ::prost::alloc::string::String,
#[prost(string, optional, tag="2")]
pub id: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, tag="3")]
pub key: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKeyResponse {
#[prost(string, optional, tag="1")]
pub value: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetKeyRequest {
#[prost(string, tag="1")]
pub namespace: ::prost::alloc::string::String,
#[prost(string, optional, tag="2")]
pub id: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, tag="3")]
pub key: ::prost::alloc::string::String,
#[prost(string, tag="4")]
pub value: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct SetKeyResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListenEventsRequest {
#[prost(string, tag="1")]
pub namespace: ::prost::alloc::string::String,
#[prost(string, optional, tag="2")]
pub id: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListenEventsResponse {
#[prost(string, tag="1")]
pub id: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub value: ::prost::alloc::string::String,
}
include!("churn.v1.tonic.rs");
// @@protoc_insertion_point(module)

View File

@ -0,0 +1,435 @@
// @generated
/// Generated client implementations.
pub mod churn_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
use tonic::codegen::http::Uri;
#[derive(Debug, Clone)]
pub struct ChurnClient<T> {
inner: tonic::client::Grpc<T>,
}
impl ChurnClient<tonic::transport::Channel> {
/// Attempt to create a new client by connecting to a given endpoint.
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
Ok(Self::new(conn))
}
}
impl<T> ChurnClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Self { inner }
}
pub fn with_origin(inner: T, origin: Uri) -> Self {
let inner = tonic::client::Grpc::with_origin(inner, origin);
Self { inner }
}
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> ChurnClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
>,
>,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
>>::Error: Into<StdError> + Send + Sync,
{
ChurnClient::new(InterceptedService::new(inner, interceptor))
}
/// Compress requests with the given encoding.
///
/// This requires the server to support it otherwise it might respond with an
/// error.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.send_compressed(encoding);
self
}
/// Enable decompressing responses.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.accept_compressed(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_decoding_message_size(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
pub async fn get_key(
&mut self,
request: impl tonic::IntoRequest<super::GetKeyRequest>,
) -> std::result::Result<tonic::Response<super::GetKeyResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/GetKey");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "GetKey"));
self.inner.unary(req, path, codec).await
}
pub async fn set_key(
&mut self,
request: impl tonic::IntoRequest<super::SetKeyRequest>,
) -> std::result::Result<tonic::Response<super::SetKeyResponse>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/churn.v1.Churn/SetKey");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "SetKey"));
self.inner.unary(req, path, codec).await
}
pub async fn listen_events(
&mut self,
request: impl tonic::IntoRequest<super::ListenEventsRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::ListenEventsResponse>>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/churn.v1.Churn/ListenEvents",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("churn.v1.Churn", "ListenEvents"));
self.inner.server_streaming(req, path, codec).await
}
}
}
/// Generated server implementations.
pub mod churn_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with ChurnServer.
#[async_trait]
pub trait Churn: Send + Sync + 'static {
async fn get_key(
&self,
request: tonic::Request<super::GetKeyRequest>,
) -> std::result::Result<tonic::Response<super::GetKeyResponse>, tonic::Status>;
async fn set_key(
&self,
request: tonic::Request<super::SetKeyRequest>,
) -> std::result::Result<tonic::Response<super::SetKeyResponse>, tonic::Status>;
/// Server streaming response type for the ListenEvents method.
type ListenEventsStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::ListenEventsResponse, tonic::Status>,
>
+ Send
+ 'static;
async fn listen_events(
&self,
request: tonic::Request<super::ListenEventsRequest>,
) -> std::result::Result<
tonic::Response<Self::ListenEventsStream>,
tonic::Status,
>;
}
#[derive(Debug)]
pub struct ChurnServer<T: Churn> {
inner: _Inner<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
}
struct _Inner<T>(Arc<T>);
impl<T: Churn> ChurnServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(Arc::new(inner))
}
pub fn from_arc(inner: Arc<T>) -> Self {
let inner = _Inner(inner);
Self {
inner,
accept_compression_encodings: Default::default(),
send_compression_encodings: Default::default(),
max_decoding_message_size: None,
max_encoding_message_size: None,
}
}
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
{
InterceptedService::new(Self::new(inner), interceptor)
}
/// Enable decompressing requests with the given encoding.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.accept_compression_encodings.enable(encoding);
self
}
/// Compress responses with the given encoding, if the client supports it.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.send_compression_encodings.enable(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.max_decoding_message_size = Some(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.max_encoding_message_size = Some(limit);
self
}
}
impl<T, B> tonic::codegen::Service<http::Request<B>> for ChurnServer<T>
where
T: Churn,
B: Body + Send + 'static,
B::Error: Into<StdError> + Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
let inner = self.inner.clone();
match req.uri().path() {
"/churn.v1.Churn/GetKey" => {
#[allow(non_camel_case_types)]
struct GetKeySvc<T: Churn>(pub Arc<T>);
impl<T: Churn> tonic::server::UnaryService<super::GetKeyRequest>
for GetKeySvc<T> {
type Response = super::GetKeyResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GetKeyRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Churn>::get_key(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = GetKeySvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/churn.v1.Churn/SetKey" => {
#[allow(non_camel_case_types)]
struct SetKeySvc<T: Churn>(pub Arc<T>);
impl<T: Churn> tonic::server::UnaryService<super::SetKeyRequest>
for SetKeySvc<T> {
type Response = super::SetKeyResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::SetKeyRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Churn>::set_key(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = SetKeySvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/churn.v1.Churn/ListenEvents" => {
#[allow(non_camel_case_types)]
struct ListenEventsSvc<T: Churn>(pub Arc<T>);
impl<
T: Churn,
> tonic::server::ServerStreamingService<super::ListenEventsRequest>
for ListenEventsSvc<T> {
type Response = super::ListenEventsResponse;
type ResponseStream = T::ListenEventsStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ListenEventsRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Churn>::listen_events(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = ListenEventsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap(),
)
})
}
}
}
}
// NOTE(review): the impls below are tonic/prost build output (`churn.v1.rs`);
// prefer regenerating via the proto toolchain over editing them by hand.

/// Cloning a `ChurnServer` shares the wrapped service (`inner` is an
/// `Arc`-backed `_Inner<T>`, see its `Clone` impl below) and copies the
/// per-server compression and message-size settings.
impl<T: Churn> Clone for ChurnServer<T> {
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self {
            inner,
            accept_compression_encodings: self.accept_compression_encodings,
            send_compression_encodings: self.send_compression_encodings,
            max_decoding_message_size: self.max_decoding_message_size,
            max_encoding_message_size: self.max_encoding_message_size,
        }
    }
}
/// Cheap clone: bumps the `Arc` refcount on the shared service.
impl<T: Churn> Clone for _Inner<T> {
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}
/// Delegates formatting to the wrapped service's own `Debug` impl.
impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self.0)
    }
}
/// Fully-qualified gRPC service name used for HTTP/2 request routing
/// (paths look like `/churn.v1.Churn/<Method>`).
impl<T: Churn> tonic::server::NamedService for ChurnServer<T> {
    const NAME: &'static str = "churn.v1.Churn";
}
}

View File

@ -1,8 +1,12 @@
mod api;
mod cli;
mod state;
mod grpc {
include!("grpc/churn.v1.rs");
}
mod agent;
mod server;
#[tokio::main]
async fn main() -> anyhow::Result<()> {

View File

@ -0,0 +1,23 @@
use std::net::SocketAddr;
pub mod config;
mod grpc_server;
use crate::{api, state::SharedState};
/// Boots the churn server: builds the shared state from `config`, then runs
/// the HTTP API on `host` and the gRPC endpoint on `grpc_host` under the
/// notmad component runner until it exits.
pub async fn execute(
    host: impl Into<SocketAddr>,
    grpc_host: impl Into<SocketAddr>,
    config: config::ServerConfig,
) -> anyhow::Result<()> {
    // State is shared between both components.
    let state = SharedState::new(config).await?;

    let api_component = api::Api::new(&state, host);
    let grpc_component = grpc_server::GrpcServer::new(grpc_host.into());

    notmad::Mad::builder()
        .add(api_component)
        .add(grpc_component)
        .run()
        .await?;

    Ok(())
}

View File

@ -0,0 +1,7 @@
/// Server configuration, populated by clap from CLI flags or their
/// environment-variable fallbacks.
#[derive(clap::Args)]
pub struct ServerConfig {
    // Externally reachable base URL for this server
    // (presumably what agents/users are told to connect to — TODO confirm).
    #[arg(long = "external-host", env = "EXTERNAL_HOST")]
    pub external_host: String,
    // Host for the process/gRPC endpoint — NOTE(review): assumed to be the
    // gRPC URL agents use, based on deployment config; verify against callers.
    #[arg(long = "process-host", env = "PROCESS_HOST")]
    pub process_host: String,
}

View File

@ -0,0 +1,102 @@
use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};
use anyhow::Context;
use futures::Stream;
use notmad::{Component, MadError};
use tonic::transport::Server;
use crate::{agent::models::Commands, grpc::*};
/// gRPC transport component for the churn server.
///
/// `Clone` is derived because `Component::run` hands a clone of `self` to the
/// tonic-generated `ChurnServer` wrapper.
#[derive(Clone)]
pub struct GrpcServer {
    // Socket address the tonic server binds to when `run` is called.
    grpc_host: SocketAddr,
}
impl GrpcServer {
    /// Creates a server that will bind to `grpc_host` once run.
    pub fn new(grpc_host: SocketAddr) -> Self {
        Self { grpc_host }
    }
}
#[async_trait::async_trait]
impl Component for GrpcServer {
    /// Serves the Churn gRPC API on `self.grpc_host` until either the server
    /// exits on its own or the cancellation token fires.
    async fn run(
        &self,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Result<(), notmad::MadError> {
        let service = crate::grpc::churn_server::ChurnServer::new(self.clone());
        let server = Server::builder().add_service(service).serve(self.grpc_host);

        tokio::select! {
            // Graceful shutdown requested by the host framework.
            _ = cancellation_token.cancelled() => {}
            // Server terminated by itself; surface any transport error.
            outcome = server => {
                outcome.context("failed to run grpc server").map_err(MadError::Inner)?;
            }
        }

        Ok(())
    }
}
#[async_trait::async_trait]
impl crate::grpc::churn_server::Churn for GrpcServer {
    /// Key lookup endpoint — not implemented yet.
    /// NOTE(review): `todo!()` panics the handler task on any call.
    async fn get_key(
        &self,
        request: tonic::Request<GetKeyRequest>,
    ) -> std::result::Result<tonic::Response<GetKeyResponse>, tonic::Status> {
        todo!()
    }
    /// Key write endpoint — not implemented yet (panics when called).
    async fn set_key(
        &self,
        request: tonic::Request<SetKeyRequest>,
    ) -> std::result::Result<tonic::Response<SetKeyResponse>, tonic::Status> {
        todo!()
    }
    #[doc = " Server streaming response type for the ListenEvents method."]
    type ListenEventsStream =
        Pin<Box<dyn Stream<Item = Result<ListenEventsResponse, tonic::Status>> + Send>>;
    /// Streams periodic "refresh" schedule-task events to a connected client.
    ///
    /// Spawns one producer task per call. A tokio `interval`'s first tick
    /// completes immediately, so one event is emitted right away and then one
    /// every 10 minutes, until the client disconnects (channel send fails)
    /// and the producer loop breaks.
    async fn listen_events(
        &self,
        request: tonic::Request<ListenEventsRequest>,
    ) -> std::result::Result<tonic::Response<Self::ListenEventsStream>, tonic::Status> {
        // Bounded channel: a slow or stalled client eventually back-pressures
        // the producer instead of buffering events without limit.
        let (tx, rx) = tokio::sync::mpsc::channel(128);
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(60 * 10));
            loop {
                interval.tick().await;
                // Serialize the command payload; on failure skip this tick
                // rather than terminating the stream.
                let Ok(schedule_task) = serde_json::to_string(&Commands::ScheduleTask {
                    task: "refresh".into(),
                    properties: BTreeMap::default(),
                }) else {
                    tracing::warn!("failed to serialize event");
                    continue;
                };
                if let Err(e) = tx
                    .send(Ok(ListenEventsResponse {
                        id: uuid::Uuid::new_v4().to_string(),
                        value: schedule_task,
                    }))
                    .await
                {
                    // Receiver dropped (client went away): stop producing.
                    tracing::warn!("failed to send response: {}", e);
                    break;
                }
            }
        });
        // Adapt the mpsc receiver into a Stream: yields items until recv()
        // returns None (channel closed), which ends the unfold.
        let stream = futures::stream::unfold(rx, |mut msg| async move {
            let next = msg.recv().await?;
            Some((next, msg))
        });
        Ok(tonic::Response::new(Box::pin(stream)))
    }
}

View File

@ -1,11 +1,13 @@
use std::{ops::Deref, sync::Arc};
use crate::server::config::ServerConfig;
#[derive(Clone)]
pub struct SharedState(Arc<State>);
impl SharedState {
pub async fn new() -> anyhow::Result<Self> {
Ok(Self(Arc::new(State::new().await?)))
pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
Ok(Self(Arc::new(State::new(config).await?)))
}
}
@ -23,10 +25,12 @@ impl Deref for SharedState {
}
}
pub struct State {}
pub struct State {
pub config: ServerConfig,
}
impl State {
pub async fn new() -> anyhow::Result<Self> {
Ok(Self {})
pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
Ok(Self { config })
}
}

View File

@ -0,0 +1,28 @@
// WIT contract for churn task components (WebAssembly component model).
package component:churn-tasks@0.1.0;

// Host-provided process execution capability.
interface process {
    resource process {
        constructor();
        // Runs a command given as an argv-style list; returns its output.
        run-process: func(inputs: list<string>) -> string;
        // Looks up a host-side variable by key.
        get-variable: func(key: string) -> string;
    }
}

// Host-provided minimal HTTP client.
interface http {
    resource client {
        constructor();
        // Performs a GET against `url`; returns the raw response body bytes.
        get: func(url: string) -> list<u8>;
    }
}

// Interface every task component must export.
interface task {
    // Stable identifier for the task.
    id: func() -> string;
    // Guard the host consults before invoking `execute`.
    should-run: func() -> bool;
    execute: func();
}

// A churn task exports `task` and may use the host process/http capabilities.
world churn {
    export task;
    import process;
    import http;
}

View File

@ -12,11 +12,30 @@ vars:
ingress:
- external: "true"
- internal: "true"
- internal_grpc: "true"
please:
project:
owner: kjuulh
repository: churn-v2
branch: main
settings:
api_url: https://git.front.kjuulh.io
actions:
rust:
cuddle/clusters:
dev:
env:
service.host: "0.0.0.0:3000"
service.grpc.host: "0.0.0.0:4001"
process.host: "https://grpc.churn.dev.internal.kjuulh.app"
external.host: "https://churn.internal.dev.kjuulh.app"
rust.log: "h2=warn,debug"
prod:
env:
service.host: "0.0.0.0:3000"
service.grpc.host: "0.0.0.0:4001"
process.host: "https://grpc.churn.prod.internal.kjuulh.app"
external.host: "https://churn.internal.prod.kjuulh.app"
rust.log: "h2=warn,debug"

View File

@ -1,97 +1,134 @@
#!/bin/bash
#!/usr/bin/env bash
set -e
# Configuration variables
GITEA_HOST="https://git.front.kjuulh.io"
REPO_OWNER="kjuulh"
REPO_NAME="churn-v2"
# Configuration
APP_NAME="churn"
APP_VERSION="latest" # or specify a version
S3_BUCKET="rust-artifacts"
BINARY_NAME="churn"
SERVICE_NAME="churn-agent"
SERVICE_USER="churn"
RELEASE_TAG="latest" # or specific version like "v1.0.0"
SERVICE_NAME="${APP_NAME}.service"
SERVICE_UPDATE_NAME="${APP_NAME}-update.service"
TIMER_UPDATE_NAME="${APP_NAME}-update.timer"
INSTALL_DIR="/usr/local/bin"
CONFIG_DIR="/etc/${APP_NAME}"
CHURN_DISCOVERY="https://churn.prod.kjuulh.app"
LOG="/var/log/churn-install.log"
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
NC='\033[0m' # No Color
exec > >(tee -i ${LOG})
exec 2>&1
echo "Starting churn install $(date)"
# Check if running as root
if [ "$EUID" -ne 0 ]; then
echo "Please run as root"
echo -e "${RED}Please run as root${NC}"
exit 1
fi
# Create service user if it doesn't exist
if ! id "$SERVICE_USER" &>/dev/null; then
useradd -r -s /bin/false "$SERVICE_USER"
# Create necessary directories
echo "Creating directories..."
mkdir -p "${INSTALL_DIR}"
mkdir -p "${CONFIG_DIR}"
if systemctl is-active --quiet churn.service; then
echo "Stopping existing churn service..."
systemctl stop churn.service
fi
# Function to get latest release if RELEASE_TAG is "latest"
get_latest_release() {
curl -s "https://$GITEA_HOST/api/v1/repos/$REPO_OWNER/$REPO_NAME/releases/latest" | \
grep '"tag_name":' | \
sed -E 's/.*"([^"]+)".*/\1/'
}
# Download binary from S3
echo "Downloading binary..."
curl -L -s "https://api-minio.front.kjuulh.io/${S3_BUCKET}/releases/${APP_NAME}/${APP_VERSION}/${BINARY_NAME}" -o "${INSTALL_DIR}/${BINARY_NAME}"
# Determine the actual release tag
if [ "$RELEASE_TAG" = "latest" ]; then
RELEASE_TAG=$(get_latest_release)
# Make binary executable
chmod +x "${INSTALL_DIR}/${BINARY_NAME}"
# First-time agent setup. The command fails when the agent is already
# configured; that specific failure is benign and ignored.
echo "Starting churn agent setup..."
set +e
res=$(churn agent setup --discovery "${CHURN_DISCOVERY}" 2>&1)
# BUG FIX: $? must be read immediately after the command. Previously
# `set -e` ran first, so exit_code always captured the status of `set`
# (always 0) and the error branch below was dead code.
exit_code=$?
set -e
if [[ $exit_code != 0 ]]; then
  if [[ "$res" != *"config file already exists"* ]] && [[ "$res" != *"already exists"* ]]; then
    echo "Error detected: $res"
    exit 1
  else
    echo "Ignoring setup 'agent is already setup'"
  fi
fi
echo "Installing $BINARY_NAME version $RELEASE_TAG..."
# Download and install binary
TMP_DIR=$(mktemp -d)
cd "$TMP_DIR"
# Download binary from Gitea
curl -L -o "$BINARY_NAME" \
"https://$GITEA_HOST/$REPO_OWNER/$REPO_NAME/releases/download/$RELEASE_TAG/$BINARY_NAME"
# Make binary executable and move to appropriate location
chmod +x "$BINARY_NAME"
mv "$BINARY_NAME" "/usr/local/bin/$BINARY_NAME"
# Create systemd service file
cat > "/etc/systemd/system/$SERVICE_NAME.service" << EOF
echo "Creating systemd service..."
cat > "/etc/systemd/system/${SERVICE_NAME}" << EOF
[Unit]
Description=$SERVICE_NAME Service
Description=${APP_NAME} service
After=network.target
[Service]
Type=simple
User=$SERVICE_USER
ExecStart=/usr/local/bin/$BINARY_NAME
User=root
Group=root
ExecStart=${INSTALL_DIR}/${BINARY_NAME} agent start
Restart=always
RestartSec=5
# Security hardening options
ProtectSystem=strict
ProtectHome=true
NoNewPrivileges=true
ReadWritePaths=/var/log/$SERVICE_NAME
RestartSec=10
Environment=RUST_LOG=h2=warn,hyper=warn,churn=debug,warn
[Install]
WantedBy=multi-user.target
EOF
# Create log directory if logging is needed
mkdir -p "/var/log/$SERVICE_NAME"
chown "$SERVICE_USER:$SERVICE_USER" "/var/log/$SERVICE_NAME"
echo "Creating churn update service..."
cat > "/etc/systemd/system/${SERVICE_UPDATE_NAME}" <<EOF
[Unit]
Description=Daily Churn Update Service
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
ExecStart=/bin/bash -c 'curl -s https://git.front.kjuulh.io/kjuulh/churn-v2/raw/branch/main/install.sh | bash'
User=root
[Install]
WantedBy=multi-user.target
EOF
cat > "/etc/systemd/system/${TIMER_UPDATE_NAME}" <<EOF
[Unit]
Description=Run Churn Update Daily
[Timer]
OnCalendar=daily
Persistent=true
[Install]
WantedBy=timers.target
EOF
# Reload systemd and enable service
echo "Configuring systemd service..."
systemctl daemon-reload
systemctl enable "$SERVICE_NAME"
systemctl start "$SERVICE_NAME"
# Clean up
cd
rm -rf "$TMP_DIR"
systemctl enable "${SERVICE_NAME}"
systemctl start "${SERVICE_NAME}"
echo "Installation complete! Service status:"
systemctl status "$SERVICE_NAME"
systemctl enable "${SERVICE_UPDATE_NAME}"
systemctl enable "${TIMER_UPDATE_NAME}"
systemctl start "${TIMER_UPDATE_NAME}"
# Check service status
if systemctl is-active --quiet "${SERVICE_NAME}"; then
echo -e "${GREEN}Installation successful! ${APP_NAME} is running.${NC}"
echo "You can check the status with: systemctl status ${SERVICE_NAME}"
else
echo -e "${RED}Installation completed but service failed to start. Check logs with: journalctl -u ${SERVICE_NAME}${NC}"
exit 1
fi
# Provide some helpful commands
echo "
Useful commands:
- Check status: systemctl status $SERVICE_NAME
- View logs: journalctl -u $SERVICE_NAME
- Restart service: systemctl restart $SERVICE_NAME
- Stop service: systemctl stop $SERVICE_NAME
"

7
upload.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env zsh
set -e

# Build a static musl release binary and publish it to the artifact bucket.
readonly target_triple="x86_64-unknown-linux-musl"
readonly s3_dest="s3://rust-artifacts/releases/churn/latest/churn"
readonly s3_endpoint="https://api-minio.front.kjuulh.io"

cargo build --release --target "${target_triple}"
aws s3 cp "target/${target_triple}/release/churn" "${s3_dest}" \
  --endpoint-url "${s3_endpoint}"