fmp4mux: Skip gap buffers earlier to consider them for the sample durations and fragment start durations

Otherwise dropping the gap buffers would offset the timestamps of
following samples.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1016>
This commit is contained in:
Sebastian Dröge 2022-12-16 16:54:38 +02:00
parent e344585d99
commit 13b6f8fad4

View file

@ -179,8 +179,6 @@ struct Gop {
/// Earliest PTS buffer position
earliest_pts_position: gst::ClockTime,
/// Start DTS buffer position
start_dts_position: Option<gst::ClockTime>,
/// Buffer, PTS running time, DTS running time
buffers: Vec<GopBuffer>,
@ -763,7 +761,6 @@ impl FMP4Mux {
let end_dts = end_dts.map(|v| v.positive().unwrap());
let pts_position = buffer.pts().unwrap();
let dts_position = buffer.dts();
if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
gst::debug!(
@ -778,11 +775,6 @@ impl FMP4Mux {
let gop = Gop {
start_pts: pts,
start_dts: dts,
start_dts_position: if !delta_frames.requires_dts() {
None
} else {
dts_position
},
earliest_pts: pts,
earliest_pts_position: pts_position,
final_earliest_pts: !delta_frames.requires_dts(),
@ -1075,43 +1067,6 @@ impl FMP4Mux {
assert!(fragment_end_pts.is_some());
let first_gop = gops.first().unwrap();
let last_gop = gops.last().unwrap();
let earliest_pts = first_gop.earliest_pts;
let earliest_pts_position = first_gop.earliest_pts_position;
let start_dts = first_gop.start_dts;
let start_dts_position = first_gop.start_dts_position;
let end_pts = last_gop.end_pts;
let dts_offset = stream.dts_offset;
if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
min_earliest_pts = Some(earliest_pts);
}
if min_earliest_pts_position
.opt_gt(earliest_pts_position)
.unwrap_or(true)
{
min_earliest_pts_position = Some(earliest_pts_position);
}
if let Some(start_dts_position) = start_dts_position {
if min_start_dts_position
.opt_gt(start_dts_position)
.unwrap_or(true)
{
min_start_dts_position = Some(start_dts_position);
}
}
gst::info!(
CAT,
obj: stream.sinkpad,
"Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
end_pts.saturating_sub(earliest_pts),
earliest_pts,
start_dts.display(),
dts_offset.display(),
);
if let Some((prev_gop, first_gop)) = Option::zip(
stream.queued_gops.iter().find(|gop| gop.final_end_pts),
stream.queued_gops.back(),
@ -1133,70 +1088,175 @@ impl FMP4Mux {
.unwrap_or(gst::ClockTime::ZERO)
);
let last_gop = gops.last().unwrap();
let end_pts = last_gop.end_pts;
let end_dts = last_gop.end_dts;
// First flatten all GOPs into a single `Vec`
let mut gop_buffers = Vec::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
gop_buffers.extend(gops.into_iter().flat_map(|gop| gop.buffers.into_iter()));
// Then calculate durations for all of the buffers and get rid of any GAP buffers in
// the process.
// Also calculate the earliest PTS / start DTS here, which needs to consider GAP
// buffers too.
let mut buffers = VecDeque::with_capacity(gop_buffers.len());
let mut earliest_pts = None;
let mut earliest_pts_position = None;
let mut start_dts = None;
let mut start_dts_position = None;
let mut gop_buffers = gop_buffers.into_iter();
while let Some(buffer) = gop_buffers.next() {
// If this is a GAP buffer then skip it. Its duration was already considered
// below for the non-GAP buffer preceding it, and if there was none then the
// fragment start would be adjusted accordingly for this stream.
if buffer.buffer.flags().contains(gst::BufferFlags::GAP)
&& buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
&& buffer.buffer.size() == 0
{
gst::trace!(
CAT,
obj: stream.sinkpad,
"Skipping gap buffer {buffer:?}",
);
continue;
}
if earliest_pts.map_or(true, |earliest_pts| buffer.pts < earliest_pts) {
earliest_pts = Some(buffer.pts);
}
if earliest_pts_position.map_or(true, |earliest_pts_position| {
buffer.buffer.pts().unwrap() < earliest_pts_position
}) {
earliest_pts_position = Some(buffer.buffer.pts().unwrap());
}
if stream.delta_frames.requires_dts() && start_dts.is_none() {
start_dts = Some(buffer.dts.unwrap());
}
if stream.delta_frames.requires_dts() && start_dts_position.is_none() {
start_dts_position = Some(buffer.buffer.dts().unwrap());
}
let timestamp = if !stream.delta_frames.requires_dts() {
buffer.pts
} else {
buffer.dts.unwrap()
};
// Take as end timestamp the timestamp of the next non-GAP buffer
let end_timestamp = match gop_buffers.as_slice().iter().find(|buf| {
!buf.buffer.flags().contains(gst::BufferFlags::GAP)
|| !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
|| buf.buffer.size() != 0
}) {
Some(buffer) => {
if !stream.delta_frames.requires_dts() {
buffer.pts
} else {
buffer.dts.unwrap()
}
}
None => {
if !stream.delta_frames.requires_dts() {
end_pts
} else {
end_dts.unwrap()
}
}
};
// Timestamps are enforced to monotonically increase when queueing buffers
let duration = end_timestamp
.checked_sub(timestamp)
.expect("Timestamps going backwards");
let composition_time_offset = if !stream.delta_frames.requires_dts() {
None
} else {
let pts = buffer.pts;
let dts = buffer.dts.unwrap();
Some(
i64::try_from(
(gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(),
)
.map_err(|_| {
gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
gst::FlowError::Error
})?,
)
};
buffers.push_back(Buffer {
idx,
buffer: buffer.buffer,
timestamp,
duration,
composition_time_offset,
});
}
if buffers.is_empty() {
gst::info!(
CAT,
obj: stream.sinkpad,
"Drained only gap buffers",
);
drained_streams.push((
super::FragmentHeaderStream {
caps: stream.caps.clone(),
start_time: None,
delta_frames: stream.delta_frames,
trak_timescale: stream_settings.trak_timescale,
},
VecDeque::new(),
));
continue;
}
let earliest_pts = earliest_pts.unwrap();
let earliest_pts_position = earliest_pts_position.unwrap();
if stream.delta_frames.requires_dts() {
assert!(start_dts.is_some());
assert!(start_dts_position.is_some());
}
let start_dts = start_dts;
let start_dts_position = start_dts_position;
gst::info!(
CAT,
obj: stream.sinkpad,
"Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
end_pts.saturating_sub(earliest_pts),
earliest_pts,
start_dts.display(),
stream.dts_offset.display(),
);
let start_time = if !stream.delta_frames.requires_dts() {
earliest_pts
} else {
start_dts.unwrap()
};
let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
for gop in gops {
let mut gop_buffers = gop.buffers.into_iter().peekable();
while let Some(buffer) = gop_buffers.next() {
let timestamp = if !stream.delta_frames.requires_dts() {
buffer.pts
} else {
buffer.dts.unwrap()
};
let end_timestamp = match gop_buffers.peek() {
Some(buffer) => {
if !stream.delta_frames.requires_dts() {
buffer.pts
} else {
buffer.dts.unwrap()
}
}
None => {
if !stream.delta_frames.requires_dts() {
gop.end_pts
} else {
gop.end_dts.unwrap()
}
}
};
// Timestamps are enforced to monotonically increase when queueing buffers
let duration = end_timestamp
.checked_sub(timestamp)
.expect("Timestamps going backwards");
let composition_time_offset = if !stream.delta_frames.requires_dts() {
None
} else {
let pts = buffer.pts;
let dts = buffer.dts.unwrap();
Some(
i64::try_from(
(gst::Signed::Positive(pts) - gst::Signed::Positive(dts))
.nseconds(),
)
.map_err(|_| {
gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
gst::FlowError::Error
})?,
)
};
buffers.push_back(Buffer {
idx,
buffer: buffer.buffer,
timestamp,
duration,
composition_time_offset,
});
if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
min_earliest_pts = Some(earliest_pts);
}
if min_earliest_pts_position
.opt_gt(earliest_pts_position)
.unwrap_or(true)
{
min_earliest_pts_position = Some(earliest_pts_position);
}
if let Some(start_dts_position) = start_dts_position {
if min_start_dts_position
.opt_gt(start_dts_position)
.unwrap_or(true)
{
min_start_dts_position = Some(start_dts_position);
}
}
@ -1309,26 +1369,13 @@ impl FMP4Mux {
// Collect all buffers and their timing information that are to be drained right now.
let (
mut drained_streams,
drained_streams,
min_earliest_pts_position,
min_earliest_pts,
min_start_dts_position,
fragment_end_pts,
) = self.drain_buffers(state, settings, timeout, at_eos)?;
// Remove all GAP buffers before processing them further
for (stream, buffers) in &mut drained_streams {
buffers.retain(|buf| {
!buf.buffer.flags().contains(gst::BufferFlags::GAP)
|| !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
|| buf.buffer.size() != 0
});
if buffers.is_empty() {
stream.start_time = None;
}
}
// Create header now if it was not created before and return the caps
let mut caps = None;
if state.stream_header.is_none() {