From 3627e526738146ff5d2f5582f58df419f4d4acec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Mon, 18 Sep 2023 16:59:27 +0200 Subject: [PATCH] net/webrtcsink: don't miss ice candidates During `on_remote_description_set()` processing, current session is removed from the sessions `HashMap`. If an ice candidate is submitted to `handle_ice()` by that time, the session can't be found and the candidate is ignored. This commit wraps the Session in the sessions `HashMap` so an entry is kept while `on_remote_description_set()` is running. Incoming candidates received by `handle_ice()` will be processed immediately or enqueued and handled when the session is restored by `on_remote_description_set()`. Part-of: --- net/webrtc/src/webrtcsink/imp.rs | 189 +++++++++++++++++++++++++++---- 1 file changed, 170 insertions(+), 19 deletions(-) diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 873d6517..c758f948 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -238,11 +238,144 @@ struct NavigationEvent { event: gst_video::NavigationEvent, } +struct IceCandidate { + sdp_m_line_index: u32, + candidate: String, +} + +/// Wrapper around `Session`. +/// +/// This makes it possible for the `Session` to be taken out of the `State`, +/// without removing the entry in the `sessions` `HashMap`, thus allowing +/// the `State` lock to be released, e.g. before calling a `Signal`. +/// +/// Taking the `Session`, replaces it with a placeholder which can enqueue +/// items (currently ICE candidates) received while the `Session` is taken. +/// In which case, the enqueued items will be processed when the `Session` is +/// restored. +enum SessionWrapper { + /// The `Session` is available in the `SessionWrapper`. + InPlace(Session), + /// The `Session` was taken out the `SessionWrapper`. + Taken(Vec), +} + +impl SessionWrapper { + /// Unwraps a reference to the `Session` of this `SessionWrapper`. + /// + /// # Panics + /// + /// Panics is the `Session` was taken. + fn unwrap(&self) -> &Session { + match self { + SessionWrapper::InPlace(session) => session, + _ => panic!("Session is not In Place"), + } + } + + /// Unwraps a mutable reference to the `Session` of this `SessionWrapper`. + /// + /// # Panics + /// + /// Panics is the `Session` was taken. + fn unwrap_mut(&mut self) -> &mut Session { + match self { + SessionWrapper::InPlace(session) => session, + _ => panic!("Session is not In Place"), + } + } + + /// Consumes the `SessionWrapper`, returning the wrapped `Session`. + /// + /// # Panics + /// + /// Panics is the `Session` was taken. + fn into_inner(self) -> Session { + match self { + SessionWrapper::InPlace(session) => session, + _ => panic!("Session is not In Place"), + } + } + + /// Takes the `Session` out of this `SessionWrapper`, leaving it in the `Taken` state. + /// + /// # Panics + /// + /// Panics is the `Session` was taken. + fn take(&mut self) -> Session { + use SessionWrapper::*; + match std::mem::replace(self, Taken(Vec::new())) { + InPlace(session) => session, + _ => panic!("Session is not In Place"), + } + } + + /// Restores a `Session` to this `SessionWrapper`. + /// + /// Processes any pending items enqueued while the `Session` was taken. + /// + /// # Panics + /// + /// Panics is the `Session` is already in place. + fn restore(&mut self, session: Session) { + let cands = if let SessionWrapper::Taken(ref cands) = self { + cands + } else { + panic!("Session is already in place"); + }; + + if !cands.is_empty() { + gst::trace!( + CAT, + "handling {} pending ice candidates for session {}", + cands.len(), + session.id, + ); + for cand in cands { + session.webrtcbin.emit_by_name::<()>( + "add-ice-candidate", + &[&cand.sdp_m_line_index, &cand.candidate], + ); + } + } + + *self = SessionWrapper::InPlace(session); + } + + /// Adds an ICE candidate to this `SessionWrapper`. + /// + /// If the `Session` is in place, the ICE candidate is added immediately, + /// otherwise, it will be added when the `Session` is restored. + fn add_ice_candidate(&mut self, session_id: &str, sdp_m_line_index: u32, candidate: &str) { + match self { + SessionWrapper::InPlace(session) => { + gst::trace!(CAT, "adding ice candidate for session {session_id}"); + session + .webrtcbin + .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); + } + SessionWrapper::Taken(cands) => { + gst::trace!(CAT, "queuing ice candidate for session {session_id}"); + cands.push(IceCandidate { + sdp_m_line_index, + candidate: candidate.to_string(), + }); + } + } + } +} + +impl From for SessionWrapper { + fn from(session: Session) -> Self { + SessionWrapper::InPlace(session) + } +} + /* Our internal state */ struct State { signaller: Box, signaller_state: SignallerState, - sessions: HashMap, + sessions: HashMap, codecs: BTreeMap, /// Used to abort codec discovery codecs_abort_handle: Option, @@ -890,7 +1023,8 @@ impl State { session_id: &str, signal: bool, ) -> Option { - if let Some(mut session) = self.sessions.remove(session_id) { + if let Some(session) = self.sessions.remove(session_id) { + let mut session = session.into_inner(); self.finalize_session(element, &mut session, signal); Some(session) } else { @@ -1418,6 +1552,7 @@ impl WebRTCSink { if let Some(session) = state.sessions.get(session_id) { session + .unwrap() .webrtcbin .emit_by_name::<()>("set-local-description", &[&offer, &None::]); @@ -1491,6 +1626,7 @@ impl WebRTCSink { }); session + .unwrap() .webrtcbin .emit_by_name::<()>("create-offer", &[&None::, &promise]); } else { @@ -1730,7 +1866,7 @@ impl WebRTCSink { let state = this.state.lock().unwrap(); if let Some(session) = state.sessions.get(&session_id_clone) { - for webrtc_pad in session.webrtc_pads.values() { + for webrtc_pad in session.unwrap().webrtc_pads.values() { if let Some(srcpad) = webrtc_pad.pad.peer() { srcpad.send_event( gst_video::UpstreamForceKeyUnitEvent::builder() @@ -1795,7 +1931,8 @@ impl WebRTCSink { let element = element.expect("on-new-ssrc emited when webrtcsink has been disposed?"); let mut state = element.imp().state.lock().unwrap(); - if let Some(mut session) = state.sessions.get_mut(&session_id_str) { + if let Some(session) = state.sessions.get_mut(&session_id_str) { + let session = session.unwrap_mut(); if session.stats_sigid.is_none() { let session_id_str = session_id_str.clone(); @@ -1893,7 +2030,9 @@ impl WebRTCSink { state.navigation_handler = Some(NavigationEventHandler::new(element, &webrtcbin)); } - state.sessions.insert(session_id.to_string(), session); + state + .sessions + .insert(session_id.to_string(), session.into()); drop(state); drop(settings); @@ -1953,7 +2092,8 @@ impl WebRTCSink { stats: &gst::Structure, ) { let mut state = element.imp().state.lock().unwrap(); - if let Some(mut session) = state.sessions.get_mut(session_id) { + if let Some(session) = state.sessions.get_mut(session_id) { + let session = session.unwrap_mut(); if let Some(congestion_controller) = session.congestion_controller.as_mut() { congestion_controller.loss_control(element, stats, &mut session.encoders); } @@ -1973,7 +2113,8 @@ impl WebRTCSink { if let Ok(Some(stats)) = reply { let mut state = element.imp().state.lock().unwrap(); - if let Some(mut session) = state.sessions.get_mut(&session_id) { + if let Some(session) = state.sessions.get_mut(&session_id) { + let session = session.unwrap_mut(); if let Some(congestion_controller) = session.congestion_controller.as_mut() { congestion_controller.delay_control(&element, stats, &mut session.encoders,); } @@ -1990,7 +2131,7 @@ impl WebRTCSink { let mut state = element.imp().state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(peer_id) { - session.rtprtxsend = Some(rtprtxsend); + session.unwrap_mut().rtprtxsend = Some(rtprtxsend); } } @@ -1999,6 +2140,8 @@ impl WebRTCSink { let mut state = element.imp().state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(peer_id) { + let session = session.unwrap_mut(); + let n_encoders = session.encoders.len(); let fec_ratio = { @@ -2033,7 +2176,9 @@ impl WebRTCSink { let mut remove = false; let codecs = state.codecs.clone(); - if let Some(mut session) = state.sessions.remove(&session_id) { + if let Some(session) = state.sessions.get_mut(&session_id) { + let mut session = session.take(); + for webrtc_pad in session.webrtc_pads.clone().values() { let transceiver = webrtc_pad .pad @@ -2106,8 +2251,10 @@ impl WebRTCSink { if remove { state.finalize_session(element, &mut session, true); + } else if let Some(session_wrapper) = state.sessions.get_mut(&session.id) { + session_wrapper.restore(session); } else { - state.sessions.insert(session.id.clone(), session); + gst::warning!(CAT, "Session {} was removed", session.id); } } } @@ -2121,15 +2268,12 @@ impl WebRTCSink { _sdp_mid: Option, candidate: &str, ) -> Result<(), WebRTCSinkError> { - let state = self.state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); let sdp_m_line_index = sdp_m_line_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?; - if let Some(session) = state.sessions.get(session_id) { - gst::trace!(CAT, "adding ice candidate for session {}", session_id); - session - .webrtcbin - .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); + if let Some(session_wrapper) = state.sessions.get_mut(session_id) { + session_wrapper.add_ice_candidate(session_id, sdp_m_line_index, candidate); Ok(()) } else { Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())) @@ -2146,6 +2290,8 @@ impl WebRTCSink { let mut state = self.state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(session_id) { + let session = session.unwrap_mut(); + let sdp = desc.sdp(); session.sdp = Some(sdp.to_owned()); @@ -2403,7 +2549,7 @@ impl WebRTCSink { return Err(anyhow!("No caps found for stream {}", name)); } - if let Some(mut stream) = state.streams.get_mut(&name) { + if let Some(stream) = state.streams.get_mut(&name) { stream.out_caps = Some(caps); } } @@ -2421,7 +2567,12 @@ impl WebRTCSink { .unwrap() .sessions .iter() - .map(|(name, consumer)| (name.as_str(), consumer.gather_stats().to_send_value())), + .map(|(name, consumer)| { + ( + name.as_str(), + consumer.unwrap().gather_stats().to_send_value(), + ) + }), ) } @@ -2448,7 +2599,7 @@ impl WebRTCSink { .unwrap() .streams .iter_mut() - .for_each(|(_, mut stream)| { + .for_each(|(_, stream)| { if stream.sink_pad.upcast_ref::() == pad { // We do not want VideoInfo to consider max-framerate // when computing fps, so we strip it away here