mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-09-02 09:43:48 +00:00
webrtcsink: fix deadlock on encoder-setup
Switching to a lockable session re-introduced a similar deadlock to the
one fixed by f82a731b3a
: we would call
encoder-setup with the session lock, the default handler would try to
lock the settings, thus not respecting the locking order (settings,
state, session).
We fix this by setting up the whole encoding chain outside of any locks,
then calling a reduxed version of Session.connect_input_stream() (sans
encoder-setup / payloader-setup signal calls).
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2215>
This commit is contained in:
parent
12e6922ac2
commit
8256601d1d
1 changed files with 136 additions and 93 deletions
|
@ -1335,22 +1335,9 @@ impl SessionInner {
|
|||
element: &super::BaseWebRTCSink,
|
||||
producer: &StreamProducer,
|
||||
webrtc_pad: &WebRTCPad,
|
||||
codecs: &BTreeMap<i32, Codec>,
|
||||
session_setup_result: SessionSetupResult,
|
||||
) -> Result<(), Error> {
|
||||
// No stream name, pad only exists to deactivate media
|
||||
let stream_name = match webrtc_pad.stream_name {
|
||||
Some(ref name) => name,
|
||||
None => {
|
||||
gst::info!(
|
||||
CAT,
|
||||
obj = element,
|
||||
"Consumer {} not connecting any input stream for inactive media {}",
|
||||
self.peer_id,
|
||||
webrtc_pad.media_idx
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let (appsrc, encoding_chain, caps, codec, stream_name) = session_setup_result;
|
||||
|
||||
gst::info!(
|
||||
CAT,
|
||||
|
@ -1361,79 +1348,9 @@ impl SessionInner {
|
|||
webrtc_pad.media_idx
|
||||
);
|
||||
|
||||
let payload = webrtc_pad.payload.unwrap();
|
||||
|
||||
let codec = match self.codecs {
|
||||
Some(ref codecs) => {
|
||||
gst::debug!(CAT, obj = element, "Picking codec from remote offer");
|
||||
|
||||
codecs
|
||||
.get(&payload)
|
||||
.cloned()
|
||||
.ok_or_else(|| anyhow!("No codec for payload {}", payload))?
|
||||
}
|
||||
None => {
|
||||
gst::debug!(CAT, obj = element, "Picking codec from local offer");
|
||||
|
||||
codecs
|
||||
.get(&payload)
|
||||
.cloned()
|
||||
.ok_or_else(|| anyhow!("No codec for payload {}", payload))?
|
||||
}
|
||||
};
|
||||
|
||||
let appsrc = make_element("appsrc", Some(stream_name))?;
|
||||
self.pipeline.add(&appsrc).unwrap();
|
||||
|
||||
let pay_filter = make_element("capsfilter", None)?;
|
||||
self.pipeline.add(&pay_filter).unwrap();
|
||||
|
||||
let output_caps = codec.output_filter().unwrap_or_else(gst::Caps::new_any);
|
||||
|
||||
let PayloadChain {
|
||||
payloader,
|
||||
encoding_chain,
|
||||
} = PayloadChainBuilder::new(
|
||||
&webrtc_pad.in_caps,
|
||||
&output_caps,
|
||||
&codec,
|
||||
element.emit_by_name::<Option<gst::Element>>(
|
||||
"request-encoded-filter",
|
||||
&[&Some(&self.peer_id), &stream_name, &codec.caps],
|
||||
),
|
||||
)
|
||||
.build(&self.pipeline, &appsrc)?;
|
||||
|
||||
if let Some(ref enc) = encoding_chain.encoder {
|
||||
element.emit_by_name::<bool>("encoder-setup", &[&self.peer_id, &stream_name, &enc]);
|
||||
}
|
||||
|
||||
let sdp = self.sdp.as_ref().unwrap();
|
||||
let sdp_media = sdp.media(webrtc_pad.media_idx).unwrap();
|
||||
|
||||
let mut global_caps = gst::Caps::new_empty_simple("application/x-unknown");
|
||||
|
||||
sdp.attributes_to_caps(global_caps.get_mut().unwrap())
|
||||
.unwrap();
|
||||
sdp_media
|
||||
.attributes_to_caps(global_caps.get_mut().unwrap())
|
||||
.unwrap();
|
||||
|
||||
let caps = sdp_media
|
||||
.caps_from_media(payload)
|
||||
.unwrap()
|
||||
.intersect(&global_caps);
|
||||
|
||||
element.imp().configure_payloader(
|
||||
&self.peer_id,
|
||||
stream_name,
|
||||
&payloader,
|
||||
&codec,
|
||||
Some(webrtc_pad.ssrc),
|
||||
Some(&caps),
|
||||
ExtensionConfigurationType::Skip,
|
||||
)?;
|
||||
|
||||
// At this point, the peer has provided its answer, and we want to
|
||||
// let the payloader / encoder perform negotiation according to that.
|
||||
//
|
||||
|
@ -1483,7 +1400,7 @@ impl SessionInner {
|
|||
&self.id,
|
||||
codec.caps.structure(0).unwrap().name(),
|
||||
transceiver,
|
||||
stream_name.clone(),
|
||||
stream_name.to_string(),
|
||||
) {
|
||||
match self.cc_info.heuristic {
|
||||
WebRTCSinkCongestionControl::Disabled => {
|
||||
|
@ -1715,6 +1632,8 @@ enum ExtensionConfigurationType {
|
|||
Apply { twcc_id: u32 },
|
||||
}
|
||||
|
||||
type SessionSetupResult = (gst::Element, EncodingChain, gst::Caps, Codec, String);
|
||||
|
||||
impl BaseWebRTCSink {
|
||||
fn configure_congestion_control(
|
||||
&self,
|
||||
|
@ -3625,6 +3544,93 @@ impl BaseWebRTCSink {
|
|||
}
|
||||
}
|
||||
|
||||
fn setup_session_payloading_chain(
|
||||
&self,
|
||||
peer_id: &str,
|
||||
webrtc_pad: &WebRTCPad,
|
||||
codecs: &BTreeMap<i32, Codec>,
|
||||
sdp: &gst_sdp::SDPMessage,
|
||||
pipeline: &gst::Pipeline,
|
||||
) -> Result<Option<SessionSetupResult>, Error> {
|
||||
let stream_name = match webrtc_pad.stream_name {
|
||||
Some(ref name) => name,
|
||||
None => {
|
||||
gst::info!(
|
||||
CAT,
|
||||
imp = self,
|
||||
"Consumer {} not connecting any input stream for inactive media {}",
|
||||
peer_id,
|
||||
webrtc_pad.media_idx
|
||||
);
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
let appsrc = make_element("appsrc", Some(stream_name))?;
|
||||
pipeline.add(&appsrc).unwrap();
|
||||
|
||||
let payload = webrtc_pad.payload.unwrap();
|
||||
|
||||
let codec = codecs
|
||||
.get(&payload)
|
||||
.cloned()
|
||||
.ok_or_else(|| anyhow!("No codec for payload {}", payload))?;
|
||||
|
||||
let output_caps = codec.output_filter().unwrap_or_else(gst::Caps::new_any);
|
||||
|
||||
let PayloadChain {
|
||||
payloader,
|
||||
encoding_chain,
|
||||
} = PayloadChainBuilder::new(
|
||||
&webrtc_pad.in_caps,
|
||||
&output_caps,
|
||||
&codec,
|
||||
self.obj().emit_by_name::<Option<gst::Element>>(
|
||||
"request-encoded-filter",
|
||||
&[&Some(peer_id), &stream_name, &codec.caps],
|
||||
),
|
||||
)
|
||||
.build(pipeline, &appsrc)?;
|
||||
|
||||
if let Some(ref enc) = encoding_chain.encoder {
|
||||
self.obj()
|
||||
.emit_by_name::<bool>("encoder-setup", &[&peer_id.to_string(), &stream_name, &enc]);
|
||||
}
|
||||
|
||||
let sdp_media = sdp.media(webrtc_pad.media_idx).unwrap();
|
||||
|
||||
let mut global_caps = gst::Caps::new_empty_simple("application/x-unknown");
|
||||
|
||||
sdp.attributes_to_caps(global_caps.get_mut().unwrap())
|
||||
.unwrap();
|
||||
sdp_media
|
||||
.attributes_to_caps(global_caps.get_mut().unwrap())
|
||||
.unwrap();
|
||||
|
||||
let caps = sdp_media
|
||||
.caps_from_media(payload)
|
||||
.unwrap()
|
||||
.intersect(&global_caps);
|
||||
|
||||
self.configure_payloader(
|
||||
peer_id,
|
||||
stream_name,
|
||||
&payloader,
|
||||
&codec,
|
||||
Some(webrtc_pad.ssrc),
|
||||
Some(&caps),
|
||||
ExtensionConfigurationType::Skip,
|
||||
)?;
|
||||
|
||||
Ok(Some((
|
||||
appsrc,
|
||||
encoding_chain,
|
||||
caps,
|
||||
codec.clone(),
|
||||
stream_name.to_owned(),
|
||||
)))
|
||||
}
|
||||
|
||||
fn on_remote_description_set(&self, session_id: &str) {
|
||||
let mut state_guard = self.state.lock().unwrap();
|
||||
let mut state = state_guard.deref_mut();
|
||||
|
@ -3664,8 +3670,51 @@ impl BaseWebRTCSink {
|
|||
.and_then(|stream| stream.producer.clone())
|
||||
{
|
||||
drop(state_guard);
|
||||
|
||||
let peer_id = session.peer_id.clone();
|
||||
let session_codecs = session.codecs.clone().unwrap_or_else(|| codecs.clone());
|
||||
let sdp = session.sdp.clone();
|
||||
let pipeline = session.pipeline.clone();
|
||||
|
||||
drop(session);
|
||||
|
||||
let res = match self.setup_session_payloading_chain(
|
||||
&peer_id,
|
||||
webrtc_pad,
|
||||
&session_codecs,
|
||||
sdp.as_ref().unwrap(),
|
||||
&pipeline,
|
||||
) {
|
||||
Err(err) => {
|
||||
gst::error!(
|
||||
CAT,
|
||||
imp = self,
|
||||
"Failed to setup elements {} for session {}: {}",
|
||||
stream_name,
|
||||
session_id,
|
||||
err
|
||||
);
|
||||
remove = true;
|
||||
session = session_clone.lock().unwrap();
|
||||
state_guard = self.state.lock().unwrap();
|
||||
state = state_guard.deref_mut();
|
||||
break;
|
||||
}
|
||||
Ok(Some(res)) => res,
|
||||
Ok(None) => {
|
||||
session = session_clone.lock().unwrap();
|
||||
state_guard = self.state.lock().unwrap();
|
||||
state = state_guard.deref_mut();
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
session = session_clone.lock().unwrap();
|
||||
state_guard = self.state.lock().unwrap();
|
||||
state = state_guard.deref_mut();
|
||||
|
||||
if let Err(err) =
|
||||
session.connect_input_stream(&self.obj(), &producer, webrtc_pad, &codecs)
|
||||
session.connect_input_stream(&self.obj(), &producer, webrtc_pad, res)
|
||||
{
|
||||
gst::error!(
|
||||
CAT,
|
||||
|
@ -3676,14 +3725,8 @@ impl BaseWebRTCSink {
|
|||
err
|
||||
);
|
||||
remove = true;
|
||||
state_guard = self.state.lock().unwrap();
|
||||
state = state_guard.deref_mut();
|
||||
break;
|
||||
}
|
||||
drop(session);
|
||||
state_guard = self.state.lock().unwrap();
|
||||
state = state_guard.deref_mut();
|
||||
session = session_clone.lock().unwrap();
|
||||
} else {
|
||||
gst::error!(
|
||||
CAT,
|
||||
|
|
Loading…
Reference in a new issue