use std::time::Duration; use anyhow::Context; use chrono::{TimeDelta, Utc}; use clap::{Parser, Subcommand}; use kafka::producer::Record; use rand::Rng; use serde::Serialize; #[derive(Parser)] #[command(author, version, about, long_about = None, subcommand_required = true)] struct Command { #[command(subcommand)] command: Option, } #[derive(Subcommand)] enum Commands { Produce { #[arg(long)] host: String, #[arg(long)] topic: String, #[arg(long = "delay-ms")] delay_ms: u64, }, } #[derive(Clone, Serialize, Debug)] struct AdSource { user_id: i64, ad_id: i64, click_timestamp: String, impression_timestamp: String, } #[tokio::main] async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); tracing_subscriber::fmt::init(); let cli = Command::parse(); tracing::debug!("Starting cli"); match cli.command.unwrap() { Commands::Produce { topic, delay_ms, host, } => { let send_event = nodrift::schedule(std::time::Duration::from_millis(delay_ms), move || { let host = host.clone(); let topic = topic.clone(); async move { tracing::info!("sending event"); let mut rng = rand::rng(); let mut producer = kafka::producer::Producer::from_hosts(vec![host]) .with_ack_timeout(Duration::from_secs(1)) .with_required_acks(kafka::client::RequiredAcks::One) .create() .map_err(|e| nodrift::DriftError::JobError(e.into()))?; let msg = AdSource { user_id: rng.random_range(0..64), ad_id: rng.random_range(0..64), click_timestamp: format!( "{}", Utc::now() .checked_add_signed(TimeDelta::milliseconds(500)) .unwrap() .format("%Y-%m-%dT%H:%M:%S") ), impression_timestamp: format!( "{}", Utc::now().to_utc().format("%Y-%m-%dT%H:%M:%S") ), }; producer .send(&Record::from_value( &topic, serde_json::to_string(&msg) .context("failed to serialize type") .map_err(nodrift::DriftError::JobError)?, )) .map_err(|e| nodrift::DriftError::JobError(e.into()))?; Ok(()) } }); println!("waiting for closure press ctrl-c to cancel"); tokio::select! { _ = send_event.cancelled() => { tokio::time::sleep(Duration::from_secs(5)).await; return Ok(()) } _ = tokio::signal::ctrl_c() => { send_event.cancel(); return Ok(()) } } } } }