feat: add postgres support

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-10-05 22:41:14 +02:00
parent 4a88d2fecd
commit 4d150febc7
Signed by: kjuulh
GPG Key ID: 57B6E1465221F912
3 changed files with 164 additions and 16 deletions

View File

@ -141,6 +141,23 @@ FOR UPDATE;
} }
} }
async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> {
todo!() sqlx::query(
r#"
UPDATE outbox
SET state = 'handled'
WHERE id = $1;
"#,
)
.bind(
Uuid::parse_str(event_id)
.map_err(|e| anyhow::anyhow!(e))
.map_err(PersistenceError::UpdatePublished)?,
)
.execute(&self.pool)
.await
.map_err(|e| anyhow::anyhow!(e))
.map_err(PersistenceError::UpdatePublished)?;
Ok(())
} }
} }

View File

@ -8,9 +8,9 @@ async fn test_persistence_insert() -> anyhow::Result<()> {
persistence persistence
.insert( .insert(
&EventInfo { &EventInfo {
domain: "some-domain", domain: "some-domain".into(),
entity_type: "some-entity-type", entity_type: "some-entity-type".into(),
event_name: "some-event-name", event_name: "some-event-name".into(),
}, },
b"some-strange-and-cruncy-content".to_vec(), b"some-strange-and-cruncy-content".to_vec(),
) )
@ -19,9 +19,9 @@ async fn test_persistence_insert() -> anyhow::Result<()> {
persistence persistence
.insert( .insert(
&EventInfo { &EventInfo {
domain: "some-domain", domain: "some-domain".into(),
entity_type: "some-entity-type", entity_type: "some-entity-type".into(),
event_name: "some-event-name", event_name: "some-event-name".into(),
}, },
b"some-strange-and-cruncy-content".to_vec(), b"some-strange-and-cruncy-content".to_vec(),
) )
@ -37,9 +37,9 @@ async fn test_persistence_next() -> anyhow::Result<()> {
persistence persistence
.insert( .insert(
&EventInfo { &EventInfo {
domain: "some-domain", domain: "some-domain".into(),
entity_type: "some-entity-type", entity_type: "some-entity-type".into(),
event_name: "some-event-name", event_name: "some-event-name".into(),
}, },
b"some-strange-and-cruncy-content".to_vec(), b"some-strange-and-cruncy-content".to_vec(),
) )
@ -48,9 +48,9 @@ async fn test_persistence_next() -> anyhow::Result<()> {
persistence persistence
.insert( .insert(
&EventInfo { &EventInfo {
domain: "some-domain", domain: "some-domain".into(),
entity_type: "some-entity-type", entity_type: "some-entity-type".into(),
event_name: "some-event-name", event_name: "some-event-name".into(),
}, },
b"some-strange-and-cruncy-content".to_vec(), b"some-strange-and-cruncy-content".to_vec(),
) )
@ -61,3 +61,47 @@ async fn test_persistence_next() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
#[tokio::test]
async fn test_persistence_get() -> anyhow::Result<()> {
let persistence = PostgresPersistence::new_from_env().await?;
persistence
.insert(
&EventInfo {
domain: "some-domain".into(),
entity_type: "some-entity-type".into(),
event_name: "some-event-name".into(),
},
b"some-strange-and-cruncy-content".to_vec(),
)
.await?;
let (event_id, _) = persistence.next().await?.unwrap();
let (_, _) = persistence.get(&event_id).await?.unwrap();
Ok(())
}
#[tokio::test]
async fn test_persistence_update() -> anyhow::Result<()> {
let persistence = PostgresPersistence::new_from_env().await?;
persistence
.insert(
&EventInfo {
domain: "some-domain".into(),
entity_type: "some-entity-type".into(),
event_name: "some-event-name".into(),
},
b"some-strange-and-cruncy-content".to_vec(),
)
.await?;
let (event_id, _) = persistence.next().await?.unwrap();
let (_, _) = persistence.get(&event_id).await?.unwrap();
persistence.update_published(&event_id).await?;
Ok(())
}

View File

@ -1,3 +1,90 @@
pub mod basic { pub mod includes { pub mod my_include { use prost::Message; include!("basic.includes.my_include.rs"); impl ::crunch::traits::Serializer for MyInclude { fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyInclude { fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError> where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyInclude { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } } pub mod basic {
pub mod my_event { use prost::Message; include!("basic.my_event.rs"); impl ::crunch::traits::Serializer for MyEvent { fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyEvent { fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError> where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyEvent { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } } pub mod includes {
pub mod examples { pub mod example { use prost::Message; include!("examples.example.rs"); impl ::crunch::traits::Serializer for MyEvent { fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyEvent { fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError> where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyEvent { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } } pub mod my_include {
use prost::Message;
include!("basic.includes.my_include.rs");
impl ::crunch::traits::Serializer for MyInclude {
fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> {
Ok(self.encode_to_vec())
}
}
impl ::crunch::traits::Deserializer for MyInclude {
fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError>
where
Self: Sized,
{
let output = Self::decode(raw.as_slice())
.map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?;
Ok(output)
}
}
impl crunch::traits::Event for MyInclude {
fn event_info() -> ::crunch::traits::EventInfo {
::crunch::traits::EventInfo {
domain: "my-domain".into(),
entity_type: "my-entity-type".into(),
event_name: "my-event-name".into(),
}
}
}
}
}
pub mod my_event {
use prost::Message;
include!("basic.my_event.rs");
impl ::crunch::traits::Serializer for MyEvent {
fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> {
Ok(self.encode_to_vec())
}
}
impl ::crunch::traits::Deserializer for MyEvent {
fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError>
where
Self: Sized,
{
let output = Self::decode(raw.as_slice())
.map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?;
Ok(output)
}
}
impl crunch::traits::Event for MyEvent {
fn event_info() -> ::crunch::traits::EventInfo {
::crunch::traits::EventInfo {
domain: "my-domain".into(),
entity_type: "my-entity-type".into(),
event_name: "my-event-name".into(),
}
}
}
}
}
pub mod examples {
pub mod example {
use prost::Message;
include!("examples.example.rs");
impl ::crunch::traits::Serializer for MyEvent {
fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> {
Ok(self.encode_to_vec())
}
}
impl ::crunch::traits::Deserializer for MyEvent {
fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError>
where
Self: Sized,
{
let output = Self::decode(raw.as_slice())
.map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?;
Ok(output)
}
}
impl crunch::traits::Event for MyEvent {
fn event_info() -> ::crunch::traits::EventInfo {
::crunch::traits::EventInfo {
domain: "my-domain".into(),
entity_type: "my-entity-type".into(),
event_name: "my-event-name".into(),
}
}
}
}
}