mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-15 23:01:02 +00:00
fmp4mux: Skip gap buffers earlier to consider them for the sample durations and fragment start durations
Otherwise dropping the gap buffers would offset the timestamps of following samples. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1017>
This commit is contained in:
parent
1eea2219c6
commit
fb745f077b
1 changed files with 159 additions and 111 deletions
|
@ -170,8 +170,6 @@ struct Gop {
|
|||
|
||||
/// Earliest PTS buffer position
|
||||
earliest_pts_position: gst::ClockTime,
|
||||
/// Start DTS buffer position
|
||||
start_dts_position: Option<gst::ClockTime>,
|
||||
|
||||
/// Buffer, PTS running time, DTS running time
|
||||
buffers: Vec<GopBuffer>,
|
||||
|
@ -767,7 +765,6 @@ impl FMP4Mux {
|
|||
let end_dts = end_dts.map(|v| v.positive().unwrap());
|
||||
|
||||
let pts_position = buffer.pts().unwrap();
|
||||
let dts_position = buffer.dts();
|
||||
|
||||
if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
|
||||
gst::debug!(
|
||||
|
@ -782,11 +779,6 @@ impl FMP4Mux {
|
|||
let gop = Gop {
|
||||
start_pts: pts,
|
||||
start_dts: dts,
|
||||
start_dts_position: if !delta_frames.requires_dts() {
|
||||
None
|
||||
} else {
|
||||
dts_position
|
||||
},
|
||||
earliest_pts: pts,
|
||||
earliest_pts_position: pts_position,
|
||||
final_earliest_pts: !delta_frames.requires_dts(),
|
||||
|
@ -1079,43 +1071,6 @@ impl FMP4Mux {
|
|||
|
||||
assert!(fragment_end_pts.is_some());
|
||||
|
||||
let first_gop = gops.first().unwrap();
|
||||
let last_gop = gops.last().unwrap();
|
||||
let earliest_pts = first_gop.earliest_pts;
|
||||
let earliest_pts_position = first_gop.earliest_pts_position;
|
||||
let start_dts = first_gop.start_dts;
|
||||
let start_dts_position = first_gop.start_dts_position;
|
||||
let end_pts = last_gop.end_pts;
|
||||
let dts_offset = stream.dts_offset;
|
||||
|
||||
if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
|
||||
min_earliest_pts = Some(earliest_pts);
|
||||
}
|
||||
if min_earliest_pts_position
|
||||
.opt_gt(earliest_pts_position)
|
||||
.unwrap_or(true)
|
||||
{
|
||||
min_earliest_pts_position = Some(earliest_pts_position);
|
||||
}
|
||||
if let Some(start_dts_position) = start_dts_position {
|
||||
if min_start_dts_position
|
||||
.opt_gt(start_dts_position)
|
||||
.unwrap_or(true)
|
||||
{
|
||||
min_start_dts_position = Some(start_dts_position);
|
||||
}
|
||||
}
|
||||
|
||||
gst::info!(
|
||||
CAT,
|
||||
obj: stream.sinkpad,
|
||||
"Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
|
||||
end_pts.saturating_sub(earliest_pts),
|
||||
earliest_pts,
|
||||
start_dts.display(),
|
||||
dts_offset.display(),
|
||||
);
|
||||
|
||||
if let Some((prev_gop, first_gop)) = Option::zip(
|
||||
stream.queued_gops.iter().find(|gop| gop.final_end_pts),
|
||||
stream.queued_gops.back(),
|
||||
|
@ -1137,64 +1092,170 @@ impl FMP4Mux {
|
|||
.unwrap_or(gst::ClockTime::ZERO)
|
||||
);
|
||||
|
||||
let last_gop = gops.last().unwrap();
|
||||
let end_pts = last_gop.end_pts;
|
||||
let end_dts = last_gop.end_dts;
|
||||
|
||||
// First flatten all GOPs into a single `Vec`
|
||||
let mut gop_buffers = Vec::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
|
||||
gop_buffers.extend(gops.into_iter().flat_map(|gop| gop.buffers.into_iter()));
|
||||
|
||||
// Then calculate durations for all of the buffers and get rid of any GAP buffers in
|
||||
// the process.
|
||||
// Also calculate the earliest PTS / start DTS here, which needs to consider GAP
|
||||
// buffers too.
|
||||
let mut buffers = VecDeque::with_capacity(gop_buffers.len());
|
||||
let mut earliest_pts = None;
|
||||
let mut earliest_pts_position = None;
|
||||
let mut start_dts = None;
|
||||
let mut start_dts_position = None;
|
||||
|
||||
let mut gop_buffers = gop_buffers.into_iter();
|
||||
while let Some(buffer) = gop_buffers.next() {
|
||||
// If this is a GAP buffer then skip it. Its duration was already considered
|
||||
// below for the non-GAP buffer preceding it, and if there was none then the
|
||||
// fragment start would be adjusted accordingly for this stream.
|
||||
if buffer.buffer.flags().contains(gst::BufferFlags::GAP)
|
||||
&& buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
|
||||
&& buffer.buffer.size() == 0
|
||||
{
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: stream.sinkpad,
|
||||
"Skipping gap buffer {buffer:?}",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
if earliest_pts.map_or(true, |earliest_pts| buffer.pts < earliest_pts) {
|
||||
earliest_pts = Some(buffer.pts);
|
||||
}
|
||||
if earliest_pts_position.map_or(true, |earliest_pts_position| {
|
||||
buffer.buffer.pts().unwrap() < earliest_pts_position
|
||||
}) {
|
||||
earliest_pts_position = Some(buffer.buffer.pts().unwrap());
|
||||
}
|
||||
if stream.delta_frames.requires_dts() && start_dts.is_none() {
|
||||
start_dts = Some(buffer.dts.unwrap());
|
||||
}
|
||||
if stream.delta_frames.requires_dts() && start_dts_position.is_none() {
|
||||
start_dts_position = Some(buffer.buffer.dts().unwrap());
|
||||
}
|
||||
|
||||
let timestamp = if !stream.delta_frames.requires_dts() {
|
||||
buffer.pts
|
||||
} else {
|
||||
buffer.dts.unwrap()
|
||||
};
|
||||
|
||||
// Take as end timestamp the timestamp of the next non-GAP buffer
|
||||
let end_timestamp = match gop_buffers.as_slice().iter().find(|buf| {
|
||||
!buf.buffer.flags().contains(gst::BufferFlags::GAP)
|
||||
|| !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
|
||||
|| buf.buffer.size() != 0
|
||||
}) {
|
||||
Some(buffer) => {
|
||||
if !stream.delta_frames.requires_dts() {
|
||||
buffer.pts
|
||||
} else {
|
||||
buffer.dts.unwrap()
|
||||
}
|
||||
}
|
||||
None => {
|
||||
if !stream.delta_frames.requires_dts() {
|
||||
end_pts
|
||||
} else {
|
||||
end_dts.unwrap()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Timestamps are enforced to monotonically increase when queueing buffers
|
||||
let duration = end_timestamp
|
||||
.checked_sub(timestamp)
|
||||
.expect("Timestamps going backwards");
|
||||
|
||||
let composition_time_offset = if !stream.delta_frames.requires_dts() {
|
||||
None
|
||||
} else {
|
||||
let pts = buffer.pts;
|
||||
let dts = buffer.dts.unwrap();
|
||||
|
||||
Some(i64::try_from((pts - dts).nseconds()).map_err(|_| {
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
|
||||
gst::FlowError::Error
|
||||
})?)
|
||||
};
|
||||
|
||||
buffers.push_back(Buffer {
|
||||
idx,
|
||||
buffer: buffer.buffer,
|
||||
timestamp,
|
||||
duration,
|
||||
composition_time_offset,
|
||||
});
|
||||
}
|
||||
|
||||
if buffers.is_empty() {
|
||||
gst::info!(
|
||||
CAT,
|
||||
obj: stream.sinkpad,
|
||||
"Drained only gap buffers",
|
||||
);
|
||||
|
||||
drained_streams.push((
|
||||
super::FragmentHeaderStream {
|
||||
caps: stream.caps.clone(),
|
||||
start_time: None,
|
||||
delta_frames: stream.delta_frames,
|
||||
trak_timescale: stream_settings.trak_timescale,
|
||||
},
|
||||
VecDeque::new(),
|
||||
));
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
let earliest_pts = earliest_pts.unwrap();
|
||||
let earliest_pts_position = earliest_pts_position.unwrap();
|
||||
if stream.delta_frames.requires_dts() {
|
||||
assert!(start_dts.is_some());
|
||||
assert!(start_dts_position.is_some());
|
||||
}
|
||||
let start_dts = start_dts;
|
||||
let start_dts_position = start_dts_position;
|
||||
|
||||
gst::info!(
|
||||
CAT,
|
||||
obj: stream.sinkpad,
|
||||
"Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
|
||||
end_pts.saturating_sub(earliest_pts),
|
||||
earliest_pts,
|
||||
start_dts.display(),
|
||||
stream.dts_offset.display(),
|
||||
);
|
||||
|
||||
let start_time = if !stream.delta_frames.requires_dts() {
|
||||
earliest_pts
|
||||
} else {
|
||||
start_dts.unwrap()
|
||||
};
|
||||
|
||||
let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
|
||||
|
||||
for gop in gops {
|
||||
let mut gop_buffers = gop.buffers.into_iter().peekable();
|
||||
while let Some(buffer) = gop_buffers.next() {
|
||||
let timestamp = if !stream.delta_frames.requires_dts() {
|
||||
buffer.pts
|
||||
} else {
|
||||
buffer.dts.unwrap()
|
||||
};
|
||||
|
||||
let end_timestamp = match gop_buffers.peek() {
|
||||
Some(buffer) => {
|
||||
if !stream.delta_frames.requires_dts() {
|
||||
buffer.pts
|
||||
} else {
|
||||
buffer.dts.unwrap()
|
||||
}
|
||||
}
|
||||
None => {
|
||||
if !stream.delta_frames.requires_dts() {
|
||||
gop.end_pts
|
||||
} else {
|
||||
gop.end_dts.unwrap()
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Timestamps are enforced to monotonically increase when queueing buffers
|
||||
let duration = end_timestamp
|
||||
.checked_sub(timestamp)
|
||||
.expect("Timestamps going backwards");
|
||||
|
||||
let composition_time_offset = if !stream.delta_frames.requires_dts() {
|
||||
None
|
||||
} else {
|
||||
let pts = buffer.pts;
|
||||
let dts = buffer.dts.unwrap();
|
||||
|
||||
Some(i64::try_from((pts - dts).nseconds()).map_err(|_| {
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
|
||||
gst::FlowError::Error
|
||||
})?)
|
||||
};
|
||||
|
||||
buffers.push_back(Buffer {
|
||||
idx,
|
||||
buffer: buffer.buffer,
|
||||
timestamp,
|
||||
duration,
|
||||
composition_time_offset,
|
||||
});
|
||||
if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
|
||||
min_earliest_pts = Some(earliest_pts);
|
||||
}
|
||||
if min_earliest_pts_position
|
||||
.opt_gt(earliest_pts_position)
|
||||
.unwrap_or(true)
|
||||
{
|
||||
min_earliest_pts_position = Some(earliest_pts_position);
|
||||
}
|
||||
if let Some(start_dts_position) = start_dts_position {
|
||||
if min_start_dts_position
|
||||
.opt_gt(start_dts_position)
|
||||
.unwrap_or(true)
|
||||
{
|
||||
min_start_dts_position = Some(start_dts_position);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1307,26 +1368,13 @@ impl FMP4Mux {
|
|||
|
||||
// Collect all buffers and their timing information that are to be drained right now.
|
||||
let (
|
||||
mut drained_streams,
|
||||
drained_streams,
|
||||
min_earliest_pts_position,
|
||||
min_earliest_pts,
|
||||
min_start_dts_position,
|
||||
fragment_end_pts,
|
||||
) = self.drain_buffers(state, settings, timeout, at_eos)?;
|
||||
|
||||
// Remove all GAP buffers before processing them further
|
||||
for (stream, buffers) in &mut drained_streams {
|
||||
buffers.retain(|buf| {
|
||||
!buf.buffer.flags().contains(gst::BufferFlags::GAP)
|
||||
|| !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
|
||||
|| buf.buffer.size() != 0
|
||||
});
|
||||
|
||||
if buffers.is_empty() {
|
||||
stream.start_time = None;
|
||||
}
|
||||
}
|
||||
|
||||
// Create header now if it was not created before and return the caps
|
||||
let mut caps = None;
|
||||
if state.stream_header.is_none() {
|
||||
|
|
Loading…
Reference in a new issue