diff --git a/net/webrtchttp/src/utils.rs b/net/webrtchttp/src/utils.rs index 5980fd32..99eec59d 100644 --- a/net/webrtchttp/src/utils.rs +++ b/net/webrtchttp/src/utils.rs @@ -5,8 +5,8 @@ use once_cell::sync::Lazy; use parse_link_header; use reqwest::header::HeaderMap; use reqwest::redirect::Policy; -use std::fmt::Display; use std::sync::Mutex; +use std::time::Duration; use tokio::runtime; pub static RUNTIME: Lazy = Lazy::new(|| { @@ -18,14 +18,14 @@ pub static RUNTIME: Lazy = Lazy::new(|| { .unwrap() }); -pub fn wait( +pub fn wait( canceller: &Mutex>, future: F, + timeout: u32 ) -> Result where - F: Send + Future>, + F: Send + Future>, T: Send + 'static, - E: Send + Display, { let mut canceller_guard = canceller.lock().unwrap(); let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); @@ -40,6 +40,22 @@ where canceller_guard.replace(abort_handle); drop(canceller_guard); + let future = async { + if timeout == 0 { + future.await + } else { + let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await; + + match res { + Ok(r) => r, + Err(e) => Err(gst::error_msg!( + gst::ResourceError::Read, + ["Request timeout, elapsed: {}", e.to_string()] + )), + } + } + }; + let future = async { match future::Abortable::new(future, abort_registration).await { Ok(Ok(res)) => Ok(res), diff --git a/net/webrtchttp/src/whepsrc/imp.rs b/net/webrtchttp/src/whepsrc/imp.rs index a9a038f7..8a1acdcc 100644 --- a/net/webrtchttp/src/whepsrc/imp.rs +++ b/net/webrtchttp/src/whepsrc/imp.rs @@ -30,6 +30,7 @@ static CAT: Lazy = Lazy::new(|| { const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy = GstRsWebRTCICETransportPolicy::All; const MAX_REDIRECTS: u8 = 10; +const DEFAULT_TIMEOUT: u32 = 15; #[derive(Debug, Clone)] struct Settings { @@ -41,6 +42,7 @@ struct Settings { auth_token: Option, use_link_headers: bool, ice_transport_policy: GstRsWebRTCICETransportPolicy, + timeout: u32, } #[allow(clippy::derivable_impls)] @@ -67,6 +69,7 @@ impl Default for Settings { auth_token: None, use_link_headers: false, ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY, + timeout: DEFAULT_TIMEOUT, } } } @@ -242,6 +245,13 @@ impl ObjectImpl for WhepSrc { .nick("ICE transport policy") .blurb("The policy to apply for ICE transport") .build(), + glib::ParamSpecUInt::builder("timeout") + .nick("Timeout") + .blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).") + .maximum(3600) + .default_value(DEFAULT_TIMEOUT) + .readwrite() + .build(), ] }); PROPERTIES.as_ref() @@ -307,6 +317,10 @@ impl ObjectImpl for WhepSrc { .set_property_from_str("ice-transport-policy", "all"); } } + "timeout" => { + let mut settings = self.settings.lock().unwrap(); + settings.timeout = value.get().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -345,6 +359,10 @@ impl ObjectImpl for WhepSrc { let settings = self.settings.lock().unwrap(); settings.ice_transport_policy.to_value() } + "timeout" => { + let settings = self.settings.lock().unwrap(); + settings.timeout.to_value() + } _ => unimplemented!(), } } @@ -601,9 +619,12 @@ impl WhepSrc { .settings .lock() .expect("Failed to acquire settings lock"); + let timeout = settings.timeout; + if settings.use_link_headers { set_ice_servers(&self.webrtcbin, resp.headers())?; } + drop(settings); /* See section 4.2 of the WHEP specification */ @@ -644,7 +665,16 @@ impl WhepSrc { }; drop(state); - let ans_bytes = wait(&self.canceller, resp.bytes())?; + let future = async { + resp.bytes().await.map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to get response body: {:?}", err] + ) + }) + }; + + let ans_bytes = wait(&self.canceller, future, timeout)?; self.sdp_message_parse(ans_bytes) } @@ -692,16 +722,30 @@ impl WhepSrc { } s => { + let future = async { + resp.bytes().await.map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to get response body: {:?}", err] + ) + }) + }; + + let settings = self + .settings + .lock() + .expect("Failed to acquire settings lock"); + let timeout = settings.timeout; + drop(settings); + + let res = wait(&self.canceller, future, timeout) + .map(|x| x.escape_ascii().to_string()) + .unwrap_or_else(|_| "(no further details)".to_string()); + // FIXME: Check and handle 'Retry-After' header in case of server error Err(gst::error_msg!( gst::ResourceError::Failed, - [ - "Unexpected response: {} - {}", - s.as_str(), - wait(&self.canceller, resp.bytes()) - .map(|x| x.escape_ascii().to_string()) - .unwrap_or_else(|_| "(no further details)".to_string()) - ] + ["Unexpected response: {} - {}", s.as_str(), res] )) } } @@ -751,12 +795,7 @@ impl WhepSrc { &[&offer_sdp, &None::], ); } else { - gst::error!( - CAT, - imp: self_, - "Reply without an offer: {}", - reply - ); + gst::error!(CAT, imp: self_, "Reply without an offer: {}", reply); element_imp_error!( self_, gst::LibraryError::Failed, @@ -879,6 +918,7 @@ impl WhepSrc { endpoint: reqwest::Url, ) -> Result<(), gst::ErrorMessage> { let settings = self.settings.lock().unwrap(); + let timeout = settings.timeout; let sdp = offer.sdp(); let body = sdp.as_text().unwrap(); @@ -909,14 +949,22 @@ impl WhepSrc { endpoint.as_str() ); - let future = self - .client - .request(reqwest::Method::POST, endpoint.clone()) - .headers(headermap) - .body(body) - .send(); + let future = async { + self.client + .request(reqwest::Method::POST, endpoint.clone()) + .headers(headermap) + .body(body) + .send() + .await + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["HTTP POST request failed {}: {:?}", endpoint.as_str(), err] + ) + }) + }; - let resp = wait(&self.canceller, future)?; + let resp = wait(&self.canceller, future, timeout)?; self.parse_endpoint_response(endpoint, 0, resp) } @@ -924,6 +972,7 @@ impl WhepSrc { fn terminate_session(&self) { let settings = self.settings.lock().unwrap(); let state = self.state.lock().unwrap(); + let timeout = settings.timeout; let resource_url = match *state { State::Running { @@ -957,9 +1006,21 @@ impl WhepSrc { /* DELETE request goes to the WHEP resource URL. See section 3 of the specification. */ let client = build_reqwest_client(reqwest::redirect::Policy::default()); - let future = client.delete(resource_url).headers(headermap).send(); + let future = async { + client + .delete(resource_url.clone()) + .headers(headermap) + .send() + .await + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["DELETE request failed {}: {:?}", resource_url, err] + ) + }) + }; - let res = wait(&self.canceller, future); + let res = wait(&self.canceller, future, timeout); match res { Ok(r) => { gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status()); diff --git a/net/webrtchttp/src/whipsink/imp.rs b/net/webrtchttp/src/whipsink/imp.rs index ee712a3c..5d0616e9 100644 --- a/net/webrtchttp/src/whipsink/imp.rs +++ b/net/webrtchttp/src/whipsink/imp.rs @@ -30,6 +30,7 @@ static CAT: Lazy = Lazy::new(|| { const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy = GstRsWebRTCICETransportPolicy::All; const MAX_REDIRECTS: u8 = 10; +const DEFAULT_TIMEOUT: u32 = 15; #[derive(Debug, Clone)] struct Settings { @@ -39,6 +40,7 @@ struct Settings { turn_server: Option, stun_server: Option, ice_transport_policy: GstRsWebRTCICETransportPolicy, + timeout: u32, } #[allow(clippy::derivable_impls)] @@ -51,6 +53,7 @@ impl Default for Settings { stun_server: None, turn_server: None, ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY, + timeout: DEFAULT_TIMEOUT, } } } @@ -232,6 +235,13 @@ impl ObjectImpl for WhipSink { .nick("ICE transport policy") .blurb("The policy to apply for ICE transport") .build(), + + glib::ParamSpecUInt::builder("timeout") + .nick("Timeout") + .blurb("Value in seconds to timeout WHIP endpoint requests (0 = No timeout).") + .maximum(3600) + .default_value(DEFAULT_TIMEOUT) + .build(), ] }); PROPERTIES.as_ref() @@ -283,6 +293,10 @@ impl ObjectImpl for WhipSink { .set_property_from_str("ice-transport-policy", "all"); } } + "timeout" => { + let mut settings = self.settings.lock().unwrap(); + settings.timeout = value.get().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -313,6 +327,10 @@ impl ObjectImpl for WhipSink { let settings = self.settings.lock().unwrap(); settings.ice_transport_policy.to_value() } + "timeout" => { + let settings = self.settings.lock().unwrap(); + settings.timeout.to_value() + } _ => unimplemented!(), } } @@ -472,6 +490,7 @@ impl WhipSink { fn lookup_ice_servers(&self, endpoint: reqwest::Url) -> Result<(), ErrorMessage> { let settings = self.settings.lock().unwrap(); let state = self.state.lock().unwrap(); + let timeout = settings.timeout; let redirects = match *state { State::Options { redirects } => redirects, @@ -505,12 +524,21 @@ impl WhipSink { ); } - let future = client - .request(reqwest::Method::OPTIONS, endpoint.as_ref()) - .headers(headermap) - .send(); + let future = async { + client + .request(reqwest::Method::OPTIONS, endpoint.as_ref()) + .headers(headermap) + .send() + .await + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["OPTIONS request failed: {:?}", err] + ) + }) + }; - let resp = wait(&self.canceller, future)?; + let resp = wait(&self.canceller, future, timeout)?; match resp.status() { StatusCode::NO_CONTENT => { @@ -544,14 +572,27 @@ impl WhipSink { )) } } - status => Err(gst::error_msg!( - gst::ResourceError::Failed, - [ - "lookup_ice_servers - Unexpected response {} {:?}", - status, - wait(&self.canceller, resp.bytes()).unwrap() - ] - )), + status => { + let future = async { + resp.bytes().await.map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to get response body: {:?}", err] + ) + }) + }; + + let res = wait(&self.canceller, future, timeout); + + Err(gst::error_msg!( + gst::ResourceError::Failed, + [ + "lookup_ice_servers - Unexpected response {} {:?}", + status, + res + ] + )) + } } } @@ -621,6 +662,7 @@ impl WhipSink { ) -> Result { let settings = self.settings.lock().unwrap(); let state = self.state.lock().unwrap(); + let timeout = settings.timeout; let redirects = match *state { State::Post { redirects } => redirects, @@ -658,13 +700,22 @@ impl WhipSink { ); } - let future = client - .request(reqwest::Method::POST, endpoint.as_ref()) - .headers(headermap) - .body(body) - .send(); + let future = async { + client + .request(reqwest::Method::POST, endpoint.as_ref()) + .headers(headermap) + .body(body) + .send() + .await + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["POST request failed: {:?}", err] + ) + }) + }; - let resp = wait(&self.canceller, future)?; + let resp = wait(&self.canceller, future, timeout)?; let res = match resp.status() { StatusCode::OK | StatusCode::CREATED => { @@ -716,7 +767,16 @@ impl WhipSink { }; drop(state); - let ans_bytes = wait(&self.canceller, resp.bytes())?; + let future = async { + resp.bytes().await.map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to get response body: {:?}", err] + ) + }) + }; + + let ans_bytes = wait(&self.canceller, future, timeout)?; match sdp_message::SDPMessage::parse_buffer(&ans_bytes) { Ok(ans_sdp) => { @@ -762,28 +822,26 @@ impl WhipSink { } } - s if s.is_server_error() => { + s => { + let future = async { + resp.bytes().await.map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to get response body: {:?}", err] + ) + }) + }; + + let resp = wait(&self.canceller, future, timeout) + .map(|x| x.escape_ascii().to_string()) + .unwrap_or_else(|_| "(no further details)".to_string()); + // FIXME: Check and handle 'Retry-After' header in case of server error Err(gst::error_msg!( gst::ResourceError::Failed, - [ - "Server returned error: {} - {}", - s.as_str(), - wait(&self.canceller, resp.bytes()) - .map(|x| x.escape_ascii().to_string()) - .unwrap_or_else(|_| "(no further details)".to_string()) - ] + ["Server returned error: {} - {}", s.as_str(), resp] )) } - - s => Err(gst::error_msg!( - gst::ResourceError::Failed, - [ - "Unexpected response {:?} {:?}", - s, - wait(&self.canceller, resp.bytes()).unwrap() - ] - )), }; res @@ -792,6 +850,7 @@ impl WhipSink { fn terminate_session(&self) { let settings = self.settings.lock().unwrap(); let state = self.state.lock().unwrap(); + let timeout = settings.timeout; let resource_url = match *state { State::Running { ref whip_resource_url, @@ -815,9 +874,21 @@ impl WhipSink { gst::debug!(CAT, imp: self, "DELETE request on {}", resource_url); let client = build_reqwest_client(reqwest::redirect::Policy::default()); - let future = client.delete(resource_url).headers(headermap).send(); + let future = async { + client + .delete(resource_url.clone()) + .headers(headermap) + .send() + .await + .map_err(|err| { + gst::error_msg!( + gst::ResourceError::Failed, + ["DELETE request failed {}: {:?}", resource_url, err] + ) + }) + }; - let res = wait(&self.canceller, future); + let res = wait(&self.canceller, future, timeout); match res { Ok(r) => { gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());