feat: with subscription

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-09-23 22:38:12 +02:00
parent 05fb5c0722
commit 7b08b16cdb
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
10 changed files with 167 additions and 21 deletions

17
Cargo.lock generated
View File

@ -332,8 +332,10 @@ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"crunch-traits", "crunch-traits",
"futures",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream",
"tracing", "tracing",
] ]
@ -343,6 +345,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"futures",
"thiserror", "thiserror",
"tokio", "tokio",
"uuid", "uuid",
@ -1220,6 +1223,20 @@ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
"tokio-util",
]
[[package]]
name = "tokio-util"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [
"bytes 1.5.0",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
] ]
[[package]] [[package]]

View File

@ -13,3 +13,5 @@ tracing.workspace = true
tokio.workspace = true tokio.workspace = true
thiserror.workspace = true thiserror.workspace = true
async-trait.workspace = true async-trait.workspace = true
futures.workspace = true
tokio-stream = {workspace = true, features = ["sync"]}

View File

@ -1,8 +1,10 @@
use std::collections::BTreeMap; use std::{collections::BTreeMap, pin::Pin};
use async_trait::async_trait; use async_trait::async_trait;
use crunch_traits::{errors::TransportError, EventInfo, Transport}; use crunch_traits::{errors::TransportError, EventInfo, Transport};
use futures::Stream;
use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::broadcast::{Receiver, Sender};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
#[derive(Clone)] #[derive(Clone)]
struct TransportEnvelope { struct TransportEnvelope {
@ -20,6 +22,22 @@ impl InMemoryTransport {
events: tokio::sync::RwLock::default(), events: tokio::sync::RwLock::default(),
} }
} }
async fn register_channel(&self, event_info: &EventInfo) {
let transport_key = event_info.transport_name();
// Possibly create a trait register handle instead, as this requires a write and then read. It may not matter for in memory though
let mut events = self.events.write().await;
if let None = events.get(&transport_key) {
let (sender, mut receiver) = tokio::sync::broadcast::channel(100);
events.insert(transport_key.clone(), sender);
tokio::spawn(async move {
while let Ok(item) = receiver.recv().await {
tracing::info!("default receiver: {}", item.info.transport_name());
}
});
}
}
} }
impl Default for InMemoryTransport { impl Default for InMemoryTransport {
@ -30,27 +48,16 @@ impl Default for InMemoryTransport {
#[async_trait] #[async_trait]
impl Transport for InMemoryTransport { impl Transport for InMemoryTransport {
type Stream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
async fn publish( async fn publish(
&self, &self,
event_info: &EventInfo, event_info: &EventInfo,
content: Vec<u8>, content: Vec<u8>,
) -> Result<(), TransportError> { ) -> Result<(), TransportError> {
self.register_channel(event_info).await;
let transport_key = event_info.transport_name(); let transport_key = event_info.transport_name();
// Possibly create a register handle instead, as this requires a write and then read. It may not matter for in memory though
{
let mut events = self.events.write().await;
if let None = events.get(&transport_key) {
let (sender, mut receiver) = tokio::sync::broadcast::channel(100);
events.insert(transport_key.clone(), sender);
tokio::spawn(async move {
while let Ok(item) = receiver.recv().await {
tracing::info!("default receiver: {}", item.info.transport_name());
}
});
}
}
let events = self.events.read().await; let events = self.events.read().await;
let sender = events let sender = events
.get(&transport_key) .get(&transport_key)
@ -65,6 +72,24 @@ impl Transport for InMemoryTransport {
Ok(()) Ok(())
} }
async fn subscriber(
&self,
event_info: &EventInfo,
) -> Result<Option<Self::Stream>, TransportError> {
self.register_channel(event_info).await;
let events = self.events.read().await;
match events.get(&event_info.transport_name()) {
Some(rx) => Ok(Some(Box::pin(
BroadcastStream::new(rx.subscribe()).filter_map(|m| match m {
Ok(m) => Some(m.content),
Err(_) => None,
}),
))),
None => Ok(None),
}
}
} }
trait EventInfoExt { trait EventInfoExt {

View File

@ -11,3 +11,4 @@ tokio.workspace = true
thiserror.workspace = true thiserror.workspace = true
async-trait.workspace = true async-trait.workspace = true
uuid.workspace = true uuid.workspace = true
futures.workspace = true

View File

@ -27,6 +27,18 @@ pub enum PublishError {
ConnectionError(#[source] anyhow::Error), ConnectionError(#[source] anyhow::Error),
} }
#[derive(Error, Debug)]
pub enum SubscriptionError {
#[error("failed to subscribe: {0}")]
FailedToSubscribe(#[source] anyhow::Error),
#[error("connection failed: {0}")]
ConnectionFailed(#[source] TransportError),
#[error("failed to deserialize{0}")]
DeserializationFailed(#[source] DeserializeError),
}
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum TransportError { pub enum TransportError {
#[error("to publish to transport {0}")] #[error("to publish to transport {0}")]

View File

@ -38,7 +38,11 @@ impl Display for EventInfo {
} }
pub trait Event: Serializer + Deserializer { pub trait Event: Serializer + Deserializer {
fn event_info(&self) -> EventInfo; fn event_info() -> EventInfo;
fn int_event_info(&self) -> EventInfo {
Self::event_info()
}
} }
pub mod errors; pub mod errors;

View File

@ -1,4 +1,4 @@
use std::sync::Arc; use std::{pin::Pin, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
@ -6,7 +6,19 @@ use crate::{errors::TransportError, EventInfo};
#[async_trait] #[async_trait]
pub trait Transport { pub trait Transport {
type Stream: futures::Stream<Item = Vec<u8>>;
async fn publish(&self, event_info: &EventInfo, content: Vec<u8>) async fn publish(&self, event_info: &EventInfo, content: Vec<u8>)
-> Result<(), TransportError>; -> Result<(), TransportError>;
async fn subscriber(
&self,
event_info: &EventInfo,
) -> Result<Option<Self::Stream>, TransportError>;
} }
pub type DynTransport = Arc<dyn Transport + Send + Sync + 'static>;
pub type DynTransport = Arc<
dyn Transport<Stream = Pin<Box<dyn futures::Stream<Item = Vec<u8>> + Send>>>
+ Send
+ Sync
+ 'static,
>;

View File

@ -1,4 +1,5 @@
use crunch::errors::*; use crunch::errors::*;
use crunch::traits::Event;
struct SomeEvent { struct SomeEvent {
name: String, name: String,
@ -22,7 +23,7 @@ impl crunch::traits::Deserializer for SomeEvent {
} }
impl crunch::traits::Event for SomeEvent { impl crunch::traits::Event for SomeEvent {
fn event_info(&self) -> crunch::traits::EventInfo { fn event_info() -> crunch::traits::EventInfo {
crunch::traits::EventInfo { crunch::traits::EventInfo {
domain: "some-domain", domain: "some-domain",
entity_type: "some-entity", entity_type: "some-entity",
@ -39,6 +40,18 @@ async fn main() -> anyhow::Result<()> {
let transport = crunch::Transport::in_memory(); let transport = crunch::Transport::in_memory();
crunch::OutboxHandler::new(in_memory.clone(), transport.clone()).spawn(); crunch::OutboxHandler::new(in_memory.clone(), transport.clone()).spawn();
let publisher = crunch::Publisher::new(in_memory); let publisher = crunch::Publisher::new(in_memory);
let subscriber = crunch::Subscriber::new(transport);
subscriber
.subscribe(|item: SomeEvent| async move {
tracing::info!(
"subscription got event: {}, info: {}",
item.name,
item.int_event_info(),
);
Ok(())
})
.await?;
publisher publisher
.publish(SomeEvent { .publish(SomeEvent {

View File

@ -15,4 +15,64 @@ pub mod errors {
pub use impls::Persistence; pub use impls::Persistence;
pub use outbox::OutboxHandler; pub use outbox::OutboxHandler;
pub use publisher::Publisher; pub use publisher::Publisher;
pub use subscriber::Subscriber;
pub use transport::Transport; pub use transport::Transport;
mod subscriber {
use crunch_traits::{Event, EventInfo};
use futures::StreamExt;
use crate::{errors, Transport};
pub struct Subscriber {
transport: Transport,
}
impl Subscriber {
pub fn new(transport: Transport) -> Self {
Self { transport }
}
pub async fn subscribe<I, F, Fut>(
&self,
callback: F,
) -> Result<(), errors::SubscriptionError>
where
F: Fn(I) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), errors::SubscriptionError>> + Send + 'static,
I: Event + Send + 'static,
{
let mut stream = self
.transport
.subscriber(&I::event_info())
.await
.map_err(errors::SubscriptionError::ConnectionFailed)?
.ok_or(errors::SubscriptionError::FailedToSubscribe(
anyhow::anyhow!("failed to find channel to subscribe to"),
))?;
tokio::spawn(async move {
while let Some(item) = stream.next().await {
let item = match I::deserialize(item)
.map_err(errors::SubscriptionError::DeserializationFailed)
{
Ok(i) => i,
Err(e) => {
tracing::warn!("deserialization failed: {}", e);
continue;
}
};
match callback(item).await {
Ok(_) => {}
Err(e) => {
tracing::error!("subscription callback failed: {}", e)
}
}
}
});
Ok(())
}
}
}

View File

@ -19,7 +19,7 @@ impl Publisher {
let content = event.serialize().map_err(PublishError::SerializeError)?; let content = event.serialize().map_err(PublishError::SerializeError)?;
self.persistence self.persistence
.insert(&event.event_info(), content) .insert(&event.int_event_info(), content)
.await .await
.map_err(PublishError::DbError)?; .map_err(PublishError::DbError)?;