feat: add prometheus and protobuf messages

Signed-off-by: kjuulh <contact@kjuulh.io>
Kasper Juul Hermansen 2024-11-10 13:42:19 +01:00
parent 0e03a8b9ac
commit a91846398c
16 changed files with 359 additions and 96 deletions

Cargo.lock (generated)

@@ -1627,6 +1627,7 @@ dependencies = [
  "drift",
  "nodata-storage",
  "notmad",
+ "prometheus",
  "prost",
  "prost-types",
  "rand",
@@ -1661,7 +1662,10 @@ name = "nodata-storage"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "bytes",
  "hex",
+ "prost",
+ "prost-types",
  "sha2",
  "tokio",
  "tracing",
@@ -1922,6 +1926,21 @@ dependencies = [
  "unicode-ident",
 ]

+[[package]]
+name = "prometheus"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
+dependencies = [
+ "cfg-if",
+ "fnv",
+ "lazy_static",
+ "memchr",
+ "parking_lot",
+ "protobuf",
+ "thiserror",
+]
+
 [[package]]
 name = "prost"
 version = "0.13.3"
@@ -1954,6 +1973,12 @@ dependencies = [
  "prost",
 ]

+[[package]]
+name = "protobuf"
+version = "2.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
+
 [[package]]
 name = "quote"
 version = "1.0.37"

@@ -13,5 +13,8 @@ plugins:
   - remote: buf.build/community/neoeinstein-tonic:v0.4.0
     out: crates/nodata-sdk/src/gen
+  - remote: buf.build/community/neoeinstein-prost
+    out: crates/nodata-storage/src/gen
 inputs:
   - directory: proto

@@ -52,5 +52,11 @@ pub struct PingRequest {
 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct PingResponse {
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Log {
+    #[prost(bytes="vec", repeated, tag="1")]
+    pub messages: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
+}
 include!("nodata.v1.tonic.rs");
 // @@protoc_insertion_point(module)

@@ -8,6 +8,9 @@ anyhow.workspace = true
 tokio.workspace = true
 uuid.workspace = true
 tracing.workspace = true
+prost.workspace = true
+prost-types.workspace = true
+bytes.workspace = true
 hex = "0.4.3"
 sha2 = "0.10.8"

@@ -0,0 +1,84 @@
+use std::{
+    env::temp_dir,
+    path::{Path, PathBuf},
+    time::{SystemTime, UNIX_EPOCH},
+};
+
+use anyhow::Context;
+use tokio::io::AsyncWriteExt;
+
+pub struct StorageBackend {
+    location: PathBuf,
+}
+
+impl StorageBackend {
+    pub fn new(location: &Path) -> Self {
+        Self {
+            location: location.into(),
+        }
+    }
+
+    pub fn temp() -> Self {
+        Self::new(&temp_dir().join("nodata"))
+    }
+
+    pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
+        let segment_key = uuid::Uuid::now_v7();
+        let segment_path = PathBuf::from("logs")
+            .join(topic)
+            .join(segment_key.to_string());
+        tracing::trace!("writing segment file: {}", segment_path.display());
+
+        let file_location = self.location.join(&segment_path);
+        if let Some(parent) = file_location.parent() {
+            tokio::fs::create_dir_all(parent)
+                .await
+                .context("failed to create storage backend dir")?;
+        }
+
+        let mut segment_file = tokio::fs::File::create(&file_location).await?;
+        segment_file.write_all(buffer).await?;
+        segment_file.flush().await?;
+
+        Ok(segment_key.to_string())
+    }
+
+    pub async fn append_index(
+        &self,
+        topic: &str,
+        segment_file: &str,
+        time: SystemTime,
+    ) -> anyhow::Result<()> {
+        let index_path = PathBuf::from("indexes").join(topic);
+        tracing::trace!("writing index file: {}", index_path.display());
+
+        let file_location = self.location.join(&index_path);
+        if let Some(parent) = file_location.parent() {
+            tokio::fs::create_dir_all(parent)
+                .await
+                .context("failed to create storage backend dir, index")?;
+        }
+
+        if !file_location.exists() {
+            tokio::fs::File::create(&file_location).await?;
+        }
+
+        let mut index_file = tokio::fs::File::options()
+            .append(true)
+            .open(&file_location)
+            .await?;
+        index_file
+            .write_all(
+                format!(
+                    "{},{}\n",
+                    time.duration_since(UNIX_EPOCH)
+                        .expect("to be able to get time")
+                        .as_secs(),
+                    segment_file
+                )
+                .as_bytes(),
+            )
+            .await?;
+        index_file.flush().await?;
+
+        Ok(())
+    }
+}
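For orientation, a minimal usage sketch of this new backend (not part of the commit; the topic name, payload, and the nodata_storage crate path are illustrative assumptions):

// Hypothetical usage sketch: write one segment and record it in the
// topic's index file. Assumes a Tokio runtime and the nodata-storage crate.
use std::time::SystemTime;

use nodata_storage::backend::StorageBackend;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Root defaults to <tmp>/nodata; segments land in logs/<topic>/<uuidv7>.
    let backend = StorageBackend::temp();
    let segment_id = backend.flush_segment("orders", b"encoded log bytes").await?;
    // Appends a "<unix_seconds>,<segment_id>" line to indexes/<topic>.
    backend.append_index("orders", &segment_id, SystemTime::now()).await?;
    Ok(())
}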

@@ -0,0 +1,61 @@
+// @generated
+// This file is @generated by prost-build.
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PublishEventRequest {
+    #[prost(string, tag="1")]
+    pub topic: ::prost::alloc::string::String,
+    #[prost(bytes="vec", tag="2")]
+    pub value: ::prost::alloc::vec::Vec<u8>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct PublishEventResponse {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct GetTopicsRequest {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetTopicsResponse {
+    #[prost(string, repeated, tag="1")]
+    pub topics: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SubscribeRequest {
+    #[prost(string, tag="1")]
+    pub topic: ::prost::alloc::string::String,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SubscribeResponse {
+    #[prost(message, optional, tag="2")]
+    pub published: ::core::option::Option<::prost_types::Timestamp>,
+    #[prost(bytes="vec", tag="4")]
+    pub value: ::prost::alloc::vec::Vec<u8>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct HandleMsgRequest {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct HandleMsgResponse {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct PingRequest {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct PingResponse {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Log {
+    #[prost(bytes="vec", repeated, tag="1")]
+    pub messages: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
+}
+// @@protoc_insertion_point(module)
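The new Log message batches raw payloads as repeated bytes. A quick roundtrip with prost shows the framing (illustrative sketch, assuming the generated Log type above is in scope):

// Hypothetical roundtrip: batch two payloads into a Log frame and decode
// them back using the prost Message trait.
use prost::Message;

fn roundtrip() -> Result<(), prost::DecodeError> {
    let log = Log {
        messages: vec![b"first".to_vec(), b"second".to_vec()],
    };
    let bytes = log.encode_to_vec();
    let decoded = Log::decode(bytes.as_slice())?;
    assert_eq!(decoded.messages, vec![b"first".to_vec(), b"second".to_vec()]);
    Ok(())
}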

@@ -9,103 +9,21 @@ use std::{collections::BTreeMap, sync::Arc, time::SystemTime};
 use anyhow::Context;
 use backend::StorageBackend;
+use proto::ProtoStorage;
 use sha2::{Digest, Sha256};
 use tokio::sync::Mutex;

 type TopicHashKey = String;

-pub mod backend {
-    [... inline module body removed; moved verbatim to the new backend file shown above ...]
-}
+pub mod backend;

 #[derive(Clone)]
 pub struct Storage {
     segment_size_bytes: usize,
-    buffer: Arc<Mutex<BTreeMap<TopicHashKey, Vec<u8>>>>,
+    buffer: Arc<Mutex<BTreeMap<TopicHashKey, Vec<Vec<u8>>>>>,
     backend: Arc<StorageBackend>,
+    codec: ProtoStorage,
 }

 impl Storage {
@@ -113,7 +31,9 @@ impl Storage {
         Self {
             segment_size_bytes: 4096 * 1000, // 4MB
             buffer: Arc::default(),
             backend: Arc::new(backend),
+            codec: ProtoStorage::default(),
         }
     }
@@ -136,20 +56,30 @@ impl Storage {
         }

         let log_buffer = buffer.get_mut(&topic_key).unwrap();
-        if log_buffer.len() + value.len() >= self.segment_size_bytes {
+        if log_buffer
+            .iter()
+            .map(|b| b.len())
+            .reduce(|acc, i| acc + i)
+            .unwrap_or_default()
+            >= self.segment_size_bytes
+        {
+            let log_buffer = buffer.remove(&topic_key).unwrap();
             let time = SystemTime::now();
             let segment_id = self
                 .backend
-                .flush_segment(&topic_key, log_buffer)
+                .flush_segment(&topic_key, &self.codec.format_log_message(log_buffer))
                 .await
                 .context("failed to write segment")?;
             self.backend
                 .append_index(&topic_key, &segment_id, time)
                 .await?;
-            log_buffer.clear();
-        }
-        log_buffer.extend(value);
+
+            let mut log_buf = Vec::with_capacity(self.segment_size_bytes);
+            log_buf.push(value.to_vec());
+            buffer.insert(topic_key.clone(), log_buf);
+        } else {
+            log_buffer.push(value.to_vec());
+        }
     }

     Ok(())
@@ -168,3 +98,20 @@ impl Storage {
         hex::encode(Sha256::digest(input.as_bytes()))
     }
 }
+
+mod proto {
+    use prost::Message;
+
+    mod gen {
+        include!("gen/nodata.v1.rs");
+    }
+
+    #[derive(Default, Clone)]
+    pub struct ProtoStorage {}
+
+    impl ProtoStorage {
+        pub fn format_log_message(&self, messages: Vec<Vec<u8>>) -> Vec<u8> {
+            gen::Log { messages }.encode_to_vec()
+        }
+    }
+}
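Only the encode path exists so far; a decode counterpart for readers of flushed segments might look like this (hypothetical, not in the commit, written as if inside the same proto module where prost::Message is imported):

// Hypothetical decode helper: recover the batched messages from a
// flushed segment's bytes.
pub fn parse_log_message(buffer: &[u8]) -> Result<Vec<Vec<u8>>, prost::DecodeError> {
    let log = gen::Log::decode(buffer)?;
    Ok(log.messages)
}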

@@ -35,6 +35,7 @@ tokio-stream = "0.1.15"
 dagger-sdk = "0.11.10"
 rand = "0.8.5"
 notmad = "0.4.0"
+prometheus = "0.13.4"

 [dev-dependencies]
 tracing-test = { version = "0.2.5" } #, features = ["no-env-filter"] }

@@ -52,5 +52,11 @@ pub struct PingRequest {
 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct PingResponse {
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Log {
+    #[prost(bytes="vec", repeated, tag="1")]
+    pub messages: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
+}
 include!("nodata.v1.tonic.rs");
 // @@protoc_insertion_point(module)

@@ -17,13 +17,26 @@ include!("gen/nodata.v1.rs");
 pub struct GrpcServer {
     state: SharedState,
     host: SocketAddr,
+    counter: prometheus::Counter,
 }

 impl GrpcServer {
     pub fn new(state: &SharedState, host: SocketAddr) -> Self {
+        let counter = prometheus::Counter::with_opts(prometheus::Opts::new(
+            "messages_pr_second",
+            "how many messages are ingested pr second",
+        ))
+        .unwrap();
+        state
+            .metrics_registry
+            .register(Box::new(counter.clone()))
+            .expect("to be able to register metrics");
+
         Self {
             state: state.clone(),
             host,
+            counter,
         }
     }
 }
@@ -45,6 +58,8 @@ impl no_data_service_server::NoDataService for GrpcServer {
             "handling event"
         );

+        self.counter.inc();
+
         self.state.ingest().publish(req).await.map_err(|e| {
             tracing::warn!(error = e.to_string(), "failed to handle ingest of data");
             tonic::Status::internal(e.to_string())

@@ -3,11 +3,14 @@ use std::net::SocketAddr;
 use anyhow::Context;
 use axum::async_trait;
 use axum::extract::MatchedPath;
+use axum::extract::State;
 use axum::http::Request;
 use axum::routing::get;
 use axum::Router;
 use notmad::Component;
 use notmad::MadError;
+use prometheus::Encoder;
+use prometheus::TextEncoder;
 use tokio_util::sync::CancellationToken;
 use tower_http::trace::TraceLayer;
@@ -32,6 +35,7 @@ impl Component for HttpServer {
     async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {
         let app = Router::new()
             .route("/", get(root))
+            .route("/metrics", get(metrics))
             .with_state(self.state.clone())
             .layer(
                 TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
@@ -63,3 +67,15 @@ impl Component for HttpServer {
 async fn root() -> &'static str {
     "Hello, nodata!"
 }
+
+async fn metrics(State(state): State<SharedState>) -> String {
+    let encoder = TextEncoder::new();
+    let metrics = state.metrics_registry.gather();
+    let mut buffer = Vec::new();
+    encoder
+        .encode(&metrics, &mut buffer)
+        .expect("to be able to encode metrics");
+
+    String::from_utf8(buffer).expect("to be able to encode from utf8")
+}
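Scraping the new endpoint returns Prometheus' text exposition format; with the counter registered in the gRPC server, a response body would look roughly like this (sample value illustrative):

# HELP messages_pr_second how many messages are ingested pr second
# TYPE messages_pr_second counter
messages_pr_second 128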

@@ -28,9 +28,17 @@ struct Command {
 #[derive(Subcommand)]
 enum Commands {
     Serve {
-        #[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
+        #[arg(
+            env = "SERVICE_HOST",
+            long = "service-host",
+            default_value = "127.0.0.1:3000"
+        )]
         host: SocketAddr,
-        #[arg(env = "GRPC_SERVICE_HOST", long, default_value = "127.0.0.1:7900")]
+        #[arg(
+            env = "GRPC_SERVICE_HOST",
+            long = "service-grpc-host",
+            default_value = "127.0.0.1:7900"
+        )]
         grpc_host: SocketAddr,
     },
@@ -39,7 +47,7 @@ enum Commands {
         //host: String,
         #[arg(
             env = "GRPC_SERVICE_HOST",
-            long,
+            long = "service-grpc-host",
             default_value = "http://127.0.0.1:7900"
         )]
         grpc_host: String,
@@ -57,6 +65,14 @@ enum ClientCommands {
         #[arg(long)]
         value: String,
     },
+    PublishEvents {
+        #[arg(long)]
+        topic: String,
+        #[arg(long)]
+        size: usize,
+        #[arg(long)]
+        threads: usize,
+    },
     GetTopics {},
     SubscribeTopic {
         #[arg(long)]
@@ -99,6 +115,39 @@ async fn main() -> anyhow::Result<()> {
             })
             .await?;
         }
+        ClientCommands::PublishEvents {
+            topic,
+            size,
+            threads,
+        } => {
+            let mut handles = Vec::new();
+            for _ in 0..threads {
+                let topic = topic.clone();
+                let grpc_host = grpc_host.clone();
+                let handle = tokio::spawn(async move {
+                    let mut client = create_client(grpc_host).await?;
+                    loop {
+                        let _ = client
+                            .publish_event(PublishEventRequest {
+                                topic: topic.clone(),
+                                value: vec![0; size],
+                            })
+                            .await?;
+                        //tokio::time::sleep(std::time::Duration::from_millis(5)).await;
+                    }
+
+                    #[allow(unreachable_code)]
+                    Ok::<(), anyhow::Error>(())
+                });
+                handles.push(handle);
+            }
+
+            for handle in handles {
+                handle.await??;
+            }
+        }
         ClientCommands::GetTopics {} => {
             let mut client = create_client(grpc_host).await?;
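With clap's default kebab-case naming, the new load generator can be driven from the CLI; the binary name and subcommand path here are assumed for illustration:

nodata client publish-events --topic load-test --size 1024 --threads 8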

@@ -1,4 +1,4 @@
-use std::{collections::BTreeMap, env::temp_dir, sync::Arc};
+use std::{collections::BTreeMap, sync::Arc};

 use tokio::sync::RwLock;

@@ -1,6 +1,7 @@
 use std::{ops::Deref, sync::Arc};

 use anyhow::Context;
+use prometheus::Registry;
 use sqlx::{Pool, Postgres};

 use crate::services::{consumers::Consumers, handler::Handler, staging::Staging};
@@ -27,6 +28,7 @@ pub struct State {
     pub staging: Staging,
     pub consumers: Consumers,
     pub handler: Handler,
+    pub metrics_registry: Registry,
 }

 impl State {
@@ -51,6 +53,7 @@ impl State {
             consumers: Consumers::new(),
             staging,
             handler,
+            metrics_registry: Registry::new(),
         })
     }
 }

@@ -0,0 +1,7 @@
+syntax = "proto3";
+
+package nodata.v1;
+
+message Log {
+  repeated bytes messages = 1;
+}

@@ -0,0 +1,37 @@
+services:
+  prometheus:
+    image: prom/prometheus:latest
+    container_name: prometheus
+    ports:
+      - "9090:9090"
+    configs:
+      - source: prometheus
+        target: /etc/prometheus/prometheus.yml
+
+  grafana:
+    image: grafana/grafana:latest
+    container_name: grafana
+    ports:
+      - "13000:3000"
+    depends_on:
+      - prometheus
+    volumes:
+      - grafana_data:/var/lib/grafana
+    environment:
+      - GF_SECURITY_ADMIN_USER=admin
+      - GF_SECURITY_ADMIN_PASSWORD=admin
+
+volumes:
+  grafana_data:
+
+configs:
+  prometheus:
+    content: |
+      global:
+        scrape_interval: 5s
+
+      scrape_configs:
+        - job_name: 'nodata'
+          static_configs:
+            - targets: ['nefarious:3000']
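Assuming this compose file is saved as docker-compose.yaml (its path is not shown in the diff), the monitoring stack comes up with:

docker compose up -d

Prometheus then serves on localhost:9090 and Grafana on localhost:13000 (admin/admin), scraping the nodata /metrics endpoint from the 'nefarious:3000' target every 5 seconds.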