diff --git a/Cargo.lock b/Cargo.lock index 8ffcb556..339ee41a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2302,12 +2302,14 @@ dependencies = [ "gstreamer-audio", "gstreamer-base", "gstreamer-check", + "gstreamer-video", "once_cell", "percent-encoding", "rand", "serde", "serde_derive", "serde_json", + "sprintf 0.2.1", "test-with", "tokio", "url", @@ -2542,7 +2544,7 @@ dependencies = [ "gstreamer-video", "m3u8-rs", "once_cell", - "sprintf", + "sprintf 0.1.4", ] [[package]] @@ -6617,6 +6619,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c0cdea5a20a06e7c57f627094e7b1618e5665592cd88f2d45fa4014e348db58" +[[package]] +name = "sprintf" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2819cb5194dfe9e6d102f4519a9fb9dc7106d2879b71b4fd4d4677f1175bd39" +dependencies = [ + "thiserror", +] + [[package]] name = "static_assertions" version = "1.1.0" diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 36bf7fa3..cca33829 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -398,6 +398,32 @@ "type": "GstStructure", "writable": true }, + "min-keyframe-distance": { + "blurb": "Minimum distance between keyframes to start a new file", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "10000000000", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": true + }, + "next-file": { + "blurb": "When to start new file", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "next-buffer (0)", + "mutable": "ready", + "readable": true, + "type": "GstS3PutObjectSinkNextFile", + "writable": true + }, "region": { "blurb": "An AWS region (e.g. eu-west-2).", "conditionally-available": false, @@ -1255,6 +1281,41 @@ } ] }, + "GstS3PutObjectSinkNextFile": { + "kind": "enum", + "values": [ + { + "desc": "New file for each buffer", + "name": "next-buffer", + "value": "0" + }, + { + "desc": "New file after each discontinuity", + "name": "next-discont", + "value": "1" + }, + { + "desc": "New file at each key frame", + "name": "next-key-frame", + "value": "2" + }, + { + "desc": "New file after a force key unit event", + "name": "next-key-unit-event", + "value": "3" + }, + { + "desc": "New file when the configured maximum file size would be exceeded with the next buffer or buffer list", + "name": "next-max-size", + "value": "4" + }, + { + "desc": "New file when the configured maximum duration would be exceeded with the next buffer or buffer list", + "name": "next-max-duration", + "value": "5" + } + ] + }, "GstS3SinkOnError": { "kind": "enum", "values": [ diff --git a/net/aws/Cargo.toml b/net/aws/Cargo.toml index a5a6607d..866a3a0f 100644 --- a/net/aws/Cargo.toml +++ b/net/aws/Cargo.toml @@ -32,6 +32,8 @@ serde_derive = "1" serde_json = "1" url = "2" once_cell.workspace = true +gst-video = { workspace = true, features = ["v1_22"] } +sprintf = "0.2" [dev-dependencies] chrono = { version = "0.4", features = [ "alloc" ] } diff --git a/net/aws/src/s3sink/mod.rs b/net/aws/src/s3sink/mod.rs index 78308966..00d15474 100644 --- a/net/aws/src/s3sink/mod.rs +++ b/net/aws/src/s3sink/mod.rs @@ -14,6 +14,33 @@ use gst::prelude::*; mod multipartsink; mod putobjectsink; +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] +#[repr(u32)] +#[enum_type(name = "GstS3PutObjectSinkNextFile")] +pub(crate) enum NextFile { + #[enum_value(name = "New file for each buffer", nick = "next-buffer")] + Buffer, + #[enum_value(name = "New file after each discontinuity", nick = "next-discont")] + Discont, + #[enum_value(name = "New file at each key frame", nick = "next-key-frame")] + KeyFrame, + #[enum_value( + name = "New file after a force key unit event", + nick = "next-key-unit-event" + )] + KeyUnitEvent, + #[enum_value( + name = "New file when the configured maximum file size would be exceeded with the next buffer or buffer list", + nick = "next-max-size" + )] + MaxSize, + #[enum_value( + name = "New file when the configured maximum duration would be exceeded with the next buffer or buffer list", + nick = "next-max-duration" + )] + MaxDuration, +} + #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] #[repr(u32)] #[enum_type(name = "GstS3SinkOnError")] diff --git a/net/aws/src/s3sink/putobjectsink.rs b/net/aws/src/s3sink/putobjectsink.rs index 9633b539..5cb16604 100644 --- a/net/aws/src/s3sink/putobjectsink.rs +++ b/net/aws/src/s3sink/putobjectsink.rs @@ -20,6 +20,7 @@ use aws_sdk_s3::{ Client, }; +use super::NextFile; use once_cell::sync::Lazy; use std::collections::HashMap; use std::convert::From; @@ -35,6 +36,8 @@ const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0; const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0); const DEFAULT_FLUSH_ON_ERROR: bool = false; const DEFAULT_FORCE_PATH_STYLE: bool = false; +const DEFAULT_NEXT_FILE: NextFile = NextFile::Buffer; +const DEFAULT_MIN_KEYFRAME_DISTANCE: gst::ClockTime = gst::ClockTime::from_seconds(10); // General setting for create / abort requests const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000; @@ -45,6 +48,11 @@ struct Started { start_pts: Option, num_buffers: u64, need_flush: bool, + index: u64, + next_segment: Option, + streamheaders: Option>, + streamheaders_size: u64, + file_start_pts: Option, } impl Started { @@ -55,6 +63,11 @@ impl Started { start_pts: gst::ClockTime::NONE, num_buffers: 0, need_flush: false, + index: 0, + next_segment: gst::ClockTime::NONE, + streamheaders: None, + streamheaders_size: 0, + file_start_pts: gst::ClockTime::NONE, } } } @@ -87,6 +100,8 @@ struct Settings { flush_interval_bytes: u64, flush_interval_time: Option, flush_on_error: bool, + next_file: NextFile, + min_keyframe_distance: gst::ClockTime, } impl Settings { @@ -147,6 +162,8 @@ impl Default for Settings { flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES, flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME), flush_on_error: DEFAULT_FLUSH_ON_ERROR, + next_file: DEFAULT_NEXT_FILE, + min_keyframe_distance: DEFAULT_MIN_KEYFRAME_DISTANCE, } } } @@ -168,13 +185,9 @@ static CAT: Lazy = Lazy::new(|| { }); impl S3PutObjectSink { - fn check_thresholds( - &self, - state: &Started, - pts: Option, - duration: Option, - ) -> bool { - let settings = self.settings.lock().unwrap(); + fn check_thresholds(&self, settings: &Settings, state: &Started, buffer: &gst::Buffer) -> bool { + let pts = buffer.pts(); + let duration = buffer.duration(); #[allow(clippy::if_same_then_else)] #[allow(clippy::needless_bool)] @@ -202,61 +215,6 @@ impl S3PutObjectSink { } } - fn flush_buffer(&self) -> Result<(), Option> { - let put_object_req = self.create_put_object_request(); - - let put_object_req_future = put_object_req.send(); - let _output = - s3utils::wait(&self.canceller, put_object_req_future).map_err(|err| match &err { - WaitError::FutureError(_) => Some(gst::error_msg!( - gst::ResourceError::OpenWrite, - ["Failed to upload object: {err}"] - )), - WaitError::Cancelled => None, - })?; - - gst::debug!(CAT, imp = self, "Upload complete"); - - Ok(()) - } - - fn create_put_object_request(&self) -> PutObjectFluentBuilder { - let url = self.url.lock().unwrap(); - let settings = self.settings.lock().unwrap(); - let state = self.state.lock().unwrap(); - let state = match *state { - State::Started(ref started_state) => started_state, - State::Stopped => { - unreachable!("Element should be started"); - } - }; - - let body = Some(ByteStream::from(state.buffer.clone())); - - let bucket = Some(url.as_ref().unwrap().bucket.to_owned()); - let key = Some(url.as_ref().unwrap().object.to_owned()); - let cache_control = settings.cache_control.clone(); - let content_type = settings.content_type.clone(); - let content_disposition = settings.content_disposition.clone(); - let content_encoding = settings.content_encoding.clone(); - let content_language = settings.content_language.clone(); - let metadata = settings.to_metadata(self); - - let client = &state.client; - - client - .put_object() - .set_body(body) - .set_bucket(bucket) - .set_cache_control(cache_control) - .set_content_disposition(content_disposition) - .set_content_encoding(content_encoding) - .set_content_type(content_type) - .set_content_language(content_language) - .set_key(key) - .set_metadata(metadata) - } - fn start(&self) -> Result<(), gst::ErrorMessage> { let mut state = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap(); @@ -357,6 +315,214 @@ impl S3PutObjectSink { )), } } + + fn accumulate_buffer( + &self, + buffer: &gst::Buffer, + started_state: &mut Started, + ) -> Result { + if started_state.start_pts.is_none() { + started_state.start_pts = buffer.pts(); + } + + started_state.num_buffers += 1; + started_state.need_flush = true; + + gst::trace!(CAT, imp = self, "Rendering {:?}", buffer); + let map = buffer.map_readable().map_err(|_| { + gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]); + gst::FlowError::Error + })?; + + started_state.buffer.extend_from_slice(map.as_slice()); + + Ok(gst::FlowSuccess::Ok) + } + + fn create_body_with_streamheaders( + &self, + next_file: NextFile, + streamheaders: &Option>, + buffer: &[u8], + ) -> ByteStream { + match next_file { + NextFile::KeyFrame | NextFile::MaxSize | NextFile::MaxDuration => { + if let Some(headers) = streamheaders { + let with_sh = [&headers[..], buffer].concat(); + + ByteStream::from(with_sh) + } else { + ByteStream::from(buffer.to_vec()) + } + } + _ => ByteStream::from(buffer.to_vec()), + } + } + + fn create_put_object_request( + &self, + started_state: &mut Started, + ) -> Result, gst::FlowError> { + let url = self.url.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + if started_state.buffer.is_empty() { + return Ok(None); + } + + let body = Some(self.create_body_with_streamheaders( + settings.next_file, + &started_state.streamheaders, + &started_state.buffer, + )); + + let bucket = Some(url.as_ref().unwrap().bucket.to_owned()); + + let object = url.as_ref().unwrap().object.to_owned(); + let key = if object.contains("%0") { + match sprintf::sprintf!(&object, started_state.index) { + Ok(k) => { + /* Equivalent to opening a new file */ + started_state.index += 1; + started_state.buffer = Vec::new(); + + Some(k) + } + Err(e) => { + gst::element_imp_error!( + self, + gst::CoreError::Failed, + ["Failed to format file name: {}", e] + ); + return Err(gst::FlowError::Error); + } + } + } else { + Some(object) + }; + let metadata = settings.to_metadata(self); + let client = &started_state.client; + + Ok(Some( + client + .put_object() + .set_body(body) + .set_bucket(bucket) + .set_key(key) + .set_metadata(metadata), + )) + } + + fn to_write_next_file( + &self, + started_state: &mut Started, + buffer: &gst::Buffer, + buffer_size: u64, + ) -> bool { + let settings = self.settings.lock().unwrap(); + let next_file = settings.next_file; + let max_file_size = settings.flush_interval_bytes; + let max_file_duration = settings.flush_interval_time; + let min_keyframe_distance = settings.min_keyframe_distance; + + match next_file { + NextFile::Buffer => self.check_thresholds(&settings, started_state, buffer), + NextFile::MaxSize => { + started_state.buffer.len() as u64 + started_state.streamheaders_size + buffer_size + > max_file_size + } + NextFile::MaxDuration => { + let mut new_duration = gst::ClockTime::ZERO; + + if buffer.pts().is_some() && started_state.file_start_pts.is_some() { + new_duration = buffer.pts().unwrap() - started_state.file_start_pts.unwrap(); + + if buffer.duration().is_some() { + new_duration += buffer.duration().unwrap(); + } + } + + started_state.file_start_pts = match buffer.pts() { + Some(pts) => Some(pts), + None => started_state.file_start_pts, + }; + + new_duration > max_file_duration.unwrap() + } + NextFile::KeyFrame => { + if started_state.next_segment == gst::ClockTime::NONE && buffer.pts().is_some() { + started_state.next_segment = + Some(buffer.pts().unwrap() + min_keyframe_distance); + } + + if buffer.pts().is_some() { + let buffer_ts = buffer.pts().unwrap(); + let delta_unit = buffer.flags().contains(gst::BufferFlags::DELTA_UNIT); + let next_segment = started_state + .next_segment + .expect("Next segment must be valid here"); + + if buffer_ts >= next_segment && !delta_unit { + started_state.next_segment = Some(next_segment + min_keyframe_distance); + + true + } else { + false + } + } else { + false + } + } + NextFile::Discont => buffer.flags().contains(gst::BufferFlags::DISCONT), + NextFile::KeyUnitEvent => false, // Next file will be opened on KeyUnitEvent + } + } + + fn write_put_object_request( + &self, + started_state: &mut Started, + ) -> Result { + let req = self.create_put_object_request(started_state)?; + + if let Some(put_object_req) = req { + let put_object_req_future = put_object_req.send(); + + match s3utils::wait(&self.canceller, put_object_req_future) { + Ok(_) => Ok(gst::FlowSuccess::Ok), + Err(err) => match err { + WaitError::Cancelled => Ok(gst::FlowSuccess::Ok), + WaitError::FutureError(e) => { + gst::element_imp_error!(self, gst::CoreError::Failed, ["{e}"]); + Err(gst::FlowError::Error) + } + }, + } + } else { + Ok(gst::FlowSuccess::Ok) + } + } + + fn write_buffer(&self, buffer: &gst::Buffer) -> Result { + let mut state = self.state.lock().unwrap(); + let started_state = match *state { + State::Started(ref mut started_state) => started_state, + State::Stopped => { + gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]); + return Err(gst::FlowError::Error); + } + }; + + let map = buffer.map_readable().map_err(|_| { + gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]); + gst::FlowError::Error + })?; + + if self.to_write_next_file(started_state, buffer, map.size() as u64) { + self.write_put_object_request(started_state)?; + } + + self.accumulate_buffer(buffer, started_state) + } } #[glib::object_subclass] @@ -473,6 +639,16 @@ impl ObjectImpl for S3PutObjectSink { .blurb("Force client to use path-style addressing for buckets") .default_value(DEFAULT_FORCE_PATH_STYLE) .build(), + glib::ParamSpecEnum::builder_with_default("next-file", DEFAULT_NEXT_FILE) + .nick("Next File") + .blurb("When to start new file") + .mutable_ready() + .build(), + glib::ParamSpecUInt64::builder("min-keyframe-distance") + .nick("Minimum keyframe distance") + .blurb("Minimum distance between keyframes to start a new file") + .default_value(DEFAULT_MIN_KEYFRAME_DISTANCE.into()) + .build(), ] }); @@ -587,6 +763,14 @@ impl ObjectImpl for S3PutObjectSink { "force-path-style" => { settings.force_path_style = value.get::().expect("type checked upstream"); } + "next-file" => { + settings.next_file = value.get::().expect("type checked upstream"); + } + "min-keyframe-distance" => { + settings.min_keyframe_distance = value + .get::() + .expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -624,6 +808,8 @@ impl ObjectImpl for S3PutObjectSink { "flush-interval-time" => settings.flush_interval_time.to_value(), "flush-on-error" => settings.flush_on_error.to_value(), "force-path-style" => settings.force_path_style.to_value(), + "min-keyframe-distance" => settings.min_keyframe_distance.to_value(), + "next-file" => settings.next_file.to_value(), _ => unimplemented!(), } } @@ -634,6 +820,8 @@ impl GstObjectImpl for S3PutObjectSink {} impl ElementImpl for S3PutObjectSink { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: Lazy = Lazy::new(|| { + #[cfg(feature = "doc")] + NextFile::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); gst::subclass::ElementMetadata::new( "Amazon S3 PutObject sink", "Source/Network", @@ -672,77 +860,77 @@ impl BaseSinkImpl for S3PutObjectSink { let mut state = self.state.lock().unwrap(); let settings = self.settings.lock().unwrap(); - if let State::Started(ref started_state) = *state { + if let State::Started(ref mut started_state) = *state { if settings.flush_on_error && started_state.need_flush { drop(settings); - drop(state); - gst::warning!(CAT, imp = self, "Stopped without EOS, but flushing"); - if let Err(error_message) = self.flush_buffer() { - gst::error!( - CAT, - imp = self, - "Failed to finalize the upload: {:?}", - error_message - ); + if self.write_put_object_request(started_state).is_err() { + gst::error!(CAT, imp = self, "Failed to finalize the next-file upload",); } - - state = self.state.lock().unwrap(); } } *state = State::Stopped; + gst::info!(CAT, imp = self, "Stopped"); Ok(()) } fn render(&self, buffer: &gst::Buffer) -> Result { - let mut state = self.state.lock().unwrap(); + self.write_buffer(buffer) + } - let started_state = match *state { - State::Started(ref mut s) => s, - State::Stopped => { - gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]); - return Err(gst::FlowError::Error); + fn event(&self, event: gst::Event) -> bool { + use gst::EventView; + + match event.view() { + EventView::CustomDownstream(ev) => { + let settings = self.settings.lock().unwrap(); + let next_file = settings.next_file; + let is_next_key_unit_event = next_file == NextFile::KeyUnitEvent; + drop(settings); + + if is_next_key_unit_event && gst_video::ForceKeyUnitEvent::is(ev) { + use gst_video::DownstreamForceKeyUnitEvent; + + match DownstreamForceKeyUnitEvent::parse(ev) { + Ok(_key_unit_event) => { + let mut state = self.state.lock().unwrap(); + + if let State::Started(ref mut started_state) = *state { + if let Err(e) = self.write_put_object_request(started_state) { + gst::element_imp_error!( + self, + gst::CoreError::Failed, + ["Failed to write on KeyUnitEvent, {e}"] + ); + } + } + } + Err(e) => gst::error!(CAT, "Failed to parse key unit event: {e}"), + } + } } - }; + EventView::Eos(_) => { + let mut state = self.state.lock().unwrap(); - if started_state.start_pts.is_none() { - started_state.start_pts = buffer.pts(); - } + if let State::Started(ref mut started_state) = *state { + started_state.need_flush = false; - started_state.num_buffers += 1; - started_state.need_flush = true; - - gst::trace!(CAT, imp = self, "Rendering {:?}", buffer); - let map = buffer.map_readable().map_err(|_| { - gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]); - gst::FlowError::Error - })?; - - started_state.buffer.extend_from_slice(map.as_slice()); - - if !self.check_thresholds(started_state, buffer.pts(), buffer.duration()) { - return Ok(gst::FlowSuccess::Ok); - } - - drop(state); - - match self.flush_buffer() { - Ok(_) => Ok(gst::FlowSuccess::Ok), - Err(err) => match err { - Some(error_message) => { - gst::error!(CAT, imp = self, "Upload failed: {}", error_message); - self.post_error_message(error_message); - Err(gst::FlowError::Error) + if self.write_put_object_request(started_state).is_err() { + gst::element_imp_error!( + self, + gst::CoreError::Failed, + ["Failed to finalize the upload"] + ); + } } - _ => { - gst::info!(CAT, imp = self, "Upload interrupted. Flushing..."); - Err(gst::FlowError::Flushing) - } - }, + } + _ => (), } + + BaseSinkImplExt::parent_event(self, event) } fn unlock(&self) -> Result<(), gst::ErrorMessage> { @@ -757,27 +945,55 @@ impl BaseSinkImpl for S3PutObjectSink { Ok(()) } - fn event(&self, event: gst::Event) -> bool { - if let gst::EventView::Eos(_) = event.view() { - let mut state = self.state.lock().unwrap(); + fn set_caps(&self, caps: &gst::Caps) -> Result<(), gst::LoggableError> { + let s = caps + .structure(0) + .ok_or(gst::loggable_error!(CAT, "Missing caps in set_caps"))?; - if let State::Started(ref mut started_state) = *state { - started_state.need_flush = false; + if let Ok(Some(streamheaders)) = s.get_optional::("streamheader") { + if streamheaders.is_empty() { + return Ok(()); } - drop(state); + let streamheaders = streamheaders.as_slice(); + let mut headers: Vec = Vec::new(); - if let Err(error_message) = self.flush_buffer() { - gst::error!( - CAT, - imp = self, - "Failed to finalize the upload: {:?}", - error_message - ); - return false; + let mut state = self.state.lock().unwrap(); + let started_state = match *state { + State::Started(ref mut started_state) => started_state, + State::Stopped => { + return Err(gst::loggable_error!(CAT, "Element should be started")); + } + }; + + started_state.streamheaders_size = 0; + + for header in streamheaders { + let buffer = header.get::>(); + + if let Ok(Some(buf)) = buffer { + let map = buf.map_readable().map_err(|_| { + gst::element_imp_error!( + self, + gst::CoreError::Failed, + ["Failed to map streamheader buffer"] + ); + gst::loggable_error!(CAT, "Failed to map streamheader buffer") + })?; + + headers.extend_from_slice(map.as_slice()); + + started_state.streamheaders_size += map.size() as u64; + } + } + + if !headers.is_empty() { + let _ = started_state.streamheaders.take(); + gst::info!(CAT, imp = self, "Got streamheaders"); + started_state.streamheaders = Some(headers); } } - BaseSinkImplExt::parent_event(self, event) + self.parent_set_caps(caps) } }