fmp4mux: Fixed caps change handling for multiple streams

* Moved FKU code into separate function.
* Send FKU for each stream where the caps change happened.
* Send FKU for each stream where an incomplete GOP has been pushed on
  caps change.
* Push incomplete GOP from stream if the early fragment end would not
  contain that stream otherwise.
* If an incomplete GOP has been pushed on the previous fragment accept
  inter-frames on the next fragment and do not drop them.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1895>
This commit is contained in:
Jochen Henneberg 2024-10-29 09:19:42 +01:00 committed by GStreamer Marge Bot
parent e7813ca3fc
commit 26a6c3b7b9
2 changed files with 526 additions and 129 deletions

View file

@ -80,12 +80,13 @@ fn running_time_to_utc_time(
/// Converts an UTC time to a running time.
fn utc_time_to_running_time(
utc_time: gst::ClockTime,
utc_time: Option<gst::ClockTime>,
running_time_utc_time_mapping: (
impl Into<gst::Signed<gst::ClockTime>>,
impl Into<gst::Signed<gst::ClockTime>>,
),
) -> Option<gst::ClockTime> {
let utc_time = utc_time?;
running_time_utc_time_mapping
.0
.into()
@ -218,6 +219,13 @@ struct Stream {
/// not accept any further buffers until the chunk/fragment is
/// drained and draining will happen ASAP.
next_caps: Option<gst::Caps>,
/// Set if language or rotation tag has changed.
tag_changed: bool,
/// Set to true if an incomplete GOP has been drained to the last
/// fragment and we accept delta-frames without having a
/// key-frame. The GOP queue is empty at this point. Once it is
/// filled again this flag will be reset.
pushed_incomplete_gop: bool,
/// Whether this stream is intra-only and has frame reordering.
delta_frames: DeltaFrames,
/// Whether this stream might have header frames without timestamps that should be ignored.
@ -323,6 +331,10 @@ impl Stream {
10_000
}
}
fn caps_or_tag_change(&self) -> bool {
self.next_caps.is_some() || self.tag_changed
}
}
#[derive(Default)]
@ -495,9 +507,7 @@ impl FMP4Mux {
}
}
// do not collect buffers anymore once the caps change has
// been confirmed
if stream.next_caps.is_some() {
if stream.caps_or_tag_change() {
return Ok(None);
}
@ -1086,7 +1096,12 @@ impl FMP4Mux {
let pts_position = buffer.pts().unwrap();
if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
// Accept new buffers if this is either an empty queue and we
// have drain an incomplete GOP before or if this frame is a
// key frame.
if (stream.queued_gops.is_empty() && stream.pushed_incomplete_gop)
|| !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT)
{
gst::debug!(
CAT,
obj = stream.sinkpad,
@ -1239,6 +1254,9 @@ impl FMP4Mux {
);
}
// A buffer has been accepted so this flag can be reset.
stream.pushed_incomplete_gop = false;
if let Some((prev_gop, first_gop)) = Option::zip(
stream.queued_gops.iter().find(|gop| gop.final_end_pts),
stream.queued_gops.back(),
@ -1332,7 +1350,12 @@ impl FMP4Mux {
// Early return, if caps have changed assume this stream is
// ready for pushing a fragment
if stream.next_caps.is_some() {
if stream.caps_or_tag_change() {
gst::trace!(
CAT,
obj = stream.sinkpad,
"On caps change stream is considered ready for fragment push",
);
stream.fragment_filled = true;
stream.chunk_filled = true;
return;
@ -1645,63 +1668,10 @@ impl FMP4Mux {
state.fragment_end_pts = Some(fragment_end_pts);
state.chunk_start_pts = Some(chunk_start_pts);
// Now send force-keyunit events for the second fragment start.
let fku_time = fragment_end_pts;
for stream in &state.streams {
let current_position = stream.current_position;
// In case of ONVIF this needs to be converted back from UTC time to
// the stream's running time
let (fku_time, current_position) =
if self.obj().class().as_ref().variant == super::Variant::ONVIF {
(
if let Some(fku_time) = utc_time_to_running_time(
fku_time,
stream.running_time_utc_time_mapping.unwrap(),
) {
fku_time
} else {
continue;
},
utc_time_to_running_time(
current_position,
stream.running_time_utc_time_mapping.unwrap(),
),
)
} else {
(fku_time, Some(current_position))
};
let fku_time =
if current_position.is_some_and(|current_position| current_position > fku_time) {
gst::warning!(
CAT,
obj = stream.sinkpad,
"Sending first force-keyunit event late for running time {} at {}",
fku_time,
current_position.display(),
);
None
} else {
gst::debug!(
CAT,
obj = stream.sinkpad,
"Sending first force-keyunit event for running time {}",
fku_time,
);
Some(fku_time)
};
let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
.running_time(fku_time)
.all_headers(true)
.build();
upstream_events.push((stream.sinkpad.clone(), fku));
}
// Check if any of the streams are already filled enough for the first chunk/fragment.
for stream in &mut state.streams {
// Now send force-keyunit events for the second fragment start.
self.request_force_keyunit_event(stream, state.fragment_end_pts, upstream_events);
self.check_stream_filled(
settings,
stream,
@ -1784,7 +1754,8 @@ impl FMP4Mux {
gop.final_end_pts || stream.sinkpad.is_eos() || need_new_header,
);
// If we have a final GOP then include it as long as it's either
// If we have a final GOP, EOS or caps change then include it as long as
// it's either
// - ending before the dequeue end PTS
// - no GOPs were dequeued yet and this is the first stream
//
@ -1794,7 +1765,12 @@ impl FMP4Mux {
&& (gop.end_pts <= dequeue_end_pts
|| (gops.is_empty() && chunk_end_pts.is_none()))
{
gst::trace!(CAT, obj = stream.sinkpad, "Pushing whole GOP",);
if !gop.final_end_pts && need_new_header {
stream.pushed_incomplete_gop = true;
gst::trace!(CAT, obj = stream.sinkpad, "Pushing incomplete GOP");
} else {
gst::trace!(CAT, obj = stream.sinkpad, "Pushing whole GOP");
}
gops.push(stream.queued_gops.pop_back().unwrap());
continue;
}
@ -1962,20 +1938,38 @@ impl FMP4Mux {
"Current GOP start {} end {} (final {})",
gop.start_pts,
gop.end_pts,
gop.final_end_pts || stream.sinkpad.is_eos() || need_new_header
gop.final_end_pts || stream.sinkpad.is_eos() || stream.caps_or_tag_change()
);
// If this GOP is not complete then we can't pop it yet.
let end_pts = gop.end_pts;
// If this GOP is not complete then we can't pop it yet unless the pad is EOS
// or we had a caps change and no buffers from this stream in the fragment yet.
//
// If there was no complete GOP at all yet then it might be bigger than the
// fragment duration. In this case we might not be able to handle the latency
// requirements in a live pipeline.
if !gop.final_end_pts && !stream.sinkpad.is_eos() && !need_new_header {
if !gop.final_end_pts && !stream.sinkpad.is_eos() {
gst::trace!(
CAT,
obj = stream.sinkpad,
"Not including GOP without final end PTS",
);
// Still take the partial GOP if we did not yet
// had a full GOP but a caps change on some stream
// or if this stream has the caps change.
if !stream.pushed_incomplete_gop
&& ((gops.is_empty() && need_new_header) || stream.caps_or_tag_change())
{
gst::trace!(
CAT,
obj = stream.sinkpad,
"Pushing partial GOP due to caps change",
);
stream.pushed_incomplete_gop = true;
gops.push(stream.queued_gops.pop_back().unwrap());
}
break;
}
@ -1983,14 +1977,14 @@ impl FMP4Mux {
// the first stream and no GOPs were dequeued at all yet. This would mean that the
// GOP is bigger than the fragment duration.
if !all_eos
&& gop.end_pts > dequeue_end_pts
&& end_pts > dequeue_end_pts
&& (chunk_end_pts.is_some() || !gops.is_empty())
{
gst::trace!(CAT, obj = stream.sinkpad, "Not including GOP yet",);
break;
}
gst::trace!(CAT, obj = stream.sinkpad, "Pushing complete GOP",);
gst::trace!(CAT, obj = stream.sinkpad, "Pushing complete GOP");
gops.push(stream.queued_gops.pop_back().unwrap());
}
}
@ -2522,71 +2516,62 @@ impl FMP4Mux {
Ok((interleaved_buffers, streams))
}
/// Request a force-keyunit event for the start of the next fragment.
///
/// This is called whenever the last chunk of a fragment is pushed out.
///
/// `chunk_end_pts` gives the time of the previously drained chunk, which
/// ideally should be lower than the next fragment starts PTS.
/// Request a force-keyunit event for the given PTS.
fn request_force_keyunit_event(
&self,
state: &State,
stream: &Stream,
pts: Option<gst::ClockTime>,
upstream_events: &mut Vec<(super::FMP4MuxPad, gst::Event)>,
) {
let fku_time = state.fragment_end_pts.unwrap();
let current_position = stream.current_position;
for stream in &state.streams {
let current_position = stream.current_position;
// In case of ONVIF this needs to be converted back from UTC time to
// the stream's running time
let (fku_time, current_position) =
if self.obj().class().as_ref().variant == super::Variant::ONVIF {
(
if let Some(fku_time) = utc_time_to_running_time(
fku_time,
stream.running_time_utc_time_mapping.unwrap(),
) {
fku_time
} else {
continue;
},
utc_time_to_running_time(
current_position,
stream.running_time_utc_time_mapping.unwrap(),
),
)
} else {
(fku_time, Some(current_position))
// In case of ONVIF this needs to be converted back from UTC time to
// the stream's running time
let (fku_time, current_position) =
if self.obj().class().as_ref().variant == super::Variant::ONVIF {
let Some(fku_time) =
utc_time_to_running_time(pts, stream.running_time_utc_time_mapping.unwrap())
else {
return;
};
(
Some(fku_time),
utc_time_to_running_time(
Some(current_position),
stream.running_time_utc_time_mapping.unwrap(),
),
)
} else {
(pts, Some(current_position))
};
let fku_time =
if current_position.is_some_and(|current_position| current_position > fku_time) {
gst::warning!(
CAT,
obj = stream.sinkpad,
"Sending force-keyunit event late for running time {} at {}",
fku_time,
current_position.display(),
);
None
} else {
gst::debug!(
CAT,
obj = stream.sinkpad,
"Sending force-keyunit event for running time {}",
fku_time,
);
Some(fku_time)
};
let fku_time = if current_position
.is_some_and(|current_position| fku_time.is_some_and(|t| current_position > t))
{
gst::warning!(
CAT,
obj = stream.sinkpad,
"Sending force-keyunit event late for running time {:?} at {}",
fku_time,
current_position.display(),
);
None
} else {
gst::debug!(
CAT,
obj = stream.sinkpad,
"Sending force-keyunit event for running time {:?}",
fku_time,
);
fku_time
};
let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
.running_time(fku_time)
.all_headers(true)
.build();
let fku = gst_video::UpstreamForceKeyUnitEvent::builder()
.running_time(fku_time)
.all_headers(true)
.build();
upstream_events.push((stream.sinkpad.clone(), fku));
}
upstream_events.push((stream.sinkpad.clone(), fku));
}
/// Fills upstream events as needed and returns the caps the first time draining can happen.
@ -2805,7 +2790,6 @@ impl FMP4Mux {
state.end_pts = Some(chunk_end_pts);
// Update for the start PTS of the next fragment / chunk
if fragment_filled || state.need_new_header {
state.fragment_start_pts = Some(chunk_end_pts);
state.fragment_end_pts = Some(self.get_fragment_end_pts(
@ -2827,7 +2811,9 @@ impl FMP4Mux {
// If the current fragment is filled we already have the next fragment's start
// keyframe and can request the following one.
if fragment_filled {
self.request_force_keyunit_event(state, upstream_events);
for stream in &state.streams {
self.request_force_keyunit_event(stream, state.fragment_end_pts, upstream_events);
}
}
// Reset timeout delay now that we've output an actual fragment or chunk
@ -3017,6 +3003,8 @@ impl FMP4Mux {
sinkpad: pad,
caps,
next_caps: None,
tag_changed: false,
pushed_incomplete_gop: false,
delta_frames,
discard_header_buffers,
pre_queue: VecDeque::new(),
@ -3685,6 +3673,14 @@ impl AggregatorImpl for FMP4Mux {
{
state.language_code = language_code;
state.need_new_header = true;
if let Some(stream) = state
.streams
.iter_mut()
.find(|s| *aggregator_pad == s.sinkpad)
{
stream.tag_changed = true;
}
}
}
} else if let Some(tag_value) = ev.tag().get::<gst::tags::ImageOrientation>() {
@ -3882,6 +3878,25 @@ impl AggregatorImpl for FMP4Mux {
&mut upstream_events,
);
if state.need_new_header {
for stream in state
.streams
.iter()
.filter(|s| s.next_caps.is_some() || s.pushed_incomplete_gop)
{
gst::info!(
CAT,
imp = self,
"Incomplete GOP pushed or caps change - send force-key-unit event"
);
self.request_force_keyunit_event(
stream,
state.fragment_start_pts,
&mut upstream_events,
);
}
}
(state.need_new_header, res)
};
@ -3910,8 +3925,15 @@ impl AggregatorImpl for FMP4Mux {
state.need_new_header = false;
state.stream_header = None;
state.sent_headers = false;
for stream in state.streams.iter_mut().filter(|s| s.next_caps.is_some()) {
stream.caps = stream.next_caps.take().unwrap();
for stream in state
.streams
.iter_mut()
.filter(|s| s.tag_changed || s.next_caps.is_some())
{
stream.tag_changed = false;
if let Some(caps) = stream.next_caps.take() {
stream.caps = caps;
}
}
}

View file

@ -2535,6 +2535,380 @@ fn test_caps_change_at_gop_boundary() {
assert_eq!(h.buffers_in_queue(), 0);
}
#[test]
fn test_language_change_at_gop_boundary() {
init();
let mut h = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
let caps = gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
.field("height", 1080i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4]))
.build();
h.element()
.unwrap()
.set_property("fragment-duration", 1.seconds());
h.set_src_caps(caps);
h.play();
for i in 0..30 {
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
if i == 10 {
let mut tl = gst::TagList::new();
tl.make_mut()
.add::<gst::tags::LanguageCode>(&"abc", gst::TagMergeMode::Append);
let ev = gst::event::Tag::builder(tl).build();
h.push_event(ev);
}
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i * 100.mseconds());
buffer.set_dts(i * 100.mseconds());
buffer.set_duration(100.mseconds());
if i % 10 != 0 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h.push(buffer), Ok(gst::FlowSuccess::Ok));
}
//test_caps_changed_buffers(&mut h, 30, 10, 10, 100, true, false);
h.crank_single_clock_wait().unwrap();
// Initial fragment with HEADER and DISCONT
test_caps_changed_verify(&mut h, 1 + 1 + 10, true, false);
h.crank_single_clock_wait().unwrap();
// Full GOP with HEADER due to language change
test_caps_changed_verify(&mut h, 1 + 1 + 10, true, false);
h.crank_single_clock_wait().unwrap();
h.push_event(gst::event::Eos::new());
// Full GOP with HEADER but no DISCONT because no caps change
test_caps_changed_verify(&mut h, 1 + 10, false, false);
assert_eq!(h.buffers_in_queue(), 0);
}
#[test]
fn test_caps_change_at_gop_boundary_multi_stream() {
init();
let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
let caps1 = gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
.field("height", 1080i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4]))
.build();
let caps2 = gst::Caps::builder("video/x-h264")
.field("width", 640)
.field("height", 480)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::from_slice([4, 3, 2, 1]))
.build();
h1.element()
.unwrap()
.set_property("fragment-duration", 330.mseconds());
h1.set_src_caps(caps1);
h1.play();
h2.set_src_caps(caps2);
h2.play();
for i in 0..21 {
// caps change on 5th and 20th buffer
if let Some(caps) = match i {
5 => Some(
gst::Caps::builder("video/x-h264")
.field("width", 1280i32)
.field("height", 720i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4]))
.build(),
),
20 => Some(
gst::Caps::builder("video/x-h264")
.field("width", 1024)
.field("height", 800)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4]))
.build(),
),
_ => None,
} {
h1.push_event(gst::event::Caps::new(&caps));
}
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i * 33.mseconds());
buffer.set_dts(i * 33.mseconds());
buffer.set_duration(33.mseconds());
// GOP size of 5
if i % 5 != 0 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
let mut buffer = gst::Buffer::with_size(2).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i * 33.mseconds());
buffer.set_dts(i * 33.mseconds());
buffer.set_duration(33.mseconds());
// GOP size of 7
if i % 7 != 0 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
if i != 5 {
continue;
}
// reconfigure event
h1.try_pull_upstream_event().unwrap();
h2.try_pull_upstream_event().unwrap();
let ev1 = h1.pull_upstream_event().unwrap();
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(),
gst_video::UpstreamForceKeyUnitEvent {
running_time: Some(330.mseconds()),
all_headers: true,
count: 0
}
);
let ev2 = h2.pull_upstream_event().unwrap();
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(),
gst_video::UpstreamForceKeyUnitEvent::parse(&ev2).unwrap()
);
let ev1 = h1.pull_upstream_event().unwrap();
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(),
gst_video::UpstreamForceKeyUnitEvent {
running_time: Some(495.mseconds()),
all_headers: true,
count: 0
}
);
let ev2 = h2.pull_upstream_event().unwrap();
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(),
gst_video::UpstreamForceKeyUnitEvent::parse(&ev2).unwrap()
);
let ev1 = h1.pull_upstream_event().unwrap();
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(),
gst_video::UpstreamForceKeyUnitEvent {
running_time: Some(165.mseconds()),
all_headers: true,
count: 0
}
);
let ev2 = h2.pull_upstream_event().unwrap();
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(),
gst_video::UpstreamForceKeyUnitEvent::parse(&ev2).unwrap()
);
}
h1.crank_single_clock_wait().unwrap();
// Caps change on 6th buffer, 1st stream has complete GOP but 2nd
// stream GOP is incomplete. Still 2nd stream buffers are pushed
// because the stream would be missing in the fragment otherwise.
test_caps_changed_verify(&mut h1, 1 + 1 + 5 + 4, true, false);
h1.crank_single_clock_wait().unwrap();
// Caps change on 20th buffer, both streams have complete GOPs, 3
// from stream #1 and the rest of the old GOP and one complete GOP
// from stream #2.
test_caps_changed_verify(&mut h1, 1 + 1 + 5 + 5 + 5 + 3 + 7, true, false);
// On stream #1 a FKU event for partial GOP and next GOP and on
// stream #2 just the next GOP FKU because no partial GOP has been
// pushed.
assert_eq!(h1.upstream_events_in_queue(), 2);
assert_eq!(h2.upstream_events_in_queue(), 1);
h1.push_event(gst::event::Eos::new());
h2.push_event(gst::event::Eos::new());
assert_eq!(h1.buffers_in_queue(), 0);
}
#[test]
fn test_caps_change_at_gop_boundary_chunked_multi_stream() {
init();
let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
let caps1 = gst::Caps::builder("video/x-h264")
.field("width", 1920i32)
.field("height", 1080i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4]))
.build();
let caps2 = gst::Caps::builder("video/x-h264")
.field("width", 640)
.field("height", 480)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::from_slice([4, 3, 2, 1]))
.build();
h1.element()
.unwrap()
.set_property("fragment-duration", 1.seconds());
h1.element()
.unwrap()
.set_property("chunk-duration", 250.mseconds());
h1.set_src_caps(caps1);
h1.play();
h2.set_src_caps(caps2);
h2.play();
for i in 0..21 {
// caps change on 10th and 20th buffer
if let Some(caps) = match i {
10 => Some(
gst::Caps::builder("video/x-h264")
.field("width", 1280i32)
.field("height", 720i32)
.field("framerate", gst::Fraction::new(30, 1))
.field("stream-format", "avc")
.field("alignment", "au")
.field("codec_data", gst::Buffer::from_slice([1, 2, 3, 4]))
.build(),
),
_ => None,
} {
h1.push_event(gst::event::Caps::new(&caps));
}
let mut buffer = gst::Buffer::with_size(1).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i * 33.mseconds());
buffer.set_dts(i * 33.mseconds());
buffer.set_duration(33.mseconds());
// GOP size of 5
if i % 5 != 0 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h1.push(buffer), Ok(gst::FlowSuccess::Ok));
let mut buffer = gst::Buffer::with_size(2).unwrap();
{
let buffer = buffer.get_mut().unwrap();
buffer.set_pts(i * 33.mseconds());
buffer.set_dts(i * 33.mseconds());
buffer.set_duration(33.mseconds());
// GOP size of 7
if i % 7 != 0 {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
assert_eq!(h2.push(buffer), Ok(gst::FlowSuccess::Ok));
if i != 5 {
continue;
}
// reconfigure event
h1.try_pull_upstream_event().unwrap();
h2.try_pull_upstream_event().unwrap();
let ev1 = h1.pull_upstream_event().unwrap();
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(),
gst_video::UpstreamForceKeyUnitEvent {
running_time: Some(1.seconds()),
all_headers: true,
count: 0
}
);
let ev2 = h2.pull_upstream_event().unwrap();
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(),
gst_video::UpstreamForceKeyUnitEvent::parse(&ev2).unwrap()
);
}
h1.crank_single_clock_wait().unwrap();
// Fragment start chunk
test_caps_changed_verify(&mut h1, 1 + 1 + 8 + 8, true, false);
h1.crank_single_clock_wait().unwrap();
// Early end of chunk due to caps change
test_caps_changed_verify(&mut h1, 1 + 2 + 1, false, true);
// Signalling new keyunit for next fragment
let ev1 = h1.pull_upstream_event().unwrap();
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(),
gst_video::UpstreamForceKeyUnitEvent {
running_time: Some(1330.mseconds()),
all_headers: true,
count: 0
}
);
// Signalling new keyunit on stream with caps change
let ev1 = h1.pull_upstream_event().unwrap();
assert_eq!(
gst_video::UpstreamForceKeyUnitEvent::parse(&ev1).unwrap(),
gst_video::UpstreamForceKeyUnitEvent {
running_time: Some(330.mseconds()),
all_headers: true,
count: 0
}
);
h1.crank_single_clock_wait().unwrap();
// The first chunk of the new fragment
test_caps_changed_verify(&mut h1, 1 + 1 + 8 + 9, true, false);
h1.push_event(gst::event::Eos::new());
h2.push_event(gst::event::Eos::new());
assert_eq!(h1.buffers_in_queue(), 0);
}
#[test]
fn test_caps_change_at_gop_boundary_compatible() {
init();
@ -2777,7 +3151,8 @@ fn test_caps_change_within_gop_no_key() {
h.crank_single_clock_wait().unwrap();
// Reduced GOP with HEADER and DISCONT due to caps change
test_caps_changed_verify(&mut h, 1 + 1 + 10, true, false);
test_caps_changed_verify(&mut h, 1 + 1 + 5, true, false);
test_caps_changed_verify(&mut h, 1 + 10, false, false);
h.crank_single_clock_wait().unwrap();
h.push_event(gst::event::Eos::new());