From 099a3f2114b1b1eb8f9dfd161854307ef36a5fb3 Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Mon, 6 Dec 2021 17:36:10 +0530 Subject: [PATCH] rusoto: s3sink: Support aborting or completing multipart upload on error A multipart upload should either be completed or aborted on error. In the current state of things, a multipart upload would neither be completed nor aborted, putting the onus on an external entity to take care of finishing incomplete uploads or relying on a sane bucket life cycle policy configured to abort incomplete multipart uploads. An incomplete multipart upload still contributes to the storage costs as long as it exists. We introduce a property here to allow the user to select either aborting or completing multipart uploads on error. Aborting the upload causes whole of data to be discarded and the same upload ID is not usable for uploading more parts to the same. Completing an incomplete multipart upload can be useful in situations like having a streamable MP4 where one might want to complete the upload and have part of the data which was uploaded be preserved. Part-of: --- net/rusoto/src/s3sink/imp.rs | 167 ++++++++++++++++++++++++++++++----- net/rusoto/src/s3sink/mod.rs | 15 ++++ 2 files changed, 162 insertions(+), 20 deletions(-) diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs index 0d43ff51..006c6f11 100644 --- a/net/rusoto/src/s3sink/imp.rs +++ b/net/rusoto/src/s3sink/imp.rs @@ -17,8 +17,8 @@ use futures::future; use rusoto_core::{region::Region, request::HttpClient}; use rusoto_credential::StaticProvider; use rusoto_s3::{ - CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart, - CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3, + AbortMultipartUploadRequest, CompleteMultipartUploadRequest, CompletedMultipartUpload, + CompletedPart, CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3, }; use once_cell::sync::Lazy; @@ -29,6 +29,10 @@ use std::sync::Mutex; use crate::s3url::*; use crate::s3utils::{self, WaitError}; +use super::OnError; + +const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing; + struct Started { client: S3Client, buffer: Vec, @@ -89,6 +93,7 @@ struct Settings { access_key: Option, secret_access_key: Option, metadata: Option, + multipart_upload_on_error: OnError, } impl Settings { @@ -156,6 +161,7 @@ impl Default for Settings { access_key: None, secret_access_key: None, metadata: None, + multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR, } } } @@ -166,6 +172,7 @@ pub struct S3Sink { settings: Mutex, state: Mutex, canceller: Mutex>, + abort_multipart_canceller: Mutex>, } static CAT: Lazy = Lazy::new(|| { @@ -196,10 +203,62 @@ impl S3Sink { let output = s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err { - WaitError::FutureError(err) => Some(gst::error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to upload part: {}", err] - )), + WaitError::FutureError(err) => { + let settings = self.settings.lock().unwrap(); + match settings.multipart_upload_on_error { + OnError::Abort => { + gst_log!( + CAT, + obj: element, + "Aborting multipart upload request with id: {}", + state.upload_id + ); + match self.abort_multipart_upload_request(state) { + Ok(()) => { + gst_log!( + CAT, + obj: element, + "Aborting multipart upload request succeeded." + ); + } + Err(err) => gst_error!( + CAT, + obj: element, + "Aborting multipart upload failed: {}", + err.to_string() + ), + } + } + OnError::Complete => { + gst_log!( + CAT, + obj: element, + "Completing multipart upload request with id: {}", + state.upload_id + ); + match self.complete_multipart_upload_request(state) { + Ok(()) => { + gst_log!( + CAT, + obj: element, + "Complete multipart upload request succeeded." + ); + } + Err(err) => gst_error!( + CAT, + obj: element, + "Completing multipart upload failed: {}", + err.to_string() + ), + } + } + OnError::DoNothing => (), + } + Some(gst::error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to upload part: {}", err] + )) + } WaitError::Cancelled => None, })?; @@ -273,6 +332,69 @@ impl S3Sink { } } + fn create_abort_multipart_upload_request( + &self, + url: &GstS3Url, + started_state: &Started, + ) -> AbortMultipartUploadRequest { + AbortMultipartUploadRequest { + bucket: url.bucket.clone(), + expected_bucket_owner: None, + key: url.object.clone(), + request_payer: None, + upload_id: started_state.upload_id.to_owned(), + } + } + + fn abort_multipart_upload_request( + &self, + started_state: &Started, + ) -> Result<(), gst::ErrorMessage> { + let s3url = match *self.url.lock().unwrap() { + Some(ref url) => url.clone(), + None => unreachable!("Element should be started"), + }; + let abort_req = self.create_abort_multipart_upload_request(&s3url, started_state); + let abort_req_future = started_state.client.abort_multipart_upload(abort_req); + + s3utils::wait(&self.abort_multipart_canceller, abort_req_future) + .map(|_| ()) + .map_err(|err| match err { + WaitError::FutureError(err) => { + gst::error_msg!( + gst::ResourceError::Write, + ["Failed to abort multipart upload: {}.", err.to_string()] + ) + } + WaitError::Cancelled => { + gst::error_msg!( + gst::ResourceError::Write, + ["Abort multipart upload request interrupted."] + ) + } + }) + } + + fn complete_multipart_upload_request( + &self, + started_state: &mut Started, + ) -> Result<(), gst::ErrorMessage> { + let complete_req = self.create_complete_multipart_upload_request(started_state); + let complete_req_future = started_state.client.complete_multipart_upload(complete_req); + + s3utils::wait(&self.canceller, complete_req_future) + .map(|_| ()) + .map_err(|err| match err { + WaitError::FutureError(err) => gst::error_msg!( + gst::ResourceError::Write, + ["Failed to complete multipart upload: {}.", err.to_string()] + ), + WaitError::Cancelled => { + gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"]) + } + }) + } + fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> { if self.flush_current_buffer(element).is_err() { return Err(gst::error_msg!( @@ -289,20 +411,7 @@ impl S3Sink { } }; - let complete_req = self.create_complete_multipart_upload_request(started_state); - let complete_req_future = started_state.client.complete_multipart_upload(complete_req); - - s3utils::wait(&self.canceller, complete_req_future) - .map(|_| ()) - .map_err(|err| match err { - WaitError::FutureError(err) => gst::error_msg!( - gst::ResourceError::Write, - ["Failed to complete multipart upload: {}.", err.to_string()] - ), - WaitError::Cancelled => { - gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"]) - } - }) + self.complete_multipart_upload_request(started_state) } fn start(&self) -> Result<(), gst::ErrorMessage> { @@ -406,6 +515,11 @@ impl S3Sink { fn cancel(&self) { let mut canceller = self.canceller.lock().unwrap(); + let mut abort_canceller = self.abort_multipart_canceller.lock().unwrap(); + + if let Some(c) = abort_canceller.take() { + c.abort() + }; if let Some(c) = canceller.take() { c.abort() @@ -519,6 +633,14 @@ impl ObjectImpl for S3Sink { gst::Structure::static_type(), glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), + glib::ParamSpecEnum::new( + "on-error", + "Whether to upload or complete the multipart upload on error", + "Do nothing, abort or complete a multipart upload request on error", + OnError::static_type(), + DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), ] }); @@ -591,6 +713,10 @@ impl ObjectImpl for S3Sink { "metadata" => { settings.metadata = value.get().expect("type checked upstream"); } + "on-error" => { + settings.multipart_upload_on_error = + value.get::().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -614,6 +740,7 @@ impl ObjectImpl for S3Sink { "access-key" => settings.access_key.to_value(), "secret-access-key" => settings.secret_access_key.to_value(), "metadata" => settings.metadata.to_value(), + "on-error" => settings.multipart_upload_on_error.to_value(), _ => unimplemented!(), } } diff --git a/net/rusoto/src/s3sink/mod.rs b/net/rusoto/src/s3sink/mod.rs index 57dc6a1d..0f039925 100644 --- a/net/rusoto/src/s3sink/mod.rs +++ b/net/rusoto/src/s3sink/mod.rs @@ -11,6 +11,21 @@ use gst::prelude::*; mod imp; +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] +#[repr(u32)] +#[enum_type(name = "GstS3SinkOnError")] +pub(crate) enum OnError { + #[enum_value(name = "Abort: Abort multipart upload on error.", nick = "abort")] + Abort, + #[enum_value( + name = "Complete: Complete multipart upload on error.", + nick = "complete" + )] + Complete, + #[enum_value(name = "DoNothing: Do nothing on error.", nick = "nothing")] + DoNothing, +} + glib::wrapper! { pub struct S3Sink(ObjectSubclass) @extends gst_base::BaseSink, gst::Element, gst::Object; }