feat: add storage backend
Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
parent
ecc6d785e7
commit
4d37578c59
13
Cargo.lock
generated
13
Cargo.lock
generated
@ -1625,6 +1625,7 @@ dependencies = [
|
|||||||
"dagger-sdk",
|
"dagger-sdk",
|
||||||
"dotenv",
|
"dotenv",
|
||||||
"drift",
|
"drift",
|
||||||
|
"nodata-storage",
|
||||||
"notmad",
|
"notmad",
|
||||||
"prost",
|
"prost",
|
||||||
"prost-types",
|
"prost-types",
|
||||||
@ -1655,6 +1656,18 @@ dependencies = [
|
|||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "nodata-storage"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"hex",
|
||||||
|
"sha2",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"uuid",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nom"
|
name = "nom"
|
||||||
version = "7.1.3"
|
version = "7.1.3"
|
||||||
|
@ -3,6 +3,7 @@ members = ["crates/*"]
|
|||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
|
nodata-storage = { path = "crates/nodata-storage" }
|
||||||
|
|
||||||
anyhow = { version = "1" }
|
anyhow = { version = "1" }
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
@ -17,3 +18,4 @@ tonic = "0.12.1"
|
|||||||
bytes = "1.7.1"
|
bytes = "1.7.1"
|
||||||
prost = "0.13.1"
|
prost = "0.13.1"
|
||||||
prost-types = "0.13.1"
|
prost-types = "0.13.1"
|
||||||
|
uuid = { version = "1.7.0", features = ["v4", "v7"] }
|
||||||
|
13
crates/nodata-storage/Cargo.toml
Normal file
13
crates/nodata-storage/Cargo.toml
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
[package]
|
||||||
|
name = "nodata-storage"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow.workspace = true
|
||||||
|
tokio.workspace = true
|
||||||
|
uuid.workspace = true
|
||||||
|
tracing.workspace = true
|
||||||
|
|
||||||
|
hex = "0.4.3"
|
||||||
|
sha2 = "0.10.8"
|
114
crates/nodata-storage/src/lib.rs
Normal file
114
crates/nodata-storage/src/lib.rs
Normal file
@ -0,0 +1,114 @@
|
|||||||
|
// # Nodata Storage:
|
||||||
|
// storage is an abstraction for a log based partitiniong system on top of a given file system.
|
||||||
|
// it allows treating topics with a stream of events as logs, that are persisted for fast lookups
|
||||||
|
//
|
||||||
|
// A topic has a log corresponding log location (hash of the topic name)
|
||||||
|
// Each message is sequentially added to a log file and offsets are stored in a separate index file for fast lookup
|
||||||
|
|
||||||
|
use std::{collections::BTreeMap, sync::Arc};
|
||||||
|
|
||||||
|
use anyhow::Context;
|
||||||
|
use backend::StorageBackend;
|
||||||
|
use sha2::{Digest, Sha256};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
type TopicHashKey = String;
|
||||||
|
|
||||||
|
pub mod backend {
|
||||||
|
use std::{
|
||||||
|
env::temp_dir,
|
||||||
|
path::{Path, PathBuf},
|
||||||
|
};
|
||||||
|
|
||||||
|
use anyhow::Context;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
pub struct StorageBackend {
|
||||||
|
location: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StorageBackend {
|
||||||
|
pub fn new(location: &Path) -> Self {
|
||||||
|
Self {
|
||||||
|
location: location.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn temp() -> Self {
|
||||||
|
Self::new(&temp_dir().join("nodata"))
|
||||||
|
}
|
||||||
|
pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
|
||||||
|
let segment_key = uuid::Uuid::now_v7();
|
||||||
|
let segment_path = PathBuf::from("logs")
|
||||||
|
.join(topic)
|
||||||
|
.join(segment_key.to_string());
|
||||||
|
tracing::trace!("writing segment file: {}", segment_path.display());
|
||||||
|
let file_location = self.location.join(&segment_path);
|
||||||
|
if let Some(parent) = file_location.parent() {
|
||||||
|
tokio::fs::create_dir_all(parent)
|
||||||
|
.await
|
||||||
|
.context("failed to create storage backend dir")?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut segment_file = tokio::fs::File::create(&file_location).await?;
|
||||||
|
segment_file.write_all(buffer).await?;
|
||||||
|
segment_file.flush().await?;
|
||||||
|
|
||||||
|
Ok(segment_path.to_string_lossy().to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Storage {
|
||||||
|
segment_size_bytes: usize,
|
||||||
|
buffer: Arc<Mutex<BTreeMap<TopicHashKey, Vec<u8>>>>,
|
||||||
|
backend: Arc<StorageBackend>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Storage {
|
||||||
|
pub fn new(backend: StorageBackend) -> Self {
|
||||||
|
Self {
|
||||||
|
segment_size_bytes: 4096 * 100, // 4MB
|
||||||
|
buffer: Arc::default(),
|
||||||
|
backend: Arc::new(backend),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn push_message(&self, topic: &str, value: &[u8]) -> anyhow::Result<()> {
|
||||||
|
// TODO: Consider caching the topic hash, such that we don't have to hash it every time we need to publish a message
|
||||||
|
let topic_key = self.hash_topic(topic);
|
||||||
|
|
||||||
|
if value.len() > self.segment_size_bytes {
|
||||||
|
anyhow::bail!("value to large for segment");
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// FIXME: the lock is very expensive here, especially as it is over multiple wait points. Until it becomes a problem, just leave it be
|
||||||
|
let mut buffer = self.buffer.lock().await;
|
||||||
|
if buffer.get_mut(&topic_key).is_none() {
|
||||||
|
buffer.insert(
|
||||||
|
topic_key.clone(),
|
||||||
|
Vec::with_capacity(self.segment_size_bytes),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let log_buffer = buffer.get_mut(&topic_key).unwrap();
|
||||||
|
if log_buffer.len() + value.len() >= self.segment_size_bytes {
|
||||||
|
self.backend
|
||||||
|
.flush_segment(&topic_key, log_buffer)
|
||||||
|
.await
|
||||||
|
.context("failed to write segment")?;
|
||||||
|
log_buffer.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
log_buffer.extend(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn hash_topic(&self, input: &str) -> String {
|
||||||
|
hex::encode(Sha256::digest(input.as_bytes()))
|
||||||
|
}
|
||||||
|
}
|
@ -4,6 +4,8 @@ version = "0.1.0"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
nodata-storage.workspace = true
|
||||||
|
|
||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
tokio.workspace = true
|
tokio.workspace = true
|
||||||
tracing.workspace = true
|
tracing.workspace = true
|
||||||
@ -12,6 +14,7 @@ clap.workspace = true
|
|||||||
dotenv.workspace = true
|
dotenv.workspace = true
|
||||||
axum.workspace = true
|
axum.workspace = true
|
||||||
drift.workspace = true
|
drift.workspace = true
|
||||||
|
uuid.workspace = true
|
||||||
|
|
||||||
serde = { version = "1.0.197", features = ["derive"] }
|
serde = { version = "1.0.197", features = ["derive"] }
|
||||||
sqlx = { version = "0.7.3", features = [
|
sqlx = { version = "0.7.3", features = [
|
||||||
@ -21,7 +24,6 @@ sqlx = { version = "0.7.3", features = [
|
|||||||
"uuid",
|
"uuid",
|
||||||
"time",
|
"time",
|
||||||
] }
|
] }
|
||||||
uuid = { version = "1.7.0", features = ["v4"] }
|
|
||||||
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
|
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
|
||||||
tokio-util = "0.7.11"
|
tokio-util = "0.7.11"
|
||||||
tonic.workspace = true
|
tonic.workspace = true
|
||||||
|
@ -288,7 +288,7 @@ mod test {
|
|||||||
let topic = "some-topic".to_string();
|
let topic = "some-topic".to_string();
|
||||||
let offset = 9usize;
|
let offset = 9usize;
|
||||||
|
|
||||||
let staging = Staging::default();
|
let staging = Staging::new();
|
||||||
// Publish 10 messages
|
// Publish 10 messages
|
||||||
for _ in 0..10 {
|
for _ in 0..10 {
|
||||||
let offset = staging
|
let offset = staging
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use std::{collections::BTreeMap, sync::Arc};
|
use std::{collections::BTreeMap, env::temp_dir, sync::Arc};
|
||||||
|
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
@ -13,14 +13,23 @@ pub struct StagingEvent {
|
|||||||
pub value: Vec<u8>,
|
pub value: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone)]
|
||||||
pub struct Staging {
|
pub struct Staging {
|
||||||
// Temporary until we've got an actual file disk store
|
// Temporary until we've got an actual file disk store
|
||||||
#[allow(clippy::complexity)]
|
#[allow(clippy::complexity)]
|
||||||
store: Arc<RwLock<BTreeMap<Topic, Vec<StagingEvent>>>>,
|
store: Arc<RwLock<BTreeMap<Topic, Vec<StagingEvent>>>>,
|
||||||
|
|
||||||
|
storage: nodata_storage::Storage,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Staging {
|
impl Staging {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
store: Arc::default(),
|
||||||
|
storage: nodata_storage::Storage::new(nodata_storage::backend::StorageBackend::temp()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn publish(
|
pub async fn publish(
|
||||||
&self,
|
&self,
|
||||||
staging_event: impl Into<StagingEvent>,
|
staging_event: impl Into<StagingEvent>,
|
||||||
@ -29,6 +38,10 @@ impl Staging {
|
|||||||
let mut store = self.store.write().await;
|
let mut store = self.store.write().await;
|
||||||
tracing::trace!(topic = staging_event.topic, "moving event to staging");
|
tracing::trace!(topic = staging_event.topic, "moving event to staging");
|
||||||
|
|
||||||
|
self.storage
|
||||||
|
.push_message(&staging_event.topic, &staging_event.value)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let offset = match store.get_mut(&staging_event.topic) {
|
let offset = match store.get_mut(&staging_event.topic) {
|
||||||
Some(part) => {
|
Some(part) => {
|
||||||
part.push(staging_event);
|
part.push(staging_event);
|
||||||
|
@ -43,7 +43,7 @@ impl State {
|
|||||||
|
|
||||||
let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?;
|
let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?;
|
||||||
|
|
||||||
let staging = Staging::default();
|
let staging = Staging::new();
|
||||||
let handler = Handler::new(staging.clone());
|
let handler = Handler::new(staging.clone());
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
Loading…
Reference in New Issue
Block a user