diff --git a/crates/crunch-in-memory/src/lib.rs b/crates/crunch-in-memory/src/lib.rs index 2f25987..bfc7a33 100644 --- a/crates/crunch-in-memory/src/lib.rs +++ b/crates/crunch-in-memory/src/lib.rs @@ -1,106 +1 @@ -use std::{collections::BTreeMap, pin::Pin}; - -use async_trait::async_trait; -use crunch_traits::{errors::TransportError, EventInfo, Transport}; -use futures::Stream; -use tokio::sync::broadcast::Sender; -use tokio_stream::{wrappers::BroadcastStream, StreamExt}; - -#[derive(Clone)] -struct TransportEnvelope { - info: EventInfo, - content: Vec, -} - -pub struct InMemoryTransport { - events: tokio::sync::RwLock>>, -} - -impl InMemoryTransport { - pub fn new() -> Self { - Self { - events: tokio::sync::RwLock::default(), - } - } - - async fn register_channel(&self, event_info: &EventInfo) { - let transport_key = event_info.transport_name(); - - // Possibly create a trait register handle instead, as this requires a write and then read. It may not matter for in memory though - let mut events = self.events.write().await; - if events.get(&transport_key).is_none() { - let (sender, mut receiver) = tokio::sync::broadcast::channel(100); - events.insert(transport_key.clone(), sender); - tokio::spawn(async move { - while let Ok(item) = receiver.recv().await { - tracing::trace!("default receiver: {}", item.info.transport_name()); - } - }); - } - } -} - -impl Default for InMemoryTransport { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl Transport for InMemoryTransport { - type Stream = Pin> + Send>>; - - async fn publish( - &self, - event_info: &EventInfo, - content: Vec, - ) -> Result<(), TransportError> { - self.register_channel(event_info).await; - - let transport_key = event_info.transport_name(); - let events = self.events.read().await; - let sender = events - .get(&transport_key) - .expect("transport to be available, as we just created it"); - sender - .send(TransportEnvelope { - info: *event_info, - content, - }) - .map_err(|e| anyhow::anyhow!(e.to_string())) - .map_err(TransportError::Err)?; - - Ok(()) - } - - async fn subscriber( - &self, - event_info: &EventInfo, - ) -> Result, TransportError> { - self.register_channel(event_info).await; - - let events = self.events.read().await; - match events.get(&event_info.transport_name()) { - Some(rx) => Ok(Some(Box::pin( - BroadcastStream::new(rx.subscribe()).filter_map(|m| match m { - Ok(m) => Some(m.content), - Err(_) => None, - }), - ))), - None => Ok(None), - } - } -} - -trait EventInfoExt { - fn transport_name(&self) -> String; -} - -impl EventInfoExt for EventInfo { - fn transport_name(&self) -> String { - format!( - "crunch.{}.{}.{}", - self.domain, self.entity_type, self.event_name - ) - } -} +pub mod transport; diff --git a/crates/crunch-in-memory/src/transport.rs b/crates/crunch-in-memory/src/transport.rs new file mode 100644 index 0000000..2f25987 --- /dev/null +++ b/crates/crunch-in-memory/src/transport.rs @@ -0,0 +1,106 @@ +use std::{collections::BTreeMap, pin::Pin}; + +use async_trait::async_trait; +use crunch_traits::{errors::TransportError, EventInfo, Transport}; +use futures::Stream; +use tokio::sync::broadcast::Sender; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; + +#[derive(Clone)] +struct TransportEnvelope { + info: EventInfo, + content: Vec, +} + +pub struct InMemoryTransport { + events: tokio::sync::RwLock>>, +} + +impl InMemoryTransport { + pub fn new() -> Self { + Self { + events: tokio::sync::RwLock::default(), + } + } + + async fn register_channel(&self, event_info: &EventInfo) { + let transport_key = event_info.transport_name(); + + // Possibly create a trait register handle instead, as this requires a write and then read. It may not matter for in memory though + let mut events = self.events.write().await; + if events.get(&transport_key).is_none() { + let (sender, mut receiver) = tokio::sync::broadcast::channel(100); + events.insert(transport_key.clone(), sender); + tokio::spawn(async move { + while let Ok(item) = receiver.recv().await { + tracing::trace!("default receiver: {}", item.info.transport_name()); + } + }); + } + } +} + +impl Default for InMemoryTransport { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Transport for InMemoryTransport { + type Stream = Pin> + Send>>; + + async fn publish( + &self, + event_info: &EventInfo, + content: Vec, + ) -> Result<(), TransportError> { + self.register_channel(event_info).await; + + let transport_key = event_info.transport_name(); + let events = self.events.read().await; + let sender = events + .get(&transport_key) + .expect("transport to be available, as we just created it"); + sender + .send(TransportEnvelope { + info: *event_info, + content, + }) + .map_err(|e| anyhow::anyhow!(e.to_string())) + .map_err(TransportError::Err)?; + + Ok(()) + } + + async fn subscriber( + &self, + event_info: &EventInfo, + ) -> Result, TransportError> { + self.register_channel(event_info).await; + + let events = self.events.read().await; + match events.get(&event_info.transport_name()) { + Some(rx) => Ok(Some(Box::pin( + BroadcastStream::new(rx.subscribe()).filter_map(|m| match m { + Ok(m) => Some(m.content), + Err(_) => None, + }), + ))), + None => Ok(None), + } + } +} + +trait EventInfoExt { + fn transport_name(&self) -> String; +} + +impl EventInfoExt for EventInfo { + fn transport_name(&self) -> String { + format!( + "crunch.{}.{}.{}", + self.domain, self.entity_type, self.event_name + ) + } +} diff --git a/crates/crunch/src/transport.rs b/crates/crunch/src/transport.rs index 2f4c199..91e981e 100644 --- a/crates/crunch/src/transport.rs +++ b/crates/crunch/src/transport.rs @@ -10,9 +10,9 @@ impl Transport { #[cfg(feature = "in-memory")] pub fn in_memory() -> Self { - Self(std::sync::Arc::new( - crunch_in_memory::InMemoryTransport::default(), - )) + use crunch_in_memory::transport::InMemoryTransport; + + Self(std::sync::Arc::new(InMemoryTransport::default())) } #[cfg(feature = "nats")]