From 78c59118ee93c4bb4bdc583ed8e505feb0747d7a Mon Sep 17 00:00:00 2001 From: Oscar J Linde Date: Thu, 12 Oct 2023 10:39:03 +0200 Subject: [PATCH] fmp4mux: Add support for creating fragment based on size Adds a new property called fragment-max-size that acts as a limit for the fragment size. The limit reached first of fragment-duration and fragment-max-size decides the duration/size of the fragment. --- mux/fmp4/src/fmp4mux/imp.rs | 64 +++++++++++++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 6da92b91..0d23f051 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -101,6 +101,7 @@ const DEFAULT_WRITE_MFRA: bool = false; const DEFAULT_WRITE_MEHD: bool = false; const DEFAULT_INTERLEAVE_BYTES: Option = None; const DEFAULT_INTERLEAVE_TIME: Option = Some(gst::ClockTime::from_mseconds(250)); +const DEFAULT_FRAGMENT_MAX_SIZE: u32 = u32::MAX; #[derive(Debug, Clone)] struct Settings { @@ -113,6 +114,7 @@ struct Settings { interleave_time: Option, movie_timescale: u32, offset_to_zero: bool, + fragment_max_size: u32, } impl Default for Settings { @@ -127,6 +129,7 @@ impl Default for Settings { interleave_time: DEFAULT_INTERLEAVE_TIME, movie_timescale: 0, offset_to_zero: false, + fragment_max_size: DEFAULT_FRAGMENT_MAX_SIZE, } } } @@ -212,6 +215,10 @@ struct Stream { fragment_filled: bool, /// Whether a whole chunk is queued. chunk_filled: bool, + /// Current size of the stream + size: usize, + /// Whether the fragment max size is reached. + max_size_reached: bool, /// Difference between the first DTS and 0 in case of negative DTS dts_offset: Option, @@ -682,6 +689,7 @@ impl FMP4Mux { /// Queue incoming buffer as individual GOPs. fn queue_gops( &self, + settings: &Settings, stream: &mut Stream, mut pre_queued_buffer: PreQueuedBuffer, ) -> Result<(), gst::FlowError> { @@ -792,6 +800,7 @@ impl FMP4Mux { stream.dts_offset.display(), ); + stream.size += buffer.size(); let gop = Gop { start_pts: pts, start_dts: dts, @@ -823,7 +832,7 @@ impl FMP4Mux { prev_gop.end_pts = std::cmp::max(prev_gop.end_pts, pts); prev_gop.end_dts = std::cmp::max(prev_gop.end_dts, dts); - if !delta_frames.requires_dts() { + if !delta_frames.requires_dts() || settings.chunk_duration.is_none() { prev_gop.final_end_pts = true; } @@ -848,6 +857,7 @@ impl FMP4Mux { } else if let Some(gop) = stream.queued_gops.front_mut() { assert!(!delta_frames.intra_only()); + stream.size += buffer.size(); gop.end_pts = std::cmp::max(gop.end_pts, end_pts); gop.end_dts = gop.end_dts.opt_max(end_dts); gop.buffers.push(GopBuffer { @@ -956,7 +966,7 @@ impl FMP4Mux { let pre_queued_buffer = Self::pop_buffer(self, stream); // Queue up the buffer and update GOP tracking state - self.queue_gops(stream, pre_queued_buffer)?; + self.queue_gops(settings, stream, pre_queued_buffer)?; // Check if this stream is filled enough now. self.check_stream_filled(settings, stream, fragment_start_pts, chunk_start_pts, false); @@ -982,6 +992,22 @@ impl FMP4Mux { _ => return, }; + if !stream.max_size_reached { + if stream.size > settings.fragment_max_size as usize { + gst::debug!(CAT, "Reached fragment max size ({} bytes)", stream.size); + stream.max_size_reached = true; + + // Send force-keyunit event to instatly request a new I-frame + // so that the current gop and fragment can be completed. + let fku = gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(None) + .all_headers(true) + .build(); + + stream.sinkpad.push_event(fku); + } + } + // Check if this stream is filled enough now. if let Some(chunk_duration) = settings.chunk_duration { // In chunk mode @@ -1177,6 +1203,17 @@ impl FMP4Mux { gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this fragment"); stream.fragment_filled = true; } + + // Check if we have complete GOPs that have reached fragment max size + if stream.max_size_reached { + let complete_gops_size = stream.queued_gops.iter() + .filter(|gop| gop.final_end_pts) + .map(|gop| gop.buffers.iter().map(|gb| gb.buffer.size()).sum::()) + .sum::(); + if complete_gops_size >= settings.fragment_max_size as usize { + stream.fragment_filled = true; + } + } } } @@ -1613,6 +1650,7 @@ impl FMP4Mux { obj: stream.sinkpad, "Pushing complete GOP", ); + stream.size -= gop.buffers.iter().map(|gb| gb.buffer.size()).sum::(); gops.push(stream.queued_gops.pop_back().unwrap()); } } @@ -1891,6 +1929,7 @@ impl FMP4Mux { )?; stream.fragment_filled = false; stream.chunk_filled = false; + stream.max_size_reached = false; // If we don't have a next chunk start PTS then this is the first stream as above. if chunk_end_pts.is_none() { @@ -2615,6 +2654,8 @@ impl FMP4Mux { queued_gops: VecDeque::new(), fragment_filled: false, chunk_filled: false, + size: 0, + max_size_reached: false, dts_offset: None, current_position: gst::ClockTime::ZERO, running_time_utc_time_mapping: None, @@ -2871,6 +2912,12 @@ impl ObjectImpl for FMP4Mux { .blurb("Timescale to use for the movie (units per second, 0 is automatic)") .mutable_ready() .build(), + glib::ParamSpecUInt::builder("fragment-max-size") + .nick("Fragment max size in bytes") + .blurb("Maximum byte size for each FMP4 fragment, triumphs fragment-duration") + .default_value(DEFAULT_FRAGMENT_MAX_SIZE) + .mutable_ready() + .build(), ] }); @@ -2941,6 +2988,14 @@ impl ObjectImpl for FMP4Mux { settings.movie_timescale = value.get().expect("type checked upstream"); } + "fragment-max-size" => { + let mut settings = self.settings.lock().unwrap(); + let fragment_max_size = value.get().expect("type checked upstream"); + if settings.fragment_max_size != fragment_max_size { + settings.fragment_max_size = fragment_max_size; + } + } + _ => unimplemented!(), } } @@ -2987,6 +3042,11 @@ impl ObjectImpl for FMP4Mux { settings.movie_timescale.to_value() } + "fragment-max-size" => { + let settings = self.settings.lock().unwrap(); + settings.fragment_max_size.to_value() + } + _ => unimplemented!(), } }