diff --git a/Cargo.lock b/Cargo.lock index 063841b..d8bb67c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -594,12 +594,14 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "crunch-envelope", "crunch-traits", "futures", "thiserror", "tokio", "tokio-stream", "tracing", + "uuid", ] [[package]] diff --git a/crates/crunch-in-memory/Cargo.toml b/crates/crunch-in-memory/Cargo.toml index e8bc10b..bc93a0e 100644 --- a/crates/crunch-in-memory/Cargo.toml +++ b/crates/crunch-in-memory/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] crunch-traits.workspace = true +crunch-envelope.workspace = true anyhow.workspace = true tracing.workspace = true @@ -14,4 +15,5 @@ tokio.workspace = true thiserror.workspace = true async-trait.workspace = true futures.workspace = true +uuid.workspace = true tokio-stream = {workspace = true, features = ["sync"]} diff --git a/crates/crunch-in-memory/src/lib.rs b/crates/crunch-in-memory/src/lib.rs index bfc7a33..4e5cc30 100644 --- a/crates/crunch-in-memory/src/lib.rs +++ b/crates/crunch-in-memory/src/lib.rs @@ -1 +1,92 @@ pub mod transport; + +pub mod persistence { + use std::{ + collections::{BTreeMap, VecDeque}, + sync::Arc, + }; + + use async_trait::async_trait; + use crunch_traits::{errors::PersistenceError, EventInfo}; + use tokio::sync::RwLock; + + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] + enum MsgState { + Pending, + Published, + } + + #[derive(Debug, Clone)] + pub struct Msg { + id: String, + info: EventInfo, + msg: Vec, + state: MsgState, + } + + pub struct InMemoryPersistence { + pub outbox: Arc>>, + pub store: Arc>>, + } + + #[async_trait] + impl crunch_traits::Persistence for InMemoryPersistence { + async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()> { + let msg = + crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content); + let msg = Msg { + id: uuid::Uuid::new_v4().to_string(), + info: *event_info, + msg, + state: MsgState::Pending, + }; + let mut outbox = self.outbox.write().await; + outbox.push_back(msg.clone()); + self.store.write().await.insert(msg.id.clone(), msg); + + tracing::debug!( + event_info = event_info.to_string(), + content_len = content.len(), + "inserted event" + ); + + Ok(()) + } + + async fn next(&self) -> Option { + let mut outbox = self.outbox.write().await; + outbox.pop_front().map(|i| i.id) + } + + async fn get( + &self, + event_id: &str, + ) -> Result)>, PersistenceError> { + let store = self.store.read().await; + + let event = match store.get(event_id).filter(|m| m.state == MsgState::Pending) { + Some(event) => event, + None => return Ok(None), + }; + + let (content, _) = crunch_envelope::proto::unwrap(event.msg.as_slice()) + .map_err(|e| PersistenceError::GetErr(anyhow::anyhow!(e)))?; + + Ok(Some((event.info, content))) + } + + async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { + match self.store.write().await.get_mut(event_id) { + Some(msg) => msg.state = MsgState::Published, + None => { + return Err(PersistenceError::UpdatePublished(anyhow::anyhow!( + "event was not found on id: {}", + event_id + ))) + } + } + + Ok(()) + } + } +} diff --git a/crates/crunch/src/impls.rs b/crates/crunch/src/impls.rs index 3c02ecf..ac52c7b 100644 --- a/crates/crunch/src/impls.rs +++ b/crates/crunch/src/impls.rs @@ -1,88 +1,4 @@ -use std::{ - collections::{BTreeMap, VecDeque}, - ops::Deref, - sync::Arc, -}; - -use async_trait::async_trait; -use crunch_traits::{errors::PersistenceError, EventInfo}; -use tokio::sync::RwLock; - -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -enum MsgState { - Pending, - Published, -} - -#[derive(Debug, Clone)] -struct Msg { - id: String, - info: EventInfo, - msg: Vec, - state: MsgState, -} - -pub struct InMemoryPersistence { - outbox: Arc>>, - store: Arc>>, -} - -#[async_trait] -impl crunch_traits::Persistence for InMemoryPersistence { - async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()> { - let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content); - let msg = Msg { - id: uuid::Uuid::new_v4().to_string(), - info: *event_info, - msg, - state: MsgState::Pending, - }; - let mut outbox = self.outbox.write().await; - outbox.push_back(msg.clone()); - self.store.write().await.insert(msg.id.clone(), msg); - - tracing::debug!( - event_info = event_info.to_string(), - content_len = content.len(), - "inserted event" - ); - - Ok(()) - } - - async fn next(&self) -> Option { - let mut outbox = self.outbox.write().await; - outbox.pop_front().map(|i| i.id) - } - - async fn get(&self, event_id: &str) -> Result)>, PersistenceError> { - let store = self.store.read().await; - - let event = match store.get(event_id).filter(|m| m.state == MsgState::Pending) { - Some(event) => event, - None => return Ok(None), - }; - - let (content, _) = crunch_envelope::proto::unwrap(event.msg.as_slice()) - .map_err(|e| PersistenceError::GetErr(anyhow::anyhow!(e)))?; - - Ok(Some((event.info, content))) - } - - async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { - match self.store.write().await.get_mut(event_id) { - Some(msg) => msg.state = MsgState::Published, - None => { - return Err(PersistenceError::UpdatePublished(anyhow::anyhow!( - "event was not found on id: {}", - event_id - ))) - } - } - - Ok(()) - } -} +use std::{ops::Deref, sync::Arc}; #[derive(Clone)] pub struct Persistence { @@ -92,6 +8,8 @@ pub struct Persistence { impl Persistence { #[cfg(feature = "in-memory")] pub fn in_memory() -> Self { + use crunch_in_memory::persistence::InMemoryPersistence; + Self { inner: std::sync::Arc::new(InMemoryPersistence { outbox: std::sync::Arc::default(),