reqwesthttpsrc: add timeout

This commit is contained in:
ahamedsajeer.15 2019-08-09 17:07:35 +05:30 committed by Sebastian Dröge
parent 0e11ac87d3
commit 9119349c08

View file

@ -6,6 +6,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use bytes::Bytes;
use futures::future::Either;
use futures::sync::oneshot;
use futures::{Future, Stream};
use hyperx::header::{
@ -16,7 +17,9 @@ use reqwest::r#async::{Client, Decoder};
use reqwest::StatusCode;
use std::mem;
use std::sync::Mutex;
use std::time::Duration;
use std::u64;
use tokio::prelude::*;
use tokio::runtime;
use url::Url;
@ -38,6 +41,7 @@ const DEFAULT_USER_AGENT: &str = concat!(
env!("COMMIT_ID")
);
const DEFAULT_IS_LIVE: bool = false;
const DEFAULT_TIMEOUT: u32 = 15;
#[derive(Debug, Clone)]
struct Settings {
@ -45,6 +49,7 @@ struct Settings {
user_agent: String,
user_id: Option<String>,
user_pw: Option<String>,
timeout: u32,
}
impl Default for Settings {
@ -54,11 +59,12 @@ impl Default for Settings {
user_agent: DEFAULT_USER_AGENT.into(),
user_id: None,
user_pw: None,
timeout: DEFAULT_TIMEOUT,
}
}
}
static PROPERTIES: [subclass::Property; 5] = [
static PROPERTIES: [subclass::Property; 6] = [
subclass::Property("location", |name| {
glib::ParamSpec::string(
name,
@ -104,6 +110,17 @@ static PROPERTIES: [subclass::Property; 5] = [
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("timeout", |name| {
glib::ParamSpec::uint(
name,
"Timeout",
"Value in seconds to timeout a blocking I/O (0 = No timeout).",
0,
3600,
DEFAULT_TIMEOUT,
glib::ParamFlags::READWRITE,
)
}),
];
#[derive(Debug)]
@ -205,8 +222,6 @@ impl ReqwestHttpSrc {
// Add all headers for the request here
let req = req.headers(headers.into());
gst_debug!(cat, obj: src, "Doing new request {:?}", req);
let req = if let Some(user_id) = settings.user_id {
// HTTP auth available
req.basic_auth(user_id, settings.user_pw)
@ -214,6 +229,8 @@ impl ReqwestHttpSrc {
req
};
gst_debug!(cat, obj: src, "Doing new request {:?}", req);
let src_clone = src.clone();
let response_fut = req.send().and_then(move |res| {
gst_debug!(cat, obj: &src_clone, "Response received: {:?}", res);
@ -311,14 +328,26 @@ impl ReqwestHttpSrc {
F: Send + Future<Error = gst::ErrorMessage> + 'static,
F::Item: Send,
{
let timeout = self.settings.lock().unwrap().timeout;
let mut canceller = self.canceller.lock().unwrap();
let (sender, receiver) = oneshot::channel::<Bytes>();
canceller.replace(sender);
// wrapping timeout around future
let future_timeout = if timeout == 0 {
Either::A(future)
} else {
Either::B(
future
.timeout(Duration::from_secs(timeout.into()))
.map_err(|_err| gst_error_msg!(gst::ResourceError::Read, ["Request timeout"])),
)
};
let unlock_error = gst_error_msg!(gst::ResourceError::Busy, ["unlock"]);
let res = oneshot::spawn(future, &self.runtime.executor())
let res = oneshot::spawn(future_timeout, &self.runtime.executor())
.select(receiver.then(|_| Err(unlock_error.clone())))
.wait()
.map(|v| v.0)
@ -379,6 +408,11 @@ impl ObjectImpl for ReqwestHttpSrc {
let user_pw = value.get().expect("type checked upstream");
settings.user_pw = user_pw;
}
subclass::Property("timeout", ..) => {
let mut settings = self.settings.lock().unwrap();
let timeout = value.get_some().expect("type checked upstream");
settings.timeout = timeout;
}
_ => unimplemented!(),
};
}
@ -408,6 +442,10 @@ impl ObjectImpl for ReqwestHttpSrc {
let settings = self.settings.lock().unwrap();
Ok(settings.user_pw.to_value())
}
subclass::Property("timeout", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.timeout.to_value())
}
_ => unimplemented!(),
}
}