rusoto: s3sink: Make remaining requests bounded in time

This implements a default timeout and retry duration for the remaining
S3 requests that were still able to be blocked indefinitely. There are 3
classes of operations: multipart upload creation/abort (should not take
too long), uploads (duration depends on part size), multipart upload
completion (can take several minutes according to documentation).

We currently only expose the part upload times as configurable, and hard
code the rest. If it seems sensible, we can expose the other two sets of
parameters as well.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/690>
This commit is contained in:
Arun Raghavan 2022-03-18 15:14:17 +05:30
parent 5fe95afe87
commit 94c5cbbfb8
2 changed files with 93 additions and 94 deletions

View file

@ -33,8 +33,17 @@ use crate::s3utils::{self, RetriableError, WaitError};
use super::OnError; use super::OnError;
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing; const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
// This needs to be independently configurable, as the part size can be upto 5GB
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000; const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000; const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
// The rest of these aren't exposed as properties yet
// General setting for create / abort requests
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
// CompletedMultipartUpload can take minutes to complete, so we need a longer value here
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes
const DEFAULT_COMPLETE_RETRY_DURATION_MSEC: u64 = 3_600_000; // 60 minutes
struct Started { struct Started {
client: S3Client, client: S3Client,
@ -316,16 +325,9 @@ impl S3Sink {
fn create_complete_multipart_upload_request( fn create_complete_multipart_upload_request(
&self, &self,
started_state: &mut Started, started_state: &Started,
completed_upload: CompletedMultipartUpload,
) -> CompleteMultipartUploadRequest { ) -> CompleteMultipartUploadRequest {
started_state
.completed_parts
.sort_by(|a, b| a.part_number.cmp(&b.part_number));
let completed_upload = CompletedMultipartUpload {
parts: Some(std::mem::take(&mut started_state.completed_parts)),
};
let url = self.url.lock().unwrap(); let url = self.url.lock().unwrap();
CompleteMultipartUploadRequest { CompleteMultipartUploadRequest {
bucket: url.as_ref().unwrap().bucket.to_owned(), bucket: url.as_ref().unwrap().bucket.to_owned(),
@ -372,45 +374,74 @@ impl S3Sink {
Some(ref url) => url.clone(), Some(ref url) => url.clone(),
None => unreachable!("Element should be started"), None => unreachable!("Element should be started"),
}; };
let abort_req = self.create_abort_multipart_upload_request(&s3url, started_state); let abort_req_future = || {
let abort_req_future = started_state.client.abort_multipart_upload(abort_req); let abort_req = self.create_abort_multipart_upload_request(&s3url, started_state);
started_state
.client
.abort_multipart_upload(abort_req)
.map_err(RetriableError::Rusoto)
};
s3utils::wait(&self.abort_multipart_canceller, abort_req_future) s3utils::wait_retry(
.map(|_| ()) &self.abort_multipart_canceller,
.map_err(|err| match err { Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)),
WaitError::FutureError(err) => { Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)),
gst::error_msg!( abort_req_future,
gst::ResourceError::Write, )
["Failed to abort multipart upload: {}.", err.to_string()] .map(|_| ())
) .map_err(|err| match err {
} WaitError::FutureError(err) => {
WaitError::Cancelled => { gst::error_msg!(
gst::error_msg!( gst::ResourceError::Write,
gst::ResourceError::Write, ["Failed to abort multipart upload: {:?}.", err]
["Abort multipart upload request interrupted."] )
) }
} WaitError::Cancelled => {
}) gst::error_msg!(
gst::ResourceError::Write,
["Abort multipart upload request interrupted."]
)
}
})
} }
fn complete_multipart_upload_request( fn complete_multipart_upload_request(
&self, &self,
started_state: &mut Started, started_state: &mut Started,
) -> Result<(), gst::ErrorMessage> { ) -> Result<(), gst::ErrorMessage> {
let complete_req = self.create_complete_multipart_upload_request(started_state); started_state
let complete_req_future = started_state.client.complete_multipart_upload(complete_req); .completed_parts
.sort_by(|a, b| a.part_number.cmp(&b.part_number));
s3utils::wait(&self.canceller, complete_req_future) let completed_upload = CompletedMultipartUpload {
.map(|_| ()) parts: Some(std::mem::take(&mut started_state.completed_parts)),
.map_err(|err| match err { };
WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::Write, let complete_req_future = || {
["Failed to complete multipart upload: {}.", err.to_string()] let complete_req = self
), .create_complete_multipart_upload_request(started_state, completed_upload.clone());
WaitError::Cancelled => { started_state
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"]) .client
} .complete_multipart_upload(complete_req)
}) .map_err(RetriableError::Rusoto)
};
s3utils::wait_retry(
&self.canceller,
Some(Duration::from_millis(DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC)),
Some(Duration::from_millis(DEFAULT_COMPLETE_RETRY_DURATION_MSEC)),
complete_req_future,
)
.map(|_| ())
.map_err(|err| match err {
WaitError::FutureError(err) => gst::error_msg!(
gst::ResourceError::Write,
["Failed to complete multipart upload: {:?}.", err]
),
WaitError::Cancelled => {
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"])
}
})
} }
fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> { fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> {
@ -466,20 +497,29 @@ impl S3Sink {
_ => S3Client::new(s3url.region.clone()), _ => S3Client::new(s3url.region.clone()),
}; };
let create_multipart_req = self.create_create_multipart_upload_request(&s3url, &settings); let create_multipart_req_future = || {
let create_multipart_req_future = client.create_multipart_upload(create_multipart_req); let create_multipart_req =
self.create_create_multipart_upload_request(&s3url, &settings);
client
.create_multipart_upload(create_multipart_req)
.map_err(RetriableError::Rusoto)
};
let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err( let response = s3utils::wait_retry(
|err| match err { &self.canceller,
WaitError::FutureError(err) => gst::error_msg!( Some(Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC)),
gst::ResourceError::OpenWrite, Some(Duration::from_millis(DEFAULT_RETRY_DURATION_MSEC)),
["Failed to create multipart upload: {}", err] create_multipart_req_future,
), )
WaitError::Cancelled => { .map_err(|err| match err {
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"]) WaitError::FutureError(err) => gst::error_msg!(
} gst::ResourceError::OpenWrite,
}, ["Failed to create multipart upload: {:?}", err]
)?; ),
WaitError::Cancelled => {
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
}
})?;
let upload_id = response.upload_id.ok_or_else(|| { let upload_id = response.upload_id.ok_or_else(|| {
gst::error_msg!( gst::error_msg!(

View file

@ -163,44 +163,3 @@ where
res res
} }
pub fn wait<F, T, E>(
canceller: &Mutex<Option<future::AbortHandle>>,
future: F,
) -> Result<T, WaitError<E>>
where
F: Send + Future<Output = Result<T, E>>,
F::Output: Send,
T: Send,
E: Send,
{
let mut canceller_guard = canceller.lock().unwrap();
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
canceller_guard.replace(abort_handle);
drop(canceller_guard);
let abortable_future = future::Abortable::new(future, abort_registration);
// FIXME: add a timeout as well
let res = {
let _enter = RUNTIME.enter();
futures::executor::block_on(async {
match abortable_future.await {
// Future resolved successfully
Ok(Ok(res)) => Ok(res),
// Future resolved with an error
Ok(Err(err)) => Err(WaitError::FutureError(err)),
// Canceller called before future resolved
Err(future::Aborted) => Err(WaitError::Cancelled),
}
})
};
/* Clear out the canceller */
canceller_guard = canceller.lock().unwrap();
*canceller_guard = None;
res
}