fmp4mux: Fix handling of negative DTS in composition time offset

Actually negative DTS (i.e. < 0) were already handled correctly when
queueing buffers, but the case where the very same stream has its
DTS/PTS shifted to be positive (i.e. the start DTS is still before the
earliest PTS) was not handled correctly.

To solve this, pass all DTS through as signed values and, at the very
end for every fragment, calculate the offset between the earliest PTS
and the start DTS in addition to the PTS/DTS offset of each buffer, and
adjust the composition time offsets accordingly.

This way the final PTS of each buffer after demuxing is equal to the
actual running time of the very same buffer before muxing. Previously
all PTS would've been shifted forwards by a couple of frames, which then
broke A/V synchronization.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2232>
This commit is contained in:
Sebastian Dröge 2025-05-05 16:47:34 +03:00
parent 8f8a663555
commit 144aeb615a
2 changed files with 67 additions and 95 deletions

View file

@ -81,19 +81,18 @@ fn running_time_to_utc_time(
/// Converts a UTC time to a running time.
fn utc_time_to_running_time(
utc_time: Option<gst::ClockTime>,
utc_time: Option<impl Into<gst::Signed<gst::ClockTime>>>,
running_time_utc_time_mapping: (
impl Into<gst::Signed<gst::ClockTime>>,
impl Into<gst::Signed<gst::ClockTime>>,
),
) -> Option<gst::ClockTime> {
let utc_time = utc_time?;
) -> Option<gst::Signed<gst::ClockTime>> {
let utc_time = utc_time?.into();
running_time_utc_time_mapping
.0
.into()
.checked_sub(running_time_utc_time_mapping.1.into())
.and_then(|res| res.checked_add_unsigned(utc_time))
.and_then(|res| res.positive())
.and_then(|res| res.checked_add(utc_time))
}
pub static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
@ -196,7 +195,7 @@ struct GopBuffer {
buffer: gst::Buffer,
pts: gst::ClockTime,
pts_position: gst::ClockTime,
dts: Option<gst::ClockTime>,
dts: Option<gst::Signed<gst::ClockTime>>,
/// If a split-now event was received and this buffer should become the first
/// buffer of the next fragment or chunk.
///
@ -209,7 +208,7 @@ struct Gop {
/// Start PTS.
start_pts: gst::ClockTime,
/// Start DTS.
start_dts: Option<gst::ClockTime>,
start_dts: Option<gst::Signed<gst::ClockTime>>,
/// Earliest PTS.
earliest_pts: gst::ClockTime,
/// Once this is known to be the final earliest PTS/DTS
@ -219,7 +218,7 @@ struct Gop {
/// Once this is known to be the final end PTS/DTS
final_end_pts: bool,
/// DTS plus duration of last buffer, or start of next GOP
end_dts: Option<gst::ClockTime>,
end_dts: Option<gst::Signed<gst::ClockTime>>,
/// Earliest PTS buffer position
earliest_pts_position: gst::ClockTime,
@ -263,12 +262,9 @@ struct Stream {
/// Whether a whole chunk is queued.
chunk_filled: bool,
/// Difference between the first DTS and 0 in case of negative DTS
dts_offset: Option<gst::ClockTime>,
/// Current position (DTS, or PTS for intra-only) to prevent
/// timestamps from going backwards when queueing new buffers
current_position: gst::ClockTime,
current_position: gst::Signed<gst::ClockTime>,
/// Mapping between running time and UTC time in ONVIF mode.
running_time_utc_time_mapping: Option<(gst::Signed<gst::ClockTime>, gst::ClockTime)>,
@ -378,8 +374,7 @@ impl Stream {
fn flush(&mut self) {
self.queued_gops.clear();
self.dts_offset = None;
self.current_position = gst::ClockTime::ZERO;
self.current_position = gst::ClockTime::MIN_SIGNED;
self.fragment_filled = false;
self.pre_queue.clear();
self.running_time_utc_time_mapping = None;
@ -431,7 +426,7 @@ struct State {
/// Current end PTS of the whole stream
end_pts: Option<gst::ClockTime>,
/// Start DTS of the whole stream
start_dts: Option<gst::ClockTime>,
start_dts: Option<gst::Signed<gst::ClockTime>>,
/// Start PTS of the current fragment
fragment_start_pts: Option<gst::ClockTime>,
@ -1097,60 +1092,18 @@ impl FMP4Mux {
pre_queued_buffer.pts,
stream.current_position,
);
pre_queued_buffer.pts = stream.current_position;
pre_queued_buffer.pts = stream.current_position.positive().unwrap();
} else {
stream.current_position = pre_queued_buffer.pts;
stream.current_position = gst::Signed::Positive(pre_queued_buffer.pts);
}
pre_queued_buffer.end_pts =
std::cmp::max(pre_queued_buffer.end_pts, pre_queued_buffer.pts);
} else {
// Negative DTS are handled via the dts_offset and by having negative composition time
// offsets in the `trun` box. The smallest DTS here is shifted to zero.
let dts = match pre_queued_buffer.dts.unwrap() {
gst::Signed::Positive(dts) => {
if let Some(dts_offset) = stream.dts_offset {
dts + dts_offset
} else {
dts
}
}
gst::Signed::Negative(dts) => {
if stream.dts_offset.is_none() {
stream.dts_offset = Some(dts);
}
let dts = pre_queued_buffer.dts.unwrap();
let end_dts = pre_queued_buffer.end_dts.unwrap();
let dts_offset = stream.dts_offset.unwrap();
if dts > dts_offset {
gst::warning!(CAT, obj = stream.sinkpad, "DTS before first DTS");
gst::ClockTime::ZERO
} else {
dts_offset - dts
}
}
};
let end_dts = match pre_queued_buffer.end_dts.unwrap() {
gst::Signed::Positive(dts) => {
if let Some(dts_offset) = stream.dts_offset {
dts + dts_offset
} else {
dts
}
}
gst::Signed::Negative(dts) => {
let dts_offset = stream.dts_offset.unwrap();
if dts > dts_offset {
gst::warning!(CAT, obj = stream.sinkpad, "End DTS before first DTS");
gst::ClockTime::ZERO
} else {
dts_offset - dts
}
}
};
// Enforce monotonically increasing DTS for intra-only streams
// Enforce monotonically increasing DTS
// NOTE: PTS stays the same so this will cause a bigger PTS/DTS difference
// FIXME: Is this correct?
if dts < stream.current_position {
gst::warning!(
CAT,
@ -1159,12 +1112,12 @@ impl FMP4Mux {
dts,
stream.current_position,
);
pre_queued_buffer.dts = Some(gst::Signed::Positive(stream.current_position));
pre_queued_buffer.dts = Some(stream.current_position);
} else {
pre_queued_buffer.dts = Some(gst::Signed::Positive(dts));
pre_queued_buffer.dts = Some(dts);
stream.current_position = dts;
}
pre_queued_buffer.end_dts = Some(gst::Signed::Positive(std::cmp::max(end_dts, dts)));
pre_queued_buffer.end_dts = Some(std::cmp::max(end_dts, dts));
}
let PreQueuedBuffer {
@ -1175,9 +1128,6 @@ impl FMP4Mux {
end_dts,
} = pre_queued_buffer;
let dts = dts.map(|v| v.positive().unwrap());
let end_dts = end_dts.map(|v| v.positive().unwrap());
let pts_position = buffer.pts().unwrap();
// Accept new buffers if this is either an empty queue and we
@ -1189,10 +1139,9 @@ impl FMP4Mux {
gst::debug!(
CAT,
obj = stream.sinkpad,
"Starting new GOP at PTS {} DTS {} (DTS offset {})",
"Starting new GOP at PTS {} DTS {}",
pts,
dts.display(),
stream.dts_offset.display(),
);
// TODO: Move this to the stream creation
@ -2447,11 +2396,11 @@ impl FMP4Mux {
// End PTS
gst::ClockTime,
// Start DTS
Option<gst::ClockTime>,
Option<gst::Signed<gst::ClockTime>>,
// Start DTS position
Option<gst::ClockTime>,
// End DTS
Option<gst::ClockTime>,
Option<gst::Signed<gst::ClockTime>>,
// Start NTP time (either matches start_dts if required or earliest_pts)
Option<gst::ClockTime>,
)>,
@ -2512,7 +2461,7 @@ impl FMP4Mux {
}
let timestamp = if !stream.delta_frames.requires_dts() {
buffer.pts
gst::Signed::Positive(buffer.pts)
} else {
buffer.dts.unwrap()
};
@ -2525,14 +2474,14 @@ impl FMP4Mux {
}) {
Some(buffer) => {
if !stream.delta_frames.requires_dts() {
buffer.pts
gst::Signed::Positive(buffer.pts)
} else {
buffer.dts.unwrap()
}
}
None => {
if !stream.delta_frames.requires_dts() {
end_pts
gst::Signed::Positive(end_pts)
} else {
end_dts.unwrap()
}
@ -2542,6 +2491,7 @@ impl FMP4Mux {
// Timestamps are enforced to monotonically increase when queueing buffers
let duration = end_timestamp
.checked_sub(timestamp)
.and_then(|duration| duration.positive())
.expect("Timestamps going backwards");
let composition_time_offset = if !stream.delta_frames.requires_dts() {
@ -2550,15 +2500,13 @@ impl FMP4Mux {
let pts = buffer.pts;
let dts = buffer.dts.unwrap();
Some(
i64::try_from(
(gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(),
)
.map_err(|_| {
let offset =
i64::try_from((gst::Signed::Positive(pts) - dts).nseconds()).map_err(|_| {
gst::error!(CAT, obj = stream.sinkpad, "Too big PTS/DTS difference");
gst::FlowError::Error
})?,
)
})?;
Some(offset)
};
buffers.push_back(Buffer {
@ -2583,6 +2531,37 @@ impl FMP4Mux {
let start_dts = start_dts;
let start_dts_position = start_dts_position;
// We now need to adjust for negative DTS. This covers not only DTS that are
// actually negative but also, more generally, a start DTS that is before the
// earliest PTS (i.e. negative DTS with the whole timeline shifted above zero).
//
// The buffer with the start DTS (i.e. the first buffer) gets assigned to DTS
// zero (i.e. value of tfdt) in each fragment due to MP4 working based on all
// samples having increasing DTS by the duration of each sample.
//
// We're setting the tfdt to the earliest PTS of the fragment as it is supposed
// to be the sum of all sample durations of all previous fragments, so we
// need to shift all composition time offsets by the difference between the start DTS and
// the earliest PTS.
if stream.delta_frames.requires_dts() {
let offset =
i64::try_from((earliest_pts - start_dts.unwrap()).nseconds()).map_err(|_| {
gst::error!(
CAT,
obj = stream.sinkpad,
"Too big earliest PTS / start DTS difference"
);
gst::FlowError::Error
})?;
if offset != 0 {
for buffer in &mut buffers {
buffer.composition_time_offset =
Some(buffer.composition_time_offset.unwrap() - offset)
}
}
}
Ok(Some((
buffers,
earliest_pts,
@ -2854,11 +2833,10 @@ impl FMP4Mux {
gst::info!(
CAT,
obj = stream.sinkpad,
"Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
"Draining {} worth of buffers starting at PTS {} DTS {}",
end_pts.saturating_sub(earliest_pts),
earliest_pts,
start_dts.display(),
stream.dts_offset.display(),
);
if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
@ -3002,6 +2980,7 @@ impl FMP4Mux {
{
let Some(fku_time) =
utc_time_to_running_time(Some(pts), stream.running_time_utc_time_mapping.unwrap())
.and_then(|fku_time| fku_time.positive())
else {
return;
};
@ -3106,14 +3085,8 @@ impl FMP4Mux {
// Offset stream start time to start at 0 in ONVIF mode, or if 'offset-to-zero' is enabled,
// instead of using the UTC time verbatim. This would be used for the tfdt box later.
// FIXME: Should this use the original DTS-or-PTS running time instead?
// That might be negative though!
if self.obj().class().as_ref().variant == super::Variant::ONVIF || settings.offset_to_zero {
let offset = if let Some(start_dts) = state.start_dts {
std::cmp::min(start_dts, state.earliest_pts.unwrap())
} else {
state.earliest_pts.unwrap()
};
let offset = state.earliest_pts.unwrap();
for stream in &mut streams {
if let Some(start_time) = stream.start_time {
stream.start_time = Some(start_time.checked_sub(offset).unwrap());
@ -3639,8 +3612,7 @@ impl FMP4Mux {
queued_gops: VecDeque::new(),
fragment_filled: false,
chunk_filled: false,
dts_offset: None,
current_position: gst::ClockTime::ZERO,
current_position: gst::ClockTime::MIN_SIGNED,
running_time_utc_time_mapping: None,
extra_header_data: None,
codec_specific_boxes,

View file

@ -298,8 +298,8 @@ pub(crate) struct Buffer {
/// Actual buffer
buffer: gst::Buffer,
/// Timestamp
timestamp: gst::ClockTime,
/// Timestamp (PTS or DTS)
timestamp: gst::Signed<gst::ClockTime>,
/// Sample duration
duration: gst::ClockTime,