From a01437b675e43cba2daabacfcdcb50cf7dfdb258 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 23 Jan 2023 20:43:26 +0200 Subject: [PATCH] fmp4mux: Add support for sub-fragments / chunking Allow outputting sub-fragments (chunks in CMAF terms) that are shorter than the fragment duration and don't usually start on a keyframe. By this the buffering requirements of the element is reduced to one chunk duration, as is the latency. This is used for formats like low-latency / LL-HLS and DASH. Part-of: --- docs/plugins/gst_plugins_cache.json | 14 + mux/fmp4/src/fmp4mux/boxes.rs | 25 +- mux/fmp4/src/fmp4mux/imp.rs | 1240 +++++++++++++++++++-------- mux/fmp4/src/fmp4mux/mod.rs | 3 + mux/fmp4/tests/tests.rs | 384 +++++++++ 5 files changed, 1300 insertions(+), 366 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 02d9bbce..f9587f47 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -1728,6 +1728,20 @@ ], "kind": "object", "properties": { + "chunk-duration": { + "blurb": "Duration for each FMP4 chunk (default = no chunks)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "18446744073709551615", + "max": "18446744073709551615", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint64", + "writable": true + }, "fragment-duration": { "blurb": "Duration for each FMP4 fragment", "conditionally-available": false, diff --git a/mux/fmp4/src/fmp4mux/boxes.rs b/mux/fmp4/src/fmp4mux/boxes.rs index 0a2046cf..5d9ff433 100644 --- a/mux/fmp4/src/fmp4mux/boxes.rs +++ b/mux/fmp4/src/fmp4mux/boxes.rs @@ -1570,19 +1570,22 @@ pub(super) fn create_fmp4_fragment_header( ) -> Result<(gst::Buffer, u64), Error> { let mut v = vec![]; - let (brand, compatible_brands) = - brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| &s.caps)); + // Don't write a `styp` if this is only a chunk. + if !cfg.chunk { + let (brand, compatible_brands) = + brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| &s.caps)); - write_box(&mut v, b"styp", |v| { - // major brand - v.extend(brand); - // minor version - v.extend(0u32.to_be_bytes()); - // compatible brands - v.extend(compatible_brands.into_iter().flatten()); + write_box(&mut v, b"styp", |v| { + // major brand + v.extend(brand); + // minor version + v.extend(0u32.to_be_bytes()); + // compatible brands + v.extend(compatible_brands.into_iter().flatten()); - Ok(()) - })?; + Ok(()) + })?; + } let styp_len = v.len(); diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index d0cbe90e..3bdde41a 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -13,6 +13,7 @@ use gst_base::prelude::*; use gst_base::subclass::prelude::*; use std::collections::VecDeque; +use std::mem; use std::sync::Mutex; use once_cell::sync::Lazy; @@ -85,6 +86,7 @@ static CAT: Lazy = Lazy::new(|| { }); const DEFAULT_FRAGMENT_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(10); +const DEFAULT_CHUNK_DURATION: Option = gst::ClockTime::NONE; const DEFAULT_HEADER_UPDATE_MODE: super::HeaderUpdateMode = super::HeaderUpdateMode::None; const DEFAULT_WRITE_MFRA: bool = false; const DEFAULT_WRITE_MEHD: bool = false; @@ -94,6 +96,7 @@ const DEFAULT_INTERLEAVE_TIME: Option = Some(gst::ClockTime::fro #[derive(Debug, Clone)] struct Settings { fragment_duration: gst::ClockTime, + chunk_duration: Option, header_update_mode: super::HeaderUpdateMode, write_mfra: bool, write_mehd: bool, @@ -107,6 +110,7 @@ impl Default for Settings { fn default() -> Self { Settings { fragment_duration: DEFAULT_FRAGMENT_DURATION, + chunk_duration: DEFAULT_CHUNK_DURATION, header_update_mode: DEFAULT_HEADER_UPDATE_MODE, write_mfra: DEFAULT_WRITE_MFRA, write_mehd: DEFAULT_WRITE_MEHD, @@ -150,6 +154,7 @@ struct PreQueuedBuffer { struct GopBuffer { buffer: gst::Buffer, pts: gst::ClockTime, + pts_position: gst::ClockTime, dts: Option, } @@ -196,6 +201,8 @@ struct Stream { queued_gops: VecDeque, /// Whether the fully queued GOPs are filling a whole fragment. fragment_filled: bool, + /// Whether a whole chunk is queued. + chunk_filled: bool, /// Difference between the first DTS and 0 in case of negative DTS dts_offset: Option, @@ -235,6 +242,11 @@ struct State { /// Start PTS of the current fragment fragment_start_pts: Option, + /// Start PTS of the current chunk + /// + /// This is equal to `fragment_start_pts` if the current chunk is the first of a fragment, + /// and always equal to `fragment_start_pts` if no `chunk_duration` is set. + chunk_start_pts: Option, /// Additional timeout delay in case GOPs are bigger than the fragment duration timeout_delay: gst::ClockTime, @@ -622,6 +634,11 @@ impl FMP4Mux { continue; } + if stream.chunk_filled { + gst::trace!(CAT, obj: stream.sinkpad, "Stream has current chunk filled"); + continue; + } + gst::trace!(CAT, obj: stream.sinkpad, "Stream has running time PTS {} / DTS {} queued", pre_queued_buffer.pts, pre_queued_buffer.dts.display()); let running_time = if stream.delta_frames.requires_dts() { @@ -787,7 +804,12 @@ impl FMP4Mux { end_pts, end_dts, final_end_pts: false, - buffers: vec![GopBuffer { buffer, pts, dts }], + buffers: vec![GopBuffer { + buffer, + pts, + pts_position, + dts, + }], }; stream.queued_gops.push_front(gop); @@ -831,7 +853,12 @@ impl FMP4Mux { gop.end_pts = std::cmp::max(gop.end_pts, end_pts); gop.end_dts = gop.end_dts.opt_max(end_dts); - gop.buffers.push(GopBuffer { buffer, pts, dts }); + gop.buffers.push(GopBuffer { + buffer, + pts, + pts_position, + dts, + }); if delta_frames.requires_dts() { let dts = dts.unwrap(); @@ -914,6 +941,255 @@ impl FMP4Mux { Ok(()) } + fn check_stream_filled( + &self, + settings: &Settings, + stream: &mut Stream, + fragment_start_pts: Option, + chunk_start_pts: Option, + all_eos: bool, + ) { + // Either both are none or neither + let (chunk_start_pts, fragment_start_pts) = match (chunk_start_pts, fragment_start_pts) { + (Some(chunk_start_pts), Some(fragment_start_pts)) => { + (chunk_start_pts, fragment_start_pts) + } + _ => return, + }; + + // Check if this stream is filled enough now. + if let Some(chunk_duration) = settings.chunk_duration { + // In chunk mode + let (gop_idx, gop) = match stream + .queued_gops + .iter() + .enumerate() + .find(|(_idx, gop)| gop.final_earliest_pts || all_eos || stream.sinkpad.is_eos()) + { + Some(res) => res, + None => { + gst::trace!(CAT, obj: stream.sinkpad, "Chunked mode but no GOP with final earliest PTS known yet"); + return; + } + }; + + gst::trace!( + CAT, + obj: stream.sinkpad, + "GOP {gop_idx} start PTS {}, GOP end PTS {} (final {})", + gop.start_pts, + gop.end_pts, + gop.final_end_pts || all_eos || stream.sinkpad.is_eos(), + ); + gst::trace!( + CAT, + obj: stream.sinkpad, + "Current chunk start {}, current fragment start {}", + chunk_start_pts, + fragment_start_pts, + ); + + let chunk_end_pts = chunk_start_pts + chunk_duration; + let fragment_end_pts = fragment_start_pts + settings.fragment_duration; + + gst::trace!( + CAT, + obj: stream.sinkpad, + "Current chunk end {}, current fragment end {}", + chunk_end_pts, + fragment_end_pts, + ); + + // First check if the next split should be the end of a fragment or the end of a chunk. + // If both are the same then a fragment split has preference. + if fragment_end_pts <= chunk_end_pts && gop.start_pts >= fragment_end_pts { + gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for finishing this fragment"); + stream.fragment_filled = true; + } else if chunk_end_pts < fragment_end_pts { + let last_pts = gop.buffers.last().map(|b| b.pts); + + if gop.end_pts >= chunk_end_pts + // only if there's another GOP or at least one further buffer + && (gop_idx > 0 + || last_pts.map_or(false, |last_pts| last_pts.saturating_sub(chunk_start_pts) > chunk_duration)) + { + gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this chunk"); + stream.chunk_filled = true; + } + } + } else { + let gop = match stream + .queued_gops + .iter() + .find(|gop| gop.final_end_pts || all_eos || stream.sinkpad.is_eos()) + { + Some(gop) => gop, + None => { + gst::trace!(CAT, obj: stream.sinkpad, "Fragment mode but no GOP with final end PTS known yet"); + return; + } + }; + + gst::trace!( + CAT, + obj: stream.sinkpad, + "GOP start PTS {}, GOP end PTS {}", + gop.start_pts, + gop.end_pts, + ); + + // Check if the end of the latest finalized GOP is after the fragment end + let fragment_end_pts = fragment_start_pts + settings.fragment_duration; + gst::trace!( + CAT, + obj: stream.sinkpad, + "Current fragment start{}, current fragment end {}", + fragment_start_pts, + fragment_start_pts + settings.fragment_duration, + ); + + if gop.end_pts >= fragment_end_pts { + gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this fragment"); + stream.fragment_filled = true; + } + } + } + + fn calculate_earliest_pts( + &self, + settings: &Settings, + state: &mut State, + upstream_events: &mut Vec<(super::FMP4MuxPad, gst::Event)>, + all_eos: bool, + timeout: bool, + ) { + if state.earliest_pts.is_some() { + return; + } + + let fragment_start_pts = state.fragment_start_pts; + let chunk_start_pts = state.chunk_start_pts; + + // Calculate the earliest PTS after queueing input if we can now. + let mut earliest_pts = None; + let mut start_dts = None; + for stream in &state.streams { + let (stream_earliest_pts, stream_start_dts) = match stream.queued_gops.back() { + None => { + if !all_eos && !timeout { + earliest_pts = None; + start_dts = None; + break; + } + continue; + } + Some(oldest_gop) => { + if !all_eos && !timeout && !oldest_gop.final_earliest_pts { + earliest_pts = None; + start_dts = None; + break; + } + + (oldest_gop.earliest_pts, oldest_gop.start_dts) + } + }; + + if earliest_pts.opt_gt(stream_earliest_pts).unwrap_or(true) { + earliest_pts = Some(stream_earliest_pts); + } + + if let Some(stream_start_dts) = stream_start_dts { + if start_dts.opt_gt(stream_start_dts).unwrap_or(true) { + start_dts = Some(stream_start_dts); + } + } + } + + let earliest_pts = match earliest_pts { + Some(earliest_pts) => earliest_pts, + None => return, + }; + + // The earliest PTS is known and as such the start of the first and second fragment. + gst::info!( + CAT, + imp: self, + "Got earliest PTS {}, start DTS {} (timeout: {timeout}, all eos: {all_eos})", + earliest_pts, + start_dts.display() + ); + state.earliest_pts = Some(earliest_pts); + state.start_dts = start_dts; + state.fragment_start_pts = Some(earliest_pts); + state.chunk_start_pts = Some(earliest_pts); + + // Now send force-keyunit events for the second fragment start. + let fku_time = earliest_pts + settings.fragment_duration; + for stream in &state.streams { + let current_position = stream.current_position; + + // In case of ONVIF this needs to be converted back from UTC time to + // the stream's running time + let (fku_time, current_position) = + if self.obj().class().as_ref().variant == super::Variant::ONVIF { + ( + if let Some(fku_time) = utc_time_to_running_time( + fku_time, + stream.running_time_utc_time_mapping.unwrap(), + ) { + fku_time + } else { + continue; + }, + utc_time_to_running_time( + current_position, + stream.running_time_utc_time_mapping.unwrap(), + ), + ) + } else { + (fku_time, Some(current_position)) + }; + + let fku_time = + if current_position.map_or(false, |current_position| current_position > fku_time) { + gst::warning!( + CAT, + obj: stream.sinkpad, + "Sending first force-keyunit event late for running time {} at {}", + fku_time, + current_position.display(), + ); + None + } else { + gst::debug!( + CAT, + obj: stream.sinkpad, + "Sending first force-keyunit event for running time {}", + fku_time, + ); + Some(fku_time) + }; + + let fku = gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fku_time) + .all_headers(true) + .build(); + + upstream_events.push((stream.sinkpad.clone(), fku)); + } + + // Check if any of the streams are already filled enough for the first chunk/fragment. + for stream in &mut state.streams { + self.check_stream_filled( + settings, + stream, + fragment_start_pts, + chunk_start_pts, + all_eos, + ); + } + } + #[allow(clippy::type_complexity)] fn drain_buffers( &self, @@ -931,8 +1207,13 @@ impl FMP4Mux { Option, // Minimum start DTS position of all streams (if any stream has DTS) Option, - // End PTS of this drained fragment, i.e. start PTS of the next fragment + // End PTS of this drained fragment or chunk, i.e. start PTS of the next fragment or + // chunk Option, + // With these drained buffers the current fragment is filled + bool, + // These buffers make the start of a new fragment + bool, ), gst::FlowError, > { @@ -941,22 +1222,48 @@ impl FMP4Mux { let mut min_earliest_pts_position = None; let mut min_earliest_pts = None; let mut min_start_dts_position = None; - let mut fragment_end_pts = None; + let mut chunk_end_pts = None; + let mut fragment_start = false; + + // In fragment mode, each chunk is a full fragment. Otherwise, in chunk mode, + // this fragment is filled if it is filled for the first non-EOS stream + let fragment_filled = settings.chunk_duration.is_none() + || state + .streams + .iter() + .find(|s| !s.sinkpad.is_eos()) + .map(|s| s.fragment_filled) + == Some(true); // The first stream decides how much can be dequeued, if anything at all. // - // All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued - // but on timeout in live pipelines it might happen that the first stream does not have a - // complete GOP queued. In that case nothing is dequeued for any of the streams and the - // timeout is advanced by 1s until at least one complete GOP can be dequeued. + // In chunk mode: + // If more than the fragment duration has passed until the latest GOPs earliest PTS then + // the fragment is considered filled and all GOPs until that GOP are drained. The next + // chunk would start a new fragment, and would start with the keyframe at the beginning + // of that latest GOP. + // + // Otherwise if more than a chunk duration is currently queued in GOPs of which the + // earliest PTS is known then drain everything up to that position. If nothing can be + // drained at all then advance the timeout by 1s until something can be dequeued. + // + // Otherwise: + // All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued + // but on timeout in live pipelines it might happen that the first stream does not have a + // complete GOP queued. In that case nothing is dequeued for any of the streams and the + // timeout is advanced by 1s until at least one complete GOP can be dequeued. // // If the first stream is already EOS then the next stream that is not EOS yet will be // taken in its place. gst::info!( CAT, imp: self, - "Starting to drain at {}", - state.fragment_start_pts.display() + "Starting to drain at {} (fragment start {}, fragment end {}, chunk start {}, chunk end {})", + state.chunk_start_pts.display(), + state.fragment_start_pts.display(), + state.fragment_start_pts.map(|start| start + settings.fragment_duration).display(), + state.chunk_start_pts.display(), + Option::zip(state.chunk_start_pts, settings.chunk_duration).map(|(start, duration)| start + duration).display(), ); for (idx, stream) in state.streams.iter_mut().enumerate() { @@ -967,58 +1274,274 @@ impl FMP4Mux { || at_eos || stream.sinkpad.is_eos() || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true) + || settings.chunk_duration.is_some() ); let mut gops = Vec::with_capacity(stream.queued_gops.len()); if !stream.queued_gops.is_empty() { let fragment_start_pts = state.fragment_start_pts.unwrap(); + let chunk_start_pts = state.chunk_start_pts.unwrap(); - // Drain all complete GOPs until at most one fragment duration was dequeued for the - // first stream, or until the dequeued duration of the first stream. - let dequeue_end_pts = - fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration); - gst::trace!( - CAT, - obj: stream.sinkpad, - "Draining up to end PTS {} / duration {}", - dequeue_end_pts, - dequeue_end_pts - fragment_start_pts - ); + // For the first stream drain as much as necessary and decide the end of this + // fragment or chunk, for all other streams drain up to that position. + if let Some(chunk_duration) = settings.chunk_duration { + let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts { + // Not the first stream + chunk_end_pts + } else if fragment_filled { + // Fragment is filled, so only dequeue everything until the latest GOP + fragment_start_pts + settings.fragment_duration + } else { + // Fragment is not filled and we either have a full chunk or timeout + chunk_start_pts + chunk_duration + }; - while let Some(gop) = stream.queued_gops.back() { - // If this GOP is not complete then we can't pop it yet. - // - // If there was no complete GOP at all yet then it might be bigger than the - // fragment duration. In this case we might not be able to handle the latency - // requirements in a live pipeline. - if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() { - break; + gst::trace!( + CAT, + obj: stream.sinkpad, + "Draining up to end PTS {} / duration {}", + dequeue_end_pts, + dequeue_end_pts - chunk_start_pts + ); + + while let Some(gop) = stream.queued_gops.back() { + // If this should be the last chunk of a fragment then only drain every + // finished GOP until the chunk end PTS. If there is no finished GOP for + // this stream (it would be not the first stream then), then drain + // everything up to the chunk end PTS. + // + // If this chunk is not the last chunk of a fragment then simply dequeue + // everything up to the chunk end PTS. + if fragment_filled { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Fragment filled, current GOP start {} end {} (final {})", + gop.start_pts, gop.end_pts, + gop.final_end_pts || at_eos || stream.sinkpad.is_eos() + ); + + if (gop.final_end_pts || at_eos || stream.sinkpad.is_eos()) + && gop.end_pts <= dequeue_end_pts + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Pushing whole GOP", + ); + gops.push(stream.queued_gops.pop_back().unwrap()); + continue; + } + if !gops.is_empty() { + break; + } + + gst::error!(CAT, obj: stream.sinkpad, "Don't have a full GOP at the end of a fragment"); + } else { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Chunk filled, current GOP start {} end {} (final {})", + gop.start_pts, gop.end_pts, + gop.final_end_pts || at_eos || stream.sinkpad.is_eos() + ); + } + + if gop.end_pts <= dequeue_end_pts + && (gop.final_end_pts || at_eos || stream.sinkpad.is_eos()) + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Pushing whole GOP", + ); + gops.push(stream.queued_gops.pop_back().unwrap()); + } else if gop.start_pts >= dequeue_end_pts + || (!gop.final_earliest_pts && !at_eos && !stream.sinkpad.is_eos()) + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "GOP starts after chunk end", + ); + break; + } else { + let gop = stream.queued_gops.back_mut().unwrap(); + + let start_pts = gop.start_pts; + let start_dts = gop.start_dts; + let earliest_pts = gop.earliest_pts; + let earliest_pts_position = gop.earliest_pts_position; + + let mut split_index = None; + + for (idx, buffer) in gop.buffers.iter().enumerate() { + if buffer.pts >= dequeue_end_pts { + break; + } + split_index = Some(idx); + } + let split_index = match split_index { + Some(split_index) => split_index, + None => { + // We have B frames and the first buffer of this GOP is too far + // in the future. + gst::trace!( + CAT, + obj: stream.sinkpad, + "First buffer of GOP too far in the future", + ); + break; + } + }; + + // The last buffer of the GOP starts before the chunk end but ends + // after the end. We still take it here and remove the whole GOP. + if split_index == gop.buffers.len() - 1 { + if gop.final_end_pts || at_eos || stream.sinkpad.is_eos() { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Pushing whole GOP", + ); + gops.push(stream.queued_gops.pop_back().unwrap()); + } else { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Can't push whole GOP as it's not final yet", + ); + } + break; + } + + let mut buffers = mem::take(&mut gop.buffers); + // Contains all buffers from `split_index + 1` to the end + gop.buffers = buffers.split_off(split_index + 1); + + gop.start_pts = gop.buffers[0].pts; + gop.start_dts = gop.buffers[0].dts; + gop.earliest_pts_position = gop.buffers[0].pts_position; + gop.earliest_pts = gop.buffers[0].pts; + + gst::trace!( + CAT, + obj: stream.sinkpad, + "Splitting GOP and keeping PTS {}", + gop.buffers[0].pts, + ); + + let queue_gop = Gop { + start_pts, + start_dts, + earliest_pts, + final_earliest_pts: true, + end_pts: gop.start_pts, + final_end_pts: true, + end_dts: gop.start_dts, + earliest_pts_position, + buffers, + }; + + gops.push(queue_gop); + break; + } } - // If this GOP starts after the fragment end then don't dequeue it yet unless this is - // the first stream and no GOPs were dequeued at all yet. This would mean that the - // GOP is bigger than the fragment duration. - if !at_eos - && gop.end_pts > dequeue_end_pts - && (fragment_end_pts.is_some() || !gops.is_empty()) - { - break; + fragment_start = fragment_start_pts == chunk_start_pts; + if fragment_start { + if let Some(first_buffer) = gops.first().and_then(|gop| gop.buffers.first()) + { + if first_buffer + .buffer + .flags() + .contains(gst::BufferFlags::DELTA_UNIT) + { + gst::error!(CAT, obj: stream.sinkpad, "First buffer of a new fragment is not a keyframe"); + } + } + } + } else { + let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts { + // Not the first stream + chunk_end_pts + } else { + fragment_start_pts + settings.fragment_duration + }; + + gst::trace!( + CAT, + obj: stream.sinkpad, + "Draining up to end PTS {} / duration {}", + dequeue_end_pts, + dequeue_end_pts - chunk_start_pts + ); + + while let Some(gop) = stream.queued_gops.back() { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Current GOP start {} end {} (final {})", + gop.start_pts, gop.end_pts, + gop.final_end_pts || at_eos || stream.sinkpad.is_eos() + ); + + // If this GOP is not complete then we can't pop it yet. + // + // If there was no complete GOP at all yet then it might be bigger than the + // fragment duration. In this case we might not be able to handle the latency + // requirements in a live pipeline. + if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Not including GOP without final end PTS", + ); + break; + } + + // If this GOP starts after the fragment end then don't dequeue it yet unless this is + // the first stream and no GOPs were dequeued at all yet. This would mean that the + // GOP is bigger than the fragment duration. + if !at_eos + && gop.end_pts > dequeue_end_pts + && (chunk_end_pts.is_some() || !gops.is_empty()) + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Not including GOP yet", + ); + break; + } + + gst::trace!( + CAT, + obj: stream.sinkpad, + "Pushing complete GOP", + ); + gops.push(stream.queued_gops.pop_back().unwrap()); } - gops.push(stream.queued_gops.pop_back().unwrap()); + // If we don't have a next chunk start PTS then this is the first stream as above. + if chunk_end_pts.is_none() { + // In fragment mode, each chunk is a full fragment + fragment_start = true; + } } } stream.fragment_filled = false; + stream.chunk_filled = false; - // If we don't have a next fragment start PTS then this is the first stream as above. - if fragment_end_pts.is_none() { + // If we don't have a next chunk start PTS then this is the first stream as above. + if chunk_end_pts.is_none() { if let Some(last_gop) = gops.last() { // Dequeued something so let's take the end PTS of the last GOP - fragment_end_pts = Some(last_gop.end_pts); + chunk_end_pts = Some(last_gop.end_pts); gst::info!( CAT, obj: stream.sinkpad, - "Draining up to PTS {} for this fragment", + "Draining up to PTS {} for this chunk", last_gop.end_pts, ); } else { @@ -1030,11 +1553,19 @@ impl FMP4Mux { // Otherwise this can only really happen on timeout in live pipelines. assert!(timeout); - gst::warning!( - CAT, - obj: stream.sinkpad, - "Don't have a complete GOP for the first stream on timeout in a live pipeline", - ); + if settings.chunk_duration.is_some() { + gst::warning!( + CAT, + obj: stream.sinkpad, + "Don't have anything to drain for the first stream on timeout in a live pipeline", + ); + } else { + gst::warning!( + CAT, + obj: stream.sinkpad, + "Don't have a complete GOP for the first stream on timeout in a live pipeline", + ); + } // In this case we advance the timeout by 1s and hope that things are // better then. @@ -1043,10 +1574,9 @@ impl FMP4Mux { } } else if at_eos { if let Some(last_gop) = gops.last() { - if fragment_end_pts - .map_or(true, |fragment_end_pts| fragment_end_pts < last_gop.end_pts) + if chunk_end_pts.map_or(true, |chunk_end_pts| chunk_end_pts < last_gop.end_pts) { - fragment_end_pts = Some(last_gop.end_pts); + chunk_end_pts = Some(last_gop.end_pts); } } } @@ -1071,7 +1601,7 @@ impl FMP4Mux { continue; } - assert!(fragment_end_pts.is_some()); + assert!(chunk_end_pts.is_some()); if let Some((prev_gop, first_gop)) = Option::zip( stream.queued_gops.iter().find(|gop| gop.final_end_pts), @@ -1116,7 +1646,7 @@ impl FMP4Mux { while let Some(buffer) = gop_buffers.next() { // If this is a GAP buffer then skip it. Its duration was already considered // below for the non-GAP buffer preceding it, and if there was none then the - // fragment start would be adjusted accordingly for this stream. + // chunk start would be adjusted accordingly for this stream. if buffer.buffer.flags().contains(gst::BufferFlags::GAP) && buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE) && buffer.buffer.size() == 0 @@ -1277,7 +1807,9 @@ impl FMP4Mux { min_earliest_pts_position, min_earliest_pts, min_start_dts_position, - fragment_end_pts, + chunk_end_pts, + fragment_filled, + fragment_start, )) } @@ -1348,6 +1880,9 @@ impl FMP4Mux { Ok((interleaved_buffers, streams)) } + /// Fills upstream events as needed and returns the caps the first time draining can happen. + /// + /// If it returns `(_, None)` then there's currently nothing to drain anymore. fn drain( &self, state: &mut State, @@ -1362,10 +1897,15 @@ impl FMP4Mux { gst::info!(CAT, imp: self, "Draining at timeout"); } else { for stream in &state.streams { - if !stream.fragment_filled && !stream.sinkpad.is_eos() { + if !stream.chunk_filled && !stream.fragment_filled && !stream.sinkpad.is_eos() { return Ok((None, None)); } } + gst::info!( + CAT, + imp: self, + "Draining because all streams have enough data queued" + ); } // Collect all buffers and their timing information that are to be drained right now. @@ -1374,7 +1914,9 @@ impl FMP4Mux { min_earliest_pts_position, min_earliest_pts, min_start_dts_position, - fragment_end_pts, + chunk_end_pts, + fragment_filled, + fragment_start, ) = self.drain_buffers(state, settings, timeout, at_eos)?; // Create header now if it was not created before and return the caps @@ -1413,7 +1955,7 @@ impl FMP4Mux { // bufferlist for all buffers that have to be output. let min_earliest_pts_position = min_earliest_pts_position.unwrap(); let min_earliest_pts = min_earliest_pts.unwrap(); - let fragment_end_pts = fragment_end_pts.unwrap(); + let chunk_end_pts = chunk_end_pts.unwrap(); let mut fmp4_header = None; if !state.sent_headers { @@ -1440,11 +1982,16 @@ impl FMP4Mux { state.sequence_number = 1; } let sequence_number = state.sequence_number; - state.sequence_number += 1; + // If this is the last chunk of a fragment then increment the sequence number for the + // start of the next fragment. + if fragment_filled { + state.sequence_number += 1; + } let (mut fmp4_fragment_header, moof_offset) = boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration { variant: self.obj().class().as_ref().variant, sequence_number, + chunk: !fragment_start, streams: streams.as_slice(), buffers: interleaved_buffers.as_slice(), }) @@ -1462,10 +2009,14 @@ impl FMP4Mux { let buffer = fmp4_fragment_header.get_mut().unwrap(); buffer.set_pts(min_earliest_pts_position); buffer.set_dts(min_start_dts_position); - buffer.set_duration(fragment_end_pts.checked_sub(min_earliest_pts)); + buffer.set_duration(chunk_end_pts.checked_sub(min_earliest_pts)); - // Fragment header is HEADER + // Fragment and chunk header is HEADER buffer.set_flags(gst::BufferFlags::HEADER); + // Chunk header is DELTA_UNIT + if !fragment_start { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } // Copy metas from the first actual buffer to the fragment header. This allows // getting things like the reference timestamp meta or the timecode meta to identify @@ -1506,9 +2057,9 @@ impl FMP4Mux { .collect::(), ); - if settings.write_mfra { - // Write mfra only for the main stream, and if there are no buffers for the main stream - // in this segment then don't write anything. + if settings.write_mfra && fragment_start { + // Write mfra only for the main stream on fragment starts, and if there are no + // buffers for the main stream in this segment then don't write anything. if let Some(super::FragmentHeaderStream { start_time: Some(start_time), .. @@ -1521,98 +2072,82 @@ impl FMP4Mux { } } - state.end_pts = Some(fragment_end_pts); + state.end_pts = Some(chunk_end_pts); - // Update for the start PTS of the next fragment - gst::info!( - CAT, - imp: self, - "Starting new fragment at {}", - fragment_end_pts, - ); - state.fragment_start_pts = Some(fragment_end_pts); + // Update for the start PTS of the next fragment / chunk - let fku_time = fragment_end_pts + settings.fragment_duration; + if fragment_filled { + state.fragment_start_pts = Some(chunk_end_pts); + gst::info!(CAT, imp: self, "Starting new fragment at {}", chunk_end_pts,); + } else { + gst::info!(CAT, imp: self, "Starting new chunk at {}", chunk_end_pts,); + } + state.chunk_start_pts = Some(chunk_end_pts); - for stream in &state.streams { - let current_position = stream.current_position; + // If the current fragment is filled we already have the next fragment's start + // keyframe and can request the following one. + if fragment_filled { + let fku_time = chunk_end_pts + settings.fragment_duration; - // In case of ONVIF this needs to be converted back from UTC time to - // the stream's running time - let (fku_time, current_position) = - if self.obj().class().as_ref().variant == super::Variant::ONVIF { - ( - if let Some(fku_time) = utc_time_to_running_time( - fku_time, - stream.running_time_utc_time_mapping.unwrap(), - ) { - fku_time - } else { - continue; - }, - utc_time_to_running_time( - current_position, - stream.running_time_utc_time_mapping.unwrap(), - ), - ) + for stream in &state.streams { + let current_position = stream.current_position; + + // In case of ONVIF this needs to be converted back from UTC time to + // the stream's running time + let (fku_time, current_position) = + if self.obj().class().as_ref().variant == super::Variant::ONVIF { + ( + if let Some(fku_time) = utc_time_to_running_time( + fku_time, + stream.running_time_utc_time_mapping.unwrap(), + ) { + fku_time + } else { + continue; + }, + utc_time_to_running_time( + current_position, + stream.running_time_utc_time_mapping.unwrap(), + ), + ) + } else { + (fku_time, Some(current_position)) + }; + + let fku_time = if current_position + .map_or(false, |current_position| current_position > fku_time) + { + gst::warning!( + CAT, + obj: stream.sinkpad, + "Sending force-keyunit event late for running time {} at {}", + fku_time, + current_position.display(), + ); + None } else { - (fku_time, Some(current_position)) + gst::debug!( + CAT, + obj: stream.sinkpad, + "Sending force-keyunit event for running time {}", + fku_time, + ); + Some(fku_time) }; - let fku_time = if current_position - .map_or(false, |current_position| current_position > fku_time) - { - gst::warning!( - CAT, - obj: stream.sinkpad, - "Sending force-keyunit event late for running time {} at {}", - fku_time, - current_position.display(), - ); - None - } else { - gst::debug!( - CAT, - obj: stream.sinkpad, - "Sending force-keyunit event for running time {}", - fku_time, - ); - Some(fku_time) - }; + let fku = gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fku_time) + .all_headers(true) + .build(); - let fku = gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fku_time) - .all_headers(true) - .build(); - - upstream_events.push((stream.sinkpad.clone(), fku)); + upstream_events.push((stream.sinkpad.clone(), fku)); + } } - // Reset timeout delay now that we've output an actual fragment + // Reset timeout delay now that we've output an actual fragment or chunk state.timeout_delay = gst::ClockTime::ZERO; } - if settings.write_mfra && at_eos { - gst::debug!(CAT, imp: self, "Writing mfra box"); - match boxes::create_mfra(&streams[0].caps, &state.fragment_offsets) { - Ok(mut mfra) => { - { - let mfra = mfra.get_mut().unwrap(); - // mfra is HEADER|DELTA_UNIT like other boxes - mfra.set_flags(gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT); - } - - if buffer_list.is_none() { - buffer_list = Some(gst::BufferList::new_sized(1)); - } - buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra); - } - Err(err) => { - gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err); - } - } - } - // TODO: Write edit list at EOS // TODO: Rewrite bitrates at EOS @@ -1689,6 +2224,7 @@ impl FMP4Mux { pre_queue: VecDeque::new(), queued_gops: VecDeque::new(), fragment_filled: false, + chunk_filled: false, dts_offset: None, current_position: gst::ClockTime::ZERO, running_time_utc_time_mapping: None, @@ -1824,13 +2360,18 @@ impl ObjectImpl for FMP4Mux { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![ - // TODO: Add chunk-duration property separate from fragment-size glib::ParamSpecUInt64::builder("fragment-duration") .nick("Fragment Duration") .blurb("Duration for each FMP4 fragment") .default_value(DEFAULT_FRAGMENT_DURATION.nseconds()) .mutable_ready() .build(), + glib::ParamSpecUInt64::builder("chunk-duration") + .nick("Chunk Duration") + .blurb("Duration for each FMP4 chunk (default = no chunks)") + .default_value(u64::MAX) + .mutable_ready() + .build(), glib::ParamSpecEnum::builder::("header-update-mode", DEFAULT_HEADER_UPDATE_MODE) .nick("Header update mode") .blurb("Mode for updating the header at the end of the stream") @@ -1878,8 +2419,24 @@ impl ObjectImpl for FMP4Mux { let fragment_duration = value.get().expect("type checked upstream"); if settings.fragment_duration != fragment_duration { settings.fragment_duration = fragment_duration; + let latency = settings + .chunk_duration + .unwrap_or(settings.fragment_duration); drop(settings); - self.obj().set_latency(fragment_duration, None); + self.obj().set_latency(latency, None); + } + } + + "chunk-duration" => { + let mut settings = self.settings.lock().unwrap(); + let chunk_duration = value.get().expect("type checked upstream"); + if settings.chunk_duration != chunk_duration { + settings.chunk_duration = chunk_duration; + let latency = settings + .chunk_duration + .unwrap_or(settings.fragment_duration); + drop(settings); + self.obj().set_latency(latency, None); } } @@ -1930,6 +2487,11 @@ impl ObjectImpl for FMP4Mux { settings.fragment_duration.to_value() } + "chunk-duration" => { + let settings = self.settings.lock().unwrap(); + settings.chunk_duration.to_value() + } + "header-update-mode" => { let settings = self.settings.lock().unwrap(); settings.header_update_mode.to_value() @@ -2011,7 +2573,7 @@ impl ElementImpl for FMP4Mux { impl AggregatorImpl for FMP4Mux { fn next_time(&self) -> Option { let state = self.state.lock().unwrap(); - state.fragment_start_pts.opt_add(state.timeout_delay) + state.chunk_start_pts.opt_add(state.timeout_delay) } fn sink_query( @@ -2193,7 +2755,9 @@ impl AggregatorImpl for FMP4Mux { let mut upstream_events = vec![]; let all_eos; - let (caps, buffers) = { + let mut caps = None; + let mut buffers = vec![]; + { let mut state = self.state.lock().unwrap(); // Create streams @@ -2206,6 +2770,7 @@ impl AggregatorImpl for FMP4Mux { // Always take a buffer from the stream with the earliest queued buffer to keep the // fill-level at all sinkpads in sync. let fragment_start_pts = state.fragment_start_pts; + let chunk_start_pts = state.chunk_start_pts; while let Some((idx, stream)) = self.find_earliest_stream(&mut state, timeout, settings.fragment_duration)? @@ -2221,176 +2786,140 @@ impl AggregatorImpl for FMP4Mux { self.queue_gops(idx, stream, pre_queued_buffer)?; // Check if this stream is filled enough now. - if let Some((queued_end_pts, fragment_start_pts)) = Option::zip( - stream - .queued_gops - .iter() - .find(|gop| gop.final_end_pts) - .map(|gop| gop.end_pts), + self.check_stream_filled( + &settings, + stream, fragment_start_pts, - ) { - if queued_end_pts.saturating_sub(fragment_start_pts) - >= settings.fragment_duration - { - gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this fragment"); - stream.fragment_filled = true; - } - } + chunk_start_pts, + false, + ); } all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos()); if all_eos { gst::debug!(CAT, imp: self, "All streams are EOS now"); - } - // Calculate the earliest PTS after queueing input if we can now. - if state.earliest_pts.is_none() { - let mut earliest_pts = None; - let mut start_dts = None; - - for stream in &state.streams { - let (stream_earliest_pts, stream_start_dts) = match stream.queued_gops.back() { - None => { - if !all_eos && !timeout { - earliest_pts = None; - start_dts = None; - break; - } - continue; - } - Some(oldest_gop) => { - if !all_eos && !timeout && !oldest_gop.final_earliest_pts { - earliest_pts = None; - start_dts = None; - break; - } - - (oldest_gop.earliest_pts, oldest_gop.start_dts) - } - }; - - if earliest_pts.opt_gt(stream_earliest_pts).unwrap_or(true) { - earliest_pts = Some(stream_earliest_pts); - } - - if let Some(stream_start_dts) = stream_start_dts { - if start_dts.opt_gt(stream_start_dts).unwrap_or(true) { - start_dts = Some(stream_start_dts); - } - } - } - - if let Some(earliest_pts) = earliest_pts { - gst::info!( - CAT, - imp: self, - "Got earliest PTS {}, start DTS {} (timeout: {timeout}, all eos: {all_eos})", - earliest_pts, - start_dts.display() + for stream in &mut state.streams { + // Check if this stream is filled enough now that everything is EOS. + self.check_stream_filled( + &settings, + stream, + fragment_start_pts, + chunk_start_pts, + true, ); - state.earliest_pts = Some(earliest_pts); - state.start_dts = start_dts; - state.fragment_start_pts = Some(earliest_pts); - - let fku_time = earliest_pts + settings.fragment_duration; - - for stream in &mut state.streams { - let current_position = stream.current_position; - - // In case of ONVIF this needs to be converted back from UTC time to - // the stream's running time - let (fku_time, current_position) = - if self.obj().class().as_ref().variant == super::Variant::ONVIF { - ( - if let Some(fku_time) = utc_time_to_running_time( - fku_time, - stream.running_time_utc_time_mapping.unwrap(), - ) { - fku_time - } else { - continue; - }, - utc_time_to_running_time( - current_position, - stream.running_time_utc_time_mapping.unwrap(), - ), - ) - } else { - (fku_time, Some(current_position)) - }; - - let fku_time = if current_position - .map_or(false, |current_position| current_position > fku_time) - { - gst::warning!( - CAT, - obj: stream.sinkpad, - "Sending first force-keyunit event late for running time {} at {}", - fku_time, - current_position.display(), - ); - None - } else { - gst::debug!( - CAT, - obj: stream.sinkpad, - "Sending first force-keyunit event for running time {}", - fku_time, - ); - Some(fku_time) - }; - - let fku = gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fku_time) - .all_headers(true) - .build(); - - upstream_events.push((stream.sinkpad.clone(), fku)); - - // Check if this stream is filled enough now. - if let Some(queued_end_pts) = stream - .queued_gops - .iter() - .find(|gop| gop.final_end_pts) - .map(|gop| gop.end_pts) - { - if queued_end_pts.saturating_sub(earliest_pts) - >= settings.fragment_duration - { - gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this fragment"); - stream.fragment_filled = true; - } - } - } } } - // If enough GOPs were queued, drain and create the output fragment - match self.drain( - &mut state, + // Calculate the earliest PTS, i.e. the start of the first fragment, if not known yet. + self.calculate_earliest_pts( &settings, - timeout, - all_eos, + &mut state, &mut upstream_events, - ) { - Ok(res) => res, - Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => { - gst::element_imp_warning!( - self, - gst::StreamError::Format, - ["Longer GOPs than fragment duration"] - ); - state.timeout_delay += 1.seconds(); + all_eos, + timeout, + ); - drop(state); - for (sinkpad, event) in upstream_events { - sinkpad.push_event(event); + // Loop as long as new chunks can be drained. + // Only the first iteration is considered a timeout. + let mut timeout = timeout; + loop { + // If enough GOPs were queued, drain and create the output fragment or chunk + let res = self.drain( + &mut state, + &settings, + timeout, + all_eos, + &mut upstream_events, + ); + let mut buffer_list = match res { + Ok((new_caps, buffer_list)) => { + if caps.is_none() { + caps = new_caps; + } + + buffer_list } - return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + Err(err) => { + if err == gst_base::AGGREGATOR_FLOW_NEED_DATA { + assert!(!all_eos); + assert!(timeout); + gst::element_imp_warning!( + self, + gst::StreamError::Format, + ["Longer GOPs than fragment duration"] + ); + state.timeout_delay += 1.seconds(); + } + + // Although we had an error, push out everything that was produced so far + drop(state); + for (sinkpad, event) in upstream_events { + sinkpad.push_event(event); + } + + if let Some(caps) = caps { + gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps); + self.obj().set_src_caps(&caps); + } + + for buffer_list in buffers { + gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffer_list); + self.obj().finish_buffer_list(buffer_list)?; + } + + return Err(err); + } + }; + + // If nothing can't be drained anymore then break the loop, and if all streams are + // EOS do EOS handling. + if buffer_list.is_none() { + if settings.write_mfra && all_eos { + gst::debug!(CAT, imp: self, "Writing mfra box"); + match boxes::create_mfra(&state.streams[0].caps, &state.fragment_offsets) { + Ok(mut mfra) => { + { + let mfra = mfra.get_mut().unwrap(); + // mfra is DELTA_UNIT like other buffers + mfra.set_flags(gst::BufferFlags::DELTA_UNIT); + } + + if buffer_list.is_none() { + buffer_list = Some(gst::BufferList::new_sized(1)); + } + buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra); + buffers.extend(buffer_list); + } + Err(err) => { + gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err); + } + } + } + + break; + } + + // Otherwise extend the list of bufferlists and check again if something can be + // drained. + buffers.extend(buffer_list); + + timeout = false; + + let fragment_start_pts = state.fragment_start_pts; + let chunk_start_pts = state.chunk_start_pts; + for stream in &mut state.streams { + // Check if this stream is still filled enough now. + self.check_stream_filled( + &settings, + stream, + fragment_start_pts, + chunk_start_pts, + all_eos, + ); } - Err(err) => return Err(err), } - }; + } for (sinkpad, event) in upstream_events { sinkpad.push_event(event); @@ -2401,54 +2930,38 @@ impl AggregatorImpl for FMP4Mux { self.obj().set_src_caps(&caps); } - if let Some(buffers) = buffers { - gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffers); - self.obj().finish_buffer_list(buffers)?; + for buffer_list in buffers { + gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffer_list); + self.obj().finish_buffer_list(buffer_list)?; } - if all_eos { - gst::debug!(CAT, imp: self, "Doing EOS handling"); + if !all_eos { + return Ok(gst::FlowSuccess::Ok); + } - if settings.header_update_mode != super::HeaderUpdateMode::None { - let updated_header = - self.update_header(&mut self.state.lock().unwrap(), &settings, true); - match updated_header { - Ok(Some((buffer_list, caps))) => { - match settings.header_update_mode { - super::HeaderUpdateMode::None => unreachable!(), - super::HeaderUpdateMode::Rewrite => { - let mut q = gst::query::Seeking::new(gst::Format::Bytes); - if self.obj().src_pad().peer_query(&mut q) && q.result().0 { - let aggregator = self.obj(); + // Do remaining EOS handling after the end of the stream was pushed. + gst::debug!(CAT, imp: self, "Doing EOS handling"); - aggregator.set_src_caps(&caps); - - // Seek to the beginning with a default bytes segment - aggregator - .update_segment( - &gst::FormattedSegment::::new(), - ); - - if let Err(err) = aggregator.finish_buffer_list(buffer_list) { - gst::error!( - CAT, - imp: self, - "Failed pushing updated header buffer downstream: {:?}", - err, - ); - } - } else { - gst::error!( - CAT, - imp: self, - "Can't rewrite header because downstream is not seekable" - ); - } - } - super::HeaderUpdateMode::Update => { + if settings.header_update_mode != super::HeaderUpdateMode::None { + let updated_header = + self.update_header(&mut self.state.lock().unwrap(), &settings, true); + match updated_header { + Ok(Some((buffer_list, caps))) => { + match settings.header_update_mode { + super::HeaderUpdateMode::None => unreachable!(), + super::HeaderUpdateMode::Rewrite => { + let mut q = gst::query::Seeking::new(gst::Format::Bytes); + if self.obj().src_pad().peer_query(&mut q) && q.result().0 { let aggregator = self.obj(); aggregator.set_src_caps(&caps); + + // Seek to the beginning with a default bytes segment + aggregator + .update_segment( + &gst::FormattedSegment::::new(), + ); + if let Err(err) = aggregator.finish_buffer_list(buffer_list) { gst::error!( CAT, @@ -2457,28 +2970,45 @@ impl AggregatorImpl for FMP4Mux { err, ); } + } else { + gst::error!( + CAT, + imp: self, + "Can't rewrite header because downstream is not seekable" + ); + } + } + super::HeaderUpdateMode::Update => { + let aggregator = self.obj(); + + aggregator.set_src_caps(&caps); + if let Err(err) = aggregator.finish_buffer_list(buffer_list) { + gst::error!( + CAT, + imp: self, + "Failed pushing updated header buffer downstream: {:?}", + err, + ); } } } - Ok(None) => {} - Err(err) => { - gst::error!( - CAT, - imp: self, - "Failed to generate updated header: {:?}", - err - ); - } + } + Ok(None) => {} + Err(err) => { + gst::error!( + CAT, + imp: self, + "Failed to generate updated header: {:?}", + err + ); } } - - // Need to output new headers if started again after EOS - self.state.lock().unwrap().sent_headers = false; - - Err(gst::FlowError::Eos) - } else { - Ok(gst::FlowSuccess::Ok) } + + // Need to output new headers if started again after EOS + self.state.lock().unwrap().sent_headers = false; + + Err(gst::FlowError::Eos) } } diff --git a/mux/fmp4/src/fmp4mux/mod.rs b/mux/fmp4/src/fmp4mux/mod.rs index 396d8b81..4d513a59 100644 --- a/mux/fmp4/src/fmp4mux/mod.rs +++ b/mux/fmp4/src/fmp4mux/mod.rs @@ -110,6 +110,9 @@ pub(crate) struct FragmentHeaderConfiguration<'a> { /// Sequence number for this fragment. sequence_number: u32, + /// If this is a full fragment or only a chunk. + chunk: bool, + streams: &'a [FragmentHeaderStream], buffers: &'a [Buffer], } diff --git a/mux/fmp4/tests/tests.rs b/mux/fmp4/tests/tests.rs index bb1b794d..9af83898 100644 --- a/mux/fmp4/tests/tests.rs +++ b/mux/fmp4/tests/tests.rs @@ -1,3 +1,4 @@ +// Copyright (C) 2021 Sebastian Dröge // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at @@ -1285,3 +1286,386 @@ fn test_buffer_multi_stream_short_gops() { let ev = h1.pull_event().unwrap(); assert_eq!(ev.type_(), gst::EventType::Eos); } + +#[test] +fn test_chunking_single_stream() { + init(); + + let caps = gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(); + + let mut h = gst_check::Harness::new("cmafmux"); + + // 5s fragment duration, 1s chunk duration + h.element() + .unwrap() + .set_property("fragment-duration", 5.seconds()); + h.element() + .unwrap() + .set_property("chunk-duration", 1.seconds()); + + h.set_src_caps(caps); + h.play(); + + // Push 15 buffers of 0.5s each, 1st and 11th buffer without DELTA_UNIT flag + for i in 0..15 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 500.mseconds()); + buffer.set_dts(i * 500.mseconds()); + buffer.set_duration(500.mseconds()); + if i != 0 && i != 10 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i == 2 { + let ev = loop { + let ev = h.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(5.seconds()), + all_headers: true, + count: 0 + } + ); + } + } + + // Crank the clock: this should bring us to the end of the first fragment + h.crank_single_clock_wait().unwrap(); + + let header = h.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + assert_eq!(header.pts(), Some(gst::ClockTime::ZERO)); + assert_eq!(header.dts(), Some(gst::ClockTime::ZERO)); + + // There should be 7 chunks now, and the 1st and 6th are starting a fragment. + // Each chunk should have two buffers. + for chunk in 0..7 { + let chunk_header = h.pull().unwrap(); + if chunk == 0 || chunk == 5 { + assert_eq!(chunk_header.flags(), gst::BufferFlags::HEADER); + } else { + assert_eq!( + chunk_header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT + ); + } + assert_eq!(chunk_header.pts(), Some(chunk * 1.seconds())); + assert_eq!(chunk_header.dts(), Some(chunk * 1.seconds())); + assert_eq!(chunk_header.duration(), Some(1.seconds())); + + for buffer_idx in 0..2 { + let buffer = h.pull().unwrap(); + if buffer_idx == 1 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!( + buffer.pts(), + Some((chunk * 2 + buffer_idx) * 500.mseconds()) + ); + assert_eq!( + buffer.dts(), + Some((chunk * 2 + buffer_idx) * 500.mseconds()) + ); + assert_eq!(buffer.duration(), Some(500.mseconds())); + } + } + + h.push_event(gst::event::Eos::new()); + + // There should be the remaining chunk now, containing one 500ms buffer. + for chunk in 7..8 { + let chunk_header = h.pull().unwrap(); + assert_eq!( + chunk_header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT + ); + assert_eq!(chunk_header.pts(), Some(chunk * 1.seconds())); + assert_eq!(chunk_header.dts(), Some(chunk * 1.seconds())); + assert_eq!(chunk_header.duration(), Some(500.mseconds())); + + for buffer_idx in 0..1 { + let buffer = h.pull().unwrap(); + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + assert_eq!( + buffer.pts(), + Some((chunk * 2 + buffer_idx) * 500.mseconds()) + ); + assert_eq!( + buffer.dts(), + Some((chunk * 2 + buffer_idx) * 500.mseconds()) + ); + assert_eq!(buffer.duration(), Some(500.mseconds())); + } + } + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} + +#[test] +fn test_chunking_multi_stream() { + init(); + + let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + + // 5s fragment duration, 1s chunk duration + h1.element() + .unwrap() + .set_property("fragment-duration", 5.seconds()); + h1.element() + .unwrap() + .set_property("chunk-duration", 1.seconds()); + + h1.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h1.play(); + + h2.set_src_caps( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("channels", 1i32) + .field("rate", 44100i32) + .field("stream-format", "raw") + .field("base-profile", "lc") + .field("profile", "lc") + .field("level", "2") + .field( + "codec_data", + gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), + ) + .build(), + ); + h2.play(); + + let output_offset = (60 * 60 * 1000).seconds(); + + // Push 15 buffers of 0.5s each, 1st and 11th buffer without DELTA_UNIT flag + for i in 0..15 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 500.mseconds()); + buffer.set_dts(i * 500.mseconds()); + buffer.set_duration(500.mseconds()); + if i != 0 && i != 10 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 500.mseconds()); + buffer.set_dts(i * 500.mseconds()); + buffer.set_duration(500.mseconds()); + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i == 2 { + let ev = loop { + let ev = h1.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(5.seconds()), + all_headers: true, + count: 0 + } + ); + + let ev = loop { + let ev = h2.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(5.seconds()), + all_headers: true, + count: 0 + } + ); + } + } + + // Crank the clock: this should bring us to the end of the first fragment + h1.crank_single_clock_wait().unwrap(); + + let header = h1.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); + assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); + + // There should be 7 chunks now, and the 1st and 6th are starting a fragment. + // Each chunk should have two buffers. + for chunk in 0..7 { + let chunk_header = h1.pull().unwrap(); + if chunk == 0 || chunk == 5 { + assert_eq!(chunk_header.flags(), gst::BufferFlags::HEADER); + } else { + assert_eq!( + chunk_header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT + ); + } + assert_eq!( + chunk_header.pts(), + Some(chunk * 1.seconds() + output_offset) + ); + assert_eq!( + chunk_header.dts(), + Some(chunk * 1.seconds() + output_offset) + ); + assert_eq!(chunk_header.duration(), Some(1.seconds())); + + for buffer_idx in 0..2 { + for stream_idx in 0..2 { + let buffer = h1.pull().unwrap(); + if buffer_idx == 1 && stream_idx == 1 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!( + buffer.pts(), + Some((chunk * 2 + buffer_idx) * 500.mseconds() + output_offset) + ); + + if stream_idx == 0 { + assert_eq!( + buffer.dts(), + Some((chunk * 2 + buffer_idx) * 500.mseconds() + output_offset) + ); + } else { + assert!(buffer.dts().is_none()); + } + assert_eq!(buffer.duration(), Some(500.mseconds())); + } + } + } + + h1.push_event(gst::event::Eos::new()); + h2.push_event(gst::event::Eos::new()); + + // There should be the remaining chunk now, containing one 500ms buffer. + for chunk in 7..8 { + let chunk_header = h1.pull().unwrap(); + assert_eq!( + chunk_header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT + ); + assert_eq!( + chunk_header.pts(), + Some(chunk * 1.seconds() + output_offset) + ); + assert_eq!( + chunk_header.dts(), + Some(chunk * 1.seconds() + output_offset) + ); + assert_eq!(chunk_header.duration(), Some(500.mseconds())); + + for buffer_idx in 0..1 { + for stream_idx in 0..2 { + let buffer = h1.pull().unwrap(); + if buffer_idx == 0 && stream_idx == 1 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + + assert_eq!( + buffer.pts(), + Some((chunk * 2 + buffer_idx) * 500.mseconds() + output_offset) + ); + if stream_idx == 0 { + assert_eq!( + buffer.dts(), + Some((chunk * 2 + buffer_idx) * 500.mseconds() + output_offset) + ); + } else { + assert!(buffer.dts().is_none()); + } + assert_eq!(buffer.duration(), Some(500.mseconds())); + } + } + } + + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +}