chore: clean up

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-10-02 22:16:15 +02:00
parent c3ab28a4de
commit 7777dcaa44
Signed by: kjuulh
GPG Key ID: 57B6E1465221F912
9 changed files with 120 additions and 55 deletions

12
Cargo.lock generated
View File

@ -622,6 +622,18 @@ dependencies = [
[[package]]
name = "crunch-postgres"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"crunch-envelope",
"crunch-traits",
"futures",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"uuid",
]
[[package]]
name = "crunch-traits"

View File

@ -10,6 +10,7 @@ crunch-in-memory = { path = "crates/crunch-in-memory" }
crunch-nats = { path = "crates/crunch-nats" }
crunch-file = {path = "crates/crunch-file"}
crunch-codegen = {path = "crates/crunch-codegen"}
crunch-postgres = {path = "crates/crunch-postgres"}
anyhow = { version = "1.0.75" }
tokio = { version = "1", features = ["full"] }

View File

@ -144,8 +144,8 @@ message MyEvent {{
string my_field = 1;
}}
"#,
config.service.domain.replace("-", "_"),
entity.replace("-", "_")
config.service.domain.replace('-', "_"),
entity.replace('-', "_")
)
.as_bytes(),
)
@ -154,16 +154,16 @@ message MyEvent {{
let output_path = if let Some(dir) = &cli.global_args.crunch_file.parent() {
if dir.display().to_string() == "" {
format!("{schema_path}")
schema_path.to_string()
} else {
format!(
"{}/{}",
dir.display().to_string().trim_end_matches("/"),
schema_path.trim_start_matches("/")
dir.display().to_string().trim_end_matches('/'),
schema_path.trim_start_matches('/')
)
}
} else {
format!("{schema_path}")
schema_path.to_string()
};
println!("Success: added publish, check schema at: {output_path}");
@ -172,9 +172,8 @@ message MyEvent {{
}
},
Commands::Init { commands: None } => {
match config::get_file(&cli.global_args.crunch_file).await {
Ok(_) => anyhow::bail!("config file already exists"),
Err(_) => {}
if (config::get_file(&cli.global_args.crunch_file).await).is_ok() {
anyhow::bail!("config file already exists")
}
let path = &cli.global_args.crunch_file;

View File

@ -54,7 +54,7 @@ impl Node {
fn traverse(&self) -> genco::lang::rust::Tokens {
let mut child_tokens = Vec::new();
let mut nodes = self.children.iter().map(|(_, n)| n).collect::<Vec<_>>();
let mut nodes = self.children.values().collect::<Vec<_>>();
nodes.sort_by(|a, b| a.segment.cmp(&b.segment));
for node in nodes {
let tokens = node.traverse_indent(0);
@ -114,7 +114,7 @@ impl Node {
}
} else {
let mut child_tokens = Vec::new();
let mut nodes = self.children.iter().map(|(_, n)| n).collect::<Vec<_>>();
let mut nodes = self.children.values().collect::<Vec<_>>();
nodes.sort_by(|a, b| a.segment.cmp(&b.segment));
for node in nodes {
let tokens = node.traverse_indent(indent + 1);

View File

@ -3,12 +3,6 @@ name = "crunch-envelope"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bench]]
name = "envelope_benchmark"
harness = false
[features]
default = ["proto"]
json = ["dep:serde", "dep:serde_json", "dep:base64"]

View File

@ -1,37 +0,0 @@
use criterion::{criterion_group, criterion_main, Criterion};
use crunch_envelope::{unwrap, wrap};
fn envelope_capnp_benchmark(content: &[u8]) {
let out = wrap("some-domain", "some-entity", content);
let _ = unwrap(&out).expect("to be able to unwrap capnp message");
}
fn envelope_json_benchmark(content: &[u8]) {
let out = crunch_envelope::json::wrap("some-domain", "some-entity", content);
let _ = crunch_envelope::json::unwrap(&out).expect("to be able to unwrap capnp message");
}
fn envelope_proto_benchmark(content: &[u8]) {
let out = crunch_envelope::proto::wrap("some-domain", "some-entity", content);
let _ = crunch_envelope::proto::unwrap(&out).expect("to be able to unwrap capnp message");
}
fn criterion_benchmark(c: &mut Criterion) {
let large_content: [u8; 10000] = [0; 10000];
c.bench_function("envelope::capnp", |b| {
b.iter(|| envelope_capnp_benchmark(&large_content))
});
c.bench_function("envelope::json", |b| {
b.iter(|| envelope_json_benchmark(&large_content))
});
c.bench_function("envelope::proto", |b| {
b.iter(|| envelope_proto_benchmark(&large_content))
});
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@ -235,7 +235,8 @@ output-path = "some-output"
},
publish: Some(vec![Publish {
schema_path: "some-schema".into(),
output_path: "some-output".into()
output_path: "some-output".into(),
entities: vec![]
}])
}
);

View File

@ -0,0 +1,84 @@
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};
use async_trait::async_trait;
use crunch_traits::{errors::PersistenceError, EventInfo};
use tokio::sync::RwLock;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum MsgState {
Pending,
Published,
}
#[derive(Debug, Clone)]
pub struct Msg {
id: String,
info: EventInfo,
msg: Vec<u8>,
state: MsgState,
}
pub struct InMemoryPersistence {
pub outbox: Arc<RwLock<VecDeque<Msg>>>,
pub store: Arc<RwLock<BTreeMap<String, Msg>>>,
}
#[async_trait]
impl crunch_traits::Persistence for InMemoryPersistence {
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()> {
let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content);
let msg = Msg {
id: uuid::Uuid::new_v4().to_string(),
info: *event_info,
msg,
state: MsgState::Pending,
};
let mut outbox = self.outbox.write().await;
outbox.push_back(msg.clone());
self.store.write().await.insert(msg.id.clone(), msg);
tracing::debug!(
event_info = event_info.to_string(),
content_len = content.len(),
"inserted event"
);
Ok(())
}
async fn next(&self) -> Option<String> {
let mut outbox = self.outbox.write().await;
outbox.pop_front().map(|i| i.id)
}
async fn get(&self, event_id: &str) -> Result<Option<(EventInfo, Vec<u8>)>, PersistenceError> {
let store = self.store.read().await;
let event = match store.get(event_id).filter(|m| m.state == MsgState::Pending) {
Some(event) => event,
None => return Ok(None),
};
let (content, _) = crunch_envelope::proto::unwrap(event.msg.as_slice())
.map_err(|e| PersistenceError::GetErr(anyhow::anyhow!(e)))?;
Ok(Some((event.info, content)))
}
async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> {
match self.store.write().await.get_mut(event_id) {
Some(msg) => msg.state = MsgState::Published,
None => {
return Err(PersistenceError::UpdatePublished(anyhow::anyhow!(
"event was not found on id: {}",
event_id
)))
}
}
Ok(())
}
}

View File

@ -6,3 +6,14 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
crunch-traits.workspace = true
crunch-envelope.workspace = true
anyhow.workspace = true
tracing.workspace = true
tokio.workspace = true
thiserror.workspace = true
async-trait.workspace = true
futures.workspace = true
uuid.workspace = true
tokio-stream = {workspace = true, features = ["sync"]}