diff --git a/net/webrtchttp/src/utils.rs b/net/webrtchttp/src/utils.rs index f05030d3..e980a963 100644 --- a/net/webrtchttp/src/utils.rs +++ b/net/webrtchttp/src/utils.rs @@ -2,13 +2,18 @@ use futures::future; use futures::prelude::*; use gst::{prelude::*, ErrorMessage}; use once_cell::sync::Lazy; -use parse_link_header; use reqwest::header::HeaderMap; use reqwest::redirect::Policy; use std::sync::Mutex; use std::time::Duration; use tokio::runtime; +#[derive(Debug)] +pub enum WaitError { + FutureAborted, + FutureError(ErrorMessage), +} + pub static RUNTIME: Lazy = Lazy::new(|| { runtime::Builder::new_multi_thread() .enable_all() @@ -21,8 +26,8 @@ pub static RUNTIME: Lazy = Lazy::new(|| { pub fn wait( canceller: &Mutex>, future: F, - timeout: u32 -) -> Result + timeout: u32, +) -> Result where F: Send + Future>, T: Send + 'static, @@ -31,10 +36,10 @@ where let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); if canceller_guard.is_some() { - return Err(gst::error_msg!( + return Err(WaitError::FutureError(gst::error_msg!( gst::ResourceError::Failed, ["Old Canceller should not exist"] - )); + ))); } canceller_guard.replace(abort_handle); @@ -60,15 +65,12 @@ where match future::Abortable::new(future, abort_registration).await { Ok(Ok(res)) => Ok(res), - Ok(Err(err)) => Err(gst::error_msg!( + Ok(Err(err)) => Err(WaitError::FutureError(gst::error_msg!( gst::ResourceError::Failed, - ["Future resolved with an error {}", err.to_string()] - )), + ["Future resolved with an error {:?}", err] + ))), - Err(future::Aborted) => Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Canceller called before future resolved"] - )), + Err(future::Aborted) => Err(WaitError::FutureAborted), } }; diff --git a/net/webrtchttp/src/whepsrc/imp.rs b/net/webrtchttp/src/whepsrc/imp.rs index 8a1acdcc..fe0147e8 100644 --- a/net/webrtchttp/src/whepsrc/imp.rs +++ b/net/webrtchttp/src/whepsrc/imp.rs @@ -7,7 +7,9 @@ // // SPDX-License-Identifier: MPL-2.0 -use crate::utils::{build_reqwest_client, parse_redirect_location, set_ice_servers, wait}; +use crate::utils::{ + build_reqwest_client, parse_redirect_location, set_ice_servers, wait, WaitError, +}; use crate::GstRsWebRTCICETransportPolicy; use bytes::Bytes; use futures::future; @@ -18,6 +20,7 @@ use once_cell::sync::Lazy; use reqwest::header::{HeaderMap, HeaderValue}; use reqwest::StatusCode; use std::sync::Mutex; +use std::thread::{spawn, JoinHandle}; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -74,11 +77,17 @@ impl Default for Settings { } } -#[derive(Debug, Clone)] +#[derive(Debug)] enum State { Stopped, - Post { redirects: u8 }, - Running { whep_resource: String }, + Post { + redirects: u8, + thread_handle: Option>, + }, + Running { + whep_resource: String, + thread_handle: Option>, + }, } impl Default for State { @@ -203,9 +212,7 @@ impl ElementImpl for WhepSrc { } } - let ret = self.parent_change_state(transition); - - ret + self.parent_change_state(transition) } } @@ -423,7 +430,17 @@ impl WhepSrc { WebRTCICEGatheringState::Complete => { gst::info!(CAT, imp: self_, "ICE gathering completed"); - self_.whep_offer(); + let mut state = self_.state.lock().unwrap(); + let self_ref = self_.ref_counted(); + + gst::debug!(CAT, imp: self_, "Spawning thread to send offer"); + let handle = spawn(move || self_ref.whep_offer()); + + *state = State::Post { + redirects: 0, + thread_handle: Some(handle), + }; + drop(state); } _ => (), } @@ -544,7 +561,10 @@ impl WhepSrc { drop(settings); let mut state = self_.state.lock().unwrap(); - *state = State::Post { redirects: 0 }; + *state = State::Post { + redirects: 0, + thread_handle: None, + }; drop(state); if let Err(e) = self_.initial_post_request(endpoint.unwrap()) { @@ -562,11 +582,8 @@ impl WhepSrc { } fn sdp_message_parse(&self, sdp_bytes: Bytes) -> Result<(), ErrorMessage> { - let sdp = sdp_message::SDPMessage::parse_buffer(&sdp_bytes).or_else(|_| { - Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Could not parse answer SDP"] - )) + let sdp = sdp_message::SDPMessage::parse_buffer(&sdp_bytes).map_err(|_| { + gst::error_msg!(gst::ResourceError::Failed, ["Could not parse answer SDP"]) })?; let remote_sdp = WebRTCSessionDescription::new(WebRTCSDPType::Answer, sdp); @@ -660,8 +677,20 @@ impl WhepSrc { })?; let mut state = self.state.lock().unwrap(); - *state = State::Running { - whep_resource: url.to_string(), + *state = match *state { + State::Post { + redirects: _r, + thread_handle: ref mut h, + } => State::Running { + whep_resource: url.to_string(), + thread_handle: h.take(), + }, + _ => { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Expected to be in POST state"] + )); + } }; drop(state); @@ -674,29 +703,39 @@ impl WhepSrc { }) }; - let ans_bytes = wait(&self.canceller, future, timeout)?; - - self.sdp_message_parse(ans_bytes) + match wait(&self.canceller, future, timeout) { + Ok(ans_bytes) => self.sdp_message_parse(ans_bytes), + Err(err) => match err { + WaitError::FutureAborted => Ok(()), + WaitError::FutureError(e) => Err(e), + }, + } } status if status.is_redirection() => { if redirects < MAX_REDIRECTS { let mut state = self.state.lock().unwrap(); - /* - * As per section 4.6 of the specification, redirection is - * not required to be supported for the PATCH and DELETE - * requests to the final WHEP resource URL. Only the initial - * POST request may support redirection. - */ - if let State::Running { .. } = *state { - return Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Unexpected redirection in RUNNING state"] - )); - } - - *state = State::Post { - redirects: redirects + 1, + *state = match *state { + State::Post { + redirects: _r, + thread_handle: ref mut h, + } => State::Post { + redirects: redirects + 1, + thread_handle: h.take(), + }, + /* + * As per section 4.6 of the specification, redirection is + * not required to be supported for the PATCH and DELETE + * requests to the final WHEP resource URL. Only the initial + * POST request may support redirection. + */ + State::Running { .. } => { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Unexpected redirection in RUNNING state"] + )); + } + State::Stopped => unreachable!(), }; drop(state); @@ -845,8 +884,8 @@ impl WhepSrc { gst::info!(CAT, imp: self, "WHEP endpoint url: {}", endpoint.as_str()); - let _ = match *state { - State::Post { redirects } => redirects, + match *state { + State::Post { .. } => (), _ => { return Err(gst::error_msg!( gst::ResourceError::Failed, @@ -964,20 +1003,36 @@ impl WhepSrc { }) }; - let resp = wait(&self.canceller, future, timeout)?; - - self.parse_endpoint_response(endpoint, 0, resp) + match wait(&self.canceller, future, timeout) { + Ok(resp) => self.parse_endpoint_response(endpoint, 0, resp), + Err(err) => match err { + WaitError::FutureAborted => Ok(()), + WaitError::FutureError(e) => Err(e), + }, + } } fn terminate_session(&self) { let settings = self.settings.lock().unwrap(); - let state = self.state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); let timeout = settings.timeout; + let resource_url; - let resource_url = match *state { + (*state, resource_url) = match *state { State::Running { whep_resource: ref whep_resource_url, - } => whep_resource_url.clone(), + thread_handle: ref mut h, + } => { + if let Some(th) = h.take() { + match th.join() { + Ok(_) => { + gst::debug!(CAT, imp: self, "Send offer thread joined successfully"); + } + Err(e) => gst::error!(CAT, imp: self, "Failed to join thread: {:?}", e), + } + } + (State::Stopped, whep_resource_url.clone()) + } _ => { element_imp_error!( self, @@ -1025,9 +1080,14 @@ impl WhepSrc { Ok(r) => { gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status()); } - Err(e) => { - gst::error!(CAT, imp: self, "Error on DELETE request : {}", e); - } + Err(e) => match e { + WaitError::FutureAborted => { + gst::warning!(CAT, imp: self, "DELETE request aborted") + } + WaitError::FutureError(e) => { + gst::error!(CAT, imp: self, "Error on DELETE request : {}", e) + } + }, }; } } diff --git a/net/webrtchttp/src/whipsink/imp.rs b/net/webrtchttp/src/whipsink/imp.rs index 5d0616e9..884ebb73 100644 --- a/net/webrtchttp/src/whipsink/imp.rs +++ b/net/webrtchttp/src/whipsink/imp.rs @@ -7,7 +7,9 @@ // // SPDX-License-Identifier: MPL-2.0 -use crate::utils::{build_reqwest_client, parse_redirect_location, set_ice_servers, wait}; +use crate::utils::{ + build_reqwest_client, parse_redirect_location, set_ice_servers, wait, WaitError, +}; use crate::GstRsWebRTCICETransportPolicy; use futures::future; use gst::element_imp_error; @@ -22,6 +24,7 @@ use reqwest::header::HeaderMap; use reqwest::header::HeaderValue; use reqwest::StatusCode; use std::sync::Mutex; +use std::thread::{spawn, JoinHandle}; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new("whipsink", gst::DebugColorFlags::empty(), Some("WHIP Sink")) @@ -58,12 +61,17 @@ impl Default for Settings { } } -#[derive(Debug, Clone)] +#[derive(Debug)] enum State { Stopped, - Options { redirects: u8 }, - Post { redirects: u8 }, - Running { whip_resource_url: String }, + Post { + redirects: u8, + thread_handle: Option>, + }, + Running { + whip_resource_url: String, + thread_handle: Option>, + }, } impl Default for State { @@ -191,9 +199,7 @@ impl ElementImpl for WhipSink { } } - let ret = self.parent_change_state(transition); - - ret + self.parent_change_state(transition) } } @@ -209,7 +215,7 @@ impl ObjectImpl for WhipSink { glib::ParamSpecBoolean::builder("use-link-headers") .nick("Use Link Headers") - .blurb("Use link headers to configure ice-servers from the WHIP server response to the POST or OPTIONS request. + .blurb("Use link headers to configure ice-servers from the WHIP server response to the POST request. If set to TRUE and the WHIP server returns valid ice-servers, this property overrides the ice-servers values set using the stun-server and turn-server properties.") .mutable_ready() @@ -363,7 +369,17 @@ impl ObjectImpl for WhipSink { WebRTCICEGatheringState::Complete => { gst::info!(CAT, imp: self_, "ICE gathering completed"); - self_.send_offer(); + let mut state = self_.state.lock().unwrap(); + let self_ref = self_.ref_counted(); + + gst::debug!(CAT, imp: self_, "Spawning thread to send offer"); + let handle = spawn(move || self_ref.send_offer()); + + *state = State::Post { + redirects: 0, + thread_handle: Some(handle), + }; + drop(state); } _ => (), } @@ -403,17 +419,6 @@ impl ObjectImpl for WhipSink { } drop(settings); - let mut state = whipsink.state.lock().unwrap(); - *state = State::Options { redirects: 0 }; - drop(state); - - if let Err(e) = whipsink.lookup_ice_servers(endpoint.unwrap()) { - gst::element_error!( - ele, - gst::ResourceError::Failed, - ["Error in 'lookup_ice_servers' - {}", e.to_string()] - ); - } // Promise for 'create-offer' signal emitted to webrtcbin // Closure is called when the promise is fulfilled @@ -487,115 +492,6 @@ impl ObjectSubclass for WhipSink { } impl WhipSink { - fn lookup_ice_servers(&self, endpoint: reqwest::Url) -> Result<(), ErrorMessage> { - let settings = self.settings.lock().unwrap(); - let state = self.state.lock().unwrap(); - let timeout = settings.timeout; - - let redirects = match *state { - State::Options { redirects } => redirects, - _ => { - return Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Trying to do OPTIONS in unexpected state"] - )); - } - }; - drop(state); - - if !settings.use_link_headers { - // We're not configured to use OPTIONS, so we're done - return Ok(()); - } - - // We'll handle redirects manually since the default redirect handler does not - // reuse the authentication token on the redirected server - let pol = reqwest::redirect::Policy::none(); - let client = build_reqwest_client(pol); - - let mut headermap = HeaderMap::new(); - if let Some(token) = &settings.auth_token { - let bearer_token = "Bearer ".to_owned() + token; - drop(settings); - headermap.insert( - reqwest::header::AUTHORIZATION, - HeaderValue::from_str(&bearer_token) - .expect("Auth token should only contain characters valid for an HTTP header"), - ); - } - - let future = async { - client - .request(reqwest::Method::OPTIONS, endpoint.as_ref()) - .headers(headermap) - .send() - .await - .map_err(|err| { - gst::error_msg!( - gst::ResourceError::Failed, - ["OPTIONS request failed: {:?}", err] - ) - }) - }; - - let resp = wait(&self.canceller, future, timeout)?; - - match resp.status() { - StatusCode::NO_CONTENT => { - set_ice_servers(&self.webrtcbin, resp.headers())?; - Ok(()) - } - status if status.is_redirection() => { - if redirects < MAX_REDIRECTS { - let mut state = self.state.lock().unwrap(); - *state = State::Options { - redirects: redirects + 1, - }; - drop(state); - - match parse_redirect_location(resp.headers(), &endpoint) { - Ok(redirect_url) => { - gst::debug!( - CAT, - imp: self, - "Redirecting endpoint to {}", - redirect_url.as_str() - ); - self.lookup_ice_servers(redirect_url) - } - Err(e) => Err(e), - } - } else { - Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Too many redirects. Unable to connect to do OPTIONS request"] - )) - } - } - status => { - let future = async { - resp.bytes().await.map_err(|err| { - gst::error_msg!( - gst::ResourceError::Failed, - ["Failed to get response body: {:?}", err] - ) - }) - }; - - let res = wait(&self.canceller, future, timeout); - - Err(gst::error_msg!( - gst::ResourceError::Failed, - [ - "lookup_ice_servers - Unexpected response {} {:?}", - status, - res - ] - )) - } - } - } - fn send_offer(&self) { let settings = self.settings.lock().unwrap(); @@ -612,7 +508,10 @@ impl WhipSink { drop(settings); let mut state = self.state.lock().unwrap(); - *state = State::Post { redirects: 0 }; + *state = State::Post { + redirects: 0, + thread_handle: None, + }; drop(state); let local_desc = self @@ -639,12 +538,7 @@ impl WhipSink { ); match self.do_post(offer_sdp, endpoint.unwrap()) { - Ok(answer) => { - self.webrtcbin.emit_by_name::<()>( - "set-remote-description", - &[&answer, &None::], - ); - } + Ok(_) => (), Err(e) => { element_imp_error!( self, @@ -659,13 +553,16 @@ impl WhipSink { &self, offer: gst_webrtc::WebRTCSessionDescription, endpoint: reqwest::Url, - ) -> Result { + ) -> Result<(), gst::ErrorMessage> { let settings = self.settings.lock().unwrap(); let state = self.state.lock().unwrap(); let timeout = settings.timeout; let redirects = match *state { - State::Post { redirects } => redirects, + State::Post { + redirects, + thread_handle: ref _h, + } => redirects, _ => { return Err(gst::error_msg!( gst::ResourceError::Failed, @@ -692,7 +589,6 @@ impl WhipSink { if let Some(token) = &settings.auth_token { let bearer_token = "Bearer ".to_owned() + token; - drop(settings); headermap.insert( reqwest::header::AUTHORIZATION, HeaderValue::from_str(bearer_token.as_str()) @@ -715,14 +611,36 @@ impl WhipSink { }) }; - let resp = wait(&self.canceller, future, timeout)?; + drop(settings); - let res = match resp.status() { + match wait(&self.canceller, future, timeout) { + Ok(resp) => self.parse_endpoint_response(offer, endpoint, redirects, resp), + Err(err) => match err { + WaitError::FutureAborted => Ok(()), + WaitError::FutureError(e) => Err(e), + }, + } + } + + fn parse_endpoint_response( + &self, + offer: gst_webrtc::WebRTCSessionDescription, + endpoint: reqwest::Url, + redirects: u8, + resp: reqwest::Response, + ) -> Result<(), ErrorMessage> { + match resp.status() { StatusCode::OK | StatusCode::CREATED => { - // XXX: this is a long function, we should factor it out. - // Note: Not taking care of 'Link' headers in POST response - // because we do a mandatory OPTIONS request before 'create-offer' - // and update the ICE servers from response to OPTIONS + let settings = self + .settings + .lock() + .expect("Failed to acquire settings lock"); + let timeout = settings.timeout; + + if settings.use_link_headers { + set_ice_servers(&self.webrtcbin, resp.headers())?; + } + drop(settings); // Get the url of the resource from 'location' header. // The resource created is expected be a relative path @@ -762,8 +680,20 @@ impl WhipSink { })?; let mut state = self.state.lock().unwrap(); - *state = State::Running { - whip_resource_url: url.to_string(), + *state = match *state { + State::Post { + redirects: _r, + thread_handle: ref mut h, + } => State::Running { + whip_resource_url: url.to_string(), + thread_handle: h.take(), + }, + _ => { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Expected to be in POST state"] + )); + } }; drop(state); @@ -776,29 +706,56 @@ impl WhipSink { }) }; - let ans_bytes = wait(&self.canceller, future, timeout)?; + match wait(&self.canceller, future, timeout) { + Ok(ans_bytes) => match sdp_message::SDPMessage::parse_buffer(&ans_bytes) { + Ok(ans_sdp) => { + let answer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Answer, + ans_sdp, + ); + self.webrtcbin.emit_by_name::<()>( + "set-remote-description", + &[&answer, &None::], + ); + Ok(()) + } - match sdp_message::SDPMessage::parse_buffer(&ans_bytes) { - Ok(ans_sdp) => { - let answer = gst_webrtc::WebRTCSessionDescription::new( - gst_webrtc::WebRTCSDPType::Answer, - ans_sdp, - ); - Ok(answer) - } - - Err(e) => Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Could not parse answer SDP: {}", e] - )), + Err(e) => Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Could not parse answer SDP: {}", e] + )), + }, + Err(err) => match err { + WaitError::FutureAborted => Ok(()), + WaitError::FutureError(e) => Err(e), + }, } } s if s.is_redirection() => { if redirects < MAX_REDIRECTS { let mut state = self.state.lock().unwrap(); - *state = State::Post { - redirects: redirects + 1, + *state = match *state { + State::Post { + redirects: _r, + thread_handle: ref mut h, + } => State::Post { + redirects: redirects + 1, + thread_handle: h.take(), + }, + /* + * As per section 4.6 of the specification, redirection is + * not required to be supported for the PATCH and DELETE + * requests to the final WHEP resource URL. Only the initial + * POST request may support redirection. + */ + State::Running { .. } => { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Unexpected redirection in RUNNING state"] + )); + } + State::Stopped => unreachable!(), }; drop(state); @@ -812,7 +769,7 @@ impl WhipSink { ); self.do_post(offer, redirect_url) } - Err(e) => return Err(e), + Err(e) => Err(e), } } else { Err(gst::error_msg!( @@ -832,6 +789,13 @@ impl WhipSink { }) }; + let settings = self + .settings + .lock() + .expect("Failed to acquire settings lock"); + let timeout = settings.timeout; + drop(settings); + let resp = wait(&self.canceller, future, timeout) .map(|x| x.escape_ascii().to_string()) .unwrap_or_else(|_| "(no further details)".to_string()); @@ -842,24 +806,40 @@ impl WhipSink { ["Server returned error: {} - {}", s.as_str(), resp] )) } - }; - - res + } } fn terminate_session(&self) { let settings = self.settings.lock().unwrap(); - let state = self.state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); let timeout = settings.timeout; - let resource_url = match *state { + let resource_url; + + (*state, resource_url) = match *state { State::Running { - ref whip_resource_url, - } => whip_resource_url.clone(), + whip_resource_url: ref resource_url, + thread_handle: ref mut h, + } => { + if let Some(th) = h.take() { + match th.join() { + Ok(_) => { + gst::debug!(CAT, imp: self, "Send offer thread joined successfully"); + } + Err(e) => gst::error!(CAT, imp: self, "Failed to join thread: {:?}", e), + } + } + (State::Stopped, resource_url.clone()) + } _ => { - gst::error!(CAT, imp: self, "Terminated in unexpected state"); + element_imp_error!( + self, + gst::ResourceError::Failed, + ["Terminated in unexpected state"] + ); return; } }; + drop(state); let mut headermap = HeaderMap::new(); @@ -893,9 +873,14 @@ impl WhipSink { Ok(r) => { gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status()); } - Err(e) => { - gst::error!(CAT, imp: self, "{}", e); - } + Err(e) => match e { + WaitError::FutureAborted => { + gst::warning!(CAT, imp: self, "DELETE request aborted") + } + WaitError::FutureError(e) => { + gst::error!(CAT, imp: self, "Error on DELETE request : {}", e) + } + }, }; } }