From fd696da660dde4ca347928e467a11fd654b2e751 Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Wed, 17 Apr 2024 21:41:06 +0530 Subject: [PATCH] aws: Add next-file support to putobjectsink Add `next-file` support to `awss3putobjectsink` on similar lines to the `next-file` support in `multifilesink`. --- Cargo.lock | 13 +- docs/plugins/gst_plugins_cache.json | 63 +++- net/aws/Cargo.toml | 2 + net/aws/src/s3sink/mod.rs | 33 ++ net/aws/src/s3sink/putobjectsink.rs | 484 +++++++++++++++++++++------- 5 files changed, 474 insertions(+), 121 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 163a0b7c..06225f64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2243,12 +2243,14 @@ dependencies = [ "gstreamer-audio", "gstreamer-base", "gstreamer-check", + "gstreamer-video", "once_cell", "percent-encoding", "rand", "serde", "serde_derive", "serde_json", + "sprintf 0.2.1", "test-with", "tokio", "url", @@ -2482,7 +2484,7 @@ dependencies = [ "gstreamer-video", "m3u8-rs", "once_cell", - "sprintf", + "sprintf 0.1.4", ] [[package]] @@ -6354,6 +6356,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c0cdea5a20a06e7c57f627094e7b1618e5665592cd88f2d45fa4014e348db58" +[[package]] +name = "sprintf" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2819cb5194dfe9e6d102f4519a9fb9dc7106d2879b71b4fd4d4677f1175bd39" +dependencies = [ + "thiserror", +] + [[package]] name = "static_assertions" version = "1.1.0" diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index ad008d5b..d443f618 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -362,6 +362,32 @@ "type": "GstStructure", "writable": true }, + "min-keyframe-distance": { + "blurb": "Minimum distance between keyframes to start a new file", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "10000000000", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": true + }, + "next-file": { + "blurb": "When to start new file", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "next-buffer (0)", + "mutable": "ready", + "readable": false, + "type": "GstS3PutObjectSinkNextFile", + "writable": true + }, "region": { "blurb": "An AWS region (e.g. eu-west-2).", "conditionally-available": false, @@ -1183,6 +1209,41 @@ } ] }, + "GstS3PutObjectSinkNextFile": { + "kind": "enum", + "values": [ + { + "desc": "NextBuffer: New file for each buffer.", + "name": "next-buffer", + "value": "0" + }, + { + "desc": "NextDiscont: New file after each discontinuity.", + "name": "next-discont", + "value": "1" + }, + { + "desc": "NextKeyFrame: New file at each key frame.", + "name": "next-key-frame", + "value": "2" + }, + { + "desc": "NextKeyUnitEvent: New file after a force key unit event.", + "name": "next-key-unit-event", + "value": "3" + }, + { + "desc": "NextMaxSize: New file when the configured maximum file size would be exceeded with the next buffer or buffer list.", + "name": "next-max-size", + "value": "4" + }, + { + "desc": "NextMaxDuration: New file when the configured maximum duration would be exceeded with the next buffer or buffer list.", + "name": "next-max-duration", + "value": "5" + } + ] + }, "GstS3SinkOnError": { "kind": "enum", "values": [ @@ -11781,4 +11842,4 @@ "tracers": {}, "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" } -} \ No newline at end of file +} diff --git a/net/aws/Cargo.toml b/net/aws/Cargo.toml index 775a8ce0..5d9bf9b2 100644 --- a/net/aws/Cargo.toml +++ b/net/aws/Cargo.toml @@ -32,6 +32,8 @@ serde_derive = "1" serde_json = "1" url = "2" once_cell.workspace = true +gst-video = { workspace = true, features = ["v1_22"] } +sprintf = "0.2" [dev-dependencies] chrono = { version = "0.4", features = [ "alloc" ] } diff --git a/net/aws/src/s3sink/mod.rs b/net/aws/src/s3sink/mod.rs index 78308966..0f29ca38 100644 --- a/net/aws/src/s3sink/mod.rs +++ b/net/aws/src/s3sink/mod.rs @@ -14,6 +14,39 @@ use gst::prelude::*; mod multipartsink; mod putobjectsink; +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] +#[repr(u32)] +#[enum_type(name = "GstS3PutObjectSinkNextFile")] +pub(crate) enum NextFile { + #[enum_value(name = "NextBuffer: New file for each buffer.", nick = "next-buffer")] + Buffer, + #[enum_value( + name = "NextDiscont: New file after each discontinuity.", + nick = "next-discont" + )] + Discont, + #[enum_value( + name = "NextKeyFrame: New file at each key frame.", + nick = "next-key-frame" + )] + KeyFrame, + #[enum_value( + name = "NextKeyUnitEvent: New file after a force key unit event.", + nick = "next-key-unit-event" + )] + KeyUnitEvent, + #[enum_value( + name = "NextMaxSize: New file when the configured maximum file size would be exceeded with the next buffer or buffer list.", + nick = "next-max-size" + )] + MaxSize, + #[enum_value( + name = "NextMaxDuration: New file when the configured maximum duration would be exceeded with the next buffer or buffer list.", + nick = "next-max-duration" + )] + MaxDuration, +} + #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] #[repr(u32)] #[enum_type(name = "GstS3SinkOnError")] diff --git a/net/aws/src/s3sink/putobjectsink.rs b/net/aws/src/s3sink/putobjectsink.rs index fb3ecd35..4e253ee9 100644 --- a/net/aws/src/s3sink/putobjectsink.rs +++ b/net/aws/src/s3sink/putobjectsink.rs @@ -15,12 +15,12 @@ use gst_base::subclass::prelude::*; use aws_sdk_s3::{ config::{self, retry::RetryConfig, Credentials, Region}, - error::ProvideErrorMetadata, operation::put_object::builders::PutObjectFluentBuilder, primitives::ByteStream, Client, }; +use super::NextFile; use futures::future; use once_cell::sync::Lazy; use std::collections::HashMap; @@ -37,6 +37,8 @@ const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0; const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0); const DEFAULT_FLUSH_ON_ERROR: bool = false; const DEFAULT_FORCE_PATH_STYLE: bool = false; +const DEFAULT_NEXT_FILE: NextFile = NextFile::Buffer; +const DEFAULT_MIN_KEYFRAME_DISTANCE: gst::ClockTime = gst::ClockTime::from_seconds(10); // General setting for create / abort requests const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000; @@ -47,6 +49,12 @@ struct Started { start_pts: Option, num_buffers: u64, need_flush: bool, + index: u64, + next_segment: Option, + streamheaders: Option>, + streamheaders_size: u64, + force_key_unit_count: i32, + file_pts: Option, } impl Started { @@ -57,6 +65,12 @@ impl Started { start_pts: gst::ClockTime::NONE, num_buffers: 0, need_flush: false, + index: 0, + next_segment: gst::ClockTime::NONE, + streamheaders: None, + streamheaders_size: 0, + force_key_unit_count: -1, + file_pts: gst::ClockTime::NONE, } } } @@ -86,6 +100,8 @@ struct Settings { flush_interval_bytes: u64, flush_interval_time: Option, flush_on_error: bool, + next_file: NextFile, + min_keyframe_distance: gst::ClockTime, } impl Settings { @@ -143,6 +159,8 @@ impl Default for Settings { flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES, flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME), flush_on_error: DEFAULT_FLUSH_ON_ERROR, + next_file: DEFAULT_NEXT_FILE, + min_keyframe_distance: DEFAULT_MIN_KEYFRAME_DISTANCE, } } } @@ -164,13 +182,9 @@ static CAT: Lazy = Lazy::new(|| { }); impl S3PutObjectSink { - fn check_thresholds( - &self, - state: &Started, - pts: Option, - duration: Option, - ) -> bool { - let settings = self.settings.lock().unwrap(); + fn check_thresholds(&self, settings: &Settings, state: &Started, buffer: &gst::Buffer) -> bool { + let pts = buffer.pts(); + let duration = buffer.duration(); #[allow(clippy::if_same_then_else)] #[allow(clippy::needless_bool)] @@ -198,51 +212,6 @@ impl S3PutObjectSink { } } - fn flush_buffer(&self) -> Result<(), Option> { - let put_object_req = self.create_put_object_request(); - - let put_object_req_future = put_object_req.send(); - let _output = - s3utils::wait(&self.canceller, put_object_req_future).map_err(|err| match err { - WaitError::FutureError(err) => Some(gst::error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to upload object: {err}: {}", err.meta()] - )), - WaitError::Cancelled => None, - })?; - - gst::debug!(CAT, imp: self, "Upload complete"); - - Ok(()) - } - - fn create_put_object_request(&self) -> PutObjectFluentBuilder { - let url = self.url.lock().unwrap(); - let settings = self.settings.lock().unwrap(); - let state = self.state.lock().unwrap(); - let state = match *state { - State::Started(ref started_state) => started_state, - State::Stopped => { - unreachable!("Element should be started"); - } - }; - - let body = Some(ByteStream::from(state.buffer.clone())); - - let bucket = Some(url.as_ref().unwrap().bucket.to_owned()); - let key = Some(url.as_ref().unwrap().object.to_owned()); - let metadata = settings.to_metadata(self); - - let client = &state.client; - - client - .put_object() - .set_body(body) - .set_bucket(bucket) - .set_key(key) - .set_metadata(metadata) - } - fn start(&self) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap(); @@ -351,6 +320,217 @@ impl S3PutObjectSink { )), } } + + fn accumulate_buffer( + &self, + buffer: &gst::Buffer, + started_state: &mut Started, + ) -> Result { + if started_state.start_pts.is_none() { + started_state.start_pts = buffer.pts(); + } + + started_state.num_buffers += 1; + started_state.need_flush = true; + + gst::trace!(CAT, imp: self, "Rendering {:?}", buffer); + let map = buffer.map_readable().map_err(|_| { + gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]); + gst::FlowError::Error + })?; + + started_state.buffer.extend_from_slice(map.as_slice()); + + Ok(gst::FlowSuccess::Ok) + } + + fn create_body_with_streamheaders( + &self, + next_file: NextFile, + streamheaders: &Option>, + buffer: &[u8], + ) -> ByteStream { + match next_file { + NextFile::KeyFrame | NextFile::MaxSize | NextFile::MaxDuration => { + if let Some(headers) = streamheaders { + let with_sh = [&headers[..], buffer].concat(); + + ByteStream::from(with_sh) + } else { + ByteStream::from(buffer.to_vec()) + } + } + _ => ByteStream::from(buffer.to_vec()), + } + } + + fn create_put_object_request( + &self, + started_state: &mut Started, + ) -> Result, gst::FlowError> { + let url = self.url.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if started_state.buffer.is_empty() { + return Ok(None); + } + + let body = Some(self.create_body_with_streamheaders( + settings.next_file, + &started_state.streamheaders, + &started_state.buffer, + )); + + let bucket = Some(url.as_ref().unwrap().bucket.to_owned()); + + let object = url.as_ref().unwrap().object.to_owned(); + let key = if object.contains("%0") { + match sprintf::sprintf!(&object, started_state.index) { + Ok(k) => { + /* Equivalent to opening a new file */ + started_state.index += 1; + started_state.buffer = Vec::new(); + + Some(k) + } + Err(e) => { + gst::element_imp_error!( + self, + gst::CoreError::Failed, + ["Failed to format file name: {}", e] + ); + return Err(gst::FlowError::Error); + } + } + } else { + Some(object) + }; + let metadata = settings.to_metadata(self); + let client = &started_state.client; + + Ok(Some( + client + .put_object() + .set_body(body) + .set_bucket(bucket) + .set_key(key) + .set_metadata(metadata), + )) + } + + fn to_write_next_file( + &self, + started_state: &mut Started, + buffer: &gst::Buffer, + buffer_size: u64, + ) -> bool { + let settings = self.settings.lock().unwrap(); + let next_file = settings.next_file; + let max_file_size = settings.flush_interval_bytes; + let max_file_duration = settings.flush_interval_time; + let min_keyframe_distance = settings.min_keyframe_distance; + + match next_file { + NextFile::Buffer => self.check_thresholds(&settings, started_state, buffer), + NextFile::MaxSize => { + started_state.buffer.len() as u64 + started_state.streamheaders_size + buffer_size + > max_file_size + } + NextFile::MaxDuration => { + let mut new_duration = gst::ClockTime::ZERO; + + if buffer.pts().is_some() && started_state.file_pts.is_some() { + new_duration = buffer.pts().unwrap() - started_state.file_pts.unwrap(); + + if buffer.duration().is_some() { + new_duration += buffer.duration().unwrap(); + } + } + + started_state.file_pts = match buffer.pts() { + Some(pts) => Some(pts), + None => started_state.file_pts, + }; + + new_duration > max_file_duration.unwrap() + } + NextFile::KeyFrame => { + if started_state.next_segment == gst::ClockTime::NONE && buffer.pts().is_some() { + started_state.next_segment = + Some(buffer.pts().unwrap() + min_keyframe_distance); + } + + if buffer.pts().is_some() { + let buffer_ts = buffer.pts().unwrap(); + let delta_unit = buffer.flags().contains(gst::BufferFlags::DELTA_UNIT); + let next_segment = started_state + .next_segment + .expect("Next segment must be valid here"); + + if buffer_ts >= next_segment && !delta_unit { + started_state.next_segment = Some(next_segment + min_keyframe_distance); + + true + } else { + false + } + } else { + false + } + } + NextFile::Discont => buffer.flags().contains(gst::BufferFlags::DISCONT), + NextFile::KeyUnitEvent => true, + } + } + + fn write_put_object_request_with_next_file( + &self, + started_state: &mut Started, + ) -> Result { + let req = self.create_put_object_request(started_state)?; + + if let Some(put_object_req) = req { + let put_object_req_future = put_object_req.send(); + + match s3utils::wait(&self.canceller, put_object_req_future) { + Ok(_) => Ok(gst::FlowSuccess::Ok), + Err(err) => match err { + WaitError::Cancelled => Ok(gst::FlowSuccess::Ok), + WaitError::FutureError(e) => { + gst::element_imp_error!(self, gst::CoreError::Failed, ["{}", e]); + Err(gst::FlowError::Error) + } + }, + } + } else { + Ok(gst::FlowSuccess::Ok) + } + } + + fn write_buffer_with_next_file( + &self, + buffer: &gst::Buffer, + ) -> Result { + let mut state = self.state.lock().unwrap(); + let started_state = match *state { + State::Started(ref mut started_state) => started_state, + State::Stopped => { + gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]); + return Err(gst::FlowError::Error); + } + }; + + let map = buffer.map_readable().map_err(|_| { + gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]); + gst::FlowError::Error + })?; + + if self.to_write_next_file(started_state, buffer, map.size() as u64) { + self.write_put_object_request_with_next_file(started_state)?; + } + + self.accumulate_buffer(buffer, started_state) + } } #[glib::object_subclass] @@ -455,6 +635,17 @@ impl ObjectImpl for S3PutObjectSink { .blurb("Force client to use path-style addressing for buckets") .default_value(DEFAULT_FORCE_PATH_STYLE) .build(), + glib::ParamSpecEnum::builder_with_default("next-file", DEFAULT_NEXT_FILE) + .nick("Next File") + .blurb("When to start new file") + .write_only() + .mutable_ready() + .build(), + glib::ParamSpecUInt64::builder("min-keyframe-distance") + .nick("Minimum keyframe distance") + .blurb("Minimum distance between keyframes to start a new file") + .default_value(DEFAULT_MIN_KEYFRAME_DISTANCE.into()) + .build(), ] }); @@ -554,6 +745,14 @@ impl ObjectImpl for S3PutObjectSink { "force-path-style" => { settings.force_path_style = value.get::().expect("type checked upstream"); } + "next-file" => { + settings.next_file = value.get::().expect("type checked upstream"); + } + "min-keyframe-distance" => { + settings.min_keyframe_distance = value + .get::() + .expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -588,6 +787,7 @@ impl ObjectImpl for S3PutObjectSink { "flush-interval-time" => settings.flush_interval_time.to_value(), "flush-on-error" => settings.flush_on_error.to_value(), "force-path-style" => settings.force_path_style.to_value(), + "min-keyframe-distance" => settings.min_keyframe_distance.to_value(), _ => unimplemented!(), } } @@ -598,6 +798,8 @@ impl GstObjectImpl for S3PutObjectSink {} impl ElementImpl for S3PutObjectSink { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: Lazy = Lazy::new(|| { + #[cfg(feature = "doc")] + NextFile::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); gst::subclass::ElementMetadata::new( "Amazon S3 PutObject sink", "Source/Network", @@ -636,77 +838,32 @@ impl BaseSinkImpl for S3PutObjectSink { let mut state = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap(); - if let State::Started(ref started_state) = *state { + if let State::Started(ref mut started_state) = *state { if settings.flush_on_error && started_state.need_flush { drop(settings); - drop(state); - gst::warning!(CAT, imp: self, "Stopped without EOS, but flushing"); - if let Err(error_message) = self.flush_buffer() { + if self + .write_put_object_request_with_next_file(started_state) + .is_err() + { gst::error!( CAT, imp: self, - "Failed to finalize the upload: {:?}", - error_message + "Failed to finalize the next-file upload", ); } - - state = self.state.lock().unwrap(); } } *state = State::Stopped; + gst::info!(CAT, imp: self, "Stopped"); Ok(()) } fn render(&self, buffer: &gst::Buffer) -> Result { - let mut state = self.state.lock().unwrap(); - - let started_state = match *state { - State::Started(ref mut s) => s, - State::Stopped => { - gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]); - return Err(gst::FlowError::Error); - } - }; - - if started_state.start_pts.is_none() { - started_state.start_pts = buffer.pts(); - } - - started_state.num_buffers += 1; - started_state.need_flush = true; - - gst::trace!(CAT, imp: self, "Rendering {:?}", buffer); - let map = buffer.map_readable().map_err(|_| { - gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]); - gst::FlowError::Error - })?; - - started_state.buffer.extend_from_slice(map.as_slice()); - - if !self.check_thresholds(started_state, buffer.pts(), buffer.duration()) { - return Ok(gst::FlowSuccess::Ok); - } - - drop(state); - - match self.flush_buffer() { - Ok(_) => Ok(gst::FlowSuccess::Ok), - Err(err) => match err { - Some(error_message) => { - gst::error!(CAT, imp: self, "Upload failed: {}", error_message); - self.post_error_message(error_message); - Err(gst::FlowError::Error) - } - _ => { - gst::info!(CAT, imp: self, "Upload interrupted. Flushing..."); - Err(gst::FlowError::Flushing) - } - }, - } + self.write_buffer_with_next_file(buffer) } fn unlock(&self) -> Result<(), gst::ErrorMessage> { @@ -716,26 +873,115 @@ impl BaseSinkImpl for S3PutObjectSink { } fn event(&self, event: gst::Event) -> bool { - if let gst::EventView::Eos(_) = event.view() { - let mut state = self.state.lock().unwrap(); + use gst::EventView; - if let State::Started(ref mut started_state) = *state { - started_state.need_flush = false; + match event.view() { + EventView::CustomDownstream(ev) => { + let settings = self.settings.lock().unwrap(); + let next_file = settings.next_file; + let is_next_key_unit_event = next_file == NextFile::KeyUnitEvent; + drop(settings); + + /* Taken from gst_multi_file_sink_event */ + if is_next_key_unit_event && gst_video::ForceKeyUnitEvent::is(ev) { + use gst_video::DownstreamForceKeyUnitEvent; + + match DownstreamForceKeyUnitEvent::parse(ev) { + Ok(key_unit_event) => { + let mut state = self.state.lock().unwrap(); + + if let State::Started(ref mut started_state) = *state { + if started_state.force_key_unit_count != -1 + && started_state.force_key_unit_count as u32 + == key_unit_event.count + { + return BaseSinkImplExt::parent_event(self, event); + } + + started_state.force_key_unit_count = key_unit_event.count as i32; + + let _ = self.write_put_object_request_with_next_file(started_state); + } + } + Err(e) => gst::error!(CAT, "Failed to parse key unit event: {}", e), + } + } } + EventView::Eos(_) => { + let mut state = self.state.lock().unwrap(); - drop(state); + if let State::Started(ref mut started_state) = *state { + started_state.need_flush = false; - if let Err(error_message) = self.flush_buffer() { - gst::error!( - CAT, - imp: self, - "Failed to finalize the upload: {:?}", - error_message - ); - return false; + if self + .write_put_object_request_with_next_file(started_state) + .is_err() + { + gst::element_imp_error!( + self, + gst::CoreError::Failed, + ["Failed to finalize the upload"] + ); + } + } } + _ => (), } BaseSinkImplExt::parent_event(self, event) } + + fn set_caps(&self, caps: &gst::Caps) -> Result<(), gst::LoggableError> { + let s = caps + .structure(0) + .ok_or(gst::loggable_error!(CAT, "Missing caps in set_caps"))?; + + if let Ok(Some(streamheaders)) = s.get_optional::("streamheader") { + if streamheaders.is_empty() { + return Ok(()); + } + + let streamheaders = streamheaders.as_slice(); + let mut headers: Vec = Vec::new(); + + let mut state = self.state.lock().unwrap(); + let started_state = match *state { + State::Started(ref mut started_state) => started_state, + State::Stopped => { + return Err(gst::loggable_error!(CAT, "Element should be started")); + } + }; + + started_state.streamheaders_size = 0; + + for header in streamheaders { + let buffer = header.get::>(); + + if let Ok(Some(buf)) = buffer { + let map = buf.map_readable().map_err(|_| { + gst::element_imp_error!( + self, + gst::CoreError::Failed, + ["Failed to map streamheader buffer"] + ); + gst::loggable_error!(CAT, "Failed to map streamheader buffer") + })?; + + headers.extend_from_slice(map.as_slice()); + + started_state.streamheaders_size += map.size() as u64; + } + } + + if !headers.is_empty() { + let _ = started_state.streamheaders.take(); + gst::info!(CAT, imp: self, "Got streamheaders"); + started_state.streamheaders = Some(headers); + } + + drop(state); + } + + self.parent_set_caps(caps) + } }