feat: add event layer

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-04-29 21:48:01 +02:00
parent 7de4872816
commit cce97f62f8
Signed by: kjuulh
GPG Key ID: 57B6E1465221F912
7 changed files with 229 additions and 2 deletions

21
Cargo.lock generated
View File

@ -214,6 +214,17 @@ dependencies = [
"regex-automata", "regex-automata",
] ]
[[package]]
name = "bus"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b7118d0221d84fada881b657c2ddb7cd55108db79c8764c9ee212c0c259b783"
dependencies = [
"crossbeam-channel",
"num_cpus",
"parking_lot_core",
]
[[package]] [[package]]
name = "byteorder" name = "byteorder"
version = "1.5.0" version = "1.5.0"
@ -328,6 +339,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crossbeam-channel"
version = "0.5.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-queue" name = "crossbeam-queue"
version = "0.3.11" version = "0.3.11"
@ -758,6 +778,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"axum", "axum",
"bus",
"clap", "clap",
"dotenv", "dotenv",
"serde", "serde",

View File

@ -3,7 +3,6 @@ members = ["crates/*"]
resolver = "2" resolver = "2"
[workspace.dependencies] [workspace.dependencies]
hyperlog = { path = "crates/hyperlog" }
anyhow = { version = "1" } anyhow = { version = "1" }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }

View File

@ -17,6 +17,7 @@ sqlx = { version = "0.7.3", features = ["runtime-tokio", "tls-rustls", "postgres
uuid = { version = "1.7.0", features = ["v4"] } uuid = { version = "1.7.0", features = ["v4"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] } tower-http = { version = "0.5.2", features = ["cors", "trace"] }
serde_json = "1.0.116" serde_json = "1.0.116"
bus = "2.4.1"
[dev-dependencies] [dev-dependencies]
similar-asserts = "1.5.0" similar-asserts = "1.5.0"

View File

@ -0,0 +1,79 @@
use std::{collections::BTreeMap, sync::RwLock};
use serde::Serialize;
use crate::{
engine::Engine,
events::Events,
log::{GraphItem, ItemState},
};
#[derive(Serialize, PartialEq, Eq, Debug, Clone)]
pub enum Command {
CreateRoot {
root: String,
},
CreateSection {
root: String,
path: Vec<String>,
},
CreateItem {
root: String,
path: Vec<String>,
title: String,
description: String,
state: ItemState,
},
Move {
root: String,
src: Vec<String>,
dest: Vec<String>,
},
}
#[derive(Default)]
pub struct Commander {
engine: RwLock<Engine>,
events: Events,
}
impl Commander {
pub fn execute(&self, cmd: Command) -> anyhow::Result<()> {
tracing::debug!("executing event: {}", serde_json::to_string(&cmd)?);
match cmd {
Command::CreateRoot { root } => {
self.engine.write().unwrap().create_root(&root)?;
}
Command::CreateSection { root, path } => {
self.engine.write().unwrap().create(
&root,
&path.iter().map(|p| p.as_str()).collect::<Vec<_>>(),
GraphItem::Section(BTreeMap::default()),
)?;
}
Command::CreateItem {
root,
path,
title,
description,
state,
} => self.engine.write().unwrap().create(
&root,
&path.iter().map(|p| p.as_str()).collect::<Vec<_>>(),
GraphItem::Item {
title,
description,
state,
},
)?,
Command::Move { root, src, dest } => self.engine.write().unwrap().section_move(
&root,
&src.iter().map(|p| p.as_str()).collect::<Vec<_>>(),
&dest.iter().map(|p| p.as_str()).collect::<Vec<_>>(),
)?,
}
Ok(())
}
}

View File

@ -0,0 +1,125 @@
use std::sync::Mutex;
use bus::{Bus, BusReader};
use crate::commander::Command;
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum Event {
CommandEvent { command: Command },
}
pub struct Events {
bus: Mutex<Bus<Event>>,
}
impl Default for Events {
fn default() -> Self {
let bus = Bus::new(10);
Self {
bus: Mutex::new(bus),
}
}
}
impl Events {
pub fn enque_command(&self, cmd: Command) -> anyhow::Result<()> {
self.bus
.lock()
.unwrap()
.broadcast(Event::CommandEvent { command: cmd });
Ok(())
}
pub fn subscribe(&self) -> anyhow::Result<BusReader<Event>> {
let rx = self.bus.lock().unwrap().add_rx();
Ok(rx)
}
}
#[cfg(test)]
mod test {
use similar_asserts::assert_eq;
use crate::{commander::Command, events::Event};
use super::Events;
#[test]
fn can_enque_command() -> anyhow::Result<()> {
let events = Events::default();
events.enque_command(Command::CreateRoot {
root: "some-root".into(),
})?;
Ok(())
}
#[test]
fn can_deque_command() -> anyhow::Result<()> {
let events = Events::default();
let mut rx = events.subscribe()?;
std::thread::spawn(move || {
events
.enque_command(Command::CreateRoot {
root: "some-root".into(),
})
.unwrap();
});
let event = rx.recv()?;
assert_eq!(
Event::CommandEvent {
command: Command::CreateRoot {
root: "some-root".into()
},
},
event
);
Ok(())
}
#[test]
fn is_broadcast() -> anyhow::Result<()> {
let events = Events::default();
let mut rx1 = events.subscribe()?;
let mut rx2 = events.subscribe()?;
std::thread::spawn(move || {
events
.enque_command(Command::CreateRoot {
root: "some-root".into(),
})
.unwrap();
events
.enque_command(Command::CreateRoot {
root: "another-event".into(),
})
.unwrap();
});
let event = rx1.recv()?;
let same_event = rx2.recv()?;
assert_eq!(event, same_event);
assert_eq!(
Event::CommandEvent {
command: Command::CreateRoot {
root: "some-root".into()
},
},
event
);
Ok(())
}
}

View File

@ -5,7 +5,7 @@ use std::{
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize, PartialEq, Eq, Clone)] #[derive(Deserialize, Serialize, PartialEq, Eq, Clone, Debug)]
pub enum ItemState { pub enum ItemState {
#[serde(rename = "not-done")] #[serde(rename = "not-done")]
NotDone, NotDone,

View File

@ -10,7 +10,9 @@ use clap::{Parser, Subcommand};
use sqlx::{Pool, Postgres}; use sqlx::{Pool, Postgres};
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
pub mod commander;
pub mod engine; pub mod engine;
pub mod events;
pub mod log; pub mod log;
#[derive(Parser)] #[derive(Parser)]