Fix updates on subscriptions

This commit is contained in:
Kasper Juul Hermansen 2022-07-18 13:00:25 +02:00
parent c2c8290dfe
commit 4b28396134
Signed by: kjuulh
GPG Key ID: 57B6E1465221F912
8 changed files with 390 additions and 29 deletions

14
Cargo.lock generated
View File

@ -1333,8 +1333,12 @@ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
"futures", "futures",
"lazy_static",
"regex",
"thiserror",
"tokio", "tokio",
"tracing", "tracing",
"uuid",
] ]
[[package]] [[package]]
@ -1870,6 +1874,16 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "uuid"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
dependencies = [
"getrandom",
"rand",
]
[[package]] [[package]]
name = "valuable" name = "valuable"
version = "0.1.0" version = "0.1.0"

View File

@ -21,9 +21,12 @@ impl MutationRoot {
let download = app let download = app
.download_service .download_service
.clone()
.add_download(Download { .add_download(Download {
id: Some("some-id".to_string()), id: None,
link: download_link, link: download_link,
progress: None,
file_name: None,
}) })
.await .await
.unwrap(); .unwrap();

View File

@ -3,11 +3,12 @@ use std::sync::Arc;
use async_graphql::{Context, Object, Result, SimpleObject, ID}; use async_graphql::{Context, Object, Result, SimpleObject, ID};
use scel_core::App; use scel_core::App;
#[derive(SimpleObject)] #[derive(SimpleObject, Clone)]
struct Download { pub struct Download {
id: ID, pub id: ID,
link: String, pub link: String,
progress: i32, pub progress: Option<u32>,
pub file_name: Option<String>,
} }
pub struct QueryRoot; pub struct QueryRoot;
@ -20,8 +21,9 @@ impl QueryRoot {
match app.download_service.get_download(id.to_string()).await { match app.download_service.get_download(id.to_string()).await {
Ok(Some(d)) => Ok(Some(Download { Ok(Some(d)) => Ok(Some(Download {
id: ID::from(d.id.expect("ID could not be found")), id: ID::from(d.id.expect("ID could not be found")),
progress: 0, progress: None,
link: d.link, link: d.link,
file_name: None,
})), })),
Ok(None) => Ok(None), Ok(None) => Ok(None),
Err(e) => Err(e.into()), Err(e) => Err(e.into()),

View File

@ -5,16 +5,18 @@ use async_graphql::{
}; };
use scel_core::App; use scel_core::App;
use super::query::Download;
pub struct SubscriptionRoot; pub struct SubscriptionRoot;
struct DownloadChanged { struct DownloadChanged {
id: ID, download: Download,
} }
#[Object] #[Object]
impl DownloadChanged { impl DownloadChanged {
async fn id(&self) -> &ID { async fn download(&self) -> Download {
&self.id self.download.clone()
} }
} }
@ -30,8 +32,16 @@ impl SubscriptionRoot {
stream! { stream! {
while stream.changed().await.is_ok() { while stream.changed().await.is_ok() {
let next_download = (*stream.borrow()).clone();
let id = ID::from(next_download.id.unwrap());
yield DownloadChanged { yield DownloadChanged {
id: id.clone() download: Download {
id: id,
link: next_download.link,
file_name: next_download.file_name,
progress: next_download.progress,
}
} }
} }
} }

View File

@ -11,3 +11,7 @@ anyhow = { version = "*" }
async-trait = { version = "0.1.56" } async-trait = { version = "0.1.56" }
futures = "0.3.21" futures = "0.3.21"
tracing = "0.1" tracing = "0.1"
lazy_static = "1.4.0"
regex = { version = "1.5.5" }
thiserror = "1.0.31"
uuid = {version = "1.1.2", features = ["v4", "fast-rng"]}

View File

@ -1,16 +1,19 @@
use std::sync::Arc;
use services::InMemoryDownloadService; use services::InMemoryDownloadService;
pub mod services; pub mod services;
mod youtube;
#[allow(dead_code)] #[allow(dead_code)]
pub struct App { pub struct App {
pub download_service: InMemoryDownloadService, pub download_service: Arc<InMemoryDownloadService>,
} }
impl App { impl App {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
download_service: InMemoryDownloadService::new(), download_service: Arc::new(InMemoryDownloadService::new()),
} }
} }
} }

View File

@ -1,16 +1,29 @@
use std::{collections::HashMap, sync::Arc, time::Duration}; use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::sync::{watch, Mutex}; use tokio::sync::{watch, Mutex};
use tracing::info; use tracing::error;
use uuid::Uuid;
use crate::youtube::{Arg, YoutubeDL};
#[derive(Clone)] #[derive(Clone)]
pub struct Download { pub struct Download {
pub id: Option<String>, pub id: Option<String>,
pub link: String, pub link: String,
pub progress: Option<u32>,
pub file_name: Option<String>,
} }
pub struct InMemoryDownloadService { pub struct InMemoryDownloadService {
downloads: downloads: Mutex<
Mutex<HashMap<String, (Arc<Mutex<Download>>, tokio::sync::watch::Receiver<Download>)>>, HashMap<
String,
(
Arc<Mutex<Download>>,
Arc<Mutex<tokio::sync::watch::Sender<Download>>>,
tokio::sync::watch::Receiver<Download>,
),
>,
>,
} }
impl InMemoryDownloadService { impl InMemoryDownloadService {
@ -20,28 +33,86 @@ impl InMemoryDownloadService {
} }
} }
pub async fn add_download(&self, download: Download) -> anyhow::Result<Download> { pub async fn add_download(self: Arc<Self>, download: Download) -> anyhow::Result<Download> {
let mut downloads = self.downloads.lock().await; let mut downloads = self.downloads.lock().await;
let (tx, rx) = watch::channel(download.clone()); let (tx, rx) = watch::channel(download.clone());
let shared_tx = Arc::new(Mutex::new(tx));
let mut d = download.to_owned();
let id = Uuid::new_v4().to_string();
d.id = Some(id.clone());
downloads.insert( downloads.insert(
"key".to_string(), id.clone(),
(Arc::new(Mutex::new(download.clone())), rx.clone()), (
Arc::new(Mutex::new(d.clone())),
shared_tx.clone(),
rx.clone(),
),
); );
let args = vec![
Arg::new("--progress"),
Arg::new("--newline"),
Arg::new_with_args("--output", "%(title).90s.%(ext)s"),
];
let ytd = YoutubeDL::new(
&PathBuf::from("./data/downloads"),
args,
download.link.as_str(),
)?;
tokio::spawn({ tokio::spawn({
let d = download.clone(); let download_service = self.clone();
async move { async move {
loop { if let Err(e) = ytd
info!("Sending event: {}", d.clone().id.unwrap()); .download(
let _ = tx.send(d.clone()); |percentage| {
tokio::time::sleep(Duration::from_millis(300)).await; let ds = download_service.clone();
let id = id.clone();
async move {
let mut download = ds.get_download(id).await.unwrap().unwrap();
download.progress = Some(percentage);
let _ = ds.update_download(download).await;
}
},
|file_name| {
let ds = download_service.clone();
let id = id.clone();
async move {
let mut download = ds.get_download(id).await.unwrap().unwrap();
download.file_name = Some(file_name);
let _ = ds.update_download(download).await;
}
},
)
.await
{
error!("Download failed: {}", e);
} else {
let download = download_service.get_download(id).await.unwrap().unwrap();
let _ = download_service.update_download(download).await;
} }
} }
}); });
Ok(download) Ok(d)
}
pub async fn update_download(self: Arc<Self>, download: Download) -> anyhow::Result<()> {
let mut downloads = self.downloads.lock().await;
if let Some(d) = downloads.get_mut(&download.clone().id.unwrap()) {
let mut d_mut = d.0.lock().await;
*d_mut = download.clone();
let _ = d.1.lock().await.send(download);
}
Ok(())
} }
pub async fn get_download(&self, id: String) -> anyhow::Result<Option<Download>> { pub async fn get_download(&self, id: String) -> anyhow::Result<Option<Download>> {
@ -58,9 +129,7 @@ impl InMemoryDownloadService {
pub async fn subscribe_download(&self, id: String) -> tokio::sync::watch::Receiver<Download> { pub async fn subscribe_download(&self, id: String) -> tokio::sync::watch::Receiver<Download> {
let downloads = self.downloads.lock().await; let downloads = self.downloads.lock().await;
let download = downloads.get(&id).unwrap(); let download = downloads.get(&id).unwrap();
download.2.clone()
download.1.clone()
} }
} }

View File

@ -0,0 +1,256 @@
use std::fmt::{Display, Formatter};
use std::fs::{canonicalize, create_dir_all};
use std::future::Future;
use std::num::ParseIntError;
use std::path::{Path, PathBuf};
use std::process::{Output, Stdio};
use lazy_static::lazy_static;
use regex::Regex;
use thiserror::Error;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
#[derive(Error, Debug)]
pub enum YoutubeDLError {
#[error("failed to execute youtube-dl")]
IOError(#[from] std::io::Error),
#[error("failed to convert path")]
UTF8Error(#[from] std::string::FromUtf8Error),
#[error("youtube-dl exited with: {0}")]
Failure(String),
}
type Result<T> = std::result::Result<T, YoutubeDLError>;
const YOUTUBE_DL_COMMAND: &str = "yt-dlp";
#[derive(Clone, Debug)]
pub struct Arg {
arg: String,
input: Option<String>,
}
impl Arg {
pub fn new(argument: &str) -> Self {
Self {
arg: argument.to_string(),
input: None,
}
}
pub fn new_with_args(argument: &str, input: &str) -> Self {
Self {
arg: argument.to_string(),
input: Option::from(input.to_string()),
}
}
}
impl Display for Arg {
fn fmt(&self, fmt: &mut Formatter<'_>) -> std::fmt::Result {
match &self.input {
Some(input) => write!(fmt, "{} {}", self.arg, input),
None => write!(fmt, "{}", self.arg),
}
}
}
#[derive(Clone, Debug)]
pub struct YoutubeDL {
path: PathBuf,
links: Vec<String>,
args: Vec<Arg>,
}
#[derive(Clone, Debug)]
pub struct YoutubeDLResult {
path: PathBuf,
output: String,
}
impl YoutubeDLResult {
fn new(path: &PathBuf) -> Self {
Self {
path: path.clone(),
output: String::new(),
}
}
pub fn output_dir(&self) -> &PathBuf {
&self.path
}
}
impl YoutubeDL {
pub fn new_multiple_links(
dl_path: &PathBuf,
args: Vec<Arg>,
links: Vec<String>,
) -> Result<YoutubeDL> {
let path = Path::new(dl_path);
if !path.exists() {
create_dir_all(&path)?;
}
if !path.is_dir() {
return Err(YoutubeDLError::IOError(std::io::Error::new(
std::io::ErrorKind::Other,
"path is not a directory",
)));
}
let path = canonicalize(dl_path)?;
Ok(YoutubeDL { path, links, args })
}
pub fn new(dl_path: &PathBuf, args: Vec<Arg>, link: &str) -> Result<YoutubeDL> {
YoutubeDL::new_multiple_links(dl_path, args, vec![link.to_string()])
}
pub async fn download<F, FutAvailable, FAvailable, Fut>(
&self,
progress_update_fn: F,
file_name_available: FAvailable,
) -> Result<YoutubeDLResult>
where
F: Fn(u32) -> Fut,
FAvailable: Fn(String) -> FutAvailable,
Fut: Future<Output = ()>,
FutAvailable: Future<Output = ()>,
{
let output = self
.spawn_youtube_dl(progress_update_fn, file_name_available)
.await?;
let mut result = YoutubeDLResult::new(&self.path);
if !output.status.success() {
return Err(YoutubeDLError::Failure(String::from_utf8(output.stderr)?));
}
result.output = String::from_utf8(output.stdout)?;
Ok(result)
}
async fn spawn_youtube_dl<F, FutAvailable, FAvailable, Fut>(
&self,
progress_update_fn: F,
file_name_available: FAvailable,
) -> Result<Output>
where
F: Fn(u32) -> Fut,
FAvailable: Fn(String) -> FutAvailable,
Fut: Future<Output = ()>,
FutAvailable: Future<Output = ()>,
{
let mut cmd = Command::new(YOUTUBE_DL_COMMAND);
cmd.current_dir(&self.path)
.env("LC_ALL", "en_US.UTF-8")
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for arg in self.args.iter() {
match &arg.input {
Some(input) => cmd.arg(&arg.arg).arg(input),
None => cmd.arg(&arg.arg),
};
}
for link in self.links.iter() {
cmd.arg(&link);
}
let mut pr = cmd.spawn()?;
{
let stdout = pr.stdout.as_mut().unwrap();
let stdout_reader = BufReader::new(stdout);
let mut stdout_lines = stdout_reader.lines();
let mut have_gotten_file_name = false;
while let Ok(Some(line)) = stdout_lines.next_line().await {
println!("{}", line.clone());
if !have_gotten_file_name {
if let Some(file_name) = parse_file_name(line.clone()) {
file_name_available(file_name).await;
have_gotten_file_name = true
}
}
if let Some(Ok(percentage)) = parse_line(line) {
progress_update_fn(percentage).await;
}
}
}
Ok(pr.wait_with_output().await?)
}
}
fn parse_line(line: String) -> Option<core::result::Result<u32, ParseIntError>> {
lazy_static! {
static ref RE: Regex = Regex::new(r"\[download\]\s+(\d+)").unwrap();
}
let capture: regex::Captures = RE.captures(line.as_str())?;
if capture.len() != 2 {
return None;
}
let str = &capture[1];
Some(str.to_string().parse::<u32>())
}
fn parse_file_name(line: String) -> Option<String> {
lazy_static! {
static ref RE: Regex = Regex::new(r"^\[download\] Destination: (.+)$").unwrap();
}
let capture: regex::Captures = RE.captures(line.as_str())?;
if capture.len() != 2 {
return None;
}
let str = &capture[1];
Some(str.to_string())
}
#[cfg(test)]
mod tests {
use crate::youtube::{parse_file_name, parse_line};
#[test]
fn test_parse_line() {
let percentage = parse_line(
"[download] 95.4% of ~215.85MiB at 9.61MiB/s ETA 00:01 (frag 144/151)".into(),
);
assert_eq!(percentage, Some(Ok(95)))
}
#[test]
fn test_parse_line_get_nothing() {
let nothing = parse_line("[download] Got server HTTP error: The read operation timed out. Retrying (attempt 1 of 10) ...".into());
assert_eq!(nothing, None)
}
#[test]
fn test_parse_file_name() {
let file_name = parse_file_name(
"[download] Destination: 10 Design Patterns Explained in 10 Minutes.mp4".into(),
);
assert_eq!(
file_name,
Some("10 Design Patterns Explained in 10 Minutes.mp4".into())
);
}
#[test]
fn test_parse_file_name_get_nothing() {
let nothing = parse_file_name("[download] No fit: something".into());
assert_eq!(nothing, None)
}
}