spotifyaudiosrc: Fix race condition when unlocking

It would be possible that there is no cancellable yet when unlock() is
called, then the setup task is started and it would simply run and being
waited on instead of not being run at all.

To solve this, remember if unlock() was called and reset this in
unlock_stop().

Also make sure to not keep the abort handle locked while waiting,
otherwise cancellation would never actually work.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1602>
This commit is contained in:
Sebastian Dröge 2024-06-07 15:03:10 +03:00 committed by GStreamer Marge Bot
parent 9945b702b8
commit c42040fbb8

View file

@ -6,7 +6,7 @@
//
// SPDX-License-Identifier: MPL-2.0
use std::sync::{mpsc, Arc, Mutex, MutexGuard};
use std::sync::{mpsc, Arc, Mutex};
use futures::future::{AbortHandle, Abortable, Aborted};
use once_cell::sync::Lazy;
@ -67,15 +67,39 @@ struct Settings {
}
#[derive(Default)]
pub struct SpotifyAudioSrc {
setup_thread: Mutex<Option<SetupThread>>,
state: Arc<Mutex<Option<State>>>,
settings: Mutex<Settings>,
enum SetupThread {
#[default]
None,
Pending {
thread_handle: Option<std::thread::JoinHandle<Result<anyhow::Result<()>, Aborted>>>,
abort_handle: AbortHandle,
},
Cancelled,
Done,
}
struct SetupThread {
thread_handle: std::thread::JoinHandle<Result<anyhow::Result<()>, Aborted>>,
abort_handle: AbortHandle,
impl SetupThread {
fn abort(&mut self) {
// Cancel setup thread if it is pending and not done yet
if matches!(self, SetupThread::None | SetupThread::Done) {
return;
}
if let SetupThread::Pending {
ref abort_handle, ..
} = *self
{
abort_handle.abort();
}
*self = SetupThread::Cancelled;
}
}
#[derive(Default)]
pub struct SpotifyAudioSrc {
setup_thread: Mutex<SetupThread>,
state: Arc<Mutex<Option<State>>>,
settings: Mutex<Settings>,
}
#[glib::object_subclass]
@ -172,21 +196,18 @@ impl BaseSrcImpl for SpotifyAudioSrc {
}
{
let setup_thread = self.setup_thread.lock().unwrap();
if setup_thread.is_some() {
// already starting
return Ok(());
// If not started yet and not cancelled, start the setup
let mut setup_thread = self.setup_thread.lock().unwrap();
assert!(!matches!(&*setup_thread, SetupThread::Cancelled));
if matches!(&*setup_thread, SetupThread::None) {
self.start_setup(&mut setup_thread);
}
self.start_setup(setup_thread);
}
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
// stop the setup if it's not completed yet
self.cancel_setup();
if let Some(state) = self.state.lock().unwrap().take() {
gst::debug!(CAT, imp: self, "stopping");
state.player.stop();
@ -199,9 +220,17 @@ impl BaseSrcImpl for SpotifyAudioSrc {
}
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
self.cancel_setup();
let mut setup_thread = self.setup_thread.lock().unwrap();
setup_thread.abort();
Ok(())
}
self.parent_unlock()
fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> {
let mut setup_thread = self.setup_thread.lock().unwrap();
if matches!(&*setup_thread, SetupThread::Cancelled) {
*setup_thread = SetupThread::None;
}
Ok(())
}
}
@ -216,30 +245,47 @@ impl PushSrcImpl for SpotifyAudioSrc {
};
if !state_set {
let setup_thread = self.setup_thread.lock().unwrap();
if setup_thread.is_none() {
// unlock() could potentially cancel the setup, and create() can be called after unlock() without going through start() again.
self.start_setup(setup_thread);
// If not started yet and not cancelled, start the setup
let mut setup_thread = self.setup_thread.lock().unwrap();
if matches!(&*setup_thread, SetupThread::Cancelled) {
return Err(gst::FlowError::Flushing);
}
if matches!(&*setup_thread, SetupThread::None) {
self.start_setup(&mut setup_thread);
}
}
{
// wait for the setup to be completed
let mut setup_thread = self.setup_thread.lock().unwrap();
if let Some(setup) = setup_thread.take() {
let res = setup.thread_handle.join().unwrap();
if let SetupThread::Pending {
ref mut thread_handle,
..
} = *setup_thread
{
let thread_handle = thread_handle.take().expect("Waiting multiple times");
drop(setup_thread);
let res = thread_handle.join().unwrap();
match res {
Err(_aborted) => {
gst::debug!(CAT, imp: self, "setup has been cancelled");
setup_thread = self.setup_thread.lock().unwrap();
*setup_thread = SetupThread::Cancelled;
return Err(gst::FlowError::Flushing);
}
Ok(Err(err)) => {
gst::error!(CAT, imp: self, "failed to start: {err:?}");
gst::element_imp_error!(self, gst::ResourceError::Settings, ["{err:?}"]);
setup_thread = self.setup_thread.lock().unwrap();
*setup_thread = SetupThread::None;
return Err(gst::FlowError::Error);
}
Ok(Ok(_)) => {}
Ok(Ok(_)) => {
setup_thread = self.setup_thread.lock().unwrap();
*setup_thread = SetupThread::Done;
}
}
}
}
@ -331,7 +377,9 @@ impl URIHandlerImpl for SpotifyAudioSrc {
}
impl SpotifyAudioSrc {
fn start_setup(&self, mut setup_thread: MutexGuard<Option<SetupThread>>) {
fn start_setup(&self, setup_thread: &mut SetupThread) {
assert!(matches!(setup_thread, SetupThread::None));
let self_ = self.to_owned();
// run the runtime from another thread to prevent the "start a runtime from within a runtime" panic
@ -344,10 +392,10 @@ impl SpotifyAudioSrc {
})
});
setup_thread.replace(SetupThread {
thread_handle,
*setup_thread = SetupThread::Pending {
thread_handle: Some(thread_handle),
abort_handle,
});
};
}
async fn setup(&self) -> anyhow::Result<()> {
@ -420,12 +468,4 @@ impl SpotifyAudioSrc {
Ok(())
}
fn cancel_setup(&self) {
let mut setup_thread = self.setup_thread.lock().unwrap();
if let Some(setup) = setup_thread.take() {
setup.abort_handle.abort();
}
}
}