kafka-ingest-concept/crates/kafka-ingest/src/main.rs
kjuulh f8ef7701ea
feat: add basic redpanda rising wave setup
Signed-off-by: kjuulh <contact@kjuulh.io>
2024-08-05 21:41:30 +02:00

85 lines
2.5 KiB
Rust

use std::time::Duration;
use anyhow::Context;
use chrono::{TimeDelta, Utc};
use clap::{Parser, Subcommand};
use kafka::producer::Record;
use rand::Rng;
use serde::Serialize;
#[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)]
struct Command {
#[command(subcommand)]
command: Option<Commands>,
}
#[derive(Subcommand)]
enum Commands {
Hello {},
StartStreaming {},
}
#[derive(Clone, Serialize, Debug)]
struct AdSource {
user_id: i64,
ad_id: i64,
click_timestamp: String,
impression_timestamp: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
tracing_subscriber::fmt::init();
let cli = Command::parse();
tracing::debug!("Starting cli");
match cli.command.unwrap() {
Commands::Hello {} => println!("Hello!"),
Commands::StartStreaming {} => {
let send_event = drift::schedule(std::time::Duration::from_millis(50), || async {
tracing::debug!("sending event");
let mut rng = rand::thread_rng();
let mut producer =
kafka::producer::Producer::from_hosts(vec!["localhost:9092".into()])
.with_ack_timeout(Duration::from_secs(1))
.with_required_acks(kafka::client::RequiredAcks::One)
.create()
.map_err(|e| drift::DriftError::JobError(e.into()))?;
let msg = AdSource {
user_id: rng.gen_range(0..64),
ad_id: rng.gen_range(0..64),
click_timestamp: Utc::now()
.checked_add_signed(TimeDelta::milliseconds(500))
.unwrap()
.to_rfc3339(),
impression_timestamp: Utc::now().to_rfc3339(),
};
producer
.send(&Record::from_value(
"ad_clicks",
serde_json::to_string(&msg)
.context("failed to serialize type")
.map_err(drift::DriftError::JobError)?,
))
.map_err(|e| drift::DriftError::JobError(e.into()))?;
Ok(())
});
println!("waiting for closure press ctrl-c to cancel");
if let Ok(()) = tokio::signal::ctrl_c().await {
send_event.cancel();
}
}
}
Ok(())
}