From 420716fb63792c695ca886e4a659d08c21c2343a Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Thu, 27 Oct 2022 18:06:26 +0530 Subject: [PATCH] webrtchttp: whipsink: Miscellaneous clean up Part-of: --- net/webrtchttp/src/whipsink/imp.rs | 178 +++++++++++------------------ 1 file changed, 68 insertions(+), 110 deletions(-) diff --git a/net/webrtchttp/src/whipsink/imp.rs b/net/webrtchttp/src/whipsink/imp.rs index 360a30c0..4677824b 100644 --- a/net/webrtchttp/src/whipsink/imp.rs +++ b/net/webrtchttp/src/whipsink/imp.rs @@ -7,7 +7,7 @@ // // SPDX-License-Identifier: MPL-2.0 -use crate::utils::{build_reqwest_client, parse_redirect_location, wait}; +use crate::utils::{build_reqwest_client, parse_redirect_location, set_ice_servers, wait}; use futures::future; use gst::glib; use gst::prelude::*; @@ -16,7 +16,6 @@ use gst::ErrorMessage; use gst_sdp::*; use gst_webrtc::*; use once_cell::sync::Lazy; -use parse_link_header; use reqwest::header::HeaderMap; use reqwest::header::HeaderValue; use reqwest::StatusCode; @@ -135,7 +134,36 @@ impl ElementImpl for WhipSink { &self, transition: gst::StateChange, ) -> Result { - let ret = self.parent_change_state(transition); + if transition == gst::StateChange::NullToReady { + /* + * Fail the state change if WHIP endpoint has not been set by the + * time ReadyToPaused transition happens. This prevents us from + * having to check this everywhere else. + */ + let settings = self.settings.lock().unwrap(); + + if settings.whip_endpoint.is_none() { + gst::error!(CAT, imp: self, "WHIP endpoint URL must be set"); + return Err(gst::StateChangeError); + } + + /* + * Check if we have a valid URL. We can be assured any further URL + * handling won't fail due to invalid URLs. + */ + if let Err(e) = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()) { + gst::error!( + CAT, + imp: self, + "WHIP endpoint URL could not be parsed: {}", + e + ); + + return Err(gst::StateChangeError); + } + drop(settings); + } + if transition == gst::StateChange::PausedToReady { // Interrupt requests in progress, if any if let Some(canceller) = &*self.canceller.lock().unwrap() { @@ -150,6 +178,8 @@ impl ElementImpl for WhipSink { } } + let ret = self.parent_change_state(transition); + ret } } @@ -389,16 +419,11 @@ impl WhipSink { .headers(headermap) .send(); - let resp = match wait(&self.canceller, future) { - Ok(r) => r, - Err(e) => { - return Err(e); - } - }; + let resp = wait(&self.canceller, future)?; match resp.status() { StatusCode::NO_CONTENT => { - self.set_ice_servers(resp.headers()); + set_ice_servers(&self.webrtcbin, resp.headers())?; Ok(()) } status if status.is_redirection() => { @@ -439,93 +464,6 @@ impl WhipSink { } } - fn set_ice_servers(&self, headermap: &HeaderMap) { - for link in headermap.get_all("link").iter() { - let link = link - .to_str() - .expect("Header value should contain only visible ASCII strings"); - // FIXME: The Demo WHIP Server appends an extra ; at the end of each link - // but not needed as per https://datatracker.ietf.org/doc/html/rfc8288#section-3.5 - let link = link.trim_matches(';'); - - let item_map = parse_link_header::parse_with_rel(link); - if let Err(e) = item_map { - gst::error!(CAT, imp: self, "set_ice_servers {} - Error {:?}", link, e); - continue; - } - - let item_map = item_map.unwrap(); - if !item_map.contains_key("ice-server") { - // Not a link header we care about - continue; - } - - let link = item_map.get("ice-server").unwrap(); - - // Note: webrtcbin needs ice servers to be in the below format - // ://@ - // and the ice-servers (link headers) received from the whip server might be - // in the format : with username and password as separate params. - // Constructing these with 'url' crate also require a format/parse - // for changing : to ://:@. - // So preferred to use the String rather - - let mut ice_server_url; - - // check if uri has :// - if link.uri.has_authority() { - // use raw_uri as is - // username and password in the link.uri.params ignored - ice_server_url = link.raw_uri.as_str().to_string(); - } else { - // construct url as '://@' - ice_server_url = format!("{}://", link.uri.scheme()); - if let Some(user) = link.params.get("username") { - ice_server_url += user.as_str(); - if let Some(pass) = link.params.get("credential") { - ice_server_url = ice_server_url + ":" + pass.as_str(); - } - ice_server_url += "@"; - } - - // the raw_uri contains the ice-server in the form : - // so strip the scheme and the ':' from the beginning of raw_uri and use - // the rest of raw_uri to append it the url which will be in the form - // ://@ as expected - ice_server_url += link - .raw_uri - .strip_prefix((link.uri.scheme().to_owned() + ":").as_str()) - .expect("strip 'scheme:' from raw uri"); - } - - gst::debug!( - CAT, - imp: self, - "Setting STUN/TURN server {}", - ice_server_url - ); - - // It's icer to not collapse the `else if` and its inner `if` - #[allow(clippy::collapsible_if)] - if link.uri.scheme() == "stun" { - self.webrtcbin - .set_property_from_str("stun-server", ice_server_url.as_str()); - } else if link.uri.scheme().starts_with("turn") { - if !self - .webrtcbin - .emit_by_name::("add-turn-server", &[&ice_server_url.as_str()]) - { - gst::error!( - CAT, - imp: self, - "Falied to set turn server {}", - ice_server_url - ); - } - } - } - } - fn send_offer( &self, offer: gst_webrtc::WebRTCSessionDescription, @@ -613,12 +551,7 @@ impl WhipSink { .body(body) .send(); - let resp = match wait(&self.canceller, future) { - Ok(r) => r, - Err(e) => { - return Err(e); - } - }; + let resp = wait(&self.canceller, future)?; let res = match resp.status() { StatusCode::OK | StatusCode::CREATED => { @@ -633,20 +566,45 @@ impl WhipSink { // So we want to construct the full url of the resource // using the endpoint url i.e., replace the end point path with // resource path - let location = resp.headers().get(reqwest::header::LOCATION).unwrap(); - let mut url = reqwest::Url::parse(endpoint.as_str()).unwrap(); + let location = match resp.headers().get(reqwest::header::LOCATION) { + Some(location) => location, + None => { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Location header field should be present for WHIP resource URL"] + )); + } + }; + + let location = match location.to_str() { + Ok(loc) => loc, + Err(e) => { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to convert location to string {}", e] + )); + } + }; + + let url = reqwest::Url::parse(endpoint.as_str()).unwrap(); + + gst::debug!(CAT, imp: self, "WHIP resource: {:?}", location); + + let url = url.join(location).map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["URL join operation failed: {:?}", err] + ) + })?; - url.set_path(location.to_str().unwrap()); let mut state = self.state.lock().unwrap(); *state = State::Running { whip_resource_url: url.to_string(), }; drop(state); - let ans_bytes = match wait(&self.canceller, resp.bytes()) { - Ok(ans) => ans.to_vec(), - Err(e) => return Err(e), - }; + let ans_bytes = wait(&self.canceller, resp.bytes())?; + match sdp_message::SDPMessage::parse_buffer(&ans_bytes) { Ok(ans_sdp) => { let answer = gst_webrtc::WebRTCSessionDescription::new(