From 0c8b84d8a8fb6cd6a5d4aff2517c8c8ac3ce8269 Mon Sep 17 00:00:00 2001 From: Jochen Henneberg Date: Mon, 19 Aug 2024 08:54:20 +0200 Subject: [PATCH] fmp4mux: Send new fragment with header in case of caps change Part-of: --- mux/fmp4/src/fmp4mux/imp.rs | 208 ++++++++++++++++++++++++++++++++---- mux/fmp4/src/fmp4mux/mod.rs | 2 +- 2 files changed, 189 insertions(+), 21 deletions(-) diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 6b8a715b..827b250e 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -207,6 +207,10 @@ struct Stream { /// Currently configured caps for this stream. caps: gst::Caps, + /// Set to the new caps on caps change. If set, this stream will + /// not accept any further buffers until the chunk/fragment is + /// drained and draining will happen ASAP. + next_caps: Option, /// Whether this stream is intra-only and has frame reordering. delta_frames: DeltaFrames, /// Whether this stream might have header frames without timestamps that should be ignored. @@ -243,6 +247,10 @@ struct State { /// sent as part of the buffer list for the first fragment. stream_header: Option, + /// Set to true if the caps of *any* sinkpad have changed or on + /// new language code or image orientation. + caps_changed: bool, + /// Sequence number of the current fragment. sequence_number: u32, @@ -336,6 +344,12 @@ impl FMP4Mux { } } + // do not collect buffers anymore once the caps change has + // been confirmed + if stream.next_caps.is_some() { + return Ok(None); + } + // Pop buffer here, it will be stored in the pre-queue after calculating its timestamps let Some(mut buffer) = stream.sinkpad.pop_buffer() else { return Ok(None); @@ -654,6 +668,56 @@ impl FMP4Mux { stream.pre_queue.pop_front().unwrap() } + // Caps changes are allowed only in case that the + // header-update-mode is None + fn caps_change_allowed(&self) -> bool { + if self.settings.lock().unwrap().header_update_mode == super::HeaderUpdateMode::None { + true + } else { + gst::error!( + CAT, + imp = self, + "Caps change not allowed if header-update-mode is enabled" + ); + false + } + } + + /// Update stream caps only if they have relevant changes for the header. + fn caps_compatible(&self, stream: &Stream, caps: &gst::CapsRef) -> bool { + let fields: &[&str] = match caps.structure(0).unwrap().name().as_str() { + "video/x-h264" | "video/x-h265" | "video/x-vp8" | "video/x-vp9" | "video/x-av1" + | "image/jpeg" => [ + "width", + "height", + "profile", + "level", + "tier", + "colorimetry", + "stream-format", + "chroma-format", + "bit-depth-luma", + "codec_data", + ] + .as_slice(), + "audio/mpeg" | "audio/x-opus" | "audio/x-flac" | "audio/x-alaw" | "audio/x-mulaw" + | "audio/x-adpcm" => ["channels", "rate", "layout", "bitrate", "codec_data"].as_slice(), + "application/x-onvif-metadata" => [].as_slice(), + _ => unreachable!(), + }; + + // Now check if all relevant fields for the existing caps match the new caps + !fields.iter().any(|f| { + let c = stream.caps.structure(0).unwrap().value(*f); + let n = caps.structure(0).unwrap().value(*f); + match (c, n) { + (Ok(c), Ok(n)) => c.compare(n) != Some(core::cmp::Ordering::Equal), + (Err(_), Err(_)) => false, + _ => true, + } + }) + } + /// Finds the stream that has the earliest buffer queued. fn find_earliest_stream<'a>( &self, @@ -1086,6 +1150,14 @@ impl FMP4Mux { _ => return, }; + // Early return, if caps have changed assume this stream is + // ready for pushing a fragment + if stream.next_caps.is_some() { + stream.fragment_filled = true; + stream.chunk_filled = true; + return; + } + // Check if this stream is filled enough now. if let Some(chunk_duration) = settings.chunk_duration { // In chunk mode @@ -1334,7 +1406,7 @@ impl FMP4Mux { for stream in &state.streams { let (stream_earliest_pts, stream_start_dts) = match stream.queued_gops.back() { None => { - if !all_eos && !timeout { + if !all_eos && !timeout && !state.caps_changed { earliest_pts = None; start_dts = None; break; @@ -1342,7 +1414,8 @@ impl FMP4Mux { continue; } Some(oldest_gop) => { - if !all_eos && !timeout && !oldest_gop.final_earliest_pts { + if !all_eos && !timeout && !state.caps_changed && !oldest_gop.final_earliest_pts + { earliest_pts = None; start_dts = None; break; @@ -1372,9 +1445,10 @@ impl FMP4Mux { gst::info!( CAT, imp = self, - "Got earliest PTS {}, start DTS {} (timeout: {timeout}, all eos: {all_eos})", + "Got earliest PTS {}, start DTS {} (timeout: {timeout}, all eos: {all_eos}, caps changed: {})", earliest_pts, - start_dts.display() + start_dts.display(), + state.caps_changed, ); let fragment_start_pts = earliest_pts; @@ -1461,6 +1535,7 @@ impl FMP4Mux { &self, settings: &Settings, stream: &mut Stream, + caps_changed: bool, timeout: bool, all_eos: bool, fragment_end_pts: gst::ClockTime, @@ -1471,6 +1546,7 @@ impl FMP4Mux { ) -> Result, gst::FlowError> { assert!( timeout + || caps_changed || stream.sinkpad.is_eos() || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true) || settings.chunk_duration.is_some() @@ -1522,7 +1598,7 @@ impl FMP4Mux { "Fragment filled, current GOP start {} end {} (final {})", gop.start_pts, gop.end_pts, - gop.final_end_pts || stream.sinkpad.is_eos() + gop.final_end_pts || stream.sinkpad.is_eos() || caps_changed, ); // If we have a final GOP then include it as long as it's either @@ -1531,7 +1607,7 @@ impl FMP4Mux { // // The second case would happen if no GOP ends between the last chunk of the // fragment and the fragment duration. - if (gop.final_end_pts || stream.sinkpad.is_eos()) + if (gop.final_end_pts || stream.sinkpad.is_eos() || caps_changed) && (gop.end_pts <= dequeue_end_pts || (gops.is_empty() && chunk_end_pts.is_none())) { @@ -1548,7 +1624,7 @@ impl FMP4Mux { // // If this is not the first stream then take an incomplete GOP. if gop.start_pts >= dequeue_end_pts - || (!gop.final_earliest_pts && !stream.sinkpad.is_eos()) + || (!gop.final_earliest_pts && !stream.sinkpad.is_eos() && !caps_changed) { gst::trace!(CAT, obj = stream.sinkpad, "GOP starts after fragment end",); break; @@ -1569,16 +1645,17 @@ impl FMP4Mux { "Chunk filled, current GOP start {} end {} (final {})", gop.start_pts, gop.end_pts, - gop.final_end_pts || stream.sinkpad.is_eos() + gop.final_end_pts || stream.sinkpad.is_eos() || caps_changed ); } - if gop.end_pts <= dequeue_end_pts && (gop.final_end_pts || stream.sinkpad.is_eos()) + if gop.end_pts <= dequeue_end_pts + && (gop.final_end_pts || stream.sinkpad.is_eos() || caps_changed) { gst::trace!(CAT, obj = stream.sinkpad, "Pushing whole GOP",); gops.push(stream.queued_gops.pop_back().unwrap()); } else if gop.start_pts >= dequeue_end_pts - || (!gop.final_earliest_pts && !stream.sinkpad.is_eos()) + || (!gop.final_earliest_pts && !stream.sinkpad.is_eos() && !caps_changed) { gst::trace!(CAT, obj = stream.sinkpad, "GOP starts after chunk end",); break; @@ -1615,7 +1692,7 @@ impl FMP4Mux { // The last buffer of the GOP starts before the chunk end but ends // after the end. We still take it here and remove the whole GOP. if split_index == gop.buffers.len() - 1 { - if gop.final_end_pts || stream.sinkpad.is_eos() { + if gop.final_end_pts || stream.sinkpad.is_eos() || caps_changed { gst::trace!(CAT, obj = stream.sinkpad, "Pushing whole GOP",); gops.push(stream.queued_gops.pop_back().unwrap()); } else { @@ -1702,7 +1779,7 @@ impl FMP4Mux { "Current GOP start {} end {} (final {})", gop.start_pts, gop.end_pts, - gop.final_end_pts || stream.sinkpad.is_eos() + gop.final_end_pts || stream.sinkpad.is_eos() || caps_changed ); // If this GOP is not complete then we can't pop it yet. @@ -1710,7 +1787,7 @@ impl FMP4Mux { // If there was no complete GOP at all yet then it might be bigger than the // fragment duration. In this case we might not be able to handle the latency // requirements in a live pipeline. - if !gop.final_end_pts && !stream.sinkpad.is_eos() { + if !gop.final_end_pts && !stream.sinkpad.is_eos() && !caps_changed { gst::trace!( CAT, obj = stream.sinkpad, @@ -1996,6 +2073,7 @@ impl FMP4Mux { let gops = self.drain_buffers_one_stream( settings, stream, + state.caps_changed, timeout, all_eos, fragment_end_pts, @@ -2344,6 +2422,8 @@ impl FMP4Mux { gst::info!(CAT, imp = self, "Draining at EOS"); } else if timeout { gst::info!(CAT, imp = self, "Draining at timeout"); + } else if state.caps_changed { + gst::info!(CAT, imp = self, "Draining on caps change"); } else { for stream in &state.streams { if !stream.chunk_filled && !stream.fragment_filled && !stream.sinkpad.is_eos() { @@ -2357,6 +2437,14 @@ impl FMP4Mux { ); } + // The fragment_start is set once the first buffer has been + // accepted and then just updated with the end time of the + // previous chunk so once set it will always be set + if state.fragment_start_pts.is_none() { + gst::info!(CAT, imp = self, "Drain requested with nothing to drain yet"); + return Ok((None, None)); + } + // Collect all buffers and their timing information that are to be drained right now. let ( drained_streams, @@ -2536,7 +2624,7 @@ impl FMP4Mux { // Update for the start PTS of the next fragment / chunk - if fragment_filled { + if fragment_filled || state.caps_changed { state.fragment_start_pts = Some(chunk_end_pts); state.fragment_end_pts = Some(self.get_fragment_end_pts( &state.manual_fragment_boundaries, @@ -2746,6 +2834,7 @@ impl FMP4Mux { state.streams.push(Stream { sinkpad: pad, caps, + next_caps: None, delta_frames, discard_header_buffers, pre_queue: VecDeque::new(), @@ -3249,8 +3338,18 @@ impl AggregatorImpl for FMP4Mux { } if let Some(filter_caps) = q.filter() { - let res = filter_caps + let mut res = filter_caps .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First); + + // if the caps changed build new caps and reset + // the stream header + if res.is_empty() || !res.is_fixed() { + res = filter_caps.intersect_with_mode( + &aggregator_pad.pad_template_caps(), + gst::CapsIntersectMode::First, + ); + } + q.set_result(&res); } else { q.set_result(&allowed_caps); @@ -3316,6 +3415,35 @@ impl AggregatorImpl for FMP4Mux { self.parent_sink_event(aggregator_pad, event) } + EventView::Caps(caps) => { + // Only care of caps if streams have been setup + let mut state = self.state.lock().unwrap(); + if !state.streams.is_empty() { + state.caps_changed = if let Some(stream) = state + .streams + .iter_mut() + .find(|s| *aggregator_pad == s.sinkpad) + { + if self.caps_change_allowed() && !self.caps_compatible(stream, caps.caps()) + { + gst::trace!( + CAT, + obj = aggregator_pad, + "Update caps and send new headers {:?}", + caps + ); + stream.next_caps = Some(caps.caps().to_owned()); + true + } else { + false + } + } else { + false + }; + } + drop(state); + self.parent_sink_event(aggregator_pad, event) + } EventView::Tag(ev) => { if let Some(tag_value) = ev.tag().get::() { let lang = tag_value.get(); @@ -3334,7 +3462,17 @@ impl AggregatorImpl for FMP4Mux { for (out, c) in Iterator::zip(language_code.iter_mut(), lang.chars()) { *out = c as u8; } - state.language_code = Some(language_code); + + let language_code = Some(language_code); + // If the language changed and we have buffers + // trigger caps change + if state.language_code != language_code + && !state.streams.is_empty() + && self.caps_change_allowed() + { + state.language_code = language_code; + state.caps_changed = true; + } } } else if let Some(tag_value) = ev.tag().get::() { let orientation = tag_value.get(); @@ -3346,7 +3484,7 @@ impl AggregatorImpl for FMP4Mux { ); let mut state = self.state.lock().unwrap(); - state.orientation = match orientation { + let orientation = match orientation { "rotate-0" => Some(ImageOrientation::Rotate0), "rotate-90" => Some(ImageOrientation::Rotate90), "rotate-180" => Some(ImageOrientation::Rotate180), @@ -3357,6 +3495,16 @@ impl AggregatorImpl for FMP4Mux { // "flip-rotate-180" => Some(ImageOrientation::FlipRotate180), // "flip-rotate-270" => Some(ImageOrientation::FlipRotate270), _ => None, + }; + + // If the orientation changed and we have buffers + // trigger caps change + if state.orientation != orientation + && !state.streams.is_empty() + && self.caps_change_allowed() + { + state.orientation = orientation; + state.caps_changed = true; } } @@ -3451,11 +3599,13 @@ impl AggregatorImpl for FMP4Mux { fn aggregate(&self, timeout: bool) -> Result { let settings = self.settings.lock().unwrap().clone(); + gst::trace!(CAT, imp = self, "Aggregate (timeout: {timeout})"); + let all_eos; let mut caps = None; let mut buffers = vec![]; let mut upstream_events = vec![]; - let res = { + let (caps_changed, res) = { let mut state = self.state.lock().unwrap(); // Create streams @@ -3495,7 +3645,7 @@ impl AggregatorImpl for FMP4Mux { ); // Drain everything that can be drained at this point - self.drain( + let res = self.drain( &mut state, &settings, all_eos, @@ -3503,7 +3653,9 @@ impl AggregatorImpl for FMP4Mux { &mut caps, &mut buffers, &mut upstream_events, - ) + ); + + (state.caps_changed, res) }; for (sinkpad, event) in upstream_events { @@ -3520,6 +3672,22 @@ impl AggregatorImpl for FMP4Mux { self.obj().finish_buffer_list(buffer_list)?; } + // all drained then check if a caps change is pending + if caps_changed { + let mut state = self.state.lock().unwrap(); + gst::info!( + CAT, + imp = self, + "Reset state, update stream caps and send new header on caps changed" + ); + state.caps_changed = false; + state.stream_header = None; + state.sent_headers = false; + for stream in state.streams.iter_mut().filter(|s| s.next_caps.is_some()) { + stream.caps = stream.next_caps.take().unwrap(); + } + } + // If an error happened above while draining, return this now after pushing // any output that was produced before the error. res?; diff --git a/mux/fmp4/src/fmp4mux/mod.rs b/mux/fmp4/src/fmp4mux/mod.rs index 407debe7..49ebcbf4 100644 --- a/mux/fmp4/src/fmp4mux/mod.rs +++ b/mux/fmp4/src/fmp4mux/mod.rs @@ -73,7 +73,7 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { Ok(()) } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq)] pub(crate) enum ImageOrientation { Rotate0, Rotate90,