feat: with sled db

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2023-08-27 16:49:53 +02:00
parent 9e61ed7ef7
commit 757d1081bd
12 changed files with 682 additions and 31 deletions

View File

@@ -11,6 +11,7 @@ publish.workspace = true
[dependencies]
churn-domain.workspace = true
churn-capnp.workspace = true
anyhow.workspace = true
tokio.workspace = true
@@ -22,3 +23,6 @@ axum.workspace = true
serde.workspace = true
serde_json.workspace = true
uuid.workspace = true
async-trait.workspace = true
sled.workspace = true

View File

@@ -0,0 +1,79 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use churn_domain::ServerEnrollReq;
use tokio::sync::Mutex;
#[derive(Clone)]
pub struct Db(Arc<dyn DbTrait + Send + Sync + 'static>);
impl Db {
pub fn new_sled(path: &Path) -> Self {
Self(Arc::new(DefaultDb::new(path)))
}
}
impl std::ops::Deref for Db {
type Target = Arc<dyn DbTrait + Send + Sync + 'static>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Default for Db {
fn default() -> Self {
Self(Arc::new(DefaultDb::default()))
}
}
struct DefaultDb {
db: sled::Db,
}
impl Default for DefaultDb {
fn default() -> Self {
Self::new(&PathBuf::from("churn-server.sled"))
}
}
impl DefaultDb {
pub fn new(path: &Path) -> Self {
Self {
db: sled::open(path).expect("to be able open a sled path"),
}
}
}
#[async_trait]
pub trait DbTrait {
async fn insert(&self, namespace: &str, key: &str, value: &str) -> anyhow::Result<()>;
async fn get_all(&self, namespace: &str) -> anyhow::Result<Vec<String>>;
}
#[async_trait]
impl DbTrait for DefaultDb {
async fn insert(&self, namespace: &str, key: &str, value: &str) -> anyhow::Result<()> {
let tree = self.db.open_tree(namespace)?;
tree.insert(key, value)?;
tree.flush_async().await?;
Ok(())
}
async fn get_all(&self, namespace: &str) -> anyhow::Result<Vec<String>> {
let tree = self.db.open_tree(namespace)?;
Ok(tree
.iter()
.flatten()
.map(|(_, val)| val)
.flat_map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>())
.collect::<Vec<_>>())
}
}

View File

@@ -4,13 +4,23 @@ use std::sync::Arc;
use axum::async_trait;
use churn_domain::ServerEnrollReq;
use churn_domain::{LogEvent, ServerEnrollReq};
use serde::{ser::SerializeStruct, Deserialize, Serialize};
use tokio::sync::{Mutex, RwLock};
use churn_capnp::CapnpPackExt;
use crate::db::Db;
#[derive(Clone)]
pub struct EventService(Arc<dyn EventServiceTrait + Send + Sync + 'static>);
impl EventService {
pub fn new(db: Db) -> Self {
Self(Arc::new(DefaultEventService::new(db)))
}
}
impl std::ops::Deref for EventService {
type Target = Arc<dyn EventServiceTrait + Send + Sync + 'static>;
@@ -27,23 +37,12 @@ impl Default for EventService {
#[derive(Default)]
struct DefaultEventService {
agents: Arc<RwLock<Vec<LogEvent>>>,
db: Db,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogEvent {
pub id: uuid::Uuid,
pub author: String,
pub content: String,
}
impl LogEvent {
pub fn new(author: impl Into<String>, content: impl Into<String>) -> Self {
Self {
id: uuid::Uuid::new_v4(),
author: author.into(),
content: content.into(),
}
impl DefaultEventService {
pub fn new(db: Db) -> Self {
Self { db }
}
}
@@ -57,22 +56,34 @@ pub trait EventServiceTrait {
#[async_trait]
impl EventServiceTrait for DefaultEventService {
async fn append(&self, req: LogEvent) -> anyhow::Result<()> {
let mut events = self.agents.write().await;
events.push(req);
self.db
.insert("events_log", &req.id.to_string(), &req.serialize_capnp())
.await;
Ok(())
}
async fn get_from_cursor(&self, cursor: uuid::Uuid) -> anyhow::Result<Vec<LogEvent>> {
let events = self.agents.read().await;
let items = events
let events = self.db.get_all("events_log").await?;
let events = events
.iter()
.map(|e| LogEvent::deserialize_capnp(e))
.flatten()
.skip_while(|item| item.id != cursor)
.skip(1)
.cloned()
.collect::<Vec<_>>();
Ok(items)
.collect();
Ok(events)
}
async fn get_from_beginning(&self) -> anyhow::Result<Vec<LogEvent>> {
let events = self.agents.read().await;
Ok(events.iter().cloned().collect())
let events = self.db.get_all("events_log").await?;
let events = events
.iter()
.map(|e| LogEvent::deserialize_capnp(e))
.flatten()
.collect();
Ok(events)
}
}

View File

@@ -1,8 +1,10 @@
mod agent;
mod db;
mod event;
mod lease;
use std::net::SocketAddr;
use std::path::PathBuf;
use agent::AgentService;
use anyhow::Error;
@@ -11,18 +13,37 @@ use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{Json, Router};
use churn_domain::{LeaseResp, ServerEnrollReq, ServerMonitorResp};
use clap::{Parser, Subcommand};
use event::{EventService, LogEvent};
use churn_domain::{LeaseResp, LogEvent, ServerEnrollReq, ServerMonitorResp};
use clap::{Args, Parser, Subcommand, ValueEnum};
use event::EventService;
use lease::LeaseService;
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::db::Db;
#[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)]
struct Command {
#[command(subcommand)]
command: Option<Commands>,
#[clap(flatten)]
global: GlobalArgs,
}
#[derive(Args)]
struct GlobalArgs {
#[arg(env = "CHURN_DATABASE", long, default_value = "sled")]
database: DatabaseType,
#[arg(env = "CHURN_SLED_PATH", long, default_value = "churn-server.sled")]
sled_path: PathBuf,
}
#[derive(ValueEnum, Clone)]
enum DatabaseType {
Sled,
}
#[derive(Subcommand)]
@@ -55,6 +76,9 @@ async fn main() -> anyhow::Result<()> {
match cli.command {
Some(Commands::Serve { host }) => {
tracing::info!("Starting churn server");
let db = match cli.global.database {
DatabaseType::Sled => Db::new_sled(&cli.global.sled_path),
};
let app = Router::new()
.route("/ping", get(ping))
@@ -70,7 +94,7 @@ async fn main() -> anyhow::Result<()> {
.with_state(AppState {
agent: AgentService::default(),
leases: LeaseService::default(),
events: EventService::default(),
events: EventService::new(db),
});
tracing::info!("churn server listening on {}", host);