diff --git a/README.md b/README.md index d7efcbc..0d37422 100644 --- a/README.md +++ b/README.md @@ -89,7 +89,7 @@ We recommend wrapping and exposing the parts you need to the library, so that yo You will need a transport of some sort. Transport is what transfers messages between services, crunch is built to be configurable, and unopinionated, as such most messaging protocols should work fine. -- [ ] [NATS (recommended)](crates/crunch-transport-nats) +- [x] [NATS (recommended)](crates/crunch-transport-nats) - [x] [Tokio channel (used for in-memory processing)](crates/crunch-transport-tokio-channel) ### Persistence diff --git a/crates/crunch-nats/src/lib.rs b/crates/crunch-nats/src/lib.rs index eb88505..b0b9b0b 100644 --- a/crates/crunch-nats/src/lib.rs +++ b/crates/crunch-nats/src/lib.rs @@ -24,7 +24,7 @@ impl NatsTransport { nats::asynk::Options::with_user_pass(user, pass) .connect(options.host) .await - .map_err(|e| anyhow::anyhow!(e)) + .map_err(|e| anyhow::anyhow!("failed to connect with username password: {}", e)) .map_err(TransportError::Err)? } }; @@ -60,6 +60,7 @@ impl Transport for NatsTransport { .map_err(TransportError::Err)?; let stream = futures::stream::unfold(sub, |sub| async move { + tracing::trace!("got event from nats"); let next = sub.next().await?; let next = next.data; Some((next, sub)) diff --git a/crates/crunch/assets/docker-compose.nats.yaml b/crates/crunch/assets/docker-compose.nats.yaml new file mode 100644 index 0000000..e21205e --- /dev/null +++ b/crates/crunch/assets/docker-compose.nats.yaml @@ -0,0 +1,12 @@ +version: '3' + +services: + nats: + image: bitnami/nats:2.10.1 + ports: + - "4222:4222" + environment: + - NATS_ENABLE_AUTH=yes + - NATS_USERNAME=user + - NATS_PASSWORD=secret + diff --git a/crates/crunch/examples/counter.rs b/crates/crunch/examples/counter.rs index 7e858f1..1d96307 100644 --- a/crates/crunch/examples/counter.rs +++ b/crates/crunch/examples/counter.rs @@ -62,6 +62,7 @@ async fn main() -> anyhow::Result<()> { name: "something".into(), }; + // Publish a lot of events for _ in 0..50 { tokio::spawn({ let event = event.clone(); @@ -79,7 +80,7 @@ async fn main() -> anyhow::Result<()> { tokio::time::sleep(std::time::Duration::from_secs(30)).await; let amount_run = counter.load(std::sync::atomic::Ordering::SeqCst); - tracing::error!("ran {} amount of times", amount_run); + tracing::info!("ran {} amount of times", amount_run); Ok(()) } diff --git a/crates/crunch/examples/nats.rs b/crates/crunch/examples/nats.rs new file mode 100644 index 0000000..be62370 --- /dev/null +++ b/crates/crunch/examples/nats.rs @@ -0,0 +1,74 @@ +use crunch::errors::*; +use crunch::nats::{NatsConnectCredentials, NatsConnectOptions}; +use crunch::traits::{Deserializer, Event, EventInfo, Serializer}; + +#[derive(Clone)] +struct SomeEvent { + name: String, +} + +impl Serializer for SomeEvent { + fn serialize(&self) -> Result, SerializeError> { + Ok(b"field=name".to_vec()) + } +} + +impl Deserializer for SomeEvent { + fn deserialize(_raw: Vec) -> Result + where + Self: Sized, + { + Ok(Self { + name: "something".into(), + }) + } +} + +impl Event for SomeEvent { + fn event_info() -> EventInfo { + EventInfo { + domain: "some-domain", + entity_type: "some-entity", + event_name: "some-event", + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + // Remember to start nats via. docker first, or set equivalent settings as in `NatsConnectOptions` + let crunch = crunch::builder::Builder::default() + .with_nats_transport(NatsConnectOptions { + host: "127.0.0.1:4222", + credentials: NatsConnectCredentials::UserPass { + user: "user", + pass: "secret", + }, + }) + .await? + .build()?; + crunch + .subscribe(move |item: SomeEvent| async move { + tracing::info!( + "subscription got event: {}, info: {}", + item.name, + item.int_event_info(), + ); + Ok(()) + }) + .await?; + + let event = SomeEvent { + name: "something".into(), + }; + + for _ in 0..5 { + crunch.publish(event.clone()).await?; + } + + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + Ok(()) +} diff --git a/crates/crunch/src/outbox.rs b/crates/crunch/src/outbox.rs index e0c2948..62be44c 100644 --- a/crates/crunch/src/outbox.rs +++ b/crates/crunch/src/outbox.rs @@ -39,7 +39,7 @@ async fn handle_messages(p: &Persistence, t: &Transport) -> anyhow::Result { t.publish(&info, content).await?; p.update_published(&item).await?; - tracing::info!("published item: {}", item); + tracing::debug!("published item: {}", item); } None => { tracing::info!("did not find any events for item: {}", item);