Revert "fmp4mux: Dequeue as many buffers as are available in each aggregate call"

This reverts commit 402500f79c.

This commit introduces race conditions. It was intended as solving
an issue with some pipelines which had their queues filling up,
causing the streams to stall. It is reverted as this solution is
considered a workaround for another issue.

See discussion in:

https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/803
This commit is contained in:
François Laignel 2022-07-20 16:32:42 +02:00 committed by Sebastian Dröge
parent fe210a5715
commit 6a2df92453
2 changed files with 54 additions and 54 deletions

View file

@ -1545,67 +1545,65 @@ impl AggregatorImpl for FMP4Mux {
// Queue buffers from all streams that are not filled for the current fragment yet
let fragment_start_pts = state.fragment_start_pts;
'next_stream: for (idx, stream) in state.streams.iter_mut().enumerate() {
loop {
if stream.fragment_filled {
let buffer = stream.sinkpad.peek_buffer();
all_eos &= buffer.is_none() && stream.sinkpad.is_eos();
continue 'next_stream;
}
let buffer = stream.sinkpad.pop_buffer();
for (idx, stream) in state.streams.iter_mut().enumerate() {
if stream.fragment_filled {
let buffer = stream.sinkpad.peek_buffer();
all_eos &= buffer.is_none() && stream.sinkpad.is_eos();
let buffer = match buffer {
None => continue 'next_stream,
Some(buffer) => buffer,
};
continue;
}
let segment = match stream
.sinkpad
.segment()
.clone()
.downcast::<gst::ClockTime>()
.ok()
{
Some(segment) => segment,
None => {
gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
return Err(gst::FlowError::Error);
}
};
let buffer = stream.sinkpad.pop_buffer();
all_eos &= buffer.is_none() && stream.sinkpad.is_eos();
let pts = buffer.pts();
let buffer = match buffer {
None => continue,
Some(buffer) => buffer,
};
// Queue up the buffer and update GOP tracking state
self.queue_gops(aggregator, idx, stream, &segment, buffer)?;
// If we have a PTS with this buffer, check if a new force-keyunit event for the next
// fragment start has to be created
if let Some(pts) = pts {
if let Some(event) = self.create_force_keyunit_event(
aggregator, stream, &settings, &segment, pts,
)? {
upstream_events.push((stream.sinkpad.clone(), event));
}
let segment = match stream
.sinkpad
.segment()
.clone()
.downcast::<gst::ClockTime>()
.ok()
{
Some(segment) => segment,
None => {
gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
return Err(gst::FlowError::Error);
}
};
// Check if this stream is filled enough now.
if let Some((queued_end_pts, fragment_start_pts)) = Option::zip(
stream
.queued_gops
.iter()
.find(|gop| gop.final_end_pts)
.map(|gop| gop.end_pts),
fragment_start_pts,
) {
if queued_end_pts.saturating_sub(fragment_start_pts)
>= settings.fragment_duration
{
gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
stream.fragment_filled = true;
}
let pts = buffer.pts();
// Queue up the buffer and update GOP tracking state
self.queue_gops(aggregator, idx, stream, &segment, buffer)?;
// If we have a PTS with this buffer, check if a new force-keyunit event for the next
// fragment start has to be created
if let Some(pts) = pts {
if let Some(event) = self
.create_force_keyunit_event(aggregator, stream, &settings, &segment, pts)?
{
upstream_events.push((stream.sinkpad.clone(), event));
}
}
// Check if this stream is filled enough now.
if let Some((queued_end_pts, fragment_start_pts)) = Option::zip(
stream
.queued_gops
.iter()
.find(|gop| gop.final_end_pts)
.map(|gop| gop.end_pts),
fragment_start_pts,
) {
if queued_end_pts.saturating_sub(fragment_start_pts)
>= settings.fragment_duration
{
gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
stream.fragment_filled = true;
}
}
}

View file

@ -519,6 +519,7 @@ fn test_live_timeout() {
}
// Advance time and crank the clock: this should bring us to the end of the first fragment
h1.set_time(gst::ClockTime::from_seconds(5)).unwrap();
h1.crank_single_clock_wait().unwrap();
let header = h1.pull().unwrap();
@ -550,6 +551,7 @@ fn test_live_timeout() {
if j == 1 && i == 4 {
// Advance time and crank the clock another time. This brings us at the end of the
// EOS.
h1.set_time(gst::ClockTime::from_seconds(7)).unwrap();
h1.crank_single_clock_wait().unwrap();
continue;
}