fmp4mux: Added tests for sparse streams (late buffer, gap)

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2236>
This commit is contained in:
Jochen Henneberg 2025-05-21 09:26:59 +02:00 committed by GStreamer Marge Bot
parent c3740cd6a9
commit 8ebdffc68d

View file

@ -9,6 +9,8 @@
use gst::prelude::*;
use std::thread;
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
@ -46,6 +48,20 @@ fn to_completion(pipeline: &gst::Pipeline) {
.expect("Unable to set the pipeline to the `Null` state");
}
fn check_fragment_header(harness: &mut gst_check::Harness) {
let buf = harness.pull().unwrap();
assert_eq!(buf.flags(), gst::BufferFlags::HEADER);
}
fn check_first_fragment_header(harness: &mut gst_check::Harness) {
let buf = harness.pull().unwrap();
assert_eq!(
buf.flags(),
gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER
);
check_fragment_header(harness)
}
fn test_buffer_flags_single_stream(cmaf: bool, set_dts: bool, caps: gst::Caps) {
let mut h = if cmaf {
gst_check::Harness::new("cmafmux")
@ -3396,8 +3412,8 @@ fn test_multi_stream_late_key_frame() {
);
h2.play();
// 5 frames fit into the fragment, then next frame is key-frame
// outside first fragment
// 5 frames fit into the fragment +1 otherwise the fragment is
// just extended, then next frame is key-frame outside fragment
for i in 0..8 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
@ -3426,12 +3442,14 @@ fn test_multi_stream_late_key_frame() {
// global and fragment header and 5 from from the audio stream (no
// video) because of the missing key frame
for _ in 0..7 {
check_first_fragment_header(&mut h1);
for _ in 2..7 {
let _ = h1.pull().unwrap();
}
assert_eq!(h1.buffers_in_queue(), 0);
// 5 frames fit into the fragment, push 6 complete the fragment,
// 5 frames fit into the fragment, +2 to add key-frame and trigger
// check.
for i in 8..15 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
@ -3458,9 +3476,602 @@ fn test_multi_stream_late_key_frame() {
// Crank the clock: this should bring us to the end of the 2nd fragment
h1.crank_single_clock_wait().unwrap();
// fragment header and 2x5 from both audio and video this time
for _ in 0..16 {
// fragment header + 8 audio and 7 (GOP size) video
check_fragment_header(&mut h1);
for _ in 1..16 {
let _ = h1.pull().unwrap();
}
assert_eq!(h1.buffers_in_queue(), 0);
}
#[test]
fn test_multi_stream_late_key_frame_skips_fragment() {
init();
let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
// 5s fragment duration
h1.element()
.unwrap()
.set_property("fragment-duration", 5.seconds());
h1.set_src_caps(
gst::Caps::builder("audio/mpeg")
.field("mpegversion", 4i32)
.field("channels", 1i32)
.field("rate", 44100i32)
.field("stream-format", "raw")
.field("base-profile", "lc")
.field("profile", "lc")
.field("level", "2")
.field(
"codec_data",
gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
)
.build(),
);
h1.play();
h2.set_src_caps(
gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
.field("height", 1080i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::with_size(1).unwrap())
.build(),
);
h2.play();
// 2x5 frames fit into the fragment +1 otherwise the fragment is
// just extended, then next frame is key-frame outside fragment
// plus one to trigger drain
for i in 0..13 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i.seconds());
buffer.set_dts(i.seconds());
buffer.set_duration(gst::ClockTime::SECOND);
}
assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i.seconds());
buffer.set_dts(i.seconds());
buffer.set_duration(gst::ClockTime::SECOND);
if i != 11 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
}
// Crank the clock: this should bring us to the end of the 1st fragment
h1.crank_single_clock_wait().unwrap();
// global and fragment header and 5 from from the audio stream (no
// video) because of the missing key frame
check_first_fragment_header(&mut h1);
for _ in 2..7 {
let _ = h1.pull().unwrap();
}
// fragment header and 5 from from the audio stream (no video)
// because of the missing key frame
check_fragment_header(&mut h1);
for _ in 1..6 {
let _ = h1.pull().unwrap();
}
assert_eq!(h1.buffers_in_queue(), 0);
// 5 frames fit into the fragment, +1 to add key-frame and 1 for
// trigger
for i in 13..20 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i.seconds());
buffer.set_dts(i.seconds());
buffer.set_duration(gst::ClockTime::SECOND);
}
assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i.seconds());
buffer.set_dts(i.seconds());
buffer.set_duration(gst::ClockTime::SECOND);
if i != 18 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
}
// Crank the clock: this should bring us to the end of the next
// fragment
h1.crank_single_clock_wait().unwrap();
// fragment header +7 audio and 7 (GOP size) video
check_fragment_header(&mut h1);
for _ in 1..16 {
let _ = h1.pull().unwrap();
}
assert_eq!(h1.buffers_in_queue(), 0);
}
#[test]
fn test_multi_stream_late_key_frame_skips_two_fragments() {
init();
let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
// 5s fragment duration
h1.element()
.unwrap()
.set_property("fragment-duration", 5.seconds());
h1.set_src_caps(
gst::Caps::builder("audio/mpeg")
.field("mpegversion", 4i32)
.field("channels", 1i32)
.field("rate", 44100i32)
.field("stream-format", "raw")
.field("base-profile", "lc")
.field("profile", "lc")
.field("level", "2")
.field(
"codec_data",
gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
)
.build(),
);
h1.play();
h2.set_src_caps(
gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
.field("height", 1080i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::with_size(1).unwrap())
.build(),
);
h2.play();
for i in 0..18 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i.seconds());
buffer.set_dts(i.seconds());
buffer.set_duration(gst::ClockTime::SECOND);
}
assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i.seconds());
buffer.set_dts(i.seconds());
buffer.set_duration(gst::ClockTime::SECOND);
if i != 16 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
}
// Crank the clock: this should bring us to the end of the 1st fragment
h1.crank_single_clock_wait().unwrap();
// global and fragment header and 5 from from the audio stream (no
// video) because of the missing key frame
check_first_fragment_header(&mut h1);
for _ in 2..7 {
let _ = h1.pull().unwrap();
}
// fragment header and 5 from from the audio stream (no video)
// because of the missing key frame
check_fragment_header(&mut h1);
for _ in 1..6 {
let _ = h1.pull().unwrap();
}
assert_eq!(h1.buffers_in_queue(), 6);
// fragment header and 5 from from the audio stream (no video)
// because of the missing key frame
check_fragment_header(&mut h1);
for _ in 1..6 {
let _ = h1.pull().unwrap();
}
assert_eq!(h1.buffers_in_queue(), 0);
// 5 frames fit into the fragment, +1 to add key-frame and 1 for
// trigger
for i in 18..25 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i.seconds());
buffer.set_dts(i.seconds());
buffer.set_duration(gst::ClockTime::SECOND);
}
assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i.seconds());
buffer.set_dts(i.seconds());
buffer.set_duration(gst::ClockTime::SECOND);
if i != 23 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
}
// Crank the clock: this should bring us to the end of the next
// fragment
h1.crank_single_clock_wait().unwrap();
// fragment header +7 audio and 7 (GOP size) video
check_fragment_header(&mut h1);
for _ in 1..16 {
let _ = h1.pull().unwrap();
}
assert_eq!(h1.buffers_in_queue(), 0);
}
#[test]
fn test_multi_stream_late_2nd_stream() {
init();
let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
// 5s fragment duration
h1.element()
.unwrap()
.set_property("fragment-duration", 2.seconds());
h1.set_src_caps(
gst::Caps::builder("audio/mpeg")
.field("mpegversion", 4i32)
.field("channels", 1i32)
.field("rate", 44100i32)
.field("stream-format", "raw")
.field("base-profile", "lc")
.field("profile", "lc")
.field("level", "2")
.field(
"codec_data",
gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
)
.build(),
);
h1.play();
h2.set_src_caps(
gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
.field("height", 1080i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::with_size(1).unwrap())
.build(),
);
h2.play();
for i in 0..12 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts((i * 500).mseconds());
buffer.set_dts((i * 500).mseconds());
buffer.set_duration(500.mseconds());
}
assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
// Only audio for 1st fragment
if i < 4 {
// Fire some GAP events to keep GstAggregator running
if i == 0 || i == 2 {
let ev = gst::event::Gap::builder((i * 500).mseconds())
.duration(1.seconds())
.build();
assert!(h2.push_event(ev));
}
continue;
}
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts((i * 500).mseconds());
buffer.set_dts((i * 500).mseconds());
buffer.set_duration(500.mseconds());
if i != 4 && i != 8 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
}
// 1st buffer with 4 x audio buffers only and 2 headers
h1.crank_single_clock_wait().unwrap();
check_first_fragment_header(&mut h1);
for _ in 2..6 {
let _ = h1.pull().unwrap();
}
// 2nd fragment contains 4 x audio and 4 x video buffers + header
h1.crank_single_clock_wait().unwrap();
check_fragment_header(&mut h1);
for _ in 1..9 {
let _ = h1.pull().unwrap();
}
// Get the remaining 4 x audio plus 4 x video with header
h1.push_event(gst::event::Eos::new());
h2.push_event(gst::event::Eos::new());
check_fragment_header(&mut h1);
for _ in 1..9 {
let _ = h1.pull().unwrap();
}
assert_eq!(h1.buffers_in_queue(), 0);
}
fn test_late_key_frame_sparse(offset: u64, multi_stream: bool, gap_buffer: bool) {
init();
let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
let mut h2 = if multi_stream {
let h = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
Some(h)
} else {
None
};
let frag_duration = 2000;
let buffer_duration = 500;
// 2s fragment duration
h1.element()
.unwrap()
.set_property("fragment-duration", frag_duration.mseconds());
h1.set_src_caps(
gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
.field("height", 1080i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::with_size(1).unwrap())
.build(),
);
h1.play();
if let Some(ref mut h) = h2 {
h.set_src_caps(
gst::Caps::builder("audio/mpeg")
.field("mpegversion", 4i32)
.field("channels", 1i32)
.field("rate", 44100i32)
.field("stream-format", "raw")
.field("base-profile", "lc")
.field("profile", "lc")
.field("level", "2")
.field(
"codec_data",
gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]),
)
.build(),
);
h.play();
}
let mut pts = 0;
// calculate fragment buffers from headers + video buffers + audio
// buffers
let buffers_per_frag = frag_duration / buffer_duration;
let skip = offset / buffer_duration - 1;
let mut skipped = skip;
// first fragment with 2 headers, subsequent fragments have only one
let (frag_1_size, frag_2_size, frag_3_size) =
match (offset < frag_duration, multi_stream, gap_buffer) {
// First fragment standard, both video and audio filled
// and two headers, next fragment with one header and as
// many audio buffers as video buffers are skipped, 3rd
// fragment standard with audio and video again
(true, true, true) => (
2 + 2 * buffers_per_frag,
1 + skip + 1,
Some(1 + 2 * buffers_per_frag),
),
(false, false, true) => (2 + buffers_per_frag, 1 + buffers_per_frag, None),
// Here -1 because the fragment is too large and then
// fmp4mux does not wait for the audio buffer when the
// video buffer already arrived to close the first GOP
(false, true, false) => (2 + 3 * buffers_per_frag, 1 + 2 * buffers_per_frag, None),
// The fragment is extended until the GOP is closed, means
// 1.5 x GOP (6) audio buffers and 4 (GOP) video buffers
// plus 2 headers for first fragment, then a full fragment
// afterwards plus header.
(true, true, false) => (2 + 2 * buffers_per_frag + 2, 1 + 2 * buffers_per_frag, None),
(false, true, true) => (
2 + 2 * buffers_per_frag,
1 + buffers_per_frag,
Some(1 + 2 * buffers_per_frag),
),
(_, _, _) => (2 + buffers_per_frag, 1 + buffers_per_frag, None),
};
let n_bufs = 3 * buffers_per_frag + 3;
// Make sure that the test is not blocking on buffer push() while
// waiting for output to be pulled but this is not possible
// because the clock is waiting until advanced, this situation may
// happen in GstAggregator
let clk = h1.testclock().unwrap();
// 0 is idle, 1 is crank, 2 is quit
let crank_trigger = std::sync::Arc::new((std::sync::Mutex::new(0), std::sync::Condvar::new()));
let crank_trigger2 = std::sync::Arc::clone(&crank_trigger);
let fwd_clock = thread::spawn(move || {
let (lock, cond) = &*crank_trigger;
loop {
let mut state = lock.lock().unwrap();
if *state == 0 {
state = cond.wait(state).unwrap();
}
match *state {
1 => {
*state = 0;
drop(state);
if !gap_buffer {
clk.wait_for_next_pending_id();
clk.crank();
}
}
2 => break,
_ => unreachable!(),
}
}
});
for i in 0..n_bufs {
{
let (lock, cond) = &*crank_trigger2;
let mut state = lock.lock().unwrap();
*state = 1;
cond.notify_one();
}
if let Some(ref mut h) = h2 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_duration(buffer_duration.mseconds());
buffer.set_pts((buffer_duration * i).mseconds());
}
assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
}
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_duration(buffer_duration.mseconds());
match i - (skip - skipped) {
0 | 5 | 9 | 13 => buffer.set_pts(pts.mseconds()),
1..=3 | 6..=8 | 10..=12 | 14..=16 => {
buffer.set_pts(pts.mseconds());
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
4 => {
if gap_buffer {
let ev = gst::event::Gap::builder(pts.mseconds())
.duration(buffer_duration.mseconds())
.build();
assert!(h1.push_event(ev));
}
pts += buffer_duration;
skipped = skipped.saturating_sub(1);
continue;
}
_ => unreachable!(),
}
buffer.set_dts(buffer.pts());
}
assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
pts += buffer_duration;
}
{
let (lock, cond) = &*crank_trigger2;
let mut state = lock.lock().unwrap();
*state = 2;
cond.notify_one();
}
fwd_clock.join().unwrap();
h1.crank_single_clock_wait().unwrap();
check_first_fragment_header(&mut h1);
for _ in 2..frag_1_size {
let _ = h1.pull().unwrap();
}
h1.crank_single_clock_wait().unwrap();
check_fragment_header(&mut h1);
for _ in 1..frag_2_size {
let _ = h1.pull().unwrap();
}
if let Some(frag_size) = frag_3_size {
h1.crank_single_clock_wait().unwrap();
check_fragment_header(&mut h1);
for _ in 1..frag_size {
let _ = h1.pull().unwrap();
}
}
h1.push_event(gst::event::Eos::new());
if let Some(mut h) = h2 {
h.push_event(gst::event::Eos::new());
}
}
#[test]
fn test_single_stream_late_key_frame_sparse() {
test_late_key_frame_sparse(1_000, false, false)
}
#[test]
fn test_single_stream_late_key_frame_sparse_gap() {
test_late_key_frame_sparse(1_000, false, true)
}
#[test]
fn test_multi_stream_late_key_frame_sparse() {
test_late_key_frame_sparse(1_000, true, false)
}
#[test]
fn test_multi_stream_late_key_frame_sparse_gap() {
test_late_key_frame_sparse(1_000, true, true)
}
#[test]
fn test_single_stream_late_key_frame_sparse_on_frag_boundary() {
test_late_key_frame_sparse(2_000, false, false)
}
#[test]
fn test_single_stream_late_key_frame_sparse_on_frag_boundary_gap() {
test_late_key_frame_sparse(2_000, false, true)
}
#[test]
fn test_multi_stream_late_key_frame_sparse_on_frag_boundary() {
test_late_key_frame_sparse(2_000, true, false)
}
#[test]
fn test_multi_stream_late_key_frame_sparse_on_frag_boundary_gap() {
test_late_key_frame_sparse(2_000, true, true)
}