diff --git a/Cargo.lock b/Cargo.lock
index 9780b28..154e7f8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1627,6 +1627,7 @@ dependencies = [
  "drift",
  "nodata-storage",
  "notmad",
+ "prometheus",
  "prost",
  "prost-types",
  "rand",
@@ -1661,7 +1662,10 @@ name = "nodata-storage"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "bytes",
  "hex",
+ "prost",
+ "prost-types",
  "sha2",
  "tokio",
  "tracing",
@@ -1922,6 +1926,21 @@ dependencies = [
  "unicode-ident",
 ]

+[[package]]
+name = "prometheus"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
+dependencies = [
+ "cfg-if",
+ "fnv",
+ "lazy_static",
+ "memchr",
+ "parking_lot",
+ "protobuf",
+ "thiserror",
+]
+
 [[package]]
 name = "prost"
 version = "0.13.3"
@@ -1954,6 +1973,12 @@ dependencies = [
  "prost",
 ]

+[[package]]
+name = "protobuf"
+version = "2.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
+
 [[package]]
 name = "quote"
 version = "1.0.37"
diff --git a/buf.gen.yaml b/buf.gen.yaml
index 2ecafa4..43d8f54 100644
--- a/buf.gen.yaml
+++ b/buf.gen.yaml
@@ -13,5 +13,8 @@ plugins:
 - remote: buf.build/community/neoeinstein-tonic:v0.4.0
   out: crates/nodata-sdk/src/gen
+- remote: buf.build/community/neoeinstein-prost
+  out: crates/nodata-storage/src/gen
+
 inputs:
 - directory: proto
diff --git a/crates/nodata-sdk/src/gen/nodata.v1.rs b/crates/nodata-sdk/src/gen/nodata.v1.rs
index 126ea72..203e160 100644
--- a/crates/nodata-sdk/src/gen/nodata.v1.rs
+++ b/crates/nodata-sdk/src/gen/nodata.v1.rs
@@ -52,5 +52,11 @@ pub struct PingRequest {
 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct PingResponse {
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Log {
+    #[prost(bytes="vec", repeated, tag="1")]
+    pub messages: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
+}
 include!("nodata.v1.tonic.rs");
 // @@protoc_insertion_point(module)
\ No newline at end of file
diff --git a/crates/nodata-storage/Cargo.toml b/crates/nodata-storage/Cargo.toml
index 8f0ad63..895861d 100644
--- a/crates/nodata-storage/Cargo.toml
+++ b/crates/nodata-storage/Cargo.toml
@@ -8,6 +8,9 @@ anyhow.workspace = true
 tokio.workspace = true
 uuid.workspace = true
 tracing.workspace = true
+prost.workspace = true
+prost-types.workspace = true
+bytes.workspace = true

 hex = "0.4.3"
 sha2 = "0.10.8"
diff --git a/crates/nodata-storage/src/backend.rs b/crates/nodata-storage/src/backend.rs
new file mode 100644
index 0000000..05b9062
--- /dev/null
+++ b/crates/nodata-storage/src/backend.rs
@@ -0,0 +1,84 @@
+use std::{
+    env::temp_dir,
+    path::{Path, PathBuf},
+    time::{SystemTime, UNIX_EPOCH},
+};
+
+use anyhow::Context;
+use tokio::io::AsyncWriteExt;
+
+pub struct StorageBackend {
+    location: PathBuf,
+}
+
+impl StorageBackend {
+    pub fn new(location: &Path) -> Self {
+        Self {
+            location: location.into(),
+        }
+    }
+
+    pub fn temp() -> Self {
+        Self::new(&temp_dir().join("nodata"))
+    }
+
+    pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
+        let segment_key = uuid::Uuid::now_v7();
+        let segment_path = PathBuf::from("logs")
+            .join(topic)
+            .join(segment_key.to_string());
+        tracing::trace!("writing segment file: {}", segment_path.display());
+        let file_location = self.location.join(&segment_path);
+        if let Some(parent) = file_location.parent() {
+            tokio::fs::create_dir_all(parent)
+                .await
+                .context("failed to create storage backend dir")?;
+        }
+
+        let mut segment_file = tokio::fs::File::create(&file_location).await?;
+        segment_file.write_all(buffer).await?;
+        segment_file.flush().await?;
+
+        Ok(segment_key.to_string())
+    }
+
+    pub async fn append_index(
+        &self,
+        topic: &str,
+        segment_file: &str,
+        time: SystemTime,
+    ) -> anyhow::Result<()> {
+        let index_path = PathBuf::from("indexes").join(topic);
+        tracing::trace!("writing index file: {}", index_path.display());
+        let file_location = self.location.join(&index_path);
+        if let Some(parent) = file_location.parent() {
+            tokio::fs::create_dir_all(parent)
+                .await
+                .context("failed to create storage backend dir, index")?;
+        }
+
+        if !file_location.exists() {
+            tokio::fs::File::create(&file_location).await?;
+        }
+
+        let mut index_file = tokio::fs::File::options()
+            .append(true)
+            .open(&file_location)
+            .await?;
+        index_file
+            .write_all(
+                format!(
+                    "{},{}\n",
+                    time.duration_since(UNIX_EPOCH)
+                        .expect("to be able to get time")
+                        .as_secs(),
+                    segment_file
+                )
+                .as_bytes(),
+            )
+            .await?;
+        index_file.flush().await?;
+
+        Ok(())
+    }
+}
diff --git a/crates/nodata-storage/src/gen/nodata.v1.rs b/crates/nodata-storage/src/gen/nodata.v1.rs
new file mode 100644
index 0000000..1a47184
--- /dev/null
+++ b/crates/nodata-storage/src/gen/nodata.v1.rs
@@ -0,0 +1,61 @@
+// @generated
+// This file is @generated by prost-build.
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct PublishEventRequest {
+    #[prost(string, tag="1")]
+    pub topic: ::prost::alloc::string::String,
+    #[prost(bytes="vec", tag="2")]
+    pub value: ::prost::alloc::vec::Vec<u8>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct PublishEventResponse {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct GetTopicsRequest {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct GetTopicsResponse {
+    #[prost(string, repeated, tag="1")]
+    pub topics: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SubscribeRequest {
+    #[prost(string, tag="1")]
+    pub topic: ::prost::alloc::string::String,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SubscribeResponse {
+    #[prost(message, optional, tag="2")]
+    pub published: ::core::option::Option<::prost_types::Timestamp>,
+    #[prost(bytes="vec", tag="4")]
+    pub value: ::prost::alloc::vec::Vec<u8>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct HandleMsgRequest {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct HandleMsgResponse {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct PingRequest {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct PingResponse {
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Log {
+    #[prost(bytes="vec", repeated, tag="1")]
+    pub messages: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
+}
+// @@protoc_insertion_point(module)
diff --git a/crates/nodata-storage/src/lib.rs b/crates/nodata-storage/src/lib.rs
index 4bfe47e..cbea281 100644
--- a/crates/nodata-storage/src/lib.rs
+++ b/crates/nodata-storage/src/lib.rs
@@ -9,103 +9,21 @@ use std::{collections::BTreeMap, sync::Arc, time::SystemTime};

 use anyhow::Context;
 use backend::StorageBackend;
+use proto::ProtoStorage;
 use sha2::{Digest, Sha256};
 use tokio::sync::Mutex;

 type TopicHashKey = String;

-pub mod backend {
-    use std::{
-        env::temp_dir,
-        path::{Path, PathBuf},
-        time::{SystemTime, UNIX_EPOCH},
-    };
-
-    use anyhow::Context;
-    use tokio::io::AsyncWriteExt;
-
-    pub struct StorageBackend {
-        location: PathBuf,
-    }
-
-    impl StorageBackend {
-        pub fn new(location: &Path) -> Self {
-            Self {
-                location: location.into(),
-            }
-        }
-
-        pub fn temp() -> Self {
-            Self::new(&temp_dir().join("nodata"))
-        }
-
-        pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
-            let segment_key = uuid::Uuid::now_v7();
-            let segment_path = PathBuf::from("logs")
-                .join(topic)
-                .join(segment_key.to_string());
-            tracing::trace!("writing segment file: {}", segment_path.display());
-            let file_location = self.location.join(&segment_path);
-            if let Some(parent) = file_location.parent() {
-                tokio::fs::create_dir_all(parent)
-                    .await
-                    .context("failed to create storage backend dir")?;
-            }
-
-            let mut segment_file = tokio::fs::File::create(&file_location).await?;
-            segment_file.write_all(buffer).await?;
-            segment_file.flush().await?;
-
-            Ok(segment_key.to_string())
-        }
-
-        pub async fn append_index(
-            &self,
-            topic: &str,
-            segment_file: &str,
-            time: SystemTime,
-        ) -> anyhow::Result<()> {
-            let index_path = PathBuf::from("indexes").join(topic);
-            tracing::trace!("writing index file: {}", index_path.display());
-            let file_location = self.location.join(&index_path);
-            if let Some(parent) = file_location.parent() {
-                tokio::fs::create_dir_all(parent)
-                    .await
-                    .context("failed to create storage backend dir, index")?;
-            }
-
-            if !file_location.exists() {
-                tokio::fs::File::create(&file_location).await?;
-            }
-
-            let mut index_file = tokio::fs::File::options()
-                .append(true)
-                .open(&file_location)
-                .await?;
-            index_file
-                .write_all(
-                    format!(
-                        "{},{}\n",
-                        time.duration_since(UNIX_EPOCH)
-                            .expect("to be able to get time")
-                            .as_secs(),
-                        segment_file
-                    )
-                    .as_bytes(),
-                )
-                .await?;
-            index_file.flush().await?;
-
-            Ok(())
-        }
-    }
-}
+pub mod backend;

 #[derive(Clone)]
 pub struct Storage {
     segment_size_bytes: usize,
-    buffer: Arc<Mutex<BTreeMap<TopicHashKey, Vec<u8>>>>,
+    buffer: Arc<Mutex<BTreeMap<TopicHashKey, Vec<Vec<u8>>>>>,
     backend: Arc<StorageBackend>,
+
+    codec: ProtoStorage,
 }

 impl Storage {
@@ -113,7 +31,9 @@
         Self {
             segment_size_bytes: 4096 * 1000, // 4MB
             buffer: Arc::default(),
+
             backend: Arc::new(backend),
+            codec: ProtoStorage::default(),
         }
     }

@@ -136,20 +56,30 @@
             }
             let log_buffer = buffer.get_mut(&topic_key).unwrap();

-            if log_buffer.len() + value.len() >= self.segment_size_bytes {
+            if log_buffer
+                .iter()
+                .map(|b| b.len())
+                .reduce(|acc, i| acc + i)
+                .unwrap_or_default()
+                >= self.segment_size_bytes
+            {
+                let log_buffer = buffer.remove(&topic_key).unwrap();
                 let time = SystemTime::now();
                 let segment_id = self
                     .backend
-                    .flush_segment(&topic_key, log_buffer)
+                    .flush_segment(&topic_key, &self.codec.format_log_message(log_buffer))
                     .await
                     .context("failed to write segment")?;
                 self.backend
                     .append_index(&topic_key, &segment_id, time)
                     .await?;
-                log_buffer.clear();
-            }
-            log_buffer.extend(value);
+                let mut log_buf = Vec::with_capacity(self.segment_size_bytes);
+                log_buf.push(value.to_vec());
+                buffer.insert(topic_key.clone(), log_buf);
+            } else {
+                log_buffer.push(value.to_vec());
+            }
         }

         Ok(())
     }
@@ -168,3 +98,20 @@
         hex::encode(Sha256::digest(input.as_bytes()))
     }
 }
+
+mod proto {
+    use prost::Message;
+
+    mod gen {
+        include!("gen/nodata.v1.rs");
+    }
+
+    #[derive(Default, Clone)]
+    pub struct ProtoStorage {}
+
+    impl ProtoStorage {
+        pub fn format_log_message(&self, messages: Vec<Vec<u8>>) -> Vec<u8> {
+            gen::Log { messages }.encode_to_vec()
+        }
+    }
+}
diff --git a/crates/nodata/Cargo.toml b/crates/nodata/Cargo.toml
index 40a7bf4..0f864d6 100644
--- a/crates/nodata/Cargo.toml
+++ b/crates/nodata/Cargo.toml
@@ -35,6 +35,7 @@ tokio-stream = "0.1.15"
 dagger-sdk = "0.11.10"
 rand = "0.8.5"
 notmad = "0.4.0"
+prometheus = "0.13.4"

 [dev-dependencies]
 tracing-test = { version = "0.2.5" } #, features = ["no-env-filter"] }
diff --git a/crates/nodata/src/gen/nodata.v1.rs b/crates/nodata/src/gen/nodata.v1.rs
index 126ea72..203e160 100644
--- a/crates/nodata/src/gen/nodata.v1.rs
+++ b/crates/nodata/src/gen/nodata.v1.rs
@@ -52,5 +52,11 @@ pub struct PingRequest {
 #[derive(Clone, Copy, PartialEq, ::prost::Message)]
 pub struct PingResponse {
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct Log {
+    #[prost(bytes="vec", repeated, tag="1")]
+    pub messages: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
+}
 include!("nodata.v1.tonic.rs");
 // @@protoc_insertion_point(module)
\ No newline at end of file
diff --git a/crates/nodata/src/grpc.rs b/crates/nodata/src/grpc.rs
index 3252d92..1eb669b 100644
--- a/crates/nodata/src/grpc.rs
+++ b/crates/nodata/src/grpc.rs
@@ -17,13 +17,26 @@ include!("gen/nodata.v1.rs");
 pub struct GrpcServer {
     state: SharedState,
     host: SocketAddr,
+    counter: prometheus::Counter,
 }

 impl GrpcServer {
     pub fn new(state: &SharedState, host: SocketAddr) -> Self {
+        let counter = prometheus::Counter::with_opts(prometheus::Opts::new(
+            "messages_pr_second",
+            "how many messages are ingested pr second",
+        ))
+        .unwrap();
+
+        state
+            .metrics_registry
+            .register(Box::new(counter.clone()))
+            .expect("to be able to register metrics");
+
         Self {
             state: state.clone(),
             host,
+            counter,
         }
     }
 }
@@ -45,6 +58,8 @@ impl no_data_service_server::NoDataService for GrpcServer {
             "handling event"
         );

+        self.counter.inc();
+
         self.state.ingest().publish(req).await.map_err(|e| {
             tracing::warn!(error = e.to_string(), "failed to handle ingest of data");
             tonic::Status::internal(e.to_string())
diff --git a/crates/nodata/src/http.rs b/crates/nodata/src/http.rs
index 42142bb..4ca9d2d 100644
--- a/crates/nodata/src/http.rs
+++ b/crates/nodata/src/http.rs
@@ -3,11 +3,14 @@
 use std::net::SocketAddr;
 use anyhow::Context;
 use axum::async_trait;
 use axum::extract::MatchedPath;
+use axum::extract::State;
 use axum::http::Request;
 use axum::routing::get;
 use axum::Router;
 use notmad::Component;
 use notmad::MadError;
+use prometheus::Encoder;
+use prometheus::TextEncoder;
 use tokio_util::sync::CancellationToken;
 use tower_http::trace::TraceLayer;
@@ -32,6 +35,7 @@ impl Component for HttpServer {
     async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {
         let app = Router::new()
             .route("/", get(root))
+            .route("/metrics", get(metrics))
             .with_state(self.state.clone())
             .layer(
                 TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
@@ -63,3 +67,15 @@
 async fn root() -> &'static str {
     "Hello, nodata!"
 }
+
+async fn metrics(State(state): State<SharedState>) -> String {
+    let encoder = TextEncoder::new();
+    let metrics = state.metrics_registry.gather();
+
+    let mut buffer = Vec::new();
+    encoder
+        .encode(&metrics, &mut buffer)
+        .expect("to be able to encode metrics");
+
+    String::from_utf8(buffer).expect("to be able to encode from utf8")
+}
diff --git a/crates/nodata/src/main.rs b/crates/nodata/src/main.rs
index 0187826..40df025 100644
--- a/crates/nodata/src/main.rs
+++ b/crates/nodata/src/main.rs
@@ -28,9 +28,17 @@ struct Command {
 #[derive(Subcommand)]
 enum Commands {
     Serve {
-        #[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
+        #[arg(
+            env = "SERVICE_HOST",
+            long = "service-host",
+            default_value = "127.0.0.1:3000"
+        )]
         host: SocketAddr,
-        #[arg(env = "GRPC_SERVICE_HOST", long, default_value = "127.0.0.1:7900")]
+        #[arg(
+            env = "GRPC_SERVICE_HOST",
+            long = "service-grpc-host",
+            default_value = "127.0.0.1:7900"
+        )]
         grpc_host: SocketAddr,
     },

@@ -39,7 +47,7 @@ enum Commands {
         //host: String,
         #[arg(
             env = "GRPC_SERVICE_HOST",
-            long,
+            long = "service-grpc-host",
             default_value = "http://127.0.0.1:7900"
         )]
         grpc_host: String,
@@ -57,6 +65,14 @@ enum ClientCommands {
         #[arg(long)]
         value: String,
     },
+    PublishEvents {
+        #[arg(long)]
+        topic: String,
+        #[arg(long)]
+        size: usize,
+        #[arg(long)]
+        threads: usize,
+    },
     GetTopics {},
     SubscribeTopic {
         #[arg(long)]
@@ -99,6 +115,39 @@
             })
             .await?;
         }
+        ClientCommands::PublishEvents {
+            topic,
+            size,
+            threads,
+        } => {
+            let mut handles = Vec::new();
+            for _ in 0..threads {
+                let topic = topic.clone();
+                let grpc_host = grpc_host.clone();
+                let handle = tokio::spawn(async move {
+                    let mut client = create_client(grpc_host).await?;
+
+                    loop {
+                        let _ = client
+                            .publish_event(PublishEventRequest {
+                                topic: topic.clone(),
+                                value: vec![0; size],
+                            })
+                            .await?;
+                        //tokio::time::sleep(std::time::Duration::from_millis(5)).await;
+                    }
+
+                    #[allow(unreachable_code)]
+                    Ok::<(), anyhow::Error>(())
+                });
+
+                handles.push(handle);
+            }
+
+            for handle in handles {
+                handle.await??;
+            }
+        }
         ClientCommands::GetTopics {} => {
             let mut client = create_client(grpc_host).await?;

diff --git a/crates/nodata/src/services/staging.rs b/crates/nodata/src/services/staging.rs
index e86c63b..7b5ebb7 100644
--- a/crates/nodata/src/services/staging.rs
+++ b/crates/nodata/src/services/staging.rs
@@ -1,4 +1,4 @@
-use std::{collections::BTreeMap, env::temp_dir, sync::Arc};
+use std::{collections::BTreeMap, sync::Arc};

 use tokio::sync::RwLock;

diff --git a/crates/nodata/src/state.rs b/crates/nodata/src/state.rs
index 9a75105..a14b18f 100644
--- a/crates/nodata/src/state.rs
+++ b/crates/nodata/src/state.rs
@@ -1,6 +1,7 @@
 use std::{ops::Deref, sync::Arc};

 use anyhow::Context;
+use prometheus::Registry;
 use sqlx::{Pool, Postgres};

 use crate::services::{consumers::Consumers, handler::Handler, staging::Staging};
@@ -27,6 +28,7 @@ pub struct State {
     pub staging: Staging,
     pub consumers: Consumers,
     pub handler: Handler,
+    pub metrics_registry: Registry,
 }

 impl State {
@@ -51,6 +53,7 @@ impl State {
             consumers: Consumers::new(),
             staging,
             handler,
+            metrics_registry: Registry::new(),
         })
     }
 }
diff --git a/proto/nodata/v1/nodata_storage.proto b/proto/nodata/v1/nodata_storage.proto
new file mode 100644
index 0000000..1cf9b24
--- /dev/null
+++ b/proto/nodata/v1/nodata_storage.proto
@@ -0,0 +1,7 @@
+syntax = "proto3";
+
+package nodata.v1;
+
+message Log {
+  repeated bytes messages = 1;
+}
diff --git a/templates/docker-compose.monitoring.yaml b/templates/docker-compose.monitoring.yaml
new file mode 100644
index 0000000..4f61bed
--- /dev/null
+++ b/templates/docker-compose.monitoring.yaml
@@ -0,0 +1,37 @@
+services:
+  prometheus:
+    image: prom/prometheus:latest
+    container_name: prometheus
+    ports:
+      - "9090:9090"
+    configs:
+      - source: prometheus
+        target: /etc/prometheus/prometheus.yml
+
+  grafana:
+    image: grafana/grafana:latest
+    container_name: grafana
+    ports:
+      - "13000:3000"
+    depends_on:
+      - prometheus
+    volumes:
+      - grafana_data:/var/lib/grafana
+    environment:
+      - GF_SECURITY_ADMIN_USER=admin
+      - GF_SECURITY_ADMIN_PASSWORD=admin
+
+volumes:
+  grafana_data:
+
+configs:
+  prometheus:
+    content: |
+      global:
+        scrape_interval: 5s
+
+      scrape_configs:
+        - job_name: 'nodata'
+          static_configs:
+            - targets: ['nefarious:3000']
+
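With this change a flushed segment is a single protobuf-encoded `nodata.v1.Log` value rather than raw concatenated bytes. The sketch below is not part of the diff: it round-trips a batch through a hand-written stand-in for the generated `Log` type, assuming only the `prost` crate (with its default derive feature) as a dependency.

```rust
use prost::Message;

// Hand-written stand-in for the generated `nodata.v1.Log` message
// (`repeated bytes messages = 1`); the real type lives in the generated
// `gen/nodata.v1.rs` files added in this change.
#[derive(Clone, PartialEq, Message)]
struct Log {
    #[prost(bytes = "vec", repeated, tag = "1")]
    messages: Vec<Vec<u8>>,
}

fn main() {
    // Encode a batch the same way `ProtoStorage::format_log_message` does.
    let segment = Log {
        messages: vec![b"first".to_vec(), b"second".to_vec()],
    }
    .encode_to_vec();

    // A reader of a flushed segment file can decode the bytes back into
    // the individual messages.
    let log = Log::decode(segment.as_slice()).expect("segment should decode");
    assert_eq!(log.messages.len(), 2);
}
```

Decoding with the same message type is how a future consumer of the segment files written by `StorageBackend::flush_segment` could recover the individual messages.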