From c2aafa4f4690331f10143c274eecd7ffdc75e3fa Mon Sep 17 00:00:00 2001
From: Arun Raghavan
Date: Wed, 23 Feb 2022 13:10:30 -0500
Subject: [PATCH] rusoto: s3sink: Implement timeout/retry for part uploads

Rusoto does not implement timeouts or retries for any of its HTTP
requests. This is particularly problematic for the part upload stage of
multipart uploads, as a network blip can stall a part upload for a long
time before it eventually fails.

To avoid this, for part uploads, we add (a) configurable timeouts for
each request, and (b) retries with exponential backoff, up to a
configurable total duration.

It is not clear if/how we want to do this for other types of requests.
The creation of a multipart upload should be relatively quick, but the
completion of an upload might take several minutes, so there is no
one-size-fits-all configuration. It would likely make more sense to
implement sensible hard-coded defaults for those other kinds of
requests.
---
 net/rusoto/Cargo.toml        |   3 +-
 net/rusoto/src/s3sink/imp.rs | 207 +++++++++++++++++++++++------------
 net/rusoto/src/s3utils.rs    | 121 +++++++++++++++++++-
 3 files changed, 254 insertions(+), 77 deletions(-)

diff --git a/net/rusoto/Cargo.toml b/net/rusoto/Cargo.toml
index cf99b1ae..5e113b17 100644
--- a/net/rusoto/Cargo.toml
+++ b/net/rusoto/Cargo.toml
@@ -21,7 +21,7 @@ rusoto_credential = "0.47"
 rusoto_signature = "0.47"
 url = "2"
 percent-encoding = "2"
-tokio = { version = "1.0", features = [ "rt-multi-thread" ] }
+tokio = { version = "1.0", features = [ "rt-multi-thread", "time" ] }
 async-tungstenite = { version = "0.16", features = ["tokio", "tokio-runtime", "tokio-native-tls"] }
 nom = "7"
 crc = "2"
@@ -32,6 +32,7 @@ serde_derive = "1"
 serde_json = "1"
 atomic_refcell = "0.1"
 base32 = "0.4"
+backoff = { version = "0.4", features = [ "futures", "tokio" ] }

 [lib]
 name = "gstrusoto"
diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs
index 894474f3..96aa893b 100644
--- a/net/rusoto/src/s3sink/imp.rs
+++ b/net/rusoto/src/s3sink/imp.rs
@@ -24,6 +24,7 @@ use once_cell::sync::Lazy;

 use std::collections::HashMap;
 use std::sync::Mutex;
+use std::time::Duration;

 use crate::s3url::*;
 use crate::s3utils::{self, WaitError};
@@ -31,6 +32,8 @@ use crate::s3utils::{self, WaitError};
 use super::OnError;

 const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
+const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
+const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;

 struct Started {
     client: S3Client,
@@ -93,6 +96,8 @@ struct Settings {
     secret_access_key: Option<String>,
     metadata: Option<gst::Structure>,
     multipart_upload_on_error: OnError,
+    upload_part_request_timeout: Option<Duration>,
+    upload_part_retry_duration: Option<Duration>,
 }

 impl Settings {
@@ -161,6 +166,12 @@ impl Default for Settings {
             secret_access_key: None,
             metadata: None,
             multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR,
+            upload_part_request_timeout: Some(Duration::from_millis(
+                DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC,
+            )),
+            upload_part_retry_duration: Some(Duration::from_millis(
+                DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC,
+            )),
         }
     }
 }
@@ -187,10 +198,8 @@ impl S3Sink {
         &self,
         element: &super::S3Sink,
     ) -> Result<(), Option<gst::ErrorMessage>> {
-        let upload_part_req = self.create_upload_part_request()?;
-        let part_number = upload_part_req.part_number;
-
         let mut state = self.state.lock().unwrap();
+        let settings = self.settings.lock().unwrap();
         let state = match *state {
             State::Started(ref mut started_state) => started_state,
             State::Stopped => {
@@ -198,68 +207,81 @@ impl S3Sink {
             }
         };

-        let upload_part_req_future = state.client.upload_part(upload_part_req);
+        let part_number = state.increment_part_number()?;
+        let body = std::mem::replace(
+            &mut state.buffer,
+            Vec::with_capacity(settings.buffer_size as usize),
+        );
+        let upload_id = &state.upload_id;
+        let client = &state.client;

-        let output =
-            s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err {
-                WaitError::FutureError(err) => {
-                    let settings = self.settings.lock().unwrap();
-                    match settings.multipart_upload_on_error {
-                        OnError::Abort => {
-                            gst::log!(
-                                CAT,
-                                obj: element,
-                                "Aborting multipart upload request with id: {}",
-                                state.upload_id
-                            );
-                            match self.abort_multipart_upload_request(state) {
-                                Ok(()) => {
-                                    gst::log!(
-                                        CAT,
-                                        obj: element,
-                                        "Aborting multipart upload request succeeded."
-                                    );
-                                }
-                                Err(err) => gst::error!(
+        let upload_part_req_future =
+            || client.upload_part(self.create_upload_part_request(&body, part_number, upload_id));
+
+        let output = s3utils::wait_retry(
+            &self.canceller,
+            settings.upload_part_request_timeout,
+            settings.upload_part_retry_duration,
+            upload_part_req_future,
+        )
+        .map_err(|err| match err {
+            WaitError::FutureError(err) => {
+                match settings.multipart_upload_on_error {
+                    OnError::Abort => {
+                        gst::log!(
+                            CAT,
+                            obj: element,
+                            "Aborting multipart upload request with id: {}",
+                            state.upload_id
+                        );
+                        match self.abort_multipart_upload_request(state) {
+                            Ok(()) => {
+                                gst::log!(
                                     CAT,
                                     obj: element,
-                                    "Aborting multipart upload failed: {}",
-                                    err.to_string()
-                                ),
+                                    "Aborting multipart upload request succeeded."
+                                );
                             }
-                        }
-                        OnError::Complete => {
-                            gst::log!(
+                            Err(err) => gst::error!(
                                 CAT,
                                 obj: element,
-                                "Completing multipart upload request with id: {}",
-                                state.upload_id
-                            );
-                            match self.complete_multipart_upload_request(state) {
-                                Ok(()) => {
-                                    gst::log!(
-                                        CAT,
-                                        obj: element,
-                                        "Complete multipart upload request succeeded."
-                                    );
-                                }
-                                Err(err) => gst::error!(
-                                    CAT,
-                                    obj: element,
-                                    "Completing multipart upload failed: {}",
-                                    err.to_string()
-                                ),
-                            }
+                                "Aborting multipart upload failed: {}",
+                                err.to_string()
+                            ),
                         }
-                        OnError::DoNothing => (),
                     }
-                    Some(gst::error_msg!(
-                        gst::ResourceError::OpenWrite,
-                        ["Failed to upload part: {}", err]
-                    ))
+                    OnError::Complete => {
+                        gst::log!(
+                            CAT,
+                            obj: element,
+                            "Completing multipart upload request with id: {}",
+                            state.upload_id
+                        );
+                        match self.complete_multipart_upload_request(state) {
+                            Ok(()) => {
+                                gst::log!(
+                                    CAT,
+                                    obj: element,
+                                    "Complete multipart upload request succeeded."
+                                );
+                            }
+                            Err(err) => gst::error!(
+                                CAT,
+                                obj: element,
+                                "Completing multipart upload failed: {}",
+                                err.to_string()
+                            ),
+                        }
+                    }
+                    OnError::DoNothing => (),
                 }
-                WaitError::Cancelled => None,
-            })?;
+                Some(gst::error_msg!(
+                    gst::ResourceError::OpenWrite,
+                    ["Failed to upload part: {}", err]
+                ))
+            }
+            WaitError::Cancelled => None,
+        })?;

         state.completed_parts.push(CompletedPart {
             e_tag: output.e_tag,
@@ -270,29 +292,22 @@ impl S3Sink {
         Ok(())
     }

-    fn create_upload_part_request(&self) -> Result<UploadPartRequest, gst::ErrorMessage> {
+    fn create_upload_part_request(
+        &self,
+        body: &[u8],
+        part_number: i64,
+        upload_id: &str,
+    ) -> UploadPartRequest {
         let url = self.url.lock().unwrap();
-        let settings = self.settings.lock().unwrap();
-        let mut state = self.state.lock().unwrap();
-        let state = match *state {
-            State::Started(ref mut started_state) => started_state,
-            State::Stopped => {
-                unreachable!("Element should be started");
-            }
-        };
-        let part_number = state.increment_part_number()?;
-
-        Ok(UploadPartRequest {
-            body: Some(rusoto_core::ByteStream::from(std::mem::replace(
-                &mut state.buffer,
-                Vec::with_capacity(settings.buffer_size as usize),
-            ))),
+
+        UploadPartRequest {
+            body: Some(rusoto_core::ByteStream::from(body.to_owned())),
             bucket: url.as_ref().unwrap().bucket.to_owned(),
             key: url.as_ref().unwrap().object.to_owned(),
-            upload_id: state.upload_id.to_owned(),
+            upload_id: upload_id.to_owned(),
             part_number,
             ..Default::default()
-        })
+        }
     }

     fn create_complete_multipart_upload_request(
@@ -640,6 +655,24 @@ impl ObjectImpl for S3Sink {
                     DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32,
                     glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
                 ),
+                glib::ParamSpecInt64::new(
+                    "upload-part-request-timeout",
+                    "Upload part request timeout",
+                    "Timeout for a single upload part request (in ms, set to -1 for infinity)",
+                    -1,
+                    std::i64::MAX,
+                    DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC as i64,
+                    glib::ParamFlags::READWRITE,
+                ),
+                glib::ParamSpecInt64::new(
+                    "upload-part-retry-duration",
+                    "Upload part retry duration",
+                    "How long we should retry upload part requests before giving up (in ms, set to -1 for infinity)",
+                    -1,
+                    std::i64::MAX,
+                    DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64,
+                    glib::ParamFlags::READWRITE,
+                ),
             ]
         });

@@ -716,6 +749,20 @@ impl ObjectImpl for S3Sink {
                 settings.multipart_upload_on_error =
                     value.get::<OnError>().expect("type checked upstream");
             }
+            "upload-part-request-timeout" => {
+                settings.upload_part_request_timeout =
+                    match value.get::<i64>().expect("type checked upstream") {
+                        -1 => None,
+                        v => Some(Duration::from_millis(v as u64)),
+                    }
+            }
+            "upload-part-retry-duration" => {
+                settings.upload_part_retry_duration =
+                    match value.get::<i64>().expect("type checked upstream") {
+                        -1 => None,
+                        v => Some(Duration::from_millis(v as u64)),
+                    }
+            }
             _ => unimplemented!(),
         }
     }
@@ -740,6 +787,20 @@ impl ObjectImpl for S3Sink {
             "secret-access-key" => settings.secret_access_key.to_value(),
             "metadata" => settings.metadata.to_value(),
             "on-error" => settings.multipart_upload_on_error.to_value(),
+            "upload-part-request-timeout" => {
+                let timeout: i64 = match settings.upload_part_request_timeout {
+                    None => -1,
+                    Some(v) => v.as_millis() as i64,
+                };
+                timeout.to_value()
+            }
+            "upload-part-retry-duration" => {
+                let timeout: i64 = match settings.upload_part_retry_duration {
+                    None => -1,
+                    Some(v) => v.as_millis() as i64,
+                };
+                timeout.to_value()
+            }
             _ => unimplemented!(),
         }
     }
diff --git a/net/rusoto/src/s3utils.rs b/net/rusoto/src/s3utils.rs
index 9775ebd4..fed979b1 100644
--- a/net/rusoto/src/s3utils.rs
+++ b/net/rusoto/src/s3utils.rs
@@ -7,13 +7,22 @@
 // SPDX-License-Identifier: MPL-2.0

 use bytes::{buf::BufMut, Bytes, BytesMut};
-use futures::stream::TryStreamExt;
-use futures::{future, Future};
+use futures::{future, Future, FutureExt, TryFutureExt, TryStreamExt};
 use once_cell::sync::Lazy;
-use rusoto_core::ByteStream;
+use rusoto_core::RusotoError::HttpDispatch;
+use rusoto_core::{ByteStream, HttpDispatchError, RusotoError};
 use std::sync::Mutex;
+use std::time::Duration;
 use tokio::runtime;

+static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
+    gst::DebugCategory::new(
+        "rusotos3utils",
+        gst::DebugColorFlags::empty(),
+        Some("Amazon S3 utilities"),
+    )
+});
+
 static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
     runtime::Builder::new_multi_thread()
         .enable_all()
@@ -28,6 +37,112 @@ pub enum WaitError<E> {
     FutureError(E),
 }

+fn make_timeout<F, T, E>(
+    timeout: Duration,
+    future: F,
+) -> impl Future<Output = Result<T, RusotoError<E>>>
+where
+    E: std::fmt::Debug,
+    F: Future<Output = Result<T, RusotoError<E>>>,
+{
+    tokio::time::timeout(timeout, future).map(|v| match v {
+        // Future resolved successfully
+        Ok(Ok(v)) => Ok(v),
+        // Future resolved with an error
+        Ok(Err(e)) => Err(e),
+        // Timeout elapsed
+        // Use an HttpDispatch error so the caller doesn't have to deal with this separately from
+        // other HTTP dispatch errors
+        _ => Err(HttpDispatch(HttpDispatchError::new("Timeout".to_owned()))),
+    })
+}
+
+fn make_retry<F, T, E, Fut>(
+    timeout: Option<Duration>,
+    mut future: F,
+) -> impl Future<Output = Result<T, RusotoError<E>>>
+where
+    E: std::fmt::Debug,
+    F: FnMut() -> Fut,
+    Fut: Future<Output = Result<T, RusotoError<E>>>,
+{
+    backoff::future::retry(
+        backoff::ExponentialBackoffBuilder::new()
+            .with_initial_interval(Duration::from_millis(500))
+            .with_multiplier(1.5)
+            .with_max_elapsed_time(timeout)
+            .build(),
+        move || {
+            future().map_err(|err| match err {
+                HttpDispatch(_) => {
+                    gst::warning!(CAT, "Error waiting for operation ({:?}), retrying", err);
+                    backoff::Error::transient(err)
+                }
+                _ => backoff::Error::permanent(err),
+            })
+        },
+    )
+}
+
+pub fn wait_retry<F, T, E, Fut>(
+    canceller: &Mutex<Option<future::AbortHandle>>,
+    req_timeout: Option<Duration>,
+    retry_timeout: Option<Duration>,
+    mut future: F,
+) -> Result<T, WaitError<RusotoError<E>>>
+where
+    E: std::fmt::Debug,
+    F: FnMut() -> Fut,
+    Fut: Send + Future<Output = Result<T, RusotoError<E>>>,
+    Fut::Output: Send,
+    T: Send,
+    E: Send,
+{
+    let mut canceller_guard = canceller.lock().unwrap();
+    let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
+
+    canceller_guard.replace(abort_handle);
+    drop(canceller_guard);
+
+    let res = {
+        let _enter = RUNTIME.enter();
+
+        futures::executor::block_on(async {
+            // The order of this future stack matters: the innermost future is the supplied future
+            // generator closure. We wrap that in a timeout to bound how long we wait. This, in
+            // turn, is wrapped in a retrying future which will make multiple attempts until it
+            // either succeeds or ultimately gives up.
+            // The timeout must be created within the tokio executor
+            let res = match req_timeout {
+                None => {
+                    let retry_future = make_retry(retry_timeout, future);
+                    future::Abortable::new(retry_future, abort_registration).await
+                }
+                Some(t) => {
+                    let timeout_future = || make_timeout(t, future());
+                    let retry_future = make_retry(retry_timeout, timeout_future);
+                    future::Abortable::new(retry_future, abort_registration).await
+                }
+            };
+
+            match res {
+                // Future resolved successfully
+                Ok(Ok(res)) => Ok(res),
+                // Future resolved with an error
+                Ok(Err(err)) => Err(WaitError::FutureError(err)),
+                // Canceller called before future resolved
+                Err(future::Aborted) => Err(WaitError::Cancelled),
+            }
+        })
+    };
+
+    /* Clear out the canceller */
+    canceller_guard = canceller.lock().unwrap();
+    *canceller_guard = None;
+
+    res
+}
+
 pub fn wait<F, T, E>(
     canceller: &Mutex<Option<future::AbortHandle>>,
     future: F,
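
As a usage illustration (separate from the patch itself), here is a minimal sketch of how the two
new properties might be driven from application code. Only the property names, the millisecond
units, and the "-1 means unlimited" convention come from the patch; the element name
"rusotos3sink" and the gstreamer-rs calls (ElementFactory::make() taking a factory name plus an
optional element name, as in the bindings contemporary with this patch) are assumptions about the
surrounding plugin and bindings version.

    use gstreamer as gst;
    use gst::prelude::*;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        gst::init()?;

        // Hypothetical example: assumes the rusoto plugin is installed so that the
        // "rusotos3sink" element can be found at runtime.
        let sink = gst::ElementFactory::make("rusotos3sink", None)?;

        // Fail any single part-upload request that takes longer than 10 seconds...
        sink.set_property("upload-part-request-timeout", 10_000i64);
        // ...and keep retrying (with exponential backoff) for at most 60 seconds overall.
        sink.set_property("upload-part-retry-duration", 60_000i64);

        // Setting either property to -1 disables the corresponding limit.
        Ok(())
    }

Internally, a value of -1 maps to None (no per-request timeout, or unbounded retry time),
mirroring the set_property handling in the patch above.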