rusoto: s3sink: Implement timeout/retry for part uploads

Rusoto does not implement timeouts or retries for any of its HTTP
requests. This is particularly problematic for the part upload stage of
multipart uploads, as a blip in the network could cause part uploads to
freeze for a long duration and eventually bail.

To avoid this, for part uploads, we add (a) (configurable) timeouts for
each request, and (b) retries with exponential backoff, upto a
configurable duration.

It is not clear if/how we want to do this for other types of requests.
The creation of a multipart upload should be relatively quick, but the
completion of an upload might take several minutes, so there is no
one-size-fits-all configuration, necessarily.

It would likely make more sense to implement some sensible hard-coded
defaults for these other sorts of requests.
This commit is contained in:
Arun Raghavan 2022-02-23 13:10:30 -05:00 committed by Sebastian Dröge
parent 930bfce629
commit 249b0ac4c1
3 changed files with 256 additions and 77 deletions

View file

@ -21,7 +21,7 @@ rusoto_credential = "0.47"
rusoto_signature = "0.47"
url = "2"
percent-encoding = "2"
tokio = { version = "1.0", features = [ "rt-multi-thread" ] }
tokio = { version = "1.0", features = [ "rt-multi-thread", "time" ] }
async-tungstenite = { version = "0.16", features = ["tokio", "tokio-runtime", "tokio-native-tls"] }
nom = "7"
crc = "2"
@ -32,6 +32,7 @@ serde_derive = "1"
serde_json = "1"
atomic_refcell = "0.1"
base32 = "0.4"
backoff = { version = "0.4", features = [ "futures", "tokio" ] }
[lib]
name = "gstrusoto"

View file

@ -25,6 +25,7 @@ use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Duration;
use crate::s3url::*;
use crate::s3utils::{self, WaitError};
@ -32,6 +33,8 @@ use crate::s3utils::{self, WaitError};
use super::OnError;
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
struct Started {
client: S3Client,
@ -94,6 +97,8 @@ struct Settings {
secret_access_key: Option<String>,
metadata: Option<gst::Structure>,
multipart_upload_on_error: OnError,
upload_part_request_timeout: Option<Duration>,
upload_part_retry_duration: Option<Duration>,
}
impl Settings {
@ -162,6 +167,12 @@ impl Default for Settings {
secret_access_key: None,
metadata: None,
multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR,
upload_part_request_timeout: Some(Duration::from_millis(
DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC,
)),
upload_part_retry_duration: Some(Duration::from_millis(
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC,
)),
}
}
}
@ -188,10 +199,8 @@ impl S3Sink {
&self,
element: &super::S3Sink,
) -> Result<(), Option<gst::ErrorMessage>> {
let upload_part_req = self.create_upload_part_request()?;
let part_number = upload_part_req.part_number;
let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
let state = match *state {
State::Started(ref mut started_state) => started_state,
State::Stopped => {
@ -199,68 +208,81 @@ impl S3Sink {
}
};
let upload_part_req_future = state.client.upload_part(upload_part_req);
let part_number = state.increment_part_number()?;
let body = std::mem::replace(
&mut state.buffer,
Vec::with_capacity(settings.buffer_size as usize),
);
let upload_id = &state.upload_id;
let client = &state.client;
let output =
s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err {
WaitError::FutureError(err) => {
let settings = self.settings.lock().unwrap();
match settings.multipart_upload_on_error {
OnError::Abort => {
gst_log!(
CAT,
obj: element,
"Aborting multipart upload request with id: {}",
state.upload_id
);
match self.abort_multipart_upload_request(state) {
Ok(()) => {
gst_log!(
CAT,
obj: element,
"Aborting multipart upload request succeeded."
);
}
Err(err) => gst_error!(
let upload_part_req_future =
|| client.upload_part(self.create_upload_part_request(&body, part_number, upload_id));
let output = s3utils::wait_retry(
&self.canceller,
settings.upload_part_request_timeout,
settings.upload_part_retry_duration,
upload_part_req_future,
)
.map_err(|err| match err {
WaitError::FutureError(err) => {
match settings.multipart_upload_on_error {
OnError::Abort => {
gst_log!(
CAT,
obj: element,
"Aborting multipart upload request with id: {}",
state.upload_id
);
match self.abort_multipart_upload_request(state) {
Ok(()) => {
gst_log!(
CAT,
obj: element,
"Aborting multipart upload failed: {}",
err.to_string()
),
"Aborting multipart upload request succeeded."
);
}
}
OnError::Complete => {
gst_log!(
Err(err) => gst_error!(
CAT,
obj: element,
"Completing multipart upload request with id: {}",
state.upload_id
);
match self.complete_multipart_upload_request(state) {
Ok(()) => {
gst_log!(
CAT,
obj: element,
"Complete multipart upload request succeeded."
);
}
Err(err) => gst_error!(
CAT,
obj: element,
"Completing multipart upload failed: {}",
err.to_string()
),
}
"Aborting multipart upload failed: {}",
err.to_string()
),
}
OnError::DoNothing => (),
}
Some(gst::error_msg!(
gst::ResourceError::OpenWrite,
["Failed to upload part: {}", err]
))
OnError::Complete => {
gst_log!(
CAT,
obj: element,
"Completing multipart upload request with id: {}",
state.upload_id
);
match self.complete_multipart_upload_request(state) {
Ok(()) => {
gst_log!(
CAT,
obj: element,
"Complete multipart upload request succeeded."
);
}
Err(err) => gst_error!(
CAT,
obj: element,
"Completing multipart upload failed: {}",
err.to_string()
),
}
}
OnError::DoNothing => (),
}
WaitError::Cancelled => None,
})?;
Some(gst::error_msg!(
gst::ResourceError::OpenWrite,
["Failed to upload part: {}", err]
))
}
WaitError::Cancelled => None,
})?;
state.completed_parts.push(CompletedPart {
e_tag: output.e_tag,
@ -271,29 +293,22 @@ impl S3Sink {
Ok(())
}
fn create_upload_part_request(&self) -> Result<UploadPartRequest, gst::ErrorMessage> {
fn create_upload_part_request(
&self,
body: &[u8],
part_number: i64,
upload_id: &str,
) -> UploadPartRequest {
let url = self.url.lock().unwrap();
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
let state = match *state {
State::Started(ref mut started_state) => started_state,
State::Stopped => {
unreachable!("Element should be started");
}
};
let part_number = state.increment_part_number()?;
Ok(UploadPartRequest {
body: Some(rusoto_core::ByteStream::from(std::mem::replace(
&mut state.buffer,
Vec::with_capacity(settings.buffer_size as usize),
))),
UploadPartRequest {
body: Some(rusoto_core::ByteStream::from(body.to_owned())),
bucket: url.as_ref().unwrap().bucket.to_owned(),
key: url.as_ref().unwrap().object.to_owned(),
upload_id: state.upload_id.to_owned(),
upload_id: upload_id.to_owned(),
part_number,
..Default::default()
})
}
}
fn create_complete_multipart_upload_request(
@ -641,6 +656,24 @@ impl ObjectImpl for S3Sink {
DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecInt64::new(
"upload-part-request-timeout",
"Upload part request timeout",
"Timeout for a single upload part request (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC as i64,
glib::ParamFlags::READWRITE,
),
glib::ParamSpecInt64::new(
"upload-part-retry-duration",
"Upload part retry duration",
"How long we should retry upload part requests before giving up (in ms, set to -1 for infinity)",
-1,
std::i64::MAX,
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64,
glib::ParamFlags::READWRITE,
),
]
});
@ -717,6 +750,20 @@ impl ObjectImpl for S3Sink {
settings.multipart_upload_on_error =
value.get::<OnError>().expect("type checked upstream");
}
"upload-part-request-timeout" => {
settings.upload_part_request_timeout =
match value.get::<i64>().expect("type checked upstream") {
-1 => None,
v => Some(Duration::from_millis(v as u64)),
}
}
"upload-part-retry-duration" => {
settings.upload_part_retry_duration =
match value.get::<i64>().expect("type checked upstream") {
-1 => None,
v => Some(Duration::from_millis(v as u64)),
}
}
_ => unimplemented!(),
}
}
@ -741,6 +788,20 @@ impl ObjectImpl for S3Sink {
"secret-access-key" => settings.secret_access_key.to_value(),
"metadata" => settings.metadata.to_value(),
"on-error" => settings.multipart_upload_on_error.to_value(),
"upload-part-request-timeout" => {
let timeout: i64 = match settings.upload_part_request_timeout {
None => -1,
Some(v) => v.as_millis() as i64,
};
timeout.to_value()
}
"upload-part-retry-duration" => {
let timeout: i64 = match settings.upload_part_retry_duration {
None => -1,
Some(v) => v.as_millis() as i64,
};
timeout.to_value()
}
_ => unimplemented!(),
}
}

View file

@ -7,13 +7,24 @@
// SPDX-License-Identifier: MPL-2.0
use bytes::{buf::BufMut, Bytes, BytesMut};
use futures::stream::TryStreamExt;
use futures::{future, Future};
use futures::{future, Future, FutureExt, TryFutureExt, TryStreamExt};
use once_cell::sync::Lazy;
use rusoto_core::ByteStream;
use rusoto_core::RusotoError::HttpDispatch;
use rusoto_core::{ByteStream, HttpDispatchError, RusotoError};
use std::sync::Mutex;
use std::time::Duration;
use tokio::runtime;
use gst::gst_warning;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rusotos3utils",
gst::DebugColorFlags::empty(),
Some("Amazon S3 utilities"),
)
});
static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
runtime::Builder::new_multi_thread()
.enable_all()
@ -28,6 +39,112 @@ pub enum WaitError<E> {
FutureError(E),
}
fn make_timeout<F, T, E>(
timeout: Duration,
future: F,
) -> impl Future<Output = Result<T, RusotoError<E>>>
where
E: std::fmt::Debug,
F: Future<Output = Result<T, RusotoError<E>>>,
{
tokio::time::timeout(timeout, future).map(|v| match v {
// Future resolved succesfully
Ok(Ok(v)) => Ok(v),
// Future resolved with an error
Ok(Err(e)) => Err(e),
// Timeout elapsed
// Use an HttpDispatch error so the caller doesn't have to deal with this separately from
// other HTTP dispatch errors
_ => Err(HttpDispatch(HttpDispatchError::new("Timeout".to_owned()))),
})
}
fn make_retry<F, T, E, Fut>(
timeout: Option<Duration>,
mut future: F,
) -> impl Future<Output = Result<T, RusotoError<E>>>
where
E: std::fmt::Debug,
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, RusotoError<E>>>,
{
backoff::future::retry(
backoff::ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(500))
.with_multiplier(1.5)
.with_max_elapsed_time(timeout)
.build(),
move || {
future().map_err(|err| match err {
HttpDispatch(_) => {
gst_warning!(CAT, "Error waiting for operation ({:?}), retrying", err);
backoff::Error::transient(err)
}
_ => backoff::Error::permanent(err),
})
},
)
}
pub fn wait_retry<F, T, E, Fut>(
canceller: &Mutex<Option<future::AbortHandle>>,
req_timeout: Option<Duration>,
retry_timeout: Option<Duration>,
mut future: F,
) -> Result<T, WaitError<RusotoError<E>>>
where
E: std::fmt::Debug,
F: FnMut() -> Fut,
Fut: Send + Future<Output = Result<T, RusotoError<E>>>,
Fut::Output: Send,
T: Send,
E: Send,
{
let mut canceller_guard = canceller.lock().unwrap();
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
canceller_guard.replace(abort_handle);
drop(canceller_guard);
let res = {
let _enter = RUNTIME.enter();
futures::executor::block_on(async {
// The order of this future stack matters: the innermost future is the supplied future
// generator closure. We wrap that in a timeout to bound how long we wait. This, in
// turn, is wrapped in a retrying future which will make multiple attempts until it
// ultimately fails.
// The timeout must be created within the tokio executor
let res = match req_timeout {
None => {
let retry_future = make_retry(retry_timeout, future);
future::Abortable::new(retry_future, abort_registration).await
}
Some(t) => {
let timeout_future = || make_timeout(t, future());
let retry_future = make_retry(retry_timeout, timeout_future);
future::Abortable::new(retry_future, abort_registration).await
}
};
match res {
// Future resolved successfully
Ok(Ok(res)) => Ok(res),
// Future resolved with an error
Ok(Err(err)) => Err(WaitError::FutureError(err)),
// Canceller called before future resolved
Err(future::Aborted) => Err(WaitError::Cancelled),
}
})
};
/* Clear out the canceller */
canceller_guard = canceller.lock().unwrap();
*canceller_guard = None;
res
}
pub fn wait<F, T, E>(
canceller: &Mutex<Option<future::AbortHandle>>,
future: F,