diff --git a/Cargo.lock b/Cargo.lock index 13da29d..9780b28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1625,6 +1625,7 @@ dependencies = [ "dagger-sdk", "dotenv", "drift", + "nodata-storage", "notmad", "prost", "prost-types", @@ -1655,6 +1656,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "nodata-storage" +version = "0.1.0" +dependencies = [ + "anyhow", + "hex", + "sha2", + "tokio", + "tracing", + "uuid", +] + [[package]] +name = "nom" version = "7.1.3" diff --git a/Cargo.toml b/Cargo.toml index bd78e78..8b0ad80 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.dependencies] +nodata-storage = { path = "crates/nodata-storage" } anyhow = { version = "1" } tokio = { version = "1", features = ["full"] } @@ -17,3 +18,4 @@ tonic = "0.12.1" bytes = "1.7.1" prost = "0.13.1" prost-types = "0.13.1" +uuid = { version = "1.7.0", features = ["v4", "v7"] } diff --git a/crates/nodata-storage/Cargo.toml b/crates/nodata-storage/Cargo.toml new file mode 100644 index 0000000..8f0ad63 --- /dev/null +++ b/crates/nodata-storage/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "nodata-storage" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow.workspace = true +tokio.workspace = true +uuid.workspace = true +tracing.workspace = true + +hex = "0.4.3" +sha2 = "0.10.8" diff --git a/crates/nodata-storage/src/lib.rs b/crates/nodata-storage/src/lib.rs new file mode 100644 index 0000000..70c3442 --- /dev/null +++ b/crates/nodata-storage/src/lib.rs @@ -0,0 +1,114 @@ +// # Nodata Storage: +// storage is an abstraction for a log based partitioning system on top of a given file system. 
+// it allows treating topics with a stream of events as logs, that are persisted for fast lookups +// +// A topic has a corresponding log location (hash of the topic name) +// Each message is sequentially added to a log file and offsets are stored in a separate index file for fast lookup + +use std::{collections::BTreeMap, sync::Arc}; + +use anyhow::Context; +use backend::StorageBackend; +use sha2::{Digest, Sha256}; +use tokio::sync::Mutex; + +type TopicHashKey = String; + +pub mod backend { + use std::{ + env::temp_dir, + path::{Path, PathBuf}, + }; + + use anyhow::Context; + use tokio::io::AsyncWriteExt; + + pub struct StorageBackend { + location: PathBuf, + } + + impl StorageBackend { + pub fn new(location: &Path) -> Self { + Self { + location: location.into(), + } + } + + pub fn temp() -> Self { + Self::new(&temp_dir().join("nodata")) + } + pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> { + let segment_key = uuid::Uuid::now_v7(); + let segment_path = PathBuf::from("logs") + .join(topic) + .join(segment_key.to_string()); + tracing::trace!("writing segment file: {}", segment_path.display()); + let file_location = self.location.join(&segment_path); + if let Some(parent) = file_location.parent() { + tokio::fs::create_dir_all(parent) + .await + .context("failed to create storage backend dir")?; + } + + let mut segment_file = tokio::fs::File::create(&file_location).await?; + segment_file.write_all(buffer).await?; + segment_file.flush().await?; + + Ok(segment_path.to_string_lossy().to_string()) + } + } +} + +#[derive(Clone)] +pub struct Storage { + segment_size_bytes: usize, + buffer: Arc<Mutex<BTreeMap<TopicHashKey, Vec<u8>>>>, + backend: Arc<StorageBackend>, +} + +impl Storage { + pub fn new(backend: StorageBackend) -> Self { + Self { + segment_size_bytes: 4096 * 100, // ~400KiB (409,600 bytes) + buffer: Arc::default(), + backend: Arc::new(backend), + } + } + + pub async fn push_message(&self, topic: &str, value: &[u8]) -> anyhow::Result<()> { + // TODO: Consider caching the topic hash, such that we 
don't have to hash it every time we need to publish a message + let topic_key = self.hash_topic(topic); + + if value.len() > self.segment_size_bytes { + anyhow::bail!("value too large for segment"); + } + + { + // FIXME: the lock is very expensive here, especially as it is over multiple wait points. Until it becomes a problem, just leave it be + let mut buffer = self.buffer.lock().await; + if buffer.get_mut(&topic_key).is_none() { + buffer.insert( + topic_key.clone(), + Vec::with_capacity(self.segment_size_bytes), + ); + } + + let log_buffer = buffer.get_mut(&topic_key).unwrap(); + if log_buffer.len() + value.len() >= self.segment_size_bytes { + self.backend + .flush_segment(&topic_key, log_buffer) + .await + .context("failed to write segment")?; + log_buffer.clear(); + } + + log_buffer.extend(value); + } + + Ok(()) + } + + fn hash_topic(&self, input: &str) -> String { + hex::encode(Sha256::digest(input.as_bytes())) + } +} diff --git a/crates/nodata/Cargo.toml b/crates/nodata/Cargo.toml index 0f5aaeb..40a7bf4 100644 --- a/crates/nodata/Cargo.toml +++ b/crates/nodata/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] +nodata-storage.workspace = true + anyhow.workspace = true tokio.workspace = true tracing.workspace = true @@ -12,6 +14,7 @@ clap.workspace = true dotenv.workspace = true axum.workspace = true drift.workspace = true +uuid.workspace = true serde = { version = "1.0.197", features = ["derive"] } sqlx = { version = "0.7.3", features = [ @@ -21,7 +24,6 @@ sqlx = { version = "0.7.3", features = [ "uuid", "time", ] } -uuid = { version = "1.7.0", features = ["v4"] } tower-http = { version = "0.5.2", features = ["cors", "trace"] } tokio-util = "0.7.11" tonic.workspace = true diff --git a/crates/nodata/src/services/consumers.rs b/crates/nodata/src/services/consumers.rs index 608e73e..5d6a151 100644 --- a/crates/nodata/src/services/consumers.rs +++ b/crates/nodata/src/services/consumers.rs @@ -288,7 +288,7 @@ mod test { let topic = 
"some-topic".to_string(); let offset = 9usize; - let staging = Staging::default(); + let staging = Staging::new(); // Publish 10 messages for _ in 0..10 { let offset = staging diff --git a/crates/nodata/src/services/staging.rs b/crates/nodata/src/services/staging.rs index 94415b5..e86c63b 100644 --- a/crates/nodata/src/services/staging.rs +++ b/crates/nodata/src/services/staging.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, env::temp_dir, sync::Arc}; use tokio::sync::RwLock; @@ -13,14 +13,23 @@ pub struct StagingEvent { pub value: Vec<u8>, } -#[derive(Clone, Default)] +#[derive(Clone)] pub struct Staging { // Temporary until we've got an actual file disk store #[allow(clippy::complexity)] store: Arc<RwLock<BTreeMap<String, Vec<StagingEvent>>>>, + + storage: nodata_storage::Storage, } impl Staging { + pub fn new() -> Self { + Self { + store: Arc::default(), + storage: nodata_storage::Storage::new(nodata_storage::backend::StorageBackend::temp()), + } + } + pub async fn publish( &self, staging_event: impl Into<StagingEvent>, @@ -29,6 +38,10 @@ impl Staging { let mut store = self.store.write().await; tracing::trace!(topic = staging_event.topic, "moving event to staging"); + self.storage + .push_message(&staging_event.topic, &staging_event.value) + .await?; + let offset = match store.get_mut(&staging_event.topic) { Some(part) => { part.push(staging_event); diff --git a/crates/nodata/src/state.rs b/crates/nodata/src/state.rs index db63f9f..9a75105 100644 --- a/crates/nodata/src/state.rs +++ b/crates/nodata/src/state.rs @@ -43,7 +43,7 @@ impl State { let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?; - let staging = Staging::default(); + let staging = Staging::new(); let handler = Handler::new(staging.clone()); Ok(Self {