fmp4mux: Dequeue the earliest buffer from any pad first instead of dequeueing up to a whole fragment from the same pad

This keeps the fill levels of each sinkpad in sync.
This commit is contained in:
Sebastian Dröge 2022-08-17 13:19:08 +03:00
parent 9827406113
commit 2c99f66ea5
2 changed files with 110 additions and 18 deletions

View file

@ -179,6 +179,94 @@ pub(crate) struct FMP4Mux {
} }
impl FMP4Mux { impl FMP4Mux {
fn find_earliest_stream<'a>(
&self,
element: &super::FMP4Mux,
state: &'a mut State,
timeout: bool,
) -> Result<Option<(usize, &'a mut Stream)>, gst::FlowError> {
let mut earliest_stream = None;
let mut all_have_data_or_eos = true;
for (idx, stream) in state.streams.iter_mut().enumerate() {
let buffer = match stream.sinkpad.peek_buffer() {
Some(buffer) => buffer,
None => {
if stream.sinkpad.is_eos() {
gst::trace!(CAT, obj: &stream.sinkpad, "Stream is EOS");
} else {
all_have_data_or_eos = false;
gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no buffer");
}
continue;
}
};
if stream.fragment_filled {
gst::trace!(CAT, obj: &stream.sinkpad, "Stream has current fragment filled");
continue;
}
let segment = match stream
.sinkpad
.segment()
.clone()
.downcast::<gst::ClockTime>()
.ok()
{
Some(segment) => segment,
None => {
gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
return Err(gst::FlowError::Error);
}
};
// If the stream has no valid running time, assume it's before everything else.
let running_time = match segment.to_running_time(buffer.dts_or_pts()) {
None => {
gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no valid running time");
if earliest_stream.is_none() {
earliest_stream = Some((idx, stream, gst::ClockTime::ZERO));
}
continue;
}
Some(running_time) => running_time,
};
gst::trace!(CAT, obj: &stream.sinkpad, "Stream has running time {} queued", running_time);
if earliest_stream
.as_ref()
.map_or(true, |(_idx, _stream, earliest_running_time)| {
*earliest_running_time > running_time
})
{
earliest_stream = Some((idx, stream, running_time));
}
}
if !timeout && !all_have_data_or_eos {
gst::trace!(
CAT,
obj: element,
"No timeout and not all streams have a buffer or are EOS"
);
Ok(None)
} else if let Some((idx, stream, earliest_running_time)) = earliest_stream {
gst::trace!(
CAT,
obj: element,
"Stream {} is earliest stream with running time {}",
stream.sinkpad.name(),
earliest_running_time
);
Ok(Some((idx, stream)))
} else {
gst::trace!(CAT, obj: element, "No streams have data queued currently");
Ok(None)
}
}
// Queue incoming buffers as individual GOPs. // Queue incoming buffers as individual GOPs.
fn queue_gops( fn queue_gops(
&self, &self,
@ -1831,9 +1919,9 @@ impl AggregatorImpl for FMP4Mux {
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let settings = self.settings.lock().unwrap().clone(); let settings = self.settings.lock().unwrap().clone();
let mut all_eos = true;
let mut upstream_events = vec![]; let mut upstream_events = vec![];
let all_eos;
let (caps, buffers) = { let (caps, buffers) = {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
@ -1843,23 +1931,21 @@ impl AggregatorImpl for FMP4Mux {
} }
// Queue buffers from all streams that are not filled for the current fragment yet // Queue buffers from all streams that are not filled for the current fragment yet
//
// Always take a buffer from the stream with the earliest queued buffer to keep the
// fill-level at all sinkpads in sync.
let fragment_start_pts = state.fragment_start_pts; let fragment_start_pts = state.fragment_start_pts;
for (idx, stream) in state.streams.iter_mut().enumerate() {
if stream.fragment_filled {
let buffer = stream.sinkpad.peek_buffer();
all_eos &= buffer.is_none() && stream.sinkpad.is_eos();
continue; while let Some((idx, stream)) =
} self.find_earliest_stream(aggregator, &mut state, timeout)?
{
let buffer = stream.sinkpad.pop_buffer(); // Can only happen if the stream was flushed in the meantime
all_eos &= buffer.is_none() && stream.sinkpad.is_eos(); let buffer = match stream.sinkpad.pop_buffer() {
let buffer = match buffer {
None => continue,
Some(buffer) => buffer, Some(buffer) => buffer,
None => continue,
}; };
// Can only happen if the stream was flushed in the meantime
let segment = match stream let segment = match stream
.sinkpad .sinkpad
.segment() .segment()
@ -1907,10 +1993,6 @@ impl AggregatorImpl for FMP4Mux {
} }
} }
if all_eos {
gst::debug!(CAT, obj: aggregator, "All streams are EOS now");
}
// Calculate the earliest PTS after queueing input if we can now. // Calculate the earliest PTS after queueing input if we can now.
if state.earliest_pts.is_none() { if state.earliest_pts.is_none() {
let mut earliest_pts = None; let mut earliest_pts = None;
@ -1969,6 +2051,11 @@ impl AggregatorImpl for FMP4Mux {
} }
} }
all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos());
if all_eos {
gst::debug!(CAT, obj: aggregator, "All streams are EOS now");
}
// If enough GOPs were queued, drain and create the output fragment // If enough GOPs were queued, drain and create the output fragment
self.drain(aggregator, &mut state, &settings, timeout, all_eos)? self.drain(aggregator, &mut state, &settings, timeout, all_eos)?
}; };

View file

@ -551,7 +551,6 @@ fn test_live_timeout() {
if j == 1 && i == 4 { if j == 1 && i == 4 {
// Advance time and crank the clock another time. This brings us at the end of the // Advance time and crank the clock another time. This brings us at the end of the
// EOS. // EOS.
h1.set_time(gst::ClockTime::from_seconds(7)).unwrap();
h1.crank_single_clock_wait().unwrap(); h1.crank_single_clock_wait().unwrap();
continue; continue;
} }
@ -652,6 +651,8 @@ fn test_gap_events() {
let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src"));
let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None);
h1.use_testclock();
// 5s fragment duration // 5s fragment duration
h1.element() h1.element()
.unwrap() .unwrap()
@ -760,6 +761,10 @@ fn test_gap_events() {
} }
} }
// Advance time and crank the clock: this should bring us to the end of the first fragment
h1.set_time(gst::ClockTime::from_seconds(5)).unwrap();
h1.crank_single_clock_wait().unwrap();
let header = h1.pull().unwrap(); let header = h1.pull().unwrap();
assert_eq!( assert_eq!(
header.flags(), header.flags(),