feat: move to package

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-10-02 21:49:30 +02:00
parent 3fbd0f8720
commit 97173f1a79
Signed by: kjuulh
GPG Key ID: 57B6E1465221F912
4 changed files with 98 additions and 85 deletions

2
Cargo.lock generated
View File

@ -594,12 +594,14 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"crunch-envelope",
"crunch-traits", "crunch-traits",
"futures", "futures",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tracing", "tracing",
"uuid",
] ]
[[package]] [[package]]

View File

@ -7,6 +7,7 @@ edition = "2021"
[dependencies] [dependencies]
crunch-traits.workspace = true crunch-traits.workspace = true
crunch-envelope.workspace = true
anyhow.workspace = true anyhow.workspace = true
tracing.workspace = true tracing.workspace = true
@ -14,4 +15,5 @@ tokio.workspace = true
thiserror.workspace = true thiserror.workspace = true
async-trait.workspace = true async-trait.workspace = true
futures.workspace = true futures.workspace = true
uuid.workspace = true
tokio-stream = {workspace = true, features = ["sync"]} tokio-stream = {workspace = true, features = ["sync"]}

View File

@ -1 +1,92 @@
pub mod transport; pub mod transport;
pub mod persistence {
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};
use async_trait::async_trait;
use crunch_traits::{errors::PersistenceError, EventInfo};
use tokio::sync::RwLock;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum MsgState {
Pending,
Published,
}
#[derive(Debug, Clone)]
pub struct Msg {
id: String,
info: EventInfo,
msg: Vec<u8>,
state: MsgState,
}
pub struct InMemoryPersistence {
pub outbox: Arc<RwLock<VecDeque<Msg>>>,
pub store: Arc<RwLock<BTreeMap<String, Msg>>>,
}
#[async_trait]
impl crunch_traits::Persistence for InMemoryPersistence {
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()> {
let msg =
crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content);
let msg = Msg {
id: uuid::Uuid::new_v4().to_string(),
info: *event_info,
msg,
state: MsgState::Pending,
};
let mut outbox = self.outbox.write().await;
outbox.push_back(msg.clone());
self.store.write().await.insert(msg.id.clone(), msg);
tracing::debug!(
event_info = event_info.to_string(),
content_len = content.len(),
"inserted event"
);
Ok(())
}
async fn next(&self) -> Option<String> {
let mut outbox = self.outbox.write().await;
outbox.pop_front().map(|i| i.id)
}
async fn get(
&self,
event_id: &str,
) -> Result<Option<(EventInfo, Vec<u8>)>, PersistenceError> {
let store = self.store.read().await;
let event = match store.get(event_id).filter(|m| m.state == MsgState::Pending) {
Some(event) => event,
None => return Ok(None),
};
let (content, _) = crunch_envelope::proto::unwrap(event.msg.as_slice())
.map_err(|e| PersistenceError::GetErr(anyhow::anyhow!(e)))?;
Ok(Some((event.info, content)))
}
async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> {
match self.store.write().await.get_mut(event_id) {
Some(msg) => msg.state = MsgState::Published,
None => {
return Err(PersistenceError::UpdatePublished(anyhow::anyhow!(
"event was not found on id: {}",
event_id
)))
}
}
Ok(())
}
}
}

View File

@ -1,88 +1,4 @@
use std::{ use std::{ops::Deref, sync::Arc};
collections::{BTreeMap, VecDeque},
ops::Deref,
sync::Arc,
};
use async_trait::async_trait;
use crunch_traits::{errors::PersistenceError, EventInfo};
use tokio::sync::RwLock;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum MsgState {
Pending,
Published,
}
#[derive(Debug, Clone)]
struct Msg {
id: String,
info: EventInfo,
msg: Vec<u8>,
state: MsgState,
}
pub struct InMemoryPersistence {
outbox: Arc<RwLock<VecDeque<Msg>>>,
store: Arc<RwLock<BTreeMap<String, Msg>>>,
}
#[async_trait]
impl crunch_traits::Persistence for InMemoryPersistence {
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()> {
let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content);
let msg = Msg {
id: uuid::Uuid::new_v4().to_string(),
info: *event_info,
msg,
state: MsgState::Pending,
};
let mut outbox = self.outbox.write().await;
outbox.push_back(msg.clone());
self.store.write().await.insert(msg.id.clone(), msg);
tracing::debug!(
event_info = event_info.to_string(),
content_len = content.len(),
"inserted event"
);
Ok(())
}
async fn next(&self) -> Option<String> {
let mut outbox = self.outbox.write().await;
outbox.pop_front().map(|i| i.id)
}
async fn get(&self, event_id: &str) -> Result<Option<(EventInfo, Vec<u8>)>, PersistenceError> {
let store = self.store.read().await;
let event = match store.get(event_id).filter(|m| m.state == MsgState::Pending) {
Some(event) => event,
None => return Ok(None),
};
let (content, _) = crunch_envelope::proto::unwrap(event.msg.as_slice())
.map_err(|e| PersistenceError::GetErr(anyhow::anyhow!(e)))?;
Ok(Some((event.info, content)))
}
async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> {
match self.store.write().await.get_mut(event_id) {
Some(msg) => msg.state = MsgState::Published,
None => {
return Err(PersistenceError::UpdatePublished(anyhow::anyhow!(
"event was not found on id: {}",
event_id
)))
}
}
Ok(())
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct Persistence { pub struct Persistence {
@ -92,6 +8,8 @@ pub struct Persistence {
impl Persistence { impl Persistence {
#[cfg(feature = "in-memory")] #[cfg(feature = "in-memory")]
pub fn in_memory() -> Self { pub fn in_memory() -> Self {
use crunch_in_memory::persistence::InMemoryPersistence;
Self { Self {
inner: std::sync::Arc::new(InMemoryPersistence { inner: std::sync::Arc::new(InMemoryPersistence {
outbox: std::sync::Arc::default(), outbox: std::sync::Arc::default(),