diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 65bd4c035..55e458a8e 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -81,19 +81,18 @@ fn running_time_to_utc_time( /// Converts an UTC time to a running time. fn utc_time_to_running_time( - utc_time: Option, + utc_time: Option>>, running_time_utc_time_mapping: ( impl Into>, impl Into>, ), -) -> Option { - let utc_time = utc_time?; +) -> Option> { + let utc_time = utc_time?.into(); running_time_utc_time_mapping .0 .into() .checked_sub(running_time_utc_time_mapping.1.into()) - .and_then(|res| res.checked_add_unsigned(utc_time)) - .and_then(|res| res.positive()) + .and_then(|res| res.checked_add(utc_time)) } pub static CAT: LazyLock = LazyLock::new(|| { @@ -196,7 +195,7 @@ struct GopBuffer { buffer: gst::Buffer, pts: gst::ClockTime, pts_position: gst::ClockTime, - dts: Option, + dts: Option>, /// If a split-now event was received and this buffer should become the first /// buffer of the next fragment or chunk. /// @@ -209,7 +208,7 @@ struct Gop { /// Start PTS. start_pts: gst::ClockTime, /// Start DTS. - start_dts: Option, + start_dts: Option>, /// Earliest PTS. earliest_pts: gst::ClockTime, /// Once this is known to be the final earliest PTS/DTS @@ -219,7 +218,7 @@ struct Gop { /// Once this is known to be the final end PTS/DTS final_end_pts: bool, /// DTS plus duration of last buffer, or start of next GOP - end_dts: Option, + end_dts: Option>, /// Earliest PTS buffer position earliest_pts_position: gst::ClockTime, @@ -263,12 +262,9 @@ struct Stream { /// Whether a whole chunk is queued. chunk_filled: bool, - /// Difference between the first DTS and 0 in case of negative DTS - dts_offset: Option, - /// Current position (DTS, or PTS for intra-only) to prevent /// timestamps from going backwards when queueing new buffers - current_position: gst::ClockTime, + current_position: gst::Signed, /// Mapping between running time and UTC time in ONVIF mode. running_time_utc_time_mapping: Option<(gst::Signed, gst::ClockTime)>, @@ -378,8 +374,7 @@ impl Stream { fn flush(&mut self) { self.queued_gops.clear(); - self.dts_offset = None; - self.current_position = gst::ClockTime::ZERO; + self.current_position = gst::ClockTime::MIN_SIGNED; self.fragment_filled = false; self.pre_queue.clear(); self.running_time_utc_time_mapping = None; @@ -431,7 +426,7 @@ struct State { /// Current end PTS of the whole stream end_pts: Option, /// Start DTS of the whole stream - start_dts: Option, + start_dts: Option>, /// Start PTS of the current fragment fragment_start_pts: Option, @@ -1097,60 +1092,18 @@ impl FMP4Mux { pre_queued_buffer.pts, stream.current_position, ); - pre_queued_buffer.pts = stream.current_position; + pre_queued_buffer.pts = stream.current_position.positive().unwrap(); } else { - stream.current_position = pre_queued_buffer.pts; + stream.current_position = gst::Signed::Positive(pre_queued_buffer.pts); } pre_queued_buffer.end_pts = std::cmp::max(pre_queued_buffer.end_pts, pre_queued_buffer.pts); } else { - // Negative DTS are handled via the dts_offset and by having negative composition time - // offsets in the `trun` box. The smallest DTS here is shifted to zero. - let dts = match pre_queued_buffer.dts.unwrap() { - gst::Signed::Positive(dts) => { - if let Some(dts_offset) = stream.dts_offset { - dts + dts_offset - } else { - dts - } - } - gst::Signed::Negative(dts) => { - if stream.dts_offset.is_none() { - stream.dts_offset = Some(dts); - } + let dts = pre_queued_buffer.dts.unwrap(); + let end_dts = pre_queued_buffer.end_dts.unwrap(); - let dts_offset = stream.dts_offset.unwrap(); - if dts > dts_offset { - gst::warning!(CAT, obj = stream.sinkpad, "DTS before first DTS"); - gst::ClockTime::ZERO - } else { - dts_offset - dts - } - } - }; - - let end_dts = match pre_queued_buffer.end_dts.unwrap() { - gst::Signed::Positive(dts) => { - if let Some(dts_offset) = stream.dts_offset { - dts + dts_offset - } else { - dts - } - } - gst::Signed::Negative(dts) => { - let dts_offset = stream.dts_offset.unwrap(); - if dts > dts_offset { - gst::warning!(CAT, obj = stream.sinkpad, "End DTS before first DTS"); - gst::ClockTime::ZERO - } else { - dts_offset - dts - } - } - }; - - // Enforce monotonically increasing DTS for intra-only streams + // Enforce monotonically increasing DTS // NOTE: PTS stays the same so this will cause a bigger PTS/DTS difference - // FIXME: Is this correct? if dts < stream.current_position { gst::warning!( CAT, @@ -1159,12 +1112,12 @@ impl FMP4Mux { dts, stream.current_position, ); - pre_queued_buffer.dts = Some(gst::Signed::Positive(stream.current_position)); + pre_queued_buffer.dts = Some(stream.current_position); } else { - pre_queued_buffer.dts = Some(gst::Signed::Positive(dts)); + pre_queued_buffer.dts = Some(dts); stream.current_position = dts; } - pre_queued_buffer.end_dts = Some(gst::Signed::Positive(std::cmp::max(end_dts, dts))); + pre_queued_buffer.end_dts = Some(std::cmp::max(end_dts, dts)); } let PreQueuedBuffer { @@ -1175,9 +1128,6 @@ impl FMP4Mux { end_dts, } = pre_queued_buffer; - let dts = dts.map(|v| v.positive().unwrap()); - let end_dts = end_dts.map(|v| v.positive().unwrap()); - let pts_position = buffer.pts().unwrap(); // Accept new buffers if this is either an empty queue and we @@ -1189,10 +1139,9 @@ impl FMP4Mux { gst::debug!( CAT, obj = stream.sinkpad, - "Starting new GOP at PTS {} DTS {} (DTS offset {})", + "Starting new GOP at PTS {} DTS {}", pts, dts.display(), - stream.dts_offset.display(), ); // TODO: Move this to the stream creation @@ -2447,11 +2396,11 @@ impl FMP4Mux { // End PTS gst::ClockTime, // Start DTS - Option, + Option>, // Start DTS position Option, // End DTS - Option, + Option>, // Start NTP time (either matches start_dts if required or earliest_pts) Option, )>, @@ -2512,7 +2461,7 @@ impl FMP4Mux { } let timestamp = if !stream.delta_frames.requires_dts() { - buffer.pts + gst::Signed::Positive(buffer.pts) } else { buffer.dts.unwrap() }; @@ -2525,14 +2474,14 @@ impl FMP4Mux { }) { Some(buffer) => { if !stream.delta_frames.requires_dts() { - buffer.pts + gst::Signed::Positive(buffer.pts) } else { buffer.dts.unwrap() } } None => { if !stream.delta_frames.requires_dts() { - end_pts + gst::Signed::Positive(end_pts) } else { end_dts.unwrap() } @@ -2542,6 +2491,7 @@ impl FMP4Mux { // Timestamps are enforced to monotonically increase when queueing buffers let duration = end_timestamp .checked_sub(timestamp) + .and_then(|duration| duration.positive()) .expect("Timestamps going backwards"); let composition_time_offset = if !stream.delta_frames.requires_dts() { @@ -2550,15 +2500,13 @@ impl FMP4Mux { let pts = buffer.pts; let dts = buffer.dts.unwrap(); - Some( - i64::try_from( - (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(), - ) - .map_err(|_| { + let offset = + i64::try_from((gst::Signed::Positive(pts) - dts).nseconds()).map_err(|_| { gst::error!(CAT, obj = stream.sinkpad, "Too big PTS/DTS difference"); gst::FlowError::Error - })?, - ) + })?; + + Some(offset) }; buffers.push_back(Buffer { @@ -2583,6 +2531,37 @@ impl FMP4Mux { let start_dts = start_dts; let start_dts_position = start_dts_position; + // Now need to adjust for negative DTS, which does not only include actually + // negative DTS but also start DTS being before earliest PTS in general (i.e. + // negative DTS but the whole timeline is shifted above zero). + // + // The buffer with the start DTS (i.e. the first buffer) gets assigned to DTS + // zero (i.e. value of tfdt) in each fragment due to MP4 working based on all + // samples having increasing DTS by the duration of each sample. + // + // We're setting the tfdt to the earliest PTS of the fragment as it is supposed + // to be the sum of all sample durations of all previous fragments, so we + // need to shift all composition time offsets by the difference between the start DTS and + // the earliest PTS. + if stream.delta_frames.requires_dts() { + let offset = + i64::try_from((earliest_pts - start_dts.unwrap()).nseconds()).map_err(|_| { + gst::error!( + CAT, + obj = stream.sinkpad, + "Too big earliest PTS / start DTS difference" + ); + gst::FlowError::Error + })?; + + if offset != 0 { + for buffer in &mut buffers { + buffer.composition_time_offset = + Some(buffer.composition_time_offset.unwrap() - offset) + } + } + } + Ok(Some(( buffers, earliest_pts, @@ -2854,11 +2833,10 @@ impl FMP4Mux { gst::info!( CAT, obj = stream.sinkpad, - "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}", + "Draining {} worth of buffers starting at PTS {} DTS {}", end_pts.saturating_sub(earliest_pts), earliest_pts, start_dts.display(), - stream.dts_offset.display(), ); if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) { @@ -3002,6 +2980,7 @@ impl FMP4Mux { { let Some(fku_time) = utc_time_to_running_time(Some(pts), stream.running_time_utc_time_mapping.unwrap()) + .and_then(|fku_time| fku_time.positive()) else { return; }; @@ -3106,14 +3085,8 @@ impl FMP4Mux { // Offset stream start time to start at 0 in ONVIF mode, or if 'offset-to-zero' is enabled, // instead of using the UTC time verbatim. This would be used for the tfdt box later. - // FIXME: Should this use the original DTS-or-PTS running time instead? - // That might be negative though! if self.obj().class().as_ref().variant == super::Variant::ONVIF || settings.offset_to_zero { - let offset = if let Some(start_dts) = state.start_dts { - std::cmp::min(start_dts, state.earliest_pts.unwrap()) - } else { - state.earliest_pts.unwrap() - }; + let offset = state.earliest_pts.unwrap(); for stream in &mut streams { if let Some(start_time) = stream.start_time { stream.start_time = Some(start_time.checked_sub(offset).unwrap()); @@ -3639,8 +3612,7 @@ impl FMP4Mux { queued_gops: VecDeque::new(), fragment_filled: false, chunk_filled: false, - dts_offset: None, - current_position: gst::ClockTime::ZERO, + current_position: gst::ClockTime::MIN_SIGNED, running_time_utc_time_mapping: None, extra_header_data: None, codec_specific_boxes, diff --git a/mux/fmp4/src/fmp4mux/mod.rs b/mux/fmp4/src/fmp4mux/mod.rs index 995314be4..c3c5b0bb2 100644 --- a/mux/fmp4/src/fmp4mux/mod.rs +++ b/mux/fmp4/src/fmp4mux/mod.rs @@ -298,8 +298,8 @@ pub(crate) struct Buffer { /// Actual buffer buffer: gst::Buffer, - /// Timestamp - timestamp: gst::ClockTime, + /// Timestamp (PTS or DTS) + timestamp: gst::Signed, /// Sample duration duration: gst::ClockTime,