feat: without stream, cuz it sucks

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-09-23 00:22:18 +02:00
parent 5bb9286b10
commit 7590c56dde
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
4 changed files with 41 additions and 42 deletions

View File

@ -1,4 +1,4 @@
use crunch::{Deserializer, Event, EventInfo, Persistence, Publisher, Serializer}; use crunch::{Deserializer, Event, EventInfo, OutboxHandler, Persistence, Publisher, Serializer};
struct SomeEvent { struct SomeEvent {
name: String, name: String,
@ -11,7 +11,7 @@ impl Serializer for SomeEvent {
} }
impl Deserializer for SomeEvent { impl Deserializer for SomeEvent {
fn deserialize(raw: Vec<u8>) -> Result<Self, crunch::DeserializeError> fn deserialize(_raw: Vec<u8>) -> Result<Self, crunch::DeserializeError>
where where
Self: Sized, Self: Sized,
{ {
@ -35,6 +35,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
let in_memory = Persistence::in_memory(); let in_memory = Persistence::in_memory();
OutboxHandler::new(in_memory.clone()).spawn();
let publisher = Publisher::new(in_memory); let publisher = Publisher::new(in_memory);
publisher publisher
@ -42,6 +43,28 @@ async fn main() -> anyhow::Result<()> {
name: "something".into(), name: "something".into(),
}) })
.await?; .await?;
publisher
.publish(SomeEvent {
name: "something".into(),
})
.await?;
publisher
.publish(SomeEvent {
name: "something".into(),
})
.await?;
publisher
.publish(SomeEvent {
name: "something".into(),
})
.await?;
publisher
.publish(SomeEvent {
name: "something".into(),
})
.await?;
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
Ok(()) Ok(())
} }

View File

@ -1,8 +1,7 @@
use std::{collections::VecDeque, future::Future, ops::Deref, pin::Pin, sync::Arc, task::Poll}; use std::{collections::VecDeque, ops::Deref, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
use tokio::sync::{Mutex, OnceCell, RwLock}; use tokio::sync::RwLock;
use tokio_stream::Stream;
use crate::{traits, EventInfo}; use crate::{traits, EventInfo};
@ -30,7 +29,6 @@ impl traits::Persistence for InMemoryPersistence {
let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content); let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content);
let mut outbox = self.outbox.write().await; let mut outbox = self.outbox.write().await;
outbox.push_back(Msg { outbox.push_back(Msg {
id: uuid::Uuid::new_v4().to_string(), id: uuid::Uuid::new_v4().to_string(),
info: event_info.clone(), info: event_info.clone(),
@ -56,7 +54,6 @@ impl traits::Persistence for InMemoryPersistence {
#[derive(Clone)] #[derive(Clone)]
pub struct Persistence { pub struct Persistence {
inner: Arc<dyn traits::Persistence + Send + Sync + 'static>, inner: Arc<dyn traits::Persistence + Send + Sync + 'static>,
pending: Arc<Option<Pin<Box<dyn Future<Output = Option<OnceCell<String>>> + Send + Sync>>>>,
} }
impl Persistence { impl Persistence {
@ -65,7 +62,6 @@ impl Persistence {
inner: Arc::new(InMemoryPersistence { inner: Arc::new(InMemoryPersistence {
outbox: Arc::default(), outbox: Arc::default(),
}), }),
pending: Arc::default(),
} }
} }
} }
@ -77,28 +73,3 @@ impl Deref for Persistence {
&self.inner &self.inner
} }
} }
impl Stream for Persistence {
type Item = OnceCell<String>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut pending = self.pending;
if pending.is_none() {
*pending = Some(Box::pin(self.inner.next()));
}
let fut = pending.as_mut().unwrap();
match fut.as_mut().poll(cx) {
Poll::Ready(v) => {
*pending = None;
Poll::Ready(v)
}
Poll::Pending => Poll::Pending,
}
}
}

View File

@ -4,12 +4,11 @@ mod traits;
pub use errors::*; pub use errors::*;
pub use impls::Persistence; pub use impls::Persistence;
pub use outbox::OutboxHandler;
pub use traits::{Deserializer, Event, EventInfo, Serializer}; pub use traits::{Deserializer, Event, EventInfo, Serializer};
mod outbox { mod outbox {
use std::sync::Arc; use crate::Persistence;
pub use crate::Persistence;
pub struct OutboxHandler { pub struct OutboxHandler {
persistence: Persistence, persistence: Persistence,
@ -20,12 +19,19 @@ mod outbox {
Self { persistence } Self { persistence }
} }
pub async fn spawn(&mut self) { pub fn spawn(&mut self) {
let p = self.persistence.clone(); let p = self.persistence.clone();
tokio::spawn(async move { tokio::spawn(async move {
let p = p; loop {
match p.next().await {
while let Some(item) = p.next().await {} Some(item) => {
tracing::info!("got item: {}", item);
}
None => {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
}
}); });
} }
} }

View File

@ -1,14 +1,13 @@
use std::fmt::Display; use std::fmt::Display;
use async_trait::async_trait; use async_trait::async_trait;
use tokio::sync::OnceCell;
use crate::{DeserializeError, SerializeError}; use crate::{DeserializeError, SerializeError};
#[async_trait] #[async_trait]
pub trait Persistence { pub trait Persistence {
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()>; async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()>;
async fn next(&self) -> Option<OnceCell<String>>; async fn next(&self) -> Option<String>;
} }
pub trait Serializer { pub trait Serializer {