feat: with listener

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-09-22 21:31:32 +02:00
parent 41f36c09be
commit 5bb9286b10
Signed by: kjuulh
GPG Key ID: 57B6E1465221F912
7 changed files with 348 additions and 111 deletions

139
Cargo.lock generated
View File

@ -296,10 +296,14 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"crunch-envelope",
"futures",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"uuid",
] ]
[[package]] [[package]]
@ -367,6 +371,106 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" checksum = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33"
[[package]]
name = "futures"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
[[package]]
name = "futures-executor"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
[[package]]
name = "futures-macro"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2 1.0.67",
"quote 1.0.33",
"syn 2.0.35",
]
[[package]]
name = "futures-sink"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
[[package]]
name = "futures-task"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
[[package]]
name = "futures-util"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]] [[package]]
name = "gimli" name = "gimli"
version = "0.28.0" version = "0.28.0"
@ -636,6 +740,12 @@ version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]] [[package]]
name = "plotters" name = "plotters"
version = "0.3.5" version = "0.3.5"
@ -933,6 +1043,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.11.0" version = "1.11.0"
@ -1067,6 +1186,17 @@ dependencies = [
"syn 2.0.35", "syn 2.0.35",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.37" version = "0.1.37"
@ -1144,6 +1274,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
[[package]]
name = "uuid"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
dependencies = [
"getrandom",
]
[[package]] [[package]]
name = "valuable" name = "valuable"
version = "0.1.0" version = "0.1.0"

View File

@ -8,7 +8,10 @@ crunch-envelope = { path = "crates/crunch-envelope" }
anyhow = { version = "1.0.71" } anyhow = { version = "1.0.71" }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
tokio-stream = {version = "0.1.14"}
tracing = { version = "0.1", features = ["log"] } tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = "0.3.17" tracing-subscriber = "0.3.17"
thiserror = {version = "1.0.48"} thiserror = {version = "1.0.48"}
async-trait = "0.1.73" async-trait = "0.1.73"
uuid = { version = "1.4.1", features = ["v4"]}
futures = "0.3.28"

View File

@ -4,11 +4,16 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
crunch-envelope.workspace = true
anyhow.workspace = true anyhow.workspace = true
tracing.workspace = true tracing.workspace = true
tokio.workspace = true tokio.workspace = true
tokio-stream.workspace = true
thiserror.workspace = true thiserror.workspace = true
async-trait.workspace = true async-trait.workspace = true
uuid.workspace = true
futures.workspace = true
[dev-dependencies] [dev-dependencies]
tracing-subscriber.workspace = true tracing-subscriber.workspace = true

View File

@ -0,0 +1,28 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SerializeError {
#[error("failed to serialize")]
FailedToSerialize(anyhow::Error),
}
#[derive(Error, Debug)]
pub enum DeserializeError {
#[error("failed to serialize")]
FailedToDeserialize(anyhow::Error),
}
#[derive(Error, Debug)]
pub enum PublishError {
#[error("failed to serialize")]
SerializeError(#[source] SerializeError),
#[error("failed to commit to database")]
DbError(#[source] anyhow::Error),
#[error("transaction failed")]
DbTxError(#[source] anyhow::Error),
#[error("failed to connect to database")]
ConnectionError(#[source] anyhow::Error),
}

104
crates/crunch/src/impls.rs Normal file
View File

@ -0,0 +1,104 @@
use std::{collections::VecDeque, future::Future, ops::Deref, pin::Pin, sync::Arc, task::Poll};
use async_trait::async_trait;
use tokio::sync::{Mutex, OnceCell, RwLock};
use tokio_stream::Stream;
use crate::{traits, EventInfo};
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum MsgState {
Pending,
Published,
}
#[derive(Debug, Clone)]
struct Msg {
id: String,
info: EventInfo,
msg: Vec<u8>,
state: MsgState,
}
pub struct InMemoryPersistence {
outbox: Arc<RwLock<VecDeque<Msg>>>,
}
#[async_trait]
impl traits::Persistence for InMemoryPersistence {
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()> {
let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content);
let mut outbox = self.outbox.write().await;
outbox.push_back(Msg {
id: uuid::Uuid::new_v4().to_string(),
info: event_info.clone(),
msg,
state: MsgState::Pending,
});
tracing::info!(
event_info = event_info.to_string(),
content_len = content.len(),
"inserted event"
);
Ok(())
}
async fn next(&self) -> Option<String> {
let mut outbox = self.outbox.write().await;
outbox.pop_front().map(|i| i.id)
}
}
#[derive(Clone)]
pub struct Persistence {
inner: Arc<dyn traits::Persistence + Send + Sync + 'static>,
pending: Arc<Option<Pin<Box<dyn Future<Output = Option<OnceCell<String>>> + Send + Sync>>>>,
}
impl Persistence {
pub fn in_memory() -> Self {
Self {
inner: Arc::new(InMemoryPersistence {
outbox: Arc::default(),
}),
pending: Arc::default(),
}
}
}
impl Deref for Persistence {
type Target = Arc<dyn traits::Persistence + Send + Sync + 'static>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl Stream for Persistence {
type Item = OnceCell<String>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut pending = self.pending;
if pending.is_none() {
*pending = Some(Box::pin(self.inner.next()));
}
let fut = pending.as_mut().unwrap();
match fut.as_mut().poll(cx) {
Poll::Ready(v) => {
*pending = None;
Poll::Ready(v)
}
Poll::Pending => Poll::Pending,
}
}
}

View File

@ -1,119 +1,36 @@
mod traits { mod errors;
use std::fmt::Display; mod impls;
mod traits;
use async_trait::async_trait;
use crate::{DeserializeError, SerializeError};
#[async_trait]
pub trait Persistence {
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()>;
}
pub trait Serializer {
fn serialize(&self) -> Result<Vec<u8>, SerializeError>;
}
pub trait Deserializer {
fn deserialize(raw: Vec<u8>) -> Result<Self, DeserializeError>
where
Self: Sized;
}
#[derive(Debug, Clone, Copy)]
pub struct EventInfo {
pub domain: &'static str,
pub entity_type: &'static str,
}
impl Display for EventInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&format!(
"domain: {}, entity_type: {}",
self.domain, self.entity_type
))
}
}
pub trait Event: Serializer + Deserializer {
fn event_info(&self) -> EventInfo;
}
}
mod impls {
use std::{ops::Deref, sync::Arc};
use async_trait::async_trait;
use crate::{traits, EventInfo};
pub struct InMemoryPersistence {}
#[async_trait]
impl traits::Persistence for InMemoryPersistence {
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()> {
tracing::info!(
event_info = event_info.to_string(),
content_len = content.len(),
"inserted event"
);
Ok(())
}
}
pub struct Persistence(Arc<dyn traits::Persistence + Send + Sync + 'static>);
impl Persistence {
pub fn in_memory() -> Self {
Self(Arc::new(InMemoryPersistence {}))
}
}
impl Deref for Persistence {
type Target = Arc<dyn traits::Persistence + Send + Sync + 'static>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
}
mod errors {
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SerializeError {
#[error("failed to serialize")]
FailedToSerialize(anyhow::Error),
}
#[derive(Error, Debug)]
pub enum DeserializeError {
#[error("failed to serialize")]
FailedToDeserialize(anyhow::Error),
}
#[derive(Error, Debug)]
pub enum PublishError {
#[error("failed to serialize")]
SerializeError(#[source] SerializeError),
#[error("failed to commit to database")]
DbError(#[source] anyhow::Error),
#[error("transaction failed")]
DbTxError(#[source] anyhow::Error),
#[error("failed to connect to database")]
ConnectionError(#[source] anyhow::Error),
}
}
pub use errors::*; pub use errors::*;
pub use impls::Persistence; pub use impls::Persistence;
pub use traits::{Deserializer, Event, EventInfo, Serializer}; pub use traits::{Deserializer, Event, EventInfo, Serializer};
mod outbox {
use std::sync::Arc;
pub use crate::Persistence;
pub struct OutboxHandler {
persistence: Persistence,
}
impl OutboxHandler {
pub fn new(persistence: Persistence) -> Self {
Self { persistence }
}
pub async fn spawn(&mut self) {
let p = self.persistence.clone();
tokio::spawn(async move {
let p = p;
while let Some(item) = p.next().await {}
});
}
}
}
pub struct Publisher { pub struct Publisher {
persistence: Persistence, persistence: Persistence,
} }

View File

@ -0,0 +1,41 @@
use std::fmt::Display;
use async_trait::async_trait;
use tokio::sync::OnceCell;
use crate::{DeserializeError, SerializeError};
#[async_trait]
pub trait Persistence {
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()>;
async fn next(&self) -> Option<OnceCell<String>>;
}
pub trait Serializer {
fn serialize(&self) -> Result<Vec<u8>, SerializeError>;
}
pub trait Deserializer {
fn deserialize(raw: Vec<u8>) -> Result<Self, DeserializeError>
where
Self: Sized;
}
#[derive(Debug, Clone, Copy)]
pub struct EventInfo {
pub domain: &'static str,
pub entity_type: &'static str,
}
impl Display for EventInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&format!(
"domain: {}, entity_type: {}",
self.domain, self.entity_type
))
}
}
pub trait Event: Serializer + Deserializer {
fn event_info(&self) -> EventInfo;
}