diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs index e338a057..aa74ab03 100644 --- a/generic/fmp4/src/fmp4mux/imp.rs +++ b/generic/fmp4/src/fmp4mux/imp.rs @@ -179,6 +179,94 @@ pub(crate) struct FMP4Mux { } impl FMP4Mux { + fn find_earliest_stream<'a>( + &self, + element: &super::FMP4Mux, + state: &'a mut State, + timeout: bool, + ) -> Result, gst::FlowError> { + let mut earliest_stream = None; + let mut all_have_data_or_eos = true; + + for (idx, stream) in state.streams.iter_mut().enumerate() { + let buffer = match stream.sinkpad.peek_buffer() { + Some(buffer) => buffer, + None => { + if stream.sinkpad.is_eos() { + gst::trace!(CAT, obj: &stream.sinkpad, "Stream is EOS"); + } else { + all_have_data_or_eos = false; + gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no buffer"); + } + continue; + } + }; + + if stream.fragment_filled { + gst::trace!(CAT, obj: &stream.sinkpad, "Stream has current fragment filled"); + continue; + } + + let segment = match stream + .sinkpad + .segment() + .clone() + .downcast::() + .ok() + { + Some(segment) => segment, + None => { + gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment"); + return Err(gst::FlowError::Error); + } + }; + + // If the stream has no valid running time, assume it's before everything else. + let running_time = match segment.to_running_time(buffer.dts_or_pts()) { + None => { + gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no valid running time"); + if earliest_stream.is_none() { + earliest_stream = Some((idx, stream, gst::ClockTime::ZERO)); + } + continue; + } + Some(running_time) => running_time, + }; + + gst::trace!(CAT, obj: &stream.sinkpad, "Stream has running time {} queued", running_time); + + if earliest_stream + .as_ref() + .map_or(true, |(_idx, _stream, earliest_running_time)| { + *earliest_running_time > running_time + }) + { + earliest_stream = Some((idx, stream, running_time)); + } + } + + if !timeout && !all_have_data_or_eos { + gst::trace!( + CAT, + obj: element, + "No timeout and not all streams have a buffer or are EOS" + ); + Ok(None) + } else if let Some((idx, stream, earliest_running_time)) = earliest_stream { + gst::trace!( + CAT, + obj: element, + "Stream {} is earliest stream with running time {}", + stream.sinkpad.name(), + earliest_running_time + ); + Ok(Some((idx, stream))) + } else { + gst::trace!(CAT, obj: element, "No streams have data queued currently"); + Ok(None) + } + } + // Queue incoming buffers as individual GOPs. fn queue_gops( &self, @@ -1831,9 +1919,9 @@ impl AggregatorImpl for FMP4Mux { ) -> Result { let settings = self.settings.lock().unwrap().clone(); - let mut all_eos = true; let mut upstream_events = vec![]; + let all_eos; let (caps, buffers) = { let mut state = self.state.lock().unwrap(); @@ -1843,23 +1931,21 @@ impl AggregatorImpl for FMP4Mux { } // Queue buffers from all streams that are not filled for the current fragment yet + // + // Always take a buffer from the stream with the earliest queued buffer to keep the + // fill-level at all sinkpads in sync. let fragment_start_pts = state.fragment_start_pts; - for (idx, stream) in state.streams.iter_mut().enumerate() { - if stream.fragment_filled { - let buffer = stream.sinkpad.peek_buffer(); - all_eos &= buffer.is_none() && stream.sinkpad.is_eos(); - continue; - } - - let buffer = stream.sinkpad.pop_buffer(); - all_eos &= buffer.is_none() && stream.sinkpad.is_eos(); - - let buffer = match buffer { - None => continue, + while let Some((idx, stream)) = + self.find_earliest_stream(aggregator, &mut state, timeout)? + { + // Can only happen if the stream was flushed in the meantime + let buffer = match stream.sinkpad.pop_buffer() { Some(buffer) => buffer, + None => continue, }; + // Can only happen if the stream was flushed in the meantime let segment = match stream .sinkpad .segment() @@ -1907,10 +1993,6 @@ impl AggregatorImpl for FMP4Mux { } } - if all_eos { - gst::debug!(CAT, obj: aggregator, "All streams are EOS now"); - } - // Calculate the earliest PTS after queueing input if we can now. if state.earliest_pts.is_none() { let mut earliest_pts = None; @@ -1969,6 +2051,11 @@ impl AggregatorImpl for FMP4Mux { } } + all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos()); + if all_eos { + gst::debug!(CAT, obj: aggregator, "All streams are EOS now"); + } + // If enough GOPs were queued, drain and create the output fragment self.drain(aggregator, &mut state, &settings, timeout, all_eos)? }; diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs index 266e2462..556b29c6 100644 --- a/generic/fmp4/tests/tests.rs +++ b/generic/fmp4/tests/tests.rs @@ -551,7 +551,6 @@ fn test_live_timeout() { if j == 1 && i == 4 { // Advance time and crank the clock another time. This brings us at the end of the // EOS. - h1.set_time(gst::ClockTime::from_seconds(7)).unwrap(); h1.crank_single_clock_wait().unwrap(); continue; } @@ -652,6 +651,8 @@ fn test_gap_events() { let mut h1 = gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")); let mut h2 = gst_check::Harness::with_element(&h1.element().unwrap(), Some("sink_1"), None); + h1.use_testclock(); + // 5s fragment duration h1.element() .unwrap() @@ -760,6 +761,10 @@ fn test_gap_events() { } } + // Advance time and crank the clock: this should bring us to the end of the first fragment + h1.set_time(gst::ClockTime::from_seconds(5)).unwrap(); + h1.crank_single_clock_wait().unwrap(); + let header = h1.pull().unwrap(); assert_eq!( header.flags(),