webrtchttp: whipsink: Miscellaneous clean up

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/949>
This commit is contained in:
Sanchayan Maity 2022-10-27 18:06:26 +05:30
parent b427cb6a3d
commit b992596236

View file

@ -7,7 +7,7 @@
//
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{build_reqwest_client, parse_redirect_location, wait};
use crate::utils::{build_reqwest_client, parse_redirect_location, set_ice_servers, wait};
use futures::future;
use gst::glib;
use gst::prelude::*;
@ -16,7 +16,6 @@ use gst::ErrorMessage;
use gst_sdp::*;
use gst_webrtc::*;
use once_cell::sync::Lazy;
use parse_link_header;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue;
use reqwest::StatusCode;
@ -135,7 +134,36 @@ impl ElementImpl for WhipSink {
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
let ret = self.parent_change_state(transition);
if transition == gst::StateChange::NullToReady {
/*
* Fail the state change if WHIP endpoint has not been set by the
* time ReadyToPaused transition happens. This prevents us from
* having to check this everywhere else.
*/
let settings = self.settings.lock().unwrap();
if settings.whip_endpoint.is_none() {
gst::error!(CAT, imp: self, "WHIP endpoint URL must be set");
return Err(gst::StateChangeError);
}
/*
* Check if we have a valid URL. We can be assured any further URL
* handling won't fail due to invalid URLs.
*/
if let Err(e) = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()) {
gst::error!(
CAT,
imp: self,
"WHIP endpoint URL could not be parsed: {}",
e
);
return Err(gst::StateChangeError);
}
drop(settings);
}
if transition == gst::StateChange::PausedToReady {
// Interrupt requests in progress, if any
if let Some(canceller) = &*self.canceller.lock().unwrap() {
@ -150,6 +178,8 @@ impl ElementImpl for WhipSink {
}
}
let ret = self.parent_change_state(transition);
ret
}
}
@ -389,16 +419,11 @@ impl WhipSink {
.headers(headermap)
.send();
let resp = match wait(&self.canceller, future) {
Ok(r) => r,
Err(e) => {
return Err(e);
}
};
let resp = wait(&self.canceller, future)?;
match resp.status() {
StatusCode::NO_CONTENT => {
self.set_ice_servers(resp.headers());
set_ice_servers(&self.webrtcbin, resp.headers())?;
Ok(())
}
status if status.is_redirection() => {
@ -439,93 +464,6 @@ impl WhipSink {
}
}
fn set_ice_servers(&self, headermap: &HeaderMap) {
for link in headermap.get_all("link").iter() {
let link = link
.to_str()
.expect("Header value should contain only visible ASCII strings");
// FIXME: The Demo WHIP Server appends an extra ; at the end of each link
// but not needed as per https://datatracker.ietf.org/doc/html/rfc8288#section-3.5
let link = link.trim_matches(';');
let item_map = parse_link_header::parse_with_rel(link);
if let Err(e) = item_map {
gst::error!(CAT, imp: self, "set_ice_servers {} - Error {:?}", link, e);
continue;
}
let item_map = item_map.unwrap();
if !item_map.contains_key("ice-server") {
// Not a link header we care about
continue;
}
let link = item_map.get("ice-server").unwrap();
// Note: webrtcbin needs ice servers to be in the below format
// <scheme>://<user:pass>@<url>
// and the ice-servers (link headers) received from the whip server might be
// in the format <scheme>:<host> with username and password as separate params.
// Constructing these with 'url' crate also require a format/parse
// for changing <scheme>:<host> to <scheme>://<user>:<password>@<host>.
// So preferred to use the String rather
let mut ice_server_url;
// check if uri has ://
if link.uri.has_authority() {
// use raw_uri as is
// username and password in the link.uri.params ignored
ice_server_url = link.raw_uri.as_str().to_string();
} else {
// construct url as '<scheme>://<user:pass>@<url>'
ice_server_url = format!("{}://", link.uri.scheme());
if let Some(user) = link.params.get("username") {
ice_server_url += user.as_str();
if let Some(pass) = link.params.get("credential") {
ice_server_url = ice_server_url + ":" + pass.as_str();
}
ice_server_url += "@";
}
// the raw_uri contains the ice-server in the form <scheme>:<url>
// so strip the scheme and the ':' from the beginning of raw_uri and use
// the rest of raw_uri to append it the url which will be in the form
// <scheme>://<user:pass>@<url> as expected
ice_server_url += link
.raw_uri
.strip_prefix((link.uri.scheme().to_owned() + ":").as_str())
.expect("strip 'scheme:' from raw uri");
}
gst::debug!(
CAT,
imp: self,
"Setting STUN/TURN server {}",
ice_server_url
);
// It's icer to not collapse the `else if` and its inner `if`
#[allow(clippy::collapsible_if)]
if link.uri.scheme() == "stun" {
self.webrtcbin
.set_property_from_str("stun-server", ice_server_url.as_str());
} else if link.uri.scheme().starts_with("turn") {
if !self
.webrtcbin
.emit_by_name::<bool>("add-turn-server", &[&ice_server_url.as_str()])
{
gst::error!(
CAT,
imp: self,
"Falied to set turn server {}",
ice_server_url
);
}
}
}
}
fn send_offer(
&self,
offer: gst_webrtc::WebRTCSessionDescription,
@ -613,12 +551,7 @@ impl WhipSink {
.body(body)
.send();
let resp = match wait(&self.canceller, future) {
Ok(r) => r,
Err(e) => {
return Err(e);
}
};
let resp = wait(&self.canceller, future)?;
let res = match resp.status() {
StatusCode::OK | StatusCode::CREATED => {
@ -633,20 +566,45 @@ impl WhipSink {
// So we want to construct the full url of the resource
// using the endpoint url i.e., replace the end point path with
// resource path
let location = resp.headers().get(reqwest::header::LOCATION).unwrap();
let mut url = reqwest::Url::parse(endpoint.as_str()).unwrap();
let location = match resp.headers().get(reqwest::header::LOCATION) {
Some(location) => location,
None => {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Location header field should be present for WHIP resource URL"]
));
}
};
let location = match location.to_str() {
Ok(loc) => loc,
Err(e) => {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to convert location to string {}", e]
));
}
};
let url = reqwest::Url::parse(endpoint.as_str()).unwrap();
gst::debug!(CAT, imp: self, "WHIP resource: {:?}", location);
let url = url.join(location).map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["URL join operation failed: {:?}", err]
)
})?;
url.set_path(location.to_str().unwrap());
let mut state = self.state.lock().unwrap();
*state = State::Running {
whip_resource_url: url.to_string(),
};
drop(state);
let ans_bytes = match wait(&self.canceller, resp.bytes()) {
Ok(ans) => ans.to_vec(),
Err(e) => return Err(e),
};
let ans_bytes = wait(&self.canceller, resp.bytes())?;
match sdp_message::SDPMessage::parse_buffer(&ans_bytes) {
Ok(ans_sdp) => {
let answer = gst_webrtc::WebRTCSessionDescription::new(