mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-02-13 03:15:25 +00:00
aws: s3sink: Treat stopping without EOS as an error for multipart upload
This allows us to try to clean up based on configuration (abort / complete / do nothing) if the pipeline is shut down without an EOS. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/986>
This commit is contained in:
parent
274e57a536
commit
b015688447
1 changed files with 94 additions and 51 deletions
|
@ -88,6 +88,7 @@ impl Started {
|
|||
|
||||
enum State {
|
||||
Stopped,
|
||||
Completed,
|
||||
Started(Started),
|
||||
}
|
||||
|
||||
|
@ -185,12 +186,68 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
|||
});
|
||||
|
||||
impl S3Sink {
|
||||
fn flush_multipart_upload(&self, state: &mut Started) {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
match settings.multipart_upload_on_error {
|
||||
OnError::Abort => {
|
||||
gst::log!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Aborting multipart upload request with id: {}",
|
||||
state.upload_id
|
||||
);
|
||||
match self.abort_multipart_upload_request(state) {
|
||||
Ok(()) => {
|
||||
gst::log!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Aborting multipart upload request succeeded."
|
||||
);
|
||||
}
|
||||
Err(err) => gst::error!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Aborting multipart upload failed: {}",
|
||||
err.to_string()
|
||||
),
|
||||
}
|
||||
}
|
||||
OnError::Complete => {
|
||||
gst::log!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Completing multipart upload request with id: {}",
|
||||
state.upload_id
|
||||
);
|
||||
match self.complete_multipart_upload_request(state) {
|
||||
Ok(()) => {
|
||||
gst::log!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Complete multipart upload request succeeded."
|
||||
);
|
||||
}
|
||||
Err(err) => gst::error!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Completing multipart upload failed: {}",
|
||||
err.to_string()
|
||||
),
|
||||
}
|
||||
}
|
||||
OnError::DoNothing => (),
|
||||
}
|
||||
}
|
||||
|
||||
fn flush_current_buffer(&self) -> Result<(), Option<gst::ErrorMessage>> {
|
||||
let upload_part_req: UploadPart = self.create_upload_part_request()?;
|
||||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
let state = match *state {
|
||||
State::Started(ref mut started_state) => started_state,
|
||||
State::Completed => {
|
||||
unreachable!("Upload should not be completed yet");
|
||||
}
|
||||
State::Stopped => {
|
||||
unreachable!("Element should be started");
|
||||
}
|
||||
|
@ -202,56 +259,7 @@ impl S3Sink {
|
|||
let output =
|
||||
s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err {
|
||||
WaitError::FutureError(err) => {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
match settings.multipart_upload_on_error {
|
||||
OnError::Abort => {
|
||||
gst::log!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Aborting multipart upload request with id: {}",
|
||||
state.upload_id
|
||||
);
|
||||
match self.abort_multipart_upload_request(state) {
|
||||
Ok(()) => {
|
||||
gst::log!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Aborting multipart upload request succeeded."
|
||||
);
|
||||
}
|
||||
Err(err) => gst::error!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Aborting multipart upload failed: {}",
|
||||
err.to_string()
|
||||
),
|
||||
}
|
||||
}
|
||||
OnError::Complete => {
|
||||
gst::log!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Completing multipart upload request with id: {}",
|
||||
state.upload_id
|
||||
);
|
||||
match self.complete_multipart_upload_request(state) {
|
||||
Ok(()) => {
|
||||
gst::log!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Complete multipart upload request succeeded."
|
||||
);
|
||||
}
|
||||
Err(err) => gst::error!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Completing multipart upload failed: {}",
|
||||
err.to_string()
|
||||
),
|
||||
}
|
||||
}
|
||||
OnError::DoNothing => (),
|
||||
}
|
||||
self.flush_multipart_upload(state);
|
||||
Some(gst::error_msg!(
|
||||
gst::ResourceError::OpenWrite,
|
||||
["Failed to upload part: {}", err]
|
||||
|
@ -277,6 +285,9 @@ impl S3Sink {
|
|||
let mut state = self.state.lock().unwrap();
|
||||
let state = match *state {
|
||||
State::Started(ref mut started_state) => started_state,
|
||||
State::Completed => {
|
||||
unreachable!("Upload should not be completed yet");
|
||||
}
|
||||
State::Stopped => {
|
||||
unreachable!("Element should be started");
|
||||
}
|
||||
|
@ -437,12 +448,21 @@ impl S3Sink {
|
|||
let mut state = self.state.lock().unwrap();
|
||||
let started_state = match *state {
|
||||
State::Started(ref mut started_state) => started_state,
|
||||
State::Completed => {
|
||||
unreachable!("Upload should not be completed yet");
|
||||
}
|
||||
State::Stopped => {
|
||||
unreachable!("Element should be started");
|
||||
}
|
||||
};
|
||||
|
||||
self.complete_multipart_upload_request(started_state)
|
||||
let res = self.complete_multipart_upload_request(started_state);
|
||||
|
||||
if res.is_ok() {
|
||||
*state = State::Completed;
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
fn start(&self) -> Result<(), gst::ErrorMessage> {
|
||||
|
@ -562,6 +582,9 @@ impl S3Sink {
|
|||
let mut state = self.state.lock().unwrap();
|
||||
let started_state = match *state {
|
||||
State::Started(ref mut started_state) => started_state,
|
||||
State::Completed => {
|
||||
unreachable!("Upload should not be completed yet");
|
||||
}
|
||||
State::Stopped => {
|
||||
unreachable!("Element should be started already");
|
||||
}
|
||||
|
@ -953,6 +976,17 @@ impl BaseSinkImpl for S3Sink {
|
|||
|
||||
fn stop(&self) -> Result<(), gst::ErrorMessage> {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
if let State::Started(ref mut state) = *state {
|
||||
gst::warning!(CAT, imp: self, "Stopped without EOS");
|
||||
|
||||
// We're stopping without an EOS -- treat this as an error and deal with the open
|
||||
// multipart upload accordingly _if_ we managed to upload any parts
|
||||
if !state.completed_parts.is_empty() {
|
||||
self.flush_multipart_upload(state);
|
||||
}
|
||||
}
|
||||
|
||||
*state = State::Stopped;
|
||||
gst::info!(CAT, imp: self, "Stopped");
|
||||
|
||||
|
@ -965,6 +999,15 @@ impl BaseSinkImpl for S3Sink {
|
|||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
|
||||
if let State::Completed = *self.state.lock().unwrap() {
|
||||
gst::element_imp_error!(
|
||||
self,
|
||||
gst::CoreError::Failed,
|
||||
["Trying to render after upload complete"]
|
||||
);
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
|
||||
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
|
||||
let map = buffer.map_readable().map_err(|_| {
|
||||
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
|
||||
|
|
Loading…
Reference in a new issue