diff --git a/Cargo.lock b/Cargo.lock index 8ecc8ed..473387e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1493,6 +1493,7 @@ dependencies = [ "mad", "prost", "prost-types", + "rand", "serde", "sqlx", "tokio", diff --git a/crates/nodata/Cargo.toml b/crates/nodata/Cargo.toml index d1ec9e1..9c0d941 100644 --- a/crates/nodata/Cargo.toml +++ b/crates/nodata/Cargo.toml @@ -32,6 +32,7 @@ prost-types.workspace = true chrono = { version = "0.4.38", features = ["serde"] } tokio-stream = "0.1.15" dagger-sdk = "0.11.10" +rand = "0.8.5" [dev-dependencies] -tracing-test = "0.2.5" +tracing-test = { version = "0.2.5" } #, features = ["no-env-filter"] } diff --git a/crates/nodata/src/dagger_engine.rs b/crates/nodata/src/dagger_engine.rs index cc8e5b2..8703ac2 100644 --- a/crates/nodata/src/dagger_engine.rs +++ b/crates/nodata/src/dagger_engine.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::Context; -use dagger_sdk::{PortForward, ServiceUpOptsBuilder}; +use dagger_sdk::{PortForward, ServiceEndpointOpts, ServiceUpOpts}; use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; @@ -30,6 +30,7 @@ impl DaggerConn { // Bind to the os, and let it select a random port above > 30000 let component_listener = match TcpListener::bind("127.0.0.1:0").await { + // FIXME: dagger doesn't support isize for some reason? Ok(listener) => listener, Err(e) => { tracing::warn!(error = e.to_string(), "failed to allocate port"); @@ -42,6 +43,9 @@ impl DaggerConn { .context("failed to find a valid random port, you may've run out")? .port(); + // free up reserved listener + drop(component_listener); + // Let the blocking container run in the background, maintained by the cancellation token handle in the dagger container let container_name = name.to_string(); let container_token = cancellation_token.child_token(); @@ -58,11 +62,12 @@ impl DaggerConn { } }); - let grpc = match GrpcComponentClient::new(format!("127.0.0.1:{}", port)).await { + let grpc = match GrpcComponentClient::new(format!("http://127.0.0.1:{}", port)).await { Ok(grpc) => grpc, Err(e) => { tracing::warn!( error = e.to_string(), + port = port, "failed to bootstrap grpc component, service may not be up yet." ); @@ -70,17 +75,21 @@ impl DaggerConn { } }; - match grpc.ping().await { - Ok(_) => { - // TODO: Finally send something back to the caller - } - Err(e) => { - tracing::warn!( - error = e.to_string(), - "failed to ping grpc server, service may not be up yet." - ); + for i in 1..5 { + match grpc.ping().await { + Ok(_) => { + // TODO: Finally send something back to the caller + break; + } + Err(e) => { + tracing::warn!( + error = e.to_string(), + port = port, + "failed to ping grpc server, service may not be up yet." + ); - anyhow::bail!("failed to ping container"); + tokio::time::sleep(std::time::Duration::from_secs(i)).await; + } } } @@ -212,6 +221,12 @@ async fn spawn_container( image: &str, outer_port: u16, ) -> anyhow::Result<()> { + tracing::debug!( + image = image, + outer_port = outer_port as isize, + "spawning container" + ); + let service = client .container() .from(image) @@ -220,15 +235,14 @@ async fn spawn_container( .as_service(); service - .up_opts( - ServiceUpOptsBuilder::default() - .ports(vec![PortForward { - backend: 7900, - frontend: outer_port as isize, - protocol: dagger_sdk::NetworkProtocol::Tcp, - }]) - .build()?, - ) + .up_opts(ServiceUpOpts { + ports: Some(vec![PortForward { + frontend: outer_port as isize, + backend: 7900, + protocol: dagger_sdk::NetworkProtocol::Tcp, + }]), + random: Some(false), + }) .await?; Ok(()) @@ -252,7 +266,7 @@ mod tests { let container = dagger_engine .start_container( "some_name", - "kasperhermansen/nodata-transformer-test:main-1723938077", + "docker.io/kasperhermansen/nodata-transformer-test:main-1723938433", ) .await?; diff --git a/crates/nodata/src/main.rs b/crates/nodata/src/main.rs index 062d796..6ff49fb 100644 --- a/crates/nodata/src/main.rs +++ b/crates/nodata/src/main.rs @@ -14,6 +14,7 @@ use broker::Broker; use chrono::{Datelike, Timelike}; use clap::{Parser, Subcommand}; use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest}; +use grpc_component::GrpcComponentClient; use http::HttpServer; use mad::Mad; use state::SharedState; @@ -75,6 +76,10 @@ enum ClientCommands { #[arg(long)] key: String, }, + ComponentPing { + #[arg(long)] + host: String, + }, } #[tokio::main] @@ -174,6 +179,13 @@ async fn main() -> anyhow::Result<()> { ) } } + ClientCommands::ComponentPing { host } => { + tracing::info!(host = host, "creating client"); + let client = GrpcComponentClient::new(&host).await?; + + tracing::info!(host = host, "sending ping"); + client.ping().await?; + } }, }