feat: add paging
All checks were successful
continuous-integration/drone/push Build is passing

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-04-13 00:07:46 +02:00
parent 8d6e706263
commit 2b095687b4
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394
2 changed files with 128 additions and 22 deletions

View File

@ -81,6 +81,8 @@ async fn main() -> anyhow::Result<()> {
.reconciler() .reconciler()
.reconcile(user, org, filter, force_refresh) .reconcile(user, org, filter, force_refresh)
.await?; .await?;
tracing::info!("done running reconcile");
} }
None => {} None => {}
} }

View File

@ -113,11 +113,13 @@ pub struct CreateGiteaWebhookConfig {
} }
impl DefaultGiteaClient { impl DefaultGiteaClient {
pub async fn fetch_user_repos(&self) -> anyhow::Result<Vec<Repository>> { async fn fetch_user_repos_page(
//FIXME: We should collect the pages for these queries &self,
page: usize,
) -> anyhow::Result<(Vec<Repository>, Vec<usize>)> {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let url = format!("{}/api/v1/user/repos", self.url); let url = format!("{}/api/v1/user/repos?page={page}&limit=50", self.url);
tracing::trace!("calling url: {}", &url); tracing::trace!("calling url: {}", &url);
@ -128,34 +130,105 @@ impl DefaultGiteaClient {
.send() .send()
.await?; .await?;
let mut pages = Vec::new();
if page <= 1 {
if let Some(link_header) = response.headers().get("link") {
let link_str = link_header.to_str()?;
pages = parse_link(page, link_str)?;
}
}
let repositories = response.json::<Vec<GiteaRepository>>().await?; let repositories = response.json::<Vec<GiteaRepository>>().await?;
Ok(repositories Ok((
repositories
.into_iter() .into_iter()
.flat_map(Repository::try_from) .flat_map(Repository::try_from)
.collect()) .collect(),
pages,
))
}
pub async fn fetch_user_repos(&self) -> anyhow::Result<Vec<Repository>> {
let (mut repos, pages) = self.fetch_user_repos_page(1).await?;
let mut tasks = FuturesUnordered::new();
for page in pages {
tasks.push(async move {
let (new_repos, _) = self.fetch_user_repos_page(page).await?;
Ok::<Vec<Repository>, anyhow::Error>(new_repos)
})
}
while let Some(new_repos) = tasks.next().await {
let mut new_repos = new_repos?;
repos.append(&mut new_repos);
}
Ok(repos)
}
async fn fetch_org_repos_page(
&self,
org: &str,
page: usize,
) -> anyhow::Result<(Vec<Repository>, Vec<usize>)> {
let client = reqwest::Client::new();
let url = format!(
"{}/api/v1/orgs/{}/repos?page={page}&limit=50",
self.url, org
);
tracing::trace!("calling url: {}", &url);
let response = client
.get(&url)
.header("Content-Type", "application/json")
.header("Authorization", format!("token {}", self.token))
.send()
.await?;
let mut pages = Vec::new();
if page <= 1 {
if let Some(link_header) = response.headers().get("link") {
let link_str = link_header.to_str()?;
pages = parse_link(page, link_str)?;
}
}
let repositories = response.json::<Vec<GiteaRepository>>().await?;
Ok((
repositories
.into_iter()
.flat_map(Repository::try_from)
.collect(),
pages,
))
} }
pub async fn fetch_org_repos(&self, org: &str) -> anyhow::Result<Vec<Repository>> { pub async fn fetch_org_repos(&self, org: &str) -> anyhow::Result<Vec<Repository>> {
let client = reqwest::Client::new(); let (mut repos, pages) = self.fetch_org_repos_page(org, 1).await?;
let url = format!("{}/api/v1/orgs/{}/repos", self.url, org); let mut tasks = FuturesUnordered::new();
tracing::trace!("calling url: {}", &url); for page in pages {
tasks.push(async move {
let (new_repos, _) = self.fetch_org_repos_page(org, page).await?;
let response = client Ok::<Vec<Repository>, anyhow::Error>(new_repos)
.get(&url) })
.header("Content-Type", "application/json") }
.header("Authorization", format!("token {}", self.token))
.send()
.await?;
let repositories = response.json::<Vec<GiteaRepository>>().await?; while let Some(new_repos) = tasks.next().await {
let mut new_repos = new_repos?;
repos.append(&mut new_repos);
}
Ok(repositories Ok(repos)
.into_iter()
.flat_map(Repository::try_from)
.collect())
} }
async fn fetch_renovate(&self, repo: &Repository) -> anyhow::Result<Option<()>> { async fn fetch_renovate(&self, repo: &Repository) -> anyhow::Result<Option<()>> {
@ -362,10 +435,41 @@ impl traits::GiteaClient for DefaultGiteaClient {
} }
} }
// <https://git.front.kjuulh.io/api/v1/user/repos?page=2>; rel="next",<https://git.front.kjuulh.io/api/v1/user/repos?page=9>; rel="last"
fn parse_link(page: usize, link_str: &str) -> anyhow::Result<Vec<usize>> {
let link_sections = link_str.split(',');
for link_section in link_sections {
if let Some((link, rel)) = link_section.rsplit_once("; ") {
if rel == r#"rel="last""# {
let actual_link = &link[1..link.len() - 1];
let url = Url::parse(actual_link)?;
if let Some(page_num) = url
.query_pairs()
.into_iter()
.find(|(name, _)| name == "page")
.map(|(_, value)| value)
{
let page_num: usize = page_num.parse()?;
let page_numbers = (page + 1..page_num).collect::<Vec<usize>>();
return Ok(page_numbers);
}
}
}
}
Ok(Vec::default())
}
mod extensions; mod extensions;
pub mod traits; pub mod traits;
use anyhow::Context; use anyhow::Context;
pub use extensions::*; pub use extensions::*;
use reqwest::StatusCode; use futures::{stream::FuturesUnordered, StreamExt};
use reqwest::{StatusCode, Url};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};