From 8e4b46d2d7afd8996a83d61a6f3fdcff79838aa7 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 29 Apr 2024 23:34:04 +0200 Subject: [PATCH] feat: add storage Signed-off-by: kjuulh --- Cargo.lock | 56 +++++- crates/hyperlog/Cargo.toml | 2 + crates/hyperlog/src/commander.rs | 31 +++- crates/hyperlog/src/engine.rs | 12 +- crates/hyperlog/src/events.rs | 7 +- crates/hyperlog/src/main.rs | 115 +++++++++--- crates/hyperlog/src/querier.rs | 24 +++ crates/hyperlog/src/shared_engine.rs | 46 +++++ crates/hyperlog/src/state.rs | 31 ++++ crates/hyperlog/src/storage.rs | 253 +++++++++++++++++++++++++++ 10 files changed, 530 insertions(+), 47 deletions(-) create mode 100644 crates/hyperlog/src/querier.rs create mode 100644 crates/hyperlog/src/shared_engine.rs create mode 100644 crates/hyperlog/src/state.rs create mode 100644 crates/hyperlog/src/storage.rs diff --git a/Cargo.lock b/Cargo.lock index f2ceeb7..e37c3ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -405,6 +405,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "dotenv" version = "0.15.0" @@ -780,11 +801,13 @@ dependencies = [ "axum", "bus", "clap", + "dirs", "dotenv", "serde", "serde_json", "similar-asserts", "sqlx", + "tempfile", "tokio", "tower-http", "tracing", @@ -871,6 +894,16 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.4.2", + "libc", +] + [[package]] name = "libsqlite3-sys" version = "0.27.0" @@ -1050,6 +1083,12 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "overload" version = "0.1.1" @@ -1074,7 +1113,7 @@ checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.3.5", + "redox_syscall", "smallvec", "windows-targets 0.48.0", ] @@ -1229,12 +1268,14 @@ dependencies = [ ] [[package]] -name = "redox_syscall" -version = "0.4.1" +name = "redox_users" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +checksum = "bd283d9651eeda4b2a83a43c1c91b266c40fd76ecd39a50a8c630ae69dc72891" dependencies = [ - "bitflags 1.3.2", + "getrandom", + "libredox", + "thiserror", ] [[package]] @@ -1816,13 +1857,12 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "tempfile" -version = "3.9.0" +version = "3.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" +checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", "fastrand", - "redox_syscall 0.4.1", "rustix 0.38.31", "windows-sys 0.52.0", ] diff --git a/crates/hyperlog/Cargo.toml b/crates/hyperlog/Cargo.toml index 991b09d..5a84bc1 100644 --- a/crates/hyperlog/Cargo.toml +++ b/crates/hyperlog/Cargo.toml @@ -18,6 +18,8 @@ uuid = { version = "1.7.0", features = ["v4"] } tower-http = { version = "0.5.2", features = ["cors", "trace"] } serde_json = "1.0.116" bus = "2.4.1" +dirs = "5.0.1" [dev-dependencies] similar-asserts = "1.5.0" +tempfile = "3.10.1" diff --git a/crates/hyperlog/src/commander.rs b/crates/hyperlog/src/commander.rs index fb1d2de..dc7e599 100644 --- a/crates/hyperlog/src/commander.rs +++ b/crates/hyperlog/src/commander.rs @@ -1,11 +1,12 @@ -use std::{collections::BTreeMap, sync::RwLock}; +use std::collections::BTreeMap; use serde::Serialize; use crate::{ - engine::Engine, events::Events, log::{GraphItem, ItemState}, + shared_engine::SharedEngine, + storage::Storage, }; #[derive(Serialize, PartialEq, Eq, Debug, Clone)] @@ -31,22 +32,30 @@ pub enum Command { }, } -#[derive(Default)] pub struct Commander { - engine: RwLock, + engine: SharedEngine, + storage: Storage, events: Events, } impl Commander { + pub fn new(engine: SharedEngine, storage: Storage, events: Events) -> anyhow::Result { + Ok(Self { + engine, + storage, + events, + }) + } + pub fn execute(&self, cmd: Command) -> anyhow::Result<()> { tracing::debug!("executing event: {}", serde_json::to_string(&cmd)?); - match cmd { + match cmd.clone() { Command::CreateRoot { root } => { - self.engine.write().unwrap().create_root(&root)?; + self.engine.create_root(&root)?; } Command::CreateSection { root, path } => { - self.engine.write().unwrap().create( + self.engine.create( &root, &path.iter().map(|p| p.as_str()).collect::>(), GraphItem::Section(BTreeMap::default()), @@ -58,7 +67,7 @@ impl Commander { title, description, state, - } => self.engine.write().unwrap().create( + } => self.engine.create( &root, &path.iter().map(|p| p.as_str()).collect::>(), GraphItem::Item { @@ -67,13 +76,17 @@ impl Commander { state, }, )?, - Command::Move { root, src, dest } => self.engine.write().unwrap().section_move( + Command::Move { root, src, dest } => self.engine.section_move( &root, &src.iter().map(|p| p.as_str()).collect::>(), &dest.iter().map(|p| p.as_str()).collect::>(), )?, } + self.storage.store(&self.engine)?; + + self.events.enque_command(cmd)?; + Ok(()) } } diff --git a/crates/hyperlog/src/engine.rs b/crates/hyperlog/src/engine.rs index 717d6af..9816b49 100644 --- a/crates/hyperlog/src/engine.rs +++ b/crates/hyperlog/src/engine.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeMap, fmt::Display}; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use crate::log::{Graph, GraphItem}; @@ -10,6 +10,16 @@ pub struct Engine { } impl Engine { + pub fn engine_from_str(input: &str) -> anyhow::Result { + let graph: Graph = serde_json::from_str(input)?; + + Ok(Self { graph }) + } + + pub fn to_str(&self) -> anyhow::Result { + serde_json::to_string_pretty(&self.graph).context("failed to serialize graph") + } + pub fn create_root(&mut self, root: &str) -> anyhow::Result<()> { self.graph .try_insert(root.to_string(), GraphItem::User(BTreeMap::default())) diff --git a/crates/hyperlog/src/events.rs b/crates/hyperlog/src/events.rs index 89cf7fd..8a654e0 100644 --- a/crates/hyperlog/src/events.rs +++ b/crates/hyperlog/src/events.rs @@ -1,4 +1,4 @@ -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use bus::{Bus, BusReader}; @@ -9,8 +9,9 @@ pub enum Event { CommandEvent { command: Command }, } +#[derive(Clone)] pub struct Events { - bus: Mutex>, + bus: Arc>>, } impl Default for Events { @@ -18,7 +19,7 @@ impl Default for Events { let bus = Bus::new(10); Self { - bus: Mutex::new(bus), + bus: Arc::new(Mutex::new(bus)), } } } diff --git a/crates/hyperlog/src/main.rs b/crates/hyperlog/src/main.rs index 16eb2fc..2ba29c2 100644 --- a/crates/hyperlog/src/main.rs +++ b/crates/hyperlog/src/main.rs @@ -11,9 +11,14 @@ use sqlx::{Pool, Postgres}; use tower_http::trace::TraceLayer; pub mod commander; +pub mod querier; + pub mod engine; pub mod events; pub mod log; +pub mod shared_engine; +pub mod state; +pub mod storage; #[derive(Parser)] #[command(author, version, about, long_about = None, subcommand_required = true)] @@ -28,6 +33,29 @@ enum Commands { #[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")] host: SocketAddr, }, + + CreateRoot { + #[arg(long = "root")] + root: String, + }, + + CreateSection { + #[arg(long = "root")] + root: String, + + #[arg(long = "path")] + path: Option, + }, + + Get { + #[arg(long = "root")] + root: String, + + #[arg(long = "path")] + path: Option, + }, + + Info {}, } #[tokio::main] @@ -37,37 +65,72 @@ async fn main() -> anyhow::Result<()> { let cli = Command::parse(); - if let Some(Commands::Serve { host }) = cli.command { - tracing::info!("Starting service"); + let state = state::State::new()?; - let state = SharedState(Arc::new(State::new().await?)); + match cli.command { + Some(Commands::Serve { host }) => { + tracing::info!("Starting service"); - let app = Router::new() - .route("/", get(root)) - .with_state(state.clone()) - .layer( - TraceLayer::new_for_http().make_span_with(|request: &Request<_>| { - // Log the matched route's path (with placeholders not filled in). - // Use request.uri() or OriginalUri if you want the real path. - let matched_path = request - .extensions() - .get::() - .map(MatchedPath::as_str); + let state = SharedState(Arc::new(State::new().await?)); - tracing::info_span!( - "http_request", - method = ?request.method(), - matched_path, - some_other_field = tracing::field::Empty, - ) - }), // ... + let app = Router::new() + .route("/", get(root)) + .with_state(state.clone()) + .layer( + TraceLayer::new_for_http().make_span_with(|request: &Request<_>| { + // Log the matched route's path (with placeholders not filled in). + // Use request.uri() or OriginalUri if you want the real path. + let matched_path = request + .extensions() + .get::() + .map(MatchedPath::as_str); + + tracing::info_span!( + "http_request", + method = ?request.method(), + matched_path, + some_other_field = tracing::field::Empty, + ) + }), // ... + ); + + tracing::info!("listening on {}", host); + let listener = tokio::net::TcpListener::bind(host).await.unwrap(); + axum::serve(listener, app.into_make_service()) + .await + .unwrap(); + } + Some(Commands::CreateRoot { root }) => state + .commander + .execute(commander::Command::CreateRoot { root })?, + Some(Commands::CreateSection { root, path }) => { + state.commander.execute(commander::Command::CreateSection { + root, + path: path + .unwrap_or_default() + .split('.') + .map(|s| s.to_string()) + .filter(|s| !s.is_empty()) + .collect::>(), + })? + } + Some(Commands::Get { root, path }) => { + let res = state.querier.get( + &root, + path.unwrap_or_default() + .split('.') + .filter(|s| !s.is_empty()), ); - tracing::info!("listening on {}", host); - let listener = tokio::net::TcpListener::bind(host).await.unwrap(); - axum::serve(listener, app.into_make_service()) - .await - .unwrap(); + let output = serde_json::to_string_pretty(&res)?; + + println!("{}", output); + } + Some(Commands::Info {}) => { + println!("graph stored at: {}", state.storage.info()?) + } + + None => {} } Ok(()) diff --git a/crates/hyperlog/src/querier.rs b/crates/hyperlog/src/querier.rs new file mode 100644 index 0000000..98ca177 --- /dev/null +++ b/crates/hyperlog/src/querier.rs @@ -0,0 +1,24 @@ +use crate::{log::GraphItem, shared_engine::SharedEngine}; + +pub struct Querier { + engine: SharedEngine, +} + +impl Querier { + pub fn new(engine: SharedEngine) -> Self { + Self { engine } + } + + pub fn get( + &self, + root: &str, + path: impl IntoIterator>, + ) -> Option { + let path = path.into_iter().map(|i| i.into()).collect::>(); + + tracing::debug!("quering: {}, len: ({}))", path.join("."), path.len()); + + self.engine + .get(root, &path.iter().map(|i| i.as_str()).collect::>()) + } +} diff --git a/crates/hyperlog/src/shared_engine.rs b/crates/hyperlog/src/shared_engine.rs new file mode 100644 index 0000000..bb53bda --- /dev/null +++ b/crates/hyperlog/src/shared_engine.rs @@ -0,0 +1,46 @@ +use std::sync::{Arc, RwLock}; + +use crate::{engine::Engine, log::GraphItem}; + +#[derive(Clone)] +pub struct SharedEngine { + inner: Arc>, +} + +impl From for SharedEngine { + fn from(value: Engine) -> Self { + SharedEngine { + inner: Arc::new(RwLock::new(value)), + } + } +} + +impl SharedEngine { + pub fn to_str(&self) -> anyhow::Result { + self.inner.read().unwrap().to_str() + } + + pub fn create_root(&self, root: &str) -> anyhow::Result<()> { + self.inner.write().unwrap().create_root(root) + } + + pub fn create(&self, root: &str, path: &[&str], item: GraphItem) -> anyhow::Result<()> { + self.inner.write().unwrap().create(root, path, item) + } + + pub fn get(&self, root: &str, path: &[&str]) -> Option { + self.inner.read().unwrap().get(root, path).cloned() + } + + pub fn section_move( + &self, + root: &str, + src_path: &[&str], + dest_path: &[&str], + ) -> anyhow::Result<()> { + self.inner + .write() + .unwrap() + .section_move(root, src_path, dest_path) + } +} diff --git a/crates/hyperlog/src/state.rs b/crates/hyperlog/src/state.rs new file mode 100644 index 0000000..6dff701 --- /dev/null +++ b/crates/hyperlog/src/state.rs @@ -0,0 +1,31 @@ +use crate::{ + commander::Commander, events::Events, querier::Querier, shared_engine::SharedEngine, + storage::Storage, +}; + +pub struct State { + engine: SharedEngine, + pub storage: Storage, + events: Events, + + pub commander: Commander, + pub querier: Querier, +} + +impl State { + pub fn new() -> anyhow::Result { + let storage = Storage::new(); + let engine = storage.load()?; + let events = Events::default(); + let engine = SharedEngine::from(engine); + + Ok(Self { + engine: engine.clone(), + storage: storage.clone(), + events: events.clone(), + + commander: Commander::new(engine.clone(), storage, events)?, + querier: Querier::new(engine), + }) + } +} diff --git a/crates/hyperlog/src/storage.rs b/crates/hyperlog/src/storage.rs new file mode 100644 index 0000000..b3a2177 --- /dev/null +++ b/crates/hyperlog/src/storage.rs @@ -0,0 +1,253 @@ +use std::{ + path::{Path, PathBuf}, + sync::{Arc, Mutex}, + time::Duration, +}; + +use crate::{engine::Engine, shared_engine::SharedEngine}; + +pub struct LockFile(PathBuf); + +impl Drop for LockFile { + fn drop(&mut self) { + std::fs::remove_file(&self.0).expect("to be able to delete lockfile") + } +} + +impl From for LockFile { + fn from(value: PathBuf) -> Self { + Self(value) + } +} + +#[derive(Clone)] +pub struct Storage { + base: PathBuf, + lock_file: Arc>>, +} + +impl Default for Storage { + fn default() -> Self { + Self::new() + } +} + +impl Storage { + pub fn new() -> Self { + let data_dir = dirs::data_local_dir() + .ok_or(anyhow::anyhow!("failed to retrieve the users data dir")) + .expect("to be able to find config"); + + Self { + base: data_dir, + lock_file: Arc::new(Mutex::new(None)), + } + } + + pub fn with_base(&mut self, base: &Path) { + self.base = base.to_path_buf(); + } + + pub fn store(&self, engine: &SharedEngine) -> anyhow::Result<()> { + let state_path = self.state()?; + + std::fs::write(state_path, engine.to_str()?)?; + + Ok(()) + } + + pub fn load(&self) -> anyhow::Result { + let mut lock = self.lock_file.lock().unwrap(); + if lock.is_none() { + let lock_file = self.state_lock_file()?; + *lock = Some(lock_file); + } + + let engine = match self.state_file()? { + Some(contents) => Engine::engine_from_str(&contents)?, + None => Engine::default(), + }; + + Ok(engine) + } + + pub fn unload(self) -> anyhow::Result<()> { + drop(self); + Ok(()) + } + + fn state(&self) -> anyhow::Result { + self.cache().map(|c| c.join("graph.json")) + } + + fn state_file(&self) -> anyhow::Result> { + let state_path = self.state()?; + + if !state_path.exists() { + return Ok(None); + } + + let contents = std::fs::read_to_string(&state_path)?; + + Ok(Some(contents)) + } + + fn state_lock(&self) -> anyhow::Result { + self.cache().map(|c| c.join("graph.lock")) + } + + fn create_lock_file(&self) -> anyhow::Result<()> { + let lock_path = self.state_lock()?; + + if let Some(parent) = lock_path.parent() { + std::fs::create_dir_all(parent)?; + } + std::fs::write(lock_path, "hyperlog-lock")?; + + Ok(()) + } + + fn state_lock_file(&self) -> anyhow::Result { + let lock_path = self.state_lock()?; + + if !lock_path.exists() { + self.create_lock_file()?; + return Ok(LockFile::from(lock_path)); + } + + if let Ok(modified) = lock_path.metadata()?.modified() { + if modified.elapsed()? > Duration::from_secs(86400) { + std::fs::remove_file(&lock_path)?; + + self.create_lock_file()?; + return Ok(LockFile::from(lock_path)); + } + } + + anyhow::bail!("lock file exists and is valid. Aborting"); + } + + fn cache(&self) -> anyhow::Result { + Ok(self.base.join("hyperlog")) + } + + pub fn info(&self) -> anyhow::Result { + Ok(format!("storage:\n\tgraph: {}", self.state()?.display())) + } +} + +#[cfg(test)] +mod test { + use std::collections::BTreeMap; + + use similar_asserts::assert_eq; + + use crate::log::GraphItem; + + use super::*; + + #[test] + fn can_create_state() -> anyhow::Result<()> { + let tempdir = tempfile::tempdir()?; + + let mut storage = Storage::default(); + storage.with_base(tempdir.path()); + + let engine = SharedEngine::from(storage.load()?); + engine.create_root("can_create_state")?; + + storage.store(&engine)?; + + let graph = std::fs::read_to_string(tempdir.path().join("hyperlog").join("graph.json"))?; + let lock = std::fs::read_to_string(tempdir.path().join("hyperlog").join("graph.lock"))?; + + assert_eq!( + r#"{ + "can_create_state": { + "type": "user" + } +}"# + .to_string(), + graph + ); + assert_eq!(r#"hyperlog-lock"#.to_string(), lock); + + Ok(()) + } + + #[test] + fn lock_already_exists() -> anyhow::Result<()> { + let tempdir = tempfile::tempdir()?; + + let mut storage = Storage::default(); + storage.with_base(tempdir.path()); + + let _engine = storage.load()?; + + let mut storage_should_fail = Storage::default(); + storage_should_fail.with_base(tempdir.path()); + + let engine_should_fail = storage_should_fail.load(); + + assert!(engine_should_fail.is_err()); + if let Err(e) = engine_should_fail { + assert_eq!( + "lock file exists and is valid. Aborting".to_string(), + e.to_string() + ); + } + + Ok(()) + } + + #[test] + fn lock_is_cleaned_up() -> anyhow::Result<()> { + let tempdir = tempfile::tempdir()?; + + let mut storage = Storage::default(); + storage.with_base(tempdir.path()); + + let engine = SharedEngine::from(storage.load()?); + engine.create_root("can_create_state")?; + + storage.store(&engine)?; + storage.unload()?; + + assert!(!tempdir.path().join("hyperlog").join("graph.lock").exists()); + + Ok(()) + } + + #[test] + fn can_load_state() -> anyhow::Result<()> { + let tempdir = tempfile::tempdir()?; + + let mut storage = Storage::default(); + storage.with_base(tempdir.path()); + + let engine = SharedEngine::from(storage.load()?); + engine.create_root("can_create_state")?; + + storage.store(&engine)?; + + let graph = std::fs::read_to_string(tempdir.path().join("hyperlog").join("graph.json"))?; + + assert_eq!( + r#"{ + "can_create_state": { + "type": "user" + } +}"# + .to_string(), + graph + ); + + let engine = storage.load()?; + + let res = engine.get("can_create_state", &[]); + + assert_eq!(Some(GraphItem::User(BTreeMap::default())), res.cloned()); + + Ok(()) + } +}