@@ -3,7 +3,7 @@ use std::{collections::BTreeMap, pin::Pin};
|
||||
use async_trait::async_trait;
|
||||
use crunch_traits::{errors::TransportError, EventInfo, Transport};
|
||||
use futures::Stream;
|
||||
use tokio::sync::broadcast::{Receiver, Sender};
|
||||
use tokio::sync::broadcast::Sender;
|
||||
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
|
||||
|
||||
#[derive(Clone)]
|
||||
|
18
crates/crunch-nats/Cargo.toml
Normal file
18
crates/crunch-nats/Cargo.toml
Normal file
@@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "crunch-nats"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
crunch-traits.workspace = true
|
||||
|
||||
nats = {workspace = true}
|
||||
anyhow.workspace = true
|
||||
tracing.workspace = true
|
||||
tokio.workspace = true
|
||||
thiserror.workspace = true
|
||||
async-trait.workspace = true
|
||||
futures.workspace = true
|
||||
tokio-stream = {workspace = true, features = ["sync"]}
|
83
crates/crunch-nats/src/lib.rs
Normal file
83
crates/crunch-nats/src/lib.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use std::pin::Pin;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use crunch_traits::{errors::TransportError, EventInfo, Transport};
|
||||
use futures::Stream;
|
||||
|
||||
pub struct NatsConnectOptions<'a> {
|
||||
pub host: &'a str,
|
||||
pub credentials: NatsConnectCredentials<'a>,
|
||||
}
|
||||
pub enum NatsConnectCredentials<'a> {
|
||||
UserPass { user: &'a str, pass: &'a str },
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NatsTransport {
|
||||
conn: nats::asynk::Connection,
|
||||
}
|
||||
|
||||
impl NatsTransport {
|
||||
pub async fn new(options: NatsConnectOptions<'_>) -> Result<Self, TransportError> {
|
||||
let conn = match options.credentials {
|
||||
NatsConnectCredentials::UserPass { user, pass } => {
|
||||
nats::asynk::Options::with_user_pass(user, pass)
|
||||
.connect(options.host)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
.map_err(TransportError::Err)?
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self { conn })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Transport for NatsTransport {
|
||||
type Stream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
|
||||
|
||||
async fn publish(
|
||||
&self,
|
||||
event_info: &EventInfo,
|
||||
content: Vec<u8>,
|
||||
) -> Result<(), TransportError> {
|
||||
self.conn
|
||||
.publish(&event_info.transport_name(), &content)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
.map_err(TransportError::Err)
|
||||
}
|
||||
async fn subscriber(
|
||||
&self,
|
||||
event_info: &EventInfo,
|
||||
) -> Result<Option<Self::Stream>, TransportError> {
|
||||
let sub = self
|
||||
.conn
|
||||
.subscribe(&event_info.transport_name())
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
.map_err(TransportError::Err)?;
|
||||
|
||||
let stream = futures::stream::unfold(sub, |sub| async move {
|
||||
let next = sub.next().await?;
|
||||
let next = next.data;
|
||||
Some((next, sub))
|
||||
});
|
||||
|
||||
Ok(Some(Box::pin(stream)))
|
||||
}
|
||||
}
|
||||
|
||||
trait EventInfoExt {
|
||||
fn transport_name(&self) -> String;
|
||||
}
|
||||
|
||||
impl EventInfoExt for EventInfo {
|
||||
fn transport_name(&self) -> String {
|
||||
format!(
|
||||
"crunch.{}.{}.{}",
|
||||
self.domain, self.entity_type, self.event_name
|
||||
)
|
||||
}
|
||||
}
|
@@ -7,6 +7,7 @@ edition = "2021"
|
||||
crunch-envelope.workspace = true
|
||||
crunch-in-memory = { workspace = true, optional = true }
|
||||
crunch-traits.workspace = true
|
||||
crunch-nats = {workspace = true,optional = true}
|
||||
|
||||
anyhow.workspace = true
|
||||
tracing.workspace = true
|
||||
@@ -21,6 +22,7 @@ futures.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
|
||||
[features]
|
||||
default = ["in-memory", "traits"]
|
||||
default = ["in-memory", "traits", "nats"]
|
||||
traits = []
|
||||
in-memory = ["dep:crunch-in-memory"]
|
||||
in-memory = ["dep:crunch-in-memory"]
|
||||
nats = ["dep:crunch-nats"]
|
@@ -20,6 +20,11 @@ pub use publisher::Publisher;
|
||||
pub use subscriber::Subscriber;
|
||||
pub use transport::Transport;
|
||||
|
||||
#[cfg(feature = "nats")]
|
||||
pub mod nats {
|
||||
pub use crunch_nats::{NatsConnectCredentials, NatsConnectOptions};
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Crunch {
|
||||
publisher: Publisher,
|
||||
@@ -74,6 +79,15 @@ pub mod builder {
|
||||
self
|
||||
}
|
||||
|
||||
#[cfg(feature = "nats")]
|
||||
pub async fn with_nats_transport(
|
||||
&mut self,
|
||||
options: crate::nats::NatsConnectOptions<'_>,
|
||||
) -> Result<&mut Self, crunch_traits::errors::TransportError> {
|
||||
self.transport = Some(Transport::nats(options).await?);
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub fn with_outbox(&mut self, enabled: bool) -> &mut Self {
|
||||
self.outbox_enabled = enabled;
|
||||
self
|
||||
|
@@ -14,6 +14,15 @@ impl Transport {
|
||||
crunch_in_memory::InMemoryTransport::default(),
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(feature = "nats")]
|
||||
pub async fn nats(
|
||||
options: crate::nats::NatsConnectOptions<'_>,
|
||||
) -> Result<Self, crunch_traits::errors::TransportError> {
|
||||
Ok(Self(std::sync::Arc::new(
|
||||
crunch_nats::NatsTransport::new(options).await?,
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DynTransport> for Transport {
|
||||
|
Reference in New Issue
Block a user