mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-06-05 06:58:58 +00:00
fmp4mux: Split huge drain function into separate functions
This commit is contained in:
parent
4ba4b00235
commit
b63627025e
1 changed files with 438 additions and 360 deletions
|
@ -672,30 +672,29 @@ impl FMP4Mux {
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn drain(
|
#[allow(clippy::type_complexity)]
|
||||||
|
fn drain_buffers(
|
||||||
&self,
|
&self,
|
||||||
element: &super::FMP4Mux,
|
_element: &super::FMP4Mux,
|
||||||
state: &mut State,
|
state: &mut State,
|
||||||
settings: &Settings,
|
settings: &Settings,
|
||||||
timeout: bool,
|
timeout: bool,
|
||||||
at_eos: bool,
|
at_eos: bool,
|
||||||
) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> {
|
) -> Result<
|
||||||
let class = element.class();
|
(
|
||||||
|
Vec<(
|
||||||
if at_eos {
|
gst::Caps,
|
||||||
gst::info!(CAT, obj: element, "Draining at EOS");
|
Option<super::FragmentTimingInfo>,
|
||||||
} else if timeout {
|
VecDeque<Buffer>,
|
||||||
gst::info!(CAT, obj: element, "Draining at timeout");
|
)>,
|
||||||
} else {
|
Option<gst::ClockTime>,
|
||||||
for stream in &state.streams {
|
Option<gst::ClockTime>,
|
||||||
if !stream.fragment_filled && !stream.sinkpad.is_eos() {
|
Option<gst::ClockTime>,
|
||||||
return Ok((None, None));
|
Option<gst::ClockTime>,
|
||||||
}
|
),
|
||||||
}
|
gst::FlowError,
|
||||||
}
|
> {
|
||||||
|
let mut drained_streams = Vec::with_capacity(state.streams.len());
|
||||||
let mut drain_buffers = Vec::with_capacity(state.streams.len());
|
|
||||||
let mut streams = Vec::with_capacity(state.streams.len());
|
|
||||||
|
|
||||||
let mut min_earliest_pts_position = None;
|
let mut min_earliest_pts_position = None;
|
||||||
let mut min_earliest_pts = None;
|
let mut min_earliest_pts = None;
|
||||||
|
@ -740,192 +739,202 @@ impl FMP4Mux {
|
||||||
"Draining no buffers",
|
"Draining no buffers",
|
||||||
);
|
);
|
||||||
|
|
||||||
streams.push((stream.caps.clone(), None));
|
drained_streams.push((stream.caps.clone(), None, VecDeque::new()));
|
||||||
drain_buffers.push(VecDeque::new());
|
continue;
|
||||||
} else {
|
}
|
||||||
let first_gop = gops.first().unwrap();
|
|
||||||
let last_gop = gops.last().unwrap();
|
|
||||||
let earliest_pts = first_gop.earliest_pts;
|
|
||||||
let earliest_pts_position = first_gop.earliest_pts_position;
|
|
||||||
let start_dts = first_gop.start_dts;
|
|
||||||
let start_dts_position = first_gop.start_dts_position;
|
|
||||||
let end_pts = last_gop.end_pts;
|
|
||||||
let dts_offset = stream.dts_offset;
|
|
||||||
|
|
||||||
if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
|
let first_gop = gops.first().unwrap();
|
||||||
min_earliest_pts = Some(earliest_pts);
|
let last_gop = gops.last().unwrap();
|
||||||
}
|
let earliest_pts = first_gop.earliest_pts;
|
||||||
if min_earliest_pts_position
|
let earliest_pts_position = first_gop.earliest_pts_position;
|
||||||
.opt_gt(earliest_pts_position)
|
let start_dts = first_gop.start_dts;
|
||||||
|
let start_dts_position = first_gop.start_dts_position;
|
||||||
|
let end_pts = last_gop.end_pts;
|
||||||
|
let dts_offset = stream.dts_offset;
|
||||||
|
|
||||||
|
if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
|
||||||
|
min_earliest_pts = Some(earliest_pts);
|
||||||
|
}
|
||||||
|
if min_earliest_pts_position
|
||||||
|
.opt_gt(earliest_pts_position)
|
||||||
|
.unwrap_or(true)
|
||||||
|
{
|
||||||
|
min_earliest_pts_position = Some(earliest_pts_position);
|
||||||
|
}
|
||||||
|
if let Some(start_dts_position) = start_dts_position {
|
||||||
|
if min_start_dts_position
|
||||||
|
.opt_gt(start_dts_position)
|
||||||
.unwrap_or(true)
|
.unwrap_or(true)
|
||||||
{
|
{
|
||||||
min_earliest_pts_position = Some(earliest_pts_position);
|
min_start_dts_position = Some(start_dts_position);
|
||||||
}
|
|
||||||
if let Some(start_dts_position) = start_dts_position {
|
|
||||||
if min_start_dts_position
|
|
||||||
.opt_gt(start_dts_position)
|
|
||||||
.unwrap_or(true)
|
|
||||||
{
|
|
||||||
min_start_dts_position = Some(start_dts_position);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if max_end_pts.opt_lt(end_pts).unwrap_or(true) {
|
|
||||||
max_end_pts = Some(end_pts);
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if max_end_pts.opt_lt(end_pts).unwrap_or(true) {
|
||||||
|
max_end_pts = Some(end_pts);
|
||||||
|
}
|
||||||
|
|
||||||
gst::info!(
|
gst::info!(
|
||||||
CAT,
|
CAT,
|
||||||
obj: &stream.sinkpad,
|
obj: &stream.sinkpad,
|
||||||
"Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
|
"Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
|
||||||
end_pts.saturating_sub(earliest_pts),
|
end_pts.saturating_sub(earliest_pts),
|
||||||
earliest_pts,
|
earliest_pts,
|
||||||
start_dts.display(),
|
start_dts.display(),
|
||||||
dts_offset.display(),
|
dts_offset.display(),
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some((prev_gop, first_gop)) = Option::zip(
|
|
||||||
stream.queued_gops.iter().find(|gop| gop.final_end_pts),
|
|
||||||
stream.queued_gops.back(),
|
|
||||||
) {
|
|
||||||
gst::debug!(
|
|
||||||
CAT,
|
|
||||||
obj: &stream.sinkpad,
|
|
||||||
"Queued full GOPs duration updated to {}",
|
|
||||||
prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if let Some((prev_gop, first_gop)) = Option::zip(
|
||||||
|
stream.queued_gops.iter().find(|gop| gop.final_end_pts),
|
||||||
|
stream.queued_gops.back(),
|
||||||
|
) {
|
||||||
gst::debug!(
|
gst::debug!(
|
||||||
CAT,
|
CAT,
|
||||||
obj: &stream.sinkpad,
|
obj: &stream.sinkpad,
|
||||||
"Queued duration updated to {}",
|
"Queued full GOPs duration updated to {}",
|
||||||
Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
|
prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
|
||||||
.map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
|
|
||||||
.unwrap_or(gst::ClockTime::ZERO)
|
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
let start_time = if stream.intra_only {
|
gst::debug!(
|
||||||
earliest_pts
|
CAT,
|
||||||
} else {
|
obj: &stream.sinkpad,
|
||||||
start_dts.unwrap()
|
"Queued duration updated to {}",
|
||||||
};
|
Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
|
||||||
|
.map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
|
||||||
|
.unwrap_or(gst::ClockTime::ZERO)
|
||||||
|
);
|
||||||
|
|
||||||
streams.push((
|
let start_time = if stream.intra_only {
|
||||||
stream.caps.clone(),
|
earliest_pts
|
||||||
Some(super::FragmentTimingInfo {
|
} else {
|
||||||
start_time,
|
start_dts.unwrap()
|
||||||
intra_only: stream.intra_only,
|
};
|
||||||
}),
|
|
||||||
));
|
|
||||||
|
|
||||||
let mut buffers =
|
let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
|
||||||
VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
|
|
||||||
|
|
||||||
for gop in gops {
|
for gop in gops {
|
||||||
let mut gop_buffers = gop.buffers.into_iter().peekable();
|
let mut gop_buffers = gop.buffers.into_iter().peekable();
|
||||||
while let Some(buffer) = gop_buffers.next() {
|
while let Some(buffer) = gop_buffers.next() {
|
||||||
let timestamp = if stream.intra_only {
|
let timestamp = if stream.intra_only {
|
||||||
buffer.pts
|
buffer.pts
|
||||||
} else {
|
} else {
|
||||||
buffer.dts.unwrap()
|
buffer.dts.unwrap()
|
||||||
};
|
};
|
||||||
|
|
||||||
let end_timestamp = match gop_buffers.peek() {
|
let end_timestamp = match gop_buffers.peek() {
|
||||||
Some(buffer) => {
|
Some(buffer) => {
|
||||||
if stream.intra_only {
|
if stream.intra_only {
|
||||||
buffer.pts
|
buffer.pts
|
||||||
} else {
|
} else {
|
||||||
buffer.dts.unwrap()
|
buffer.dts.unwrap()
|
||||||
}
|
|
||||||
}
|
}
|
||||||
None => {
|
}
|
||||||
if stream.intra_only {
|
None => {
|
||||||
gop.end_pts
|
if stream.intra_only {
|
||||||
} else {
|
gop.end_pts
|
||||||
gop.end_dts.unwrap()
|
} else {
|
||||||
}
|
gop.end_dts.unwrap()
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Timestamps are enforced to monotonically increase when queueing buffers
|
// Timestamps are enforced to monotonically increase when queueing buffers
|
||||||
let duration = end_timestamp
|
let duration = end_timestamp
|
||||||
.checked_sub(timestamp)
|
.checked_sub(timestamp)
|
||||||
.expect("Timestamps going backwards");
|
.expect("Timestamps going backwards");
|
||||||
|
|
||||||
let composition_time_offset = if stream.intra_only {
|
let composition_time_offset = if stream.intra_only {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
let pts = buffer.pts;
|
let pts = buffer.pts;
|
||||||
let dts = buffer.dts.unwrap();
|
let dts = buffer.dts.unwrap();
|
||||||
|
|
||||||
if pts > dts {
|
if pts > dts {
|
||||||
Some(
|
Some(
|
||||||
i64::try_from((pts - dts).nseconds())
|
i64::try_from((pts - dts).nseconds())
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
|
gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
|
||||||
gst::FlowError::Error
|
gst::FlowError::Error
|
||||||
})?,
|
})?,
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
let diff = i64::try_from((dts - pts).nseconds())
|
let diff = i64::try_from((dts - pts).nseconds())
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
|
gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
|
||||||
gst::FlowError::Error
|
gst::FlowError::Error
|
||||||
})?;
|
})?;
|
||||||
Some(-diff)
|
Some(-diff)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
buffers.push_back(Buffer {
|
buffers.push_back(Buffer {
|
||||||
idx,
|
idx,
|
||||||
buffer: buffer.buffer,
|
buffer: buffer.buffer,
|
||||||
timestamp,
|
timestamp,
|
||||||
duration,
|
duration,
|
||||||
composition_time_offset,
|
composition_time_offset,
|
||||||
});
|
});
|
||||||
}
|
|
||||||
}
|
}
|
||||||
drain_buffers.push(buffers);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
drained_streams.push((
|
||||||
|
stream.caps.clone(),
|
||||||
|
Some(super::FragmentTimingInfo {
|
||||||
|
start_time,
|
||||||
|
intra_only: stream.intra_only,
|
||||||
|
}),
|
||||||
|
buffers,
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove all GAP buffers before processing them further
|
Ok((
|
||||||
for buffers in &mut drain_buffers {
|
drained_streams,
|
||||||
buffers.retain(|buf| {
|
min_earliest_pts_position,
|
||||||
!buf.buffer.flags().contains(gst::BufferFlags::GAP)
|
min_earliest_pts,
|
||||||
|| !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
|
min_start_dts_position,
|
||||||
|| buf.buffer.size() != 0
|
max_end_pts,
|
||||||
});
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn preprocess_drained_streams_onvif(
|
||||||
|
&self,
|
||||||
|
element: &super::FMP4Mux,
|
||||||
|
state: &mut State,
|
||||||
|
drained_streams: &mut [(
|
||||||
|
gst::Caps,
|
||||||
|
Option<super::FragmentTimingInfo>,
|
||||||
|
VecDeque<Buffer>,
|
||||||
|
)],
|
||||||
|
) -> Result<Option<gst::ClockTime>, gst::FlowError> {
|
||||||
|
if element.class().as_ref().variant != super::Variant::ONVIF {
|
||||||
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut max_end_utc_time = None;
|
let mut max_end_utc_time = None;
|
||||||
// For ONVIF, replace all timestamps with timestamps based on UTC times.
|
|
||||||
if class.as_ref().variant == super::Variant::ONVIF {
|
|
||||||
let calculate_pts = |buffer: &Buffer| -> gst::ClockTime {
|
|
||||||
let composition_time_offset = buffer.composition_time_offset.unwrap_or(0);
|
|
||||||
if composition_time_offset > 0 {
|
|
||||||
buffer.timestamp + gst::ClockTime::from_nseconds(composition_time_offset as u64)
|
|
||||||
} else {
|
|
||||||
buffer
|
|
||||||
.timestamp
|
|
||||||
.checked_sub(gst::ClockTime::from_nseconds(
|
|
||||||
(-composition_time_offset) as u64,
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// If this is the first fragment then allow the first buffers to not have a reference
|
let calculate_pts = |buffer: &Buffer| -> gst::ClockTime {
|
||||||
// timestamp meta and backdate them
|
let composition_time_offset = buffer.composition_time_offset.unwrap_or(0);
|
||||||
if state.stream_header.is_none() {
|
if composition_time_offset > 0 {
|
||||||
for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() {
|
buffer.timestamp + gst::ClockTime::from_nseconds(composition_time_offset as u64)
|
||||||
let (buffer_idx, utc_time, buffer) = match drain_buffers
|
} else {
|
||||||
.iter()
|
buffer
|
||||||
.enumerate()
|
.timestamp
|
||||||
.find_map(|(idx, buffer)| {
|
.checked_sub(gst::ClockTime::from_nseconds(
|
||||||
get_utc_time_from_buffer(&buffer.buffer)
|
(-composition_time_offset) as u64,
|
||||||
.map(|timestamp| (idx, timestamp, buffer))
|
))
|
||||||
}) {
|
.unwrap()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// If this is the first fragment then allow the first buffers to not have a reference
|
||||||
|
// timestamp meta and backdate them
|
||||||
|
if state.stream_header.is_none() {
|
||||||
|
for (idx, (_, _, drain_buffers)) in drained_streams.iter_mut().enumerate() {
|
||||||
|
let (buffer_idx, utc_time, buffer) =
|
||||||
|
match drain_buffers.iter().enumerate().find_map(|(idx, buffer)| {
|
||||||
|
get_utc_time_from_buffer(&buffer.buffer)
|
||||||
|
.map(|timestamp| (idx, timestamp, buffer))
|
||||||
|
}) {
|
||||||
None => {
|
None => {
|
||||||
gst::error!(
|
gst::error!(
|
||||||
CAT,
|
CAT,
|
||||||
|
@ -937,82 +946,47 @@ impl FMP4Mux {
|
||||||
Some(res) => res,
|
Some(res) => res,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Now do the backdating
|
// Now do the backdating
|
||||||
if buffer_idx > 0 {
|
if buffer_idx > 0 {
|
||||||
let utc_time_pts = calculate_pts(buffer);
|
let utc_time_pts = calculate_pts(buffer);
|
||||||
|
|
||||||
for buffer in drain_buffers.iter_mut().take(buffer_idx) {
|
for buffer in drain_buffers.iter_mut().take(buffer_idx) {
|
||||||
let buffer_pts = calculate_pts(buffer);
|
let buffer_pts = calculate_pts(buffer);
|
||||||
let buffer_pts_diff = if utc_time_pts >= buffer_pts {
|
let buffer_pts_diff = if utc_time_pts >= buffer_pts {
|
||||||
(utc_time_pts - buffer_pts).nseconds() as i64
|
(utc_time_pts - buffer_pts).nseconds() as i64
|
||||||
} else {
|
} else {
|
||||||
-((buffer_pts - utc_time_pts).nseconds() as i64)
|
-((buffer_pts - utc_time_pts).nseconds() as i64)
|
||||||
};
|
};
|
||||||
let buffer_utc_time = if buffer_pts_diff >= 0 {
|
let buffer_utc_time = if buffer_pts_diff >= 0 {
|
||||||
utc_time
|
utc_time
|
||||||
.checked_sub(gst::ClockTime::from_nseconds(
|
.checked_sub(gst::ClockTime::from_nseconds(buffer_pts_diff as u64))
|
||||||
buffer_pts_diff as u64,
|
.unwrap()
|
||||||
))
|
} else {
|
||||||
.unwrap()
|
utc_time
|
||||||
} else {
|
.checked_add(gst::ClockTime::from_nseconds(
|
||||||
utc_time
|
(-buffer_pts_diff) as u64,
|
||||||
.checked_add(gst::ClockTime::from_nseconds(
|
))
|
||||||
(-buffer_pts_diff) as u64,
|
.unwrap()
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
};
|
|
||||||
|
|
||||||
let buffer = buffer.buffer.make_mut();
|
|
||||||
gst::ReferenceTimestampMeta::add(
|
|
||||||
buffer,
|
|
||||||
&UNIX_CAPS,
|
|
||||||
buffer_utc_time,
|
|
||||||
gst::ClockTime::NONE,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate the minimum across all streams and remember that
|
|
||||||
if state.start_utc_time.is_none() {
|
|
||||||
let mut start_utc_time = None;
|
|
||||||
|
|
||||||
for (idx, drain_buffers) in drain_buffers.iter().enumerate() {
|
|
||||||
for buffer in drain_buffers {
|
|
||||||
let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
|
|
||||||
None => {
|
|
||||||
gst::error!(
|
|
||||||
CAT,
|
|
||||||
obj: &state.streams[idx].sinkpad,
|
|
||||||
"No reference timestamp set on all buffers"
|
|
||||||
);
|
|
||||||
return Err(gst::FlowError::Error);
|
|
||||||
}
|
|
||||||
Some(utc_time) => utc_time,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if start_utc_time.is_none() || start_utc_time > Some(utc_time) {
|
let buffer = buffer.buffer.make_mut();
|
||||||
start_utc_time = Some(utc_time);
|
gst::ReferenceTimestampMeta::add(
|
||||||
}
|
buffer,
|
||||||
|
&UNIX_CAPS,
|
||||||
|
buffer_utc_time,
|
||||||
|
gst::ClockTime::NONE,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
gst::debug!(
|
|
||||||
CAT,
|
|
||||||
obj: element,
|
|
||||||
"Configuring start UTC time {}",
|
|
||||||
start_utc_time.unwrap()
|
|
||||||
);
|
|
||||||
state.start_utc_time = start_utc_time;
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Update all buffer timestamps based on the UTC time and offset to the start UTC time
|
// Calculate the minimum across all streams and remember that
|
||||||
let start_utc_time = state.start_utc_time.unwrap();
|
if state.start_utc_time.is_none() {
|
||||||
for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() {
|
let mut start_utc_time = None;
|
||||||
let mut start_time = None;
|
|
||||||
|
|
||||||
for buffer in drain_buffers.iter_mut() {
|
for (idx, (_, _, drain_buffers)) in drained_streams.iter().enumerate() {
|
||||||
|
for buffer in drain_buffers {
|
||||||
let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
|
let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
|
||||||
None => {
|
None => {
|
||||||
gst::error!(
|
gst::error!(
|
||||||
|
@ -1025,150 +999,189 @@ impl FMP4Mux {
|
||||||
Some(utc_time) => utc_time,
|
Some(utc_time) => utc_time,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Convert PTS UTC time to DTS
|
if start_utc_time.is_none() || start_utc_time > Some(utc_time) {
|
||||||
let mut utc_time_dts =
|
start_utc_time = Some(utc_time);
|
||||||
if let Some(composition_time_offset) = buffer.composition_time_offset {
|
}
|
||||||
if composition_time_offset >= 0 {
|
}
|
||||||
utc_time
|
}
|
||||||
.checked_sub(gst::ClockTime::from_nseconds(
|
|
||||||
composition_time_offset as u64,
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
} else {
|
|
||||||
utc_time
|
|
||||||
.checked_add(gst::ClockTime::from_nseconds(
|
|
||||||
(-composition_time_offset) as u64,
|
|
||||||
))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
utc_time
|
|
||||||
};
|
|
||||||
|
|
||||||
// Enforce monotonically increasing timestamps
|
gst::debug!(
|
||||||
if utc_time_dts < state.streams[idx].current_utc_time {
|
CAT,
|
||||||
gst::warning!(
|
obj: element,
|
||||||
|
"Configuring start UTC time {}",
|
||||||
|
start_utc_time.unwrap()
|
||||||
|
);
|
||||||
|
state.start_utc_time = start_utc_time;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update all buffer timestamps based on the UTC time and offset to the start UTC time
|
||||||
|
let start_utc_time = state.start_utc_time.unwrap();
|
||||||
|
for (idx, (_, timing_info, drain_buffers)) in drained_streams.iter_mut().enumerate() {
|
||||||
|
let mut start_time = None;
|
||||||
|
|
||||||
|
for buffer in drain_buffers.iter_mut() {
|
||||||
|
let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
|
||||||
|
None => {
|
||||||
|
gst::error!(
|
||||||
CAT,
|
CAT,
|
||||||
obj: &state.streams[idx].sinkpad,
|
obj: &state.streams[idx].sinkpad,
|
||||||
"Decreasing UTC DTS timestamp for buffer {} < {}",
|
"No reference timestamp set on all buffers"
|
||||||
utc_time_dts,
|
|
||||||
state.streams[idx].current_utc_time,
|
|
||||||
);
|
);
|
||||||
utc_time_dts = state.streams[idx].current_utc_time;
|
return Err(gst::FlowError::Error);
|
||||||
} else {
|
|
||||||
state.streams[idx].current_utc_time = utc_time_dts;
|
|
||||||
}
|
}
|
||||||
|
Some(utc_time) => utc_time,
|
||||||
|
};
|
||||||
|
|
||||||
let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap();
|
// Convert PTS UTC time to DTS
|
||||||
|
let mut utc_time_dts =
|
||||||
|
if let Some(composition_time_offset) = buffer.composition_time_offset {
|
||||||
|
if composition_time_offset >= 0 {
|
||||||
|
utc_time
|
||||||
|
.checked_sub(gst::ClockTime::from_nseconds(
|
||||||
|
composition_time_offset as u64,
|
||||||
|
))
|
||||||
|
.unwrap()
|
||||||
|
} else {
|
||||||
|
utc_time
|
||||||
|
.checked_add(gst::ClockTime::from_nseconds(
|
||||||
|
(-composition_time_offset) as u64,
|
||||||
|
))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
utc_time
|
||||||
|
};
|
||||||
|
|
||||||
|
// Enforce monotonically increasing timestamps
|
||||||
|
if utc_time_dts < state.streams[idx].current_utc_time {
|
||||||
|
gst::warning!(
|
||||||
|
CAT,
|
||||||
|
obj: &state.streams[idx].sinkpad,
|
||||||
|
"Decreasing UTC DTS timestamp for buffer {} < {}",
|
||||||
|
utc_time_dts,
|
||||||
|
state.streams[idx].current_utc_time,
|
||||||
|
);
|
||||||
|
utc_time_dts = state.streams[idx].current_utc_time;
|
||||||
|
} else {
|
||||||
|
state.streams[idx].current_utc_time = utc_time_dts;
|
||||||
|
}
|
||||||
|
|
||||||
|
let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap();
|
||||||
|
|
||||||
|
gst::trace!(
|
||||||
|
CAT,
|
||||||
|
obj: &state.streams[idx].sinkpad,
|
||||||
|
"Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}",
|
||||||
|
buffer.timestamp,
|
||||||
|
timestamp,
|
||||||
|
utc_time_dts,
|
||||||
|
utc_time,
|
||||||
|
);
|
||||||
|
|
||||||
|
buffer.timestamp = timestamp;
|
||||||
|
if start_time.is_none() || start_time > Some(buffer.timestamp) {
|
||||||
|
start_time = Some(buffer.timestamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update durations for all buffers except for the last in the fragment unless all
|
||||||
|
// have the same duration anyway
|
||||||
|
let mut common_duration = Ok(None);
|
||||||
|
let mut drain_buffers_iter = drain_buffers.iter_mut().peekable();
|
||||||
|
while let Some(buffer) = drain_buffers_iter.next() {
|
||||||
|
let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp);
|
||||||
|
|
||||||
|
if let Some(next_timestamp) = next_timestamp {
|
||||||
|
let duration = next_timestamp.saturating_sub(buffer.timestamp);
|
||||||
|
if common_duration == Ok(None) {
|
||||||
|
common_duration = Ok(Some(duration));
|
||||||
|
} else if common_duration != Ok(Some(duration)) {
|
||||||
|
common_duration = Err(());
|
||||||
|
}
|
||||||
|
|
||||||
gst::trace!(
|
gst::trace!(
|
||||||
CAT,
|
CAT,
|
||||||
obj: &state.streams[idx].sinkpad,
|
obj: &state.streams[idx].sinkpad,
|
||||||
"Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}",
|
"Updating buffer with timestamp {} duration from {} to relative UTC duration {}",
|
||||||
buffer.timestamp,
|
buffer.timestamp,
|
||||||
timestamp,
|
buffer.duration,
|
||||||
utc_time_dts,
|
duration,
|
||||||
utc_time,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
buffer.timestamp = timestamp;
|
buffer.duration = duration;
|
||||||
if start_time.is_none() || start_time > Some(buffer.timestamp) {
|
} else if let Ok(Some(common_duration)) = common_duration {
|
||||||
start_time = Some(buffer.timestamp);
|
gst::trace!(
|
||||||
}
|
CAT,
|
||||||
}
|
obj: &state.streams[idx].sinkpad,
|
||||||
|
"Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}",
|
||||||
|
buffer.timestamp,
|
||||||
|
buffer.duration,
|
||||||
|
common_duration,
|
||||||
|
);
|
||||||
|
|
||||||
// Update durations for all buffers except for the last in the fragment unless all
|
buffer.duration = common_duration;
|
||||||
// have the same duration anyway
|
|
||||||
let mut common_duration = Ok(None);
|
|
||||||
let mut drain_buffers_iter = drain_buffers.iter_mut().peekable();
|
|
||||||
while let Some(buffer) = drain_buffers_iter.next() {
|
|
||||||
let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp);
|
|
||||||
|
|
||||||
if let Some(next_timestamp) = next_timestamp {
|
|
||||||
let duration = next_timestamp.saturating_sub(buffer.timestamp);
|
|
||||||
if common_duration == Ok(None) {
|
|
||||||
common_duration = Ok(Some(duration));
|
|
||||||
} else if common_duration != Ok(Some(duration)) {
|
|
||||||
common_duration = Err(());
|
|
||||||
}
|
|
||||||
|
|
||||||
gst::trace!(
|
|
||||||
CAT,
|
|
||||||
obj: &state.streams[idx].sinkpad,
|
|
||||||
"Updating buffer with timestamp {} duration from {} to relative UTC duration {}",
|
|
||||||
buffer.timestamp,
|
|
||||||
buffer.duration,
|
|
||||||
duration,
|
|
||||||
);
|
|
||||||
|
|
||||||
buffer.duration = duration;
|
|
||||||
} else if let Ok(Some(common_duration)) = common_duration {
|
|
||||||
gst::trace!(
|
|
||||||
CAT,
|
|
||||||
obj: &state.streams[idx].sinkpad,
|
|
||||||
"Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}",
|
|
||||||
buffer.timestamp,
|
|
||||||
buffer.duration,
|
|
||||||
common_duration,
|
|
||||||
);
|
|
||||||
|
|
||||||
buffer.duration = common_duration;
|
|
||||||
} else {
|
|
||||||
gst::trace!(
|
|
||||||
CAT,
|
|
||||||
obj: &state.streams[idx].sinkpad,
|
|
||||||
"Keeping last buffer with timestamp {} duration at {}",
|
|
||||||
buffer.timestamp,
|
|
||||||
buffer.duration,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration;
|
|
||||||
if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) {
|
|
||||||
max_end_utc_time = Some(end_utc_time);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(start_time) = start_time {
|
|
||||||
gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
|
|
||||||
streams[idx].1.as_mut().unwrap().start_time = start_time;
|
|
||||||
} else {
|
} else {
|
||||||
assert!(streams[idx].1.is_none());
|
gst::trace!(
|
||||||
|
CAT,
|
||||||
|
obj: &state.streams[idx].sinkpad,
|
||||||
|
"Keeping last buffer with timestamp {} duration at {}",
|
||||||
|
buffer.timestamp,
|
||||||
|
buffer.duration,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration;
|
||||||
|
if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) {
|
||||||
|
max_end_utc_time = Some(end_utc_time);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(start_time) = start_time {
|
||||||
|
gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
|
||||||
|
timing_info.as_mut().unwrap().start_time = start_time;
|
||||||
|
} else {
|
||||||
|
assert!(timing_info.is_none());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create header now if it was not created before and return the caps
|
Ok(max_end_utc_time)
|
||||||
let mut caps = None;
|
}
|
||||||
if state.stream_header.is_none() {
|
|
||||||
let (_, new_caps) = self
|
|
||||||
.update_header(element, state, settings, false)?
|
|
||||||
.unwrap();
|
|
||||||
caps = Some(new_caps);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Interleave buffers according to the settings into a single vec
|
#[allow(clippy::type_complexity)]
|
||||||
|
fn interleave_buffers(
|
||||||
|
&self,
|
||||||
|
_element: &super::FMP4Mux,
|
||||||
|
settings: &Settings,
|
||||||
|
mut drained_streams: Vec<(
|
||||||
|
gst::Caps,
|
||||||
|
Option<super::FragmentTimingInfo>,
|
||||||
|
VecDeque<Buffer>,
|
||||||
|
)>,
|
||||||
|
) -> Result<
|
||||||
|
(
|
||||||
|
Vec<Buffer>,
|
||||||
|
Vec<(gst::Caps, Option<super::FragmentTimingInfo>)>,
|
||||||
|
),
|
||||||
|
gst::FlowError,
|
||||||
|
> {
|
||||||
let mut interleaved_buffers =
|
let mut interleaved_buffers =
|
||||||
Vec::with_capacity(drain_buffers.iter().map(|bs| bs.len()).sum());
|
Vec::with_capacity(drained_streams.iter().map(|(_, _, bufs)| bufs.len()).sum());
|
||||||
while let Some((_idx, bs)) =
|
while let Some((_idx, (_, _, bufs))) = drained_streams.iter_mut().enumerate().min_by(
|
||||||
drain_buffers
|
|(a_idx, (_, _, a)), (b_idx, (_, _, b))| {
|
||||||
.iter_mut()
|
let (a, b) = match (a.front(), b.front()) {
|
||||||
.enumerate()
|
(None, None) => return std::cmp::Ordering::Equal,
|
||||||
.min_by(|(a_idx, a), (b_idx, b)| {
|
(None, _) => return std::cmp::Ordering::Greater,
|
||||||
let (a, b) = match (a.front(), b.front()) {
|
(_, None) => return std::cmp::Ordering::Less,
|
||||||
(None, None) => return std::cmp::Ordering::Equal,
|
(Some(a), Some(b)) => (a, b),
|
||||||
(None, _) => return std::cmp::Ordering::Greater,
|
};
|
||||||
(_, None) => return std::cmp::Ordering::Less,
|
|
||||||
(Some(a), Some(b)) => (a, b),
|
|
||||||
};
|
|
||||||
|
|
||||||
match a.timestamp.cmp(&b.timestamp) {
|
match a.timestamp.cmp(&b.timestamp) {
|
||||||
std::cmp::Ordering::Equal => a_idx.cmp(b_idx),
|
std::cmp::Ordering::Equal => a_idx.cmp(b_idx),
|
||||||
cmp => cmp,
|
cmp => cmp,
|
||||||
}
|
}
|
||||||
})
|
},
|
||||||
{
|
) {
|
||||||
let start_time = match bs.front() {
|
let start_time = match bufs.front() {
|
||||||
None => {
|
None => {
|
||||||
// No more buffers now
|
// No more buffers now
|
||||||
break;
|
break;
|
||||||
|
@ -1187,7 +1200,7 @@ impl FMP4Mux {
|
||||||
.opt_ge(current_end_time.saturating_sub(start_time))
|
.opt_ge(current_end_time.saturating_sub(start_time))
|
||||||
.unwrap_or(true)
|
.unwrap_or(true)
|
||||||
{
|
{
|
||||||
if let Some(buffer) = bs.pop_front() {
|
if let Some(buffer) = bufs.pop_front() {
|
||||||
current_end_time = buffer.timestamp + buffer.duration;
|
current_end_time = buffer.timestamp + buffer.duration;
|
||||||
dequeued_bytes += buffer.buffer.size() as u64;
|
dequeued_bytes += buffer.buffer.size() as u64;
|
||||||
interleaved_buffers.push(buffer);
|
interleaved_buffers.push(buffer);
|
||||||
|
@ -1198,13 +1211,78 @@ impl FMP4Mux {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert!(drain_buffers.iter().all(|bs| bs.is_empty()));
|
// All buffers should be consumed now
|
||||||
|
assert!(drained_streams.iter().all(|(_, _, bufs)| bufs.is_empty()));
|
||||||
|
|
||||||
|
let streams = drained_streams
|
||||||
|
.into_iter()
|
||||||
|
.map(|(caps, timing_info, _)| (caps, timing_info))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
Ok((interleaved_buffers, streams))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn drain(
|
||||||
|
&self,
|
||||||
|
element: &super::FMP4Mux,
|
||||||
|
state: &mut State,
|
||||||
|
settings: &Settings,
|
||||||
|
timeout: bool,
|
||||||
|
at_eos: bool,
|
||||||
|
) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> {
|
||||||
|
if at_eos {
|
||||||
|
gst::info!(CAT, obj: element, "Draining at EOS");
|
||||||
|
} else if timeout {
|
||||||
|
gst::info!(CAT, obj: element, "Draining at timeout");
|
||||||
|
} else {
|
||||||
|
for stream in &state.streams {
|
||||||
|
if !stream.fragment_filled && !stream.sinkpad.is_eos() {
|
||||||
|
return Ok((None, None));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect all buffers and their timing information that are to be drained right now.
|
||||||
|
let (
|
||||||
|
mut drained_streams,
|
||||||
|
min_earliest_pts_position,
|
||||||
|
min_earliest_pts,
|
||||||
|
min_start_dts_position,
|
||||||
|
max_end_pts,
|
||||||
|
) = self.drain_buffers(element, state, settings, timeout, at_eos)?;
|
||||||
|
|
||||||
|
// For ONVIF, replace all timestamps with timestamps based on UTC times.
|
||||||
|
let max_end_utc_time =
|
||||||
|
self.preprocess_drained_streams_onvif(element, state, &mut drained_streams)?;
|
||||||
|
|
||||||
|
// Remove all GAP buffers before processing them further
|
||||||
|
for (_, _, buffers) in &mut drained_streams {
|
||||||
|
buffers.retain(|buf| {
|
||||||
|
!buf.buffer.flags().contains(gst::BufferFlags::GAP)
|
||||||
|
|| !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
|
||||||
|
|| buf.buffer.size() != 0
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create header now if it was not created before and return the caps
|
||||||
|
let mut caps = None;
|
||||||
|
if state.stream_header.is_none() {
|
||||||
|
let (_, new_caps) = self
|
||||||
|
.update_header(element, state, settings, false)?
|
||||||
|
.unwrap();
|
||||||
|
caps = Some(new_caps);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Interleave buffers according to the settings into a single vec
|
||||||
|
let (mut interleaved_buffers, streams) =
|
||||||
|
self.interleave_buffers(element, settings, drained_streams)?;
|
||||||
|
|
||||||
let mut buffer_list = None;
|
let mut buffer_list = None;
|
||||||
|
|
||||||
if interleaved_buffers.is_empty() {
|
if interleaved_buffers.is_empty() {
|
||||||
assert!(timeout || at_eos);
|
assert!(timeout || at_eos);
|
||||||
} else {
|
} else {
|
||||||
|
// If there are actual buffers to output then create headers as needed and create a
|
||||||
|
// bufferlist for all buffers that have to be output.
|
||||||
let min_earliest_pts_position = min_earliest_pts_position.unwrap();
|
let min_earliest_pts_position = min_earliest_pts_position.unwrap();
|
||||||
let min_earliest_pts = min_earliest_pts.unwrap();
|
let min_earliest_pts = min_earliest_pts.unwrap();
|
||||||
let max_end_pts = max_end_pts.unwrap();
|
let max_end_pts = max_end_pts.unwrap();
|
||||||
|
@ -1237,7 +1315,7 @@ impl FMP4Mux {
|
||||||
state.sequence_number += 1;
|
state.sequence_number += 1;
|
||||||
let (mut fmp4_fragment_header, moof_offset) =
|
let (mut fmp4_fragment_header, moof_offset) =
|
||||||
boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
|
boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
|
||||||
variant: class.as_ref().variant,
|
variant: element.class().as_ref().variant,
|
||||||
sequence_number,
|
sequence_number,
|
||||||
streams: streams.as_slice(),
|
streams: streams.as_slice(),
|
||||||
buffers: interleaved_buffers.as_slice(),
|
buffers: interleaved_buffers.as_slice(),
|
||||||
|
|
Loading…
Reference in a new issue