mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-26 05:21:00 +00:00
webrtchttp: Fix race condition when unlocking
It would be possible that there is no cancellable yet when unlock() is called, then a new future is executed and it wouldn't have any information that it is not supposed to run at all. To solve this remember if cancellation should happen and reset this later. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1602>
This commit is contained in:
parent
51f6d3986f
commit
3d4d785a2a
3 changed files with 170 additions and 108 deletions
|
@ -23,8 +23,26 @@ pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
});
|
});
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub enum Canceller {
|
||||||
|
#[default]
|
||||||
|
None,
|
||||||
|
Handle(future::AbortHandle),
|
||||||
|
Cancelled,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Canceller {
|
||||||
|
pub fn abort(&mut self) {
|
||||||
|
if let Canceller::Handle(ref canceller) = *self {
|
||||||
|
canceller.abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
*self = Canceller::Cancelled;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn wait_async<F, T>(
|
pub async fn wait_async<F, T>(
|
||||||
canceller: &Mutex<Option<future::AbortHandle>>,
|
canceller_mutex: &Mutex<Canceller>,
|
||||||
future: F,
|
future: F,
|
||||||
timeout: u32,
|
timeout: u32,
|
||||||
) -> Result<T, WaitError>
|
) -> Result<T, WaitError>
|
||||||
|
@ -34,15 +52,17 @@ where
|
||||||
{
|
{
|
||||||
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
||||||
{
|
{
|
||||||
let mut canceller_guard = canceller.lock().unwrap();
|
let mut canceller = canceller_mutex.lock().unwrap();
|
||||||
if canceller_guard.is_some() {
|
if matches!(*canceller, Canceller::Cancelled) {
|
||||||
|
return Err(WaitError::FutureAborted);
|
||||||
|
} else if matches!(*canceller, Canceller::Handle(..)) {
|
||||||
return Err(WaitError::FutureError(gst::error_msg!(
|
return Err(WaitError::FutureError(gst::error_msg!(
|
||||||
gst::ResourceError::Failed,
|
gst::ResourceError::Failed,
|
||||||
["Old Canceller should not exist"]
|
["Old Canceller should not exist"]
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
canceller_guard.replace(abort_handle);
|
*canceller = Canceller::Handle(abort_handle);
|
||||||
drop(canceller_guard);
|
drop(canceller);
|
||||||
}
|
}
|
||||||
|
|
||||||
let future = async {
|
let future = async {
|
||||||
|
@ -76,14 +96,19 @@ where
|
||||||
|
|
||||||
let res = future.await;
|
let res = future.await;
|
||||||
|
|
||||||
let mut canceller_guard = canceller.lock().unwrap();
|
{
|
||||||
*canceller_guard = None;
|
let mut canceller = canceller_mutex.lock().unwrap();
|
||||||
|
if matches!(*canceller, Canceller::Cancelled) {
|
||||||
|
return Err(WaitError::FutureAborted);
|
||||||
|
}
|
||||||
|
*canceller = Canceller::None;
|
||||||
|
}
|
||||||
|
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn wait<F, T>(
|
pub fn wait<F, T>(
|
||||||
canceller: &Mutex<Option<future::AbortHandle>>,
|
canceller_mutex: &Mutex<Canceller>,
|
||||||
future: F,
|
future: F,
|
||||||
timeout: u32,
|
timeout: u32,
|
||||||
) -> Result<T, WaitError>
|
) -> Result<T, WaitError>
|
||||||
|
@ -91,18 +116,18 @@ where
|
||||||
F: Send + Future<Output = Result<T, ErrorMessage>>,
|
F: Send + Future<Output = Result<T, ErrorMessage>>,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
let mut canceller_guard = canceller.lock().unwrap();
|
let mut canceller = canceller_mutex.lock().unwrap();
|
||||||
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
if matches!(*canceller, Canceller::Cancelled) {
|
||||||
|
return Err(WaitError::FutureAborted);
|
||||||
if canceller_guard.is_some() {
|
} else if matches!(*canceller, Canceller::Handle(..)) {
|
||||||
return Err(WaitError::FutureError(gst::error_msg!(
|
return Err(WaitError::FutureError(gst::error_msg!(
|
||||||
gst::ResourceError::Failed,
|
gst::ResourceError::Failed,
|
||||||
["Old Canceller should not exist"]
|
["Old Canceller should not exist"]
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
||||||
canceller_guard.replace(abort_handle);
|
*canceller = Canceller::Handle(abort_handle);
|
||||||
drop(canceller_guard);
|
drop(canceller);
|
||||||
|
|
||||||
let future = async {
|
let future = async {
|
||||||
if timeout == 0 {
|
if timeout == 0 {
|
||||||
|
@ -138,8 +163,11 @@ where
|
||||||
futures::executor::block_on(future)
|
futures::executor::block_on(future)
|
||||||
};
|
};
|
||||||
|
|
||||||
canceller_guard = canceller.lock().unwrap();
|
let mut canceller = canceller_mutex.lock().unwrap();
|
||||||
*canceller_guard = None;
|
if matches!(*canceller, Canceller::Cancelled) {
|
||||||
|
return Err(WaitError::FutureAborted);
|
||||||
|
}
|
||||||
|
*canceller = Canceller::None;
|
||||||
|
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,13 +8,12 @@
|
||||||
// SPDX-License-Identifier: MPL-2.0
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
use crate::utils::{
|
use crate::utils::{
|
||||||
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
|
self, build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async,
|
||||||
RUNTIME,
|
WaitError, RUNTIME,
|
||||||
};
|
};
|
||||||
use crate::IceTransportPolicy;
|
use crate::IceTransportPolicy;
|
||||||
use async_recursion::async_recursion;
|
use async_recursion::async_recursion;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::future;
|
|
||||||
use gst::{glib, prelude::*, subclass::prelude::*};
|
use gst::{glib, prelude::*, subclass::prelude::*};
|
||||||
use gst_sdp::*;
|
use gst_sdp::*;
|
||||||
use gst_webrtc::*;
|
use gst_webrtc::*;
|
||||||
|
@ -101,7 +100,7 @@ pub struct WhepSrc {
|
||||||
settings: Mutex<Settings>,
|
settings: Mutex<Settings>,
|
||||||
state: Mutex<State>,
|
state: Mutex<State>,
|
||||||
webrtcbin: gst::Element,
|
webrtcbin: gst::Element,
|
||||||
canceller: Mutex<Option<future::AbortHandle>>,
|
canceller: Mutex<utils::Canceller>,
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,7 +119,7 @@ impl Default for WhepSrc {
|
||||||
settings: Mutex::new(Settings::default()),
|
settings: Mutex::new(Settings::default()),
|
||||||
state: Mutex::new(State::default()),
|
state: Mutex::new(State::default()),
|
||||||
webrtcbin,
|
webrtcbin,
|
||||||
canceller: Mutex::new(None),
|
canceller: Mutex::new(utils::Canceller::default()),
|
||||||
client,
|
client,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -157,63 +156,80 @@ impl ElementImpl for WhepSrc {
|
||||||
PAD_TEMPLATES.as_ref()
|
PAD_TEMPLATES.as_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::single_match)]
|
||||||
fn change_state(
|
fn change_state(
|
||||||
&self,
|
&self,
|
||||||
transition: gst::StateChange,
|
transition: gst::StateChange,
|
||||||
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
|
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
|
||||||
if transition == gst::StateChange::NullToReady {
|
match transition {
|
||||||
/*
|
gst::StateChange::NullToReady => {
|
||||||
* Fail the state change if WHEP endpoint has not been set by the
|
/*
|
||||||
* time ReadyToPaused transition happens. This prevents us from
|
* Fail the state change if WHEP endpoint has not been set by the
|
||||||
* having to check this everywhere else.
|
* time ReadyToPaused transition happens. This prevents us from
|
||||||
*/
|
* having to check this everywhere else.
|
||||||
let settings = self.settings.lock().unwrap();
|
*/
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
if settings.whep_endpoint.is_none() {
|
if settings.whep_endpoint.is_none() {
|
||||||
gst::error!(CAT, imp: self, "WHEP endpoint URL must be set");
|
gst::error!(CAT, imp: self, "WHEP endpoint URL must be set");
|
||||||
return Err(gst::StateChangeError);
|
return Err(gst::StateChangeError);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check if we have a valid URL. We can be assured any further URL
|
||||||
|
* handling won't fail due to invalid URLs.
|
||||||
|
*/
|
||||||
|
if let Err(e) =
|
||||||
|
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str())
|
||||||
|
{
|
||||||
|
gst::error!(
|
||||||
|
CAT,
|
||||||
|
imp: self,
|
||||||
|
"WHEP endpoint URL could not be parsed: {}",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
return Err(gst::StateChangeError);
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
gst::StateChange::PausedToReady => {
|
||||||
* Check if we have a valid URL. We can be assured any further URL
|
let mut canceller = self.canceller.lock().unwrap();
|
||||||
* handling won't fail due to invalid URLs.
|
|
||||||
*/
|
|
||||||
if let Err(e) = reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()) {
|
|
||||||
gst::error!(
|
|
||||||
CAT,
|
|
||||||
imp: self,
|
|
||||||
"WHEP endpoint URL could not be parsed: {}",
|
|
||||||
e
|
|
||||||
);
|
|
||||||
return Err(gst::StateChangeError);
|
|
||||||
}
|
|
||||||
|
|
||||||
drop(settings);
|
|
||||||
}
|
|
||||||
|
|
||||||
if transition == gst::StateChange::PausedToReady {
|
|
||||||
if let Some(canceller) = &*self.canceller.lock().unwrap() {
|
|
||||||
canceller.abort();
|
canceller.abort();
|
||||||
}
|
}
|
||||||
|
_ => (),
|
||||||
let state = self.state.lock().unwrap();
|
|
||||||
if let State::Running { .. } = *state {
|
|
||||||
drop(state);
|
|
||||||
self.terminate_session();
|
|
||||||
}
|
|
||||||
|
|
||||||
for pad in self.obj().src_pads() {
|
|
||||||
gst::debug!(CAT, imp: self, "Removing pad: {}", pad.name());
|
|
||||||
|
|
||||||
// No need to deactivate pad here. Parent GstBin will deactivate
|
|
||||||
// the pad. Only remove the pad.
|
|
||||||
if let Err(e) = self.obj().remove_pad(&pad) {
|
|
||||||
gst::error!(CAT, imp: self, "Failed to remove pad {}: {}", pad.name(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
self.parent_change_state(transition)
|
let res = self.parent_change_state(transition)?;
|
||||||
|
|
||||||
|
match transition {
|
||||||
|
gst::StateChange::PausedToReady => {
|
||||||
|
{
|
||||||
|
let mut canceller = self.canceller.lock().unwrap();
|
||||||
|
*canceller = utils::Canceller::None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let state = self.state.lock().unwrap();
|
||||||
|
if let State::Running { .. } = *state {
|
||||||
|
drop(state);
|
||||||
|
self.terminate_session();
|
||||||
|
}
|
||||||
|
|
||||||
|
for pad in self.obj().src_pads() {
|
||||||
|
gst::debug!(CAT, imp: self, "Removing pad: {}", pad.name());
|
||||||
|
|
||||||
|
// No need to deactivate pad here. Parent GstBin will deactivate
|
||||||
|
// the pad. Only remove the pad.
|
||||||
|
if let Err(e) = self.obj().remove_pad(&pad) {
|
||||||
|
gst::error!(CAT, imp: self, "Failed to remove pad {}: {}", pad.name(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(res)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,12 +8,11 @@
|
||||||
// SPDX-License-Identifier: MPL-2.0
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
use crate::utils::{
|
use crate::utils::{
|
||||||
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
|
self, build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async,
|
||||||
RUNTIME,
|
WaitError, RUNTIME,
|
||||||
};
|
};
|
||||||
use crate::IceTransportPolicy;
|
use crate::IceTransportPolicy;
|
||||||
use async_recursion::async_recursion;
|
use async_recursion::async_recursion;
|
||||||
use futures::future;
|
|
||||||
use gst::glib;
|
use gst::glib;
|
||||||
use gst::prelude::*;
|
use gst::prelude::*;
|
||||||
use gst::subclass::prelude::*;
|
use gst::subclass::prelude::*;
|
||||||
|
@ -76,7 +75,7 @@ pub struct WhipSink {
|
||||||
settings: Mutex<Settings>,
|
settings: Mutex<Settings>,
|
||||||
state: Mutex<State>,
|
state: Mutex<State>,
|
||||||
webrtcbin: gst::Element,
|
webrtcbin: gst::Element,
|
||||||
canceller: Mutex<Option<future::AbortHandle>>,
|
canceller: Mutex<utils::Canceller>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for WhipSink {
|
impl Default for WhipSink {
|
||||||
|
@ -89,7 +88,7 @@ impl Default for WhipSink {
|
||||||
settings: Mutex::new(Settings::default()),
|
settings: Mutex::new(Settings::default()),
|
||||||
state: Mutex::new(State::default()),
|
state: Mutex::new(State::default()),
|
||||||
webrtcbin,
|
webrtcbin,
|
||||||
canceller: Mutex::new(None),
|
canceller: Mutex::new(utils::Canceller::default()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -146,55 +145,74 @@ impl ElementImpl for WhipSink {
|
||||||
Some(sink_pad.upcast())
|
Some(sink_pad.upcast())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::single_match)]
|
||||||
fn change_state(
|
fn change_state(
|
||||||
&self,
|
&self,
|
||||||
transition: gst::StateChange,
|
transition: gst::StateChange,
|
||||||
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
|
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
|
||||||
if transition == gst::StateChange::NullToReady {
|
match transition {
|
||||||
/*
|
gst::StateChange::NullToReady => {
|
||||||
* Fail the state change if WHIP endpoint has not been set by the
|
/*
|
||||||
* time ReadyToPaused transition happens. This prevents us from
|
* Fail the state change if WHIP endpoint has not been set by the
|
||||||
* having to check this everywhere else.
|
* time ReadyToPaused transition happens. This prevents us from
|
||||||
*/
|
* having to check this everywhere else.
|
||||||
let settings = self.settings.lock().unwrap();
|
*/
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
if settings.whip_endpoint.is_none() {
|
if settings.whip_endpoint.is_none() {
|
||||||
gst::error!(CAT, imp: self, "WHIP endpoint URL must be set");
|
gst::error!(CAT, imp: self, "WHIP endpoint URL must be set");
|
||||||
return Err(gst::StateChangeError);
|
return Err(gst::StateChangeError);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check if we have a valid URL. We can be assured any further URL
|
||||||
|
* handling won't fail due to invalid URLs.
|
||||||
|
*/
|
||||||
|
if let Err(e) =
|
||||||
|
reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str())
|
||||||
|
{
|
||||||
|
gst::error!(
|
||||||
|
CAT,
|
||||||
|
imp: self,
|
||||||
|
"WHIP endpoint URL could not be parsed: {}",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
|
||||||
|
return Err(gst::StateChangeError);
|
||||||
|
}
|
||||||
|
drop(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
gst::StateChange::PausedToReady => {
|
||||||
* Check if we have a valid URL. We can be assured any further URL
|
// Interrupt requests in progress, if any
|
||||||
* handling won't fail due to invalid URLs.
|
{
|
||||||
*/
|
let mut canceller = self.canceller.lock().unwrap();
|
||||||
if let Err(e) = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()) {
|
canceller.abort();
|
||||||
gst::error!(
|
}
|
||||||
CAT,
|
|
||||||
imp: self,
|
|
||||||
"WHIP endpoint URL could not be parsed: {}",
|
|
||||||
e
|
|
||||||
);
|
|
||||||
|
|
||||||
return Err(gst::StateChangeError);
|
|
||||||
}
|
}
|
||||||
drop(settings);
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
if transition == gst::StateChange::PausedToReady {
|
let res = self.parent_change_state(transition)?;
|
||||||
// Interrupt requests in progress, if any
|
|
||||||
if let Some(canceller) = &*self.canceller.lock().unwrap() {
|
|
||||||
canceller.abort();
|
|
||||||
}
|
|
||||||
|
|
||||||
let state = self.state.lock().unwrap();
|
match transition {
|
||||||
if let State::Running { .. } = *state {
|
gst::StateChange::PausedToReady => {
|
||||||
// Release server-side resources
|
{
|
||||||
drop(state);
|
let mut canceller = self.canceller.lock().unwrap();
|
||||||
self.terminate_session();
|
*canceller = utils::Canceller::None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let state = self.state.lock().unwrap();
|
||||||
|
if let State::Running { .. } = *state {
|
||||||
|
// Release server-side resources
|
||||||
|
drop(state);
|
||||||
|
self.terminate_session();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
self.parent_change_state(transition)
|
Ok(res)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue