diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs index 67e311ba..5b2f3f5f 100644 --- a/net/rusoto/src/s3sink/imp.rs +++ b/net/rusoto/src/s3sink/imp.rs @@ -29,18 +29,17 @@ use std::sync::Mutex; use std::time::Duration; use crate::s3url::*; -use crate::s3utils::{self, RetriableError, WaitError}; +use crate::s3utils::{self, duration_from_millis, duration_to_millis, RetriableError, WaitError}; use super::OnError; const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing; -// This needs to be independently configurable, as the part size can be upto 5GB -const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000; -const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000; -// The rest of these aren't exposed as properties yet // General setting for create / abort requests const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000; const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000; +// This needs to be independently configurable, as the part size can be upto 5GB +const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000; +const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000; // CompletedMultipartUpload can take minutes to complete, so we need a longer value here // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes @@ -107,8 +106,12 @@ struct Settings { secret_access_key: Option, metadata: Option, multipart_upload_on_error: OnError, + request_timeout: Option, + retry_duration: Option, upload_part_request_timeout: Option, upload_part_retry_duration: Option, + complete_upload_request_timeout: Option, + complete_upload_retry_duration: Option, } impl Settings { @@ -177,12 +180,20 @@ impl Default for Settings { secret_access_key: None, metadata: None, multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR, + request_timeout: Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)), + retry_duration: Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)), upload_part_request_timeout: Some(Duration::from_millis( DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC, )), upload_part_retry_duration: Some(Duration::from_millis( DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC, )), + complete_upload_request_timeout: Some(Duration::from_millis( + DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC, + )), + complete_upload_retry_duration: Some(Duration::from_millis( + DEFAULT_COMPLETE_RETRY_DURATION_MSEC, + )), } } } @@ -700,6 +711,24 @@ impl ObjectImpl for S3Sink { DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), + glib::ParamSpecInt64::new( + "request-timeout", + "Request timeout", + "Timeout for general S3 requests (in ms, set to -1 for infinity)", + -1, + std::i64::MAX, + DEFAULT_REQUEST_TIMEOUT_MSEC as i64, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpecInt64::new( + "retry-duration", + "Retry duration", + "How long we should retry general S3 requests before giving up (in ms, set to -1 for infinity)", + -1, + std::i64::MAX, + DEFAULT_RETRY_DURATION_MSEC as i64, + glib::ParamFlags::READWRITE, + ), glib::ParamSpecInt64::new( "upload-part-request-timeout", "Upload part request timeout", @@ -718,6 +747,24 @@ impl ObjectImpl for S3Sink { DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64, glib::ParamFlags::READWRITE, ), + glib::ParamSpecInt64::new( + "complete-upload-request-timeout", + "Complete upload request timeout", + "Timeout for the complete multipart upload request (in ms, set to -1 for infinity)", + -1, + std::i64::MAX, + DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC as i64, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpecInt64::new( + "complete-upload-retry-duration", + "Complete upload retry duration", + "How long we should retry complete multipart upload requests before giving up (in ms, set to -1 for infinity)", + -1, + std::i64::MAX, + DEFAULT_COMPLETE_RETRY_DURATION_MSEC as i64, + glib::ParamFlags::READWRITE, + ), ] }); @@ -794,19 +841,29 @@ impl ObjectImpl for S3Sink { settings.multipart_upload_on_error = value.get::().expect("type checked upstream"); } + "request-timeout" => { + settings.request_timeout = + duration_from_millis(value.get::().expect("type checked upstream")); + } + "retry-duration" => { + settings.retry_duration = + duration_from_millis(value.get::().expect("type checked upstream")); + } "upload-part-request-timeout" => { settings.upload_part_request_timeout = - match value.get::().expect("type checked upstream") { - -1 => None, - v => Some(Duration::from_millis(v as u64)), - } + duration_from_millis(value.get::().expect("type checked upstream")); } "upload-part-retry-duration" => { settings.upload_part_retry_duration = - match value.get::().expect("type checked upstream") { - -1 => None, - v => Some(Duration::from_millis(v as u64)), - } + duration_from_millis(value.get::().expect("type checked upstream")); + } + "complete-upload-request-timeout" => { + settings.complete_upload_request_timeout = + duration_from_millis(value.get::().expect("type checked upstream")); + } + "complete-upload-retry-duration" => { + settings.complete_upload_retry_duration = + duration_from_millis(value.get::().expect("type checked upstream")); } _ => unimplemented!(), } @@ -832,19 +889,19 @@ impl ObjectImpl for S3Sink { "secret-access-key" => settings.secret_access_key.to_value(), "metadata" => settings.metadata.to_value(), "on-error" => settings.multipart_upload_on_error.to_value(), + "request-timeout" => duration_to_millis(settings.request_timeout).to_value(), + "retry-duration" => duration_to_millis(settings.retry_duration).to_value(), "upload-part-request-timeout" => { - let timeout: i64 = match settings.upload_part_request_timeout { - None => -1, - Some(v) => v.as_millis() as i64, - }; - timeout.to_value() + duration_to_millis(settings.upload_part_request_timeout).to_value() } "upload-part-retry-duration" => { - let timeout: i64 = match settings.upload_part_retry_duration { - None => -1, - Some(v) => v.as_millis() as i64, - }; - timeout.to_value() + duration_to_millis(settings.upload_part_retry_duration).to_value() + } + "complete-upload-request-timeout" => { + duration_to_millis(settings.complete_upload_request_timeout).to_value() + } + "complete-upload-retry-duration" => { + duration_to_millis(settings.complete_upload_retry_duration).to_value() } _ => unimplemented!(), } diff --git a/net/rusoto/src/s3src/imp.rs b/net/rusoto/src/s3src/imp.rs index fc7f99ac..70d020a8 100644 --- a/net/rusoto/src/s3src/imp.rs +++ b/net/rusoto/src/s3src/imp.rs @@ -28,7 +28,7 @@ use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; use crate::s3url::*; -use crate::s3utils::{self, RetriableError, WaitError}; +use crate::s3utils::{self, duration_from_millis, duration_to_millis, RetriableError, WaitError}; const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000; const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000; @@ -336,18 +336,13 @@ impl ObjectImpl for S3Src { } "request-timeout" => { let mut settings = self.settings.lock().unwrap(); - settings.request_timeout = match value.get::().expect("type checked upstream") - { - -1 => None, - v => Some(Duration::from_millis(v as u64)), - } + settings.request_timeout = + duration_from_millis(value.get::().expect("type checked upstream")); } "retry-duration" => { let mut settings = self.settings.lock().unwrap(); - settings.retry_duration = match value.get::().expect("type checked upstream") { - -1 => None, - v => Some(Duration::from_millis(v as u64)), - } + settings.retry_duration = + duration_from_millis(value.get::().expect("type checked upstream")); } _ => unimplemented!(), } @@ -367,20 +362,8 @@ impl ObjectImpl for S3Src { } "access-key" => settings.access_key.to_value(), "secret-access-key" => settings.secret_access_key.to_value(), - "request-timeout" => { - let timeout: i64 = match settings.request_timeout { - None => -1, - Some(v) => v.as_millis() as i64, - }; - timeout.to_value() - } - "retry-duration" => { - let timeout: i64 = match settings.retry_duration { - None => -1, - Some(v) => v.as_millis() as i64, - }; - timeout.to_value() - } + "request-timeout" => duration_to_millis(settings.request_timeout).to_value(), + "retry-duration" => duration_to_millis(settings.retry_duration).to_value(), _ => unimplemented!(), } } diff --git a/net/rusoto/src/s3utils.rs b/net/rusoto/src/s3utils.rs index d11e0ddd..2369d084 100644 --- a/net/rusoto/src/s3utils.rs +++ b/net/rusoto/src/s3utils.rs @@ -165,3 +165,17 @@ where res } + +pub fn duration_from_millis(millis: i64) -> Option { + match millis { + -1 => None, + v => Some(Duration::from_millis(v as u64)), + } +} + +pub fn duration_to_millis(dur: Option) -> i64 { + match dur { + None => -1, + Some(d) => d.as_millis() as i64, + } +}