diff --git a/crates/crunch-in-memory/src/lib.rs b/crates/crunch-in-memory/src/lib.rs index 4be5e85..f506fb4 100644 --- a/crates/crunch-in-memory/src/lib.rs +++ b/crates/crunch-in-memory/src/lib.rs @@ -33,7 +33,7 @@ impl InMemoryTransport { events.insert(transport_key.clone(), sender); tokio::spawn(async move { while let Ok(item) = receiver.recv().await { - tracing::info!("default receiver: {}", item.info.transport_name()); + tracing::trace!("default receiver: {}", item.info.transport_name()); } }); } diff --git a/crates/crunch/examples/basic.rs b/crates/crunch/examples/basic.rs index 7e858f1..88ecd97 100644 --- a/crates/crunch/examples/basic.rs +++ b/crates/crunch/examples/basic.rs @@ -38,23 +38,14 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let crunch = crunch::builder::Builder::default().build()?; - let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); - - let inner_counter = counter.clone(); crunch - .subscribe(move |item: SomeEvent| { - let counter = inner_counter.clone(); - - async move { - tracing::info!( - "subscription got event: {}, info: {}", - item.name, - item.int_event_info(), - ); - - counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - Ok(()) - } + .subscribe(move |item: SomeEvent| async move { + tracing::info!( + "subscription got event: {}, info: {}", + item.name, + item.int_event_info(), + ); + Ok(()) }) .await?; @@ -62,24 +53,11 @@ async fn main() -> anyhow::Result<()> { name: "something".into(), }; - for _ in 0..50 { - tokio::spawn({ - let event = event.clone(); - let crunch = crunch.clone(); + crunch.publish(event.clone()).await?; + crunch.publish(event.clone()).await?; + crunch.publish(event.clone()).await?; - async move { - loop { - crunch.publish(event.clone()).await.unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(1)).await; - } - } - }); - } - - tokio::time::sleep(std::time::Duration::from_secs(30)).await; - - let amount_run = counter.load(std::sync::atomic::Ordering::SeqCst); - tracing::error!("ran {} amount of times", amount_run); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; Ok(()) } diff --git a/crates/crunch/examples/counter.rs b/crates/crunch/examples/counter.rs new file mode 100644 index 0000000..7e858f1 --- /dev/null +++ b/crates/crunch/examples/counter.rs @@ -0,0 +1,85 @@ +use crunch::errors::*; +use crunch::traits::Event; + +#[derive(Clone)] +struct SomeEvent { + name: String, +} + +impl crunch::traits::Serializer for SomeEvent { + fn serialize(&self) -> Result, SerializeError> { + Ok(b"field=name".to_vec()) + } +} + +impl crunch::traits::Deserializer for SomeEvent { + fn deserialize(_raw: Vec) -> Result + where + Self: Sized, + { + Ok(Self { + name: "something".into(), + }) + } +} + +impl crunch::traits::Event for SomeEvent { + fn event_info() -> crunch::traits::EventInfo { + crunch::traits::EventInfo { + domain: "some-domain", + entity_type: "some-entity", + event_name: "some-event", + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let crunch = crunch::builder::Builder::default().build()?; + let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + + let inner_counter = counter.clone(); + crunch + .subscribe(move |item: SomeEvent| { + let counter = inner_counter.clone(); + + async move { + tracing::info!( + "subscription got event: {}, info: {}", + item.name, + item.int_event_info(), + ); + + counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(()) + } + }) + .await?; + + let event = SomeEvent { + name: "something".into(), + }; + + for _ in 0..50 { + tokio::spawn({ + let event = event.clone(); + let crunch = crunch.clone(); + + async move { + loop { + crunch.publish(event.clone()).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + } + } + }); + } + + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + + let amount_run = counter.load(std::sync::atomic::Ordering::SeqCst); + tracing::error!("ran {} amount of times", amount_run); + + Ok(()) +} diff --git a/crates/crunch/src/impls.rs b/crates/crunch/src/impls.rs index d3383e9..38adc14 100644 --- a/crates/crunch/src/impls.rs +++ b/crates/crunch/src/impls.rs @@ -41,7 +41,7 @@ impl crunch_traits::Persistence for InMemoryPersistence { outbox.push_back(msg.clone()); self.store.write().await.insert(msg.id.clone(), msg); - tracing::info!( + tracing::debug!( event_info = event_info.to_string(), content_len = content.len(), "inserted event"