From 987e4efc020b8d3244e607437b3e1a0f8f93bb3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 17 May 2022 15:05:19 +0300 Subject: [PATCH] fmp4mux: In live pipelines use the current fragment end time as timeout This allows muxing even if some streams are sparse or have big gaps. --- generic/fmp4/src/fmp4mux/imp.rs | 21 ++- generic/fmp4/tests/tests.rs | 241 ++++++++++++++++++++++++++++++++ 2 files changed, 256 insertions(+), 6 deletions(-) diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs index 399047be..e08ed50e 100644 --- a/generic/fmp4/src/fmp4mux/imp.rs +++ b/generic/fmp4/src/fmp4mux/imp.rs @@ -500,12 +500,15 @@ impl FMP4Mux { element: &super::FMP4Mux, state: &mut State, settings: &Settings, + timeout: bool, at_eos: bool, ) -> Result, gst::FlowError> { let class = element.class(); if at_eos { gst::info!(CAT, obj: element, "Draining at EOS"); + } else if timeout { + gst::info!(CAT, obj: element, "Draining at timeout"); } else { for stream in &state.streams { if !stream.fragment_filled && !stream.sinkpad.is_eos() { @@ -525,7 +528,8 @@ impl FMP4Mux { for stream in &mut state.streams { assert!( - at_eos + timeout + || at_eos || stream.sinkpad.is_eos() || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true) ); @@ -540,7 +544,7 @@ impl FMP4Mux { let fragment_start_pts = state.fragment_start_pts.unwrap(); while let Some(gop) = stream.queued_gops.pop_back() { - assert!(gop.final_end_pts); + assert!(timeout || gop.final_end_pts); let end_pts = gop.end_pts; gops.push(gop); @@ -674,7 +678,7 @@ impl FMP4Mux { let mut buffer_list = None; if interleaved_buffers.is_empty() { - assert!(at_eos); + assert!(timeout || at_eos); } else { let min_earliest_pts_position = min_earliest_pts_position.unwrap(); let min_earliest_pts = min_earliest_pts.unwrap(); @@ -1181,6 +1185,11 @@ impl ElementImpl for FMP4Mux { } impl AggregatorImpl for FMP4Mux { + fn next_time(&self, _aggregator: &Self::Type) -> Option { + let state = self.state.lock().unwrap(); + state.fragment_start_pts + } + fn sink_query( &self, aggregator: &Self::Type, @@ -1365,7 +1374,7 @@ impl AggregatorImpl for FMP4Mux { fn aggregate( &self, aggregator: &Self::Type, - _timeout: bool, + timeout: bool, ) -> Result { let settings = self.settings.lock().unwrap().clone(); @@ -1468,7 +1477,7 @@ impl AggregatorImpl for FMP4Mux { break; } Some(oldest_gop) => { - if !oldest_gop.final_earliest_pts { + if !timeout && !oldest_gop.final_earliest_pts { earliest_pts = None; break; } @@ -1517,7 +1526,7 @@ impl AggregatorImpl for FMP4Mux { } // If enough GOPs were queued, drain and create the output fragment - self.drain(aggregator, &mut state, &settings, all_eos)? + self.drain(aggregator, &mut state, &settings, timeout, all_eos)? }; for (sinkpad, event) in upstream_events { diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs index 9e4aa6f5..d21cc7b4 100644 --- a/generic/fmp4/tests/tests.rs +++ b/generic/fmp4/tests/tests.rs @@ -403,3 +403,244 @@ fn test_buffer_flags_multi_stream() { let ev = h1.pull_event().unwrap(); assert_eq!(ev.type_(), gst::EventType::Eos); } + +#[test] +fn test_live_timeout() { + init(); + + let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + + h1.use_testclock(); + + // 5s fragment duration + h1.element() + .unwrap() + .set_property("fragment-duration", gst::ClockTime::from_seconds(5)); + + h1.set_src_caps( + gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::with_size(1).unwrap()) + .build(), + ); + h1.play(); + + h2.set_src_caps( + gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("channels", 1i32) + .field("rate", 44100i32) + .field("stream-format", "raw") + .field("base-profile", "lc") + .field("profile", "lc") + .field("level", "2") + .field( + "codec_data", + gst::Buffer::from_slice([0x12, 0x08, 0x56, 0xe5, 0x00]), + ) + .build(), + ); + h2.play(); + + let output_offset = gst::ClockTime::from_seconds(60 * 60 * 1000); + + // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag + for i in 0..7 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_seconds(i)); + buffer.set_dts(gst::ClockTime::from_seconds(i)); + buffer.set_duration(gst::ClockTime::SECOND); + if i != 0 && i != 5 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + // Skip buffer 4th and 6th buffer (end of fragment / stream) + if i == 4 || i == 6 { + continue; + } else { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(gst::ClockTime::from_seconds(i)); + buffer.set_dts(gst::ClockTime::from_seconds(i)); + buffer.set_duration(gst::ClockTime::SECOND); + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + if i == 2 { + let ev = loop { + let ev = h1.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(gst::ClockTime::from_seconds(5)), + all_headers: true, + count: 0 + } + ); + + let ev = loop { + let ev = h2.pull_upstream_event().unwrap(); + if ev.type_() != gst::EventType::Reconfigure + && ev.type_() != gst::EventType::Latency + { + break ev; + } + }; + + assert_eq!(ev.type_(), gst::EventType::CustomUpstream); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(gst::ClockTime::from_seconds(5)), + all_headers: true, + count: 0 + } + ); + } + } + + // Advance time and crank the clock: this should bring us to the end of the first fragment + h1.set_time(gst::ClockTime::from_seconds(5)).unwrap(); + h1.crank_single_clock_wait().unwrap(); + + let header = h1.pull().unwrap(); + assert_eq!( + header.flags(), + gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT + ); + assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); + assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); + + let fragment_header = h1.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.duration(), + Some(gst::ClockTime::from_seconds(5)) + ); + + for i in 0..5 { + for j in 0..2 { + // Skip gap events that don't result in buffers + if j == 1 && i == 4 { + // Advance time and crank the clock another time. This brings us at the end of the + // EOS. + h1.set_time(gst::ClockTime::from_seconds(7)).unwrap(); + h1.crank_single_clock_wait().unwrap(); + continue; + } + + let buffer = h1.pull().unwrap(); + if i == 4 && j == 0 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else if i == 5 && j == 0 { + assert_eq!(buffer.flags(), gst::BufferFlags::HEADER); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + + assert_eq!( + buffer.pts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + + if j == 0 { + assert_eq!( + buffer.dts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + } else { + assert!(buffer.dts().is_none()); + } + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + } + + h1.push_event(gst::event::Eos::new()); + h2.push_event(gst::event::Eos::new()); + + let fragment_header = h1.pull().unwrap(); + assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::from_seconds(5) + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::from_seconds(5) + output_offset) + ); + assert_eq!( + fragment_header.duration(), + Some(gst::ClockTime::from_seconds(2)) + ); + + for i in 5..7 { + for j in 0..2 { + // Skip gap events that don't result in buffers + if j == 1 && i == 6 { + continue; + } + + let buffer = h1.pull().unwrap(); + if i == 6 && j == 0 { + assert_eq!( + buffer.flags(), + gst::BufferFlags::DELTA_UNIT | gst::BufferFlags::MARKER + ); + } else { + assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); + } + assert_eq!( + buffer.pts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + if j == 0 { + assert_eq!( + buffer.dts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + } else { + assert!(buffer.dts().is_none()); + } + assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); + } + } + + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::StreamStart); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Caps); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Segment); + let ev = h1.pull_event().unwrap(); + assert_eq!(ev.type_(), gst::EventType::Eos); +}