fmp4mux: Use UTC times from reference timestamp meta in ONVIF mode

This commit is contained in:
Sebastian Dröge 2022-05-27 13:27:10 +03:00
parent 5376596557
commit e4081872c5
3 changed files with 304 additions and 7 deletions

View file

@ -397,8 +397,8 @@ pub(super) fn create_fmp4_header(cfg: super::HeaderConfiguration) -> Result<gst:
// track id // track id
v.extend(0u32.to_be_bytes()); v.extend(0u32.to_be_bytes());
// XXX: start UTC time in 100ns units since Jan 1 1601 // start UTC time in 100ns units since Jan 1 1601
v.extend(0u64.to_be_bytes()); v.extend(cfg.start_utc_time.unwrap().to_be_bytes());
Ok(()) Ok(())
}) })

View file

@ -23,6 +23,36 @@ use super::Buffer;
/// Offset for the segment in non-single-stream variants. /// Offset for the segment in non-single-stream variants.
const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000); const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);
/// Offset between UNIX epoch and Jan 1 1601 epoch in seconds.
/// 1601 = UNIX + UNIX_1601_OFFSET.
const UNIX_1601_OFFSET: u64 = 11_644_473_600;
/// Offset between NTP and UNIX epoch in seconds.
/// NTP = UNIX + NTP_UNIX_OFFSET.
const NTP_UNIX_OFFSET: u64 = 2_208_988_800;
/// Reference timestamp meta caps for NTP timestamps.
static NTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build());
/// Reference timestamp meta caps for UNIX timestamps.
static UNIX_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build());
/// Returns the UTC time of the buffer in the UNIX epoch.
fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option<gst::ClockTime> {
buffer
.iter_meta::<gst::ReferenceTimestampMeta>()
.find_map(|meta| {
if meta.reference().can_intersect(&UNIX_CAPS) {
Some(meta.timestamp())
} else if meta.reference().can_intersect(&NTP_CAPS) {
meta.timestamp()
.checked_sub(gst::ClockTime::from_seconds(NTP_UNIX_OFFSET))
} else {
None
}
})
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new( gst::DebugCategory::new(
"fmp4mux", "fmp4mux",
@ -105,6 +135,11 @@ struct Stream {
// timestamps from going backwards when queueing new buffers // timestamps from going backwards when queueing new buffers
current_position: gst::ClockTime, current_position: gst::ClockTime,
// Current UTC time in ONVIF mode to prevent timestamps from
// going backwards when draining a fragment.
// UNIX epoch.
current_utc_time: gst::ClockTime,
last_force_keyunit_time: Option<gst::ClockTime>, last_force_keyunit_time: Option<gst::ClockTime>,
} }
@ -129,6 +164,11 @@ struct State {
// Start PTS of the current fragment // Start PTS of the current fragment
fragment_start_pts: Option<gst::ClockTime>, fragment_start_pts: Option<gst::ClockTime>,
// In ONVIF mode the UTC time corresponding to the beginning of the stream
// UNIX epoch.
start_utc_time: Option<gst::ClockTime>,
end_utc_time: Option<gst::ClockTime>,
sent_headers: bool, sent_headers: bool,
} }
@ -756,6 +796,246 @@ impl FMP4Mux {
} }
} }
let mut max_end_utc_time = None;
// For ONVIF, replace all timestamps with timestamps based on UTC times.
if class.as_ref().variant == super::Variant::ONVIF {
let calculate_pts = |buffer: &Buffer| -> gst::ClockTime {
let composition_time_offset = buffer.composition_time_offset.unwrap_or(0);
if composition_time_offset > 0 {
buffer.timestamp + gst::ClockTime::from_nseconds(composition_time_offset as u64)
} else {
buffer
.timestamp
.checked_sub(gst::ClockTime::from_nseconds(
(-composition_time_offset) as u64,
))
.unwrap()
}
};
// If this is the first fragment then allow the first buffers to not have a reference
// timestamp meta and backdate them
if state.stream_header.is_none() {
for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() {
let (buffer_idx, utc_time, buffer) = match drain_buffers
.iter()
.enumerate()
.find_map(|(idx, buffer)| {
get_utc_time_from_buffer(&buffer.buffer)
.map(|timestamp| (idx, timestamp, buffer))
}) {
None => {
gst::error!(
CAT,
obj: &state.streams[idx].sinkpad,
"No reference timestamp set on any buffers in the first fragment",
);
return Err(gst::FlowError::Error);
}
Some(res) => res,
};
// Now do the backdating
if buffer_idx > 0 {
let utc_time_pts = calculate_pts(buffer);
for buffer in drain_buffers.iter_mut().take(buffer_idx) {
let buffer_pts = calculate_pts(buffer);
let buffer_pts_diff = if utc_time_pts >= buffer_pts {
(utc_time_pts - buffer_pts).nseconds() as i64
} else {
-((buffer_pts - utc_time_pts).nseconds() as i64)
};
let buffer_utc_time = if buffer_pts_diff >= 0 {
utc_time
.checked_sub(gst::ClockTime::from_nseconds(
buffer_pts_diff as u64,
))
.unwrap()
} else {
utc_time
.checked_add(gst::ClockTime::from_nseconds(
(-buffer_pts_diff) as u64,
))
.unwrap()
};
let buffer = buffer.buffer.make_mut();
gst::ReferenceTimestampMeta::add(
buffer,
&UNIX_CAPS,
buffer_utc_time,
gst::ClockTime::NONE,
);
}
}
}
}
// Calculate the minimum across all streams and remember that
if state.start_utc_time.is_none() {
let mut start_utc_time = None;
for (idx, drain_buffers) in drain_buffers.iter().enumerate() {
for buffer in drain_buffers {
let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
None => {
gst::error!(
CAT,
obj: &state.streams[idx].sinkpad,
"No reference timestamp set on all buffers"
);
return Err(gst::FlowError::Error);
}
Some(utc_time) => utc_time,
};
if start_utc_time.is_none() || start_utc_time > Some(utc_time) {
start_utc_time = Some(utc_time);
}
}
}
gst::debug!(
CAT,
obj: element,
"Configuring start UTC time {}",
start_utc_time.unwrap()
);
state.start_utc_time = start_utc_time;
}
// Update all buffer timestamps based on the UTC time and offset to the start UTC time
let start_utc_time = state.start_utc_time.unwrap();
for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() {
let mut start_time = None;
for buffer in drain_buffers.iter_mut() {
let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
None => {
gst::error!(
CAT,
obj: &state.streams[idx].sinkpad,
"No reference timestamp set on all buffers"
);
return Err(gst::FlowError::Error);
}
Some(utc_time) => utc_time,
};
// Convert PTS UTC time to DTS
let mut utc_time_dts =
if let Some(composition_time_offset) = buffer.composition_time_offset {
if composition_time_offset >= 0 {
utc_time
.checked_sub(gst::ClockTime::from_nseconds(
composition_time_offset as u64,
))
.unwrap()
} else {
utc_time
.checked_add(gst::ClockTime::from_nseconds(
(-composition_time_offset) as u64,
))
.unwrap()
}
} else {
utc_time
};
// Enforce monotonically increasing timestamps
if utc_time_dts < state.streams[idx].current_utc_time {
gst::warning!(
CAT,
obj: &state.streams[idx].sinkpad,
"Decreasing UTC DTS timestamp for buffer {} < {}",
utc_time_dts,
state.streams[idx].current_utc_time,
);
utc_time_dts = state.streams[idx].current_utc_time;
} else {
state.streams[idx].current_utc_time = utc_time_dts;
}
let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap();
gst::trace!(
CAT,
obj: &state.streams[idx].sinkpad,
"Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}",
buffer.timestamp,
timestamp,
utc_time_dts,
utc_time,
);
buffer.timestamp = timestamp;
if start_time.is_none() || start_time > Some(buffer.timestamp) {
start_time = Some(buffer.timestamp);
}
}
// Update durations for all buffers except for the last in the fragment unless all
// have the same duration anyway
let mut common_duration = Ok(None);
let mut drain_buffers_iter = drain_buffers.iter_mut().peekable();
while let Some(buffer) = drain_buffers_iter.next() {
let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp);
if let Some(next_timestamp) = next_timestamp {
let duration = next_timestamp.saturating_sub(buffer.timestamp);
if common_duration == Ok(None) {
common_duration = Ok(Some(duration));
} else if common_duration != Ok(Some(duration)) {
common_duration = Err(());
}
gst::trace!(
CAT,
obj: &state.streams[idx].sinkpad,
"Updating buffer with timestamp {} duration from {} to relative UTC duration {}",
buffer.timestamp,
buffer.duration,
duration,
);
buffer.duration = duration;
} else if let Ok(Some(common_duration)) = common_duration {
gst::trace!(
CAT,
obj: &state.streams[idx].sinkpad,
"Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}",
buffer.timestamp,
buffer.duration,
common_duration,
);
buffer.duration = common_duration;
} else {
gst::trace!(
CAT,
obj: &state.streams[idx].sinkpad,
"Keeping last buffer with timestamp {} duration at {}",
buffer.timestamp,
buffer.duration,
);
}
let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration;
if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) {
max_end_utc_time = Some(end_utc_time);
}
}
if let Some(start_time) = start_time {
gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
streams[idx].1.as_mut().unwrap().start_time = start_time;
} else {
assert!(streams[idx].1.is_none());
}
}
}
// Create header now if it was not created before and return the caps // Create header now if it was not created before and return the caps
let mut caps = None; let mut caps = None;
if state.stream_header.is_none() { if state.stream_header.is_none() {
@ -934,6 +1214,7 @@ impl FMP4Mux {
}); });
} }
state.end_pts = Some(max_end_pts); state.end_pts = Some(max_end_pts);
state.end_utc_time = max_end_utc_time;
// Update for the start PTS of the next fragment // Update for the start PTS of the next fragment
state.fragment_start_pts = state.fragment_start_pts.map(|start| { state.fragment_start_pts = state.fragment_start_pts.map(|start| {
@ -1036,6 +1317,7 @@ impl FMP4Mux {
fragment_filled: false, fragment_filled: false,
dts_offset: None, dts_offset: None,
current_position: gst::ClockTime::ZERO, current_position: gst::ClockTime::ZERO,
current_utc_time: gst::ClockTime::ZERO,
last_force_keyunit_time: None, last_force_keyunit_time: None,
}); });
} }
@ -1090,11 +1372,19 @@ impl FMP4Mux {
assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty())); assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty()));
let duration = state let duration = if variant == super::Variant::ONVIF {
.end_pts state
.opt_checked_sub(state.earliest_pts) .end_utc_time
.ok() .opt_checked_sub(state.start_utc_time)
.flatten(); .ok()
.flatten()
} else {
state
.end_pts
.opt_checked_sub(state.earliest_pts)
.ok()
.flatten()
};
let streams = state let streams = state
.streams .streams
@ -1108,6 +1398,9 @@ impl FMP4Mux {
streams: streams.as_slice(), streams: streams.as_slice(),
write_mehd: settings.write_mehd, write_mehd: settings.write_mehd,
duration: if at_eos { duration } else { None }, duration: if at_eos { duration } else { None },
start_utc_time: state
.start_utc_time
.map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000),
}) })
.map_err(|err| { .map_err(|err| {
gst::error!(CAT, obj: element, "Failed to create FMP4 header: {}", err); gst::error!(CAT, obj: element, "Failed to create FMP4 header: {}", err);
@ -1486,6 +1779,7 @@ impl AggregatorImpl for FMP4Mux {
stream.queued_gops.clear(); stream.queued_gops.clear();
stream.dts_offset = None; stream.dts_offset = None;
stream.current_position = gst::ClockTime::ZERO; stream.current_position = gst::ClockTime::ZERO;
stream.current_utc_time = gst::ClockTime::ZERO;
stream.last_force_keyunit_time = None; stream.last_force_keyunit_time = None;
stream.fragment_filled = false; stream.fragment_filled = false;
} }

View file

@ -70,6 +70,9 @@ pub(crate) struct HeaderConfiguration<'a> {
streams: &'a [gst::Caps], streams: &'a [gst::Caps],
write_mehd: bool, write_mehd: bool,
duration: Option<gst::ClockTime>, duration: Option<gst::ClockTime>,
/// Start UTC time in ONVIF mode.
/// Since Jan 1 1601 in 100ns units.
start_utc_time: Option<u64>,
} }
#[derive(Debug)] #[derive(Debug)]