rusoto: s3src: Implement timeout and retries

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/690>
This commit is contained in:
Arun Raghavan 2022-03-16 12:40:28 +05:30
parent 191b1644d6
commit 1ad277a410

View file

@ -7,6 +7,7 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration;
use bytes::Bytes; use bytes::Bytes;
use futures::future; use futures::future;
@ -27,6 +28,9 @@ use gst_base::subclass::prelude::*;
use crate::s3url::*; use crate::s3url::*;
use crate::s3utils::{self, WaitError}; use crate::s3utils::{self, WaitError};
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
enum StreamingState { enum StreamingState {
Stopped, Stopped,
@ -48,6 +52,8 @@ struct Settings {
url: Option<GstS3Url>, url: Option<GstS3Url>,
access_key: Option<String>, access_key: Option<String>,
secret_access_key: Option<String>, secret_access_key: Option<String>,
request_timeout: Option<Duration>,
retry_duration: Option<Duration>,
} }
#[derive(Default)] #[derive(Default)]
@ -130,16 +136,24 @@ impl S3Src {
client: &S3Client, client: &S3Client,
url: &GstS3Url, url: &GstS3Url,
) -> Result<u64, gst::ErrorMessage> { ) -> Result<u64, gst::ErrorMessage> {
let request = HeadObjectRequest { let settings = self.settings.lock().unwrap();
bucket: url.bucket.clone(),
key: url.object.clone(), let head_object_future = || {
version_id: url.version.clone(), client.head_object(HeadObjectRequest {
..Default::default() bucket: url.bucket.clone(),
key: url.object.clone(),
version_id: url.version.clone(),
..Default::default()
})
}; };
let response = client.head_object(request); let output = s3utils::wait_retry(
&self.canceller,
let output = s3utils::wait(&self.canceller, response).map_err(|err| match err { settings.request_timeout,
settings.retry_duration,
head_object_future,
)
.map_err(|err| match err {
WaitError::FutureError(err) => gst::error_msg!( WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::NotFound, gst::ResourceError::NotFound,
["Failed to HEAD object: {}", err] ["Failed to HEAD object: {}", err]
@ -183,13 +197,7 @@ impl S3Src {
} }
}; };
let request = GetObjectRequest { let settings = self.settings.lock().unwrap();
bucket: url.bucket.clone(),
key: url.object.clone(),
range: Some(format!("bytes={}-{}", offset, offset + length - 1)),
version_id: url.version.clone(),
..Default::default()
};
gst_debug!( gst_debug!(
CAT, CAT,
@ -199,9 +207,23 @@ impl S3Src {
offset + length - 1 offset + length - 1
); );
let response = client.get_object(request); let get_object_future = || {
client.get_object(GetObjectRequest {
bucket: url.bucket.clone(),
key: url.object.clone(),
range: Some(format!("bytes={}-{}", offset, offset + length - 1)),
version_id: url.version.clone(),
..Default::default()
})
};
let output = s3utils::wait(&self.canceller, response).map_err(|err| match err { let output = s3utils::wait_retry(
&self.canceller,
settings.request_timeout,
settings.retry_duration,
get_object_future,
)
.map_err(|err| match err {
WaitError::FutureError(err) => Some(gst::error_msg!( WaitError::FutureError(err) => Some(gst::error_msg!(
gst::ResourceError::Read, gst::ResourceError::Read,
["Could not read: {}", err] ["Could not read: {}", err]
@ -259,6 +281,24 @@ impl ObjectImpl for S3Src {
None, None,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
), ),
glib::ParamSpecInt64::new(
"request-timeout",
"Request timeout",
"Timeout for each S3 request (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_REQUEST_TIMEOUT_MSEC as i64,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new(
"retry-duration",
"Retry duration",
"How long we should retry S3 requests before giving up (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_RETRY_DURATION_MSEC as i64,
glib::ParamFlags::READWRITE,
),
] ]
}); });
@ -284,6 +324,21 @@ impl ObjectImpl for S3Src {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
settings.secret_access_key = value.get().expect("type checked upstream"); settings.secret_access_key = value.get().expect("type checked upstream");
} }
"request-timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.request_timeout = match value.get::<i64>().expect("type checked upstream")
{
-1 => None,
v => Some(Duration::from_millis(v as u64)),
}
}
"retry-duration" => {
let mut settings = self.settings.lock().unwrap();
settings.retry_duration = match value.get::<i64>().expect("type checked upstream") {
-1 => None,
v => Some(Duration::from_millis(v as u64)),
}
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -302,6 +357,20 @@ impl ObjectImpl for S3Src {
} }
"access-key" => settings.access_key.to_value(), "access-key" => settings.access_key.to_value(),
"secret-access-key" => settings.secret_access_key.to_value(), "secret-access-key" => settings.secret_access_key.to_value(),
"request-timeout" => {
let timeout: i64 = match settings.request_timeout {
None => -1,
Some(v) => v.as_millis() as i64,
};
timeout.to_value()
}
"retry-duration" => {
let timeout: i64 = match settings.retry_duration {
None => -1,
Some(v) => v.as_millis() as i64,
};
timeout.to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }