From 86cfc180767e3dc18fea100d886e6ecf3f1103b3 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sun, 27 Aug 2023 19:57:16 +0200 Subject: [PATCH] chore: fmt Signed-off-by: kjuulh --- crates/churn-server/src/event.rs | 28 +++++++++++++++---- crates/churn-server/src/main.rs | 47 +++++++++++++++++++------------- 2 files changed, 51 insertions(+), 24 deletions(-) diff --git a/crates/churn-server/src/event.rs b/crates/churn-server/src/event.rs index 8738390..b974cbe 100644 --- a/crates/churn-server/src/event.rs +++ b/crates/churn-server/src/event.rs @@ -1,14 +1,10 @@ - - use std::sync::Arc; use axum::async_trait; -use churn_domain::{LogEvent}; +use churn_domain::LogEvent; use itertools::Itertools; - - use churn_capnp::CapnpPackExt; use crate::db::Db; @@ -52,6 +48,7 @@ pub trait EventServiceTrait { async fn append(&self, req: LogEvent) -> anyhow::Result<()>; async fn get_from_cursor(&self, cursor: uuid::Uuid) -> anyhow::Result>; async fn get_from_beginning(&self) -> anyhow::Result>; + async fn get_latest_cursor(&self) -> anyhow::Result; } #[async_trait] @@ -93,4 +90,25 @@ impl EventServiceTrait for DefaultEventService { Ok(events) } + + async fn get_latest_cursor(&self) -> anyhow::Result { + let events = self.db.get_all("events_log").await?; + + let event = events + .iter() + .flat_map(|e| match LogEvent::deserialize_capnp(e) { + Ok(o) => Ok(o), + Err(e) => { + tracing::error!("failed to deserialize capnp: {e}"); + Err(e) + } + }) + .sorted_by_key(|i| i.timestamp) + .last(); + + match event { + Some(x) => Ok(x.id), + None => anyhow::bail!("no events found"), + } + } } diff --git a/crates/churn-server/src/main.rs b/crates/churn-server/src/main.rs index ba0758e..6a6ea51 100644 --- a/crates/churn-server/src/main.rs +++ b/crates/churn-server/src/main.rs @@ -19,7 +19,7 @@ use churn_domain::{Agent, LeaseResp, LogEvent, ServerEnrollReq, ServerMonitorRes use clap::{Args, Parser, Subcommand, ValueEnum}; use event::EventService; use lease::LeaseService; -use serde::{Deserialize}; +use serde::Deserialize; use serde_json::json; use crate::db::Db; @@ -208,24 +208,33 @@ async fn logs( } } - let events = match cursor.cursor { - Some(c) => state.events.get_from_cursor(c).await, - None => state.events.get_from_beginning().await, - } - .map_err(AppError::Internal)?; + match cursor.cursor { + Some(c) => { + let events = state + .events + .get_from_cursor(c) + .await + .map_err(AppError::Internal)?; - if events.is_empty() { - return Ok(Json(ServerMonitorResp { - cursor: cursor.cursor, - logs: Vec::new(), - })); - } + Ok(Json(ServerMonitorResp { + cursor: events.last().map(|e| e.id), + logs: events + .iter() + .map(|e| format!("{}: {}", e.author, e.content)) + .collect(), + })) + } + None => { + let cursor = state + .events + .get_latest_cursor() + .await + .map_err(AppError::Internal)?; - Ok(Json(ServerMonitorResp { - cursor: events.last().map(|e| e.id), - logs: events - .iter() - .map(|e| format!("{}: {}", e.author, e.content)) - .collect(), - })) + return Ok(Json(ServerMonitorResp { + cursor: Some(cursor), + logs: Vec::new(), + })); + } + } }