// # Nodata Storage: // storage is an abstraction for a log based partitiniong system on top of a given file system. // it allows treating topics with a stream of events as logs, that are persisted for fast lookups // // A topic has a log corresponding log location (hash of the topic name) // Each message is sequentially added to a log file and offsets are stored in a separate index file for fast lookup use std::{collections::BTreeMap, sync::Arc, time::SystemTime}; use anyhow::Context; use backend::StorageBackend; use sha2::{Digest, Sha256}; use tokio::sync::Mutex; type TopicHashKey = String; pub mod backend { use std::{ env::temp_dir, path::{Path, PathBuf}, time::{SystemTime, UNIX_EPOCH}, }; use anyhow::Context; use tokio::io::AsyncWriteExt; pub struct StorageBackend { location: PathBuf, } impl StorageBackend { pub fn new(location: &Path) -> Self { Self { location: location.into(), } } pub fn temp() -> Self { Self::new(&temp_dir().join("nodata")) } pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result { let segment_key = uuid::Uuid::now_v7(); let segment_path = PathBuf::from("logs") .join(topic) .join(segment_key.to_string()); tracing::trace!("writing segment file: {}", segment_path.display()); let file_location = self.location.join(&segment_path); if let Some(parent) = file_location.parent() { tokio::fs::create_dir_all(parent) .await .context("failed to create storage backend dir")?; } let mut segment_file = tokio::fs::File::create(&file_location).await?; segment_file.write_all(buffer).await?; segment_file.flush().await?; Ok(segment_key.to_string()) } pub async fn append_index( &self, topic: &str, segment_file: &str, time: SystemTime, ) -> anyhow::Result<()> { let index_path = PathBuf::from("indexes").join(topic); tracing::trace!("writing index file: {}", index_path.display()); let file_location = self.location.join(&index_path); if let Some(parent) = file_location.parent() { tokio::fs::create_dir_all(parent) .await .context("failed to create storage backend dir, index")?; } if !file_location.exists() { tokio::fs::File::create(&file_location).await?; } let mut index_file = tokio::fs::File::options() .append(true) .open(&file_location) .await?; index_file .write_all( format!( "{},{}\n", time.duration_since(UNIX_EPOCH) .expect("to be able to get time") .as_secs(), segment_file ) .as_bytes(), ) .await?; index_file.flush().await?; Ok(()) } } } #[derive(Clone)] pub struct Storage { segment_size_bytes: usize, buffer: Arc>>>, backend: Arc, } impl Storage { pub fn new(backend: StorageBackend) -> Self { Self { segment_size_bytes: 4096 * 100, // 4MB buffer: Arc::default(), backend: Arc::new(backend), } } pub async fn push_message(&self, topic: &str, value: &[u8]) -> anyhow::Result<()> { // TODO: Consider caching the topic hash, such that we don't have to hash it every time we need to publish a message let topic_key = self.hash_topic(topic); if value.len() > self.segment_size_bytes { anyhow::bail!("value to large for segment"); } { // FIXME: the lock is very expensive here, especially as it is over multiple wait points. Until it becomes a problem, just leave it be let mut buffer = self.buffer.lock().await; if buffer.get_mut(&topic_key).is_none() { buffer.insert( topic_key.clone(), Vec::with_capacity(self.segment_size_bytes), ); } let log_buffer = buffer.get_mut(&topic_key).unwrap(); if log_buffer.len() + value.len() >= self.segment_size_bytes { let time = SystemTime::now(); let segment_id = self .backend .flush_segment(&topic_key, log_buffer) .await .context("failed to write segment")?; self.backend .append_index(&topic_key, &segment_id, time) .await?; log_buffer.clear(); } log_buffer.extend(value); } Ok(()) } pub async fn commit_segment_offset( &self, topic: &str, segment_id: &str, time: SystemTime, ) -> anyhow::Result<()> { Ok(()) } fn hash_topic(&self, input: &str) -> String { hex::encode(Sha256::digest(input.as_bytes())) } }