diff --git a/Cargo.lock b/Cargo.lock index 7f8641d..70c44d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -332,8 +332,10 @@ dependencies = [ "anyhow", "async-trait", "crunch-traits", + "futures", "thiserror", "tokio", + "tokio-stream", "tracing", ] @@ -343,6 +345,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "futures", "thiserror", "tokio", "uuid", @@ -1220,6 +1223,20 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", +] + +[[package]] +name = "tokio-util" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +dependencies = [ + "bytes 1.5.0", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", ] [[package]] diff --git a/crates/crunch-in-memory/Cargo.toml b/crates/crunch-in-memory/Cargo.toml index 26e664d..e8bc10b 100644 --- a/crates/crunch-in-memory/Cargo.toml +++ b/crates/crunch-in-memory/Cargo.toml @@ -13,3 +13,5 @@ tracing.workspace = true tokio.workspace = true thiserror.workspace = true async-trait.workspace = true +futures.workspace = true +tokio-stream = {workspace = true, features = ["sync"]} diff --git a/crates/crunch-in-memory/src/lib.rs b/crates/crunch-in-memory/src/lib.rs index 6ed85d8..a097f8c 100644 --- a/crates/crunch-in-memory/src/lib.rs +++ b/crates/crunch-in-memory/src/lib.rs @@ -1,8 +1,10 @@ -use std::collections::BTreeMap; +use std::{collections::BTreeMap, pin::Pin}; use async_trait::async_trait; use crunch_traits::{errors::TransportError, EventInfo, Transport}; +use futures::Stream; use tokio::sync::broadcast::{Receiver, Sender}; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; #[derive(Clone)] struct TransportEnvelope { @@ -20,6 +22,22 @@ impl InMemoryTransport { events: tokio::sync::RwLock::default(), } } + + async fn register_channel(&self, event_info: &EventInfo) { + let transport_key = event_info.transport_name(); + + // Possibly create a trait register handle instead, as this requires a write and then read. It may not matter for in memory though + let mut events = self.events.write().await; + if let None = events.get(&transport_key) { + let (sender, mut receiver) = tokio::sync::broadcast::channel(100); + events.insert(transport_key.clone(), sender); + tokio::spawn(async move { + while let Ok(item) = receiver.recv().await { + tracing::info!("default receiver: {}", item.info.transport_name()); + } + }); + } + } } impl Default for InMemoryTransport { @@ -30,27 +48,16 @@ impl Default for InMemoryTransport { #[async_trait] impl Transport for InMemoryTransport { + type Stream = Pin> + Send>>; + async fn publish( &self, event_info: &EventInfo, content: Vec, ) -> Result<(), TransportError> { + self.register_channel(event_info).await; + let transport_key = event_info.transport_name(); - - // Possibly create a register handle instead, as this requires a write and then read. It may not matter for in memory though - { - let mut events = self.events.write().await; - if let None = events.get(&transport_key) { - let (sender, mut receiver) = tokio::sync::broadcast::channel(100); - events.insert(transport_key.clone(), sender); - tokio::spawn(async move { - while let Ok(item) = receiver.recv().await { - tracing::info!("default receiver: {}", item.info.transport_name()); - } - }); - } - } - let events = self.events.read().await; let sender = events .get(&transport_key) @@ -65,6 +72,24 @@ impl Transport for InMemoryTransport { Ok(()) } + + async fn subscriber( + &self, + event_info: &EventInfo, + ) -> Result, TransportError> { + self.register_channel(event_info).await; + + let events = self.events.read().await; + match events.get(&event_info.transport_name()) { + Some(rx) => Ok(Some(Box::pin( + BroadcastStream::new(rx.subscribe()).filter_map(|m| match m { + Ok(m) => Some(m.content), + Err(_) => None, + }), + ))), + None => Ok(None), + } + } } trait EventInfoExt { diff --git a/crates/crunch-traits/Cargo.toml b/crates/crunch-traits/Cargo.toml index 8ab0a74..46e32f3 100644 --- a/crates/crunch-traits/Cargo.toml +++ b/crates/crunch-traits/Cargo.toml @@ -11,3 +11,4 @@ tokio.workspace = true thiserror.workspace = true async-trait.workspace = true uuid.workspace = true +futures.workspace = true diff --git a/crates/crunch-traits/src/errors.rs b/crates/crunch-traits/src/errors.rs index 10f1c0d..2440a82 100644 --- a/crates/crunch-traits/src/errors.rs +++ b/crates/crunch-traits/src/errors.rs @@ -27,6 +27,18 @@ pub enum PublishError { ConnectionError(#[source] anyhow::Error), } +#[derive(Error, Debug)] +pub enum SubscriptionError { + #[error("failed to subscribe: {0}")] + FailedToSubscribe(#[source] anyhow::Error), + + #[error("connection failed: {0}")] + ConnectionFailed(#[source] TransportError), + + #[error("failed to deserialize{0}")] + DeserializationFailed(#[source] DeserializeError), +} + #[derive(Error, Debug)] pub enum TransportError { #[error("to publish to transport {0}")] diff --git a/crates/crunch-traits/src/lib.rs b/crates/crunch-traits/src/lib.rs index d2d9759..93169c5 100644 --- a/crates/crunch-traits/src/lib.rs +++ b/crates/crunch-traits/src/lib.rs @@ -38,7 +38,11 @@ impl Display for EventInfo { } pub trait Event: Serializer + Deserializer { - fn event_info(&self) -> EventInfo; + fn event_info() -> EventInfo; + + fn int_event_info(&self) -> EventInfo { + Self::event_info() + } } pub mod errors; diff --git a/crates/crunch-traits/src/transport.rs b/crates/crunch-traits/src/transport.rs index d0ec1a6..8b474ac 100644 --- a/crates/crunch-traits/src/transport.rs +++ b/crates/crunch-traits/src/transport.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{pin::Pin, sync::Arc}; use async_trait::async_trait; @@ -6,7 +6,19 @@ use crate::{errors::TransportError, EventInfo}; #[async_trait] pub trait Transport { + type Stream: futures::Stream>; + async fn publish(&self, event_info: &EventInfo, content: Vec) -> Result<(), TransportError>; + async fn subscriber( + &self, + event_info: &EventInfo, + ) -> Result, TransportError>; } -pub type DynTransport = Arc; + +pub type DynTransport = Arc< + dyn Transport> + Send>>> + + Send + + Sync + + 'static, +>; diff --git a/crates/crunch/examples/basic.rs b/crates/crunch/examples/basic.rs index 663844d..08bf6df 100644 --- a/crates/crunch/examples/basic.rs +++ b/crates/crunch/examples/basic.rs @@ -1,4 +1,5 @@ use crunch::errors::*; +use crunch::traits::Event; struct SomeEvent { name: String, @@ -22,7 +23,7 @@ impl crunch::traits::Deserializer for SomeEvent { } impl crunch::traits::Event for SomeEvent { - fn event_info(&self) -> crunch::traits::EventInfo { + fn event_info() -> crunch::traits::EventInfo { crunch::traits::EventInfo { domain: "some-domain", entity_type: "some-entity", @@ -39,6 +40,18 @@ async fn main() -> anyhow::Result<()> { let transport = crunch::Transport::in_memory(); crunch::OutboxHandler::new(in_memory.clone(), transport.clone()).spawn(); let publisher = crunch::Publisher::new(in_memory); + let subscriber = crunch::Subscriber::new(transport); + + subscriber + .subscribe(|item: SomeEvent| async move { + tracing::info!( + "subscription got event: {}, info: {}", + item.name, + item.int_event_info(), + ); + Ok(()) + }) + .await?; publisher .publish(SomeEvent { diff --git a/crates/crunch/src/lib.rs b/crates/crunch/src/lib.rs index 57a1f66..b20c611 100644 --- a/crates/crunch/src/lib.rs +++ b/crates/crunch/src/lib.rs @@ -15,4 +15,64 @@ pub mod errors { pub use impls::Persistence; pub use outbox::OutboxHandler; pub use publisher::Publisher; +pub use subscriber::Subscriber; pub use transport::Transport; + +mod subscriber { + use crunch_traits::{Event, EventInfo}; + use futures::StreamExt; + + use crate::{errors, Transport}; + + pub struct Subscriber { + transport: Transport, + } + + impl Subscriber { + pub fn new(transport: Transport) -> Self { + Self { transport } + } + + pub async fn subscribe( + &self, + callback: F, + ) -> Result<(), errors::SubscriptionError> + where + F: Fn(I) -> Fut + Send + Sync + 'static, + Fut: futures::Future> + Send + 'static, + I: Event + Send + 'static, + { + let mut stream = self + .transport + .subscriber(&I::event_info()) + .await + .map_err(errors::SubscriptionError::ConnectionFailed)? + .ok_or(errors::SubscriptionError::FailedToSubscribe( + anyhow::anyhow!("failed to find channel to subscribe to"), + ))?; + + tokio::spawn(async move { + while let Some(item) = stream.next().await { + let item = match I::deserialize(item) + .map_err(errors::SubscriptionError::DeserializationFailed) + { + Ok(i) => i, + Err(e) => { + tracing::warn!("deserialization failed: {}", e); + continue; + } + }; + + match callback(item).await { + Ok(_) => {} + Err(e) => { + tracing::error!("subscription callback failed: {}", e) + } + } + } + }); + + Ok(()) + } + } +} diff --git a/crates/crunch/src/publisher.rs b/crates/crunch/src/publisher.rs index 14e9885..c06df6b 100644 --- a/crates/crunch/src/publisher.rs +++ b/crates/crunch/src/publisher.rs @@ -19,7 +19,7 @@ impl Publisher { let content = event.serialize().map_err(PublishError::SerializeError)?; self.persistence - .insert(&event.event_info(), content) + .insert(&event.int_event_info(), content) .await .map_err(PublishError::DbError)?;