diff --git a/net/webrtchttp/src/lib.rs b/net/webrtchttp/src/lib.rs index 59e8e15e..a795f175 100644 --- a/net/webrtchttp/src/lib.rs +++ b/net/webrtchttp/src/lib.rs @@ -15,6 +15,7 @@ * Since: plugins-rs-0.9.0 */ use gst::glib; +mod utils; mod whepsrc; mod whipsink; diff --git a/net/webrtchttp/src/utils.rs b/net/webrtchttp/src/utils.rs new file mode 100644 index 00000000..5980fd32 --- /dev/null +++ b/net/webrtchttp/src/utils.rs @@ -0,0 +1,192 @@ +use futures::future; +use futures::prelude::*; +use gst::{prelude::*, ErrorMessage}; +use once_cell::sync::Lazy; +use parse_link_header; +use reqwest::header::HeaderMap; +use reqwest::redirect::Policy; +use std::fmt::Display; +use std::sync::Mutex; +use tokio::runtime; + +pub static RUNTIME: Lazy = Lazy::new(|| { + runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(1) + .thread_name("webrtc-http-runtime") + .build() + .unwrap() +}); + +pub fn wait( + canceller: &Mutex>, + future: F, +) -> Result +where + F: Send + Future>, + T: Send + 'static, + E: Send + Display, +{ + let mut canceller_guard = canceller.lock().unwrap(); + let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); + + if canceller_guard.is_some() { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Old Canceller should not exist"] + )); + } + + canceller_guard.replace(abort_handle); + drop(canceller_guard); + + let future = async { + match future::Abortable::new(future, abort_registration).await { + Ok(Ok(res)) => Ok(res), + + Ok(Err(err)) => Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Future resolved with an error {}", err.to_string()] + )), + + Err(future::Aborted) => Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Canceller called before future resolved"] + )), + } + }; + + let res = { + let _enter = RUNTIME.enter(); + futures::executor::block_on(future) + }; + + canceller_guard = canceller.lock().unwrap(); + *canceller_guard = None; + + res +} + +pub fn parse_redirect_location( + headermap: &HeaderMap, + old_url: &reqwest::Url, +) -> Result { + let location = match headermap.get(reqwest::header::LOCATION) { + Some(location) => location, + None => { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Location header field should be present for WHIP/WHEP resource URL"] + )); + } + }; + + let location = match location.to_str() { + Ok(loc) => loc, + Err(e) => { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to convert location to string {}", e] + )); + } + }; + + match reqwest::Url::parse(location) { + Ok(url) => Ok(url), // Location URL is an absolute path + Err(_) => { + // Location URL is a relative path + let new_url = old_url.clone().join(location).map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["URL join operation failed: {:?}", err] + ) + })?; + + Ok(new_url) + } + } +} + +pub fn build_reqwest_client(pol: Policy) -> reqwest::Client { + let client_builder = reqwest::Client::builder(); + client_builder.redirect(pol).build().unwrap() +} + +pub fn set_ice_servers( + webrtcbin: &gst::Element, + headermap: &HeaderMap, +) -> Result<(), ErrorMessage> { + for link in headermap.get_all("link").iter() { + let link = link.to_str().map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + [ + "Header value should contain only visible ASCII strings: {}", + err + ] + ) + })?; + + let item_map = match parse_link_header::parse_with_rel(link) { + Ok(map) => map, + Err(_) => continue, + }; + + let link = match item_map.contains_key("ice-server") { + true => item_map.get("ice-server").unwrap(), + false => continue, // Not a link header we care about + }; + + // Note: webrtcbin needs ice servers to be in the below format + // ://@ + // and the ice-servers (link headers) received from the whip server might be + // in the format : with username and password as separate params. + // Constructing these with 'url' crate also require a format/parse + // for changing : to ://:@. + // So preferred to use the String rather + + let mut ice_server_url; + + // check if uri has :// + if link.uri.has_authority() { + // use raw_uri as is + // username and password in the link.uri.params ignored + ice_server_url = link.raw_uri.as_str().to_string(); + } else { + // No builder pattern is provided by reqwest::Url. Use string operation. + // construct url as '://@' + ice_server_url = format!("{}://", link.uri.scheme()); + if let Some(user) = link.params.get("username") { + ice_server_url += user.as_str(); + if let Some(pass) = link.params.get("credential") { + ice_server_url = ice_server_url + ":" + pass.as_str(); + } + ice_server_url += "@"; + } + + // the raw_uri contains the ice-server in the form : + // so strip the scheme and the ':' from the beginning of raw_uri and use + // the rest of raw_uri to append it the url which will be in the form + // ://@ as expected + ice_server_url += link + .raw_uri + .strip_prefix((link.uri.scheme().to_owned() + ":").as_str()) + .expect("strip 'scheme:' from raw uri"); + } + + // It's nicer to not collapse the `else if` and its inner `if` + #[allow(clippy::collapsible_if)] + if link.uri.scheme() == "stun" { + webrtcbin.set_property_from_str("stun-server", ice_server_url.as_str()); + } else if link.uri.scheme().starts_with("turn") { + if !webrtcbin.emit_by_name::("add-turn-server", &[&ice_server_url.as_str()]) { + return Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to set turn server {}", ice_server_url] + )); + } + } + } + + Ok(()) +} diff --git a/net/webrtchttp/src/whepsrc/imp.rs b/net/webrtchttp/src/whepsrc/imp.rs index c97266ff..a9a038f7 100644 --- a/net/webrtchttp/src/whepsrc/imp.rs +++ b/net/webrtchttp/src/whepsrc/imp.rs @@ -7,24 +7,16 @@ // // SPDX-License-Identifier: MPL-2.0 +use crate::utils::{build_reqwest_client, parse_redirect_location, set_ice_servers, wait}; use crate::GstRsWebRTCICETransportPolicy; +use bytes::Bytes; use futures::future; -use futures::prelude::*; -use tokio::runtime; - use gst::{element_imp_error, glib, prelude::*, subclass::prelude::*, ErrorMessage}; use gst_sdp::*; use gst_webrtc::*; - -use bytes::Bytes; use once_cell::sync::Lazy; -use parse_link_header; - use reqwest::header::{HeaderMap, HeaderValue}; -use reqwest::redirect::Policy; use reqwest::StatusCode; - -use std::fmt::Display; use std::sync::Mutex; static CAT: Lazy = Lazy::new(|| { @@ -35,14 +27,6 @@ static CAT: Lazy = Lazy::new(|| { ) }); -static RUNTIME: Lazy = Lazy::new(|| { - runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(1) - .build() - .unwrap() -}); - const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy = GstRsWebRTCICETransportPolicy::All; const MAX_REDIRECTS: u8 = 10; @@ -618,7 +602,7 @@ impl WhepSrc { .lock() .expect("Failed to acquire settings lock"); if settings.use_link_headers { - self.set_ice_servers(resp.headers()); + set_ice_servers(&self.webrtcbin, resp.headers())?; } drop(settings); @@ -660,10 +644,7 @@ impl WhepSrc { }; drop(state); - let ans_bytes = match self.wait(resp.bytes()) { - Ok(ans) => ans, - Err(e) => return Err(e), - }; + let ans_bytes = wait(&self.canceller, resp.bytes())?; self.sdp_message_parse(ans_bytes) } @@ -717,7 +698,7 @@ impl WhepSrc { [ "Unexpected response: {} - {}", s.as_str(), - self.wait(resp.bytes()) + wait(&self.canceller, resp.bytes()) .map(|x| x.escape_ascii().to_string()) .unwrap_or_else(|_| "(no further details)".to_string()) ] @@ -874,96 +855,6 @@ impl WhepSrc { } } - // Taken from WHIP sink - fn set_ice_servers(&self, headermap: &HeaderMap) { - for link in headermap.get_all("link").iter() { - let link = link - .to_str() - .expect("Header value should contain only visible ASCII strings"); - - let item_map = match parse_link_header::parse_with_rel(link) { - Ok(map) => map, - Err(e) => { - gst::warning!( - CAT, - imp: self, - "Failed to set ICE server {} due to {:?}", - link, - e - ); - continue; - } - }; - - let link = match item_map.contains_key("ice-server") { - true => item_map.get("ice-server").unwrap(), - false => continue, // Not a link header we care about - }; - - // Note: webrtcbin needs ice servers to be in the below format - // ://@ - // and the ice-servers (link headers) received from the whip server might be - // in the format : with username and password as separate params. - // Constructing these with 'url' crate also require a format/parse - // for changing : to ://:@. - // So preferred to use the String rather - - let mut ice_server_url; - - // check if uri has :// - if link.uri.has_authority() { - // use raw_uri as is - // username and password in the link.uri.params ignored - ice_server_url = link.raw_uri.as_str().to_string(); - } else { - // construct url as '://@' - ice_server_url = format!("{}://", link.uri.scheme()); - if let Some(user) = link.params.get("username") { - ice_server_url += user.as_str(); - if let Some(pass) = link.params.get("credential") { - ice_server_url = ice_server_url + ":" + pass.as_str(); - } - ice_server_url += "@"; - } - - // the raw_uri contains the ice-server in the form : - // so strip the scheme and the ':' from the beginning of raw_uri and use - // the rest of raw_uri to append it the url which will be in the form - // ://@ as expected - ice_server_url += link - .raw_uri - .strip_prefix((link.uri.scheme().to_owned() + ":").as_str()) - .expect("strip 'scheme:' from raw uri"); - } - - gst::info!( - CAT, - imp: self, - "Setting STUN/TURN server {}", - ice_server_url - ); - - // It's nicer to not collapse the `else if` and its inner `if` - #[allow(clippy::collapsible_if)] - if link.uri.scheme() == "stun" { - self.webrtcbin - .set_property_from_str("stun-server", ice_server_url.as_str()); - } else if link.uri.scheme().starts_with("turn") { - if !self - .webrtcbin - .emit_by_name::("add-turn-server", &[&ice_server_url.as_str()]) - { - gst::error!( - CAT, - imp: self, - "Failed to set turn server {}", - ice_server_url - ); - } - } - } - } - fn send_sdp(&self, sdp: SDPMessage) -> Result<(), gst::ErrorMessage> { let sess_desc = WebRTCSessionDescription::new(WebRTCSDPType::Offer, sdp); let settings = self.settings.lock().unwrap(); @@ -1025,7 +916,7 @@ impl WhepSrc { .body(body) .send(); - let resp = self.wait(future)?; + let resp = wait(&self.canceller, future)?; self.parse_endpoint_response(endpoint, 0, resp) } @@ -1068,7 +959,7 @@ impl WhepSrc { let client = build_reqwest_client(reqwest::redirect::Policy::default()); let future = client.delete(resource_url).headers(headermap).send(); - let res = self.wait(future); + let res = wait(&self.canceller, future); match res { Ok(r) => { gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status()); @@ -1078,74 +969,4 @@ impl WhepSrc { } }; } - - fn wait(&self, future: F) -> Result - where - F: Send + Future>, - T: Send + 'static, - E: Send + Display, - { - let mut canceller = self.canceller.lock().unwrap(); - let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); - - canceller.replace(abort_handle); - drop(canceller); - - let future = async { - match future::Abortable::new(future, abort_registration).await { - Ok(Ok(res)) => Ok(res), - - Ok(Err(err)) => Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Future resolved with an error {}", err.to_string()] - )), - - Err(future::Aborted) => Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Canceller called before future resolved"] - )), - } - }; - - let res = { - let _enter = RUNTIME.enter(); - futures::executor::block_on(future) - }; - - let _ = self.canceller.lock().unwrap().take(); - res - } -} - -fn parse_redirect_location( - headermap: &HeaderMap, - old_url: &reqwest::Url, -) -> Result { - let location = headermap.get(reqwest::header::LOCATION).unwrap(); - if let Err(e) = location.to_str() { - return Err(gst::error_msg!( - gst::ResourceError::Failed, - [ - "Failed to convert the redirect location to string {}", - e.to_string() - ] - )); - } - let location = location.to_str().unwrap(); - - if location.to_ascii_lowercase().starts_with("http") { - // Location URL is an absolute path - reqwest::Url::parse(location) - .map_err(|e| gst::error_msg!(gst::ResourceError::Failed, ["{}", e.to_string()])) - } else { - // Location URL is a relative path - let mut new_url = old_url.clone(); - new_url.set_path(location); - Ok(new_url) - } -} - -fn build_reqwest_client(pol: Policy) -> reqwest::Client { - let client_builder = reqwest::Client::builder(); - client_builder.redirect(pol).build().unwrap() } diff --git a/net/webrtchttp/src/whipsink/imp.rs b/net/webrtchttp/src/whipsink/imp.rs index 7ad884d0..360a30c0 100644 --- a/net/webrtchttp/src/whipsink/imp.rs +++ b/net/webrtchttp/src/whipsink/imp.rs @@ -7,8 +7,8 @@ // // SPDX-License-Identifier: MPL-2.0 +use crate::utils::{build_reqwest_client, parse_redirect_location, wait}; use futures::future; -use futures::prelude::*; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; @@ -19,24 +19,13 @@ use once_cell::sync::Lazy; use parse_link_header; use reqwest::header::HeaderMap; use reqwest::header::HeaderValue; -use reqwest::redirect::Policy; use reqwest::StatusCode; -use std::fmt::Display; use std::sync::Mutex; -use tokio::runtime; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new("whipsink", gst::DebugColorFlags::empty(), Some("WHIP Sink")) }); -static RUNTIME: Lazy = Lazy::new(|| { - runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(1) - .build() - .unwrap() -}); - const MAX_REDIRECTS: u8 = 10; #[derive(Debug, Clone)] @@ -400,7 +389,7 @@ impl WhipSink { .headers(headermap) .send(); - let resp = match self.wait(future) { + let resp = match wait(&self.canceller, future) { Ok(r) => r, Err(e) => { return Err(e); @@ -444,7 +433,7 @@ impl WhipSink { [ "lookup_ice_servers - Unexpected response {} {:?}", status, - self.wait(resp.bytes()).unwrap() + wait(&self.canceller, resp.bytes()).unwrap() ] )), } @@ -624,7 +613,7 @@ impl WhipSink { .body(body) .send(); - let resp = match self.wait(future) { + let resp = match wait(&self.canceller, future) { Ok(r) => r, Err(e) => { return Err(e); @@ -654,7 +643,7 @@ impl WhipSink { }; drop(state); - let ans_bytes = match self.wait(resp.bytes()) { + let ans_bytes = match wait(&self.canceller, resp.bytes()) { Ok(ans) => ans.to_vec(), Err(e) => return Err(e), }; @@ -709,7 +698,7 @@ impl WhipSink { [ "Server returned error: {} - {}", s.as_str(), - self.wait(resp.bytes()) + wait(&self.canceller, resp.bytes()) .map(|x| x.escape_ascii().to_string()) .unwrap_or_else(|_| "(no further details)".to_string()) ] @@ -721,7 +710,7 @@ impl WhipSink { [ "Unexpected response {:?} {:?}", s, - self.wait(resp.bytes()).unwrap() + wait(&self.canceller, resp.bytes()).unwrap() ] )), }; @@ -757,7 +746,7 @@ impl WhipSink { let client = build_reqwest_client(reqwest::redirect::Policy::default()); let future = client.delete(resource_url).headers(headermap).send(); - let res = self.wait(future); + let res = wait(&self.canceller, future); match res { Ok(r) => { gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status()); @@ -767,79 +756,4 @@ impl WhipSink { } }; } - - fn wait(&self, future: F) -> Result - where - F: Send + Future>, - T: Send + 'static, - E: Send + Display, - { - let mut canceller = self.canceller.lock().unwrap(); - let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); - - canceller.replace(abort_handle); - drop(canceller); - - // make abortable - let future = async { - match future::Abortable::new(future, abort_registration).await { - // Future resolved successfully - Ok(Ok(res)) => Ok(res), - - // Future resolved with an error - Ok(Err(err)) => Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Future resolved with an error {}", err.to_string()] - )), - - // Canceller called before future resolved - Err(future::Aborted) => Err(gst::error_msg!( - gst::ResourceError::Failed, - ["Canceller called before future resolved"] - )), - } - }; - - let res = { - let _enter = RUNTIME.enter(); - futures::executor::block_on(future) - }; - - /* Clear out the canceller */ - let _ = self.canceller.lock().unwrap().take(); - res - } -} - -fn parse_redirect_location( - headermap: &HeaderMap, - old_url: &reqwest::Url, -) -> Result { - let location = headermap.get(reqwest::header::LOCATION).unwrap(); - if let Err(e) = location.to_str() { - return Err(gst::error_msg!( - gst::ResourceError::Failed, - [ - "Failed to convert the redirect location to string {}", - e.to_string() - ] - )); - } - let location = location.to_str().unwrap(); - - if location.to_ascii_lowercase().starts_with("http") { - // location url is an absolute path - reqwest::Url::parse(location) - .map_err(|e| gst::error_msg!(gst::ResourceError::Failed, ["{}", e.to_string()])) - } else { - // location url is a relative path - let mut new_url = old_url.clone(); - new_url.set_path(location); - Ok(new_url) - } -} - -fn build_reqwest_client(pol: Policy) -> reqwest::Client { - let client_builder = reqwest::Client::builder(); - client_builder.redirect(pol).build().unwrap() }