From c3ab28a4de6172e232dec41d107314efe2e9e0ea Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 2 Oct 2023 22:02:03 +0200 Subject: [PATCH] feat: move persistence to another file Signed-off-by: kjuulh --- Cargo.lock | 4 + crates/crunch-in-memory/src/lib.rs | 92 +--------------------- crates/crunch-in-memory/src/persistence.rs | 0 crates/crunch-postgres/src/lib.rs | 13 --- 4 files changed, 5 insertions(+), 104 deletions(-) create mode 100644 crates/crunch-in-memory/src/persistence.rs diff --git a/Cargo.lock b/Cargo.lock index d8bb67c..e733bd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -619,6 +619,10 @@ dependencies = [ "tracing", ] +[[package]] +name = "crunch-postgres" +version = "0.1.0" + [[package]] name = "crunch-traits" version = "0.1.0" diff --git a/crates/crunch-in-memory/src/lib.rs b/crates/crunch-in-memory/src/lib.rs index 4e5cc30..71d51a5 100644 --- a/crates/crunch-in-memory/src/lib.rs +++ b/crates/crunch-in-memory/src/lib.rs @@ -1,92 +1,2 @@ +pub mod persistence; pub mod transport; - -pub mod persistence { - use std::{ - collections::{BTreeMap, VecDeque}, - sync::Arc, - }; - - use async_trait::async_trait; - use crunch_traits::{errors::PersistenceError, EventInfo}; - use tokio::sync::RwLock; - - #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] - enum MsgState { - Pending, - Published, - } - - #[derive(Debug, Clone)] - pub struct Msg { - id: String, - info: EventInfo, - msg: Vec, - state: MsgState, - } - - pub struct InMemoryPersistence { - pub outbox: Arc>>, - pub store: Arc>>, - } - - #[async_trait] - impl crunch_traits::Persistence for InMemoryPersistence { - async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()> { - let msg = - crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content); - let msg = Msg { - id: uuid::Uuid::new_v4().to_string(), - info: *event_info, - msg, - state: MsgState::Pending, - }; - let mut outbox = self.outbox.write().await; - outbox.push_back(msg.clone()); - self.store.write().await.insert(msg.id.clone(), msg); - - tracing::debug!( - event_info = event_info.to_string(), - content_len = content.len(), - "inserted event" - ); - - Ok(()) - } - - async fn next(&self) -> Option { - let mut outbox = self.outbox.write().await; - outbox.pop_front().map(|i| i.id) - } - - async fn get( - &self, - event_id: &str, - ) -> Result)>, PersistenceError> { - let store = self.store.read().await; - - let event = match store.get(event_id).filter(|m| m.state == MsgState::Pending) { - Some(event) => event, - None => return Ok(None), - }; - - let (content, _) = crunch_envelope::proto::unwrap(event.msg.as_slice()) - .map_err(|e| PersistenceError::GetErr(anyhow::anyhow!(e)))?; - - Ok(Some((event.info, content))) - } - - async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { - match self.store.write().await.get_mut(event_id) { - Some(msg) => msg.state = MsgState::Published, - None => { - return Err(PersistenceError::UpdatePublished(anyhow::anyhow!( - "event was not found on id: {}", - event_id - ))) - } - } - - Ok(()) - } - } -} diff --git a/crates/crunch-in-memory/src/persistence.rs b/crates/crunch-in-memory/src/persistence.rs new file mode 100644 index 0000000..e69de29 diff --git a/crates/crunch-postgres/src/lib.rs b/crates/crunch-postgres/src/lib.rs index 7d12d9a..8b13789 100644 --- a/crates/crunch-postgres/src/lib.rs +++ b/crates/crunch-postgres/src/lib.rs @@ -1,14 +1 @@ -pub fn add(left: usize, right: usize) -> usize { - left + right -} -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -}