feat: add storage

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2024-04-29 23:34:04 +02:00
parent cce97f62f8
commit 8e4b46d2d7
10 changed files with 530 additions and 47 deletions

View File

@@ -18,6 +18,8 @@ uuid = { version = "1.7.0", features = ["v4"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
serde_json = "1.0.116"
bus = "2.4.1"
dirs = "5.0.1"
[dev-dependencies]
similar-asserts = "1.5.0"
tempfile = "3.10.1"

View File

@@ -1,11 +1,12 @@
use std::{collections::BTreeMap, sync::RwLock};
use std::collections::BTreeMap;
use serde::Serialize;
use crate::{
engine::Engine,
events::Events,
log::{GraphItem, ItemState},
shared_engine::SharedEngine,
storage::Storage,
};
#[derive(Serialize, PartialEq, Eq, Debug, Clone)]
@@ -31,22 +32,30 @@ pub enum Command {
},
}
#[derive(Default)]
pub struct Commander {
engine: RwLock<Engine>,
engine: SharedEngine,
storage: Storage,
events: Events,
}
impl Commander {
pub fn new(engine: SharedEngine, storage: Storage, events: Events) -> anyhow::Result<Self> {
Ok(Self {
engine,
storage,
events,
})
}
pub fn execute(&self, cmd: Command) -> anyhow::Result<()> {
tracing::debug!("executing event: {}", serde_json::to_string(&cmd)?);
match cmd {
match cmd.clone() {
Command::CreateRoot { root } => {
self.engine.write().unwrap().create_root(&root)?;
self.engine.create_root(&root)?;
}
Command::CreateSection { root, path } => {
self.engine.write().unwrap().create(
self.engine.create(
&root,
&path.iter().map(|p| p.as_str()).collect::<Vec<_>>(),
GraphItem::Section(BTreeMap::default()),
@@ -58,7 +67,7 @@ impl Commander {
title,
description,
state,
} => self.engine.write().unwrap().create(
} => self.engine.create(
&root,
&path.iter().map(|p| p.as_str()).collect::<Vec<_>>(),
GraphItem::Item {
@@ -67,13 +76,17 @@ impl Commander {
state,
},
)?,
Command::Move { root, src, dest } => self.engine.write().unwrap().section_move(
Command::Move { root, src, dest } => self.engine.section_move(
&root,
&src.iter().map(|p| p.as_str()).collect::<Vec<_>>(),
&dest.iter().map(|p| p.as_str()).collect::<Vec<_>>(),
)?,
}
self.storage.store(&self.engine)?;
self.events.enque_command(cmd)?;
Ok(())
}
}

View File

@@ -1,6 +1,6 @@
use std::{collections::BTreeMap, fmt::Display};
use anyhow::anyhow;
use anyhow::{anyhow, Context};
use crate::log::{Graph, GraphItem};
@@ -10,6 +10,16 @@ pub struct Engine {
}
impl Engine {
pub fn engine_from_str(input: &str) -> anyhow::Result<Self> {
let graph: Graph = serde_json::from_str(input)?;
Ok(Self { graph })
}
pub fn to_str(&self) -> anyhow::Result<String> {
serde_json::to_string_pretty(&self.graph).context("failed to serialize graph")
}
pub fn create_root(&mut self, root: &str) -> anyhow::Result<()> {
self.graph
.try_insert(root.to_string(), GraphItem::User(BTreeMap::default()))

View File

@@ -1,4 +1,4 @@
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use bus::{Bus, BusReader};
@@ -9,8 +9,9 @@ pub enum Event {
CommandEvent { command: Command },
}
#[derive(Clone)]
pub struct Events {
bus: Mutex<Bus<Event>>,
bus: Arc<Mutex<Bus<Event>>>,
}
impl Default for Events {
@@ -18,7 +19,7 @@ impl Default for Events {
let bus = Bus::new(10);
Self {
bus: Mutex::new(bus),
bus: Arc::new(Mutex::new(bus)),
}
}
}

View File

@@ -11,9 +11,14 @@ use sqlx::{Pool, Postgres};
use tower_http::trace::TraceLayer;
pub mod commander;
pub mod querier;
pub mod engine;
pub mod events;
pub mod log;
pub mod shared_engine;
pub mod state;
pub mod storage;
#[derive(Parser)]
#[command(author, version, about, long_about = None, subcommand_required = true)]
@@ -28,6 +33,29 @@ enum Commands {
#[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
host: SocketAddr,
},
CreateRoot {
#[arg(long = "root")]
root: String,
},
CreateSection {
#[arg(long = "root")]
root: String,
#[arg(long = "path")]
path: Option<String>,
},
Get {
#[arg(long = "root")]
root: String,
#[arg(long = "path")]
path: Option<String>,
},
Info {},
}
#[tokio::main]
@@ -37,37 +65,72 @@ async fn main() -> anyhow::Result<()> {
let cli = Command::parse();
if let Some(Commands::Serve { host }) = cli.command {
tracing::info!("Starting service");
let state = state::State::new()?;
let state = SharedState(Arc::new(State::new().await?));
match cli.command {
Some(Commands::Serve { host }) => {
tracing::info!("Starting service");
let app = Router::new()
.route("/", get(root))
.with_state(state.clone())
.layer(
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
// Log the matched route's path (with placeholders not filled in).
// Use request.uri() or OriginalUri if you want the real path.
let matched_path = request
.extensions()
.get::<MatchedPath>()
.map(MatchedPath::as_str);
let state = SharedState(Arc::new(State::new().await?));
tracing::info_span!(
"http_request",
method = ?request.method(),
matched_path,
some_other_field = tracing::field::Empty,
)
}), // ...
let app = Router::new()
.route("/", get(root))
.with_state(state.clone())
.layer(
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
// Log the matched route's path (with placeholders not filled in).
// Use request.uri() or OriginalUri if you want the real path.
let matched_path = request
.extensions()
.get::<MatchedPath>()
.map(MatchedPath::as_str);
tracing::info_span!(
"http_request",
method = ?request.method(),
matched_path,
some_other_field = tracing::field::Empty,
)
}), // ...
);
tracing::info!("listening on {}", host);
let listener = tokio::net::TcpListener::bind(host).await.unwrap();
axum::serve(listener, app.into_make_service())
.await
.unwrap();
}
Some(Commands::CreateRoot { root }) => state
.commander
.execute(commander::Command::CreateRoot { root })?,
Some(Commands::CreateSection { root, path }) => {
state.commander.execute(commander::Command::CreateSection {
root,
path: path
.unwrap_or_default()
.split('.')
.map(|s| s.to_string())
.filter(|s| !s.is_empty())
.collect::<Vec<String>>(),
})?
}
Some(Commands::Get { root, path }) => {
let res = state.querier.get(
&root,
path.unwrap_or_default()
.split('.')
.filter(|s| !s.is_empty()),
);
tracing::info!("listening on {}", host);
let listener = tokio::net::TcpListener::bind(host).await.unwrap();
axum::serve(listener, app.into_make_service())
.await
.unwrap();
let output = serde_json::to_string_pretty(&res)?;
println!("{}", output);
}
Some(Commands::Info {}) => {
println!("graph stored at: {}", state.storage.info()?)
}
None => {}
}
Ok(())

View File

@@ -0,0 +1,24 @@
use crate::{log::GraphItem, shared_engine::SharedEngine};
pub struct Querier {
engine: SharedEngine,
}
impl Querier {
pub fn new(engine: SharedEngine) -> Self {
Self { engine }
}
pub fn get(
&self,
root: &str,
path: impl IntoIterator<Item = impl Into<String>>,
) -> Option<GraphItem> {
let path = path.into_iter().map(|i| i.into()).collect::<Vec<String>>();
tracing::debug!("quering: {}, len: ({}))", path.join("."), path.len());
self.engine
.get(root, &path.iter().map(|i| i.as_str()).collect::<Vec<_>>())
}
}

View File

@@ -0,0 +1,46 @@
use std::sync::{Arc, RwLock};
use crate::{engine::Engine, log::GraphItem};
#[derive(Clone)]
pub struct SharedEngine {
inner: Arc<RwLock<Engine>>,
}
impl From<Engine> for SharedEngine {
fn from(value: Engine) -> Self {
SharedEngine {
inner: Arc::new(RwLock::new(value)),
}
}
}
impl SharedEngine {
pub fn to_str(&self) -> anyhow::Result<String> {
self.inner.read().unwrap().to_str()
}
pub fn create_root(&self, root: &str) -> anyhow::Result<()> {
self.inner.write().unwrap().create_root(root)
}
pub fn create(&self, root: &str, path: &[&str], item: GraphItem) -> anyhow::Result<()> {
self.inner.write().unwrap().create(root, path, item)
}
pub fn get(&self, root: &str, path: &[&str]) -> Option<GraphItem> {
self.inner.read().unwrap().get(root, path).cloned()
}
pub fn section_move(
&self,
root: &str,
src_path: &[&str],
dest_path: &[&str],
) -> anyhow::Result<()> {
self.inner
.write()
.unwrap()
.section_move(root, src_path, dest_path)
}
}

View File

@@ -0,0 +1,31 @@
use crate::{
commander::Commander, events::Events, querier::Querier, shared_engine::SharedEngine,
storage::Storage,
};
pub struct State {
engine: SharedEngine,
pub storage: Storage,
events: Events,
pub commander: Commander,
pub querier: Querier,
}
impl State {
pub fn new() -> anyhow::Result<Self> {
let storage = Storage::new();
let engine = storage.load()?;
let events = Events::default();
let engine = SharedEngine::from(engine);
Ok(Self {
engine: engine.clone(),
storage: storage.clone(),
events: events.clone(),
commander: Commander::new(engine.clone(), storage, events)?,
querier: Querier::new(engine),
})
}
}

View File

@@ -0,0 +1,253 @@
use std::{
path::{Path, PathBuf},
sync::{Arc, Mutex},
time::Duration,
};
use crate::{engine::Engine, shared_engine::SharedEngine};
pub struct LockFile(PathBuf);
impl Drop for LockFile {
fn drop(&mut self) {
std::fs::remove_file(&self.0).expect("to be able to delete lockfile")
}
}
impl From<PathBuf> for LockFile {
fn from(value: PathBuf) -> Self {
Self(value)
}
}
#[derive(Clone)]
pub struct Storage {
base: PathBuf,
lock_file: Arc<Mutex<Option<LockFile>>>,
}
impl Default for Storage {
fn default() -> Self {
Self::new()
}
}
impl Storage {
pub fn new() -> Self {
let data_dir = dirs::data_local_dir()
.ok_or(anyhow::anyhow!("failed to retrieve the users data dir"))
.expect("to be able to find config");
Self {
base: data_dir,
lock_file: Arc::new(Mutex::new(None)),
}
}
pub fn with_base(&mut self, base: &Path) {
self.base = base.to_path_buf();
}
pub fn store(&self, engine: &SharedEngine) -> anyhow::Result<()> {
let state_path = self.state()?;
std::fs::write(state_path, engine.to_str()?)?;
Ok(())
}
pub fn load(&self) -> anyhow::Result<Engine> {
let mut lock = self.lock_file.lock().unwrap();
if lock.is_none() {
let lock_file = self.state_lock_file()?;
*lock = Some(lock_file);
}
let engine = match self.state_file()? {
Some(contents) => Engine::engine_from_str(&contents)?,
None => Engine::default(),
};
Ok(engine)
}
pub fn unload(self) -> anyhow::Result<()> {
drop(self);
Ok(())
}
fn state(&self) -> anyhow::Result<PathBuf> {
self.cache().map(|c| c.join("graph.json"))
}
fn state_file(&self) -> anyhow::Result<Option<String>> {
let state_path = self.state()?;
if !state_path.exists() {
return Ok(None);
}
let contents = std::fs::read_to_string(&state_path)?;
Ok(Some(contents))
}
fn state_lock(&self) -> anyhow::Result<PathBuf> {
self.cache().map(|c| c.join("graph.lock"))
}
fn create_lock_file(&self) -> anyhow::Result<()> {
let lock_path = self.state_lock()?;
if let Some(parent) = lock_path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(lock_path, "hyperlog-lock")?;
Ok(())
}
fn state_lock_file(&self) -> anyhow::Result<LockFile> {
let lock_path = self.state_lock()?;
if !lock_path.exists() {
self.create_lock_file()?;
return Ok(LockFile::from(lock_path));
}
if let Ok(modified) = lock_path.metadata()?.modified() {
if modified.elapsed()? > Duration::from_secs(86400) {
std::fs::remove_file(&lock_path)?;
self.create_lock_file()?;
return Ok(LockFile::from(lock_path));
}
}
anyhow::bail!("lock file exists and is valid. Aborting");
}
fn cache(&self) -> anyhow::Result<PathBuf> {
Ok(self.base.join("hyperlog"))
}
pub fn info(&self) -> anyhow::Result<String> {
Ok(format!("storage:\n\tgraph: {}", self.state()?.display()))
}
}
#[cfg(test)]
mod test {
use std::collections::BTreeMap;
use similar_asserts::assert_eq;
use crate::log::GraphItem;
use super::*;
#[test]
fn can_create_state() -> anyhow::Result<()> {
let tempdir = tempfile::tempdir()?;
let mut storage = Storage::default();
storage.with_base(tempdir.path());
let engine = SharedEngine::from(storage.load()?);
engine.create_root("can_create_state")?;
storage.store(&engine)?;
let graph = std::fs::read_to_string(tempdir.path().join("hyperlog").join("graph.json"))?;
let lock = std::fs::read_to_string(tempdir.path().join("hyperlog").join("graph.lock"))?;
assert_eq!(
r#"{
"can_create_state": {
"type": "user"
}
}"#
.to_string(),
graph
);
assert_eq!(r#"hyperlog-lock"#.to_string(), lock);
Ok(())
}
#[test]
fn lock_already_exists() -> anyhow::Result<()> {
let tempdir = tempfile::tempdir()?;
let mut storage = Storage::default();
storage.with_base(tempdir.path());
let _engine = storage.load()?;
let mut storage_should_fail = Storage::default();
storage_should_fail.with_base(tempdir.path());
let engine_should_fail = storage_should_fail.load();
assert!(engine_should_fail.is_err());
if let Err(e) = engine_should_fail {
assert_eq!(
"lock file exists and is valid. Aborting".to_string(),
e.to_string()
);
}
Ok(())
}
#[test]
fn lock_is_cleaned_up() -> anyhow::Result<()> {
let tempdir = tempfile::tempdir()?;
let mut storage = Storage::default();
storage.with_base(tempdir.path());
let engine = SharedEngine::from(storage.load()?);
engine.create_root("can_create_state")?;
storage.store(&engine)?;
storage.unload()?;
assert!(!tempdir.path().join("hyperlog").join("graph.lock").exists());
Ok(())
}
#[test]
fn can_load_state() -> anyhow::Result<()> {
let tempdir = tempfile::tempdir()?;
let mut storage = Storage::default();
storage.with_base(tempdir.path());
let engine = SharedEngine::from(storage.load()?);
engine.create_root("can_create_state")?;
storage.store(&engine)?;
let graph = std::fs::read_to_string(tempdir.path().join("hyperlog").join("graph.json"))?;
assert_eq!(
r#"{
"can_create_state": {
"type": "user"
}
}"#
.to_string(),
graph
);
let engine = storage.load()?;
let res = engine.get("can_create_state", &[]);
assert_eq!(Some(GraphItem::User(BTreeMap::default())), res.cloned());
Ok(())
}
}