fallbackswitch: protect src pad stream lock using Cond

Should prevent stream and State deadlocks, see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/202

Fix #202
Hopefully fix #192 as well.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1329>
This commit is contained in:
Guillaume Desmottes 2023-08-14 11:51:42 +02:00 committed by Sebastian Dröge
parent c9c49dc54c
commit 1d52139e35

View file

@ -14,7 +14,7 @@ use gst::{debug, log, trace};
use once_cell::sync::Lazy;
use parking_lot::{Mutex, MutexGuard};
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::sync::atomic::{AtomicU32, Ordering};
const PROP_PRIORITY: &str = "priority";
@ -87,6 +87,10 @@ struct State {
timeout_running_time: Option<gst::ClockTime>,
timeout_clock_id: Option<gst::ClockId>,
/// If the src pad is currently busy. Should be checked and waited on using `src_busy_cond`
/// before calling anything requiring the stream lock.
src_busy: bool,
}
impl Default for State {
@ -102,6 +106,8 @@ impl Default for State {
timeout_running_time: None,
timeout_clock_id: None,
src_busy: false,
}
}
}
@ -420,6 +426,7 @@ impl SinkState {
#[derive(Debug)]
pub struct FallbackSwitch {
state: Mutex<State>,
src_busy_cond: Condvar,
settings: Mutex<Settings>,
// Separated from the rest of the `state` because it can be
@ -952,10 +959,12 @@ impl FallbackSwitch {
}
if switched_pad {
let _ = pad.push_event(gst::event::Reconfigure::new());
pad.sticky_events_foreach(|event| {
self.src_pad.push_event(event.clone());
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
self.with_src_busy(|| {
let _ = pad.push_event(gst::event::Reconfigure::new());
pad.sticky_events_foreach(|event| {
self.src_pad.push_event(event.clone());
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
});
});
self.obj().notify(PROP_ACTIVE_PAD);
@ -987,11 +996,13 @@ impl FallbackSwitch {
let out_gap_event = builder.build();
self.src_pad.push_event(out_gap_event);
self.with_src_busy(|| {
self.src_pad.push_event(out_gap_event);
});
Ok(gst::FlowSuccess::Ok)
} else {
self.src_pad.push(buffer)
self.with_src_busy(|| self.src_pad.push(buffer))
}
}
@ -1114,15 +1125,18 @@ impl FallbackSwitch {
drop(state);
if fwd_sticky {
let _ = pad.push_event(gst::event::Reconfigure::new());
pad.sticky_events_foreach(|event| {
self.src_pad.push_event(event.clone());
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
self.with_src_busy(|| {
let _ = pad.push_event(gst::event::Reconfigure::new());
pad.sticky_events_foreach(|event| {
self.src_pad.push_event(event.clone());
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
});
});
self.obj().notify(PROP_ACTIVE_PAD);
}
self.src_pad.push_event(event)
self.with_src_busy(|| self.src_pad.push_event(event))
}
fn sink_query(&self, pad: &super::FallbackSwitchSinkPad, query: &mut gst::QueryRef) -> bool {
@ -1237,6 +1251,34 @@ impl FallbackSwitch {
false
}
/// Wait until src_busy is not set and set it, execute
/// the closure, then unset it again and notify its Cond.
///
/// The State lock is taken while modifying src_busy,
/// but not while executing the closure.
fn with_src_busy<F, R>(&self, func: F) -> R
where
F: FnOnce() -> R,
{
{
let mut state = self.state.lock();
while state.src_busy {
self.src_busy_cond.wait(&mut state);
}
state.src_busy = true;
}
let ret = func();
{
let mut state = self.state.lock();
state.src_busy = false;
self.src_busy_cond.notify_one();
}
ret
}
}
#[glib::object_subclass]
@ -1260,6 +1302,7 @@ impl ObjectSubclass for FallbackSwitch {
Self {
state: Mutex::new(State::default()),
src_busy_cond: Condvar::default(),
settings: Mutex::new(Settings::default()),
active_sinkpad: Mutex::new(None),
src_pad: srcpad,