From 5cf8956cada08391207cce0b88db07bb3d598116 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 12 Aug 2024 00:09:37 +0200 Subject: [PATCH] feat: add staging Signed-off-by: kjuulh --- crates/nodata/src/gen/nodata.rs | 15 +++--- crates/nodata/src/grpc.rs | 21 +++++++- crates/nodata/src/main.rs | 31 +++++++++-- crates/nodata/src/services.rs | 1 + crates/nodata/src/services/staging.rs | 74 +++++++++++++++++++++++++++ crates/nodata/src/state.rs | 8 ++- proto/nomicon.proto | 1 + 7 files changed, 138 insertions(+), 13 deletions(-) create mode 100644 crates/nodata/src/services.rs create mode 100644 crates/nodata/src/services/staging.rs diff --git a/crates/nodata/src/gen/nodata.rs b/crates/nodata/src/gen/nodata.rs index 6eda830..84dde39 100644 --- a/crates/nodata/src/gen/nodata.rs +++ b/crates/nodata/src/gen/nodata.rs @@ -2,18 +2,19 @@ #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PublishEventRequest { - #[prost(string, tag="1")] + #[prost(string, tag = "1")] pub topic: ::prost::alloc::string::String, - #[prost(message, optional, tag="2")] + #[prost(message, optional, tag = "2")] pub published: ::core::option::Option<::prost_types::Timestamp>, - #[prost(string, tag="3")] + #[prost(string, tag = "3")] pub key: ::prost::alloc::string::String, - #[prost(bytes="vec", tag="4")] + #[prost(bytes = "vec", tag = "4")] pub value: ::prost::alloc::vec::Vec, + #[prost(string, optional, tag = "5")] + pub id: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct PublishEventResponse { -} +pub struct PublishEventResponse {} include!("nodata.tonic.rs"); -// @@protoc_insertion_point(module) \ No newline at end of file +// @@protoc_insertion_point(module) diff --git a/crates/nodata/src/grpc.rs b/crates/nodata/src/grpc.rs index 680ecb4..26bf2b5 100644 --- a/crates/nodata/src/grpc.rs +++ b/crates/nodata/src/grpc.rs @@ -3,7 +3,10 @@ use std::net::SocketAddr; use anyhow::Context; use mad::Component; -use crate::state::SharedState; +use crate::{ + services::staging::{StagingEvent, StagingState}, + state::SharedState, +}; include!("gen/nodata.rs"); @@ -34,13 +37,29 @@ impl no_data_server::NoData for GrpcServer { topic = req.topic, key = req.key, value = std::str::from_utf8(&req.value).ok(), + id = req.id, "handling event" ); + self.state.staging().publish(req).await.map_err(|e| { + tracing::warn!(error = e.to_string(), "failed to handle ingest of data"); + tonic::Status::internal(e.to_string()) + })?; + Ok(tonic::Response::new(PublishEventResponse {})) } } +impl From for StagingEvent { + fn from(value: PublishEventRequest) -> Self { + Self { + topic: value.topic, + id: value.id, + key: value.key, + } + } +} + #[axum::async_trait] impl Component for GrpcServer { fn name(&self) -> Option { diff --git a/crates/nodata/src/main.rs b/crates/nodata/src/main.rs index 30df818..16d1993 100644 --- a/crates/nodata/src/main.rs +++ b/crates/nodata/src/main.rs @@ -2,14 +2,17 @@ mod grpc; mod http; mod state; +mod services; + use std::net::SocketAddr; use chrono::{Datelike, Timelike}; -use clap::{Parser, Subcommand}; +use clap::{Parser, Subcommand, ValueEnum}; use grpc::{GrpcServer, PublishEventRequest}; use http::HttpServer; use mad::Mad; use state::SharedState; +use uuid::Uuid; #[derive(Parser)] #[command(author, version, about, long_about = None, subcommand_required = true)] @@ -51,6 +54,10 @@ enum ClientCommands { key: String, #[arg(long)] value: String, + #[arg(long)] + id: Option, + #[arg(long = "generate-id")] + generate_id: bool, }, } @@ -60,7 +67,6 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let cli = Command::parse(); - match cli.command.unwrap() { Commands::Serve { host, grpc_host } => { tracing::info!("Starting service"); @@ -76,13 +82,19 @@ async fn main() -> anyhow::Result<()> { host, grpc_host, } => match commands { - ClientCommands::PublishEvent { topic, key, value } => { + ClientCommands::PublishEvent { + topic, + key, + value, + id, + generate_id, + } => { let mut client = crate::grpc::no_data_client::NoDataClient::connect(grpc_host).await?; let timestamp = chrono::Utc::now(); - let res = client + let _ = client .publish_event(PublishEventRequest { topic, published: Some(prost_types::Timestamp::date_time_nanos( @@ -96,6 +108,17 @@ async fn main() -> anyhow::Result<()> { )?), key, value: value.into_bytes(), + id: { + if id.is_none() { + if generate_id { + Some(Uuid::new_v4().to_string()) + } else { + None + } + } else { + id + } + }, }) .await?; } diff --git a/crates/nodata/src/services.rs b/crates/nodata/src/services.rs new file mode 100644 index 0000000..5d9d396 --- /dev/null +++ b/crates/nodata/src/services.rs @@ -0,0 +1 @@ +pub mod staging; diff --git a/crates/nodata/src/services/staging.rs b/crates/nodata/src/services/staging.rs new file mode 100644 index 0000000..49d506a --- /dev/null +++ b/crates/nodata/src/services/staging.rs @@ -0,0 +1,74 @@ +use std::{ + collections::BTreeMap, + sync::{Arc, RwLock}, +}; + +use crate::state::SharedState; + +pub struct StagingEvent { + pub topic: String, + pub key: String, + pub id: Option, +} + +#[derive(Default, Clone)] +pub struct Staging { + // Temporary until we've got an actual file disk store + store: Arc>>>>, +} + +impl Staging { + pub async fn publish(&self, staging_event: impl Into) -> anyhow::Result<()> { + let staging_event: StagingEvent = staging_event.into(); + let mut store = self.store.write().unwrap(); + tracing::trace!( + topic = staging_event.topic, + id = staging_event.id, + "moving event to staging" + ); + + match store.get_mut(&staging_event.topic) { + Some(part) => match part.get_mut(&staging_event.key) { + Some(existing_key_part) => { + if staging_event.id.is_none() + || !existing_key_part.iter().any(|p| p.id == staging_event.id) + { + existing_key_part.push(staging_event); + } else { + tracing::debug!( + topic = staging_event.topic, + id = staging_event.id, + "event already found, skipping" + ); + } + } + None => { + part.insert(staging_event.key.to_owned(), vec![staging_event]); + } + }, + None => { + tracing::debug!( + topic = staging_event.topic, + id = staging_event.id, + "new topic, creating partition" + ); + store.insert( + staging_event.topic.to_owned(), + BTreeMap::from([(staging_event.key.to_owned(), vec![staging_event])]), + ); + } + } + + Ok(()) + } +} + +pub trait StagingState { + fn staging(&self) -> Staging; +} + +impl StagingState for SharedState { + fn staging(&self) -> Staging { + self.staging.clone() + } +} diff --git a/crates/nodata/src/state.rs b/crates/nodata/src/state.rs index a02d80a..c62f778 100644 --- a/crates/nodata/src/state.rs +++ b/crates/nodata/src/state.rs @@ -3,6 +3,8 @@ use std::{ops::Deref, sync::Arc}; use anyhow::Context; use sqlx::{Pool, Postgres}; +use crate::services::staging::Staging; + #[derive(Clone)] pub struct SharedState(Arc); @@ -22,6 +24,7 @@ impl Deref for SharedState { pub struct State { pub db: Pool, + pub staging: Staging, } impl State { @@ -38,6 +41,9 @@ impl State { let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?; - Ok(Self { db }) + Ok(Self { + db, + staging: Staging::default(), + }) } } diff --git a/proto/nomicon.proto b/proto/nomicon.proto index a389002..fe0e518 100644 --- a/proto/nomicon.proto +++ b/proto/nomicon.proto @@ -13,6 +13,7 @@ message PublishEventRequest { google.protobuf.Timestamp published = 2; string key = 3; bytes value = 4; + optional string id = 5; } message PublishEventResponse {