feat: fixed basic example

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-09-24 00:47:59 +02:00
parent 6db03e37cb
commit 687af13d9c
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
4 changed files with 98 additions and 35 deletions

View File

@ -33,7 +33,7 @@ impl InMemoryTransport {
events.insert(transport_key.clone(), sender); events.insert(transport_key.clone(), sender);
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(item) = receiver.recv().await { while let Ok(item) = receiver.recv().await {
tracing::info!("default receiver: {}", item.info.transport_name()); tracing::trace!("default receiver: {}", item.info.transport_name());
} }
}); });
} }

View File

@ -38,23 +38,14 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
let crunch = crunch::builder::Builder::default().build()?; let crunch = crunch::builder::Builder::default().build()?;
let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let inner_counter = counter.clone();
crunch crunch
.subscribe(move |item: SomeEvent| { .subscribe(move |item: SomeEvent| async move {
let counter = inner_counter.clone();
async move {
tracing::info!( tracing::info!(
"subscription got event: {}, info: {}", "subscription got event: {}, info: {}",
item.name, item.name,
item.int_event_info(), item.int_event_info(),
); );
counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(()) Ok(())
}
}) })
.await?; .await?;
@ -62,24 +53,11 @@ async fn main() -> anyhow::Result<()> {
name: "something".into(), name: "something".into(),
}; };
for _ in 0..50 { crunch.publish(event.clone()).await?;
tokio::spawn({ crunch.publish(event.clone()).await?;
let event = event.clone(); crunch.publish(event.clone()).await?;
let crunch = crunch.clone();
async move { tokio::time::sleep(std::time::Duration::from_secs(5)).await;
loop {
crunch.publish(event.clone()).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
}
});
}
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
let amount_run = counter.load(std::sync::atomic::Ordering::SeqCst);
tracing::error!("ran {} amount of times", amount_run);
Ok(()) Ok(())
} }

View File

@ -0,0 +1,85 @@
use crunch::errors::*;
use crunch::traits::Event;
#[derive(Clone)]
struct SomeEvent {
name: String,
}
impl crunch::traits::Serializer for SomeEvent {
fn serialize(&self) -> Result<Vec<u8>, SerializeError> {
Ok(b"field=name".to_vec())
}
}
impl crunch::traits::Deserializer for SomeEvent {
fn deserialize(_raw: Vec<u8>) -> Result<Self, DeserializeError>
where
Self: Sized,
{
Ok(Self {
name: "something".into(),
})
}
}
impl crunch::traits::Event for SomeEvent {
fn event_info() -> crunch::traits::EventInfo {
crunch::traits::EventInfo {
domain: "some-domain",
entity_type: "some-entity",
event_name: "some-event",
}
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let crunch = crunch::builder::Builder::default().build()?;
let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let inner_counter = counter.clone();
crunch
.subscribe(move |item: SomeEvent| {
let counter = inner_counter.clone();
async move {
tracing::info!(
"subscription got event: {}, info: {}",
item.name,
item.int_event_info(),
);
counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
})
.await?;
let event = SomeEvent {
name: "something".into(),
};
for _ in 0..50 {
tokio::spawn({
let event = event.clone();
let crunch = crunch.clone();
async move {
loop {
crunch.publish(event.clone()).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
}
});
}
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
let amount_run = counter.load(std::sync::atomic::Ordering::SeqCst);
tracing::error!("ran {} amount of times", amount_run);
Ok(())
}

View File

@ -41,7 +41,7 @@ impl crunch_traits::Persistence for InMemoryPersistence {
outbox.push_back(msg.clone()); outbox.push_back(msg.clone());
self.store.write().await.insert(msg.id.clone(), msg); self.store.write().await.insert(msg.id.clone(), msg);
tracing::info!( tracing::debug!(
event_info = event_info.to_string(), event_info = event_info.to_string(),
content_len = content.len(), content_len = content.len(),
"inserted event" "inserted event"