fallbackswitch: Move active_sinkpad out of State into its own Mutex

As described in issue #200, we hold the srcpad's stream lock in some
situations where we notify the `active-pad` property.

If there's a handler installed it will most likely attempt to read the
property, which had to take the `state` lock. Another thread could
already be holding this lock and attempting to obtain the srcpad's
stream lock. This resulted in a deadlock.

To avoid this, move the `active_sinkpad` field into its own Mutex, which
we never hold for long.

Fixes: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/200
This commit is contained in:
Jan Alexander Steffens (heftig) 2022-05-12 15:43:30 +02:00
parent 557917b92a
commit d27e279272
No known key found for this signature in database
GPG key ID: 3B94A80E50A477C7

View file

@ -66,7 +66,6 @@ impl Default for Settings {
#[derive(Debug)]
struct State {
active_sinkpad: Option<super::FallbackSwitchSinkPad>,
upstream_latency: gst::ClockTime,
timed_out: bool,
switched_pad: bool,
@ -82,7 +81,6 @@ struct State {
impl Default for State {
fn default() -> State {
State {
active_sinkpad: None,
upstream_latency: gst::ClockTime::ZERO,
timed_out: false,
switched_pad: false,
@ -389,6 +387,12 @@ impl SinkState {
pub struct FallbackSwitch {
state: Mutex<State>,
settings: Mutex<Settings>,
// Separated from the rest of the `state` because it can be
// read from a property notify, which is prone to deadlocks.
// https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/200
active_sinkpad: Mutex<Option<super::FallbackSwitchSinkPad>>,
src_pad: gst::Pad,
sink_pad_serial: AtomicU32,
}
@ -397,11 +401,11 @@ impl GstObjectImpl for FallbackSwitch {}
impl FallbackSwitch {
fn set_active_pad(&self, state: &mut State, pad: &super::FallbackSwitchSinkPad) {
if state.active_sinkpad.as_ref() == Some(pad) {
let prev_active_pad = self.active_sinkpad.lock().replace(pad.clone());
if prev_active_pad.as_ref() == Some(pad) {
return;
};
}
state.active_sinkpad = Some(pad.clone());
state.switched_pad = true;
state.discont_pending = true;
@ -427,6 +431,8 @@ impl FallbackSwitch {
/* Advance the output running time to this timeout */
state.output_running_time = Some(state.timeout_running_time);
let active_sinkpad = self.active_sinkpad.lock().clone();
let mut best_priority = 0u32;
let mut best_pad = None;
@ -434,7 +440,7 @@ impl FallbackSwitch {
/* Don't consider the active sinkpad */
let pad = pad.downcast_ref::<super::FallbackSwitchSinkPad>().unwrap();
let pad_imp = FallbackSwitchSinkPad::from_instance(pad);
if Some(pad) == state.active_sinkpad.as_ref() {
if active_sinkpad.as_ref() == Some(pad) {
continue;
}
let pad_state = pad_imp.state.lock();
@ -590,11 +596,13 @@ impl FallbackSwitch {
*/
/* First see if we should become the active pad */
if state.active_sinkpad.as_ref() != Some(pad) && settings.auto_switch {
let active_sinkpad = self.active_sinkpad.lock().clone();
let mut is_active = active_sinkpad.as_ref() == Some(pad);
if !is_active && settings.auto_switch {
let pad_settings = pad_imp.settings.lock().clone();
let mut switch_to_pad = state.timed_out;
switch_to_pad |= if let Some(active_sinkpad) = &state.active_sinkpad {
switch_to_pad |= if let Some(active_sinkpad) = &active_sinkpad {
let active_sinkpad_imp = active_sinkpad.imp();
let active_pad_settings = active_sinkpad_imp.settings.lock().clone();
(pad_settings.priority < active_pad_settings.priority)
@ -613,12 +621,10 @@ impl FallbackSwitch {
if switch_to_pad {
state.timed_out = false;
self.set_active_pad(&mut state, pad)
self.set_active_pad(&mut state, pad);
is_active = true;
}
};
/* This might be the active_sinkpad now */
let is_active = state.active_sinkpad.as_ref() == Some(pad);
}
let mut pad_state = pad_imp.state.lock();
let raw_pad = !matches!(pad_state.caps_info, CapsInfo::None);
@ -702,7 +708,7 @@ impl FallbackSwitch {
return Err(gst::FlowError::Flushing);
}
let is_active = state.active_sinkpad.as_ref() == Some(pad);
let is_active = self.active_sinkpad.lock().as_ref() == Some(pad);
let switched_pad = state.switched_pad;
let discont_pending = state.discont_pending;
@ -809,7 +815,7 @@ impl FallbackSwitch {
event: gst::Event,
) -> bool {
let mut state = self.state.lock();
let forward = state.active_sinkpad.as_ref() == Some(pad);
let forward = self.active_sinkpad.lock().as_ref() == Some(pad);
let mut pad_state = pad.imp().state.lock();
@ -897,11 +903,10 @@ impl FallbackSwitch {
QueryView::Duration(_) => true,
QueryView::Caps(_) => true,
QueryView::Allocation(_) => {
let state = self.state.lock();
/* Forward allocation only for the active sink pad,
* for others switching will send a reconfigure event upstream
*/
state.active_sinkpad.as_ref() == Some(pad)
self.active_sinkpad.lock().as_ref() == Some(pad)
}
_ => {
pad.query_default(Some(element), query);
@ -920,6 +925,7 @@ impl FallbackSwitch {
fn reset(&self, _element: &super::FallbackSwitch) {
let mut state = self.state.lock();
*state = State::default();
self.active_sinkpad.lock().take();
}
fn src_query(
@ -966,10 +972,8 @@ impl FallbackSwitch {
ret
}
QueryViewMut::Caps(_) => {
let sinkpad = {
let state = self.state.lock();
state.active_sinkpad.clone()
};
// Unlock before forwarding
let sinkpad = self.active_sinkpad.lock().clone();
if let Some(sinkpad) = sinkpad {
sinkpad.peer_query(query)
@ -978,10 +982,8 @@ impl FallbackSwitch {
}
}
_ => {
let sinkpad = {
let state = self.state.lock();
state.active_sinkpad.clone()
};
// Unlock before forwarding
let sinkpad = self.active_sinkpad.lock().clone();
if let Some(sinkpad) = sinkpad {
sinkpad.peer_query(query)
@ -1013,9 +1015,10 @@ impl ObjectSubclass for FallbackSwitch {
.build();
Self {
src_pad: srcpad,
state: Mutex::new(State::default()),
settings: Mutex::new(Settings::default()),
active_sinkpad: Mutex::new(None),
src_pad: srcpad,
sink_pad_serial: AtomicU32::new(0),
}
}
@ -1153,8 +1156,7 @@ impl ObjectImpl for FallbackSwitch {
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
PROP_ACTIVE_PAD => {
let state = self.state.lock();
let active_pad = state.active_sinkpad.clone();
let active_pad = self.active_sinkpad.lock().clone();
active_pad.to_value()
}
PROP_TIMEOUT => {
@ -1245,14 +1247,14 @@ impl ElementImpl for FallbackSwitch {
}
gst::StateChange::ReadyToPaused => {
let mut state = self.state.lock();
let prev_active_pad = state.active_sinkpad.clone();
let prev_active_pad = self.active_sinkpad.lock().take();
*state = State::default();
let pads = element.sink_pads();
if let Some(pad) = pads.first() {
let pad = pad.downcast_ref::<super::FallbackSwitchSinkPad>().unwrap();
state.active_sinkpad = Some(pad.clone());
*self.active_sinkpad.lock() = Some(pad.clone());
state.switched_pad = true;
state.discont_pending = true;
drop(state);
@ -1341,13 +1343,15 @@ impl ElementImpl for FallbackSwitch {
pad.set_active(true).unwrap();
element.add_pad(&pad).unwrap();
let mut notify_active_pad = false;
if state.active_sinkpad.is_none() {
state.active_sinkpad = Some(pad.clone());
state.switched_pad = true;
state.discont_pending = true;
notify_active_pad = true;
}
let notify_active_pad = match &mut *self.active_sinkpad.lock() {
active_sinkpad @ None => {
*active_sinkpad = Some(pad.clone());
state.switched_pad = true;
state.discont_pending = true;
true
}
_ => false,
};
let mut pad_settings = pad.imp().settings.lock();
pad_settings.priority = pad_serial;