rusoto: s3sink: Make remaining requests bounded in time

This implements a default timeout and retry duration for the remaining
S3 requests that could still block indefinitely. There are three
classes of operations: multipart upload creation/abort (should not take
too long), part uploads (duration depends on the part size), and
multipart upload completion (can take several minutes, according to the
documentation).

We currently only expose the part-upload timeouts as configurable, and
hard-code the rest. If it seems sensible, we can expose the other two
sets of parameters as well.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/690>
Author: Arun Raghavan
Date:   2022-03-18 15:14:17 +05:30
Parent: 7b8d3acf10
Commit: 724c6d6e32

2 changed files with 93 additions and 94 deletions
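For context, the call sites below use a new s3utils::wait_retry() helper whose
definition is not part of this excerpt (only the old wait() it replaces is
shown, at the bottom). The following is a minimal sketch of the
retry-with-timeout pattern it implements, under assumed names
(wait_retry_sketch, WaitErrorSketch) and an assumed tokio dependency with the
"time" feature; the real helper also threads the abort canceller through,
which is omitted here. Each attempt is bounded by a per-request timeout, and
the loop as a whole is bounded by an overall retry duration.

// Hypothetical sketch only; names and signature are not the real s3utils API.
use std::future::Future;
use std::time::{Duration, Instant};

#[derive(Debug)]
enum WaitErrorSketch<E> {
    // The overall retry duration elapsed without a successful attempt
    TimedOut,
    // The last attempt failed with the underlying request error
    FutureError(E),
}

async fn wait_retry_sketch<F, Fut, T, E>(
    request_timeout: Duration,
    retry_duration: Duration,
    mut make_request: F,
) -> Result<T, WaitErrorSketch<E>>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let deadline = Instant::now() + retry_duration;
    loop {
        // Bound each individual attempt with the per-request timeout
        match tokio::time::timeout(request_timeout, make_request()).await {
            Ok(Ok(res)) => return Ok(res),
            // Give up once the overall retry budget is spent
            Ok(Err(err)) if Instant::now() >= deadline => {
                return Err(WaitErrorSketch::FutureError(err));
            }
            Err(_elapsed) if Instant::now() >= deadline => {
                return Err(WaitErrorSketch::TimedOut);
            }
            // Otherwise retry; a real implementation would back off here
            _ => continue,
        }
    }
}

Note, too, that the call sites now wrap each request in a closure (for example
abort_req_future = || { ... }) instead of building the future once up front: a
request future can only be driven to completion once, so every retry attempt
has to construct a fresh request.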


@@ -34,8 +34,17 @@ use crate::s3utils::{self, RetriableError, WaitError};
use super::OnError;
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
// This needs to be independently configurable, as the part size can be up to 5 GB
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
// The rest of these aren't exposed as properties yet
// General setting for create / abort requests
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
// CompleteMultipartUpload can take minutes to complete, so we need a longer value here
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes
const DEFAULT_COMPLETE_RETRY_DURATION_MSEC: u64 = 3_600_000; // 60 minutes
struct Started {
client: S3Client,
@@ -317,16 +326,9 @@ impl S3Sink {
fn create_complete_multipart_upload_request(
&self,
started_state: &mut Started,
started_state: &Started,
completed_upload: CompletedMultipartUpload,
) -> CompleteMultipartUploadRequest {
started_state
.completed_parts
.sort_by(|a, b| a.part_number.cmp(&b.part_number));
let completed_upload = CompletedMultipartUpload {
parts: Some(std::mem::take(&mut started_state.completed_parts)),
};
let url = self.url.lock().unwrap();
CompleteMultipartUploadRequest {
bucket: url.as_ref().unwrap().bucket.to_owned(),
@@ -373,45 +375,74 @@ impl S3Sink {
Some(ref url) => url.clone(),
None => unreachable!("Element should be started"),
};
let abort_req = self.create_abort_multipart_upload_request(&s3url, started_state);
let abort_req_future = started_state.client.abort_multipart_upload(abort_req);
let abort_req_future = || {
let abort_req = self.create_abort_multipart_upload_request(&s3url, started_state);
started_state
.client
.abort_multipart_upload(abort_req)
.map_err(RetriableError::Rusoto)
};
s3utils::wait(&self.abort_multipart_canceller, abort_req_future)
.map(|_| ())
.map_err(|err| match err {
WaitError::FutureError(err) => {
gst::error_msg!(
gst::ResourceError::Write,
["Failed to abort multipart upload: {}.", err.to_string()]
)
}
WaitError::Cancelled => {
gst::error_msg!(
gst::ResourceError::Write,
["Abort multipart upload request interrupted."]
)
}
})
s3utils::wait_retry(
&self.abort_multipart_canceller,
Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)),
Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)),
abort_req_future,
)
.map(|_| ())
.map_err(|err| match err {
WaitError::FutureError(err) => {
gst::error_msg!(
gst::ResourceError::Write,
["Failed to abort multipart upload: {:?}.", err]
)
}
WaitError::Cancelled => {
gst::error_msg!(
gst::ResourceError::Write,
["Abort multipart upload request interrupted."]
)
}
})
}
fn complete_multipart_upload_request(
&self,
started_state: &mut Started,
) -> Result<(), gst::ErrorMessage> {
let complete_req = self.create_complete_multipart_upload_request(started_state);
let complete_req_future = started_state.client.complete_multipart_upload(complete_req);
started_state
.completed_parts
.sort_by(|a, b| a.part_number.cmp(&b.part_number));
s3utils::wait(&self.canceller, complete_req_future)
.map(|_| ())
.map_err(|err| match err {
WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::Write,
["Failed to complete multipart upload: {}.", err.to_string()]
),
WaitError::Cancelled => {
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"])
}
})
let completed_upload = CompletedMultipartUpload {
parts: Some(std::mem::take(&mut started_state.completed_parts)),
};
let complete_req_future = || {
let complete_req = self
.create_complete_multipart_upload_request(started_state, completed_upload.clone());
started_state
.client
.complete_multipart_upload(complete_req)
.map_err(RetriableError::Rusoto)
};
s3utils::wait_retry(
&self.canceller,
Some(Duration::from_millis(DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC)),
Some(Duration::from_millis(DEFAULT_COMPLETE_RETRY_DURATION_MSEC)),
complete_req_future,
)
.map(|_| ())
.map_err(|err| match err {
WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::Write,
["Failed to complete multipart upload: {:?}.", err]
),
WaitError::Cancelled => {
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"])
}
})
}
fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> {
@@ -467,20 +498,29 @@ impl S3Sink {
_ => S3Client::new(s3url.region.clone()),
};
let create_multipart_req = self.create_create_multipart_upload_request(&s3url, &settings);
let create_multipart_req_future = client.create_multipart_upload(create_multipart_req);
let create_multipart_req_future = || {
let create_multipart_req =
self.create_create_multipart_upload_request(&s3url, &settings);
client
.create_multipart_upload(create_multipart_req)
.map_err(RetriableError::Rusoto)
};
let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err(
|err| match err {
WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::OpenWrite,
["Failed to create multipart upload: {}", err]
),
WaitError::Cancelled => {
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
}
},
)?;
let response = s3utils::wait_retry(
&self.canceller,
Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)),
Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)),
create_multipart_req_future,
)
.map_err(|err| match err {
WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::OpenWrite,
["Failed to create multipart upload: {:?}", err]
),
WaitError::Cancelled => {
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
}
})?;
let upload_id = response.upload_id.ok_or_else(|| {
gst::error_msg!(


@@ -165,44 +165,3 @@ where
res
}
pub fn wait<F, T, E>(
canceller: &Mutex<Option<future::AbortHandle>>,
future: F,
) -> Result<T, WaitError<E>>
where
F: Send + Future<Output = Result<T, E>>,
F::Output: Send,
T: Send,
E: Send,
{
let mut canceller_guard = canceller.lock().unwrap();
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
canceller_guard.replace(abort_handle);
drop(canceller_guard);
let abortable_future = future::Abortable::new(future, abort_registration);
// FIXME: add a timeout as well
let res = {
let _enter = RUNTIME.enter();
futures::executor::block_on(async {
match abortable_future.await {
// Future resolved successfully
Ok(Ok(res)) => Ok(res),
// Future resolved with an error
Ok(Err(err)) => Err(WaitError::FutureError(err)),
// Canceller called before future resolved
Err(future::Aborted) => Err(WaitError::Cancelled),
}
})
};
/* Clear out the canceller */
canceller_guard = canceller.lock().unwrap();
*canceller_guard = None;
res
}
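
Purely as an illustration of how such a factory-closure API is driven (this is
not code from the commit; it reuses wait_retry_sketch and the imports from the
sketch near the top, and assumes tokio with the "macros", "rt" and "time"
features):

// Hypothetical usage of the wait_retry_sketch helper defined above.
#[tokio::main]
async fn main() {
    let mut attempts = 0u32;
    let result: Result<&str, _> = wait_retry_sketch(
        Duration::from_millis(10_000), // per-attempt timeout
        Duration::from_millis(60_000), // overall retry budget
        || {
            // The closure runs once per attempt, so each retry gets a fresh future
            attempts += 1;
            async move {
                if attempts < 3 {
                    Err("simulated transient failure")
                } else {
                    Ok("upload complete")
                }
            }
        },
    )
    .await;
    println!("finished after {attempts} attempts: {result:?}");
}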