From 8ebdffc68dd9727bdd54454e7e13dccbbae237fe Mon Sep 17 00:00:00 2001 From: Jochen Henneberg Date: Wed, 21 May 2025 09:26:59 +0200 Subject: [PATCH] fmp4mux: Added tests for sparse streams (late buffer, gap) Part-of: --- mux/fmp4/tests/tests.rs | 623 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 617 insertions(+), 6 deletions(-) diff --git a/mux/fmp4/tests/tests.rs b/mux/fmp4/tests/tests.rs index 76c3c6d31..41eb9027f 100644 --- a/mux/fmp4/tests/tests.rs +++ b/mux/fmp4/tests/tests.rs @@ -9,6 +9,8 @@ use gst::prelude::*; +use std::thread; + fn init() { use std::sync::Once; static INIT: Once = Once::new(); @@ -46,6 +48,20 @@ fn to_completion(pipeline: &gst::Pipeline) { .expect("Unable to set the pipeline to the `Null` state"); } +fn check_fragment_header(harness: &mut gst_check::Harness) { + let buf = harness.pull().unwrap(); + assert_eq!(buf.flags(), gst::BufferFlags::HEADER); +} + +fn check_first_fragment_header(harness: &mut gst_check::Harness) { + let buf = harness.pull().unwrap(); + assert_eq!( + buf.flags(), + gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER + ); + check_fragment_header(harness) +} + fn test_buffer_flags_single_stream(cmaf: bool, set_dts: bool, caps: gst::Caps) { let mut h = if cmaf { gst_check::Harness::new("cmafmux") @@ -3396,8 +3412,8 @@ fn test_multi_stream_late_key_frame() { ); h2.play(); - // 5 frames fit into the fragment, then next frame is key-frame - // outside first fragment + // 5 frames fit into the fragment +1 otherwise the fragment is + // just extended, then next frame is key-frame outside fragment for i in 0..8 { let mut buffer = gst::Buffer::with_size(1).unwrap(); { @@ -3426,12 +3442,14 @@ fn test_multi_stream_late_key_frame() { // global and fragment header and 5 from from the audio stream (no // video) because of the missing key frame - for _ in 0..7 { + check_first_fragment_header(&mut h1); + for _ in 2..7 { let _ = h1.pull().unwrap(); } assert_eq!(h1.buffers_in_queue(), 0); - // 5 frames fit into the fragment, push 6 complete the fragment, + // 5 frames fit into the fragment, +2 to add key-frame and trigger + // check. for i in 8..15 { let mut buffer = gst::Buffer::with_size(1).unwrap(); { @@ -3458,9 +3476,602 @@ fn test_multi_stream_late_key_frame() { // Crank the clock: this should bring us to the end of the 2nd fragment h1.crank_single_clock_wait().unwrap(); - // fragment header and 2x5 from both audio and video this time - for _ in 0..16 { + // fragment header + 8 audio and 7 (GOP size) video + check_fragment_header(&mut h1); + for _ in 1..16 { let _ = h1.pull().unwrap(); } assert_eq!(h1.buffers_in_queue(), 0); } + +#[test] +fn test_multi_stream_late_key_frame_skips_fragment() { + init(); + + let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + + // 5s fragment duration + h1.element() + .unwrap() + .set_property("fragment-duration", 5.seconds()); + + h1.set_src_caps( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("channels", 1i32) + .field("rate", 44100i32) + .field("stream-format", "raw") + .field("base-profile", "lc") + .field("profile", "lc") + .field("level", "2") + .field( + "codec_data", + gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), + ) + .build(), + ); + h1.play(); + + h2.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h2.play(); + + // 2x5 frames fit into the fragment +1 otherwise the fragment is + // just extended, then next frame is key-frame outside fragment + // plus one to trigger drain + for i in 0..13 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 11 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + // Crank the clock: this should bring us to the end of the 1st fragment + h1.crank_single_clock_wait().unwrap(); + + // global and fragment header and 5 from from the audio stream (no + // video) because of the missing key frame + check_first_fragment_header(&mut h1); + for _ in 2..7 { + let _ = h1.pull().unwrap(); + } + // fragment header and 5 from from the audio stream (no video) + // because of the missing key frame + check_fragment_header(&mut h1); + for _ in 1..6 { + let _ = h1.pull().unwrap(); + } + assert_eq!(h1.buffers_in_queue(), 0); + + // 5 frames fit into the fragment, +1 to add key-frame and 1 for + // trigger + for i in 13..20 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 18 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + // Crank the clock: this should bring us to the end of the next + // fragment + h1.crank_single_clock_wait().unwrap(); + + // fragment header +7 audio and 7 (GOP size) video + check_fragment_header(&mut h1); + for _ in 1..16 { + let _ = h1.pull().unwrap(); + } + assert_eq!(h1.buffers_in_queue(), 0); +} + +#[test] +fn test_multi_stream_late_key_frame_skips_two_fragments() { + init(); + + let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + + // 5s fragment duration + h1.element() + .unwrap() + .set_property("fragment-duration", 5.seconds()); + + h1.set_src_caps( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("channels", 1i32) + .field("rate", 44100i32) + .field("stream-format", "raw") + .field("base-profile", "lc") + .field("profile", "lc") + .field("level", "2") + .field( + "codec_data", + gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), + ) + .build(), + ); + h1.play(); + + h2.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h2.play(); + + for i in 0..18 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 16 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + // Crank the clock: this should bring us to the end of the 1st fragment + h1.crank_single_clock_wait().unwrap(); + + // global and fragment header and 5 from from the audio stream (no + // video) because of the missing key frame + check_first_fragment_header(&mut h1); + for _ in 2..7 { + let _ = h1.pull().unwrap(); + } + // fragment header and 5 from from the audio stream (no video) + // because of the missing key frame + check_fragment_header(&mut h1); + for _ in 1..6 { + let _ = h1.pull().unwrap(); + } + assert_eq!(h1.buffers_in_queue(), 6); + + // fragment header and 5 from from the audio stream (no video) + // because of the missing key frame + check_fragment_header(&mut h1); + for _ in 1..6 { + let _ = h1.pull().unwrap(); + } + assert_eq!(h1.buffers_in_queue(), 0); + + // 5 frames fit into the fragment, +1 to add key-frame and 1 for + // trigger + for i in 18..25 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i.seconds()); + buffer.set_dts(i.seconds()); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 23 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + // Crank the clock: this should bring us to the end of the next + // fragment + h1.crank_single_clock_wait().unwrap(); + + // fragment header +7 audio and 7 (GOP size) video + check_fragment_header(&mut h1); + for _ in 1..16 { + let _ = h1.pull().unwrap(); + } + assert_eq!(h1.buffers_in_queue(), 0); +} + +#[test] +fn test_multi_stream_late_2nd_stream() { + init(); + + let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + + // 5s fragment duration + h1.element() + .unwrap() + .set_property("fragment-duration", 2.seconds()); + + h1.set_src_caps( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("channels", 1i32) + .field("rate", 44100i32) + .field("stream-format", "raw") + .field("base-profile", "lc") + .field("profile", "lc") + .field("level", "2") + .field( + "codec_data", + gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), + ) + .build(), + ); + h1.play(); + + h2.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h2.play(); + + for i in 0..12 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts((i * 500).mseconds()); + buffer.set_dts((i * 500).mseconds()); + buffer.set_duration(500.mseconds()); + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + // Only audio for 1st fragment + if i < 4 { + // Fire some GAP events to keep GstAggregator running + if i == 0 || i == 2 { + let ev = gst::event::Gap::builder((i * 500).mseconds()) + .duration(1.seconds()) + .build(); + assert!(h2.push_event(ev)); + } + continue; + } + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts((i * 500).mseconds()); + buffer.set_dts((i * 500).mseconds()); + buffer.set_duration(500.mseconds()); + if i != 4 && i != 8 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + // 1st buffer with 4 x audio buffers only and 2 headers + h1.crank_single_clock_wait().unwrap(); + check_first_fragment_header(&mut h1); + for _ in 2..6 { + let _ = h1.pull().unwrap(); + } + + // 2nd fragment contains 4 x audio and 4 x video buffers + header + h1.crank_single_clock_wait().unwrap(); + check_fragment_header(&mut h1); + for _ in 1..9 { + let _ = h1.pull().unwrap(); + } + + // Get the remaining 4 x audio plus 4 x video with header + h1.push_event(gst::event::Eos::new()); + h2.push_event(gst::event::Eos::new()); + check_fragment_header(&mut h1); + for _ in 1..9 { + let _ = h1.pull().unwrap(); + } + + assert_eq!(h1.buffers_in_queue(), 0); +} + +fn test_late_key_frame_sparse(offset: u64, multi_stream: bool, gap_buffer: bool) { + init(); + + let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + let mut h2 = if multi_stream { + let h = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + Some(h) + } else { + None + }; + + let frag_duration = 2000; + let buffer_duration = 500; + + // 2s fragment duration + h1.element() + .unwrap() + .set_property("fragment-duration", frag_duration.mseconds()); + + h1.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h1.play(); + + if let Some(ref mut h) = h2 { + h.set_src_caps( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("channels", 1i32) + .field("rate", 44100i32) + .field("stream-format", "raw") + .field("base-profile", "lc") + .field("profile", "lc") + .field("level", "2") + .field( + "codec_data", + gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), + ) + .build(), + ); + h.play(); + } + + let mut pts = 0; + + // calculate fragment buffers from headers + video buffers + audio + // buffers + let buffers_per_frag = frag_duration / buffer_duration; + let skip = offset / buffer_duration - 1; + let mut skipped = skip; + + // first fragment with 2 headers, subsequent fragments have only one + let (frag_1_size, frag_2_size, frag_3_size) = + match (offset < frag_duration, multi_stream, gap_buffer) { + // First fragment standard, both video and audio filled + // and two headers, next fragment with one header and as + // many audio buffers as video buffers are skipped, 3rd + // fragment standard with audio and video again + (true, true, true) => ( + 2 + 2 * buffers_per_frag, + 1 + skip + 1, + Some(1 + 2 * buffers_per_frag), + ), + (false, false, true) => (2 + buffers_per_frag, 1 + buffers_per_frag, None), + // Here -1 because the fragment is too large and then + // fmp4mux does not wait for the audio buffer when the + // video buffer already arrived to close the first GOP + (false, true, false) => (2 + 3 * buffers_per_frag, 1 + 2 * buffers_per_frag, None), + // The fragment is extended until the GOP is closed, means + // 1.5 x GOP (6) audio buffers and 4 (GOP) video buffers + // plus 2 headers for first fragment, then a full fragment + // afterwards plus header. + (true, true, false) => (2 + 2 * buffers_per_frag + 2, 1 + 2 * buffers_per_frag, None), + (false, true, true) => ( + 2 + 2 * buffers_per_frag, + 1 + buffers_per_frag, + Some(1 + 2 * buffers_per_frag), + ), + (_, _, _) => (2 + buffers_per_frag, 1 + buffers_per_frag, None), + }; + + let n_bufs = 3 * buffers_per_frag + 3; + // Make sure that the test is not blocking on buffer push() while + // waiting for output to be pulled but this is not possible + // because the clock is waiting until advanced, this situation may + // happen in GstAggregator + let clk = h1.testclock().unwrap(); + + // 0 is idle, 1 is crank, 2 is quit + let crank_trigger = std::sync::Arc::new((std::sync::Mutex::new(0), std::sync::Condvar::new())); + let crank_trigger2 = std::sync::Arc::clone(&crank_trigger); + let fwd_clock = thread::spawn(move || { + let (lock, cond) = &*crank_trigger; + loop { + let mut state = lock.lock().unwrap(); + if *state == 0 { + state = cond.wait(state).unwrap(); + } + match *state { + 1 => { + *state = 0; + drop(state); + if !gap_buffer { + clk.wait_for_next_pending_id(); + clk.crank(); + } + } + 2 => break, + _ => unreachable!(), + } + } + }); + + for i in 0..n_bufs { + { + let (lock, cond) = &*crank_trigger2; + let mut state = lock.lock().unwrap(); + *state = 1; + cond.notify_one(); + } + + if let Some(ref mut h) = h2 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_duration(buffer_duration.mseconds()); + buffer.set_pts((buffer_duration * i).mseconds()); + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_duration(buffer_duration.mseconds()); + + match i - (skip - skipped) { + 0 | 5 | 9 | 13 => buffer.set_pts(pts.mseconds()), + 1..=3 | 6..=8 | 10..=12 | 14..=16 => { + buffer.set_pts(pts.mseconds()); + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + 4 => { + if gap_buffer { + let ev = gst::event::Gap::builder(pts.mseconds()) + .duration(buffer_duration.mseconds()) + .build(); + assert!(h1.push_event(ev)); + } + pts += buffer_duration; + skipped = skipped.saturating_sub(1); + continue; + } + _ => unreachable!(), + } + + buffer.set_dts(buffer.pts()); + } + + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + pts += buffer_duration; + } + + { + let (lock, cond) = &*crank_trigger2; + let mut state = lock.lock().unwrap(); + *state = 2; + cond.notify_one(); + } + fwd_clock.join().unwrap(); + + h1.crank_single_clock_wait().unwrap(); + check_first_fragment_header(&mut h1); + for _ in 2..frag_1_size { + let _ = h1.pull().unwrap(); + } + + h1.crank_single_clock_wait().unwrap(); + check_fragment_header(&mut h1); + for _ in 1..frag_2_size { + let _ = h1.pull().unwrap(); + } + + if let Some(frag_size) = frag_3_size { + h1.crank_single_clock_wait().unwrap(); + check_fragment_header(&mut h1); + for _ in 1..frag_size { + let _ = h1.pull().unwrap(); + } + } + + h1.push_event(gst::event::Eos::new()); + if let Some(mut h) = h2 { + h.push_event(gst::event::Eos::new()); + } +} + +#[test] +fn test_single_stream_late_key_frame_sparse() { + test_late_key_frame_sparse(1_000, false, false) +} + +#[test] +fn test_single_stream_late_key_frame_sparse_gap() { + test_late_key_frame_sparse(1_000, false, true) +} + +#[test] +fn test_multi_stream_late_key_frame_sparse() { + test_late_key_frame_sparse(1_000, true, false) +} + +#[test] +fn test_multi_stream_late_key_frame_sparse_gap() { + test_late_key_frame_sparse(1_000, true, true) +} + +#[test] +fn test_single_stream_late_key_frame_sparse_on_frag_boundary() { + test_late_key_frame_sparse(2_000, false, false) +} + +#[test] +fn test_single_stream_late_key_frame_sparse_on_frag_boundary_gap() { + test_late_key_frame_sparse(2_000, false, true) +} + +#[test] +fn test_multi_stream_late_key_frame_sparse_on_frag_boundary() { + test_late_key_frame_sparse(2_000, true, false) +} + +#[test] +fn test_multi_stream_late_key_frame_sparse_on_frag_boundary_gap() { + test_late_key_frame_sparse(2_000, true, true) +}