feat: add basic streaming data

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-08-15 21:07:22 +02:00
parent cce1400a34
commit 3a2a65dfa7
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
11 changed files with 330 additions and 76 deletions

1
Cargo.lock generated
View File

@ -1109,6 +1109,7 @@ dependencies = [
"serde",
"sqlx",
"tokio",
"tokio-stream",
"tokio-util",
"tonic",
"tower-http",

View File

@ -29,6 +29,7 @@ bytes = "1.7.1"
prost = "0.13.1"
prost-types = "0.13.1"
chrono = { version = "0.4.38", features = ["serde"] }
tokio-stream = "0.1.15"
[dev-dependencies]
tracing-test = "0.2.5"

View File

@ -2,42 +2,60 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PublishEventRequest {
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub topic: ::prost::alloc::string::String,
#[prost(message, optional, tag="2")]
#[prost(message, optional, tag = "2")]
pub published: ::core::option::Option<::prost_types::Timestamp>,
#[prost(string, tag="3")]
#[prost(string, tag = "3")]
pub key: ::prost::alloc::string::String,
#[prost(bytes="vec", tag="4")]
#[prost(bytes = "vec", tag = "4")]
pub value: ::prost::alloc::vec::Vec<u8>,
#[prost(string, optional, tag="5")]
#[prost(string, optional, tag = "5")]
pub id: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PublishEventResponse {
}
pub struct PublishEventResponse {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetTopicsRequest {
}
pub struct GetTopicsRequest {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetTopicsResponse {
#[prost(string, repeated, tag="1")]
#[prost(string, repeated, tag = "1")]
pub topics: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKeysRequest {
#[prost(string, tag="1")]
#[prost(string, tag = "1")]
pub topic: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetKeysResponse {
#[prost(string, repeated, tag="1")]
#[prost(string, repeated, tag = "1")]
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SubscribeRequest {
#[prost(string, tag = "1")]
pub topic: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub key: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SubscribeResponse {
#[prost(string, tag = "1")]
pub id: ::prost::alloc::string::String,
#[prost(message, optional, tag = "2")]
pub published: ::core::option::Option<::prost_types::Timestamp>,
#[prost(uint64, tag = "3")]
pub offset: u64,
#[prost(bytes = "vec", tag = "4")]
pub value: ::prost::alloc::vec::Vec<u8>,
}
include!("nodata.v1.tonic.rs");
// @@protoc_insertion_point(module)
// @@protoc_insertion_point(module)

View File

@ -159,6 +159,31 @@ pub mod no_data_service_client {
.insert(GrpcMethod::new("nodata.v1.NoDataService", "GetKeys"));
self.inner.unary(req, path, codec).await
}
pub async fn subscribe(
&mut self,
request: impl tonic::IntoRequest<super::SubscribeRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::SubscribeResponse>>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/nodata.v1.NoDataService/Subscribe",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("nodata.v1.NoDataService", "Subscribe"));
self.inner.server_streaming(req, path, codec).await
}
}
}
/// Generated server implementations.
@ -186,6 +211,16 @@ pub mod no_data_service_server {
&self,
request: tonic::Request<super::GetKeysRequest>,
) -> std::result::Result<tonic::Response<super::GetKeysResponse>, tonic::Status>;
/// Server streaming response type for the Subscribe method.
type SubscribeStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::SubscribeResponse, tonic::Status>,
>
+ Send
+ 'static;
async fn subscribe(
&self,
request: tonic::Request<super::SubscribeRequest>,
) -> std::result::Result<tonic::Response<Self::SubscribeStream>, tonic::Status>;
}
#[derive(Debug)]
pub struct NoDataServiceServer<T: NoDataService> {
@ -404,6 +439,53 @@ pub mod no_data_service_server {
};
Box::pin(fut)
}
"/nodata.v1.NoDataService/Subscribe" => {
#[allow(non_camel_case_types)]
struct SubscribeSvc<T: NoDataService>(pub Arc<T>);
impl<
T: NoDataService,
> tonic::server::ServerStreamingService<super::SubscribeRequest>
for SubscribeSvc<T> {
type Response = super::SubscribeResponse;
type ResponseStream = T::SubscribeStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::SubscribeRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as NoDataService>::subscribe(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = SubscribeSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
Ok(

View File

@ -1,10 +1,16 @@
use std::net::SocketAddr;
use std::{net::SocketAddr, pin::Pin};
use anyhow::Context;
use mad::Component;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Response;
use uuid::Uuid;
use crate::{
services::{ingest::IngestState, staging::StagingEvent},
services::{
consumers::ConsumersState, handler::HandlerState, ingest::IngestState,
staging::StagingEvent,
},
state::SharedState,
};
@ -25,6 +31,9 @@ impl GrpcServer {
}
}
type ResponseStream =
Pin<Box<dyn tokio_stream::Stream<Item = Result<SubscribeResponse, tonic::Status>> + Send>>;
#[tonic::async_trait]
impl no_data_service_server::NoDataService for GrpcServer {
async fn publish_event(
@ -74,6 +83,56 @@ impl no_data_service_server::NoDataService for GrpcServer {
Ok(tonic::Response::new(GetKeysResponse { keys }))
}
type SubscribeStream = ResponseStream;
async fn subscribe(
&self,
request: tonic::Request<self::SubscribeRequest>,
) -> std::result::Result<tonic::Response<Self::SubscribeStream>, tonic::Status> {
let req = request.into_inner();
let id = Uuid::new_v4().to_string();
let index = Uuid::new_v4().to_string();
self.state
.consumers()
.add_consumer(&id, &index, req.topic)
.await
.map_err(|e| {
tracing::warn!(error = e.to_string(), "failed to add consumer");
tonic::Status::internal(e.to_string())
})?;
let consumer = self
.state
.consumers
.get_consumer(&id, &index)
.await
.unwrap();
let handler = self.state.handler();
let (tx, rx) = tokio::sync::mpsc::channel(128);
tokio::spawn(async move {
let mut event_stream = consumer.rx.lock().await;
while let Some(msg) = event_stream.recv().await {
if let Err(e) = tx
.send(Ok(SubscribeResponse {
id: msg.id.unwrap_or_default(),
published: Some(chrono_to_prost_timestamp(msg.published)),
offset: 0,
value: msg.value,
}))
.await
{
tracing::warn!(error = e.to_string(), "failed to send event");
};
}
});
let stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(stream) as Self::SubscribeStream))
}
}
impl From<PublishEventRequest> for StagingEvent {
@ -82,10 +141,26 @@ impl From<PublishEventRequest> for StagingEvent {
topic: value.topic,
id: value.id,
key: value.key,
published: value
.published
.and_then(prost_timestamp_to_chrono)
.unwrap_or_default(),
value: value.value,
}
}
}
fn prost_timestamp_to_chrono(ts: prost_types::Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
chrono::DateTime::<chrono::Utc>::from_timestamp(ts.seconds, ts.nanos as u32)
}
fn chrono_to_prost_timestamp(dt: chrono::DateTime<chrono::Utc>) -> prost_types::Timestamp {
prost_types::Timestamp {
seconds: dt.timestamp(),
nanos: dt.timestamp_subsec_nanos() as i32,
}
}
#[axum::async_trait]
impl Component for GrpcServer {
fn name(&self) -> Option<String> {

View File

@ -8,7 +8,7 @@ use std::net::SocketAddr;
use chrono::{Datelike, Timelike};
use clap::{Parser, Subcommand};
use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest};
use grpc::{GetKeysRequest, GetTopicsRequest, GrpcServer, PublishEventRequest, SubscribeRequest};
use http::HttpServer;
use mad::Mad;
use state::SharedState;
@ -31,13 +31,13 @@ enum Commands {
},
Client {
#[arg(env = "SERVICE_HOST", long, default_value = "http://127.0.0.1:3000")]
host: String,
#[arg(
env = "GRPC_SERVICE_HOST",
long,
default_value = "http://127.0.0.1:7900"
)]
// #[arg(env = "SERVICE_HOST", long, default_value = "http://127.0.0.1:3000")]
// host: String,
// #[arg(
// env = "GRPC_SERVICE_HOST",
// long,
// default_value = "http://127.0.0.1:7900"
// )]
grpc_host: String,
#[command(subcommand)]
@ -64,6 +64,12 @@ enum ClientCommands {
#[arg(long)]
topic: String,
},
SubscribeTopic {
#[arg(long)]
topic: String,
#[arg(long)]
key: String,
},
}
#[tokio::main]
@ -84,7 +90,6 @@ async fn main() -> anyhow::Result<()> {
}
Commands::Client {
commands,
host,
grpc_host,
} => match commands {
ClientCommands::PublishEvent {
@ -146,6 +151,23 @@ async fn main() -> anyhow::Result<()> {
println!("{key}");
}
}
ClientCommands::SubscribeTopic { topic, key } => {
let mut client = create_client(grpc_host).await?;
println!("listening for events in topic: {}", topic);
let resp = client.subscribe(SubscribeRequest { topic, key }).await?;
let mut stream = resp.into_inner();
while let Some(msg) = stream.message().await? {
println!(
"msg (id={}, published={}): {}",
msg.id,
msg.published.unwrap_or_default(),
std::str::from_utf8(&msg.value).unwrap_or_default(),
)
}
}
},
}

View File

@ -16,11 +16,8 @@ pub mod ingest {
}
impl Ingest {
pub fn new(staging: Staging, consumer: Consumers) -> Self {
Self {
staging,
consumers: consumer,
}
pub fn new(staging: Staging, consumers: Consumers) -> Self {
Self { staging, consumers }
}
pub async fn publish(&self, event: impl Into<StagingEvent>) -> anyhow::Result<()> {
@ -35,7 +32,7 @@ pub mod ingest {
};
let offset = self.staging.publish(event).await?;
self.consumers.notify_update(topic, key, offset)?;
self.consumers.notify_update(topic, key, offset).await?;
Ok(())
}

View File

@ -1,11 +1,10 @@
use std::{
collections::BTreeMap,
sync::{Arc, RwLock},
};
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::RwLock;
use crate::state::SharedState;
use super::handler::Handler;
use super::{handler::Handler, staging::StagingEvent};
pub type ConsumerId = String;
pub type ConsumerIndex = String;
@ -14,9 +13,36 @@ pub type PartitionKey = String;
pub type TopicOffset = usize;
// Consumer will at some point contain an interface to send events over a channel to a lib
#[derive(Default, Clone, Debug, PartialEq, Eq)]
#[derive(Debug, Clone)]
pub struct Consumer {
pub offset: TopicOffset,
pub tx: tokio::sync::mpsc::Sender<StagingEvent>,
pub rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<StagingEvent>>>,
}
impl Default for Consumer {
fn default() -> Self {
Self::new(0)
}
}
impl Consumer {
pub fn new(offset: usize) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(128);
Self {
offset,
tx,
rx: Arc::new(tokio::sync::Mutex::new(rx)),
}
}
}
impl PartialEq for Consumer {
fn eq(&self, other: &Self) -> bool {
self.offset == other.offset
}
}
#[derive(Clone)]
@ -40,7 +66,7 @@ impl Consumers {
}
}
pub fn add_consumer(
pub async fn add_consumer(
&self,
id: impl Into<ConsumerId>,
index: impl Into<ConsumerIndex>,
@ -51,7 +77,7 @@ impl Consumers {
let topic = topic.into();
{
let mut storage = self.storage.write().unwrap();
let mut storage = self.storage.write().await;
if !storage.contains_key(&id) {
storage.insert(id.clone(), BTreeMap::default());
@ -64,7 +90,7 @@ impl Consumers {
}
{
let mut subscriptions = self.subscriptions.write().unwrap();
let mut subscriptions = self.subscriptions.write().await;
if !subscriptions.contains_key(&topic) {
subscriptions.insert(topic.clone(), Vec::default());
}
@ -78,19 +104,19 @@ impl Consumers {
Ok(())
}
pub fn get_consumer(
pub async fn get_consumer(
&self,
id: impl Into<ConsumerId>,
index: impl Into<ConsumerIndex>,
) -> Option<Consumer> {
let storage = self.storage.read().unwrap();
let storage = self.storage.read().await;
let consumer_group = storage.get(&id.into())?;
let consumer = consumer_group.get(&index.into())?;
Some(consumer.to_owned())
}
pub fn notify_update(
pub async fn notify_update(
&self,
topic: impl Into<Topic>,
key: Option<impl Into<PartitionKey>>,
@ -98,9 +124,9 @@ impl Consumers {
) -> anyhow::Result<()> {
let topic = topic.into();
let offset = offset.into();
let key = key.and_then(|k| Some(k.into()));
let key = key.map(|k| k.into());
let subscriptions = self.subscriptions.read().unwrap();
let subscriptions = self.subscriptions.read().await;
let subscription = match subscriptions.get(&topic) {
Some(s) => s,
None => {
@ -109,14 +135,15 @@ impl Consumers {
}
};
let mut storage = self.storage.write().unwrap();
let mut storage = self.storage.write().await;
for consumer_id in subscription {
match storage.get_mut(consumer_id) {
Some(consumer_groups) => {
for consumer in consumer_groups.values_mut() {
// TODO: Implement retry logic, etc.
self.handler
.handle_offset(&topic, key.as_ref(), consumer, offset)?;
.handle_offset(&topic, key.as_ref(), consumer, offset)
.await?;
consumer.offset = offset;
}
}
@ -152,18 +179,20 @@ mod test {
use super::*;
#[test]
fn can_add_consumer() -> anyhow::Result<()> {
#[tokio::test]
async fn can_add_consumer() -> anyhow::Result<()> {
let consumer_id = "some-consumer-id";
let consumer_index = "some-consumer-index";
let topic = "some-topic";
let consumers = Consumers::new(Handler::new(Staging::default()));
consumers.add_consumer(consumer_id, consumer_index, topic)?;
let consumer = consumers.get_consumer(consumer_id, consumer_index);
consumers
.add_consumer(consumer_id, consumer_index, topic)
.await?;
let consumer = consumers.get_consumer(consumer_id, consumer_index).await;
assert_eq!(Some(Consumer { offset: 0 }), consumer);
assert_eq!(Some(Consumer::default()), consumer);
Ok(())
}
@ -184,6 +213,8 @@ mod test {
topic: topic.clone(),
key: "".into(),
id: None,
published: chrono::Utc::now(),
value: Vec::new(),
})
.await?;
@ -192,13 +223,17 @@ mod test {
let consumers = Consumers::new(Handler::new(staging));
consumers.add_consumer(&consumer_id, &consumer_index, &topic)?;
let consumer = consumers.get_consumer(&consumer_id, &consumer_index);
assert_eq!(Some(Consumer { offset: 0 }), consumer);
consumers
.add_consumer(&consumer_id, &consumer_index, &topic)
.await?;
let consumer = consumers.get_consumer(&consumer_id, &consumer_index).await;
assert_eq!(Some(Consumer::default()), consumer);
consumers.notify_update(&topic, None::<String>, offset)?;
let consumer = consumers.get_consumer(&consumer_id, &consumer_index);
assert_eq!(Some(Consumer { offset: 9 }), consumer);
consumers
.notify_update(&topic, None::<String>, offset)
.await?;
let consumer = consumers.get_consumer(&consumer_id, &consumer_index).await;
assert_eq!(Some(Consumer::new(9)), consumer);
Ok(())
}

View File

@ -1,4 +1,4 @@
use std::sync::Arc;
use crate::state::SharedState;
use super::{
consumers::{Consumer, PartitionKey, Topic, TopicOffset},
@ -7,32 +7,42 @@ use super::{
#[derive(Clone)]
pub struct Handler {
staging: Arc<Staging>,
staging: Staging,
}
impl Handler {
pub fn new(staging: Staging) -> Self {
Self {
staging: Arc::new(staging),
}
Self { staging }
}
pub fn handle_offset(
pub async fn handle_offset(
&self,
topic: &Topic,
partition_key: Option<&PartitionKey>,
consumer: &Consumer,
offset: TopicOffset,
) -> anyhow::Result<()> {
let events =
self.staging
.get_topic_offset(topic, partition_key.clone(), consumer.offset, offset)?;
let events = self
.staging
.get_topic_offset(topic, partition_key, consumer.offset, offset)
.await?;
// TODO: handle events
for event in events {
tracing::trace!("handling event: {:?}", event);
consumer.tx.send(event).await?;
}
Ok(())
}
}
pub trait HandlerState {
fn handler(&self) -> Handler;
}
impl HandlerState for SharedState {
fn handler(&self) -> Handler {
self.handler.clone()
}
}

View File

@ -1,7 +1,6 @@
use std::{
collections::BTreeMap,
sync::{Arc, RwLock},
};
use std::{collections::BTreeMap, sync::Arc};
use tokio::sync::RwLock;
use crate::state::SharedState;
@ -12,6 +11,8 @@ pub struct StagingEvent {
pub topic: String,
pub key: String,
pub id: Option<String>,
pub published: chrono::DateTime<chrono::Utc>,
pub value: Vec<u8>,
}
#[derive(Clone, Default)]
@ -33,7 +34,7 @@ impl Staging {
staging_event: impl Into<StagingEvent>,
) -> anyhow::Result<TopicOffset> {
let staging_event: StagingEvent = staging_event.into();
let mut store = self.store.write().unwrap();
let mut store = self.store.write().await;
tracing::trace!(
topic = staging_event.topic,
id = staging_event.id,
@ -82,13 +83,13 @@ impl Staging {
}
pub async fn get_topics(&self) -> anyhow::Result<Vec<String>> {
let store = self.store.read().unwrap();
let store = self.store.read().await;
Ok(store.keys().cloned().collect::<Vec<_>>())
}
pub async fn get_keys(&self, topic: impl Into<String>) -> anyhow::Result<Vec<String>> {
let store = self.store.read().unwrap();
let store = self.store.read().await;
let items = store
.get(&topic.into())
@ -98,7 +99,7 @@ impl Staging {
Ok(items.cloned().collect::<Vec<_>>())
}
pub fn get_topic_offset(
pub async fn get_topic_offset(
&self,
topic: impl Into<String>,
partition_key: Option<impl Into<PartitionKey>>,
@ -122,7 +123,7 @@ impl Staging {
)
}
let store = self.store.read().unwrap();
let store = self.store.read().await;
let partitions = match store.get(&topic) {
Some(partitions) => partitions,
@ -150,7 +151,7 @@ impl Staging {
)
}
Ok(partition[start..=end].to_vec())
Ok(partition[start + 1..=end].to_vec())
}
}

View File

@ -8,6 +8,7 @@ service NoDataService {
rpc PublishEvent(PublishEventRequest) returns (PublishEventResponse) {}
rpc GetTopics(GetTopicsRequest) returns (GetTopicsResponse) {}
rpc GetKeys(GetKeysRequest) returns (GetKeysResponse) {}
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse){}
}
message PublishEventRequest {
@ -32,3 +33,14 @@ message GetKeysRequest {
message GetKeysResponse {
repeated string keys = 1;
}
message SubscribeRequest {
string topic = 1;
string key = 2;
}
message SubscribeResponse{
string id = 1;
google.protobuf.Timestamp published = 2;
uint64 offset = 3;
bytes value = 4;
}