diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 6870f8a1..3718ac40 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -1003,13 +1003,23 @@ impl FMP4Mux { let chunk_end_pts = chunk_start_pts + chunk_duration; let fragment_end_pts = fragment_start_pts + settings.fragment_duration; - gst::trace!( - CAT, - obj: stream.sinkpad, - "Current chunk end {}, current fragment end {}", - chunk_end_pts, - fragment_end_pts, - ); + if fragment_end_pts < chunk_end_pts { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Current chunk end {}, current fragment end {}. Fragment end before chunk end, extending fragment", + chunk_end_pts, + fragment_end_pts, + ); + } else { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Current chunk end {}, current fragment end {}", + chunk_end_pts, + fragment_end_pts, + ); + } // First check if the next split should be the end of a fragment or the end of a chunk. // If both are the same then a fragment split has preference. @@ -1025,7 +1035,18 @@ impl FMP4Mux { gop.start_pts, gop.end_pts, ); - if gop.start_pts > fragment_end_pts { + // If this GOP starts after the end of the current fragment, i.e. is not + // included at all, then consider this stream filled as it won't contribute to + // this fragment. + // + // However if the first buffer of the GOP is not actually a keyframe then we + // previously drained a partial GOP because the GOP is ending too far after the + // planned fragment end. + if gop.start_pts > fragment_end_pts + && !gop.buffers.first().map_or(false, |b| { + b.buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) + }) + { gst::debug!(CAT, obj: stream.sinkpad, "Stream's first GOP starting after this fragment"); stream.fragment_filled = true; return; @@ -1812,6 +1833,14 @@ impl FMP4Mux { !s.sinkpad.is_eos() && s.queued_gops.back().map_or(false, |gop| { gop.start_pts <= fragment_start_pts + settings.fragment_duration + // In chunk mode we might've drained a partial GOP as a chunk after + // the fragment end if the keyframe came too late. The GOP now + // starts with a non-keyframe after the fragment end but is part of + // the fragment: the fragment is extended after the end. Allow this + // situation here. + || gop.buffers.first().map_or(false, |b| { + b.buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) + }) }) }) .map(|s| s.fragment_filled) diff --git a/mux/fmp4/tests/tests.rs b/mux/fmp4/tests/tests.rs index ac59e377..a32e65fc 100644 --- a/mux/fmp4/tests/tests.rs +++ b/mux/fmp4/tests/tests.rs @@ -1669,3 +1669,327 @@ fn test_chunking_multi_stream() { let ev = h1.pull_event().unwrap(); assert_eq!(ev.type_(), gst::EventType::Eos); } + +#[test] +fn test_chunking_single_stream_gops_after_fragment_end_before_next_chunk_end() { + init(); + + let caps = gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(); + + let mut h = gst_check::Harness::new("cmafmux"); + + // 5s fragment duration, 1s chunk duration + h.element() + .unwrap() + .set_property("fragment-duration", 5.seconds()); + h.element() + .unwrap() + .set_property("chunk-duration", 1.seconds()); + + h.set_src_caps(caps); + h.play(); + + // Push 15 buffers of 0.5s each, 1st and 12th buffer without DELTA_UNIT flag + for i in 0..15 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 500.mseconds()); + buffer.set_dts(i * 500.mseconds()); + buffer.set_duration(500.mseconds()); + if i != 0 && i != 11 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i == 2 { + let ev = loop { + let ev = h.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(5.seconds()), + all_headers: true, + count: 0 + } + ); + } + } + + // Crank the clock: this should bring us to the end of the first fragment + h.crank_single_clock_wait().unwrap(); + + let mut expected_ts = gst::ClockTime::ZERO; + let mut num_buffers = 0; + + let header = h.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + assert_eq!(header.pts(), Some(expected_ts)); + assert_eq!(header.dts(), Some(expected_ts)); + + // There should be 7 chunks now, and the 1st and 7th are starting a fragment. + // Each chunk should have two buffers except for the 6th. + for chunk in 0..7 { + let chunk_header = h.pull().unwrap(); + if chunk == 0 || chunk == 6 { + assert_eq!(chunk_header.flags(), gst::BufferFlags::HEADER); + } else { + assert_eq!( + chunk_header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT + ); + } + assert_eq!(chunk_header.pts(), Some(expected_ts)); + assert_eq!(chunk_header.dts(), Some(expected_ts)); + if chunk == 5 { + assert_eq!(chunk_header.duration(), Some(500.mseconds())); + } else { + assert_eq!(chunk_header.duration(), Some(1.seconds())); + } + + for buffer_idx in 0..2 { + let buffer = h.pull().unwrap(); + num_buffers += 1; + if buffer_idx == 1 || (chunk == 5 && buffer_idx == 0) { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!(buffer.pts(), Some(expected_ts)); + assert_eq!(buffer.dts(), Some(expected_ts)); + assert_eq!(buffer.duration(), Some(500.mseconds())); + + expected_ts += 500.mseconds(); + + // Only one buffer in this chunk + if chunk == 5 && buffer_idx == 0 { + break; + } + } + } + + h.push_event(gst::event::Eos::new()); + + // There should be one remaining chunk now, containing two 500ms buffer. + for _chunk in 7..8 { + let chunk_header = h.pull().unwrap(); + assert_eq!( + chunk_header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT + ); + assert_eq!(chunk_header.pts(), Some(expected_ts)); + assert_eq!(chunk_header.dts(), Some(expected_ts)); + assert_eq!(chunk_header.duration(), Some(1.seconds())); + + for buffer_idx in 0..2 { + let buffer = h.pull().unwrap(); + num_buffers += 1; + if buffer_idx == 1 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!(buffer.pts(), Some(expected_ts)); + assert_eq!(buffer.dts(), Some(expected_ts)); + assert_eq!(buffer.duration(), Some(500.mseconds())); + expected_ts += 500.mseconds(); + } + } + + assert_eq!(num_buffers, 15); + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +} + +#[test] +fn test_chunking_single_stream_gops_after_fragment_end_after_next_chunk_end() { + init(); + + let caps = gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(); + + let mut h = gst_check::Harness::new("cmafmux"); + + // 5s fragment duration, 1s chunk duration + h.element() + .unwrap() + .set_property("fragment-duration", 5.seconds()); + h.element() + .unwrap() + .set_property("chunk-duration", 1.seconds()); + + h.set_src_caps(caps); + h.play(); + + // Push 15 buffers of 0.5s each, 1st and 14th buffer without DELTA_UNIT flag + for i in 0..15 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 500.mseconds()); + buffer.set_dts(i * 500.mseconds()); + buffer.set_duration(500.mseconds()); + if i != 0 && i != 13 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i == 2 { + let ev = loop { + let ev = h.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(5.seconds()), + all_headers: true, + count: 0 + } + ); + } + } + + // Crank the clock: this should bring us to the end of the first fragment + h.crank_single_clock_wait().unwrap(); + + let mut expected_ts = gst::ClockTime::ZERO; + let mut num_buffers = 0; + + let header = h.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + assert_eq!(header.pts(), Some(expected_ts)); + assert_eq!(header.dts(), Some(expected_ts)); + + // There should be 7 chunks now, and the 1st is starting a fragment. + // Each chunk should have two buffers except for the 7th. + for chunk in 0..7 { + let chunk_header = h.pull().unwrap(); + if chunk == 0 { + assert_eq!(chunk_header.flags(), gst::BufferFlags::HEADER); + } else { + assert_eq!( + chunk_header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DELTA_UNIT + ); + } + assert_eq!(chunk_header.pts(), Some(expected_ts)); + assert_eq!(chunk_header.dts(), Some(expected_ts)); + if chunk == 6 { + assert_eq!(chunk_header.duration(), Some(500.mseconds())); + } else { + assert_eq!(chunk_header.duration(), Some(1.seconds())); + } + + for buffer_idx in 0..2 { + let buffer = h.pull().unwrap(); + num_buffers += 1; + if buffer_idx == 1 || (chunk == 6 && buffer_idx == 0) { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!(buffer.pts(), Some(expected_ts)); + assert_eq!(buffer.dts(), Some(expected_ts)); + assert_eq!(buffer.duration(), Some(500.mseconds())); + + expected_ts += 500.mseconds(); + + // Only one buffer in this chunk + if chunk == 6 && buffer_idx == 0 { + break; + } + } + } + + h.push_event(gst::event::Eos::new()); + + // There should be two remaining chunks now, containing two 500ms buffers. + // This should start a new fragment. + for _chunk in 7..8 { + let chunk_header = h.pull().unwrap(); + assert_eq!(chunk_header.flags(), gst::BufferFlags::HEADER); + assert_eq!(chunk_header.pts(), Some(expected_ts)); + assert_eq!(chunk_header.dts(), Some(expected_ts)); + assert_eq!(chunk_header.duration(), Some(1.seconds())); + + for buffer_idx in 0..2 { + let buffer = h.pull().unwrap(); + num_buffers += 1; + if buffer_idx == 1 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!(buffer.pts(), Some(expected_ts)); + assert_eq!(buffer.dts(), Some(expected_ts)); + assert_eq!(buffer.duration(), Some(500.mseconds())); + expected_ts += 500.mseconds(); + } + } + + assert_eq!(num_buffers, 15); + + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +}