rusoto: s3sink: Expose property to control all timeout/retry durations

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/690>
This commit is contained in:
Arun Raghavan 2022-03-18 15:40:18 +05:30
parent 94c5cbbfb8
commit 09a697faef
3 changed files with 101 additions and 47 deletions

View file

@ -28,18 +28,17 @@ use std::sync::Mutex;
use std::time::Duration;
use crate::s3url::*;
use crate::s3utils::{self, RetriableError, WaitError};
use crate::s3utils::{self, duration_from_millis, duration_to_millis, RetriableError, WaitError};
use super::OnError;
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
// This needs to be independently configurable, as the part size can be upto 5GB
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
// The rest of these aren't exposed as properties yet
// General setting for create / abort requests
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
// This needs to be independently configurable, as the part size can be upto 5GB
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
// CompletedMultipartUpload can take minutes to complete, so we need a longer value here
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes
@ -106,8 +105,12 @@ struct Settings {
secret_access_key: Option<String>,
metadata: Option<gst::Structure>,
multipart_upload_on_error: OnError,
request_timeout: Option<Duration>,
retry_duration: Option<Duration>,
upload_part_request_timeout: Option<Duration>,
upload_part_retry_duration: Option<Duration>,
complete_upload_request_timeout: Option<Duration>,
complete_upload_retry_duration: Option<Duration>,
}
impl Settings {
@ -176,12 +179,20 @@ impl Default for Settings {
secret_access_key: None,
metadata: None,
multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR,
request_timeout: Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)),
retry_duration: Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)),
upload_part_request_timeout: Some(Duration::from_millis(
DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC,
)),
upload_part_retry_duration: Some(Duration::from_millis(
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC,
)),
complete_upload_request_timeout: Some(Duration::from_millis(
DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC,
)),
complete_upload_retry_duration: Some(Duration::from_millis(
DEFAULT_COMPLETE_RETRY_DURATION_MSEC,
)),
}
}
}
@ -699,6 +710,24 @@ impl ObjectImpl for S3Sink {
DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecInt64::new(
"request-timeout",
"Request timeout",
"Timeout for general S3 requests (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_REQUEST_TIMEOUT_MSEC as i64,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new(
"retry-duration",
"Retry duration",
"How long we should retry general S3 requests before giving up (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_RETRY_DURATION_MSEC as i64,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new(
"upload-part-request-timeout",
"Upload part request timeout",
@ -717,6 +746,24 @@ impl ObjectImpl for S3Sink {
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new(
"complete-upload-request-timeout",
"Complete upload request timeout",
"Timeout for the complete multipart upload request (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC as i64,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new(
"complete-upload-retry-duration",
"Complete upload retry duration",
"How long we should retry complete multipart upload requests before giving up (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_COMPLETE_RETRY_DURATION_MSEC as i64,
glib::ParamFlags::READWRITE,
),
]
});
@ -793,19 +840,29 @@ impl ObjectImpl for S3Sink {
settings.multipart_upload_on_error =
value.get::<OnError>().expect("type checked upstream");
}
"request-timeout" => {
settings.request_timeout =
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
}
"retry-duration" => {
settings.retry_duration =
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
}
"upload-part-request-timeout" => {
settings.upload_part_request_timeout =
match value.get::<i64>().expect("type checked upstream") {
-1 => None,
v => Some(Duration::from_millis(v as u64)),
}
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
}
"upload-part-retry-duration" => {
settings.upload_part_retry_duration =
match value.get::<i64>().expect("type checked upstream") {
-1 => None,
v => Some(Duration::from_millis(v as u64)),
}
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
}
"complete-upload-request-timeout" => {
settings.complete_upload_request_timeout =
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
}
"complete-upload-retry-duration" => {
settings.complete_upload_retry_duration =
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
}
_ => unimplemented!(),
}
@ -831,19 +888,19 @@ impl ObjectImpl for S3Sink {
"secret-access-key" => settings.secret_access_key.to_value(),
"metadata" => settings.metadata.to_value(),
"on-error" => settings.multipart_upload_on_error.to_value(),
"request-timeout" => duration_to_millis(settings.request_timeout).to_value(),
"retry-duration" => duration_to_millis(settings.retry_duration).to_value(),
"upload-part-request-timeout" => {
let timeout: i64 = match settings.upload_part_request_timeout {
None => -1,
Some(v) => v.as_millis() as i64,
};
timeout.to_value()
duration_to_millis(settings.upload_part_request_timeout).to_value()
}
"upload-part-retry-duration" => {
let timeout: i64 = match settings.upload_part_retry_duration {
None => -1,
Some(v) => v.as_millis() as i64,
};
timeout.to_value()
duration_to_millis(settings.upload_part_retry_duration).to_value()
}
"complete-upload-request-timeout" => {
duration_to_millis(settings.complete_upload_request_timeout).to_value()
}
"complete-upload-retry-duration" => {
duration_to_millis(settings.complete_upload_retry_duration).to_value()
}
_ => unimplemented!(),
}

View file

@ -27,7 +27,7 @@ use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*;
use crate::s3url::*;
use crate::s3utils::{self, RetriableError, WaitError};
use crate::s3utils::{self, duration_from_millis, duration_to_millis, RetriableError, WaitError};
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
@ -335,18 +335,13 @@ impl ObjectImpl for S3Src {
}
"request-timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.request_timeout = match value.get::<i64>().expect("type checked upstream")
{
-1 => None,
v => Some(Duration::from_millis(v as u64)),
}
settings.request_timeout =
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
}
"retry-duration" => {
let mut settings = self.settings.lock().unwrap();
settings.retry_duration = match value.get::<i64>().expect("type checked upstream") {
-1 => None,
v => Some(Duration::from_millis(v as u64)),
}
settings.retry_duration =
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
}
_ => unimplemented!(),
}
@ -366,20 +361,8 @@ impl ObjectImpl for S3Src {
}
"access-key" => settings.access_key.to_value(),
"secret-access-key" => settings.secret_access_key.to_value(),
"request-timeout" => {
let timeout: i64 = match settings.request_timeout {
None => -1,
Some(v) => v.as_millis() as i64,
};
timeout.to_value()
}
"retry-duration" => {
let timeout: i64 = match settings.retry_duration {
None => -1,
Some(v) => v.as_millis() as i64,
};
timeout.to_value()
}
"request-timeout" => duration_to_millis(settings.request_timeout).to_value(),
"retry-duration" => duration_to_millis(settings.retry_duration).to_value(),
_ => unimplemented!(),
}
}

View file

@ -163,3 +163,17 @@ where
res
}
pub fn duration_from_millis(millis: i64) -> Option<Duration> {
match millis {
-1 => None,
v => Some(Duration::from_millis(v as u64)),
}
}
pub fn duration_to_millis(dur: Option<Duration>) -> i64 {
match dur {
None => -1,
Some(d) => d.as_millis() as i64,
}
}