feat: with agent db

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-08-27 19:42:33 +02:00
parent 75d99c2461
commit 43ed89d0d8
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
10 changed files with 286 additions and 68 deletions

View File

@ -6,3 +6,7 @@ struct LogEvent {
content @2 :Text; content @2 :Text;
datetime @3 :Int64; datetime @3 :Int64;
} }
struct Agent {
name @0 :Text;
}

View File

@ -1,9 +1,9 @@
use capnp::message::{Builder, HeapAllocator}; use capnp::message::{Builder, HeapAllocator};
use capnp::message::{ReaderOptions, TypedReader}; use capnp::message::{ReaderOptions, TypedReader};
use capnp::serialize::{self, OwnedSegments, SliceSegments}; use capnp::serialize::{self, SliceSegments};
use capnp::traits::{FromPointerReader, Owned}; use capnp::traits::{Owned};
use churn_domain::LogEvent; use churn_domain::{Agent, LogEvent};
mod models_capnp; mod models_capnp;
@ -14,11 +14,11 @@ pub trait CapnpPackExt {
fn deserialize_capnp(content: &Vec<u8>) -> anyhow::Result<Self::Return>; fn deserialize_capnp(content: &Vec<u8>) -> anyhow::Result<Self::Return>;
fn capnp_to_string(builder: &Builder<HeapAllocator>) -> Vec<u8> { fn capnp_to_string(builder: &Builder<HeapAllocator>) -> Vec<u8> {
let msg = serialize::write_message_to_words(builder);
msg serialize::write_message_to_words(builder)
} }
fn string_to_capnp<'a, S>(content: &'a Vec<u8>) -> TypedReader<SliceSegments, S> fn string_to_capnp<S>(content: &Vec<u8>) -> TypedReader<SliceSegments, S>
where where
S: Owned, S: Owned,
{ {
@ -26,9 +26,9 @@ pub trait CapnpPackExt {
serialize::read_message_from_flat_slice(&mut content.as_slice(), ReaderOptions::new()) serialize::read_message_from_flat_slice(&mut content.as_slice(), ReaderOptions::new())
.unwrap(); .unwrap();
let log_event = log_event.into_typed::<S>();
log_event
log_event.into_typed::<S>()
} }
} }
@ -37,7 +37,6 @@ impl CapnpPackExt for LogEvent {
fn serialize_capnp(&self) -> Vec<u8> { fn serialize_capnp(&self) -> Vec<u8> {
let mut builder = Builder::new_default(); let mut builder = Builder::new_default();
let mut log_event = builder.init_root::<models_capnp::log_event::Builder>(); let mut log_event = builder.init_root::<models_capnp::log_event::Builder>();
log_event.set_id(&self.id.to_string()); log_event.set_id(&self.id.to_string());
log_event.set_author(&self.author); log_event.set_author(&self.author);
@ -62,3 +61,25 @@ impl CapnpPackExt for LogEvent {
}) })
} }
} }
impl CapnpPackExt for Agent {
type Return = Self;
fn serialize_capnp(&self) -> Vec<u8> {
let mut builder = Builder::new_default();
let mut item = builder.init_root::<models_capnp::agent::Builder>();
item.set_name(&self.name);
Self::capnp_to_string(&builder)
}
fn deserialize_capnp(content: &Vec<u8>) -> anyhow::Result<Self::Return> {
let item = Self::string_to_capnp::<models_capnp::agent::Owned>(content);
let item = item.get()?;
Ok(Self {
name: item.get_name()?.into(),
})
}
}

View File

@ -315,3 +315,207 @@ pub mod log_event {
pub const TYPE_ID: u64 = 0xe78f_0c5b_590e_1932; pub const TYPE_ID: u64 = 0xe78f_0c5b_590e_1932;
} }
} }
pub mod agent {
#[derive(Copy, Clone)]
pub struct Owned(());
impl ::capnp::introspect::Introspect for Owned { fn introspect() -> ::capnp::introspect::Type { ::capnp::introspect::TypeVariant::Struct(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types, annotation_types: _private::get_annotation_types }).into() } }
impl ::capnp::traits::Owned for Owned { type Reader<'a> = Reader<'a>; type Builder<'a> = Builder<'a>; }
impl ::capnp::traits::OwnedStruct for Owned { type Reader<'a> = Reader<'a>; type Builder<'a> = Builder<'a>; }
impl ::capnp::traits::Pipelined for Owned { type Pipeline = Pipeline; }
pub struct Reader<'a> { reader: ::capnp::private::layout::StructReader<'a> }
impl <'a,> ::core::marker::Copy for Reader<'a,> {}
impl <'a,> ::core::clone::Clone for Reader<'a,> {
fn clone(&self) -> Self { *self }
}
impl <'a,> ::capnp::traits::HasTypeId for Reader<'a,> {
const TYPE_ID: u64 = _private::TYPE_ID;
}
impl <'a,> ::core::convert::From<::capnp::private::layout::StructReader<'a>> for Reader<'a,> {
fn from(reader: ::capnp::private::layout::StructReader<'a>) -> Self {
Self { reader, }
}
}
impl <'a,> ::core::convert::From<Reader<'a,>> for ::capnp::dynamic_value::Reader<'a> {
fn from(reader: Reader<'a,>) -> Self {
Self::Struct(::capnp::dynamic_struct::Reader::new(reader.reader, ::capnp::schema::StructSchema::new(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types::<>, annotation_types: _private::get_annotation_types::<>})))
}
}
impl <'a,> ::core::fmt::Debug for Reader<'a,> {
fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::result::Result<(), ::core::fmt::Error> {
core::fmt::Debug::fmt(&::core::convert::Into::<::capnp::dynamic_value::Reader<'_>>::into(*self), f)
}
}
impl <'a,> ::capnp::traits::FromPointerReader<'a> for Reader<'a,> {
fn get_from_pointer(reader: &::capnp::private::layout::PointerReader<'a>, default: ::core::option::Option<&'a [::capnp::Word]>) -> ::capnp::Result<Self> {
::core::result::Result::Ok(reader.get_struct(default)?.into())
}
}
impl <'a,> ::capnp::traits::IntoInternalStructReader<'a> for Reader<'a,> {
fn into_internal_struct_reader(self) -> ::capnp::private::layout::StructReader<'a> {
self.reader
}
}
impl <'a,> ::capnp::traits::Imbue<'a> for Reader<'a,> {
fn imbue(&mut self, cap_table: &'a ::capnp::private::layout::CapTable) {
self.reader.imbue(::capnp::private::layout::CapTableReader::Plain(cap_table))
}
}
impl <'a,> Reader<'a,> {
pub fn reborrow(&self) -> Reader<'_,> {
Self { .. *self }
}
pub fn total_size(&self) -> ::capnp::Result<::capnp::MessageSize> {
self.reader.total_size()
}
#[inline]
pub fn get_name(self) -> ::capnp::Result<::capnp::text::Reader<'a>> {
::capnp::traits::FromPointerReader::get_from_pointer(&self.reader.get_pointer_field(0), ::core::option::Option::None)
}
#[inline]
pub fn has_name(&self) -> bool {
!self.reader.get_pointer_field(0).is_null()
}
}
pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> }
impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> {
const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 0, pointers: 1 };
}
impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> {
const TYPE_ID: u64 = _private::TYPE_ID;
}
impl <'a,> ::core::convert::From<::capnp::private::layout::StructBuilder<'a>> for Builder<'a,> {
fn from(builder: ::capnp::private::layout::StructBuilder<'a>) -> Self {
Self { builder, }
}
}
impl <'a,> ::core::convert::From<Builder<'a,>> for ::capnp::dynamic_value::Builder<'a> {
fn from(builder: Builder<'a,>) -> Self {
Self::Struct(::capnp::dynamic_struct::Builder::new(builder.builder, ::capnp::schema::StructSchema::new(::capnp::introspect::RawBrandedStructSchema { generic: &_private::RAW_SCHEMA, field_types: _private::get_field_types::<>, annotation_types: _private::get_annotation_types::<>})))
}
}
impl <'a,> ::capnp::traits::ImbueMut<'a> for Builder<'a,> {
fn imbue_mut(&mut self, cap_table: &'a mut ::capnp::private::layout::CapTable) {
self.builder.imbue(::capnp::private::layout::CapTableBuilder::Plain(cap_table))
}
}
impl <'a,> ::capnp::traits::FromPointerBuilder<'a> for Builder<'a,> {
fn init_pointer(builder: ::capnp::private::layout::PointerBuilder<'a>, _size: u32) -> Self {
builder.init_struct(<Self as ::capnp::traits::HasStructSize>::STRUCT_SIZE).into()
}
fn get_from_pointer(builder: ::capnp::private::layout::PointerBuilder<'a>, default: ::core::option::Option<&'a [::capnp::Word]>) -> ::capnp::Result<Self> {
::core::result::Result::Ok(builder.get_struct(<Self as ::capnp::traits::HasStructSize>::STRUCT_SIZE, default)?.into())
}
}
impl <'a,> ::capnp::traits::SetPointerBuilder for Reader<'a,> {
fn set_pointer_builder(mut pointer: ::capnp::private::layout::PointerBuilder<'_>, value: Self, canonicalize: bool) -> ::capnp::Result<()> { pointer.set_struct(&value.reader, canonicalize) }
}
impl <'a,> Builder<'a,> {
pub fn into_reader(self) -> Reader<'a,> {
self.builder.into_reader().into()
}
pub fn reborrow(&mut self) -> Builder<'_,> {
Builder { builder: self.builder.reborrow() }
}
pub fn reborrow_as_reader(&self) -> Reader<'_,> {
self.builder.as_reader().into()
}
pub fn total_size(&self) -> ::capnp::Result<::capnp::MessageSize> {
self.builder.as_reader().total_size()
}
#[inline]
pub fn get_name(self) -> ::capnp::Result<::capnp::text::Builder<'a>> {
::capnp::traits::FromPointerBuilder::get_from_pointer(self.builder.get_pointer_field(0), ::core::option::Option::None)
}
#[inline]
pub fn set_name(&mut self, value: ::capnp::text::Reader<'_>) {
self.builder.reborrow().get_pointer_field(0).set_text(value);
}
#[inline]
pub fn init_name(self, size: u32) -> ::capnp::text::Builder<'a> {
self.builder.get_pointer_field(0).init_text(size)
}
#[inline]
pub fn has_name(&self) -> bool {
!self.builder.is_pointer_field_null(0)
}
}
pub struct Pipeline { _typeless: ::capnp::any_pointer::Pipeline }
impl ::capnp::capability::FromTypelessPipeline for Pipeline {
fn new(typeless: ::capnp::any_pointer::Pipeline) -> Self {
Self { _typeless: typeless, }
}
}
impl Pipeline {
}
mod _private {
pub static ENCODED_NODE: [::capnp::Word; 32] = [
::capnp::word(0, 0, 0, 0, 5, 0, 6, 0),
::capnp::word(160, 129, 44, 52, 151, 203, 164, 244),
::capnp::word(13, 0, 0, 0, 1, 0, 0, 0),
::capnp::word(164, 172, 216, 255, 36, 223, 58, 242),
::capnp::word(1, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(21, 0, 0, 0, 154, 0, 0, 0),
::capnp::word(29, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(25, 0, 0, 0, 63, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(109, 111, 100, 101, 108, 115, 46, 99),
::capnp::word(97, 112, 110, 112, 58, 65, 103, 101),
::capnp::word(110, 116, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 1, 0, 1, 0),
::capnp::word(4, 0, 0, 0, 3, 0, 4, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(13, 0, 0, 0, 42, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(8, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(20, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(110, 97, 109, 101, 0, 0, 0, 0),
::capnp::word(12, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(12, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
];
pub fn get_field_types(index: u16) -> ::capnp::introspect::Type {
match index {
0 => <::capnp::text::Owned as ::capnp::introspect::Introspect>::introspect(),
_ => panic!("invalid field index {}", index),
}
}
pub fn get_annotation_types(child_index: Option<u16>, index: u32) -> ::capnp::introspect::Type {
panic!("invalid annotation indices ({:?}, {}) ", child_index, index)
}
pub static RAW_SCHEMA: ::capnp::introspect::RawStructSchema = ::capnp::introspect::RawStructSchema {
encoded_node: &ENCODED_NODE,
nonunion_members: NONUNION_MEMBERS,
members_by_discriminant: MEMBERS_BY_DISCRIMINANT,
};
pub static NONUNION_MEMBERS : &[u16] = &[0];
pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[];
pub const TYPE_ID: u64 = 0xf4a4_cb97_342c_81a0;
}
}

View File

@ -42,3 +42,8 @@ impl LogEvent {
} }
} }
} }
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Agent {
pub name: String,
}

View File

@ -1,17 +1,21 @@
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use axum::async_trait; use axum::async_trait;
use churn_domain::ServerEnrollReq; use churn_capnp::CapnpPackExt;
use tokio::sync::Mutex; use churn_domain::{Agent, ServerEnrollReq};
use crate::Agent; use crate::db::Db;
#[derive(Clone)] #[derive(Clone)]
pub struct AgentService(Arc<dyn AgentServiceTrait + Send + Sync + 'static>); pub struct AgentService(Arc<dyn AgentServiceTrait + Send + Sync + 'static>);
impl AgentService {
pub fn new(db: Db) -> Self {
Self(Arc::new(DefaultAgentService::new(db)))
}
}
impl std::ops::Deref for AgentService { impl std::ops::Deref for AgentService {
type Target = Arc<dyn AgentServiceTrait + Send + Sync + 'static>; type Target = Arc<dyn AgentServiceTrait + Send + Sync + 'static>;
@ -28,7 +32,13 @@ impl Default for AgentService {
#[derive(Default)] #[derive(Default)]
struct DefaultAgentService { struct DefaultAgentService {
agents: Arc<Mutex<HashMap<String, Agent>>>, agents: Db,
}
impl DefaultAgentService {
pub fn new(db: Db) -> Self {
Self { agents: db }
}
} }
#[async_trait] #[async_trait]
@ -41,24 +51,17 @@ impl AgentServiceTrait for DefaultAgentService {
async fn enroll(&self, req: ServerEnrollReq) -> anyhow::Result<String> { async fn enroll(&self, req: ServerEnrollReq) -> anyhow::Result<String> {
let agent_name = req.agent_name; let agent_name = req.agent_name;
let mut agents = self.agents.lock().await; self.agents
.insert(
match agents.insert( "agents",
agent_name.clone(), &agent_name,
Agent { &Agent {
name: agent_name.clone(), name: agent_name.clone(),
}, }
) { .serialize_capnp(),
Some(_) => { )
tracing::debug!("agents store already contained agent, replaced existing"); .await?;
Ok(agent_name) Ok(agent_name)
} }
None => {
tracing::debug!("agents store didn't contain agent, inserted");
Ok(agent_name)
}
}
}
} }

View File

@ -1,12 +1,12 @@
use core::slice::SlicePattern; use core::slice::SlicePattern;
use std::collections::HashMap;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use churn_domain::ServerEnrollReq;
use tokio::sync::Mutex;
#[derive(Clone)] #[derive(Clone)]
pub struct Db(Arc<dyn DbTrait + Send + Sync + 'static>); pub struct Db(Arc<dyn DbTrait + Send + Sync + 'static>);

View File

@ -1,13 +1,13 @@
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use axum::async_trait; use axum::async_trait;
use churn_domain::{LogEvent, ServerEnrollReq}; use churn_domain::{LogEvent};
use itertools::Itertools; use itertools::Itertools;
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use tokio::sync::{Mutex, RwLock};
use churn_capnp::CapnpPackExt; use churn_capnp::CapnpPackExt;
@ -68,14 +68,13 @@ impl EventServiceTrait for DefaultEventService {
let events = events let events = events
.iter() .iter()
.map(|e| match LogEvent::deserialize_capnp(e) { .flat_map(|e| match LogEvent::deserialize_capnp(e) {
Ok(o) => Ok(o), Ok(o) => Ok(o),
Err(e) => { Err(e) => {
tracing::error!("failed to deserialize capnp: {e}"); tracing::error!("failed to deserialize capnp: {e}");
Err(e) Err(e)
} }
}) })
.flatten()
.sorted_by_key(|i| i.timestamp) .sorted_by_key(|i| i.timestamp)
.skip_while(|item| item.id != cursor) .skip_while(|item| item.id != cursor)
.skip(1) .skip(1)
@ -88,8 +87,7 @@ impl EventServiceTrait for DefaultEventService {
let events = events let events = events
.iter() .iter()
.map(|e| LogEvent::deserialize_capnp(e)) .flat_map(LogEvent::deserialize_capnp)
.flatten()
.sorted_by_key(|i| i.timestamp) .sorted_by_key(|i| i.timestamp)
.collect(); .collect();

View File

@ -15,11 +15,11 @@ use axum::http::StatusCode;
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use axum::routing::{get, post}; use axum::routing::{get, post};
use axum::{Json, Router}; use axum::{Json, Router};
use churn_domain::{LeaseResp, LogEvent, ServerEnrollReq, ServerMonitorResp}; use churn_domain::{Agent, LeaseResp, LogEvent, ServerEnrollReq, ServerMonitorResp};
use clap::{Args, Parser, Subcommand, ValueEnum}; use clap::{Args, Parser, Subcommand, ValueEnum};
use event::EventService; use event::EventService;
use lease::LeaseService; use lease::LeaseService;
use serde::{Deserialize, Serialize}; use serde::{Deserialize};
use serde_json::json; use serde_json::json;
use crate::db::Db; use crate::db::Db;
@ -56,11 +56,6 @@ enum Commands {
}, },
} }
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Agent {
pub name: String,
}
#[derive(Clone)] #[derive(Clone)]
struct AppState { struct AppState {
agent: AgentService, agent: AgentService,
@ -94,9 +89,9 @@ async fn main() -> anyhow::Result<()> {
.route("/lease", post(agent_lease)), .route("/lease", post(agent_lease)),
) )
.with_state(AppState { .with_state(AppState {
agent: AgentService::default(), agent: AgentService::new(db.clone()),
leases: LeaseService::default(), leases: LeaseService::default(),
events: EventService::new(db), events: EventService::new(db.clone()),
}); });
tracing::info!("churn server listening on {}", host); tracing::info!("churn server listening on {}", host);
@ -221,7 +216,7 @@ async fn logs(
if events.is_empty() { if events.is_empty() {
return Ok(Json(ServerMonitorResp { return Ok(Json(ServerMonitorResp {
cursor: cursor.cursor.clone(), cursor: cursor.cursor,
logs: Vec::new(), logs: Vec::new(),
})); }));
} }

View File

@ -109,7 +109,7 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> {
} => todo!(), } => todo!(),
Commands::Monitor { Commands::Monitor {
server, server,
server_token, server_token: _,
} => { } => {
tracing::info!("monitoring server: {}", server); tracing::info!("monitoring server: {}", server);
@ -146,8 +146,6 @@ async fn handle_command(cmd: Command) -> anyhow::Result<()> {
} }
tokio::time::sleep(std::time::Duration::from_secs(1)).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await;
} }
Ok(())
} }
} }
} else { } else {

View File

@ -61,8 +61,6 @@ async fn main() -> eyre::Result<()> {
} }
async fn repl(container: dagger_sdk::Container) -> eyre::Result<()> { async fn repl(container: dagger_sdk::Container) -> eyre::Result<()> {
let mut container = container;
loop { loop {
let stdin = tokio::io::stdin(); let stdin = tokio::io::stdin();
let mut stdout = tokio::io::stdout(); let mut stdout = tokio::io::stdout();
@ -81,7 +79,7 @@ async fn repl(container: dagger_sdk::Container) -> eyre::Result<()> {
break; break;
} }
container = container.with_exec(input.split(' ').collect()); let container = container.with_exec(input.split(' ').collect());
match container.stdout().await { match container.stdout().await {
Ok(stdout) => { Ok(stdout) => {
@ -91,14 +89,6 @@ async fn repl(container: dagger_sdk::Container) -> eyre::Result<()> {
eprintln!("{}", e); eprintln!("{}", e);
} }
} }
// match container.stderr().await {
// Ok(stderr) => {
// println!("{stderr}");
// }
// Err(e) => {
// eprintln!("{}", e);
// }
// }
match container.exit_code().await { match container.exit_code().await {
Ok(_) => {} Ok(_) => {}