fmp4mux: Handle GOPs ending after the desired fragment end correctly

Either create further chunks if enough data is queued or simply start
the new fragment at a later time if the keyframe is later.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1077>
This commit is contained in:
Sebastian Dröge 2023-02-06 19:11:03 +02:00
parent 5c2de6aeb6
commit 3a408c0146

View file

@ -984,27 +984,6 @@ impl FMP4Mux {
// Check if this stream is filled enough now.
if let Some(chunk_duration) = settings.chunk_duration {
// In chunk mode
let (gop_idx, gop) = match stream
.queued_gops
.iter()
.enumerate()
.find(|(_idx, gop)| gop.final_earliest_pts || all_eos || stream.sinkpad.is_eos())
{
Some(res) => res,
None => {
gst::trace!(CAT, obj: stream.sinkpad, "Chunked mode but no GOP with final earliest PTS known yet");
return;
}
};
gst::trace!(
CAT,
obj: stream.sinkpad,
"GOP {gop_idx} start PTS {}, GOP end PTS {} (final {})",
gop.start_pts,
gop.end_pts,
gop.final_end_pts || all_eos || stream.sinkpad.is_eos(),
);
gst::trace!(
CAT,
obj: stream.sinkpad,
@ -1026,10 +1005,53 @@ impl FMP4Mux {
// First check if the next split should be the end of a fragment or the end of a chunk.
// If both are the same then a fragment split has preference.
if fragment_end_pts <= chunk_end_pts && gop.start_pts >= fragment_end_pts {
gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for finishing this fragment");
stream.fragment_filled = true;
} else if chunk_end_pts < fragment_end_pts {
if fragment_end_pts <= chunk_end_pts {
// We can only finish a fragment if a full GOP with final end PTS is queued and it
// ends at or after the fragment end PTS.
if let Some((gop_idx, gop)) = stream
.queued_gops
.iter()
.enumerate()
.find(|(_idx, gop)| gop.final_end_pts || all_eos || stream.sinkpad.is_eos())
{
gst::trace!(
CAT,
obj: stream.sinkpad,
"GOP {gop_idx} start PTS {}, GOP end PTS {}",
gop.start_pts,
gop.end_pts,
);
if gop.end_pts >= fragment_end_pts {
gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for finishing this fragment");
stream.fragment_filled = true;
return;
}
}
}
if !stream.fragment_filled {
let (gop_idx, gop) = match stream.queued_gops.iter().enumerate().find(
|(_idx, gop)| gop.final_earliest_pts || all_eos || stream.sinkpad.is_eos(),
) {
Some(res) => res,
None => {
gst::trace!(
CAT,
obj: stream.sinkpad,
"Chunked mode and want to finish fragment but no GOP with final end PTS known yet",
);
return;
}
};
gst::trace!(
CAT,
obj: stream.sinkpad,
"GOP {gop_idx} start PTS {}, GOP end PTS {} (final {})",
gop.start_pts,
gop.end_pts,
gop.final_end_pts || all_eos || stream.sinkpad.is_eos(),
);
let last_pts = gop.buffers.last().map(|b| b.pts);
if gop.end_pts >= chunk_end_pts
@ -1287,8 +1309,15 @@ impl FMP4Mux {
gop.final_end_pts || all_eos || stream.sinkpad.is_eos()
);
// If we have a final GOP then include it as long as it's either
// - ending before the dequeue end PTS
// - no GOPs were dequeued yet and this is the first stream
//
// The second case would happen if no GOP ends between the last chunk of the
// fragment and the fragment duration.
if (gop.final_end_pts || all_eos || stream.sinkpad.is_eos())
&& gop.end_pts <= dequeue_end_pts
&& (gop.end_pts <= dequeue_end_pts
|| (gops.is_empty() && chunk_end_pts.is_none()))
{
gst::trace!(
CAT,
@ -1302,7 +1331,16 @@ impl FMP4Mux {
break;
}
gst::error!(CAT, obj: stream.sinkpad, "Don't have a full GOP at the end of a fragment");
// Otherwise if this is the first stream and no full GOP is queued then we need
// to wait for more data.
//
// If this is not the first stream then take an incomplete GOP.
if chunk_end_pts.is_none() {
gst::info!(CAT, obj: stream.sinkpad, "Don't have a full GOP at the end of a fragment");
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
} else {
gst::info!(CAT, obj: stream.sinkpad, "Including incomplete GOP");
}
} else {
gst::trace!(
CAT,
@ -1770,7 +1808,7 @@ impl FMP4Mux {
// This is handled below generally if nothing was dequeued
} else {
if settings.chunk_duration.is_some() {
gst::warning!(
gst::debug!(
CAT,
obj: stream.sinkpad,
"Don't have anything to drain for the first stream on timeout in a live pipeline",
@ -2144,6 +2182,19 @@ impl FMP4Mux {
let min_earliest_pts = min_earliest_pts.unwrap();
let chunk_end_pts = chunk_end_pts.unwrap();
gst::debug!(
CAT,
imp: self,
concat!(
"Draining chunk (fragment start: {} fragment end: {}) ",
"from PTS {} to {}"
),
fragment_start,
fragment_filled,
min_earliest_pts,
chunk_end_pts,
);
let mut fmp4_header = None;
if !state.sent_headers {
let mut buffer = state.stream_header.as_ref().unwrap().copy();
@ -2315,11 +2366,7 @@ impl FMP4Mux {
Err(err) => {
if err == gst_base::AGGREGATOR_FLOW_NEED_DATA {
assert!(!all_eos);
gst::element_imp_warning!(
self,
gst::StreamError::Format,
["Longer GOPs than fragment duration"]
);
gst::debug!(CAT, imp: self, "Need more data");
state.timeout_delay += 1.seconds();
}