feat: add nats example

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-09-24 01:09:43 +02:00
parent 687af13d9c
commit 19a44f968d
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
6 changed files with 92 additions and 4 deletions

View File

@ -89,7 +89,7 @@ We recommend wrapping and exposing the parts you need to the library, so that yo
You will need a transport of some sort. Transport is what transfers messages between services, crunch is built to be configurable, and unopinionated, as such most messaging protocols should work fine. You will need a transport of some sort. Transport is what transfers messages between services, crunch is built to be configurable, and unopinionated, as such most messaging protocols should work fine.
- [ ] [NATS (recommended)](crates/crunch-transport-nats) - [x] [NATS (recommended)](crates/crunch-transport-nats)
- [x] [Tokio channel (used for in-memory processing)](crates/crunch-transport-tokio-channel) - [x] [Tokio channel (used for in-memory processing)](crates/crunch-transport-tokio-channel)
### Persistence ### Persistence

View File

@ -24,7 +24,7 @@ impl NatsTransport {
nats::asynk::Options::with_user_pass(user, pass) nats::asynk::Options::with_user_pass(user, pass)
.connect(options.host) .connect(options.host)
.await .await
.map_err(|e| anyhow::anyhow!(e)) .map_err(|e| anyhow::anyhow!("failed to connect with username password: {}", e))
.map_err(TransportError::Err)? .map_err(TransportError::Err)?
} }
}; };
@ -60,6 +60,7 @@ impl Transport for NatsTransport {
.map_err(TransportError::Err)?; .map_err(TransportError::Err)?;
let stream = futures::stream::unfold(sub, |sub| async move { let stream = futures::stream::unfold(sub, |sub| async move {
tracing::trace!("got event from nats");
let next = sub.next().await?; let next = sub.next().await?;
let next = next.data; let next = next.data;
Some((next, sub)) Some((next, sub))

View File

@ -0,0 +1,12 @@
version: '3'
services:
nats:
image: bitnami/nats:2.10.1
ports:
- "4222:4222"
environment:
- NATS_ENABLE_AUTH=yes
- NATS_USERNAME=user
- NATS_PASSWORD=secret

View File

@ -62,6 +62,7 @@ async fn main() -> anyhow::Result<()> {
name: "something".into(), name: "something".into(),
}; };
// Publish a lot of events
for _ in 0..50 { for _ in 0..50 {
tokio::spawn({ tokio::spawn({
let event = event.clone(); let event = event.clone();
@ -79,7 +80,7 @@ async fn main() -> anyhow::Result<()> {
tokio::time::sleep(std::time::Duration::from_secs(30)).await; tokio::time::sleep(std::time::Duration::from_secs(30)).await;
let amount_run = counter.load(std::sync::atomic::Ordering::SeqCst); let amount_run = counter.load(std::sync::atomic::Ordering::SeqCst);
tracing::error!("ran {} amount of times", amount_run); tracing::info!("ran {} amount of times", amount_run);
Ok(()) Ok(())
} }

View File

@ -0,0 +1,74 @@
use crunch::errors::*;
use crunch::nats::{NatsConnectCredentials, NatsConnectOptions};
use crunch::traits::{Deserializer, Event, EventInfo, Serializer};
#[derive(Clone)]
struct SomeEvent {
name: String,
}
impl Serializer for SomeEvent {
fn serialize(&self) -> Result<Vec<u8>, SerializeError> {
Ok(b"field=name".to_vec())
}
}
impl Deserializer for SomeEvent {
fn deserialize(_raw: Vec<u8>) -> Result<Self, DeserializeError>
where
Self: Sized,
{
Ok(Self {
name: "something".into(),
})
}
}
impl Event for SomeEvent {
fn event_info() -> EventInfo {
EventInfo {
domain: "some-domain",
entity_type: "some-entity",
event_name: "some-event",
}
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
// Remember to start nats via. docker first, or set equivalent settings as in `NatsConnectOptions`
let crunch = crunch::builder::Builder::default()
.with_nats_transport(NatsConnectOptions {
host: "127.0.0.1:4222",
credentials: NatsConnectCredentials::UserPass {
user: "user",
pass: "secret",
},
})
.await?
.build()?;
crunch
.subscribe(move |item: SomeEvent| async move {
tracing::info!(
"subscription got event: {}, info: {}",
item.name,
item.int_event_info(),
);
Ok(())
})
.await?;
let event = SomeEvent {
name: "something".into(),
};
for _ in 0..5 {
crunch.publish(event.clone()).await?;
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
Ok(())
}

View File

@ -39,7 +39,7 @@ async fn handle_messages(p: &Persistence, t: &Transport) -> anyhow::Result<Optio
Some((info, content)) => { Some((info, content)) => {
t.publish(&info, content).await?; t.publish(&info, content).await?;
p.update_published(&item).await?; p.update_published(&item).await?;
tracing::info!("published item: {}", item); tracing::debug!("published item: {}", item);
} }
None => { None => {
tracing::info!("did not find any events for item: {}", item); tracing::info!("did not find any events for item: {}", item);