diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs
index 2e9ff88f..0b2cf53c 100644
--- a/generic/fmp4/src/fmp4mux/imp.rs
+++ b/generic/fmp4/src/fmp4mux/imp.rs
@@ -672,30 +672,29 @@ impl FMP4Mux {
         ))
     }
 
-    fn drain(
+    #[allow(clippy::type_complexity)]
+    fn drain_buffers(
         &self,
-        element: &super::FMP4Mux,
+        _element: &super::FMP4Mux,
         state: &mut State,
         settings: &Settings,
         timeout: bool,
         at_eos: bool,
-    ) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> {
-        let class = element.class();
-
-        if at_eos {
-            gst::info!(CAT, obj: element, "Draining at EOS");
-        } else if timeout {
-            gst::info!(CAT, obj: element, "Draining at timeout");
-        } else {
-            for stream in &state.streams {
-                if !stream.fragment_filled && !stream.sinkpad.is_eos() {
-                    return Ok((None, None));
-                }
-            }
-        }
-
-        let mut drain_buffers = Vec::with_capacity(state.streams.len());
-        let mut streams = Vec::with_capacity(state.streams.len());
+    ) -> Result<
+        (
+            Vec<(
+                gst::Caps,
+                Option<super::FragmentTimingInfo>,
+                VecDeque<Buffer>,
+            )>,
+            Option<gst::ClockTime>,
+            Option<gst::ClockTime>,
+            Option<gst::ClockTime>,
+            Option<gst::ClockTime>,
+        ),
+        gst::FlowError,
+    > {
+        let mut drained_streams = Vec::with_capacity(state.streams.len());
 
         let mut min_earliest_pts_position = None;
         let mut min_earliest_pts = None;
@@ -740,192 +739,202 @@ impl FMP4Mux {
                     "Draining no buffers",
                 );
 
-                streams.push((stream.caps.clone(), None));
-                drain_buffers.push(VecDeque::new());
-            } else {
-                let first_gop = gops.first().unwrap();
-                let last_gop = gops.last().unwrap();
-                let earliest_pts = first_gop.earliest_pts;
-                let earliest_pts_position = first_gop.earliest_pts_position;
-                let start_dts = first_gop.start_dts;
-                let start_dts_position = first_gop.start_dts_position;
-                let end_pts = last_gop.end_pts;
-                let dts_offset = stream.dts_offset;
+                drained_streams.push((stream.caps.clone(), None, VecDeque::new()));
+                continue;
+            }
 
-                if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
-                    min_earliest_pts = Some(earliest_pts);
-                }
-                if min_earliest_pts_position
-                    .opt_gt(earliest_pts_position)
+            let first_gop = gops.first().unwrap();
+            let last_gop = gops.last().unwrap();
+            let earliest_pts = first_gop.earliest_pts;
+            let earliest_pts_position = first_gop.earliest_pts_position;
+            let start_dts = first_gop.start_dts;
+            let start_dts_position = first_gop.start_dts_position;
+            let end_pts = last_gop.end_pts;
+            let dts_offset = stream.dts_offset;
+
+            if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) {
+                min_earliest_pts = Some(earliest_pts);
+            }
+            if min_earliest_pts_position
+                .opt_gt(earliest_pts_position)
+                .unwrap_or(true)
+            {
+                min_earliest_pts_position = Some(earliest_pts_position);
+            }
+            if let Some(start_dts_position) = start_dts_position {
+                if min_start_dts_position
+                    .opt_gt(start_dts_position)
                     .unwrap_or(true)
                 {
-                    min_earliest_pts_position = Some(earliest_pts_position);
-                }
-                if let Some(start_dts_position) = start_dts_position {
-                    if min_start_dts_position
-                        .opt_gt(start_dts_position)
-                        .unwrap_or(true)
-                    {
-                        min_start_dts_position = Some(start_dts_position);
-                    }
-                }
-                if max_end_pts.opt_lt(end_pts).unwrap_or(true) {
-                    max_end_pts = Some(end_pts);
+                    min_start_dts_position = Some(start_dts_position);
                 }
+            }
+            if max_end_pts.opt_lt(end_pts).unwrap_or(true) {
+                max_end_pts = Some(end_pts);
+            }
 
-                gst::info!(
-                    CAT,
-                    obj: &stream.sinkpad,
-                    "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
-                    end_pts.saturating_sub(earliest_pts),
-                    earliest_pts,
-                    start_dts.display(),
-                    dts_offset.display(),
-                );
-
-                if let Some((prev_gop, first_gop)) = Option::zip(
-                    stream.queued_gops.iter().find(|gop| gop.final_end_pts),
-                    stream.queued_gops.back(),
-                ) {
-                    gst::debug!(
-                        CAT,
-                        obj: &stream.sinkpad,
-                        "Queued full GOPs duration updated to {}",
-                        prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
-                    );
-                }
+            gst::info!(
+                CAT,
+                obj: &stream.sinkpad,
+                "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
+                end_pts.saturating_sub(earliest_pts),
+                earliest_pts,
+                start_dts.display(),
+                dts_offset.display(),
+            );
 
+            if let Some((prev_gop, first_gop)) = Option::zip(
+                stream.queued_gops.iter().find(|gop| gop.final_end_pts),
+                stream.queued_gops.back(),
+            ) {
                 gst::debug!(
                     CAT,
                     obj: &stream.sinkpad,
-                    "Queued duration updated to {}",
-                    Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
-                        .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
-                        .unwrap_or(gst::ClockTime::ZERO)
+                    "Queued full GOPs duration updated to {}",
+                    prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
                 );
+            }
 
-                let start_time = if stream.intra_only {
-                    earliest_pts
-                } else {
-                    start_dts.unwrap()
-                };
+            gst::debug!(
+                CAT,
+                obj: &stream.sinkpad,
+                "Queued duration updated to {}",
+                Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
+                    .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
+                    .unwrap_or(gst::ClockTime::ZERO)
+            );
 
-                streams.push((
-                    stream.caps.clone(),
-                    Some(super::FragmentTimingInfo {
-                        start_time,
-                        intra_only: stream.intra_only,
-                    }),
-                ));
+            let start_time = if stream.intra_only {
+                earliest_pts
+            } else {
+                start_dts.unwrap()
+            };
 
-                let mut buffers =
-                    VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
+            let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
 
-                for gop in gops {
-                    let mut gop_buffers = gop.buffers.into_iter().peekable();
-                    while let Some(buffer) = gop_buffers.next() {
-                        let timestamp = if stream.intra_only {
-                            buffer.pts
-                        } else {
-                            buffer.dts.unwrap()
-                        };
+            for gop in gops {
+                let mut gop_buffers = gop.buffers.into_iter().peekable();
+                while let Some(buffer) = gop_buffers.next() {
+                    let timestamp = if stream.intra_only {
+                        buffer.pts
+                    } else {
+                        buffer.dts.unwrap()
+                    };
 
-                        let end_timestamp = match gop_buffers.peek() {
-                            Some(buffer) => {
-                                if stream.intra_only {
-                                    buffer.pts
-                                } else {
-                                    buffer.dts.unwrap()
-                                }
+                    let end_timestamp = match gop_buffers.peek() {
+                        Some(buffer) => {
+                            if stream.intra_only {
+                                buffer.pts
+                            } else {
+                                buffer.dts.unwrap()
                             }
-                            None => {
-                                if stream.intra_only {
-                                    gop.end_pts
-                                } else {
-                                    gop.end_dts.unwrap()
-                                }
+                        }
+                        None => {
+                            if stream.intra_only {
+                                gop.end_pts
+                            } else {
+                                gop.end_dts.unwrap()
                             }
-                        };
+                        }
+                    };
 
-                        // Timestamps are enforced to monotonically increase when queueing buffers
-                        let duration = end_timestamp
-                            .checked_sub(timestamp)
-                            .expect("Timestamps going backwards");
+                    // Timestamps are enforced to monotonically increase when queueing buffers
+                    let duration = end_timestamp
+                        .checked_sub(timestamp)
+                        .expect("Timestamps going backwards");
 
-                        let composition_time_offset = if stream.intra_only {
-                            None
-                        } else {
-                            let pts = buffer.pts;
-                            let dts = buffer.dts.unwrap();
+                    let composition_time_offset = if stream.intra_only {
+                        None
+                    } else {
+                        let pts = buffer.pts;
+                        let dts = buffer.dts.unwrap();
 
-                            if pts > dts {
-                                Some(
+                        if pts > dts {
+                            Some(
                                 i64::try_from((pts - dts).nseconds())
                                     .map_err(|_| {
                                         gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
                                         gst::FlowError::Error
                                     })?,
                             )
-                            } else {
-                                let diff = i64::try_from((dts - pts).nseconds())
+                        } else {
+                            let diff = i64::try_from((dts - pts).nseconds())
                                 .map_err(|_| {
                                     gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
                                     gst::FlowError::Error
                                 })?;
-                                Some(-diff)
-                            }
-                        };
+                            Some(-diff)
+                        }
+                    };
 
-                        buffers.push_back(Buffer {
-                            idx,
-                            buffer: buffer.buffer,
-                            timestamp,
-                            duration,
-                            composition_time_offset,
-                        });
-                    }
+                    buffers.push_back(Buffer {
+                        idx,
+                        buffer: buffer.buffer,
+                        timestamp,
+                        duration,
+                        composition_time_offset,
+                    });
                 }
-                drain_buffers.push(buffers);
             }
+
+            drained_streams.push((
+                stream.caps.clone(),
+                Some(super::FragmentTimingInfo {
+                    start_time,
+                    intra_only: stream.intra_only,
+                }),
+                buffers,
+            ));
         }
 
-        // Remove all GAP buffers before processing them further
-        for buffers in &mut drain_buffers {
-            buffers.retain(|buf| {
-                !buf.buffer.flags().contains(gst::BufferFlags::GAP)
-                    || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
-                    || buf.buffer.size() != 0
-            });
+        Ok((
+            drained_streams,
+            min_earliest_pts_position,
+            min_earliest_pts,
+            min_start_dts_position,
+            max_end_pts,
+        ))
+    }
+
+    fn preprocess_drained_streams_onvif(
+        &self,
+        element: &super::FMP4Mux,
+        state: &mut State,
+        drained_streams: &mut [(
+            gst::Caps,
+            Option<super::FragmentTimingInfo>,
+            VecDeque<Buffer>,
+        )],
+    ) -> Result<Option<gst::ClockTime>, gst::FlowError> {
+        if element.class().as_ref().variant != super::Variant::ONVIF {
+            return Ok(None);
        }
 
         let mut max_end_utc_time = None;
 
-        // For ONVIF, replace all timestamps with timestamps based on UTC times.
-        if class.as_ref().variant == super::Variant::ONVIF {
-            let calculate_pts = |buffer: &Buffer| -> gst::ClockTime {
-                let composition_time_offset = buffer.composition_time_offset.unwrap_or(0);
-                if composition_time_offset > 0 {
-                    buffer.timestamp + gst::ClockTime::from_nseconds(composition_time_offset as u64)
-                } else {
-                    buffer
-                        .timestamp
-                        .checked_sub(gst::ClockTime::from_nseconds(
-                            (-composition_time_offset) as u64,
-                        ))
-                        .unwrap()
-                }
-            };
-            // If this is the first fragment then allow the first buffers to not have a reference
-            // timestamp meta and backdate them
-            if state.stream_header.is_none() {
-                for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() {
-                    let (buffer_idx, utc_time, buffer) = match drain_buffers
-                        .iter()
-                        .enumerate()
-                        .find_map(|(idx, buffer)| {
-                            get_utc_time_from_buffer(&buffer.buffer)
-                                .map(|timestamp| (idx, timestamp, buffer))
-                        }) {
+        let calculate_pts = |buffer: &Buffer| -> gst::ClockTime {
+            let composition_time_offset = buffer.composition_time_offset.unwrap_or(0);
+            if composition_time_offset > 0 {
+                buffer.timestamp + gst::ClockTime::from_nseconds(composition_time_offset as u64)
+            } else {
+                buffer
+                    .timestamp
+                    .checked_sub(gst::ClockTime::from_nseconds(
+                        (-composition_time_offset) as u64,
+                    ))
+                    .unwrap()
+            }
+        };
+
+        // If this is the first fragment then allow the first buffers to not have a reference
+        // timestamp meta and backdate them
+        if state.stream_header.is_none() {
+            for (idx, (_, _, drain_buffers)) in drained_streams.iter_mut().enumerate() {
+                let (buffer_idx, utc_time, buffer) =
+                    match drain_buffers.iter().enumerate().find_map(|(idx, buffer)| {
+                        get_utc_time_from_buffer(&buffer.buffer)
                            .map(|timestamp| (idx, timestamp, buffer))
                    }) {
                        None => {
                            gst::error!(
                                CAT,
@@ -937,82 +946,47 @@ impl FMP4Mux {
                         Some(res) => res,
                     };
 
-                    // Now do the backdating
-                    if buffer_idx > 0 {
-                        let utc_time_pts = calculate_pts(buffer);
+                // Now do the backdating
+                if buffer_idx > 0 {
+                    let utc_time_pts = calculate_pts(buffer);
 
-                        for buffer in drain_buffers.iter_mut().take(buffer_idx) {
-                            let buffer_pts = calculate_pts(buffer);
-                            let buffer_pts_diff = if utc_time_pts >= buffer_pts {
-                                (utc_time_pts - buffer_pts).nseconds() as i64
-                            } else {
-                                -((buffer_pts - utc_time_pts).nseconds() as i64)
-                            };
-                            let buffer_utc_time = if buffer_pts_diff >= 0 {
-                                utc_time
-                                    .checked_sub(gst::ClockTime::from_nseconds(
-                                        buffer_pts_diff as u64,
-                                    ))
-                                    .unwrap()
-                            } else {
-                                utc_time
-                                    .checked_add(gst::ClockTime::from_nseconds(
-                                        (-buffer_pts_diff) as u64,
-                                    ))
-                                    .unwrap()
-                            };
-
-                            let buffer = buffer.buffer.make_mut();
-                            gst::ReferenceTimestampMeta::add(
-                                buffer,
-                                &UNIX_CAPS,
-                                buffer_utc_time,
-                                gst::ClockTime::NONE,
-                            );
-                        }
-                    }
-                }
-            }
-
-            // Calculate the minimum across all streams and remember that
-            if state.start_utc_time.is_none() {
-                let mut start_utc_time = None;
-
-                for (idx, drain_buffers) in drain_buffers.iter().enumerate() {
-                    for buffer in drain_buffers {
-                        let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
-                            None => {
-                                gst::error!(
-                                    CAT,
-                                    obj: &state.streams[idx].sinkpad,
-                                    "No reference timestamp set on all buffers"
-                                );
-                                return Err(gst::FlowError::Error);
-                            }
-                            Some(utc_time) => utc_time,
+                    for buffer in drain_buffers.iter_mut().take(buffer_idx) {
+                        let buffer_pts = calculate_pts(buffer);
+                        let buffer_pts_diff = if utc_time_pts >= buffer_pts {
+                            (utc_time_pts - buffer_pts).nseconds() as i64
+                        } else {
+                            -((buffer_pts - utc_time_pts).nseconds() as i64)
+                        };
+                        let buffer_utc_time = if buffer_pts_diff >= 0 {
+                            utc_time
+                                .checked_sub(gst::ClockTime::from_nseconds(buffer_pts_diff as u64))
+                                .unwrap()
+                        } else {
+                            utc_time
+                                .checked_add(gst::ClockTime::from_nseconds(
+                                    (-buffer_pts_diff) as u64,
+                                ))
+                                .unwrap()
                         };
 
-                        if start_utc_time.is_none() || start_utc_time > Some(utc_time) {
-                            start_utc_time = Some(utc_time);
-                        }
+                        let buffer = buffer.buffer.make_mut();
+                        gst::ReferenceTimestampMeta::add(
+                            buffer,
+                            &UNIX_CAPS,
+                            buffer_utc_time,
+                            gst::ClockTime::NONE,
+                        );
                     }
                 }
-
-                gst::debug!(
-                    CAT,
-                    obj: element,
-                    "Configuring start UTC time {}",
-                    start_utc_time.unwrap()
-                );
-                state.start_utc_time = start_utc_time;
             }
+        }
 
-            // Update all buffer timestamps based on the UTC time and offset to the start UTC time
-            let start_utc_time = state.start_utc_time.unwrap();
-            for (idx, drain_buffers) in drain_buffers.iter_mut().enumerate() {
-                let mut start_time = None;
+        // Calculate the minimum across all streams and remember that
+        if state.start_utc_time.is_none() {
+            let mut start_utc_time = None;
 
-                for buffer in drain_buffers.iter_mut() {
+            for (idx, (_, _, drain_buffers)) in drained_streams.iter().enumerate() {
+                for buffer in drain_buffers {
                     let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
                         None => {
                             gst::error!(
@@ -1025,150 +999,189 @@ impl FMP4Mux {
                         Some(utc_time) => utc_time,
                     };
 
-                    // Convert PTS UTC time to DTS
-                    let mut utc_time_dts =
-                        if let Some(composition_time_offset) = buffer.composition_time_offset {
-                            if composition_time_offset >= 0 {
-                                utc_time
-                                    .checked_sub(gst::ClockTime::from_nseconds(
-                                        composition_time_offset as u64,
-                                    ))
-                                    .unwrap()
-                            } else {
-                                utc_time
-                                    .checked_add(gst::ClockTime::from_nseconds(
-                                        (-composition_time_offset) as u64,
-                                    ))
-                                    .unwrap()
-                            }
-                        } else {
-                            utc_time
-                        };
+                    if start_utc_time.is_none() || start_utc_time > Some(utc_time) {
+                        start_utc_time = Some(utc_time);
+                    }
+                }
+            }
 
-                    // Enforce monotonically increasing timestamps
-                    if utc_time_dts < state.streams[idx].current_utc_time {
-                        gst::warning!(
+            gst::debug!(
+                CAT,
+                obj: element,
+                "Configuring start UTC time {}",
+                start_utc_time.unwrap()
+            );
+            state.start_utc_time = start_utc_time;
+        }
+
+        // Update all buffer timestamps based on the UTC time and offset to the start UTC time
+        let start_utc_time = state.start_utc_time.unwrap();
+        for (idx, (_, timing_info, drain_buffers)) in drained_streams.iter_mut().enumerate() {
+            let mut start_time = None;
+
+            for buffer in drain_buffers.iter_mut() {
+                let utc_time = match get_utc_time_from_buffer(&buffer.buffer) {
+                    None => {
+                        gst::error!(
                             CAT,
                             obj: &state.streams[idx].sinkpad,
-                            "Decreasing UTC DTS timestamp for buffer {} < {}",
-                            utc_time_dts,
-                            state.streams[idx].current_utc_time,
+                            "No reference timestamp set on all buffers"
                         );
-                        utc_time_dts = state.streams[idx].current_utc_time;
-                    } else {
-                        state.streams[idx].current_utc_time = utc_time_dts;
+                        return Err(gst::FlowError::Error);
                     }
+                    Some(utc_time) => utc_time,
+                };
 
-                    let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap();
+                // Convert PTS UTC time to DTS
+                let mut utc_time_dts =
+                    if let Some(composition_time_offset) = buffer.composition_time_offset {
+                        if composition_time_offset >= 0 {
+                            utc_time
+                                .checked_sub(gst::ClockTime::from_nseconds(
+                                    composition_time_offset as u64,
+                                ))
+                                .unwrap()
+                        } else {
+                            utc_time
+                                .checked_add(gst::ClockTime::from_nseconds(
+                                    (-composition_time_offset) as u64,
+                                ))
+                                .unwrap()
+                        }
+                    } else {
+                        utc_time
+                    };
+
+                // Enforce monotonically increasing timestamps
+                if utc_time_dts < state.streams[idx].current_utc_time {
+                    gst::warning!(
+                        CAT,
+                        obj: &state.streams[idx].sinkpad,
+                        "Decreasing UTC DTS timestamp for buffer {} < {}",
+                        utc_time_dts,
+                        state.streams[idx].current_utc_time,
+                    );
+                    utc_time_dts = state.streams[idx].current_utc_time;
+                } else {
+                    state.streams[idx].current_utc_time = utc_time_dts;
+                }
+
+                let timestamp = utc_time_dts.checked_sub(start_utc_time).unwrap();
+
+                gst::trace!(
+                    CAT,
+                    obj: &state.streams[idx].sinkpad,
+                    "Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}",
+                    buffer.timestamp,
+                    timestamp,
+                    utc_time_dts,
+                    utc_time,
+                );
+
+                buffer.timestamp = timestamp;
+                if start_time.is_none() || start_time > Some(buffer.timestamp) {
+                    start_time = Some(buffer.timestamp);
+                }
+            }
+
+            // Update durations for all buffers except for the last in the fragment unless all
+            // have the same duration anyway
+            let mut common_duration = Ok(None);
+            let mut drain_buffers_iter = drain_buffers.iter_mut().peekable();
+            while let Some(buffer) = drain_buffers_iter.next() {
+                let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp);
+
+                if let Some(next_timestamp) = next_timestamp {
+                    let duration = next_timestamp.saturating_sub(buffer.timestamp);
+                    if common_duration == Ok(None) {
+                        common_duration = Ok(Some(duration));
+                    } else if common_duration != Ok(Some(duration)) {
+                        common_duration = Err(());
+                    }
 
                     gst::trace!(
                         CAT,
                         obj: &state.streams[idx].sinkpad,
-                        "Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}",
+                        "Updating buffer with timestamp {} duration from {} to relative UTC duration {}",
                         buffer.timestamp,
-                        timestamp,
-                        utc_time_dts,
-                        utc_time,
+                        buffer.duration,
+                        duration,
                     );
 
-                    buffer.timestamp = timestamp;
-                    if start_time.is_none() || start_time > Some(buffer.timestamp) {
-                        start_time = Some(buffer.timestamp);
-                    }
-                }
+                    buffer.duration = duration;
+                } else if let Ok(Some(common_duration)) = common_duration {
+                    gst::trace!(
+                        CAT,
+                        obj: &state.streams[idx].sinkpad,
+                        "Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}",
+                        buffer.timestamp,
+                        buffer.duration,
+                        common_duration,
+                    );
 
-                // Update durations for all buffers except for the last in the fragment unless all
-                // have the same duration anyway
-                let mut common_duration = Ok(None);
-                let mut drain_buffers_iter = drain_buffers.iter_mut().peekable();
-                while let Some(buffer) = drain_buffers_iter.next() {
-                    let next_timestamp = drain_buffers_iter.peek().map(|b| b.timestamp);
-
-                    if let Some(next_timestamp) = next_timestamp {
-                        let duration = next_timestamp.saturating_sub(buffer.timestamp);
-                        if common_duration == Ok(None) {
-                            common_duration = Ok(Some(duration));
-                        } else if common_duration != Ok(Some(duration)) {
-                            common_duration = Err(());
-                        }
-
-                        gst::trace!(
-                            CAT,
-                            obj: &state.streams[idx].sinkpad,
-                            "Updating buffer with timestamp {} duration from {} to relative UTC duration {}",
-                            buffer.timestamp,
-                            buffer.duration,
-                            duration,
-                        );
-
-                        buffer.duration = duration;
-                    } else if let Ok(Some(common_duration)) = common_duration {
-                        gst::trace!(
-                            CAT,
-                            obj: &state.streams[idx].sinkpad,
-                            "Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}",
-                            buffer.timestamp,
-                            buffer.duration,
-                            common_duration,
-                        );
-
-                        buffer.duration = common_duration;
-                    } else {
-                        gst::trace!(
-                            CAT,
-                            obj: &state.streams[idx].sinkpad,
-                            "Keeping last buffer with timestamp {} duration at {}",
-                            buffer.timestamp,
-                            buffer.duration,
-                        );
-                    }
-
-                    let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration;
-                    if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) {
-                        max_end_utc_time = Some(end_utc_time);
-                    }
-                }
-
-                if let Some(start_time) = start_time {
-                    gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
-                    streams[idx].1.as_mut().unwrap().start_time = start_time;
+                    buffer.duration = common_duration;
                 } else {
-                    assert!(streams[idx].1.is_none());
+                    gst::trace!(
+                        CAT,
+                        obj: &state.streams[idx].sinkpad,
+                        "Keeping last buffer with timestamp {} duration at {}",
+                        buffer.timestamp,
+                        buffer.duration,
+                    );
                 }
+
+                let end_utc_time = start_utc_time + buffer.timestamp + buffer.duration;
+                if max_end_utc_time.is_none() || max_end_utc_time < Some(end_utc_time) {
+                    max_end_utc_time = Some(end_utc_time);
+                }
+            }
+
+            if let Some(start_time) = start_time {
+                gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
+                timing_info.as_mut().unwrap().start_time = start_time;
+            } else {
+                assert!(timing_info.is_none());
             }
         }
 
-        // Create header now if it was not created before and return the caps
-        let mut caps = None;
-        if state.stream_header.is_none() {
-            let (_, new_caps) = self
-                .update_header(element, state, settings, false)?
-                .unwrap();
-            caps = Some(new_caps);
-        }
+        Ok(max_end_utc_time)
+    }
 
-        // Interleave buffers according to the settings into a single vec
+    #[allow(clippy::type_complexity)]
+    fn interleave_buffers(
+        &self,
+        _element: &super::FMP4Mux,
+        settings: &Settings,
+        mut drained_streams: Vec<(
+            gst::Caps,
+            Option<super::FragmentTimingInfo>,
+            VecDeque<Buffer>,
+        )>,
+    ) -> Result<
+        (
+            Vec<Buffer>,
+            Vec<(gst::Caps, Option<super::FragmentTimingInfo>)>,
+        ),
+        gst::FlowError,
+    > {
         let mut interleaved_buffers =
-            Vec::with_capacity(drain_buffers.iter().map(|bs| bs.len()).sum());
-        while let Some((_idx, bs)) =
-            drain_buffers
-                .iter_mut()
-                .enumerate()
-                .min_by(|(a_idx, a), (b_idx, b)| {
-                    let (a, b) = match (a.front(), b.front()) {
-                        (None, None) => return std::cmp::Ordering::Equal,
-                        (None, _) => return std::cmp::Ordering::Greater,
-                        (_, None) => return std::cmp::Ordering::Less,
-                        (Some(a), Some(b)) => (a, b),
-                    };
+            Vec::with_capacity(drained_streams.iter().map(|(_, _, bufs)| bufs.len()).sum());
+        while let Some((_idx, (_, _, bufs))) = drained_streams.iter_mut().enumerate().min_by(
+            |(a_idx, (_, _, a)), (b_idx, (_, _, b))| {
+                let (a, b) = match (a.front(), b.front()) {
+                    (None, None) => return std::cmp::Ordering::Equal,
+                    (None, _) => return std::cmp::Ordering::Greater,
+                    (_, None) => return std::cmp::Ordering::Less,
+                    (Some(a), Some(b)) => (a, b),
+                };
 
-                    match a.timestamp.cmp(&b.timestamp) {
-                        std::cmp::Ordering::Equal => a_idx.cmp(b_idx),
-                        cmp => cmp,
-                    }
-                })
-        {
-            let start_time = match bs.front() {
+                match a.timestamp.cmp(&b.timestamp) {
+                    std::cmp::Ordering::Equal => a_idx.cmp(b_idx),
+                    cmp => cmp,
+                }
+            },
+        ) {
+            let start_time = match bufs.front() {
                 None => {
                     // No more buffers now
                     break;
@@ -1187,7 +1200,7 @@ impl FMP4Mux {
                 .opt_ge(current_end_time.saturating_sub(start_time))
                 .unwrap_or(true)
             {
-                if let Some(buffer) = bs.pop_front() {
+                if let Some(buffer) = bufs.pop_front() {
                     current_end_time = buffer.timestamp + buffer.duration;
                     dequeued_bytes += buffer.buffer.size() as u64;
                     interleaved_buffers.push(buffer);
@@ -1198,13 +1211,78 @@ impl FMP4Mux {
             }
         }
 
-        assert!(drain_buffers.iter().all(|bs| bs.is_empty()));
+        // All buffers should be consumed now
+        assert!(drained_streams.iter().all(|(_, _, bufs)| bufs.is_empty()));
+
+        let streams = drained_streams
+            .into_iter()
+            .map(|(caps, timing_info, _)| (caps, timing_info))
+            .collect::<Vec<_>>();
+
+        Ok((interleaved_buffers, streams))
+    }
+
+    fn drain(
+        &self,
+        element: &super::FMP4Mux,
+        state: &mut State,
+        settings: &Settings,
+        timeout: bool,
+        at_eos: bool,
+    ) -> Result<(Option<gst::Caps>, Option<gst::BufferList>), gst::FlowError> {
+        if at_eos {
+            gst::info!(CAT, obj: element, "Draining at EOS");
+        } else if timeout {
+            gst::info!(CAT, obj: element, "Draining at timeout");
+        } else {
+            for stream in &state.streams {
+                if !stream.fragment_filled && !stream.sinkpad.is_eos() {
+                    return Ok((None, None));
+                }
+            }
+        }
+
+        // Collect all buffers and their timing information that are to be drained right now.
+        let (
+            mut drained_streams,
+            min_earliest_pts_position,
+            min_earliest_pts,
+            min_start_dts_position,
+            max_end_pts,
+        ) = self.drain_buffers(element, state, settings, timeout, at_eos)?;
+
+        // For ONVIF, replace all timestamps with timestamps based on UTC times.
+        let max_end_utc_time =
+            self.preprocess_drained_streams_onvif(element, state, &mut drained_streams)?;
+
+        // Remove all GAP buffers before processing them further
+        for (_, _, buffers) in &mut drained_streams {
+            buffers.retain(|buf| {
+                !buf.buffer.flags().contains(gst::BufferFlags::GAP)
+                    || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
+                    || buf.buffer.size() != 0
+            });
+        }
+
+        // Create header now if it was not created before and return the caps
+        let mut caps = None;
+        if state.stream_header.is_none() {
+            let (_, new_caps) = self
+                .update_header(element, state, settings, false)?
+                .unwrap();
+            caps = Some(new_caps);
+        }
+
+        // Interleave buffers according to the settings into a single vec
+        let (mut interleaved_buffers, streams) =
+            self.interleave_buffers(element, settings, drained_streams)?;
 
         let mut buffer_list = None;
-
         if interleaved_buffers.is_empty() {
             assert!(timeout || at_eos);
         } else {
+            // If there are actual buffers to output then create headers as needed and create a
+            // bufferlist for all buffers that have to be output.
             let min_earliest_pts_position = min_earliest_pts_position.unwrap();
             let min_earliest_pts = min_earliest_pts.unwrap();
             let max_end_pts = max_end_pts.unwrap();
@@ -1237,7 +1315,7 @@ impl FMP4Mux {
             state.sequence_number += 1;
             let (mut fmp4_fragment_header, moof_offset) =
                 boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
-                    variant: class.as_ref().variant,
+                    variant: element.class().as_ref().variant,
                     sequence_number,
                     streams: streams.as_slice(),
                     buffers: interleaved_buffers.as_slice(),
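The least obvious part of the refactor is the interleaving step: `interleave_buffers` repeatedly picks the stream whose front buffer has the smallest timestamp (ties broken by stream index) until all per-stream queues are drained, subject to the interleave limits. The standalone sketch below reproduces only that ordering rule on plain numbers — it is an illustration under simplified assumptions (u64 nanosecond timestamps instead of gst::ClockTime, no interleave-bytes/duration limits, no GStreamer buffers), not the element's code.

```rust
use std::collections::VecDeque;

// Illustration of the ordering rule used by interleave_buffers in the patch above:
// always take the stream whose front "buffer" has the smallest timestamp, break
// ties by stream index. Timestamps are plain u64 values here, not gst::ClockTime.
fn interleave(mut streams: Vec<VecDeque<u64>>) -> Vec<(usize, u64)> {
    let mut out = Vec::new();
    loop {
        let next = streams
            .iter_mut()
            .enumerate()
            .min_by(|(a_idx, a), (b_idx, b)| match (a.front(), b.front()) {
                (None, None) => std::cmp::Ordering::Equal,
                (None, _) => std::cmp::Ordering::Greater,
                (_, None) => std::cmp::Ordering::Less,
                // Smallest timestamp wins; equal timestamps fall back to stream index.
                (Some(a), Some(b)) => a.cmp(b).then(a_idx.cmp(b_idx)),
            });
        match next {
            Some((idx, bufs)) => match bufs.pop_front() {
                Some(ts) => out.push((idx, ts)),
                None => break, // all queues are empty
            },
            None => break, // no streams at all
        }
    }
    out
}

fn main() {
    let streams = vec![
        VecDeque::from(vec![0, 40, 80]),
        VecDeque::from(vec![20, 60, 100]),
    ];
    // Buffers come out ordered by timestamp, alternating between the two streams.
    println!("{:?}", interleave(streams));
}
```

The real implementation additionally stops pulling from the selected stream once the configured `interleave-bytes` / `interleave-time` limits are exceeded, which is why the `while` loop in `interleave_buffers` checks `dequeued_bytes` and `current_end_time` before each `pop_front()`.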