use std::collections::BTreeMap; use std::time::UNIX_EPOCH; use std::{net::SocketAddr, ops::Deref, sync::Arc}; use axum::http::{HeaderValue, Method, Request}; use axum::routing::get; use axum::{extract::MatchedPath, response::Html}; use axum::{Json, Router}; use clap::{Parser, Subcommand}; use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use tower_http::cors::{Any, CorsLayer}; use tower_http::trace::TraceLayer; #[derive(Parser)] #[command(author, version, about, long_about = None, subcommand_required = true)] struct Command { #[command(subcommand)] command: Option, } #[derive(Subcommand)] enum Commands { Serve { #[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")] host: SocketAddr, }, } const CATEGORIES: [&str; 6] = [ "UserOnboarded", "PaymentProcessed", "UserLogin", "UserOffboarded", "OngoingCalls", "CardProcessed", ]; #[tokio::main] async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); tracing_subscriber::fmt::init(); let cli = Command::parse(); if let Some(Commands::Serve { host }) = cli.command { tracing::info!("Starting service"); let state = SharedState(Arc::new(State::new().await?)); notmad::Mad::builder() .add_fn({ let state = state.clone(); move |_cancel| { let state = state.clone(); async move { let app = Router::new() .route("/", get(root)) .route("/metrics", get(metrics)) .with_state(state.clone()) .layer(TraceLayer::new_for_http().make_span_with( |request: &Request<_>| { // Log the matched route's path (with placeholders not filled in). // Use request.uri() or OriginalUri if you want the real path. let matched_path = request .extensions() .get::() .map(MatchedPath::as_str); tracing::info_span!( "http_request", method = ?request.method(), matched_path, some_other_field = tracing::field::Empty, ) }, )) .layer( CorsLayer::new() .allow_origin(Any) .allow_methods([Method::GET]), ); tracing::info!("listening on {}", host); let listener = tokio::net::TcpListener::bind(host).await.unwrap(); axum::serve(listener, app.into_make_service()) .await .unwrap(); Ok(()) } } }) .add_fn({ let state = state.clone(); move |cancel| { let state = state.clone(); async move { let nodrift_cancel = nodrift::schedule(std::time::Duration::from_secs(10), { let state = state.clone(); move || { let state = state.clone(); async move { state .event_metrics .prune_old(std::time::Duration::from_secs(60 * 2)) .await; Ok(()) } } }); tokio::select! { _ = cancel.cancelled() => { nodrift_cancel.cancel(); } _ = nodrift_cancel.cancelled() => { } } Ok(()) } } }) .add_fn(move |cancel| { let state = state.clone(); async move { let nodrift_cancel = nodrift::schedule(std::time::Duration::from_millis(100), { let state = state.clone(); move || { let state = state.clone(); let mut rng = rand::thread_rng(); let category_index = rng.gen_range(0..CATEGORIES.len()); async move { state .event_metrics .push_event(Event { event_name: CATEGORIES[category_index].to_string(), timestamp: std::time::SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs() as usize, }) .await; Ok(()) } } }); tokio::select! { _ = cancel.cancelled() => { nodrift_cancel.cancel(); } _ = nodrift_cancel.cancelled() => { } } Ok(()) } }) .run() .await?; } Ok(()) } const INDEX: &str = include_str!("../assets/html/index.html"); async fn root() -> Html { Html(INDEX.to_string()) } #[derive(Serialize, Deserialize)] struct MetricsQuery { start: usize, end: usize, } async fn metrics( axum::extract::State(state): axum::extract::State, axum::extract::Query(query): axum::extract::Query, ) -> Json { let metrics = state .event_metrics .get_event_metrics(query.start, query.end) .await .expect("to be able to get event metrics"); Json(metrics) } #[derive(Clone)] pub struct Event { pub event_name: String, pub timestamp: usize, } #[derive(serde::Serialize, Clone)] pub struct EventMetric { pub event_name: String, pub amount: usize, } #[derive(serde::Serialize, Clone)] pub struct Metrics { pub metrics: Vec, pub since: usize, pub end: usize, } #[derive(Clone, Default)] pub struct EventMetrics { queue: Arc>>, } impl EventMetrics { pub fn new() -> Self { Self { queue: Arc::default(), } } pub async fn push_event(&self, event: Event) { let mut queue = self.queue.write().await; queue.push(event); } pub async fn prune_old(&self, cutoff: std::time::Duration) { let cutoff_time = std::time::SystemTime::now() .checked_sub(cutoff) .unwrap() .duration_since(UNIX_EPOCH) .unwrap() .as_secs() as usize; tracing::info!(before = cutoff_time, "pruning old events"); let mut queue = self.queue.write().await; let new_queue: Vec<_> = queue .iter() .filter(|&i| i.timestamp >= cutoff_time) .cloned() .collect(); tracing::info!(pruned = queue.len() - new_queue.len(), "pruned events"); *queue = new_queue } pub async fn get_event_metrics(&self, since: usize, end: usize) -> anyhow::Result { let queue = self.queue.read().await; let items = queue .iter() .filter(|i| i.timestamp >= since && i.timestamp < end) .collect::>(); let mut metrics = BTreeMap::::new(); for item in items { match metrics.get_mut(&item.event_name) { Some(metrics) => { metrics.amount += 1; } None => { metrics.insert( item.event_name.clone(), EventMetric { event_name: item.event_name.clone(), amount: 1, }, ); } } } Ok(Metrics { metrics: metrics.values().cloned().collect(), since, end, }) } } #[derive(Clone)] pub struct SharedState(Arc); impl Deref for SharedState { type Target = Arc; fn deref(&self) -> &Self::Target { &self.0 } } pub struct State { event_metrics: EventMetrics, } impl State { pub async fn new() -> anyhow::Result { Ok(Self { event_metrics: EventMetrics::new(), }) } }