From 144aeb615ac7656672402d73b6fa88c873d88b74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 5 May 2025 16:47:34 +0300 Subject: [PATCH] fmp4mux: Fix handling of negative DTS in composition time offset Actually negative DTS (i.e. < 0) was handled correctly before when queueing buffers, but this didn't handle the case correctly where the very same stream has DTS/PTS shifted to be positive (i.e. start DTS is before earliest PTS). To solve this, pass through all DTS as signed values and at the very end for every fragment calculate the offset between the two in addition to the PTS/DTS offset of each buffer, and adjust the composition time offsets accordingly. This way the final PTS of each buffer after demuxing is equal to the actual running time of the very same buffer before muxing. Previously all PTS would've been shifted forwards by a couple of frames, which then broke A/V synchronization. Part-of: --- mux/fmp4/src/fmp4mux/imp.rs | 158 +++++++++++++++--------------------- mux/fmp4/src/fmp4mux/mod.rs | 4 +- 2 files changed, 67 insertions(+), 95 deletions(-) diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 65bd4c035..55e458a8e 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -81,19 +81,18 @@ fn running_time_to_utc_time( /// Converts an UTC time to a running time. fn utc_time_to_running_time( - utc_time: Option, + utc_time: Option>>, running_time_utc_time_mapping: ( impl Into>, impl Into>, ), -) -> Option { - let utc_time = utc_time?; +) -> Option> { + let utc_time = utc_time?.into(); running_time_utc_time_mapping .0 .into() .checked_sub(running_time_utc_time_mapping.1.into()) - .and_then(|res| res.checked_add_unsigned(utc_time)) - .and_then(|res| res.positive()) + .and_then(|res| res.checked_add(utc_time)) } pub static CAT: LazyLock = LazyLock::new(|| { @@ -196,7 +195,7 @@ struct GopBuffer { buffer: gst::Buffer, pts: gst::ClockTime, pts_position: gst::ClockTime, - dts: Option, + dts: Option>, /// If a split-now event was received and this buffer should become the first /// buffer of the next fragment or chunk. /// @@ -209,7 +208,7 @@ struct Gop { /// Start PTS. start_pts: gst::ClockTime, /// Start DTS. - start_dts: Option, + start_dts: Option>, /// Earliest PTS. earliest_pts: gst::ClockTime, /// Once this is known to be the final earliest PTS/DTS @@ -219,7 +218,7 @@ struct Gop { /// Once this is known to be the final end PTS/DTS final_end_pts: bool, /// DTS plus duration of last buffer, or start of next GOP - end_dts: Option, + end_dts: Option>, /// Earliest PTS buffer position earliest_pts_position: gst::ClockTime, @@ -263,12 +262,9 @@ struct Stream { /// Whether a whole chunk is queued. chunk_filled: bool, - /// Difference between the first DTS and 0 in case of negative DTS - dts_offset: Option, - /// Current position (DTS, or PTS for intra-only) to prevent /// timestamps from going backwards when queueing new buffers - current_position: gst::ClockTime, + current_position: gst::Signed, /// Mapping between running time and UTC time in ONVIF mode. running_time_utc_time_mapping: Option<(gst::Signed, gst::ClockTime)>, @@ -378,8 +374,7 @@ impl Stream { fn flush(&mut self) { self.queued_gops.clear(); - self.dts_offset = None; - self.current_position = gst::ClockTime::ZERO; + self.current_position = gst::ClockTime::MIN_SIGNED; self.fragment_filled = false; self.pre_queue.clear(); self.running_time_utc_time_mapping = None; @@ -431,7 +426,7 @@ struct State { /// Current end PTS of the whole stream end_pts: Option, /// Start DTS of the whole stream - start_dts: Option, + start_dts: Option>, /// Start PTS of the current fragment fragment_start_pts: Option, @@ -1097,60 +1092,18 @@ impl FMP4Mux { pre_queued_buffer.pts, stream.current_position, ); - pre_queued_buffer.pts = stream.current_position; + pre_queued_buffer.pts = stream.current_position.positive().unwrap(); } else { - stream.current_position = pre_queued_buffer.pts; + stream.current_position = gst::Signed::Positive(pre_queued_buffer.pts); } pre_queued_buffer.end_pts = std::cmp::max(pre_queued_buffer.end_pts, pre_queued_buffer.pts); } else { - // Negative DTS are handled via the dts_offset and by having negative composition time - // offsets in the `trun` box. The smallest DTS here is shifted to zero. - let dts = match pre_queued_buffer.dts.unwrap() { - gst::Signed::Positive(dts) => { - if let Some(dts_offset) = stream.dts_offset { - dts + dts_offset - } else { - dts - } - } - gst::Signed::Negative(dts) => { - if stream.dts_offset.is_none() { - stream.dts_offset = Some(dts); - } + let dts = pre_queued_buffer.dts.unwrap(); + let end_dts = pre_queued_buffer.end_dts.unwrap(); - let dts_offset = stream.dts_offset.unwrap(); - if dts > dts_offset { - gst::warning!(CAT, obj = stream.sinkpad, "DTS before first DTS"); - gst::ClockTime::ZERO - } else { - dts_offset - dts - } - } - }; - - let end_dts = match pre_queued_buffer.end_dts.unwrap() { - gst::Signed::Positive(dts) => { - if let Some(dts_offset) = stream.dts_offset { - dts + dts_offset - } else { - dts - } - } - gst::Signed::Negative(dts) => { - let dts_offset = stream.dts_offset.unwrap(); - if dts > dts_offset { - gst::warning!(CAT, obj = stream.sinkpad, "End DTS before first DTS"); - gst::ClockTime::ZERO - } else { - dts_offset - dts - } - } - }; - - // Enforce monotonically increasing DTS for intra-only streams + // Enforce monotonically increasing DTS // NOTE: PTS stays the same so this will cause a bigger PTS/DTS difference - // FIXME: Is this correct? if dts < stream.current_position { gst::warning!( CAT, @@ -1159,12 +1112,12 @@ impl FMP4Mux { dts, stream.current_position, ); - pre_queued_buffer.dts = Some(gst::Signed::Positive(stream.current_position)); + pre_queued_buffer.dts = Some(stream.current_position); } else { - pre_queued_buffer.dts = Some(gst::Signed::Positive(dts)); + pre_queued_buffer.dts = Some(dts); stream.current_position = dts; } - pre_queued_buffer.end_dts = Some(gst::Signed::Positive(std::cmp::max(end_dts, dts))); + pre_queued_buffer.end_dts = Some(std::cmp::max(end_dts, dts)); } let PreQueuedBuffer { @@ -1175,9 +1128,6 @@ impl FMP4Mux { end_dts, } = pre_queued_buffer; - let dts = dts.map(|v| v.positive().unwrap()); - let end_dts = end_dts.map(|v| v.positive().unwrap()); - let pts_position = buffer.pts().unwrap(); // Accept new buffers if this is either an empty queue and we @@ -1189,10 +1139,9 @@ impl FMP4Mux { gst::debug!( CAT, obj = stream.sinkpad, - "Starting new GOP at PTS {} DTS {} (DTS offset {})", + "Starting new GOP at PTS {} DTS {}", pts, dts.display(), - stream.dts_offset.display(), ); // TODO: Move this to the stream creation @@ -2447,11 +2396,11 @@ impl FMP4Mux { // End PTS gst::ClockTime, // Start DTS - Option, + Option>, // Start DTS position Option, // End DTS - Option, + Option>, // Start NTP time (either matches start_dts if required or earliest_pts) Option, )>, @@ -2512,7 +2461,7 @@ impl FMP4Mux { } let timestamp = if !stream.delta_frames.requires_dts() { - buffer.pts + gst::Signed::Positive(buffer.pts) } else { buffer.dts.unwrap() }; @@ -2525,14 +2474,14 @@ impl FMP4Mux { }) { Some(buffer) => { if !stream.delta_frames.requires_dts() { - buffer.pts + gst::Signed::Positive(buffer.pts) } else { buffer.dts.unwrap() } } None => { if !stream.delta_frames.requires_dts() { - end_pts + gst::Signed::Positive(end_pts) } else { end_dts.unwrap() } @@ -2542,6 +2491,7 @@ impl FMP4Mux { // Timestamps are enforced to monotonically increase when queueing buffers let duration = end_timestamp .checked_sub(timestamp) + .and_then(|duration| duration.positive()) .expect("Timestamps going backwards"); let composition_time_offset = if !stream.delta_frames.requires_dts() { @@ -2550,15 +2500,13 @@ impl FMP4Mux { let pts = buffer.pts; let dts = buffer.dts.unwrap(); - Some( - i64::try_from( - (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(), - ) - .map_err(|_| { + let offset = + i64::try_from((gst::Signed::Positive(pts) - dts).nseconds()).map_err(|_| { gst::error!(CAT, obj = stream.sinkpad, "Too big PTS/DTS difference"); gst::FlowError::Error - })?, - ) + })?; + + Some(offset) }; buffers.push_back(Buffer { @@ -2583,6 +2531,37 @@ impl FMP4Mux { let start_dts = start_dts; let start_dts_position = start_dts_position; + // Now need to adjust for negative DTS, which does not only include actually + // negative DTS but also start DTS being before earliest PTS in general (i.e. + // negative DTS but the whole timeline is shifted above zero). + // + // The buffer with the start DTS (i.e. the first buffer) gets assigned to DTS + // zero (i.e. value of tfdt) in each fragment due to MP4 working based on all + // samples having increasing DTS by the duration of each sample. + // + // We're setting the tfdt to the earliest PTS of the fragment as it is supposed + // to be the sum of all sample durations of all previous fragments, so we + // need to shift all composition time offsets by the difference between the start DTS and + // the earliest PTS. + if stream.delta_frames.requires_dts() { + let offset = + i64::try_from((earliest_pts - start_dts.unwrap()).nseconds()).map_err(|_| { + gst::error!( + CAT, + obj = stream.sinkpad, + "Too big earliest PTS / start DTS difference" + ); + gst::FlowError::Error + })?; + + if offset != 0 { + for buffer in &mut buffers { + buffer.composition_time_offset = + Some(buffer.composition_time_offset.unwrap() - offset) + } + } + } + Ok(Some(( buffers, earliest_pts, @@ -2854,11 +2833,10 @@ impl FMP4Mux { gst::info!( CAT, obj = stream.sinkpad, - "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}", + "Draining {} worth of buffers starting at PTS {} DTS {}", end_pts.saturating_sub(earliest_pts), earliest_pts, start_dts.display(), - stream.dts_offset.display(), ); if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) { @@ -3002,6 +2980,7 @@ impl FMP4Mux { { let Some(fku_time) = utc_time_to_running_time(Some(pts), stream.running_time_utc_time_mapping.unwrap()) + .and_then(|fku_time| fku_time.positive()) else { return; }; @@ -3106,14 +3085,8 @@ impl FMP4Mux { // Offset stream start time to start at 0 in ONVIF mode, or if 'offset-to-zero' is enabled, // instead of using the UTC time verbatim. This would be used for the tfdt box later. - // FIXME: Should this use the original DTS-or-PTS running time instead? - // That might be negative though! if self.obj().class().as_ref().variant == super::Variant::ONVIF || settings.offset_to_zero { - let offset = if let Some(start_dts) = state.start_dts { - std::cmp::min(start_dts, state.earliest_pts.unwrap()) - } else { - state.earliest_pts.unwrap() - }; + let offset = state.earliest_pts.unwrap(); for stream in &mut streams { if let Some(start_time) = stream.start_time { stream.start_time = Some(start_time.checked_sub(offset).unwrap()); @@ -3639,8 +3612,7 @@ impl FMP4Mux { queued_gops: VecDeque::new(), fragment_filled: false, chunk_filled: false, - dts_offset: None, - current_position: gst::ClockTime::ZERO, + current_position: gst::ClockTime::MIN_SIGNED, running_time_utc_time_mapping: None, extra_header_data: None, codec_specific_boxes, diff --git a/mux/fmp4/src/fmp4mux/mod.rs b/mux/fmp4/src/fmp4mux/mod.rs index 995314be4..c3c5b0bb2 100644 --- a/mux/fmp4/src/fmp4mux/mod.rs +++ b/mux/fmp4/src/fmp4mux/mod.rs @@ -298,8 +298,8 @@ pub(crate) struct Buffer { /// Actual buffer buffer: gst::Buffer, - /// Timestamp - timestamp: gst::ClockTime, + /// Timestamp (PTS or DTS) + timestamp: gst::Signed, /// Sample duration duration: gst::ClockTime,