diff --git a/net/rusoto/src/s3sink/imp.rs b/net/rusoto/src/s3sink/imp.rs index 0d43ff518..006c6f114 100644 --- a/net/rusoto/src/s3sink/imp.rs +++ b/net/rusoto/src/s3sink/imp.rs @@ -17,8 +17,8 @@ use futures::future; use rusoto_core::{region::Region, request::HttpClient}; use rusoto_credential::StaticProvider; use rusoto_s3::{ - CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart, - CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3, + AbortMultipartUploadRequest, CompleteMultipartUploadRequest, CompletedMultipartUpload, + CompletedPart, CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3, }; use once_cell::sync::Lazy; @@ -29,6 +29,10 @@ use std::sync::Mutex; use crate::s3url::*; use crate::s3utils::{self, WaitError}; +use super::OnError; + +const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing; + struct Started { client: S3Client, buffer: Vec, @@ -89,6 +93,7 @@ struct Settings { access_key: Option, secret_access_key: Option, metadata: Option, + multipart_upload_on_error: OnError, } impl Settings { @@ -156,6 +161,7 @@ impl Default for Settings { access_key: None, secret_access_key: None, metadata: None, + multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR, } } } @@ -166,6 +172,7 @@ pub struct S3Sink { settings: Mutex, state: Mutex, canceller: Mutex>, + abort_multipart_canceller: Mutex>, } static CAT: Lazy = Lazy::new(|| { @@ -196,10 +203,62 @@ impl S3Sink { let output = s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err { - WaitError::FutureError(err) => Some(gst::error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to upload part: {}", err] - )), + WaitError::FutureError(err) => { + let settings = self.settings.lock().unwrap(); + match settings.multipart_upload_on_error { + OnError::Abort => { + gst_log!( + CAT, + obj: element, + "Aborting multipart upload request with id: {}", + state.upload_id + ); + match self.abort_multipart_upload_request(state) { + Ok(()) => { + gst_log!( + CAT, + obj: element, + "Aborting multipart upload request succeeded." + ); + } + Err(err) => gst_error!( + CAT, + obj: element, + "Aborting multipart upload failed: {}", + err.to_string() + ), + } + } + OnError::Complete => { + gst_log!( + CAT, + obj: element, + "Completing multipart upload request with id: {}", + state.upload_id + ); + match self.complete_multipart_upload_request(state) { + Ok(()) => { + gst_log!( + CAT, + obj: element, + "Complete multipart upload request succeeded." + ); + } + Err(err) => gst_error!( + CAT, + obj: element, + "Completing multipart upload failed: {}", + err.to_string() + ), + } + } + OnError::DoNothing => (), + } + Some(gst::error_msg!( + gst::ResourceError::OpenWrite, + ["Failed to upload part: {}", err] + )) + } WaitError::Cancelled => None, })?; @@ -273,6 +332,69 @@ impl S3Sink { } } + fn create_abort_multipart_upload_request( + &self, + url: &GstS3Url, + started_state: &Started, + ) -> AbortMultipartUploadRequest { + AbortMultipartUploadRequest { + bucket: url.bucket.clone(), + expected_bucket_owner: None, + key: url.object.clone(), + request_payer: None, + upload_id: started_state.upload_id.to_owned(), + } + } + + fn abort_multipart_upload_request( + &self, + started_state: &Started, + ) -> Result<(), gst::ErrorMessage> { + let s3url = match *self.url.lock().unwrap() { + Some(ref url) => url.clone(), + None => unreachable!("Element should be started"), + }; + let abort_req = self.create_abort_multipart_upload_request(&s3url, started_state); + let abort_req_future = started_state.client.abort_multipart_upload(abort_req); + + s3utils::wait(&self.abort_multipart_canceller, abort_req_future) + .map(|_| ()) + .map_err(|err| match err { + WaitError::FutureError(err) => { + gst::error_msg!( + gst::ResourceError::Write, + ["Failed to abort multipart upload: {}.", err.to_string()] + ) + } + WaitError::Cancelled => { + gst::error_msg!( + gst::ResourceError::Write, + ["Abort multipart upload request interrupted."] + ) + } + }) + } + + fn complete_multipart_upload_request( + &self, + started_state: &mut Started, + ) -> Result<(), gst::ErrorMessage> { + let complete_req = self.create_complete_multipart_upload_request(started_state); + let complete_req_future = started_state.client.complete_multipart_upload(complete_req); + + s3utils::wait(&self.canceller, complete_req_future) + .map(|_| ()) + .map_err(|err| match err { + WaitError::FutureError(err) => gst::error_msg!( + gst::ResourceError::Write, + ["Failed to complete multipart upload: {}.", err.to_string()] + ), + WaitError::Cancelled => { + gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"]) + } + }) + } + fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> { if self.flush_current_buffer(element).is_err() { return Err(gst::error_msg!( @@ -289,20 +411,7 @@ impl S3Sink { } }; - let complete_req = self.create_complete_multipart_upload_request(started_state); - let complete_req_future = started_state.client.complete_multipart_upload(complete_req); - - s3utils::wait(&self.canceller, complete_req_future) - .map(|_| ()) - .map_err(|err| match err { - WaitError::FutureError(err) => gst::error_msg!( - gst::ResourceError::Write, - ["Failed to complete multipart upload: {}.", err.to_string()] - ), - WaitError::Cancelled => { - gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"]) - } - }) + self.complete_multipart_upload_request(started_state) } fn start(&self) -> Result<(), gst::ErrorMessage> { @@ -406,6 +515,11 @@ impl S3Sink { fn cancel(&self) { let mut canceller = self.canceller.lock().unwrap(); + let mut abort_canceller = self.abort_multipart_canceller.lock().unwrap(); + + if let Some(c) = abort_canceller.take() { + c.abort() + }; if let Some(c) = canceller.take() { c.abort() @@ -519,6 +633,14 @@ impl ObjectImpl for S3Sink { gst::Structure::static_type(), glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), + glib::ParamSpecEnum::new( + "on-error", + "Whether to upload or complete the multipart upload on error", + "Do nothing, abort or complete a multipart upload request on error", + OnError::static_type(), + DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), ] }); @@ -591,6 +713,10 @@ impl ObjectImpl for S3Sink { "metadata" => { settings.metadata = value.get().expect("type checked upstream"); } + "on-error" => { + settings.multipart_upload_on_error = + value.get::().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -614,6 +740,7 @@ impl ObjectImpl for S3Sink { "access-key" => settings.access_key.to_value(), "secret-access-key" => settings.secret_access_key.to_value(), "metadata" => settings.metadata.to_value(), + "on-error" => settings.multipart_upload_on_error.to_value(), _ => unimplemented!(), } } diff --git a/net/rusoto/src/s3sink/mod.rs b/net/rusoto/src/s3sink/mod.rs index 57dc6a1d6..0f0399256 100644 --- a/net/rusoto/src/s3sink/mod.rs +++ b/net/rusoto/src/s3sink/mod.rs @@ -11,6 +11,21 @@ use gst::prelude::*; mod imp; +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] +#[repr(u32)] +#[enum_type(name = "GstS3SinkOnError")] +pub(crate) enum OnError { + #[enum_value(name = "Abort: Abort multipart upload on error.", nick = "abort")] + Abort, + #[enum_value( + name = "Complete: Complete multipart upload on error.", + nick = "complete" + )] + Complete, + #[enum_value(name = "DoNothing: Do nothing on error.", nick = "nothing")] + DoNothing, +} + glib::wrapper! { pub struct S3Sink(ObjectSubclass) @extends gst_base::BaseSink, gst::Element, gst::Object; }