feat: add support for reading from gitea

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-04-12 16:17:43 +02:00
parent 3ceb52c378
commit 9cbef537c9
Signed by: kjuulh
GPG Key ID: 57B6E1465221F912
12 changed files with 885 additions and 412 deletions

1
.env
View File

@ -1 +0,0 @@
DATABASE_URL="postgres://root@localhost:26257/defaultdb?sslmode=disable"

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
target/
.cuddle/
.env

993
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -17,3 +17,4 @@ sqlx = { version = "0.7.3", features = ["runtime-tokio", "tls-rustls", "postgres
uuid = { version = "1.7.0", features = ["v4"] }
tower-http = { version = "0.5.2", features = ["cors", "trace"] }
futures = "0.3.30"
reqwest = {version = "0.12.3", default-features = false, features = ["json", "rustls-tls"]}

View File

@ -17,6 +17,13 @@ enum Commands {
#[arg(env = "SERVICE_HOST", long, default_value = "127.0.0.1:3000")]
host: SocketAddr,
},
Reconcile {
#[arg(long)]
user: Option<String>,
#[arg(long)]
orgs: Option<Vec<String>>,
},
}
mod api;
@ -29,29 +36,39 @@ async fn main() -> anyhow::Result<()> {
let cli = Command::parse();
if let Some(Commands::Serve { host }) = cli.command {
tracing::info!("Starting service");
match cli.command {
Some(Commands::Serve { host }) => {
tracing::info!("Starting service");
let state = SharedState::from(Arc::new(State::new().await?));
let state = SharedState::from(Arc::new(State::new().await?));
let mut tasks = FuturesUnordered::new();
let mut tasks = FuturesUnordered::new();
tasks.push({
let state = state.clone();
task::spawn(async move {
serve_axum(&state, &host).await?;
tasks.push({
let state = state.clone();
task::spawn(async move {
serve_axum(&state, &host).await?;
Ok::<(), anyhow::Error>(())
})
});
tasks.push(task::spawn(async move {
serve_cron_jobs(&state).await?;
Ok::<(), anyhow::Error>(())
})
});
}));
tasks.push(task::spawn(async move {
serve_cron_jobs(&state).await?;
Ok::<(), anyhow::Error>(())
}));
while let Some(result) = tasks.next().await {
result??
while let Some(result) = tasks.next().await {
result??
}
}
Some(Commands::Reconcile { user, orgs }) => {
tracing::info!("running reconcile");
let state = SharedState::from(Arc::new(State::new().await?));
state.reconciler().reconcile(user, orgs).await?;
}
None => {}
}
Ok(())
@ -59,4 +76,6 @@ async fn main() -> anyhow::Result<()> {
mod state;
pub use crate::state::{SharedState, State};
use crate::{api::serve_axum, schedule::serve_cron_jobs};
use crate::{api::serve_axum, schedule::serve_cron_jobs, services::reconciler::ReconcilerState};
mod services;

View File

@ -1,9 +1,14 @@
use crate::SharedState;
use crate::{services::gitea::GiteaClientState, SharedState};
pub async fn serve_cron_jobs(_state: &SharedState) -> Result<(), anyhow::Error> {
pub async fn serve_cron_jobs(state: &SharedState) -> Result<(), anyhow::Error> {
let state = state.clone();
tokio::spawn(async move {
let gitea_client = state.gitea_client();
loop {
tracing::info!("running cronjobs");
todo!();
tokio::time::sleep(std::time::Duration::from_secs(10_000)).await;
}
Ok::<(), anyhow::Error>(())

View File

@ -0,0 +1,2 @@
pub mod gitea;
pub mod reconciler;

View File

@ -0,0 +1,147 @@
use std::{ops::Deref, pin::Pin, sync::Arc};
type DynGiteaClient = Arc<dyn traits::GiteaClient + Send + Sync + 'static>;
pub struct GiteaClient(DynGiteaClient);
impl GiteaClient {
pub fn new() -> Self {
Self(Arc::new(DefaultGiteaClient::default()))
}
}
impl Deref for GiteaClient {
type Target = DynGiteaClient;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Clone, Debug)]
pub struct Repository {
pub owner: String,
pub name: String,
}
impl TryFrom<GiteaRepository> for Repository {
type Error = anyhow::Error;
fn try_from(value: GiteaRepository) -> Result<Self, Self::Error> {
let (owner, name) = value
.full_name
.split_once('/')
.ok_or(anyhow::anyhow!(
"name of repository is invalid, should contain a /"
))
.map_err(|e| {
tracing::warn!("failed to parse repository: {}", e);
e
})?;
Ok(Repository {
owner: owner.into(),
name: name.into(),
})
}
}
#[derive(Clone, Debug, Deserialize)]
pub struct GiteaRepository {
full_name: String,
}
pub struct DefaultGiteaClient {
url: String,
token: String,
}
impl Default for DefaultGiteaClient {
fn default() -> Self {
Self {
url: std::env::var("GITEA_URL")
.context("GITEA_URL should be set")
.map(|g| g.trim_end_matches('/').to_string())
.unwrap(),
token: std::env::var("GITEA_TOKEN")
.context("GITEA_TOKEN should be set")
.unwrap(),
}
}
}
impl DefaultGiteaClient {
pub async fn fetch_user_repos(&self) -> anyhow::Result<Vec<Repository>> {
let client = reqwest::Client::new();
let url = format!("{}/api/v1/user/repos", self.url);
tracing::trace!("calling url: {}", &url);
let response = client
.get(&url)
.header("Content-Type", "application/json")
.header("Authorization", format!("token {}", self.token))
.send()
.await?;
let repositories = response.json::<Vec<GiteaRepository>>().await?;
Ok(repositories
.into_iter()
.flat_map(Repository::try_from)
.collect())
}
pub async fn fetch_org_repos(&self, org: &str) -> anyhow::Result<Vec<Repository>> {
let client = reqwest::Client::new();
let url = format!("{}/api/v1/orgs/{}/repos", self.url, org);
tracing::trace!("calling url: {}", &url);
let response = client
.get(&url)
.header("Content-Type", "application/json")
.header("Authorization", format!("token {}", self.token))
.send()
.await?;
let repositories = response.json::<Vec<GiteaRepository>>().await?;
Ok(repositories
.into_iter()
.flat_map(Repository::try_from)
.collect())
}
}
impl traits::GiteaClient for DefaultGiteaClient {
fn get_user_repositories<'a>(
&'a self,
user: &str,
) -> Pin<Box<dyn futures::prelude::Future<Output = anyhow::Result<Vec<Repository>>> + Send + 'a>>
{
tracing::debug!("fetching gitea repositories for user: {user}");
Box::pin(async { self.fetch_user_repos().await })
}
fn get_org_repositories<'a>(
&'a self,
org: &'a str,
) -> Pin<Box<dyn futures::prelude::Future<Output = anyhow::Result<Vec<Repository>>> + Send + 'a>>
{
tracing::debug!("fetching gitea repositories for org: {org}");
Box::pin(async move { self.fetch_org_repos(org).await })
}
}
mod extensions;
pub mod traits;
use anyhow::Context;
use axum::http::HeaderMap;
pub use extensions::*;
use serde::Deserialize;

View File

@ -0,0 +1,11 @@
use crate::SharedState;
use super::GiteaClient;
pub trait GiteaClientState {
fn gitea_client(&self) -> GiteaClient {
GiteaClient::new()
}
}
impl GiteaClientState for SharedState {}

View File

@ -0,0 +1,17 @@
use std::pin::Pin;
use futures::Future;
use super::Repository;
pub trait GiteaClient {
fn get_user_repositories<'a>(
&'a self,
user: &str,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Repository>>> + Send + 'a>>;
fn get_org_repositories<'a>(
&'a self,
org: &'a str,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Repository>>> + Send + 'a>>;
}

View File

@ -0,0 +1,58 @@
use crate::SharedState;
use super::gitea::{GiteaClient, GiteaClientState, Repository};
pub struct Reconciler {
gitea_client: GiteaClient,
}
impl Reconciler {
pub fn new(gitea_client: GiteaClient) -> Self {
Self { gitea_client }
}
pub async fn reconcile(
&self,
user: Option<String>,
orgs: Option<Vec<String>>,
) -> anyhow::Result<()> {
let repos = self.get_repos(user, orgs).await?;
tracing::info!("found repositories: {}", repos.len());
Ok(())
}
async fn get_repos(
&self,
user: Option<String>,
orgs: Option<Vec<String>>,
) -> anyhow::Result<Vec<Repository>> {
let mut repos = Vec::new();
if let Some(user) = user {
let mut r = self.gitea_client.get_user_repositories(&user).await?;
repos.append(&mut r);
}
if let Some(orgs) = orgs {
for org in orgs {
let mut r = self.gitea_client.get_org_repositories(&org).await?;
repos.append(&mut r);
}
}
Ok(repos)
}
}
pub trait ReconcilerState {
fn reconciler(&self) -> Reconciler;
}
impl ReconcilerState for SharedState {
fn reconciler(&self) -> Reconciler {
Reconciler::new(self.gitea_client())
}
}

View File

@ -1,5 +1,7 @@
use std::{ops::Deref, sync::Arc};
use crate::services::gitea::GiteaClient;
#[derive(Clone)]
pub struct SharedState(Arc<State>);