diff --git a/Cargo.lock b/Cargo.lock index 7cbf41d..ab8b3f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -345,7 +345,10 @@ checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", + "serde", + "wasm-bindgen", "windows-targets", ] @@ -722,6 +725,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "chrono", "crunch-envelope", "crunch-traits", "futures", @@ -2955,6 +2959,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ "getrandom", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index bd8853f..87b469e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ tracing = { version = "0.1", features = ["log"] } tracing-subscriber = "0.3.17" thiserror = {version = "1.0.48"} async-trait = "0.1.73" -uuid = { version = "1.4.1", features = ["v4"]} +uuid = { version = "1.4.1", features = ["v4", "serde"]} futures = "0.3.28" nats = "0.24.0" clap = {version = "4.4.5", features = ["derive"]} @@ -39,5 +39,6 @@ walkdir = {version = "2.4.0"} regex = {version = "1.9.5"} inquire = {version = "0.6.2"} sqlx = {version = "0.7.2", default-features = false, features = ["migrate", "macros", "postgres", "runtime-tokio", "tls-rustls", "chrono", "json", "uuid" ]} +chrono = {version = "0.4.31", features = ["serde"]} pretty_assertions = "1.4.0" \ No newline at end of file diff --git a/crates/crunch-codegen/src/lib.rs b/crates/crunch-codegen/src/lib.rs index b5e6619..7c800e4 100644 --- a/crates/crunch-codegen/src/lib.rs +++ b/crates/crunch-codegen/src/lib.rs @@ -93,9 +93,9 @@ impl Node { impl crunch::traits::Event for $(message) { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { - domain: "my-domain", - entity_type: "my-entity-type", - event_name: "my-event-name", + domain: "my-domain".into(), + entity_type: "my-entity-type".into(), + event_name: "my-event-name".into(), } } } diff --git a/crates/crunch-envelope/build.rs b/crates/crunch-envelope/build.rs index aef506d..e18cfb6 100644 --- a/crates/crunch-envelope/build.rs +++ b/crates/crunch-envelope/build.rs @@ -1,12 +1,15 @@ -extern crate capnpc; - fn main() { - capnpc::CompilerCommand::new() - .output_path("src/") - .src_prefix("schemas/") - .file("schemas/envelope.capnp") - .run() - .unwrap(); + #[cfg(feature = "capnp")] + { + extern crate capnpc; + + capnpc::CompilerCommand::new() + .output_path("src/") + .src_prefix("schemas/") + .file("schemas/envelope.capnp") + .run() + .unwrap(); + } std::fs::create_dir_all("src/generated").unwrap(); let mut config = prost_build::Config::default(); diff --git a/crates/crunch-in-memory/src/persistence.rs b/crates/crunch-in-memory/src/persistence.rs index 3f6e050..387b2a1 100644 --- a/crates/crunch-in-memory/src/persistence.rs +++ b/crates/crunch-in-memory/src/persistence.rs @@ -33,10 +33,11 @@ pub struct InMemoryPersistence { #[async_trait] impl crunch_traits::Persistence for InMemoryPersistence { async fn insert(&self, event_info: &EventInfo, content: Vec) -> anyhow::Result<()> { - let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content); + let msg = + crunch_envelope::proto::wrap(&event_info.domain, &event_info.entity_type, &content); let msg = Msg { id: uuid::Uuid::new_v4().to_string(), - info: *event_info, + info: event_info.to_owned(), msg, state: MsgState::Pending, }; @@ -71,7 +72,7 @@ impl crunch_traits::Persistence for InMemoryPersistence { let (content, _) = crunch_envelope::proto::unwrap(event.msg.as_slice()) .map_err(|e| PersistenceError::GetErr(anyhow::anyhow!(e)))?; - Ok(Some((event.info, content))) + Ok(Some((event.info.to_owned(), content))) } async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { diff --git a/crates/crunch-in-memory/src/transport.rs b/crates/crunch-in-memory/src/transport.rs index 2f25987..b1e0069 100644 --- a/crates/crunch-in-memory/src/transport.rs +++ b/crates/crunch-in-memory/src/transport.rs @@ -64,7 +64,7 @@ impl Transport for InMemoryTransport { .expect("transport to be available, as we just created it"); sender .send(TransportEnvelope { - info: *event_info, + info: event_info.to_owned(), content, }) .map_err(|e| anyhow::anyhow!(e.to_string())) diff --git a/crates/crunch-postgres/Cargo.toml b/crates/crunch-postgres/Cargo.toml index 4447460..a1348fa 100644 --- a/crates/crunch-postgres/Cargo.toml +++ b/crates/crunch-postgres/Cargo.toml @@ -18,4 +18,5 @@ uuid.workspace = true sqlx.workspace = true serde.workspace = true serde_json.workspace = true -tokio-stream = {workspace = true, features = ["sync"]} \ No newline at end of file +tokio-stream = {workspace = true, features = ["sync"]} +chrono.workspace = true diff --git a/crates/crunch-postgres/src/lib.rs b/crates/crunch-postgres/src/lib.rs index afb4078..12940db 100644 --- a/crates/crunch-postgres/src/lib.rs +++ b/crates/crunch-postgres/src/lib.rs @@ -36,13 +36,15 @@ struct InsertResp { #[derive(Clone, Serialize, Deserialize)] struct PgEventInfo { - domain: &'static str, - entity_type: &'static str, - event_name: &'static str, + domain: String, + entity_type: String, + event_name: String, } impl From<&EventInfo> for PgEventInfo { fn from(value: &EventInfo) -> Self { + let value = value.to_owned(); + Self { domain: value.domain, entity_type: value.entity_type, @@ -51,6 +53,26 @@ impl From<&EventInfo> for PgEventInfo { } } +impl From for EventInfo { + fn from(value: PgEventInfo) -> Self { + EventInfo { + domain: value.domain, + entity_type: value.entity_type, + event_name: value.event_name, + } + } +} + +#[allow(dead_code)] +#[derive(sqlx::FromRow)] +struct OutboxEvent { + id: Uuid, + metadata: Json, + content: Vec, + inserted_time: chrono::DateTime, + state: String, +} + #[async_trait] impl crunch_traits::Persistence for PostgresPersistence { // FIXME: Need some sort of concurrency control mechanism. If the insert fails or is done twice, then that user may receive multiple requests. @@ -93,15 +115,30 @@ FOR UPDATE; .map_err(|e| anyhow::anyhow!(e)) .map_err(PersistenceError::AnyErr)?; - let id = match resp { - Some(InsertResp { id }) => Some(id.to_string()), - None => None, - }; + let id = resp.map(|InsertResp { id }| id.to_string()); Ok(id.map(|id| (id, Box::new(PostgresTx {}) as crunch_traits::DynTx))) } async fn get(&self, event_id: &str) -> Result)>, PersistenceError> { - todo!() + let event = sqlx::query_as::<_, OutboxEvent>("SELECT * from outbox where id = $1") + .bind( + Uuid::parse_str(event_id) + .map_err(|e| anyhow::anyhow!(e)) + .map_err(PersistenceError::GetErr)?, + ) + .fetch_optional(&self.pool) + .await + .map_err(|e| anyhow::anyhow!(e)) + .map_err(PersistenceError::GetErr)?; + + match event { + Some(event) => { + let metadata = event.metadata.to_owned(); + + Ok(Some((EventInfo::from(metadata.0), event.content))) + } + None => Ok(None), + } } async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { todo!() diff --git a/crates/crunch-traits/src/lib.rs b/crates/crunch-traits/src/lib.rs index da4c3af..20c07ed 100644 --- a/crates/crunch-traits/src/lib.rs +++ b/crates/crunch-traits/src/lib.rs @@ -25,11 +25,11 @@ pub trait Deserializer { Self: Sized; } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub struct EventInfo { - pub domain: &'static str, - pub entity_type: &'static str, - pub event_name: &'static str, + pub domain: String, + pub entity_type: String, + pub event_name: String, } impl Display for EventInfo { diff --git a/crates/crunch/examples/basic.rs b/crates/crunch/examples/basic.rs index 2c94b55..47295c2 100644 --- a/crates/crunch/examples/basic.rs +++ b/crates/crunch/examples/basic.rs @@ -26,9 +26,9 @@ impl crunch::traits::Deserializer for SomeEvent { impl crunch::traits::Event for SomeEvent { fn event_info() -> crunch::traits::EventInfo { crunch::traits::EventInfo { - domain: "some-domain", - entity_type: "some-entity", - event_name: "some-event", + domain: "some-domain".into(), + entity_type: "some-entity".into(), + event_name: "some-event".into(), } } } diff --git a/crates/crunch/examples/counter.rs b/crates/crunch/examples/counter.rs index a137979..6a65b44 100644 --- a/crates/crunch/examples/counter.rs +++ b/crates/crunch/examples/counter.rs @@ -26,9 +26,9 @@ impl crunch::traits::Deserializer for SomeEvent { impl crunch::traits::Event for SomeEvent { fn event_info() -> crunch::traits::EventInfo { crunch::traits::EventInfo { - domain: "some-domain", - entity_type: "some-entity", - event_name: "some-event", + domain: "some-domain".into(), + entity_type: "some-entity".into(), + event_name: "some-event".into(), } } } diff --git a/crates/crunch/examples/nats.rs b/crates/crunch/examples/nats.rs index 919d58a..c3b1c2c 100644 --- a/crates/crunch/examples/nats.rs +++ b/crates/crunch/examples/nats.rs @@ -27,9 +27,9 @@ impl Deserializer for SomeEvent { impl Event for SomeEvent { fn event_info() -> EventInfo { EventInfo { - domain: "some-domain", - entity_type: "some-entity", - event_name: "some-event", + domain: "some-domain".into(), + entity_type: "some-entity".into(), + event_name: "some-event".into(), } } } diff --git a/examples/basic-setup/src/gencrunch/mod.rs b/examples/basic-setup/src/gencrunch/mod.rs index c3bf297..5f3ef86 100644 --- a/examples/basic-setup/src/gencrunch/mod.rs +++ b/examples/basic-setup/src/gencrunch/mod.rs @@ -1,93 +1,3 @@ -pub mod basic { - pub mod includes { - pub mod my_include { - use prost::Message; - include!("basic.includes.my_include.rs"); - impl ::crunch::traits::Serializer for MyInclude { - fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { - Ok(self.encode_to_vec()) - } - } - impl ::crunch::traits::Deserializer for MyInclude { - fn deserialize(raw: Vec) -> Result - where - Self: Sized, - { - let output = Self::decode(raw.as_slice()) - .map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; - Ok(output) - } - } - - impl crunch::traits::Event for MyInclude { - fn event_info() -> ::crunch::traits::EventInfo { - ::crunch::traits::EventInfo { - domain: "my-domain", - entity_type: "my-entity-type", - event_name: "my-event-name", - } - } - } - } - } - pub mod my_event { - use prost::Message; - include!("basic.my_event.rs"); - impl ::crunch::traits::Serializer for MyEvent { - fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { - Ok(self.encode_to_vec()) - } - } - impl ::crunch::traits::Deserializer for MyEvent { - fn deserialize(raw: Vec) -> Result - where - Self: Sized, - { - let output = Self::decode(raw.as_slice()) - .map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; - Ok(output) - } - } - - impl crunch::traits::Event for MyEvent { - fn event_info() -> ::crunch::traits::EventInfo { - ::crunch::traits::EventInfo { - domain: "my-domain", - entity_type: "my-entity-type", - event_name: "my-event-name", - } - } - } - } -} -pub mod examples { - pub mod example { - use prost::Message; - include!("examples.example.rs"); - impl ::crunch::traits::Serializer for MyEvent { - fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { - Ok(self.encode_to_vec()) - } - } - impl ::crunch::traits::Deserializer for MyEvent { - fn deserialize(raw: Vec) -> Result - where - Self: Sized, - { - let output = Self::decode(raw.as_slice()) - .map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; - Ok(output) - } - } - - impl crunch::traits::Event for MyEvent { - fn event_info() -> ::crunch::traits::EventInfo { - ::crunch::traits::EventInfo { - domain: "my-domain", - entity_type: "my-entity-type", - event_name: "my-event-name", - } - } - } - } -} +pub mod basic { pub mod includes { pub mod my_include { use prost::Message; include!("basic.includes.my_include.rs"); impl ::crunch::traits::Serializer for MyInclude { fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyInclude { fn deserialize(raw: Vec) -> Result where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyInclude { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } } +pub mod my_event { use prost::Message; include!("basic.my_event.rs"); impl ::crunch::traits::Serializer for MyEvent { fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyEvent { fn deserialize(raw: Vec) -> Result where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyEvent { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } } +pub mod examples { pub mod example { use prost::Message; include!("examples.example.rs"); impl ::crunch::traits::Serializer for MyEvent { fn serialize(&self) -> Result, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyEvent { fn deserialize(raw: Vec) -> Result where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyEvent { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } }