feat: with sled db and capnp

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2023-08-27 18:15:25 +02:00
parent 757d1081bd
commit 75d99c2461
13 changed files with 207 additions and 41 deletions

View File

@@ -11,10 +11,13 @@ publish.workspace = true
[dependencies]
churn-domain.workspace = true
uuid.workspace = true
anyhow.workspace = true
chrono.workspace = true
capnp = "0.17.2"
[build-dependencies]
capnpc = "0.17.2"

View File

@@ -4,4 +4,5 @@ struct LogEvent {
id @0 :Text;
author @1 :Text;
content @2 :Text;
datetime @3 :Int64;
}

View File

@@ -1,6 +1,6 @@
use capnp::message::{Builder, HeapAllocator};
use capnp::message::{ReaderOptions, TypedReader};
use capnp::serialize::{self, OwnedSegments};
use capnp::serialize::{self, OwnedSegments, SliceSegments};
use capnp::traits::{FromPointerReader, Owned};
use churn_domain::LogEvent;
@@ -10,21 +10,21 @@ mod models_capnp;
pub trait CapnpPackExt {
type Return;
fn serialize_capnp(&self) -> String;
fn deserialize_capnp(content: &str) -> anyhow::Result<Self::Return>;
fn serialize_capnp(&self) -> Vec<u8>;
fn deserialize_capnp(content: &Vec<u8>) -> anyhow::Result<Self::Return>;
fn capnp_to_string(builder: &Builder<HeapAllocator>) -> String {
fn capnp_to_string(builder: &Builder<HeapAllocator>) -> Vec<u8> {
let msg = serialize::write_message_to_words(builder);
std::str::from_utf8(msg.as_slice())
.expect("to be able to parse capnp as utf8")
.to_string()
msg
}
fn string_to_capnp<'a, S>(content: &'a str) -> TypedReader<OwnedSegments, S>
fn string_to_capnp<'a, S>(content: &'a Vec<u8>) -> TypedReader<SliceSegments, S>
where
S: Owned,
{
let log_event = serialize::read_message(content.as_bytes(), ReaderOptions::new()).unwrap();
let log_event =
serialize::read_message_from_flat_slice(&mut content.as_slice(), ReaderOptions::new())
.unwrap();
let log_event = log_event.into_typed::<S>();
@@ -35,18 +35,19 @@ pub trait CapnpPackExt {
impl CapnpPackExt for LogEvent {
type Return = Self;
fn serialize_capnp(&self) -> String {
fn serialize_capnp(&self) -> Vec<u8> {
let mut builder = Builder::new_default();
let mut log_event = builder.init_root::<models_capnp::log_event::Builder>();
log_event.set_id(&self.id.to_string());
log_event.set_author(&self.author);
log_event.set_content(&self.content);
log_event.set_datetime(self.timestamp.timestamp());
Self::capnp_to_string(&builder)
}
fn deserialize_capnp(content: &str) -> anyhow::Result<Self> {
fn deserialize_capnp(content: &Vec<u8>) -> anyhow::Result<Self> {
let log_event = Self::string_to_capnp::<models_capnp::log_event::Owned>(content);
let log_event = log_event.get()?;
@@ -54,6 +55,10 @@ impl CapnpPackExt for LogEvent {
id: uuid::Uuid::parse_str(log_event.get_id()?)?,
author: log_event.get_author()?.into(),
content: log_event.get_content()?.into(),
timestamp: chrono::DateTime::<chrono::Utc>::from_utc(
chrono::NaiveDateTime::from_timestamp_opt(log_event.get_datetime(), 0).unwrap(),
chrono::Utc,
),
})
}
}

View File

@@ -88,11 +88,15 @@ pub mod log_event {
pub fn has_content(&self) -> bool {
!self.reader.get_pointer_field(2).is_null()
}
#[inline]
pub fn get_datetime(self) -> i64 {
self.reader.get_data_field::<i64>(0)
}
}
pub struct Builder<'a> { builder: ::capnp::private::layout::StructBuilder<'a> }
impl <'a,> ::capnp::traits::HasStructSize for Builder<'a,> {
const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 0, pointers: 3 };
const STRUCT_SIZE: ::capnp::private::layout::StructSize = ::capnp::private::layout::StructSize { data: 1, pointers: 3 };
}
impl <'a,> ::capnp::traits::HasTypeId for Builder<'a,> {
const TYPE_ID: u64 = _private::TYPE_ID;
@@ -190,6 +194,14 @@ pub mod log_event {
pub fn has_content(&self) -> bool {
!self.builder.is_pointer_field_null(2)
}
#[inline]
pub fn get_datetime(self) -> i64 {
self.builder.get_data_field::<i64>(0)
}
#[inline]
pub fn set_datetime(&mut self, value: i64) {
self.builder.set_data_field::<i64>(0, value);
}
}
pub struct Pipeline { _typeless: ::capnp::any_pointer::Pipeline }
@@ -201,45 +213,52 @@ pub mod log_event {
impl Pipeline {
}
mod _private {
pub static ENCODED_NODE: [::capnp::Word; 62] = [
pub static ENCODED_NODE: [::capnp::Word; 78] = [
::capnp::word(0, 0, 0, 0, 5, 0, 6, 0),
::capnp::word(50, 25, 14, 89, 91, 12, 143, 231),
::capnp::word(13, 0, 0, 0, 1, 0, 0, 0),
::capnp::word(13, 0, 0, 0, 1, 0, 1, 0),
::capnp::word(164, 172, 216, 255, 36, 223, 58, 242),
::capnp::word(3, 0, 7, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(21, 0, 0, 0, 178, 0, 0, 0),
::capnp::word(29, 0, 0, 0, 7, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(25, 0, 0, 0, 175, 0, 0, 0),
::capnp::word(25, 0, 0, 0, 231, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(109, 111, 100, 101, 108, 115, 46, 99),
::capnp::word(97, 112, 110, 112, 58, 76, 111, 103),
::capnp::word(69, 118, 101, 110, 116, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 1, 0, 1, 0),
::capnp::word(12, 0, 0, 0, 3, 0, 4, 0),
::capnp::word(16, 0, 0, 0, 3, 0, 4, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(69, 0, 0, 0, 26, 0, 0, 0),
::capnp::word(97, 0, 0, 0, 26, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(64, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(76, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(92, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(104, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(1, 0, 0, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 1, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(73, 0, 0, 0, 58, 0, 0, 0),
::capnp::word(101, 0, 0, 0, 58, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(68, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(80, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(96, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(108, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(2, 0, 0, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 2, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(77, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(105, 0, 0, 0, 66, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(72, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(84, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(100, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(112, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(3, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 1, 0, 3, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(109, 0, 0, 0, 74, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(108, 0, 0, 0, 3, 0, 1, 0),
::capnp::word(120, 0, 0, 0, 2, 0, 1, 0),
::capnp::word(105, 100, 0, 0, 0, 0, 0, 0),
::capnp::word(12, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
@@ -264,12 +283,22 @@ pub mod log_event {
::capnp::word(12, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(100, 97, 116, 101, 116, 105, 109, 101),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(5, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(5, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
::capnp::word(0, 0, 0, 0, 0, 0, 0, 0),
];
pub fn get_field_types(index: u16) -> ::capnp::introspect::Type {
match index {
0 => <::capnp::text::Owned as ::capnp::introspect::Introspect>::introspect(),
1 => <::capnp::text::Owned as ::capnp::introspect::Introspect>::introspect(),
2 => <::capnp::text::Owned as ::capnp::introspect::Introspect>::introspect(),
3 => <i64 as ::capnp::introspect::Introspect>::introspect(),
_ => panic!("invalid field index {}", index),
}
}
@@ -281,7 +310,7 @@ pub mod log_event {
nonunion_members: NONUNION_MEMBERS,
members_by_discriminant: MEMBERS_BY_DISCRIMINANT,
};
pub static NONUNION_MEMBERS : &[u16] = &[0,1,2];
pub static NONUNION_MEMBERS : &[u16] = &[0,1,2,3];
pub static MEMBERS_BY_DISCRIMINANT : &[u16] = &[];
pub const TYPE_ID: u64 = 0xe78f_0c5b_590e_1932;
}

View File

@@ -20,3 +20,4 @@ axum.workspace = true
reqwest.workspace = true
serde.workspace = true
uuid.workspace = true
chrono.workspace = true

View File

@@ -29,6 +29,7 @@ pub struct LogEvent {
pub id: uuid::Uuid,
pub author: String,
pub content: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
}
impl LogEvent {
@@ -37,6 +38,7 @@ impl LogEvent {
id: uuid::Uuid::new_v4(),
author: author.into(),
content: content.into(),
timestamp: chrono::Utc::now(),
}
}
}

View File

@@ -24,5 +24,6 @@ serde.workspace = true
serde_json.workspace = true
uuid.workspace = true
async-trait.workspace = true
itertools.workspace = true
sled.workspace = true

View File

@@ -1,3 +1,4 @@
use core::slice::SlicePattern;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
@@ -50,30 +51,29 @@ impl DefaultDb {
#[async_trait]
pub trait DbTrait {
async fn insert(&self, namespace: &str, key: &str, value: &str) -> anyhow::Result<()>;
async fn get_all(&self, namespace: &str) -> anyhow::Result<Vec<String>>;
async fn insert(&self, namespace: &str, key: &str, value: &[u8]) -> anyhow::Result<()>;
async fn get_all(&self, namespace: &str) -> anyhow::Result<Vec<Vec<u8>>>;
}
#[async_trait]
impl DbTrait for DefaultDb {
async fn insert(&self, namespace: &str, key: &str, value: &str) -> anyhow::Result<()> {
async fn insert(&self, namespace: &str, key: &str, value: &[u8]) -> anyhow::Result<()> {
let tree = self.db.open_tree(namespace)?;
tree.insert(key, value)?;
tree.flush_async().await?;
//tree.flush_async().await?;
Ok(())
}
async fn get_all(&self, namespace: &str) -> anyhow::Result<Vec<String>> {
async fn get_all(&self, namespace: &str) -> anyhow::Result<Vec<Vec<u8>>> {
let tree = self.db.open_tree(namespace)?;
Ok(tree
.iter()
.flatten()
.map(|(_, val)| val)
.flat_map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>())
.map(|(_, val)| val.as_slice().to_vec())
.collect::<Vec<_>>())
}
}

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use axum::async_trait;
use churn_domain::{LogEvent, ServerEnrollReq};
use itertools::Itertools;
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use tokio::sync::{Mutex, RwLock};
@@ -58,7 +59,7 @@ impl EventServiceTrait for DefaultEventService {
async fn append(&self, req: LogEvent) -> anyhow::Result<()> {
self.db
.insert("events_log", &req.id.to_string(), &req.serialize_capnp())
.await;
.await?;
Ok(())
}
@@ -67,8 +68,15 @@ impl EventServiceTrait for DefaultEventService {
let events = events
.iter()
.map(|e| LogEvent::deserialize_capnp(e))
.map(|e| match LogEvent::deserialize_capnp(e) {
Ok(o) => Ok(o),
Err(e) => {
tracing::error!("failed to deserialize capnp: {e}");
Err(e)
}
})
.flatten()
.sorted_by_key(|i| i.timestamp)
.skip_while(|item| item.id != cursor)
.skip(1)
.collect();
@@ -82,6 +90,7 @@ impl EventServiceTrait for DefaultEventService {
.iter()
.map(|e| LogEvent::deserialize_capnp(e))
.flatten()
.sorted_by_key(|i| i.timestamp)
.collect();
Ok(events)

View File

@@ -1,3 +1,5 @@
#![feature(slice_pattern)]
mod agent;
mod db;
mod event;
@@ -204,10 +206,10 @@ async fn logs(
match cursor.cursor {
Some(cursor) => {
tracing::trace!("finding logs from cursor: {}", cursor);
tracing::debug!("finding logs from cursor: {}", cursor);
}
None => {
tracing::trace!("finding logs from beginning");
tracing::debug!("finding logs from beginning");
}
}

View File

@@ -17,6 +17,9 @@ async fn main() -> eyre::Result<()> {
let cli = build_container(client.clone(), "churn").await?;
let server = build_container(client.clone(), "churn-server").await?;
let server = server
.with_env_variable("CHURN_DATABASE", "sled")
.with_env_variable("CHURN_SLED_PATH", "/mnt/sled")
.with_mounted_cache("/mnt/sled", client.cache_volume("sled").id().await?)
.with_exec(vec!["churn-server", "serve", "--host", "0.0.0.0:3000"])
.with_exposed_port(3000);
@@ -113,7 +116,14 @@ async fn build_container(
bin_name: &str,
) -> eyre::Result<dagger_sdk::Container> {
let crates = &["crates/*", "ci"];
let debian_deps = &["libssl-dev", "pkg-config", "openssl", "git", "jq"];
let debian_deps = &[
"libssl-dev",
"pkg-config",
"openssl",
"git",
"jq",
"capnproto",
];
let debian_image = "debian:bullseye".to_string();
let images = dagger_rust::build::RustBuild::new(client.clone())