From b9e1184a7a492b70eed8850cb4bdac5e17c071fd Mon Sep 17 00:00:00 2001
From: kjuulh
Date: Sat, 9 Nov 2024 17:44:05 +0100
Subject: [PATCH] feat: add indexes

Signed-off-by: kjuulh
---
 crates/nodata-storage/src/lib.rs | 64 ++++++++++++++++++++++++++++++--
 1 file changed, 60 insertions(+), 4 deletions(-)

diff --git a/crates/nodata-storage/src/lib.rs b/crates/nodata-storage/src/lib.rs
index a9d5922..b08cc30 100644
--- a/crates/nodata-storage/src/lib.rs
+++ b/crates/nodata-storage/src/lib.rs
@@ -5,7 +5,7 @@
 // A topic has a log corresponding log location (hash of the topic name)
 // Each message is sequentially added to a log file and offsets are stored in a separate index file for fast lookup
 
-use std::{collections::BTreeMap, sync::Arc};
+use std::{collections::BTreeMap, sync::Arc, time::SystemTime};
 
 use anyhow::Context;
 use backend::StorageBackend;
@@ -18,6 +18,7 @@ pub mod backend {
     use std::{
         env::temp_dir,
         path::{Path, PathBuf},
+        time::{SystemTime, UNIX_EPOCH},
     };
 
     use anyhow::Context;
@@ -37,6 +38,7 @@ pub mod backend {
         pub fn temp() -> Self {
             Self::new(&temp_dir().join("nodata"))
         }
+
         pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
             let segment_key = uuid::Uuid::now_v7();
             let segment_path = PathBuf::from("logs")
@@ -54,7 +56,47 @@ pub mod backend {
             segment_file.write_all(buffer).await?;
             segment_file.flush().await?;
 
-            Ok(segment_path.to_string_lossy().to_string())
+            Ok(segment_key.to_string())
+        }
+
+        pub async fn append_index(
+            &self,
+            topic: &str,
+            segment_file: &str,
+            time: SystemTime,
+        ) -> anyhow::Result<()> {
+            let index_path = PathBuf::from("indexes").join(topic);
+            tracing::trace!("writing index file: {}", index_path.display());
+            let file_location = self.location.join(&index_path);
+            if let Some(parent) = file_location.parent() {
+                tokio::fs::create_dir_all(parent)
+                    .await
+                    .context("failed to create storage backend dir, index")?;
+            }
+
+            if !file_location.exists() {
+                tokio::fs::File::create(&file_location).await?;
+            }
+
+            let mut index_file = tokio::fs::File::options()
+                .append(true)
+                .open(&file_location)
+                .await?;
+            index_file
+                .write_all(
+                    format!(
+                        "{},{}\n",
+                        time.duration_since(UNIX_EPOCH)
+                            .expect("to be able to get time")
+                            .as_secs(),
+                        segment_file
+                    )
+                    .as_bytes(),
+                )
+                .await?;
+            index_file.flush().await?;
+
+            Ok(())
         }
     }
 }
@@ -69,7 +111,7 @@ pub struct Storage {
 impl Storage {
     pub fn new(backend: StorageBackend) -> Self {
         Self {
-            segment_size_bytes: 4096 * 1000, // 4MB
+            segment_size_bytes: 4096 * 100, // ~400KB — NOTE(review): was 4MB; confirm the 10x reduction is intentional
             buffer: Arc::default(),
             backend: Arc::new(backend),
         }
@@ -95,10 +137,15 @@ impl Storage {
 
         let log_buffer = buffer.get_mut(&topic_key).unwrap();
         if log_buffer.len() + value.len() >= self.segment_size_bytes {
-            self.backend
+            let time = SystemTime::now();
+            let segment_id = self
+                .backend
                 .flush_segment(&topic_key, log_buffer)
                 .await
                 .context("failed to write segment")?;
+            self.backend
+                .append_index(&topic_key, &segment_id, time)
+                .await?;
             log_buffer.clear();
         }
 
@@ -108,6 +155,15 @@ impl Storage {
         Ok(())
     }
 
+    pub async fn commit_segment_offset(
+        &self,
+        topic: &str,
+        segment_id: &str,
+        time: SystemTime,
+    ) -> anyhow::Result<()> {
+        Ok(())
+    }
+
     fn hash_topic(&self, input: &str) -> String {
         hex::encode(Sha256::digest(input.as_bytes()))
     }