churn-v2/crates/churn/src/server/grpc_server.rs
kjuulh db4cc98643
All checks were successful
continuous-integration/drone/push Build is passing
feat: update with web assembly components
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-12-01 22:21:17 +01:00

103 lines
2.9 KiB
Rust

use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};
use anyhow::Context;
use futures::Stream;
use notmad::{Component, MadError};
use tonic::transport::Server;
use crate::{agent::models::Commands, grpc::*};
/// gRPC front-end of the churn server.
///
/// The value is cheap to clone: it only carries the socket address the
/// server binds to.
#[derive(Clone, Debug)]
pub struct GrpcServer {
    /// Address the gRPC listener is bound to when the component runs.
    grpc_host: SocketAddr,
}
impl GrpcServer {
pub fn new(grpc_host: SocketAddr) -> Self {
Self { grpc_host }
}
}
#[async_trait::async_trait]
impl Component for GrpcServer {
    /// Serves the Churn gRPC service on `grpc_host` until cancelled.
    ///
    /// Resolves to `Ok(())` when `cancellation_token` is cancelled or when
    /// the server finishes on its own without error.
    ///
    /// # Errors
    ///
    /// A transport error from the server is wrapped with the context
    /// "failed to run grpc server" and returned as `MadError::Inner`.
    async fn run(
        &self,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Result<(), notmad::MadError> {
        let churn_service = crate::grpc::churn_server::ChurnServer::new(self.clone());
        let serve = Server::builder()
            .add_service(churn_service)
            .serve(self.grpc_host);

        // Whichever future completes first decides the outcome; the other
        // one is dropped by `select!`.
        tokio::select! {
            _ = cancellation_token.cancelled() => Ok(()),
            outcome = serve => outcome
                .context("failed to run grpc server")
                .map_err(MadError::Inner),
        }
    }
}
/// Handlers for the `Churn` gRPC service.
///
/// `get_key` and `set_key` are placeholders that answer with an
/// `unimplemented` status; `listen_events` streams a periodic `refresh`
/// task to the caller.
#[async_trait::async_trait]
impl crate::grpc::churn_server::Churn for GrpcServer {
    /// Key lookup; not implemented yet.
    ///
    /// # Errors
    ///
    /// Always returns [`tonic::Status::unimplemented`] rather than panicking
    /// inside the request handler.
    async fn get_key(
        &self,
        _request: tonic::Request<GetKeyRequest>,
    ) -> std::result::Result<tonic::Response<GetKeyResponse>, tonic::Status> {
        Err(tonic::Status::unimplemented("get_key is not implemented"))
    }

    /// Key update; not implemented yet.
    ///
    /// # Errors
    ///
    /// Always returns [`tonic::Status::unimplemented`] rather than panicking
    /// inside the request handler.
    async fn set_key(
        &self,
        _request: tonic::Request<SetKeyRequest>,
    ) -> std::result::Result<tonic::Response<SetKeyResponse>, tonic::Status> {
        Err(tonic::Status::unimplemented("set_key is not implemented"))
    }

    /// Server streaming response type for the ListenEvents method.
    type ListenEventsStream =
        Pin<Box<dyn Stream<Item = Result<ListenEventsResponse, tonic::Status>> + Send>>;

    /// Opens an event stream that emits a `refresh` schedule-task command
    /// immediately and then every ten minutes.
    ///
    /// Each event carries a fresh UUID as `id` and the JSON-serialized
    /// `Commands::ScheduleTask` as `value`. The producer task stops as soon
    /// as the receiving half of the channel is dropped, instead of lingering
    /// until the next failed send.
    async fn listen_events(
        &self,
        _request: tonic::Request<ListenEventsRequest>,
    ) -> std::result::Result<tonic::Response<Self::ListenEventsStream>, tonic::Status> {
        /// Time between two `refresh` events.
        const REFRESH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60 * 10);
        /// Number of events that may be buffered for one listener.
        const CHANNEL_CAPACITY: usize = 128;

        let (tx, rx) = tokio::sync::mpsc::channel::<Result<ListenEventsResponse, tonic::Status>>(
            CHANNEL_CAPACITY,
        );

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(REFRESH_INTERVAL);
            loop {
                // Wake up on the next tick, or right away once the receiver
                // is gone so the task does not outlive its listener.
                tokio::select! {
                    biased;
                    _ = tx.closed() => break,
                    _ = interval.tick() => {}
                }

                let Ok(schedule_task) = serde_json::to_string(&Commands::ScheduleTask {
                    task: "refresh".into(),
                    properties: BTreeMap::default(),
                }) else {
                    tracing::warn!("failed to serialize event");
                    continue;
                };

                let event = ListenEventsResponse {
                    id: uuid::Uuid::new_v4().to_string(),
                    value: schedule_task,
                };
                if let Err(e) = tx.send(Ok(event)).await {
                    tracing::warn!("failed to send response: {}", e);
                    break;
                }
            }
        });

        // Adapt the receiver into a `Stream`; it ends when the sender is dropped.
        let stream = futures::stream::unfold(rx, |mut receiver| async move {
            let item = receiver.recv().await?;
            Some((item, receiver))
        });

        Ok(tonic::Response::new(Box::pin(stream)))
    }
}