webrtcsink: fix deadlock on fast session removal

When a session was removed while its input streams were still getting
connected, a race condition was possible where:

* connect_input_stream was getting called from webrtcbin's pc thread
* the state of the pipeline was getting set to NULL in another thread
* connect_input_stream tried to sync state to NULL, and deadlocked
  because webrtcbin tried to stop its pc thread synchronously

This commit fixes this by making sure we hold the session lock when
setting the state of the pipeline to NULL, thus ensuring
`connect_input_stream` isn't getting called at the same time.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2215>
This commit is contained in:
Mathieu Duponchelle 2025-04-24 20:42:22 +02:00
parent 30a0b155fc
commit 12e6922ac2

View file

@ -368,7 +368,7 @@ struct State {
streams: HashMap<String, InputStream>,
discoveries: HashMap<String, Vec<DiscoveryInfo>>,
signaller_signals: Option<SignallerSignals>,
finalizing_sessions: Arc<(Mutex<HashSet<String>>, Condvar)>,
finalizing_sessions: Arc<(Mutex<HashMap<String, Session>>, Condvar)>,
#[cfg(feature = "web_server")]
web_shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
#[cfg(feature = "web_server")]
@ -569,7 +569,7 @@ impl Default for State {
streams: HashMap::new(),
discoveries: HashMap::new(),
signaller_signals: Default::default(),
finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())),
finalizing_sessions: Arc::new((Mutex::new(HashMap::new()), Condvar::new())),
#[cfg(feature = "web_server")]
web_shutdown_tx: None,
#[cfg(feature = "web_server")]
@ -1184,26 +1184,31 @@ impl VideoEncoder {
}
impl State {
fn finalize_session(&mut self, element: &super::BaseWebRTCSink, session: &mut SessionInner) {
gst::info!(CAT, "Ending session {}", session.id);
session.pipeline.debug_to_dot_file_with_ts(
fn finalize_session(&mut self, element: &super::BaseWebRTCSink, session: Session) {
let mut inner = session.0.lock().unwrap();
gst::info!(CAT, "Ending session {}", inner.id);
inner.pipeline.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!("removing-session-{}-", session.id),
format!("removing-session-{}-", inner.id),
);
for ssrc in session.webrtc_pads.keys() {
session.links.remove(ssrc);
let webrtc_pads: HashMap<_, _> = inner.webrtc_pads.drain().collect();
for ssrc in webrtc_pads.keys() {
inner.links.remove(ssrc);
}
let stats_collection_handle = session.stats_collection_handle.take();
let stats_collection_handle = inner.stats_collection_handle.take();
let pipeline = inner.pipeline.clone();
let session_id = inner.id.clone();
drop(inner);
let finalizing_sessions = self.finalizing_sessions.clone();
let session_id = session.id.clone();
let (sessions, _cvar) = &*finalizing_sessions;
sessions.lock().unwrap().insert(session_id.clone());
sessions.lock().unwrap().insert(session_id.clone(), session);
let element = element.clone();
let pipeline = session.pipeline.clone();
RUNTIME.spawn_blocking(move || {
if let Some(stats_collection_handle) = stats_collection_handle {
stats_collection_handle.abort();
@ -1215,7 +1220,15 @@ impl State {
let (sessions, cvar) = &*finalizing_sessions;
let mut sessions = sessions.lock().unwrap();
sessions.remove(&session_id);
let session = sessions.remove(&session_id).unwrap();
let _ = session
.0
.lock()
.unwrap()
.pipeline
.set_state(gst::State::Null);
cvar.notify_one();
gst::debug!(CAT, obj = element, "Session {session_id} ended");
@ -1228,7 +1241,7 @@ impl State {
session_id: &str,
) -> Option<Session> {
if let Some(session) = self.sessions.remove(session_id) {
self.finalize_session(element, &mut session.0.lock().unwrap());
self.finalize_session(element, session.clone());
Some(session)
} else {
None
@ -3706,7 +3719,7 @@ impl BaseWebRTCSink {
if remove {
let _ = state.sessions.remove(&session.id);
state.finalize_session(&self.obj(), &mut session);
state.finalize_session(&self.obj(), Session(session_clone.clone()));
drop(state_guard);
let settings = self.settings.lock().unwrap();
let signaller = settings.signaller.clone();