diff --git a/mux/fmp4/src/fmp4mux/imp.rs b/mux/fmp4/src/fmp4mux/imp.rs index 88c84b14..27ca8200 100644 --- a/mux/fmp4/src/fmp4mux/imp.rs +++ b/mux/fmp4/src/fmp4mux/imp.rs @@ -294,36 +294,37 @@ impl FMP4Mux { Ok(()) } + /// Peek the currently queued buffer on this stream. + /// + /// This also determines the PTS/DTS that is finally going to be used, including + /// timestamp conversion to the UTC times in ONVIF mode. fn peek_buffer( &self, - sinkpad: &super::FMP4MuxPad, - delta_frames: super::DeltaFrames, - pre_queue: &mut VecDeque, - running_time_utc_time_mapping: &mut Option<(gst::Signed, gst::ClockTime)>, + stream: &mut Stream, fragment_duration: gst::ClockTime, ) -> Result, gst::FlowError> { // If not in ONVIF mode or the mapping is already known and there is a pre-queued buffer // then we can directly return it from here. if self.obj().class().as_ref().variant != super::Variant::ONVIF - || running_time_utc_time_mapping.is_some() + || stream.running_time_utc_time_mapping.is_some() { - if let Some(pre_queued_buffer) = pre_queue.front() { + if let Some(pre_queued_buffer) = stream.pre_queue.front() { return Ok(Some(pre_queued_buffer.clone())); } } // Pop buffer here, it will be stored in the pre-queue after calculating its timestamps - let mut buffer = match sinkpad.pop_buffer() { + let mut buffer = match stream.sinkpad.pop_buffer() { None => return Ok(None), Some(buffer) => buffer, }; - Self::check_buffer(&buffer, sinkpad, delta_frames)?; + Self::check_buffer(&buffer, &stream.sinkpad, stream.delta_frames)?; - let segment = match sinkpad.segment().downcast::().ok() { + let segment = match stream.sinkpad.segment().downcast::().ok() { Some(segment) => segment, None => { - gst::error!(CAT, obj: sinkpad, "Got buffer before segment"); + gst::error!(CAT, obj: stream.sinkpad, "Got buffer before segment"); return Err(gst::FlowError::Error); } }; @@ -335,12 +336,12 @@ impl FMP4Mux { let pts = segment .to_running_time_full(pts_position) .ok_or_else(|| { - gst::error!(CAT, obj: sinkpad, "Couldn't convert PTS to running time"); + gst::error!(CAT, obj: stream.sinkpad, "Couldn't convert PTS to running time"); gst::FlowError::Error })? .positive() .unwrap_or_else(|| { - gst::warning!(CAT, obj: sinkpad, "Negative PTSs are not supported"); + gst::warning!(CAT, obj: stream.sinkpad, "Negative PTSs are not supported"); gst::ClockTime::ZERO }); @@ -349,18 +350,18 @@ impl FMP4Mux { .ok_or_else(|| { gst::error!( CAT, - obj: sinkpad, + obj: stream.sinkpad, "Couldn't convert end PTS to running time" ); gst::FlowError::Error })? .positive() .unwrap_or_else(|| { - gst::warning!(CAT, obj: sinkpad, "Negative PTSs are not supported"); + gst::warning!(CAT, obj: stream.sinkpad, "Negative PTSs are not supported"); gst::ClockTime::ZERO }); - let (dts, end_dts) = if !delta_frames.requires_dts() { + let (dts, end_dts) = if !stream.delta_frames.requires_dts() { (None, None) } else { // Negative DTS are handled via the dts_offset and by having negative composition time @@ -369,7 +370,7 @@ impl FMP4Mux { let end_dts_position = duration.opt_add(dts_position).unwrap_or(dts_position); let dts = segment.to_running_time_full(dts_position).ok_or_else(|| { - gst::error!(CAT, obj: sinkpad, "Couldn't convert DTS to running time"); + gst::error!(CAT, obj: stream.sinkpad, "Couldn't convert DTS to running time"); gst::FlowError::Error })?; @@ -378,7 +379,7 @@ impl FMP4Mux { .ok_or_else(|| { gst::error!( CAT, - obj: sinkpad, + obj: stream.sinkpad, "Couldn't convert end DTS to running time" ); gst::FlowError::Error @@ -407,14 +408,14 @@ impl FMP4Mux { if self.obj().class().as_ref().variant != super::Variant::ONVIF { // Store in the queue so we don't have to recalculate this all the time - pre_queue.push_back(PreQueuedBuffer { + stream.pre_queue.push_back(PreQueuedBuffer { buffer, pts, end_pts, dts, end_dts, }); - } else if let Some(running_time_utc_time_mapping) = running_time_utc_time_mapping { + } else if let Some(running_time_utc_time_mapping) = stream.running_time_utc_time_mapping { // For ONVIF we need to re-timestamp the buffer with its UTC time. // // After re-timestamping, put the buffer into the pre-queue so re-timestamping only has to @@ -422,9 +423,9 @@ impl FMP4Mux { let utc_time = match get_utc_time_from_buffer(&buffer) { None => { // Calculate from the mapping - running_time_to_utc_time(pts, *running_time_utc_time_mapping).ok_or_else( + running_time_to_utc_time(pts, running_time_utc_time_mapping).ok_or_else( || { - gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time"); + gst::error!(CAT, obj: stream.sinkpad, "Stream has negative PTS UTC time"); gst::FlowError::Error }, )? @@ -433,31 +434,31 @@ impl FMP4Mux { }; gst::trace!( CAT, - obj: sinkpad, + obj: stream.sinkpad, "Mapped PTS running time {pts} to UTC time {utc_time}" ); let end_pts_utc_time = running_time_to_utc_time(end_pts, (pts, utc_time)).ok_or_else(|| { - gst::error!(CAT, obj: sinkpad, "Stream has negative end PTS UTC time"); + gst::error!(CAT, obj: stream.sinkpad, "Stream has negative end PTS UTC time"); gst::FlowError::Error })?; let (dts_utc_time, end_dts_utc_time) = if let Some(dts) = dts { let dts_utc_time = running_time_to_utc_time(dts, (pts, utc_time)).ok_or_else(|| { - gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); + gst::error!(CAT, obj: stream.sinkpad, "Stream has negative DTS UTC time"); gst::FlowError::Error })?; gst::trace!( CAT, - obj: sinkpad, + obj: stream.sinkpad, "Mapped DTS running time {dts} to UTC time {dts_utc_time}" ); let end_dts_utc_time = running_time_to_utc_time(end_dts.unwrap(), (pts, utc_time)) .ok_or_else(|| { - gst::error!(CAT, obj: sinkpad, "Stream has negative end DTS UTC time"); + gst::error!(CAT, obj: stream.sinkpad, "Stream has negative end DTS UTC time"); gst::FlowError::Error })?; @@ -469,7 +470,7 @@ impl FMP4Mux { (None, None) }; - pre_queue.push_back(PreQueuedBuffer { + stream.pre_queue.push_back(PreQueuedBuffer { buffer, pts: utc_time, end_pts: end_pts_utc_time, @@ -479,9 +480,11 @@ impl FMP4Mux { } else { // In ONVIF mode we need to get UTC times for each buffer and synchronize based on that. // Queue up to min(6s, fragment_duration) of data in the very beginning to get the first UTC time and then backdate. - if let Some((last, first)) = Option::zip(pre_queue.back(), pre_queue.front()) { + if let Some((last, first)) = + Option::zip(stream.pre_queue.back(), stream.pre_queue.front()) + { // Existence of PTS/DTS checked below - let (last, first) = if delta_frames.requires_dts() { + let (last, first) = if stream.delta_frames.requires_dts() { (last.end_dts.unwrap(), first.end_dts.unwrap()) } else { ( @@ -494,7 +497,7 @@ impl FMP4Mux { if last.saturating_sub(first) > gst::Signed::Positive(limit) { gst::error!( CAT, - obj: sinkpad, + obj: stream.sinkpad, "Got no UTC time in the first {limit} of the stream" ); return Err(gst::FlowError::Error); @@ -504,7 +507,7 @@ impl FMP4Mux { let utc_time = match get_utc_time_from_buffer(&buffer) { Some(utc_time) => utc_time, None => { - pre_queue.push_back(PreQueuedBuffer { + stream.pre_queue.push_back(PreQueuedBuffer { buffer, pts, end_pts, @@ -516,11 +519,11 @@ impl FMP4Mux { }; let mapping = (gst::Signed::Positive(pts), utc_time); - *running_time_utc_time_mapping = Some(mapping); + stream.running_time_utc_time_mapping = Some(mapping); // Push the buffer onto the pre-queue and re-timestamp it and all other buffers // based on the mapping above once we have an UTC time. - pre_queue.push_back(PreQueuedBuffer { + stream.pre_queue.push_back(PreQueuedBuffer { buffer, pts, end_pts, @@ -528,15 +531,15 @@ impl FMP4Mux { end_dts, }); - for pre_queued_buffer in pre_queue.iter_mut() { + for pre_queued_buffer in stream.pre_queue.iter_mut() { let pts_utc_time = running_time_to_utc_time(pre_queued_buffer.pts, mapping) .ok_or_else(|| { - gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time"); + gst::error!(CAT, obj: stream.sinkpad, "Stream has negative PTS UTC time"); gst::FlowError::Error })?; gst::trace!( CAT, - obj: sinkpad, + obj: stream.sinkpad, "Mapped PTS running time {} to UTC time {pts_utc_time}", pre_queued_buffer.pts, ); @@ -544,29 +547,31 @@ impl FMP4Mux { let end_pts_utc_time = running_time_to_utc_time(pre_queued_buffer.end_pts, mapping) .ok_or_else(|| { - gst::error!(CAT, obj: sinkpad, "Stream has negative end PTS UTC time"); + gst::error!(CAT, obj: stream.sinkpad, "Stream has negative end PTS UTC time"); gst::FlowError::Error })?; pre_queued_buffer.end_pts = end_pts_utc_time; if let Some(dts) = pre_queued_buffer.dts { let dts_utc_time = running_time_to_utc_time(dts, mapping).ok_or_else(|| { - gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); + gst::error!(CAT, obj: stream.sinkpad, "Stream has negative DTS UTC time"); gst::FlowError::Error })?; gst::trace!( CAT, - obj: sinkpad, + obj: stream.sinkpad, "Mapped DTS running time {dts} to UTC time {dts_utc_time}" ); pre_queued_buffer.dts = Some(gst::Signed::Positive(dts_utc_time)); - let end_dts_utc_time = - running_time_to_utc_time(pre_queued_buffer.end_dts.unwrap(), mapping) - .ok_or_else(|| { - gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); - gst::FlowError::Error - })?; + let end_dts_utc_time = running_time_to_utc_time( + pre_queued_buffer.end_dts.unwrap(), + mapping, + ) + .ok_or_else(|| { + gst::error!(CAT, obj: stream.sinkpad, "Stream has negative DTS UTC time"); + gst::FlowError::Error + })?; pre_queued_buffer.end_dts = Some(gst::Signed::Positive(end_dts_utc_time)); } } @@ -574,44 +579,34 @@ impl FMP4Mux { // Fall through and return the front of the queue } - Ok(Some(pre_queue.front().unwrap().clone())) + Ok(Some(stream.pre_queue.front().unwrap().clone())) } - fn pop_buffer( - &self, - _sinkpad: &super::FMP4MuxPad, - pre_queue: &mut VecDeque, - running_time_utc_time_mapping: &Option<(gst::Signed, gst::ClockTime)>, - ) -> PreQueuedBuffer { + /// Pop the currently queued buffer from this stream. + fn pop_buffer(&self, stream: &mut Stream) -> PreQueuedBuffer { // Only allowed to be called after peek was successful so there must be a buffer now // or in ONVIF mode we must also know the mapping now. - assert!(!pre_queue.is_empty()); + assert!(!stream.pre_queue.is_empty()); if self.obj().class().as_ref().variant == super::Variant::ONVIF { - assert!(running_time_utc_time_mapping.is_some()); + assert!(stream.running_time_utc_time_mapping.is_some()); } - pre_queue.pop_front().unwrap() + stream.pre_queue.pop_front().unwrap() } + /// Finds the stream that has the earliest buffer queued. fn find_earliest_stream<'a>( &self, state: &'a mut State, timeout: bool, fragment_duration: gst::ClockTime, - ) -> Result, gst::FlowError> { + ) -> Result, gst::FlowError> { let mut earliest_stream = None; let mut all_have_data_or_eos = true; - for (idx, stream) in state.streams.iter_mut().enumerate() { - let pre_queued_buffer = match Self::peek_buffer( - self, - &stream.sinkpad, - stream.delta_frames, - &mut stream.pre_queue, - &mut stream.running_time_utc_time_mapping, - fragment_duration, - ) { + for stream in state.streams.iter_mut() { + let pre_queued_buffer = match Self::peek_buffer(self, stream, fragment_duration) { Ok(Some(buffer)) => buffer, Ok(None) | Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => { if stream.sinkpad.is_eos() { @@ -635,7 +630,13 @@ impl FMP4Mux { continue; } - gst::trace!(CAT, obj: stream.sinkpad, "Stream has running time PTS {} / DTS {} queued", pre_queued_buffer.pts, pre_queued_buffer.dts.display()); + gst::trace!( + CAT, + obj: stream.sinkpad, + "Stream has running time PTS {} / DTS {} queued", + pre_queued_buffer.pts, + pre_queued_buffer.dts.display(), + ); let running_time = if stream.delta_frames.requires_dts() { pre_queued_buffer.dts.unwrap() @@ -645,11 +646,11 @@ impl FMP4Mux { if earliest_stream .as_ref() - .map_or(true, |(_idx, _stream, earliest_running_time)| { + .map_or(true, |(_stream, earliest_running_time)| { *earliest_running_time > running_time }) { - earliest_stream = Some((idx, stream, running_time)); + earliest_stream = Some((stream, running_time)); } } @@ -660,7 +661,7 @@ impl FMP4Mux { "No timeout and not all streams have a buffer or are EOS" ); Ok(None) - } else if let Some((idx, stream, earliest_running_time)) = earliest_stream { + } else if let Some((stream, earliest_running_time)) = earliest_stream { gst::trace!( CAT, imp: self, @@ -668,17 +669,16 @@ impl FMP4Mux { stream.sinkpad.name(), earliest_running_time ); - Ok(Some((idx, stream))) + Ok(Some(stream)) } else { gst::trace!(CAT, imp: self, "No streams have data queued currently"); Ok(None) } } - // Queue incoming buffers as individual GOPs. + /// Queue incoming buffer as individual GOPs. fn queue_gops( &self, - _idx: usize, stream: &mut Stream, mut pre_queued_buffer: PreQueuedBuffer, ) -> Result<(), gst::FlowError> { @@ -937,6 +937,34 @@ impl FMP4Mux { Ok(()) } + /// Queue buffers from all streams that are not filled for the current fragment yet + fn queue_available_buffers( + &self, + state: &mut State, + settings: &Settings, + timeout: bool, + ) -> Result<(), gst::FlowError> { + let fragment_start_pts = state.fragment_start_pts; + let chunk_start_pts = state.chunk_start_pts; + + // Always take a buffer from the stream with the earliest queued buffer to keep the + // fill-level at all sinkpads in sync. + while let Some(stream) = + self.find_earliest_stream(state, timeout, settings.fragment_duration)? + { + let pre_queued_buffer = Self::pop_buffer(self, stream); + + // Queue up the buffer and update GOP tracking state + self.queue_gops(stream, pre_queued_buffer)?; + + // Check if this stream is filled enough now. + self.check_stream_filled(settings, stream, fragment_start_pts, chunk_start_pts, false); + } + + Ok(()) + } + + /// Check if the stream is filled enough for the current chunk / fragment. fn check_stream_filled( &self, settings: &Settings, @@ -1051,6 +1079,9 @@ impl FMP4Mux { } } + /// Calculate earliest PTS, i.e. PTS of the very first fragment. + /// + /// This also sends a force-keyunit event for the start of the second fragment. fn calculate_earliest_pts( &self, settings: &Settings, @@ -1186,13 +1217,450 @@ impl FMP4Mux { } } + /// Drain buffers from a single stream. + #[allow(clippy::too_many_arguments)] + fn drain_buffers_one_stream( + &self, + settings: &Settings, + stream: &mut Stream, + timeout: bool, + all_eos: bool, + fragment_start_pts: gst::ClockTime, + chunk_start_pts: gst::ClockTime, + chunk_end_pts: Option, + fragment_start: bool, + fragment_filled: bool, + ) -> Result, gst::FlowError> { + assert!( + timeout + || all_eos + || stream.sinkpad.is_eos() + || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true) + || settings.chunk_duration.is_some() + ); + + let mut gops = Vec::with_capacity(stream.queued_gops.len()); + if stream.queued_gops.is_empty() { + return Ok(gops); + } + + // For the first stream drain as much as necessary and decide the end of this + // fragment or chunk, for all other streams drain up to that position. + + if let Some(chunk_duration) = settings.chunk_duration { + // Chunk mode + + let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts { + // Not the first stream + chunk_end_pts + } else if fragment_filled { + // Fragment is filled, so only dequeue everything until the latest GOP + fragment_start_pts + settings.fragment_duration + } else { + // Fragment is not filled and we either have a full chunk or timeout + chunk_start_pts + chunk_duration + }; + + gst::trace!( + CAT, + obj: stream.sinkpad, + "Draining up to end PTS {} / duration {}", + dequeue_end_pts, + dequeue_end_pts - chunk_start_pts + ); + + while let Some(gop) = stream.queued_gops.back() { + // If this should be the last chunk of a fragment then only drain every + // finished GOP until the chunk end PTS. If there is no finished GOP for + // this stream (it would be not the first stream then), then drain + // everything up to the chunk end PTS. + // + // If this chunk is not the last chunk of a fragment then simply dequeue + // everything up to the chunk end PTS. + if fragment_filled { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Fragment filled, current GOP start {} end {} (final {})", + gop.start_pts, gop.end_pts, + gop.final_end_pts || all_eos || stream.sinkpad.is_eos() + ); + + if (gop.final_end_pts || all_eos || stream.sinkpad.is_eos()) + && gop.end_pts <= dequeue_end_pts + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Pushing whole GOP", + ); + gops.push(stream.queued_gops.pop_back().unwrap()); + continue; + } + if !gops.is_empty() { + break; + } + + gst::error!(CAT, obj: stream.sinkpad, "Don't have a full GOP at the end of a fragment"); + } else { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Chunk filled, current GOP start {} end {} (final {})", + gop.start_pts, gop.end_pts, + gop.final_end_pts || all_eos || stream.sinkpad.is_eos() + ); + } + + if gop.end_pts <= dequeue_end_pts + && (gop.final_end_pts || all_eos || stream.sinkpad.is_eos()) + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Pushing whole GOP", + ); + gops.push(stream.queued_gops.pop_back().unwrap()); + } else if gop.start_pts >= dequeue_end_pts + || (!gop.final_earliest_pts && !all_eos && !stream.sinkpad.is_eos()) + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "GOP starts after chunk end", + ); + break; + } else { + let gop = stream.queued_gops.back_mut().unwrap(); + + let start_pts = gop.start_pts; + let start_dts = gop.start_dts; + let earliest_pts = gop.earliest_pts; + let earliest_pts_position = gop.earliest_pts_position; + + let mut split_index = None; + + for (idx, buffer) in gop.buffers.iter().enumerate() { + if buffer.pts >= dequeue_end_pts { + break; + } + split_index = Some(idx); + } + let split_index = match split_index { + Some(split_index) => split_index, + None => { + // We have B frames and the first buffer of this GOP is too far + // in the future. + gst::trace!( + CAT, + obj: stream.sinkpad, + "First buffer of GOP too far in the future", + ); + break; + } + }; + + // The last buffer of the GOP starts before the chunk end but ends + // after the end. We still take it here and remove the whole GOP. + if split_index == gop.buffers.len() - 1 { + if gop.final_end_pts || all_eos || stream.sinkpad.is_eos() { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Pushing whole GOP", + ); + gops.push(stream.queued_gops.pop_back().unwrap()); + } else { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Can't push whole GOP as it's not final yet", + ); + } + break; + } + + let mut buffers = mem::take(&mut gop.buffers); + // Contains all buffers from `split_index + 1` to the end + gop.buffers = buffers.split_off(split_index + 1); + + gop.start_pts = gop.buffers[0].pts; + gop.start_dts = gop.buffers[0].dts; + gop.earliest_pts_position = gop.buffers[0].pts_position; + gop.earliest_pts = gop.buffers[0].pts; + + gst::trace!( + CAT, + obj: stream.sinkpad, + "Splitting GOP and keeping PTS {}", + gop.buffers[0].pts, + ); + + let queue_gop = Gop { + start_pts, + start_dts, + earliest_pts, + final_earliest_pts: true, + end_pts: gop.start_pts, + final_end_pts: true, + end_dts: gop.start_dts, + earliest_pts_position, + buffers, + }; + + gops.push(queue_gop); + break; + } + } + + if fragment_start { + if let Some(first_buffer) = gops.first().and_then(|gop| gop.buffers.first()) { + if first_buffer + .buffer + .flags() + .contains(gst::BufferFlags::DELTA_UNIT) + { + gst::error!(CAT, obj: stream.sinkpad, "First buffer of a new fragment is not a keyframe"); + } + } + } + } else { + // Non-chunk mode + + let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts { + // Not the first stream + chunk_end_pts + } else { + fragment_start_pts + settings.fragment_duration + }; + + gst::trace!( + CAT, + obj: stream.sinkpad, + "Draining up to end PTS {} / duration {}", + dequeue_end_pts, + dequeue_end_pts - chunk_start_pts + ); + + while let Some(gop) = stream.queued_gops.back() { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Current GOP start {} end {} (final {})", + gop.start_pts, gop.end_pts, + gop.final_end_pts || all_eos || stream.sinkpad.is_eos() + ); + + // If this GOP is not complete then we can't pop it yet. + // + // If there was no complete GOP at all yet then it might be bigger than the + // fragment duration. In this case we might not be able to handle the latency + // requirements in a live pipeline. + if !gop.final_end_pts && !all_eos && !stream.sinkpad.is_eos() { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Not including GOP without final end PTS", + ); + break; + } + + // If this GOP starts after the fragment end then don't dequeue it yet unless this is + // the first stream and no GOPs were dequeued at all yet. This would mean that the + // GOP is bigger than the fragment duration. + if !all_eos + && gop.end_pts > dequeue_end_pts + && (chunk_end_pts.is_some() || !gops.is_empty()) + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Not including GOP yet", + ); + break; + } + + gst::trace!( + CAT, + obj: stream.sinkpad, + "Pushing complete GOP", + ); + gops.push(stream.queued_gops.pop_back().unwrap()); + } + } + + Ok(gops) + } + + /// Flatten all GOPs, remove any gaps and calculate durations. + #[allow(clippy::type_complexity)] + fn flatten_gops( + &self, + idx: usize, + stream: &Stream, + gops: Vec, + ) -> Result< + Option<( + // All buffers of the GOPs without gaps + VecDeque, + // Earliest PTS + gst::ClockTime, + // Earliest PTS position + gst::ClockTime, + // End PTS + gst::ClockTime, + // Start DTS + Option, + // Start DTS position + Option, + // End DTS + Option, + )>, + gst::FlowError, + > { + let last_gop = gops.last().unwrap(); + let end_pts = last_gop.end_pts; + let end_dts = last_gop.end_dts; + + let mut gop_buffers = Vec::with_capacity(gops.iter().map(|g| g.buffers.len()).sum()); + gop_buffers.extend(gops.into_iter().flat_map(|gop| gop.buffers.into_iter())); + + // Then calculate durations for all of the buffers and get rid of any GAP buffers in + // the process. + // Also calculate the earliest PTS / start DTS here, which needs to consider GAP + // buffers too. + let mut buffers = VecDeque::with_capacity(gop_buffers.len()); + let mut earliest_pts = None; + let mut earliest_pts_position = None; + let mut start_dts = None; + let mut start_dts_position = None; + + let mut gop_buffers = gop_buffers.into_iter(); + while let Some(buffer) = gop_buffers.next() { + // If this is a GAP buffer then skip it. Its duration was already considered + // below for the non-GAP buffer preceding it, and if there was none then the + // chunk start would be adjusted accordingly for this stream. + if buffer.buffer.flags().contains(gst::BufferFlags::GAP) + && buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE) + && buffer.buffer.size() == 0 + { + gst::trace!( + CAT, + obj: stream.sinkpad, + "Skipping gap buffer {buffer:?}", + ); + continue; + } + + if earliest_pts.map_or(true, |earliest_pts| buffer.pts < earliest_pts) { + earliest_pts = Some(buffer.pts); + } + if earliest_pts_position.map_or(true, |earliest_pts_position| { + buffer.buffer.pts().unwrap() < earliest_pts_position + }) { + earliest_pts_position = Some(buffer.buffer.pts().unwrap()); + } + if stream.delta_frames.requires_dts() && start_dts.is_none() { + start_dts = Some(buffer.dts.unwrap()); + } + if stream.delta_frames.requires_dts() && start_dts_position.is_none() { + start_dts_position = Some(buffer.buffer.dts().unwrap()); + } + + let timestamp = if !stream.delta_frames.requires_dts() { + buffer.pts + } else { + buffer.dts.unwrap() + }; + + // Take as end timestamp the timestamp of the next non-GAP buffer + let end_timestamp = match gop_buffers.as_slice().iter().find(|buf| { + !buf.buffer.flags().contains(gst::BufferFlags::GAP) + || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE) + || buf.buffer.size() != 0 + }) { + Some(buffer) => { + if !stream.delta_frames.requires_dts() { + buffer.pts + } else { + buffer.dts.unwrap() + } + } + None => { + if !stream.delta_frames.requires_dts() { + end_pts + } else { + end_dts.unwrap() + } + } + }; + + // Timestamps are enforced to monotonically increase when queueing buffers + let duration = end_timestamp + .checked_sub(timestamp) + .expect("Timestamps going backwards"); + + let composition_time_offset = if !stream.delta_frames.requires_dts() { + None + } else { + let pts = buffer.pts; + let dts = buffer.dts.unwrap(); + + Some( + i64::try_from( + (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(), + ) + .map_err(|_| { + gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference"); + gst::FlowError::Error + })?, + ) + }; + + buffers.push_back(Buffer { + idx, + buffer: buffer.buffer, + timestamp, + duration, + composition_time_offset, + }); + } + + if buffers.is_empty() { + return Ok(None); + } + + let earliest_pts = earliest_pts.unwrap(); + let earliest_pts_position = earliest_pts_position.unwrap(); + if stream.delta_frames.requires_dts() { + assert!(start_dts.is_some()); + assert!(start_dts_position.is_some()); + } + let start_dts = start_dts; + let start_dts_position = start_dts_position; + + Ok(Some(( + buffers, + earliest_pts, + earliest_pts_position, + end_pts, + start_dts, + start_dts_position, + end_dts, + ))) + } + + /// Drain buffers from all streams for the current chunk. + /// + /// Also removes gap buffers, calculates buffer durations and various timestamps relevant for + /// the current chunk. #[allow(clippy::type_complexity)] fn drain_buffers( &self, state: &mut State, settings: &Settings, timeout: bool, - at_eos: bool, + all_eos: bool, ) -> Result< ( // Drained streams @@ -1219,7 +1687,6 @@ impl FMP4Mux { let mut min_earliest_pts = None; let mut min_start_dts_position = None; let mut chunk_end_pts = None; - let mut fragment_start = false; // In fragment mode, each chunk is a full fragment. Otherwise, in chunk mode, // this fragment is filled if it is filled for the first non-EOS stream @@ -1231,6 +1698,10 @@ impl FMP4Mux { .map(|s| s.fragment_filled) == Some(true); + let fragment_start_pts = state.fragment_start_pts.unwrap(); + let chunk_start_pts = state.chunk_start_pts.unwrap(); + let fragment_start = fragment_start_pts == chunk_start_pts; + // The first stream decides how much can be dequeued, if anything at all. // // In chunk mode: @@ -1255,277 +1726,27 @@ impl FMP4Mux { CAT, imp: self, "Starting to drain at {} (fragment start {}, fragment end {}, chunk start {}, chunk end {})", - state.chunk_start_pts.display(), - state.fragment_start_pts.display(), - state.fragment_start_pts.map(|start| start + settings.fragment_duration).display(), - state.chunk_start_pts.display(), - Option::zip(state.chunk_start_pts, settings.chunk_duration).map(|(start, duration)| start + duration).display(), + chunk_start_pts, + fragment_start_pts, + fragment_start_pts + settings.fragment_duration, + chunk_start_pts.display(), + settings.chunk_duration.map(|duration| chunk_start_pts + duration).display(), ); for (idx, stream) in state.streams.iter_mut().enumerate() { let stream_settings = stream.sinkpad.imp().settings.lock().unwrap().clone(); - assert!( - timeout - || at_eos - || stream.sinkpad.is_eos() - || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true) - || settings.chunk_duration.is_some() - ); - - let mut gops = Vec::with_capacity(stream.queued_gops.len()); - if !stream.queued_gops.is_empty() { - let fragment_start_pts = state.fragment_start_pts.unwrap(); - let chunk_start_pts = state.chunk_start_pts.unwrap(); - - // For the first stream drain as much as necessary and decide the end of this - // fragment or chunk, for all other streams drain up to that position. - if let Some(chunk_duration) = settings.chunk_duration { - let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts { - // Not the first stream - chunk_end_pts - } else if fragment_filled { - // Fragment is filled, so only dequeue everything until the latest GOP - fragment_start_pts + settings.fragment_duration - } else { - // Fragment is not filled and we either have a full chunk or timeout - chunk_start_pts + chunk_duration - }; - - gst::trace!( - CAT, - obj: stream.sinkpad, - "Draining up to end PTS {} / duration {}", - dequeue_end_pts, - dequeue_end_pts - chunk_start_pts - ); - - while let Some(gop) = stream.queued_gops.back() { - // If this should be the last chunk of a fragment then only drain every - // finished GOP until the chunk end PTS. If there is no finished GOP for - // this stream (it would be not the first stream then), then drain - // everything up to the chunk end PTS. - // - // If this chunk is not the last chunk of a fragment then simply dequeue - // everything up to the chunk end PTS. - if fragment_filled { - gst::trace!( - CAT, - obj: stream.sinkpad, - "Fragment filled, current GOP start {} end {} (final {})", - gop.start_pts, gop.end_pts, - gop.final_end_pts || at_eos || stream.sinkpad.is_eos() - ); - - if (gop.final_end_pts || at_eos || stream.sinkpad.is_eos()) - && gop.end_pts <= dequeue_end_pts - { - gst::trace!( - CAT, - obj: stream.sinkpad, - "Pushing whole GOP", - ); - gops.push(stream.queued_gops.pop_back().unwrap()); - continue; - } - if !gops.is_empty() { - break; - } - - gst::error!(CAT, obj: stream.sinkpad, "Don't have a full GOP at the end of a fragment"); - } else { - gst::trace!( - CAT, - obj: stream.sinkpad, - "Chunk filled, current GOP start {} end {} (final {})", - gop.start_pts, gop.end_pts, - gop.final_end_pts || at_eos || stream.sinkpad.is_eos() - ); - } - - if gop.end_pts <= dequeue_end_pts - && (gop.final_end_pts || at_eos || stream.sinkpad.is_eos()) - { - gst::trace!( - CAT, - obj: stream.sinkpad, - "Pushing whole GOP", - ); - gops.push(stream.queued_gops.pop_back().unwrap()); - } else if gop.start_pts >= dequeue_end_pts - || (!gop.final_earliest_pts && !at_eos && !stream.sinkpad.is_eos()) - { - gst::trace!( - CAT, - obj: stream.sinkpad, - "GOP starts after chunk end", - ); - break; - } else { - let gop = stream.queued_gops.back_mut().unwrap(); - - let start_pts = gop.start_pts; - let start_dts = gop.start_dts; - let earliest_pts = gop.earliest_pts; - let earliest_pts_position = gop.earliest_pts_position; - - let mut split_index = None; - - for (idx, buffer) in gop.buffers.iter().enumerate() { - if buffer.pts >= dequeue_end_pts { - break; - } - split_index = Some(idx); - } - let split_index = match split_index { - Some(split_index) => split_index, - None => { - // We have B frames and the first buffer of this GOP is too far - // in the future. - gst::trace!( - CAT, - obj: stream.sinkpad, - "First buffer of GOP too far in the future", - ); - break; - } - }; - - // The last buffer of the GOP starts before the chunk end but ends - // after the end. We still take it here and remove the whole GOP. - if split_index == gop.buffers.len() - 1 { - if gop.final_end_pts || at_eos || stream.sinkpad.is_eos() { - gst::trace!( - CAT, - obj: stream.sinkpad, - "Pushing whole GOP", - ); - gops.push(stream.queued_gops.pop_back().unwrap()); - } else { - gst::trace!( - CAT, - obj: stream.sinkpad, - "Can't push whole GOP as it's not final yet", - ); - } - break; - } - - let mut buffers = mem::take(&mut gop.buffers); - // Contains all buffers from `split_index + 1` to the end - gop.buffers = buffers.split_off(split_index + 1); - - gop.start_pts = gop.buffers[0].pts; - gop.start_dts = gop.buffers[0].dts; - gop.earliest_pts_position = gop.buffers[0].pts_position; - gop.earliest_pts = gop.buffers[0].pts; - - gst::trace!( - CAT, - obj: stream.sinkpad, - "Splitting GOP and keeping PTS {}", - gop.buffers[0].pts, - ); - - let queue_gop = Gop { - start_pts, - start_dts, - earliest_pts, - final_earliest_pts: true, - end_pts: gop.start_pts, - final_end_pts: true, - end_dts: gop.start_dts, - earliest_pts_position, - buffers, - }; - - gops.push(queue_gop); - break; - } - } - - fragment_start = fragment_start_pts == chunk_start_pts; - if fragment_start { - if let Some(first_buffer) = gops.first().and_then(|gop| gop.buffers.first()) - { - if first_buffer - .buffer - .flags() - .contains(gst::BufferFlags::DELTA_UNIT) - { - gst::error!(CAT, obj: stream.sinkpad, "First buffer of a new fragment is not a keyframe"); - } - } - } - } else { - let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts { - // Not the first stream - chunk_end_pts - } else { - fragment_start_pts + settings.fragment_duration - }; - - gst::trace!( - CAT, - obj: stream.sinkpad, - "Draining up to end PTS {} / duration {}", - dequeue_end_pts, - dequeue_end_pts - chunk_start_pts - ); - - while let Some(gop) = stream.queued_gops.back() { - gst::trace!( - CAT, - obj: stream.sinkpad, - "Current GOP start {} end {} (final {})", - gop.start_pts, gop.end_pts, - gop.final_end_pts || at_eos || stream.sinkpad.is_eos() - ); - - // If this GOP is not complete then we can't pop it yet. - // - // If there was no complete GOP at all yet then it might be bigger than the - // fragment duration. In this case we might not be able to handle the latency - // requirements in a live pipeline. - if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() { - gst::trace!( - CAT, - obj: stream.sinkpad, - "Not including GOP without final end PTS", - ); - break; - } - - // If this GOP starts after the fragment end then don't dequeue it yet unless this is - // the first stream and no GOPs were dequeued at all yet. This would mean that the - // GOP is bigger than the fragment duration. - if !at_eos - && gop.end_pts > dequeue_end_pts - && (chunk_end_pts.is_some() || !gops.is_empty()) - { - gst::trace!( - CAT, - obj: stream.sinkpad, - "Not including GOP yet", - ); - break; - } - - gst::trace!( - CAT, - obj: stream.sinkpad, - "Pushing complete GOP", - ); - gops.push(stream.queued_gops.pop_back().unwrap()); - } - - // If we don't have a next chunk start PTS then this is the first stream as above. - if chunk_end_pts.is_none() { - // In fragment mode, each chunk is a full fragment - fragment_start = true; - } - } - } + let gops = self.drain_buffers_one_stream( + settings, + stream, + timeout, + all_eos, + fragment_start_pts, + chunk_start_pts, + chunk_end_pts, + fragment_start, + fragment_filled, + )?; stream.fragment_filled = false; stream.chunk_filled = false; @@ -1543,7 +1764,7 @@ impl FMP4Mux { } else { // If nothing was dequeued for the first stream then this is OK if we're at // EOS: we just consider the next stream as first stream then. - if at_eos || stream.sinkpad.is_eos() { + if all_eos || stream.sinkpad.is_eos() { // This is handled below generally if nothing was dequeued } else { // Otherwise this can only really happen on timeout in live pipelines. @@ -1568,7 +1789,7 @@ impl FMP4Mux { return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } } - } else if at_eos { + } else if all_eos { if let Some(last_gop) = gops.last() { if chunk_end_pts.map_or(true, |chunk_end_pts| chunk_end_pts < last_gop.end_pts) { @@ -1620,143 +1841,38 @@ impl FMP4Mux { .unwrap_or(gst::ClockTime::ZERO) ); - let last_gop = gops.last().unwrap(); - let end_pts = last_gop.end_pts; - let end_dts = last_gop.end_dts; - // First flatten all GOPs into a single `Vec` - let mut gop_buffers = Vec::with_capacity(gops.iter().map(|g| g.buffers.len()).sum()); - gop_buffers.extend(gops.into_iter().flat_map(|gop| gop.buffers.into_iter())); - - // Then calculate durations for all of the buffers and get rid of any GAP buffers in - // the process. - // Also calculate the earliest PTS / start DTS here, which needs to consider GAP - // buffers too. - let mut buffers = VecDeque::with_capacity(gop_buffers.len()); - let mut earliest_pts = None; - let mut earliest_pts_position = None; - let mut start_dts = None; - let mut start_dts_position = None; - - let mut gop_buffers = gop_buffers.into_iter(); - while let Some(buffer) = gop_buffers.next() { - // If this is a GAP buffer then skip it. Its duration was already considered - // below for the non-GAP buffer preceding it, and if there was none then the - // chunk start would be adjusted accordingly for this stream. - if buffer.buffer.flags().contains(gst::BufferFlags::GAP) - && buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE) - && buffer.buffer.size() == 0 - { - gst::trace!( + let buffers = self.flatten_gops(idx, stream, gops)?; + let ( + buffers, + earliest_pts, + earliest_pts_position, + end_pts, + start_dts, + start_dts_position, + _end_dts, + ) = match buffers { + Some(res) => res, + None => { + gst::info!( CAT, obj: stream.sinkpad, - "Skipping gap buffer {buffer:?}", + "Drained only gap buffers", ); + + drained_streams.push(( + super::FragmentHeaderStream { + caps: stream.caps.clone(), + start_time: None, + delta_frames: stream.delta_frames, + trak_timescale: stream_settings.trak_timescale, + }, + VecDeque::new(), + )); + continue; } - - if earliest_pts.map_or(true, |earliest_pts| buffer.pts < earliest_pts) { - earliest_pts = Some(buffer.pts); - } - if earliest_pts_position.map_or(true, |earliest_pts_position| { - buffer.buffer.pts().unwrap() < earliest_pts_position - }) { - earliest_pts_position = Some(buffer.buffer.pts().unwrap()); - } - if stream.delta_frames.requires_dts() && start_dts.is_none() { - start_dts = Some(buffer.dts.unwrap()); - } - if stream.delta_frames.requires_dts() && start_dts_position.is_none() { - start_dts_position = Some(buffer.buffer.dts().unwrap()); - } - - let timestamp = if !stream.delta_frames.requires_dts() { - buffer.pts - } else { - buffer.dts.unwrap() - }; - - // Take as end timestamp the timestamp of the next non-GAP buffer - let end_timestamp = match gop_buffers.as_slice().iter().find(|buf| { - !buf.buffer.flags().contains(gst::BufferFlags::GAP) - || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE) - || buf.buffer.size() != 0 - }) { - Some(buffer) => { - if !stream.delta_frames.requires_dts() { - buffer.pts - } else { - buffer.dts.unwrap() - } - } - None => { - if !stream.delta_frames.requires_dts() { - end_pts - } else { - end_dts.unwrap() - } - } - }; - - // Timestamps are enforced to monotonically increase when queueing buffers - let duration = end_timestamp - .checked_sub(timestamp) - .expect("Timestamps going backwards"); - - let composition_time_offset = if !stream.delta_frames.requires_dts() { - None - } else { - let pts = buffer.pts; - let dts = buffer.dts.unwrap(); - - Some( - i64::try_from( - (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(), - ) - .map_err(|_| { - gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference"); - gst::FlowError::Error - })?, - ) - }; - - buffers.push_back(Buffer { - idx, - buffer: buffer.buffer, - timestamp, - duration, - composition_time_offset, - }); - } - - if buffers.is_empty() { - gst::info!( - CAT, - obj: stream.sinkpad, - "Drained only gap buffers", - ); - - drained_streams.push(( - super::FragmentHeaderStream { - caps: stream.caps.clone(), - start_time: None, - delta_frames: stream.delta_frames, - trak_timescale: stream_settings.trak_timescale, - }, - VecDeque::new(), - )); - - continue; - } - - let earliest_pts = earliest_pts.unwrap(); - let earliest_pts_position = earliest_pts_position.unwrap(); - if stream.delta_frames.requires_dts() { - assert!(start_dts.is_some()); - assert!(start_dts_position.is_some()); - } - let start_dts = start_dts; - let start_dts_position = start_dts_position; + }; gst::info!( CAT, @@ -1814,6 +1930,7 @@ impl FMP4Mux { )) } + /// Interleave drained buffers of each stream for this chunk according to the settings. #[allow(clippy::type_complexity)] fn interleave_buffers( &self, @@ -1881,10 +1998,79 @@ impl FMP4Mux { Ok((interleaved_buffers, streams)) } + /// Request a force-keyunit event for the start of the next fragment. + /// + /// This is called whenever the last chunk of a fragment is pushed out. + /// + /// `chunk_end_pts` gives the time of the previously drained chunk, which + /// ideally should be lower than the next fragment starts PTS. + fn request_force_keyunit_event( + &self, + state: &State, + settings: &Settings, + upstream_events: &mut Vec<(super::FMP4MuxPad, gst::Event)>, + chunk_end_pts: gst::ClockTime, + ) { + let fku_time = chunk_end_pts + settings.fragment_duration; + + for stream in &state.streams { + let current_position = stream.current_position; + + // In case of ONVIF this needs to be converted back from UTC time to + // the stream's running time + let (fku_time, current_position) = + if self.obj().class().as_ref().variant == super::Variant::ONVIF { + ( + if let Some(fku_time) = utc_time_to_running_time( + fku_time, + stream.running_time_utc_time_mapping.unwrap(), + ) { + fku_time + } else { + continue; + }, + utc_time_to_running_time( + current_position, + stream.running_time_utc_time_mapping.unwrap(), + ), + ) + } else { + (fku_time, Some(current_position)) + }; + + let fku_time = + if current_position.map_or(false, |current_position| current_position > fku_time) { + gst::warning!( + CAT, + obj: stream.sinkpad, + "Sending force-keyunit event late for running time {} at {}", + fku_time, + current_position.display(), + ); + None + } else { + gst::debug!( + CAT, + obj: stream.sinkpad, + "Sending force-keyunit event for running time {}", + fku_time, + ); + Some(fku_time) + }; + + let fku = gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fku_time) + .all_headers(true) + .build(); + + upstream_events.push((stream.sinkpad.clone(), fku)); + } + } + /// Fills upstream events as needed and returns the caps the first time draining can happen. /// /// If it returns `(_, None)` then there's currently nothing to drain anymore. - fn drain( + fn drain_one_chunk( &self, state: &mut State, settings: &Settings, @@ -1948,213 +2134,254 @@ impl FMP4Mux { } } - let mut buffer_list = None; if interleaved_buffers.is_empty() { assert!(at_eos); - } else { - // If there are actual buffers to output then create headers as needed and create a - // bufferlist for all buffers that have to be output. - let min_earliest_pts_position = min_earliest_pts_position.unwrap(); - let min_earliest_pts = min_earliest_pts.unwrap(); - let chunk_end_pts = chunk_end_pts.unwrap(); + return Ok((caps, None)); + } - let mut fmp4_header = None; - if !state.sent_headers { - let mut buffer = state.stream_header.as_ref().unwrap().copy(); - { - let buffer = buffer.get_mut().unwrap(); - - buffer.set_pts(min_earliest_pts_position); - buffer.set_dts(min_start_dts_position); - - // Header is DISCONT|HEADER - buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER); - } - - fmp4_header = Some(buffer); - - state.sent_headers = true; - } - - // TODO: Write prft boxes before moof - // TODO: Write sidx boxes before moof and rewrite once offsets are known - - if state.sequence_number == 0 { - state.sequence_number = 1; - } - let sequence_number = state.sequence_number; - // If this is the last chunk of a fragment then increment the sequence number for the - // start of the next fragment. - if fragment_filled { - state.sequence_number += 1; - } - let (mut fmp4_fragment_header, moof_offset) = - boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration { - variant: self.obj().class().as_ref().variant, - sequence_number, - chunk: !fragment_start, - streams: streams.as_slice(), - buffers: interleaved_buffers.as_slice(), - }) - .map_err(|err| { - gst::error!( - CAT, - imp: self, - "Failed to create FMP4 fragment header: {}", - err - ); - gst::FlowError::Error - })?; + // If there are actual buffers to output then create headers as needed and create a + // bufferlist for all buffers that have to be output. + let min_earliest_pts_position = min_earliest_pts_position.unwrap(); + let min_earliest_pts = min_earliest_pts.unwrap(); + let chunk_end_pts = chunk_end_pts.unwrap(); + let mut fmp4_header = None; + if !state.sent_headers { + let mut buffer = state.stream_header.as_ref().unwrap().copy(); { - let buffer = fmp4_fragment_header.get_mut().unwrap(); + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(min_earliest_pts_position); buffer.set_dts(min_start_dts_position); - buffer.set_duration(chunk_end_pts.checked_sub(min_earliest_pts)); - // Fragment and chunk header is HEADER - buffer.set_flags(gst::BufferFlags::HEADER); - // Chunk header is DELTA_UNIT - if !fragment_start { - buffer.set_flags(gst::BufferFlags::DELTA_UNIT); - } - - // Copy metas from the first actual buffer to the fragment header. This allows - // getting things like the reference timestamp meta or the timecode meta to identify - // the fragment. - let _ = interleaved_buffers[0].buffer.copy_into( - buffer, - gst::BufferCopyFlags::META, - 0, - None, - ); + // Header is DISCONT|HEADER + buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER); } - let moof_offset = state.current_offset - + fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64 - + moof_offset; + fmp4_header = Some(buffer); - let buffers_len = interleaved_buffers.len(); - for (idx, buffer) in interleaved_buffers.iter_mut().enumerate() { - // Fix up buffer flags, all other buffers are DELTA_UNIT - let buffer_ref = buffer.buffer.make_mut(); - buffer_ref.unset_flags(gst::BufferFlags::all()); - buffer_ref.set_flags(gst::BufferFlags::DELTA_UNIT); - - // Set the marker flag for the last buffer of the segment - if idx == buffers_len - 1 { - buffer_ref.set_flags(gst::BufferFlags::MARKER); - } - } - - buffer_list = Some( - fmp4_header - .into_iter() - .chain(Some(fmp4_fragment_header)) - .chain(interleaved_buffers.into_iter().map(|buffer| buffer.buffer)) - .inspect(|b| { - state.current_offset += b.size() as u64; - }) - .collect::(), - ); - - if settings.write_mfra && fragment_start { - // Write mfra only for the main stream on fragment starts, and if there are no - // buffers for the main stream in this segment then don't write anything. - if let Some(super::FragmentHeaderStream { - start_time: Some(start_time), - .. - }) = streams.get(0) - { - state.fragment_offsets.push(super::FragmentOffset { - time: *start_time, - offset: moof_offset, - }); - } - } - - state.end_pts = Some(chunk_end_pts); - - // Update for the start PTS of the next fragment / chunk - - if fragment_filled { - state.fragment_start_pts = Some(chunk_end_pts); - gst::info!(CAT, imp: self, "Starting new fragment at {}", chunk_end_pts,); - } else { - gst::info!(CAT, imp: self, "Starting new chunk at {}", chunk_end_pts,); - } - state.chunk_start_pts = Some(chunk_end_pts); - - // If the current fragment is filled we already have the next fragment's start - // keyframe and can request the following one. - if fragment_filled { - let fku_time = chunk_end_pts + settings.fragment_duration; - - for stream in &state.streams { - let current_position = stream.current_position; - - // In case of ONVIF this needs to be converted back from UTC time to - // the stream's running time - let (fku_time, current_position) = - if self.obj().class().as_ref().variant == super::Variant::ONVIF { - ( - if let Some(fku_time) = utc_time_to_running_time( - fku_time, - stream.running_time_utc_time_mapping.unwrap(), - ) { - fku_time - } else { - continue; - }, - utc_time_to_running_time( - current_position, - stream.running_time_utc_time_mapping.unwrap(), - ), - ) - } else { - (fku_time, Some(current_position)) - }; - - let fku_time = if current_position - .map_or(false, |current_position| current_position > fku_time) - { - gst::warning!( - CAT, - obj: stream.sinkpad, - "Sending force-keyunit event late for running time {} at {}", - fku_time, - current_position.display(), - ); - None - } else { - gst::debug!( - CAT, - obj: stream.sinkpad, - "Sending force-keyunit event for running time {}", - fku_time, - ); - Some(fku_time) - }; - - let fku = gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fku_time) - .all_headers(true) - .build(); - - upstream_events.push((stream.sinkpad.clone(), fku)); - } - } - - // Reset timeout delay now that we've output an actual fragment or chunk - state.timeout_delay = gst::ClockTime::ZERO; + state.sent_headers = true; } + // TODO: Write prft boxes before moof + // TODO: Write sidx boxes before moof and rewrite once offsets are known + + // First sequence number must be 1 + if state.sequence_number == 0 { + state.sequence_number = 1; + } + let sequence_number = state.sequence_number; + // If this is the last chunk of a fragment then increment the sequence number for the + // start of the next fragment. + if fragment_filled { + state.sequence_number += 1; + } + let (mut fmp4_fragment_header, moof_offset) = + boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration { + variant: self.obj().class().as_ref().variant, + sequence_number, + chunk: !fragment_start, + streams: streams.as_slice(), + buffers: interleaved_buffers.as_slice(), + }) + .map_err(|err| { + gst::error!( + CAT, + imp: self, + "Failed to create FMP4 fragment header: {}", + err + ); + gst::FlowError::Error + })?; + + { + let buffer = fmp4_fragment_header.get_mut().unwrap(); + buffer.set_pts(min_earliest_pts_position); + buffer.set_dts(min_start_dts_position); + buffer.set_duration(chunk_end_pts.checked_sub(min_earliest_pts)); + + // Fragment and chunk header is HEADER + buffer.set_flags(gst::BufferFlags::HEADER); + // Chunk header is DELTA_UNIT + if !fragment_start { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + + // Copy metas from the first actual buffer to the fragment header. This allows + // getting things like the reference timestamp meta or the timecode meta to identify + // the fragment. + let _ = interleaved_buffers[0].buffer.copy_into( + buffer, + gst::BufferCopyFlags::META, + 0, + None, + ); + } + + let moof_offset = state.current_offset + + fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64 + + moof_offset; + + let buffers_len = interleaved_buffers.len(); + for (idx, buffer) in interleaved_buffers.iter_mut().enumerate() { + // Fix up buffer flags, all other buffers are DELTA_UNIT + let buffer_ref = buffer.buffer.make_mut(); + buffer_ref.unset_flags(gst::BufferFlags::all()); + buffer_ref.set_flags(gst::BufferFlags::DELTA_UNIT); + + // Set the marker flag for the last buffer of the segment + if idx == buffers_len - 1 { + buffer_ref.set_flags(gst::BufferFlags::MARKER); + } + } + + let buffer_list = fmp4_header + .into_iter() + .chain(Some(fmp4_fragment_header)) + .chain(interleaved_buffers.into_iter().map(|buffer| buffer.buffer)) + .inspect(|b| { + state.current_offset += b.size() as u64; + }) + .collect::(); + + if settings.write_mfra && fragment_start { + // Write mfra only for the main stream on fragment starts, and if there are no + // buffers for the main stream in this segment then don't write anything. + if let Some(super::FragmentHeaderStream { + start_time: Some(start_time), + .. + }) = streams.get(0) + { + state.fragment_offsets.push(super::FragmentOffset { + time: *start_time, + offset: moof_offset, + }); + } + } + + state.end_pts = Some(chunk_end_pts); + + // Update for the start PTS of the next fragment / chunk + + if fragment_filled { + state.fragment_start_pts = Some(chunk_end_pts); + gst::info!(CAT, imp: self, "Starting new fragment at {}", chunk_end_pts,); + } else { + gst::info!(CAT, imp: self, "Starting new chunk at {}", chunk_end_pts,); + } + state.chunk_start_pts = Some(chunk_end_pts); + + // If the current fragment is filled we already have the next fragment's start + // keyframe and can request the following one. + if fragment_filled { + self.request_force_keyunit_event(state, settings, upstream_events, chunk_end_pts); + } + + // Reset timeout delay now that we've output an actual fragment or chunk + state.timeout_delay = gst::ClockTime::ZERO; + // TODO: Write edit list at EOS // TODO: Rewrite bitrates at EOS - Ok((caps, buffer_list)) + Ok((caps, Some(buffer_list))) } + /// Drain all chunks that can currently be drained. + /// + /// On error the `caps`, `buffers` or `upstream_events` can contain data of already finished + /// chunks that were complete before the error. + #[allow(clippy::too_many_arguments)] + fn drain( + &self, + state: &mut State, + settings: &Settings, + all_eos: bool, + mut timeout: bool, + caps: &mut Option, + buffers: &mut Vec, + upstream_events: &mut Vec<(super::FMP4MuxPad, gst::Event)>, + ) -> Result<(), gst::FlowError> { + // Loop as long as new chunks can be drained. + loop { + // If enough GOPs were queued, drain and create the output fragment or chunk + let res = self.drain_one_chunk(state, settings, timeout, all_eos, upstream_events); + let mut buffer_list = match res { + Ok((new_caps, buffer_list)) => { + if caps.is_none() { + *caps = new_caps; + } + + buffer_list + } + Err(err) => { + if err == gst_base::AGGREGATOR_FLOW_NEED_DATA { + assert!(!all_eos); + assert!(timeout); + gst::element_imp_warning!( + self, + gst::StreamError::Format, + ["Longer GOPs than fragment duration"] + ); + state.timeout_delay += 1.seconds(); + } + + return Err(err); + } + }; + + // If nothing can't be drained anymore then break the loop, and if all streams are + // EOS add the footers. + if buffer_list.is_none() { + if settings.write_mfra && all_eos { + gst::debug!(CAT, imp: self, "Writing mfra box"); + match boxes::create_mfra(&state.streams[0].caps, &state.fragment_offsets) { + Ok(mut mfra) => { + { + let mfra = mfra.get_mut().unwrap(); + // mfra is DELTA_UNIT like other buffers + mfra.set_flags(gst::BufferFlags::DELTA_UNIT); + } + + if buffer_list.is_none() { + buffer_list = Some(gst::BufferList::new_sized(1)); + } + buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra); + buffers.extend(buffer_list); + } + Err(err) => { + gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err); + } + } + } + + break Ok(()); + } + + // Otherwise extend the list of bufferlists and check again if something can be + // drained. + buffers.extend(buffer_list); + + // Only the first iteration is considered a timeout. + timeout = false; + + let fragment_start_pts = state.fragment_start_pts; + let chunk_start_pts = state.chunk_start_pts; + for stream in &mut state.streams { + // Check if this stream is still filled enough now. + self.check_stream_filled( + settings, + stream, + fragment_start_pts, + chunk_start_pts, + all_eos, + ); + } + + // And try draining a fragment again + } + } + + /// Create all streams. fn create_streams(&self, state: &mut State) -> Result<(), gst::FlowError> { for pad in self .obj() @@ -2266,6 +2493,7 @@ impl FMP4Mux { Ok(()) } + /// Generate an updated header at the end and the corresponding caps with the new streamheader. fn update_header( &self, state: &mut State, @@ -2347,6 +2575,80 @@ impl FMP4Mux { Ok(Some((list, caps))) } + + /// Finish the stream be rewriting / updating headers. + fn finish(&self, settings: &Settings) { + // Do remaining EOS handling after the end of the stream was pushed. + gst::debug!(CAT, imp: self, "Doing EOS handling"); + + if settings.header_update_mode == super::HeaderUpdateMode::None { + // Need to output new headers if started again after EOS + self.state.lock().unwrap().sent_headers = false; + return; + } + + let updated_header = self.update_header(&mut self.state.lock().unwrap(), settings, true); + match updated_header { + Ok(Some((buffer_list, caps))) => { + match settings.header_update_mode { + super::HeaderUpdateMode::None => unreachable!(), + super::HeaderUpdateMode::Rewrite => { + let mut q = gst::query::Seeking::new(gst::Format::Bytes); + if self.obj().src_pad().peer_query(&mut q) && q.result().0 { + let aggregator = self.obj(); + + aggregator.set_src_caps(&caps); + + // Seek to the beginning with a default bytes segment + aggregator.update_segment( + &gst::FormattedSegment::::new(), + ); + + if let Err(err) = aggregator.finish_buffer_list(buffer_list) { + gst::error!( + CAT, + imp: self, + "Failed pushing updated header buffer downstream: {:?}", + err, + ); + } + } else { + gst::error!( + CAT, + imp: self, + "Can't rewrite header because downstream is not seekable" + ); + } + } + super::HeaderUpdateMode::Update => { + let aggregator = self.obj(); + + aggregator.set_src_caps(&caps); + if let Err(err) = aggregator.finish_buffer_list(buffer_list) { + gst::error!( + CAT, + imp: self, + "Failed pushing updated header buffer downstream: {:?}", + err, + ); + } + } + } + } + Ok(None) => {} + Err(err) => { + gst::error!( + CAT, + imp: self, + "Failed to generate updated header: {:?}", + err + ); + } + } + + // Need to output new headers if started again after EOS + self.state.lock().unwrap().sent_headers = false; + } } #[glib::object_subclass] @@ -2753,12 +3055,11 @@ impl AggregatorImpl for FMP4Mux { fn aggregate(&self, timeout: bool) -> Result { let settings = self.settings.lock().unwrap().clone(); - let mut upstream_events = vec![]; - let all_eos; let mut caps = None; let mut buffers = vec![]; - { + let mut upstream_events = vec![]; + let res = { let mut state = self.state.lock().unwrap(); // Create streams @@ -2766,40 +3067,15 @@ impl AggregatorImpl for FMP4Mux { self.create_streams(&mut state)?; } - // Queue buffers from all streams that are not filled for the current fragment yet - // - // Always take a buffer from the stream with the earliest queued buffer to keep the - // fill-level at all sinkpads in sync. - let fragment_start_pts = state.fragment_start_pts; - let chunk_start_pts = state.chunk_start_pts; - - while let Some((idx, stream)) = - self.find_earliest_stream(&mut state, timeout, settings.fragment_duration)? - { - let pre_queued_buffer = Self::pop_buffer( - self, - &stream.sinkpad, - &mut stream.pre_queue, - &stream.running_time_utc_time_mapping, - ); - - // Queue up the buffer and update GOP tracking state - self.queue_gops(idx, stream, pre_queued_buffer)?; - - // Check if this stream is filled enough now. - self.check_stream_filled( - &settings, - stream, - fragment_start_pts, - chunk_start_pts, - false, - ); - } + self.queue_available_buffers(&mut state, &settings, timeout)?; all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos()); if all_eos { gst::debug!(CAT, imp: self, "All streams are EOS now"); + let fragment_start_pts = state.fragment_start_pts; + let chunk_start_pts = state.chunk_start_pts; + for stream in &mut state.streams { // Check if this stream is filled enough now that everything is EOS. self.check_stream_filled( @@ -2821,106 +3097,17 @@ impl AggregatorImpl for FMP4Mux { timeout, ); - // Loop as long as new chunks can be drained. - // Only the first iteration is considered a timeout. - let mut timeout = timeout; - loop { - // If enough GOPs were queued, drain and create the output fragment or chunk - let res = self.drain( - &mut state, - &settings, - timeout, - all_eos, - &mut upstream_events, - ); - let mut buffer_list = match res { - Ok((new_caps, buffer_list)) => { - if caps.is_none() { - caps = new_caps; - } - - buffer_list - } - Err(err) => { - if err == gst_base::AGGREGATOR_FLOW_NEED_DATA { - assert!(!all_eos); - assert!(timeout); - gst::element_imp_warning!( - self, - gst::StreamError::Format, - ["Longer GOPs than fragment duration"] - ); - state.timeout_delay += 1.seconds(); - } - - // Although we had an error, push out everything that was produced so far - drop(state); - for (sinkpad, event) in upstream_events { - sinkpad.push_event(event); - } - - if let Some(caps) = caps { - gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps); - self.obj().set_src_caps(&caps); - } - - for buffer_list in buffers { - gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffer_list); - self.obj().finish_buffer_list(buffer_list)?; - } - - return Err(err); - } - }; - - // If nothing can't be drained anymore then break the loop, and if all streams are - // EOS do EOS handling. - if buffer_list.is_none() { - if settings.write_mfra && all_eos { - gst::debug!(CAT, imp: self, "Writing mfra box"); - match boxes::create_mfra(&state.streams[0].caps, &state.fragment_offsets) { - Ok(mut mfra) => { - { - let mfra = mfra.get_mut().unwrap(); - // mfra is DELTA_UNIT like other buffers - mfra.set_flags(gst::BufferFlags::DELTA_UNIT); - } - - if buffer_list.is_none() { - buffer_list = Some(gst::BufferList::new_sized(1)); - } - buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra); - buffers.extend(buffer_list); - } - Err(err) => { - gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err); - } - } - } - - break; - } - - // Otherwise extend the list of bufferlists and check again if something can be - // drained. - buffers.extend(buffer_list); - - timeout = false; - - let fragment_start_pts = state.fragment_start_pts; - let chunk_start_pts = state.chunk_start_pts; - for stream in &mut state.streams { - // Check if this stream is still filled enough now. - self.check_stream_filled( - &settings, - stream, - fragment_start_pts, - chunk_start_pts, - all_eos, - ); - } - } - } + // Drain everything that can be drained at this point + self.drain( + &mut state, + &settings, + all_eos, + timeout, + &mut caps, + &mut buffers, + &mut upstream_events, + ) + }; for (sinkpad, event) in upstream_events { sinkpad.push_event(event); @@ -2936,78 +3123,16 @@ impl AggregatorImpl for FMP4Mux { self.obj().finish_buffer_list(buffer_list)?; } + // If an error happened above while draining, return this now after pushing + // any output that was produced before the error. + res?; + if !all_eos { return Ok(gst::FlowSuccess::Ok); } - // Do remaining EOS handling after the end of the stream was pushed. - gst::debug!(CAT, imp: self, "Doing EOS handling"); - - if settings.header_update_mode != super::HeaderUpdateMode::None { - let updated_header = - self.update_header(&mut self.state.lock().unwrap(), &settings, true); - match updated_header { - Ok(Some((buffer_list, caps))) => { - match settings.header_update_mode { - super::HeaderUpdateMode::None => unreachable!(), - super::HeaderUpdateMode::Rewrite => { - let mut q = gst::query::Seeking::new(gst::Format::Bytes); - if self.obj().src_pad().peer_query(&mut q) && q.result().0 { - let aggregator = self.obj(); - - aggregator.set_src_caps(&caps); - - // Seek to the beginning with a default bytes segment - aggregator - .update_segment( - &gst::FormattedSegment::::new(), - ); - - if let Err(err) = aggregator.finish_buffer_list(buffer_list) { - gst::error!( - CAT, - imp: self, - "Failed pushing updated header buffer downstream: {:?}", - err, - ); - } - } else { - gst::error!( - CAT, - imp: self, - "Can't rewrite header because downstream is not seekable" - ); - } - } - super::HeaderUpdateMode::Update => { - let aggregator = self.obj(); - - aggregator.set_src_caps(&caps); - if let Err(err) = aggregator.finish_buffer_list(buffer_list) { - gst::error!( - CAT, - imp: self, - "Failed pushing updated header buffer downstream: {:?}", - err, - ); - } - } - } - } - Ok(None) => {} - Err(err) => { - gst::error!( - CAT, - imp: self, - "Failed to generate updated header: {:?}", - err - ); - } - } - } - - // Need to output new headers if started again after EOS - self.state.lock().unwrap().sent_headers = false; + // Finish the stream. + self.finish(&settings); Err(gst::FlowError::Eos) }