feat: with many producers

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-09-24 00:00:41 +02:00
parent 7b08b16cdb
commit 0a78124489
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
5 changed files with 195 additions and 85 deletions

View File

@ -53,3 +53,9 @@ pub enum PersistenceError {
#[error("failed to publish item {0}")]
UpdatePublished(anyhow::Error),
}
#[derive(Error, Debug)]
pub enum BuilderError {
#[error("dependency not added to builder: {0}")]
DependencyError(anyhow::Error),
}

View File

@ -1,6 +1,7 @@
use crunch::errors::*;
use crunch::traits::Event;
#[derive(Clone)]
struct SomeEvent {
name: String,
}
@ -36,50 +37,49 @@ impl crunch::traits::Event for SomeEvent {
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let in_memory = crunch::Persistence::in_memory();
let transport = crunch::Transport::in_memory();
crunch::OutboxHandler::new(in_memory.clone(), transport.clone()).spawn();
let publisher = crunch::Publisher::new(in_memory);
let subscriber = crunch::Subscriber::new(transport);
let crunch = crunch::builder::Builder::default().build()?;
let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
subscriber
.subscribe(|item: SomeEvent| async move {
tracing::info!(
"subscription got event: {}, info: {}",
item.name,
item.int_event_info(),
);
Ok(())
let inner_counter = counter.clone();
crunch
.subscribe(move |item: SomeEvent| {
let counter = inner_counter.clone();
async move {
tracing::info!(
"subscription got event: {}, info: {}",
item.name,
item.int_event_info(),
);
counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
})
.await?;
publisher
.publish(SomeEvent {
name: "something".into(),
})
.await?;
publisher
.publish(SomeEvent {
name: "something".into(),
})
.await?;
publisher
.publish(SomeEvent {
name: "something".into(),
})
.await?;
publisher
.publish(SomeEvent {
name: "something".into(),
})
.await?;
publisher
.publish(SomeEvent {
name: "something".into(),
})
.await?;
let event = SomeEvent {
name: "something".into(),
};
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
for _ in 0..50 {
tokio::spawn({
let event = event.clone();
let crunch = crunch.clone();
async move {
loop {
crunch.publish(event.clone()).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
}
});
}
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
let amount_run = counter.load(std::sync::atomic::Ordering::SeqCst);
tracing::error!("ran {} amount of times", amount_run);
Ok(())
}

View File

@ -1,6 +1,7 @@
mod impls;
mod outbox;
mod publisher;
mod subscriber;
mod transport;
#[cfg(feature = "traits")]
@ -12,67 +13,115 @@ pub mod errors {
pub use crunch_traits::errors::*;
}
use crunch_traits::Event;
pub use impls::Persistence;
pub use outbox::OutboxHandler;
pub use publisher::Publisher;
pub use subscriber::Subscriber;
pub use transport::Transport;
mod subscriber {
use crunch_traits::{Event, EventInfo};
use futures::StreamExt;
use crate::{errors, Transport};
pub struct Subscriber {
transport: Transport,
#[derive(Clone)]
pub struct Crunch {
publisher: Publisher,
subscriber: Subscriber,
}
impl Crunch {
pub fn new(publisher: Publisher, subscriber: Subscriber) -> Self {
Self {
publisher,
subscriber,
}
}
impl Subscriber {
pub fn new(transport: Transport) -> Self {
Self { transport }
pub async fn subscribe<I, F, Fut>(&self, callback: F) -> Result<(), errors::SubscriptionError>
where
F: Fn(I) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), errors::SubscriptionError>> + Send + 'static,
I: Event + Send + 'static,
{
self.subscriber.subscribe(callback).await
}
}
impl std::ops::Deref for Crunch {
type Target = Publisher;
fn deref(&self) -> &Self::Target {
&self.publisher
}
}
pub mod builder {
use crate::{errors, Crunch, OutboxHandler, Persistence, Publisher, Subscriber, Transport};
#[derive(Clone)]
pub struct Builder {
persistence: Option<Persistence>,
transport: Option<Transport>,
outbox_enabled: bool,
}
impl Builder {
#[cfg(feature = "in-memory")]
pub fn with_in_memory_persistence(&mut self) -> &mut Self {
self.persistence = Some(Persistence::in_memory());
self
}
pub async fn subscribe<I, F, Fut>(
&self,
callback: F,
) -> Result<(), errors::SubscriptionError>
where
F: Fn(I) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), errors::SubscriptionError>> + Send + 'static,
I: Event + Send + 'static,
{
let mut stream = self
#[cfg(feature = "in-memory")]
pub fn with_in_memory_transport(&mut self) -> &mut Self {
self.transport = Some(Transport::in_memory());
self
}
pub fn with_outbox(&mut self, enabled: bool) -> &mut Self {
self.outbox_enabled = enabled;
self
}
pub fn build(&mut self) -> Result<Crunch, errors::BuilderError> {
let persistence =
self.persistence
.clone()
.ok_or(errors::BuilderError::DependencyError(anyhow::anyhow!(
"persistence was not set"
)))?;
let transport = self
.transport
.subscriber(&I::event_info())
.await
.map_err(errors::SubscriptionError::ConnectionFailed)?
.ok_or(errors::SubscriptionError::FailedToSubscribe(
anyhow::anyhow!("failed to find channel to subscribe to"),
))?;
.clone()
.ok_or(errors::BuilderError::DependencyError(anyhow::anyhow!(
"transport was not set"
)))?;
tokio::spawn(async move {
while let Some(item) = stream.next().await {
let item = match I::deserialize(item)
.map_err(errors::SubscriptionError::DeserializationFailed)
{
Ok(i) => i,
Err(e) => {
tracing::warn!("deserialization failed: {}", e);
continue;
}
};
let publisher = Publisher::new(persistence.clone());
let subscriber = Subscriber::new(transport.clone());
if self.outbox_enabled {
OutboxHandler::new(persistence.clone(), transport.clone()).spawn();
}
match callback(item).await {
Ok(_) => {}
Err(e) => {
tracing::error!("subscription callback failed: {}", e)
}
}
Ok(Crunch::new(publisher, subscriber))
}
}
impl Default for Builder {
fn default() -> Self {
#[cfg(feature = "in-memory")]
{
return Self {
outbox_enabled: true,
persistence: None,
transport: None,
}
});
.with_in_memory_persistence()
.with_in_memory_transport()
.clone();
}
Ok(())
Self {
persistence: None,
transport: None,
outbox_enabled: true,
}
}
}
}

View File

@ -2,6 +2,7 @@ use crunch_traits::{errors::PublishError, Event};
use crate::Persistence;
#[derive(Clone)]
pub struct Publisher {
persistence: Persistence,
}

View File

@ -0,0 +1,54 @@
use crunch_traits::{Event, EventInfo};
use futures::StreamExt;
use crate::{errors, Transport};
#[derive(Clone)]
pub struct Subscriber {
transport: Transport,
}
impl Subscriber {
pub fn new(transport: Transport) -> Self {
Self { transport }
}
pub async fn subscribe<I, F, Fut>(&self, callback: F) -> Result<(), errors::SubscriptionError>
where
F: Fn(I) -> Fut + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), errors::SubscriptionError>> + Send + 'static,
I: Event + Send + 'static,
{
let mut stream = self
.transport
.subscriber(&I::event_info())
.await
.map_err(errors::SubscriptionError::ConnectionFailed)?
.ok_or(errors::SubscriptionError::FailedToSubscribe(
anyhow::anyhow!("failed to find channel to subscribe to"),
))?;
tokio::spawn(async move {
while let Some(item) = stream.next().await {
let item = match I::deserialize(item)
.map_err(errors::SubscriptionError::DeserializationFailed)
{
Ok(i) => i,
Err(e) => {
tracing::warn!("deserialization failed: {}", e);
continue;
}
};
match callback(item).await {
Ok(_) => {}
Err(e) => {
tracing::error!("subscription callback failed: {}", e)
}
}
}
});
Ok(())
}
}