From 13b6f8fad4b9412787d6b64fa9f761b7e87dfe35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 16 Dec 2022 16:54:38 +0200 Subject: [PATCH] fmp4mux: Skip gap buffers earlier to consider them for the sample durations and fragment start durations Otherwise dropping the gap buffers would offset the timestamps of following samples. Part-of: --- mux/fmp4/src/fmp4mux/imp.rs | 281 +++++++++++++++++++++--------------- 1 file changed, 164 insertions(+), 117 deletions(-) diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 3351a510..6f4fdf21 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -179,8 +179,6 @@ struct Gop { /// Earliest PTS buffer position earliest_pts_position: gst::ClockTime, - /// Start DTS buffer position - start_dts_position: Option, /// Buffer, PTS running time, DTS running time buffers: Vec, @@ -763,7 +761,6 @@ impl FMP4Mux { let end_dts = end_dts.map(|v| v.positive().unwrap()); let pts_position = buffer.pts().unwrap(); - let dts_position = buffer.dts(); if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { gst::debug!( @@ -778,11 +775,6 @@ impl FMP4Mux { let gop = Gop { start_pts: pts, start_dts: dts, - start_dts_position: if !delta_frames.requires_dts() { - None - } else { - dts_position - }, earliest_pts: pts, earliest_pts_position: pts_position, final_earliest_pts: !delta_frames.requires_dts(), @@ -1075,43 +1067,6 @@ impl FMP4Mux { assert!(fragment_end_pts.is_some()); - let first_gop = gops.first().unwrap(); - let last_gop = gops.last().unwrap(); - let earliest_pts = first_gop.earliest_pts; - let earliest_pts_position = first_gop.earliest_pts_position; - let start_dts = first_gop.start_dts; - let start_dts_position = first_gop.start_dts_position; - let end_pts = last_gop.end_pts; - let dts_offset = stream.dts_offset; - - if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) { - min_earliest_pts = Some(earliest_pts); - } - if min_earliest_pts_position - .opt_gt(earliest_pts_position) - .unwrap_or(true) - { - min_earliest_pts_position = Some(earliest_pts_position); - } - if let Some(start_dts_position) = start_dts_position { - if min_start_dts_position - .opt_gt(start_dts_position) - .unwrap_or(true) - { - min_start_dts_position = Some(start_dts_position); - } - } - - gst::info!( - CAT, - obj: stream.sinkpad, - "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}", - end_pts.saturating_sub(earliest_pts), - earliest_pts, - start_dts.display(), - dts_offset.display(), - ); - if let Some((prev_gop, first_gop)) = Option::zip( stream.queued_gops.iter().find(|gop| gop.final_end_pts), stream.queued_gops.back(), @@ -1133,70 +1088,175 @@ impl FMP4Mux { .unwrap_or(gst::ClockTime::ZERO) ); + let last_gop = gops.last().unwrap(); + let end_pts = last_gop.end_pts; + let end_dts = last_gop.end_dts; + + // First flatten all GOPs into a single `Vec` + let mut gop_buffers = Vec::with_capacity(gops.iter().map(|g| g.buffers.len()).sum()); + gop_buffers.extend(gops.into_iter().flat_map(|gop| gop.buffers.into_iter())); + + // Then calculate durations for all of the buffers and get rid of any GAP buffers in + // the process. + // Also calculate the earliest PTS / start DTS here, which needs to consider GAP + // buffers too. + let mut buffers = VecDeque::with_capacity(gop_buffers.len()); + let mut earliest_pts = None; + let mut earliest_pts_position = None; + let mut start_dts = None; + let mut start_dts_position = None; + + let mut gop_buffers = gop_buffers.into_iter(); + while let Some(buffer) = gop_buffers.next() { + // If this is a GAP buffer then skip it. Its duration was already considered + // below for the non-GAP buffer preceding it, and if there was none then the + // fragment start would be adjusted accordingly for this stream. + if buffer.buffer.flags().contains(gst::BufferFlags::GAP) + && buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE) + && buffer.buffer.size() == 0 + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Skipping gap buffer {buffer:?}", + ); + continue; + } + + if earliest_pts.map_or(true, |earliest_pts| buffer.pts < earliest_pts) { + earliest_pts = Some(buffer.pts); + } + if earliest_pts_position.map_or(true, |earliest_pts_position| { + buffer.buffer.pts().unwrap() < earliest_pts_position + }) { + earliest_pts_position = Some(buffer.buffer.pts().unwrap()); + } + if stream.delta_frames.requires_dts() && start_dts.is_none() { + start_dts = Some(buffer.dts.unwrap()); + } + if stream.delta_frames.requires_dts() && start_dts_position.is_none() { + start_dts_position = Some(buffer.buffer.dts().unwrap()); + } + + let timestamp = if !stream.delta_frames.requires_dts() { + buffer.pts + } else { + buffer.dts.unwrap() + }; + + // Take as end timestamp the timestamp of the next non-GAP buffer + let end_timestamp = match gop_buffers.as_slice().iter().find(|buf| { + !buf.buffer.flags().contains(gst::BufferFlags::GAP) + || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE) + || buf.buffer.size() != 0 + }) { + Some(buffer) => { + if !stream.delta_frames.requires_dts() { + buffer.pts + } else { + buffer.dts.unwrap() + } + } + None => { + if !stream.delta_frames.requires_dts() { + end_pts + } else { + end_dts.unwrap() + } + } + }; + + // Timestamps are enforced to monotonically increase when queueing buffers + let duration = end_timestamp + .checked_sub(timestamp) + .expect("Timestamps going backwards"); + + let composition_time_offset = if !stream.delta_frames.requires_dts() { + None + } else { + let pts = buffer.pts; + let dts = buffer.dts.unwrap(); + + Some( + i64::try_from( + (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(), + ) + .map_err(|_| { + gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference"); + gst::FlowError::Error + })?, + ) + }; + + buffers.push_back(Buffer { + idx, + buffer: buffer.buffer, + timestamp, + duration, + composition_time_offset, + }); + } + + if buffers.is_empty() { + gst::info!( + CAT, + obj: stream.sinkpad, + "Drained only gap buffers", + ); + + drained_streams.push(( + super::FragmentHeaderStream { + caps: stream.caps.clone(), + start_time: None, + delta_frames: stream.delta_frames, + trak_timescale: stream_settings.trak_timescale, + }, + VecDeque::new(), + )); + + continue; + } + + let earliest_pts = earliest_pts.unwrap(); + let earliest_pts_position = earliest_pts_position.unwrap(); + if stream.delta_frames.requires_dts() { + assert!(start_dts.is_some()); + assert!(start_dts_position.is_some()); + } + let start_dts = start_dts; + let start_dts_position = start_dts_position; + + gst::info!( + CAT, + obj: stream.sinkpad, + "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}", + end_pts.saturating_sub(earliest_pts), + earliest_pts, + start_dts.display(), + stream.dts_offset.display(), + ); + let start_time = if !stream.delta_frames.requires_dts() { earliest_pts } else { start_dts.unwrap() }; - let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum()); - - for gop in gops { - let mut gop_buffers = gop.buffers.into_iter().peekable(); - while let Some(buffer) = gop_buffers.next() { - let timestamp = if !stream.delta_frames.requires_dts() { - buffer.pts - } else { - buffer.dts.unwrap() - }; - - let end_timestamp = match gop_buffers.peek() { - Some(buffer) => { - if !stream.delta_frames.requires_dts() { - buffer.pts - } else { - buffer.dts.unwrap() - } - } - None => { - if !stream.delta_frames.requires_dts() { - gop.end_pts - } else { - gop.end_dts.unwrap() - } - } - }; - - // Timestamps are enforced to monotonically increase when queueing buffers - let duration = end_timestamp - .checked_sub(timestamp) - .expect("Timestamps going backwards"); - - let composition_time_offset = if !stream.delta_frames.requires_dts() { - None - } else { - let pts = buffer.pts; - let dts = buffer.dts.unwrap(); - - Some( - i64::try_from( - (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)) - .nseconds(), - ) - .map_err(|_| { - gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference"); - gst::FlowError::Error - })?, - ) - }; - - buffers.push_back(Buffer { - idx, - buffer: buffer.buffer, - timestamp, - duration, - composition_time_offset, - }); + if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) { + min_earliest_pts = Some(earliest_pts); + } + if min_earliest_pts_position + .opt_gt(earliest_pts_position) + .unwrap_or(true) + { + min_earliest_pts_position = Some(earliest_pts_position); + } + if let Some(start_dts_position) = start_dts_position { + if min_start_dts_position + .opt_gt(start_dts_position) + .unwrap_or(true) + { + min_start_dts_position = Some(start_dts_position); } } @@ -1309,26 +1369,13 @@ impl FMP4Mux { // Collect all buffers and their timing information that are to be drained right now. let ( - mut drained_streams, + drained_streams, min_earliest_pts_position, min_earliest_pts, min_start_dts_position, fragment_end_pts, ) = self.drain_buffers(state, settings, timeout, at_eos)?; - // Remove all GAP buffers before processing them further - for (stream, buffers) in &mut drained_streams { - buffers.retain(|buf| { - !buf.buffer.flags().contains(gst::BufferFlags::GAP) - || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE) - || buf.buffer.size() != 0 - }); - - if buffers.is_empty() { - stream.start_time = None; - } - } - // Create header now if it was not created before and return the caps let mut caps = None; if state.stream_header.is_none() {