feat: add s3 and deployment

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-11-17 15:43:37 +01:00
parent 2e8d14f5a6
commit c8f4bae1f2
No known key found for this signature in database
12 changed files with 1093 additions and 158 deletions

8
.env
View File

@ -1 +1,9 @@
DATABASE_URL="postgres://root@localhost:26257/defaultdb?sslmode=disable"
#STORAGE_BACKEND=local
#LOCAL_STORAGE_LOCATION=/tmp/nodata/local
STORAGE_BACKEND=s3
AWS_ACCESS_KEY_ID=OgAfuzefQRBHq4up2eYr
AWS_SECRET_ACCESS_KEY=nW85rHFOlZeMg7v6kkCikpYbyE3Pw28RS2O5FNZu
AWS_ENDPOINT_URL="https://api.minio.i.kjuulh.io"
AWS_BUCKET="nodata-dev"

768
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -11,6 +11,18 @@ tracing.workspace = true
prost.workspace = true
prost-types.workspace = true
bytes.workspace = true
async-trait.workspace = true
hex = "0.4.3"
sha2 = "0.10.8"
aws-config = { version = "1.5.10", features = [
"behavior-version-latest",
], optional = true }
aws-sdk-s3 = { version = "1.61.0", features = [
"behavior-version-latest",
], optional = true }
[features]
default = ["s3"]
s3 = ["dep:aws-config", "dep:aws-sdk-s3"]

View File

@ -1,84 +1,18 @@
use std::{
env::temp_dir,
path::{Path, PathBuf},
time::{SystemTime, UNIX_EPOCH},
};
use std::time::SystemTime;
use anyhow::Context;
use tokio::io::AsyncWriteExt;
use async_trait::async_trait;
pub struct StorageBackend {
location: PathBuf,
}
pub mod local;
#[cfg(feature = "s3")]
pub mod s3;
impl StorageBackend {
pub fn new(location: &Path) -> Self {
Self {
location: location.into(),
}
}
pub fn temp() -> Self {
Self::new(&temp_dir().join("nodata"))
}
pub async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
let segment_key = uuid::Uuid::now_v7();
let segment_path = PathBuf::from("logs")
.join(topic)
.join(segment_key.to_string());
tracing::trace!("writing segment file: {}", segment_path.display());
let file_location = self.location.join(&segment_path);
if let Some(parent) = file_location.parent() {
tokio::fs::create_dir_all(parent)
.await
.context("failed to create storage backend dir")?;
}
let mut segment_file = tokio::fs::File::create(&file_location).await?;
segment_file.write_all(buffer).await?;
segment_file.flush().await?;
Ok(segment_key.to_string())
}
pub async fn append_index(
#[async_trait]
pub trait StorageBackend {
async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String>;
async fn append_index(
&self,
topic: &str,
segment_file: &str,
time: SystemTime,
) -> anyhow::Result<()> {
let index_path = PathBuf::from("indexes").join(topic);
tracing::trace!("writing index file: {}", index_path.display());
let file_location = self.location.join(&index_path);
if let Some(parent) = file_location.parent() {
tokio::fs::create_dir_all(parent)
.await
.context("failed to create storage backend dir, index")?;
}
if !file_location.exists() {
tokio::fs::File::create(&file_location).await?;
}
let mut index_file = tokio::fs::File::options()
.append(true)
.open(&file_location)
.await?;
index_file
.write_all(
format!(
"{},{}\n",
time.duration_since(UNIX_EPOCH)
.expect("to be able to get time")
.as_secs(),
segment_file
)
.as_bytes(),
)
.await?;
index_file.flush().await?;
Ok(())
}
) -> anyhow::Result<()>;
}

View File

@ -0,0 +1,97 @@
use std::{
env::temp_dir,
path::{Path, PathBuf},
time::{SystemTime, UNIX_EPOCH},
};
use anyhow::Context;
use async_trait::async_trait;
use tokio::io::AsyncWriteExt;
use super::StorageBackend;
pub struct LocalStorageBackend {
location: PathBuf,
}
impl LocalStorageBackend {
pub fn new(location: &Path) -> Self {
Self {
location: location.into(),
}
}
pub fn new_from_env() -> anyhow::Result<Self> {
Ok(Self::new(&PathBuf::from(
std::env::var("LOCAL_STORAGE_LOCATION")
.context("LOCAL_STORAGE_LOCATION was not found in env")?,
)))
}
pub fn temp() -> Self {
Self::new(&temp_dir().join("nodata"))
}
}
#[async_trait]
impl StorageBackend for LocalStorageBackend {
async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
let segment_key = uuid::Uuid::now_v7();
let segment_path = PathBuf::from("logs")
.join(topic)
.join(segment_key.to_string());
tracing::trace!("writing segment file: {}", segment_path.display());
let file_location = self.location.join(&segment_path);
if let Some(parent) = file_location.parent() {
tokio::fs::create_dir_all(parent)
.await
.context("failed to create storage backend dir")?;
}
let mut segment_file = tokio::fs::File::create(&file_location).await?;
segment_file.write_all(buffer).await?;
segment_file.flush().await?;
Ok(segment_key.to_string())
}
async fn append_index(
&self,
topic: &str,
segment_file: &str,
time: SystemTime,
) -> anyhow::Result<()> {
let index_path = PathBuf::from("indexes").join(topic);
tracing::trace!("writing index file: {}", index_path.display());
let file_location = self.location.join(&index_path);
if let Some(parent) = file_location.parent() {
tokio::fs::create_dir_all(parent)
.await
.context("failed to create storage backend dir, index")?;
}
if !file_location.exists() {
tokio::fs::File::create(&file_location).await?;
}
let mut index_file = tokio::fs::File::options()
.append(true)
.open(&file_location)
.await?;
index_file
.write_all(
format!(
"{},{}\n",
time.duration_since(UNIX_EPOCH)
.expect("to be able to get time")
.as_secs(),
segment_file
)
.as_bytes(),
)
.await?;
index_file.flush().await?;
Ok(())
}
}

View File

@ -0,0 +1,171 @@
use std::{
collections::BTreeMap,
time::{SystemTime, UNIX_EPOCH},
};
use anyhow::Context;
use async_trait::async_trait;
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::{
config::Credentials,
primitives::{ByteStream, SdkBody},
};
use tokio::{
io::{AsyncReadExt, BufReader},
sync::RwLock,
};
use super::StorageBackend;
pub struct S3StorageBackend {
client: aws_sdk_s3::Client,
bucket: String,
index_lock: RwLock<BTreeMap<String, RwLock<()>>>,
}
impl S3StorageBackend {
pub async fn upload_file(&self, path: &str, buffer: &[u8]) -> anyhow::Result<()> {
tracing::trace!("committing file: {}", &path);
self.client
.put_object()
.bucket(&self.bucket)
.key(path)
.body(ByteStream::new(SdkBody::from(buffer)))
.send()
.await?;
Ok(())
}
pub async fn get_file(&self, path: &str) -> anyhow::Result<Option<Vec<u8>>> {
tracing::trace!("getting file: {}", path);
let obj = match self
.client
.get_object()
.bucket(&self.bucket)
.key(path)
.send()
.await
{
Ok(ok) => ok,
Err(err) => match err.into_service_error() {
aws_sdk_s3::operation::get_object::GetObjectError::NoSuchKey(_) => return Ok(None),
e => anyhow::bail!(e.to_string()),
},
};
let mut buf_reader = BufReader::new(obj.body.into_async_read());
let mut output = Vec::new();
buf_reader.read_buf(&mut output).await?;
Ok(Some(output))
}
pub async fn append_file(&self, path: &str, buffer: &[u8]) -> anyhow::Result<()> {
tracing::trace!("appending file: {}", &path);
{
let mut index_lock = self.index_lock.write().await;
let item = index_lock.get(path);
if item.is_none() {
index_lock.insert(path.to_string(), RwLock::default());
}
}
let index_lock = self.index_lock.read().await;
let item = index_lock.get(path).expect("to find a path lock");
let lock = item.write().await;
let file = self.get_file(path).await?;
match file {
Some(mut file_contents) => {
file_contents.extend_from_slice(buffer);
self.upload_file(path, &file_contents).await?
}
None => self.upload_file(path, buffer).await?,
}
drop(lock);
Ok(())
}
}
impl S3StorageBackend {
pub async fn new(
key_id: impl Into<String>,
key: impl Into<String>,
endpint_url: impl Into<String>,
bucket: impl Into<String>,
) -> anyhow::Result<Self> {
let shared_config = aws_config::defaults(BehaviorVersion::latest())
.region(Region::new("eu-west-1"))
.credentials_provider(Credentials::new(
key_id,
key,
None,
None,
env!("CARGO_PKG_NAME"),
));
let config = aws_sdk_s3::config::Builder::from(&shared_config.load().await)
.endpoint_url(endpint_url)
.force_path_style(true)
.build();
let client = aws_sdk_s3::Client::from_conf(config);
Ok(Self {
client,
bucket: bucket.into(),
index_lock: RwLock::default(),
})
}
pub async fn new_from_env() -> anyhow::Result<Self> {
let key_id = std::env::var("AWS_ACCESS_KEY_ID").context("AWS_ACCESS_KEY_ID was not set")?;
let access_key =
std::env::var("AWS_SECRET_ACCESS_KEY").context("AWS_SECRET_ACCESS_KEY was not set")?;
let endpoint_url =
std::env::var("AWS_ENDPOINT_URL").context("AWS_ENDPOINT_URL was not set")?;
let bucket = std::env::var("AWS_BUCKET").context("AWS_BUCKET was not set")?;
Self::new(key_id, access_key, endpoint_url, bucket).await
}
}
#[async_trait]
impl StorageBackend for S3StorageBackend {
async fn flush_segment(&self, topic: &str, buffer: &[u8]) -> anyhow::Result<String> {
let segment_key = uuid::Uuid::now_v7();
self.upload_file(
&format!("nodata/logs/{}/{}.pb", topic, &segment_key.to_string()),
buffer,
)
.await?;
Ok(segment_key.to_string())
}
async fn append_index(
&self,
topic: &str,
segment_file: &str,
time: SystemTime,
) -> anyhow::Result<()> {
self.append_file(
&format!("nodata/indexes/{}", topic),
format!(
"{},{}\n",
time.duration_since(UNIX_EPOCH)
.expect("to be able to get time")
.as_secs(),
segment_file
)
.as_bytes(),
)
.await
}
}

View File

@ -8,7 +8,7 @@
use std::{collections::BTreeMap, sync::Arc, time::SystemTime};
use anyhow::Context;
use backend::StorageBackend;
use backend::{local::LocalStorageBackend, StorageBackend};
use proto::ProtoStorage;
use sha2::{Digest, Sha256};
use tokio::sync::Mutex;
@ -21,19 +21,43 @@ pub mod backend;
pub struct Storage {
segment_size_bytes: usize,
buffer: Arc<Mutex<BTreeMap<TopicHashKey, Vec<Vec<u8>>>>>,
backend: Arc<StorageBackend>,
backend: Arc<dyn StorageBackend + Send + Sync + 'static>,
codec: ProtoStorage,
}
impl Storage {
pub fn new(backend: StorageBackend) -> Self {
pub fn new(backend: LocalStorageBackend) -> Self {
Self {
segment_size_bytes: 4096 * 1000, // 4MB
buffer: Arc::default(),
codec: ProtoStorage::default(),
backend: Arc::new(backend),
}
}
pub async fn new_from_env() -> anyhow::Result<Self> {
match std::env::var("STORAGE_BACKEND")
.context("failed to find STORAGE_BACKEND in env")?
.as_str()
{
"local" => Ok(Self {
segment_size_bytes: 4096 * 1000, // 4MB
buffer: Arc::default(),
codec: ProtoStorage::default(),
backend: Arc::new(LocalStorageBackend::new_from_env()?),
}),
#[cfg(feature = "s3")]
"s3" => Ok(Self {
segment_size_bytes: 4 * 1024 * 1000, // 4MB
buffer: Arc::default(),
codec: ProtoStorage::default(),
backend: Arc::new(backend::s3::S3StorageBackend::new_from_env().await?),
}),
backend => anyhow::bail!("backend is not supported: {}", backend),
}
}

View File

@ -61,7 +61,12 @@ impl no_data_service_server::NoDataService for GrpcServer {
self.counter.inc();
self.state.ingest().publish(req).await.map_err(|e| {
tracing::warn!(error = e.to_string(), "failed to handle ingest of data");
let caused_by = e
.chain()
.map(|e| e.to_string())
.collect::<Vec<String>>()
.join(", ");
tracing::warn!("failed to handle ingest of data: {}: {}", e, caused_by);
tonic::Status::internal(e.to_string())
})?;

View File

@ -288,7 +288,7 @@ mod test {
let topic = "some-topic".to_string();
let offset = 9usize;
let staging = Staging::new();
let staging = Staging::new().await?;
// Publish 10 messages
for _ in 0..10 {
let offset = staging

View File

@ -1,5 +1,6 @@
use std::{collections::BTreeMap, sync::Arc};
use nodata_storage::backend::local::LocalStorageBackend;
use tokio::sync::RwLock;
use crate::state::SharedState;
@ -23,11 +24,11 @@ pub struct Staging {
}
impl Staging {
pub fn new() -> Self {
Self {
pub async fn new() -> anyhow::Result<Self> {
Ok(Self {
store: Arc::default(),
storage: nodata_storage::Storage::new(nodata_storage::backend::StorageBackend::temp()),
}
storage: nodata_storage::Storage::new_from_env().await?,
})
}
pub async fn publish(

View File

@ -1,8 +1,6 @@
use std::{ops::Deref, sync::Arc};
use anyhow::Context;
use prometheus::Registry;
use sqlx::{Pool, Postgres};
use crate::services::{consumers::Consumers, handler::Handler, staging::Staging};
@ -24,7 +22,6 @@ impl Deref for SharedState {
}
pub struct State {
pub _db: Pool<Postgres>,
pub staging: Staging,
pub consumers: Consumers,
pub handler: Handler,
@ -33,23 +30,10 @@ pub struct State {
impl State {
pub async fn new() -> anyhow::Result<Self> {
let db = sqlx::PgPool::connect(
&std::env::var("DATABASE_URL").context("DATABASE_URL is not set")?,
)
.await?;
sqlx::migrate!("migrations/crdb")
.set_locking(false)
.run(&db)
.await?;
let _ = sqlx::query("SELECT 1;").fetch_one(&db).await?;
let staging = Staging::new();
let staging = Staging::new().await?;
let handler = Handler::new(staging.clone());
Ok(Self {
_db: db,
consumers: Consumers::new(),
staging,
handler,

View File

@ -6,16 +6,35 @@ vars:
service: "nodata"
registry: kasperhermansen
clusters:
clank-prod:
replicas: "3"
namespace: prod
database:
crdb: "false"
ingress:
- internal: "true"
- internal_grpc: "true"
deployment:
registry: git@git.front.kjuulh.io:kjuulh/clank-clusters
cuddle/clusters:
dev:
env:
prod:
clusters:
- clank-prod
service.host: "0.0.0.0:3001"
service.grpc.host: "0.0.0.0:4001"
storage.backend: "s3"
aws.endpoint.url: "https://api.minio.i.kjuulh.io"
aws.bucket: "nodata-dev"
aws.access.key.id:
vault: true
aws.secret.access.key:
vault: true
prod:
env:
service.host: "0.0.0.0:3001"
service.grpc.host: "0.0.0.0:4001"
storage.backend: "s3"
aws.endpoint.url: "https://api.minio.i.kjuulh.io"
aws.bucket: "nodata-prod"
aws.access.key.id:
vault: true
aws.secret.access.key:
vault: true