From 410d104ad6146421655ec29fe00a9f3134445664 Mon Sep 17 00:00:00 2001 From: Arun Raghavan Date: Thu, 28 Sep 2023 18:38:19 +0200 Subject: [PATCH] aws: s3putobjectsink: Add a flush-on-error property Makes sure we can send out data even if the pipeline shutdown in error. Part-of: --- net/aws/src/s3sink/putobjectsink.rs | 43 +++++++++++++++++++++++++ net/aws/tests/s3.rs | 49 +++++++++++++++++++++++++---- 2 files changed, 86 insertions(+), 6 deletions(-) diff --git a/net/aws/src/s3sink/putobjectsink.rs b/net/aws/src/s3sink/putobjectsink.rs index 2275a5d6..5b6d4934 100644 --- a/net/aws/src/s3sink/putobjectsink.rs +++ b/net/aws/src/s3sink/putobjectsink.rs @@ -34,6 +34,7 @@ const DEFAULT_RETRY_ATTEMPTS: u32 = 5; const DEFAULT_FLUSH_INTERVAL_BUFFERS: u64 = 1; const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0; const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0); +const DEFAULT_FLUSH_ON_ERROR: bool = false; // General setting for create / abort requests const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000; @@ -43,6 +44,7 @@ struct Started { buffer: Vec, start_pts: Option, num_buffers: u64, + need_flush: bool, } impl Started { @@ -52,6 +54,7 @@ impl Started { buffer, start_pts: gst::ClockTime::NONE, num_buffers: 0, + need_flush: false, } } } @@ -79,6 +82,7 @@ struct Settings { flush_interval_buffers: u64, flush_interval_bytes: u64, flush_interval_time: Option, + flush_on_error: bool, } impl Settings { @@ -134,6 +138,7 @@ impl Default for Settings { flush_interval_buffers: DEFAULT_FLUSH_INTERVAL_BUFFERS, flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES, flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME), + flush_on_error: DEFAULT_FLUSH_ON_ERROR, } } } @@ -435,6 +440,11 @@ impl ObjectImpl for S3PutObjectSink { .blurb("Total duration of buffers to accumulate before doing a write (0 => disable)") .default_value(DEFAULT_FLUSH_INTERVAL_TIME.nseconds()) .build(), + glib::ParamSpecBoolean::builder("flush-on-error") + .nick("Flush on error") + .blurb("Whether to write out the data on error (like stopping without an EOS)") + .default_value(DEFAULT_FLUSH_ON_ERROR) + .build(), ] }); @@ -528,6 +538,9 @@ impl ObjectImpl for S3PutObjectSink { .get::>() .expect("type checked upstream"); } + "flush-on-error" => { + settings.flush_on_error = value.get::().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -560,6 +573,7 @@ impl ObjectImpl for S3PutObjectSink { "flush-interval-buffers" => settings.flush_interval_buffers.to_value(), "flush-interval-bytes" => settings.flush_interval_bytes.to_value(), "flush-interval-time" => settings.flush_interval_time.to_value(), + "flush-on-error" => settings.flush_on_error.to_value(), _ => unimplemented!(), } } @@ -606,6 +620,26 @@ impl BaseSinkImpl for S3PutObjectSink { fn stop(&self) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if let State::Started(ref started_state) = *state { + if settings.flush_on_error && started_state.need_flush { + drop(settings); + drop(state); + + gst::warning!(CAT, imp: self, "Stopped without EOS, but flushing"); + if let Err(error_message) = self.flush_buffer() { + gst::error!( + CAT, + imp: self, + "Failed to finalize the upload: {:?}", + error_message + ); + } + + state = self.state.lock().unwrap(); + } + } *state = State::Stopped; gst::info!(CAT, imp: self, "Stopped"); @@ -629,6 +663,7 @@ impl BaseSinkImpl for S3PutObjectSink { } started_state.num_buffers += 1; + started_state.need_flush = true; gst::trace!(CAT, imp: self, "Rendering {:?}", buffer); let map = buffer.map_readable().map_err(|_| { @@ -668,6 +703,14 @@ impl BaseSinkImpl for S3PutObjectSink { fn event(&self, event: gst::Event) -> bool { if let gst::EventView::Eos(_) = event.view() { + let mut state = self.state.lock().unwrap(); + + if let State::Started(ref mut started_state) = *state { + started_state.need_flush = false; + } + + drop(state); + if let Err(error_message) = self.flush_buffer() { gst::error!( CAT, diff --git a/net/aws/tests/s3.rs b/net/aws/tests/s3.rs index 4fff59d6..dfe27376 100644 --- a/net/aws/tests/s3.rs +++ b/net/aws/tests/s3.rs @@ -102,6 +102,7 @@ mod tests { buffers: Option, bytes: Option, time: Option, + do_eos: bool, ) { init(); @@ -139,6 +140,9 @@ mod tests { if time.is_some() { h1el.set_property("flush-interval-time", time) }; + if !do_eos { + h1el.set_property("flush-on-error", true) + } h1.set_src_caps(gst::Caps::builder("text/plain").build()); h1.play(); @@ -148,7 +152,13 @@ mod tests { h1.push(make_buffer(content)).unwrap(); h1.push(make_buffer(content)).unwrap(); h1.push(make_buffer(content)).unwrap(); - h1.push_event(gst::event::Eos::new()); + + if do_eos { + h1.push_event(gst::event::Eos::new()); + } else { + // teardown to trigger end + drop(h1); + } let mut h2 = gst_check::Harness::new("awss3src"); h2.element().unwrap().set_property("uri", uri.clone()); @@ -180,29 +190,29 @@ mod tests { #[tokio::test] async fn test_s3_put_object_simple() { - do_s3_putobject_test("s3-put-object-test", None, None, None).await; + do_s3_putobject_test("s3-put-object-test", None, None, None, true).await; } #[tokio::test] async fn test_s3_put_object_whitespace() { - do_s3_putobject_test("s3 put object test", None, None, None).await; + do_s3_putobject_test("s3 put object test", None, None, None, true).await; } #[tokio::test] async fn test_s3_put_object_unicode() { - do_s3_putobject_test("s3 put object 🧪 😱", None, None, None).await; + do_s3_putobject_test("s3 put object 🧪 😱", None, None, None, true).await; } #[tokio::test] async fn test_s3_put_object_flush_buffers() { // Awkward threshold as we push 5 buffers - do_s3_putobject_test("s3-put-object-test fbuf", Some(2), None, None).await; + do_s3_putobject_test("s3-put-object-test fbuf", Some(2), None, None, true).await; } #[tokio::test] async fn test_s3_put_object_flush_bytes() { // Awkward threshold as we push 14 bytes per buffer - do_s3_putobject_test("s3-put-object-test fbytes", None, Some(30), None).await; + do_s3_putobject_test("s3-put-object-test fbytes", None, Some(30), None, true).await; } #[tokio::test] @@ -213,6 +223,33 @@ mod tests { None, // Awkward threshold as we push each buffer with 200ms Some(gst::ClockTime::from_mseconds(300)), + true, + ) + .await; + } + + #[tokio::test] + async fn test_s3_put_object_on_eos() { + // Disable all flush thresholds, so only EOS causes a flush + do_s3_putobject_test( + "s3-put-object-test eos", + Some(0), + Some(0), + Some(gst::ClockTime::from_nseconds(0)), + true, + ) + .await; + } + + #[tokio::test] + async fn test_s3_put_object_without_eos() { + // Disable all flush thresholds, skip EOS, and cause a flush on error + do_s3_putobject_test( + "s3-put-object-test !eos", + Some(0), + Some(0), + Some(gst::ClockTime::from_nseconds(0)), + false, ) .await; }