From 0e9d33b38b4419919ea2784ab2e019385c1c830a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Mon, 12 Feb 2024 17:32:19 +0100 Subject: [PATCH] webrtc: signallers: attempt to close the ws when an error occurs This commit discards the early error returns in the send tasks to log the error and attempt to close the websocket. Part-of: --- net/webrtc/signalling/src/server/mod.rs | 18 +++++++++++++----- net/webrtc/src/signaller/imp.rs | 18 +++++++++++++----- net/webrtc/src/webrtcsrc/signaller/imp.rs | 20 ++++++++++++++------ 3 files changed, 40 insertions(+), 16 deletions(-) diff --git a/net/webrtc/signalling/src/server/mod.rs b/net/webrtc/signalling/src/server/mod.rs index 93f43bc6..e0350dce 100644 --- a/net/webrtc/signalling/src/server/mod.rs +++ b/net/webrtc/signalling/src/server/mod.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::pin::Pin; use std::sync::{Arc, Mutex}; -use tracing::{info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; struct Peer { receive_task_handle: task::JoinHandle<()>, @@ -138,6 +138,7 @@ impl Server { let this_id_clone = this_id.clone(); let (mut ws_sink, mut ws_stream) = ws.split(); let send_task_handle = task::spawn(async move { + let mut res = Ok(()); loop { match async_std::future::timeout( std::time::Duration::from_secs(30), @@ -147,21 +148,28 @@ impl Server { { Ok(Some(msg)) => { trace!(this_id = %this_id_clone, "sending {}", msg); - ws_sink.send(WsMessage::Text(msg)).await?; + res = ws_sink.send(WsMessage::Text(msg)).await; } Ok(None) => { break; } Err(_) => { trace!(this_id = %this_id_clone, "timeout, sending ping"); - ws_sink.send(WsMessage::Ping(vec![])).await?; + res = ws_sink.send(WsMessage::Ping(vec![])).await; } } + + if let Err(ref err) = res { + error!(this_id = %this_id_clone, %err, "Quitting send loop"); + break; + } } - ws_sink.close().await?; + debug!(this_id = %this_id_clone, "Done sending"); - Ok::<(), Error>(()) + let _ = ws_sink.close().await; + + res.map_err(Into::into) }); let mut tx = self.state.lock().unwrap().tx.clone(); diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index 90918251..8aa94a1d 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -83,22 +83,30 @@ impl Signaller { mpsc::channel::(1000); let element_clone = element.downgrade(); let send_task_handle = task::spawn(async move { + let mut res = Ok(()); while let Some(msg) = websocket_receiver.next().await { if let Some(element) = element_clone.upgrade() { gst::trace!(CAT, obj: element, "Sending websocket message {:?}", msg); } - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; + + if let Err(ref err) = res { + if let Some(element) = element_clone.upgrade() { + gst::error!(CAT, obj: element, "Quitting send loop: {err}") + } + break; + } } if let Some(element) = element_clone.upgrade() { - gst::info!(CAT, obj: element, "Done sending"); + gst::debug!(CAT, obj: element, "Done sending"); } - ws_sink.close().await?; + let _ = ws_sink.close().await; - Ok::<(), Error>(()) + res.map_err(Into::into) }); let meta = if let Some(meta) = element.property::>("meta") { diff --git a/net/webrtc/src/webrtcsrc/signaller/imp.rs b/net/webrtc/src/webrtcsrc/signaller/imp.rs index a85134bc..1bde6350 100644 --- a/net/webrtc/src/webrtcsrc/signaller/imp.rs +++ b/net/webrtc/src/webrtcsrc/signaller/imp.rs @@ -138,21 +138,29 @@ impl Signaller { let (websocket_sender, mut websocket_receiver) = mpsc::channel::(1000); let send_task_handle = task::spawn(glib::clone!(@weak-allow-none self as this => async move { + let mut res = Ok(()); while let Some(msg) = websocket_receiver.next().await { gst::log!(CAT, "Sending websocket message {:?}", msg); - ws_sink + res = ws_sink .send(WsMessage::Text(serde_json::to_string(&msg).unwrap())) - .await?; + .await; + + if let Err(ref err) = res { + this.as_ref().map_or_else(|| gst::error!(CAT, "Quitting send loop: {err}"), + |this| gst::error!(CAT, imp: this, "Quitting send loop: {err}") + ); + break; + } } let msg = "Done sending"; - this.map_or_else(|| gst::info!(CAT, "{msg}"), - |this| gst::info!(CAT, imp: this, "{msg}") + this.map_or_else(|| gst::debug!(CAT, "{msg}"), + |this| gst::debug!(CAT, imp: this, "{msg}") ); - ws_sink.close().await?; + let _ = ws_sink.close().await; - Ok::<(), Error>(()) + res.map_err(Into::into) })); let obj = self.obj();