Compare commits
No commits in common. "aee6ba54c638a704362b47e335eb62a111b55752" and "a023bf223e606b60988ac4bde0ebece84245dc79" have entirely different histories.
aee6ba54c6
...
a023bf223e
551
Cargo.lock
generated
551
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -3,6 +3,7 @@ members = ["crates/*"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.dependencies]
|
||||
iamvisual = { path = "crates/iamvisual" }
|
||||
|
||||
anyhow = { version = "1" }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
|
@ -15,6 +15,3 @@ axum.workspace = true
|
||||
serde = { version = "1.0.197", features = ["derive"] }
|
||||
uuid = { version = "1.7.0", features = ["v4"] }
|
||||
tower-http = { version = "0.6.0", features = ["cors", "trace"] }
|
||||
notmad = "0.5.0"
|
||||
nodrift = "0.2.0"
|
||||
rand = "0.8.5"
|
||||
|
@ -57,28 +57,6 @@
|
||||
<script>
|
||||
const horizontal = true;
|
||||
const categoryAmount = 10;
|
||||
const intervalTime = 500
|
||||
|
||||
let categoryQueue = [];
|
||||
let intervalNow = Math.floor(Date.now() / 1000) - 1;
|
||||
setInterval(() => {
|
||||
const unixTimestampNow = Math.floor(Date.now() / 1000);
|
||||
let resp = fetch("https://iamvisual.prod.kjuulh.app/metrics?start=" + intervalNow + "&end=" + unixTimestampNow)
|
||||
.then((resp) => {
|
||||
if (resp.ok) {
|
||||
return resp.json()
|
||||
} else {
|
||||
throw new Error("failed to get response")
|
||||
}
|
||||
}).then((json) => {
|
||||
categoryQueue = [
|
||||
...categoryQueue,
|
||||
json
|
||||
]
|
||||
console.log("received category")
|
||||
});
|
||||
intervalNow = unixTimestampNow;
|
||||
}, intervalTime);
|
||||
|
||||
const parentWidth = document.querySelector('#chart').parentElement.offsetWidth;
|
||||
|
||||
@ -208,69 +186,26 @@
|
||||
// Real-time simulation
|
||||
let allData = [];
|
||||
const scrollingSpeed = 20; // Pixels per second
|
||||
let maxIntensity = 1;
|
||||
|
||||
setInterval(() => {
|
||||
let newMax = 1;
|
||||
allData.map(c => {
|
||||
if (c.amount > newMax) {
|
||||
if (newMax < maxIntensity) {
|
||||
maxIntensity = newMax;
|
||||
}
|
||||
}
|
||||
})
|
||||
}, 10000)
|
||||
|
||||
function generateData() {
|
||||
const item = categoryQueue.pop();
|
||||
if (item == undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
const newData = item.metrics.map((c, i) => {
|
||||
if (c.amount > maxIntensity) {
|
||||
maxIntensity = c.amount;
|
||||
}
|
||||
if (c.amount == 0) {
|
||||
return null
|
||||
}
|
||||
const newIntensity = c.amount / maxIntensity
|
||||
console.log(maxIntensity);
|
||||
const smoothIntensity = d3.interpolate(lastIntensity.get(i), newIntensity)(0.5); // Smooth transition
|
||||
lastIntensity.set(c.event_name, smoothIntensity); // Update last intensity
|
||||
// Simulate sporadic events with intensity
|
||||
const newData = categories.map(c => {
|
||||
if (Math.random() < 0.7) return null; // 70% chance no data for this category
|
||||
const newIntensity = Math.random(); // Random intensity
|
||||
const smoothIntensity = d3.interpolate(lastIntensity.get(c.id), newIntensity)(0.5); // Smooth transition
|
||||
lastIntensity.set(c.id, smoothIntensity); // Update last intensity
|
||||
return {
|
||||
id: `${Date.now()}-${c.event_name}`,
|
||||
category: i,
|
||||
id: `${Date.now()}-${c.id}`,
|
||||
category: c.id,
|
||||
timestamp: Date.now(),
|
||||
intensity: smoothIntensity,
|
||||
color: categories[i].color,
|
||||
amount: c.amount,
|
||||
}
|
||||
}).filter(Boolean);
|
||||
};
|
||||
}).filter(Boolean); // Remove null values
|
||||
|
||||
|
||||
|
||||
allData = [...allData, ...newData].filter(d => new Date(d.timestamp) >= new Date(Date.now() - 600000))
|
||||
return
|
||||
|
||||
// Simulate sporadic events with intensity
|
||||
//const newData = categories.map(c => {
|
||||
// if (Math.random() < 0.7) return null; // 70% chance no data for this category
|
||||
// const newIntensity = Math.random(); // Random intensity
|
||||
// const smoothIntensity = d3.interpolate(lastIntensity.get(c.id), newIntensity)(0.5); // Smooth transition
|
||||
// lastIntensity.set(c.id, smoothIntensity); // Update last intensity
|
||||
// return {
|
||||
// id: `${Date.now()}-${c.id}`,
|
||||
// category: c.id,
|
||||
// timestamp: Date.now(),
|
||||
// intensity: smoothIntensity,
|
||||
// };
|
||||
//}).filter(Boolean); // Remove null values
|
||||
|
||||
//// Append new data and remove older ones beyond the last 60 seconds
|
||||
//allData = [...allData, ...newData].filter(d =>
|
||||
// new Date(d.timestamp) >= new Date(Date.now() - 60000)
|
||||
//);
|
||||
// Append new data and remove older ones beyond the last 60 seconds
|
||||
allData = [...allData, ...newData].filter(d =>
|
||||
new Date(d.timestamp) >= new Date(Date.now() - 60000)
|
||||
);
|
||||
}
|
||||
|
||||
// Continuous scroll
|
||||
@ -297,7 +232,7 @@
|
||||
|
||||
// Update positions of all heatmap cells
|
||||
updateHeatmap(allData);
|
||||
}, 150);
|
||||
});
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
|
@ -1,16 +1,11 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::UNIX_EPOCH;
|
||||
use std::{net::SocketAddr, ops::Deref, sync::Arc};
|
||||
|
||||
use axum::http::{HeaderValue, Method, Request};
|
||||
use anyhow::Context;
|
||||
use axum::http::Request;
|
||||
use axum::routing::get;
|
||||
use axum::Router;
|
||||
use axum::{extract::MatchedPath, response::Html};
|
||||
use axum::{Json, Router};
|
||||
use clap::{Parser, Subcommand};
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::RwLock;
|
||||
use tower_http::cors::{Any, CorsLayer};
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
#[derive(Parser)]
|
||||
@ -28,15 +23,6 @@ enum Commands {
|
||||
},
|
||||
}
|
||||
|
||||
const CATEGORIES: [&str; 6] = [
|
||||
"UserOnboarded",
|
||||
"PaymentProcessed",
|
||||
"UserLogin",
|
||||
"UserOffboarded",
|
||||
"OngoingCalls",
|
||||
"CardProcessed",
|
||||
];
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
dotenv::dotenv().ok();
|
||||
@ -49,19 +35,11 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let state = SharedState(Arc::new(State::new().await?));
|
||||
|
||||
notmad::Mad::builder()
|
||||
.add_fn({
|
||||
let state = state.clone();
|
||||
move |_cancel| {
|
||||
let state = state.clone();
|
||||
|
||||
async move {
|
||||
let app = Router::new()
|
||||
.route("/", get(root))
|
||||
.route("/metrics", get(metrics))
|
||||
.with_state(state.clone())
|
||||
.layer(TraceLayer::new_for_http().make_span_with(
|
||||
|request: &Request<_>| {
|
||||
.layer(
|
||||
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
|
||||
// Log the matched route's path (with placeholders not filled in).
|
||||
// Use request.uri() or OriginalUri if you want the real path.
|
||||
let matched_path = request
|
||||
@ -75,104 +53,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
matched_path,
|
||||
some_other_field = tracing::field::Empty,
|
||||
)
|
||||
},
|
||||
))
|
||||
.layer(
|
||||
CorsLayer::new()
|
||||
.allow_origin(Any)
|
||||
.allow_methods([Method::GET]),
|
||||
}), // ...
|
||||
);
|
||||
|
||||
tracing::info!("listening on {}", host);
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(host).await.unwrap();
|
||||
axum::serve(listener, app.into_make_service())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
})
|
||||
.add_fn({
|
||||
let state = state.clone();
|
||||
|
||||
move |cancel| {
|
||||
let state = state.clone();
|
||||
|
||||
async move {
|
||||
let nodrift_cancel =
|
||||
nodrift::schedule(std::time::Duration::from_secs(10), {
|
||||
let state = state.clone();
|
||||
|
||||
move || {
|
||||
let state = state.clone();
|
||||
|
||||
async move {
|
||||
state
|
||||
.event_metrics
|
||||
.prune_old(std::time::Duration::from_secs(60 * 2))
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
nodrift_cancel.cancel();
|
||||
}
|
||||
_ = nodrift_cancel.cancelled() => {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
})
|
||||
.add_fn(move |cancel| {
|
||||
let state = state.clone();
|
||||
async move {
|
||||
let nodrift_cancel =
|
||||
nodrift::schedule(std::time::Duration::from_millis(100), {
|
||||
let state = state.clone();
|
||||
move || {
|
||||
let state = state.clone();
|
||||
let mut rng = rand::thread_rng();
|
||||
let category_index = rng.gen_range(0..CATEGORIES.len());
|
||||
|
||||
async move {
|
||||
state
|
||||
.event_metrics
|
||||
.push_event(Event {
|
||||
event_name: CATEGORIES[category_index].to_string(),
|
||||
timestamp: std::time::SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
as usize,
|
||||
})
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
nodrift_cancel.cancel();
|
||||
}
|
||||
_ = nodrift_cancel.cancelled() => {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.run()
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -184,116 +72,6 @@ async fn root() -> Html<String> {
|
||||
Html(INDEX.to_string())
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct MetricsQuery {
|
||||
start: usize,
|
||||
end: usize,
|
||||
}
|
||||
|
||||
async fn metrics(
|
||||
axum::extract::State(state): axum::extract::State<SharedState>,
|
||||
axum::extract::Query(query): axum::extract::Query<MetricsQuery>,
|
||||
) -> Json<Metrics> {
|
||||
let metrics = state
|
||||
.event_metrics
|
||||
.get_event_metrics(query.start, query.end)
|
||||
.await
|
||||
.expect("to be able to get event metrics");
|
||||
|
||||
Json(metrics)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Event {
|
||||
pub event_name: String,
|
||||
pub timestamp: usize,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, Clone)]
|
||||
pub struct EventMetric {
|
||||
pub event_name: String,
|
||||
pub amount: usize,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, Clone)]
|
||||
pub struct Metrics {
|
||||
pub metrics: Vec<EventMetric>,
|
||||
pub since: usize,
|
||||
pub end: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct EventMetrics {
|
||||
queue: Arc<RwLock<Vec<Event>>>,
|
||||
}
|
||||
|
||||
impl EventMetrics {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
queue: Arc::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn push_event(&self, event: Event) {
|
||||
let mut queue = self.queue.write().await;
|
||||
queue.push(event);
|
||||
}
|
||||
|
||||
pub async fn prune_old(&self, cutoff: std::time::Duration) {
|
||||
let cutoff_time = std::time::SystemTime::now()
|
||||
.checked_sub(cutoff)
|
||||
.unwrap()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as usize;
|
||||
|
||||
tracing::info!(before = cutoff_time, "pruning old events");
|
||||
|
||||
let mut queue = self.queue.write().await;
|
||||
let new_queue: Vec<_> = queue
|
||||
.iter()
|
||||
.filter(|&i| i.timestamp >= cutoff_time)
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
tracing::info!(pruned = queue.len() - new_queue.len(), "pruned events");
|
||||
*queue = new_queue
|
||||
}
|
||||
|
||||
pub async fn get_event_metrics(&self, since: usize, end: usize) -> anyhow::Result<Metrics> {
|
||||
let queue = self.queue.read().await;
|
||||
|
||||
let items = queue
|
||||
.iter()
|
||||
.filter(|i| i.timestamp >= since && i.timestamp < end)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut metrics = BTreeMap::<String, EventMetric>::new();
|
||||
for item in items {
|
||||
match metrics.get_mut(&item.event_name) {
|
||||
Some(metrics) => {
|
||||
metrics.amount += 1;
|
||||
}
|
||||
None => {
|
||||
metrics.insert(
|
||||
item.event_name.clone(),
|
||||
EventMetric {
|
||||
event_name: item.event_name.clone(),
|
||||
amount: 1,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Metrics {
|
||||
metrics: metrics.values().cloned().collect(),
|
||||
since,
|
||||
end,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SharedState(Arc<State>);
|
||||
|
||||
@ -305,14 +83,10 @@ impl Deref for SharedState {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct State {
|
||||
event_metrics: EventMetrics,
|
||||
}
|
||||
pub struct State {}
|
||||
|
||||
impl State {
|
||||
pub async fn new() -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
event_metrics: EventMetrics::new(),
|
||||
})
|
||||
Ok(Self {})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user