Added subscriptions

This commit is contained in:
Kasper Juul Hermansen 2022-07-18 10:50:52 +02:00
parent e0262a7f17
commit e616179774
Signed by: kjuulh
GPG Key ID: 57B6E1465221F912
10 changed files with 170 additions and 11 deletions

8
Cargo.lock generated
View File

@ -1317,6 +1317,7 @@ dependencies = [
"futures", "futures",
"oauth2", "oauth2",
"reqwest", "reqwest",
"scel_core",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
@ -1328,6 +1329,13 @@ dependencies = [
[[package]] [[package]]
name = "scel_core" name = "scel_core"
version = "0.1.0" version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"futures",
"tokio",
"tracing",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"

View File

@ -1,4 +1,7 @@
use std::sync::Arc;
use dotenv::dotenv; use dotenv::dotenv;
use scel_core::App;
use tracing::info; use tracing::info;
use tracing_subscriber::{EnvFilter, FmtSubscriber}; use tracing_subscriber::{EnvFilter, FmtSubscriber};
@ -19,5 +22,7 @@ async fn main() -> anyhow::Result<()> {
info!("Starting scel"); info!("Starting scel");
scel_api::Server::new().start().await let app = Arc::new(App::new());
scel_api::Server::new(app.clone()).start().await
} }

View File

@ -27,3 +27,5 @@ reqwest = { version = "*", default-features = false, features = [
"rustls-tls", "rustls-tls",
"json", "json",
] } ] }
scel_core = {path = "../scel_core"}

View File

@ -1,4 +1,7 @@
use std::sync::Arc;
use async_graphql::{Context, Object, Result, SimpleObject, ID}; use async_graphql::{Context, Object, Result, SimpleObject, ID};
use scel_core::{services::Download, App};
pub struct MutationRoot; pub struct MutationRoot;
@ -9,7 +12,24 @@ struct RequestDownloadResponse {
#[Object] #[Object]
impl MutationRoot { impl MutationRoot {
async fn request_download(&self, ctx: &Context<'_>) -> Result<RequestDownloadResponse> { async fn request_download(
Err("not implemented 123".into()) &self,
ctx: &Context<'_>,
download_link: String,
) -> Result<RequestDownloadResponse> {
let app = ctx.data_unchecked::<Arc<App>>();
let download = app
.download_service
.add_download(Download {
id: Some("some-id".to_string()),
link: download_link,
})
.await
.unwrap();
Ok(RequestDownloadResponse {
id: download.id.unwrap().into(),
})
} }
} }

View File

@ -1,4 +1,14 @@
use async_graphql::Object; use std::sync::Arc;
use async_graphql::{Context, Object, Result, SimpleObject, ID};
use scel_core::App;
#[derive(SimpleObject)]
struct Download {
id: ID,
link: String,
progress: i32,
}
pub struct QueryRoot; pub struct QueryRoot;
@ -7,4 +17,18 @@ impl QueryRoot {
async fn hello_world(&self) -> &str { async fn hello_world(&self) -> &str {
"Hello, world!" "Hello, world!"
} }
async fn get_download(&self, ctx: &Context<'_>, id: ID) -> Result<Option<Download>> {
let app = ctx.data_unchecked::<Arc<App>>();
match app.download_service.get_download(id.to_string()).await {
Ok(Some(d)) => Ok(Some(Download {
id: ID::from(d.id.expect("ID could not be found")),
progress: 0,
link: d.link,
})),
Ok(None) => Ok(None),
Err(e) => Err(e.into()),
}
}
} }

View File

@ -1,6 +1,9 @@
use std::sync::Arc;
use async_graphql::{ use async_graphql::{
async_stream::stream, futures_util::Stream, Context, Object, Subscription, ID, async_stream::stream, futures_util::Stream, Context, Object, Subscription, ID,
}; };
use scel_core::App;
pub struct SubscriptionRoot; pub struct SubscriptionRoot;
@ -17,9 +20,20 @@ impl DownloadChanged {
#[Subscription] #[Subscription]
impl SubscriptionRoot { impl SubscriptionRoot {
async fn get_download(&self, ctx: &Context<'_>) -> impl Stream<Item = DownloadChanged> { async fn get_download(&self, ctx: &Context<'_>, id: ID) -> impl Stream<Item = DownloadChanged> {
let app = ctx.data_unchecked::<Arc<App>>();
let mut stream = app
.download_service
.subscribe_download(id.to_string())
.await;
stream! { stream! {
yield DownloadChanged {id: "Some-id".into()} while stream.changed().await.is_ok() {
yield DownloadChanged {
id: id.clone()
}
}
} }
} }
} }

View File

@ -1,10 +1,10 @@
mod auth; mod auth;
mod graphql; mod graphql;
use std::net::SocketAddr; use std::{net::SocketAddr, sync::Arc};
use async_graphql::{ use async_graphql::{
extensions::{Logger, OpenTelemetry, Tracing}, extensions::{Logger, Tracing},
http::{playground_source, GraphQLPlaygroundConfig}, http::{playground_source, GraphQLPlaygroundConfig},
Request, Response, Schema, Request, Response, Schema,
}; };
@ -21,12 +21,12 @@ use axum::{
use graphql::{ use graphql::{
mutation::MutationRoot, query::QueryRoot, schema::ScelSchema, subscription::SubscriptionRoot, mutation::MutationRoot, query::QueryRoot, schema::ScelSchema, subscription::SubscriptionRoot,
}; };
use scel_core::App;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tower_http::{ use tower_http::{
cors::CorsLayer, cors::CorsLayer,
trace::{DefaultMakeSpan, TraceLayer}, trace::{DefaultMakeSpan, TraceLayer},
}; };
use tracing::Subscriber;
async fn graphql_playground() -> impl IntoResponse { async fn graphql_playground() -> impl IntoResponse {
Html(playground_source( Html(playground_source(
@ -47,10 +47,11 @@ pub struct Server {
} }
impl Server { impl Server {
pub fn new() -> Server { pub fn new(app: Arc<App>) -> Server {
let schema = Schema::build(QueryRoot, MutationRoot, SubscriptionRoot) let schema = Schema::build(QueryRoot, MutationRoot, SubscriptionRoot)
.extension(Tracing) .extension(Tracing)
.extension(Logger) .extension(Logger)
.data(app)
.finish(); .finish();
let cors = vec!["http://localhost:3000" let cors = vec!["http://localhost:3000"

View File

@ -6,3 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
tokio = { version = "1.0", features = ["full"] }
anyhow = { version = "*" }
async-trait = { version = "0.1.56" }
futures = "0.3.21"
tracing = "0.1"

View File

@ -1,2 +1,16 @@
pub struct App {} use services::InMemoryDownloadService;
pub mod services;
#[allow(dead_code)]
pub struct App {
pub download_service: InMemoryDownloadService,
}
impl App {
pub fn new() -> Self {
Self {
download_service: InMemoryDownloadService::new(),
}
}
}

View File

@ -0,0 +1,66 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{watch, Mutex};
use tracing::info;
#[derive(Clone)]
pub struct Download {
pub id: Option<String>,
pub link: String,
}
pub struct InMemoryDownloadService {
downloads:
Mutex<HashMap<String, (Arc<Mutex<Download>>, tokio::sync::watch::Receiver<Download>)>>,
}
impl InMemoryDownloadService {
pub fn new() -> Self {
Self {
downloads: Mutex::new(HashMap::new()),
}
}
pub async fn add_download(&self, download: Download) -> anyhow::Result<Download> {
let mut downloads = self.downloads.lock().await;
let (tx, rx) = watch::channel(download.clone());
downloads.insert(
"key".to_string(),
(Arc::new(Mutex::new(download.clone())), rx.clone()),
);
tokio::spawn({
let d = download.clone();
async move {
loop {
info!("Sending event: {}", d.clone().id.unwrap());
let _ = tx.send(d.clone());
tokio::time::sleep(Duration::from_millis(300)).await;
}
}
});
Ok(download)
}
pub async fn get_download(&self, id: String) -> anyhow::Result<Option<Download>> {
let downloads = self.downloads.lock().await;
if let Some(d) = downloads.get(&id) {
let download = d.0.lock().await;
Ok(Some(download.clone()))
} else {
Ok(None)
}
}
pub async fn subscribe_download(&self, id: String) -> tokio::sync::watch::Receiver<Download> {
let downloads = self.downloads.lock().await;
let download = downloads.get(&id).unwrap();
download.1.clone()
}
}