feat: with String EventInfo

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2023-10-05 22:09:56 +02:00
parent bc28451f8d
commit 4a88d2fecd
Signed by: kjuulh
GPG Key ID: 57B6E1465221F912
13 changed files with 89 additions and 131 deletions

5
Cargo.lock generated
View File

@ -345,7 +345,10 @@ checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
dependencies = [ dependencies = [
"android-tzdata", "android-tzdata",
"iana-time-zone", "iana-time-zone",
"js-sys",
"num-traits", "num-traits",
"serde",
"wasm-bindgen",
"windows-targets", "windows-targets",
] ]
@ -722,6 +725,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"chrono",
"crunch-envelope", "crunch-envelope",
"crunch-traits", "crunch-traits",
"futures", "futures",
@ -2955,6 +2959,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d"
dependencies = [ dependencies = [
"getrandom", "getrandom",
"serde",
] ]
[[package]] [[package]]

View File

@ -22,7 +22,7 @@ tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = "0.3.17" tracing-subscriber = "0.3.17"
thiserror = {version = "1.0.48"} thiserror = {version = "1.0.48"}
async-trait = "0.1.73" async-trait = "0.1.73"
uuid = { version = "1.4.1", features = ["v4"]} uuid = { version = "1.4.1", features = ["v4", "serde"]}
futures = "0.3.28" futures = "0.3.28"
nats = "0.24.0" nats = "0.24.0"
clap = {version = "4.4.5", features = ["derive"]} clap = {version = "4.4.5", features = ["derive"]}
@ -39,5 +39,6 @@ walkdir = {version = "2.4.0"}
regex = {version = "1.9.5"} regex = {version = "1.9.5"}
inquire = {version = "0.6.2"} inquire = {version = "0.6.2"}
sqlx = {version = "0.7.2", default-features = false, features = ["migrate", "macros", "postgres", "runtime-tokio", "tls-rustls", "chrono", "json", "uuid" ]} sqlx = {version = "0.7.2", default-features = false, features = ["migrate", "macros", "postgres", "runtime-tokio", "tls-rustls", "chrono", "json", "uuid" ]}
chrono = {version = "0.4.31", features = ["serde"]}
pretty_assertions = "1.4.0" pretty_assertions = "1.4.0"

View File

@ -93,9 +93,9 @@ impl Node {
impl crunch::traits::Event for $(message) { impl crunch::traits::Event for $(message) {
fn event_info() -> ::crunch::traits::EventInfo { fn event_info() -> ::crunch::traits::EventInfo {
::crunch::traits::EventInfo { ::crunch::traits::EventInfo {
domain: "my-domain", domain: "my-domain".into(),
entity_type: "my-entity-type", entity_type: "my-entity-type".into(),
event_name: "my-event-name", event_name: "my-event-name".into(),
} }
} }
} }

View File

@ -1,12 +1,15 @@
extern crate capnpc;
fn main() { fn main() {
capnpc::CompilerCommand::new() #[cfg(feature = "capnp")]
.output_path("src/") {
.src_prefix("schemas/") extern crate capnpc;
.file("schemas/envelope.capnp")
.run() capnpc::CompilerCommand::new()
.unwrap(); .output_path("src/")
.src_prefix("schemas/")
.file("schemas/envelope.capnp")
.run()
.unwrap();
}
std::fs::create_dir_all("src/generated").unwrap(); std::fs::create_dir_all("src/generated").unwrap();
let mut config = prost_build::Config::default(); let mut config = prost_build::Config::default();

View File

@ -33,10 +33,11 @@ pub struct InMemoryPersistence {
#[async_trait] #[async_trait]
impl crunch_traits::Persistence for InMemoryPersistence { impl crunch_traits::Persistence for InMemoryPersistence {
async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()> { async fn insert(&self, event_info: &EventInfo, content: Vec<u8>) -> anyhow::Result<()> {
let msg = crunch_envelope::proto::wrap(event_info.domain, event_info.entity_type, &content); let msg =
crunch_envelope::proto::wrap(&event_info.domain, &event_info.entity_type, &content);
let msg = Msg { let msg = Msg {
id: uuid::Uuid::new_v4().to_string(), id: uuid::Uuid::new_v4().to_string(),
info: *event_info, info: event_info.to_owned(),
msg, msg,
state: MsgState::Pending, state: MsgState::Pending,
}; };
@ -71,7 +72,7 @@ impl crunch_traits::Persistence for InMemoryPersistence {
let (content, _) = crunch_envelope::proto::unwrap(event.msg.as_slice()) let (content, _) = crunch_envelope::proto::unwrap(event.msg.as_slice())
.map_err(|e| PersistenceError::GetErr(anyhow::anyhow!(e)))?; .map_err(|e| PersistenceError::GetErr(anyhow::anyhow!(e)))?;
Ok(Some((event.info, content))) Ok(Some((event.info.to_owned(), content)))
} }
async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> {

View File

@ -64,7 +64,7 @@ impl Transport for InMemoryTransport {
.expect("transport to be available, as we just created it"); .expect("transport to be available, as we just created it");
sender sender
.send(TransportEnvelope { .send(TransportEnvelope {
info: *event_info, info: event_info.to_owned(),
content, content,
}) })
.map_err(|e| anyhow::anyhow!(e.to_string())) .map_err(|e| anyhow::anyhow!(e.to_string()))

View File

@ -18,4 +18,5 @@ uuid.workspace = true
sqlx.workspace = true sqlx.workspace = true
serde.workspace = true serde.workspace = true
serde_json.workspace = true serde_json.workspace = true
tokio-stream = {workspace = true, features = ["sync"]} tokio-stream = {workspace = true, features = ["sync"]}
chrono.workspace = true

View File

@ -36,13 +36,15 @@ struct InsertResp {
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
struct PgEventInfo { struct PgEventInfo {
domain: &'static str, domain: String,
entity_type: &'static str, entity_type: String,
event_name: &'static str, event_name: String,
} }
impl From<&EventInfo> for PgEventInfo { impl From<&EventInfo> for PgEventInfo {
fn from(value: &EventInfo) -> Self { fn from(value: &EventInfo) -> Self {
let value = value.to_owned();
Self { Self {
domain: value.domain, domain: value.domain,
entity_type: value.entity_type, entity_type: value.entity_type,
@ -51,6 +53,26 @@ impl From<&EventInfo> for PgEventInfo {
} }
} }
impl From<PgEventInfo> for EventInfo {
fn from(value: PgEventInfo) -> Self {
EventInfo {
domain: value.domain,
entity_type: value.entity_type,
event_name: value.event_name,
}
}
}
#[allow(dead_code)]
#[derive(sqlx::FromRow)]
struct OutboxEvent {
id: Uuid,
metadata: Json<PgEventInfo>,
content: Vec<u8>,
inserted_time: chrono::DateTime<chrono::Utc>,
state: String,
}
#[async_trait] #[async_trait]
impl crunch_traits::Persistence for PostgresPersistence { impl crunch_traits::Persistence for PostgresPersistence {
// FIXME: Need some sort of concurrency control mechanism. If the insert fails or is done twice, then that user may receive multiple requests. // FIXME: Need some sort of concurrency control mechanism. If the insert fails or is done twice, then that user may receive multiple requests.
@ -93,15 +115,30 @@ FOR UPDATE;
.map_err(|e| anyhow::anyhow!(e)) .map_err(|e| anyhow::anyhow!(e))
.map_err(PersistenceError::AnyErr)?; .map_err(PersistenceError::AnyErr)?;
let id = match resp { let id = resp.map(|InsertResp { id }| id.to_string());
Some(InsertResp { id }) => Some(id.to_string()),
None => None,
};
Ok(id.map(|id| (id, Box::new(PostgresTx {}) as crunch_traits::DynTx))) Ok(id.map(|id| (id, Box::new(PostgresTx {}) as crunch_traits::DynTx)))
} }
async fn get(&self, event_id: &str) -> Result<Option<(EventInfo, Vec<u8>)>, PersistenceError> { async fn get(&self, event_id: &str) -> Result<Option<(EventInfo, Vec<u8>)>, PersistenceError> {
todo!() let event = sqlx::query_as::<_, OutboxEvent>("SELECT * from outbox where id = $1")
.bind(
Uuid::parse_str(event_id)
.map_err(|e| anyhow::anyhow!(e))
.map_err(PersistenceError::GetErr)?,
)
.fetch_optional(&self.pool)
.await
.map_err(|e| anyhow::anyhow!(e))
.map_err(PersistenceError::GetErr)?;
match event {
Some(event) => {
let metadata = event.metadata.to_owned();
Ok(Some((EventInfo::from(metadata.0), event.content)))
}
None => Ok(None),
}
} }
async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> { async fn update_published(&self, event_id: &str) -> Result<(), PersistenceError> {
todo!() todo!()

View File

@ -25,11 +25,11 @@ pub trait Deserializer {
Self: Sized; Self: Sized;
} }
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone)]
pub struct EventInfo { pub struct EventInfo {
pub domain: &'static str, pub domain: String,
pub entity_type: &'static str, pub entity_type: String,
pub event_name: &'static str, pub event_name: String,
} }
impl Display for EventInfo { impl Display for EventInfo {

View File

@ -26,9 +26,9 @@ impl crunch::traits::Deserializer for SomeEvent {
impl crunch::traits::Event for SomeEvent { impl crunch::traits::Event for SomeEvent {
fn event_info() -> crunch::traits::EventInfo { fn event_info() -> crunch::traits::EventInfo {
crunch::traits::EventInfo { crunch::traits::EventInfo {
domain: "some-domain", domain: "some-domain".into(),
entity_type: "some-entity", entity_type: "some-entity".into(),
event_name: "some-event", event_name: "some-event".into(),
} }
} }
} }

View File

@ -26,9 +26,9 @@ impl crunch::traits::Deserializer for SomeEvent {
impl crunch::traits::Event for SomeEvent { impl crunch::traits::Event for SomeEvent {
fn event_info() -> crunch::traits::EventInfo { fn event_info() -> crunch::traits::EventInfo {
crunch::traits::EventInfo { crunch::traits::EventInfo {
domain: "some-domain", domain: "some-domain".into(),
entity_type: "some-entity", entity_type: "some-entity".into(),
event_name: "some-event", event_name: "some-event".into(),
} }
} }
} }

View File

@ -27,9 +27,9 @@ impl Deserializer for SomeEvent {
impl Event for SomeEvent { impl Event for SomeEvent {
fn event_info() -> EventInfo { fn event_info() -> EventInfo {
EventInfo { EventInfo {
domain: "some-domain", domain: "some-domain".into(),
entity_type: "some-entity", entity_type: "some-entity".into(),
event_name: "some-event", event_name: "some-event".into(),
} }
} }
} }

View File

@ -1,93 +1,3 @@
pub mod basic { pub mod basic { pub mod includes { pub mod my_include { use prost::Message; include!("basic.includes.my_include.rs"); impl ::crunch::traits::Serializer for MyInclude { fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyInclude { fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError> where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyInclude { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } }
pub mod includes { pub mod my_event { use prost::Message; include!("basic.my_event.rs"); impl ::crunch::traits::Serializer for MyEvent { fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyEvent { fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError> where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyEvent { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } }
pub mod my_include { pub mod examples { pub mod example { use prost::Message; include!("examples.example.rs"); impl ::crunch::traits::Serializer for MyEvent { fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> { Ok(self.encode_to_vec()) } } impl ::crunch::traits::Deserializer for MyEvent { fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError> where Self: Sized, { let output = Self::decode(raw.as_slice()).map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?; Ok(output) } } impl crunch::traits::Event for MyEvent { fn event_info() -> ::crunch::traits::EventInfo { ::crunch::traits::EventInfo { domain: "my-domain".into(), entity_type: "my-entity-type".into(), event_name: "my-event-name".into(), } } } } }
use prost::Message;
include!("basic.includes.my_include.rs");
impl ::crunch::traits::Serializer for MyInclude {
fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> {
Ok(self.encode_to_vec())
}
}
impl ::crunch::traits::Deserializer for MyInclude {
fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError>
where
Self: Sized,
{
let output = Self::decode(raw.as_slice())
.map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?;
Ok(output)
}
}
impl crunch::traits::Event for MyInclude {
fn event_info() -> ::crunch::traits::EventInfo {
::crunch::traits::EventInfo {
domain: "my-domain",
entity_type: "my-entity-type",
event_name: "my-event-name",
}
}
}
}
}
pub mod my_event {
use prost::Message;
include!("basic.my_event.rs");
impl ::crunch::traits::Serializer for MyEvent {
fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> {
Ok(self.encode_to_vec())
}
}
impl ::crunch::traits::Deserializer for MyEvent {
fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError>
where
Self: Sized,
{
let output = Self::decode(raw.as_slice())
.map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?;
Ok(output)
}
}
impl crunch::traits::Event for MyEvent {
fn event_info() -> ::crunch::traits::EventInfo {
::crunch::traits::EventInfo {
domain: "my-domain",
entity_type: "my-entity-type",
event_name: "my-event-name",
}
}
}
}
}
pub mod examples {
pub mod example {
use prost::Message;
include!("examples.example.rs");
impl ::crunch::traits::Serializer for MyEvent {
fn serialize(&self) -> Result<Vec<u8>, ::crunch::errors::SerializeError> {
Ok(self.encode_to_vec())
}
}
impl ::crunch::traits::Deserializer for MyEvent {
fn deserialize(raw: Vec<u8>) -> Result<Self, ::crunch::errors::DeserializeError>
where
Self: Sized,
{
let output = Self::decode(raw.as_slice())
.map_err(|e| ::crunch::errors::DeserializeError::ProtoErr(e))?;
Ok(output)
}
}
impl crunch::traits::Event for MyEvent {
fn event_info() -> ::crunch::traits::EventInfo {
::crunch::traits::EventInfo {
domain: "my-domain",
entity_type: "my-entity-type",
event_name: "my-event-name",
}
}
}
}
}