nodata/crates/nodata-storage/src/lib.rs

115 lines
3.6 KiB
Rust
Raw Normal View History

// # Nodata Storage:
// storage is an abstraction for a log based partitioning system on top of a given file system.
// it allows treating topics with a stream of events as logs, that are persisted for fast lookups
//
// A topic has a corresponding log location (hash of the topic name)
// Each message is sequentially added to a log file and offsets are stored in a separate index file for fast lookup
use std::{collections::BTreeMap, sync::Arc};
use anyhow::Context;
use backend::StorageBackend;
use sha2::{Digest, Sha256};
use tokio::sync::Mutex;
/// Key under which a topic's in-memory buffer is stored: the hex-encoded SHA-256 digest of the
/// topic name, as produced by `Storage::hash_topic`.
type TopicHashKey = String;
pub mod backend {
    use std::{
        env::temp_dir,
        path::{Path, PathBuf},
    };

    use anyhow::Context;
    use tokio::io::AsyncWriteExt;

    /// File-system backed destination for flushed log segments.
    ///
    /// Every segment file is written somewhere below `location`, which acts as the root
    /// directory of the store.
    pub struct StorageBackend {
        location: PathBuf,
    }

    impl StorageBackend {
        /// Creates a backend rooted at `location`.
        ///
        /// Nothing is touched on disk here; directories are created when a segment is flushed.
        pub fn new(location: &Path) -> Self {
            Self {
                location: location.to_path_buf(),
            }
        }

        /// Creates a backend rooted at the `nodata` directory inside the system temp directory.
        pub fn temp() -> Self {
            let root = temp_dir().join("nodata");
            Self::new(&root)
        }

        /// Writes `buffer` to a brand-new segment file belonging to `topic`.
        ///
        /// The file is named after a freshly generated version 7 UUID and placed at
        /// `<location>/logs/<topic>/<uuid>`; missing parent directories are created first.
        ///
        /// # Returns
        ///
        /// The segment path relative to the backend root (`logs/<topic>/<uuid>`), converted
        /// lossily to a `String`.
        ///
        /// # Errors
        ///
        /// Fails if the parent directory cannot be created, or if creating, writing or
        /// flushing the segment file fails.
        pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
            let segment_id = uuid::Uuid::now_v7().to_string();

            // Path of the segment relative to the backend root; this is what callers get back.
            let mut relative = PathBuf::from("logs");
            relative.push(topic);
            relative.push(segment_id);

            tracing::trace!("writing segment file: {}", relative.display());

            let absolute = self.location.join(&relative);
            if let Some(dir) = absolute.parent() {
                tokio::fs::create_dir_all(dir)
                    .await
                    .context("failed to create storage backend dir")?;
            }

            let mut file = tokio::fs::File::create(&absolute).await?;
            file.write_all(buffer).await?;
            // Wait for the buffered write to complete before the handle is dropped.
            file.flush().await?;

            Ok(relative.to_string_lossy().into_owned())
        }
    }
}
/// Buffers messages per topic in memory and flushes them to a [`StorageBackend`] as segments.
///
/// The buffer map and the backend are held behind `Arc`s, so clones of a `Storage` share the
/// same buffered data and the same backend.
#[derive(Clone)]
pub struct Storage {
/// Size threshold, in bytes, for one segment: messages larger than this are rejected, and a
/// topic's buffer is flushed before an append would make it reach this size. Also used as the
/// initial capacity of every topic buffer.
segment_size_bytes: usize,
/// Not-yet-flushed bytes per topic, keyed by the hashed topic name and guarded by an async mutex.
buffer: Arc<Mutex<BTreeMap<TopicHashKey, Vec<u8>>>>,
/// Backend that receives a topic's buffered bytes when the buffer is flushed.
backend: Arc<StorageBackend>,
}
impl Storage {
pub fn new(backend: StorageBackend) -> Self {
Self {
segment_size_bytes: 4096 * 1000, // 4MB
buffer: Arc::default(),
backend: Arc::new(backend),
}
}
pub async fn push_message(&self, topic: &str, value: &[u8]) -> anyhow::Result<()> {
// TODO: Consider caching the topic hash, such that we don't have to hash it every time we need to publish a message
let topic_key = self.hash_topic(topic);
if value.len() > self.segment_size_bytes {
anyhow::bail!("value to large for segment");
}
{
// FIXME: the lock is very expensive here, especially as it is over multiple wait points. Until it becomes a problem, just leave it be
let mut buffer = self.buffer.lock().await;
if buffer.get_mut(&topic_key).is_none() {
buffer.insert(
topic_key.clone(),
Vec::with_capacity(self.segment_size_bytes),
);
}
let log_buffer = buffer.get_mut(&topic_key).unwrap();
if log_buffer.len() + value.len() >= self.segment_size_bytes {
self.backend
.flush_segment(&topic_key, log_buffer)
.await
.context("failed to write segment")?;
log_buffer.clear();
}
log_buffer.extend(value);
}
Ok(())
}
fn hash_topic(&self, input: &str) -> String {
hex::encode(Sha256::digest(input.as_bytes()))
}
}