// # Nodata Storage
//
// Storage is an abstraction for a log-based partitioning system on top of a given file system.
// It allows treating topics with a stream of events as logs that are persisted for fast lookups.
//
// A topic has a corresponding log location (the hash of the topic name).
// Each message is appended sequentially to a log file, and offsets are stored in a separate
// index file for fast lookups.
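//
// A minimal usage sketch (assumes an async context and an already-constructed
// `StorageBackend`; how the backend is built lives in the `backend` module and is
// not shown here):
//
//   let storage = Storage::new(backend);
//   storage.push_message("some-topic", b"event payload").await?;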
use std::{collections::BTreeMap, sync::Arc, time::SystemTime};

use anyhow::Context;
use backend::StorageBackend;
use proto::ProtoStorage;
use sha2::{Digest, Sha256};
use tokio::sync::Mutex;

type TopicHashKey = String;

pub mod backend;
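// Storage buffers incoming messages per topic in memory and flushes them to the
// configured `StorageBackend` as segments once a buffer reaches `segment_size_bytes`.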
#[derive(Clone)]
pub struct Storage {
    segment_size_bytes: usize,
    buffer: Arc<Mutex<BTreeMap<TopicHashKey, Vec<Vec<u8>>>>>,
    backend: Arc<StorageBackend>,
    codec: ProtoStorage,
}

impl Storage {
    pub fn new(backend: StorageBackend) -> Self {
        Self {
            segment_size_bytes: 4096 * 1000, // ~4 MB
            buffer: Arc::default(),
            backend: Arc::new(backend),
            codec: ProtoStorage::default(),
        }
    }
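    // Appends `value` to the in-memory buffer for `topic`. When the buffered bytes for
    // the topic reach `segment_size_bytes`, the buffer is flushed to the backend as a
    // new segment and an entry is appended to the topic's index.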
    pub async fn push_message(&self, topic: &str, value: &[u8]) -> anyhow::Result<()> {
        // TODO: Consider caching the topic hash so that we don't have to hash it every
        // time we publish a message.
        let topic_key = self.hash_topic(topic);

        if value.len() > self.segment_size_bytes {
            anyhow::bail!("value too large for segment");
        }
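        // The buffer lock is scoped to this block so the guard is released as soon as
        // the in-memory bookkeeping (and any flush) is done.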
        {
            // FIXME: the lock is expensive here, especially as it is held across multiple
            // await points. Until it becomes a problem, just leave it be.
            let mut buffer = self.buffer.lock().await;
            if buffer.get_mut(&topic_key).is_none() {
                buffer.insert(
                    topic_key.clone(),
                    Vec::with_capacity(self.segment_size_bytes),
                );
            }

            let log_buffer = buffer.get_mut(&topic_key).unwrap();
            if log_buffer.iter().map(|b| b.len()).sum::<usize>() >= self.segment_size_bytes {
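                // The buffered messages for this topic have reached the segment size:
                // encode them into a single segment, hand it to the backend, and record
                // the returned segment id together with a timestamp in the topic's index.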
                let log_buffer = buffer.remove(&topic_key).unwrap();
                let time = SystemTime::now();
                let segment_id = self
                    .backend
                    .flush_segment(&topic_key, &self.codec.format_log_message(log_buffer))
                    .await
                    .context("failed to write segment")?;
                self.backend
                    .append_index(&topic_key, &segment_id, time)
                    .await?;

                let mut log_buf = Vec::with_capacity(self.segment_size_bytes);
                log_buf.push(value.to_vec());
                buffer.insert(topic_key.clone(), log_buf);
            } else {
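                // Still below the segment threshold: just keep buffering in memory.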
                log_buffer.push(value.to_vec());
            }
        }

        Ok(())
    }
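    // NOTE: currently a no-op placeholder; committing segment offsets is not implemented yet.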
    pub async fn commit_segment_offset(
        &self,
        topic: &str,
        segment_id: &str,
        time: SystemTime,
    ) -> anyhow::Result<()> {
        Ok(())
    }
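    // Topics are addressed by the hex-encoded SHA-256 of their name, which gives a
    // stable, fixed-length, filesystem-friendly key for the log location.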
    fn hash_topic(&self, input: &str) -> String {
        hex::encode(Sha256::digest(input.as_bytes()))
    }
}
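// Wire format for flushed segments: the buffered messages are wrapped in the
// prost-generated `Log` type and encoded as a single protobuf blob.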
mod proto {
    use prost::Message;

    mod gen {
        include!("gen/nodata.v1.rs");
    }

    #[derive(Default, Clone)]
    pub struct ProtoStorage {}

    impl ProtoStorage {
        pub fn format_log_message(&self, messages: Vec<Vec<u8>>) -> Vec<u8> {
            gen::Log { messages }.encode_to_vec()
        }
    }
}