diff --git a/generic/fmp4/src/fmp4mux/boxes.rs b/generic/fmp4/src/fmp4mux/boxes.rs index 5c93ed6b..b82507d7 100644 --- a/generic/fmp4/src/fmp4mux/boxes.rs +++ b/generic/fmp4/src/fmp4mux/boxes.rs @@ -397,8 +397,8 @@ pub(super) fn create_fmp4_header(cfg: super::HeaderConfiguration) -> Result = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build()); + +/// Reference timestamp meta caps for UNIX timestamps. +static UNIX_CAPS: Lazy = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build()); + +/// Returns the UTC time of the buffer in the UNIX epoch. +fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option { + buffer + .iter_meta::() + .find_map(|meta| { + if meta.reference().can_intersect(&UNIX_CAPS) { + Some(meta.timestamp()) + } else if meta.reference().can_intersect(&NTP_CAPS) { + meta.timestamp() + .checked_sub(gst::ClockTime::from_seconds(NTP_UNIX_OFFSET)) + } else { + None + } + }) +} + static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "fmp4mux", @@ -105,6 +135,11 @@ struct Stream { // timestamps from going backwards when queueing new buffers current_position: gst::ClockTime, + // Current UTC time in ONVIF mode to prevent timestamps from + // going backwards when draining a fragment. + // UNIX epoch. + current_utc_time: gst::ClockTime, + last_force_keyunit_time: Option, } @@ -129,6 +164,11 @@ struct State { // Start PTS of the current fragment fragment_start_pts: Option, + // In ONVIF mode the UTC time corresponding to the beginning of the stream + // UNIX epoch. + start_utc_time: Option, + end_utc_time: Option, + sent_headers: bool, } @@ -756,6 +796,246 @@ impl FMP4Mux { } } + let mut max_end_utc_time = None; + // For ONVIF, replace all timestamps with timestamps based on UTC times. + if class.as_ref().variant == super::Variant::ONVIF { + let calculate_pts = |buffer: &Buffer| -> gst::ClockTime { + let composition_time_offset = buffer.composition_time_offset.unwrap_or(0); + if composition_time_offset > 0 { + buffer.timestamp + gst::ClockTime::from_nseconds(composition_time_offset as u64) + } else { + buffer + .timestamp + .checked_sub(gst::ClockTime::from_nseconds( + (-composition_time_offset) as u64, + )) + .unwrap() + } + }; + + // If this is the first fragment then allow the first buffers to not have a reference + // timestamp meta and backdate them + if state.stream_header.is_none() { + for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() { + let (buffer_idx, utc_time, buffer) = match drain_buffers + .iter() + .enumerate() + .find_map(|(idx, buffer)| { + get_utc_time_from_buffer(&buffer.buffer) + .map(|timestamp| (idx, timestamp, buffer)) + }) { + None => { + gst::error!( + CAT, + obj: &state.streams[idx].sinkpad, + "No reference timestamp set on any buffers in the first fragment", + ); + return Err(gst::FlowError::Error); + } + Some(res) => res, + }; + + // Now do the backdating + if buffer_idx > 0 { + let utc_time_pts = calculate_pts(buffer); + + for buffer in drain_buffers.iter_mut().take(buffer_idx) { + let buffer_pts = calculate_pts(buffer); + let buffer_pts_diff = if utc_time_pts >= buffer_pts { + (utc_time_pts - buffer_pts).nseconds() as i64 + } else { + -((buffer_pts - utc_time_pts).nseconds() as i64) + }; + let buffer_utc_time = if buffer_pts_diff >= 0 { + utc_time + .checked_sub(gst::ClockTime::from_nseconds( + buffer_pts_diff as u64, + )) + .unwrap() + } else { + utc_time + .checked_add(gst::ClockTime::from_nseconds( + (-buffer_pts_diff) as u64, + )) + .unwrap() + }; + + let buffer = buffer.buffer.make_mut(); + gst::ReferenceTimestampMeta::add( + buffer, + &UNIX_CAPS, + buffer_utc_time, + gst::ClockTime::NONE, + ); + } + } + } + } + + // Calculate the minimum across all streams and remember that + if state.start_utc_time.is_none() { + let mut start_utc_time = None; + + for (idx, drain_buffers) in drain_buffers.iter().enumerate() { + for buffer in drain_buffers { + let utc_time = match get_utc_time_from_buffer(&buffer.buffer) { + None => { + gst::error!( + CAT, + obj: &state.streams[idx].sinkpad, + "No reference timestamp set on all buffers" + ); + return Err(gst::FlowError::Error); + } + Some(utc_time) => utc_time, + }; + + if start_utc_time.is_none() || start_utc_time > Some(utc_time) { + start_utc_time = Some(utc_time); + } + } + } + + gst::debug!( + CAT, + obj: element, + "Configuring start UTC time {}", + start_utc_time.unwrap() + ); + state.start_utc_time = start_utc_time; + } + + // Update all buffer timestamps based on the UTC time and offset to the start UTC time + let start_utc_time = state.start_utc_time.unwrap(); + for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() { + let mut start_time = None; + + for buffer in drain_buffers.iter_mut() { + let utc_time = match get_utc_time_from_buffer(&buffer.buffer) { + None => { + gst::error!( + CAT, + obj: &state.streams[idx].sinkpad, + "No reference timestamp set on all buffers" + ); + return Err(gst::FlowError::Error); + } + Some(utc_time) => utc_time, + }; + + // Convert PTS UTC time to DTS + let mut utc_time_dts = + if let Some(composition_time_offset) = buffer.composition_time_offset { + if composition_time_offset >= 0 { + utc_time + .checked_sub(gst::ClockTime::from_nseconds( + composition_time_offset as u64, + )) + .unwrap() + } else { + utc_time + .checked_add(gst::ClockTime::from_nseconds( + (-composition_time_offset) as u64, + )) + .unwrap() + } + } else { + utc_time + }; + + // Enforce monotonically increasing timestamps + if utc_time_dts < state.streams[idx].current_utc_time { + gst::warning!( + CAT, + obj: &state.streams[idx].sinkpad, + "Decreasing UTC DTS timestamp for buffer {} < {}", + utc_time_dts, + state.streams[idx].current_utc_time, + ); + utc_time_dts = state.streams[idx].current_utc_time; + } else { + state.streams[idx].current_utc_time = utc_time_dts; + } + + let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap(); + + gst::trace!( + CAT, + obj: &state.streams[idx].sinkpad, + "Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}", + buffer.timestamp, + timestamp, + utc_time_dts, + utc_time, + ); + + buffer.timestamp = timestamp; + if start_time.is_none() || start_time > Some(buffer.timestamp) { + start_time = Some(buffer.timestamp); + } + } + + // Update durations for all buffers except for the last in the fragment unless all + // have the same duration anyway + let mut common_duration = Ok(None); + let mut drain_buffers_iter = drain_buffers.iter_mut().peekable(); + while let Some(buffer) = drain_buffers_iter.next() { + let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp); + + if let Some(next_timestamp) = next_timestamp { + let duration = next_timestamp.saturating_sub(buffer.timestamp); + if common_duration == Ok(None) { + common_duration = Ok(Some(duration)); + } else if common_duration != Ok(Some(duration)) { + common_duration = Err(()); + } + + gst::trace!( + CAT, + obj: &state.streams[idx].sinkpad, + "Updating buffer with timestamp {} duration from {} to relative UTC duration {}", + buffer.timestamp, + buffer.duration, + duration, + ); + + buffer.duration = duration; + } else if let Ok(Some(common_duration)) = common_duration { + gst::trace!( + CAT, + obj: &state.streams[idx].sinkpad, + "Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}", + buffer.timestamp, + buffer.duration, + common_duration, + ); + + buffer.duration = common_duration; + } else { + gst::trace!( + CAT, + obj: &state.streams[idx].sinkpad, + "Keeping last buffer with timestamp {} duration at {}", + buffer.timestamp, + buffer.duration, + ); + } + + let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration; + if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) { + max_end_utc_time = Some(end_utc_time); + } + } + + if let Some(start_time) = start_time { + gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time); + streams[idx].1.as_mut().unwrap().start_time = start_time; + } else { + assert!(streams[idx].1.is_none()); + } + } + } + // Create header now if it was not created before and return the caps let mut caps = None; if state.stream_header.is_none() { @@ -934,6 +1214,7 @@ impl FMP4Mux { }); } state.end_pts = Some(max_end_pts); + state.end_utc_time = max_end_utc_time; // Update for the start PTS of the next fragment state.fragment_start_pts = state.fragment_start_pts.map(|start| { @@ -1036,6 +1317,7 @@ impl FMP4Mux { fragment_filled: false, dts_offset: None, current_position: gst::ClockTime::ZERO, + current_utc_time: gst::ClockTime::ZERO, last_force_keyunit_time: None, }); } @@ -1090,11 +1372,19 @@ impl FMP4Mux { assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty())); - let duration = state - .end_pts - .opt_checked_sub(state.earliest_pts) - .ok() - .flatten(); + let duration = if variant == super::Variant::ONVIF { + state + .end_utc_time + .opt_checked_sub(state.start_utc_time) + .ok() + .flatten() + } else { + state + .end_pts + .opt_checked_sub(state.earliest_pts) + .ok() + .flatten() + }; let streams = state .streams @@ -1108,6 +1398,9 @@ impl FMP4Mux { streams: streams.as_slice(), write_mehd: settings.write_mehd, duration: if at_eos { duration } else { None }, + start_utc_time: state + .start_utc_time + .map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000), }) .map_err(|err| { gst::error!(CAT, obj: element, "Failed to create FMP4 header: {}", err); @@ -1486,6 +1779,7 @@ impl AggregatorImpl for FMP4Mux { stream.queued_gops.clear(); stream.dts_offset = None; stream.current_position = gst::ClockTime::ZERO; + stream.current_utc_time = gst::ClockTime::ZERO; stream.last_force_keyunit_time = None; stream.fragment_filled = false; } diff --git a/generic/fmp4/src/fmp4mux/mod.rs b/generic/fmp4/src/fmp4mux/mod.rs index 2d0ba10e..1424d9a3 100644 --- a/generic/fmp4/src/fmp4mux/mod.rs +++ b/generic/fmp4/src/fmp4mux/mod.rs @@ -70,6 +70,9 @@ pub(crate) struct HeaderConfiguration<'a> { streams: &'a [gst::Caps], write_mehd: bool, duration: Option, + /// Start UTC time in ONVIF mode. + /// Since Jan 1 1601 in 100ns units. + start_utc_time: Option, } #[derive(Debug)]