From e6161797745cd6c1513e3df560cf981994f3c92f Mon Sep 17 00:00:00 2001 From: kjuulh Date: Mon, 18 Jul 2022 10:50:52 +0200 Subject: [PATCH] Added subscriptions --- Cargo.lock | 8 +++ src/cmd/scel/src/main.rs | 7 ++- src/lib/scel_api/Cargo.toml | 2 + src/lib/scel_api/src/graphql/mutation.rs | 24 ++++++- src/lib/scel_api/src/graphql/query.rs | 26 +++++++- src/lib/scel_api/src/graphql/subscription.rs | 18 +++++- src/lib/scel_api/src/lib.rs | 9 +-- src/lib/scel_core/Cargo.toml | 5 ++ src/lib/scel_core/src/lib.rs | 16 ++++- src/lib/scel_core/src/services/mod.rs | 66 ++++++++++++++++++++ 10 files changed, 170 insertions(+), 11 deletions(-) create mode 100644 src/lib/scel_core/src/services/mod.rs diff --git a/Cargo.lock b/Cargo.lock index b58e11a..2826e8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1317,6 +1317,7 @@ dependencies = [ "futures", "oauth2", "reqwest", + "scel_core", "serde", "serde_json", "tokio", @@ -1328,6 +1329,13 @@ dependencies = [ [[package]] name = "scel_core" version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "tokio", + "tracing", +] [[package]] name = "scopeguard" diff --git a/src/cmd/scel/src/main.rs b/src/cmd/scel/src/main.rs index e0f07e5..adadbd6 100644 --- a/src/cmd/scel/src/main.rs +++ b/src/cmd/scel/src/main.rs @@ -1,4 +1,7 @@ +use std::sync::Arc; + use dotenv::dotenv; +use scel_core::App; use tracing::info; use tracing_subscriber::{EnvFilter, FmtSubscriber}; @@ -19,5 +22,7 @@ async fn main() -> anyhow::Result<()> { info!("Starting scel"); - scel_api::Server::new().start().await + let app = Arc::new(App::new()); + + scel_api::Server::new(app.clone()).start().await } diff --git a/src/lib/scel_api/Cargo.toml b/src/lib/scel_api/Cargo.toml index 814aed6..dff5412 100644 --- a/src/lib/scel_api/Cargo.toml +++ b/src/lib/scel_api/Cargo.toml @@ -27,3 +27,5 @@ reqwest = { version = "*", default-features = false, features = [ "rustls-tls", "json", ] } + +scel_core = {path = "../scel_core"} diff --git a/src/lib/scel_api/src/graphql/mutation.rs b/src/lib/scel_api/src/graphql/mutation.rs index b49ee52..ca00e1d 100644 --- a/src/lib/scel_api/src/graphql/mutation.rs +++ b/src/lib/scel_api/src/graphql/mutation.rs @@ -1,4 +1,7 @@ +use std::sync::Arc; + use async_graphql::{Context, Object, Result, SimpleObject, ID}; +use scel_core::{services::Download, App}; pub struct MutationRoot; @@ -9,7 +12,24 @@ struct RequestDownloadResponse { #[Object] impl MutationRoot { - async fn request_download(&self, ctx: &Context<'_>) -> Result { - Err("not implemented 123".into()) + async fn request_download( + &self, + ctx: &Context<'_>, + download_link: String, + ) -> Result { + let app = ctx.data_unchecked::>(); + + let download = app + .download_service + .add_download(Download { + id: Some("some-id".to_string()), + link: download_link, + }) + .await + .unwrap(); + + Ok(RequestDownloadResponse { + id: download.id.unwrap().into(), + }) } } diff --git a/src/lib/scel_api/src/graphql/query.rs b/src/lib/scel_api/src/graphql/query.rs index b04697b..07a00f3 100644 --- a/src/lib/scel_api/src/graphql/query.rs +++ b/src/lib/scel_api/src/graphql/query.rs @@ -1,4 +1,14 @@ -use async_graphql::Object; +use std::sync::Arc; + +use async_graphql::{Context, Object, Result, SimpleObject, ID}; +use scel_core::App; + +#[derive(SimpleObject)] +struct Download { + id: ID, + link: String, + progress: i32, +} pub struct QueryRoot; @@ -7,4 +17,18 @@ impl QueryRoot { async fn hello_world(&self) -> &str { "Hello, world!" } + + async fn get_download(&self, ctx: &Context<'_>, id: ID) -> Result> { + let app = ctx.data_unchecked::>(); + + match app.download_service.get_download(id.to_string()).await { + Ok(Some(d)) => Ok(Some(Download { + id: ID::from(d.id.expect("ID could not be found")), + progress: 0, + link: d.link, + })), + Ok(None) => Ok(None), + Err(e) => Err(e.into()), + } + } } diff --git a/src/lib/scel_api/src/graphql/subscription.rs b/src/lib/scel_api/src/graphql/subscription.rs index 6686031..2573cd9 100644 --- a/src/lib/scel_api/src/graphql/subscription.rs +++ b/src/lib/scel_api/src/graphql/subscription.rs @@ -1,6 +1,9 @@ +use std::sync::Arc; + use async_graphql::{ async_stream::stream, futures_util::Stream, Context, Object, Subscription, ID, }; +use scel_core::App; pub struct SubscriptionRoot; @@ -17,9 +20,20 @@ impl DownloadChanged { #[Subscription] impl SubscriptionRoot { - async fn get_download(&self, ctx: &Context<'_>) -> impl Stream { + async fn get_download(&self, ctx: &Context<'_>, id: ID) -> impl Stream { + let app = ctx.data_unchecked::>(); + + let mut stream = app + .download_service + .subscribe_download(id.to_string()) + .await; + stream! { - yield DownloadChanged {id: "Some-id".into()} + while stream.changed().await.is_ok() { + yield DownloadChanged { + id: id.clone() + } + } } } } diff --git a/src/lib/scel_api/src/lib.rs b/src/lib/scel_api/src/lib.rs index 62ebc85..56e2760 100644 --- a/src/lib/scel_api/src/lib.rs +++ b/src/lib/scel_api/src/lib.rs @@ -1,10 +1,10 @@ mod auth; mod graphql; -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; use async_graphql::{ - extensions::{Logger, OpenTelemetry, Tracing}, + extensions::{Logger, Tracing}, http::{playground_source, GraphQLPlaygroundConfig}, Request, Response, Schema, }; @@ -21,12 +21,12 @@ use axum::{ use graphql::{ mutation::MutationRoot, query::QueryRoot, schema::ScelSchema, subscription::SubscriptionRoot, }; +use scel_core::App; use serde::{Deserialize, Serialize}; use tower_http::{ cors::CorsLayer, trace::{DefaultMakeSpan, TraceLayer}, }; -use tracing::Subscriber; async fn graphql_playground() -> impl IntoResponse { Html(playground_source( @@ -47,10 +47,11 @@ pub struct Server { } impl Server { - pub fn new() -> Server { + pub fn new(app: Arc) -> Server { let schema = Schema::build(QueryRoot, MutationRoot, SubscriptionRoot) .extension(Tracing) .extension(Logger) + .data(app) .finish(); let cors = vec!["http://localhost:3000" diff --git a/src/lib/scel_core/Cargo.toml b/src/lib/scel_core/Cargo.toml index 9cbd07d..0fb2d18 100644 --- a/src/lib/scel_core/Cargo.toml +++ b/src/lib/scel_core/Cargo.toml @@ -6,3 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +tokio = { version = "1.0", features = ["full"] } +anyhow = { version = "*" } +async-trait = { version = "0.1.56" } +futures = "0.3.21" +tracing = "0.1" diff --git a/src/lib/scel_core/src/lib.rs b/src/lib/scel_core/src/lib.rs index a3b76e1..3c1922a 100644 --- a/src/lib/scel_core/src/lib.rs +++ b/src/lib/scel_core/src/lib.rs @@ -1,2 +1,16 @@ -pub struct App {} +use services::InMemoryDownloadService; +pub mod services; + +#[allow(dead_code)] +pub struct App { + pub download_service: InMemoryDownloadService, +} + +impl App { + pub fn new() -> Self { + Self { + download_service: InMemoryDownloadService::new(), + } + } +} diff --git a/src/lib/scel_core/src/services/mod.rs b/src/lib/scel_core/src/services/mod.rs new file mode 100644 index 0000000..c2dd46c --- /dev/null +++ b/src/lib/scel_core/src/services/mod.rs @@ -0,0 +1,66 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; +use tokio::sync::{watch, Mutex}; +use tracing::info; + +#[derive(Clone)] +pub struct Download { + pub id: Option, + pub link: String, +} + +pub struct InMemoryDownloadService { + downloads: + Mutex>, tokio::sync::watch::Receiver)>>, +} + +impl InMemoryDownloadService { + pub fn new() -> Self { + Self { + downloads: Mutex::new(HashMap::new()), + } + } + + pub async fn add_download(&self, download: Download) -> anyhow::Result { + let mut downloads = self.downloads.lock().await; + + let (tx, rx) = watch::channel(download.clone()); + + downloads.insert( + "key".to_string(), + (Arc::new(Mutex::new(download.clone())), rx.clone()), + ); + + tokio::spawn({ + let d = download.clone(); + async move { + loop { + info!("Sending event: {}", d.clone().id.unwrap()); + let _ = tx.send(d.clone()); + tokio::time::sleep(Duration::from_millis(300)).await; + } + } + }); + + Ok(download) + } + + pub async fn get_download(&self, id: String) -> anyhow::Result> { + let downloads = self.downloads.lock().await; + + if let Some(d) = downloads.get(&id) { + let download = d.0.lock().await; + + Ok(Some(download.clone())) + } else { + Ok(None) + } + } + + pub async fn subscribe_download(&self, id: String) -> tokio::sync::watch::Receiver { + let downloads = self.downloads.lock().await; + + let download = downloads.get(&id).unwrap(); + + download.1.clone() + } +}