webrtchttp: Use tokio runtime for spawning thread used for candidate offer

While at it, we had a bug in whepsrc where for redirect we were
incorrectly calling initial_post_request instead of do_post. Fix
that.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/949>
This commit is contained in:
Sanchayan Maity 2022-12-01 11:29:40 +05:30
parent d18761892e
commit 40680a47ab
4 changed files with 508 additions and 448 deletions

View file

@ -20,6 +20,7 @@ parse_link_header = {version = "0.3", features = ["url"]}
tokio = { version = "1.20.1", default-features = false, features = ["time", "rt-multi-thread"] } tokio = { version = "1.20.1", default-features = false, features = ["time", "rt-multi-thread"] }
futures = "0.3.23" futures = "0.3.23"
bytes = "1" bytes = "1"
async-recursion = "1.0.0"
[lib] [lib]
name = "gstwebrtchttp" name = "gstwebrtchttp"

View file

@ -23,6 +23,59 @@ pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
.unwrap() .unwrap()
}); });
pub async fn wait_async<F, T>(
canceller: &Mutex<Option<future::AbortHandle>>,
future: F,
timeout: u32,
) -> Result<T, WaitError>
where
F: Send + Future<Output = T>,
T: Send + 'static,
{
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
{
let mut canceller_guard = canceller.lock().unwrap();
canceller_guard.replace(abort_handle);
drop(canceller_guard);
}
let future = async {
if timeout == 0 {
Ok(future.await)
} else {
let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await;
match res {
Ok(r) => Ok(r),
Err(e) => Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Read,
["Request timeout, elapsed: {}", e]
))),
}
}
};
let future = async {
match future::Abortable::new(future, abort_registration).await {
Ok(Ok(r)) => Ok(r),
Ok(Err(err)) => Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Future resolved with an error {:?}", err]
))),
Err(future::Aborted) => Err(WaitError::FutureAborted),
}
};
let res = future.await;
let mut canceller_guard = canceller.lock().unwrap();
*canceller_guard = None;
res
}
pub fn wait<F, T>( pub fn wait<F, T>(
canceller: &Mutex<Option<future::AbortHandle>>, canceller: &Mutex<Option<future::AbortHandle>>,
future: F, future: F,

View file

@ -8,19 +8,20 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use crate::utils::{ use crate::utils::{
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, WaitError, build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
RUNTIME,
}; };
use crate::IceTransportPolicy; use crate::IceTransportPolicy;
use async_recursion::async_recursion;
use bytes::Bytes; use bytes::Bytes;
use futures::future; use futures::future;
use gst::{glib, prelude::*, subclass::prelude::*, ErrorMessage}; use gst::{glib, prelude::*, subclass::prelude::*};
use gst_sdp::*; use gst_sdp::*;
use gst_webrtc::*; use gst_webrtc::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use reqwest::header::{HeaderMap, HeaderValue}; use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::StatusCode; use reqwest::StatusCode;
use std::sync::Mutex; use std::sync::Mutex;
use std::thread::{spawn, JoinHandle};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new( gst::DebugCategory::new(
@ -79,14 +80,8 @@ impl Default for Settings {
#[derive(Debug)] #[derive(Debug)]
enum State { enum State {
Stopped, Stopped,
Post { Post { redirects: u8 },
redirects: u8, Running { whep_resource: String },
thread_handle: Option<JoinHandle<()>>,
},
Running {
whep_resource: String,
thread_handle: Option<JoinHandle<()>>,
},
} }
impl Default for State { impl Default for State {
@ -407,6 +402,22 @@ impl BinImpl for WhepSrc {
} }
impl WhepSrc { impl WhepSrc {
fn raise_error(&self, resource_error: gst::ResourceError, msg: String) {
gst::error_msg!(resource_error, [msg.as_str()]);
gst::element_imp_error!(self, resource_error, [msg.as_str()]);
}
fn handle_future_error(&self, err: WaitError) {
match err {
WaitError::FutureAborted => {
gst::warning!(CAT, imp: self, "Future aborted")
}
WaitError::FutureError(err) => {
self.raise_error(gst::ResourceError::Failed, err.to_string())
}
};
}
fn setup_webrtcbin(&self) { fn setup_webrtcbin(&self) {
// The specification requires all m= lines to be bundled (section 4.5) // The specification requires all m= lines to be bundled (section 4.5)
self.webrtcbin self.webrtcbin
@ -429,17 +440,18 @@ impl WhepSrc {
WebRTCICEGatheringState::Complete => { WebRTCICEGatheringState::Complete => {
gst::info!(CAT, imp: self_, "ICE gathering completed"); gst::info!(CAT, imp: self_, "ICE gathering completed");
let mut state = self_.state.lock().unwrap();
let self_ref = self_.ref_counted(); let self_ref = self_.ref_counted();
gst::debug!(CAT, imp: self_, "Spawning thread to send offer"); // With tokio's spawn one does not have to .await the
let handle = spawn(move || self_ref.whep_offer()); // returned JoinHandle to make the provided future start
// execution. It will start running in the background
*state = State::Post { // immediately when spawn is called. So silence the clippy
redirects: 0, // warning.
thread_handle: Some(handle), #[allow(clippy::let_underscore_future)]
}; let _ = RUNTIME.spawn(async move {
drop(state); /* Note that we check for a valid WHEP endpoint in change_state */
self_ref.whep_offer().await
});
} }
_ => (), _ => (),
} }
@ -560,30 +572,27 @@ impl WhepSrc {
drop(settings); drop(settings);
let mut state = self_.state.lock().unwrap(); let mut state = self_.state.lock().unwrap();
*state = State::Post { *state = State::Post { redirects: 0 };
redirects: 0,
thread_handle: None,
};
drop(state); drop(state);
if let Err(e) = self_.initial_post_request(endpoint.unwrap()) { self_.initial_post_request(endpoint.unwrap());
gst::element_imp_error!(
self_,
gst::ResourceError::Failed,
["Error in initial post request - {}", e.to_string()]
);
return None;
}
None None
} }
}); });
} }
fn sdp_message_parse(&self, sdp_bytes: Bytes) -> Result<(), ErrorMessage> { fn sdp_message_parse(&self, sdp_bytes: Bytes) {
let sdp = sdp_message::SDPMessage::parse_buffer(&sdp_bytes).map_err(|_| { let sdp = match sdp_message::SDPMessage::parse_buffer(&sdp_bytes) {
gst::error_msg!(gst::ResourceError::Failed, ["Could not parse answer SDP"]) Ok(sdp) => sdp,
})?; Err(_) => {
self.raise_error(
gst::ResourceError::Failed,
"Could not parse answer SDP".to_string(),
);
return;
}
};
let remote_sdp = WebRTCSessionDescription::new(WebRTCSDPType::Answer, sdp); let remote_sdp = WebRTCSessionDescription::new(WebRTCSDPType::Answer, sdp);
@ -612,55 +621,63 @@ impl WhepSrc {
.emit_by_name::<()>("add-ice-candidate", &[&m_line_index, &c]); .emit_by_name::<()>("add-ice-candidate", &[&m_line_index, &c]);
} }
} }
Ok(())
} }
fn parse_endpoint_response( async fn parse_endpoint_response(
&self, &self,
endpoint: reqwest::Url, sess_desc: WebRTCSessionDescription,
redirects: u8,
resp: reqwest::Response, resp: reqwest::Response,
) -> Result<(), ErrorMessage> { redirects: u8,
) {
let endpoint;
let timeout;
let use_link_headers;
{
let settings = self.settings.lock().unwrap();
endpoint =
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
use_link_headers = settings.use_link_headers;
timeout = settings.timeout;
drop(settings);
}
match resp.status() { match resp.status() {
StatusCode::OK | StatusCode::NO_CONTENT => { StatusCode::OK | StatusCode::NO_CONTENT => {
gst::info!(CAT, imp: self, "SDP offer successfully send"); gst::info!(CAT, imp: self, "SDP offer successfully send");
Ok(())
} }
StatusCode::CREATED => { StatusCode::CREATED => {
gst::debug!(CAT, imp: self, "Response headers: {:?}", resp.headers()); gst::debug!(CAT, imp: self, "Response headers: {:?}", resp.headers());
let settings = self if use_link_headers {
.settings if let Err(e) = set_ice_servers(&self.webrtcbin, resp.headers()) {
.lock() self.raise_error(gst::ResourceError::Failed, e.to_string());
.expect("Failed to acquire settings lock"); return;
let timeout = settings.timeout; };
if settings.use_link_headers {
set_ice_servers(&self.webrtcbin, resp.headers())?;
} }
drop(settings);
/* See section 4.2 of the WHEP specification */ /* See section 4.2 of the WHEP specification */
let location = match resp.headers().get(reqwest::header::LOCATION) { let location = match resp.headers().get(reqwest::header::LOCATION) {
Some(location) => location, Some(location) => location,
None => { None => {
return Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Location header field should be present for WHEP resource URL"] "Location header field should be present for WHEP resource URL"
)); .to_string(),
);
return;
} }
}; };
let location = match location.to_str() { let location = match location.to_str() {
Ok(loc) => loc, Ok(loc) => loc,
Err(e) => { Err(e) => {
return Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Failed to convert location to string {}", e] format!("Failed to convert location to string: {e}"),
)); );
return;
} }
}; };
@ -668,59 +685,52 @@ impl WhepSrc {
gst::debug!(CAT, imp: self, "WHEP resource: {:?}", location); gst::debug!(CAT, imp: self, "WHEP resource: {:?}", location);
let url = url.join(location).map_err(|err| { let url = match url.join(location) {
gst::error_msg!( Ok(joined_url) => joined_url,
Err(err) => {
self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["URL join operation failed: {:?}", err] format!("URL join operation failed: {err:?}"),
) );
})?; return;
}
};
match wait_async(&self.canceller, resp.bytes(), timeout).await {
Ok(res) => match res {
Ok(ans_bytes) => {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
*state = match *state { *state = match *state {
State::Post { State::Post { redirects: _r } => State::Running {
redirects: _r,
thread_handle: ref mut h,
} => State::Running {
whep_resource: url.to_string(), whep_resource: url.to_string(),
thread_handle: h.take(),
}, },
_ => { _ => {
return Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Expected to be in POST state"] "Expected to be in POST state".to_string(),
)); );
return;
} }
}; };
drop(state); drop(state);
let future = async { self.sdp_message_parse(ans_bytes)
resp.bytes().await.map_err(|err| { }
gst::error_msg!( Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
gst::ResourceError::Failed,
["Failed to get response body: {:?}", err]
)
})
};
match wait(&self.canceller, future, timeout) {
Ok(ans_bytes) => self.sdp_message_parse(ans_bytes),
Err(err) => match err {
WaitError::FutureAborted => Ok(()),
WaitError::FutureError(e) => Err(e),
}, },
Err(err) => self.handle_future_error(err),
} }
} }
status if status.is_redirection() => { status if status.is_redirection() => {
if redirects < MAX_REDIRECTS { if redirects < MAX_REDIRECTS {
match parse_redirect_location(resp.headers(), &endpoint) {
Ok(redirect_url) => {
{
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
*state = match *state { *state = match *state {
State::Post { State::Post { redirects: _r } => State::Post {
redirects: _r,
thread_handle: ref mut h,
} => State::Post {
redirects: redirects + 1, redirects: redirects + 1,
thread_handle: h.take(),
}, },
/* /*
* As per section 4.6 of the specification, redirection is * As per section 4.6 of the specification, redirection is
@ -729,62 +739,55 @@ impl WhepSrc {
* POST request may support redirection. * POST request may support redirection.
*/ */
State::Running { .. } => { State::Running { .. } => {
return Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Unexpected redirection in RUNNING state"] "Unexpected redirection in RUNNING state".to_string(),
)); );
return;
} }
State::Stopped => unreachable!(), State::Stopped => unreachable!(),
}; };
drop(state); drop(state);
}
match parse_redirect_location(resp.headers(), &endpoint) {
Ok(redirect_url) => {
gst::warning!( gst::warning!(
CAT, CAT,
imp: self, imp: self,
"Redirecting endpoint to {}", "Redirecting endpoint to {}",
redirect_url.as_str() redirect_url.as_str()
); );
self.initial_post_request(redirect_url)
if let Err(err) =
wait_async(&self.canceller, self.do_post(sess_desc), timeout).await
{
self.handle_future_error(err);
} }
Err(e) => Err(e), }
Err(e) => self.raise_error(gst::ResourceError::Failed, e.to_string()),
} }
} else { } else {
Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Too many redirects. Unable to connect."] "Too many redirects. Unable to connect.".to_string(),
)) );
} }
} }
s => { s => {
let future = async { match wait_async(&self.canceller, resp.bytes(), timeout).await {
resp.bytes().await.map_err(|err| { Ok(r) => {
gst::error_msg!( let res = r
gst::ResourceError::Failed,
["Failed to get response body: {:?}", err]
)
})
};
let settings = self
.settings
.lock()
.expect("Failed to acquire settings lock");
let timeout = settings.timeout;
drop(settings);
let res = wait(&self.canceller, future, timeout)
.map(|x| x.escape_ascii().to_string()) .map(|x| x.escape_ascii().to_string())
.unwrap_or_else(|_| "(no further details)".to_string()); .unwrap_or_else(|_| "(no further details)".to_string());
// FIXME: Check and handle 'Retry-After' header in case of server error // FIXME: Check and handle 'Retry-After' header in case of server error
Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Unexpected response: {} - {}", s.as_str(), res] format!("Unexpected response: {} - {}", s.as_str(), res),
)) );
}
Err(err) => self.handle_future_error(err),
}
} }
} }
} }
@ -833,11 +836,16 @@ impl WhepSrc {
&[&offer_sdp, &None::<gst::Promise>], &[&offer_sdp, &None::<gst::Promise>],
); );
} else { } else {
gst::error!(CAT, imp: self_, "Reply without an offer: {}", reply); let error = reply
.value("error")
.expect("structure must have an error value")
.get::<glib::Error>()
.expect("value must be a GLib error");
gst::element_imp_error!( gst::element_imp_error!(
self_, self_,
gst::LibraryError::Failed, gst::LibraryError::Failed,
["generate offer::Promise returned with no reply"] ["generate offer::Promise returned with no reply: {}", error]
); );
} }
}); });
@ -878,7 +886,7 @@ impl WhepSrc {
.emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]); .emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
} }
fn initial_post_request(&self, endpoint: reqwest::Url) -> Result<(), ErrorMessage> { fn initial_post_request(&self, endpoint: reqwest::Url) {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
gst::info!(CAT, imp: self, "WHEP endpoint url: {}", endpoint.as_str()); gst::info!(CAT, imp: self, "WHEP endpoint url: {}", endpoint.as_str());
@ -886,20 +894,19 @@ impl WhepSrc {
match *state { match *state {
State::Post { .. } => (), State::Post { .. } => (),
_ => { _ => {
return Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Trying to do POST in unexpected state"] "Trying to do POST in unexpected state".to_string(),
)); );
return;
} }
}; };
drop(state); drop(state);
self.generate_offer(); self.generate_offer()
Ok(())
} }
fn whep_offer(&self) { async fn whep_offer(&self) {
let local_desc = self let local_desc = self
.webrtcbin .webrtcbin
.property::<Option<WebRTCSessionDescription>>("local-description"); .property::<Option<WebRTCSessionDescription>>("local-description");
@ -923,40 +930,34 @@ impl WhepSrc {
offer_sdp.sdp().as_text() offer_sdp.sdp().as_text()
); );
if let Err(e) = self.send_sdp(offer_sdp.sdp()) { let sess_desc = WebRTCSessionDescription::new(WebRTCSDPType::Offer, offer_sdp.sdp());
gst::element_imp_error!(
self,
gst::ResourceError::Failed,
["Error in sending answer - {}", e.to_string()]
);
}
}
fn send_sdp(&self, sdp: SDPMessage) -> Result<(), gst::ErrorMessage> { let timeout;
let sess_desc = WebRTCSessionDescription::new(WebRTCSDPType::Offer, sdp); {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
timeout = settings.timeout;
let endpoint = reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str());
if let Err(e) = endpoint {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Could not parse endpoint URL: {}", e]
));
}
drop(settings); drop(settings);
self.do_post(sess_desc, endpoint.unwrap())
} }
fn do_post( if let Err(e) = wait_async(&self.canceller, self.do_post(sess_desc), timeout).await {
&self, self.handle_future_error(e);
offer: WebRTCSessionDescription, }
endpoint: reqwest::Url, }
) -> Result<(), gst::ErrorMessage> {
#[async_recursion]
async fn do_post(&self, offer: WebRTCSessionDescription) {
let auth_token;
let endpoint;
let timeout;
{
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let timeout = settings.timeout; endpoint =
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
auth_token = settings.auth_token.clone();
timeout = settings.timeout;
drop(settings);
}
let sdp = offer.sdp(); let sdp = offer.sdp();
let body = sdp.as_text().unwrap(); let body = sdp.as_text().unwrap();
@ -969,8 +970,8 @@ impl WhepSrc {
HeaderValue::from_static("application/sdp"), HeaderValue::from_static("application/sdp"),
); );
if let Some(token) = &settings.auth_token { if let Some(token) = auth_token.as_ref() {
let bearer_token = "Bearer ".to_owned() + token; let bearer_token = "Bearer ".to_owned() + token.as_str();
headermap.insert( headermap.insert(
reqwest::header::AUTHORIZATION, reqwest::header::AUTHORIZATION,
HeaderValue::from_str(bearer_token.as_str()) HeaderValue::from_str(bearer_token.as_str())
@ -978,8 +979,6 @@ impl WhepSrc {
); );
} }
drop(settings);
gst::debug!( gst::debug!(
CAT, CAT,
imp: self, imp: self,
@ -987,51 +986,63 @@ impl WhepSrc {
endpoint.as_str() endpoint.as_str()
); );
let future = async { let res = wait_async(
&self.canceller,
self.client self.client
.request(reqwest::Method::POST, endpoint.clone()) .request(reqwest::Method::POST, endpoint.clone())
.headers(headermap) .headers(headermap)
.body(body) .body(body)
.send() .send(),
.await timeout,
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["HTTP POST request failed {}: {:?}", endpoint.as_str(), err]
) )
}) .await;
};
match wait(&self.canceller, future, timeout) { match res {
Ok(resp) => self.parse_endpoint_response(endpoint, 0, resp), Ok(resp) => match resp {
Err(err) => match err { Ok(r) => {
WaitError::FutureAborted => Ok(()), #[allow(unused_mut)]
WaitError::FutureError(e) => Err(e), let mut redirects;
{
let state = self.state.lock().unwrap();
redirects = match *state {
State::Post { redirects } => redirects,
_ => {
self.raise_error(
gst::ResourceError::Failed,
"Trying to do POST in unexpected state".to_string(),
);
return;
}
};
drop(state);
}
if let Err(e) = wait_async(
&self.canceller,
self.parse_endpoint_response(offer, r, redirects),
timeout,
)
.await
{
self.handle_future_error(e);
}
}
Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
}, },
Err(err) => self.handle_future_error(err),
} }
} }
fn terminate_session(&self) { fn terminate_session(&self) {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let timeout = settings.timeout; let timeout = settings.timeout;
let resource_url;
(*state, resource_url) = match *state { let resource_url = match *state {
State::Running { State::Running {
whep_resource: ref whep_resource_url, whep_resource: ref whep_resource_url,
thread_handle: ref mut h, } => whep_resource_url.clone(),
} => {
if let Some(th) = h.take() {
match th.join() {
Ok(_) => {
gst::debug!(CAT, imp: self, "Send offer thread joined successfully");
}
Err(e) => gst::error!(CAT, imp: self, "Failed to join thread: {:?}", e),
}
}
(State::Stopped, whep_resource_url.clone())
}
_ => { _ => {
gst::element_imp_error!( gst::element_imp_error!(
self, self,

View file

@ -8,14 +8,15 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use crate::utils::{ use crate::utils::{
build_reqwest_client, parse_redirect_location, set_ice_servers, wait, WaitError, build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError,
RUNTIME,
}; };
use crate::IceTransportPolicy; use crate::IceTransportPolicy;
use async_recursion::async_recursion;
use futures::future; use futures::future;
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::ErrorMessage;
use gst_sdp::*; use gst_sdp::*;
use gst_webrtc::*; use gst_webrtc::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
@ -23,14 +24,12 @@ use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue; use reqwest::header::HeaderValue;
use reqwest::StatusCode; use reqwest::StatusCode;
use std::sync::Mutex; use std::sync::Mutex;
use std::thread::{spawn, JoinHandle};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new("whipsink", gst::DebugColorFlags::empty(), Some("WHIP Sink")) gst::DebugCategory::new("whipsink", gst::DebugColorFlags::empty(), Some("WHIP Sink"))
}); });
const DEFAULT_ICE_TRANSPORT_POLICY: IceTransportPolicy = const DEFAULT_ICE_TRANSPORT_POLICY: IceTransportPolicy = IceTransportPolicy::All;
IceTransportPolicy::All;
const MAX_REDIRECTS: u8 = 10; const MAX_REDIRECTS: u8 = 10;
const DEFAULT_TIMEOUT: u32 = 15; const DEFAULT_TIMEOUT: u32 = 15;
@ -63,14 +62,8 @@ impl Default for Settings {
#[derive(Debug)] #[derive(Debug)]
enum State { enum State {
Stopped, Stopped,
Post { Post { redirects: u8 },
redirects: u8, Running { whip_resource_url: String },
thread_handle: Option<JoinHandle<()>>,
},
Running {
whip_resource_url: String,
thread_handle: Option<JoinHandle<()>>,
},
} }
impl Default for State { impl Default for State {
@ -368,17 +361,18 @@ impl ObjectImpl for WhipSink {
WebRTCICEGatheringState::Complete => { WebRTCICEGatheringState::Complete => {
gst::info!(CAT, imp: self_, "ICE gathering completed"); gst::info!(CAT, imp: self_, "ICE gathering completed");
let mut state = self_.state.lock().unwrap();
let self_ref = self_.ref_counted(); let self_ref = self_.ref_counted();
gst::debug!(CAT, imp: self_, "Spawning thread to send offer"); // With tokio's spawn one does not have to .await the
let handle = spawn(move || self_ref.send_offer()); // returned JoinHandle to make the provided future start
// execution. It will start running in the background
*state = State::Post { // immediately when spawn is called. So silence the clippy
redirects: 0, // warning.
thread_handle: Some(handle), #[allow(clippy::let_underscore_future)]
}; let _ = RUNTIME.spawn(async move {
drop(state); /* Note that we check for a valid WHIP endpoint in change_state */
self_ref.send_offer().await
});
} }
_ => (), _ => (),
} }
@ -491,27 +485,28 @@ impl ObjectSubclass for WhipSink {
} }
impl WhipSink { impl WhipSink {
fn send_offer(&self) { fn raise_error(&self, resource_error: gst::ResourceError, msg: String) {
let settings = self.settings.lock().unwrap(); gst::error_msg!(resource_error, [msg.as_str()]);
gst::element_imp_error!(self, resource_error, [msg.as_str()]);
/* Note that we check for a valid WHIP endpoint in change_state */
let endpoint = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str());
if let Err(e) = endpoint {
gst::element_imp_error!(
self,
gst::ResourceError::Failed,
["Could not parse endpoint URL: {}", e]
);
return;
} }
drop(settings); fn handle_future_error(&self, err: WaitError) {
let mut state = self.state.lock().unwrap(); match err {
*state = State::Post { WaitError::FutureAborted => {
redirects: 0, gst::warning!(CAT, imp: self, "Future aborted")
thread_handle: None, }
WaitError::FutureError(err) => {
self.raise_error(gst::ResourceError::Failed, err.to_string())
}
}; };
}
async fn send_offer(&self) {
{
let mut state = self.state.lock().unwrap();
*state = State::Post { redirects: 0 };
drop(state); drop(state);
}
let local_desc = self let local_desc = self
.webrtcbin .webrtcbin
@ -536,40 +531,50 @@ impl WhipSink {
offer_sdp.sdp().as_text() offer_sdp.sdp().as_text()
); );
match self.do_post(offer_sdp, endpoint.unwrap()) { let timeout;
Ok(_) => (), {
Err(e) => {
gst::element_imp_error!(
self,
gst::ResourceError::Failed,
["Failed to send offer: {}", e]
);
}
}
}
fn do_post(
&self,
offer: gst_webrtc::WebRTCSessionDescription,
endpoint: reqwest::Url,
) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap(); timeout = settings.timeout;
let timeout = settings.timeout; drop(settings);
}
let redirects = match *state { if let Err(e) = wait_async(&self.canceller, self.do_post(offer_sdp), timeout).await {
State::Post { self.handle_future_error(e);
redirects, }
thread_handle: ref _h, }
} => redirects,
#[async_recursion]
async fn do_post(&self, offer: gst_webrtc::WebRTCSessionDescription) {
let auth_token;
let endpoint;
let timeout;
{
let settings = self.settings.lock().unwrap();
endpoint =
reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap();
auth_token = settings.auth_token.clone();
timeout = settings.timeout;
drop(settings);
}
#[allow(unused_mut)]
let mut redirects;
{
let state = self.state.lock().unwrap();
redirects = match *state {
State::Post { redirects } => redirects,
_ => { _ => {
return Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Trying to POST in unexpected state"] "Trying to do POST in unexpected state".to_string(),
)); );
return;
} }
}; };
drop(state); drop(state);
}
// Default policy for redirect does not share the auth token to new location // Default policy for redirect does not share the auth token to new location
// So disable inbuilt redirecting and do a recursive call upon 3xx response code // So disable inbuilt redirecting and do a recursive call upon 3xx response code
@ -586,7 +591,7 @@ impl WhipSink {
HeaderValue::from_static("application/sdp"), HeaderValue::from_static("application/sdp"),
); );
if let Some(token) = &settings.auth_token { if let Some(token) = auth_token.as_ref() {
let bearer_token = "Bearer ".to_owned() + token; let bearer_token = "Bearer ".to_owned() + token;
headermap.insert( headermap.insert(
reqwest::header::AUTHORIZATION, reqwest::header::AUTHORIZATION,
@ -595,51 +600,63 @@ impl WhipSink {
); );
} }
let future = async { let res = wait_async(
&self.canceller,
client client
.request(reqwest::Method::POST, endpoint.as_ref()) .request(reqwest::Method::POST, endpoint.clone())
.headers(headermap) .headers(headermap)
.body(body) .body(body)
.send() .send(),
.await timeout,
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["POST request failed: {:?}", err]
) )
}) .await;
};
drop(settings); match res {
Ok(r) => match r {
match wait(&self.canceller, future, timeout) { Ok(resp) => {
Ok(resp) => self.parse_endpoint_response(offer, endpoint, redirects, resp), if let Err(e) = wait_async(
Err(err) => match err { &self.canceller,
WaitError::FutureAborted => Ok(()), self.parse_endpoint_response(offer, resp, redirects),
WaitError::FutureError(e) => Err(e), timeout,
)
.await
{
self.handle_future_error(e);
}
}
Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
}, },
Err(err) => self.handle_future_error(err),
} }
} }
fn parse_endpoint_response( async fn parse_endpoint_response(
&self, &self,
offer: gst_webrtc::WebRTCSessionDescription, offer: gst_webrtc::WebRTCSessionDescription,
endpoint: reqwest::Url,
redirects: u8,
resp: reqwest::Response, resp: reqwest::Response,
) -> Result<(), ErrorMessage> { redirects: u8,
) {
let endpoint;
let timeout;
let use_link_headers;
{
let settings = self.settings.lock().unwrap();
endpoint =
reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap();
use_link_headers = settings.use_link_headers;
timeout = settings.timeout;
drop(settings);
}
match resp.status() { match resp.status() {
StatusCode::OK | StatusCode::CREATED => { StatusCode::OK | StatusCode::CREATED => {
let settings = self if use_link_headers {
.settings if let Err(e) = set_ice_servers(&self.webrtcbin, resp.headers()) {
.lock() self.raise_error(gst::ResourceError::Failed, e.to_string());
.expect("Failed to acquire settings lock"); return;
let timeout = settings.timeout; };
if settings.use_link_headers {
set_ice_servers(&self.webrtcbin, resp.headers())?;
} }
drop(settings);
// Get the url of the resource from 'location' header. // Get the url of the resource from 'location' header.
// The resource created is expected be a relative path // The resource created is expected be a relative path
@ -650,20 +667,23 @@ impl WhipSink {
let location = match resp.headers().get(reqwest::header::LOCATION) { let location = match resp.headers().get(reqwest::header::LOCATION) {
Some(location) => location, Some(location) => location,
None => { None => {
return Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Location header field should be present for WHIP resource URL"] "Location header field should be present for WHIP resource URL"
)); .to_string(),
);
return;
} }
}; };
let location = match location.to_str() { let location = match location.to_str() {
Ok(loc) => loc, Ok(loc) => loc,
Err(e) => { Err(e) => {
return Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Failed to convert location to string {}", e] format!("Failed to convert location to string: {e}"),
)); );
return;
} }
}; };
@ -671,41 +691,36 @@ impl WhipSink {
gst::debug!(CAT, imp: self, "WHIP resource: {:?}", location); gst::debug!(CAT, imp: self, "WHIP resource: {:?}", location);
let url = url.join(location).map_err(|err| { let url = match url.join(location) {
gst::error_msg!( Ok(joined_url) => joined_url,
Err(err) => {
self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["URL join operation failed: {:?}", err] format!("URL join operation failed: {err:?}"),
) );
})?; return;
}
};
{
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
*state = match *state { *state = match *state {
State::Post { State::Post { redirects: _r } => State::Running {
redirects: _r,
thread_handle: ref mut h,
} => State::Running {
whip_resource_url: url.to_string(), whip_resource_url: url.to_string(),
thread_handle: h.take(),
}, },
_ => { _ => {
return Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Expected to be in POST state"] "Expected to be in POST state".to_string(),
)); );
return;
} }
}; };
drop(state); drop(state);
}
let future = async { match wait_async(&self.canceller, resp.bytes(), timeout).await {
resp.bytes().await.map_err(|err| { Ok(res) => match res {
gst::error_msg!(
gst::ResourceError::Failed,
["Failed to get response body: {:?}", err]
)
})
};
match wait(&self.canceller, future, timeout) {
Ok(ans_bytes) => match sdp_message::SDPMessage::parse_buffer(&ans_bytes) { Ok(ans_bytes) => match sdp_message::SDPMessage::parse_buffer(&ans_bytes) {
Ok(ans_sdp) => { Ok(ans_sdp) => {
let answer = gst_webrtc::WebRTCSessionDescription::new( let answer = gst_webrtc::WebRTCSessionDescription::new(
@ -716,31 +731,29 @@ impl WhipSink {
"set-remote-description", "set-remote-description",
&[&answer, &None::<gst::Promise>], &[&answer, &None::<gst::Promise>],
); );
Ok(())
} }
Err(err) => {
Err(e) => Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Could not parse answer SDP: {}", e] format!("Could not parse answer SDP: {err}"),
)), );
}
}, },
Err(err) => match err { Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()),
WaitError::FutureAborted => Ok(()),
WaitError::FutureError(e) => Err(e),
}, },
Err(err) => self.handle_future_error(err),
} }
} }
s if s.is_redirection() => { s if s.is_redirection() => {
if redirects < MAX_REDIRECTS { if redirects < MAX_REDIRECTS {
match parse_redirect_location(resp.headers(), &endpoint) {
Ok(redirect_url) => {
{
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
*state = match *state { *state = match *state {
State::Post { State::Post { redirects: _r } => State::Post {
redirects: _r,
thread_handle: ref mut h,
} => State::Post {
redirects: redirects + 1, redirects: redirects + 1,
thread_handle: h.take(),
}, },
/* /*
* As per section 4.6 of the specification, redirection is * As per section 4.6 of the specification, redirection is
@ -749,86 +762,68 @@ impl WhipSink {
* POST request may support redirection. * POST request may support redirection.
*/ */
State::Running { .. } => { State::Running { .. } => {
return Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Unexpected redirection in RUNNING state"] "Unexpected redirection in RUNNING state".to_string(),
)); );
return;
} }
State::Stopped => unreachable!(), State::Stopped => unreachable!(),
}; };
drop(state); drop(state);
}
match parse_redirect_location(resp.headers(), &endpoint) {
Ok(redirect_url) => {
gst::debug!( gst::debug!(
CAT, CAT,
imp: self, imp: self,
"Redirecting endpoint to {}", "Redirecting endpoint to {}",
redirect_url.as_str() redirect_url.as_str()
); );
self.do_post(offer, redirect_url)
if let Err(err) =
wait_async(&self.canceller, self.do_post(offer), timeout).await
{
self.handle_future_error(err);
} }
Err(e) => Err(e), }
Err(e) => self.raise_error(gst::ResourceError::Failed, e.to_string()),
} }
} else { } else {
Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Too many redirects. Unable to connect to do POST"] "Too many redirects. Unable to connect.".to_string(),
)) );
} }
} }
s => { s => {
let future = async { match wait_async(&self.canceller, resp.bytes(), timeout).await {
resp.bytes().await.map_err(|err| { Ok(r) => {
gst::error_msg!( let res = r
gst::ResourceError::Failed,
["Failed to get response body: {:?}", err]
)
})
};
let settings = self
.settings
.lock()
.expect("Failed to acquire settings lock");
let timeout = settings.timeout;
drop(settings);
let resp = wait(&self.canceller, future, timeout)
.map(|x| x.escape_ascii().to_string()) .map(|x| x.escape_ascii().to_string())
.unwrap_or_else(|_| "(no further details)".to_string()); .unwrap_or_else(|_| "(no further details)".to_string());
// FIXME: Check and handle 'Retry-After' header in case of server error // FIXME: Check and handle 'Retry-After' header in case of server error
Err(gst::error_msg!( self.raise_error(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Server returned error: {} - {}", s.as_str(), resp] format!("Unexpected response: {} - {}", s.as_str(), res),
)) );
}
Err(err) => self.handle_future_error(err),
}
} }
} }
} }
fn terminate_session(&self) { fn terminate_session(&self) {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let timeout = settings.timeout; let timeout = settings.timeout;
let resource_url;
(*state, resource_url) = match *state { let resource_url = match *state {
State::Running { State::Running {
whip_resource_url: ref resource_url, whip_resource_url: ref resource_url,
thread_handle: ref mut h, } => resource_url.clone(),
} => {
if let Some(th) = h.take() {
match th.join() {
Ok(_) => {
gst::debug!(CAT, imp: self, "Send offer thread joined successfully");
}
Err(e) => gst::error!(CAT, imp: self, "Failed to join thread: {:?}", e),
}
}
(State::Stopped, resource_url.clone())
}
_ => { _ => {
gst::element_imp_error!( gst::element_imp_error!(
self, self,