net: webrtc/webrtchttp: Fix canceller usage

Commit 08b6251a added the check to ensure only one canceller at a time for net/webrtc.

In `whipsink` and since `whipwebrtcsink` picked up the same implementation, there exists a
bug around the use of canceller. `whipsink` calls `wait_async` while passing the canceller
as an argument. The path `send_offer -> do_post -> parse_endpoint_response` results in the
canceller being replaced in each subsequent call to `wait_async`. Since `wait_async` call
does not ensure one canceller, with the async call the use of canceller/abort was subtly
broken. Similarly, for `whepsrc`.

We really don't need to use `wait_async` inside `do_post` for any `await` calls. If the
root future viz. `do_post` with `wait_async` is aborted, the child futures will be taken
care of.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1290>
This commit is contained in:
Sanchayan Maity 2023-07-31 11:23:32 +05:30
parent d6616fed3f
commit 5b60ecbb18
4 changed files with 116 additions and 197 deletions

View file

@ -124,14 +124,12 @@ impl Signaller {
async fn do_post(&self, offer: gst_webrtc::WebRTCSessionDescription, webrtcbin: &gst::Element) {
let auth_token;
let endpoint;
let timeout;
{
let settings = self.settings.lock().unwrap();
endpoint =
reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap();
auth_token = settings.auth_token.clone();
timeout = settings.timeout;
drop(settings);
}
@ -174,33 +172,19 @@ impl Signaller {
);
}
let res = wait_async(
&self.canceller,
client
let res = client
.request(reqwest::Method::POST, endpoint.clone())
.headers(headermap)
.body(body)
.send(),
timeout,
)
.send()
.await;
match res {
Ok(r) => match r {
Ok(resp) => {
if let Err(e) = wait_async(
&self.canceller,
self.parse_endpoint_response(offer, resp, redirects, webrtcbin),
timeout,
)
self.parse_endpoint_response(offer, resp, redirects, webrtcbin)
.await
{
self.handle_future_error(e);
}
}
Err(err) => self.raise_error(err.to_string()),
},
Err(err) => self.handle_future_error(err),
}
}
@ -214,7 +198,6 @@ impl Signaller {
gst::debug!(CAT, imp: self, "Parsing endpoint response");
let endpoint;
let timeout;
let use_link_headers;
{
@ -222,7 +205,6 @@ impl Signaller {
endpoint =
reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap();
use_link_headers = settings.use_link_headers;
timeout = settings.timeout;
drop(settings);
}
@ -288,26 +270,21 @@ impl Signaller {
drop(state);
}
match wait_async(&self.canceller, resp.bytes(), timeout).await {
Ok(res) => match res {
match resp.bytes().await {
Ok(ans_bytes) => match gst_sdp::SDPMessage::parse_buffer(&ans_bytes) {
Ok(ans_sdp) => {
let answer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Answer,
ans_sdp,
);
self.obj().emit_by_name::<()>(
"session-description",
&[&"unique", &answer],
);
self.obj()
.emit_by_name::<()>("session-description", &[&"unique", &answer]);
}
Err(err) => {
self.raise_error(format!("Could not parse answer SDP: {err}"));
}
},
Err(err) => self.raise_error(err.to_string()),
},
Err(err) => self.handle_future_error(err),
}
}
@ -347,12 +324,7 @@ impl Signaller {
redirect_url.as_str()
);
if let Err(err) =
wait_async(&self.canceller, self.do_post(offer, webrtcbin), timeout)
.await
{
self.handle_future_error(err);
}
self.do_post(offer, webrtcbin).await
}
Err(e) => self.raise_error(e.to_string()),
}
@ -362,16 +334,14 @@ impl Signaller {
}
s => {
match wait_async(&self.canceller, resp.bytes(), timeout).await {
match resp.bytes().await {
Ok(r) => {
let res = r
.map(|x| x.escape_ascii().to_string())
.unwrap_or_else(|_| "(no further details)".to_string());
let res = r.escape_ascii().to_string();
// FIXME: Check and handle 'Retry-After' header in case of server error
self.raise_error(format!("Unexpected response: {} - {}", s.as_str(), res));
}
Err(err) => self.handle_future_error(err),
Err(err) => self.raise_error(err.to_string()),
}
}
}

View file

@ -35,6 +35,12 @@ where
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
{
let mut canceller_guard = canceller.lock().unwrap();
if canceller_guard.is_some() {
return Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Old Canceller should not exist"]
)));
}
canceller_guard.replace(abort_handle);
drop(canceller_guard);
}

View file

@ -626,7 +626,6 @@ impl WhepSrc {
redirects: u8,
) {
let endpoint;
let timeout;
let use_link_headers;
{
@ -634,7 +633,6 @@ impl WhepSrc {
endpoint =
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
use_link_headers = settings.use_link_headers;
timeout = settings.timeout;
drop(settings);
}
@ -692,8 +690,7 @@ impl WhepSrc {
}
};
match wait_async(&self.canceller, resp.bytes(), timeout).await {
Ok(res) => match res {
match resp.bytes().await {
Ok(ans_bytes) => {
let mut state = self.state.lock().unwrap();
*state = match *state {
@ -713,8 +710,6 @@ impl WhepSrc {
self.sdp_message_parse(ans_bytes)
}
Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
},
Err(err) => self.handle_future_error(err),
}
}
@ -753,11 +748,7 @@ impl WhepSrc {
redirect_url.as_str()
);
if let Err(err) =
wait_async(&self.canceller, self.do_post(sess_desc), timeout).await
{
self.handle_future_error(err);
}
self.do_post(sess_desc).await
}
Err(e) => self.raise_error(gst::ResourceError::Failed, e.to_string()),
}
@ -770,11 +761,9 @@ impl WhepSrc {
}
s => {
match wait_async(&self.canceller, resp.bytes(), timeout).await {
match resp.bytes().await {
Ok(r) => {
let res = r
.map(|x| x.escape_ascii().to_string())
.unwrap_or_else(|_| "(no further details)".to_string());
let res = r.escape_ascii().to_string();
// FIXME: Check and handle 'Retry-After' header in case of server error
self.raise_error(
@ -782,7 +771,7 @@ impl WhepSrc {
format!("Unexpected response: {} - {}", s.as_str(), res),
);
}
Err(err) => self.handle_future_error(err),
Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
}
}
}
@ -944,14 +933,12 @@ impl WhepSrc {
async fn do_post(&self, offer: WebRTCSessionDescription) {
let auth_token;
let endpoint;
let timeout;
{
let settings = self.settings.lock().unwrap();
endpoint =
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
auth_token = settings.auth_token.clone();
timeout = settings.timeout;
drop(settings);
}
@ -982,19 +969,15 @@ impl WhepSrc {
endpoint.as_str()
);
let res = wait_async(
&self.canceller,
self.client
let resp = self
.client
.request(reqwest::Method::POST, endpoint.clone())
.headers(headermap)
.body(body)
.send(),
timeout,
)
.send()
.await;
match res {
Ok(resp) => match resp {
match resp {
Ok(r) => {
#[allow(unused_mut)]
let mut redirects;
@ -1014,19 +997,9 @@ impl WhepSrc {
drop(state);
}
if let Err(e) = wait_async(
&self.canceller,
self.parse_endpoint_response(offer, r, redirects),
timeout,
)
.await
{
self.handle_future_error(e);
}
self.parse_endpoint_response(offer, r, redirects).await
}
Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
},
Err(err) => self.handle_future_error(err),
}
}

View file

@ -565,14 +565,12 @@ impl WhipSink {
async fn do_post(&self, offer: gst_webrtc::WebRTCSessionDescription) {
let auth_token;
let endpoint;
let timeout;
{
let settings = self.settings.lock().unwrap();
endpoint =
reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap();
auth_token = settings.auth_token.clone();
timeout = settings.timeout;
drop(settings);
}
@ -618,33 +616,16 @@ impl WhipSink {
);
}
let res = wait_async(
&self.canceller,
client
let res = client
.request(reqwest::Method::POST, endpoint.clone())
.headers(headermap)
.body(body)
.send(),
timeout,
)
.send()
.await;
match res {
Ok(r) => match r {
Ok(resp) => {
if let Err(e) = wait_async(
&self.canceller,
self.parse_endpoint_response(offer, resp, redirects),
timeout,
)
.await
{
self.handle_future_error(e);
}
}
Ok(resp) => self.parse_endpoint_response(offer, resp, redirects).await,
Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
},
Err(err) => self.handle_future_error(err),
}
}
@ -655,7 +636,6 @@ impl WhipSink {
redirects: u8,
) {
let endpoint;
let timeout;
let use_link_headers;
{
@ -663,7 +643,6 @@ impl WhipSink {
endpoint =
reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap();
use_link_headers = settings.use_link_headers;
timeout = settings.timeout;
drop(settings);
}
@ -737,8 +716,7 @@ impl WhipSink {
drop(state);
}
match wait_async(&self.canceller, resp.bytes(), timeout).await {
Ok(res) => match res {
match resp.bytes().await {
Ok(ans_bytes) => match sdp_message::SDPMessage::parse_buffer(&ans_bytes) {
Ok(ans_sdp) => {
let answer = gst_webrtc::WebRTCSessionDescription::new(
@ -758,8 +736,6 @@ impl WhipSink {
}
},
Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
},
Err(err) => self.handle_future_error(err),
}
}
@ -798,11 +774,7 @@ impl WhipSink {
redirect_url.as_str()
);
if let Err(err) =
wait_async(&self.canceller, self.do_post(offer), timeout).await
{
self.handle_future_error(err);
}
self.do_post(offer).await
}
Err(e) => self.raise_error(gst::ResourceError::Failed, e.to_string()),
}
@ -815,11 +787,9 @@ impl WhipSink {
}
s => {
match wait_async(&self.canceller, resp.bytes(), timeout).await {
match resp.bytes().await {
Ok(r) => {
let res = r
.map(|x| x.escape_ascii().to_string())
.unwrap_or_else(|_| "(no further details)".to_string());
let res = r.escape_ascii().to_string();
// FIXME: Check and handle 'Retry-After' header in case of server error
self.raise_error(
@ -827,7 +797,7 @@ impl WhipSink {
format!("Unexpected response: {} - {}", s.as_str(), res),
);
}
Err(err) => self.handle_future_error(err),
Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
}
}
}