From 4d150febc7a1b4458f897c7876f87c3becd85794 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Thu, 5 Oct 2023 22:41:14 +0200 Subject: [PATCH] feat: add postgres support Signed-off-by: kjuulh --- crates/crunch-postgres/src/lib.rs | 19 +++- .../crunch-postgres/tests/persistence_test.rs | 68 +++++++++++--- examples/basic-setup/src/gencrunch/mod.rs | 93 ++++++++++++++++++- 3 files changed, 164 insertions(+), 16 deletions(-) diff --git a/crates/crunch-postgres/src/lib.rs b/crates/crunch-postgres/src/lib.rs index 12940db..476cbd5 100644 --- a/crates/crunch-postgres/src/lib.rs +++ b/crates/crunch-postgres/src/lib.rs @@ -141,6 +141,23 @@ FOR UPDATE; } } async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { - todo!() + sqlx::query( + r#" +UPDATE outbox +SET state = 'handled' +WHERE id = $1; +"#, + ) + .bind( + Uuid::parse_str(event_id) + .map_err(|e| anyhow::anyhow!(e)) + .map_err(PersistenceError::UpdatePublished)?, + ) + .execute(&self.pool) + .await + .map_err(|e| anyhow::anyhow!(e)) + .map_err(PersistenceError::UpdatePublished)?; + + Ok(()) } } diff --git a/crates/crunch-postgres/tests/persistence_test.rs b/crates/crunch-postgres/tests/persistence_test.rs index 2f311aa..b506d60 100644 --- a/crates/crunch-postgres/tests/persistence_test.rs +++ b/crates/crunch-postgres/tests/persistence_test.rs @@ -8,9 +8,9 @@ async fn test_persistence_insert() -> anyhow::Result<()> { persistence .insert( &EventInfo { - domain: "some-domain", - entity_type: "some-entity-type", - event_name: "some-event-name", + domain: "some-domain".into(), + entity_type: "some-entity-type".into(), + event_name: "some-event-name".into(), }, b"some-strange-and-cruncy-content".to_vec(), ) @@ -19,9 +19,9 @@ async fn test_persistence_insert() -> anyhow::Result<()> { persistence .insert( &EventInfo { - domain: "some-domain", - entity_type: "some-entity-type", - event_name: "some-event-name", + domain: "some-domain".into(), + entity_type: "some-entity-type".into(), + event_name: "some-event-name".into(), }, b"some-strange-and-cruncy-content".to_vec(), ) @@ -37,9 +37,9 @@ async fn test_persistence_next() -> anyhow::Result<()> { persistence .insert( &EventInfo { - domain: "some-domain", - entity_type: "some-entity-type", - event_name: "some-event-name", + domain: "some-domain".into(), + entity_type: "some-entity-type".into(), + event_name: "some-event-name".into(), }, b"some-strange-and-cruncy-content".to_vec(), ) @@ -48,9 +48,9 @@ async fn test_persistence_next() -> anyhow::Result<()> { persistence .insert( &EventInfo { - domain: "some-domain", - entity_type: "some-entity-type", - event_name: "some-event-name", + domain: "some-domain".into(), + entity_type: "some-entity-type".into(), + event_name: "some-event-name".into(), }, b"some-strange-and-cruncy-content".to_vec(), ) @@ -61,3 +61,47 @@ async fn test_persistence_next() -> anyhow::Result<()> { Ok(()) } + +#[tokio::test] +async fn test_persistence_get() -> anyhow::Result<()> { + let persistence = PostgresPersistence::new_from_env().await?; + + persistence + .insert( + &EventInfo { + domain: "some-domain".into(), + entity_type: "some-entity-type".into(), + event_name: "some-event-name".into(), + }, + b"some-strange-and-cruncy-content".to_vec(), + ) + .await?; + + let (event_id, _) = persistence.next().await?.unwrap(); + let (_, _) = persistence.get(&event_id).await?.unwrap(); + + Ok(()) +} + +#[tokio::test] +async fn test_persistence_update() -> anyhow::Result<()> { + let persistence = PostgresPersistence::new_from_env().await?; + + persistence + .insert( + &EventInfo { + domain: "some-domain".into(), + entity_type: "some-entity-type".into(), + event_name: "some-event-name".into(), + }, + b"some-strange-and-cruncy-content".to_vec(), + ) + .await?; + + let (event_id, _) = persistence.next().await?.unwrap(); + let (_, _) = persistence.get(&event_id).await?.unwrap(); + + persistence.update_published(&event_id).await?; + + Ok(()) +} diff --git a/examples/basic-setup/src/gencrunch/mod.rs b/examples/basic-setup/src/gencrunch/mod.rs index 5f3ef86..65163a3 100644 --- a/examples/basic-setup/src/gencrunch/mod.rs +++ b/examples/basic-setup/src/gencrunch/mod.rs @@ -1,3 +1,90 @@ -pub mod basic { pub mod includes { pub mod my_include { use prost::Message; include!("basic.includes.my_include.rs"); impl ::crunch::traits::Serializer for MyInclude { fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyInclude { fn deserialize(raw: Vec) -> Result where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyInclude { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } } -pub mod my_event { use prost::Message; include!("basic.my_event.rs"); impl ::crunch::traits::Serializer for MyEvent { fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyEvent { fn deserialize(raw: Vec) -> Result where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyEvent { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } } -pub mod examples { pub mod example { use prost::Message; include!("examples.example.rs"); impl ::crunch::traits::Serializer for MyEvent { fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyEvent { fn deserialize(raw: Vec) -> Result where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyEvent { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } } +pub mod basic { + pub mod includes { + pub mod my_include { + use prost::Message; + include!("basic.includes.my_include.rs"); + impl ::crunch::traits::Serializer for MyInclude { + fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { + Ok(self.encode_to_vec()) + } + } + impl ::crunch::traits::Deserializer for MyInclude { + fn deserialize(raw: Vec) -> Result + where + Self: Sized, + { + let output = Self::decode(raw.as_slice()) + .map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; + Ok(output) + } + } + impl crunch::traits::Event for MyInclude { + fn event_info() -> ::crunch::traits::EventInfo { + ::crunch::traits::EventInfo { + domain: "my-domain".into(), + entity_type: "my-entity-type".into(), + event_name: "my-event-name".into(), + } + } + } + } + } + pub mod my_event { + use prost::Message; + include!("basic.my_event.rs"); + impl ::crunch::traits::Serializer for MyEvent { + fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { + Ok(self.encode_to_vec()) + } + } + impl ::crunch::traits::Deserializer for MyEvent { + fn deserialize(raw: Vec) -> Result + where + Self: Sized, + { + let output = Self::decode(raw.as_slice()) + .map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; + Ok(output) + } + } + impl crunch::traits::Event for MyEvent { + fn event_info() -> ::crunch::traits::EventInfo { + ::crunch::traits::EventInfo { + domain: "my-domain".into(), + entity_type: "my-entity-type".into(), + event_name: "my-event-name".into(), + } + } + } + } +} +pub mod examples { + pub mod example { + use prost::Message; + include!("examples.example.rs"); + impl ::crunch::traits::Serializer for MyEvent { + fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { + Ok(self.encode_to_vec()) + } + } + impl ::crunch::traits::Deserializer for MyEvent { + fn deserialize(raw: Vec) -> Result + where + Self: Sized, + { + let output = Self::decode(raw.as_slice()) + .map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; + Ok(output) + } + } + impl crunch::traits::Event for MyEvent { + fn event_info() -> ::crunch::traits::EventInfo { + ::crunch::traits::EventInfo { + domain: "my-domain".into(), + entity_type: "my-entity-type".into(), + event_name: "my-event-name".into(), + } + } + } + } +}