From 0a78124489891d2ff28fb5a1fb85f149f055edd3 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sun, 24 Sep 2023 00:00:41 +0200 Subject: [PATCH] feat: with many producers Signed-off-by: kjuulh --- crates/crunch-traits/src/errors.rs | 6 ++ crates/crunch/examples/basic.rs | 78 ++++++++-------- crates/crunch/src/lib.rs | 141 +++++++++++++++++++---------- crates/crunch/src/publisher.rs | 1 + crates/crunch/src/subscriber.rs | 54 +++++++++++ 5 files changed, 195 insertions(+), 85 deletions(-) create mode 100644 crates/crunch/src/subscriber.rs diff --git a/crates/crunch-traits/src/errors.rs b/crates/crunch-traits/src/errors.rs index 2440a82..d131b68 100644 --- a/crates/crunch-traits/src/errors.rs +++ b/crates/crunch-traits/src/errors.rs @@ -53,3 +53,9 @@ pub enum PersistenceError { #[error("failed to publish item {0}")] UpdatePublished(anyhow::Error), } + +#[derive(Error, Debug)] +pub enum BuilderError { + #[error("dependency not added to builder: {0}")] + DependencyError(anyhow::Error), +} diff --git a/crates/crunch/examples/basic.rs b/crates/crunch/examples/basic.rs index 08bf6df..7e858f1 100644 --- a/crates/crunch/examples/basic.rs +++ b/crates/crunch/examples/basic.rs @@ -1,6 +1,7 @@ use crunch::errors::*; use crunch::traits::Event; +#[derive(Clone)] struct SomeEvent { name: String, } @@ -36,50 +37,49 @@ impl crunch::traits::Event for SomeEvent { async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); - let in_memory = crunch::Persistence::in_memory(); - let transport = crunch::Transport::in_memory(); - crunch::OutboxHandler::new(in_memory.clone(), transport.clone()).spawn(); - let publisher = crunch::Publisher::new(in_memory); - let subscriber = crunch::Subscriber::new(transport); + let crunch = crunch::builder::Builder::default().build()?; + let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); - subscriber - .subscribe(|item: SomeEvent| async move { - tracing::info!( - "subscription got event: {}, info: {}", - item.name, - item.int_event_info(), - ); - Ok(()) + let inner_counter = counter.clone(); + crunch + .subscribe(move |item: SomeEvent| { + let counter = inner_counter.clone(); + + async move { + tracing::info!( + "subscription got event: {}, info: {}", + item.name, + item.int_event_info(), + ); + + counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(()) + } }) .await?; - publisher - .publish(SomeEvent { - name: "something".into(), - }) - .await?; - publisher - .publish(SomeEvent { - name: "something".into(), - }) - .await?; - publisher - .publish(SomeEvent { - name: "something".into(), - }) - .await?; - publisher - .publish(SomeEvent { - name: "something".into(), - }) - .await?; - publisher - .publish(SomeEvent { - name: "something".into(), - }) - .await?; + let event = SomeEvent { + name: "something".into(), + }; - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + for _ in 0..50 { + tokio::spawn({ + let event = event.clone(); + let crunch = crunch.clone(); + + async move { + loop { + crunch.publish(event.clone()).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + } + } + }); + } + + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + + let amount_run = counter.load(std::sync::atomic::Ordering::SeqCst); + tracing::error!("ran {} amount of times", amount_run); Ok(()) } diff --git a/crates/crunch/src/lib.rs b/crates/crunch/src/lib.rs index b20c611..6eedcc3 100644 --- a/crates/crunch/src/lib.rs +++ b/crates/crunch/src/lib.rs @@ -1,6 +1,7 @@ mod impls; mod outbox; mod publisher; +mod subscriber; mod transport; #[cfg(feature = "traits")] @@ -12,67 +13,115 @@ pub mod errors { pub use crunch_traits::errors::*; } +use crunch_traits::Event; pub use impls::Persistence; pub use outbox::OutboxHandler; pub use publisher::Publisher; pub use subscriber::Subscriber; pub use transport::Transport; -mod subscriber { - use crunch_traits::{Event, EventInfo}; - use futures::StreamExt; - - use crate::{errors, Transport}; - - pub struct Subscriber { - transport: Transport, +#[derive(Clone)] +pub struct Crunch { + publisher: Publisher, + subscriber: Subscriber, +} +impl Crunch { + pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self { + Self { + publisher, + subscriber, + } } - impl Subscriber { - pub fn new(transport: Transport) -> Self { - Self { transport } + pub async fn subscribe(&self, callback: F) -> Result<(), errors::SubscriptionError> + where + F: Fn(I) -> Fut + Send + Sync + 'static, + Fut: futures::Future> + Send + 'static, + I: Event + Send + 'static, + { + self.subscriber.subscribe(callback).await + } +} + +impl std::ops::Deref for Crunch { + type Target = Publisher; + + fn deref(&self) -> &Self::Target { + &self.publisher + } +} + +pub mod builder { + use crate::{errors, Crunch, OutboxHandler, Persistence, Publisher, Subscriber, Transport}; + + #[derive(Clone)] + pub struct Builder { + persistence: Option, + transport: Option, + outbox_enabled: bool, + } + + impl Builder { + #[cfg(feature = "in-memory")] + pub fn with_in_memory_persistence(&mut self) -> &mut Self { + self.persistence = Some(Persistence::in_memory()); + self } - pub async fn subscribe( - &self, - callback: F, - ) -> Result<(), errors::SubscriptionError> - where - F: Fn(I) -> Fut + Send + Sync + 'static, - Fut: futures::Future> + Send + 'static, - I: Event + Send + 'static, - { - let mut stream = self + #[cfg(feature = "in-memory")] + pub fn with_in_memory_transport(&mut self) -> &mut Self { + self.transport = Some(Transport::in_memory()); + self + } + + pub fn with_outbox(&mut self, enabled: bool) -> &mut Self { + self.outbox_enabled = enabled; + self + } + + pub fn build(&mut self) -> Result { + let persistence = + self.persistence + .clone() + .ok_or(errors::BuilderError::DependencyError(anyhow::anyhow!( + "persistence was not set" + )))?; + let transport = self .transport - .subscriber(&I::event_info()) - .await - .map_err(errors::SubscriptionError::ConnectionFailed)? - .ok_or(errors::SubscriptionError::FailedToSubscribe( - anyhow::anyhow!("failed to find channel to subscribe to"), - ))?; + .clone() + .ok_or(errors::BuilderError::DependencyError(anyhow::anyhow!( + "transport was not set" + )))?; - tokio::spawn(async move { - while let Some(item) = stream.next().await { - let item = match I::deserialize(item) - .map_err(errors::SubscriptionError::DeserializationFailed) - { - Ok(i) => i, - Err(e) => { - tracing::warn!("deserialization failed: {}", e); - continue; - } - }; + let publisher = Publisher::new(persistence.clone()); + let subscriber = Subscriber::new(transport.clone()); + if self.outbox_enabled { + OutboxHandler::new(persistence.clone(), transport.clone()).spawn(); + } - match callback(item).await { - Ok(_) => {} - Err(e) => { - tracing::error!("subscription callback failed: {}", e) - } - } + Ok(Crunch::new(publisher, subscriber)) + } + } + + impl Default for Builder { + fn default() -> Self { + #[cfg(feature = "in-memory")] + { + return Self { + outbox_enabled: true, + persistence: None, + transport: None, } - }); + .with_in_memory_persistence() + .with_in_memory_transport() + .clone(); + } - Ok(()) + Self { + persistence: None, + transport: None, + outbox_enabled: true, + } } } } diff --git a/crates/crunch/src/publisher.rs b/crates/crunch/src/publisher.rs index c06df6b..d4c5157 100644 --- a/crates/crunch/src/publisher.rs +++ b/crates/crunch/src/publisher.rs @@ -2,6 +2,7 @@ use crunch_traits::{errors::PublishError, Event}; use crate::Persistence; +#[derive(Clone)] pub struct Publisher { persistence: Persistence, } diff --git a/crates/crunch/src/subscriber.rs b/crates/crunch/src/subscriber.rs new file mode 100644 index 0000000..3758560 --- /dev/null +++ b/crates/crunch/src/subscriber.rs @@ -0,0 +1,54 @@ +use crunch_traits::{Event, EventInfo}; +use futures::StreamExt; + +use crate::{errors, Transport}; + +#[derive(Clone)] +pub struct Subscriber { + transport: Transport, +} + +impl Subscriber { + pub fn new(transport: Transport) -> Self { + Self { transport } + } + + pub async fn subscribe(&self, callback: F) -> Result<(), errors::SubscriptionError> + where + F: Fn(I) -> Fut + Send + Sync + 'static, + Fut: futures::Future> + Send + 'static, + I: Event + Send + 'static, + { + let mut stream = self + .transport + .subscriber(&I::event_info()) + .await + .map_err(errors::SubscriptionError::ConnectionFailed)? + .ok_or(errors::SubscriptionError::FailedToSubscribe( + anyhow::anyhow!("failed to find channel to subscribe to"), + ))?; + + tokio::spawn(async move { + while let Some(item) = stream.next().await { + let item = match I::deserialize(item) + .map_err(errors::SubscriptionError::DeserializationFailed) + { + Ok(i) => i, + Err(e) => { + tracing::warn!("deserialization failed: {}", e); + continue; + } + }; + + match callback(item).await { + Ok(_) => {} + Err(e) => { + tracing::error!("subscription callback failed: {}", e) + } + } + } + }); + + Ok(()) + } +}