diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs index 3f4f71cc..96051327 100644 --- a/generic/fmp4/src/fmp4mux/imp.rs +++ b/generic/fmp4/src/fmp4mux/imp.rs @@ -1496,65 +1496,67 @@ impl AggregatorImpl for FMP4Mux { // Queue buffers from all streams that are not filled for the current fragment yet let fragment_start_pts = state.fragment_start_pts; - for (idx, stream) in state.streams.iter_mut().enumerate() { - if stream.fragment_filled { - let buffer = stream.sinkpad.peek_buffer(); + 'next_stream: for (idx, stream) in state.streams.iter_mut().enumerate() { + loop { + if stream.fragment_filled { + let buffer = stream.sinkpad.peek_buffer(); + all_eos &= buffer.is_none() && stream.sinkpad.is_eos(); + + continue 'next_stream; + } + + let buffer = stream.sinkpad.pop_buffer(); all_eos &= buffer.is_none() && stream.sinkpad.is_eos(); - continue; - } + let buffer = match buffer { + None => continue 'next_stream, + Some(buffer) => buffer, + }; - let buffer = stream.sinkpad.pop_buffer(); - all_eos &= buffer.is_none() && stream.sinkpad.is_eos(); - - let buffer = match buffer { - None => continue, - Some(buffer) => buffer, - }; - - let segment = match stream - .sinkpad - .segment() - .clone() - .downcast::() - .ok() - { - Some(segment) => segment, - None => { - gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment"); - return Err(gst::FlowError::Error); - } - }; - - let pts = buffer.pts(); - - // Queue up the buffer and update GOP tracking state - self.queue_gops(aggregator, idx, stream, &segment, buffer)?; - - // If we have a PTS with this buffer, check if a new force-keyunit event for the next - // fragment start has to be created - if let Some(pts) = pts { - if let Some(event) = self - .create_force_keyunit_event(aggregator, stream, &settings, &segment, pts)? + let segment = match stream + .sinkpad + .segment() + .clone() + .downcast::() + .ok() { - upstream_events.push((stream.sinkpad.clone(), event)); - } - } + Some(segment) => segment, + None => { + gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment"); + return Err(gst::FlowError::Error); + } + }; - // Check if this stream is filled enough now. - if let Some((queued_end_pts, fragment_start_pts)) = Option::zip( - stream - .queued_gops - .iter() - .find(|gop| gop.final_end_pts) - .map(|gop| gop.end_pts), - fragment_start_pts, - ) { - if queued_end_pts.saturating_sub(fragment_start_pts) - >= settings.fragment_duration - { - gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment"); - stream.fragment_filled = true; + let pts = buffer.pts(); + + // Queue up the buffer and update GOP tracking state + self.queue_gops(aggregator, idx, stream, &segment, buffer)?; + + // If we have a PTS with this buffer, check if a new force-keyunit event for the next + // fragment start has to be created + if let Some(pts) = pts { + if let Some(event) = self.create_force_keyunit_event( + aggregator, stream, &settings, &segment, pts, + )? { + upstream_events.push((stream.sinkpad.clone(), event)); + } + } + + // Check if this stream is filled enough now. + if let Some((queued_end_pts, fragment_start_pts)) = Option::zip( + stream + .queued_gops + .iter() + .find(|gop| gop.final_end_pts) + .map(|gop| gop.end_pts), + fragment_start_pts, + ) { + if queued_end_pts.saturating_sub(fragment_start_pts) + >= settings.fragment_duration + { + gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment"); + stream.fragment_filled = true; + } } } } diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs index 266e2462..42c0d2cf 100644 --- a/generic/fmp4/tests/tests.rs +++ b/generic/fmp4/tests/tests.rs @@ -519,7 +519,6 @@ fn test_live_timeout() { } // Advance time and crank the clock: this should bring us to the end of the first fragment - h1.set_time(gst::ClockTime::from_seconds(5)).unwrap(); h1.crank_single_clock_wait().unwrap(); let header = h1.pull().unwrap(); @@ -551,7 +550,6 @@ fn test_live_timeout() { if j == 1 && i == 4 { // Advance time and crank the clock another time. This brings us at the end of the // EOS. - h1.set_time(gst::ClockTime::from_seconds(7)).unwrap(); h1.crank_single_clock_wait().unwrap(); continue; }