feat: add discovery
All checks were successful
continuous-integration/drone/push Build is passing

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-11-24 17:12:15 +01:00
parent c4434fd841
commit ee323e99e8
Signed by: kjuulh
GPG Key ID: D85D7535F18F35FA
20 changed files with 1482 additions and 117 deletions

6
.env
View File

@ -1 +1,5 @@
EXTERNAL_HOST=http://localhost:3000
PROCESS_HOST=http://localhost:7900
SERVICE_HOST=127.0.0.1:3000
DISCOVERY_HOST=http://127.0.0.1:3000
RUST_LOG=h2=warn,debug

899
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -15,7 +15,7 @@ axum.workspace = true
serde = { version = "1.0.197", features = ["derive"] }
uuid = { version = "1.7.0", features = ["v4"] }
tower-http = { version = "0.6.0", features = ["cors", "trace"] }
notmad = "0.6.0"
notmad = "0.7.1"
tokio-util = "0.7.12"
async-trait = "0.1.83"
nodrift = "0.2.0"
@ -24,3 +24,7 @@ prost-types = "0.13.3"
prost = "0.13.3"
bytes = "1.8.0"
tonic = { version = "0.12.3", features = ["tls", "tls-roots"] }
toml = "0.8.19"
dirs = "5.0.1"
futures = "0.3.31"
reqwest = { version = "0.12.9", features = ["json"] }

View File

@ -5,6 +5,7 @@ package churn.v1;
service Churn {
rpc GetKey(GetKeyRequest) returns (GetKeyResponse);
rpc SetKey(SetKeyRequest) returns (SetKeyResponse);
rpc ListenEvents(ListenEventsRequest) returns (stream ListenEventsResponse);
}
message GetKeyRequest {
@ -23,3 +24,11 @@ message SetKeyRequest {
string value = 4;
}
message SetKeyResponse {}
message ListenEventsRequest {
string namespace = 1;
optional string id = 2;
}
message ListenEventsResponse {
string value = 1;
}

View File

@ -1,80 +1,22 @@
use agent_state::AgentState;
use event_handler::EventHandler;
use refresh::AgentRefresh;
pub use config::setup_config;
mod agent_state;
mod config;
mod discovery_client;
mod event_handler;
mod grpc_client;
mod refresh;
mod grpc_client {
use tonic::transport::{Channel, ClientTlsConfig};
use crate::grpc::{churn_client::ChurnClient, *};
pub struct GrpcClient {
host: String,
}
impl GrpcClient {
pub fn new(host: impl Into<String>) -> Self {
Self { host: host.into() }
}
pub async fn get_key(
&self,
namespace: &str,
id: Option<impl Into<String>>,
key: &str,
) -> anyhow::Result<Option<String>> {
let mut client = self.client().await?;
let resp = client
.get_key(GetKeyRequest {
key: key.into(),
namespace: namespace.into(),
id: id.map(|i| i.into()),
})
.await?;
let resp = resp.into_inner();
Ok(resp.value)
}
pub async fn set_key(
&self,
namespace: &str,
id: Option<impl Into<String>>,
key: &str,
value: &str,
) -> anyhow::Result<()> {
let mut client = self.client().await?;
client
.set_key(SetKeyRequest {
key: key.into(),
value: value.into(),
namespace: namespace.into(),
id: id.map(|i| i.into()),
})
.await?;
Ok(())
}
async fn client(&self) -> anyhow::Result<ChurnClient<tonic::transport::Channel>> {
let channel = Channel::from_shared(self.host.to_owned())?
.tls_config(ClientTlsConfig::new().with_native_roots())?
.connect()
.await?;
let client = ChurnClient::new(channel);
Ok(client)
}
}
}
pub async fn execute(host: impl Into<String>) -> anyhow::Result<()> {
pub async fn execute() -> anyhow::Result<()> {
let state = AgentState::new().await?;
notmad::Mad::builder()
.add(AgentRefresh::new(&state, host))
.add(AgentRefresh::new(&state))
.add(EventHandler::new(&state))
.cancellation(Some(std::time::Duration::from_secs(2)))
.run()
.await?;

View File

@ -1,5 +1,9 @@
use std::{ops::Deref, sync::Arc};
use crate::api::Discovery;
use super::{config::AgentConfig, discovery_client::DiscoveryClient, grpc_client::GrpcClient};
#[derive(Clone)]
pub struct AgentState(Arc<State>);
@ -23,10 +27,24 @@ impl Deref for AgentState {
}
}
pub struct State {}
pub struct State {
pub grpc: GrpcClient,
pub config: AgentConfig,
pub discovery: Discovery,
}
impl State {
pub async fn new() -> anyhow::Result<Self> {
Ok(Self {})
let config = AgentConfig::new().await?;
let discovery = DiscoveryClient::new(&config.discovery);
let discovery = discovery.discover().await?;
let grpc = GrpcClient::new(&discovery.process_host);
Ok(Self {
grpc,
config,
discovery,
})
}
}

View File

@ -0,0 +1,80 @@
use anyhow::Context;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Clone)]
pub struct AgentConfig {
pub agent_id: String,
pub discovery: String,
}
impl AgentConfig {
pub async fn new() -> anyhow::Result<Self> {
let config = ConfigFile::load().await?;
Ok(Self {
agent_id: config.agent_id,
discovery: config.discovery,
})
}
}
#[derive(Serialize, Deserialize)]
struct ConfigFile {
agent_id: String,
discovery: String,
}
impl ConfigFile {
pub async fn load() -> anyhow::Result<Self> {
let directory = dirs::data_dir()
.ok_or(anyhow::anyhow!("failed to get data dir"))?
.join("io.kjuulh.churn-agent")
.join("churn-agent.toml");
if !directory.exists() {
anyhow::bail!(
"No churn agent file was setup, run `churn agent setup` to setup the defaults"
)
}
let contents = tokio::fs::read_to_string(&directory).await?;
toml::from_str(&contents).context("failed to parse the contents of the churn agent config")
}
pub async fn write_default(discovery: impl Into<String>, force: bool) -> anyhow::Result<Self> {
let s = Self {
agent_id: Uuid::new_v4().to_string(),
discovery: discovery.into(),
};
let directory = dirs::data_dir()
.ok_or(anyhow::anyhow!("failed to get data dir"))?
.join("io.kjuulh.churn-agent")
.join("churn-agent.toml");
if let Some(parent) = directory.parent() {
tokio::fs::create_dir_all(&parent).await?;
}
if !force && directory.exists() {
anyhow::bail!("config file already exists, consider moving it to a backup before trying again: {}", directory.display());
}
let contents = toml::to_string_pretty(&s)
.context("failed to convert default implementation to string")?;
tokio::fs::write(directory, contents.as_bytes())
.await
.context("failed to write to agent file")?;
Ok(s)
}
}
pub async fn setup_config(discovery: impl Into<String>, force: bool) -> anyhow::Result<()> {
ConfigFile::write_default(discovery, force).await?;
Ok(())
}

View File

@ -0,0 +1,21 @@
use crate::api::Discovery;
pub struct DiscoveryClient {
host: String,
}
impl DiscoveryClient {
pub fn new(discovery_host: impl Into<String>) -> Self {
Self {
host: discovery_host.into(),
}
}
pub async fn discover(&self) -> anyhow::Result<Discovery> {
tracing::info!(
"getting details from discovery endpoint: {}/discovery",
self.host.trim_end_matches('/')
);
crate::api::Discovery::get_from_host(&self.host).await
}
}

View File

@ -0,0 +1,53 @@
use notmad::{Component, MadError};
use super::{agent_state::AgentState, config::AgentConfig, grpc_client::GrpcClient};
#[derive(Clone)]
pub struct EventHandler {
config: AgentConfig,
grpc: GrpcClient,
}
impl EventHandler {
pub fn new(state: impl Into<AgentState>) -> Self {
let state: AgentState = state.into();
Self {
config: state.config.clone(),
grpc: state.grpc.clone(),
}
}
}
#[async_trait::async_trait]
impl Component for EventHandler {
fn name(&self) -> Option<String> {
Some("event_handler".into())
}
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> {
tokio::select! {
_ = cancellation_token.cancelled() => {},
res = self.grpc.listen_events("agents", None::<String>, self.clone()) => {
res.map_err(MadError::Inner)?;
},
res = self.grpc.listen_events("agents", Some(&self.config.agent_id), self.clone()) => {
res.map_err(MadError::Inner)?;
}
}
Ok(())
}
}
#[async_trait::async_trait]
impl super::grpc_client::ListenEventsExecutor for EventHandler {
async fn execute(&self, event: crate::grpc::ListenEventsResponse) -> anyhow::Result<()> {
tracing::info!(value = event.value, "received event");
Ok(())
}
}

View File

@ -0,0 +1,100 @@
use tonic::transport::{Channel, ClientTlsConfig};
use crate::grpc::{churn_client::ChurnClient, *};
#[derive(Clone)]
pub struct GrpcClient {
host: String,
}
impl GrpcClient {
pub fn new(host: impl Into<String>) -> Self {
Self { host: host.into() }
}
pub async fn get_key(
&self,
namespace: &str,
id: Option<impl Into<String>>,
key: &str,
) -> anyhow::Result<Option<String>> {
let mut client = self.client().await?;
let resp = client
.get_key(GetKeyRequest {
key: key.into(),
namespace: namespace.into(),
id: id.map(|i| i.into()),
})
.await?;
let resp = resp.into_inner();
Ok(resp.value)
}
pub async fn set_key(
&self,
namespace: &str,
id: Option<impl Into<String>>,
key: &str,
value: &str,
) -> anyhow::Result<()> {
let mut client = self.client().await?;
client
.set_key(SetKeyRequest {
key: key.into(),
value: value.into(),
namespace: namespace.into(),
id: id.map(|i| i.into()),
})
.await?;
Ok(())
}
pub async fn listen_events(
&self,
namespace: &str,
id: Option<impl Into<String>>,
exec: impl ListenEventsExecutor,
) -> anyhow::Result<()> {
let mut client = self.client().await?;
let resp = client
.listen_events(ListenEventsRequest {
namespace: namespace.into(),
id: id.map(|i| i.into()),
})
.await?;
let mut inner = resp.into_inner();
while let Ok(Some(message)) = inner.message().await {
exec.execute(message).await?;
}
Ok(())
}
async fn client(&self) -> anyhow::Result<ChurnClient<tonic::transport::Channel>> {
let channel = if self.host.starts_with("https") {
Channel::from_shared(self.host.to_owned())?
.tls_config(ClientTlsConfig::new().with_native_roots())?
.connect()
.await?
} else {
Channel::from_shared(self.host.to_owned())?
.connect()
.await?
};
let client = ChurnClient::new(channel);
Ok(client)
}
}
#[async_trait::async_trait]
pub trait ListenEventsExecutor {
async fn execute(&self, event: ListenEventsResponse) -> anyhow::Result<()>;
}

View File

@ -4,21 +4,24 @@ use super::agent_state::AgentState;
#[derive(Clone)]
pub struct AgentRefresh {
_state: AgentState,
host: String,
process_host: String,
}
impl AgentRefresh {
pub fn new(state: impl Into<AgentState>, host: impl Into<String>) -> Self {
pub fn new(state: impl Into<AgentState>) -> Self {
let state: AgentState = state.into();
Self {
_state: state.into(),
host: host.into(),
process_host: state.discovery.process_host.clone(),
}
}
}
#[async_trait::async_trait]
impl notmad::Component for AgentRefresh {
fn name(&self) -> Option<String> {
Some("agent_refresh".into())
}
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
@ -39,7 +42,7 @@ impl notmad::Component for AgentRefresh {
#[async_trait::async_trait]
impl nodrift::Drifter for AgentRefresh {
async fn execute(&self, _token: tokio_util::sync::CancellationToken) -> anyhow::Result<()> {
tracing::info!(host = self.host, "refreshing agent");
tracing::info!(process_host = self.process_host, "refreshing agent");
// Get plan
let plan = Plan::new();

View File

@ -1,6 +1,12 @@
use std::net::SocketAddr;
use axum::{extract::MatchedPath, http::Request, routing::get, Router};
use axum::{
extract::{MatchedPath, State},
http::Request,
routing::get,
Json, Router,
};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tower_http::trace::TraceLayer;
@ -22,6 +28,7 @@ impl Api {
pub async fn serve(&self) -> anyhow::Result<()> {
let app = Router::new()
.route("/", get(root))
.route("/discovery", get(discovery))
.with_state(self.state.clone())
.layer(
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
@ -55,6 +62,28 @@ async fn root() -> &'static str {
"Hello, churn!"
}
#[derive(Serialize, Deserialize)]
pub struct Discovery {
pub external_host: String,
pub process_host: String,
}
impl Discovery {
pub async fn get_from_host(host: &str) -> anyhow::Result<Self> {
let resp = reqwest::get(format!("{}/discovery", host.trim_end_matches('/'))).await?;
let s: Self = resp.json().await?;
Ok(s)
}
}
async fn discovery(State(state): State<SharedState>) -> Json<Discovery> {
Json(Discovery {
external_host: state.config.external_host.clone(),
process_host: state.config.process_host.clone(),
})
}
#[async_trait::async_trait]
impl notmad::Component for Api {
async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {

View File

@ -2,27 +2,29 @@ use std::net::SocketAddr;
use clap::{Parser, Subcommand};
use crate::{agent, api, state::SharedState};
use crate::{agent, server};
pub async fn execute() -> anyhow::Result<()> {
let state = SharedState::new().await?;
let cli = Command::parse();
match cli.command.expect("to have a subcommand") {
Commands::Serve { host } => {
Commands::Serve {
host,
grpc_host,
config,
} => {
tracing::info!("Starting service");
notmad::Mad::builder()
.add(api::Api::new(&state, host))
.run()
.await?;
server::execute(host, grpc_host, config).await?;
}
Commands::Agent { commands } => match commands {
AgentCommands::Start { host } => {
AgentCommands::Start {} => {
tracing::info!("starting agent");
agent::execute(host).await?;
agent::execute().await?;
tracing::info!("shut down agent");
}
AgentCommands::Setup { force, discovery } => {
agent::setup_config(discovery, force).await?;
tracing::info!("wrote default agent config");
}
},
}
@ -41,6 +43,12 @@ enum Commands {
Serve {
#[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
host: SocketAddr,
#[arg(env = "SERVICE_GRPC_HOST", long, default_value = "127.0.0.1:7900")]
grpc_host: SocketAddr,
#[clap(flatten)]
config: server::config::ServerConfig,
},
Agent {
#[command(subcommand)]
@ -50,8 +58,12 @@ enum Commands {
#[derive(Subcommand)]
enum AgentCommands {
Start {
#[arg(env = "SERVICE_HOST", long = "service-host")]
host: String,
Start {},
Setup {
#[arg(long, default_value = "false")]
force: bool,
#[arg(env = "DISCOVERY_HOST", long = "discovery")]
discovery: String,
},
}

View File

@ -32,5 +32,19 @@ pub struct SetKeyRequest {
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct SetKeyResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListenEventsRequest {
#[prost(string, tag="1")]
pub namespace: ::prost::alloc::string::String,
#[prost(string, optional, tag="2")]
pub id: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListenEventsResponse {
#[prost(string, tag="1")]
pub value: ::prost::alloc::string::String,
}
include!("churn.v1.tonic.rs");
// @@protoc_insertion_point(module)

View File

@ -122,6 +122,31 @@ pub mod churn_client {
req.extensions_mut().insert(GrpcMethod::new("churn.v1.Churn", "SetKey"));
self.inner.unary(req, path, codec).await
}
pub async fn listen_events(
&mut self,
request: impl tonic::IntoRequest<super::ListenEventsRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::ListenEventsResponse>>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/churn.v1.Churn/ListenEvents",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("churn.v1.Churn", "ListenEvents"));
self.inner.server_streaming(req, path, codec).await
}
}
}
/// Generated server implementations.
@ -139,6 +164,19 @@ pub mod churn_server {
&self,
request: tonic::Request<super::SetKeyRequest>,
) -> std::result::Result<tonic::Response<super::SetKeyResponse>, tonic::Status>;
/// Server streaming response type for the ListenEvents method.
type ListenEventsStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::ListenEventsResponse, tonic::Status>,
>
+ Send
+ 'static;
async fn listen_events(
&self,
request: tonic::Request<super::ListenEventsRequest>,
) -> std::result::Result<
tonic::Response<Self::ListenEventsStream>,
tonic::Status,
>;
}
#[derive(Debug)]
pub struct ChurnServer<T: Churn> {
@ -307,6 +345,53 @@ pub mod churn_server {
};
Box::pin(fut)
}
"/churn.v1.Churn/ListenEvents" => {
#[allow(non_camel_case_types)]
struct ListenEventsSvc<T: Churn>(pub Arc<T>);
impl<
T: Churn,
> tonic::server::ServerStreamingService<super::ListenEventsRequest>
for ListenEventsSvc<T> {
type Response = super::ListenEventsResponse;
type ResponseStream = T::ListenEventsStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ListenEventsRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Churn>::listen_events(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = ListenEventsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
Ok(

View File

@ -6,6 +6,7 @@ mod grpc {
}
mod agent;
mod server;
#[tokio::main]
async fn main() -> anyhow::Result<()> {

View File

@ -0,0 +1,23 @@
use std::net::SocketAddr;
pub mod config;
mod grpc_server;
use crate::{api, state::SharedState};
pub async fn execute(
host: impl Into<SocketAddr>,
grpc_host: impl Into<SocketAddr>,
config: config::ServerConfig,
) -> anyhow::Result<()> {
let state = SharedState::new(config).await?;
notmad::Mad::builder()
.add(api::Api::new(&state, host))
.add(grpc_server::GrpcServer::new(grpc_host.into()))
.run()
.await?;
Ok(())
}

View File

@ -0,0 +1,7 @@
#[derive(clap::Args)]
pub struct ServerConfig {
#[arg(long = "external-host", env = "EXTERNAL_HOST")]
pub external_host: String,
#[arg(long = "process-host", env = "PROCESS_HOST")]
pub process_host: String,
}

View File

@ -0,0 +1,93 @@
use std::{net::SocketAddr, pin::Pin};
use anyhow::Context;
use futures::Stream;
use notmad::{Component, MadError};
use tonic::transport::Server;
use crate::grpc::*;
#[derive(Clone)]
pub struct GrpcServer {
grpc_host: SocketAddr,
}
impl GrpcServer {
pub fn new(grpc_host: SocketAddr) -> Self {
Self { grpc_host }
}
}
#[async_trait::async_trait]
impl Component for GrpcServer {
async fn run(
&self,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), notmad::MadError> {
let task = Server::builder()
.add_service(crate::grpc::churn_server::ChurnServer::new(self.clone()))
.serve(self.grpc_host);
tokio::select! {
_ = cancellation_token.cancelled() => {},
res = task => {
res.context("failed to run grpc server").map_err(MadError::Inner)?;
}
}
Ok(())
}
}
#[async_trait::async_trait]
impl crate::grpc::churn_server::Churn for GrpcServer {
async fn get_key(
&self,
request: tonic::Request<GetKeyRequest>,
) -> std::result::Result<tonic::Response<GetKeyResponse>, tonic::Status> {
todo!()
}
async fn set_key(
&self,
request: tonic::Request<SetKeyRequest>,
) -> std::result::Result<tonic::Response<SetKeyResponse>, tonic::Status> {
todo!()
}
#[doc = " Server streaming response type for the ListenEvents method."]
type ListenEventsStream =
Pin<Box<dyn Stream<Item = Result<ListenEventsResponse, tonic::Status>> + Send>>;
async fn listen_events(
&self,
request: tonic::Request<ListenEventsRequest>,
) -> std::result::Result<tonic::Response<Self::ListenEventsStream>, tonic::Status> {
let (tx, rx) = tokio::sync::mpsc::channel(128);
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
loop {
interval.tick().await;
if let Err(e) = tx
.send(Ok(ListenEventsResponse {
value: uuid::Uuid::new_v4().to_string(),
}))
.await
{
tracing::warn!("failed to send response: {}", e);
break;
}
}
});
let stream = futures::stream::unfold(rx, |mut msg| async move {
let next = msg.recv().await?;
Some((next, msg))
});
Ok(tonic::Response::new(Box::pin(stream)))
}
}

View File

@ -1,11 +1,13 @@
use std::{ops::Deref, sync::Arc};
use crate::server::config::ServerConfig;
#[derive(Clone)]
pub struct SharedState(Arc<State>);
impl SharedState {
pub async fn new() -> anyhow::Result<Self> {
Ok(Self(Arc::new(State::new().await?)))
pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
Ok(Self(Arc::new(State::new(config).await?)))
}
}
@ -23,10 +25,12 @@ impl Deref for SharedState {
}
}
pub struct State {}
pub struct State {
pub config: ServerConfig,
}
impl State {
pub async fn new() -> anyhow::Result<Self> {
Ok(Self {})
pub async fn new(config: ServerConfig) -> anyhow::Result<Self> {
Ok(Self { config })
}
}