From 43ed89d0d8595615f04f1d3a557439b47ec446d3 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sun, 27 Aug 2023 19:42:33 +0200 Subject: [PATCH] feat: with agent db Signed-off-by: kjuulh --- crates/churn-capnp/schemas/models.capnp | 4 + crates/churn-capnp/src/lib.rs | 39 +++-- crates/churn-capnp/src/models_capnp.rs | 204 ++++++++++++++++++++++++ crates/churn-domain/src/lib.rs | 5 + crates/churn-server/src/agent.rs | 51 +++--- crates/churn-server/src/db.rs | 6 +- crates/churn-server/src/event.rs | 14 +- crates/churn-server/src/main.rs | 15 +- crates/churn/src/main.rs | 4 +- crates/churning/src/main.rs | 12 +- 10 files changed, 286 insertions(+), 68 deletions(-) diff --git a/crates/churn-capnp/schemas/models.capnp b/crates/churn-capnp/schemas/models.capnp index 9addd33..96c914f 100644 --- a/crates/churn-capnp/schemas/models.capnp +++ b/crates/churn-capnp/schemas/models.capnp @@ -6,3 +6,7 @@ struct LogEvent { content @2 :Text; datetime @3 :Int64; } + +struct Agent { + name @0 :Text; +} diff --git a/crates/churn-capnp/src/lib.rs b/crates/churn-capnp/src/lib.rs index 0cc7b75..7bed7fb 100644 --- a/crates/churn-capnp/src/lib.rs +++ b/crates/churn-capnp/src/lib.rs @@ -1,9 +1,9 @@ use capnp::message::{Builder, HeapAllocator}; use capnp::message::{ReaderOptions, TypedReader}; -use capnp::serialize::{self, OwnedSegments, SliceSegments}; +use capnp::serialize::{self, SliceSegments}; -use capnp::traits::{FromPointerReader, Owned}; -use churn_domain::LogEvent; +use capnp::traits::{Owned}; +use churn_domain::{Agent, LogEvent}; mod models_capnp; @@ -14,11 +14,11 @@ pub trait CapnpPackExt { fn deserialize_capnp(content: &Vec) -> anyhow::Result; fn capnp_to_string(builder: &Builder) -> Vec { - let msg = serialize::write_message_to_words(builder); - msg + + serialize::write_message_to_words(builder) } - fn string_to_capnp<'a, S>(content: &'a Vec) -> TypedReader + fn string_to_capnp(content: &Vec) -> TypedReader where S: Owned, { @@ -26,9 +26,9 @@ pub trait CapnpPackExt { serialize::read_message_from_flat_slice(&mut content.as_slice(), ReaderOptions::new()) .unwrap(); - let log_event = log_event.into_typed::(); + - log_event + log_event.into_typed::() } } @@ -37,7 +37,6 @@ impl CapnpPackExt for LogEvent { fn serialize_capnp(&self) -> Vec { let mut builder = Builder::new_default(); - let mut log_event = builder.init_root::(); log_event.set_id(&self.id.to_string()); log_event.set_author(&self.author); @@ -62,3 +61,25 @@ impl CapnpPackExt for LogEvent { }) } } + +impl CapnpPackExt for Agent { + type Return = Self; + + fn serialize_capnp(&self) -> Vec { + let mut builder = Builder::new_default(); + let mut item = builder.init_root::(); + + item.set_name(&self.name); + + Self::capnp_to_string(&builder) + } + + fn deserialize_capnp(content: &Vec) -> anyhow::Result { + let item = Self::string_to_capnp::(content); + let item = item.get()?; + + Ok(Self { + name: item.get_name()?.into(), + }) + } +} diff --git a/crates/churn-capnp/src/models_capnp.rs b/crates/churn-capnp/src/models_capnp.rs index cd4e7da..6f57c6c 100644 --- a/crates/churn-capnp/src/models_capnp.rs +++ b/crates/churn-capnp/src/models_capnp.rs @@ -315,3 +315,207 @@ pub mod log_event { pub const TYPE_ID: u64 = 0xe78f_0c5b_590e_1932; } } + +pub mod agent { + #[derive(Copy, Clone)] + pub struct Owned(()); + impl ::capnp::introspect::Introspect for Owned { fn introspect() -> ::capnp::introspect::Type { ::capnp::introspect::TypeVariant::Struct(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types, annotation_types: _private::get_annotation_types }).into() } } + impl ::capnp::traits::Owned for Owned { type Reader<'a> = Reader<'a>; type Builder<'a> = Builder<'a>; } + impl ::capnp::traits::OwnedStruct for Owned { type Reader<'a> = Reader<'a>; type Builder<'a> = Builder<'a>; } + impl ::capnp::traits::Pipelined for Owned { type Pipeline = Pipeline; } + + pub struct Reader<'a> { reader: ::capnp::private::layout::StructReader<'a> } + impl <'a,> ::core::marker::Copy for Reader<'a,> {} + impl <'a,> ::core::clone::Clone for Reader<'a,> { + fn clone(&self) -> Self { *self } + } + + impl <'a,> ::capnp::traits::HasTypeId for Reader<'a,> { + const TYPE_ID: u64 = _private::TYPE_ID; + } + impl <'a,> ::core::convert::From<::capnp::private::layout::StructReader<'a>> for Reader<'a,> { + fn from(reader: ::capnp::private::layout::StructReader<'a>) -> Self { + Self { reader, } + } + } + + impl <'a,> ::core::convert::From> for ::capnp::dynamic_value::Reader<'a> { + fn from(reader: Reader<'a,>) -> Self { + Self::Struct(::capnp::dynamic_struct::Reader::new(reader.reader, ::capnp::schema::StructSchema::new(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types::<>, annotation_types: _private::get_annotation_types::<>}))) + } + } + + impl <'a,> ::core::fmt::Debug for Reader<'a,> { + fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::result::Result<(), ::core::fmt::Error> { + core::fmt::Debug::fmt(&::core::convert::Into::<::capnp::dynamic_value::Reader<'_>>::into(*self), f) + } + } + + impl <'a,> ::capnp::traits::FromPointerReader<'a> for Reader<'a,> { + fn get_from_pointer(reader: &::capnp::private::layout::PointerReader<'a>, default: ::core::option::Option<&'a [::capnp::Word]>) -> ::capnp::Result { + ::core::result::Result::Ok(reader.get_struct(default)?.into()) + } + } + + impl <'a,> ::capnp::traits::IntoInternalStructReader<'a> for Reader<'a,> { + fn into_internal_struct_reader(self) -> ::capnp::private::layout::StructReader<'a> { + self.reader + } + } + + impl <'a,> ::capnp::traits::Imbue<'a> for Reader<'a,> { + fn imbue(&mut self, cap_table: &'a ::capnp::private::layout::CapTable) { + self.reader.imbue(::capnp::private::layout::CapTableReader::Plain(cap_table)) + } + } + + impl <'a,> Reader<'a,> { + pub fn reborrow(&self) -> Reader<'_,> { + Self { .. *self } + } + + pub fn total_size(&self) -> ::capnp::Result<::capnp::MessageSize> { + self.reader.total_size() + } + #[inline] + pub fn get_name(self) -> ::capnp::Result<::capnp::text::Reader<'a>> { + ::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(0), ::core::option::Option::None) + } + #[inline] + pub fn has_name(&self) -> bool { + !self.reader.get_pointer_field(0).is_null() + } + } + + pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> } + impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> { + const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 0, pointers: 1 }; + } + impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> { + const TYPE_ID: u64 = _private::TYPE_ID; + } + impl <'a,> ::core::convert::From<::capnp::private::layout::StructBuilder<'a>> for Builder<'a,> { + fn from(builder: ::capnp::private::layout::StructBuilder<'a>) -> Self { + Self { builder, } + } + } + + impl <'a,> ::core::convert::From> for ::capnp::dynamic_value::Builder<'a> { + fn from(builder: Builder<'a,>) -> Self { + Self::Struct(::capnp::dynamic_struct::Builder::new(builder.builder, ::capnp::schema::StructSchema::new(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types::<>, annotation_types: _private::get_annotation_types::<>}))) + } + } + + impl <'a,> ::capnp::traits::ImbueMut<'a> for Builder<'a,> { + fn imbue_mut(&mut self, cap_table: &'a mut ::capnp::private::layout::CapTable) { + self.builder.imbue(::capnp::private::layout::CapTableBuilder::Plain(cap_table)) + } + } + + impl <'a,> ::capnp::traits::FromPointerBuilder<'a> for Builder<'a,> { + fn init_pointer(builder: ::capnp::private::layout::PointerBuilder<'a>, _size: u32) -> Self { + builder.init_struct(::STRUCT_SIZE).into() + } + fn get_from_pointer(builder: ::capnp::private::layout::PointerBuilder<'a>, default: ::core::option::Option<&'a [::capnp::Word]>) -> ::capnp::Result { + ::core::result::Result::Ok(builder.get_struct(::STRUCT_SIZE, default)?.into()) + } + } + + impl <'a,> ::capnp::traits::SetPointerBuilder for Reader<'a,> { + fn set_pointer_builder(mut pointer: ::capnp::private::layout::PointerBuilder<'_>, value: Self, canonicalize: bool) -> ::capnp::Result<()> { pointer.set_struct(&value.reader, canonicalize) } + } + + impl <'a,> Builder<'a,> { + pub fn into_reader(self) -> Reader<'a,> { + self.builder.into_reader().into() + } + pub fn reborrow(&mut self) -> Builder<'_,> { + Builder { builder: self.builder.reborrow() } + } + pub fn reborrow_as_reader(&self) -> Reader<'_,> { + self.builder.as_reader().into() + } + + pub fn total_size(&self) -> ::capnp::Result<::capnp::MessageSize> { + self.builder.as_reader().total_size() + } + #[inline] + pub fn get_name(self) -> ::capnp::Result<::capnp::text::Builder<'a>> { + ::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(0), ::core::option::Option::None) + } + #[inline] + pub fn set_name(&mut self, value: ::capnp::text::Reader<'_>) { + self.builder.reborrow().get_pointer_field(0).set_text(value); + } + #[inline] + pub fn init_name(self, size: u32) -> ::capnp::text::Builder<'a> { + self.builder.get_pointer_field(0).init_text(size) + } + #[inline] + pub fn has_name(&self) -> bool { + !self.builder.is_pointer_field_null(0) + } + } + + pub struct Pipeline { _typeless: ::capnp::any_pointer::Pipeline } + impl ::capnp::capability::FromTypelessPipeline for Pipeline { + fn new(typeless: ::capnp::any_pointer::Pipeline) -> Self { + Self { _typeless: typeless, } + } + } + impl Pipeline { + } + mod _private { + pub static ENCODED_NODE: [::capnp::Word; 32] = [ + ::capnp::word(0, 0, 0, 0, 5, 0, 6, 0), + ::capnp::word(160, 129, 44, 52, 151, 203, 164, 244), + ::capnp::word(13, 0, 0, 0, 1, 0, 0, 0), + ::capnp::word(164, 172, 216, 255, 36, 223, 58, 242), + ::capnp::word(1, 0, 7, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(21, 0, 0, 0, 154, 0, 0, 0), + ::capnp::word(29, 0, 0, 0, 7, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(25, 0, 0, 0, 63, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(109, 111, 100, 101, 108, 115, 46, 99), + ::capnp::word(97, 112, 110, 112, 58, 65, 103, 101), + ::capnp::word(110, 116, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 1, 0, 1, 0), + ::capnp::word(4, 0, 0, 0, 3, 0, 4, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 1, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(13, 0, 0, 0, 42, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(8, 0, 0, 0, 3, 0, 1, 0), + ::capnp::word(20, 0, 0, 0, 2, 0, 1, 0), + ::capnp::word(110, 97, 109, 101, 0, 0, 0, 0), + ::capnp::word(12, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(12, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ::capnp::word(0, 0, 0, 0, 0, 0, 0, 0), + ]; + pub fn get_field_types(index: u16) -> ::capnp::introspect::Type { + match index { + 0 => <::capnp::text::Owned as ::capnp::introspect::Introspect>::introspect(), + _ => panic!("invalid field index {}", index), + } + } + pub fn get_annotation_types(child_index: Option, index: u32) -> ::capnp::introspect::Type { + panic!("invalid annotation indices ({:?}, {}) ", child_index, index) + } + pub static RAW_SCHEMA: ::capnp::introspect::RawStructSchema = ::capnp::introspect::RawStructSchema { + encoded_node: &ENCODED_NODE, + nonunion_members: NONUNION_MEMBERS, + members_by_discriminant: MEMBERS_BY_DISCRIMINANT, + }; + pub static NONUNION_MEMBERS : &[u16] = &[0]; + pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[]; + pub const TYPE_ID: u64 = 0xf4a4_cb97_342c_81a0; + } +} diff --git a/crates/churn-domain/src/lib.rs b/crates/churn-domain/src/lib.rs index 44c7df4..8f2ca8a 100644 --- a/crates/churn-domain/src/lib.rs +++ b/crates/churn-domain/src/lib.rs @@ -42,3 +42,8 @@ impl LogEvent { } } } + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Agent { + pub name: String, +} diff --git a/crates/churn-server/src/agent.rs b/crates/churn-server/src/agent.rs index a8891b9..e9c5c82 100644 --- a/crates/churn-server/src/agent.rs +++ b/crates/churn-server/src/agent.rs @@ -1,17 +1,21 @@ -use std::collections::HashMap; - use std::sync::Arc; use axum::async_trait; -use churn_domain::ServerEnrollReq; -use tokio::sync::Mutex; +use churn_capnp::CapnpPackExt; +use churn_domain::{Agent, ServerEnrollReq}; -use crate::Agent; +use crate::db::Db; #[derive(Clone)] pub struct AgentService(Arc); +impl AgentService { + pub fn new(db: Db) -> Self { + Self(Arc::new(DefaultAgentService::new(db))) + } +} + impl std::ops::Deref for AgentService { type Target = Arc; @@ -28,7 +32,13 @@ impl Default for AgentService { #[derive(Default)] struct DefaultAgentService { - agents: Arc>>, + agents: Db, +} + +impl DefaultAgentService { + pub fn new(db: Db) -> Self { + Self { agents: db } + } } #[async_trait] @@ -41,24 +51,17 @@ impl AgentServiceTrait for DefaultAgentService { async fn enroll(&self, req: ServerEnrollReq) -> anyhow::Result { let agent_name = req.agent_name; - let mut agents = self.agents.lock().await; + self.agents + .insert( + "agents", + &agent_name, + &Agent { + name: agent_name.clone(), + } + .serialize_capnp(), + ) + .await?; - match agents.insert( - agent_name.clone(), - Agent { - name: agent_name.clone(), - }, - ) { - Some(_) => { - tracing::debug!("agents store already contained agent, replaced existing"); - - Ok(agent_name) - } - None => { - tracing::debug!("agents store didn't contain agent, inserted"); - - Ok(agent_name) - } - } + Ok(agent_name) } } diff --git a/crates/churn-server/src/db.rs b/crates/churn-server/src/db.rs index e2e169f..1bb310c 100644 --- a/crates/churn-server/src/db.rs +++ b/crates/churn-server/src/db.rs @@ -1,12 +1,12 @@ use core::slice::SlicePattern; -use std::collections::HashMap; + use std::path::{Path, PathBuf}; use std::sync::Arc; use async_trait::async_trait; -use churn_domain::ServerEnrollReq; -use tokio::sync::Mutex; + + #[derive(Clone)] pub struct Db(Arc); diff --git a/crates/churn-server/src/event.rs b/crates/churn-server/src/event.rs index df335d1..8738390 100644 --- a/crates/churn-server/src/event.rs +++ b/crates/churn-server/src/event.rs @@ -1,13 +1,13 @@ -use std::collections::HashMap; + use std::sync::Arc; use axum::async_trait; -use churn_domain::{LogEvent, ServerEnrollReq}; +use churn_domain::{LogEvent}; use itertools::Itertools; -use serde::{ser::SerializeStruct, Deserialize, Serialize}; -use tokio::sync::{Mutex, RwLock}; + + use churn_capnp::CapnpPackExt; @@ -68,14 +68,13 @@ impl EventServiceTrait for DefaultEventService { let events = events .iter() - .map(|e| match LogEvent::deserialize_capnp(e) { + .flat_map(|e| match LogEvent::deserialize_capnp(e) { Ok(o) => Ok(o), Err(e) => { tracing::error!("failed to deserialize capnp: {e}"); Err(e) } }) - .flatten() .sorted_by_key(|i| i.timestamp) .skip_while(|item| item.id != cursor) .skip(1) @@ -88,8 +87,7 @@ impl EventServiceTrait for DefaultEventService { let events = events .iter() - .map(|e| LogEvent::deserialize_capnp(e)) - .flatten() + .flat_map(LogEvent::deserialize_capnp) .sorted_by_key(|i| i.timestamp) .collect(); diff --git a/crates/churn-server/src/main.rs b/crates/churn-server/src/main.rs index 05bdeb2..ba0758e 100644 --- a/crates/churn-server/src/main.rs +++ b/crates/churn-server/src/main.rs @@ -15,11 +15,11 @@ use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::routing::{get, post}; use axum::{Json, Router}; -use churn_domain::{LeaseResp, LogEvent, ServerEnrollReq, ServerMonitorResp}; +use churn_domain::{Agent, LeaseResp, LogEvent, ServerEnrollReq, ServerMonitorResp}; use clap::{Args, Parser, Subcommand, ValueEnum}; use event::EventService; use lease::LeaseService; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize}; use serde_json::json; use crate::db::Db; @@ -56,11 +56,6 @@ enum Commands { }, } -#[derive(Clone, Debug, Deserialize, Serialize)] -struct Agent { - pub name: String, -} - #[derive(Clone)] struct AppState { agent: AgentService, @@ -94,9 +89,9 @@ async fn main() -> anyhow::Result<()> { .route("/lease", post(agent_lease)), ) .with_state(AppState { - agent: AgentService::default(), + agent: AgentService::new(db.clone()), leases: LeaseService::default(), - events: EventService::new(db), + events: EventService::new(db.clone()), }); tracing::info!("churn server listening on {}", host); @@ -221,7 +216,7 @@ async fn logs( if events.is_empty() { return Ok(Json(ServerMonitorResp { - cursor: cursor.cursor.clone(), + cursor: cursor.cursor, logs: Vec::new(), })); } diff --git a/crates/churn/src/main.rs b/crates/churn/src/main.rs index 114eb4c..97c954d 100644 --- a/crates/churn/src/main.rs +++ b/crates/churn/src/main.rs @@ -109,7 +109,7 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> { } => todo!(), Commands::Monitor { server, - server_token, + server_token: _, } => { tracing::info!("monitoring server: {}", server); @@ -146,8 +146,6 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> { } tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - - Ok(()) } } } else { diff --git a/crates/churning/src/main.rs b/crates/churning/src/main.rs index e31d981..b69c4cd 100644 --- a/crates/churning/src/main.rs +++ b/crates/churning/src/main.rs @@ -61,8 +61,6 @@ async fn main() -> eyre::Result<()> { } async fn repl(container: dagger_sdk::Container) -> eyre::Result<()> { - let mut container = container; - loop { let stdin = tokio::io::stdin(); let mut stdout = tokio::io::stdout(); @@ -81,7 +79,7 @@ async fn repl(container: dagger_sdk::Container) -> eyre::Result<()> { break; } - container = container.with_exec(input.split(' ').collect()); + let container = container.with_exec(input.split(' ').collect()); match container.stdout().await { Ok(stdout) => { @@ -91,14 +89,6 @@ async fn repl(container: dagger_sdk::Container) -> eyre::Result<()> { eprintln!("{}", e); } } - // match container.stderr().await { - // Ok(stderr) => { - // println!("{stderr}"); - // } - // Err(e) => { - // eprintln!("{}", e); - // } - // } match container.exit_code().await { Ok(_) => {}