mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-02-18 05:45:14 +00:00
webrtchttp: Factor out the common bits for WHIP and WHEP
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/949>
This commit is contained in:
parent
6be5796888
commit
b427cb6a3d
4 changed files with 208 additions and 280 deletions
|
@ -15,6 +15,7 @@
|
||||||
* Since: plugins-rs-0.9.0
|
* Since: plugins-rs-0.9.0
|
||||||
*/
|
*/
|
||||||
use gst::glib;
|
use gst::glib;
|
||||||
|
mod utils;
|
||||||
mod whepsrc;
|
mod whepsrc;
|
||||||
mod whipsink;
|
mod whipsink;
|
||||||
|
|
||||||
|
|
192
net/webrtchttp/src/utils.rs
Normal file
192
net/webrtchttp/src/utils.rs
Normal file
|
@ -0,0 +1,192 @@
|
||||||
|
use futures::future;
|
||||||
|
use futures::prelude::*;
|
||||||
|
use gst::{prelude::*, ErrorMessage};
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use parse_link_header;
|
||||||
|
use reqwest::header::HeaderMap;
|
||||||
|
use reqwest::redirect::Policy;
|
||||||
|
use std::fmt::Display;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use tokio::runtime;
|
||||||
|
|
||||||
|
pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
|
||||||
|
runtime::Builder::new_multi_thread()
|
||||||
|
.enable_all()
|
||||||
|
.worker_threads(1)
|
||||||
|
.thread_name("webrtc-http-runtime")
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
pub fn wait<F, T, E>(
|
||||||
|
canceller: &Mutex<Option<future::AbortHandle>>,
|
||||||
|
future: F,
|
||||||
|
) -> Result<T, ErrorMessage>
|
||||||
|
where
|
||||||
|
F: Send + Future<Output = Result<T, E>>,
|
||||||
|
T: Send + 'static,
|
||||||
|
E: Send + Display,
|
||||||
|
{
|
||||||
|
let mut canceller_guard = canceller.lock().unwrap();
|
||||||
|
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
||||||
|
|
||||||
|
if canceller_guard.is_some() {
|
||||||
|
return Err(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Old Canceller should not exist"]
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
canceller_guard.replace(abort_handle);
|
||||||
|
drop(canceller_guard);
|
||||||
|
|
||||||
|
let future = async {
|
||||||
|
match future::Abortable::new(future, abort_registration).await {
|
||||||
|
Ok(Ok(res)) => Ok(res),
|
||||||
|
|
||||||
|
Ok(Err(err)) => Err(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Future resolved with an error {}", err.to_string()]
|
||||||
|
)),
|
||||||
|
|
||||||
|
Err(future::Aborted) => Err(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Canceller called before future resolved"]
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = {
|
||||||
|
let _enter = RUNTIME.enter();
|
||||||
|
futures::executor::block_on(future)
|
||||||
|
};
|
||||||
|
|
||||||
|
canceller_guard = canceller.lock().unwrap();
|
||||||
|
*canceller_guard = None;
|
||||||
|
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn parse_redirect_location(
|
||||||
|
headermap: &HeaderMap,
|
||||||
|
old_url: &reqwest::Url,
|
||||||
|
) -> Result<reqwest::Url, ErrorMessage> {
|
||||||
|
let location = match headermap.get(reqwest::header::LOCATION) {
|
||||||
|
Some(location) => location,
|
||||||
|
None => {
|
||||||
|
return Err(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Location header field should be present for WHIP/WHEP resource URL"]
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let location = match location.to_str() {
|
||||||
|
Ok(loc) => loc,
|
||||||
|
Err(e) => {
|
||||||
|
return Err(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Failed to convert location to string {}", e]
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match reqwest::Url::parse(location) {
|
||||||
|
Ok(url) => Ok(url), // Location URL is an absolute path
|
||||||
|
Err(_) => {
|
||||||
|
// Location URL is a relative path
|
||||||
|
let new_url = old_url.clone().join(location).map_err(|err| {
|
||||||
|
gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["URL join operation failed: {:?}", err]
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(new_url)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build_reqwest_client(pol: Policy) -> reqwest::Client {
|
||||||
|
let client_builder = reqwest::Client::builder();
|
||||||
|
client_builder.redirect(pol).build().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_ice_servers(
|
||||||
|
webrtcbin: &gst::Element,
|
||||||
|
headermap: &HeaderMap,
|
||||||
|
) -> Result<(), ErrorMessage> {
|
||||||
|
for link in headermap.get_all("link").iter() {
|
||||||
|
let link = link.to_str().map_err(|err| {
|
||||||
|
gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
[
|
||||||
|
"Header value should contain only visible ASCII strings: {}",
|
||||||
|
err
|
||||||
|
]
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let item_map = match parse_link_header::parse_with_rel(link) {
|
||||||
|
Ok(map) => map,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let link = match item_map.contains_key("ice-server") {
|
||||||
|
true => item_map.get("ice-server").unwrap(),
|
||||||
|
false => continue, // Not a link header we care about
|
||||||
|
};
|
||||||
|
|
||||||
|
// Note: webrtcbin needs ice servers to be in the below format
|
||||||
|
// <scheme>://<user:pass>@<url>
|
||||||
|
// and the ice-servers (link headers) received from the whip server might be
|
||||||
|
// in the format <scheme>:<host> with username and password as separate params.
|
||||||
|
// Constructing these with 'url' crate also require a format/parse
|
||||||
|
// for changing <scheme>:<host> to <scheme>://<user>:<password>@<host>.
|
||||||
|
// So preferred to use the String rather
|
||||||
|
|
||||||
|
let mut ice_server_url;
|
||||||
|
|
||||||
|
// check if uri has ://
|
||||||
|
if link.uri.has_authority() {
|
||||||
|
// use raw_uri as is
|
||||||
|
// username and password in the link.uri.params ignored
|
||||||
|
ice_server_url = link.raw_uri.as_str().to_string();
|
||||||
|
} else {
|
||||||
|
// No builder pattern is provided by reqwest::Url. Use string operation.
|
||||||
|
// construct url as '<scheme>://<user:pass>@<url>'
|
||||||
|
ice_server_url = format!("{}://", link.uri.scheme());
|
||||||
|
if let Some(user) = link.params.get("username") {
|
||||||
|
ice_server_url += user.as_str();
|
||||||
|
if let Some(pass) = link.params.get("credential") {
|
||||||
|
ice_server_url = ice_server_url + ":" + pass.as_str();
|
||||||
|
}
|
||||||
|
ice_server_url += "@";
|
||||||
|
}
|
||||||
|
|
||||||
|
// the raw_uri contains the ice-server in the form <scheme>:<url>
|
||||||
|
// so strip the scheme and the ':' from the beginning of raw_uri and use
|
||||||
|
// the rest of raw_uri to append it the url which will be in the form
|
||||||
|
// <scheme>://<user:pass>@<url> as expected
|
||||||
|
ice_server_url += link
|
||||||
|
.raw_uri
|
||||||
|
.strip_prefix((link.uri.scheme().to_owned() + ":").as_str())
|
||||||
|
.expect("strip 'scheme:' from raw uri");
|
||||||
|
}
|
||||||
|
|
||||||
|
// It's nicer to not collapse the `else if` and its inner `if`
|
||||||
|
#[allow(clippy::collapsible_if)]
|
||||||
|
if link.uri.scheme() == "stun" {
|
||||||
|
webrtcbin.set_property_from_str("stun-server", ice_server_url.as_str());
|
||||||
|
} else if link.uri.scheme().starts_with("turn") {
|
||||||
|
if !webrtcbin.emit_by_name::<bool>("add-turn-server", &[&ice_server_url.as_str()]) {
|
||||||
|
return Err(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Failed to set turn server {}", ice_server_url]
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -7,24 +7,16 @@
|
||||||
//
|
//
|
||||||
// SPDX-License-Identifier: MPL-2.0
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use crate::utils::{build_reqwest_client, parse_redirect_location, set_ice_servers, wait};
|
||||||
use crate::GstRsWebRTCICETransportPolicy;
|
use crate::GstRsWebRTCICETransportPolicy;
|
||||||
|
use bytes::Bytes;
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use futures::prelude::*;
|
|
||||||
use tokio::runtime;
|
|
||||||
|
|
||||||
use gst::{element_imp_error, glib, prelude::*, subclass::prelude::*, ErrorMessage};
|
use gst::{element_imp_error, glib, prelude::*, subclass::prelude::*, ErrorMessage};
|
||||||
use gst_sdp::*;
|
use gst_sdp::*;
|
||||||
use gst_webrtc::*;
|
use gst_webrtc::*;
|
||||||
|
|
||||||
use bytes::Bytes;
|
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use parse_link_header;
|
|
||||||
|
|
||||||
use reqwest::header::{HeaderMap, HeaderValue};
|
use reqwest::header::{HeaderMap, HeaderValue};
|
||||||
use reqwest::redirect::Policy;
|
|
||||||
use reqwest::StatusCode;
|
use reqwest::StatusCode;
|
||||||
|
|
||||||
use std::fmt::Display;
|
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
|
|
||||||
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||||
|
@ -35,14 +27,6 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
|
|
||||||
runtime::Builder::new_multi_thread()
|
|
||||||
.enable_all()
|
|
||||||
.worker_threads(1)
|
|
||||||
.build()
|
|
||||||
.unwrap()
|
|
||||||
});
|
|
||||||
|
|
||||||
const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy =
|
const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy =
|
||||||
GstRsWebRTCICETransportPolicy::All;
|
GstRsWebRTCICETransportPolicy::All;
|
||||||
const MAX_REDIRECTS: u8 = 10;
|
const MAX_REDIRECTS: u8 = 10;
|
||||||
|
@ -618,7 +602,7 @@ impl WhepSrc {
|
||||||
.lock()
|
.lock()
|
||||||
.expect("Failed to acquire settings lock");
|
.expect("Failed to acquire settings lock");
|
||||||
if settings.use_link_headers {
|
if settings.use_link_headers {
|
||||||
self.set_ice_servers(resp.headers());
|
set_ice_servers(&self.webrtcbin, resp.headers())?;
|
||||||
}
|
}
|
||||||
drop(settings);
|
drop(settings);
|
||||||
|
|
||||||
|
@ -660,10 +644,7 @@ impl WhepSrc {
|
||||||
};
|
};
|
||||||
drop(state);
|
drop(state);
|
||||||
|
|
||||||
let ans_bytes = match self.wait(resp.bytes()) {
|
let ans_bytes = wait(&self.canceller, resp.bytes())?;
|
||||||
Ok(ans) => ans,
|
|
||||||
Err(e) => return Err(e),
|
|
||||||
};
|
|
||||||
|
|
||||||
self.sdp_message_parse(ans_bytes)
|
self.sdp_message_parse(ans_bytes)
|
||||||
}
|
}
|
||||||
|
@ -717,7 +698,7 @@ impl WhepSrc {
|
||||||
[
|
[
|
||||||
"Unexpected response: {} - {}",
|
"Unexpected response: {} - {}",
|
||||||
s.as_str(),
|
s.as_str(),
|
||||||
self.wait(resp.bytes())
|
wait(&self.canceller, resp.bytes())
|
||||||
.map(|x| x.escape_ascii().to_string())
|
.map(|x| x.escape_ascii().to_string())
|
||||||
.unwrap_or_else(|_| "(no further details)".to_string())
|
.unwrap_or_else(|_| "(no further details)".to_string())
|
||||||
]
|
]
|
||||||
|
@ -874,96 +855,6 @@ impl WhepSrc {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Taken from WHIP sink
|
|
||||||
fn set_ice_servers(&self, headermap: &HeaderMap) {
|
|
||||||
for link in headermap.get_all("link").iter() {
|
|
||||||
let link = link
|
|
||||||
.to_str()
|
|
||||||
.expect("Header value should contain only visible ASCII strings");
|
|
||||||
|
|
||||||
let item_map = match parse_link_header::parse_with_rel(link) {
|
|
||||||
Ok(map) => map,
|
|
||||||
Err(e) => {
|
|
||||||
gst::warning!(
|
|
||||||
CAT,
|
|
||||||
imp: self,
|
|
||||||
"Failed to set ICE server {} due to {:?}",
|
|
||||||
link,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let link = match item_map.contains_key("ice-server") {
|
|
||||||
true => item_map.get("ice-server").unwrap(),
|
|
||||||
false => continue, // Not a link header we care about
|
|
||||||
};
|
|
||||||
|
|
||||||
// Note: webrtcbin needs ice servers to be in the below format
|
|
||||||
// <scheme>://<user:pass>@<url>
|
|
||||||
// and the ice-servers (link headers) received from the whip server might be
|
|
||||||
// in the format <scheme>:<host> with username and password as separate params.
|
|
||||||
// Constructing these with 'url' crate also require a format/parse
|
|
||||||
// for changing <scheme>:<host> to <scheme>://<user>:<password>@<host>.
|
|
||||||
// So preferred to use the String rather
|
|
||||||
|
|
||||||
let mut ice_server_url;
|
|
||||||
|
|
||||||
// check if uri has ://
|
|
||||||
if link.uri.has_authority() {
|
|
||||||
// use raw_uri as is
|
|
||||||
// username and password in the link.uri.params ignored
|
|
||||||
ice_server_url = link.raw_uri.as_str().to_string();
|
|
||||||
} else {
|
|
||||||
// construct url as '<scheme>://<user:pass>@<url>'
|
|
||||||
ice_server_url = format!("{}://", link.uri.scheme());
|
|
||||||
if let Some(user) = link.params.get("username") {
|
|
||||||
ice_server_url += user.as_str();
|
|
||||||
if let Some(pass) = link.params.get("credential") {
|
|
||||||
ice_server_url = ice_server_url + ":" + pass.as_str();
|
|
||||||
}
|
|
||||||
ice_server_url += "@";
|
|
||||||
}
|
|
||||||
|
|
||||||
// the raw_uri contains the ice-server in the form <scheme>:<url>
|
|
||||||
// so strip the scheme and the ':' from the beginning of raw_uri and use
|
|
||||||
// the rest of raw_uri to append it the url which will be in the form
|
|
||||||
// <scheme>://<user:pass>@<url> as expected
|
|
||||||
ice_server_url += link
|
|
||||||
.raw_uri
|
|
||||||
.strip_prefix((link.uri.scheme().to_owned() + ":").as_str())
|
|
||||||
.expect("strip 'scheme:' from raw uri");
|
|
||||||
}
|
|
||||||
|
|
||||||
gst::info!(
|
|
||||||
CAT,
|
|
||||||
imp: self,
|
|
||||||
"Setting STUN/TURN server {}",
|
|
||||||
ice_server_url
|
|
||||||
);
|
|
||||||
|
|
||||||
// It's nicer to not collapse the `else if` and its inner `if`
|
|
||||||
#[allow(clippy::collapsible_if)]
|
|
||||||
if link.uri.scheme() == "stun" {
|
|
||||||
self.webrtcbin
|
|
||||||
.set_property_from_str("stun-server", ice_server_url.as_str());
|
|
||||||
} else if link.uri.scheme().starts_with("turn") {
|
|
||||||
if !self
|
|
||||||
.webrtcbin
|
|
||||||
.emit_by_name::<bool>("add-turn-server", &[&ice_server_url.as_str()])
|
|
||||||
{
|
|
||||||
gst::error!(
|
|
||||||
CAT,
|
|
||||||
imp: self,
|
|
||||||
"Failed to set turn server {}",
|
|
||||||
ice_server_url
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn send_sdp(&self, sdp: SDPMessage) -> Result<(), gst::ErrorMessage> {
|
fn send_sdp(&self, sdp: SDPMessage) -> Result<(), gst::ErrorMessage> {
|
||||||
let sess_desc = WebRTCSessionDescription::new(WebRTCSDPType::Offer, sdp);
|
let sess_desc = WebRTCSessionDescription::new(WebRTCSDPType::Offer, sdp);
|
||||||
let settings = self.settings.lock().unwrap();
|
let settings = self.settings.lock().unwrap();
|
||||||
|
@ -1025,7 +916,7 @@ impl WhepSrc {
|
||||||
.body(body)
|
.body(body)
|
||||||
.send();
|
.send();
|
||||||
|
|
||||||
let resp = self.wait(future)?;
|
let resp = wait(&self.canceller, future)?;
|
||||||
|
|
||||||
self.parse_endpoint_response(endpoint, 0, resp)
|
self.parse_endpoint_response(endpoint, 0, resp)
|
||||||
}
|
}
|
||||||
|
@ -1068,7 +959,7 @@ impl WhepSrc {
|
||||||
let client = build_reqwest_client(reqwest::redirect::Policy::default());
|
let client = build_reqwest_client(reqwest::redirect::Policy::default());
|
||||||
let future = client.delete(resource_url).headers(headermap).send();
|
let future = client.delete(resource_url).headers(headermap).send();
|
||||||
|
|
||||||
let res = self.wait(future);
|
let res = wait(&self.canceller, future);
|
||||||
match res {
|
match res {
|
||||||
Ok(r) => {
|
Ok(r) => {
|
||||||
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());
|
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());
|
||||||
|
@ -1078,74 +969,4 @@ impl WhepSrc {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn wait<F, T, E>(&self, future: F) -> Result<T, ErrorMessage>
|
|
||||||
where
|
|
||||||
F: Send + Future<Output = Result<T, E>>,
|
|
||||||
T: Send + 'static,
|
|
||||||
E: Send + Display,
|
|
||||||
{
|
|
||||||
let mut canceller = self.canceller.lock().unwrap();
|
|
||||||
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
|
||||||
|
|
||||||
canceller.replace(abort_handle);
|
|
||||||
drop(canceller);
|
|
||||||
|
|
||||||
let future = async {
|
|
||||||
match future::Abortable::new(future, abort_registration).await {
|
|
||||||
Ok(Ok(res)) => Ok(res),
|
|
||||||
|
|
||||||
Ok(Err(err)) => Err(gst::error_msg!(
|
|
||||||
gst::ResourceError::Failed,
|
|
||||||
["Future resolved with an error {}", err.to_string()]
|
|
||||||
)),
|
|
||||||
|
|
||||||
Err(future::Aborted) => Err(gst::error_msg!(
|
|
||||||
gst::ResourceError::Failed,
|
|
||||||
["Canceller called before future resolved"]
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let res = {
|
|
||||||
let _enter = RUNTIME.enter();
|
|
||||||
futures::executor::block_on(future)
|
|
||||||
};
|
|
||||||
|
|
||||||
let _ = self.canceller.lock().unwrap().take();
|
|
||||||
res
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_redirect_location(
|
|
||||||
headermap: &HeaderMap,
|
|
||||||
old_url: &reqwest::Url,
|
|
||||||
) -> Result<reqwest::Url, ErrorMessage> {
|
|
||||||
let location = headermap.get(reqwest::header::LOCATION).unwrap();
|
|
||||||
if let Err(e) = location.to_str() {
|
|
||||||
return Err(gst::error_msg!(
|
|
||||||
gst::ResourceError::Failed,
|
|
||||||
[
|
|
||||||
"Failed to convert the redirect location to string {}",
|
|
||||||
e.to_string()
|
|
||||||
]
|
|
||||||
));
|
|
||||||
}
|
|
||||||
let location = location.to_str().unwrap();
|
|
||||||
|
|
||||||
if location.to_ascii_lowercase().starts_with("http") {
|
|
||||||
// Location URL is an absolute path
|
|
||||||
reqwest::Url::parse(location)
|
|
||||||
.map_err(|e| gst::error_msg!(gst::ResourceError::Failed, ["{}", e.to_string()]))
|
|
||||||
} else {
|
|
||||||
// Location URL is a relative path
|
|
||||||
let mut new_url = old_url.clone();
|
|
||||||
new_url.set_path(location);
|
|
||||||
Ok(new_url)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn build_reqwest_client(pol: Policy) -> reqwest::Client {
|
|
||||||
let client_builder = reqwest::Client::builder();
|
|
||||||
client_builder.redirect(pol).build().unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,8 +7,8 @@
|
||||||
//
|
//
|
||||||
// SPDX-License-Identifier: MPL-2.0
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use crate::utils::{build_reqwest_client, parse_redirect_location, wait};
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use futures::prelude::*;
|
|
||||||
use gst::glib;
|
use gst::glib;
|
||||||
use gst::prelude::*;
|
use gst::prelude::*;
|
||||||
use gst::subclass::prelude::*;
|
use gst::subclass::prelude::*;
|
||||||
|
@ -19,24 +19,13 @@ use once_cell::sync::Lazy;
|
||||||
use parse_link_header;
|
use parse_link_header;
|
||||||
use reqwest::header::HeaderMap;
|
use reqwest::header::HeaderMap;
|
||||||
use reqwest::header::HeaderValue;
|
use reqwest::header::HeaderValue;
|
||||||
use reqwest::redirect::Policy;
|
|
||||||
use reqwest::StatusCode;
|
use reqwest::StatusCode;
|
||||||
use std::fmt::Display;
|
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
use tokio::runtime;
|
|
||||||
|
|
||||||
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||||
gst::DebugCategory::new("whipsink", gst::DebugColorFlags::empty(), Some("WHIP Sink"))
|
gst::DebugCategory::new("whipsink", gst::DebugColorFlags::empty(), Some("WHIP Sink"))
|
||||||
});
|
});
|
||||||
|
|
||||||
static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
|
|
||||||
runtime::Builder::new_multi_thread()
|
|
||||||
.enable_all()
|
|
||||||
.worker_threads(1)
|
|
||||||
.build()
|
|
||||||
.unwrap()
|
|
||||||
});
|
|
||||||
|
|
||||||
const MAX_REDIRECTS: u8 = 10;
|
const MAX_REDIRECTS: u8 = 10;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
@ -400,7 +389,7 @@ impl WhipSink {
|
||||||
.headers(headermap)
|
.headers(headermap)
|
||||||
.send();
|
.send();
|
||||||
|
|
||||||
let resp = match self.wait(future) {
|
let resp = match wait(&self.canceller, future) {
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return Err(e);
|
return Err(e);
|
||||||
|
@ -444,7 +433,7 @@ impl WhipSink {
|
||||||
[
|
[
|
||||||
"lookup_ice_servers - Unexpected response {} {:?}",
|
"lookup_ice_servers - Unexpected response {} {:?}",
|
||||||
status,
|
status,
|
||||||
self.wait(resp.bytes()).unwrap()
|
wait(&self.canceller, resp.bytes()).unwrap()
|
||||||
]
|
]
|
||||||
)),
|
)),
|
||||||
}
|
}
|
||||||
|
@ -624,7 +613,7 @@ impl WhipSink {
|
||||||
.body(body)
|
.body(body)
|
||||||
.send();
|
.send();
|
||||||
|
|
||||||
let resp = match self.wait(future) {
|
let resp = match wait(&self.canceller, future) {
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return Err(e);
|
return Err(e);
|
||||||
|
@ -654,7 +643,7 @@ impl WhipSink {
|
||||||
};
|
};
|
||||||
drop(state);
|
drop(state);
|
||||||
|
|
||||||
let ans_bytes = match self.wait(resp.bytes()) {
|
let ans_bytes = match wait(&self.canceller, resp.bytes()) {
|
||||||
Ok(ans) => ans.to_vec(),
|
Ok(ans) => ans.to_vec(),
|
||||||
Err(e) => return Err(e),
|
Err(e) => return Err(e),
|
||||||
};
|
};
|
||||||
|
@ -709,7 +698,7 @@ impl WhipSink {
|
||||||
[
|
[
|
||||||
"Server returned error: {} - {}",
|
"Server returned error: {} - {}",
|
||||||
s.as_str(),
|
s.as_str(),
|
||||||
self.wait(resp.bytes())
|
wait(&self.canceller, resp.bytes())
|
||||||
.map(|x| x.escape_ascii().to_string())
|
.map(|x| x.escape_ascii().to_string())
|
||||||
.unwrap_or_else(|_| "(no further details)".to_string())
|
.unwrap_or_else(|_| "(no further details)".to_string())
|
||||||
]
|
]
|
||||||
|
@ -721,7 +710,7 @@ impl WhipSink {
|
||||||
[
|
[
|
||||||
"Unexpected response {:?} {:?}",
|
"Unexpected response {:?} {:?}",
|
||||||
s,
|
s,
|
||||||
self.wait(resp.bytes()).unwrap()
|
wait(&self.canceller, resp.bytes()).unwrap()
|
||||||
]
|
]
|
||||||
)),
|
)),
|
||||||
};
|
};
|
||||||
|
@ -757,7 +746,7 @@ impl WhipSink {
|
||||||
let client = build_reqwest_client(reqwest::redirect::Policy::default());
|
let client = build_reqwest_client(reqwest::redirect::Policy::default());
|
||||||
let future = client.delete(resource_url).headers(headermap).send();
|
let future = client.delete(resource_url).headers(headermap).send();
|
||||||
|
|
||||||
let res = self.wait(future);
|
let res = wait(&self.canceller, future);
|
||||||
match res {
|
match res {
|
||||||
Ok(r) => {
|
Ok(r) => {
|
||||||
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());
|
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());
|
||||||
|
@ -767,79 +756,4 @@ impl WhipSink {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
fn wait<F, T, E>(&self, future: F) -> Result<T, ErrorMessage>
|
|
||||||
where
|
|
||||||
F: Send + Future<Output = Result<T, E>>,
|
|
||||||
T: Send + 'static,
|
|
||||||
E: Send + Display,
|
|
||||||
{
|
|
||||||
let mut canceller = self.canceller.lock().unwrap();
|
|
||||||
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
|
||||||
|
|
||||||
canceller.replace(abort_handle);
|
|
||||||
drop(canceller);
|
|
||||||
|
|
||||||
// make abortable
|
|
||||||
let future = async {
|
|
||||||
match future::Abortable::new(future, abort_registration).await {
|
|
||||||
// Future resolved successfully
|
|
||||||
Ok(Ok(res)) => Ok(res),
|
|
||||||
|
|
||||||
// Future resolved with an error
|
|
||||||
Ok(Err(err)) => Err(gst::error_msg!(
|
|
||||||
gst::ResourceError::Failed,
|
|
||||||
["Future resolved with an error {}", err.to_string()]
|
|
||||||
)),
|
|
||||||
|
|
||||||
// Canceller called before future resolved
|
|
||||||
Err(future::Aborted) => Err(gst::error_msg!(
|
|
||||||
gst::ResourceError::Failed,
|
|
||||||
["Canceller called before future resolved"]
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let res = {
|
|
||||||
let _enter = RUNTIME.enter();
|
|
||||||
futures::executor::block_on(future)
|
|
||||||
};
|
|
||||||
|
|
||||||
/* Clear out the canceller */
|
|
||||||
let _ = self.canceller.lock().unwrap().take();
|
|
||||||
res
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_redirect_location(
|
|
||||||
headermap: &HeaderMap,
|
|
||||||
old_url: &reqwest::Url,
|
|
||||||
) -> Result<reqwest::Url, ErrorMessage> {
|
|
||||||
let location = headermap.get(reqwest::header::LOCATION).unwrap();
|
|
||||||
if let Err(e) = location.to_str() {
|
|
||||||
return Err(gst::error_msg!(
|
|
||||||
gst::ResourceError::Failed,
|
|
||||||
[
|
|
||||||
"Failed to convert the redirect location to string {}",
|
|
||||||
e.to_string()
|
|
||||||
]
|
|
||||||
));
|
|
||||||
}
|
|
||||||
let location = location.to_str().unwrap();
|
|
||||||
|
|
||||||
if location.to_ascii_lowercase().starts_with("http") {
|
|
||||||
// location url is an absolute path
|
|
||||||
reqwest::Url::parse(location)
|
|
||||||
.map_err(|e| gst::error_msg!(gst::ResourceError::Failed, ["{}", e.to_string()]))
|
|
||||||
} else {
|
|
||||||
// location url is a relative path
|
|
||||||
let mut new_url = old_url.clone();
|
|
||||||
new_url.set_path(location);
|
|
||||||
Ok(new_url)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn build_reqwest_client(pol: Policy) -> reqwest::Client {
|
|
||||||
let client_builder = reqwest::Client::builder();
|
|
||||||
client_builder.redirect(pol).build().unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue