From 7590c56dde6734ade1ecb5e1400a5b71005bb927 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sat, 23 Sep 2023 00:22:18 +0200 Subject: [PATCH] feat: without stream, cuz it sucks Signed-off-by: kjuulh --- crates/crunch/examples/basic.rs | 27 +++++++++++++++++++++++++-- crates/crunch/src/impls.rs | 33 ++------------------------------- crates/crunch/src/lib.rs | 20 +++++++++++++------- crates/crunch/src/traits.rs | 3 +-- 4 files changed, 41 insertions(+), 42 deletions(-) diff --git a/crates/crunch/examples/basic.rs b/crates/crunch/examples/basic.rs index 4395eb2..b8f7ad5 100644 --- a/crates/crunch/examples/basic.rs +++ b/crates/crunch/examples/basic.rs @@ -1,4 +1,4 @@ -use crunch::{Deserializer, Event, EventInfo, Persistence, Publisher, Serializer}; +use crunch::{Deserializer, Event, EventInfo, OutboxHandler, Persistence, Publisher, Serializer}; struct SomeEvent { name: String, @@ -11,7 +11,7 @@ impl Serializer for SomeEvent { } impl Deserializer for SomeEvent { - fn deserialize(raw: Vec) -> Result + fn deserialize(_raw: Vec) -> Result where Self: Sized, { @@ -35,6 +35,7 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let in_memory = Persistence::in_memory(); + OutboxHandler::new(in_memory.clone()).spawn(); let publisher = Publisher::new(in_memory); publisher @@ -42,6 +43,28 @@ async fn main() -> anyhow::Result<()> { name: "something".into(), }) .await?; + publisher + .publish(SomeEvent { + name: "something".into(), + }) + .await?; + publisher + .publish(SomeEvent { + name: "something".into(), + }) + .await?; + publisher + .publish(SomeEvent { + name: "something".into(), + }) + .await?; + publisher + .publish(SomeEvent { + name: "something".into(), + }) + .await?; + + tokio::time::sleep(std::time::Duration::from_secs(5)).await; Ok(()) } diff --git a/crates/crunch/src/impls.rs b/crates/crunch/src/impls.rs index 08a6f94..74bc6b0 100644 --- a/crates/crunch/src/impls.rs +++ b/crates/crunch/src/impls.rs @@ -1,8 +1,7 @@ -use std::{collections::VecDeque, future::Future, ops::Deref, pin::Pin, sync::Arc, task::Poll}; +use std::{collections::VecDeque, ops::Deref, sync::Arc}; use async_trait::async_trait; -use tokio::sync::{Mutex, OnceCell, RwLock}; -use tokio_stream::Stream; +use tokio::sync::RwLock; use crate::{traits, EventInfo}; @@ -30,7 +29,6 @@ impl traits::Persistence for InMemoryPersistence { let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content); let mut outbox = self.outbox.write().await; - outbox.push_back(Msg { id: uuid::Uuid::new_v4().to_string(), info: event_info.clone(), @@ -56,7 +54,6 @@ impl traits::Persistence for InMemoryPersistence { #[derive(Clone)] pub struct Persistence { inner: Arc, - pending: Arc>> + Send + Sync>>>>, } impl Persistence { @@ -65,7 +62,6 @@ impl Persistence { inner: Arc::new(InMemoryPersistence { outbox: Arc::default(), }), - pending: Arc::default(), } } } @@ -77,28 +73,3 @@ impl Deref for Persistence { &self.inner } } - -impl Stream for Persistence { - type Item = OnceCell; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - let mut pending = self.pending; - - if pending.is_none() { - *pending = Some(Box::pin(self.inner.next())); - } - - let fut = pending.as_mut().unwrap(); - - match fut.as_mut().poll(cx) { - Poll::Ready(v) => { - *pending = None; - Poll::Ready(v) - } - Poll::Pending => Poll::Pending, - } - } -} diff --git a/crates/crunch/src/lib.rs b/crates/crunch/src/lib.rs index 9b26415..8ef4bd1 100644 --- a/crates/crunch/src/lib.rs +++ b/crates/crunch/src/lib.rs @@ -4,12 +4,11 @@ mod traits; pub use errors::*; pub use impls::Persistence; +pub use outbox::OutboxHandler; pub use traits::{Deserializer, Event, EventInfo, Serializer}; mod outbox { - use std::sync::Arc; - - pub use crate::Persistence; + use crate::Persistence; pub struct OutboxHandler { persistence: Persistence, @@ -20,12 +19,19 @@ mod outbox { Self { persistence } } - pub async fn spawn(&mut self) { + pub fn spawn(&mut self) { let p = self.persistence.clone(); tokio::spawn(async move { - let p = p; - - while let Some(item) = p.next().await {} + loop { + match p.next().await { + Some(item) => { + tracing::info!("got item: {}", item); + } + None => { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + } + } }); } } diff --git a/crates/crunch/src/traits.rs b/crates/crunch/src/traits.rs index 9b5cbc0..1247b13 100644 --- a/crates/crunch/src/traits.rs +++ b/crates/crunch/src/traits.rs @@ -1,14 +1,13 @@ use std::fmt::Display; use async_trait::async_trait; -use tokio::sync::OnceCell; use crate::{DeserializeError, SerializeError}; #[async_trait] pub trait Persistence { async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()>; - async fn next(&self) -> Option>; + async fn next(&self) -> Option; } pub trait Serializer {