feat: add server implementation

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-11-22 12:56:28 +01:00
parent a023bf223e
commit 463e49ecf3
Signed by: kjuulh
GPG Key ID: D85D7535F18F35FA
5 changed files with 660 additions and 293 deletions

571
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,6 @@ members = ["crates/*"]
resolver = "2"
[workspace.dependencies]
iamvisual = { path = "crates/iamvisual" }
anyhow = { version = "1" }
tokio = { version = "1", features = ["full"] }

View File

@ -15,3 +15,6 @@ axum.workspace = true
serde = { version = "1.0.197", features = ["derive"] }
uuid = { version = "1.7.0", features = ["v4"] }
tower-http = { version = "0.6.0", features = ["cors", "trace"] }
notmad = "0.5.0"
nodrift = "0.2.0"
rand = "0.8.5"

View File

@ -57,6 +57,28 @@
<script>
const horizontal = true;
const categoryAmount = 10;
const intervalTime = 500
let categoryQueue = [];
let intervalNow = Math.floor(Date.now() / 1000) - 1;
setInterval(() => {
const unixTimestampNow = Math.floor(Date.now() / 1000);
let resp = fetch("http://localhost:3000/metrics?start=" + intervalNow + "&end=" + unixTimestampNow)
.then((resp) => {
if (resp.ok) {
return resp.json()
} else {
throw new Error("failed to get response")
}
}).then((json) => {
categoryQueue = [
...categoryQueue,
json
]
console.log("received category")
});
intervalNow = unixTimestampNow;
}, intervalTime);
const parentWidth = document.querySelector('#chart').parentElement.offsetWidth;
@ -186,26 +208,69 @@
// Real-time simulation
let allData = [];
const scrollingSpeed = 20; // Pixels per second
let maxIntensity = 1;
setInterval(() => {
let newMax = 1;
allData.map(c => {
if (c.amount > newMax) {
if (newMax < maxIntensity) {
maxIntensity = newMax;
}
}
})
}, 10000)
function generateData() {
// Simulate sporadic events with intensity
const newData = categories.map(c => {
if (Math.random() < 0.7) return null; // 70% chance no data for this category
const newIntensity = Math.random(); // Random intensity
const smoothIntensity = d3.interpolate(lastIntensity.get(c.id), newIntensity)(0.5); // Smooth transition
lastIntensity.set(c.id, smoothIntensity); // Update last intensity
const item = categoryQueue.pop();
if (item == undefined) {
return
}
const newData = item.metrics.map((c, i) => {
if (c.amount > maxIntensity) {
maxIntensity = c.amount;
}
if (c.amount == 0) {
return null
}
const newIntensity = c.amount / maxIntensity
console.log(maxIntensity);
const smoothIntensity = d3.interpolate(lastIntensity.get(i), newIntensity)(0.5); // Smooth transition
lastIntensity.set(c.event_name, smoothIntensity); // Update last intensity
return {
id: `${Date.now()}-${c.id}`,
category: c.id,
id: `${Date.now()}-${c.event_name}`,
category: i,
timestamp: Date.now(),
intensity: smoothIntensity,
};
}).filter(Boolean); // Remove null values
color: categories[i].color,
amount: c.amount,
}
}).filter(Boolean);
// Append new data and remove older ones beyond the last 60 seconds
allData = [...allData, ...newData].filter(d =>
new Date(d.timestamp) >= new Date(Date.now() - 60000)
);
allData = [...allData, ...newData].filter(d => new Date(d.timestamp) >= new Date(Date.now() - 600000))
return
// Simulate sporadic events with intensity
//const newData = categories.map(c => {
// if (Math.random() < 0.7) return null; // 70% chance no data for this category
// const newIntensity = Math.random(); // Random intensity
// const smoothIntensity = d3.interpolate(lastIntensity.get(c.id), newIntensity)(0.5); // Smooth transition
// lastIntensity.set(c.id, smoothIntensity); // Update last intensity
// return {
// id: `${Date.now()}-${c.id}`,
// category: c.id,
// timestamp: Date.now(),
// intensity: smoothIntensity,
// };
//}).filter(Boolean); // Remove null values
//// Append new data and remove older ones beyond the last 60 seconds
//allData = [...allData, ...newData].filter(d =>
// new Date(d.timestamp) >= new Date(Date.now() - 60000)
//);
}
// Continuous scroll
@ -232,7 +297,7 @@
// Update positions of all heatmap cells
updateHeatmap(allData);
});
}, 150);
</script>
</body>
</html>

View File

@ -1,11 +1,16 @@
use std::collections::BTreeMap;
use std::time::UNIX_EPOCH;
use std::{net::SocketAddr, ops::Deref, sync::Arc};
use anyhow::Context;
use axum::http::Request;
use axum::http::{HeaderValue, Method, Request};
use axum::routing::get;
use axum::Router;
use axum::{extract::MatchedPath, response::Html};
use axum::{Json, Router};
use clap::{Parser, Subcommand};
use rand::Rng;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
#[derive(Parser)]
@ -23,6 +28,15 @@ enum Commands {
},
}
// Fixed set of event names emitted by the random event generator task.
const CATEGORIES: [&str; 6] = [
    "UserOnboarded",
    "PaymentProcessed",
    "UserLogin",
    "UserOffboarded",
    "OngoingCalls",
    "CardProcessed",
];
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
@ -35,11 +49,19 @@ async fn main() -> anyhow::Result<()> {
let state = SharedState(Arc::new(State::new().await?));
notmad::Mad::builder()
.add_fn({
let state = state.clone();
move |_cancel| {
let state = state.clone();
async move {
let app = Router::new()
.route("/", get(root))
.route("/metrics", get(metrics))
.with_state(state.clone())
.layer(
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
.layer(TraceLayer::new_for_http().make_span_with(
|request: &Request<_>| {
// Log the matched route's path (with placeholders not filled in).
// Use request.uri() or OriginalUri if you want the real path.
let matched_path = request
@ -53,14 +75,103 @@ async fn main() -> anyhow::Result<()> {
matched_path,
some_other_field = tracing::field::Empty,
)
}), // ...
},
))
.layer(
CorsLayer::new()
.allow_origin(Any)
.allow_methods([Method::GET]),
);
tracing::info!("listening on {}", host);
let listener = tokio::net::TcpListener::bind(host).await.unwrap();
axum::serve(listener, app.into_make_service())
.await
.unwrap();
Ok(())
}
}
})
.add_fn({
let state = state.clone();
move |cancel| {
let state = state.clone();
async move {
let nodrift_cancel =
nodrift::schedule(std::time::Duration::from_secs(10), {
let state = state.clone();
move || {
let state = state.clone();
async move {
state
.event_metrics
.prune_old(std::time::Duration::from_secs(60 * 2))
.await;
Ok(())
}
}
});
tokio::select! {
_ = cancel.cancelled() => {
nodrift_cancel.cancel();
}
_ = nodrift_cancel.cancelled() => {
}
}
Ok(())
}
}
})
.add_fn(move |cancel| {
let state = state.clone();
async move {
let nodrift_cancel = nodrift::schedule(std::time::Duration::from_millis(1), {
let state = state.clone();
move || {
let state = state.clone();
let mut rng = rand::thread_rng();
let category_index = rng.gen_range(0..CATEGORIES.len());
async move {
state
.event_metrics
.push_event(Event {
event_name: CATEGORIES[category_index].to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
as usize,
})
.await;
Ok(())
}
}
});
tokio::select! {
_ = cancel.cancelled() => {
nodrift_cancel.cancel();
}
_ = nodrift_cancel.cancelled() => {
}
}
Ok(())
}
})
.run()
.await?;
}
Ok(())
@ -72,6 +183,116 @@ async fn root() -> Html<String> {
Html(INDEX.to_string())
}
/// Query parameters for `GET /metrics`: a unix-second time window.
/// The window is half-open — events with `start <= timestamp < end` match
/// (see `EventMetrics::get_event_metrics`).
#[derive(Serialize, Deserialize)]
struct MetricsQuery {
    // Inclusive start of the window, unix seconds.
    start: usize,
    // Exclusive end of the window, unix seconds.
    end: usize,
}
/// Handler for `GET /metrics?start=..&end=..` — returns per-event-name
/// counts for the requested unix-second window as JSON.
async fn metrics(
    axum::extract::State(state): axum::extract::State<SharedState>,
    axum::extract::Query(query): axum::extract::Query<MetricsQuery>,
) -> Json<Metrics> {
    let metrics = state
        .event_metrics
        .get_event_metrics(query.start, query.end)
        .await
        // NOTE(review): `get_event_metrics` currently always returns Ok, so
        // this expect cannot fire today; if it ever becomes fallible this
        // will panic the request — consider a Result-returning handler.
        .expect("to be able to get event metrics");

    Json(metrics)
}
/// A single raw occurrence of a named event, as pushed by the generator task.
#[derive(Clone)]
pub struct Event {
    // Event name; in the current generator this is one of `CATEGORIES`.
    pub event_name: String,
    // Unix timestamp in whole seconds (seconds since `UNIX_EPOCH`).
    pub timestamp: usize,
}
/// Aggregated count of one event name within a queried time window.
#[derive(serde::Serialize, Clone)]
pub struct EventMetric {
    pub event_name: String,
    // Number of events with this name observed in the window.
    pub amount: usize,
}
/// Response body for `/metrics`: the per-event-name counts plus the
/// window that was queried (unix seconds, echoed back from the request).
#[derive(serde::Serialize, Clone)]
pub struct Metrics {
    pub metrics: Vec<EventMetric>,
    pub since: usize,
    pub end: usize,
}
/// Shared in-memory event store: an append-only queue behind an async
/// `RwLock`; cloning is cheap (it only bumps the inner `Arc`).
#[derive(Clone, Default)]
pub struct EventMetrics {
    queue: Arc<RwLock<Vec<Event>>>,
}
impl EventMetrics {
    /// Creates an empty event store.
    pub fn new() -> Self {
        Self {
            queue: Arc::default(),
        }
    }

    /// Appends a single event to the in-memory queue.
    pub async fn push_event(&self, event: Event) {
        let mut queue = self.queue.write().await;
        queue.push(event);
    }

    /// Drops every event whose timestamp is older than `cutoff` ago.
    ///
    /// Prunes in place with `Vec::retain` instead of rebuilding the queue
    /// via `iter().filter().cloned().collect()`, which allocated a whole new
    /// `Vec` and cloned every surviving event's `String` name.
    pub async fn prune_old(&self, cutoff: std::time::Duration) {
        let cutoff_time = std::time::SystemTime::now()
            .checked_sub(cutoff)
            .unwrap()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs() as usize;

        tracing::info!(before = cutoff_time, "pruning old events");

        let mut queue = self.queue.write().await;
        let before = queue.len();
        queue.retain(|event| event.timestamp >= cutoff_time);
        tracing::info!(pruned = before - queue.len(), "pruned events");
    }

    /// Aggregates per-event-name counts for the half-open window
    /// `[since, end)` (unix seconds).
    ///
    /// Returns one `EventMetric` per distinct name, ordered by name (the
    /// counts live in a `BTreeMap`, so iteration is key-ordered — same
    /// ordering as before). Counting is done with borrowed `&str` keys so
    /// each name is allocated once per distinct name, not looked up twice
    /// per event as with the previous `get_mut` + `insert` pattern.
    pub async fn get_event_metrics(&self, since: usize, end: usize) -> anyhow::Result<Metrics> {
        let queue = self.queue.read().await;

        let mut counts = BTreeMap::<&str, usize>::new();
        for item in queue
            .iter()
            .filter(|i| i.timestamp >= since && i.timestamp < end)
        {
            // Entry API: one map lookup per event.
            *counts.entry(item.event_name.as_str()).or_default() += 1;
        }

        Ok(Metrics {
            metrics: counts
                .into_iter()
                .map(|(event_name, amount)| EventMetric {
                    event_name: event_name.to_string(),
                    amount,
                })
                .collect(),
            since,
            end,
        })
    }
}
#[derive(Clone)]
pub struct SharedState(Arc<State>);
@ -83,10 +304,14 @@ impl Deref for SharedState {
}
}
pub struct State {}
pub struct State {
event_metrics: EventMetrics,
}
impl State {
pub async fn new() -> anyhow::Result<Self> {
Ok(Self {})
Ok(Self {
event_metrics: EventMetrics::new(),
})
}
}