mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-25 02:18:12 +00:00
webrtchttp: whipsink: Miscellaneous clean up
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1008>
This commit is contained in:
parent
baf3da86cc
commit
420716fb63
1 changed files with 68 additions and 110 deletions
|
@ -7,7 +7,7 @@
|
|||
//
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use crate::utils::{build_reqwest_client, parse_redirect_location, wait};
|
||||
use crate::utils::{build_reqwest_client, parse_redirect_location, set_ice_servers, wait};
|
||||
use futures::future;
|
||||
use gst::glib;
|
||||
use gst::prelude::*;
|
||||
|
@ -16,7 +16,6 @@ use gst::ErrorMessage;
|
|||
use gst_sdp::*;
|
||||
use gst_webrtc::*;
|
||||
use once_cell::sync::Lazy;
|
||||
use parse_link_header;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderValue;
|
||||
use reqwest::StatusCode;
|
||||
|
@ -135,7 +134,36 @@ impl ElementImpl for WhipSink {
|
|||
&self,
|
||||
transition: gst::StateChange,
|
||||
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
|
||||
let ret = self.parent_change_state(transition);
|
||||
if transition == gst::StateChange::NullToReady {
|
||||
/*
|
||||
* Fail the state change if WHIP endpoint has not been set by the
|
||||
* time ReadyToPaused transition happens. This prevents us from
|
||||
* having to check this everywhere else.
|
||||
*/
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
if settings.whip_endpoint.is_none() {
|
||||
gst::error!(CAT, imp: self, "WHIP endpoint URL must be set");
|
||||
return Err(gst::StateChangeError);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if we have a valid URL. We can be assured any further URL
|
||||
* handling won't fail due to invalid URLs.
|
||||
*/
|
||||
if let Err(e) = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()) {
|
||||
gst::error!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"WHIP endpoint URL could not be parsed: {}",
|
||||
e
|
||||
);
|
||||
|
||||
return Err(gst::StateChangeError);
|
||||
}
|
||||
drop(settings);
|
||||
}
|
||||
|
||||
if transition == gst::StateChange::PausedToReady {
|
||||
// Interrupt requests in progress, if any
|
||||
if let Some(canceller) = &*self.canceller.lock().unwrap() {
|
||||
|
@ -150,6 +178,8 @@ impl ElementImpl for WhipSink {
|
|||
}
|
||||
}
|
||||
|
||||
let ret = self.parent_change_state(transition);
|
||||
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
@ -389,16 +419,11 @@ impl WhipSink {
|
|||
.headers(headermap)
|
||||
.send();
|
||||
|
||||
let resp = match wait(&self.canceller, future) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
let resp = wait(&self.canceller, future)?;
|
||||
|
||||
match resp.status() {
|
||||
StatusCode::NO_CONTENT => {
|
||||
self.set_ice_servers(resp.headers());
|
||||
set_ice_servers(&self.webrtcbin, resp.headers())?;
|
||||
Ok(())
|
||||
}
|
||||
status if status.is_redirection() => {
|
||||
|
@ -439,93 +464,6 @@ impl WhipSink {
|
|||
}
|
||||
}
|
||||
|
||||
fn set_ice_servers(&self, headermap: &HeaderMap) {
|
||||
for link in headermap.get_all("link").iter() {
|
||||
let link = link
|
||||
.to_str()
|
||||
.expect("Header value should contain only visible ASCII strings");
|
||||
// FIXME: The Demo WHIP Server appends an extra ; at the end of each link
|
||||
// but not needed as per https://datatracker.ietf.org/doc/html/rfc8288#section-3.5
|
||||
let link = link.trim_matches(';');
|
||||
|
||||
let item_map = parse_link_header::parse_with_rel(link);
|
||||
if let Err(e) = item_map {
|
||||
gst::error!(CAT, imp: self, "set_ice_servers {} - Error {:?}", link, e);
|
||||
continue;
|
||||
}
|
||||
|
||||
let item_map = item_map.unwrap();
|
||||
if !item_map.contains_key("ice-server") {
|
||||
// Not a link header we care about
|
||||
continue;
|
||||
}
|
||||
|
||||
let link = item_map.get("ice-server").unwrap();
|
||||
|
||||
// Note: webrtcbin needs ice servers to be in the below format
|
||||
// <scheme>://<user:pass>@<url>
|
||||
// and the ice-servers (link headers) received from the whip server might be
|
||||
// in the format <scheme>:<host> with username and password as separate params.
|
||||
// Constructing these with 'url' crate also require a format/parse
|
||||
// for changing <scheme>:<host> to <scheme>://<user>:<password>@<host>.
|
||||
// So preferred to use the String rather
|
||||
|
||||
let mut ice_server_url;
|
||||
|
||||
// check if uri has ://
|
||||
if link.uri.has_authority() {
|
||||
// use raw_uri as is
|
||||
// username and password in the link.uri.params ignored
|
||||
ice_server_url = link.raw_uri.as_str().to_string();
|
||||
} else {
|
||||
// construct url as '<scheme>://<user:pass>@<url>'
|
||||
ice_server_url = format!("{}://", link.uri.scheme());
|
||||
if let Some(user) = link.params.get("username") {
|
||||
ice_server_url += user.as_str();
|
||||
if let Some(pass) = link.params.get("credential") {
|
||||
ice_server_url = ice_server_url + ":" + pass.as_str();
|
||||
}
|
||||
ice_server_url += "@";
|
||||
}
|
||||
|
||||
// the raw_uri contains the ice-server in the form <scheme>:<url>
|
||||
// so strip the scheme and the ':' from the beginning of raw_uri and use
|
||||
// the rest of raw_uri to append it the url which will be in the form
|
||||
// <scheme>://<user:pass>@<url> as expected
|
||||
ice_server_url += link
|
||||
.raw_uri
|
||||
.strip_prefix((link.uri.scheme().to_owned() + ":").as_str())
|
||||
.expect("strip 'scheme:' from raw uri");
|
||||
}
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Setting STUN/TURN server {}",
|
||||
ice_server_url
|
||||
);
|
||||
|
||||
// It's icer to not collapse the `else if` and its inner `if`
|
||||
#[allow(clippy::collapsible_if)]
|
||||
if link.uri.scheme() == "stun" {
|
||||
self.webrtcbin
|
||||
.set_property_from_str("stun-server", ice_server_url.as_str());
|
||||
} else if link.uri.scheme().starts_with("turn") {
|
||||
if !self
|
||||
.webrtcbin
|
||||
.emit_by_name::<bool>("add-turn-server", &[&ice_server_url.as_str()])
|
||||
{
|
||||
gst::error!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Falied to set turn server {}",
|
||||
ice_server_url
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn send_offer(
|
||||
&self,
|
||||
offer: gst_webrtc::WebRTCSessionDescription,
|
||||
|
@ -613,12 +551,7 @@ impl WhipSink {
|
|||
.body(body)
|
||||
.send();
|
||||
|
||||
let resp = match wait(&self.canceller, future) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
let resp = wait(&self.canceller, future)?;
|
||||
|
||||
let res = match resp.status() {
|
||||
StatusCode::OK | StatusCode::CREATED => {
|
||||
|
@ -633,20 +566,45 @@ impl WhipSink {
|
|||
// So we want to construct the full url of the resource
|
||||
// using the endpoint url i.e., replace the end point path with
|
||||
// resource path
|
||||
let location = resp.headers().get(reqwest::header::LOCATION).unwrap();
|
||||
let mut url = reqwest::Url::parse(endpoint.as_str()).unwrap();
|
||||
let location = match resp.headers().get(reqwest::header::LOCATION) {
|
||||
Some(location) => location,
|
||||
None => {
|
||||
return Err(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Location header field should be present for WHIP resource URL"]
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let location = match location.to_str() {
|
||||
Ok(loc) => loc,
|
||||
Err(e) => {
|
||||
return Err(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Failed to convert location to string {}", e]
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let url = reqwest::Url::parse(endpoint.as_str()).unwrap();
|
||||
|
||||
gst::debug!(CAT, imp: self, "WHIP resource: {:?}", location);
|
||||
|
||||
let url = url.join(location).map_err(|err| {
|
||||
gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["URL join operation failed: {:?}", err]
|
||||
)
|
||||
})?;
|
||||
|
||||
url.set_path(location.to_str().unwrap());
|
||||
let mut state = self.state.lock().unwrap();
|
||||
*state = State::Running {
|
||||
whip_resource_url: url.to_string(),
|
||||
};
|
||||
drop(state);
|
||||
|
||||
let ans_bytes = match wait(&self.canceller, resp.bytes()) {
|
||||
Ok(ans) => ans.to_vec(),
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
let ans_bytes = wait(&self.canceller, resp.bytes())?;
|
||||
|
||||
match sdp_message::SDPMessage::parse_buffer(&ans_bytes) {
|
||||
Ok(ans_sdp) => {
|
||||
let answer = gst_webrtc::WebRTCSessionDescription::new(
|
||||
|
|
Loading…
Reference in a new issue