feat: add staging

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-08-12 00:09:37 +02:00
parent 3c5c5759ca
commit 5cf8956cad
Signed by: kjuulh
GPG Key ID: D85D7535F18F35FA
7 changed files with 138 additions and 13 deletions

View File

@ -10,10 +10,11 @@ pub struct PublishEventRequest {
pub key: ::prost::alloc::string::String, pub key: ::prost::alloc::string::String,
#[prost(bytes = "vec", tag = "4")] #[prost(bytes = "vec", tag = "4")]
pub value: ::prost::alloc::vec::Vec<u8>, pub value: ::prost::alloc::vec::Vec<u8>,
#[prost(string, optional, tag = "5")]
pub id: ::core::option::Option<::prost::alloc::string::String>,
} }
#[allow(clippy::derive_partial_eq_without_eq)] #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)] #[derive(Clone, PartialEq, ::prost::Message)]
pub struct PublishEventResponse { pub struct PublishEventResponse {}
}
include!("nodata.tonic.rs"); include!("nodata.tonic.rs");
// @@protoc_insertion_point(module) // @@protoc_insertion_point(module)

View File

@ -3,7 +3,10 @@ use std::net::SocketAddr;
use anyhow::Context; use anyhow::Context;
use mad::Component; use mad::Component;
use crate::state::SharedState; use crate::{
services::staging::{StagingEvent, StagingState},
state::SharedState,
};
include!("gen/nodata.rs"); include!("gen/nodata.rs");
@ -34,13 +37,29 @@ impl no_data_server::NoData for GrpcServer {
topic = req.topic, topic = req.topic,
key = req.key, key = req.key,
value = std::str::from_utf8(&req.value).ok(), value = std::str::from_utf8(&req.value).ok(),
id = req.id,
"handling event" "handling event"
); );
self.state.staging().publish(req).await.map_err(|e| {
tracing::warn!(error = e.to_string(), "failed to handle ingest of data");
tonic::Status::internal(e.to_string())
})?;
Ok(tonic::Response::new(PublishEventResponse {})) Ok(tonic::Response::new(PublishEventResponse {}))
} }
} }
impl From<PublishEventRequest> for StagingEvent {
fn from(value: PublishEventRequest) -> Self {
Self {
topic: value.topic,
id: value.id,
key: value.key,
}
}
}
#[axum::async_trait] #[axum::async_trait]
impl Component for GrpcServer { impl Component for GrpcServer {
fn name(&self) -> Option<String> { fn name(&self) -> Option<String> {

View File

@ -2,14 +2,17 @@ mod grpc;
mod http; mod http;
mod state; mod state;
mod services;
use std::net::SocketAddr; use std::net::SocketAddr;
use chrono::{Datelike, Timelike}; use chrono::{Datelike, Timelike};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand, ValueEnum};
use grpc::{GrpcServer, PublishEventRequest}; use grpc::{GrpcServer, PublishEventRequest};
use http::HttpServer; use http::HttpServer;
use mad::Mad; use mad::Mad;
use state::SharedState; use state::SharedState;
use uuid::Uuid;
#[derive(Parser)] #[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)] #[command(author, version, about, long_about = None, subcommand_required = true)]
@ -51,6 +54,10 @@ enum ClientCommands {
key: String, key: String,
#[arg(long)] #[arg(long)]
value: String, value: String,
#[arg(long)]
id: Option<String>,
#[arg(long = "generate-id")]
generate_id: bool,
}, },
} }
@ -60,7 +67,6 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
let cli = Command::parse(); let cli = Command::parse();
match cli.command.unwrap() { match cli.command.unwrap() {
Commands::Serve { host, grpc_host } => { Commands::Serve { host, grpc_host } => {
tracing::info!("Starting service"); tracing::info!("Starting service");
@ -76,13 +82,19 @@ async fn main() -> anyhow::Result<()> {
host, host,
grpc_host, grpc_host,
} => match commands { } => match commands {
ClientCommands::PublishEvent { topic, key, value } => { ClientCommands::PublishEvent {
topic,
key,
value,
id,
generate_id,
} => {
let mut client = let mut client =
crate::grpc::no_data_client::NoDataClient::connect(grpc_host).await?; crate::grpc::no_data_client::NoDataClient::connect(grpc_host).await?;
let timestamp = chrono::Utc::now(); let timestamp = chrono::Utc::now();
let res = client let _ = client
.publish_event(PublishEventRequest { .publish_event(PublishEventRequest {
topic, topic,
published: Some(prost_types::Timestamp::date_time_nanos( published: Some(prost_types::Timestamp::date_time_nanos(
@ -96,6 +108,17 @@ async fn main() -> anyhow::Result<()> {
)?), )?),
key, key,
value: value.into_bytes(), value: value.into_bytes(),
id: {
if id.is_none() {
if generate_id {
Some(Uuid::new_v4().to_string())
} else {
None
}
} else {
id
}
},
}) })
.await?; .await?;
} }

View File

@ -0,0 +1 @@
pub mod staging;

View File

@ -0,0 +1,74 @@
use std::{
collections::BTreeMap,
sync::{Arc, RwLock},
};
use crate::state::SharedState;
pub struct StagingEvent {
pub topic: String,
pub key: String,
pub id: Option<String>,
}
#[derive(Default, Clone)]
pub struct Staging {
// Temporary until we've got an actual file disk store
store: Arc<RwLock<BTreeMap<String, BTreeMap<String, Vec<StagingEvent>>>>>,
}
impl Staging {
pub async fn publish(&self, staging_event: impl Into<StagingEvent>) -> anyhow::Result<()> {
let staging_event: StagingEvent = staging_event.into();
let mut store = self.store.write().unwrap();
tracing::trace!(
topic = staging_event.topic,
id = staging_event.id,
"moving event to staging"
);
match store.get_mut(&staging_event.topic) {
Some(part) => match part.get_mut(&staging_event.key) {
Some(existing_key_part) => {
if staging_event.id.is_none()
|| !existing_key_part.iter().any(|p| p.id == staging_event.id)
{
existing_key_part.push(staging_event);
} else {
tracing::debug!(
topic = staging_event.topic,
id = staging_event.id,
"event already found, skipping"
);
}
}
None => {
part.insert(staging_event.key.to_owned(), vec![staging_event]);
}
},
None => {
tracing::debug!(
topic = staging_event.topic,
id = staging_event.id,
"new topic, creating partition"
);
store.insert(
staging_event.topic.to_owned(),
BTreeMap::from([(staging_event.key.to_owned(), vec![staging_event])]),
);
}
}
Ok(())
}
}
pub trait StagingState {
fn staging(&self) -> Staging;
}
impl StagingState for SharedState {
fn staging(&self) -> Staging {
self.staging.clone()
}
}

View File

@ -3,6 +3,8 @@ use std::{ops::Deref, sync::Arc};
use anyhow::Context; use anyhow::Context;
use sqlx::{Pool, Postgres}; use sqlx::{Pool, Postgres};
use crate::services::staging::Staging;
#[derive(Clone)] #[derive(Clone)]
pub struct SharedState(Arc<State>); pub struct SharedState(Arc<State>);
@ -22,6 +24,7 @@ impl Deref for SharedState {
pub struct State { pub struct State {
pub db: Pool<Postgres>, pub db: Pool<Postgres>,
pub staging: Staging,
} }
impl State { impl State {
@ -38,6 +41,9 @@ impl State {
let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?; let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?;
Ok(Self { db }) Ok(Self {
db,
staging: Staging::default(),
})
} }
} }

View File

@ -13,6 +13,7 @@ message PublishEventRequest {
google.protobuf.Timestamp published = 2; google.protobuf.Timestamp published = 2;
string key = 3; string key = 3;
bytes value = 4; bytes value = 4;
optional string id = 5;
} }
message PublishEventResponse { message PublishEventResponse {