rusoto: s3sink: Implement timeout/retry for part uploads

Rusoto does not implement timeouts or retries for any of its HTTP
requests. This is particularly problematic for the part upload stage of
multipart uploads, as a blip in the network could cause part uploads to
freeze for a long duration and eventually bail.

To avoid this, for part uploads, we add (a) (configurable) timeouts for
each request, and (b) retries with exponential backoff, up to a
configurable duration.

It is not clear if/how we want to do this for other types of requests.
The creation of a multipart upload should be relatively quick, but the
completion of an upload might take several minutes, so there is no
one-size-fits-all configuration, necessarily.

It would likely make more sense to implement some sensible hard-coded
defaults for these other sorts of requests.
This commit is contained in:
Arun Raghavan 2022-02-23 13:10:30 -05:00
parent ab344a469a
commit c2aafa4f46
3 changed files with 254 additions and 77 deletions

View file

@ -21,7 +21,7 @@ rusoto_credential = "0.47"
rusoto_signature = "0.47" rusoto_signature = "0.47"
url = "2" url = "2"
percent-encoding = "2" percent-encoding = "2"
tokio = { version = "1.0", features = [ "rt-multi-thread" ] } tokio = { version = "1.0", features = [ "rt-multi-thread", "time" ] }
async-tungstenite = { version = "0.16", features = ["tokio", "tokio-runtime", "tokio-native-tls"] } async-tungstenite = { version = "0.16", features = ["tokio", "tokio-runtime", "tokio-native-tls"] }
nom = "7" nom = "7"
crc = "2" crc = "2"
@ -32,6 +32,7 @@ serde_derive = "1"
serde_json = "1" serde_json = "1"
atomic_refcell = "0.1" atomic_refcell = "0.1"
base32 = "0.4" base32 = "0.4"
backoff = { version = "0.4", features = [ "futures", "tokio" ] }
[lib] [lib]
name = "gstrusoto" name = "gstrusoto"

View file

@ -24,6 +24,7 @@ use once_cell::sync::Lazy;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration;
use crate::s3url::*; use crate::s3url::*;
use crate::s3utils::{self, WaitError}; use crate::s3utils::{self, WaitError};
@ -31,6 +32,8 @@ use crate::s3utils::{self, WaitError};
use super::OnError; use super::OnError;
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing; const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
struct Started { struct Started {
client: S3Client, client: S3Client,
@ -93,6 +96,8 @@ struct Settings {
secret_access_key: Option<String>, secret_access_key: Option<String>,
metadata: Option<gst::Structure>, metadata: Option<gst::Structure>,
multipart_upload_on_error: OnError, multipart_upload_on_error: OnError,
upload_part_request_timeout: Option<Duration>,
upload_part_retry_duration: Option<Duration>,
} }
impl Settings { impl Settings {
@ -161,6 +166,12 @@ impl Default for Settings {
secret_access_key: None, secret_access_key: None,
metadata: None, metadata: None,
multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR, multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR,
upload_part_request_timeout: Some(Duration::from_millis(
DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC,
)),
upload_part_retry_duration: Some(Duration::from_millis(
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC,
)),
} }
} }
} }
@ -187,10 +198,8 @@ impl S3Sink {
&self, &self,
element: &super::S3Sink, element: &super::S3Sink,
) -> Result<(), Option<gst::ErrorMessage>> { ) -> Result<(), Option<gst::ErrorMessage>> {
let upload_part_req = self.create_upload_part_request()?;
let part_number = upload_part_req.part_number;
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
let state = match *state { let state = match *state {
State::Started(ref mut started_state) => started_state, State::Started(ref mut started_state) => started_state,
State::Stopped => { State::Stopped => {
@ -198,68 +207,81 @@ impl S3Sink {
} }
}; };
let upload_part_req_future = state.client.upload_part(upload_part_req); let part_number = state.increment_part_number()?;
let body = std::mem::replace(
&mut state.buffer,
Vec::with_capacity(settings.buffer_size as usize),
);
let upload_id = &state.upload_id;
let client = &state.client;
let output = let upload_part_req_future =
s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err { || client.upload_part(self.create_upload_part_request(&body, part_number, upload_id));
WaitError::FutureError(err) => {
let settings = self.settings.lock().unwrap(); let output = s3utils::wait_retry(
match settings.multipart_upload_on_error { &self.canceller,
OnError::Abort => { settings.upload_part_request_timeout,
gst::log!( settings.upload_part_retry_duration,
CAT, upload_part_req_future,
obj: element, )
"Aborting multipart upload request with id: {}", .map_err(|err| match err {
state.upload_id WaitError::FutureError(err) => {
); match settings.multipart_upload_on_error {
match self.abort_multipart_upload_request(state) { OnError::Abort => {
Ok(()) => { gst::log!(
gst::log!( CAT,
CAT, obj: element,
obj: element, "Aborting multipart upload request with id: {}",
"Aborting multipart upload request succeeded." state.upload_id
); );
} match self.abort_multipart_upload_request(state) {
Err(err) => gst::error!( Ok(()) => {
gst::log!(
CAT, CAT,
obj: element, obj: element,
"Aborting multipart upload failed: {}", "Aborting multipart upload request succeeded."
err.to_string() );
),
} }
} Err(err) => gst::error!(
OnError::Complete => {
gst::log!(
CAT, CAT,
obj: element, obj: element,
"Completing multipart upload request with id: {}", "Aborting multipart upload failed: {}",
state.upload_id err.to_string()
); ),
match self.complete_multipart_upload_request(state) {
Ok(()) => {
gst::log!(
CAT,
obj: element,
"Complete multipart upload request succeeded."
);
}
Err(err) => gst::error!(
CAT,
obj: element,
"Completing multipart upload failed: {}",
err.to_string()
),
}
} }
OnError::DoNothing => (),
} }
Some(gst::error_msg!( OnError::Complete => {
gst::ResourceError::OpenWrite, gst::log!(
["Failed to upload part: {}", err] CAT,
)) obj: element,
"Completing multipart upload request with id: {}",
state.upload_id
);
match self.complete_multipart_upload_request(state) {
Ok(()) => {
gst::log!(
CAT,
obj: element,
"Complete multipart upload request succeeded."
);
}
Err(err) => gst::error!(
CAT,
obj: element,
"Completing multipart upload failed: {}",
err.to_string()
),
}
}
OnError::DoNothing => (),
} }
WaitError::Cancelled => None, Some(gst::error_msg!(
})?; gst::ResourceError::OpenWrite,
["Failed to upload part: {}", err]
))
}
WaitError::Cancelled => None,
})?;
state.completed_parts.push(CompletedPart { state.completed_parts.push(CompletedPart {
e_tag: output.e_tag, e_tag: output.e_tag,
@ -270,29 +292,22 @@ impl S3Sink {
Ok(()) Ok(())
} }
fn create_upload_part_request(&self) -> Result<UploadPartRequest, gst::ErrorMessage> { fn create_upload_part_request(
&self,
body: &[u8],
part_number: i64,
upload_id: &str,
) -> UploadPartRequest {
let url = self.url.lock().unwrap(); let url = self.url.lock().unwrap();
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
let state = match *state {
State::Started(ref mut started_state) => started_state,
State::Stopped => {
unreachable!("Element should be started");
}
};
let part_number = state.increment_part_number()?; UploadPartRequest {
Ok(UploadPartRequest { body: Some(rusoto_core::ByteStream::from(body.to_owned())),
body: Some(rusoto_core::ByteStream::from(std::mem::replace(
&mut state.buffer,
Vec::with_capacity(settings.buffer_size as usize),
))),
bucket: url.as_ref().unwrap().bucket.to_owned(), bucket: url.as_ref().unwrap().bucket.to_owned(),
key: url.as_ref().unwrap().object.to_owned(), key: url.as_ref().unwrap().object.to_owned(),
upload_id: state.upload_id.to_owned(), upload_id: upload_id.to_owned(),
part_number, part_number,
..Default::default() ..Default::default()
}) }
} }
fn create_complete_multipart_upload_request( fn create_complete_multipart_upload_request(
@ -640,6 +655,24 @@ impl ObjectImpl for S3Sink {
DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32, DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
), ),
glib::ParamSpecInt64::new(
"upload-part-request-timeout",
"Upload part request timeout",
"Timeout for a single upload part request (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC as i64,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new(
"upload-part-retry-duration",
"Upload part retry duration",
"How long we should retry upload part requests before giving up (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64,
glib::ParamFlags::READWRITE,
),
] ]
}); });
@ -716,6 +749,20 @@ impl ObjectImpl for S3Sink {
settings.multipart_upload_on_error = settings.multipart_upload_on_error =
value.get::<OnError>().expect("type checked upstream"); value.get::<OnError>().expect("type checked upstream");
} }
"upload-part-request-timeout" => {
settings.upload_part_request_timeout =
match value.get::<i64>().expect("type checked upstream") {
-1 => None,
v => Some(Duration::from_millis(v as u64)),
}
}
"upload-part-retry-duration" => {
settings.upload_part_retry_duration =
match value.get::<i64>().expect("type checked upstream") {
-1 => None,
v => Some(Duration::from_millis(v as u64)),
}
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -740,6 +787,20 @@ impl ObjectImpl for S3Sink {
"secret-access-key" => settings.secret_access_key.to_value(), "secret-access-key" => settings.secret_access_key.to_value(),
"metadata" => settings.metadata.to_value(), "metadata" => settings.metadata.to_value(),
"on-error" => settings.multipart_upload_on_error.to_value(), "on-error" => settings.multipart_upload_on_error.to_value(),
"upload-part-request-timeout" => {
let timeout: i64 = match settings.upload_part_request_timeout {
None => -1,
Some(v) => v.as_millis() as i64,
};
timeout.to_value()
}
"upload-part-retry-duration" => {
let timeout: i64 = match settings.upload_part_retry_duration {
None => -1,
Some(v) => v.as_millis() as i64,
};
timeout.to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }

View file

@ -7,13 +7,22 @@
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use bytes::{buf::BufMut, Bytes, BytesMut}; use bytes::{buf::BufMut, Bytes, BytesMut};
use futures::stream::TryStreamExt; use futures::{future, Future, FutureExt, TryFutureExt, TryStreamExt};
use futures::{future, Future};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use rusoto_core::ByteStream; use rusoto_core::RusotoError::HttpDispatch;
use rusoto_core::{ByteStream, HttpDispatchError, RusotoError};
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration;
use tokio::runtime; use tokio::runtime;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rusotos3utils",
gst::DebugColorFlags::empty(),
Some("Amazon S3 utilities"),
)
});
static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| { static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
runtime::Builder::new_multi_thread() runtime::Builder::new_multi_thread()
.enable_all() .enable_all()
@ -28,6 +37,112 @@ pub enum WaitError<E> {
FutureError(E), FutureError(E),
} }
fn make_timeout<F, T, E>(
timeout: Duration,
future: F,
) -> impl Future<Output = Result<T, RusotoError<E>>>
where
E: std::fmt::Debug,
F: Future<Output = Result<T, RusotoError<E>>>,
{
tokio::time::timeout(timeout, future).map(|v| match v {
// Future resolved succesfully
Ok(Ok(v)) => Ok(v),
// Future resolved with an error
Ok(Err(e)) => Err(e),
// Timeout elapsed
// Use an HttpDispatch error so the caller doesn't have to deal with this separately from
// other HTTP dispatch errors
_ => Err(HttpDispatch(HttpDispatchError::new("Timeout".to_owned()))),
})
}
fn make_retry<F, T, E, Fut>(
timeout: Option<Duration>,
mut future: F,
) -> impl Future<Output = Result<T, RusotoError<E>>>
where
E: std::fmt::Debug,
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, RusotoError<E>>>,
{
backoff::future::retry(
backoff::ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(500))
.with_multiplier(1.5)
.with_max_elapsed_time(timeout)
.build(),
move || {
future().map_err(|err| match err {
HttpDispatch(_) => {
gst::warning!(CAT, "Error waiting for operation ({:?}), retrying", err);
backoff::Error::transient(err)
}
_ => backoff::Error::permanent(err),
})
},
)
}
pub fn wait_retry<F, T, E, Fut>(
canceller: &Mutex<Option<future::AbortHandle>>,
req_timeout: Option<Duration>,
retry_timeout: Option<Duration>,
mut future: F,
) -> Result<T, WaitError<RusotoError<E>>>
where
E: std::fmt::Debug,
F: FnMut() -> Fut,
Fut: Send + Future<Output = Result<T, RusotoError<E>>>,
Fut::Output: Send,
T: Send,
E: Send,
{
let mut canceller_guard = canceller.lock().unwrap();
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
canceller_guard.replace(abort_handle);
drop(canceller_guard);
let res = {
let _enter = RUNTIME.enter();
futures::executor::block_on(async {
// The order of this future stack matters: the innermost future is the supplied future
// generator closure. We wrap that in a timeout to bound how long we wait. This, in
// turn, is wrapped in a retrying future which will make multiple attempts until it
// ultimately fails.
// The timeout must be created within the tokio executor
let res = match req_timeout {
None => {
let retry_future = make_retry(retry_timeout, future);
future::Abortable::new(retry_future, abort_registration).await
}
Some(t) => {
let timeout_future = || make_timeout(t, future());
let retry_future = make_retry(retry_timeout, timeout_future);
future::Abortable::new(retry_future, abort_registration).await
}
};
match res {
// Future resolved successfully
Ok(Ok(res)) => Ok(res),
// Future resolved with an error
Ok(Err(err)) => Err(WaitError::FutureError(err)),
// Canceller called before future resolved
Err(future::Aborted) => Err(WaitError::Cancelled),
}
})
};
/* Clear out the canceller */
canceller_guard = canceller.lock().unwrap();
*canceller_guard = None;
res
}
pub fn wait<F, T, E>( pub fn wait<F, T, E>(
canceller: &Mutex<Option<future::AbortHandle>>, canceller: &Mutex<Option<future::AbortHandle>>,
future: F, future: F,