From d5a9c7a94082780d10104f426787b6f26baf61f5 Mon Sep 17 00:00:00 2001 From: Jendrik Weise Date: Sat, 31 Aug 2024 04:05:11 +0200 Subject: [PATCH] fmp4: Add split-at-running-time signal Part-of: --- docs/plugins/gst_plugins_cache.json | 19 +++ mux/fmp4/src/fmp4mux/imp.rs | 200 +++++++++++++++++++--------- 2 files changed, 154 insertions(+), 65 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index ec75f02f..fa39a795 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -2449,6 +2449,25 @@ "type": "gboolean", "writable": true } + }, + "signals": { + "send-headers": { + "action": true, + "args": [], + "return-type": "void", + "when": "last" + }, + "split-at-running-time": { + "action": true, + "args": [ + { + "name": "arg0", + "type": "guint64" + } + ], + "return-type": "void", + "when": "last" + } } }, "GstFMP4MuxHeaderUpdateMode": { diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 2012f4f8..170a41a7 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -12,8 +12,10 @@ use gst::subclass::prelude::*; use gst_base::prelude::*; use gst_base::subclass::prelude::*; +use std::collections::BTreeSet; use std::collections::VecDeque; use std::mem; +use std::ops::Bound::Excluded; use std::sync::Mutex; use crate::fmp4mux::obu::read_seq_header_obu_bytes; @@ -261,6 +263,8 @@ struct State { /// Start PTS of the current fragment fragment_start_pts: Option, + /// End PTS of the current fragment + fragment_end_pts: Option, /// Start PTS of the current chunk /// /// This is equal to `fragment_start_pts` if the current chunk is the first of a fragment, @@ -271,6 +275,9 @@ struct State { /// If headers (ftyp / moov box) were sent. sent_headers: bool, + + /// Manually requested fragment boundaries + manual_fragment_boundaries: BTreeSet, } #[derive(Default)] @@ -650,15 +657,11 @@ impl FMP4Mux { /// Finds the stream that has the earliest buffer queued. fn find_earliest_stream<'a>( &self, - state: &'a mut State, + streams: &'a mut [Stream], timeout: bool, fragment_duration: gst::ClockTime, ) -> Result, gst::FlowError> { - if state - .streams - .iter() - .all(|s| s.fragment_filled || s.chunk_filled) - { + if streams.iter().all(|s| s.fragment_filled || s.chunk_filled) { gst::trace!( CAT, imp = self, @@ -670,7 +673,7 @@ impl FMP4Mux { let mut earliest_stream = None; let mut all_have_data_or_eos = true; - for stream in state.streams.iter_mut() { + for stream in streams.iter_mut() { let pre_queued_buffer = match Self::peek_buffer(self, stream, fragment_duration) { Ok(Some(buffer)) => buffer, Ok(None) | Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => { @@ -1024,12 +1027,13 @@ impl FMP4Mux { timeout: bool, ) -> Result<(), gst::FlowError> { let fragment_start_pts = state.fragment_start_pts; + let fragment_end_pts = state.fragment_end_pts; let chunk_start_pts = state.chunk_start_pts; // Always take a buffer from the stream with the earliest queued buffer to keep the // fill-level at all sinkpads in sync. while let Some(stream) = - self.find_earliest_stream(state, timeout, settings.fragment_duration)? + self.find_earliest_stream(&mut state.streams, timeout, settings.fragment_duration)? { let pre_queued_buffer = Self::pop_buffer(self, stream); @@ -1037,28 +1041,52 @@ impl FMP4Mux { self.queue_gops(stream, pre_queued_buffer)?; // Check if this stream is filled enough now. - self.check_stream_filled(settings, stream, fragment_start_pts, chunk_start_pts, false); + self.check_stream_filled( + settings, + stream, + fragment_start_pts, + fragment_end_pts, + chunk_start_pts, + false, + ); } Ok(()) } + fn get_fragment_end_pts( + &self, + manual_fragment_boundaries: &BTreeSet, + settings: &Settings, + fragment_start_pts: gst::ClockTime, + ) -> gst::ClockTime { + let fragment_end_pts = fragment_start_pts + settings.fragment_duration; + + // If we have a manual fragment boundary set then use that + return *manual_fragment_boundaries + .range((Excluded(fragment_start_pts), Excluded(fragment_end_pts))) + .next() + .unwrap_or(&fragment_end_pts); + } + /// Check if the stream is filled enough for the current chunk / fragment. fn check_stream_filled( &self, settings: &Settings, stream: &mut Stream, fragment_start_pts: Option, + fragment_end_pts: Option, chunk_start_pts: Option, all_eos: bool, ) { - // Either both are none or neither - let (chunk_start_pts, fragment_start_pts) = match (chunk_start_pts, fragment_start_pts) { - (Some(chunk_start_pts), Some(fragment_start_pts)) => { - (chunk_start_pts, fragment_start_pts) - } - _ => return, - }; + // Either all are none or none are + let (chunk_start_pts, fragment_start_pts, fragment_end_pts) = + match (chunk_start_pts, fragment_start_pts, fragment_end_pts) { + (Some(chunk_start_pts), Some(fragment_start_pts), Some(fragment_end_pts)) => { + (chunk_start_pts, fragment_start_pts, fragment_end_pts) + } + _ => return, + }; // Check if this stream is filled enough now. if let Some(chunk_duration) = settings.chunk_duration { @@ -1072,7 +1100,6 @@ impl FMP4Mux { ); let chunk_end_pts = chunk_start_pts + chunk_duration; - let fragment_end_pts = fragment_start_pts + settings.fragment_duration; if fragment_end_pts < chunk_end_pts { gst::trace!( @@ -1219,13 +1246,12 @@ impl FMP4Mux { } } else { // Check if the end of the latest finalized GOP is after the fragment end - let fragment_end_pts = fragment_start_pts + settings.fragment_duration; gst::trace!( CAT, obj = stream.sinkpad, "Current fragment start {}, current fragment end {}", fragment_start_pts, - fragment_start_pts + settings.fragment_duration, + fragment_end_pts, ); // If the first GOP already starts after the fragment end PTS then this stream is @@ -1349,13 +1375,20 @@ impl FMP4Mux { earliest_pts, start_dts.display() ); + + let fragment_start_pts = earliest_pts; + let fragment_end_pts = + self.get_fragment_end_pts(&state.manual_fragment_boundaries, settings, earliest_pts); + let chunk_start_pts = earliest_pts; + state.earliest_pts = Some(earliest_pts); state.start_dts = start_dts; - state.fragment_start_pts = Some(earliest_pts); - state.chunk_start_pts = Some(earliest_pts); + state.fragment_start_pts = Some(fragment_start_pts); + state.fragment_end_pts = Some(fragment_end_pts); + state.chunk_start_pts = Some(chunk_start_pts); // Now send force-keyunit events for the second fragment start. - let fku_time = earliest_pts + settings.fragment_duration; + let fku_time = fragment_end_pts; for stream in &state.streams { let current_position = stream.current_position; @@ -1409,16 +1442,14 @@ impl FMP4Mux { upstream_events.push((stream.sinkpad.clone(), fku)); } - let fragment_start_pts = state.fragment_start_pts; - let chunk_start_pts = state.chunk_start_pts; - // Check if any of the streams are already filled enough for the first chunk/fragment. for stream in &mut state.streams { self.check_stream_filled( settings, stream, - fragment_start_pts, - chunk_start_pts, + state.fragment_start_pts, + state.fragment_end_pts, + state.chunk_start_pts, all_eos, ); } @@ -1432,7 +1463,7 @@ impl FMP4Mux { stream: &mut Stream, timeout: bool, all_eos: bool, - fragment_start_pts: gst::ClockTime, + fragment_end_pts: gst::ClockTime, chunk_start_pts: gst::ClockTime, chunk_end_pts: Option, fragment_start: bool, @@ -1462,7 +1493,7 @@ impl FMP4Mux { chunk_end_pts } else if fragment_filled { // Fragment is filled, so only dequeue everything until the latest GOP - fragment_start_pts + settings.fragment_duration + fragment_end_pts } else { // Fragment is not filled and we either have a full chunk or timeout chunk_start_pts + chunk_duration @@ -1654,7 +1685,7 @@ impl FMP4Mux { // Not the first stream chunk_end_pts } else { - fragment_start_pts + settings.fragment_duration + fragment_end_pts }; gst::trace!( @@ -1902,6 +1933,7 @@ impl FMP4Mux { let mut chunk_end_pts = None; let fragment_start_pts = state.fragment_start_pts.unwrap(); + let fragment_end_pts = state.fragment_end_pts.unwrap(); let chunk_start_pts = state.chunk_start_pts.unwrap(); let fragment_start = fragment_start_pts == chunk_start_pts; @@ -1915,7 +1947,7 @@ impl FMP4Mux { .find(|s| { !s.sinkpad.is_eos() && s.queued_gops.back().is_some_and(|gop| { - gop.start_pts <= fragment_start_pts + settings.fragment_duration + gop.start_pts <= fragment_end_pts // In chunk mode we might've drained a partial GOP as a chunk after // the fragment end if the keyframe came too late. The GOP now // starts with a non-keyframe after the fragment end but is part of @@ -1955,7 +1987,7 @@ impl FMP4Mux { "Starting to drain at {} (fragment start {}, fragment end {}, chunk start {}, chunk end {})", chunk_start_pts, fragment_start_pts, - fragment_start_pts + settings.fragment_duration, + fragment_end_pts, chunk_start_pts.display(), settings.chunk_duration.map(|duration| chunk_start_pts + duration).display(), ); @@ -1968,7 +2000,7 @@ impl FMP4Mux { stream, timeout, all_eos, - fragment_start_pts, + fragment_end_pts, chunk_start_pts, chunk_end_pts, fragment_start, @@ -1995,7 +2027,7 @@ impl FMP4Mux { let stream_after_chunk = stream.queued_gops.back().is_some_and(|gop| { gop.start_pts >= if fragment_filled { - fragment_start_pts + settings.fragment_duration + fragment_end_pts } else { chunk_start_pts + settings.chunk_duration.unwrap() } @@ -2031,6 +2063,15 @@ impl FMP4Mux { } } + if fragment_filled { + while let Some(boundary) = state.manual_fragment_boundaries.first() { + if boundary > &fragment_end_pts { + break; + } + let _ = state.manual_fragment_boundaries.pop_first(); + } + } + if gops.is_empty() { gst::info!(CAT, obj = stream.sinkpad, "Draining no buffers",); @@ -2232,11 +2273,9 @@ impl FMP4Mux { fn request_force_keyunit_event( &self, state: &State, - settings: &Settings, upstream_events: &mut Vec<(super::FMP4MuxPad, gst::Event)>, - chunk_end_pts: gst::ClockTime, ) { - let fku_time = chunk_end_pts + settings.fragment_duration; + let fku_time = state.fragment_end_pts.unwrap(); for stream in &state.streams { let current_position = stream.current_position; @@ -2501,6 +2540,11 @@ impl FMP4Mux { if fragment_filled { state.fragment_start_pts = Some(chunk_end_pts); + state.fragment_end_pts = Some(self.get_fragment_end_pts( + &state.manual_fragment_boundaries, + settings, + chunk_end_pts, + )); gst::info!( CAT, imp = self, @@ -2515,7 +2559,7 @@ impl FMP4Mux { // If the current fragment is filled we already have the next fragment's start // keyframe and can request the following one. if fragment_filled { - self.request_force_keyunit_event(state, settings, upstream_events, chunk_end_pts); + self.request_force_keyunit_event(state, upstream_events); } // Reset timeout delay now that we've output an actual fragment or chunk @@ -2601,6 +2645,7 @@ impl FMP4Mux { timeout = false; let fragment_start_pts = state.fragment_start_pts; + let fragment_end_pts = state.fragment_end_pts; let chunk_start_pts = state.chunk_start_pts; for stream in &mut state.streams { // Check if this stream is still filled enough now. @@ -2608,6 +2653,7 @@ impl FMP4Mux { settings, stream, fragment_start_pts, + fragment_end_pts, chunk_start_pts, all_eos, ); @@ -2919,7 +2965,54 @@ impl ObjectSubclass for FMP4Mux { type Class = Class; } +static FMP4_SIGNAL_SEND_HEADERS: &str = "send-headers"; +static FMP4_SIGNAL_SPLIT_AT_RUNNING_TIME: &str = "split-at-running-time"; + impl ObjectImpl for FMP4Mux { + fn signals() -> &'static [glib::subclass::Signal] { + static SIGNALS: Lazy> = Lazy::new(|| { + vec![ + glib::subclass::Signal::builder(FMP4_SIGNAL_SEND_HEADERS) + .action() + .class_handler(|_token, args| { + let element = args[0].get::().expect("signal arg"); + let imp = element.imp(); + let mut state = imp.state.lock().unwrap(); + + state.sent_headers = false; + gst::debug!( + CAT, + obj = element, + "Init headers will be re-sent alongside the next chunk" + ); + + None + }) + .build(), + glib::subclass::Signal::builder(FMP4_SIGNAL_SPLIT_AT_RUNNING_TIME) + .param_types([gst::ClockTime::static_type()]) + .action() + .class_handler(|_token, args| { + let element = args[0].get::().expect("signal arg"); + let imp = element.imp(); + let mut state = imp.state.lock().unwrap(); + let time = args[1] + .get::>() + .expect("time arg") + .unwrap_or(gst::ClockTime::ZERO); + + state.manual_fragment_boundaries.insert(time); + gst::debug!(CAT, obj = element, "New fragment split added at {:?}", time); + + None + }) + .build(), + ] + }); + + SIGNALS.as_ref() + } + fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![ @@ -3316,6 +3409,7 @@ impl AggregatorImpl for FMP4Mux { state.current_offset = 0; state.fragment_offsets.clear(); + state.manual_fragment_boundaries.clear(); drop(state); @@ -3379,6 +3473,7 @@ impl AggregatorImpl for FMP4Mux { gst::debug!(CAT, imp = self, "All streams are EOS now"); let fragment_start_pts = state.fragment_start_pts; + let fragment_end_pts = state.fragment_end_pts; let chunk_start_pts = state.chunk_start_pts; for stream in &mut state.streams { @@ -3387,6 +3482,7 @@ impl AggregatorImpl for FMP4Mux { &settings, stream, fragment_start_pts, + fragment_end_pts, chunk_start_pts, true, ); @@ -3632,8 +3728,6 @@ impl FMP4MuxImpl for ISOFMP4Mux { const VARIANT: super::Variant = super::Variant::ISO; } -static CMAF_SIGNAL_SEND_HEADERS: &str = "send-headers"; - #[derive(Default)] pub(crate) struct CMAFMux; @@ -3644,31 +3738,7 @@ impl ObjectSubclass for CMAFMux { type ParentType = super::FMP4Mux; } -impl ObjectImpl for CMAFMux { - fn signals() -> &'static [glib::subclass::Signal] { - static SIGNALS: Lazy> = Lazy::new(|| { - vec![glib::subclass::Signal::builder(CMAF_SIGNAL_SEND_HEADERS) - .action() - .class_handler(|_token, args| { - let element = args[0].get::().expect("signal arg"); - let imp = element.imp(); - let mut state = imp.state.lock().unwrap(); - - state.sent_headers = false; - gst::debug!( - CAT, - obj = element, - "Init headers will be re-sent alongside the next chunk" - ); - - None - }) - .build()] - }); - - SIGNALS.as_ref() - } -} +impl ObjectImpl for CMAFMux {} impl GstObjectImpl for CMAFMux {}