diff --git a/Cargo.lock b/Cargo.lock index 95d3c06..3fc9127 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,10 +296,14 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "crunch-envelope", + "futures", "thiserror", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -367,6 +371,106 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2 1.0.67", + "quote 1.0.33", + "syn 2.0.35", +] + +[[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + +[[package]] +name = "futures-task" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" + +[[package]] +name = "futures-util" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "gimli" version = "0.28.0" @@ -636,6 +740,12 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "plotters" version = "0.3.5" @@ -933,6 +1043,15 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.11.0" @@ -1067,6 +1186,17 @@ dependencies = [ "syn 2.0.35", ] +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tracing" version = "0.1.37" @@ -1144,6 +1274,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" +[[package]] +name = "uuid" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index fa369e3..2f69601 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,10 @@ crunch-envelope = { path = "crates/crunch-envelope" } anyhow = { version = "1.0.71" } tokio = { version = "1", features = ["full"] } +tokio-stream = {version = "0.1.14"} tracing = { version = "0.1", features = ["log"] } tracing-subscriber = "0.3.17" thiserror = {version = "1.0.48"} async-trait = "0.1.73" +uuid = { version = "1.4.1", features = ["v4"]} +futures = "0.3.28" diff --git a/crates/crunch/Cargo.toml b/crates/crunch/Cargo.toml index c2fd671..7539027 100644 --- a/crates/crunch/Cargo.toml +++ b/crates/crunch/Cargo.toml @@ -4,11 +4,16 @@ version = "0.1.0" edition = "2021" [dependencies] +crunch-envelope.workspace = true + anyhow.workspace = true tracing.workspace = true tokio.workspace = true +tokio-stream.workspace = true thiserror.workspace = true async-trait.workspace = true +uuid.workspace = true +futures.workspace = true [dev-dependencies] tracing-subscriber.workspace = true \ No newline at end of file diff --git a/crates/crunch/src/errors.rs b/crates/crunch/src/errors.rs new file mode 100644 index 0000000..202e003 --- /dev/null +++ b/crates/crunch/src/errors.rs @@ -0,0 +1,28 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum SerializeError { + #[error("failed to serialize")] + FailedToSerialize(anyhow::Error), +} + +#[derive(Error, Debug)] +pub enum DeserializeError { + #[error("failed to serialize")] + FailedToDeserialize(anyhow::Error), +} + +#[derive(Error, Debug)] +pub enum PublishError { + #[error("failed to serialize")] + SerializeError(#[source] SerializeError), + + #[error("failed to commit to database")] + DbError(#[source] anyhow::Error), + + #[error("transaction failed")] + DbTxError(#[source] anyhow::Error), + + #[error("failed to connect to database")] + ConnectionError(#[source] anyhow::Error), +} diff --git a/crates/crunch/src/impls.rs b/crates/crunch/src/impls.rs new file mode 100644 index 0000000..08a6f94 --- /dev/null +++ b/crates/crunch/src/impls.rs @@ -0,0 +1,104 @@ +use std::{collections::VecDeque, future::Future, ops::Deref, pin::Pin, sync::Arc, task::Poll}; + +use async_trait::async_trait; +use tokio::sync::{Mutex, OnceCell, RwLock}; +use tokio_stream::Stream; + +use crate::{traits, EventInfo}; + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +enum MsgState { + Pending, + Published, +} + +#[derive(Debug, Clone)] +struct Msg { + id: String, + info: EventInfo, + msg: Vec, + state: MsgState, +} + +pub struct InMemoryPersistence { + outbox: Arc>>, +} + +#[async_trait] +impl traits::Persistence for InMemoryPersistence { + async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()> { + let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content); + + let mut outbox = self.outbox.write().await; + + outbox.push_back(Msg { + id: uuid::Uuid::new_v4().to_string(), + info: event_info.clone(), + msg, + state: MsgState::Pending, + }); + + tracing::info!( + event_info = event_info.to_string(), + content_len = content.len(), + "inserted event" + ); + + Ok(()) + } + + async fn next(&self) -> Option { + let mut outbox = self.outbox.write().await; + outbox.pop_front().map(|i| i.id) + } +} + +#[derive(Clone)] +pub struct Persistence { + inner: Arc, + pending: Arc>> + Send + Sync>>>>, +} + +impl Persistence { + pub fn in_memory() -> Self { + Self { + inner: Arc::new(InMemoryPersistence { + outbox: Arc::default(), + }), + pending: Arc::default(), + } + } +} + +impl Deref for Persistence { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl Stream for Persistence { + type Item = OnceCell; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let mut pending = self.pending; + + if pending.is_none() { + *pending = Some(Box::pin(self.inner.next())); + } + + let fut = pending.as_mut().unwrap(); + + match fut.as_mut().poll(cx) { + Poll::Ready(v) => { + *pending = None; + Poll::Ready(v) + } + Poll::Pending => Poll::Pending, + } + } +} diff --git a/crates/crunch/src/lib.rs b/crates/crunch/src/lib.rs index 37bf3b6..9b26415 100644 --- a/crates/crunch/src/lib.rs +++ b/crates/crunch/src/lib.rs @@ -1,119 +1,36 @@ -mod traits { - use std::fmt::Display; - - use async_trait::async_trait; - - use crate::{DeserializeError, SerializeError}; - - #[async_trait] - pub trait Persistence { - async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()>; - } - - pub trait Serializer { - fn serialize(&self) -> Result, SerializeError>; - } - - pub trait Deserializer { - fn deserialize(raw: Vec) -> Result - where - Self: Sized; - } - - #[derive(Debug, Clone, Copy)] - pub struct EventInfo { - pub domain: &'static str, - pub entity_type: &'static str, - } - - impl Display for EventInfo { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(&format!( - "domain: {}, entity_type: {}", - self.domain, self.entity_type - )) - } - } - - pub trait Event: Serializer + Deserializer { - fn event_info(&self) -> EventInfo; - } -} - -mod impls { - use std::{ops::Deref, sync::Arc}; - - use async_trait::async_trait; - - use crate::{traits, EventInfo}; - - pub struct InMemoryPersistence {} - - #[async_trait] - impl traits::Persistence for InMemoryPersistence { - async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()> { - tracing::info!( - event_info = event_info.to_string(), - content_len = content.len(), - "inserted event" - ); - - Ok(()) - } - } - - pub struct Persistence(Arc); - - impl Persistence { - pub fn in_memory() -> Self { - Self(Arc::new(InMemoryPersistence {})) - } - } - - impl Deref for Persistence { - type Target = Arc; - - fn deref(&self) -> &Self::Target { - &self.0 - } - } -} - -mod errors { - use thiserror::Error; - - #[derive(Error, Debug)] - pub enum SerializeError { - #[error("failed to serialize")] - FailedToSerialize(anyhow::Error), - } - - #[derive(Error, Debug)] - pub enum DeserializeError { - #[error("failed to serialize")] - FailedToDeserialize(anyhow::Error), - } - - #[derive(Error, Debug)] - pub enum PublishError { - #[error("failed to serialize")] - SerializeError(#[source] SerializeError), - - #[error("failed to commit to database")] - DbError(#[source] anyhow::Error), - - #[error("transaction failed")] - DbTxError(#[source] anyhow::Error), - - #[error("failed to connect to database")] - ConnectionError(#[source] anyhow::Error), - } -} +mod errors; +mod impls; +mod traits; pub use errors::*; pub use impls::Persistence; pub use traits::{Deserializer, Event, EventInfo, Serializer}; +mod outbox { + use std::sync::Arc; + + pub use crate::Persistence; + + pub struct OutboxHandler { + persistence: Persistence, + } + + impl OutboxHandler { + pub fn new(persistence: Persistence) -> Self { + Self { persistence } + } + + pub async fn spawn(&mut self) { + let p = self.persistence.clone(); + tokio::spawn(async move { + let p = p; + + while let Some(item) = p.next().await {} + }); + } + } +} + pub struct Publisher { persistence: Persistence, } diff --git a/crates/crunch/src/traits.rs b/crates/crunch/src/traits.rs new file mode 100644 index 0000000..9b5cbc0 --- /dev/null +++ b/crates/crunch/src/traits.rs @@ -0,0 +1,41 @@ +use std::fmt::Display; + +use async_trait::async_trait; +use tokio::sync::OnceCell; + +use crate::{DeserializeError, SerializeError}; + +#[async_trait] +pub trait Persistence { + async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()>; + async fn next(&self) -> Option>; +} + +pub trait Serializer { + fn serialize(&self) -> Result, SerializeError>; +} + +pub trait Deserializer { + fn deserialize(raw: Vec) -> Result + where + Self: Sized; +} + +#[derive(Debug, Clone, Copy)] +pub struct EventInfo { + pub domain: &'static str, + pub entity_type: &'static str, +} + +impl Display for EventInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!( + "domain: {}, entity_type: {}", + self.domain, self.entity_type + )) + } +} + +pub trait Event: Serializer + Deserializer { + fn event_info(&self) -> EventInfo; +}