diff --git a/Cargo.lock b/Cargo.lock
index e36d17b..9ce8988 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1109,6 +1109,7 @@ dependencies = [
  "serde",
  "sqlx",
  "tokio",
+ "tokio-stream",
  "tokio-util",
  "tonic",
  "tower-http",
diff --git a/crates/nodata/Cargo.toml b/crates/nodata/Cargo.toml
index f037736..e3449bb 100644
--- a/crates/nodata/Cargo.toml
+++ b/crates/nodata/Cargo.toml
@@ -29,6 +29,7 @@ bytes = "1.7.1"
 prost = "0.13.1"
 prost-types = "0.13.1"
 chrono = { version = "0.4.38", features = ["serde"] }
+tokio-stream = "0.1.15"
 
 [dev-dependencies]
 tracing-test = "0.2.5"
diff --git a/crates/nodata/src/gen/nodata.v1.rs b/crates/nodata/src/gen/nodata.v1.rs
index 625a76f..8624c76 100644
--- a/crates/nodata/src/gen/nodata.v1.rs
+++ b/crates/nodata/src/gen/nodata.v1.rs
@@ -2,42 +2,60 @@
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PublishEventRequest {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub topic: ::prost::alloc::string::String,
-    #[prost(message, optional, tag="2")]
+    #[prost(message, optional, tag = "2")]
     pub published: ::core::option::Option<::prost_types::Timestamp>,
-    #[prost(string, tag="3")]
+    #[prost(string, tag = "3")]
     pub key: ::prost::alloc::string::String,
-    #[prost(bytes="vec", tag="4")]
+    #[prost(bytes = "vec", tag = "4")]
     pub value: ::prost::alloc::vec::Vec<u8>,
-    #[prost(string, optional, tag="5")]
+    #[prost(string, optional, tag = "5")]
    pub id: ::core::option::Option<::prost::alloc::string::String>,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct PublishEventResponse {
-}
+pub struct PublishEventResponse {}
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct GetTopicsRequest {
-}
+pub struct GetTopicsRequest {}
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetTopicsResponse {
-    #[prost(string, repeated, tag="1")]
+    #[prost(string, repeated, tag = "1")]
     pub topics: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetKeysRequest {
-    #[prost(string, tag="1")]
+    #[prost(string, tag = "1")]
     pub topic: ::prost::alloc::string::String,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct GetKeysResponse {
-    #[prost(string, repeated, tag="1")]
+    #[prost(string, repeated, tag = "1")]
     pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
 }
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SubscribeRequest {
+    #[prost(string, tag = "1")]
+    pub topic: ::prost::alloc::string::String,
+    #[prost(string, tag = "2")]
+    pub key: ::prost::alloc::string::String,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct SubscribeResponse {
+    #[prost(string, tag = "1")]
+    pub id: ::prost::alloc::string::String,
+    #[prost(message, optional, tag = "2")]
+    pub published: ::core::option::Option<::prost_types::Timestamp>,
+    #[prost(uint64, tag = "3")]
+    pub offset: u64,
+    #[prost(bytes = "vec", tag = "4")]
+    pub value: ::prost::alloc::vec::Vec<u8>,
+}
 include!("nodata.v1.tonic.rs");
-// @@protoc_insertion_point(module)
\ No newline at end of file
+// @@protoc_insertion_point(module)
diff --git a/crates/nodata/src/gen/nodata.v1.tonic.rs b/crates/nodata/src/gen/nodata.v1.tonic.rs
index 4542e66..7546240 100644
--- a/crates/nodata/src/gen/nodata.v1.tonic.rs
+++ b/crates/nodata/src/gen/nodata.v1.tonic.rs
@@ -159,6 +159,31 @@ pub mod no_data_service_client {
                 .insert(GrpcMethod::new("nodata.v1.NoDataService", "GetKeys"));
             self.inner.unary(req, path, codec).await
         }
+        pub async fn subscribe(
+            &mut self,
+            request: impl tonic::IntoRequest<super::SubscribeRequest>,
+        ) -> std::result::Result<
+            tonic::Response<tonic::codec::Streaming<super::SubscribeResponse>>,
+            tonic::Status,
+        > {
+            self.inner
+                .ready()
+                .await
+                .map_err(|e| {
+                    tonic::Status::new(
+                        tonic::Code::Unknown,
+                        format!("Service was not ready: {}", e.into()),
+                    )
+                })?;
+            let codec = tonic::codec::ProstCodec::default();
+            let path = http::uri::PathAndQuery::from_static(
+                "/nodata.v1.NoDataService/Subscribe",
+            );
+            let mut req = request.into_request();
+            req.extensions_mut()
+                .insert(GrpcMethod::new("nodata.v1.NoDataService", "Subscribe"));
+            self.inner.server_streaming(req, path, codec).await
+        }
     }
 }
 /// Generated server implementations.
@@ -186,6 +211,16 @@ pub mod no_data_service_server {
             &self,
             request: tonic::Request<super::GetKeysRequest>,
         ) -> std::result::Result<tonic::Response<super::GetKeysResponse>, tonic::Status>;
+        /// Server streaming response type for the Subscribe method.
+        type SubscribeStream: tonic::codegen::tokio_stream::Stream<
+                Item = std::result::Result<super::SubscribeResponse, tonic::Status>,
+            >
+            + Send
+            + 'static;
+        async fn subscribe(
+            &self,
+            request: tonic::Request<super::SubscribeRequest>,
+        ) -> std::result::Result<tonic::Response<Self::SubscribeStream>, tonic::Status>;
     }
     #[derive(Debug)]
     pub struct NoDataServiceServer<T: NoDataService> {
@@ -404,6 +439,53 @@ pub mod no_data_service_server {
                         };
                         Box::pin(fut)
                     }
+                    "/nodata.v1.NoDataService/Subscribe" => {
+                        #[allow(non_camel_case_types)]
+                        struct SubscribeSvc<T: NoDataService>(pub Arc<T>);
+                        impl<
+                            T: NoDataService,
+                        > tonic::server::ServerStreamingService<super::SubscribeRequest>
+                        for SubscribeSvc<T> {
+                            type Response = super::SubscribeResponse;
+                            type ResponseStream = T::SubscribeStream;
+                            type Future = BoxFuture<
+                                tonic::Response<Self::ResponseStream>,
+                                tonic::Status,
+                            >;
+                            fn call(
+                                &mut self,
+                                request: tonic::Request<super::SubscribeRequest>,
+                            ) -> Self::Future {
+                                let inner = Arc::clone(&self.0);
+                                let fut = async move {
+                                    <T as NoDataService>::subscribe(&inner, request).await
+                                };
+                                Box::pin(fut)
+                            }
+                        }
+                        let accept_compression_encodings = self.accept_compression_encodings;
+                        let send_compression_encodings = self.send_compression_encodings;
+                        let max_decoding_message_size = self.max_decoding_message_size;
+                        let max_encoding_message_size = self.max_encoding_message_size;
+                        let inner = self.inner.clone();
+                        let fut = async move {
+                            let inner = inner.0;
+                            let method = SubscribeSvc(inner);
+                            let codec = tonic::codec::ProstCodec::default();
+                            let mut grpc = tonic::server::Grpc::new(codec)
+                                .apply_compression_config(
+                                    accept_compression_encodings,
+                                    send_compression_encodings,
+                                )
+                                .apply_max_message_size_config(
+                                    max_decoding_message_size,
+                                    max_encoding_message_size,
+                                );
+                            let res = grpc.server_streaming(method, req).await;
+                            Ok(res)
+                        };
+                        Box::pin(fut)
+                    }
                     _ => {
                         Box::pin(async move {
                             Ok(
diff --git a/crates/nodata/src/grpc.rs b/crates/nodata/src/grpc.rs
index 11ec0c3..e4a1449 100644
--- a/crates/nodata/src/grpc.rs
+++ b/crates/nodata/src/grpc.rs
@@ -1,10 +1,16 @@
-use std::net::SocketAddr;
+use std::{net::SocketAddr, pin::Pin};
 
 use anyhow::Context;
 use mad::Component;
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::Response;
+use uuid::Uuid;
 
 use crate::{
-    services::{ingest::IngestState, staging::StagingEvent},
+    services::{
+        consumers::ConsumersState, handler::HandlerState, ingest::IngestState,
+        staging::StagingEvent,
+    },
     state::SharedState,
 };
 
@@ -25,6 +31,9 @@ impl GrpcServer {
     }
 }
 
+type ResponseStream =
+    Pin<Box<dyn tokio_stream::Stream<Item = Result<SubscribeResponse, tonic::Status>> + Send>>;
+
 #[tonic::async_trait]
 impl no_data_service_server::NoDataService for GrpcServer {
     async fn publish_event(
@@ -74,6 +83,56 @@ impl no_data_service_server::NoDataService for GrpcServer {
 
         Ok(tonic::Response::new(GetKeysResponse { keys }))
     }
+
+    type SubscribeStream = ResponseStream;
+
+    async fn subscribe(
+        &self,
+        request: tonic::Request<SubscribeRequest>,
+    ) -> std::result::Result<tonic::Response<Self::SubscribeStream>, tonic::Status> {
+        let req = request.into_inner();
+
+        let id = Uuid::new_v4().to_string();
+        let index = Uuid::new_v4().to_string();
+
+        self.state
+            .consumers()
+            .add_consumer(&id, &index, req.topic)
+            .await
+            .map_err(|e| {
+                tracing::warn!(error = e.to_string(), "failed to add consumer");
+                tonic::Status::internal(e.to_string())
+            })?;
+
+        let consumer = self
+            .state
+            .consumers
+            .get_consumer(&id, &index)
+            .await
+            .unwrap();
+
+        let handler = self.state.handler();
+        let (tx, rx) = tokio::sync::mpsc::channel(128);
+        tokio::spawn(async move {
+            let mut event_stream = consumer.rx.lock().await;
+            while let Some(msg) = event_stream.recv().await {
+                if let Err(e) = tx
+                    .send(Ok(SubscribeResponse {
+                        id: msg.id.unwrap_or_default(),
+                        published: Some(chrono_to_prost_timestamp(msg.published)),
+                        offset: 0,
+                        value: msg.value,
+                    }))
+                    .await
+                {
+                    tracing::warn!(error = e.to_string(), "failed to send event");
+                };
+            }
+        });
+
+        let stream = ReceiverStream::new(rx);
+        Ok(Response::new(Box::pin(stream) as Self::SubscribeStream))
+    }
 }
 
 impl From<PublishEventRequest> for StagingEvent {
@@ -82,10 +141,26 @@ impl From<PublishEventRequest> for StagingEvent {
             topic: value.topic,
             id: value.id,
             key: value.key,
+            published: value
+                .published
+                .and_then(prost_timestamp_to_chrono)
+                .unwrap_or_default(),
+            value: value.value,
         }
     }
 }
 
+fn prost_timestamp_to_chrono(ts: prost_types::Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
+    chrono::DateTime::<chrono::Utc>::from_timestamp(ts.seconds, ts.nanos as u32)
+}
+
+fn chrono_to_prost_timestamp(dt: chrono::DateTime<chrono::Utc>) -> prost_types::Timestamp {
+    prost_types::Timestamp {
+        seconds: dt.timestamp(),
+        nanos: dt.timestamp_subsec_nanos() as i32,
+    }
+}
+
 #[axum::async_trait]
 impl Component for GrpcServer {
     fn name(&self) -> Option<String> {
diff --git a/crates/nodata/src/main.rs b/crates/nodata/src/main.rs
index 51026ba..ed18bf9 100644
--- a/crates/nodata/src/main.rs
+++ b/crates/nodata/src/main.rs
@@ -8,7 +8,7 @@ use std::net::SocketAddr;
 
 use chrono::{Datelike, Timelike};
 use clap::{Parser, Subcommand};
-use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest};
+use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest};
 use http::HttpServer;
 use mad::Mad;
 use state::SharedState;
@@ -31,13 +31,13 @@ enum Commands {
     },
 
     Client {
-        #[arg(env = "SERVICE_HOST", long, default_value = "http://127.0.0.1:3000")]
-        host: String,
-        #[arg(
-            env = "GRPC_SERVICE_HOST",
-            long,
-            default_value = "http://127.0.0.1:7900"
-        )]
+        // #[arg(env = "SERVICE_HOST", long, default_value = "http://127.0.0.1:3000")]
+        // host: String,
+        // #[arg(
+        //     env = "GRPC_SERVICE_HOST",
+        //     long,
+        //     default_value = "http://127.0.0.1:7900"
+        // )]
         grpc_host: String,
 
         #[command(subcommand)]
@@ -64,6 +64,12 @@ enum ClientCommands {
         #[arg(long)]
         topic: String,
     },
+    SubscribeTopic {
+        #[arg(long)]
+        topic: String,
+        #[arg(long)]
+        key: String,
+    },
 }
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
@@ -84,7 +90,6 @@ async fn main() -> anyhow::Result<()> {
         }
         Commands::Client {
             commands,
-            host,
             grpc_host,
         } => match commands {
             ClientCommands::PublishEvent {
@@ -146,6 +151,23 @@ async fn main() -> anyhow::Result<()> {
                     println!("{key}");
                 }
             }
+            ClientCommands::SubscribeTopic { topic, key } => {
+                let mut client = create_client(grpc_host).await?;
+
+                println!("listening for events in topic: {}", topic);
+
+                let resp = client.subscribe(SubscribeRequest { topic, key }).await?;
+
+                let mut stream = resp.into_inner();
+                while let Some(msg) = stream.message().await? {
+                    println!(
+                        "msg (id={}, published={}): {}",
+                        msg.id,
+                        msg.published.unwrap_or_default(),
+                        std::str::from_utf8(&msg.value).unwrap_or_default(),
+                    )
+                }
+            }
         },
     }
 
diff --git a/crates/nodata/src/services.rs b/crates/nodata/src/services.rs
index 2379cf5..997d5ec 100644
--- a/crates/nodata/src/services.rs
+++ b/crates/nodata/src/services.rs
@@ -16,11 +16,8 @@ pub mod ingest {
     }
 
     impl Ingest {
-        pub fn new(staging: Staging, consumer: Consumers) -> Self {
-            Self {
-                staging,
-                consumers: consumer,
-            }
+        pub fn new(staging: Staging, consumers: Consumers) -> Self {
+            Self { staging, consumers }
         }
 
         pub async fn publish(&self, event: impl Into<StagingEvent>) -> anyhow::Result<()> {
@@ -35,7 +32,7 @@ pub mod ingest {
             };
 
             let offset = self.staging.publish(event).await?;
-            self.consumers.notify_update(topic, key, offset)?;
+            self.consumers.notify_update(topic, key, offset).await?;
 
             Ok(())
         }
diff --git a/crates/nodata/src/services/consumers.rs b/crates/nodata/src/services/consumers.rs
index 1a3cb26..3b783c2 100644
--- a/crates/nodata/src/services/consumers.rs
+++ b/crates/nodata/src/services/consumers.rs
@@ -1,11 +1,10 @@
-use std::{
-    collections::BTreeMap,
-    sync::{Arc, RwLock},
-};
+use std::{collections::BTreeMap, sync::Arc};
+
+use tokio::sync::RwLock;
 
 use crate::state::SharedState;
 
-use super::handler::Handler;
+use super::{handler::Handler, staging::StagingEvent};
 
 pub type ConsumerId = String;
 pub type ConsumerIndex = String;
@@ -14,9 +13,36 @@ pub type PartitionKey = String;
 pub type TopicOffset = usize;
 
 // Consumer will at some point contain an interface to send events over a channel to a lib
-#[derive(Default, Clone, Debug, PartialEq, Eq)]
+#[derive(Debug, Clone)]
 pub struct Consumer {
     pub offset: TopicOffset,
+
+    pub tx: tokio::sync::mpsc::Sender<StagingEvent>,
+    pub rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<StagingEvent>>>,
+}
+
+impl Default for Consumer {
+    fn default() -> Self {
+        Self::new(0)
+    }
+}
+
+impl Consumer {
+    pub fn new(offset: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(128);
+
+        Self {
+            offset,
+            tx,
+            rx: Arc::new(tokio::sync::Mutex::new(rx)),
+        }
+    }
+}
+
+impl PartialEq for Consumer {
+    fn eq(&self, other: &Self) -> bool {
+        self.offset == other.offset
+    }
 }
 
 #[derive(Clone)]
@@ -40,7 +66,7 @@ impl Consumers {
         }
     }
 
-    pub fn add_consumer(
+    pub async fn add_consumer(
         &self,
         id: impl Into<ConsumerId>,
         index: impl Into<ConsumerIndex>,
@@ -51,7 +77,7 @@ impl Consumers {
         let topic = topic.into();
 
         {
-            let mut storage = self.storage.write().unwrap();
+            let mut storage = self.storage.write().await;
             if !storage.contains_key(&id) {
                 storage.insert(id.clone(), BTreeMap::default());
             }
@@ -64,7 +90,7 @@ impl Consumers {
         }
 
         {
-            let mut subscriptions = self.subscriptions.write().unwrap();
+            let mut subscriptions = self.subscriptions.write().await;
             if !subscriptions.contains_key(&topic) {
                 subscriptions.insert(topic.clone(), Vec::default());
             }
@@ -78,19 +104,19 @@ impl Consumers {
         Ok(())
     }
 
-    pub fn get_consumer(
+    pub async fn get_consumer(
         &self,
         id: impl Into<ConsumerId>,
         index: impl Into<ConsumerIndex>,
     ) -> Option<Consumer> {
-        let storage = self.storage.read().unwrap();
+        let storage = self.storage.read().await;
         let consumer_group = storage.get(&id.into())?;
         let consumer = consumer_group.get(&index.into())?;
 
         Some(consumer.to_owned())
     }
 
-    pub fn notify_update(
+    pub async fn notify_update(
         &self,
         topic: impl Into<Topic>,
         key: Option<impl Into<PartitionKey>>,
@@ -98,9 +124,9 @@ impl Consumers {
     ) -> anyhow::Result<()> {
         let topic = topic.into();
         let offset = offset.into();
-        let key = key.and_then(|k| Some(k.into()));
+        let key = key.map(|k| k.into());
 
-        let subscriptions = self.subscriptions.read().unwrap();
+        let subscriptions = self.subscriptions.read().await;
         let subscription = match subscriptions.get(&topic) {
             Some(s) => s,
             None => {
@@ -109,14 +135,15 @@ impl Consumers {
             }
         };
 
-        let mut storage = self.storage.write().unwrap();
+        let mut storage = self.storage.write().await;
         for consumer_id in subscription {
             match storage.get_mut(consumer_id) {
                 Some(consumer_groups) => {
                     for consumer in consumer_groups.values_mut() {
                         // TODO: Implement retry logic, etc.
                         self.handler
-                            .handle_offset(&topic, key.as_ref(), consumer, offset)?;
+                            .handle_offset(&topic, key.as_ref(), consumer, offset)
+                            .await?;
                         consumer.offset = offset;
                     }
                 }
@@ -152,18 +179,20 @@ mod test {
 
     use super::*;
 
-    #[test]
-    fn can_add_consumer() -> anyhow::Result<()> {
+    #[tokio::test]
+    async fn can_add_consumer() -> anyhow::Result<()> {
         let consumer_id = "some-consumer-id";
         let consumer_index = "some-consumer-index";
         let topic = "some-topic";
 
         let consumers = Consumers::new(Handler::new(Staging::default()));
 
-        consumers.add_consumer(consumer_id, consumer_index, topic)?;
-        let consumer = consumers.get_consumer(consumer_id, consumer_index);
+        consumers
+            .add_consumer(consumer_id, consumer_index, topic)
+            .await?;
+        let consumer = consumers.get_consumer(consumer_id, consumer_index).await;
 
-        assert_eq!(Some(Consumer { offset: 0 }), consumer);
+        assert_eq!(Some(Consumer::default()), consumer);
 
         Ok(())
     }
@@ -184,6 +213,8 @@ mod test {
                 topic: topic.clone(),
                 key: "".into(),
                 id: None,
+                published: chrono::Utc::now(),
+                value: Vec::new(),
             })
             .await?;
 
@@ -192,13 +223,17 @@ mod test {
 
         let consumers = Consumers::new(Handler::new(staging));
 
-        consumers.add_consumer(&consumer_id, &consumer_index, &topic)?;
-        let consumer = consumers.get_consumer(&consumer_id, &consumer_index);
-        assert_eq!(Some(Consumer { offset: 0 }), consumer);
+        consumers
+            .add_consumer(&consumer_id, &consumer_index, &topic)
+            .await?;
+        let consumer = consumers.get_consumer(&consumer_id, &consumer_index).await;
+        assert_eq!(Some(Consumer::default()), consumer);
 
-        consumers.notify_update(&topic, None::<String>, offset)?;
-        let consumer = consumers.get_consumer(&consumer_id, &consumer_index);
-        assert_eq!(Some(Consumer { offset: 9 }), consumer);
+        consumers
+            .notify_update(&topic, None::<String>, offset)
+            .await?;
+        let consumer = consumers.get_consumer(&consumer_id, &consumer_index).await;
+        assert_eq!(Some(Consumer::new(9)), consumer);
 
         Ok(())
     }
diff --git a/crates/nodata/src/services/handler.rs b/crates/nodata/src/services/handler.rs
index 94f948b..6c1f28a 100644
--- a/crates/nodata/src/services/handler.rs
+++ b/crates/nodata/src/services/handler.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use crate::state::SharedState;
 
 use super::{
     consumers::{Consumer, PartitionKey, Topic, TopicOffset},
@@ -7,32 +7,42 @@ use super::{
 
 #[derive(Clone)]
 pub struct Handler {
-    staging: Arc<Staging>,
+    staging: Staging,
 }
 
 impl Handler {
     pub fn new(staging: Staging) -> Self {
-        Self {
-            staging: Arc::new(staging),
-        }
+        Self { staging }
     }
 
-    pub fn handle_offset(
+    pub async fn handle_offset(
         &self,
         topic: &Topic,
         partition_key: Option<&PartitionKey>,
         consumer: &Consumer,
         offset: TopicOffset,
     ) -> anyhow::Result<()> {
-        let events =
-            self.staging
-                .get_topic_offset(topic, partition_key.clone(), consumer.offset, offset)?;
+        let events = self
+            .staging
+            .get_topic_offset(topic, partition_key, consumer.offset, offset)
+            .await?;
 
         // TODO: handle events
         for event in events {
             tracing::trace!("handling event: {:?}", event);
+            consumer.tx.send(event).await?;
         }
 
         Ok(())
     }
 }
+
+pub trait HandlerState {
+    fn handler(&self) -> Handler;
+}
+
+impl HandlerState for SharedState {
+    fn handler(&self) -> Handler {
+        self.handler.clone()
+    }
+}
diff --git a/crates/nodata/src/services/staging.rs b/crates/nodata/src/services/staging.rs
index 5ff6568..bd5a10b 100644
--- a/crates/nodata/src/services/staging.rs
+++ b/crates/nodata/src/services/staging.rs
@@ -1,7 +1,6 @@
-use std::{
-    collections::BTreeMap,
-    sync::{Arc, RwLock},
-};
+use std::{collections::BTreeMap, sync::Arc};
+
+use tokio::sync::RwLock;
 
 use crate::state::SharedState;
 
@@ -12,6 +11,8 @@ pub struct StagingEvent {
     pub topic: String,
     pub key: String,
     pub id: Option<String>,
+    pub published: chrono::DateTime<chrono::Utc>,
+    pub value: Vec<u8>,
 }
 
 #[derive(Clone, Default)]
@@ -33,7 +34,7 @@ impl Staging {
         staging_event: impl Into<StagingEvent>,
     ) -> anyhow::Result<TopicOffset> {
         let staging_event: StagingEvent = staging_event.into();
-        let mut store = self.store.write().unwrap();
+        let mut store = self.store.write().await;
         tracing::trace!(
             topic = staging_event.topic,
             id = staging_event.id,
@@ -82,13 +83,13 @@ impl Staging {
     }
 
     pub async fn get_topics(&self) -> anyhow::Result<Vec<String>> {
-        let store = self.store.read().unwrap();
+        let store = self.store.read().await;
 
         Ok(store.keys().cloned().collect::<Vec<_>>())
     }
 
     pub async fn get_keys(&self, topic: impl Into<String>) -> anyhow::Result<Vec<String>> {
-        let store = self.store.read().unwrap();
+        let store = self.store.read().await;
 
         let items = store
             .get(&topic.into())
@@ -98,7 +99,7 @@ impl Staging {
         Ok(items.cloned().collect::<Vec<_>>())
     }
 
-    pub fn get_topic_offset(
+    pub async fn get_topic_offset(
         &self,
         topic: impl Into<Topic>,
         partition_key: Option<impl Into<PartitionKey>>,
@@ -122,7 +123,7 @@ impl Staging {
             )
         }
 
-        let store = self.store.read().unwrap();
+        let store = self.store.read().await;
 
         let partitions = match store.get(&topic) {
             Some(partitions) => partitions,
@@ -150,7 +151,7 @@ impl Staging {
             )
         }
 
-        Ok(partition[start..=end].to_vec())
+        Ok(partition[start + 1..=end].to_vec())
     }
 }
 
diff --git a/proto/nodata/v1/nomicon.proto b/proto/nodata/v1/nomicon.proto
index f2d7290..c96a29e 100644
--- a/proto/nodata/v1/nomicon.proto
+++ b/proto/nodata/v1/nomicon.proto
@@ -8,6 +8,7 @@ service NoDataService {
   rpc PublishEvent(PublishEventRequest) returns (PublishEventResponse) {}
   rpc GetTopics(GetTopicsRequest) returns (GetTopicsResponse) {}
   rpc GetKeys(GetKeysRequest) returns (GetKeysResponse) {}
+  rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse){}
 }
 
 message PublishEventRequest {
@@ -32,3 +33,14 @@ message GetKeysRequest {
 message GetKeysResponse {
   repeated string keys = 1;
 }
+
+message SubscribeRequest {
+  string topic = 1;
+  string key = 2;
+}
+message SubscribeResponse{
+  string id = 1;
+  google.protobuf.Timestamp published = 2;
+  uint64 offset = 3;
+  bytes value = 4;
+}