diff --git a/examples/arsnova-client-tui.rs b/examples/arsnova-client-tui.rs index 97f725f..212b0cb 100644 --- a/examples/arsnova-client-tui.rs +++ b/examples/arsnova-client-tui.rs @@ -51,7 +51,7 @@ pub struct Cli { url: String, } -#[tokio::main(worker_threads = 3)] +#[tokio::main(worker_threads = 2)] async fn main() -> Result<(), ()> { let cli = Cli::parse(); diff --git a/src/client.rs b/src/client.rs index f81429a..8c76eee 100644 --- a/src/client.rs +++ b/src/client.rs @@ -28,8 +28,8 @@ use futures_util::{SinkExt, StreamExt}; use reqwest::{IntoUrl, StatusCode}; use serde::Deserialize; use serde_json::json; +use tokio::select; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::{join, select}; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message; use url::Url; @@ -493,7 +493,7 @@ impl Client { .await .map_err(|_| ConnectionError)?; - let (mut write, read) = socket.split(); + let (mut write, mut read) = socket.split(); if write .send(Message::Text( @@ -508,37 +508,35 @@ impl Client { )) .await { - Ok(_) => {} - Err(_) => return Err(ConnectionError), - } - - let jh1 = read.for_each(|msg| async { - if let Ok(msg) = msg { - if msg.is_text() && msg.clone().into_text().unwrap().starts_with("MESSAGE") { - if let Ok(msg) = WsFeedbackMessage::parse(msg.to_text().unwrap()) { - if msg.body.body_type == "FeedbackChanged" { - let feedback = msg.body.payload.get_feedback(); - match &handler { - FeedbackHandler::Fn(f) => f(&feedback), - FeedbackHandler::Sender(tx) => { - let _ = tx.send(feedback).await; + Ok(_) => loop { + select! { + Some(next) = read.next() => { + match &next { + Ok(msg) => { + if msg.is_text() && msg.clone().into_text().unwrap().starts_with("MESSAGE") { + if let Ok(msg) = WsFeedbackMessage::parse(msg.to_text().unwrap()) { + if msg.body.body_type == "FeedbackChanged" { + let feedback = msg.body.payload.get_feedback(); + match &handler { + FeedbackHandler::Fn(f) => f(&feedback), + FeedbackHandler::Sender(tx) => { + let _ = tx.send(feedback).await; + } + }; + } + } } - }; + } + Err(_) => break } } + _ = tokio::time::sleep(Duration::from_secs(15)) => { + let _ = write.send(Message::Text("\n".to_string())).await; + } } - } - }); - - let jh2 = tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_secs(15)).await; - let _ = write.send(Message::Text("\n".to_string())).await; - } - }); - - let _ = join!(jh1, jh2); - return Ok(()); + }, + Err(_) => return Err(ConnectionError), + } } Err(ConnectionError)