feat: working dagger engine

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-08-19 07:29:26 +02:00
parent bfe88dc008
commit b9be9a3ef1
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
4 changed files with 51 additions and 23 deletions

1
Cargo.lock generated
View File

@ -1493,6 +1493,7 @@ dependencies = [
"mad", "mad",
"prost", "prost",
"prost-types", "prost-types",
"rand",
"serde", "serde",
"sqlx", "sqlx",
"tokio", "tokio",

View File

@ -32,6 +32,7 @@ prost-types.workspace = true
chrono = { version = "0.4.38", features = ["serde"] } chrono = { version = "0.4.38", features = ["serde"] }
tokio-stream = "0.1.15" tokio-stream = "0.1.15"
dagger-sdk = "0.11.10" dagger-sdk = "0.11.10"
rand = "0.8.5"
[dev-dependencies] [dev-dependencies]
tracing-test = "0.2.5" tracing-test = { version = "0.2.5" } #, features = ["no-env-filter"] }

View File

@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use anyhow::Context; use anyhow::Context;
use dagger_sdk::{PortForward, ServiceUpOptsBuilder}; use dagger_sdk::{PortForward, ServiceEndpointOpts, ServiceUpOpts};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -30,6 +30,7 @@ impl DaggerConn {
// Bind to the os, and let it select a random port above > 30000 // Bind to the os, and let it select a random port above > 30000
let component_listener = match TcpListener::bind("127.0.0.1:0").await { let component_listener = match TcpListener::bind("127.0.0.1:0").await {
// FIXME: dagger doesn't support isize for some reason?
Ok(listener) => listener, Ok(listener) => listener,
Err(e) => { Err(e) => {
tracing::warn!(error = e.to_string(), "failed to allocate port"); tracing::warn!(error = e.to_string(), "failed to allocate port");
@ -42,6 +43,9 @@ impl DaggerConn {
.context("failed to find a valid random port, you may've run out")? .context("failed to find a valid random port, you may've run out")?
.port(); .port();
// free up reserved listener
drop(component_listener);
// Let the blocking container run in the background, maintained by the cancellation token handle in the dagger container // Let the blocking container run in the background, maintained by the cancellation token handle in the dagger container
let container_name = name.to_string(); let container_name = name.to_string();
let container_token = cancellation_token.child_token(); let container_token = cancellation_token.child_token();
@ -58,11 +62,12 @@ impl DaggerConn {
} }
}); });
let grpc = match GrpcComponentClient::new(format!("127.0.0.1:{}", port)).await { let grpc = match GrpcComponentClient::new(format!("http://127.0.0.1:{}", port)).await {
Ok(grpc) => grpc, Ok(grpc) => grpc,
Err(e) => { Err(e) => {
tracing::warn!( tracing::warn!(
error = e.to_string(), error = e.to_string(),
port = port,
"failed to bootstrap grpc component, service may not be up yet." "failed to bootstrap grpc component, service may not be up yet."
); );
@ -70,17 +75,21 @@ impl DaggerConn {
} }
}; };
match grpc.ping().await { for i in 1..5 {
Ok(_) => { match grpc.ping().await {
// TODO: Finally send something back to the caller Ok(_) => {
} // TODO: Finally send something back to the caller
Err(e) => { break;
tracing::warn!( }
error = e.to_string(), Err(e) => {
"failed to ping grpc server, service may not be up yet." tracing::warn!(
); error = e.to_string(),
port = port,
"failed to ping grpc server, service may not be up yet."
);
anyhow::bail!("failed to ping container"); tokio::time::sleep(std::time::Duration::from_secs(i)).await;
}
} }
} }
@ -212,6 +221,12 @@ async fn spawn_container(
image: &str, image: &str,
outer_port: u16, outer_port: u16,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
tracing::debug!(
image = image,
outer_port = outer_port as isize,
"spawning container"
);
let service = client let service = client
.container() .container()
.from(image) .from(image)
@ -220,15 +235,14 @@ async fn spawn_container(
.as_service(); .as_service();
service service
.up_opts( .up_opts(ServiceUpOpts {
ServiceUpOptsBuilder::default() ports: Some(vec![PortForward {
.ports(vec![PortForward { frontend: outer_port as isize,
backend: 7900, backend: 7900,
frontend: outer_port as isize, protocol: dagger_sdk::NetworkProtocol::Tcp,
protocol: dagger_sdk::NetworkProtocol::Tcp, }]),
}]) random: Some(false),
.build()?, })
)
.await?; .await?;
Ok(()) Ok(())
@ -252,7 +266,7 @@ mod tests {
let container = dagger_engine let container = dagger_engine
.start_container( .start_container(
"some_name", "some_name",
"kasperhermansen/nodata-transformer-test:main-1723938077", "docker.io/kasperhermansen/nodata-transformer-test:main-1723938433",
) )
.await?; .await?;

View File

@ -14,6 +14,7 @@ use broker::Broker;
use chrono::{Datelike, Timelike}; use chrono::{Datelike, Timelike};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest}; use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest};
use grpc_component::GrpcComponentClient;
use http::HttpServer; use http::HttpServer;
use mad::Mad; use mad::Mad;
use state::SharedState; use state::SharedState;
@ -75,6 +76,10 @@ enum ClientCommands {
#[arg(long)] #[arg(long)]
key: String, key: String,
}, },
ComponentPing {
#[arg(long)]
host: String,
},
} }
#[tokio::main] #[tokio::main]
@ -174,6 +179,13 @@ async fn main() -> anyhow::Result<()> {
) )
} }
} }
ClientCommands::ComponentPing { host } => {
tracing::info!(host = host, "creating client");
let client = GrpcComponentClient::new(&host).await?;
tracing::info!(host = host, "sending ping");
client.ping().await?;
}
}, },
} }