@@ -16,4 +16,6 @@ async-trait.workspace = true
|
||||
futures.workspace = true
|
||||
uuid.workspace = true
|
||||
sqlx.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tokio-stream = {workspace = true, features = ["sync"]}
|
@@ -4,5 +4,5 @@ CREATE TABLE outbox (
|
||||
metadata JSONB NOT NULL,
|
||||
content BYTEA NOT NULL,
|
||||
inserted_time TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
state VARCHAR NOT NULL,
|
||||
state VARCHAR NOT NULL
|
||||
);
|
||||
|
@@ -1,6 +1,12 @@
|
||||
use async_trait::async_trait;
|
||||
use crunch_traits::{errors::PersistenceError, EventInfo};
|
||||
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::{postgres::PgPoolOptions, types::Json, Pool, Postgres};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub struct PostgresTx {}
|
||||
|
||||
impl crunch_traits::Tx for PostgresTx {}
|
||||
|
||||
pub struct PostgresPersistence {
|
||||
pool: Pool<Postgres>,
|
||||
@@ -14,15 +20,85 @@ impl PostgresPersistence {
|
||||
|
||||
Ok(Self { pool })
|
||||
}
|
||||
|
||||
pub async fn new_from_env() -> anyhow::Result<Self> {
|
||||
let dsn = std::env::var("DATABASE_URL")
|
||||
.map_err(|e| anyhow::anyhow!("DATABASE_URL is not set: {e}"))?;
|
||||
|
||||
Self::new(&dsn).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct InsertResp {
|
||||
id: Uuid,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
struct PgEventInfo {
|
||||
domain: &'static str,
|
||||
entity_type: &'static str,
|
||||
event_name: &'static str,
|
||||
}
|
||||
|
||||
impl From<&EventInfo> for PgEventInfo {
|
||||
fn from(value: &EventInfo) -> Self {
|
||||
Self {
|
||||
domain: value.domain,
|
||||
entity_type: value.entity_type,
|
||||
event_name: value.event_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl crunch_traits::Persistence for PostgresPersistence {
|
||||
// FIXME: Need some sort of concurrency control mechanism. If the insert fails or is done twice, then that user may receive multiple requests.
|
||||
// This should be solved by adding transactions, event streams and sequence numbers
|
||||
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
let event_info: PgEventInfo = event_info.into();
|
||||
sqlx::query_as::<_, InsertResp>(
|
||||
r#"
|
||||
INSERT INTO outbox (id, metadata, content, state)
|
||||
VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
'inserted'
|
||||
)
|
||||
RETURNING id;
|
||||
"#,
|
||||
)
|
||||
.bind(uuid::Uuid::new_v4())
|
||||
.bind(Json(&event_info))
|
||||
.bind(content)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn next(&self) -> Option<String> {
|
||||
todo!()
|
||||
async fn next(&self) -> Result<Option<(String, crunch_traits::DynTx)>, PersistenceError> {
|
||||
let resp = sqlx::query_as::<_, InsertResp>(
|
||||
r#"
|
||||
SELECT id
|
||||
FROM outbox
|
||||
WHERE state = 'inserted'
|
||||
ORDER BY inserted_time ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE;
|
||||
"#,
|
||||
)
|
||||
.fetch_optional(&self.pool)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
.map_err(PersistenceError::AnyErr)?;
|
||||
|
||||
let id = match resp {
|
||||
Some(InsertResp { id }) => Some(id.to_string()),
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok(id.map(|id| (id, Box::new(PostgresTx {}) as crunch_traits::DynTx)))
|
||||
}
|
||||
async fn get(&self, event_id: &str) -> Result<Option<(EventInfo, Vec<u8>)>, PersistenceError> {
|
||||
todo!()
|
||||
|
8
crates/crunch-postgres/tests/new_test.rs
Normal file
8
crates/crunch-postgres/tests/new_test.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
use crunch_postgres::PostgresPersistence;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_new_from_env() -> anyhow::Result<()> {
|
||||
PostgresPersistence::new_from_env().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
63
crates/crunch-postgres/tests/persistence_test.rs
Normal file
63
crates/crunch-postgres/tests/persistence_test.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
use crunch_postgres::PostgresPersistence;
|
||||
use crunch_traits::{EventInfo, Persistence};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_persistence_insert() -> anyhow::Result<()> {
|
||||
let persistence = PostgresPersistence::new_from_env().await?;
|
||||
|
||||
persistence
|
||||
.insert(
|
||||
&EventInfo {
|
||||
domain: "some-domain",
|
||||
entity_type: "some-entity-type",
|
||||
event_name: "some-event-name",
|
||||
},
|
||||
b"some-strange-and-cruncy-content".to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
persistence
|
||||
.insert(
|
||||
&EventInfo {
|
||||
domain: "some-domain",
|
||||
entity_type: "some-entity-type",
|
||||
event_name: "some-event-name",
|
||||
},
|
||||
b"some-strange-and-cruncy-content".to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_persistence_next() -> anyhow::Result<()> {
|
||||
let persistence = PostgresPersistence::new_from_env().await?;
|
||||
|
||||
persistence
|
||||
.insert(
|
||||
&EventInfo {
|
||||
domain: "some-domain",
|
||||
entity_type: "some-entity-type",
|
||||
event_name: "some-event-name",
|
||||
},
|
||||
b"some-strange-and-cruncy-content".to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
persistence
|
||||
.insert(
|
||||
&EventInfo {
|
||||
domain: "some-domain",
|
||||
entity_type: "some-entity-type",
|
||||
event_name: "some-event-name",
|
||||
},
|
||||
b"some-strange-and-cruncy-content".to_vec(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert!(persistence.next().await?.is_some());
|
||||
assert!(persistence.next().await?.is_some());
|
||||
|
||||
Ok(())
|
||||
}
|
Reference in New Issue
Block a user