webrtchttp: Implement timeout for waiting on futures

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1008>
This commit is contained in:
Sanchayan Maity 2022-10-28 14:02:23 +05:30 committed by Sebastian Dröge
parent 3fc0326084
commit 4e9ec324e1
3 changed files with 214 additions and 66 deletions

View file

@ -5,8 +5,8 @@ use once_cell::sync::Lazy;
use parse_link_header; use parse_link_header;
use reqwest::header::HeaderMap; use reqwest::header::HeaderMap;
use reqwest::redirect::Policy; use reqwest::redirect::Policy;
use std::fmt::Display;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration;
use tokio::runtime; use tokio::runtime;
pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| { pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
@ -18,14 +18,14 @@ pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
.unwrap() .unwrap()
}); });
pub fn wait<F, T, E>( pub fn wait<F, T>(
canceller: &Mutex<Option<future::AbortHandle>>, canceller: &Mutex<Option<future::AbortHandle>>,
future: F, future: F,
timeout: u32
) -> Result<T, ErrorMessage> ) -> Result<T, ErrorMessage>
where where
F: Send + Future<Output = Result<T, E>>, F: Send + Future<Output = Result<T, ErrorMessage>>,
T: Send + 'static, T: Send + 'static,
E: Send + Display,
{ {
let mut canceller_guard = canceller.lock().unwrap(); let mut canceller_guard = canceller.lock().unwrap();
let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
@ -40,6 +40,22 @@ where
canceller_guard.replace(abort_handle); canceller_guard.replace(abort_handle);
drop(canceller_guard); drop(canceller_guard);
let future = async {
if timeout == 0 {
future.await
} else {
let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await;
match res {
Ok(r) => r,
Err(e) => Err(gst::error_msg!(
gst::ResourceError::Read,
["Request timeout, elapsed: {}", e.to_string()]
)),
}
}
};
let future = async { let future = async {
match future::Abortable::new(future, abort_registration).await { match future::Abortable::new(future, abort_registration).await {
Ok(Ok(res)) => Ok(res), Ok(Ok(res)) => Ok(res),

View file

@ -30,6 +30,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy = const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy =
GstRsWebRTCICETransportPolicy::All; GstRsWebRTCICETransportPolicy::All;
const MAX_REDIRECTS: u8 = 10; const MAX_REDIRECTS: u8 = 10;
const DEFAULT_TIMEOUT: u32 = 15;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Settings { struct Settings {
@ -41,6 +42,7 @@ struct Settings {
auth_token: Option<String>, auth_token: Option<String>,
use_link_headers: bool, use_link_headers: bool,
ice_transport_policy: GstRsWebRTCICETransportPolicy, ice_transport_policy: GstRsWebRTCICETransportPolicy,
timeout: u32,
} }
#[allow(clippy::derivable_impls)] #[allow(clippy::derivable_impls)]
@ -67,6 +69,7 @@ impl Default for Settings {
auth_token: None, auth_token: None,
use_link_headers: false, use_link_headers: false,
ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY, ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY,
timeout: DEFAULT_TIMEOUT,
} }
} }
} }
@ -242,6 +245,13 @@ impl ObjectImpl for WhepSrc {
.nick("ICE transport policy") .nick("ICE transport policy")
.blurb("The policy to apply for ICE transport") .blurb("The policy to apply for ICE transport")
.build(), .build(),
glib::ParamSpecUInt::builder("timeout")
.nick("Timeout")
.blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).")
.maximum(3600)
.default_value(DEFAULT_TIMEOUT)
.readwrite()
.build(),
] ]
}); });
PROPERTIES.as_ref() PROPERTIES.as_ref()
@ -307,6 +317,10 @@ impl ObjectImpl for WhepSrc {
.set_property_from_str("ice-transport-policy", "all"); .set_property_from_str("ice-transport-policy", "all");
} }
} }
"timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.timeout = value.get().expect("type checked upstream");
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -345,6 +359,10 @@ impl ObjectImpl for WhepSrc {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
settings.ice_transport_policy.to_value() settings.ice_transport_policy.to_value()
} }
"timeout" => {
let settings = self.settings.lock().unwrap();
settings.timeout.to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -601,9 +619,12 @@ impl WhepSrc {
.settings .settings
.lock() .lock()
.expect("Failed to acquire settings lock"); .expect("Failed to acquire settings lock");
let timeout = settings.timeout;
if settings.use_link_headers { if settings.use_link_headers {
set_ice_servers(&self.webrtcbin, resp.headers())?; set_ice_servers(&self.webrtcbin, resp.headers())?;
} }
drop(settings); drop(settings);
/* See section 4.2 of the WHEP specification */ /* See section 4.2 of the WHEP specification */
@ -644,7 +665,16 @@ impl WhepSrc {
}; };
drop(state); drop(state);
let ans_bytes = wait(&self.canceller, resp.bytes())?; let future = async {
resp.bytes().await.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["Failed to get response body: {:?}", err]
)
})
};
let ans_bytes = wait(&self.canceller, future, timeout)?;
self.sdp_message_parse(ans_bytes) self.sdp_message_parse(ans_bytes)
} }
@ -692,16 +722,30 @@ impl WhepSrc {
} }
s => { s => {
let future = async {
resp.bytes().await.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["Failed to get response body: {:?}", err]
)
})
};
let settings = self
.settings
.lock()
.expect("Failed to acquire settings lock");
let timeout = settings.timeout;
drop(settings);
let res = wait(&self.canceller, future, timeout)
.map(|x| x.escape_ascii().to_string())
.unwrap_or_else(|_| "(no further details)".to_string());
// FIXME: Check and handle 'Retry-After' header in case of server error // FIXME: Check and handle 'Retry-After' header in case of server error
Err(gst::error_msg!( Err(gst::error_msg!(
gst::ResourceError::Failed, gst::ResourceError::Failed,
[ ["Unexpected response: {} - {}", s.as_str(), res]
"Unexpected response: {} - {}",
s.as_str(),
wait(&self.canceller, resp.bytes())
.map(|x| x.escape_ascii().to_string())
.unwrap_or_else(|_| "(no further details)".to_string())
]
)) ))
} }
} }
@ -751,12 +795,7 @@ impl WhepSrc {
&[&offer_sdp, &None::<gst::Promise>], &[&offer_sdp, &None::<gst::Promise>],
); );
} else { } else {
gst::error!( gst::error!(CAT, imp: self_, "Reply without an offer: {}", reply);
CAT,
imp: self_,
"Reply without an offer: {}",
reply
);
element_imp_error!( element_imp_error!(
self_, self_,
gst::LibraryError::Failed, gst::LibraryError::Failed,
@ -879,6 +918,7 @@ impl WhepSrc {
endpoint: reqwest::Url, endpoint: reqwest::Url,
) -> Result<(), gst::ErrorMessage> { ) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let timeout = settings.timeout;
let sdp = offer.sdp(); let sdp = offer.sdp();
let body = sdp.as_text().unwrap(); let body = sdp.as_text().unwrap();
@ -909,14 +949,22 @@ impl WhepSrc {
endpoint.as_str() endpoint.as_str()
); );
let future = self let future = async {
.client self.client
.request(reqwest::Method::POST, endpoint.clone()) .request(reqwest::Method::POST, endpoint.clone())
.headers(headermap) .headers(headermap)
.body(body) .body(body)
.send(); .send()
.await
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["HTTP POST request failed {}: {:?}", endpoint.as_str(), err]
)
})
};
let resp = wait(&self.canceller, future)?; let resp = wait(&self.canceller, future, timeout)?;
self.parse_endpoint_response(endpoint, 0, resp) self.parse_endpoint_response(endpoint, 0, resp)
} }
@ -924,6 +972,7 @@ impl WhepSrc {
fn terminate_session(&self) { fn terminate_session(&self) {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let timeout = settings.timeout;
let resource_url = match *state { let resource_url = match *state {
State::Running { State::Running {
@ -957,9 +1006,21 @@ impl WhepSrc {
/* DELETE request goes to the WHEP resource URL. See section 3 of the specification. */ /* DELETE request goes to the WHEP resource URL. See section 3 of the specification. */
let client = build_reqwest_client(reqwest::redirect::Policy::default()); let client = build_reqwest_client(reqwest::redirect::Policy::default());
let future = client.delete(resource_url).headers(headermap).send(); let future = async {
client
.delete(resource_url.clone())
.headers(headermap)
.send()
.await
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["DELETE request failed {}: {:?}", resource_url, err]
)
})
};
let res = wait(&self.canceller, future); let res = wait(&self.canceller, future, timeout);
match res { match res {
Ok(r) => { Ok(r) => {
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status()); gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());

View file

@ -30,6 +30,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy = const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy =
GstRsWebRTCICETransportPolicy::All; GstRsWebRTCICETransportPolicy::All;
const MAX_REDIRECTS: u8 = 10; const MAX_REDIRECTS: u8 = 10;
const DEFAULT_TIMEOUT: u32 = 15;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Settings { struct Settings {
@ -39,6 +40,7 @@ struct Settings {
turn_server: Option<String>, turn_server: Option<String>,
stun_server: Option<String>, stun_server: Option<String>,
ice_transport_policy: GstRsWebRTCICETransportPolicy, ice_transport_policy: GstRsWebRTCICETransportPolicy,
timeout: u32,
} }
#[allow(clippy::derivable_impls)] #[allow(clippy::derivable_impls)]
@ -51,6 +53,7 @@ impl Default for Settings {
stun_server: None, stun_server: None,
turn_server: None, turn_server: None,
ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY, ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY,
timeout: DEFAULT_TIMEOUT,
} }
} }
} }
@ -232,6 +235,13 @@ impl ObjectImpl for WhipSink {
.nick("ICE transport policy") .nick("ICE transport policy")
.blurb("The policy to apply for ICE transport") .blurb("The policy to apply for ICE transport")
.build(), .build(),
glib::ParamSpecUInt::builder("timeout")
.nick("Timeout")
.blurb("Value in seconds to timeout WHIP endpoint requests (0 = No timeout).")
.maximum(3600)
.default_value(DEFAULT_TIMEOUT)
.build(),
] ]
}); });
PROPERTIES.as_ref() PROPERTIES.as_ref()
@ -283,6 +293,10 @@ impl ObjectImpl for WhipSink {
.set_property_from_str("ice-transport-policy", "all"); .set_property_from_str("ice-transport-policy", "all");
} }
} }
"timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.timeout = value.get().expect("type checked upstream");
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -313,6 +327,10 @@ impl ObjectImpl for WhipSink {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
settings.ice_transport_policy.to_value() settings.ice_transport_policy.to_value()
} }
"timeout" => {
let settings = self.settings.lock().unwrap();
settings.timeout.to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -472,6 +490,7 @@ impl WhipSink {
fn lookup_ice_servers(&self, endpoint: reqwest::Url) -> Result<(), ErrorMessage> { fn lookup_ice_servers(&self, endpoint: reqwest::Url) -> Result<(), ErrorMessage> {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let timeout = settings.timeout;
let redirects = match *state { let redirects = match *state {
State::Options { redirects } => redirects, State::Options { redirects } => redirects,
@ -505,12 +524,21 @@ impl WhipSink {
); );
} }
let future = client let future = async {
.request(reqwest::Method::OPTIONS, endpoint.as_ref()) client
.headers(headermap) .request(reqwest::Method::OPTIONS, endpoint.as_ref())
.send(); .headers(headermap)
.send()
.await
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["OPTIONS request failed: {:?}", err]
)
})
};
let resp = wait(&self.canceller, future)?; let resp = wait(&self.canceller, future, timeout)?;
match resp.status() { match resp.status() {
StatusCode::NO_CONTENT => { StatusCode::NO_CONTENT => {
@ -544,14 +572,27 @@ impl WhipSink {
)) ))
} }
} }
status => Err(gst::error_msg!( status => {
gst::ResourceError::Failed, let future = async {
[ resp.bytes().await.map_err(|err| {
"lookup_ice_servers - Unexpected response {} {:?}", gst::error_msg!(
status, gst::ResourceError::Failed,
wait(&self.canceller, resp.bytes()).unwrap() ["Failed to get response body: {:?}", err]
] )
)), })
};
let res = wait(&self.canceller, future, timeout);
Err(gst::error_msg!(
gst::ResourceError::Failed,
[
"lookup_ice_servers - Unexpected response {} {:?}",
status,
res
]
))
}
} }
} }
@ -621,6 +662,7 @@ impl WhipSink {
) -> Result<gst_webrtc::WebRTCSessionDescription, gst::ErrorMessage> { ) -> Result<gst_webrtc::WebRTCSessionDescription, gst::ErrorMessage> {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let timeout = settings.timeout;
let redirects = match *state { let redirects = match *state {
State::Post { redirects } => redirects, State::Post { redirects } => redirects,
@ -658,13 +700,22 @@ impl WhipSink {
); );
} }
let future = client let future = async {
.request(reqwest::Method::POST, endpoint.as_ref()) client
.headers(headermap) .request(reqwest::Method::POST, endpoint.as_ref())
.body(body) .headers(headermap)
.send(); .body(body)
.send()
.await
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["POST request failed: {:?}", err]
)
})
};
let resp = wait(&self.canceller, future)?; let resp = wait(&self.canceller, future, timeout)?;
let res = match resp.status() { let res = match resp.status() {
StatusCode::OK | StatusCode::CREATED => { StatusCode::OK | StatusCode::CREATED => {
@ -716,7 +767,16 @@ impl WhipSink {
}; };
drop(state); drop(state);
let ans_bytes = wait(&self.canceller, resp.bytes())?; let future = async {
resp.bytes().await.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["Failed to get response body: {:?}", err]
)
})
};
let ans_bytes = wait(&self.canceller, future, timeout)?;
match sdp_message::SDPMessage::parse_buffer(&ans_bytes) { match sdp_message::SDPMessage::parse_buffer(&ans_bytes) {
Ok(ans_sdp) => { Ok(ans_sdp) => {
@ -762,28 +822,26 @@ impl WhipSink {
} }
} }
s if s.is_server_error() => { s => {
let future = async {
resp.bytes().await.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["Failed to get response body: {:?}", err]
)
})
};
let resp = wait(&self.canceller, future, timeout)
.map(|x| x.escape_ascii().to_string())
.unwrap_or_else(|_| "(no further details)".to_string());
// FIXME: Check and handle 'Retry-After' header in case of server error // FIXME: Check and handle 'Retry-After' header in case of server error
Err(gst::error_msg!( Err(gst::error_msg!(
gst::ResourceError::Failed, gst::ResourceError::Failed,
[ ["Server returned error: {} - {}", s.as_str(), resp]
"Server returned error: {} - {}",
s.as_str(),
wait(&self.canceller, resp.bytes())
.map(|x| x.escape_ascii().to_string())
.unwrap_or_else(|_| "(no further details)".to_string())
]
)) ))
} }
s => Err(gst::error_msg!(
gst::ResourceError::Failed,
[
"Unexpected response {:?} {:?}",
s,
wait(&self.canceller, resp.bytes()).unwrap()
]
)),
}; };
res res
@ -792,6 +850,7 @@ impl WhipSink {
fn terminate_session(&self) { fn terminate_session(&self) {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let timeout = settings.timeout;
let resource_url = match *state { let resource_url = match *state {
State::Running { State::Running {
ref whip_resource_url, ref whip_resource_url,
@ -815,9 +874,21 @@ impl WhipSink {
gst::debug!(CAT, imp: self, "DELETE request on {}", resource_url); gst::debug!(CAT, imp: self, "DELETE request on {}", resource_url);
let client = build_reqwest_client(reqwest::redirect::Policy::default()); let client = build_reqwest_client(reqwest::redirect::Policy::default());
let future = client.delete(resource_url).headers(headermap).send(); let future = async {
client
.delete(resource_url.clone())
.headers(headermap)
.send()
.await
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["DELETE request failed {}: {:?}", resource_url, err]
)
})
};
let res = wait(&self.canceller, future); let res = wait(&self.canceller, future, timeout);
match res { match res {
Ok(r) => { Ok(r) => {
gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status()); gst::debug!(CAT, imp: self, "Response to DELETE : {}", r.status());