From 8256601d1df7060c91194f1015734e219602b9a6 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Thu, 24 Apr 2025 20:45:37 +0200 Subject: [PATCH] webrtcsink: fix deadlock on encoder-setup Switching to a lockable session re-introduced a similar deadlock to the one fixed by f82a731b3a09f2752c0475ff6e51757b8551868a: we would call encoder-setup with the session lock, the default handler would try to lock the settings, thus not respecting the locking order (settings, state, session). We fix this by setting up the whole encoding chain outside of any locks, then calling a reduxed version of Session.connect_input_stream() (sans encoder-setup / payloader-setup signal calls). Part-of: --- net/webrtc/src/webrtcsink/imp.rs | 229 ++++++++++++++++++------------- 1 file changed, 136 insertions(+), 93 deletions(-) diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 7c971952f..4a63ead2c 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -1335,22 +1335,9 @@ impl SessionInner { element: &super::BaseWebRTCSink, producer: &StreamProducer, webrtc_pad: &WebRTCPad, - codecs: &BTreeMap, + session_setup_result: SessionSetupResult, ) -> Result<(), Error> { - // No stream name, pad only exists to deactivate media - let stream_name = match webrtc_pad.stream_name { - Some(ref name) => name, - None => { - gst::info!( - CAT, - obj = element, - "Consumer {} not connecting any input stream for inactive media {}", - self.peer_id, - webrtc_pad.media_idx - ); - return Ok(()); - } - }; + let (appsrc, encoding_chain, caps, codec, stream_name) = session_setup_result; gst::info!( CAT, @@ -1361,79 +1348,9 @@ impl SessionInner { webrtc_pad.media_idx ); - let payload = webrtc_pad.payload.unwrap(); - - let codec = match self.codecs { - Some(ref codecs) => { - gst::debug!(CAT, obj = element, "Picking codec from remote offer"); - - codecs - .get(&payload) - .cloned() - .ok_or_else(|| anyhow!("No codec for payload {}", payload))? - } - None => { - gst::debug!(CAT, obj = element, "Picking codec from local offer"); - - codecs - .get(&payload) - .cloned() - .ok_or_else(|| anyhow!("No codec for payload {}", payload))? - } - }; - - let appsrc = make_element("appsrc", Some(stream_name))?; - self.pipeline.add(&appsrc).unwrap(); - let pay_filter = make_element("capsfilter", None)?; self.pipeline.add(&pay_filter).unwrap(); - let output_caps = codec.output_filter().unwrap_or_else(gst::Caps::new_any); - - let PayloadChain { - payloader, - encoding_chain, - } = PayloadChainBuilder::new( - &webrtc_pad.in_caps, - &output_caps, - &codec, - element.emit_by_name::>( - "request-encoded-filter", - &[&Some(&self.peer_id), &stream_name, &codec.caps], - ), - ) - .build(&self.pipeline, &appsrc)?; - - if let Some(ref enc) = encoding_chain.encoder { - element.emit_by_name::("encoder-setup", &[&self.peer_id, &stream_name, &enc]); - } - - let sdp = self.sdp.as_ref().unwrap(); - let sdp_media = sdp.media(webrtc_pad.media_idx).unwrap(); - - let mut global_caps = gst::Caps::new_empty_simple("application/x-unknown"); - - sdp.attributes_to_caps(global_caps.get_mut().unwrap()) - .unwrap(); - sdp_media - .attributes_to_caps(global_caps.get_mut().unwrap()) - .unwrap(); - - let caps = sdp_media - .caps_from_media(payload) - .unwrap() - .intersect(&global_caps); - - element.imp().configure_payloader( - &self.peer_id, - stream_name, - &payloader, - &codec, - Some(webrtc_pad.ssrc), - Some(&caps), - ExtensionConfigurationType::Skip, - )?; - // At this point, the peer has provided its answer, and we want to // let the payloader / encoder perform negotiation according to that. // @@ -1483,7 +1400,7 @@ impl SessionInner { &self.id, codec.caps.structure(0).unwrap().name(), transceiver, - stream_name.clone(), + stream_name.to_string(), ) { match self.cc_info.heuristic { WebRTCSinkCongestionControl::Disabled => { @@ -1715,6 +1632,8 @@ enum ExtensionConfigurationType { Apply { twcc_id: u32 }, } +type SessionSetupResult = (gst::Element, EncodingChain, gst::Caps, Codec, String); + impl BaseWebRTCSink { fn configure_congestion_control( &self, @@ -3625,6 +3544,93 @@ impl BaseWebRTCSink { } } + fn setup_session_payloading_chain( + &self, + peer_id: &str, + webrtc_pad: &WebRTCPad, + codecs: &BTreeMap, + sdp: &gst_sdp::SDPMessage, + pipeline: &gst::Pipeline, + ) -> Result, Error> { + let stream_name = match webrtc_pad.stream_name { + Some(ref name) => name, + None => { + gst::info!( + CAT, + imp = self, + "Consumer {} not connecting any input stream for inactive media {}", + peer_id, + webrtc_pad.media_idx + ); + return Ok(None); + } + }; + + let appsrc = make_element("appsrc", Some(stream_name))?; + pipeline.add(&appsrc).unwrap(); + + let payload = webrtc_pad.payload.unwrap(); + + let codec = codecs + .get(&payload) + .cloned() + .ok_or_else(|| anyhow!("No codec for payload {}", payload))?; + + let output_caps = codec.output_filter().unwrap_or_else(gst::Caps::new_any); + + let PayloadChain { + payloader, + encoding_chain, + } = PayloadChainBuilder::new( + &webrtc_pad.in_caps, + &output_caps, + &codec, + self.obj().emit_by_name::>( + "request-encoded-filter", + &[&Some(peer_id), &stream_name, &codec.caps], + ), + ) + .build(pipeline, &appsrc)?; + + if let Some(ref enc) = encoding_chain.encoder { + self.obj() + .emit_by_name::("encoder-setup", &[&peer_id.to_string(), &stream_name, &enc]); + } + + let sdp_media = sdp.media(webrtc_pad.media_idx).unwrap(); + + let mut global_caps = gst::Caps::new_empty_simple("application/x-unknown"); + + sdp.attributes_to_caps(global_caps.get_mut().unwrap()) + .unwrap(); + sdp_media + .attributes_to_caps(global_caps.get_mut().unwrap()) + .unwrap(); + + let caps = sdp_media + .caps_from_media(payload) + .unwrap() + .intersect(&global_caps); + + self.configure_payloader( + peer_id, + stream_name, + &payloader, + &codec, + Some(webrtc_pad.ssrc), + Some(&caps), + ExtensionConfigurationType::Skip, + )?; + + Ok(Some(( + appsrc, + encoding_chain, + caps, + codec.clone(), + stream_name.to_owned(), + ))) + } + fn on_remote_description_set(&self, session_id: &str) { let mut state_guard = self.state.lock().unwrap(); let mut state = state_guard.deref_mut(); @@ -3664,8 +3670,51 @@ impl BaseWebRTCSink { .and_then(|stream| stream.producer.clone()) { drop(state_guard); + + let peer_id = session.peer_id.clone(); + let session_codecs = session.codecs.clone().unwrap_or_else(|| codecs.clone()); + let sdp = session.sdp.clone(); + let pipeline = session.pipeline.clone(); + + drop(session); + + let res = match self.setup_session_payloading_chain( + &peer_id, + webrtc_pad, + &session_codecs, + sdp.as_ref().unwrap(), + &pipeline, + ) { + Err(err) => { + gst::error!( + CAT, + imp = self, + "Failed to setup elements {} for session {}: {}", + stream_name, + session_id, + err + ); + remove = true; + session = session_clone.lock().unwrap(); + state_guard = self.state.lock().unwrap(); + state = state_guard.deref_mut(); + break; + } + Ok(Some(res)) => res, + Ok(None) => { + session = session_clone.lock().unwrap(); + state_guard = self.state.lock().unwrap(); + state = state_guard.deref_mut(); + continue; + } + }; + + session = session_clone.lock().unwrap(); + state_guard = self.state.lock().unwrap(); + state = state_guard.deref_mut(); + if let Err(err) = - session.connect_input_stream(&self.obj(), &producer, webrtc_pad, &codecs) + session.connect_input_stream(&self.obj(), &producer, webrtc_pad, res) { gst::error!( CAT, @@ -3676,14 +3725,8 @@ impl BaseWebRTCSink { err ); remove = true; - state_guard = self.state.lock().unwrap(); - state = state_guard.deref_mut(); break; } - drop(session); - state_guard = self.state.lock().unwrap(); - state = state_guard.deref_mut(); - session = session_clone.lock().unwrap(); } else { gst::error!( CAT,