diff --git a/net/webrtchttp/Cargo.toml b/net/webrtchttp/Cargo.toml index e6eba698..2631ebc2 100644 --- a/net/webrtchttp/Cargo.toml +++ b/net/webrtchttp/Cargo.toml @@ -20,6 +20,7 @@ parse_link_header = {version = "0.3", features = ["url"]} tokio = { version = "1.20.1", default-features = false, features = ["time", "rt-multi-thread"] } futures = "0.3.23" bytes = "1" +async-recursion = "1.0.0" [lib] name = "gstwebrtchttp" diff --git a/net/webrtchttp/src/utils.rs b/net/webrtchttp/src/utils.rs index e980a963..b2db1551 100644 --- a/net/webrtchttp/src/utils.rs +++ b/net/webrtchttp/src/utils.rs @@ -23,6 +23,59 @@ pub static RUNTIME: Lazy = Lazy::new(|| { .unwrap() }); +pub async fn wait_async( + canceller: &Mutex>, + future: F, + timeout: u32, +) -> Result +where + F: Send + Future, + T: Send + 'static, +{ + let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); + { + let mut canceller_guard = canceller.lock().unwrap(); + canceller_guard.replace(abort_handle); + drop(canceller_guard); + } + + let future = async { + if timeout == 0 { + Ok(future.await) + } else { + let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await; + + match res { + Ok(r) => Ok(r), + Err(e) => Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Read, + ["Request timeout, elapsed: {}", e] + ))), + } + } + }; + + let future = async { + match future::Abortable::new(future, abort_registration).await { + Ok(Ok(r)) => Ok(r), + + Ok(Err(err)) => Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Future resolved with an error {:?}", err] + ))), + + Err(future::Aborted) => Err(WaitError::FutureAborted), + } + }; + + let res = future.await; + + let mut canceller_guard = canceller.lock().unwrap(); + *canceller_guard = None; + + res +} + pub fn wait( canceller: &Mutex>, future: F, diff --git a/net/webrtchttp/src/whepsrc/imp.rs b/net/webrtchttp/src/whepsrc/imp.rs index ec911fa7..5b38ef1f 100644 --- a/net/webrtchttp/src/whepsrc/imp.rs +++ b/net/webrtchttp/src/whepsrc/imp.rs @@ -8,19 +8,20 @@ // SPDX-License-Identifier: MPL-2.0 use crate::utils::{ - build_reqwest_client, parse_redirect_location, set_ice_servers, wait, WaitError, + build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError, + RUNTIME, }; use crate::IceTransportPolicy; +use async_recursion::async_recursion; use bytes::Bytes; use futures::future; -use gst::{glib, prelude::*, subclass::prelude::*, ErrorMessage}; +use gst::{glib, prelude::*, subclass::prelude::*}; use gst_sdp::*; use gst_webrtc::*; use once_cell::sync::Lazy; use reqwest::header::{HeaderMap, HeaderValue}; use reqwest::StatusCode; use std::sync::Mutex; -use std::thread::{spawn, JoinHandle}; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -79,14 +80,8 @@ impl Default for Settings { #[derive(Debug)] enum State { Stopped, - Post { - redirects: u8, - thread_handle: Option>, - }, - Running { - whep_resource: String, - thread_handle: Option>, - }, + Post { redirects: u8 }, + Running { whep_resource: String }, } impl Default for State { @@ -407,6 +402,22 @@ impl BinImpl for WhepSrc { } impl WhepSrc { + fn raise_error(&self, resource_error: gst::ResourceError, msg: String) { + gst::error_msg!(resource_error, [msg.as_str()]); + gst::element_imp_error!(self, resource_error, [msg.as_str()]); + } + + fn handle_future_error(&self, err: WaitError) { + match err { + WaitError::FutureAborted => { + gst::warning!(CAT, imp: self, "Future aborted") + } + WaitError::FutureError(err) => { + self.raise_error(gst::ResourceError::Failed, err.to_string()) + } + }; + } + fn setup_webrtcbin(&self) { // The specification requires all m= lines to be bundled (section 4.5) self.webrtcbin @@ -429,17 +440,18 @@ impl WhepSrc { WebRTCICEGatheringState::Complete => { gst::info!(CAT, imp: self_, "ICE gathering completed"); - let mut state = self_.state.lock().unwrap(); let self_ref = self_.ref_counted(); - gst::debug!(CAT, imp: self_, "Spawning thread to send offer"); - let handle = spawn(move || self_ref.whep_offer()); - - *state = State::Post { - redirects: 0, - thread_handle: Some(handle), - }; - drop(state); + // With tokio's spawn one does not have to .await the + // returned JoinHandle to make the provided future start + // execution. It will start running in the background + // immediately when spawn is called. So silence the clippy + // warning. + #[allow(clippy::let_underscore_future)] + let _ = RUNTIME.spawn(async move { + /* Note that we check for a valid WHEP endpoint in change_state */ + self_ref.whep_offer().await + }); } _ => (), } @@ -560,30 +572,27 @@ impl WhepSrc { drop(settings); let mut state = self_.state.lock().unwrap(); - *state = State::Post { - redirects: 0, - thread_handle: None, - }; + *state = State::Post { redirects: 0 }; drop(state); - if let Err(e) = self_.initial_post_request(endpoint.unwrap()) { - gst::element_imp_error!( - self_, - gst::ResourceError::Failed, - ["Error in initial post request - {}", e.to_string()] - ); - return None; - } + self_.initial_post_request(endpoint.unwrap()); None } }); } - fn sdp_message_parse(&self, sdp_bytes: Bytes) -> Result<(), ErrorMessage> { - let sdp = sdp_message::SDPMessage::parse_buffer(&sdp_bytes).map_err(|_| { - gst::error_msg!(gst::ResourceError::Failed, ["Could not parse answer SDP"]) - })?; + fn sdp_message_parse(&self, sdp_bytes: Bytes) { + let sdp = match sdp_message::SDPMessage::parse_buffer(&sdp_bytes) { + Ok(sdp) => sdp, + Err(_) => { + self.raise_error( + gst::ResourceError::Failed, + "Could not parse answer SDP".to_string(), + ); + return; + } + }; let remote_sdp = WebRTCSessionDescription::new(WebRTCSDPType::Answer, sdp); @@ -612,55 +621,63 @@ impl WhepSrc { .emit_by_name::<()>("add-ice-candidate", &[&m_line_index, &c]); } } - - Ok(()) } - fn parse_endpoint_response( + async fn parse_endpoint_response( &self, - endpoint: reqwest::Url, - redirects: u8, + sess_desc: WebRTCSessionDescription, resp: reqwest::Response, - ) -> Result<(), ErrorMessage> { + redirects: u8, + ) { + let endpoint; + let timeout; + let use_link_headers; + + { + let settings = self.settings.lock().unwrap(); + endpoint = + reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap(); + use_link_headers = settings.use_link_headers; + timeout = settings.timeout; + drop(settings); + } + match resp.status() { StatusCode::OK | StatusCode::NO_CONTENT => { gst::info!(CAT, imp: self, "SDP offer successfully send"); - Ok(()) } StatusCode::CREATED => { gst::debug!(CAT, imp: self, "Response headers: {:?}", resp.headers()); - let settings = self - .settings - .lock() - .expect("Failed to acquire settings lock"); - let timeout = settings.timeout; - - if settings.use_link_headers { - set_ice_servers(&self.webrtcbin, resp.headers())?; + if use_link_headers { + if let Err(e) = set_ice_servers(&self.webrtcbin, resp.headers()) { + self.raise_error(gst::ResourceError::Failed, e.to_string()); + return; + }; } - drop(settings); - /* See section 4.2 of the WHEP specification */ let location = match resp.headers().get(reqwest::header::LOCATION) { Some(location) => location, None => { - return Err(gst::error_msg!( + self.raise_error( gst::ResourceError::Failed, - ["Location header field should be present for WHEP resource URL"] - )); + "Location header field should be present for WHEP resource URL" + .to_string(), + ); + return; } }; let location = match location.to_str() { Ok(loc) => loc, Err(e) => { - return Err(gst::error_msg!( + self.raise_error( gst::ResourceError::Failed, - ["Failed to convert location to string {}", e] - )); + format!("Failed to convert location to string: {e}"), + ); + return; } }; @@ -668,123 +685,109 @@ impl WhepSrc { gst::debug!(CAT, imp: self, "WHEP resource: {:?}", location); - let url = url.join(location).map_err(|err| { - gst::error_msg!( - gst::ResourceError::Failed, - ["URL join operation failed: {:?}", err] - ) - })?; - - let mut state = self.state.lock().unwrap(); - *state = match *state { - State::Post { - redirects: _r, - thread_handle: ref mut h, - } => State::Running { - whep_resource: url.to_string(), - thread_handle: h.take(), - }, - _ => { - return Err(gst::error_msg!( + let url = match url.join(location) { + Ok(joined_url) => joined_url, + Err(err) => { + self.raise_error( gst::ResourceError::Failed, - ["Expected to be in POST state"] - )); + format!("URL join operation failed: {err:?}"), + ); + return; } }; - drop(state); - let future = async { - resp.bytes().await.map_err(|err| { - gst::error_msg!( - gst::ResourceError::Failed, - ["Failed to get response body: {:?}", err] - ) - }) - }; + match wait_async(&self.canceller, resp.bytes(), timeout).await { + Ok(res) => match res { + Ok(ans_bytes) => { + let mut state = self.state.lock().unwrap(); + *state = match *state { + State::Post { redirects: _r } => State::Running { + whep_resource: url.to_string(), + }, + _ => { + self.raise_error( + gst::ResourceError::Failed, + "Expected to be in POST state".to_string(), + ); + return; + } + }; + drop(state); - match wait(&self.canceller, future, timeout) { - Ok(ans_bytes) => self.sdp_message_parse(ans_bytes), - Err(err) => match err { - WaitError::FutureAborted => Ok(()), - WaitError::FutureError(e) => Err(e), + self.sdp_message_parse(ans_bytes) + } + Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), }, + Err(err) => self.handle_future_error(err), } } status if status.is_redirection() => { if redirects < MAX_REDIRECTS { - let mut state = self.state.lock().unwrap(); - *state = match *state { - State::Post { - redirects: _r, - thread_handle: ref mut h, - } => State::Post { - redirects: redirects + 1, - thread_handle: h.take(), - }, - /* - * As per section 4.6 of the specification, redirection is - * not required to be supported for the PATCH and DELETE - * requests to the final WHEP resource URL. Only the initial - * POST request may support redirection. - */ - State::Running { .. } => { - return Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Unexpected redirection in RUNNING state"] - )); - } - State::Stopped => unreachable!(), - }; - - drop(state); - match parse_redirect_location(resp.headers(), &endpoint) { Ok(redirect_url) => { + { + let mut state = self.state.lock().unwrap(); + *state = match *state { + State::Post { redirects: _r } => State::Post { + redirects: redirects + 1, + }, + /* + * As per section 4.6 of the specification, redirection is + * not required to be supported for the PATCH and DELETE + * requests to the final WHEP resource URL. Only the initial + * POST request may support redirection. + */ + State::Running { .. } => { + self.raise_error( + gst::ResourceError::Failed, + "Unexpected redirection in RUNNING state".to_string(), + ); + return; + } + State::Stopped => unreachable!(), + }; + drop(state); + } + gst::warning!( CAT, imp: self, "Redirecting endpoint to {}", redirect_url.as_str() ); - self.initial_post_request(redirect_url) + + if let Err(err) = + wait_async(&self.canceller, self.do_post(sess_desc), timeout).await + { + self.handle_future_error(err); + } } - Err(e) => Err(e), + Err(e) => self.raise_error(gst::ResourceError::Failed, e.to_string()), } } else { - Err(gst::error_msg!( + self.raise_error( gst::ResourceError::Failed, - ["Too many redirects. Unable to connect."] - )) + "Too many redirects. Unable to connect.".to_string(), + ); } } s => { - let future = async { - resp.bytes().await.map_err(|err| { - gst::error_msg!( + match wait_async(&self.canceller, resp.bytes(), timeout).await { + Ok(r) => { + let res = r + .map(|x| x.escape_ascii().to_string()) + .unwrap_or_else(|_| "(no further details)".to_string()); + + // FIXME: Check and handle 'Retry-After' header in case of server error + self.raise_error( gst::ResourceError::Failed, - ["Failed to get response body: {:?}", err] - ) - }) - }; - - let settings = self - .settings - .lock() - .expect("Failed to acquire settings lock"); - let timeout = settings.timeout; - drop(settings); - - let res = wait(&self.canceller, future, timeout) - .map(|x| x.escape_ascii().to_string()) - .unwrap_or_else(|_| "(no further details)".to_string()); - - // FIXME: Check and handle 'Retry-After' header in case of server error - Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Unexpected response: {} - {}", s.as_str(), res] - )) + format!("Unexpected response: {} - {}", s.as_str(), res), + ); + } + Err(err) => self.handle_future_error(err), + } } } } @@ -833,11 +836,16 @@ impl WhepSrc { &[&offer_sdp, &None::], ); } else { - gst::error!(CAT, imp: self_, "Reply without an offer: {}", reply); + let error = reply + .value("error") + .expect("structure must have an error value") + .get::() + .expect("value must be a GLib error"); + gst::element_imp_error!( self_, gst::LibraryError::Failed, - ["generate offer::Promise returned with no reply"] + ["generate offer::Promise returned with no reply: {}", error] ); } }); @@ -878,7 +886,7 @@ impl WhepSrc { .emit_by_name::<()>("create-offer", &[&None::, &promise]); } - fn initial_post_request(&self, endpoint: reqwest::Url) -> Result<(), ErrorMessage> { + fn initial_post_request(&self, endpoint: reqwest::Url) { let state = self.state.lock().unwrap(); gst::info!(CAT, imp: self, "WHEP endpoint url: {}", endpoint.as_str()); @@ -886,20 +894,19 @@ impl WhepSrc { match *state { State::Post { .. } => (), _ => { - return Err(gst::error_msg!( + self.raise_error( gst::ResourceError::Failed, - ["Trying to do POST in unexpected state"] - )); + "Trying to do POST in unexpected state".to_string(), + ); + return; } }; drop(state); - self.generate_offer(); - - Ok(()) + self.generate_offer() } - fn whep_offer(&self) { + async fn whep_offer(&self) { let local_desc = self .webrtcbin .property::>("local-description"); @@ -923,41 +930,35 @@ impl WhepSrc { offer_sdp.sdp().as_text() ); - if let Err(e) = self.send_sdp(offer_sdp.sdp()) { - gst::element_imp_error!( - self, - gst::ResourceError::Failed, - ["Error in sending answer - {}", e.to_string()] - ); + let sess_desc = WebRTCSessionDescription::new(WebRTCSDPType::Offer, offer_sdp.sdp()); + + let timeout; + { + let settings = self.settings.lock().unwrap(); + timeout = settings.timeout; + drop(settings); + } + + if let Err(e) = wait_async(&self.canceller, self.do_post(sess_desc), timeout).await { + self.handle_future_error(e); } } - fn send_sdp(&self, sdp: SDPMessage) -> Result<(), gst::ErrorMessage> { - let sess_desc = WebRTCSessionDescription::new(WebRTCSDPType::Offer, sdp); - let settings = self.settings.lock().unwrap(); + #[async_recursion] + async fn do_post(&self, offer: WebRTCSessionDescription) { + let auth_token; + let endpoint; + let timeout; - let endpoint = reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()); - - if let Err(e) = endpoint { - return Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Could not parse endpoint URL: {}", e] - )); + { + let settings = self.settings.lock().unwrap(); + endpoint = + reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap(); + auth_token = settings.auth_token.clone(); + timeout = settings.timeout; + drop(settings); } - drop(settings); - - self.do_post(sess_desc, endpoint.unwrap()) - } - - fn do_post( - &self, - offer: WebRTCSessionDescription, - endpoint: reqwest::Url, - ) -> Result<(), gst::ErrorMessage> { - let settings = self.settings.lock().unwrap(); - let timeout = settings.timeout; - let sdp = offer.sdp(); let body = sdp.as_text().unwrap(); @@ -969,8 +970,8 @@ impl WhepSrc { HeaderValue::from_static("application/sdp"), ); - if let Some(token) = &settings.auth_token { - let bearer_token = "Bearer ".to_owned() + token; + if let Some(token) = auth_token.as_ref() { + let bearer_token = "Bearer ".to_owned() + token.as_str(); headermap.insert( reqwest::header::AUTHORIZATION, HeaderValue::from_str(bearer_token.as_str()) @@ -978,8 +979,6 @@ impl WhepSrc { ); } - drop(settings); - gst::debug!( CAT, imp: self, @@ -987,51 +986,63 @@ impl WhepSrc { endpoint.as_str() ); - let future = async { + let res = wait_async( + &self.canceller, self.client .request(reqwest::Method::POST, endpoint.clone()) .headers(headermap) .body(body) - .send() - .await - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::Failed, - ["HTTP POST request failed {}: {:?}", endpoint.as_str(), err] - ) - }) - }; + .send(), + timeout, + ) + .await; - match wait(&self.canceller, future, timeout) { - Ok(resp) => self.parse_endpoint_response(endpoint, 0, resp), - Err(err) => match err { - WaitError::FutureAborted => Ok(()), - WaitError::FutureError(e) => Err(e), + match res { + Ok(resp) => match resp { + Ok(r) => { + #[allow(unused_mut)] + let mut redirects; + + { + let state = self.state.lock().unwrap(); + redirects = match *state { + State::Post { redirects } => redirects, + _ => { + self.raise_error( + gst::ResourceError::Failed, + "Trying to do POST in unexpected state".to_string(), + ); + return; + } + }; + drop(state); + } + + if let Err(e) = wait_async( + &self.canceller, + self.parse_endpoint_response(offer, r, redirects), + timeout, + ) + .await + { + self.handle_future_error(e); + } + } + Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), }, + Err(err) => self.handle_future_error(err), } } fn terminate_session(&self) { let settings = self.settings.lock().unwrap(); - let mut state = self.state.lock().unwrap(); + let state = self.state.lock().unwrap(); let timeout = settings.timeout; - let resource_url; - (*state, resource_url) = match *state { + let resource_url = match *state { State::Running { whep_resource: ref whep_resource_url, - thread_handle: ref mut h, - } => { - if let Some(th) = h.take() { - match th.join() { - Ok(_) => { - gst::debug!(CAT, imp: self, "Send offer thread joined successfully"); - } - Err(e) => gst::error!(CAT, imp: self, "Failed to join thread: {:?}", e), - } - } - (State::Stopped, whep_resource_url.clone()) - } + } => whep_resource_url.clone(), _ => { gst::element_imp_error!( self, diff --git a/net/webrtchttp/src/whipsink/imp.rs b/net/webrtchttp/src/whipsink/imp.rs index 35f28067..12ef0dee 100644 --- a/net/webrtchttp/src/whipsink/imp.rs +++ b/net/webrtchttp/src/whipsink/imp.rs @@ -8,14 +8,15 @@ // SPDX-License-Identifier: MPL-2.0 use crate::utils::{ - build_reqwest_client, parse_redirect_location, set_ice_servers, wait, WaitError, + build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async, WaitError, + RUNTIME, }; use crate::IceTransportPolicy; +use async_recursion::async_recursion; use futures::future; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::ErrorMessage; use gst_sdp::*; use gst_webrtc::*; use once_cell::sync::Lazy; @@ -23,14 +24,12 @@ use reqwest::header::HeaderMap; use reqwest::header::HeaderValue; use reqwest::StatusCode; use std::sync::Mutex; -use std::thread::{spawn, JoinHandle}; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new("whipsink", gst::DebugColorFlags::empty(), Some("WHIP Sink")) }); -const DEFAULT_ICE_TRANSPORT_POLICY: IceTransportPolicy = - IceTransportPolicy::All; +const DEFAULT_ICE_TRANSPORT_POLICY: IceTransportPolicy = IceTransportPolicy::All; const MAX_REDIRECTS: u8 = 10; const DEFAULT_TIMEOUT: u32 = 15; @@ -63,14 +62,8 @@ impl Default for Settings { #[derive(Debug)] enum State { Stopped, - Post { - redirects: u8, - thread_handle: Option>, - }, - Running { - whip_resource_url: String, - thread_handle: Option>, - }, + Post { redirects: u8 }, + Running { whip_resource_url: String }, } impl Default for State { @@ -368,17 +361,18 @@ impl ObjectImpl for WhipSink { WebRTCICEGatheringState::Complete => { gst::info!(CAT, imp: self_, "ICE gathering completed"); - let mut state = self_.state.lock().unwrap(); let self_ref = self_.ref_counted(); - gst::debug!(CAT, imp: self_, "Spawning thread to send offer"); - let handle = spawn(move || self_ref.send_offer()); - - *state = State::Post { - redirects: 0, - thread_handle: Some(handle), - }; - drop(state); + // With tokio's spawn one does not have to .await the + // returned JoinHandle to make the provided future start + // execution. It will start running in the background + // immediately when spawn is called. So silence the clippy + // warning. + #[allow(clippy::let_underscore_future)] + let _ = RUNTIME.spawn(async move { + /* Note that we check for a valid WHIP endpoint in change_state */ + self_ref.send_offer().await + }); } _ => (), } @@ -491,27 +485,28 @@ impl ObjectSubclass for WhipSink { } impl WhipSink { - fn send_offer(&self) { - let settings = self.settings.lock().unwrap(); + fn raise_error(&self, resource_error: gst::ResourceError, msg: String) { + gst::error_msg!(resource_error, [msg.as_str()]); + gst::element_imp_error!(self, resource_error, [msg.as_str()]); + } - /* Note that we check for a valid WHIP endpoint in change_state */ - let endpoint = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()); - if let Err(e) = endpoint { - gst::element_imp_error!( - self, - gst::ResourceError::Failed, - ["Could not parse endpoint URL: {}", e] - ); - return; - } - - drop(settings); - let mut state = self.state.lock().unwrap(); - *state = State::Post { - redirects: 0, - thread_handle: None, + fn handle_future_error(&self, err: WaitError) { + match err { + WaitError::FutureAborted => { + gst::warning!(CAT, imp: self, "Future aborted") + } + WaitError::FutureError(err) => { + self.raise_error(gst::ResourceError::Failed, err.to_string()) + } }; - drop(state); + } + + async fn send_offer(&self) { + { + let mut state = self.state.lock().unwrap(); + *state = State::Post { redirects: 0 }; + drop(state); + } let local_desc = self .webrtcbin @@ -536,40 +531,50 @@ impl WhipSink { offer_sdp.sdp().as_text() ); - match self.do_post(offer_sdp, endpoint.unwrap()) { - Ok(_) => (), - Err(e) => { - gst::element_imp_error!( - self, - gst::ResourceError::Failed, - ["Failed to send offer: {}", e] - ); - } + let timeout; + { + let settings = self.settings.lock().unwrap(); + timeout = settings.timeout; + drop(settings); + } + + if let Err(e) = wait_async(&self.canceller, self.do_post(offer_sdp), timeout).await { + self.handle_future_error(e); } } - fn do_post( - &self, - offer: gst_webrtc::WebRTCSessionDescription, - endpoint: reqwest::Url, - ) -> Result<(), gst::ErrorMessage> { - let settings = self.settings.lock().unwrap(); - let state = self.state.lock().unwrap(); - let timeout = settings.timeout; + #[async_recursion] + async fn do_post(&self, offer: gst_webrtc::WebRTCSessionDescription) { + let auth_token; + let endpoint; + let timeout; - let redirects = match *state { - State::Post { - redirects, - thread_handle: ref _h, - } => redirects, - _ => { - return Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Trying to POST in unexpected state"] - )); - } - }; - drop(state); + { + let settings = self.settings.lock().unwrap(); + endpoint = + reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap(); + auth_token = settings.auth_token.clone(); + timeout = settings.timeout; + drop(settings); + } + + #[allow(unused_mut)] + let mut redirects; + + { + let state = self.state.lock().unwrap(); + redirects = match *state { + State::Post { redirects } => redirects, + _ => { + self.raise_error( + gst::ResourceError::Failed, + "Trying to do POST in unexpected state".to_string(), + ); + return; + } + }; + drop(state); + } // Default policy for redirect does not share the auth token to new location // So disable inbuilt redirecting and do a recursive call upon 3xx response code @@ -586,7 +591,7 @@ impl WhipSink { HeaderValue::from_static("application/sdp"), ); - if let Some(token) = &settings.auth_token { + if let Some(token) = auth_token.as_ref() { let bearer_token = "Bearer ".to_owned() + token; headermap.insert( reqwest::header::AUTHORIZATION, @@ -595,51 +600,63 @@ impl WhipSink { ); } - let future = async { + let res = wait_async( + &self.canceller, client - .request(reqwest::Method::POST, endpoint.as_ref()) + .request(reqwest::Method::POST, endpoint.clone()) .headers(headermap) .body(body) - .send() - .await - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::Failed, - ["POST request failed: {:?}", err] + .send(), + timeout, + ) + .await; + + match res { + Ok(r) => match r { + Ok(resp) => { + if let Err(e) = wait_async( + &self.canceller, + self.parse_endpoint_response(offer, resp, redirects), + timeout, ) - }) - }; - - drop(settings); - - match wait(&self.canceller, future, timeout) { - Ok(resp) => self.parse_endpoint_response(offer, endpoint, redirects, resp), - Err(err) => match err { - WaitError::FutureAborted => Ok(()), - WaitError::FutureError(e) => Err(e), + .await + { + self.handle_future_error(e); + } + } + Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), }, + Err(err) => self.handle_future_error(err), } } - fn parse_endpoint_response( + async fn parse_endpoint_response( &self, offer: gst_webrtc::WebRTCSessionDescription, - endpoint: reqwest::Url, - redirects: u8, resp: reqwest::Response, - ) -> Result<(), ErrorMessage> { + redirects: u8, + ) { + let endpoint; + let timeout; + let use_link_headers; + + { + let settings = self.settings.lock().unwrap(); + endpoint = + reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()).unwrap(); + use_link_headers = settings.use_link_headers; + timeout = settings.timeout; + drop(settings); + } + match resp.status() { StatusCode::OK | StatusCode::CREATED => { - let settings = self - .settings - .lock() - .expect("Failed to acquire settings lock"); - let timeout = settings.timeout; - - if settings.use_link_headers { - set_ice_servers(&self.webrtcbin, resp.headers())?; + if use_link_headers { + if let Err(e) = set_ice_servers(&self.webrtcbin, resp.headers()) { + self.raise_error(gst::ResourceError::Failed, e.to_string()); + return; + }; } - drop(settings); // Get the url of the resource from 'location' header. // The resource created is expected be a relative path @@ -650,20 +667,23 @@ impl WhipSink { let location = match resp.headers().get(reqwest::header::LOCATION) { Some(location) => location, None => { - return Err(gst::error_msg!( + self.raise_error( gst::ResourceError::Failed, - ["Location header field should be present for WHIP resource URL"] - )); + "Location header field should be present for WHIP resource URL" + .to_string(), + ); + return; } }; let location = match location.to_str() { Ok(loc) => loc, Err(e) => { - return Err(gst::error_msg!( + self.raise_error( gst::ResourceError::Failed, - ["Failed to convert location to string {}", e] - )); + format!("Failed to convert location to string: {e}"), + ); + return; } }; @@ -671,164 +691,139 @@ impl WhipSink { gst::debug!(CAT, imp: self, "WHIP resource: {:?}", location); - let url = url.join(location).map_err(|err| { - gst::error_msg!( - gst::ResourceError::Failed, - ["URL join operation failed: {:?}", err] - ) - })?; - - let mut state = self.state.lock().unwrap(); - *state = match *state { - State::Post { - redirects: _r, - thread_handle: ref mut h, - } => State::Running { - whip_resource_url: url.to_string(), - thread_handle: h.take(), - }, - _ => { - return Err(gst::error_msg!( + let url = match url.join(location) { + Ok(joined_url) => joined_url, + Err(err) => { + self.raise_error( gst::ResourceError::Failed, - ["Expected to be in POST state"] - )); + format!("URL join operation failed: {err:?}"), + ); + return; } }; - drop(state); - let future = async { - resp.bytes().await.map_err(|err| { - gst::error_msg!( - gst::ResourceError::Failed, - ["Failed to get response body: {:?}", err] - ) - }) - }; - - match wait(&self.canceller, future, timeout) { - Ok(ans_bytes) => match sdp_message::SDPMessage::parse_buffer(&ans_bytes) { - Ok(ans_sdp) => { - let answer = gst_webrtc::WebRTCSessionDescription::new( - gst_webrtc::WebRTCSDPType::Answer, - ans_sdp, + { + let mut state = self.state.lock().unwrap(); + *state = match *state { + State::Post { redirects: _r } => State::Running { + whip_resource_url: url.to_string(), + }, + _ => { + self.raise_error( + gst::ResourceError::Failed, + "Expected to be in POST state".to_string(), ); - self.webrtcbin.emit_by_name::<()>( - "set-remote-description", - &[&answer, &None::], - ); - Ok(()) + return; } + }; + drop(state); + } - Err(e) => Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Could not parse answer SDP: {}", e] - )), - }, - Err(err) => match err { - WaitError::FutureAborted => Ok(()), - WaitError::FutureError(e) => Err(e), + match wait_async(&self.canceller, resp.bytes(), timeout).await { + Ok(res) => match res { + Ok(ans_bytes) => match sdp_message::SDPMessage::parse_buffer(&ans_bytes) { + Ok(ans_sdp) => { + let answer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Answer, + ans_sdp, + ); + self.webrtcbin.emit_by_name::<()>( + "set-remote-description", + &[&answer, &None::], + ); + } + Err(err) => { + self.raise_error( + gst::ResourceError::Failed, + format!("Could not parse answer SDP: {err}"), + ); + } + }, + Err(err) => self.raise_error(gst::ResourceError::Failed, err.to_string()), }, + Err(err) => self.handle_future_error(err), } } s if s.is_redirection() => { if redirects < MAX_REDIRECTS { - let mut state = self.state.lock().unwrap(); - *state = match *state { - State::Post { - redirects: _r, - thread_handle: ref mut h, - } => State::Post { - redirects: redirects + 1, - thread_handle: h.take(), - }, - /* - * As per section 4.6 of the specification, redirection is - * not required to be supported for the PATCH and DELETE - * requests to the final WHEP resource URL. Only the initial - * POST request may support redirection. - */ - State::Running { .. } => { - return Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Unexpected redirection in RUNNING state"] - )); - } - State::Stopped => unreachable!(), - }; - drop(state); - match parse_redirect_location(resp.headers(), &endpoint) { Ok(redirect_url) => { + { + let mut state = self.state.lock().unwrap(); + *state = match *state { + State::Post { redirects: _r } => State::Post { + redirects: redirects + 1, + }, + /* + * As per section 4.6 of the specification, redirection is + * not required to be supported for the PATCH and DELETE + * requests to the final WHEP resource URL. Only the initial + * POST request may support redirection. + */ + State::Running { .. } => { + self.raise_error( + gst::ResourceError::Failed, + "Unexpected redirection in RUNNING state".to_string(), + ); + return; + } + State::Stopped => unreachable!(), + }; + drop(state); + } + gst::debug!( CAT, imp: self, "Redirecting endpoint to {}", redirect_url.as_str() ); - self.do_post(offer, redirect_url) + + if let Err(err) = + wait_async(&self.canceller, self.do_post(offer), timeout).await + { + self.handle_future_error(err); + } } - Err(e) => Err(e), + Err(e) => self.raise_error(gst::ResourceError::Failed, e.to_string()), } } else { - Err(gst::error_msg!( + self.raise_error( gst::ResourceError::Failed, - ["Too many redirects. Unable to connect to do POST"] - )) + "Too many redirects. Unable to connect.".to_string(), + ); } } s => { - let future = async { - resp.bytes().await.map_err(|err| { - gst::error_msg!( + match wait_async(&self.canceller, resp.bytes(), timeout).await { + Ok(r) => { + let res = r + .map(|x| x.escape_ascii().to_string()) + .unwrap_or_else(|_| "(no further details)".to_string()); + + // FIXME: Check and handle 'Retry-After' header in case of server error + self.raise_error( gst::ResourceError::Failed, - ["Failed to get response body: {:?}", err] - ) - }) - }; - - let settings = self - .settings - .lock() - .expect("Failed to acquire settings lock"); - let timeout = settings.timeout; - drop(settings); - - let resp = wait(&self.canceller, future, timeout) - .map(|x| x.escape_ascii().to_string()) - .unwrap_or_else(|_| "(no further details)".to_string()); - - // FIXME: Check and handle 'Retry-After' header in case of server error - Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Server returned error: {} - {}", s.as_str(), resp] - )) + format!("Unexpected response: {} - {}", s.as_str(), res), + ); + } + Err(err) => self.handle_future_error(err), + } } } } fn terminate_session(&self) { let settings = self.settings.lock().unwrap(); - let mut state = self.state.lock().unwrap(); + let state = self.state.lock().unwrap(); let timeout = settings.timeout; - let resource_url; - (*state, resource_url) = match *state { + let resource_url = match *state { State::Running { whip_resource_url: ref resource_url, - thread_handle: ref mut h, - } => { - if let Some(th) = h.take() { - match th.join() { - Ok(_) => { - gst::debug!(CAT, imp: self, "Send offer thread joined successfully"); - } - Err(e) => gst::error!(CAT, imp: self, "Failed to join thread: {:?}", e), - } - } - (State::Stopped, resource_url.clone()) - } + } => resource_url.clone(), _ => { gst::element_imp_error!( self,