// # Nodata Storage: // storage is an abstraction for a log based partitiniong system on top of a given file system. // it allows treating topics with a stream of events as logs, that are persisted for fast lookups // // A topic has a log corresponding log location (hash of the topic name) // Each message is sequentially added to a log file and offsets are stored in a separate index file for fast lookup use std::{collections::BTreeMap, sync::Arc, time::SystemTime}; use anyhow::Context; use backend::{local::LocalStorageBackend, StorageBackend}; use proto::ProtoStorage; use sha2::{Digest, Sha256}; use tokio::sync::Mutex; type TopicHashKey = String; pub mod backend; #[derive(Clone)] pub struct Storage { segment_size_bytes: usize, buffer: Arc>>>>, backend: Arc, codec: ProtoStorage, } impl Storage { pub fn new(backend: LocalStorageBackend) -> Self { Self { segment_size_bytes: 4096 * 1000, // 4MB buffer: Arc::default(), codec: ProtoStorage::default(), backend: Arc::new(backend), } } pub async fn new_from_env() -> anyhow::Result { match std::env::var("STORAGE_BACKEND") .context("failed to find STORAGE_BACKEND in env")? .as_str() { "local" => Ok(Self { segment_size_bytes: 4096 * 1000, // 4MB buffer: Arc::default(), codec: ProtoStorage::default(), backend: Arc::new(LocalStorageBackend::new_from_env()?), }), #[cfg(feature = "s3")] "s3" => Ok(Self { segment_size_bytes: 4 * 1024 * 1000, // 4MB buffer: Arc::default(), codec: ProtoStorage::default(), backend: Arc::new(backend::s3::S3StorageBackend::new_from_env().await?), }), backend => anyhow::bail!("backend is not supported: {}", backend), } } pub async fn push_message(&self, topic: &str, value: &[u8]) -> anyhow::Result<()> { // TODO: Consider caching the topic hash, such that we don't have to hash it every time we need to publish a message let topic_key = self.hash_topic(topic); if value.len() > self.segment_size_bytes { anyhow::bail!("value to large for segment"); } { // FIXME: the lock is very expensive here, especially as it is over multiple wait points. Until it becomes a problem, just leave it be let mut buffer = self.buffer.lock().await; if buffer.get_mut(&topic_key).is_none() { buffer.insert( topic_key.clone(), Vec::with_capacity(self.segment_size_bytes), ); } let log_buffer = buffer.get_mut(&topic_key).unwrap(); if log_buffer .iter() .map(|b| b.len()) .reduce(|acc, i| acc + i) .unwrap_or_default() >= self.segment_size_bytes { let log_buffer = buffer.remove(&topic_key).unwrap(); let time = SystemTime::now(); let segment_id = self .backend .flush_segment(&topic_key, &self.codec.format_log_message(log_buffer)) .await .context("failed to write segment")?; self.backend .append_index(&topic_key, &segment_id, time) .await?; let mut log_buf = Vec::with_capacity(self.segment_size_bytes); log_buf.push(value.to_vec()); buffer.insert(topic_key.clone(), log_buf); } else { log_buffer.push(value.to_vec()); } } Ok(()) } pub async fn commit_segment_offset( &self, topic: &str, segment_id: &str, time: SystemTime, ) -> anyhow::Result<()> { Ok(()) } fn hash_topic(&self, input: &str) -> String { hex::encode(Sha256::digest(input.as_bytes())) } } mod proto { use prost::Message; mod gen { include!("gen/nodata.v1.rs"); } #[derive(Default, Clone)] pub struct ProtoStorage {} impl ProtoStorage { pub fn format_log_message(&self, messages: Vec>) -> Vec { gen::Log { messages }.encode_to_vec() } } }