From bc28451f8d7619722cfbf235c5a7480af4824d37 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Tue, 3 Oct 2023 23:08:12 +0200 Subject: [PATCH] feat: with tx on all Signed-off-by: kjuulh --- .gitignore | 1 + Cargo.lock | 6 ++ Cargo.toml | 3 +- crates/crunch-codegen/src/lib.rs | 34 ++++---- crates/crunch-file/src/lib.rs | 3 + crates/crunch-in-memory/src/persistence.rs | 10 ++- crates/crunch-postgres/Cargo.toml | 2 + .../migrations/20231003181849_initial.sql | 2 +- crates/crunch-postgres/src/lib.rs | 84 ++++++++++++++++++- crates/crunch-postgres/tests/new_test.rs | 8 ++ .../crunch-postgres/tests/persistence_test.rs | 63 ++++++++++++++ crates/crunch-traits/src/errors.rs | 3 + crates/crunch-traits/src/lib.rs | 6 +- crates/crunch/src/outbox.rs | 4 +- cuddle.yaml | 8 ++ scripts/db:shell.sh | 4 + scripts/local_down.sh | 19 +++++ scripts/local_up.sh | 24 ++++++ 18 files changed, 256 insertions(+), 28 deletions(-) create mode 100644 crates/crunch-postgres/tests/new_test.rs create mode 100644 crates/crunch-postgres/tests/persistence_test.rs create mode 100755 scripts/db:shell.sh create mode 100755 scripts/local_down.sh create mode 100755 scripts/local_up.sh diff --git a/.gitignore b/.gitignore index 9c4c004..a0fc708 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target/ .cuddle/ +local.env diff --git a/Cargo.lock b/Cargo.lock index 1c1d1d1..7cbf41d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -725,6 +725,8 @@ dependencies = [ "crunch-envelope", "crunch-traits", "futures", + "serde", + "serde_json", "sqlx", "thiserror", "tokio", @@ -2449,6 +2451,7 @@ dependencies = [ "tokio-stream", "tracing", "url", + "uuid", "webpki-roots", ] @@ -2531,6 +2534,7 @@ dependencies = [ "stringprep", "thiserror", "tracing", + "uuid", "whoami", ] @@ -2571,6 +2575,7 @@ dependencies = [ "stringprep", "thiserror", "tracing", + "uuid", "whoami", ] @@ -2595,6 +2600,7 @@ dependencies = [ "sqlx-core", "tracing", "url", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2976217..bd8853f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ nats = "0.24.0" clap = {version = "4.4.5", features = ["derive"]} toml_edit = {version = "0.20.0",features = ["serde"]} serde = {version = "1.0.188", features = ["derive"]} +serde_json = {version = "1.0.107"} prost = {version = "0.12"} prost-types = {version = "0.12"} prost-build = "0.12" @@ -37,6 +38,6 @@ genco = {version = "0.17.6"} walkdir = {version = "2.4.0"} regex = {version = "1.9.5"} inquire = {version = "0.6.2"} -sqlx = {version = "0.7.2", default-features = false, features = ["migrate", "macros", "postgres", "runtime-tokio", "tls-rustls", "chrono", "json" ]} +sqlx = {version = "0.7.2", default-features = false, features = ["migrate", "macros", "postgres", "runtime-tokio", "tls-rustls", "chrono", "json", "uuid" ]} pretty_assertions = "1.4.0" \ No newline at end of file diff --git a/crates/crunch-codegen/src/lib.rs b/crates/crunch-codegen/src/lib.rs index 6fc1a27..b5e6619 100644 --- a/crates/crunch-codegen/src/lib.rs +++ b/crates/crunch-codegen/src/lib.rs @@ -306,24 +306,24 @@ impl Default for Codegen { } } -#[cfg(test)] -mod tests { - use super::*; - #[test] - fn test_node() { - let mut root = Node::new("root".into(), None, None); +// #[cfg(test)] +// mod tests { +// use super::*; +// #[test] +// fn test_node() { +// let mut root = Node::new("root".into(), None, None); - root.insert("basic.my_event.rs", vec!["One".into(), "Two".into()]); - root.insert("basic.includes.includes.rs", vec!["Three".into()]); - root.insert("basic.includes.includes-two.rs", Vec::new()); +// root.insert("basic.my_event.rs", vec!["One".into(), "Two".into()]); +// root.insert("basic.includes.includes.rs", vec!["Three".into()]); +// root.insert("basic.includes.includes-two.rs", Vec::new()); - let res = root - .traverse() - .to_file_string() - .expect("to generate rust code"); +// let res = root +// .traverse() +// .to_file_string() +// .expect("to generate rust code"); - pretty_assertions::assert_eq!(res, r#""#); +// pretty_assertions::assert_eq!(res, r#""#); - panic!(); - } -} +// panic!(); +// } +// } diff --git a/crates/crunch-file/src/lib.rs b/crates/crunch-file/src/lib.rs index bfd003a..65afb42 100644 --- a/crates/crunch-file/src/lib.rs +++ b/crates/crunch-file/src/lib.rs @@ -151,6 +151,7 @@ output-path = "src/crunch" [[publish]] schema-path = "some-schema" output-path = "some-output" +entities = [] "#; let mut config = File::parse(raw).await?; let config = config.add_publish("some-schema", "some-output", &[]); @@ -176,6 +177,7 @@ codegen = ["rust"] [[publish]] schema-path = "some-schema" output-path = "some-output" +entities = [] "#; let mut config = File::parse(raw).await?; let config = config.add_publish("some-schema", "some-output", &[]); @@ -221,6 +223,7 @@ codegen = ["rust"] [[publish]] schema-path = "some-schema" output-path = "some-output" +entities = [] "#; let config = File::parse(raw).await?.get_config()?; diff --git a/crates/crunch-in-memory/src/persistence.rs b/crates/crunch-in-memory/src/persistence.rs index 858fd7c..3f6e050 100644 --- a/crates/crunch-in-memory/src/persistence.rs +++ b/crates/crunch-in-memory/src/persistence.rs @@ -21,6 +21,10 @@ pub struct Msg { state: MsgState, } +pub struct InMemoryTx {} + +impl crunch_traits::Tx for InMemoryTx {} + pub struct InMemoryPersistence { pub outbox: Arc>>, pub store: Arc>>, @@ -49,9 +53,11 @@ impl crunch_traits::Persistence for InMemoryPersistence { Ok(()) } - async fn next(&self) -> Option { + async fn next(&self) -> Result, PersistenceError> { let mut outbox = self.outbox.write().await; - outbox.pop_front().map(|i| i.id) + Ok(outbox + .pop_front() + .map(|i| (i.id, Box::new(InMemoryTx {}) as crunch_traits::DynTx))) } async fn get(&self, event_id: &str) -> Result)>, PersistenceError> { diff --git a/crates/crunch-postgres/Cargo.toml b/crates/crunch-postgres/Cargo.toml index eb17ba6..4447460 100644 --- a/crates/crunch-postgres/Cargo.toml +++ b/crates/crunch-postgres/Cargo.toml @@ -16,4 +16,6 @@ async-trait.workspace = true futures.workspace = true uuid.workspace = true sqlx.workspace = true +serde.workspace = true +serde_json.workspace = true tokio-stream = {workspace = true, features = ["sync"]} \ No newline at end of file diff --git a/crates/crunch-postgres/migrations/20231003181849_initial.sql b/crates/crunch-postgres/migrations/20231003181849_initial.sql index 580b2d1..e2a0c61 100644 --- a/crates/crunch-postgres/migrations/20231003181849_initial.sql +++ b/crates/crunch-postgres/migrations/20231003181849_initial.sql @@ -4,5 +4,5 @@ CREATE TABLE outbox ( metadata JSONB NOT NULL, content BYTEA NOT NULL, inserted_time TIMESTAMPTZ NOT NULL DEFAULT now(), - state VARCHAR NOT NULL, + state VARCHAR NOT NULL ); diff --git a/crates/crunch-postgres/src/lib.rs b/crates/crunch-postgres/src/lib.rs index 393fd1a..afb4078 100644 --- a/crates/crunch-postgres/src/lib.rs +++ b/crates/crunch-postgres/src/lib.rs @@ -1,6 +1,12 @@ use async_trait::async_trait; use crunch_traits::{errors::PersistenceError, EventInfo}; -use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; +use serde::{Deserialize, Serialize}; +use sqlx::{postgres::PgPoolOptions, types::Json, Pool, Postgres}; +use uuid::Uuid; + +pub struct PostgresTx {} + +impl crunch_traits::Tx for PostgresTx {} pub struct PostgresPersistence { pool: Pool, @@ -14,15 +20,85 @@ impl PostgresPersistence { Ok(Self { pool }) } + + pub async fn new_from_env() -> anyhow::Result { + let dsn = std::env::var("DATABASE_URL") + .map_err(|e| anyhow::anyhow!("DATABASE_URL is not set: {e}"))?; + + Self::new(&dsn).await + } +} + +#[derive(sqlx::FromRow)] +struct InsertResp { + id: Uuid, +} + +#[derive(Clone, Serialize, Deserialize)] +struct PgEventInfo { + domain: &'static str, + entity_type: &'static str, + event_name: &'static str, +} + +impl From<&EventInfo> for PgEventInfo { + fn from(value: &EventInfo) -> Self { + Self { + domain: value.domain, + entity_type: value.entity_type, + event_name: value.event_name, + } + } } #[async_trait] impl crunch_traits::Persistence for PostgresPersistence { + // FIXME: Need some sort of concurrency control mechanism. If the insert fails or is done twice, then that user may receive multiple requests. + // This should be solved by adding transactions, event streams and sequence numbers async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()> { - todo!() + let event_info: PgEventInfo = event_info.into(); + sqlx::query_as::<_, InsertResp>( + r#" +INSERT INTO outbox (id, metadata, content, state) +VALUES ( + $1, + $2, + $3, + 'inserted' +) +RETURNING id; +"#, + ) + .bind(uuid::Uuid::new_v4()) + .bind(Json(&event_info)) + .bind(content) + .fetch_one(&self.pool) + .await?; + + Ok(()) } - async fn next(&self) -> Option { - todo!() + async fn next(&self) -> Result, PersistenceError> { + let resp = sqlx::query_as::<_, InsertResp>( + r#" +SELECT id +FROM outbox +WHERE state = 'inserted' +ORDER BY inserted_time ASC +LIMIT 1 +FOR UPDATE; +"#, + ) + .fetch_optional(&self.pool) + .await + .map_err(|e| anyhow::anyhow!(e)) + .map_err(PersistenceError::AnyErr)?; + + let id = match resp { + Some(InsertResp { id }) => Some(id.to_string()), + None => None, + }; + + Ok(id.map(|id| (id, Box::new(PostgresTx {}) as crunch_traits::DynTx))) } async fn get(&self, event_id: &str) -> Result)>, PersistenceError> { todo!() diff --git a/crates/crunch-postgres/tests/new_test.rs b/crates/crunch-postgres/tests/new_test.rs new file mode 100644 index 0000000..030a3c8 --- /dev/null +++ b/crates/crunch-postgres/tests/new_test.rs @@ -0,0 +1,8 @@ +use crunch_postgres::PostgresPersistence; + +#[tokio::test] +async fn test_new_from_env() -> anyhow::Result<()> { + PostgresPersistence::new_from_env().await?; + + Ok(()) +} diff --git a/crates/crunch-postgres/tests/persistence_test.rs b/crates/crunch-postgres/tests/persistence_test.rs new file mode 100644 index 0000000..2f311aa --- /dev/null +++ b/crates/crunch-postgres/tests/persistence_test.rs @@ -0,0 +1,63 @@ +use crunch_postgres::PostgresPersistence; +use crunch_traits::{EventInfo, Persistence}; + +#[tokio::test] +async fn test_persistence_insert() -> anyhow::Result<()> { + let persistence = PostgresPersistence::new_from_env().await?; + + persistence + .insert( + &EventInfo { + domain: "some-domain", + entity_type: "some-entity-type", + event_name: "some-event-name", + }, + b"some-strange-and-cruncy-content".to_vec(), + ) + .await?; + + persistence + .insert( + &EventInfo { + domain: "some-domain", + entity_type: "some-entity-type", + event_name: "some-event-name", + }, + b"some-strange-and-cruncy-content".to_vec(), + ) + .await?; + + Ok(()) +} + +#[tokio::test] +async fn test_persistence_next() -> anyhow::Result<()> { + let persistence = PostgresPersistence::new_from_env().await?; + + persistence + .insert( + &EventInfo { + domain: "some-domain", + entity_type: "some-entity-type", + event_name: "some-event-name", + }, + b"some-strange-and-cruncy-content".to_vec(), + ) + .await?; + + persistence + .insert( + &EventInfo { + domain: "some-domain", + entity_type: "some-entity-type", + event_name: "some-event-name", + }, + b"some-strange-and-cruncy-content".to_vec(), + ) + .await?; + + assert!(persistence.next().await?.is_some()); + assert!(persistence.next().await?.is_some()); + + Ok(()) +} diff --git a/crates/crunch-traits/src/errors.rs b/crates/crunch-traits/src/errors.rs index 0ebe494..6254b7a 100644 --- a/crates/crunch-traits/src/errors.rs +++ b/crates/crunch-traits/src/errors.rs @@ -54,6 +54,9 @@ pub enum PersistenceError { #[error("failed to publish item {0}")] UpdatePublished(anyhow::Error), + + #[error("database query failed {0}")] + AnyErr(anyhow::Error), } #[derive(Error, Debug)] diff --git a/crates/crunch-traits/src/lib.rs b/crates/crunch-traits/src/lib.rs index 8615a3e..da4c3af 100644 --- a/crates/crunch-traits/src/lib.rs +++ b/crates/crunch-traits/src/lib.rs @@ -3,10 +3,14 @@ use std::fmt::Display; use async_trait::async_trait; use errors::{DeserializeError, PersistenceError, SerializeError}; +pub trait Tx: Send + Sync {} + +pub type DynTx = Box; + #[async_trait] pub trait Persistence { async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()>; - async fn next(&self) -> Option; + async fn next(&self) -> Result, PersistenceError>; async fn get(&self, event_id: &str) -> Result)>, PersistenceError>; async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError>; } diff --git a/crates/crunch/src/outbox.rs b/crates/crunch/src/outbox.rs index 62be44c..fda841e 100644 --- a/crates/crunch/src/outbox.rs +++ b/crates/crunch/src/outbox.rs @@ -34,8 +34,8 @@ impl OutboxHandler { } async fn handle_messages(p: &Persistence, t: &Transport) -> anyhow::Result> { - match p.next().await { - Some(item) => match p.get(&item).await? { + match p.next().await? { + Some((item, _)) => match p.get(&item).await? { Some((info, content)) => { t.publish(&info, content).await?; p.update_published(&item).await?; diff --git a/cuddle.yaml b/cuddle.yaml index de8cbec..9f31be1 100644 --- a/cuddle.yaml +++ b/cuddle.yaml @@ -5,3 +5,11 @@ base: "git@git.front.kjuulh.io:kjuulh/cuddle-base.git" vars: service: "crunch" registry: kasperhermansen + +scripts: + local_up: + type: shell + local_down: + type: shell + "db:shell": + type: shell diff --git a/scripts/db:shell.sh b/scripts/db:shell.sh new file mode 100755 index 0000000..5d7dbfe --- /dev/null +++ b/scripts/db:shell.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +export PGPASSWORD="cuddle" +psql -h localhost -d cuddle -U cuddle diff --git a/scripts/local_down.sh b/scripts/local_down.sh new file mode 100755 index 0000000..3394688 --- /dev/null +++ b/scripts/local_down.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +set -e + +docker_compose_content=$(cat < local.env +DATABASE_URL=postgres://cuddle:cuddle@localhost:5432/cuddle +EOF +