From cce97f62f8027c7d1a03663cfd172112128b2ac1 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 29 Apr 2024 21:48:01 +0200 Subject: [PATCH] feat: add event layer Signed-off-by: kjuulh --- Cargo.lock | 21 ++++++ Cargo.toml | 1 - crates/hyperlog/Cargo.toml | 1 + crates/hyperlog/src/commander.rs | 79 +++++++++++++++++++ crates/hyperlog/src/events.rs | 125 +++++++++++++++++++++++++++++++ crates/hyperlog/src/log.rs | 2 +- crates/hyperlog/src/main.rs | 2 + 7 files changed, 229 insertions(+), 2 deletions(-) create mode 100644 crates/hyperlog/src/commander.rs create mode 100644 crates/hyperlog/src/events.rs diff --git a/Cargo.lock b/Cargo.lock index e323d65..f2ceeb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -214,6 +214,17 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "bus" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b7118d0221d84fada881b657c2ddb7cd55108db79c8764c9ee212c0c259b783" +dependencies = [ + "crossbeam-channel", + "num_cpus", + "parking_lot_core", +] + [[package]] name = "byteorder" version = "1.5.0" @@ -328,6 +339,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crossbeam-channel" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -758,6 +778,7 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "bus", "clap", "dotenv", "serde", diff --git a/Cargo.toml b/Cargo.toml index fb2309c..1519161 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,6 @@ members = ["crates/*"] resolver = "2" [workspace.dependencies] -hyperlog = { path = "crates/hyperlog" } anyhow = { version = "1" } tokio = { version = "1", features = ["full"] } diff --git a/crates/hyperlog/Cargo.toml b/crates/hyperlog/Cargo.toml index 08f2cc2..991b09d 100644 --- a/crates/hyperlog/Cargo.toml +++ b/crates/hyperlog/Cargo.toml @@ -17,6 +17,7 @@ sqlx = { version = "0.7.3", features = ["runtime-tokio", "tls-rustls", "postgres uuid = { version = "1.7.0", features = ["v4"] } tower-http = { version = "0.5.2", features = ["cors", "trace"] } serde_json = "1.0.116" +bus = "2.4.1" [dev-dependencies] similar-asserts = "1.5.0" diff --git a/crates/hyperlog/src/commander.rs b/crates/hyperlog/src/commander.rs new file mode 100644 index 0000000..fb1d2de --- /dev/null +++ b/crates/hyperlog/src/commander.rs @@ -0,0 +1,79 @@ +use std::{collections::BTreeMap, sync::RwLock}; + +use serde::Serialize; + +use crate::{ + engine::Engine, + events::Events, + log::{GraphItem, ItemState}, +}; + +#[derive(Serialize, PartialEq, Eq, Debug, Clone)] +pub enum Command { + CreateRoot { + root: String, + }, + CreateSection { + root: String, + path: Vec, + }, + CreateItem { + root: String, + path: Vec, + title: String, + description: String, + state: ItemState, + }, + Move { + root: String, + src: Vec, + dest: Vec, + }, +} + +#[derive(Default)] +pub struct Commander { + engine: RwLock, + events: Events, +} + +impl Commander { + pub fn execute(&self, cmd: Command) -> anyhow::Result<()> { + tracing::debug!("executing event: {}", serde_json::to_string(&cmd)?); + + match cmd { + Command::CreateRoot { root } => { + self.engine.write().unwrap().create_root(&root)?; + } + Command::CreateSection { root, path } => { + self.engine.write().unwrap().create( + &root, + &path.iter().map(|p| p.as_str()).collect::>(), + GraphItem::Section(BTreeMap::default()), + )?; + } + Command::CreateItem { + root, + path, + title, + description, + state, + } => self.engine.write().unwrap().create( + &root, + &path.iter().map(|p| p.as_str()).collect::>(), + GraphItem::Item { + title, + description, + state, + }, + )?, + Command::Move { root, src, dest } => self.engine.write().unwrap().section_move( + &root, + &src.iter().map(|p| p.as_str()).collect::>(), + &dest.iter().map(|p| p.as_str()).collect::>(), + )?, + } + + Ok(()) + } +} diff --git a/crates/hyperlog/src/events.rs b/crates/hyperlog/src/events.rs new file mode 100644 index 0000000..89cf7fd --- /dev/null +++ b/crates/hyperlog/src/events.rs @@ -0,0 +1,125 @@ +use std::sync::Mutex; + +use bus::{Bus, BusReader}; + +use crate::commander::Command; + +#[derive(PartialEq, Eq, Debug, Clone)] +pub enum Event { + CommandEvent { command: Command }, +} + +pub struct Events { + bus: Mutex>, +} + +impl Default for Events { + fn default() -> Self { + let bus = Bus::new(10); + + Self { + bus: Mutex::new(bus), + } + } +} + +impl Events { + pub fn enque_command(&self, cmd: Command) -> anyhow::Result<()> { + self.bus + .lock() + .unwrap() + .broadcast(Event::CommandEvent { command: cmd }); + + Ok(()) + } + + pub fn subscribe(&self) -> anyhow::Result> { + let rx = self.bus.lock().unwrap().add_rx(); + + Ok(rx) + } +} + +#[cfg(test)] +mod test { + use similar_asserts::assert_eq; + + use crate::{commander::Command, events::Event}; + + use super::Events; + + #[test] + fn can_enque_command() -> anyhow::Result<()> { + let events = Events::default(); + + events.enque_command(Command::CreateRoot { + root: "some-root".into(), + })?; + + Ok(()) + } + + #[test] + fn can_deque_command() -> anyhow::Result<()> { + let events = Events::default(); + let mut rx = events.subscribe()?; + + std::thread::spawn(move || { + events + .enque_command(Command::CreateRoot { + root: "some-root".into(), + }) + .unwrap(); + }); + + let event = rx.recv()?; + + assert_eq!( + Event::CommandEvent { + command: Command::CreateRoot { + root: "some-root".into() + }, + }, + event + ); + + Ok(()) + } + + #[test] + fn is_broadcast() -> anyhow::Result<()> { + let events = Events::default(); + let mut rx1 = events.subscribe()?; + let mut rx2 = events.subscribe()?; + + std::thread::spawn(move || { + events + .enque_command(Command::CreateRoot { + root: "some-root".into(), + }) + .unwrap(); + + events + .enque_command(Command::CreateRoot { + root: "another-event".into(), + }) + .unwrap(); + }); + + let event = rx1.recv()?; + let same_event = rx2.recv()?; + + assert_eq!(event, same_event); + + assert_eq!( + Event::CommandEvent { + command: Command::CreateRoot { + root: "some-root".into() + }, + }, + event + ); + + Ok(()) + } +} diff --git a/crates/hyperlog/src/log.rs b/crates/hyperlog/src/log.rs index a9fb4de..0aa5e8f 100644 --- a/crates/hyperlog/src/log.rs +++ b/crates/hyperlog/src/log.rs @@ -5,7 +5,7 @@ use std::{ use serde::{Deserialize, Serialize}; -#[derive(Deserialize, Serialize, PartialEq, Eq, Clone)] +#[derive(Deserialize, Serialize, PartialEq, Eq, Clone, Debug)] pub enum ItemState { #[serde(rename = "not-done")] NotDone, diff --git a/crates/hyperlog/src/main.rs b/crates/hyperlog/src/main.rs index e001f07..16eb2fc 100644 --- a/crates/hyperlog/src/main.rs +++ b/crates/hyperlog/src/main.rs @@ -10,7 +10,9 @@ use clap::{Parser, Subcommand}; use sqlx::{Pool, Postgres}; use tower_http::trace::TraceLayer; +pub mod commander; pub mod engine; +pub mod events; pub mod log; #[derive(Parser)]