webrtchttp: Fix race condition when unlocking

It would be possible that there is no cancellable yet when unlock() is
called, then a new future is executed and it wouldn't have any
information that it is not supposed to run at all.

To solve this remember if cancellation should happen and reset this
later.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1617>
This commit is contained in:
Sebastian Dröge 2024-06-07 19:15:05 +03:00 committed by GStreamer Marge Bot
parent d7b2f15df6
commit 61d2259b6b
3 changed files with 170 additions and 108 deletions

View file

@ -23,8 +23,26 @@ pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
.unwrap()
});
#[derive(Default)]
pub enum Canceller {
#[default]
None,
Handle(future::AbortHandle),
Cancelled,
}
impl Canceller {
pub fn abort(&mut self) {
if let Canceller::Handle(ref canceller) = *self {
canceller.abort();
}
*self = Canceller::Cancelled;
}
}
pub async fn wait_async<F, T>(
canceller: &Mutex<Option<future::AbortHandle>>,
canceller_mutex: &Mutex<Canceller>,
future: F,
timeout: u32,
) -> Result<T, WaitError>
@ -34,15 +52,17 @@ where
{
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
{
let mut canceller_guard = canceller.lock().unwrap();
if canceller_guard.is_some() {
let mut canceller = canceller_mutex.lock().unwrap();
if matches!(*canceller, Canceller::Cancelled) {
return Err(WaitError::FutureAborted);
} else if matches!(*canceller, Canceller::Handle(..)) {
return Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Old Canceller should not exist"]
)));
}
canceller_guard.replace(abort_handle);
drop(canceller_guard);
*canceller = Canceller::Handle(abort_handle);
drop(canceller);
}
let future = async {
@ -76,14 +96,19 @@ where
let res = future.await;
let mut canceller_guard = canceller.lock().unwrap();
*canceller_guard = None;
{
let mut canceller = canceller_mutex.lock().unwrap();
if matches!(*canceller, Canceller::Cancelled) {
return Err(WaitError::FutureAborted);
}
*canceller = Canceller::None;
}
res
}
pub fn wait<F, T>(
canceller: &Mutex<Option<future::AbortHandle>>,
canceller_mutex: &Mutex<Canceller>,
future: F,
timeout: u32,
) -> Result<T, WaitError>
@ -91,18 +116,18 @@ where
F: Send + Future<Output = Result<T, ErrorMessage>>,
T: Send + 'static,
{
let mut canceller_guard = canceller.lock().unwrap();
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
if canceller_guard.is_some() {
let mut canceller = canceller_mutex.lock().unwrap();
if matches!(*canceller, Canceller::Cancelled) {
return Err(WaitError::FutureAborted);
} else if matches!(*canceller, Canceller::Handle(..)) {
return Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Old Canceller should not exist"]
)));
}
canceller_guard.replace(abort_handle);
drop(canceller_guard);
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
*canceller = Canceller::Handle(abort_handle);
drop(canceller);
let future = async {
if timeout == 0 {
@ -138,8 +163,11 @@ where
futures::executor::block_on(future)
};
canceller_guard = canceller.lock().unwrap();
*canceller_guard = None;
let mut canceller = canceller_mutex.lock().unwrap();
if matches!(*canceller, Canceller::Cancelled) {
return Err(WaitError::FutureAborted);
}
*canceller = Canceller::None;
res
}

View file

@ -8,13 +8,12 @@
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
RUNTIME,
self, build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async,
WaitError, RUNTIME,
};
use crate::IceTransportPolicy;
use async_recursion::async_recursion;
use bytes::Bytes;
use futures::future;
use gst::{glib, prelude::*, subclass::prelude::*};
use gst_sdp::*;
use gst_webrtc::*;
@ -101,7 +100,7 @@ pub struct WhepSrc {
settings: Mutex<Settings>,
state: Mutex<State>,
webrtcbin: gst::Element,
canceller: Mutex<Option<future::AbortHandle>>,
canceller: Mutex<utils::Canceller>,
client: reqwest::Client,
}
@ -120,7 +119,7 @@ impl Default for WhepSrc {
settings: Mutex::new(Settings::default()),
state: Mutex::new(State::default()),
webrtcbin,
canceller: Mutex::new(None),
canceller: Mutex::new(utils::Canceller::default()),
client,
}
}
@ -157,11 +156,13 @@ impl ElementImpl for WhepSrc {
PAD_TEMPLATES.as_ref()
}
#[allow(clippy::single_match)]
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if transition == gst::StateChange::NullToReady {
match transition {
gst::StateChange::NullToReady => {
/*
* Fail the state change if WHEP endpoint has not been set by the
* time ReadyToPaused transition happens. This prevents us from
@ -178,7 +179,9 @@ impl ElementImpl for WhepSrc {
* Check if we have a valid URL. We can be assured any further URL
* handling won't fail due to invalid URLs.
*/
if let Err(e) = reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()) {
if let Err(e) =
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str())
{
gst::error!(
CAT,
imp: self,
@ -191,10 +194,21 @@ impl ElementImpl for WhepSrc {
drop(settings);
}
if transition == gst::StateChange::PausedToReady {
if let Some(canceller) = &*self.canceller.lock().unwrap() {
gst::StateChange::PausedToReady => {
let mut canceller = self.canceller.lock().unwrap();
canceller.abort();
}
_ => (),
}
let res = self.parent_change_state(transition)?;
match transition {
gst::StateChange::PausedToReady => {
{
let mut canceller = self.canceller.lock().unwrap();
*canceller = utils::Canceller::None;
}
let state = self.state.lock().unwrap();
if let State::Running { .. } = *state {
@ -212,8 +226,10 @@ impl ElementImpl for WhepSrc {
}
}
}
_ => (),
}
self.parent_change_state(transition)
Ok(res)
}
}

View file

@ -8,12 +8,11 @@
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
RUNTIME,
self, build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async,
WaitError, RUNTIME,
};
use crate::IceTransportPolicy;
use async_recursion::async_recursion;
use futures::future;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
@ -76,7 +75,7 @@ pub struct WhipSink {
settings: Mutex<Settings>,
state: Mutex<State>,
webrtcbin: gst::Element,
canceller: Mutex<Option<future::AbortHandle>>,
canceller: Mutex<utils::Canceller>,
}
impl Default for WhipSink {
@ -89,7 +88,7 @@ impl Default for WhipSink {
settings: Mutex::new(Settings::default()),
state: Mutex::new(State::default()),
webrtcbin,
canceller: Mutex::new(None),
canceller: Mutex::new(utils::Canceller::default()),
}
}
}
@ -146,11 +145,13 @@ impl ElementImpl for WhipSink {
Some(sink_pad.upcast())
}
#[allow(clippy::single_match)]
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if transition == gst::StateChange::NullToReady {
match transition {
gst::StateChange::NullToReady => {
/*
* Fail the state change if WHIP endpoint has not been set by the
* time ReadyToPaused transition happens. This prevents us from
@ -167,7 +168,9 @@ impl ElementImpl for WhipSink {
* Check if we have a valid URL. We can be assured any further URL
* handling won't fail due to invalid URLs.
*/
if let Err(e) = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()) {
if let Err(e) =
reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str())
{
gst::error!(
CAT,
imp: self,
@ -180,11 +183,24 @@ impl ElementImpl for WhipSink {
drop(settings);
}
if transition == gst::StateChange::PausedToReady {
gst::StateChange::PausedToReady => {
// Interrupt requests in progress, if any
if let Some(canceller) = &*self.canceller.lock().unwrap() {
{
let mut canceller = self.canceller.lock().unwrap();
canceller.abort();
}
}
_ => (),
}
let res = self.parent_change_state(transition)?;
match transition {
gst::StateChange::PausedToReady => {
{
let mut canceller = self.canceller.lock().unwrap();
*canceller = utils::Canceller::None;
}
let state = self.state.lock().unwrap();
if let State::Running { .. } = *state {
@ -193,8 +209,10 @@ impl ElementImpl for WhipSink {
self.terminate_session();
}
}
_ => (),
}
self.parent_change_state(transition)
Ok(res)
}
}