mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-08 18:25:30 +00:00
rusoto: s3sink: Expose property to control all timeout/retry durations
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/690>
This commit is contained in:
parent
724c6d6e32
commit
026b0f9260
3 changed files with 101 additions and 47 deletions
|
@ -29,18 +29,17 @@ use std::sync::Mutex;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::s3url::*;
|
use crate::s3url::*;
|
||||||
use crate::s3utils::{self, RetriableError, WaitError};
|
use crate::s3utils::{self, duration_from_millis, duration_to_millis, RetriableError, WaitError};
|
||||||
|
|
||||||
use super::OnError;
|
use super::OnError;
|
||||||
|
|
||||||
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
|
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
|
||||||
// This needs to be independently configurable, as the part size can be upto 5GB
|
|
||||||
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
|
|
||||||
const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
|
|
||||||
// The rest of these aren't exposed as properties yet
|
|
||||||
// General setting for create / abort requests
|
// General setting for create / abort requests
|
||||||
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
|
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
|
||||||
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
|
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
|
||||||
|
// This needs to be independently configurable, as the part size can be upto 5GB
|
||||||
|
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
|
||||||
|
const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
|
||||||
// CompletedMultipartUpload can take minutes to complete, so we need a longer value here
|
// CompletedMultipartUpload can take minutes to complete, so we need a longer value here
|
||||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
|
||||||
const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes
|
const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes
|
||||||
|
@ -107,8 +106,12 @@ struct Settings {
|
||||||
secret_access_key: Option<String>,
|
secret_access_key: Option<String>,
|
||||||
metadata: Option<gst::Structure>,
|
metadata: Option<gst::Structure>,
|
||||||
multipart_upload_on_error: OnError,
|
multipart_upload_on_error: OnError,
|
||||||
|
request_timeout: Option<Duration>,
|
||||||
|
retry_duration: Option<Duration>,
|
||||||
upload_part_request_timeout: Option<Duration>,
|
upload_part_request_timeout: Option<Duration>,
|
||||||
upload_part_retry_duration: Option<Duration>,
|
upload_part_retry_duration: Option<Duration>,
|
||||||
|
complete_upload_request_timeout: Option<Duration>,
|
||||||
|
complete_upload_retry_duration: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Settings {
|
impl Settings {
|
||||||
|
@ -177,12 +180,20 @@ impl Default for Settings {
|
||||||
secret_access_key: None,
|
secret_access_key: None,
|
||||||
metadata: None,
|
metadata: None,
|
||||||
multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR,
|
multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR,
|
||||||
|
request_timeout: Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)),
|
||||||
|
retry_duration: Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)),
|
||||||
upload_part_request_timeout: Some(Duration::from_millis(
|
upload_part_request_timeout: Some(Duration::from_millis(
|
||||||
DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC,
|
DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC,
|
||||||
)),
|
)),
|
||||||
upload_part_retry_duration: Some(Duration::from_millis(
|
upload_part_retry_duration: Some(Duration::from_millis(
|
||||||
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC,
|
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC,
|
||||||
)),
|
)),
|
||||||
|
complete_upload_request_timeout: Some(Duration::from_millis(
|
||||||
|
DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC,
|
||||||
|
)),
|
||||||
|
complete_upload_retry_duration: Some(Duration::from_millis(
|
||||||
|
DEFAULT_COMPLETE_RETRY_DURATION_MSEC,
|
||||||
|
)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -700,6 +711,24 @@ impl ObjectImpl for S3Sink {
|
||||||
DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32,
|
DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32,
|
||||||
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
||||||
),
|
),
|
||||||
|
glib::ParamSpecInt64::new(
|
||||||
|
"request-timeout",
|
||||||
|
"Request timeout",
|
||||||
|
"Timeout for general S3 requests (in ms, set to -1 for infinity)",
|
||||||
|
-1,
|
||||||
|
std::i64::MAX,
|
||||||
|
DEFAULT_REQUEST_TIMEOUT_MSEC as i64,
|
||||||
|
glib::ParamFlags::READWRITE,
|
||||||
|
),
|
||||||
|
glib::ParamSpecInt64::new(
|
||||||
|
"retry-duration",
|
||||||
|
"Retry duration",
|
||||||
|
"How long we should retry general S3 requests before giving up (in ms, set to -1 for infinity)",
|
||||||
|
-1,
|
||||||
|
std::i64::MAX,
|
||||||
|
DEFAULT_RETRY_DURATION_MSEC as i64,
|
||||||
|
glib::ParamFlags::READWRITE,
|
||||||
|
),
|
||||||
glib::ParamSpecInt64::new(
|
glib::ParamSpecInt64::new(
|
||||||
"upload-part-request-timeout",
|
"upload-part-request-timeout",
|
||||||
"Upload part request timeout",
|
"Upload part request timeout",
|
||||||
|
@ -718,6 +747,24 @@ impl ObjectImpl for S3Sink {
|
||||||
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64,
|
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64,
|
||||||
glib::ParamFlags::READWRITE,
|
glib::ParamFlags::READWRITE,
|
||||||
),
|
),
|
||||||
|
glib::ParamSpecInt64::new(
|
||||||
|
"complete-upload-request-timeout",
|
||||||
|
"Complete upload request timeout",
|
||||||
|
"Timeout for the complete multipart upload request (in ms, set to -1 for infinity)",
|
||||||
|
-1,
|
||||||
|
std::i64::MAX,
|
||||||
|
DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC as i64,
|
||||||
|
glib::ParamFlags::READWRITE,
|
||||||
|
),
|
||||||
|
glib::ParamSpecInt64::new(
|
||||||
|
"complete-upload-retry-duration",
|
||||||
|
"Complete upload retry duration",
|
||||||
|
"How long we should retry complete multipart upload requests before giving up (in ms, set to -1 for infinity)",
|
||||||
|
-1,
|
||||||
|
std::i64::MAX,
|
||||||
|
DEFAULT_COMPLETE_RETRY_DURATION_MSEC as i64,
|
||||||
|
glib::ParamFlags::READWRITE,
|
||||||
|
),
|
||||||
]
|
]
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -794,19 +841,29 @@ impl ObjectImpl for S3Sink {
|
||||||
settings.multipart_upload_on_error =
|
settings.multipart_upload_on_error =
|
||||||
value.get::<OnError>().expect("type checked upstream");
|
value.get::<OnError>().expect("type checked upstream");
|
||||||
}
|
}
|
||||||
|
"request-timeout" => {
|
||||||
|
settings.request_timeout =
|
||||||
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
||||||
|
}
|
||||||
|
"retry-duration" => {
|
||||||
|
settings.retry_duration =
|
||||||
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
||||||
|
}
|
||||||
"upload-part-request-timeout" => {
|
"upload-part-request-timeout" => {
|
||||||
settings.upload_part_request_timeout =
|
settings.upload_part_request_timeout =
|
||||||
match value.get::<i64>().expect("type checked upstream") {
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
||||||
-1 => None,
|
|
||||||
v => Some(Duration::from_millis(v as u64)),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
"upload-part-retry-duration" => {
|
"upload-part-retry-duration" => {
|
||||||
settings.upload_part_retry_duration =
|
settings.upload_part_retry_duration =
|
||||||
match value.get::<i64>().expect("type checked upstream") {
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
||||||
-1 => None,
|
}
|
||||||
v => Some(Duration::from_millis(v as u64)),
|
"complete-upload-request-timeout" => {
|
||||||
}
|
settings.complete_upload_request_timeout =
|
||||||
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
||||||
|
}
|
||||||
|
"complete-upload-retry-duration" => {
|
||||||
|
settings.complete_upload_retry_duration =
|
||||||
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
||||||
}
|
}
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
|
@ -832,19 +889,19 @@ impl ObjectImpl for S3Sink {
|
||||||
"secret-access-key" => settings.secret_access_key.to_value(),
|
"secret-access-key" => settings.secret_access_key.to_value(),
|
||||||
"metadata" => settings.metadata.to_value(),
|
"metadata" => settings.metadata.to_value(),
|
||||||
"on-error" => settings.multipart_upload_on_error.to_value(),
|
"on-error" => settings.multipart_upload_on_error.to_value(),
|
||||||
|
"request-timeout" => duration_to_millis(settings.request_timeout).to_value(),
|
||||||
|
"retry-duration" => duration_to_millis(settings.retry_duration).to_value(),
|
||||||
"upload-part-request-timeout" => {
|
"upload-part-request-timeout" => {
|
||||||
let timeout: i64 = match settings.upload_part_request_timeout {
|
duration_to_millis(settings.upload_part_request_timeout).to_value()
|
||||||
None => -1,
|
|
||||||
Some(v) => v.as_millis() as i64,
|
|
||||||
};
|
|
||||||
timeout.to_value()
|
|
||||||
}
|
}
|
||||||
"upload-part-retry-duration" => {
|
"upload-part-retry-duration" => {
|
||||||
let timeout: i64 = match settings.upload_part_retry_duration {
|
duration_to_millis(settings.upload_part_retry_duration).to_value()
|
||||||
None => -1,
|
}
|
||||||
Some(v) => v.as_millis() as i64,
|
"complete-upload-request-timeout" => {
|
||||||
};
|
duration_to_millis(settings.complete_upload_request_timeout).to_value()
|
||||||
timeout.to_value()
|
}
|
||||||
|
"complete-upload-retry-duration" => {
|
||||||
|
duration_to_millis(settings.complete_upload_retry_duration).to_value()
|
||||||
}
|
}
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ use gst_base::subclass::base_src::CreateSuccess;
|
||||||
use gst_base::subclass::prelude::*;
|
use gst_base::subclass::prelude::*;
|
||||||
|
|
||||||
use crate::s3url::*;
|
use crate::s3url::*;
|
||||||
use crate::s3utils::{self, RetriableError, WaitError};
|
use crate::s3utils::{self, duration_from_millis, duration_to_millis, RetriableError, WaitError};
|
||||||
|
|
||||||
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
|
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
|
||||||
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
|
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
|
||||||
|
@ -336,18 +336,13 @@ impl ObjectImpl for S3Src {
|
||||||
}
|
}
|
||||||
"request-timeout" => {
|
"request-timeout" => {
|
||||||
let mut settings = self.settings.lock().unwrap();
|
let mut settings = self.settings.lock().unwrap();
|
||||||
settings.request_timeout = match value.get::<i64>().expect("type checked upstream")
|
settings.request_timeout =
|
||||||
{
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
||||||
-1 => None,
|
|
||||||
v => Some(Duration::from_millis(v as u64)),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
"retry-duration" => {
|
"retry-duration" => {
|
||||||
let mut settings = self.settings.lock().unwrap();
|
let mut settings = self.settings.lock().unwrap();
|
||||||
settings.retry_duration = match value.get::<i64>().expect("type checked upstream") {
|
settings.retry_duration =
|
||||||
-1 => None,
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
||||||
v => Some(Duration::from_millis(v as u64)),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
|
@ -367,20 +362,8 @@ impl ObjectImpl for S3Src {
|
||||||
}
|
}
|
||||||
"access-key" => settings.access_key.to_value(),
|
"access-key" => settings.access_key.to_value(),
|
||||||
"secret-access-key" => settings.secret_access_key.to_value(),
|
"secret-access-key" => settings.secret_access_key.to_value(),
|
||||||
"request-timeout" => {
|
"request-timeout" => duration_to_millis(settings.request_timeout).to_value(),
|
||||||
let timeout: i64 = match settings.request_timeout {
|
"retry-duration" => duration_to_millis(settings.retry_duration).to_value(),
|
||||||
None => -1,
|
|
||||||
Some(v) => v.as_millis() as i64,
|
|
||||||
};
|
|
||||||
timeout.to_value()
|
|
||||||
}
|
|
||||||
"retry-duration" => {
|
|
||||||
let timeout: i64 = match settings.retry_duration {
|
|
||||||
None => -1,
|
|
||||||
Some(v) => v.as_millis() as i64,
|
|
||||||
};
|
|
||||||
timeout.to_value()
|
|
||||||
}
|
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -165,3 +165,17 @@ where
|
||||||
|
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn duration_from_millis(millis: i64) -> Option<Duration> {
|
||||||
|
match millis {
|
||||||
|
-1 => None,
|
||||||
|
v => Some(Duration::from_millis(v as u64)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn duration_to_millis(dur: Option<Duration>) -> i64 {
|
||||||
|
match dur {
|
||||||
|
None => -1,
|
||||||
|
Some(d) => d.as_millis() as i64,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue