feat: make into stream instead
All checks were successful
continuous-integration/drone/push Build is passing

Signed-off-by: kjuulh <contact@kjuulh.io>
This commit is contained in:
Kasper Juul Hermansen 2024-04-13 00:19:43 +02:00
parent 2b095687b4
commit 7f73220753
Signed by: kjuulh
GPG Key ID: 9AA7BC13CE474394

View File

@ -150,24 +150,21 @@ impl DefaultGiteaClient {
} }
pub async fn fetch_user_repos(&self) -> anyhow::Result<Vec<Repository>> { pub async fn fetch_user_repos(&self) -> anyhow::Result<Vec<Repository>> {
let (mut repos, pages) = self.fetch_user_repos_page(1).await?; let (repos, pages) = self.fetch_user_repos_page(1).await?;
let mut tasks = FuturesUnordered::new(); let tasks = pages
.into_iter()
for page in pages { .map(|page| async move {
tasks.push(async move {
let (new_repos, _) = self.fetch_user_repos_page(page).await?; let (new_repos, _) = self.fetch_user_repos_page(page).await?;
Ok::<Vec<Repository>, anyhow::Error>(new_repos) Ok::<Vec<Repository>, anyhow::Error>(new_repos)
}) })
} .collect::<FuturesUnordered<_>>();
while let Some(new_repos) = tasks.next().await { let res: Result<Vec<Vec<Repository>>, anyhow::Error> = tasks.try_collect().await;
let mut new_repos = new_repos?; let res = res?.into_iter().flatten();
repos.append(&mut new_repos);
}
Ok(repos) Ok(repos.into_iter().chain(res).collect())
} }
async fn fetch_org_repos_page( async fn fetch_org_repos_page(
@ -211,24 +208,21 @@ impl DefaultGiteaClient {
} }
pub async fn fetch_org_repos(&self, org: &str) -> anyhow::Result<Vec<Repository>> { pub async fn fetch_org_repos(&self, org: &str) -> anyhow::Result<Vec<Repository>> {
let (mut repos, pages) = self.fetch_org_repos_page(org, 1).await?; let (repos, pages) = self.fetch_org_repos_page(org, 1).await?;
let mut tasks = FuturesUnordered::new(); let tasks = pages
.into_iter()
for page in pages { .map(|page| async move {
tasks.push(async move {
let (new_repos, _) = self.fetch_org_repos_page(org, page).await?; let (new_repos, _) = self.fetch_org_repos_page(org, page).await?;
Ok::<Vec<Repository>, anyhow::Error>(new_repos) Ok::<Vec<Repository>, anyhow::Error>(new_repos)
}) })
} .collect::<FuturesUnordered<_>>();
while let Some(new_repos) = tasks.next().await { let res: Result<Vec<Vec<Repository>>, anyhow::Error> = tasks.try_collect().await;
let mut new_repos = new_repos?; let res = res?.into_iter().flatten();
repos.append(&mut new_repos);
}
Ok(repos) Ok(repos.into_iter().chain(res).collect())
} }
async fn fetch_renovate(&self, repo: &Repository) -> anyhow::Result<Option<()>> { async fn fetch_renovate(&self, repo: &Repository) -> anyhow::Result<Option<()>> {
@ -470,6 +464,7 @@ pub mod traits;
use anyhow::Context; use anyhow::Context;
pub use extensions::*; pub use extensions::*;
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use itertools::Itertools;
use reqwest::{StatusCode, Url}; use reqwest::{StatusCode, Url};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};