pub mod transport; pub mod persistence { use std::{ collections::{BTreeMap, VecDeque}, sync::Arc, }; use async_trait::async_trait; use crunch_traits::{errors::PersistenceError, EventInfo}; use tokio::sync::RwLock; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] enum MsgState { Pending, Published, } #[derive(Debug, Clone)] pub struct Msg { id: String, info: EventInfo, msg: Vec, state: MsgState, } pub struct InMemoryPersistence { pub outbox: Arc>>, pub store: Arc>>, } #[async_trait] impl crunch_traits::Persistence for InMemoryPersistence { async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()> { let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content); let msg = Msg { id: uuid::Uuid::new_v4().to_string(), info: *event_info, msg, state: MsgState::Pending, }; let mut outbox = self.outbox.write().await; outbox.push_back(msg.clone()); self.store.write().await.insert(msg.id.clone(), msg); tracing::debug!( event_info = event_info.to_string(), content_len = content.len(), "inserted event" ); Ok(()) } async fn next(&self) -> Option { let mut outbox = self.outbox.write().await; outbox.pop_front().map(|i| i.id) } async fn get( &self, event_id: &str, ) -> Result)>, PersistenceError> { let store = self.store.read().await; let event = match store.get(event_id).filter(|m| m.state == MsgState::Pending) { Some(event) => event, None => return Ok(None), }; let (content, _) = crunch_envelope::proto::unwrap(event.msg.as_slice()) .map_err(|e| PersistenceError::GetErr(anyhow::anyhow!(e)))?; Ok(Some((event.info, content))) } async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { match self.store.write().await.get_mut(event_id) { Some(msg) => msg.state = MsgState::Published, None => { return Err(PersistenceError::UpdatePublished(anyhow::anyhow!( "event was not found on id: {}", event_id ))) } } Ok(()) } } }