From 6c15bba592461b08b1d4f6bf70c2c1fcd590569d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 17 Nov 2022 19:53:48 +0200 Subject: [PATCH] fmp4mux: Re-work buffer dequeueing and calculations of timestamps Especially simplify calculation of ONVIF UTC times. As a side-effect this reduces the number of times the running times of a buffer are calculated, and also causes streams to be interleaved correctly in ONVIF mode if there is a non-constant UTC-to-running-time difference. Part-of: --- mux/fmp4/src/fmp4mux/imp.rs | 1182 +++++++++++++++++++---------------- 1 file changed, 655 insertions(+), 527 deletions(-) diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 8c3ede38..9bb90d9d 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -53,6 +53,29 @@ fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option { }) } +/// Converts a running time to an UTC time. +fn running_time_to_utc_time( + running_time: gst::Signed, + running_time_utc_time_mapping: (gst::Signed, gst::ClockTime), +) -> Option { + gst::Signed::Positive(running_time_utc_time_mapping.1) + .checked_sub(running_time_utc_time_mapping.0) + .and_then(|res| res.checked_add(running_time)) + .and_then(|res| res.positive()) +} + +/// Converts an UTC time to a running time. +fn utc_time_to_running_time( + utc_time: gst::ClockTime, + running_time_utc_time_mapping: (gst::Signed, gst::ClockTime), +) -> Option { + running_time_utc_time_mapping + .0 + .checked_sub(gst::Signed::Positive(running_time_utc_time_mapping.1)) + .and_then(|res| res.checked_add_unsigned(utc_time)) + .and_then(|res| res.positive()) +} + static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "fmp4mux", @@ -93,6 +116,34 @@ impl Default for Settings { } } +#[derive(Debug, Clone)] +struct PreQueuedBuffer { + /// Buffer + /// + /// Buffer PTS/DTS are updated to the output segment in multi-stream configurations. 
+ buffer: gst::Buffer, + + /// PTS + /// + /// In ONVIF mode this is the UTC time, otherwise it is the PTS running time. + pts: gst::ClockTime, + + /// End PTS + /// + /// In ONVIF mode this is the UTC time, otherwise it is the PTS running time. + end_pts: gst::ClockTime, + + /// DTS + /// + /// In ONVIF mode this is the UTC time, otherwise it is the DTS running time. + dts: Option>, + + /// End DTS + /// + /// In ONVIF mode this is the UTC time, otherwise it is the DTS running time. + end_dts: Option>, +} + #[derive(Debug)] struct GopBuffer { buffer: gst::Buffer, @@ -102,77 +153,92 @@ struct GopBuffer { #[derive(Debug)] struct Gop { - // Running times + /// Start PTS. start_pts: gst::ClockTime, + /// Start DTS. start_dts: Option, + /// Earliest PTS. earliest_pts: gst::ClockTime, - // Once this is known to be the final earliest PTS/DTS + /// Once this is known to be the final earliest PTS/DTS final_earliest_pts: bool, - // PTS plus duration of last buffer, or start of next GOP + /// PTS plus duration of last buffer, or start of next GOP end_pts: gst::ClockTime, - // Once this is known to be the final end PTS/DTS + /// Once this is known to be the final end PTS/DTS final_end_pts: bool, - // DTS plus duration of last buffer, or start of next GOP + /// DTS plus duration of last buffer, or start of next GOP end_dts: Option, - // Buffer positions + /// Earliest PTS buffer position earliest_pts_position: gst::ClockTime, + /// Start DTS buffer position start_dts_position: Option, - // Buffer, PTS running time, DTS running time + /// Buffer, PTS running time, DTS running time buffers: Vec, } struct Stream { + /// Sink pad for this stream. sinkpad: super::FMP4MuxPad, + /// Pre-queue for ONVIF variant to timestamp all buffers with their UTC time. + /// + /// In non-ONVIF mode this just collects the PTS/DTS and the corresponding running + /// times for later usage. + pre_queue: VecDeque, + + /// Currently configured caps for this stream. 
caps: gst::Caps, + /// Whether this stream is intra-only and has frame reordering. delta_frames: DeltaFrames, + /// Currently queued GOPs, including incomplete ones. queued_gops: VecDeque, + /// Whether the fully queued GOPs are filling a whole fragment. fragment_filled: bool, - // Difference between the first DTS and 0 in case of negative DTS + /// Difference between the first DTS and 0 in case of negative DTS dts_offset: Option, - // Current position (DTS, or PTS for intra-only) to prevent - // timestamps from going backwards when queueing new buffers + /// Current position (DTS, or PTS for intra-only) to prevent + /// timestamps from going backwards when queueing new buffers current_position: gst::ClockTime, - // Current UTC time in ONVIF mode to prevent timestamps from - // going backwards when draining a fragment. - // UNIX epoch. - current_utc_time: gst::ClockTime, + /// Mapping between running time and UTC time in ONVIF mode. + running_time_utc_time_mapping: Option<(gst::Signed, gst::ClockTime)>, } #[derive(Default)] struct State { + /// Currently configured streams. streams: Vec, - // Created once we received caps and kept up to date with the caps, - // sent as part of the buffer list for the first fragment. + /// Stream header with ftyp and moov box. + /// + /// Created once we received caps and kept up to date with the caps, + /// sent as part of the buffer list for the first fragment. stream_header: Option, + /// Sequence number of the current fragment. 
sequence_number: u32, - // Fragment tracking for mfra + /// Fragment tracking for mfra box current_offset: u64, fragment_offsets: Vec, - // Start / end PTS of the whole stream + /// Earliest PTS of the whole stream earliest_pts: Option, + /// Current end PTS of the whole stream end_pts: Option, + /// Start DTS of the whole stream + start_dts: Option, - // Start PTS of the current fragment + /// Start PTS of the current fragment fragment_start_pts: Option, - // Additional timeout delay in case GOPs are bigger than the fragment duration + /// Additional timeout delay in case GOPs are bigger than the fragment duration timeout_delay: gst::ClockTime, - // In ONVIF mode the UTC time corresponding to the beginning of the stream - // UNIX epoch. - start_utc_time: Option, - end_utc_time: Option, - + /// If headers (ftyp / moov box) were sent. sent_headers: bool, } @@ -183,18 +249,363 @@ pub(crate) struct FMP4Mux { } impl FMP4Mux { + /// Checks if a buffer is valid according to the stream configuration. 
+ fn check_buffer( + buffer: &gst::BufferRef, + sinkpad: &super::FMP4MuxPad, + delta_frames: super::DeltaFrames, + ) -> Result<(), gst::FlowError> { + if delta_frames.requires_dts() && buffer.dts().is_none() { + gst::error!(CAT, obj: sinkpad, "Require DTS for video streams"); + return Err(gst::FlowError::Error); + } + + if buffer.pts().is_none() { + gst::error!(CAT, obj: sinkpad, "Require timestamped buffers"); + return Err(gst::FlowError::Error); + } + + if delta_frames.intra_only() && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { + gst::error!(CAT, obj: sinkpad, "Intra-only stream with delta units"); + return Err(gst::FlowError::Error); + } + + Ok(()) + } + + fn peek_buffer( + &self, + sinkpad: &super::FMP4MuxPad, + delta_frames: super::DeltaFrames, + pre_queue: &mut VecDeque, + running_time_utc_time_mapping: &mut Option<(gst::Signed, gst::ClockTime)>, + fragment_duration: gst::ClockTime, + ) -> Result, gst::FlowError> { + // If not in ONVIF mode or the mapping is already known and there is a pre-queued buffer + // then we can directly return it from here. 
+ if self.obj().class().as_ref().variant != super::Variant::ONVIF + || running_time_utc_time_mapping.is_some() + { + if let Some(pre_queued_buffer) = pre_queue.front() { + return Ok(Some(pre_queued_buffer.clone())); + } + } + + // Pop buffer here, it will be stored in the pre-queue after calculating its timestamps + let mut buffer = match sinkpad.pop_buffer() { + None => return Ok(None), + Some(buffer) => buffer, + }; + + Self::check_buffer(&buffer, sinkpad, delta_frames)?; + + let segment = match sinkpad.segment().downcast::().ok() { + Some(segment) => segment, + None => { + gst::error!(CAT, obj: sinkpad, "Got buffer before segment"); + return Err(gst::FlowError::Error); + } + }; + + let pts_position = buffer.pts().unwrap(); + let duration = buffer.duration(); + let end_pts_position = duration.opt_add(pts_position).unwrap_or(pts_position); + + let pts = segment + .to_running_time_full(pts_position) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Couldn't convert PTS to running time"); + gst::FlowError::Error + })? + .positive() + .unwrap_or_else(|| { + gst::warning!(CAT, obj: sinkpad, "Negative PTSs are not supported"); + gst::ClockTime::ZERO + }); + + let end_pts = segment + .to_running_time_full(end_pts_position) + .ok_or_else(|| { + gst::error!( + CAT, + obj: sinkpad, + "Couldn't convert end PTS to running time" + ); + gst::FlowError::Error + })? + .positive() + .unwrap_or_else(|| { + gst::warning!(CAT, obj: sinkpad, "Negative PTSs are not supported"); + gst::ClockTime::ZERO + }); + + let (dts, end_dts) = if !delta_frames.requires_dts() { + (None, None) + } else { + // Negative DTS are handled via the dts_offset and by having negative composition time + // offsets in the `trun` box. The smallest DTS here is shifted to zero. 
+ let dts_position = buffer.dts().expect("not DTS"); + let end_dts_position = duration.opt_add(dts_position).unwrap_or(dts_position); + + let dts = segment.to_running_time_full(dts_position).ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Couldn't convert DTS to running time"); + gst::FlowError::Error + })?; + + let end_dts = segment + .to_running_time_full(end_dts_position) + .ok_or_else(|| { + gst::error!( + CAT, + obj: sinkpad, + "Couldn't convert end DTS to running time" + ); + gst::FlowError::Error + })?; + + let end_dts = std::cmp::max(end_dts, dts); + + (Some(dts), Some(end_dts)) + }; + + // If this is a multi-stream element then we need to update the PTS/DTS positions according + // to the output segment, specifically to re-timestamp them with the running time and + // adjust for the segment shift to compensate for negative DTS. + if !self.obj().class().as_ref().variant.is_single_stream() { + let pts_position = pts + SEGMENT_OFFSET; + let dts_position = dts.map(|dts| { + dts.checked_add_unsigned(SEGMENT_OFFSET) + .and_then(|dts| dts.positive()) + .unwrap_or(gst::ClockTime::ZERO) + }); + + let buffer = buffer.make_mut(); + buffer.set_pts(pts_position); + buffer.set_dts(dts_position); + } + + if self.obj().class().as_ref().variant != super::Variant::ONVIF { + // Store in the queue so we don't have to recalculate this all the time + pre_queue.push_back(PreQueuedBuffer { + buffer, + pts, + end_pts, + dts, + end_dts, + }); + } else if let Some(running_time_utc_time_mapping) = running_time_utc_time_mapping { + // For ONVIF we need to re-timestamp the buffer with its UTC time. + // + // After re-timestamping, put the buffer into the pre-queue so re-timestamping only has to + // happen once. 
+ let utc_time = match get_utc_time_from_buffer(&buffer) { + None => { + // Calculate from the mapping + running_time_to_utc_time( + gst::Signed::Positive(pts), + *running_time_utc_time_mapping, + ) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time"); + gst::FlowError::Error + })? + } + Some(utc_time) => utc_time, + }; + gst::trace!( + CAT, + obj: sinkpad, + "Mapped PTS running time {pts} to UTC time {utc_time}" + ); + + let end_pts_utc_time = running_time_to_utc_time( + gst::Signed::Positive(end_pts), + (gst::Signed::Positive(pts), utc_time), + ) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative end PTS UTC time"); + gst::FlowError::Error + })?; + + let (dts_utc_time, end_dts_utc_time) = if let Some(dts) = dts { + let dts_utc_time = + running_time_to_utc_time(dts, (gst::Signed::Positive(pts), utc_time)) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); + gst::FlowError::Error + })?; + gst::trace!( + CAT, + obj: sinkpad, + "Mapped DTS running time {dts} to UTC time {dts_utc_time}" + ); + + let end_dts_utc_time = running_time_to_utc_time( + end_dts.unwrap(), + (gst::Signed::Positive(pts), utc_time), + ) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative end DTS UTC time"); + gst::FlowError::Error + })?; + + ( + Some(gst::Signed::Positive(dts_utc_time)), + Some(gst::Signed::Positive(end_dts_utc_time)), + ) + } else { + (None, None) + }; + + pre_queue.push_back(PreQueuedBuffer { + buffer, + pts: utc_time, + end_pts: end_pts_utc_time, + dts: dts_utc_time, + end_dts: end_dts_utc_time, + }); + } else { + // In ONVIF mode we need to get UTC times for each buffer and synchronize based on that. + // Queue up to min(6s, fragment_duration) of data in the very beginning to get the first UTC time and then backdate. 
+ if let Some((last, first)) = Option::zip(pre_queue.back(), pre_queue.front()) { + // Existence of PTS/DTS was checked by check_buffer() before pre-queueing + let (last, first) = if delta_frames.requires_dts() { + (last.end_dts.unwrap(), first.end_dts.unwrap()) + } else { + ( + gst::Signed::Positive(last.end_pts), + gst::Signed::Positive(first.end_pts), + ) + }; + + let limit = std::cmp::min(gst::ClockTime::from_seconds(6), fragment_duration); + if last.saturating_sub(first) > gst::Signed::Positive(limit) { + gst::error!( + CAT, + obj: sinkpad, + "Got no UTC time in the first {limit} of the stream" + ); + return Err(gst::FlowError::Error); + } + } + + let utc_time = match get_utc_time_from_buffer(&buffer) { + Some(utc_time) => utc_time, + None => { + pre_queue.push_back(PreQueuedBuffer { + buffer, + pts, + end_pts, + dts, + end_dts, + }); + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + }; + + let mapping = (gst::Signed::Positive(pts), utc_time); + *running_time_utc_time_mapping = Some(mapping); + + // Push the buffer onto the pre-queue and re-timestamp it and all other buffers + // based on the mapping above once we have an UTC time. 
+ pre_queue.push_back(PreQueuedBuffer { + buffer, + pts, + end_pts, + dts, + end_dts, + }); + + for pre_queued_buffer in pre_queue.iter_mut() { + let pts_utc_time = + running_time_to_utc_time(gst::Signed::Positive(pre_queued_buffer.pts), mapping) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time"); + gst::FlowError::Error + })?; + gst::trace!( + CAT, + obj: sinkpad, + "Mapped PTS running time {} to UTC time {pts_utc_time}", + pre_queued_buffer.pts, + ); + pre_queued_buffer.pts = pts_utc_time; + + let end_pts_utc_time = running_time_to_utc_time( + gst::Signed::Positive(pre_queued_buffer.end_pts), + mapping, + ) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative end PTS UTC time"); + gst::FlowError::Error + })?; + pre_queued_buffer.end_pts = end_pts_utc_time; + + if let Some(dts) = pre_queued_buffer.dts { + let dts_utc_time = running_time_to_utc_time(dts, mapping).ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); + gst::FlowError::Error + })?; + gst::trace!( + CAT, + obj: sinkpad, + "Mapped DTS running time {dts} to UTC time {dts_utc_time}" + ); + pre_queued_buffer.dts = Some(gst::Signed::Positive(dts_utc_time)); + + let end_dts_utc_time = + running_time_to_utc_time(pre_queued_buffer.end_dts.unwrap(), mapping) + .ok_or_else(|| { + gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); + gst::FlowError::Error + })?; + pre_queued_buffer.end_dts = Some(gst::Signed::Positive(end_dts_utc_time)); + } + } + + // Fall through and return the front of the queue + } + + Ok(Some(pre_queue.front().unwrap().clone())) + } + + fn pop_buffer( + &self, + _sinkpad: &super::FMP4MuxPad, + pre_queue: &mut VecDeque, + running_time_utc_time_mapping: &Option<(gst::Signed, gst::ClockTime)>, + ) -> PreQueuedBuffer { + // Only allowed to be called after peek was successful so there must be a buffer now + // or in ONVIF mode we must also know the mapping now. 
+ + assert!(!pre_queue.is_empty()); + if self.obj().class().as_ref().variant == super::Variant::ONVIF { + assert!(running_time_utc_time_mapping.is_some()); + } + + pre_queue.pop_front().unwrap() + } + fn find_earliest_stream<'a>( &self, state: &'a mut State, timeout: bool, + fragment_duration: gst::ClockTime, ) -> Result, gst::FlowError> { let mut earliest_stream = None; let mut all_have_data_or_eos = true; for (idx, stream) in state.streams.iter_mut().enumerate() { - let buffer = match stream.sinkpad.peek_buffer() { - Some(buffer) => buffer, - None => { + let pre_queued_buffer = match Self::peek_buffer( + self, + &stream.sinkpad, + stream.delta_frames, + &mut stream.pre_queue, + &mut stream.running_time_utc_time_mapping, + fragment_duration, + ) { + Ok(Some(buffer)) => buffer, + Ok(None) | Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => { if stream.sinkpad.is_eos() { gst::trace!(CAT, obj: stream.sinkpad, "Stream is EOS"); } else { @@ -203,6 +614,7 @@ impl FMP4Mux { } continue; } + Err(err) => return Err(err), }; if stream.fragment_filled { @@ -210,39 +622,14 @@ impl FMP4Mux { continue; } - let segment = match stream - .sinkpad - .segment() - .clone() - .downcast::() - .ok() - { - Some(segment) => segment, - None => { - gst::error!(CAT, obj: stream.sinkpad, "Got buffer before segment"); - return Err(gst::FlowError::Error); - } - }; + gst::trace!(CAT, obj: stream.sinkpad, "Stream has running time PTS {} / DTS {} queued", pre_queued_buffer.pts, pre_queued_buffer.dts.display()); - // If the stream has no valid running time, assume it's before everything else. 
- let running_time = match segment.to_running_time(buffer.dts_or_pts()) { - None => { - gst::trace!(CAT, obj: stream.sinkpad, "Stream has no valid running time"); - if earliest_stream - .as_ref() - .map_or(true, |(_, _, earliest_running_time)| { - *earliest_running_time > gst::ClockTime::ZERO - }) - { - earliest_stream = Some((idx, stream, gst::ClockTime::ZERO)); - } - continue; - } - Some(running_time) => running_time, + let running_time = if stream.delta_frames.requires_dts() { + pre_queued_buffer.dts.unwrap() + } else { + gst::Signed::Positive(pre_queued_buffer.pts) }; - gst::trace!(CAT, obj: stream.sinkpad, "Stream has running time {} queued", running_time); - if earliest_stream .as_ref() .map_or(true, |(_idx, _stream, earliest_running_time)| { @@ -280,96 +667,42 @@ impl FMP4Mux { &self, _idx: usize, stream: &mut Stream, - segment: &gst::FormattedSegment, - mut buffer: gst::Buffer, + mut pre_queued_buffer: PreQueuedBuffer, ) -> Result<(), gst::FlowError> { - use gst::Signed::*; - assert!(!stream.fragment_filled); - gst::trace!(CAT, obj: stream.sinkpad, "Handling buffer {:?}", buffer); + gst::trace!(CAT, obj: stream.sinkpad, "Handling buffer {:?}", pre_queued_buffer); let delta_frames = stream.delta_frames; - if delta_frames.requires_dts() && buffer.dts().is_none() { - gst::error!(CAT, obj: stream.sinkpad, "Require DTS for video streams"); - return Err(gst::FlowError::Error); - } - - if delta_frames.intra_only() && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { - gst::error!(CAT, obj: stream.sinkpad, "Intra-only stream with delta units"); - return Err(gst::FlowError::Error); - } - - let pts_position = buffer.pts().ok_or_else(|| { - gst::error!(CAT, obj: stream.sinkpad, "Require timestamped buffers"); - gst::FlowError::Error - })?; - let duration = buffer.duration(); - let end_pts_position = duration.opt_add(pts_position).unwrap_or(pts_position); - - let mut pts = segment - .to_running_time_full(pts_position) - .ok_or_else(|| { - gst::error!(CAT, obj: 
stream.sinkpad, "Couldn't convert PTS to running time"); - gst::FlowError::Error - })? - .positive() - .unwrap_or_else(|| { - gst::warning!(CAT, obj: stream.sinkpad, "Negative PTSs are not supported"); - gst::ClockTime::ZERO - }); - - let mut end_pts = segment - .to_running_time_full(end_pts_position) - .ok_or_else(|| { - gst::error!(CAT, obj: stream.sinkpad, "Couldn't convert end PTS to running time"); - gst::FlowError::Error - })? - .positive() - .unwrap_or_else(|| { - gst::warning!(CAT, obj: stream.sinkpad, "Negative PTSs are not supported"); - gst::ClockTime::ZERO - }); - - // Enforce monotonically increasing PTS for intra-only streams + // Enforce monotonically increasing PTS for intra-only streams, and DTS otherwise if !delta_frames.requires_dts() { - if pts < stream.current_position { + if pre_queued_buffer.pts < stream.current_position { gst::warning!( CAT, obj: stream.sinkpad, "Decreasing PTS {} < {}", - pts, + pre_queued_buffer.pts, stream.current_position, ); - pts = stream.current_position; + pre_queued_buffer.pts = stream.current_position; } else { - stream.current_position = pts; + stream.current_position = pre_queued_buffer.pts; } - end_pts = std::cmp::max(end_pts, pts); - } - - let (dts_position, dts, end_dts) = if !delta_frames.requires_dts() { - (None, None, None) + pre_queued_buffer.end_pts = + std::cmp::max(pre_queued_buffer.end_pts, pre_queued_buffer.pts); } else { // Negative DTS are handled via the dts_offset and by having negative composition time // offsets in the `trun` box. The smallest DTS here is shifted to zero. 
- let dts_position = buffer.dts().expect("not DTS"); - let end_dts_position = duration.opt_add(dts_position).unwrap_or(dts_position); - - let signed_dts = segment.to_running_time_full(dts_position).ok_or_else(|| { - gst::error!(CAT, obj: stream.sinkpad, "Couldn't convert DTS to running time"); - gst::FlowError::Error - })?; - let mut dts = match signed_dts { - Positive(dts) => { + let dts = match pre_queued_buffer.dts.unwrap() { + gst::Signed::Positive(dts) => { if let Some(dts_offset) = stream.dts_offset { dts + dts_offset } else { dts } } - Negative(dts) => { + gst::Signed::Negative(dts) => { if stream.dts_offset.is_none() { stream.dts_offset = Some(dts); } @@ -384,30 +717,15 @@ impl FMP4Mux { } }; - let signed_end_dts = - segment - .to_running_time_full(end_dts_position) - .ok_or_else(|| { - gst::error!( - CAT, - obj: stream.sinkpad, - "Couldn't convert end DTS to running time" - ); - gst::FlowError::Error - })?; - let mut end_dts = match signed_end_dts { - Positive(dts) => { + let end_dts = match pre_queued_buffer.end_dts.unwrap() { + gst::Signed::Positive(dts) => { if let Some(dts_offset) = stream.dts_offset { dts + dts_offset } else { dts } } - Negative(dts) => { - if stream.dts_offset.is_none() { - stream.dts_offset = Some(dts); - } - + gst::Signed::Negative(dts) => { let dts_offset = stream.dts_offset.unwrap(); if dts > dts_offset { gst::warning!(CAT, obj: stream.sinkpad, "End DTS before first DTS"); @@ -429,34 +747,27 @@ impl FMP4Mux { dts, stream.current_position, ); - dts = stream.current_position; + pre_queued_buffer.dts = Some(gst::Signed::Positive(stream.current_position)); } else { + pre_queued_buffer.dts = Some(gst::Signed::Positive(dts)); stream.current_position = dts; } - end_dts = std::cmp::max(end_dts, dts); + pre_queued_buffer.end_dts = Some(gst::Signed::Positive(std::cmp::max(end_dts, dts))); + } - (Some(dts_position), Some(dts), Some(end_dts)) - }; + let PreQueuedBuffer { + buffer, + pts, + end_pts, + dts, + end_dts, + } = pre_queued_buffer; 
- // If this is a multi-stream element then we need to update the PTS/DTS positions according - // to the output segment, specifically to re-timestamp them with the running time and - // adjust for the segment shift to compensate for negative DTS. - let aggregator = self.obj(); - let class = aggregator.class(); - let (pts_position, dts_position) = if class.as_ref().variant.is_single_stream() { - (pts_position, dts_position) - } else { - let pts_position = pts + SEGMENT_OFFSET; - let dts_position = dts.map(|dts| { - dts + SEGMENT_OFFSET - stream.dts_offset.unwrap_or(gst::ClockTime::ZERO) - }); + let dts = dts.map(|v| v.positive().unwrap()); + let end_dts = end_dts.map(|v| v.positive().unwrap()); - let buffer = buffer.make_mut(); - buffer.set_pts(pts_position); - buffer.set_dts(dts_position); - - (pts_position, dts_position) - }; + let pts_position = buffer.pts().unwrap(); + let dts_position = buffer.dts(); if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { gst::debug!( @@ -733,6 +1044,14 @@ impl FMP4Mux { return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } } + } else if at_eos { + if let Some(last_gop) = gops.last() { + if fragment_end_pts + .map_or(true, |fragment_end_pts| fragment_end_pts < last_gop.end_pts) + { + fragment_end_pts = Some(last_gop.end_pts); + } + } } if gops.is_empty() { @@ -860,18 +1179,10 @@ impl FMP4Mux { let pts = buffer.pts; let dts = buffer.dts.unwrap(); - if pts > dts { - Some(i64::try_from((pts - dts).nseconds()).map_err(|_| { - gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference"); - gst::FlowError::Error - })?) - } else { - let diff = i64::try_from((dts - pts).nseconds()).map_err(|_| { - gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference"); - gst::FlowError::Error - })?; - Some(-diff) - } + Some(i64::try_from((pts - dts).nseconds()).map_err(|_| { + gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference"); + gst::FlowError::Error + })?) 
}; buffers.push_back(Buffer { @@ -904,245 +1215,6 @@ impl FMP4Mux { )) } - fn preprocess_drained_streams_onvif( - &self, - state: &mut State, - drained_streams: &mut [(super::FragmentHeaderStream, VecDeque)], - ) -> Result, gst::FlowError> { - let aggregator = self.obj(); - if aggregator.class().as_ref().variant != super::Variant::ONVIF { - return Ok(None); - } - - let mut max_end_utc_time = None; - - let calculate_pts = |buffer: &Buffer| -> gst::ClockTime { - let composition_time_offset = buffer.composition_time_offset.unwrap_or(0); - if composition_time_offset > 0 { - buffer.timestamp + (composition_time_offset as u64).nseconds() - } else { - buffer - .timestamp - .checked_sub(((-composition_time_offset) as u64).nseconds()) - .unwrap() - } - }; - - // If this is the first fragment then allow the first buffers to not have a reference - // timestamp meta and backdate them - if state.stream_header.is_none() { - for (idx, (_, drain_buffers)) in drained_streams.iter_mut().enumerate() { - let (buffer_idx, utc_time, buffer) = - match drain_buffers.iter().enumerate().find_map(|(idx, buffer)| { - get_utc_time_from_buffer(&buffer.buffer) - .map(|timestamp| (idx, timestamp, buffer)) - }) { - None => { - gst::error!( - CAT, - obj: state.streams[idx].sinkpad, - "No reference timestamp set on any buffers in the first fragment", - ); - return Err(gst::FlowError::Error); - } - Some(res) => res, - }; - - // Now do the backdating - if buffer_idx > 0 { - let utc_time_pts = calculate_pts(buffer); - - for buffer in drain_buffers.iter_mut().take(buffer_idx) { - let buffer_pts = calculate_pts(buffer); - let buffer_pts_diff = if utc_time_pts >= buffer_pts { - (utc_time_pts - buffer_pts).nseconds() as i64 - } else { - -((buffer_pts - utc_time_pts).nseconds() as i64) - }; - let buffer_utc_time = if buffer_pts_diff >= 0 { - utc_time - .checked_sub((buffer_pts_diff as u64).nseconds()) - .unwrap() - } else { - utc_time - .checked_add(((-buffer_pts_diff) as u64).nseconds()) - .unwrap() - }; - 
- let buffer = buffer.buffer.make_mut(); - gst::ReferenceTimestampMeta::add( - buffer, - &UNIX_CAPS, - buffer_utc_time, - gst::ClockTime::NONE, - ); - } - } - } - } - - // Calculate the minimum across all streams and remember that - if state.start_utc_time.is_none() { - let mut start_utc_time = None; - - for (idx, (_, drain_buffers)) in drained_streams.iter().enumerate() { - for buffer in drain_buffers { - let utc_time = match get_utc_time_from_buffer(&buffer.buffer) { - None => { - gst::error!( - CAT, - obj: state.streams[idx].sinkpad, - "No reference timestamp set on all buffers" - ); - return Err(gst::FlowError::Error); - } - Some(utc_time) => utc_time, - }; - - if start_utc_time.is_none() || start_utc_time > Some(utc_time) { - start_utc_time = Some(utc_time); - } - } - } - - gst::debug!( - CAT, - imp: self, - "Configuring start UTC time {}", - start_utc_time.unwrap() - ); - state.start_utc_time = start_utc_time; - } - - // Update all buffer timestamps based on the UTC time and offset to the start UTC time - let start_utc_time = state.start_utc_time.unwrap(); - for (idx, (stream, drain_buffers)) in drained_streams.iter_mut().enumerate() { - let mut start_time = None; - - for buffer in drain_buffers.iter_mut() { - let utc_time = match get_utc_time_from_buffer(&buffer.buffer) { - None => { - gst::error!( - CAT, - obj: state.streams[idx].sinkpad, - "No reference timestamp set on all buffers" - ); - return Err(gst::FlowError::Error); - } - Some(utc_time) => utc_time, - }; - - // Convert PTS UTC time to DTS - let mut utc_time_dts = - if let Some(composition_time_offset) = buffer.composition_time_offset { - if composition_time_offset >= 0 { - utc_time - .checked_sub((composition_time_offset as u64).nseconds()) - .unwrap() - } else { - utc_time - .checked_add(((-composition_time_offset) as u64).nseconds()) - .unwrap() - } - } else { - utc_time - }; - - // Enforce monotonically increasing timestamps - if utc_time_dts < state.streams[idx].current_utc_time { - 
gst::warning!( - CAT, - obj: state.streams[idx].sinkpad, - "Decreasing UTC DTS timestamp for buffer {} < {}", - utc_time_dts, - state.streams[idx].current_utc_time, - ); - utc_time_dts = state.streams[idx].current_utc_time; - } else { - state.streams[idx].current_utc_time = utc_time_dts; - } - - let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap(); - - gst::trace!( - CAT, - obj: state.streams[idx].sinkpad, - "Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}", - buffer.timestamp, - timestamp, - utc_time_dts, - utc_time, - ); - - buffer.timestamp = timestamp; - if start_time.is_none() || start_time > Some(buffer.timestamp) { - start_time = Some(buffer.timestamp); - } - } - - // Update durations for all buffers except for the last in the fragment unless all - // have the same duration anyway - let mut common_duration = Ok(None); - let mut drain_buffers_iter = drain_buffers.iter_mut().peekable(); - while let Some(buffer) = drain_buffers_iter.next() { - let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp); - - if let Some(next_timestamp) = next_timestamp { - let duration = next_timestamp.saturating_sub(buffer.timestamp); - if common_duration == Ok(None) { - common_duration = Ok(Some(duration)); - } else if common_duration != Ok(Some(duration)) { - common_duration = Err(()); - } - - gst::trace!( - CAT, - obj: state.streams[idx].sinkpad, - "Updating buffer with timestamp {} duration from {} to relative UTC duration {}", - buffer.timestamp, - buffer.duration, - duration, - ); - - buffer.duration = duration; - } else if let Ok(Some(common_duration)) = common_duration { - gst::trace!( - CAT, - obj: state.streams[idx].sinkpad, - "Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}", - buffer.timestamp, - buffer.duration, - common_duration, - ); - - buffer.duration = common_duration; - } else { - gst::trace!( - CAT, - obj: state.streams[idx].sinkpad, - 
"Keeping last buffer with timestamp {} duration at {}", - buffer.timestamp, - buffer.duration, - ); - } - - let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration; - if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) { - max_end_utc_time = Some(end_utc_time); - } - } - - if let Some(start_time) = start_time { - gst::debug!(CAT, obj: state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time); - *stream.start_time.as_mut().unwrap() = start_time; - } else { - assert!(stream.start_time.is_none()); - } - } - - Ok(max_end_utc_time) - } - #[allow(clippy::type_complexity)] fn interleave_buffers( &self, @@ -1252,10 +1324,6 @@ impl FMP4Mux { } } - // For ONVIF, replace all timestamps with timestamps based on UTC times. - let max_end_utc_time = - self.preprocess_drained_streams_onvif(state, &mut drained_streams)?; - // Create header now if it was not created before and return the caps let mut caps = None; if state.stream_header.is_none() { @@ -1264,9 +1332,26 @@ impl FMP4Mux { } // Interleave buffers according to the settings into a single vec - let (mut interleaved_buffers, streams) = + let (mut interleaved_buffers, mut streams) = self.interleave_buffers(settings, drained_streams)?; + // Offset stream start time to start at 0 in ONVIF mode instead of using the UTC time + // verbatim. This would be used for the tfdt box later. + // FIXME: Should this use the original DTS-or-PTS running time instead? + // That might be negative though! 
+ if self.obj().class().as_ref().variant == super::Variant::ONVIF { + let offset = if let Some(start_dts) = state.start_dts { + std::cmp::min(start_dts, state.earliest_pts.unwrap()) + } else { + state.earliest_pts.unwrap() + }; + for stream in &mut streams { + if let Some(start_time) = stream.start_time { + stream.start_time = Some(start_time.checked_sub(offset).unwrap()); + } + } + } + let mut buffer_list = None; if interleaved_buffers.is_empty() { assert!(at_eos); @@ -1382,7 +1467,6 @@ impl FMP4Mux { } state.end_pts = Some(fragment_end_pts); - state.end_utc_time = max_end_utc_time; // Update for the start PTS of the next fragment gst::info!( @@ -1394,39 +1478,59 @@ impl FMP4Mux { state.fragment_start_pts = Some(fragment_end_pts); let fku_time = fragment_end_pts + settings.fragment_duration; - let max_position = state - .streams - .iter() - .map(|s| s.current_position) - .max() - .unwrap(); - - let fku_time = if max_position > fku_time { - gst::warning!( - CAT, - imp: self, - "Sending force-keyunit event late for running time {} at {}", - fku_time, - max_position, - ); - None - } else { - gst::debug!( - CAT, - imp: self, - "Sending force-keyunit event for running time {}", - fku_time, - ); - Some(fku_time) - }; - - let fku = gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fku_time) - .all_headers(true) - .build(); for stream in &state.streams { - upstream_events.push((stream.sinkpad.clone(), fku.clone())); + let current_position = stream.current_position; + + // In case of ONVIF this needs to be converted back from UTC time to + // the stream's running time + let (fku_time, current_position) = + if self.obj().class().as_ref().variant == super::Variant::ONVIF { + ( + if let Some(fku_time) = utc_time_to_running_time( + fku_time, + stream.running_time_utc_time_mapping.unwrap(), + ) { + fku_time + } else { + continue; + }, + utc_time_to_running_time( + current_position, + stream.running_time_utc_time_mapping.unwrap(), + ), + ) + } else { + (fku_time, 
Some(current_position)) + }; + + let fku_time = if current_position + .map_or(false, |current_position| current_position > fku_time) + { + gst::warning!( + CAT, + obj: stream.sinkpad, + "Sending force-keyunit event late for running time {} at {}", + fku_time, + current_position.display(), + ); + None + } else { + gst::debug!( + CAT, + obj: stream.sinkpad, + "Sending force-keyunit event for running time {}", + fku_time, + ); + Some(fku_time) + }; + + let fku = gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fku_time) + .all_headers(true) + .build(); + + upstream_events.push((stream.sinkpad.clone(), fku)); } // Reset timeout delay now that we've output an actual fragment @@ -1527,11 +1631,12 @@ impl FMP4Mux { sinkpad: pad, caps, delta_frames, + pre_queue: VecDeque::new(), queued_gops: VecDeque::new(), fragment_filled: false, dts_offset: None, current_position: gst::ClockTime::ZERO, - current_utc_time: gst::ClockTime::ZERO, + running_time_utc_time_mapping: None, }); } @@ -1585,19 +1690,11 @@ impl FMP4Mux { assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty())); - let duration = if variant == super::Variant::ONVIF { - state - .end_utc_time - .opt_checked_sub(state.start_utc_time) - .ok() - .flatten() - } else { - state - .end_pts - .opt_checked_sub(state.earliest_pts) - .ok() - .flatten() - }; + let duration = state + .end_pts + .opt_checked_sub(state.earliest_pts) + .ok() + .flatten(); let streams = state .streams @@ -1616,9 +1713,13 @@ impl FMP4Mux { streams, write_mehd: settings.write_mehd, duration: if at_eos { duration } else { None }, - start_utc_time: state - .start_utc_time - .map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000), + start_utc_time: if variant == super::Variant::ONVIF { + state + .earliest_pts + .map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000) + } else { + None + }, }) .map_err(|err| { gst::error!(CAT, imp: self, "Failed to create FMP4 header: {}", err); @@ -1983,8 +2084,9 @@ impl 
AggregatorImpl for FMP4Mux { stream.queued_gops.clear(); stream.dts_offset = None; stream.current_position = gst::ClockTime::ZERO; - stream.current_utc_time = gst::ClockTime::ZERO; stream.fragment_filled = false; + stream.pre_queue.clear(); + stream.running_time_utc_time_mapping = None; } state.current_offset = 0; @@ -2050,30 +2152,18 @@ impl AggregatorImpl for FMP4Mux { // fill-level at all sinkpads in sync. let fragment_start_pts = state.fragment_start_pts; - while let Some((idx, stream)) = self.find_earliest_stream(&mut state, timeout)? { - // Can only happen if the stream was flushed in the meantime - let buffer = match stream.sinkpad.pop_buffer() { - Some(buffer) => buffer, - None => continue, - }; - - // Can only happen if the stream was flushed in the meantime - let segment = match stream - .sinkpad - .segment() - .clone() - .downcast::() - .ok() - { - Some(segment) => segment, - None => { - gst::error!(CAT, obj: stream.sinkpad, "Got buffer before segment"); - return Err(gst::FlowError::Error); - } - }; + while let Some((idx, stream)) = + self.find_earliest_stream(&mut state, timeout, settings.fragment_duration)? + { + let pre_queued_buffer = Self::pop_buffer( + self, + &stream.sinkpad, + &mut stream.pre_queue, + &stream.running_time_utc_time_mapping, + ); // Queue up the buffer and update GOP tracking state - self.queue_gops(idx, stream, &segment, buffer)?; + self.queue_gops(idx, stream, pre_queued_buffer)?; // Check if this stream is filled enough now. if let Some((queued_end_pts, fragment_start_pts)) = Option::zip( @@ -2096,67 +2186,104 @@ impl AggregatorImpl for FMP4Mux { // Calculate the earliest PTS after queueing input if we can now. 
if state.earliest_pts.is_none() { let mut earliest_pts = None; + let mut start_dts = None; for stream in &state.streams { - let stream_earliest_pts = match stream.queued_gops.back() { + let (stream_earliest_pts, stream_start_dts) = match stream.queued_gops.back() { None => { earliest_pts = None; + start_dts = None; break; } Some(oldest_gop) => { if !timeout && !oldest_gop.final_earliest_pts { earliest_pts = None; + start_dts = None; break; } - oldest_gop.earliest_pts + (oldest_gop.earliest_pts, oldest_gop.start_dts) } }; if earliest_pts.opt_gt(stream_earliest_pts).unwrap_or(true) { earliest_pts = Some(stream_earliest_pts); } + + if let Some(stream_start_dts) = stream_start_dts { + if start_dts.opt_gt(stream_start_dts).unwrap_or(true) { + start_dts = Some(stream_start_dts); + } + } } if let Some(earliest_pts) = earliest_pts { - gst::info!(CAT, imp: self, "Got earliest PTS {}", earliest_pts); + gst::info!( + CAT, + imp: self, + "Got earliest PTS {}, start DTS {}", + earliest_pts, + start_dts.display() + ); state.earliest_pts = Some(earliest_pts); + state.start_dts = start_dts; state.fragment_start_pts = Some(earliest_pts); let fku_time = earliest_pts + settings.fragment_duration; - let max_position = state - .streams - .iter() - .map(|s| s.current_position) - .max() - .unwrap(); - - let fku_time = if max_position > fku_time { - gst::warning!( - CAT, - imp: self, - "Sending first force-keyunit event late for running time {} at {}", - fku_time, - max_position, - ); - None - } else { - gst::debug!( - CAT, - imp: self, - "Sending first force-keyunit event for running time {}", - fku_time, - ); - Some(fku_time) - }; - - let fku = gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fku_time) - .all_headers(true) - .build(); for stream in &mut state.streams { - upstream_events.push((stream.sinkpad.clone(), fku.clone())); + let current_position = stream.current_position; + + // In case of ONVIF this needs to be converted back from UTC time to + // the stream's 
running time + let (fku_time, current_position) = + if self.obj().class().as_ref().variant == super::Variant::ONVIF { + ( + if let Some(fku_time) = utc_time_to_running_time( + fku_time, + stream.running_time_utc_time_mapping.unwrap(), + ) { + fku_time + } else { + continue; + }, + utc_time_to_running_time( + current_position, + stream.running_time_utc_time_mapping.unwrap(), + ), + ) + } else { + (fku_time, Some(current_position)) + }; + + let fku_time = if current_position + .map_or(false, |current_position| current_position > fku_time) + { + gst::warning!( + CAT, + obj: stream.sinkpad, + "Sending first force-keyunit event late for running time {} at {}", + fku_time, + current_position.display(), + ); + None + } else { + gst::debug!( + CAT, + obj: stream.sinkpad, + "Sending first force-keyunit event for running time {}", + fku_time, + ); + Some(fku_time) + }; + + // In ONVIF mode fku_time was already converted from UTC time to the stream's running time above + let fku = gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fku_time) + .all_headers(true) + .build(); + + upstream_events.push((stream.sinkpad.clone(), fku)); // Check if this stream is filled enough now. if let Some(queued_end_pts) = stream @@ -2775,8 +2902,9 @@ impl AggregatorPadImpl for FMP4MuxPad { stream.queued_gops.clear(); stream.dts_offset = None; stream.current_position = gst::ClockTime::ZERO; - stream.current_utc_time = gst::ClockTime::ZERO; stream.fragment_filled = false; + stream.pre_queue.clear(); + stream.running_time_utc_time_mapping = None; break; } }