From 26a6c3b7b91564e070f1bf3ca03e42b6a615ff96 Mon Sep 17 00:00:00 2001 From: Jochen Henneberg Date: Tue, 29 Oct 2024 09:19:42 +0100 Subject: [PATCH] fmp4mux: Fixed caps change handling for multiple streams * Moved FKU code into separate function. * Send FKU for each stream where the caps change happened. * Send FKU for each stream where an incomplete GOP has been pushed on caps change. * Push incomplete GOP from stream if the early fragment end would not contain that stream otherwise. * If an incomplete GOP has been pushed on the previous fragment accept inter-frames on the next fragment and do not drop them. Part-of: --- mux/fmp4/src/fmp4mux/imp.rs | 278 ++++++++++++++------------ mux/fmp4/tests/tests.rs | 377 +++++++++++++++++++++++++++++++++++- 2 files changed, 526 insertions(+), 129 deletions(-) diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 98193e51..a97b1496 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -80,12 +80,13 @@ fn running_time_to_utc_time( /// Converts an UTC time to a running time. fn utc_time_to_running_time( - utc_time: gst::ClockTime, + utc_time: Option, running_time_utc_time_mapping: ( impl Into>, impl Into>, ), ) -> Option { + let utc_time = utc_time?; running_time_utc_time_mapping .0 .into() @@ -218,6 +219,13 @@ struct Stream { /// not accept any further buffers until the chunk/fragment is /// drained and draining will happen ASAP. next_caps: Option, + /// Set if language or rotation tag has changed. + tag_changed: bool, + /// Set to true if an incomplete GOP has been drained to the last + /// fragment and we accept delta-frames without having a + /// key-frame. The GOP queue is empty at this point. Once it is + /// filled again this flag will be reset. + pushed_incomplete_gop: bool, /// Whether this stream is intra-only and has frame reordering. delta_frames: DeltaFrames, /// Whether this stream might have header frames without timestamps that should be ignored. @@ -323,6 +331,10 @@ impl Stream { 10_000 } } + + fn caps_or_tag_change(&self) -> bool { + self.next_caps.is_some() || self.tag_changed + } } #[derive(Default)] @@ -495,9 +507,7 @@ impl FMP4Mux { } } - // do not collect buffers anymore once the caps change has - // been confirmed - if stream.next_caps.is_some() { + if stream.caps_or_tag_change() { return Ok(None); } @@ -1086,7 +1096,12 @@ impl FMP4Mux { let pts_position = buffer.pts().unwrap(); - if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { + // Accept new buffers if this is either an empty queue and we + // have drain an incomplete GOP before or if this frame is a + // key frame. + if (stream.queued_gops.is_empty() && stream.pushed_incomplete_gop) + || !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) + { gst::debug!( CAT, obj = stream.sinkpad, @@ -1239,6 +1254,9 @@ impl FMP4Mux { ); } + // A buffer has been accepted so this flag can be reset. + stream.pushed_incomplete_gop = false; + if let Some((prev_gop, first_gop)) = Option::zip( stream.queued_gops.iter().find(|gop| gop.final_end_pts), stream.queued_gops.back(), @@ -1332,7 +1350,12 @@ impl FMP4Mux { // Early return, if caps have changed assume this stream is // ready for pushing a fragment - if stream.next_caps.is_some() { + if stream.caps_or_tag_change() { + gst::trace!( + CAT, + obj = stream.sinkpad, + "On caps change stream is considered ready for fragment push", + ); stream.fragment_filled = true; stream.chunk_filled = true; return; @@ -1645,63 +1668,10 @@ impl FMP4Mux { state.fragment_end_pts = Some(fragment_end_pts); state.chunk_start_pts = Some(chunk_start_pts); - // Now send force-keyunit events for the second fragment start. - let fku_time = fragment_end_pts; - for stream in &state.streams { - let current_position = stream.current_position; - - // In case of ONVIF this needs to be converted back from UTC time to - // the stream's running time - let (fku_time, current_position) = - if self.obj().class().as_ref().variant == super::Variant::ONVIF { - ( - if let Some(fku_time) = utc_time_to_running_time( - fku_time, - stream.running_time_utc_time_mapping.unwrap(), - ) { - fku_time - } else { - continue; - }, - utc_time_to_running_time( - current_position, - stream.running_time_utc_time_mapping.unwrap(), - ), - ) - } else { - (fku_time, Some(current_position)) - }; - - let fku_time = - if current_position.is_some_and(|current_position| current_position > fku_time) { - gst::warning!( - CAT, - obj = stream.sinkpad, - "Sending first force-keyunit event late for running time {} at {}", - fku_time, - current_position.display(), - ); - None - } else { - gst::debug!( - CAT, - obj = stream.sinkpad, - "Sending first force-keyunit event for running time {}", - fku_time, - ); - Some(fku_time) - }; - - let fku = gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fku_time) - .all_headers(true) - .build(); - - upstream_events.push((stream.sinkpad.clone(), fku)); - } - // Check if any of the streams are already filled enough for the first chunk/fragment. for stream in &mut state.streams { + // Now send force-keyunit events for the second fragment start. + self.request_force_keyunit_event(stream, state.fragment_end_pts, upstream_events); self.check_stream_filled( settings, stream, @@ -1784,7 +1754,8 @@ impl FMP4Mux { gop.final_end_pts || stream.sinkpad.is_eos() || need_new_header, ); - // If we have a final GOP then include it as long as it's either + // If we have a final GOP, EOS or caps change then include it as long as + // it's either // - ending before the dequeue end PTS // - no GOPs were dequeued yet and this is the first stream // @@ -1794,7 +1765,12 @@ impl FMP4Mux { && (gop.end_pts <= dequeue_end_pts || (gops.is_empty() && chunk_end_pts.is_none())) { - gst::trace!(CAT, obj = stream.sinkpad, "Pushing whole GOP",); + if !gop.final_end_pts && need_new_header { + stream.pushed_incomplete_gop = true; + gst::trace!(CAT, obj = stream.sinkpad, "Pushing incomplete GOP"); + } else { + gst::trace!(CAT, obj = stream.sinkpad, "Pushing whole GOP"); + } gops.push(stream.queued_gops.pop_back().unwrap()); continue; } @@ -1962,20 +1938,38 @@ impl FMP4Mux { "Current GOP start {} end {} (final {})", gop.start_pts, gop.end_pts, - gop.final_end_pts || stream.sinkpad.is_eos() || need_new_header + gop.final_end_pts || stream.sinkpad.is_eos() || stream.caps_or_tag_change() ); - // If this GOP is not complete then we can't pop it yet. + let end_pts = gop.end_pts; + // If this GOP is not complete then we can't pop it yet unless the pad is EOS + // or we had a caps change and no buffers from this stream in the fragment yet. // // If there was no complete GOP at all yet then it might be bigger than the // fragment duration. In this case we might not be able to handle the latency // requirements in a live pipeline. - if !gop.final_end_pts && !stream.sinkpad.is_eos() && !need_new_header { + if !gop.final_end_pts && !stream.sinkpad.is_eos() { gst::trace!( CAT, obj = stream.sinkpad, "Not including GOP without final end PTS", ); + + // Still take the partial GOP if we did not yet + // had a full GOP but a caps change on some stream + // or if this stream has the caps change. + if !stream.pushed_incomplete_gop + && ((gops.is_empty() && need_new_header) || stream.caps_or_tag_change()) + { + gst::trace!( + CAT, + obj = stream.sinkpad, + "Pushing partial GOP due to caps change", + ); + stream.pushed_incomplete_gop = true; + gops.push(stream.queued_gops.pop_back().unwrap()); + } + break; } @@ -1983,14 +1977,14 @@ impl FMP4Mux { // the first stream and no GOPs were dequeued at all yet. This would mean that the // GOP is bigger than the fragment duration. if !all_eos - && gop.end_pts > dequeue_end_pts + && end_pts > dequeue_end_pts && (chunk_end_pts.is_some() || !gops.is_empty()) { gst::trace!(CAT, obj = stream.sinkpad, "Not including GOP yet",); break; } - gst::trace!(CAT, obj = stream.sinkpad, "Pushing complete GOP",); + gst::trace!(CAT, obj = stream.sinkpad, "Pushing complete GOP"); gops.push(stream.queued_gops.pop_back().unwrap()); } } @@ -2522,71 +2516,62 @@ impl FMP4Mux { Ok((interleaved_buffers, streams)) } - /// Request a force-keyunit event for the start of the next fragment. - /// - /// This is called whenever the last chunk of a fragment is pushed out. - /// - /// `chunk_end_pts` gives the time of the previously drained chunk, which - /// ideally should be lower than the next fragment starts PTS. + /// Request a force-keyunit event for the given PTS. fn request_force_keyunit_event( &self, - state: &State, + stream: &Stream, + pts: Option, upstream_events: &mut Vec<(super::FMP4MuxPad, gst::Event)>, ) { - let fku_time = state.fragment_end_pts.unwrap(); + let current_position = stream.current_position; - for stream in &state.streams { - let current_position = stream.current_position; - - // In case of ONVIF this needs to be converted back from UTC time to - // the stream's running time - let (fku_time, current_position) = - if self.obj().class().as_ref().variant == super::Variant::ONVIF { - ( - if let Some(fku_time) = utc_time_to_running_time( - fku_time, - stream.running_time_utc_time_mapping.unwrap(), - ) { - fku_time - } else { - continue; - }, - utc_time_to_running_time( - current_position, - stream.running_time_utc_time_mapping.unwrap(), - ), - ) - } else { - (fku_time, Some(current_position)) + // In case of ONVIF this needs to be converted back from UTC time to + // the stream's running time + let (fku_time, current_position) = + if self.obj().class().as_ref().variant == super::Variant::ONVIF { + let Some(fku_time) = + utc_time_to_running_time(pts, stream.running_time_utc_time_mapping.unwrap()) + else { + return; }; + ( + Some(fku_time), + utc_time_to_running_time( + Some(current_position), + stream.running_time_utc_time_mapping.unwrap(), + ), + ) + } else { + (pts, Some(current_position)) + }; - let fku_time = - if current_position.is_some_and(|current_position| current_position > fku_time) { - gst::warning!( - CAT, - obj = stream.sinkpad, - "Sending force-keyunit event late for running time {} at {}", - fku_time, - current_position.display(), - ); - None - } else { - gst::debug!( - CAT, - obj = stream.sinkpad, - "Sending force-keyunit event for running time {}", - fku_time, - ); - Some(fku_time) - }; + let fku_time = if current_position + .is_some_and(|current_position| fku_time.is_some_and(|t| current_position > t)) + { + gst::warning!( + CAT, + obj = stream.sinkpad, + "Sending force-keyunit event late for running time {:?} at {}", + fku_time, + current_position.display(), + ); + None + } else { + gst::debug!( + CAT, + obj = stream.sinkpad, + "Sending force-keyunit event for running time {:?}", + fku_time, + ); + fku_time + }; - let fku = gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fku_time) - .all_headers(true) - .build(); + let fku = gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fku_time) + .all_headers(true) + .build(); - upstream_events.push((stream.sinkpad.clone(), fku)); - } + upstream_events.push((stream.sinkpad.clone(), fku)); } /// Fills upstream events as needed and returns the caps the first time draining can happen. @@ -2805,7 +2790,6 @@ impl FMP4Mux { state.end_pts = Some(chunk_end_pts); // Update for the start PTS of the next fragment / chunk - if fragment_filled || state.need_new_header { state.fragment_start_pts = Some(chunk_end_pts); state.fragment_end_pts = Some(self.get_fragment_end_pts( @@ -2827,7 +2811,9 @@ impl FMP4Mux { // If the current fragment is filled we already have the next fragment's start // keyframe and can request the following one. if fragment_filled { - self.request_force_keyunit_event(state, upstream_events); + for stream in &state.streams { + self.request_force_keyunit_event(stream, state.fragment_end_pts, upstream_events); + } } // Reset timeout delay now that we've output an actual fragment or chunk @@ -3017,6 +3003,8 @@ impl FMP4Mux { sinkpad: pad, caps, next_caps: None, + tag_changed: false, + pushed_incomplete_gop: false, delta_frames, discard_header_buffers, pre_queue: VecDeque::new(), @@ -3685,6 +3673,14 @@ impl AggregatorImpl for FMP4Mux { { state.language_code = language_code; state.need_new_header = true; + + if let Some(stream) = state + .streams + .iter_mut() + .find(|s| *aggregator_pad == s.sinkpad) + { + stream.tag_changed = true; + } } } } else if let Some(tag_value) = ev.tag().get::() { @@ -3882,6 +3878,25 @@ impl AggregatorImpl for FMP4Mux { &mut upstream_events, ); + if state.need_new_header { + for stream in state + .streams + .iter() + .filter(|s| s.next_caps.is_some() || s.pushed_incomplete_gop) + { + gst::info!( + CAT, + imp = self, + "Incomplete GOP pushed or caps change - send force-key-unit event" + ); + self.request_force_keyunit_event( + stream, + state.fragment_start_pts, + &mut upstream_events, + ); + } + } + (state.need_new_header, res) }; @@ -3910,8 +3925,15 @@ impl AggregatorImpl for FMP4Mux { state.need_new_header = false; state.stream_header = None; state.sent_headers = false; - for stream in state.streams.iter_mut().filter(|s| s.next_caps.is_some()) { - stream.caps = stream.next_caps.take().unwrap(); + for stream in state + .streams + .iter_mut() + .filter(|s| s.tag_changed || s.next_caps.is_some()) + { + stream.tag_changed = false; + if let Some(caps) = stream.next_caps.take() { + stream.caps = caps; + } } } diff --git a/mux/fmp4/tests/tests.rs b/mux/fmp4/tests/tests.rs index c708899f..e012ebba 100644 --- a/mux/fmp4/tests/tests.rs +++ b/mux/fmp4/tests/tests.rs @@ -2535,6 +2535,380 @@ fn test_caps_change_at_gop_boundary() { assert_eq!(h.buffers_in_queue(), 0); } +#[test] +fn test_language_change_at_gop_boundary() { + init(); + + let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + + let caps = gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4])) + .build(); + + h.element() + .unwrap() + .set_property("fragment-duration", 1.seconds()); + + h.set_src_caps(caps); + h.play(); + + for i in 0..30 { + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + if i == 10 { + let mut tl = gst::TagList::new(); + tl.make_mut() + .add::(&"abc", gst::TagMergeMode::Append); + let ev = gst::event::Tag::builder(tl).build(); + h.push_event(ev); + } + + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 100.mseconds()); + buffer.set_dts(i * 100.mseconds()); + buffer.set_duration(100.mseconds()); + + if i % 10 != 0 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok)); + } + + //test_caps_changed_buffers(&mut h, 30, 10, 10, 100, true, false); + + h.crank_single_clock_wait().unwrap(); + // Initial fragment with HEADER and DISCONT + test_caps_changed_verify(&mut h, 1 + 1 + 10, true, false); + + h.crank_single_clock_wait().unwrap(); + // Full GOP with HEADER due to language change + test_caps_changed_verify(&mut h, 1 + 1 + 10, true, false); + + h.crank_single_clock_wait().unwrap(); + h.push_event(gst::event::Eos::new()); + // Full GOP with HEADER but no DISCONT because no caps change + test_caps_changed_verify(&mut h, 1 + 10, false, false); + + assert_eq!(h.buffers_in_queue(), 0); +} + +#[test] +fn test_caps_change_at_gop_boundary_multi_stream() { + init(); + + let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + + let caps1 = gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4])) + .build(); + + let caps2 = gst::Caps::builder("video/x-h264") + .field("width", 640) + .field("height", 480) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::from_slice([4, 3, 2, 1])) + .build(); + + h1.element() + .unwrap() + .set_property("fragment-duration", 330.mseconds()); + + h1.set_src_caps(caps1); + h1.play(); + h2.set_src_caps(caps2); + h2.play(); + + for i in 0..21 { + // caps change on 5th and 20th buffer + if let Some(caps) = match i { + 5 => Some( + gst::Caps::builder("video/x-h264") + .field("width", 1280i32) + .field("height", 720i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4])) + .build(), + ), + 20 => Some( + gst::Caps::builder("video/x-h264") + .field("width", 1024) + .field("height", 800) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4])) + .build(), + ), + _ => None, + } { + h1.push_event(gst::event::Caps::new(&caps)); + } + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 33.mseconds()); + buffer.set_dts(i * 33.mseconds()); + buffer.set_duration(33.mseconds()); + + // GOP size of 5 + if i % 5 != 0 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + let mut buffer = gst::Buffer::with_size(2).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 33.mseconds()); + buffer.set_dts(i * 33.mseconds()); + buffer.set_duration(33.mseconds()); + + // GOP size of 7 + if i % 7 != 0 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i != 5 { + continue; + } + + // reconfigure event + h1.try_pull_upstream_event().unwrap(); + h2.try_pull_upstream_event().unwrap(); + let ev1 = h1.pull_upstream_event().unwrap(); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(330.mseconds()), + all_headers: true, + count: 0 + } + ); + let ev2 = h2.pull_upstream_event().unwrap(); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(), + gst_video::UpstreamForceKeyUnitEvent::parse(&ev2).unwrap() + ); + let ev1 = h1.pull_upstream_event().unwrap(); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(495.mseconds()), + all_headers: true, + count: 0 + } + ); + let ev2 = h2.pull_upstream_event().unwrap(); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(), + gst_video::UpstreamForceKeyUnitEvent::parse(&ev2).unwrap() + ); + let ev1 = h1.pull_upstream_event().unwrap(); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(165.mseconds()), + all_headers: true, + count: 0 + } + ); + let ev2 = h2.pull_upstream_event().unwrap(); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(), + gst_video::UpstreamForceKeyUnitEvent::parse(&ev2).unwrap() + ); + } + + h1.crank_single_clock_wait().unwrap(); + // Caps change on 6th buffer, 1st stream has complete GOP but 2nd + // stream GOP is incomplete. Still 2nd stream buffers are pushed + // because the stream would be missing in the fragment otherwise. + test_caps_changed_verify(&mut h1, 1 + 1 + 5 + 4, true, false); + + h1.crank_single_clock_wait().unwrap(); + // Caps change on 20th buffer, both streams have complete GOPs, 3 + // from stream #1 and the rest of the old GOP and one complete GOP + // from stream #2. + test_caps_changed_verify(&mut h1, 1 + 1 + 5 + 5 + 5 + 3 + 7, true, false); + + // On stream #1 a FKU event for partial GOP and next GOP and on + // stream #2 just the next GOP FKU because no partial GOP has been + // pushed. + assert_eq!(h1.upstream_events_in_queue(), 2); + assert_eq!(h2.upstream_events_in_queue(), 1); + + h1.push_event(gst::event::Eos::new()); + h2.push_event(gst::event::Eos::new()); + + assert_eq!(h1.buffers_in_queue(), 0); +} + +#[test] +fn test_caps_change_at_gop_boundary_chunked_multi_stream() { + init(); + + let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); + let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + + let caps1 = gst::Caps::builder("video/x-h264") + .field("width", 1920i32) + .field("height", 1080i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4])) + .build(); + + let caps2 = gst::Caps::builder("video/x-h264") + .field("width", 640) + .field("height", 480) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::from_slice([4, 3, 2, 1])) + .build(); + + h1.element() + .unwrap() + .set_property("fragment-duration", 1.seconds()); + h1.element() + .unwrap() + .set_property("chunk-duration", 250.mseconds()); + + h1.set_src_caps(caps1); + h1.play(); + h2.set_src_caps(caps2); + h2.play(); + + for i in 0..21 { + // caps change on 10th and 20th buffer + if let Some(caps) = match i { + 10 => Some( + gst::Caps::builder("video/x-h264") + .field("width", 1280i32) + .field("height", 720i32) + .field("framerate", gst::Fraction::new(30, 1)) + .field("stream-format", "avc") + .field("alignment", "au") + .field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4])) + .build(), + ), + _ => None, + } { + h1.push_event(gst::event::Caps::new(&caps)); + } + + let mut buffer = gst::Buffer::with_size(1).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 33.mseconds()); + buffer.set_dts(i * 33.mseconds()); + buffer.set_duration(33.mseconds()); + + // GOP size of 5 + if i % 5 != 0 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok)); + + let mut buffer = gst::Buffer::with_size(2).unwrap(); + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(i * 33.mseconds()); + buffer.set_dts(i * 33.mseconds()); + buffer.set_duration(33.mseconds()); + + // GOP size of 7 + if i % 7 != 0 { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok)); + + if i != 5 { + continue; + } + + // reconfigure event + h1.try_pull_upstream_event().unwrap(); + h2.try_pull_upstream_event().unwrap(); + let ev1 = h1.pull_upstream_event().unwrap(); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(1.seconds()), + all_headers: true, + count: 0 + } + ); + let ev2 = h2.pull_upstream_event().unwrap(); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(), + gst_video::UpstreamForceKeyUnitEvent::parse(&ev2).unwrap() + ); + } + + h1.crank_single_clock_wait().unwrap(); + // Fragment start chunk + test_caps_changed_verify(&mut h1, 1 + 1 + 8 + 8, true, false); + + h1.crank_single_clock_wait().unwrap(); + // Early end of chunk due to caps change + test_caps_changed_verify(&mut h1, 1 + 2 + 1, false, true); + + // Signalling new keyunit for next fragment + let ev1 = h1.pull_upstream_event().unwrap(); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(1330.mseconds()), + all_headers: true, + count: 0 + } + ); + + // Signalling new keyunit on stream with caps change + let ev1 = h1.pull_upstream_event().unwrap(); + assert_eq!( + gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(), + gst_video::UpstreamForceKeyUnitEvent { + running_time: Some(330.mseconds()), + all_headers: true, + count: 0 + } + ); + + h1.crank_single_clock_wait().unwrap(); + // The first chunk of the new fragment + test_caps_changed_verify(&mut h1, 1 + 1 + 8 + 9, true, false); + + h1.push_event(gst::event::Eos::new()); + h2.push_event(gst::event::Eos::new()); + + assert_eq!(h1.buffers_in_queue(), 0); +} + #[test] fn test_caps_change_at_gop_boundary_compatible() { init(); @@ -2777,7 +3151,8 @@ fn test_caps_change_within_gop_no_key() { h.crank_single_clock_wait().unwrap(); // Reduced GOP with HEADER and DISCONT due to caps change - test_caps_changed_verify(&mut h, 1 + 1 + 10, true, false); + test_caps_changed_verify(&mut h, 1 + 1 + 5, true, false); + test_caps_changed_verify(&mut h, 1 + 10, false, false); h.crank_single_clock_wait().unwrap(); h.push_event(gst::event::Eos::new());