From 7777dcaa44d3032def9b0243fc3e9ddbbdb307a6 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 2 Oct 2023 22:16:15 +0200 Subject: [PATCH] chore: clean up Signed-off-by: kjuulh --- Cargo.lock | 12 +++ Cargo.toml | 1 + crates/crunch-cli/src/main.rs | 17 ++-- crates/crunch-codegen/src/lib.rs | 4 +- crates/crunch-envelope/Cargo.toml | 6 -- .../benches/envelope_benchmark.rs | 37 -------- crates/crunch-file/src/lib.rs | 3 +- crates/crunch-in-memory/src/persistence.rs | 84 +++++++++++++++++++ crates/crunch-postgres/Cargo.toml | 11 +++ 9 files changed, 120 insertions(+), 55 deletions(-) delete mode 100644 crates/crunch-envelope/benches/envelope_benchmark.rs diff --git a/Cargo.lock b/Cargo.lock index e733bd6..3ce2845 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -622,6 +622,18 @@ dependencies = [ [[package]] name = "crunch-postgres" version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "crunch-envelope", + "crunch-traits", + "futures", + "thiserror", + "tokio", + "tokio-stream", + "tracing", + "uuid", +] [[package]] name = "crunch-traits" diff --git a/Cargo.toml b/Cargo.toml index 44d7313..714086f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ crunch-in-memory = { path = "crates/crunch-in-memory" } crunch-nats = { path = "crates/crunch-nats" } crunch-file = {path = "crates/crunch-file"} crunch-codegen = {path = "crates/crunch-codegen"} +crunch-postgres = {path = "crates/crunch-postgres"} anyhow = { version = "1.0.75" } tokio = { version = "1", features = ["full"] } diff --git a/crates/crunch-cli/src/main.rs b/crates/crunch-cli/src/main.rs index ad05656..1a5a66f 100644 --- a/crates/crunch-cli/src/main.rs +++ b/crates/crunch-cli/src/main.rs @@ -144,8 +144,8 @@ message MyEvent {{ string my_field = 1; }} "#, - config.service.domain.replace("-", "_"), - entity.replace("-", "_") + config.service.domain.replace('-', "_"), + entity.replace('-', "_") ) .as_bytes(), ) @@ -154,16 +154,16 @@ message MyEvent {{ let output_path = if let Some(dir) = &cli.global_args.crunch_file.parent() { if dir.display().to_string() == "" { - format!("{schema_path}") + schema_path.to_string() } else { format!( "{}/{}", - dir.display().to_string().trim_end_matches("/"), - schema_path.trim_start_matches("/") + dir.display().to_string().trim_end_matches('/'), + schema_path.trim_start_matches('/') ) } } else { - format!("{schema_path}") + schema_path.to_string() }; println!("Success: added publish, check schema at: {output_path}"); @@ -172,9 +172,8 @@ message MyEvent {{ } }, Commands::Init { commands: None } => { - match config::get_file(&cli.global_args.crunch_file).await { - Ok(_) => anyhow::bail!("config file already exists"), - Err(_) => {} + if (config::get_file(&cli.global_args.crunch_file).await).is_ok() { + anyhow::bail!("config file already exists") } let path = &cli.global_args.crunch_file; diff --git a/crates/crunch-codegen/src/lib.rs b/crates/crunch-codegen/src/lib.rs index 353a28b..6fc1a27 100644 --- a/crates/crunch-codegen/src/lib.rs +++ b/crates/crunch-codegen/src/lib.rs @@ -54,7 +54,7 @@ impl Node { fn traverse(&self) -> genco::lang::rust::Tokens { let mut child_tokens = Vec::new(); - let mut nodes = self.children.iter().map(|(_, n)| n).collect::>(); + let mut nodes = self.children.values().collect::>(); nodes.sort_by(|a, b| a.segment.cmp(&b.segment)); for node in nodes { let tokens = node.traverse_indent(0); @@ -114,7 +114,7 @@ impl Node { } } else { let mut child_tokens = Vec::new(); - let mut nodes = self.children.iter().map(|(_, n)| n).collect::>(); + let mut nodes = self.children.values().collect::>(); nodes.sort_by(|a, b| a.segment.cmp(&b.segment)); for node in nodes { let tokens = node.traverse_indent(indent + 1); diff --git a/crates/crunch-envelope/Cargo.toml b/crates/crunch-envelope/Cargo.toml index 3f2d437..87b73e0 100644 --- a/crates/crunch-envelope/Cargo.toml +++ b/crates/crunch-envelope/Cargo.toml @@ -3,12 +3,6 @@ name = "crunch-envelope" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[[bench]] -name = "envelope_benchmark" -harness = false - [features] default = ["proto"] json = ["dep:serde", "dep:serde_json", "dep:base64"] diff --git a/crates/crunch-envelope/benches/envelope_benchmark.rs b/crates/crunch-envelope/benches/envelope_benchmark.rs deleted file mode 100644 index 7265eac..0000000 --- a/crates/crunch-envelope/benches/envelope_benchmark.rs +++ /dev/null @@ -1,37 +0,0 @@ -use criterion::{criterion_group, criterion_main, Criterion}; -use crunch_envelope::{unwrap, wrap}; - -fn envelope_capnp_benchmark(content: &[u8]) { - let out = wrap("some-domain", "some-entity", content); - - let _ = unwrap(&out).expect("to be able to unwrap capnp message"); -} - -fn envelope_json_benchmark(content: &[u8]) { - let out = crunch_envelope::json::wrap("some-domain", "some-entity", content); - - let _ = crunch_envelope::json::unwrap(&out).expect("to be able to unwrap capnp message"); -} - -fn envelope_proto_benchmark(content: &[u8]) { - let out = crunch_envelope::proto::wrap("some-domain", "some-entity", content); - - let _ = crunch_envelope::proto::unwrap(&out).expect("to be able to unwrap capnp message"); -} - -fn criterion_benchmark(c: &mut Criterion) { - let large_content: [u8; 10000] = [0; 10000]; - - c.bench_function("envelope::capnp", |b| { - b.iter(|| envelope_capnp_benchmark(&large_content)) - }); - c.bench_function("envelope::json", |b| { - b.iter(|| envelope_json_benchmark(&large_content)) - }); - c.bench_function("envelope::proto", |b| { - b.iter(|| envelope_proto_benchmark(&large_content)) - }); -} - -criterion_group!(benches, criterion_benchmark); -criterion_main!(benches); diff --git a/crates/crunch-file/src/lib.rs b/crates/crunch-file/src/lib.rs index 93d7ca0..bfd003a 100644 --- a/crates/crunch-file/src/lib.rs +++ b/crates/crunch-file/src/lib.rs @@ -235,7 +235,8 @@ output-path = "some-output" }, publish: Some(vec![Publish { schema_path: "some-schema".into(), - output_path: "some-output".into() + output_path: "some-output".into(), + entities: vec![] }]) } ); diff --git a/crates/crunch-in-memory/src/persistence.rs b/crates/crunch-in-memory/src/persistence.rs index e69de29..858fd7c 100644 --- a/crates/crunch-in-memory/src/persistence.rs +++ b/crates/crunch-in-memory/src/persistence.rs @@ -0,0 +1,84 @@ +use std::{ + collections::{BTreeMap, VecDeque}, + sync::Arc, +}; + +use async_trait::async_trait; +use crunch_traits::{errors::PersistenceError, EventInfo}; +use tokio::sync::RwLock; + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +enum MsgState { + Pending, + Published, +} + +#[derive(Debug, Clone)] +pub struct Msg { + id: String, + info: EventInfo, + msg: Vec, + state: MsgState, +} + +pub struct InMemoryPersistence { + pub outbox: Arc>>, + pub store: Arc>>, +} + +#[async_trait] +impl crunch_traits::Persistence for InMemoryPersistence { + async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()> { + let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content); + let msg = Msg { + id: uuid::Uuid::new_v4().to_string(), + info: *event_info, + msg, + state: MsgState::Pending, + }; + let mut outbox = self.outbox.write().await; + outbox.push_back(msg.clone()); + self.store.write().await.insert(msg.id.clone(), msg); + + tracing::debug!( + event_info = event_info.to_string(), + content_len = content.len(), + "inserted event" + ); + + Ok(()) + } + + async fn next(&self) -> Option { + let mut outbox = self.outbox.write().await; + outbox.pop_front().map(|i| i.id) + } + + async fn get(&self, event_id: &str) -> Result)>, PersistenceError> { + let store = self.store.read().await; + + let event = match store.get(event_id).filter(|m| m.state == MsgState::Pending) { + Some(event) => event, + None => return Ok(None), + }; + + let (content, _) = crunch_envelope::proto::unwrap(event.msg.as_slice()) + .map_err(|e| PersistenceError::GetErr(anyhow::anyhow!(e)))?; + + Ok(Some((event.info, content))) + } + + async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { + match self.store.write().await.get_mut(event_id) { + Some(msg) => msg.state = MsgState::Published, + None => { + return Err(PersistenceError::UpdatePublished(anyhow::anyhow!( + "event was not found on id: {}", + event_id + ))) + } + } + + Ok(()) + } +} diff --git a/crates/crunch-postgres/Cargo.toml b/crates/crunch-postgres/Cargo.toml index 7c9d190..f11d559 100644 --- a/crates/crunch-postgres/Cargo.toml +++ b/crates/crunch-postgres/Cargo.toml @@ -6,3 +6,14 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +crunch-traits.workspace = true +crunch-envelope.workspace = true + +anyhow.workspace = true +tracing.workspace = true +tokio.workspace = true +thiserror.workspace = true +async-trait.workspace = true +futures.workspace = true +uuid.workspace = true +tokio-stream = {workspace = true, features = ["sync"]} \ No newline at end of file