feat: with base interface

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
2023-09-16 16:12:06 +02:00
parent ee5b85baa1
commit c6b903fa58
5 changed files with 329 additions and 0 deletions

View File

@@ -7,3 +7,8 @@ edition = "2021"
anyhow.workspace = true
tracing.workspace = true
tokio.workspace = true
thiserror.workspace = true
async-trait.workspace = true
[dev-dependencies]
tracing-subscriber.workspace = true

View File

@@ -0,0 +1,47 @@
use crunch::{Deserializer, Event, EventInfo, Persistence, Publisher, Serializer};
struct SomeEvent {
name: String,
}
impl Serializer for SomeEvent {
fn serialize(&self) -> Result<Vec<u8>, crunch::SerializeError> {
Ok(b"field=name".to_vec())
}
}
impl Deserializer for SomeEvent {
fn deserialize(raw: Vec<u8>) -> Result<Self, crunch::DeserializeError>
where
Self: Sized,
{
Ok(Self {
name: "something".into(),
})
}
}
impl Event for SomeEvent {
fn event_info(&self) -> EventInfo {
EventInfo {
domain: "some-domain",
entity_type: "some-entity",
}
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let in_memory = Persistence::in_memory();
let publisher = Publisher::new(in_memory);
publisher
.publish(SomeEvent {
name: "something".into(),
})
.await?;
Ok(())
}

View File

@@ -1 +1,146 @@
mod traits {
use std::fmt::Display;
use async_trait::async_trait;
use crate::{DeserializeError, SerializeError};
#[async_trait]
pub trait Persistence {
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()>;
}
pub trait Serializer {
fn serialize(&self) -> Result<Vec<u8>, SerializeError>;
}
pub trait Deserializer {
fn deserialize(raw: Vec<u8>) -> Result<Self, DeserializeError>
where
Self: Sized;
}
#[derive(Debug, Clone, Copy)]
pub struct EventInfo {
pub domain: &'static str,
pub entity_type: &'static str,
}
impl Display for EventInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&format!(
"domain: {}, entity_type: {}",
self.domain, self.entity_type
))
}
}
pub trait Event: Serializer + Deserializer {
fn event_info(&self) -> EventInfo;
}
}
mod impls {
use std::{ops::Deref, sync::Arc};
use async_trait::async_trait;
use crate::{traits, EventInfo};
pub struct InMemoryPersistence {}
#[async_trait]
impl traits::Persistence for InMemoryPersistence {
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()> {
tracing::info!(
event_info = event_info.to_string(),
content_len = content.len(),
"inserted event"
);
Ok(())
}
}
pub struct Persistence(Arc<dyn traits::Persistence + Send + Sync + 'static>);
impl Persistence {
pub fn in_memory() -> Self {
Self(Arc::new(InMemoryPersistence {}))
}
}
impl Deref for Persistence {
type Target = Arc<dyn traits::Persistence + Send + Sync + 'static>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
}
mod errors {
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SerializeError {
#[error("failed to serialize")]
FailedToSerialize(anyhow::Error),
}
#[derive(Error, Debug)]
pub enum DeserializeError {
#[error("failed to serialize")]
FailedToDeserialize(anyhow::Error),
}
#[derive(Error, Debug)]
pub enum PublishError {
#[error("failed to serialize")]
SerializeError(#[source] SerializeError),
#[error("failed to commit to database")]
DbError(#[source] anyhow::Error),
#[error("transaction failed")]
DbTxError(#[source] anyhow::Error),
#[error("failed to connect to database")]
ConnectionError(#[source] anyhow::Error),
}
}
pub use errors::*;
pub use impls::Persistence;
pub use traits::{Deserializer, Event, EventInfo, Serializer};
pub struct Publisher {
persistence: Persistence,
}
#[allow(dead_code)]
impl Publisher {
pub fn new(persistence: Persistence) -> Self {
Self { persistence }
}
pub async fn publish<T>(&self, event: T) -> Result<(), PublishError>
where
T: Event,
{
let content = event.serialize().map_err(PublishError::SerializeError)?;
self.persistence
.insert(&event.event_info(), content)
.await
.map_err(PublishError::DbError)?;
Ok(())
}
pub async fn publish_tx<T>(&self, event: T) -> Result<(), PublishError>
where
T: Event,
{
// TODO: add transaction support later
self.publish(event).await
}
}