feat: add basic redpanda rising wave setup

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2024-08-05 21:41:30 +02:00
commit f8ef7701ea
17 changed files with 3043 additions and 0 deletions

1
crates/kafka-ingest/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

View File

@@ -0,0 +1,31 @@
[package]
name = "kafka-ingest"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
clap.workspace = true
dotenv.workspace = true
axum.workspace = true
serde = { version = "1.0.197", features = ["derive"] }
sqlx = { version = "0.7.3", features = [
"runtime-tokio",
"tls-rustls",
"postgres",
"uuid",
"time",
] }
uuid = { version = "1.7.0", features = ["v4"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
kafka = "0.10.0"
drift = { git = "https://github.com/kjuulh/drift", branch = "main" }
num = { version = "0.4.3", features = ["num-bigint", "rand", "serde"] }
chrono = { version = "0.4.38", features = ["serde"] }
rand = "0.8.5"
serde_json = "1.0.122"

View File

@@ -0,0 +1,84 @@
use std::time::Duration;
use anyhow::Context;
use chrono::{TimeDelta, Utc};
use clap::{Parser, Subcommand};
use kafka::producer::Record;
use rand::Rng;
use serde::Serialize;
#[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)]
struct Command {
#[command(subcommand)]
command: Option<Commands>,
}
#[derive(Subcommand)]
enum Commands {
Hello {},
StartStreaming {},
}
#[derive(Clone, Serialize, Debug)]
struct AdSource {
user_id: i64,
ad_id: i64,
click_timestamp: String,
impression_timestamp: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
tracing_subscriber::fmt::init();
let cli = Command::parse();
tracing::debug!("Starting cli");
match cli.command.unwrap() {
Commands::Hello {} => println!("Hello!"),
Commands::StartStreaming {} => {
let send_event = drift::schedule(std::time::Duration::from_millis(50), || async {
tracing::debug!("sending event");
let mut rng = rand::thread_rng();
let mut producer =
kafka::producer::Producer::from_hosts(vec!["localhost:9092".into()])
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(kafka::client::RequiredAcks::One)
.create()
.map_err(|e| drift::DriftError::JobError(e.into()))?;
let msg = AdSource {
user_id: rng.gen_range(0..64),
ad_id: rng.gen_range(0..64),
click_timestamp: Utc::now()
.checked_add_signed(TimeDelta::milliseconds(500))
.unwrap()
.to_rfc3339(),
impression_timestamp: Utc::now().to_rfc3339(),
};
producer
.send(&Record::from_value(
"ad_clicks",
serde_json::to_string(&msg)
.context("failed to serialize type")
.map_err(drift::DriftError::JobError)?,
))
.map_err(|e| drift::DriftError::JobError(e.into()))?;
Ok(())
});
println!("waiting for closure press ctrl-c to cancel");
if let Ok(()) = tokio::signal::ctrl_c().await {
send_event.cancel();
}
}
}
Ok(())
}