webrtcsink: don't block the tokio runtime while holding state lock in unprepare()

It is possible that in unprepare(), waiting for a task to complete while
holding the state lock, that task may be waiting to acquire the state lock and
result in a deadlock.

This is quick to reproduce when starting and stopping webrtcsink in very quick
succession.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1931>
This commit is contained in:
Matthew Waters 2024-11-21 15:10:55 +11:00
parent 29b5ccacdf
commit 25bb2a12f1

View file

@ -2179,13 +2179,19 @@ impl BaseWebRTCSink {
drop(settings); drop(settings);
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let mut join_handles = vec![];
#[cfg(feature = "web_server")] #[cfg(feature = "web_server")]
if let Some(web_shutdown_tx) = state.web_shutdown_tx.take() { if let Some(web_shutdown_tx) = state.web_shutdown_tx.take() {
let _ = web_shutdown_tx.send(()); let _ = web_shutdown_tx.send(());
let web_join_handle = state.web_join_handle.take().expect("no web join handle"); let web_join_handle = state.web_join_handle.take().expect("no web join handle");
RUNTIME.block_on(async { // wait for this later
let _ = web_join_handle.await; join_handles.push(
}); async {
let _ = web_join_handle.await;
}
.boxed_local(),
);
} }
let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect(); let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect();
@ -2208,9 +2214,12 @@ impl BaseWebRTCSink {
gst::debug!(CAT, imp = self, "Waiting for codec discoveries to finish"); gst::debug!(CAT, imp = self, "Waiting for codec discoveries to finish");
let codecs_done_receiver = std::mem::take(&mut state.codecs_done_receivers); let codecs_done_receiver = std::mem::take(&mut state.codecs_done_receivers);
codecs_done_receiver.into_iter().for_each(|receiver| { codecs_done_receiver.into_iter().for_each(|receiver| {
RUNTIME.block_on(async { join_handles.push(
let _ = receiver.await; async {
}); let _ = receiver.await;
}
.boxed_local(),
);
}); });
gst::debug!(CAT, imp = self, "No codec discovery is running anymore"); gst::debug!(CAT, imp = self, "No codec discovery is running anymore");
@ -2223,6 +2232,14 @@ impl BaseWebRTCSink {
} }
drop(state); drop(state);
// only wait for all handles after the state lock has been dropped. Some of the futures may
// be waiting on the state lock to make forward progress before being able to be cancelled
// from calls above.
for handle in join_handles {
RUNTIME.block_on(handle);
}
gst::debug!(CAT, imp = self, "Ending sessions"); gst::debug!(CAT, imp = self, "Ending sessions");
for session in sessions { for session in sessions {
signaller.end_session(&session.0.lock().unwrap().id); signaller.end_session(&session.0.lock().unwrap().id);