mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-24 09:58:13 +00:00
fmp4mux: Fix draining in chunk mode if keyframes are too late
We would create another chunk that ends after the fragment end, and would from then on consider the stream always filled for the chunk because it starts after the current fragment end (i.e. nothing would go into this fragment). This is obviously wrong because the actual fragment end moved further ahead because of the additional chunk. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1285>
This commit is contained in:
parent
b4b2ca9a82
commit
b6168c7255
2 changed files with 361 additions and 8 deletions
|
@ -1003,6 +1003,15 @@ impl FMP4Mux {
|
|||
let chunk_end_pts = chunk_start_pts + chunk_duration;
|
||||
let fragment_end_pts = fragment_start_pts + settings.fragment_duration;
|
||||
|
||||
if fragment_end_pts < chunk_end_pts {
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: stream.sinkpad,
|
||||
"Current chunk end {}, current fragment end {}. Fragment end before chunk end, extending fragment",
|
||||
chunk_end_pts,
|
||||
fragment_end_pts,
|
||||
);
|
||||
} else {
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: stream.sinkpad,
|
||||
|
@ -1010,6 +1019,7 @@ impl FMP4Mux {
|
|||
chunk_end_pts,
|
||||
fragment_end_pts,
|
||||
);
|
||||
}
|
||||
|
||||
// First check if the next split should be the end of a fragment or the end of a chunk.
|
||||
// If both are the same then a fragment split has preference.
|
||||
|
@ -1025,7 +1035,18 @@ impl FMP4Mux {
|
|||
gop.start_pts,
|
||||
gop.end_pts,
|
||||
);
|
||||
if gop.start_pts > fragment_end_pts {
|
||||
// If this GOP starts after the end of the current fragment, i.e. is not
|
||||
// included at all, then consider this stream filled as it won't contribute to
|
||||
// this fragment.
|
||||
//
|
||||
// However if the first buffer of the GOP is not actually a keyframe then we
|
||||
// previously drained a partial GOP because the GOP is ending too far after the
|
||||
// planned fragment end.
|
||||
if gop.start_pts > fragment_end_pts
|
||||
&& !gop.buffers.first().map_or(false, |b| {
|
||||
b.buffer.flags().contains(gst::BufferFlags::DELTA_UNIT)
|
||||
})
|
||||
{
|
||||
gst::debug!(CAT, obj: stream.sinkpad, "Stream's first GOP starting after this fragment");
|
||||
stream.fragment_filled = true;
|
||||
return;
|
||||
|
@ -1812,6 +1833,14 @@ impl FMP4Mux {
|
|||
!s.sinkpad.is_eos()
|
||||
&& s.queued_gops.back().map_or(false, |gop| {
|
||||
gop.start_pts <= fragment_start_pts + settings.fragment_duration
|
||||
// In chunk mode we might've drained a partial GOP as a chunk after
|
||||
// the fragment end if the keyframe came too late. The GOP now
|
||||
// starts with a non-keyframe after the fragment end but is part of
|
||||
// the fragment: the fragment is extended after the end. Allow this
|
||||
// situation here.
|
||||
|| gop.buffers.first().map_or(false, |b| {
|
||||
b.buffer.flags().contains(gst::BufferFlags::DELTA_UNIT)
|
||||
})
|
||||
})
|
||||
})
|
||||
.map(|s| s.fragment_filled)
|
||||
|
|
|
@ -1669,3 +1669,327 @@ fn test_chunking_multi_stream() {
|
|||
let ev = h1.pull_event().unwrap();
|
||||
assert_eq!(ev.type_(), gst::EventType::Eos);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_chunking_single_stream_gops_after_fragment_end_before_next_chunk_end() {
|
||||
init();
|
||||
|
||||
let caps = gst::Caps::builder("video/x-h264")
|
||||
.field("width", 1920i32)
|
||||
.field("height", 1080i32)
|
||||
.field("framerate", gst::Fraction::new(30, 1))
|
||||
.field("stream-format", "avc")
|
||||
.field("alignment", "au")
|
||||
.field("codec_data", gst::Buffer::with_size(1).unwrap())
|
||||
.build();
|
||||
|
||||
let mut h = gst_check::Harness::new("cmafmux");
|
||||
|
||||
// 5s fragment duration, 1s chunk duration
|
||||
h.element()
|
||||
.unwrap()
|
||||
.set_property("fragment-duration", 5.seconds());
|
||||
h.element()
|
||||
.unwrap()
|
||||
.set_property("chunk-duration", 1.seconds());
|
||||
|
||||
h.set_src_caps(caps);
|
||||
h.play();
|
||||
|
||||
// Push 15 buffers of 0.5s each, 1st and 12th buffer without DELTA_UNIT flag
|
||||
for i in 0..15 {
|
||||
let mut buffer = gst::Buffer::with_size(1).unwrap();
|
||||
{
|
||||
let buffer = buffer.get_mut().unwrap();
|
||||
buffer.set_pts(i * 500.mseconds());
|
||||
buffer.set_dts(i * 500.mseconds());
|
||||
buffer.set_duration(500.mseconds());
|
||||
if i != 0 && i != 11 {
|
||||
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
|
||||
}
|
||||
}
|
||||
assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
|
||||
|
||||
if i == 2 {
|
||||
let ev = loop {
|
||||
let ev = h.pull_upstream_event().unwrap();
|
||||
if ev.type_() != gst::EventType::Reconfigure
|
||||
&& ev.type_() != gst::EventType::Latency
|
||||
{
|
||||
break ev;
|
||||
}
|
||||
};
|
||||
|
||||
assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
|
||||
assert_eq!(
|
||||
gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
|
||||
gst_video::UpstreamForceKeyUnitEvent {
|
||||
running_time: Some(5.seconds()),
|
||||
all_headers: true,
|
||||
count: 0
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Crank the clock: this should bring us to the end of the first fragment
|
||||
h.crank_single_clock_wait().unwrap();
|
||||
|
||||
let mut expected_ts = gst::ClockTime::ZERO;
|
||||
let mut num_buffers = 0;
|
||||
|
||||
let header = h.pull().unwrap();
|
||||
assert_eq!(
|
||||
header.flags(),
|
||||
gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
|
||||
);
|
||||
assert_eq!(header.pts(), Some(expected_ts));
|
||||
assert_eq!(header.dts(), Some(expected_ts));
|
||||
|
||||
// There should be 7 chunks now, and the 1st and 7th are starting a fragment.
|
||||
// Each chunk should have two buffers except for the 6th.
|
||||
for chunk in 0..7 {
|
||||
let chunk_header = h.pull().unwrap();
|
||||
if chunk == 0 || chunk == 6 {
|
||||
assert_eq!(chunk_header.flags(), gst::BufferFlags::HEADER);
|
||||
} else {
|
||||
assert_eq!(
|
||||
chunk_header.flags(),
|
||||
gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT
|
||||
);
|
||||
}
|
||||
assert_eq!(chunk_header.pts(), Some(expected_ts));
|
||||
assert_eq!(chunk_header.dts(), Some(expected_ts));
|
||||
if chunk == 5 {
|
||||
assert_eq!(chunk_header.duration(), Some(500.mseconds()));
|
||||
} else {
|
||||
assert_eq!(chunk_header.duration(), Some(1.seconds()));
|
||||
}
|
||||
|
||||
for buffer_idx in 0..2 {
|
||||
let buffer = h.pull().unwrap();
|
||||
num_buffers += 1;
|
||||
if buffer_idx == 1 || (chunk == 5 && buffer_idx == 0) {
|
||||
assert_eq!(
|
||||
buffer.flags(),
|
||||
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
|
||||
);
|
||||
} else {
|
||||
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
|
||||
}
|
||||
assert_eq!(buffer.pts(), Some(expected_ts));
|
||||
assert_eq!(buffer.dts(), Some(expected_ts));
|
||||
assert_eq!(buffer.duration(), Some(500.mseconds()));
|
||||
|
||||
expected_ts += 500.mseconds();
|
||||
|
||||
// Only one buffer in this chunk
|
||||
if chunk == 5 && buffer_idx == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
h.push_event(gst::event::Eos::new());
|
||||
|
||||
// There should be one remaining chunk now, containing two 500ms buffer.
|
||||
for _chunk in 7..8 {
|
||||
let chunk_header = h.pull().unwrap();
|
||||
assert_eq!(
|
||||
chunk_header.flags(),
|
||||
gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT
|
||||
);
|
||||
assert_eq!(chunk_header.pts(), Some(expected_ts));
|
||||
assert_eq!(chunk_header.dts(), Some(expected_ts));
|
||||
assert_eq!(chunk_header.duration(), Some(1.seconds()));
|
||||
|
||||
for buffer_idx in 0..2 {
|
||||
let buffer = h.pull().unwrap();
|
||||
num_buffers += 1;
|
||||
if buffer_idx == 1 {
|
||||
assert_eq!(
|
||||
buffer.flags(),
|
||||
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
|
||||
);
|
||||
} else {
|
||||
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
|
||||
}
|
||||
assert_eq!(buffer.pts(), Some(expected_ts));
|
||||
assert_eq!(buffer.dts(), Some(expected_ts));
|
||||
assert_eq!(buffer.duration(), Some(500.mseconds()));
|
||||
expected_ts += 500.mseconds();
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(num_buffers, 15);
|
||||
|
||||
let ev = h.pull_event().unwrap();
|
||||
assert_eq!(ev.type_(), gst::EventType::StreamStart);
|
||||
let ev = h.pull_event().unwrap();
|
||||
assert_eq!(ev.type_(), gst::EventType::Caps);
|
||||
let ev = h.pull_event().unwrap();
|
||||
assert_eq!(ev.type_(), gst::EventType::Segment);
|
||||
let ev = h.pull_event().unwrap();
|
||||
assert_eq!(ev.type_(), gst::EventType::Eos);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_chunking_single_stream_gops_after_fragment_end_after_next_chunk_end() {
|
||||
init();
|
||||
|
||||
let caps = gst::Caps::builder("video/x-h264")
|
||||
.field("width", 1920i32)
|
||||
.field("height", 1080i32)
|
||||
.field("framerate", gst::Fraction::new(30, 1))
|
||||
.field("stream-format", "avc")
|
||||
.field("alignment", "au")
|
||||
.field("codec_data", gst::Buffer::with_size(1).unwrap())
|
||||
.build();
|
||||
|
||||
let mut h = gst_check::Harness::new("cmafmux");
|
||||
|
||||
// 5s fragment duration, 1s chunk duration
|
||||
h.element()
|
||||
.unwrap()
|
||||
.set_property("fragment-duration", 5.seconds());
|
||||
h.element()
|
||||
.unwrap()
|
||||
.set_property("chunk-duration", 1.seconds());
|
||||
|
||||
h.set_src_caps(caps);
|
||||
h.play();
|
||||
|
||||
// Push 15 buffers of 0.5s each, 1st and 14th buffer without DELTA_UNIT flag
|
||||
for i in 0..15 {
|
||||
let mut buffer = gst::Buffer::with_size(1).unwrap();
|
||||
{
|
||||
let buffer = buffer.get_mut().unwrap();
|
||||
buffer.set_pts(i * 500.mseconds());
|
||||
buffer.set_dts(i * 500.mseconds());
|
||||
buffer.set_duration(500.mseconds());
|
||||
if i != 0 && i != 13 {
|
||||
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
|
||||
}
|
||||
}
|
||||
assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
|
||||
|
||||
if i == 2 {
|
||||
let ev = loop {
|
||||
let ev = h.pull_upstream_event().unwrap();
|
||||
if ev.type_() != gst::EventType::Reconfigure
|
||||
&& ev.type_() != gst::EventType::Latency
|
||||
{
|
||||
break ev;
|
||||
}
|
||||
};
|
||||
|
||||
assert_eq!(ev.type_(), gst::EventType::CustomUpstream);
|
||||
assert_eq!(
|
||||
gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(),
|
||||
gst_video::UpstreamForceKeyUnitEvent {
|
||||
running_time: Some(5.seconds()),
|
||||
all_headers: true,
|
||||
count: 0
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Crank the clock: this should bring us to the end of the first fragment
|
||||
h.crank_single_clock_wait().unwrap();
|
||||
|
||||
let mut expected_ts = gst::ClockTime::ZERO;
|
||||
let mut num_buffers = 0;
|
||||
|
||||
let header = h.pull().unwrap();
|
||||
assert_eq!(
|
||||
header.flags(),
|
||||
gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT
|
||||
);
|
||||
assert_eq!(header.pts(), Some(expected_ts));
|
||||
assert_eq!(header.dts(), Some(expected_ts));
|
||||
|
||||
// There should be 7 chunks now, and the 1st is starting a fragment.
|
||||
// Each chunk should have two buffers except for the 7th.
|
||||
for chunk in 0..7 {
|
||||
let chunk_header = h.pull().unwrap();
|
||||
if chunk == 0 {
|
||||
assert_eq!(chunk_header.flags(), gst::BufferFlags::HEADER);
|
||||
} else {
|
||||
assert_eq!(
|
||||
chunk_header.flags(),
|
||||
gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT
|
||||
);
|
||||
}
|
||||
assert_eq!(chunk_header.pts(), Some(expected_ts));
|
||||
assert_eq!(chunk_header.dts(), Some(expected_ts));
|
||||
if chunk == 6 {
|
||||
assert_eq!(chunk_header.duration(), Some(500.mseconds()));
|
||||
} else {
|
||||
assert_eq!(chunk_header.duration(), Some(1.seconds()));
|
||||
}
|
||||
|
||||
for buffer_idx in 0..2 {
|
||||
let buffer = h.pull().unwrap();
|
||||
num_buffers += 1;
|
||||
if buffer_idx == 1 || (chunk == 6 && buffer_idx == 0) {
|
||||
assert_eq!(
|
||||
buffer.flags(),
|
||||
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
|
||||
);
|
||||
} else {
|
||||
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
|
||||
}
|
||||
assert_eq!(buffer.pts(), Some(expected_ts));
|
||||
assert_eq!(buffer.dts(), Some(expected_ts));
|
||||
assert_eq!(buffer.duration(), Some(500.mseconds()));
|
||||
|
||||
expected_ts += 500.mseconds();
|
||||
|
||||
// Only one buffer in this chunk
|
||||
if chunk == 6 && buffer_idx == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
h.push_event(gst::event::Eos::new());
|
||||
|
||||
// There should be two remaining chunks now, containing two 500ms buffers.
|
||||
// This should start a new fragment.
|
||||
for _chunk in 7..8 {
|
||||
let chunk_header = h.pull().unwrap();
|
||||
assert_eq!(chunk_header.flags(), gst::BufferFlags::HEADER);
|
||||
assert_eq!(chunk_header.pts(), Some(expected_ts));
|
||||
assert_eq!(chunk_header.dts(), Some(expected_ts));
|
||||
assert_eq!(chunk_header.duration(), Some(1.seconds()));
|
||||
|
||||
for buffer_idx in 0..2 {
|
||||
let buffer = h.pull().unwrap();
|
||||
num_buffers += 1;
|
||||
if buffer_idx == 1 {
|
||||
assert_eq!(
|
||||
buffer.flags(),
|
||||
gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER
|
||||
);
|
||||
} else {
|
||||
assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT);
|
||||
}
|
||||
assert_eq!(buffer.pts(), Some(expected_ts));
|
||||
assert_eq!(buffer.dts(), Some(expected_ts));
|
||||
assert_eq!(buffer.duration(), Some(500.mseconds()));
|
||||
expected_ts += 500.mseconds();
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(num_buffers, 15);
|
||||
|
||||
let ev = h.pull_event().unwrap();
|
||||
assert_eq!(ev.type_(), gst::EventType::StreamStart);
|
||||
let ev = h.pull_event().unwrap();
|
||||
assert_eq!(ev.type_(), gst::EventType::Caps);
|
||||
let ev = h.pull_event().unwrap();
|
||||
assert_eq!(ev.type_(), gst::EventType::Segment);
|
||||
let ev = h.pull_event().unwrap();
|
||||
assert_eq!(ev.type_(), gst::EventType::Eos);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue