diff --git a/net/rusoto/Cargo.toml b/net/rusoto/Cargo.toml index 8d6662a8..6a724d01 100644 --- a/net/rusoto/Cargo.toml +++ b/net/rusoto/Cargo.toml @@ -15,13 +15,19 @@ bytes = "1.0" futures = "0.3" gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +aws-config = "0.12.0" +aws-sdk-s3 = "0.12.0" +aws-sdk-transcribe = "0.12.0" +aws-types = "0.12.0" +aws-smithy-http = { version = "0.42.0", features = [ "rt-tokio" ] } +aws-smithy-types = "0.42.0" rusoto_core = "0.48" rusoto_s3 = "0.48" rusoto_credential = "0.48" rusoto_signature = "0.48" url = "2" percent-encoding = "2" -tokio = { version = "1.0", features = [ "rt-multi-thread", "time" ] } +tokio = { version = "1.0", features = [ "full" ] } async-tungstenite = { version = "0.17", features = ["tokio", "tokio-runtime", "tokio-native-tls"] } nom = "7" crc = "3" diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs index e4cedc50..9bb34cf7 100644 --- a/net/rusoto/src/s3sink/imp.rs +++ b/net/rusoto/src/s3sink/imp.rs @@ -6,35 +6,37 @@ // // SPDX-License-Identifier: MPL-2.0 -use futures::TryFutureExt; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; - use gst_base::subclass::prelude::*; -use futures::future; -use rusoto_core::{region::Region, request::HttpClient}; -use rusoto_credential::StaticProvider; -use rusoto_s3::{ - AbortMultipartUploadRequest, CompleteMultipartUploadRequest, CompletedMultipartUpload, - CompletedPart, CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3, +use aws_sdk_s3::client::fluent_builders::{ + AbortMultipartUpload, CompleteMultipartUpload, CreateMultipartUpload, UploadPart, }; +use aws_sdk_s3::config; +use aws_sdk_s3::model::{CompletedMultipartUpload, CompletedPart}; +use aws_sdk_s3::types::ByteStream; +use aws_sdk_s3::{Client, Credentials, Region, RetryConfig}; +use futures::future; use once_cell::sync::Lazy; - use std::collections::HashMap; +use std::convert::From; use std::sync::Mutex; use std::time::Duration; use crate::s3url::*; -use crate::s3utils::{self, duration_from_millis, duration_to_millis, RetriableError, WaitError}; +use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError}; use super::OnError; +const DEFAULT_RETRY_ATTEMPTS: u32 = 5; +const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024; const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing; + // General setting for create / abort requests -const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000; +const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000; const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000; // This needs to be independently configurable, as the part size can be upto 5GB const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000; @@ -45,7 +47,7 @@ const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes const DEFAULT_COMPLETE_RETRY_DURATION_MSEC: u64 = 3_600_000; // 60 minutes struct Started { - client: S3Client, + client: Client, buffer: Vec, upload_id: String, part_number: i64, @@ -53,7 +55,7 @@ struct Started { } impl Started { - pub fn new(client: S3Client, buffer: Vec, upload_id: String) -> Started { + pub fn new(client: Client, buffer: Vec, upload_id: String) -> Started { Started { client, buffer, @@ -93,8 +95,6 @@ impl Default for State { } } -const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024; - struct Settings { region: Region, bucket: Option, @@ -104,40 +104,16 @@ struct Settings { access_key: Option, secret_access_key: Option, metadata: Option, + retry_attempts: u32, multipart_upload_on_error: OnError, - request_timeout: Option, - retry_duration: Option, - upload_part_request_timeout: Option, - upload_part_retry_duration: Option, - complete_upload_request_timeout: Option, - complete_upload_retry_duration: Option, + request_timeout: Duration, } impl Settings { fn to_uri(&self) -> String { format!( "s3://{}/{}/{}", - match self.region { - Region::Custom { - ref name, - ref endpoint, - } => { - format!( - "{}+{}", - base32::encode( - base32::Alphabet::RFC4648 { padding: true }, - name.as_bytes(), - ), - base32::encode( - base32::Alphabet::RFC4648 { padding: true }, - endpoint.as_bytes(), - ), - ) - } - _ => { - String::from(self.region.name()) - } - }, + self.region, self.bucket.as_ref().unwrap(), self.key.as_ref().unwrap() ) @@ -170,29 +146,17 @@ impl Settings { impl Default for Settings { fn default() -> Self { Settings { - region: Region::default(), + region: Region::new("us-west-2"), bucket: None, key: None, content_type: None, - buffer_size: DEFAULT_BUFFER_SIZE, access_key: None, secret_access_key: None, metadata: None, + buffer_size: DEFAULT_BUFFER_SIZE, + retry_attempts: DEFAULT_RETRY_ATTEMPTS, multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR, - request_timeout: Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)), - retry_duration: Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)), - upload_part_request_timeout: Some(Duration::from_millis( - DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC, - )), - upload_part_retry_duration: Some(Duration::from_millis( - DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC, - )), - complete_upload_request_timeout: Some(Duration::from_millis( - DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC, - )), - complete_upload_retry_duration: Some(Duration::from_millis( - DEFAULT_COMPLETE_RETRY_DURATION_MSEC, - )), + request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC), } } } @@ -219,8 +183,95 @@ impl S3Sink { &self, element: &super::S3Sink, ) -> Result<(), Option> { + let upload_part_req: UploadPart = self.create_upload_part_request()?; + let mut state = self.state.lock().unwrap(); + let state = match *state { + State::Started(ref mut started_state) => started_state, + State::Stopped => { + unreachable!("Element should be started"); + } + }; + + let part_number = state.part_number; + + let upload_part_req_future = upload_part_req.send(); + let output = + s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err { + WaitError::FutureError(err) => { + let settings = self.settings.lock().unwrap(); + match settings.multipart_upload_on_error { + OnError::Abort => { + gst::log!( + CAT, + obj: element, + "Aborting multipart upload request with id: {}", + state.upload_id + ); + match self.abort_multipart_upload_request(state) { + Ok(()) => { + gst::log!( + CAT, + obj: element, + "Aborting multipart upload request succeeded." + ); + } + Err(err) => gst::error!( + CAT, + obj: element, + "Aborting multipart upload failed: {}", + err.to_string() + ), + } + } + OnError::Complete => { + gst::log!( + CAT, + obj: element, + "Completing multipart upload request with id: {}", + state.upload_id + ); + match self.complete_multipart_upload_request(state) { + Ok(()) => { + gst::log!( + CAT, + obj: element, + "Complete multipart upload request succeeded." + ); + } + Err(err) => gst::error!( + CAT, + obj: element, + "Completing multipart upload failed: {}", + err.to_string() + ), + } + } + OnError::DoNothing => (), + } + Some(gst::error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to upload part: {}", err] + )) + } + WaitError::Cancelled => None, + })?; + + let completed_part = CompletedPart::builder() + .set_e_tag(output.e_tag) + .set_part_number(Some(part_number as i32)) + .build(); + state.completed_parts.push(completed_part); + + gst::info!(CAT, obj: element, "Uploaded part {}", part_number); + + Ok(()) + } + + fn create_upload_part_request(&self) -> Result { + let url = self.url.lock().unwrap(); let settings = self.settings.lock().unwrap(); + let mut state = self.state.lock().unwrap(); let state = match *state { State::Started(ref mut started_state) => started_state, State::Stopped => { @@ -229,152 +280,90 @@ impl S3Sink { }; let part_number = state.increment_part_number()?; - let body = std::mem::replace( + let body = Some(ByteStream::from(std::mem::replace( &mut state.buffer, Vec::with_capacity(settings.buffer_size as usize), - ); - let upload_id = &state.upload_id; + ))); + + let bucket = Some(url.as_ref().unwrap().bucket.to_owned()); + let key = Some(url.as_ref().unwrap().object.to_owned()); + let upload_id = Some(state.upload_id.to_owned()); + let client = &state.client; + let upload_part = client + .upload_part() + .set_body(body) + .set_bucket(bucket) + .set_key(key) + .set_upload_id(upload_id) + .set_part_number(Some(part_number as i32)); - let upload_part_req_future = || { - client - .upload_part(self.create_upload_part_request(&body, part_number, upload_id)) - .map_err(RetriableError::Rusoto) - }; - - let output = s3utils::wait_retry( - &self.canceller, - settings.upload_part_request_timeout, - settings.upload_part_retry_duration, - upload_part_req_future, - ) - .map_err(|err| match err { - WaitError::FutureError(err) => { - match settings.multipart_upload_on_error { - OnError::Abort => { - gst::log!( - CAT, - obj: element, - "Aborting multipart upload request with id: {}", - state.upload_id - ); - match self.abort_multipart_upload_request(state) { - Ok(()) => { - gst::log!( - CAT, - obj: element, - "Aborting multipart upload request succeeded." - ); - } - Err(err) => gst::error!( - CAT, - obj: element, - "Aborting multipart upload failed: {}", - err.to_string() - ), - } - } - OnError::Complete => { - gst::log!( - CAT, - obj: element, - "Completing multipart upload request with id: {}", - state.upload_id - ); - match self.complete_multipart_upload_request(state) { - Ok(()) => { - gst::log!( - CAT, - obj: element, - "Complete multipart upload request succeeded." - ); - } - Err(err) => gst::error!( - CAT, - obj: element, - "Completing multipart upload failed: {}", - err.to_string() - ), - } - } - OnError::DoNothing => (), - } - Some(gst::error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to upload part: {:?}", err] - )) - } - WaitError::Cancelled => None, - })?; - - state.completed_parts.push(CompletedPart { - e_tag: output.e_tag, - part_number: Some(part_number), - }); - gst::info!(CAT, obj: element, "Uploaded part {}", part_number); - - Ok(()) - } - - fn create_upload_part_request( - &self, - body: &[u8], - part_number: i64, - upload_id: &str, - ) -> UploadPartRequest { - let url = self.url.lock().unwrap(); - - UploadPartRequest { - body: Some(rusoto_core::ByteStream::from(body.to_owned())), - bucket: url.as_ref().unwrap().bucket.to_owned(), - key: url.as_ref().unwrap().object.to_owned(), - upload_id: upload_id.to_owned(), - part_number, - ..Default::default() - } + Ok(upload_part) } fn create_complete_multipart_upload_request( &self, - started_state: &Started, - completed_upload: CompletedMultipartUpload, - ) -> CompleteMultipartUploadRequest { + started_state: &mut Started, + ) -> CompleteMultipartUpload { + started_state + .completed_parts + .sort_by(|a, b| a.part_number.cmp(&b.part_number)); + + let parts = Some(std::mem::take(&mut started_state.completed_parts)); + + let completed_upload = CompletedMultipartUpload::builder().set_parts(parts).build(); + let url = self.url.lock().unwrap(); - CompleteMultipartUploadRequest { - bucket: url.as_ref().unwrap().bucket.to_owned(), - key: url.as_ref().unwrap().object.to_owned(), - upload_id: started_state.upload_id.to_owned(), - multipart_upload: Some(completed_upload), - ..Default::default() - } + let client = &started_state.client; + + let bucket = Some(url.as_ref().unwrap().bucket.to_owned()); + let key = Some(url.as_ref().unwrap().object.to_owned()); + let upload_id = Some(started_state.upload_id.to_owned()); + let multipart_upload = Some(completed_upload); + + client + .complete_multipart_upload() + .set_bucket(bucket) + .set_key(key) + .set_upload_id(upload_id) + .set_multipart_upload(multipart_upload) } fn create_create_multipart_upload_request( &self, + client: &Client, url: &GstS3Url, settings: &Settings, - ) -> CreateMultipartUploadRequest { - CreateMultipartUploadRequest { - bucket: url.bucket.clone(), - key: url.object.clone(), - content_type: settings.content_type.clone(), - metadata: settings.to_metadata(&self.instance()), - ..Default::default() - } + ) -> CreateMultipartUpload { + let bucket = Some(url.bucket.clone()); + let key = Some(url.object.clone()); + let content_type = settings.content_type.clone(); + let metadata = settings.to_metadata(&self.instance()); + + client + .create_multipart_upload() + .set_bucket(bucket) + .set_key(key) + .set_content_type(content_type) + .set_metadata(metadata) } fn create_abort_multipart_upload_request( &self, + client: &Client, url: &GstS3Url, started_state: &Started, - ) -> AbortMultipartUploadRequest { - AbortMultipartUploadRequest { - bucket: url.bucket.clone(), - expected_bucket_owner: None, - key: url.object.clone(), - request_payer: None, - upload_id: started_state.upload_id.to_owned(), - } + ) -> AbortMultipartUpload { + let bucket = Some(url.bucket.clone()); + let key = Some(url.object.clone()); + + client + .abort_multipart_upload() + .set_bucket(bucket) + .set_expected_bucket_owner(None) + .set_key(key) + .set_request_payer(None) + .set_upload_id(Some(started_state.upload_id.to_owned())) } fn abort_multipart_upload_request( @@ -385,74 +374,50 @@ impl S3Sink { Some(ref url) => url.clone(), None => unreachable!("Element should be started"), }; - let abort_req_future = || { - let abort_req = self.create_abort_multipart_upload_request(&s3url, started_state); - started_state - .client - .abort_multipart_upload(abort_req) - .map_err(RetriableError::Rusoto) - }; - s3utils::wait_retry( - &self.abort_multipart_canceller, - Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)), - Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)), - abort_req_future, - ) - .map(|_| ()) - .map_err(|err| match err { - WaitError::FutureError(err) => { - gst::error_msg!( - gst::ResourceError::Write, - ["Failed to abort multipart upload: {:?}.", err] - ) - } - WaitError::Cancelled => { - gst::error_msg!( - gst::ResourceError::Write, - ["Abort multipart upload request interrupted."] - ) - } - }) + let client = &started_state.client; + let abort_req = self.create_abort_multipart_upload_request(client, &s3url, started_state); + let abort_req_future = abort_req.send(); + + s3utils::wait(&self.abort_multipart_canceller, abort_req_future) + .map(|_| ()) + .map_err(|err| match err { + WaitError::FutureError(err) => { + gst::error_msg!( + gst::ResourceError::Write, + ["Failed to abort multipart upload: {}.", err.to_string()] + ) + } + WaitError::Cancelled => { + gst::error_msg!( + gst::ResourceError::Write, + ["Abort multipart upload request interrupted."] + ) + } + }) } fn complete_multipart_upload_request( &self, started_state: &mut Started, ) -> Result<(), gst::ErrorMessage> { - started_state - .completed_parts - .sort_by(|a, b| a.part_number.cmp(&b.part_number)); + let complete_req = self.create_complete_multipart_upload_request(started_state); + let complete_req_future = complete_req.send(); - let completed_upload = CompletedMultipartUpload { - parts: Some(std::mem::take(&mut started_state.completed_parts)), - }; - - let complete_req_future = || { - let complete_req = self - .create_complete_multipart_upload_request(started_state, completed_upload.clone()); - started_state - .client - .complete_multipart_upload(complete_req) - .map_err(RetriableError::Rusoto) - }; - - s3utils::wait_retry( - &self.canceller, - Some(Duration::from_millis(DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC)), - Some(Duration::from_millis(DEFAULT_COMPLETE_RETRY_DURATION_MSEC)), - complete_req_future, - ) - .map(|_| ()) - .map_err(|err| match err { - WaitError::FutureError(err) => gst::error_msg!( - gst::ResourceError::Write, - ["Failed to complete multipart upload: {:?}.", err] - ), - WaitError::Cancelled => { - gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"]) - } - }) + s3utils::wait(&self.canceller, complete_req_future) + .map(|_| ()) + .map_err(|err| match err { + WaitError::FutureError(err) => gst::error_msg!( + gst::ResourceError::Write, + ["Failed to complete multipart upload: {}.", err.to_string()] + ), + WaitError::Cancelled => { + gst::error_msg!( + gst::LibraryError::Failed, + ["Complete multipart upload request interrupted"] + ) + } + }) } fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> { @@ -492,45 +457,60 @@ impl S3Sink { } }; - let client = match ( + let timeout_config = s3utils::timeout_config(settings.request_timeout); + + let cred = match ( settings.access_key.as_ref(), settings.secret_access_key.as_ref(), ) { - (Some(access_key), Some(secret_access_key)) => { - let creds = - StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone()); - S3Client::new_with( - HttpClient::new().expect("failed to create request dispatcher"), - creds, - s3url.region.clone(), - ) - } - _ => S3Client::new(s3url.region.clone()), + (Some(access_key), Some(secret_access_key)) => Some(Credentials::new( + access_key.clone(), + secret_access_key.clone(), + None, + None, + "rusoto-s3-sink", + )), + _ => None, }; - let create_multipart_req_future = || { - let create_multipart_req = - self.create_create_multipart_upload_request(&s3url, &settings); - client - .create_multipart_upload(create_multipart_req) - .map_err(RetriableError::Rusoto) - }; + let sdk_config = + s3utils::wait_config(&self.canceller, s3url.region.clone(), timeout_config, cred) + .map_err(|err| match err { + WaitError::FutureError(err) => gst::error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to create SDK config: {}", err] + ), + WaitError::Cancelled => { + gst::error_msg!( + gst::LibraryError::Failed, + ["SDK config request interrupted during start"] + ) + } + })?; - let response = s3utils::wait_retry( - &self.canceller, - Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)), - Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)), - create_multipart_req_future, - ) - .map_err(|err| match err { - WaitError::FutureError(err) => gst::error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to create multipart upload: {:?}", err] - ), - WaitError::Cancelled => { - gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) - } - })?; + let config = config::Builder::from(&sdk_config) + .retry_config(RetryConfig::new().with_max_attempts(settings.retry_attempts)) + .build(); + let client = Client::from_conf(config); + + let create_multipart_req = + self.create_create_multipart_upload_request(&client, &s3url, &settings); + let create_multipart_req_future = create_multipart_req.send(); + + let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err( + |err| match err { + WaitError::FutureError(err) => gst::error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to create multipart upload: {}", err] + ), + WaitError::Cancelled => { + gst::error_msg!( + gst::LibraryError::Failed, + ["Create multipart request interrupted during start"] + ) + } + }, + )?; let upload_id = response.upload_id.ok_or_else(|| { gst::error_msg!( @@ -662,7 +642,7 @@ impl ObjectImpl for S3Sink { "region", "AWS Region", "An AWS region (e.g. eu-west-2).", - None, + Some("us-west-2"), glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), glib::ParamSpecUInt64::new( @@ -710,6 +690,15 @@ impl ObjectImpl for S3Sink { DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), + glib::ParamSpecUInt::new( + "retry-attempts", + "Retry attempts", + "Number of times AWS SDK attempts a request before abandoning the request", + 1, + 10, + DEFAULT_RETRY_ATTEMPTS, + glib::ParamFlags::READWRITE, + ), glib::ParamSpecInt64::new( "request-timeout", "Request timeout", @@ -719,46 +708,46 @@ impl ObjectImpl for S3Sink { DEFAULT_REQUEST_TIMEOUT_MSEC as i64, glib::ParamFlags::READWRITE, ), - glib::ParamSpecInt64::new( - "retry-duration", - "Retry duration", - "How long we should retry general S3 requests before giving up (in ms, set to -1 for infinity)", - -1, - std::i64::MAX, - DEFAULT_RETRY_DURATION_MSEC as i64, - glib::ParamFlags::READWRITE, - ), glib::ParamSpecInt64::new( "upload-part-request-timeout", "Upload part request timeout", - "Timeout for a single upload part request (in ms, set to -1 for infinity)", + "Timeout for a single upload part request (in ms, set to -1 for infinity) (Deprecated. Use request-timeout.)", -1, std::i64::MAX, DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC as i64, glib::ParamFlags::READWRITE, ), - glib::ParamSpecInt64::new( - "upload-part-retry-duration", - "Upload part retry duration", - "How long we should retry upload part requests before giving up (in ms, set to -1 for infinity)", - -1, - std::i64::MAX, - DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64, - glib::ParamFlags::READWRITE, - ), glib::ParamSpecInt64::new( "complete-upload-request-timeout", "Complete upload request timeout", - "Timeout for the complete multipart upload request (in ms, set to -1 for infinity)", + "Timeout for the complete multipart upload request (in ms, set to -1 for infinity) (Deprecated. Use request-timeout.)", -1, std::i64::MAX, DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC as i64, glib::ParamFlags::READWRITE, ), + glib::ParamSpecInt64::new( + "retry-duration", + "Retry duration", + "How long we should retry general S3 requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)", + -1, + std::i64::MAX, + DEFAULT_RETRY_DURATION_MSEC as i64, + glib::ParamFlags::READWRITE, + ), + glib::ParamSpecInt64::new( + "upload-part-retry-duration", + "Upload part retry duration", + "How long we should retry upload part requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)", + -1, + std::i64::MAX, + DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64, + glib::ParamFlags::READWRITE, + ), glib::ParamSpecInt64::new( "complete-upload-retry-duration", "Complete upload retry duration", - "How long we should retry complete multipart upload requests before giving up (in ms, set to -1 for infinity)", + "How long we should retry complete multipart upload requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)", -1, std::i64::MAX, DEFAULT_COMPLETE_RETRY_DURATION_MSEC as i64, @@ -806,17 +795,7 @@ impl ObjectImpl for S3Sink { } "region" => { let region = value.get::().expect("type checked upstream"); - settings.region = region - .parse::() - .or_else(|_| { - let (name, endpoint) = region.split_once('+').ok_or(())?; - Ok(Region::Custom { - name: name.into(), - endpoint: endpoint.into(), - }) - }) - .unwrap_or_else(|_: ()| panic!("Invalid region '{}'", region)); - + settings.region = Region::new(region); if settings.key.is_some() && settings.bucket.is_some() { let _ = self.set_uri(obj, Some(&settings.to_uri())); } @@ -840,29 +819,34 @@ impl ObjectImpl for S3Sink { settings.multipart_upload_on_error = value.get::().expect("type checked upstream"); } + "retry-attempts" => { + settings.retry_attempts = value.get::().expect("type checked upstream"); + } "request-timeout" => { settings.request_timeout = duration_from_millis(value.get::().expect("type checked upstream")); } - "retry-duration" => { - settings.retry_duration = - duration_from_millis(value.get::().expect("type checked upstream")); - } "upload-part-request-timeout" => { - settings.upload_part_request_timeout = - duration_from_millis(value.get::().expect("type checked upstream")); - } - "upload-part-retry-duration" => { - settings.upload_part_retry_duration = + settings.request_timeout = duration_from_millis(value.get::().expect("type checked upstream")); } "complete-upload-request-timeout" => { - settings.complete_upload_request_timeout = + settings.request_timeout = duration_from_millis(value.get::().expect("type checked upstream")); } - "complete-upload-retry-duration" => { - settings.complete_upload_retry_duration = - duration_from_millis(value.get::().expect("type checked upstream")); + "retry-duration" | "upload-part-retry-duration" | "complete-upload-retry-duration" => { + /* + * To maintain backwards compatibility calculate retry attempts + * by dividing the provided duration from request timeout. + */ + let value = value.get::().expect("type checked upstream"); + let request_timeout = duration_to_millis(Some(settings.request_timeout)); + let retry_attempts = if value > request_timeout { + value / request_timeout + } else { + request_timeout / value + }; + settings.retry_attempts = retry_attempts as u32; } _ => unimplemented!(), } @@ -874,7 +858,7 @@ impl ObjectImpl for S3Sink { match pspec.name() { "key" => settings.key.to_value(), "bucket" => settings.bucket.to_value(), - "region" => settings.region.name().to_value(), + "region" => settings.region.to_string().to_value(), "part-size" => settings.buffer_size.to_value(), "uri" => { let url = match *self.url.lock().unwrap() { @@ -888,19 +872,17 @@ impl ObjectImpl for S3Sink { "secret-access-key" => settings.secret_access_key.to_value(), "metadata" => settings.metadata.to_value(), "on-error" => settings.multipart_upload_on_error.to_value(), - "request-timeout" => duration_to_millis(settings.request_timeout).to_value(), - "retry-duration" => duration_to_millis(settings.retry_duration).to_value(), + "retry-attempts" => settings.retry_attempts.to_value(), + "request-timeout" => duration_to_millis(Some(settings.request_timeout)).to_value(), "upload-part-request-timeout" => { - duration_to_millis(settings.upload_part_request_timeout).to_value() - } - "upload-part-retry-duration" => { - duration_to_millis(settings.upload_part_retry_duration).to_value() + duration_to_millis(Some(settings.request_timeout)).to_value() } "complete-upload-request-timeout" => { - duration_to_millis(settings.complete_upload_request_timeout).to_value() + duration_to_millis(Some(settings.request_timeout)).to_value() } - "complete-upload-retry-duration" => { - duration_to_millis(settings.complete_upload_retry_duration).to_value() + "retry-duration" | "upload-part-retry-duration" | "complete-upload-retry-duration" => { + let request_timeout = duration_to_millis(Some(settings.request_timeout)); + (settings.retry_attempts as i64 * request_timeout).to_value() } _ => unimplemented!(), } diff --git a/net/rusoto/src/s3src/imp.rs b/net/rusoto/src/s3src/imp.rs index f6ac8349..58d1db5b 100644 --- a/net/rusoto/src/s3src/imp.rs +++ b/net/rusoto/src/s3src/imp.rs @@ -6,17 +6,14 @@ // // SPDX-License-Identifier: MPL-2.0 +use bytes::Bytes; +use futures::future; +use once_cell::sync::Lazy; use std::sync::Mutex; use std::time::Duration; -use bytes::{buf::BufMut, Bytes, BytesMut}; -use futures::future; -use futures::{TryFutureExt, TryStreamExt}; -use once_cell::sync::Lazy; -use rusoto_core::request::HttpClient; -use rusoto_credential::StaticProvider; -use rusoto_s3::GetObjectError; -use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3}; +use aws_sdk_s3::config; +use aws_sdk_s3::{Client, Credentials, RetryConfig}; use gst::glib; use gst::prelude::*; @@ -27,9 +24,10 @@ use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; use crate::s3url::*; -use crate::s3utils::{self, duration_from_millis, duration_to_millis, RetriableError, WaitError}; +use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError}; -const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000; +const DEFAULT_RETRY_ATTEMPTS: u32 = 5; +const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15000; const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000; #[allow(clippy::large_enum_variant)] @@ -37,7 +35,7 @@ enum StreamingState { Stopped, Started { url: GstS3Url, - client: S3Client, + client: Client, size: u64, }, } @@ -48,13 +46,25 @@ impl Default for StreamingState { } } -#[derive(Default)] struct Settings { url: Option, access_key: Option, secret_access_key: Option, - request_timeout: Option, - retry_duration: Option, + retry_attempts: u32, + request_timeout: Duration, +} + +impl Default for Settings { + fn default() -> Self { + let duration = Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC); + Self { + url: None, + access_key: None, + secret_access_key: None, + retry_attempts: DEFAULT_RETRY_ATTEMPTS, + request_timeout: duration, + } + } } #[derive(Default)] @@ -81,24 +91,44 @@ impl S3Src { }; } - fn connect(self: &S3Src, url: &GstS3Url) -> S3Client { + fn connect(self: &S3Src, url: &GstS3Url) -> Result { let settings = self.settings.lock().unwrap(); + let timeout_config = s3utils::timeout_config(settings.request_timeout); - match ( + let cred = match ( settings.access_key.as_ref(), settings.secret_access_key.as_ref(), ) { - (Some(access_key), Some(secret_access_key)) => { - let creds = - StaticProvider::new_minimal(access_key.clone(), secret_access_key.clone()); - S3Client::new_with( - HttpClient::new().expect("failed to create request dispatcher"), - creds, - url.region.clone(), - ) - } - _ => S3Client::new(url.region.clone()), - } + (Some(access_key), Some(secret_access_key)) => Some(Credentials::new( + access_key.clone(), + secret_access_key.clone(), + None, + None, + "rusoto-s3-src", + )), + _ => None, + }; + + let sdk_config = + s3utils::wait_config(&self.canceller, url.region.clone(), timeout_config, cred) + .map_err(|err| match err { + WaitError::FutureError(err) => gst::error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to create SDK config: {}", err] + ), + WaitError::Cancelled => { + gst::error_msg!( + gst::LibraryError::Failed, + ["SDK config request interrupted during start"] + ) + } + })?; + + let config = config::Builder::from(&sdk_config) + .retry_config(RetryConfig::new().with_max_attempts(settings.retry_attempts)) + .build(); + + Ok(Client::from_conf(config)) } fn set_uri(self: &S3Src, _: &super::S3Src, url_str: Option<&str>) -> Result<(), glib::Error> { @@ -134,47 +164,38 @@ impl S3Src { fn head( self: &S3Src, src: &super::S3Src, - client: &S3Client, + client: &Client, url: &GstS3Url, ) -> Result { - let settings = self.settings.lock().unwrap(); + let head_object = client + .head_object() + .set_bucket(Some(url.bucket.clone())) + .set_key(Some(url.object.clone())) + .set_version_id(url.version.clone()); + let head_object_future = head_object.send(); - let head_object_future = || { - client - .head_object(HeadObjectRequest { - bucket: url.bucket.clone(), - key: url.object.clone(), - version_id: url.version.clone(), - ..Default::default() - }) - .map_err(RetriableError::Rusoto) - }; + let output = + s3utils::wait(&self.canceller, head_object_future).map_err(|err| match err { + WaitError::FutureError(err) => gst::error_msg!( + gst::ResourceError::NotFound, + ["Failed to get HEAD object: {:?}", err] + ), + WaitError::Cancelled => { + gst::error_msg!( + gst::LibraryError::Failed, + ["Head object request interrupted"] + ) + } + })?; - let output = s3utils::wait_retry( - &self.canceller, - settings.request_timeout, - settings.retry_duration, - head_object_future, - ) - .map_err(|err| match err { - WaitError::FutureError(err) => gst::error_msg!( - gst::ResourceError::NotFound, - ["Failed to HEAD object: {:?}", err] - ), - WaitError::Cancelled => { - gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) - } - })?; + gst::info!( + CAT, + obj: src, + "HEAD success, content length = {}", + output.content_length + ); - if let Some(size) = output.content_length { - gst::info!(CAT, obj: src, "HEAD success, content length = {}", size); - Ok(size as u64) - } else { - Err(gst::error_msg!( - gst::ResourceError::Read, - ["Failed to get content length"] - )) - } + Ok(output.content_length as u64) } /* Returns the bytes, Some(error) if one occured, or a None error if interrupted */ @@ -200,57 +221,38 @@ impl S3Src { } }; - let settings = self.settings.lock().unwrap(); + let get_object = client + .get_object() + .set_bucket(Some(url.bucket.clone())) + .set_key(Some(url.object.clone())) + .set_range(Some(format!("bytes={}-{}", offset, offset + length - 1))) + .set_version_id(url.version.clone()); - let get_object_future = || async { - gst::debug!( - CAT, - obj: src, - "Requesting range: {}-{}", - offset, - offset + length - 1 - ); + gst::debug!( + CAT, + obj: src, + "Requesting range: {}-{}", + offset, + offset + length - 1 + ); - let output = client - .get_object(GetObjectRequest { - bucket: url.bucket.clone(), - key: url.object.clone(), - range: Some(format!("bytes={}-{}", offset, offset + length - 1)), - version_id: url.version.clone(), - ..Default::default() - }) - .map_err(RetriableError::Rusoto) - .await?; + let get_object_future = get_object.send(); - gst::debug!( - CAT, - obj: src, - "Read {} bytes", - output.content_length.unwrap() - ); + let mut output = + s3utils::wait(&self.canceller, get_object_future).map_err(|err| match err { + WaitError::FutureError(err) => Some(gst::error_msg!( + gst::ResourceError::Read, + ["Could not read: {}", err] + )), + WaitError::Cancelled => None, + })?; - let mut collect = BytesMut::new(); - let mut stream = output.body.unwrap(); + gst::debug!(CAT, obj: src, "Read {} bytes", output.content_length); - // Loop over the stream and collect till we're done - // FIXME: Can we use TryStreamExt::collect() here? - while let Some(item) = stream.try_next().map_err(RetriableError::Std).await? { - collect.put(item) - } - - Ok::>(collect.freeze()) - }; - - s3utils::wait_retry( - &self.canceller, - settings.request_timeout, - settings.retry_duration, - get_object_future, - ) - .map_err(|err| match err { + s3utils::wait_stream(&self.canceller, &mut output.body).map_err(|err| match err { WaitError::FutureError(err) => Some(gst::error_msg!( gst::ResourceError::Read, - ["Could not read: {:?}", err] + ["Could not read: {}", err] )), WaitError::Cancelled => None, }) @@ -302,12 +304,21 @@ impl ObjectImpl for S3Src { glib::ParamSpecInt64::new( "retry-duration", "Retry duration", - "How long we should retry S3 requests before giving up (in ms, set to -1 for infinity)", + "How long we should retry S3 requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)", -1, std::i64::MAX, DEFAULT_RETRY_DURATION_MSEC as i64, glib::ParamFlags::READWRITE, ), + glib::ParamSpecUInt::new( + "retry-attempts", + "Retry attempts", + "Number of times AWS SDK attempts a request before abandoning the request", + 1, + 10, + DEFAULT_RETRY_ATTEMPTS, + glib::ParamFlags::READWRITE, + ), ] }); @@ -321,27 +332,39 @@ impl ObjectImpl for S3Src { value: &glib::Value, pspec: &glib::ParamSpec, ) { + let mut settings = self.settings.lock().unwrap(); + match pspec.name() { "uri" => { + drop(settings); let _ = self.set_uri(obj, value.get().expect("type checked upstream")); } "access-key" => { - let mut settings = self.settings.lock().unwrap(); settings.access_key = value.get().expect("type checked upstream"); } "secret-access-key" => { - let mut settings = self.settings.lock().unwrap(); settings.secret_access_key = value.get().expect("type checked upstream"); } "request-timeout" => { - let mut settings = self.settings.lock().unwrap(); settings.request_timeout = duration_from_millis(value.get::().expect("type checked upstream")); } "retry-duration" => { - let mut settings = self.settings.lock().unwrap(); - settings.retry_duration = - duration_from_millis(value.get::().expect("type checked upstream")); + /* + * To maintain backwards compatibility calculate retry attempts + * by dividing the provided duration from request timeout. + */ + let value = value.get::().expect("type checked upstream"); + let request_timeout = duration_to_millis(Some(settings.request_timeout)); + let retry_attempts = if value > request_timeout { + value / request_timeout + } else { + request_timeout / value + }; + settings.retry_attempts = retry_attempts as u32; + } + "retry-attempts" => { + settings.retry_attempts = value.get::().expect("type checked upstream"); } _ => unimplemented!(), } @@ -361,8 +384,12 @@ impl ObjectImpl for S3Src { } "access-key" => settings.access_key.to_value(), "secret-access-key" => settings.secret_access_key.to_value(), - "request-timeout" => duration_to_millis(settings.request_timeout).to_value(), - "retry-duration" => duration_to_millis(settings.retry_duration).to_value(), + "request-timeout" => duration_to_millis(Some(settings.request_timeout)).to_value(), + "retry-duration" => { + let request_timeout = duration_to_millis(Some(settings.request_timeout)); + (settings.retry_attempts as i64 * request_timeout).to_value() + } + "retry-attempts" => settings.retry_attempts.to_value(), _ => unimplemented!(), } } @@ -459,16 +486,22 @@ impl BaseSrcImpl for S3Src { }; drop(settings); - let s3client = self.connect(&s3url); - let size = self.head(src, &s3client, &s3url)?; + if let Ok(s3client) = self.connect(&s3url) { + let size = self.head(src, &s3client, &s3url)?; - *state = StreamingState::Started { - url: s3url, - client: s3client, - size, - }; + *state = StreamingState::Started { + url: s3url, + client: s3client, + size, + }; - Ok(()) + Ok(()) + } else { + Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Cannot connect to S3 resource"] + )) + } } fn stop(&self, _: &Self::Type) -> Result<(), gst::ErrorMessage> { diff --git a/net/rusoto/src/s3url.rs b/net/rusoto/src/s3url.rs index c03310e9..1a331118 100644 --- a/net/rusoto/src/s3url.rs +++ b/net/rusoto/src/s3url.rs @@ -6,8 +6,8 @@ // // SPDX-License-Identifier: MPL-2.0 +use aws_sdk_s3::Region; use percent_encoding::{percent_decode, percent_encode, AsciiSet, CONTROLS}; -use rusoto_core::Region; use url::Url; #[derive(Clone)] @@ -29,27 +29,7 @@ impl ToString for GstS3Url { fn to_string(&self) -> String { format!( "s3://{}/{}/{}{}", - match self.region { - Region::Custom { - ref name, - ref endpoint, - } => { - format!( - "{}+{}", - base32::encode( - base32::Alphabet::RFC4648 { padding: true }, - name.as_bytes(), - ), - base32::encode( - base32::Alphabet::RFC4648 { padding: true }, - endpoint.as_bytes(), - ), - ) - } - _ => { - String::from(self.region.name()) - } - }, + self.region, self.bucket, percent_encode(self.object.as_bytes(), PATH_SEGMENT), if self.version.is_some() { @@ -73,8 +53,9 @@ pub fn parse_s3_url(url_str: &str) -> Result { } let host = url.host_str().unwrap(); - let region = host - .parse::() + + let region_str = host + .parse() .or_else(|_| { let (name, endpoint) = host.split_once('+').ok_or(())?; let name = @@ -83,10 +64,15 @@ pub fn parse_s3_url(url_str: &str) -> Result { base32::decode(base32::Alphabet::RFC4648 { padding: true }, endpoint).ok_or(())?; let name = String::from_utf8(name).map_err(|_| ())?; let endpoint = String::from_utf8(endpoint).map_err(|_| ())?; - Ok(Region::Custom { name, endpoint }) + Ok(format!("{}{}", name, endpoint)) }) .map_err(|_: ()| format!("Invalid region '{}'", host))?; + // Note that aws_sdk_s3::Region does not provide any error/validation + // methods to check the region argument being passed to it. + // See https://docs.rs/aws-sdk-s3/latest/aws_sdk_s3/struct.Region.html + let region = Region::new(region_str); + let mut path = url .path_segments() .ok_or_else(|| format!("Invalid uri '{}'", url))?; @@ -127,81 +113,3 @@ pub fn parse_s3_url(url_str: &str) -> Result { version, }) } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn cannot_be_base() { - assert!(parse_s3_url("data:something").is_err()); - } - - #[test] - fn invalid_scheme() { - assert!(parse_s3_url("file:///dev/zero").is_err()); - } - - #[test] - fn bad_region() { - assert!(parse_s3_url("s3://atlantis-1/i-hope-we/dont-find-this").is_err()); - } - - #[test] - fn no_bucket() { - assert!(parse_s3_url("s3://ap-south-1").is_err()); - assert!(parse_s3_url("s3://ap-south-1/").is_err()); - } - - #[test] - fn no_object() { - assert!(parse_s3_url("s3://ap-south-1/my-bucket").is_err()); - assert!(parse_s3_url("s3://ap-south-1/my-bucket/").is_err()); - } - - #[test] - fn valid_simple() { - assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object").is_ok()); - } - - #[test] - fn extraneous_query() { - assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object?foo=bar").is_err()); - } - - #[test] - fn valid_version() { - assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object?version=one").is_ok()); - } - - #[test] - fn trailing_slash() { - // Slashes are valid at the end of the object key - assert_eq!( - parse_s3_url("s3://ap-south-1/my-bucket/my-object/") - .unwrap() - .object, - "my-object/" - ); - } - - #[test] - fn percent_encoding() { - assert_eq!( - parse_s3_url("s3://ap-south-1/my-bucket/my%20object") - .unwrap() - .object, - "my object" - ); - } - - #[test] - fn percent_decoding() { - assert_eq!( - parse_s3_url("s3://ap-south-1/my-bucket/my object") - .unwrap() - .to_string(), - "s3://ap-south-1/my-bucket/my%20object" - ); - } -} diff --git a/net/rusoto/src/s3utils.rs b/net/rusoto/src/s3utils.rs index fea2bda8..9e574b86 100644 --- a/net/rusoto/src/s3utils.rs +++ b/net/rusoto/src/s3utils.rs @@ -6,21 +6,22 @@ // // SPDX-License-Identifier: MPL-2.0 -use futures::{future, Future, FutureExt, TryFutureExt}; +use aws_config::meta::region::RegionProviderChain; +use aws_sdk_s3::{Credentials, Region}; +use aws_types::sdk_config::SdkConfig; + +use aws_smithy_http::byte_stream::{ByteStream, Error}; +use aws_smithy_types::{timeout, tristate::TriState}; + +use bytes::{buf::BufMut, Bytes, BytesMut}; +use futures::stream::TryStreamExt; +use futures::{future, Future}; use once_cell::sync::Lazy; -use rusoto_core::RusotoError::{HttpDispatch, Unknown}; -use rusoto_core::{HttpDispatchError, RusotoError}; use std::sync::Mutex; use std::time::Duration; use tokio::runtime; -static CAT: Lazy = Lazy::new(|| { - gst::DebugCategory::new( - "rusotos3utils", - gst::DebugColorFlags::empty(), - Some("Amazon S3 utilities"), - ) -}); +const DEFAULT_S3_REGION: &str = "us-west-2"; static RUNTIME: Lazy = Lazy::new(|| { runtime::Builder::new_multi_thread() @@ -32,90 +33,18 @@ static RUNTIME: Lazy = Lazy::new(|| { }); #[derive(Debug)] -pub enum RetriableError { - Rusoto(RusotoError), - Std(std::io::Error), -} - pub enum WaitError { Cancelled, FutureError(E), } -fn make_timeout( - timeout: Duration, - future: F, -) -> impl Future>> -where - E: std::fmt::Debug, - F: Future>>, -{ - tokio::time::timeout(timeout, future).map(|v| match v { - // Future resolved succesfully - Ok(Ok(v)) => Ok(v), - // Future resolved with an error - Ok(Err(e)) => Err(e), - // Timeout elapsed - // Use an HttpDispatch error so the caller doesn't have to deal with this separately from - // other HTTP dispatch errors - _ => Err(RetriableError::Rusoto(HttpDispatch( - HttpDispatchError::new("Timeout".to_owned()), - ))), - }) -} - -fn make_retry( - timeout: Option, - mut future: F, -) -> impl Future>> -where - E: std::fmt::Debug, - F: FnMut() -> Fut, - Fut: Future>>, -{ - backoff::future::retry( - backoff::ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(500)) - .with_multiplier(1.5) - .with_max_elapsed_time(timeout) - .build(), - move || { - future().map_err(|err| match err { - RetriableError::Rusoto(HttpDispatch(_)) => { - gst::warning!(CAT, "Error waiting for operation ({:?}), retrying", err); - backoff::Error::transient(err) - } - RetriableError::Rusoto(Unknown(ref response)) => { - gst::warning!( - CAT, - "Unknown error waiting for operation ({:?}), retrying", - response - ); - - // Retry on 5xx errors - if response.status.is_server_error() { - backoff::Error::transient(err) - } else { - backoff::Error::permanent(err) - } - } - _ => backoff::Error::permanent(err), - }) - }, - ) -} - -pub fn wait_retry( +pub fn wait( canceller: &Mutex>, - req_timeout: Option, - retry_timeout: Option, - mut future: F, -) -> Result>> + future: F, +) -> Result> where - E: std::fmt::Debug, - F: FnMut() -> Fut, - Fut: Send + Future>>, - Fut::Output: Send, + F: Send + Future>, + F::Output: Send, T: Send, E: Send, { @@ -125,28 +54,14 @@ where canceller_guard.replace(abort_handle); drop(canceller_guard); + let abortable_future = future::Abortable::new(future, abort_registration); + + // FIXME: add a timeout as well + let res = { let _enter = RUNTIME.enter(); - futures::executor::block_on(async { - // The order of this future stack matters: the innermost future is the supplied future - // generator closure. We wrap that in a timeout to bound how long we wait. This, in - // turn, is wrapped in a retrying future which will make multiple attempts until it - // ultimately fails. - // The timeout must be created within the tokio executor - let res = match req_timeout { - None => { - let retry_future = make_retry(retry_timeout, future); - future::Abortable::new(retry_future, abort_registration).await - } - Some(t) => { - let timeout_future = || make_timeout(t, future()); - let retry_future = make_retry(retry_timeout, timeout_future); - future::Abortable::new(retry_future, abort_registration).await - } - }; - - match res { + match abortable_future.await { // Future resolved successfully Ok(Ok(res)) => Ok(res), // Future resolved with an error @@ -164,16 +79,94 @@ where res } -pub fn duration_from_millis(millis: i64) -> Option { +pub fn wait_stream( + canceller: &Mutex>, + stream: &mut ByteStream, +) -> Result> { + wait(canceller, async move { + let mut collect = BytesMut::new(); + + // Loop over the stream and collect till we're done + while let Some(item) = stream.try_next().await? { + collect.put(item) + } + + Ok::(collect.freeze()) + }) +} + +// See setting-timeouts example in aws-sdk-rust. +pub fn timeout_config(request_timeout: Duration) -> timeout::Config { + timeout::Config::new().with_api_timeouts( + timeout::Api::new() + // This timeout acts at the "Request to a service" level. When the SDK makes a request to a + // service, that "request" can contain several HTTP requests. This way, you can retry + // failures that are likely spurious, or refresh credentials. + .with_call_timeout(TriState::Set(request_timeout)) + // This timeout acts at the "HTTP request" level and sets a separate timeout for each + // HTTP request made as part of a "service request." + .with_call_attempt_timeout(TriState::Set(request_timeout)), + ) +} + +pub fn wait_config( + canceller: &Mutex>, + region: Region, + timeout_config: timeout::Config, + credentials: Option, +) -> Result> { + let region_provider = RegionProviderChain::first_try(region) + .or_default_provider() + .or_else(Region::new(DEFAULT_S3_REGION)); + let config_future = match credentials { + Some(cred) => aws_config::from_env() + .timeout_config(timeout_config) + .region(region_provider) + .credentials_provider(cred) + .load(), + None => aws_config::from_env() + .timeout_config(timeout_config) + .region(region_provider) + .load(), + }; + + let mut canceller_guard = canceller.lock().unwrap(); + let (abort_handle, abort_registration) = future::AbortHandle::new_pair(); + + canceller_guard.replace(abort_handle); + drop(canceller_guard); + + let abortable_future = future::Abortable::new(config_future, abort_registration); + + let res = { + let _enter = RUNTIME.enter(); + futures::executor::block_on(async { + match abortable_future.await { + // Future resolved successfully + Ok(config) => Ok(config), + // Canceller called before future resolved + Err(future::Aborted) => Err(WaitError::Cancelled), + } + }) + }; + + /* Clear out the canceller */ + canceller_guard = canceller.lock().unwrap(); + *canceller_guard = None; + + res +} + +pub fn duration_from_millis(millis: i64) -> Duration { match millis { - -1 => None, - v => Some(Duration::from_millis(v as u64)), + -1 => Duration::MAX, + v => Duration::from_millis(v as u64), } } pub fn duration_to_millis(dur: Option) -> i64 { match dur { - None => -1, + None => Duration::MAX.as_millis() as i64, Some(d) => d.as_millis() as i64, } }