diff --git a/net/webrtc/signalling/src/server/mod.rs b/net/webrtc/signalling/src/server/mod.rs index 93f43bc6..e0350dce 100644 --- a/net/webrtc/signalling/src/server/mod.rs +++ b/net/webrtc/signalling/src/server/mod.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::pin::Pin; use std::sync::{Arc, Mutex}; -use tracing::{info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; struct Peer { receive_task_handle: task::JoinHandle<()>, @@ -138,6 +138,7 @@ impl Server { let this_id_clone = this_id.clone(); let (mut ws_sink, mut ws_stream) = ws.split(); let send_task_handle = task::spawn(async move { + let mut res = Ok(()); loop { match async_std::future::timeout( std::time::Duration::from_secs(30), @@ -147,21 +148,28 @@ impl Server { { Ok(Some(msg)) => { trace!(this_id = %this_id_clone, "sending {}", msg); - ws_sink.send(WsMessage::Text(msg)).await?; + res = ws_sink.send(WsMessage::Text(msg)).await; } Ok(None) => { break; } Err(_) => { trace!(this_id = %this_id_clone, "timeout, sending ping"); - ws_sink.send(WsMessage::Ping(vec![])).await?; + res = ws_sink.send(WsMessage::Ping(vec![])).await; } } + + if let Err(ref err) = res { + error!(this_id = %this_id_clone, %err, "Quitting send loop"); + break; + } } - ws_sink.close().await?; + debug!(this_id = %this_id_clone, "Done sending"); - Ok::<(), Error>(()) + let _ = ws_sink.close().await; + + res.map_err(Into::into) }); let mut tx = self.state.lock().unwrap().tx.clone(); diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index 90918251..8aa94a1d 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -83,22 +83,30 @@ impl Signaller { mpsc::channel::(1000); let element_clone = element.downgrade(); let send_task_handle = task::spawn(async move { + let mut res = Ok(()); while let Some(msg) = websocket_receiver.next().await { if let Some(element) = element_clone.upgrade() { gst::trace!(CAT, obj: element, "Sending websocket message {:?}", msg); } - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; + + if let Err(ref err) = res { + if let Some(element) = element_clone.upgrade() { + gst::error!(CAT, obj: element, "Quitting send loop: {err}") + } + break; + } } if let Some(element) = element_clone.upgrade() { - gst::info!(CAT, obj: element, "Done sending"); + gst::debug!(CAT, obj: element, "Done sending"); } - ws_sink.close().await?; + let _ = ws_sink.close().await; - Ok::<(), Error>(()) + res.map_err(Into::into) }); let meta = if let Some(meta) = element.property::>("meta") { diff --git a/net/webrtc/src/webrtcsrc/signaller/imp.rs b/net/webrtc/src/webrtcsrc/signaller/imp.rs index a85134bc..1bde6350 100644 --- a/net/webrtc/src/webrtcsrc/signaller/imp.rs +++ b/net/webrtc/src/webrtcsrc/signaller/imp.rs @@ -138,21 +138,29 @@ impl Signaller { let (websocket_sender, mut websocket_receiver) = mpsc::channel::(1000); let send_task_handle = task::spawn(glib::clone!(@weak-allow-none self as this => async move { + let mut res = Ok(()); while let Some(msg) = websocket_receiver.next().await { gst::log!(CAT, "Sending websocket message {:?}", msg); - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; + + if let Err(ref err) = res { + this.as_ref().map_or_else(|| gst::error!(CAT, "Quitting send loop: {err}"), + |this| gst::error!(CAT, imp: this, "Quitting send loop: {err}") + ); + break; + } } let msg = "Done sending"; - this.map_or_else(|| gst::info!(CAT, "{msg}"), - |this| gst::info!(CAT, imp: this, "{msg}") + this.map_or_else(|| gst::debug!(CAT, "{msg}"), + |this| gst::debug!(CAT, imp: this, "{msg}") ); - ws_sink.close().await?; + let _ = ws_sink.close().await; - Ok::<(), Error>(()) + res.map_err(Into::into) })); let obj = self.obj();