fmp4mux: Dequeue as many buffers as are available in each aggregate call

This commit is contained in:
Sebastian Dröge 2022-05-31 17:34:30 +03:00
parent 7e2cf613b4
commit 402500f79c
2 changed files with 56 additions and 56 deletions

View file

@ -1496,65 +1496,67 @@ impl AggregatorImpl for FMP4Mux {
// Queue buffers from all streams that are not filled for the current fragment yet
let fragment_start_pts = state.fragment_start_pts;
for (idx, stream) in state.streams.iter_mut().enumerate() {
if stream.fragment_filled {
let buffer = stream.sinkpad.peek_buffer();
'next_stream: for (idx, stream) in state.streams.iter_mut().enumerate() {
loop {
if stream.fragment_filled {
let buffer = stream.sinkpad.peek_buffer();
all_eos &= buffer.is_none() && stream.sinkpad.is_eos();
continue 'next_stream;
}
let buffer = stream.sinkpad.pop_buffer();
all_eos &= buffer.is_none() && stream.sinkpad.is_eos();
continue;
}
let buffer = match buffer {
None => continue 'next_stream,
Some(buffer) => buffer,
};
let buffer = stream.sinkpad.pop_buffer();
all_eos &= buffer.is_none() && stream.sinkpad.is_eos();
let buffer = match buffer {
None => continue,
Some(buffer) => buffer,
};
let segment = match stream
.sinkpad
.segment()
.clone()
.downcast::<gst::ClockTime>()
.ok()
{
Some(segment) => segment,
None => {
gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
return Err(gst::FlowError::Error);
}
};
let pts = buffer.pts();
// Queue up the buffer and update GOP tracking state
self.queue_gops(aggregator, idx, stream, &segment, buffer)?;
// If we have a PTS with this buffer, check if a new force-keyunit event for the next
// fragment start has to be created
if let Some(pts) = pts {
if let Some(event) = self
.create_force_keyunit_event(aggregator, stream, &settings, &segment, pts)?
let segment = match stream
.sinkpad
.segment()
.clone()
.downcast::<gst::ClockTime>()
.ok()
{
upstream_events.push((stream.sinkpad.clone(), event));
}
}
Some(segment) => segment,
None => {
gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
return Err(gst::FlowError::Error);
}
};
// Check if this stream is filled enough now.
if let Some((queued_end_pts, fragment_start_pts)) = Option::zip(
stream
.queued_gops
.iter()
.find(|gop| gop.final_end_pts)
.map(|gop| gop.end_pts),
fragment_start_pts,
) {
if queued_end_pts.saturating_sub(fragment_start_pts)
>= settings.fragment_duration
{
gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
stream.fragment_filled = true;
let pts = buffer.pts();
// Queue up the buffer and update GOP tracking state
self.queue_gops(aggregator, idx, stream, &segment, buffer)?;
// If we have a PTS with this buffer, check if a new force-keyunit event for the next
// fragment start has to be created
if let Some(pts) = pts {
if let Some(event) = self.create_force_keyunit_event(
aggregator, stream, &settings, &segment, pts,
)? {
upstream_events.push((stream.sinkpad.clone(), event));
}
}
// Check if this stream is filled enough now.
if let Some((queued_end_pts, fragment_start_pts)) = Option::zip(
stream
.queued_gops
.iter()
.find(|gop| gop.final_end_pts)
.map(|gop| gop.end_pts),
fragment_start_pts,
) {
if queued_end_pts.saturating_sub(fragment_start_pts)
>= settings.fragment_duration
{
gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
stream.fragment_filled = true;
}
}
}
}

View file

@ -519,7 +519,6 @@ fn test_live_timeout() {
}
// Advance time and crank the clock: this should bring us to the end of the first fragment
h1.set_time(gst::ClockTime::from_seconds(5)).unwrap();
h1.crank_single_clock_wait().unwrap();
let header = h1.pull().unwrap();
@ -551,7 +550,6 @@ fn test_live_timeout() {
if j == 1 && i == 4 {
// Advance time and crank the clock another time. This brings us at the end of the
// EOS.
h1.set_time(gst::ClockTime::from_seconds(7)).unwrap();
h1.crank_single_clock_wait().unwrap();
continue;
}