From 6a2df92453c6e582e66c44fc6706fe2d581cd0db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Wed, 20 Jul 2022 16:32:42 +0200 Subject: [PATCH] Revert "fmp4mux: Dequeue as many buffers as are available in each aggregate call" This reverts commit 402500f79cdaf3cc267a25285428f76f3d7c1773. This commit introduces race conditions. It was intended as solving an issue with some pipelines which had their queues filling up, causing the streams to stall. It is reverted as this solution is considered a workaround for another issue. See discussion in: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/803 --- generic/fmp4/src/fmp4mux/imp.rs | 106 ++++++++++++++++---------------- generic/fmp4/tests/tests.rs | 2 + 2 files changed, 54 insertions(+), 54 deletions(-) diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs index 99e68871..6237176e 100644 --- a/generic/fmp4/src/fmp4mux/imp.rs +++ b/generic/fmp4/src/fmp4mux/imp.rs @@ -1545,67 +1545,65 @@ impl AggregatorImpl for FMP4Mux { // Queue buffers from all streams that are not filled for the current fragment yet let fragment_start_pts = state.fragment_start_pts; - 'next_stream: for (idx, stream) in state.streams.iter_mut().enumerate() { - loop { - if stream.fragment_filled { - let buffer = stream.sinkpad.peek_buffer(); - all_eos &= buffer.is_none() && stream.sinkpad.is_eos(); - - continue 'next_stream; - } - - let buffer = stream.sinkpad.pop_buffer(); + for (idx, stream) in state.streams.iter_mut().enumerate() { + if stream.fragment_filled { + let buffer = stream.sinkpad.peek_buffer(); all_eos &= buffer.is_none() && stream.sinkpad.is_eos(); - let buffer = match buffer { - None => continue 'next_stream, - Some(buffer) => buffer, - }; + continue; + } - let segment = match stream - .sinkpad - .segment() - .clone() - .downcast::() - .ok() - { - Some(segment) => segment, - None => { - gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment"); - return Err(gst::FlowError::Error); - } - }; + let buffer = stream.sinkpad.pop_buffer(); + all_eos &= buffer.is_none() && stream.sinkpad.is_eos(); - let pts = buffer.pts(); + let buffer = match buffer { + None => continue, + Some(buffer) => buffer, + }; - // Queue up the buffer and update GOP tracking state - self.queue_gops(aggregator, idx, stream, &segment, buffer)?; - - // If we have a PTS with this buffer, check if a new force-keyunit event for the next - // fragment start has to be created - if let Some(pts) = pts { - if let Some(event) = self.create_force_keyunit_event( - aggregator, stream, &settings, &segment, pts, - )? { - upstream_events.push((stream.sinkpad.clone(), event)); - } + let segment = match stream + .sinkpad + .segment() + .clone() + .downcast::() + .ok() + { + Some(segment) => segment, + None => { + gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment"); + return Err(gst::FlowError::Error); } + }; - // Check if this stream is filled enough now. - if let Some((queued_end_pts, fragment_start_pts)) = Option::zip( - stream - .queued_gops - .iter() - .find(|gop| gop.final_end_pts) - .map(|gop| gop.end_pts), - fragment_start_pts, - ) { - if queued_end_pts.saturating_sub(fragment_start_pts) - >= settings.fragment_duration - { - gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment"); - stream.fragment_filled = true; - } + let pts = buffer.pts(); + + // Queue up the buffer and update GOP tracking state + self.queue_gops(aggregator, idx, stream, &segment, buffer)?; + + // If we have a PTS with this buffer, check if a new force-keyunit event for the next + // fragment start has to be created + if let Some(pts) = pts { + if let Some(event) = self + .create_force_keyunit_event(aggregator, stream, &settings, &segment, pts)? + { + upstream_events.push((stream.sinkpad.clone(), event)); + } + } + + // Check if this stream is filled enough now. + if let Some((queued_end_pts, fragment_start_pts)) = Option::zip( + stream + .queued_gops + .iter() + .find(|gop| gop.final_end_pts) + .map(|gop| gop.end_pts), + fragment_start_pts, + ) { + if queued_end_pts.saturating_sub(fragment_start_pts) + >= settings.fragment_duration + { + gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment"); + stream.fragment_filled = true; } } } diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs index 42c0d2cf..266e2462 100644 --- a/generic/fmp4/tests/tests.rs +++ b/generic/fmp4/tests/tests.rs @@ -519,6 +519,7 @@ fn test_live_timeout() { } // Advance time and crank the clock: this should bring us to the end of the first fragment + h1.set_time(gst::ClockTime::from_seconds(5)).unwrap(); h1.crank_single_clock_wait().unwrap(); let header = h1.pull().unwrap(); @@ -550,6 +551,7 @@ fn test_live_timeout() { if j == 1 && i == 4 { // Advance time and crank the clock another time. This brings us at the end of the // EOS. + h1.set_time(gst::ClockTime::from_seconds(7)).unwrap(); h1.crank_single_clock_wait().unwrap(); continue; }