From c2c8290dfe407cc42f6f5bcbabfd38e8fa75a4ee Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 18 Jul 2022 10:51:59 +0200 Subject: [PATCH 1/4] removed hello_world --- src/lib/scel_api/src/graphql/query.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/lib/scel_api/src/graphql/query.rs b/src/lib/scel_api/src/graphql/query.rs index 07a00f3..b15c473 100644 --- a/src/lib/scel_api/src/graphql/query.rs +++ b/src/lib/scel_api/src/graphql/query.rs @@ -14,10 +14,6 @@ pub struct QueryRoot; #[Object] impl QueryRoot { - async fn hello_world(&self) -> &str { - "Hello, world!" - } - async fn get_download(&self, ctx: &Context<'_>, id: ID) -> Result> { let app = ctx.data_unchecked::>(); -- 2.45.2 From 4b28396134562e1d7535185a2eb3cc21c68bc038 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 18 Jul 2022 13:00:25 +0200 Subject: [PATCH 2/4] Fix updates on subscriptions --- Cargo.lock | 14 + src/lib/scel_api/src/graphql/mutation.rs | 5 +- src/lib/scel_api/src/graphql/query.rs | 14 +- src/lib/scel_api/src/graphql/subscription.rs | 18 +- src/lib/scel_core/Cargo.toml | 4 + src/lib/scel_core/src/lib.rs | 7 +- src/lib/scel_core/src/services/mod.rs | 101 ++++++-- src/lib/scel_core/src/youtube/mod.rs | 256 +++++++++++++++++++ 8 files changed, 390 insertions(+), 29 deletions(-) create mode 100644 src/lib/scel_core/src/youtube/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 2826e8f..2498522 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1333,8 +1333,12 @@ dependencies = [ "anyhow", "async-trait", "futures", + "lazy_static", + "regex", + "thiserror", "tokio", "tracing", + "uuid", ] [[package]] @@ -1870,6 +1874,16 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "uuid" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" +dependencies = [ + "getrandom", + "rand", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/src/lib/scel_api/src/graphql/mutation.rs b/src/lib/scel_api/src/graphql/mutation.rs index ca00e1d..a781fed 100644 --- a/src/lib/scel_api/src/graphql/mutation.rs +++ b/src/lib/scel_api/src/graphql/mutation.rs @@ -21,9 +21,12 @@ impl MutationRoot { let download = app .download_service + .clone() .add_download(Download { - id: Some("some-id".to_string()), + id: None, link: download_link, + progress: None, + file_name: None, }) .await .unwrap(); diff --git a/src/lib/scel_api/src/graphql/query.rs b/src/lib/scel_api/src/graphql/query.rs index b15c473..4fba81c 100644 --- a/src/lib/scel_api/src/graphql/query.rs +++ b/src/lib/scel_api/src/graphql/query.rs @@ -3,11 +3,12 @@ use std::sync::Arc; use async_graphql::{Context, Object, Result, SimpleObject, ID}; use scel_core::App; -#[derive(SimpleObject)] -struct Download { - id: ID, - link: String, - progress: i32, +#[derive(SimpleObject, Clone)] +pub struct Download { + pub id: ID, + pub link: String, + pub progress: Option, + pub file_name: Option, } pub struct QueryRoot; @@ -20,8 +21,9 @@ impl QueryRoot { match app.download_service.get_download(id.to_string()).await { Ok(Some(d)) => Ok(Some(Download { id: ID::from(d.id.expect("ID could not be found")), - progress: 0, + progress: None, link: d.link, + file_name: None, })), Ok(None) => Ok(None), Err(e) => Err(e.into()), diff --git a/src/lib/scel_api/src/graphql/subscription.rs b/src/lib/scel_api/src/graphql/subscription.rs index 2573cd9..6647693 100644 --- a/src/lib/scel_api/src/graphql/subscription.rs +++ b/src/lib/scel_api/src/graphql/subscription.rs @@ -5,16 +5,18 @@ use async_graphql::{ }; use scel_core::App; +use super::query::Download; + pub struct SubscriptionRoot; struct DownloadChanged { - id: ID, + download: Download, } #[Object] impl DownloadChanged { - async fn id(&self) -> &ID { - &self.id + async fn download(&self) -> Download { + self.download.clone() } } @@ -30,8 +32,16 @@ impl SubscriptionRoot { stream! { while stream.changed().await.is_ok() { + let next_download = (*stream.borrow()).clone(); + let id = ID::from(next_download.id.unwrap()); + yield DownloadChanged { - id: id.clone() + download: Download { + id: id, + link: next_download.link, + file_name: next_download.file_name, + progress: next_download.progress, + } } } } diff --git a/src/lib/scel_core/Cargo.toml b/src/lib/scel_core/Cargo.toml index 0fb2d18..6802ede 100644 --- a/src/lib/scel_core/Cargo.toml +++ b/src/lib/scel_core/Cargo.toml @@ -11,3 +11,7 @@ anyhow = { version = "*" } async-trait = { version = "0.1.56" } futures = "0.3.21" tracing = "0.1" +lazy_static = "1.4.0" +regex = { version = "1.5.5" } +thiserror = "1.0.31" +uuid = {version = "1.1.2", features = ["v4", "fast-rng"]} diff --git a/src/lib/scel_core/src/lib.rs b/src/lib/scel_core/src/lib.rs index 3c1922a..21f08ad 100644 --- a/src/lib/scel_core/src/lib.rs +++ b/src/lib/scel_core/src/lib.rs @@ -1,16 +1,19 @@ +use std::sync::Arc; + use services::InMemoryDownloadService; pub mod services; +mod youtube; #[allow(dead_code)] pub struct App { - pub download_service: InMemoryDownloadService, + pub download_service: Arc, } impl App { pub fn new() -> Self { Self { - download_service: InMemoryDownloadService::new(), + download_service: Arc::new(InMemoryDownloadService::new()), } } } diff --git a/src/lib/scel_core/src/services/mod.rs b/src/lib/scel_core/src/services/mod.rs index c2dd46c..55ba1ed 100644 --- a/src/lib/scel_core/src/services/mod.rs +++ b/src/lib/scel_core/src/services/mod.rs @@ -1,16 +1,29 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; use tokio::sync::{watch, Mutex}; -use tracing::info; +use tracing::error; +use uuid::Uuid; + +use crate::youtube::{Arg, YoutubeDL}; #[derive(Clone)] pub struct Download { pub id: Option, pub link: String, + pub progress: Option, + pub file_name: Option, } pub struct InMemoryDownloadService { - downloads: - Mutex>, tokio::sync::watch::Receiver)>>, + downloads: Mutex< + HashMap< + String, + ( + Arc>, + Arc>>, + tokio::sync::watch::Receiver, + ), + >, + >, } impl InMemoryDownloadService { @@ -20,28 +33,86 @@ impl InMemoryDownloadService { } } - pub async fn add_download(&self, download: Download) -> anyhow::Result { + pub async fn add_download(self: Arc, download: Download) -> anyhow::Result { let mut downloads = self.downloads.lock().await; let (tx, rx) = watch::channel(download.clone()); + let shared_tx = Arc::new(Mutex::new(tx)); + + let mut d = download.to_owned(); + + let id = Uuid::new_v4().to_string(); + d.id = Some(id.clone()); downloads.insert( - "key".to_string(), - (Arc::new(Mutex::new(download.clone())), rx.clone()), + id.clone(), + ( + Arc::new(Mutex::new(d.clone())), + shared_tx.clone(), + rx.clone(), + ), ); + let args = vec![ + Arg::new("--progress"), + Arg::new("--newline"), + Arg::new_with_args("--output", "%(title).90s.%(ext)s"), + ]; + let ytd = YoutubeDL::new( + &PathBuf::from("./data/downloads"), + args, + download.link.as_str(), + )?; + tokio::spawn({ - let d = download.clone(); + let download_service = self.clone(); + async move { - loop { - info!("Sending event: {}", d.clone().id.unwrap()); - let _ = tx.send(d.clone()); - tokio::time::sleep(Duration::from_millis(300)).await; + if let Err(e) = ytd + .download( + |percentage| { + let ds = download_service.clone(); + let id = id.clone(); + + async move { + let mut download = ds.get_download(id).await.unwrap().unwrap(); + download.progress = Some(percentage); + let _ = ds.update_download(download).await; + } + }, + |file_name| { + let ds = download_service.clone(); + let id = id.clone(); + + async move { + let mut download = ds.get_download(id).await.unwrap().unwrap(); + download.file_name = Some(file_name); + let _ = ds.update_download(download).await; + } + }, + ) + .await + { + error!("Download failed: {}", e); + } else { + let download = download_service.get_download(id).await.unwrap().unwrap(); + let _ = download_service.update_download(download).await; } } }); - Ok(download) + Ok(d) + } + + pub async fn update_download(self: Arc, download: Download) -> anyhow::Result<()> { + let mut downloads = self.downloads.lock().await; + if let Some(d) = downloads.get_mut(&download.clone().id.unwrap()) { + let mut d_mut = d.0.lock().await; + *d_mut = download.clone(); + let _ = d.1.lock().await.send(download); + } + + Ok(()) } pub async fn get_download(&self, id: String) -> anyhow::Result> { @@ -58,9 +129,7 @@ impl InMemoryDownloadService { pub async fn subscribe_download(&self, id: String) -> tokio::sync::watch::Receiver { let downloads = self.downloads.lock().await; - let download = downloads.get(&id).unwrap(); - - download.1.clone() + download.2.clone() } } diff --git a/src/lib/scel_core/src/youtube/mod.rs b/src/lib/scel_core/src/youtube/mod.rs new file mode 100644 index 0000000..cdcb26d --- /dev/null +++ b/src/lib/scel_core/src/youtube/mod.rs @@ -0,0 +1,256 @@ +use std::fmt::{Display, Formatter}; +use std::fs::{canonicalize, create_dir_all}; +use std::future::Future; +use std::num::ParseIntError; +use std::path::{Path, PathBuf}; +use std::process::{Output, Stdio}; + +use lazy_static::lazy_static; +use regex::Regex; +use thiserror::Error; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::Command; + +#[derive(Error, Debug)] +pub enum YoutubeDLError { + #[error("failed to execute youtube-dl")] + IOError(#[from] std::io::Error), + #[error("failed to convert path")] + UTF8Error(#[from] std::string::FromUtf8Error), + #[error("youtube-dl exited with: {0}")] + Failure(String), +} + +type Result = std::result::Result; + +const YOUTUBE_DL_COMMAND: &str = "yt-dlp"; + +#[derive(Clone, Debug)] +pub struct Arg { + arg: String, + input: Option, +} + +impl Arg { + pub fn new(argument: &str) -> Self { + Self { + arg: argument.to_string(), + input: None, + } + } + + pub fn new_with_args(argument: &str, input: &str) -> Self { + Self { + arg: argument.to_string(), + input: Option::from(input.to_string()), + } + } +} + +impl Display for Arg { + fn fmt(&self, fmt: &mut Formatter<'_>) -> std::fmt::Result { + match &self.input { + Some(input) => write!(fmt, "{} {}", self.arg, input), + None => write!(fmt, "{}", self.arg), + } + } +} + +#[derive(Clone, Debug)] +pub struct YoutubeDL { + path: PathBuf, + links: Vec, + args: Vec, +} + +#[derive(Clone, Debug)] +pub struct YoutubeDLResult { + path: PathBuf, + output: String, +} + +impl YoutubeDLResult { + fn new(path: &PathBuf) -> Self { + Self { + path: path.clone(), + output: String::new(), + } + } + + pub fn output_dir(&self) -> &PathBuf { + &self.path + } +} + +impl YoutubeDL { + pub fn new_multiple_links( + dl_path: &PathBuf, + args: Vec, + links: Vec, + ) -> Result { + let path = Path::new(dl_path); + + if !path.exists() { + create_dir_all(&path)?; + } + + if !path.is_dir() { + return Err(YoutubeDLError::IOError(std::io::Error::new( + std::io::ErrorKind::Other, + "path is not a directory", + ))); + } + + let path = canonicalize(dl_path)?; + Ok(YoutubeDL { path, links, args }) + } + + pub fn new(dl_path: &PathBuf, args: Vec, link: &str) -> Result { + YoutubeDL::new_multiple_links(dl_path, args, vec![link.to_string()]) + } + + pub async fn download( + &self, + progress_update_fn: F, + file_name_available: FAvailable, + ) -> Result + where + F: Fn(u32) -> Fut, + FAvailable: Fn(String) -> FutAvailable, + Fut: Future, + FutAvailable: Future, + { + let output = self + .spawn_youtube_dl(progress_update_fn, file_name_available) + .await?; + let mut result = YoutubeDLResult::new(&self.path); + + if !output.status.success() { + return Err(YoutubeDLError::Failure(String::from_utf8(output.stderr)?)); + } + result.output = String::from_utf8(output.stdout)?; + + Ok(result) + } + + async fn spawn_youtube_dl( + &self, + progress_update_fn: F, + file_name_available: FAvailable, + ) -> Result + where + F: Fn(u32) -> Fut, + FAvailable: Fn(String) -> FutAvailable, + Fut: Future, + FutAvailable: Future, + { + let mut cmd = Command::new(YOUTUBE_DL_COMMAND); + cmd.current_dir(&self.path) + .env("LC_ALL", "en_US.UTF-8") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + for arg in self.args.iter() { + match &arg.input { + Some(input) => cmd.arg(&arg.arg).arg(input), + None => cmd.arg(&arg.arg), + }; + } + + for link in self.links.iter() { + cmd.arg(&link); + } + + let mut pr = cmd.spawn()?; + + { + let stdout = pr.stdout.as_mut().unwrap(); + let stdout_reader = BufReader::new(stdout); + let mut stdout_lines = stdout_reader.lines(); + + let mut have_gotten_file_name = false; + while let Ok(Some(line)) = stdout_lines.next_line().await { + println!("{}", line.clone()); + + if !have_gotten_file_name { + if let Some(file_name) = parse_file_name(line.clone()) { + file_name_available(file_name).await; + have_gotten_file_name = true + } + } + + if let Some(Ok(percentage)) = parse_line(line) { + progress_update_fn(percentage).await; + } + } + } + + Ok(pr.wait_with_output().await?) + } +} + +fn parse_line(line: String) -> Option> { + lazy_static! { + static ref RE: Regex = Regex::new(r"\[download\]\s+(\d+)").unwrap(); + } + + let capture: regex::Captures = RE.captures(line.as_str())?; + if capture.len() != 2 { + return None; + } + let str = &capture[1]; + Some(str.to_string().parse::()) +} + +fn parse_file_name(line: String) -> Option { + lazy_static! { + static ref RE: Regex = Regex::new(r"^\[download\] Destination: (.+)$").unwrap(); + } + + let capture: regex::Captures = RE.captures(line.as_str())?; + if capture.len() != 2 { + return None; + } + let str = &capture[1]; + Some(str.to_string()) +} + +#[cfg(test)] +mod tests { + use crate::youtube::{parse_file_name, parse_line}; + + #[test] + fn test_parse_line() { + let percentage = parse_line( + "[download] 95.4% of ~215.85MiB at 9.61MiB/s ETA 00:01 (frag 144/151)".into(), + ); + + assert_eq!(percentage, Some(Ok(95))) + } + + #[test] + fn test_parse_line_get_nothing() { + let nothing = parse_line("[download] Got server HTTP error: The read operation timed out. Retrying (attempt 1 of 10) ...".into()); + + assert_eq!(nothing, None) + } + + #[test] + fn test_parse_file_name() { + let file_name = parse_file_name( + "[download] Destination: 10 Design Patterns Explained in 10 Minutes.mp4".into(), + ); + + assert_eq!( + file_name, + Some("10 Design Patterns Explained in 10 Minutes.mp4".into()) + ); + } + + #[test] + fn test_parse_file_name_get_nothing() { + let nothing = parse_file_name("[download] No fit: something".into()); + + assert_eq!(nothing, None) + } +} -- 2.45.2 From 62ba975afd1f6236bb4d2e150f952d3311c61105 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 18 Jul 2022 13:13:20 +0200 Subject: [PATCH 3/4] Added drone --- .dockerignore | 4 ++++ .drone.yml | 31 +++++++++++++++++++++++++++++++ Dockerfile | 21 +++++++++++++++++++++ 3 files changed, 56 insertions(+) create mode 100644 .dockerignore create mode 100644 .drone.yml create mode 100644 Dockerfile diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..7f44911 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +target/ +.git/ +.env +data/ diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..b09d09b --- /dev/null +++ b/.drone.yml @@ -0,0 +1,31 @@ +kind: pipeline +name: default +type: docker +steps: + - name: server + image: plugins/docker + environment: + DOCKER_BUILDKIT: 1 + settings: + username: kasperhermansen + password: + from_secret: + docker_secret + repo: kasperhermansen/scel + tags: latest + context: . + dockerfile: Dockerfile + cache_from: kasperhermansen/scel:latest + + - name: send telegram notification + image: appleboy/drone-telegram + settings: + token: + from_secret: telegram_token + to: 2129601481 + format: markdown + depends_on: + - server + when: + status: [failure] + diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..366b1a6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,21 @@ +FROM rust:1.60 as builder + +WORKDIR /usr/src/scel + +COPY . . + +RUN --mount=type=cache,target=/usr/src/scel/target cargo build --release +RUN --mount=type=cache,target=/usr/src/scel/target cargo install --path src/cmd/scel + +FROM debian:bullseye-slim + +# Install YTD +RUN rm -f /etc/apt/apt.conf.d/docker-clean; echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache +RUN --mount=type=cache,target=/var/cache/apt --mount=type=cache,target=/var/lib/apt \ + apt-get update && apt-get install -y python3 python3-pip +RUN python3 -m pip install -U yt-dlp + +# Copy binary +COPY --from=builder /usr/local/cargo/bin/scel /usr/local/bin/scel + +CMD ["scel"] -- 2.45.2 From 20950ebd5c0a512121f0b2f6d6040963e9fbbdcb Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 18 Jul 2022 13:13:54 +0200 Subject: [PATCH 4/4] Formatted with clippy --- src/lib/scel_api/src/auth/mod.rs | 2 +- src/lib/scel_api/src/lib.rs | 2 +- src/lib/scel_core/src/services/mod.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lib/scel_api/src/auth/mod.rs b/src/lib/scel_api/src/auth/mod.rs index dfdf231..1282441 100644 --- a/src/lib/scel_api/src/auth/mod.rs +++ b/src/lib/scel_api/src/auth/mod.rs @@ -9,7 +9,7 @@ use axum::{ }; use oauth2::{ basic::BasicClient, reqwest::async_http_client, AuthUrl, AuthorizationCode, ClientId, - ClientSecret, CsrfToken, RedirectUrl, Scope, TokenResponse, TokenUrl, + ClientSecret, CsrfToken, RedirectUrl, TokenResponse, TokenUrl, }; use reqwest::header::SET_COOKIE; use serde::Deserialize; diff --git a/src/lib/scel_api/src/lib.rs b/src/lib/scel_api/src/lib.rs index 56e2760..0da69fe 100644 --- a/src/lib/scel_api/src/lib.rs +++ b/src/lib/scel_api/src/lib.rs @@ -79,7 +79,7 @@ impl Server { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); - return Server { app, addr }; + Server { app, addr } } pub async fn start(self) -> anyhow::Result<()> { diff --git a/src/lib/scel_core/src/services/mod.rs b/src/lib/scel_core/src/services/mod.rs index 55ba1ed..d9497a3 100644 --- a/src/lib/scel_core/src/services/mod.rs +++ b/src/lib/scel_core/src/services/mod.rs @@ -48,8 +48,8 @@ impl InMemoryDownloadService { id.clone(), ( Arc::new(Mutex::new(d.clone())), - shared_tx.clone(), - rx.clone(), + shared_tx, + rx, ), ); -- 2.45.2