// Copyright (C) 2021 Sebastian Dröge // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at // . // // SPDX-License-Identifier: MPL-2.0 use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; use gst_base::prelude::*; use gst_base::subclass::prelude::*; use std::collections::VecDeque; use std::mem; use std::sync::Mutex; use once_cell::sync::Lazy; use super::boxes; use super::Buffer; use super::DeltaFrames; /// Offset for the segment in non-single-stream variants. const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000); /// Offset between UNIX epoch and Jan 1 1601 epoch in seconds. /// 1601 = UNIX + UNIX_1601_OFFSET. const UNIX_1601_OFFSET: u64 = 11_644_473_600; /// Offset between NTP and UNIX epoch in seconds. /// NTP = UNIX + NTP_UNIX_OFFSET. const NTP_UNIX_OFFSET: u64 = 2_208_988_800; /// Reference timestamp meta caps for NTP timestamps. static NTP_CAPS: Lazy = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build()); /// Reference timestamp meta caps for UNIX timestamps. static UNIX_CAPS: Lazy = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build()); /// Returns the UTC time of the buffer in the UNIX epoch. fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option { buffer .iter_meta::() .find_map(|meta| { if meta.reference().can_intersect(&UNIX_CAPS) { Some(meta.timestamp()) } else if meta.reference().can_intersect(&NTP_CAPS) { meta.timestamp().checked_sub(NTP_UNIX_OFFSET.seconds()) } else { None } }) } /// Converts a running time to an UTC time. fn running_time_to_utc_time( running_time: impl Into>, running_time_utc_time_mapping: ( impl Into>, impl Into>, ), ) -> Option { running_time_utc_time_mapping .1 .into() .checked_sub(running_time_utc_time_mapping.0.into()) .and_then(|res| res.checked_add(running_time.into())) .and_then(|res| res.positive()) } /// Converts an UTC time to a running time. fn utc_time_to_running_time( utc_time: gst::ClockTime, running_time_utc_time_mapping: ( impl Into>, impl Into>, ), ) -> Option { running_time_utc_time_mapping .0 .into() .checked_sub(running_time_utc_time_mapping.1.into()) .and_then(|res| res.checked_add_unsigned(utc_time)) .and_then(|res| res.positive()) } static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "fmp4mux", gst::DebugColorFlags::empty(), Some("FMP4Mux Element"), ) }); const DEFAULT_FRAGMENT_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(10); const DEFAULT_CHUNK_DURATION: Option = gst::ClockTime::NONE; const DEFAULT_HEADER_UPDATE_MODE: super::HeaderUpdateMode = super::HeaderUpdateMode::None; const DEFAULT_WRITE_MFRA: bool = false; const DEFAULT_WRITE_MEHD: bool = false; const DEFAULT_INTERLEAVE_BYTES: Option = None; const DEFAULT_INTERLEAVE_TIME: Option = Some(gst::ClockTime::from_mseconds(250)); #[derive(Debug, Clone)] struct Settings { fragment_duration: gst::ClockTime, chunk_duration: Option, header_update_mode: super::HeaderUpdateMode, write_mfra: bool, write_mehd: bool, interleave_bytes: Option, interleave_time: Option, movie_timescale: u32, offset_to_zero: bool, } impl Default for Settings { fn default() -> Self { Settings { fragment_duration: DEFAULT_FRAGMENT_DURATION, chunk_duration: DEFAULT_CHUNK_DURATION, header_update_mode: DEFAULT_HEADER_UPDATE_MODE, write_mfra: DEFAULT_WRITE_MFRA, write_mehd: DEFAULT_WRITE_MEHD, interleave_bytes: DEFAULT_INTERLEAVE_BYTES, interleave_time: DEFAULT_INTERLEAVE_TIME, movie_timescale: 0, offset_to_zero: false, } } } #[derive(Debug, Clone)] struct PreQueuedBuffer { /// Buffer /// /// Buffer PTS/DTS are updated to the output segment in multi-stream configurations. buffer: gst::Buffer, /// PTS /// /// In ONVIF mode this is the UTC time, otherwise it is the PTS running time. pts: gst::ClockTime, /// End PTS /// /// In ONVIF mode this is the UTC time, otherwise it is the PTS running time. end_pts: gst::ClockTime, /// DTS /// /// In ONVIF mode this is the UTC time, otherwise it is the DTS running time. dts: Option>, /// End DTS /// /// In ONVIF mode this is the UTC time, otherwise it is the DTS running time. end_dts: Option>, } #[derive(Debug)] struct GopBuffer { buffer: gst::Buffer, pts: gst::ClockTime, pts_position: gst::ClockTime, dts: Option, } #[derive(Debug)] struct Gop { /// Start PTS. start_pts: gst::ClockTime, /// Start DTS. start_dts: Option, /// Earliest PTS. earliest_pts: gst::ClockTime, /// Once this is known to be the final earliest PTS/DTS final_earliest_pts: bool, /// PTS plus duration of last buffer, or start of next GOP end_pts: gst::ClockTime, /// Once this is known to be the final end PTS/DTS final_end_pts: bool, /// DTS plus duration of last buffer, or start of next GOP end_dts: Option, /// Earliest PTS buffer position earliest_pts_position: gst::ClockTime, /// Buffer, PTS running time, DTS running time buffers: Vec, } struct Stream { /// Sink pad for this stream. sinkpad: super::FMP4MuxPad, /// Pre-queue for ONVIF variant to timestamp all buffers with their UTC time. /// /// In non-ONVIF mode this just collects the PTS/DTS and the corresponding running /// times for later usage. pre_queue: VecDeque, /// Currently configured caps for this stream. caps: gst::Caps, /// Whether this stream is intra-only and has frame reordering. delta_frames: DeltaFrames, /// Currently queued GOPs, including incomplete ones. queued_gops: VecDeque, /// Whether the fully queued GOPs are filling a whole fragment. fragment_filled: bool, /// Whether a whole chunk is queued. chunk_filled: bool, /// Difference between the first DTS and 0 in case of negative DTS dts_offset: Option, /// Current position (DTS, or PTS for intra-only) to prevent /// timestamps from going backwards when queueing new buffers current_position: gst::ClockTime, /// Mapping between running time and UTC time in ONVIF mode. running_time_utc_time_mapping: Option<(gst::Signed, gst::ClockTime)>, } #[derive(Default)] struct State { /// Currently configured streams. streams: Vec, /// Stream header with ftyp and moov box. /// /// Created once we received caps and kept up to date with the caps, /// sent as part of the buffer list for the first fragment. stream_header: Option, /// Sequence number of the current fragment. sequence_number: u32, /// Fragment tracking for mfra box current_offset: u64, fragment_offsets: Vec, /// Earliest PTS of the whole stream earliest_pts: Option, /// Current end PTS of the whole stream end_pts: Option, /// Start DTS of the whole stream start_dts: Option, /// Start PTS of the current fragment fragment_start_pts: Option, /// Start PTS of the current chunk /// /// This is equal to `fragment_start_pts` if the current chunk is the first of a fragment, /// and always equal to `fragment_start_pts` if no `chunk_duration` is set. chunk_start_pts: Option, /// Additional timeout delay in case GOPs are bigger than the fragment duration timeout_delay: gst::ClockTime, /// If headers (ftyp / moov box) were sent. sent_headers: bool, } #[derive(Default)] pub(crate) struct FMP4Mux { state: Mutex, settings: Mutex, } impl FMP4Mux { /// Checks if a buffer is valid according to the stream configuration. fn check_buffer( buffer: &gst::BufferRef, sinkpad: &super::FMP4MuxPad, delta_frames: super::DeltaFrames, ) -> Result<(), gst::FlowError> { if delta_frames.requires_dts() && buffer.dts().is_none() { gst::error!(CAT, obj: sinkpad, "Require DTS for video streams"); return Err(gst::FlowError::Error); } if buffer.pts().is_none() { gst::error!(CAT, obj: sinkpad, "Require timestamped buffers"); return Err(gst::FlowError::Error); } if delta_frames.intra_only() && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { gst::error!(CAT, obj: sinkpad, "Intra-only stream with delta units"); return Err(gst::FlowError::Error); } Ok(()) } fn peek_buffer( &self, sinkpad: &super::FMP4MuxPad, delta_frames: super::DeltaFrames, pre_queue: &mut VecDeque, running_time_utc_time_mapping: &mut Option<(gst::Signed, gst::ClockTime)>, fragment_duration: gst::ClockTime, ) -> Result, gst::FlowError> { // If not in ONVIF mode or the mapping is already known and there is a pre-queued buffer // then we can directly return it from here. if self.obj().class().as_ref().variant != super::Variant::ONVIF || running_time_utc_time_mapping.is_some() { if let Some(pre_queued_buffer) = pre_queue.front() { return Ok(Some(pre_queued_buffer.clone())); } } // Pop buffer here, it will be stored in the pre-queue after calculating its timestamps let mut buffer = match sinkpad.pop_buffer() { None => return Ok(None), Some(buffer) => buffer, }; Self::check_buffer(&buffer, sinkpad, delta_frames)?; let segment = match sinkpad.segment().downcast::().ok() { Some(segment) => segment, None => { gst::error!(CAT, obj: sinkpad, "Got buffer before segment"); return Err(gst::FlowError::Error); } }; let pts_position = buffer.pts().unwrap(); let duration = buffer.duration(); let end_pts_position = duration.opt_add(pts_position).unwrap_or(pts_position); let pts = segment .to_running_time_full(pts_position) .ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Couldn't convert PTS to running time"); gst::FlowError::Error })? .positive() .unwrap_or_else(|| { gst::warning!(CAT, obj: sinkpad, "Negative PTSs are not supported"); gst::ClockTime::ZERO }); let end_pts = segment .to_running_time_full(end_pts_position) .ok_or_else(|| { gst::error!( CAT, obj: sinkpad, "Couldn't convert end PTS to running time" ); gst::FlowError::Error })? .positive() .unwrap_or_else(|| { gst::warning!(CAT, obj: sinkpad, "Negative PTSs are not supported"); gst::ClockTime::ZERO }); let (dts, end_dts) = if !delta_frames.requires_dts() { (None, None) } else { // Negative DTS are handled via the dts_offset and by having negative composition time // offsets in the `trun` box. The smallest DTS here is shifted to zero. let dts_position = buffer.dts().expect("not DTS"); let end_dts_position = duration.opt_add(dts_position).unwrap_or(dts_position); let dts = segment.to_running_time_full(dts_position).ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Couldn't convert DTS to running time"); gst::FlowError::Error })?; let end_dts = segment .to_running_time_full(end_dts_position) .ok_or_else(|| { gst::error!( CAT, obj: sinkpad, "Couldn't convert end DTS to running time" ); gst::FlowError::Error })?; let end_dts = std::cmp::max(end_dts, dts); (Some(dts), Some(end_dts)) }; // If this is a multi-stream element then we need to update the PTS/DTS positions according // to the output segment, specifically to re-timestamp them with the running time and // adjust for the segment shift to compensate for negative DTS. if !self.obj().class().as_ref().variant.is_single_stream() { let pts_position = pts + SEGMENT_OFFSET; let dts_position = dts.map(|dts| { dts.checked_add_unsigned(SEGMENT_OFFSET) .and_then(|dts| dts.positive()) .unwrap_or(gst::ClockTime::ZERO) }); let buffer = buffer.make_mut(); buffer.set_pts(pts_position); buffer.set_dts(dts_position); } if self.obj().class().as_ref().variant != super::Variant::ONVIF { // Store in the queue so we don't have to recalculate this all the time pre_queue.push_back(PreQueuedBuffer { buffer, pts, end_pts, dts, end_dts, }); } else if let Some(running_time_utc_time_mapping) = running_time_utc_time_mapping { // For ONVIF we need to re-timestamp the buffer with its UTC time. // // After re-timestamping, put the buffer into the pre-queue so re-timestamping only has to // happen once. let utc_time = match get_utc_time_from_buffer(&buffer) { None => { // Calculate from the mapping running_time_to_utc_time(pts, *running_time_utc_time_mapping).ok_or_else( || { gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time"); gst::FlowError::Error }, )? } Some(utc_time) => utc_time, }; gst::trace!( CAT, obj: sinkpad, "Mapped PTS running time {pts} to UTC time {utc_time}" ); let end_pts_utc_time = running_time_to_utc_time(end_pts, (pts, utc_time)).ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Stream has negative end PTS UTC time"); gst::FlowError::Error })?; let (dts_utc_time, end_dts_utc_time) = if let Some(dts) = dts { let dts_utc_time = running_time_to_utc_time(dts, (pts, utc_time)).ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); gst::FlowError::Error })?; gst::trace!( CAT, obj: sinkpad, "Mapped DTS running time {dts} to UTC time {dts_utc_time}" ); let end_dts_utc_time = running_time_to_utc_time(end_dts.unwrap(), (pts, utc_time)) .ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Stream has negative end DTS UTC time"); gst::FlowError::Error })?; ( Some(gst::Signed::Positive(dts_utc_time)), Some(gst::Signed::Positive(end_dts_utc_time)), ) } else { (None, None) }; pre_queue.push_back(PreQueuedBuffer { buffer, pts: utc_time, end_pts: end_pts_utc_time, dts: dts_utc_time, end_dts: end_dts_utc_time, }); } else { // In ONVIF mode we need to get UTC times for each buffer and synchronize based on that. // Queue up to min(6s, fragment_duration) of data in the very beginning to get the first UTC time and then backdate. if let Some((last, first)) = Option::zip(pre_queue.back(), pre_queue.front()) { // Existence of PTS/DTS checked below let (last, first) = if delta_frames.requires_dts() { (last.end_dts.unwrap(), first.end_dts.unwrap()) } else { ( gst::Signed::Positive(last.end_pts), gst::Signed::Positive(first.end_pts), ) }; let limit = std::cmp::min(gst::ClockTime::from_seconds(6), fragment_duration); if last.saturating_sub(first) > gst::Signed::Positive(limit) { gst::error!( CAT, obj: sinkpad, "Got no UTC time in the first {limit} of the stream" ); return Err(gst::FlowError::Error); } } let utc_time = match get_utc_time_from_buffer(&buffer) { Some(utc_time) => utc_time, None => { pre_queue.push_back(PreQueuedBuffer { buffer, pts, end_pts, dts, end_dts, }); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } }; let mapping = (gst::Signed::Positive(pts), utc_time); *running_time_utc_time_mapping = Some(mapping); // Push the buffer onto the pre-queue and re-timestamp it and all other buffers // based on the mapping above once we have an UTC time. pre_queue.push_back(PreQueuedBuffer { buffer, pts, end_pts, dts, end_dts, }); for pre_queued_buffer in pre_queue.iter_mut() { let pts_utc_time = running_time_to_utc_time(pre_queued_buffer.pts, mapping) .ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time"); gst::FlowError::Error })?; gst::trace!( CAT, obj: sinkpad, "Mapped PTS running time {} to UTC time {pts_utc_time}", pre_queued_buffer.pts, ); pre_queued_buffer.pts = pts_utc_time; let end_pts_utc_time = running_time_to_utc_time(pre_queued_buffer.end_pts, mapping) .ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Stream has negative end PTS UTC time"); gst::FlowError::Error })?; pre_queued_buffer.end_pts = end_pts_utc_time; if let Some(dts) = pre_queued_buffer.dts { let dts_utc_time = running_time_to_utc_time(dts, mapping).ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); gst::FlowError::Error })?; gst::trace!( CAT, obj: sinkpad, "Mapped DTS running time {dts} to UTC time {dts_utc_time}" ); pre_queued_buffer.dts = Some(gst::Signed::Positive(dts_utc_time)); let end_dts_utc_time = running_time_to_utc_time(pre_queued_buffer.end_dts.unwrap(), mapping) .ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); gst::FlowError::Error })?; pre_queued_buffer.end_dts = Some(gst::Signed::Positive(end_dts_utc_time)); } } // Fall through and return the front of the queue } Ok(Some(pre_queue.front().unwrap().clone())) } fn pop_buffer( &self, _sinkpad: &super::FMP4MuxPad, pre_queue: &mut VecDeque, running_time_utc_time_mapping: &Option<(gst::Signed, gst::ClockTime)>, ) -> PreQueuedBuffer { // Only allowed to be called after peek was successful so there must be a buffer now // or in ONVIF mode we must also know the mapping now. assert!(!pre_queue.is_empty()); if self.obj().class().as_ref().variant == super::Variant::ONVIF { assert!(running_time_utc_time_mapping.is_some()); } pre_queue.pop_front().unwrap() } fn find_earliest_stream<'a>( &self, state: &'a mut State, timeout: bool, fragment_duration: gst::ClockTime, ) -> Result, gst::FlowError> { let mut earliest_stream = None; let mut all_have_data_or_eos = true; for (idx, stream) in state.streams.iter_mut().enumerate() { let pre_queued_buffer = match Self::peek_buffer( self, &stream.sinkpad, stream.delta_frames, &mut stream.pre_queue, &mut stream.running_time_utc_time_mapping, fragment_duration, ) { Ok(Some(buffer)) => buffer, Ok(None) | Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => { if stream.sinkpad.is_eos() { gst::trace!(CAT, obj: stream.sinkpad, "Stream is EOS"); } else { all_have_data_or_eos = false; gst::trace!(CAT, obj: stream.sinkpad, "Stream has no buffer"); } continue; } Err(err) => return Err(err), }; if stream.fragment_filled { gst::trace!(CAT, obj: stream.sinkpad, "Stream has current fragment filled"); continue; } if stream.chunk_filled { gst::trace!(CAT, obj: stream.sinkpad, "Stream has current chunk filled"); continue; } gst::trace!(CAT, obj: stream.sinkpad, "Stream has running time PTS {} / DTS {} queued", pre_queued_buffer.pts, pre_queued_buffer.dts.display()); let running_time = if stream.delta_frames.requires_dts() { pre_queued_buffer.dts.unwrap() } else { gst::Signed::Positive(pre_queued_buffer.pts) }; if earliest_stream .as_ref() .map_or(true, |(_idx, _stream, earliest_running_time)| { *earliest_running_time > running_time }) { earliest_stream = Some((idx, stream, running_time)); } } if !timeout && !all_have_data_or_eos { gst::trace!( CAT, imp: self, "No timeout and not all streams have a buffer or are EOS" ); Ok(None) } else if let Some((idx, stream, earliest_running_time)) = earliest_stream { gst::trace!( CAT, imp: self, "Stream {} is earliest stream with running time {}", stream.sinkpad.name(), earliest_running_time ); Ok(Some((idx, stream))) } else { gst::trace!(CAT, imp: self, "No streams have data queued currently"); Ok(None) } } // Queue incoming buffers as individual GOPs. fn queue_gops( &self, _idx: usize, stream: &mut Stream, mut pre_queued_buffer: PreQueuedBuffer, ) -> Result<(), gst::FlowError> { assert!(!stream.fragment_filled); gst::trace!(CAT, obj: stream.sinkpad, "Handling buffer {:?}", pre_queued_buffer); let delta_frames = stream.delta_frames; // Enforce monotonically increasing PTS for intra-only streams, and DTS otherwise if !delta_frames.requires_dts() { if pre_queued_buffer.pts < stream.current_position { gst::warning!( CAT, obj: stream.sinkpad, "Decreasing PTS {} < {}", pre_queued_buffer.pts, stream.current_position, ); pre_queued_buffer.pts = stream.current_position; } else { stream.current_position = pre_queued_buffer.pts; } pre_queued_buffer.end_pts = std::cmp::max(pre_queued_buffer.end_pts, pre_queued_buffer.pts); } else { // Negative DTS are handled via the dts_offset and by having negative composition time // offsets in the `trun` box. The smallest DTS here is shifted to zero. let dts = match pre_queued_buffer.dts.unwrap() { gst::Signed::Positive(dts) => { if let Some(dts_offset) = stream.dts_offset { dts + dts_offset } else { dts } } gst::Signed::Negative(dts) => { if stream.dts_offset.is_none() { stream.dts_offset = Some(dts); } let dts_offset = stream.dts_offset.unwrap(); if dts > dts_offset { gst::warning!(CAT, obj: stream.sinkpad, "DTS before first DTS"); gst::ClockTime::ZERO } else { dts_offset - dts } } }; let end_dts = match pre_queued_buffer.end_dts.unwrap() { gst::Signed::Positive(dts) => { if let Some(dts_offset) = stream.dts_offset { dts + dts_offset } else { dts } } gst::Signed::Negative(dts) => { let dts_offset = stream.dts_offset.unwrap(); if dts > dts_offset { gst::warning!(CAT, obj: stream.sinkpad, "End DTS before first DTS"); gst::ClockTime::ZERO } else { dts_offset - dts } } }; // Enforce monotonically increasing DTS for intra-only streams // NOTE: PTS stays the same so this will cause a bigger PTS/DTS difference // FIXME: Is this correct? if dts < stream.current_position { gst::warning!( CAT, obj: stream.sinkpad, "Decreasing DTS {} < {}", dts, stream.current_position, ); pre_queued_buffer.dts = Some(gst::Signed::Positive(stream.current_position)); } else { pre_queued_buffer.dts = Some(gst::Signed::Positive(dts)); stream.current_position = dts; } pre_queued_buffer.end_dts = Some(gst::Signed::Positive(std::cmp::max(end_dts, dts))); } let PreQueuedBuffer { buffer, pts, end_pts, dts, end_dts, } = pre_queued_buffer; let dts = dts.map(|v| v.positive().unwrap()); let end_dts = end_dts.map(|v| v.positive().unwrap()); let pts_position = buffer.pts().unwrap(); if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { gst::debug!( CAT, obj: stream.sinkpad, "Starting new GOP at PTS {} DTS {} (DTS offset {})", pts, dts.display(), stream.dts_offset.display(), ); let gop = Gop { start_pts: pts, start_dts: dts, earliest_pts: pts, earliest_pts_position: pts_position, final_earliest_pts: !delta_frames.requires_dts(), end_pts, end_dts, final_end_pts: false, buffers: vec![GopBuffer { buffer, pts, pts_position, dts, }], }; stream.queued_gops.push_front(gop); if let Some(prev_gop) = stream.queued_gops.get_mut(1) { gst::debug!( CAT, obj: stream.sinkpad, "Updating previous GOP starting at PTS {} to end PTS {} DTS {}", prev_gop.earliest_pts, pts, dts.display(), ); prev_gop.end_pts = std::cmp::max(prev_gop.end_pts, pts); prev_gop.end_dts = std::cmp::max(prev_gop.end_dts, dts); if !delta_frames.requires_dts() { prev_gop.final_end_pts = true; } if !prev_gop.final_earliest_pts { // Don't bother logging this for intra-only streams as it would be for every // single buffer. if delta_frames.requires_dts() { gst::debug!( CAT, obj: stream.sinkpad, "Previous GOP has final earliest PTS at {}", prev_gop.earliest_pts ); } prev_gop.final_earliest_pts = true; if let Some(prev_prev_gop) = stream.queued_gops.get_mut(2) { prev_prev_gop.final_end_pts = true; } } } } else if let Some(gop) = stream.queued_gops.front_mut() { assert!(!delta_frames.intra_only()); gop.end_pts = std::cmp::max(gop.end_pts, end_pts); gop.end_dts = gop.end_dts.opt_max(end_dts); gop.buffers.push(GopBuffer { buffer, pts, pts_position, dts, }); if delta_frames.requires_dts() { let dts = dts.unwrap(); if gop.earliest_pts > pts && !gop.final_earliest_pts { gst::debug!( CAT, obj: stream.sinkpad, "Updating current GOP earliest PTS from {} to {}", gop.earliest_pts, pts ); gop.earliest_pts = pts; gop.earliest_pts_position = pts_position; if let Some(prev_gop) = stream.queued_gops.get_mut(1) { if prev_gop.end_pts < pts { gst::debug!( CAT, obj: stream.sinkpad, "Updating previous GOP starting PTS {} end time from {} to {}", pts, prev_gop.end_pts, pts ); prev_gop.end_pts = pts; } } } let gop = stream.queued_gops.front_mut().unwrap(); // The earliest PTS is known when the current DTS is bigger or equal to the first // PTS that was observed in this GOP. If there was another frame later that had a // lower PTS then it wouldn't be possible to display it in time anymore, i.e. the // stream would be invalid. if gop.start_pts <= dts && !gop.final_earliest_pts { gst::debug!( CAT, obj: stream.sinkpad, "GOP has final earliest PTS at {}", gop.earliest_pts ); gop.final_earliest_pts = true; if let Some(prev_gop) = stream.queued_gops.get_mut(1) { prev_gop.final_end_pts = true; } } } } else { gst::warning!( CAT, obj: stream.sinkpad, "Waiting for keyframe at the beginning of the stream" ); } if let Some((prev_gop, first_gop)) = Option::zip( stream.queued_gops.iter().find(|gop| gop.final_end_pts), stream.queued_gops.back(), ) { gst::debug!( CAT, obj: stream.sinkpad, "Queued full GOPs duration updated to {}", prev_gop.end_pts.saturating_sub(first_gop.earliest_pts), ); } gst::debug!( CAT, obj: stream.sinkpad, "Queued duration updated to {}", Option::zip(stream.queued_gops.front(), stream.queued_gops.back()) .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts)) .unwrap_or(gst::ClockTime::ZERO) ); Ok(()) } fn check_stream_filled( &self, settings: &Settings, stream: &mut Stream, fragment_start_pts: Option, chunk_start_pts: Option, all_eos: bool, ) { // Either both are none or neither let (chunk_start_pts, fragment_start_pts) = match (chunk_start_pts, fragment_start_pts) { (Some(chunk_start_pts), Some(fragment_start_pts)) => { (chunk_start_pts, fragment_start_pts) } _ => return, }; // Check if this stream is filled enough now. if let Some(chunk_duration) = settings.chunk_duration { // In chunk mode let (gop_idx, gop) = match stream .queued_gops .iter() .enumerate() .find(|(_idx, gop)| gop.final_earliest_pts || all_eos || stream.sinkpad.is_eos()) { Some(res) => res, None => { gst::trace!(CAT, obj: stream.sinkpad, "Chunked mode but no GOP with final earliest PTS known yet"); return; } }; gst::trace!( CAT, obj: stream.sinkpad, "GOP {gop_idx} start PTS {}, GOP end PTS {} (final {})", gop.start_pts, gop.end_pts, gop.final_end_pts || all_eos || stream.sinkpad.is_eos(), ); gst::trace!( CAT, obj: stream.sinkpad, "Current chunk start {}, current fragment start {}", chunk_start_pts, fragment_start_pts, ); let chunk_end_pts = chunk_start_pts + chunk_duration; let fragment_end_pts = fragment_start_pts + settings.fragment_duration; gst::trace!( CAT, obj: stream.sinkpad, "Current chunk end {}, current fragment end {}", chunk_end_pts, fragment_end_pts, ); // First check if the next split should be the end of a fragment or the end of a chunk. // If both are the same then a fragment split has preference. if fragment_end_pts <= chunk_end_pts && gop.start_pts >= fragment_end_pts { gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for finishing this fragment"); stream.fragment_filled = true; } else if chunk_end_pts < fragment_end_pts { let last_pts = gop.buffers.last().map(|b| b.pts); if gop.end_pts >= chunk_end_pts // only if there's another GOP or at least one further buffer && (gop_idx > 0 || last_pts.map_or(false, |last_pts| last_pts.saturating_sub(chunk_start_pts) > chunk_duration)) { gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this chunk"); stream.chunk_filled = true; } } } else { let gop = match stream .queued_gops .iter() .find(|gop| gop.final_end_pts || all_eos || stream.sinkpad.is_eos()) { Some(gop) => gop, None => { gst::trace!(CAT, obj: stream.sinkpad, "Fragment mode but no GOP with final end PTS known yet"); return; } }; gst::trace!( CAT, obj: stream.sinkpad, "GOP start PTS {}, GOP end PTS {}", gop.start_pts, gop.end_pts, ); // Check if the end of the latest finalized GOP is after the fragment end let fragment_end_pts = fragment_start_pts + settings.fragment_duration; gst::trace!( CAT, obj: stream.sinkpad, "Current fragment start{}, current fragment end {}", fragment_start_pts, fragment_start_pts + settings.fragment_duration, ); if gop.end_pts >= fragment_end_pts { gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this fragment"); stream.fragment_filled = true; } } } fn calculate_earliest_pts( &self, settings: &Settings, state: &mut State, upstream_events: &mut Vec<(super::FMP4MuxPad, gst::Event)>, all_eos: bool, timeout: bool, ) { if state.earliest_pts.is_some() { return; } let fragment_start_pts = state.fragment_start_pts; let chunk_start_pts = state.chunk_start_pts; // Calculate the earliest PTS after queueing input if we can now. let mut earliest_pts = None; let mut start_dts = None; for stream in &state.streams { let (stream_earliest_pts, stream_start_dts) = match stream.queued_gops.back() { None => { if !all_eos && !timeout { earliest_pts = None; start_dts = None; break; } continue; } Some(oldest_gop) => { if !all_eos && !timeout && !oldest_gop.final_earliest_pts { earliest_pts = None; start_dts = None; break; } (oldest_gop.earliest_pts, oldest_gop.start_dts) } }; if earliest_pts.opt_gt(stream_earliest_pts).unwrap_or(true) { earliest_pts = Some(stream_earliest_pts); } if let Some(stream_start_dts) = stream_start_dts { if start_dts.opt_gt(stream_start_dts).unwrap_or(true) { start_dts = Some(stream_start_dts); } } } let earliest_pts = match earliest_pts { Some(earliest_pts) => earliest_pts, None => return, }; // The earliest PTS is known and as such the start of the first and second fragment. gst::info!( CAT, imp: self, "Got earliest PTS {}, start DTS {} (timeout: {timeout}, all eos: {all_eos})", earliest_pts, start_dts.display() ); state.earliest_pts = Some(earliest_pts); state.start_dts = start_dts; state.fragment_start_pts = Some(earliest_pts); state.chunk_start_pts = Some(earliest_pts); // Now send force-keyunit events for the second fragment start. let fku_time = earliest_pts + settings.fragment_duration; for stream in &state.streams { let current_position = stream.current_position; // In case of ONVIF this needs to be converted back from UTC time to // the stream's running time let (fku_time, current_position) = if self.obj().class().as_ref().variant == super::Variant::ONVIF { ( if let Some(fku_time) = utc_time_to_running_time( fku_time, stream.running_time_utc_time_mapping.unwrap(), ) { fku_time } else { continue; }, utc_time_to_running_time( current_position, stream.running_time_utc_time_mapping.unwrap(), ), ) } else { (fku_time, Some(current_position)) }; let fku_time = if current_position.map_or(false, |current_position| current_position > fku_time) { gst::warning!( CAT, obj: stream.sinkpad, "Sending first force-keyunit event late for running time {} at {}", fku_time, current_position.display(), ); None } else { gst::debug!( CAT, obj: stream.sinkpad, "Sending first force-keyunit event for running time {}", fku_time, ); Some(fku_time) }; let fku = gst_video::UpstreamForceKeyUnitEvent::builder() .running_time(fku_time) .all_headers(true) .build(); upstream_events.push((stream.sinkpad.clone(), fku)); } // Check if any of the streams are already filled enough for the first chunk/fragment. for stream in &mut state.streams { self.check_stream_filled( settings, stream, fragment_start_pts, chunk_start_pts, all_eos, ); } } #[allow(clippy::type_complexity)] fn drain_buffers( &self, state: &mut State, settings: &Settings, timeout: bool, at_eos: bool, ) -> Result< ( // Drained streams Vec<(super::FragmentHeaderStream, VecDeque)>, // Minimum earliest PTS position of all streams Option, // Minimum earliest PTS of all streams Option, // Minimum start DTS position of all streams (if any stream has DTS) Option, // End PTS of this drained fragment or chunk, i.e. start PTS of the next fragment or // chunk Option, // With these drained buffers the current fragment is filled bool, // These buffers make the start of a new fragment bool, ), gst::FlowError, > { let mut drained_streams = Vec::with_capacity(state.streams.len()); let mut min_earliest_pts_position = None; let mut min_earliest_pts = None; let mut min_start_dts_position = None; let mut chunk_end_pts = None; let mut fragment_start = false; // In fragment mode, each chunk is a full fragment. Otherwise, in chunk mode, // this fragment is filled if it is filled for the first non-EOS stream let fragment_filled = settings.chunk_duration.is_none() || state .streams .iter() .find(|s| !s.sinkpad.is_eos()) .map(|s| s.fragment_filled) == Some(true); // The first stream decides how much can be dequeued, if anything at all. // // In chunk mode: // If more than the fragment duration has passed until the latest GOPs earliest PTS then // the fragment is considered filled and all GOPs until that GOP are drained. The next // chunk would start a new fragment, and would start with the keyframe at the beginning // of that latest GOP. // // Otherwise if more than a chunk duration is currently queued in GOPs of which the // earliest PTS is known then drain everything up to that position. If nothing can be // drained at all then advance the timeout by 1s until something can be dequeued. // // Otherwise: // All complete GOPs (or at EOS everything) up to the fragment duration will be dequeued // but on timeout in live pipelines it might happen that the first stream does not have a // complete GOP queued. In that case nothing is dequeued for any of the streams and the // timeout is advanced by 1s until at least one complete GOP can be dequeued. // // If the first stream is already EOS then the next stream that is not EOS yet will be // taken in its place. gst::info!( CAT, imp: self, "Starting to drain at {} (fragment start {}, fragment end {}, chunk start {}, chunk end {})", state.chunk_start_pts.display(), state.fragment_start_pts.display(), state.fragment_start_pts.map(|start| start + settings.fragment_duration).display(), state.chunk_start_pts.display(), Option::zip(state.chunk_start_pts, settings.chunk_duration).map(|(start, duration)| start + duration).display(), ); for (idx, stream) in state.streams.iter_mut().enumerate() { let stream_settings = stream.sinkpad.imp().settings.lock().unwrap().clone(); assert!( timeout || at_eos || stream.sinkpad.is_eos() || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true) || settings.chunk_duration.is_some() ); let mut gops = Vec::with_capacity(stream.queued_gops.len()); if !stream.queued_gops.is_empty() { let fragment_start_pts = state.fragment_start_pts.unwrap(); let chunk_start_pts = state.chunk_start_pts.unwrap(); // For the first stream drain as much as necessary and decide the end of this // fragment or chunk, for all other streams drain up to that position. if let Some(chunk_duration) = settings.chunk_duration { let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts { // Not the first stream chunk_end_pts } else if fragment_filled { // Fragment is filled, so only dequeue everything until the latest GOP fragment_start_pts + settings.fragment_duration } else { // Fragment is not filled and we either have a full chunk or timeout chunk_start_pts + chunk_duration }; gst::trace!( CAT, obj: stream.sinkpad, "Draining up to end PTS {} / duration {}", dequeue_end_pts, dequeue_end_pts - chunk_start_pts ); while let Some(gop) = stream.queued_gops.back() { // If this should be the last chunk of a fragment then only drain every // finished GOP until the chunk end PTS. If there is no finished GOP for // this stream (it would be not the first stream then), then drain // everything up to the chunk end PTS. // // If this chunk is not the last chunk of a fragment then simply dequeue // everything up to the chunk end PTS. if fragment_filled { gst::trace!( CAT, obj: stream.sinkpad, "Fragment filled, current GOP start {} end {} (final {})", gop.start_pts, gop.end_pts, gop.final_end_pts || at_eos || stream.sinkpad.is_eos() ); if (gop.final_end_pts || at_eos || stream.sinkpad.is_eos()) && gop.end_pts <= dequeue_end_pts { gst::trace!( CAT, obj: stream.sinkpad, "Pushing whole GOP", ); gops.push(stream.queued_gops.pop_back().unwrap()); continue; } if !gops.is_empty() { break; } gst::error!(CAT, obj: stream.sinkpad, "Don't have a full GOP at the end of a fragment"); } else { gst::trace!( CAT, obj: stream.sinkpad, "Chunk filled, current GOP start {} end {} (final {})", gop.start_pts, gop.end_pts, gop.final_end_pts || at_eos || stream.sinkpad.is_eos() ); } if gop.end_pts <= dequeue_end_pts && (gop.final_end_pts || at_eos || stream.sinkpad.is_eos()) { gst::trace!( CAT, obj: stream.sinkpad, "Pushing whole GOP", ); gops.push(stream.queued_gops.pop_back().unwrap()); } else if gop.start_pts >= dequeue_end_pts || (!gop.final_earliest_pts && !at_eos && !stream.sinkpad.is_eos()) { gst::trace!( CAT, obj: stream.sinkpad, "GOP starts after chunk end", ); break; } else { let gop = stream.queued_gops.back_mut().unwrap(); let start_pts = gop.start_pts; let start_dts = gop.start_dts; let earliest_pts = gop.earliest_pts; let earliest_pts_position = gop.earliest_pts_position; let mut split_index = None; for (idx, buffer) in gop.buffers.iter().enumerate() { if buffer.pts >= dequeue_end_pts { break; } split_index = Some(idx); } let split_index = match split_index { Some(split_index) => split_index, None => { // We have B frames and the first buffer of this GOP is too far // in the future. gst::trace!( CAT, obj: stream.sinkpad, "First buffer of GOP too far in the future", ); break; } }; // The last buffer of the GOP starts before the chunk end but ends // after the end. We still take it here and remove the whole GOP. if split_index == gop.buffers.len() - 1 { if gop.final_end_pts || at_eos || stream.sinkpad.is_eos() { gst::trace!( CAT, obj: stream.sinkpad, "Pushing whole GOP", ); gops.push(stream.queued_gops.pop_back().unwrap()); } else { gst::trace!( CAT, obj: stream.sinkpad, "Can't push whole GOP as it's not final yet", ); } break; } let mut buffers = mem::take(&mut gop.buffers); // Contains all buffers from `split_index + 1` to the end gop.buffers = buffers.split_off(split_index + 1); gop.start_pts = gop.buffers[0].pts; gop.start_dts = gop.buffers[0].dts; gop.earliest_pts_position = gop.buffers[0].pts_position; gop.earliest_pts = gop.buffers[0].pts; gst::trace!( CAT, obj: stream.sinkpad, "Splitting GOP and keeping PTS {}", gop.buffers[0].pts, ); let queue_gop = Gop { start_pts, start_dts, earliest_pts, final_earliest_pts: true, end_pts: gop.start_pts, final_end_pts: true, end_dts: gop.start_dts, earliest_pts_position, buffers, }; gops.push(queue_gop); break; } } fragment_start = fragment_start_pts == chunk_start_pts; if fragment_start { if let Some(first_buffer) = gops.first().and_then(|gop| gop.buffers.first()) { if first_buffer .buffer .flags() .contains(gst::BufferFlags::DELTA_UNIT) { gst::error!(CAT, obj: stream.sinkpad, "First buffer of a new fragment is not a keyframe"); } } } } else { let dequeue_end_pts = if let Some(chunk_end_pts) = chunk_end_pts { // Not the first stream chunk_end_pts } else { fragment_start_pts + settings.fragment_duration }; gst::trace!( CAT, obj: stream.sinkpad, "Draining up to end PTS {} / duration {}", dequeue_end_pts, dequeue_end_pts - chunk_start_pts ); while let Some(gop) = stream.queued_gops.back() { gst::trace!( CAT, obj: stream.sinkpad, "Current GOP start {} end {} (final {})", gop.start_pts, gop.end_pts, gop.final_end_pts || at_eos || stream.sinkpad.is_eos() ); // If this GOP is not complete then we can't pop it yet. // // If there was no complete GOP at all yet then it might be bigger than the // fragment duration. In this case we might not be able to handle the latency // requirements in a live pipeline. if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() { gst::trace!( CAT, obj: stream.sinkpad, "Not including GOP without final end PTS", ); break; } // If this GOP starts after the fragment end then don't dequeue it yet unless this is // the first stream and no GOPs were dequeued at all yet. This would mean that the // GOP is bigger than the fragment duration. if !at_eos && gop.end_pts > dequeue_end_pts && (chunk_end_pts.is_some() || !gops.is_empty()) { gst::trace!( CAT, obj: stream.sinkpad, "Not including GOP yet", ); break; } gst::trace!( CAT, obj: stream.sinkpad, "Pushing complete GOP", ); gops.push(stream.queued_gops.pop_back().unwrap()); } // If we don't have a next chunk start PTS then this is the first stream as above. if chunk_end_pts.is_none() { // In fragment mode, each chunk is a full fragment fragment_start = true; } } } stream.fragment_filled = false; stream.chunk_filled = false; // If we don't have a next chunk start PTS then this is the first stream as above. if chunk_end_pts.is_none() { if let Some(last_gop) = gops.last() { // Dequeued something so let's take the end PTS of the last GOP chunk_end_pts = Some(last_gop.end_pts); gst::info!( CAT, obj: stream.sinkpad, "Draining up to PTS {} for this chunk", last_gop.end_pts, ); } else { // If nothing was dequeued for the first stream then this is OK if we're at // EOS: we just consider the next stream as first stream then. if at_eos || stream.sinkpad.is_eos() { // This is handled below generally if nothing was dequeued } else { // Otherwise this can only really happen on timeout in live pipelines. assert!(timeout); if settings.chunk_duration.is_some() { gst::warning!( CAT, obj: stream.sinkpad, "Don't have anything to drain for the first stream on timeout in a live pipeline", ); } else { gst::warning!( CAT, obj: stream.sinkpad, "Don't have a complete GOP for the first stream on timeout in a live pipeline", ); } // In this case we advance the timeout by 1s and hope that things are // better then. return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } } } else if at_eos { if let Some(last_gop) = gops.last() { if chunk_end_pts.map_or(true, |chunk_end_pts| chunk_end_pts < last_gop.end_pts) { chunk_end_pts = Some(last_gop.end_pts); } } } if gops.is_empty() { gst::info!( CAT, obj: stream.sinkpad, "Draining no buffers", ); drained_streams.push(( super::FragmentHeaderStream { caps: stream.caps.clone(), start_time: None, delta_frames: stream.delta_frames, trak_timescale: stream_settings.trak_timescale, }, VecDeque::new(), )); continue; } assert!(chunk_end_pts.is_some()); if let Some((prev_gop, first_gop)) = Option::zip( stream.queued_gops.iter().find(|gop| gop.final_end_pts), stream.queued_gops.back(), ) { gst::debug!( CAT, obj: stream.sinkpad, "Queued full GOPs duration updated to {}", prev_gop.end_pts.saturating_sub(first_gop.earliest_pts), ); } gst::debug!( CAT, obj: stream.sinkpad, "Queued duration updated to {}", Option::zip(stream.queued_gops.front(), stream.queued_gops.back()) .map(|(end, start)| end.end_pts.saturating_sub(start.start_pts)) .unwrap_or(gst::ClockTime::ZERO) ); let last_gop = gops.last().unwrap(); let end_pts = last_gop.end_pts; let end_dts = last_gop.end_dts; // First flatten all GOPs into a single `Vec` let mut gop_buffers = Vec::with_capacity(gops.iter().map(|g| g.buffers.len()).sum()); gop_buffers.extend(gops.into_iter().flat_map(|gop| gop.buffers.into_iter())); // Then calculate durations for all of the buffers and get rid of any GAP buffers in // the process. // Also calculate the earliest PTS / start DTS here, which needs to consider GAP // buffers too. let mut buffers = VecDeque::with_capacity(gop_buffers.len()); let mut earliest_pts = None; let mut earliest_pts_position = None; let mut start_dts = None; let mut start_dts_position = None; let mut gop_buffers = gop_buffers.into_iter(); while let Some(buffer) = gop_buffers.next() { // If this is a GAP buffer then skip it. Its duration was already considered // below for the non-GAP buffer preceding it, and if there was none then the // chunk start would be adjusted accordingly for this stream. if buffer.buffer.flags().contains(gst::BufferFlags::GAP) && buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE) && buffer.buffer.size() == 0 { gst::trace!( CAT, obj: stream.sinkpad, "Skipping gap buffer {buffer:?}", ); continue; } if earliest_pts.map_or(true, |earliest_pts| buffer.pts < earliest_pts) { earliest_pts = Some(buffer.pts); } if earliest_pts_position.map_or(true, |earliest_pts_position| { buffer.buffer.pts().unwrap() < earliest_pts_position }) { earliest_pts_position = Some(buffer.buffer.pts().unwrap()); } if stream.delta_frames.requires_dts() && start_dts.is_none() { start_dts = Some(buffer.dts.unwrap()); } if stream.delta_frames.requires_dts() && start_dts_position.is_none() { start_dts_position = Some(buffer.buffer.dts().unwrap()); } let timestamp = if !stream.delta_frames.requires_dts() { buffer.pts } else { buffer.dts.unwrap() }; // Take as end timestamp the timestamp of the next non-GAP buffer let end_timestamp = match gop_buffers.as_slice().iter().find(|buf| { !buf.buffer.flags().contains(gst::BufferFlags::GAP) || !buf.buffer.flags().contains(gst::BufferFlags::DROPPABLE) || buf.buffer.size() != 0 }) { Some(buffer) => { if !stream.delta_frames.requires_dts() { buffer.pts } else { buffer.dts.unwrap() } } None => { if !stream.delta_frames.requires_dts() { end_pts } else { end_dts.unwrap() } } }; // Timestamps are enforced to monotonically increase when queueing buffers let duration = end_timestamp .checked_sub(timestamp) .expect("Timestamps going backwards"); let composition_time_offset = if !stream.delta_frames.requires_dts() { None } else { let pts = buffer.pts; let dts = buffer.dts.unwrap(); Some( i64::try_from( (gst::Signed::Positive(pts) - gst::Signed::Positive(dts)).nseconds(), ) .map_err(|_| { gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference"); gst::FlowError::Error })?, ) }; buffers.push_back(Buffer { idx, buffer: buffer.buffer, timestamp, duration, composition_time_offset, }); } if buffers.is_empty() { gst::info!( CAT, obj: stream.sinkpad, "Drained only gap buffers", ); drained_streams.push(( super::FragmentHeaderStream { caps: stream.caps.clone(), start_time: None, delta_frames: stream.delta_frames, trak_timescale: stream_settings.trak_timescale, }, VecDeque::new(), )); continue; } let earliest_pts = earliest_pts.unwrap(); let earliest_pts_position = earliest_pts_position.unwrap(); if stream.delta_frames.requires_dts() { assert!(start_dts.is_some()); assert!(start_dts_position.is_some()); } let start_dts = start_dts; let start_dts_position = start_dts_position; gst::info!( CAT, obj: stream.sinkpad, "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}", end_pts.saturating_sub(earliest_pts), earliest_pts, start_dts.display(), stream.dts_offset.display(), ); let start_time = if !stream.delta_frames.requires_dts() { earliest_pts } else { start_dts.unwrap() }; if min_earliest_pts.opt_gt(earliest_pts).unwrap_or(true) { min_earliest_pts = Some(earliest_pts); } if min_earliest_pts_position .opt_gt(earliest_pts_position) .unwrap_or(true) { min_earliest_pts_position = Some(earliest_pts_position); } if let Some(start_dts_position) = start_dts_position { if min_start_dts_position .opt_gt(start_dts_position) .unwrap_or(true) { min_start_dts_position = Some(start_dts_position); } } drained_streams.push(( super::FragmentHeaderStream { caps: stream.caps.clone(), start_time: Some(start_time), delta_frames: stream.delta_frames, trak_timescale: stream_settings.trak_timescale, }, buffers, )); } Ok(( drained_streams, min_earliest_pts_position, min_earliest_pts, min_start_dts_position, chunk_end_pts, fragment_filled, fragment_start, )) } #[allow(clippy::type_complexity)] fn interleave_buffers( &self, settings: &Settings, mut drained_streams: Vec<(super::FragmentHeaderStream, VecDeque)>, ) -> Result<(Vec, Vec), gst::FlowError> { let mut interleaved_buffers = Vec::with_capacity(drained_streams.iter().map(|(_, bufs)| bufs.len()).sum()); while let Some((_idx, (_, bufs))) = drained_streams .iter_mut() .enumerate() .min_by(|(a_idx, (_, a)), (b_idx, (_, b))| { let (a, b) = match (a.front(), b.front()) { (None, None) => return std::cmp::Ordering::Equal, (None, _) => return std::cmp::Ordering::Greater, (_, None) => return std::cmp::Ordering::Less, (Some(a), Some(b)) => (a, b), }; match a.timestamp.cmp(&b.timestamp) { std::cmp::Ordering::Equal => a_idx.cmp(b_idx), cmp => cmp, } }) { let start_time = match bufs.front() { None => { // No more buffers now break; } Some(buf) => buf.timestamp, }; let mut current_end_time = start_time; let mut dequeued_bytes = 0; while settings .interleave_bytes .opt_ge(dequeued_bytes) .unwrap_or(true) && settings .interleave_time .opt_ge(current_end_time.saturating_sub(start_time)) .unwrap_or(true) { if let Some(buffer) = bufs.pop_front() { current_end_time = buffer.timestamp + buffer.duration; dequeued_bytes += buffer.buffer.size() as u64; interleaved_buffers.push(buffer); } else { // No buffers left in this stream, go to next stream break; } } } // All buffers should be consumed now assert!(drained_streams.iter().all(|(_, bufs)| bufs.is_empty())); let streams = drained_streams .into_iter() .map(|(stream, _)| stream) .collect::>(); Ok((interleaved_buffers, streams)) } /// Fills upstream events as needed and returns the caps the first time draining can happen. /// /// If it returns `(_, None)` then there's currently nothing to drain anymore. fn drain( &self, state: &mut State, settings: &Settings, timeout: bool, at_eos: bool, upstream_events: &mut Vec<(super::FMP4MuxPad, gst::Event)>, ) -> Result<(Option, Option), gst::FlowError> { if at_eos { gst::info!(CAT, imp: self, "Draining at EOS"); } else if timeout { gst::info!(CAT, imp: self, "Draining at timeout"); } else { for stream in &state.streams { if !stream.chunk_filled && !stream.fragment_filled && !stream.sinkpad.is_eos() { return Ok((None, None)); } } gst::info!( CAT, imp: self, "Draining because all streams have enough data queued" ); } // Collect all buffers and their timing information that are to be drained right now. let ( drained_streams, min_earliest_pts_position, min_earliest_pts, min_start_dts_position, chunk_end_pts, fragment_filled, fragment_start, ) = self.drain_buffers(state, settings, timeout, at_eos)?; // Create header now if it was not created before and return the caps let mut caps = None; if state.stream_header.is_none() { let (_, new_caps) = self.update_header(state, settings, false)?.unwrap(); caps = Some(new_caps); } // Interleave buffers according to the settings into a single vec let (mut interleaved_buffers, mut streams) = self.interleave_buffers(settings, drained_streams)?; // Offset stream start time to start at 0 in ONVIF mode, or if 'offset-to-zero' is enabled, // instead of using the UTC time verbatim. This would be used for the tfdt box later. // FIXME: Should this use the original DTS-or-PTS running time instead? // That might be negative though! if self.obj().class().as_ref().variant == super::Variant::ONVIF || settings.offset_to_zero { let offset = if let Some(start_dts) = state.start_dts { std::cmp::min(start_dts, state.earliest_pts.unwrap()) } else { state.earliest_pts.unwrap() }; for stream in &mut streams { if let Some(start_time) = stream.start_time { stream.start_time = Some(start_time.checked_sub(offset).unwrap()); } } } let mut buffer_list = None; if interleaved_buffers.is_empty() { assert!(at_eos); } else { // If there are actual buffers to output then create headers as needed and create a // bufferlist for all buffers that have to be output. let min_earliest_pts_position = min_earliest_pts_position.unwrap(); let min_earliest_pts = min_earliest_pts.unwrap(); let chunk_end_pts = chunk_end_pts.unwrap(); let mut fmp4_header = None; if !state.sent_headers { let mut buffer = state.stream_header.as_ref().unwrap().copy(); { let buffer = buffer.get_mut().unwrap(); buffer.set_pts(min_earliest_pts_position); buffer.set_dts(min_start_dts_position); // Header is DISCONT|HEADER buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER); } fmp4_header = Some(buffer); state.sent_headers = true; } // TODO: Write prft boxes before moof // TODO: Write sidx boxes before moof and rewrite once offsets are known if state.sequence_number == 0 { state.sequence_number = 1; } let sequence_number = state.sequence_number; // If this is the last chunk of a fragment then increment the sequence number for the // start of the next fragment. if fragment_filled { state.sequence_number += 1; } let (mut fmp4_fragment_header, moof_offset) = boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration { variant: self.obj().class().as_ref().variant, sequence_number, chunk: !fragment_start, streams: streams.as_slice(), buffers: interleaved_buffers.as_slice(), }) .map_err(|err| { gst::error!( CAT, imp: self, "Failed to create FMP4 fragment header: {}", err ); gst::FlowError::Error })?; { let buffer = fmp4_fragment_header.get_mut().unwrap(); buffer.set_pts(min_earliest_pts_position); buffer.set_dts(min_start_dts_position); buffer.set_duration(chunk_end_pts.checked_sub(min_earliest_pts)); // Fragment and chunk header is HEADER buffer.set_flags(gst::BufferFlags::HEADER); // Chunk header is DELTA_UNIT if !fragment_start { buffer.set_flags(gst::BufferFlags::DELTA_UNIT); } // Copy metas from the first actual buffer to the fragment header. This allows // getting things like the reference timestamp meta or the timecode meta to identify // the fragment. let _ = interleaved_buffers[0].buffer.copy_into( buffer, gst::BufferCopyFlags::META, 0, None, ); } let moof_offset = state.current_offset + fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64 + moof_offset; let buffers_len = interleaved_buffers.len(); for (idx, buffer) in interleaved_buffers.iter_mut().enumerate() { // Fix up buffer flags, all other buffers are DELTA_UNIT let buffer_ref = buffer.buffer.make_mut(); buffer_ref.unset_flags(gst::BufferFlags::all()); buffer_ref.set_flags(gst::BufferFlags::DELTA_UNIT); // Set the marker flag for the last buffer of the segment if idx == buffers_len - 1 { buffer_ref.set_flags(gst::BufferFlags::MARKER); } } buffer_list = Some( fmp4_header .into_iter() .chain(Some(fmp4_fragment_header)) .chain(interleaved_buffers.into_iter().map(|buffer| buffer.buffer)) .inspect(|b| { state.current_offset += b.size() as u64; }) .collect::(), ); if settings.write_mfra && fragment_start { // Write mfra only for the main stream on fragment starts, and if there are no // buffers for the main stream in this segment then don't write anything. if let Some(super::FragmentHeaderStream { start_time: Some(start_time), .. }) = streams.get(0) { state.fragment_offsets.push(super::FragmentOffset { time: *start_time, offset: moof_offset, }); } } state.end_pts = Some(chunk_end_pts); // Update for the start PTS of the next fragment / chunk if fragment_filled { state.fragment_start_pts = Some(chunk_end_pts); gst::info!(CAT, imp: self, "Starting new fragment at {}", chunk_end_pts,); } else { gst::info!(CAT, imp: self, "Starting new chunk at {}", chunk_end_pts,); } state.chunk_start_pts = Some(chunk_end_pts); // If the current fragment is filled we already have the next fragment's start // keyframe and can request the following one. if fragment_filled { let fku_time = chunk_end_pts + settings.fragment_duration; for stream in &state.streams { let current_position = stream.current_position; // In case of ONVIF this needs to be converted back from UTC time to // the stream's running time let (fku_time, current_position) = if self.obj().class().as_ref().variant == super::Variant::ONVIF { ( if let Some(fku_time) = utc_time_to_running_time( fku_time, stream.running_time_utc_time_mapping.unwrap(), ) { fku_time } else { continue; }, utc_time_to_running_time( current_position, stream.running_time_utc_time_mapping.unwrap(), ), ) } else { (fku_time, Some(current_position)) }; let fku_time = if current_position .map_or(false, |current_position| current_position > fku_time) { gst::warning!( CAT, obj: stream.sinkpad, "Sending force-keyunit event late for running time {} at {}", fku_time, current_position.display(), ); None } else { gst::debug!( CAT, obj: stream.sinkpad, "Sending force-keyunit event for running time {}", fku_time, ); Some(fku_time) }; let fku = gst_video::UpstreamForceKeyUnitEvent::builder() .running_time(fku_time) .all_headers(true) .build(); upstream_events.push((stream.sinkpad.clone(), fku)); } } // Reset timeout delay now that we've output an actual fragment or chunk state.timeout_delay = gst::ClockTime::ZERO; } // TODO: Write edit list at EOS // TODO: Rewrite bitrates at EOS Ok((caps, buffer_list)) } fn create_streams(&self, state: &mut State) -> Result<(), gst::FlowError> { for pad in self .obj() .sink_pads() .into_iter() .map(|pad| pad.downcast::().unwrap()) { let caps = match pad.current_caps() { Some(caps) => caps, None => { gst::warning!(CAT, obj: pad, "Skipping pad without caps"); continue; } }; gst::info!(CAT, obj: pad, "Configuring caps {:?}", caps); let s = caps.structure(0).unwrap(); let mut delta_frames = DeltaFrames::IntraOnly; match s.name().as_str() { "video/x-h264" | "video/x-h265" => { if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { gst::error!(CAT, obj: pad, "Received caps without codec_data"); return Err(gst::FlowError::NotNegotiated); } delta_frames = DeltaFrames::Bidirectional; } "video/x-vp9" => { if !s.has_field_with_type("colorimetry", str::static_type()) { gst::error!(CAT, obj: pad, "Received caps without colorimetry"); return Err(gst::FlowError::NotNegotiated); } delta_frames = DeltaFrames::PredictiveOnly; } "image/jpeg" => (), "audio/mpeg" => { if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { gst::error!(CAT, obj: pad, "Received caps without codec_data"); return Err(gst::FlowError::NotNegotiated); } } "audio/x-opus" => { if let Some(header) = s .get::("streamheader") .ok() .and_then(|a| a.get(0).and_then(|v| v.get::().ok())) { if gst_pbutils::codec_utils_opus_parse_header(&header, None).is_err() { gst::error!(CAT, obj: pad, "Received invalid Opus header"); return Err(gst::FlowError::NotNegotiated); } } else if gst_pbutils::codec_utils_opus_parse_caps(&caps, None).is_err() { gst::error!(CAT, obj: pad, "Received invalid Opus caps"); return Err(gst::FlowError::NotNegotiated); } } "audio/x-alaw" | "audio/x-mulaw" => (), "audio/x-adpcm" => (), "application/x-onvif-metadata" => (), _ => unreachable!(), } state.streams.push(Stream { sinkpad: pad, caps, delta_frames, pre_queue: VecDeque::new(), queued_gops: VecDeque::new(), fragment_filled: false, chunk_filled: false, dts_offset: None, current_position: gst::ClockTime::ZERO, running_time_utc_time_mapping: None, }); } if state.streams.is_empty() { gst::error!(CAT, imp: self, "No streams available"); return Err(gst::FlowError::Error); } // Sort video streams first and then audio streams and then metadata streams, and each group by pad name. state.streams.sort_by(|a, b| { let order_of_caps = |caps: &gst::CapsRef| { let s = caps.structure(0).unwrap(); if s.name().starts_with("video/") { 0 } else if s.name().starts_with("audio/") { 1 } else if s.name().starts_with("application/x-onvif-metadata") { 2 } else { unimplemented!(); } }; let st_a = order_of_caps(&a.caps); let st_b = order_of_caps(&b.caps); if st_a == st_b { return a.sinkpad.name().cmp(&b.sinkpad.name()); } st_a.cmp(&st_b) }); Ok(()) } fn update_header( &self, state: &mut State, settings: &Settings, at_eos: bool, ) -> Result, gst::FlowError> { let aggregator = self.obj(); let class = aggregator.class(); let variant = class.as_ref().variant; if settings.header_update_mode == super::HeaderUpdateMode::None && at_eos { return Ok(None); } assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty())); let duration = state .end_pts .opt_checked_sub(state.earliest_pts) .ok() .flatten(); let streams = state .streams .iter() .map(|s| super::HeaderStream { trak_timescale: s.sinkpad.imp().settings.lock().unwrap().trak_timescale, delta_frames: s.delta_frames, caps: s.caps.clone(), }) .collect::>(); let mut buffer = boxes::create_fmp4_header(super::HeaderConfiguration { variant, update: at_eos, movie_timescale: settings.movie_timescale, streams, write_mehd: settings.write_mehd, duration: if at_eos { duration } else { None }, start_utc_time: if variant == super::Variant::ONVIF { state .earliest_pts .map(|unix| unix.nseconds() / 100 + UNIX_1601_OFFSET * 10_000_000) } else { None }, }) .map_err(|err| { gst::error!(CAT, imp: self, "Failed to create FMP4 header: {}", err); gst::FlowError::Error })?; { let buffer = buffer.get_mut().unwrap(); // No timestamps // Header is DISCONT|HEADER buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER); } // Remember stream header for later state.stream_header = Some(buffer.clone()); let variant = match variant { super::Variant::ISO | super::Variant::DASH | super::Variant::ONVIF => "iso-fragmented", super::Variant::CMAF => "cmaf", }; let caps = gst::Caps::builder("video/quicktime") .field("variant", variant) .field("streamheader", gst::Array::new([&buffer])) .build(); let mut list = gst::BufferList::new_sized(1); { let list = list.get_mut().unwrap(); list.add(buffer); } Ok(Some((list, caps))) } } #[glib::object_subclass] impl ObjectSubclass for FMP4Mux { const NAME: &'static str = "GstFMP4Mux"; type Type = super::FMP4Mux; type ParentType = gst_base::Aggregator; type Class = Class; } impl ObjectImpl for FMP4Mux { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![ glib::ParamSpecUInt64::builder("fragment-duration") .nick("Fragment Duration") .blurb("Duration for each FMP4 fragment") .default_value(DEFAULT_FRAGMENT_DURATION.nseconds()) .mutable_ready() .build(), glib::ParamSpecUInt64::builder("chunk-duration") .nick("Chunk Duration") .blurb("Duration for each FMP4 chunk (default = no chunks)") .default_value(u64::MAX) .mutable_ready() .build(), glib::ParamSpecEnum::builder_with_default("header-update-mode", DEFAULT_HEADER_UPDATE_MODE) .nick("Header update mode") .blurb("Mode for updating the header at the end of the stream") .mutable_ready() .build(), glib::ParamSpecBoolean::builder("write-mfra") .nick("Write mfra box") .blurb("Write fragment random access box at the end of the stream") .default_value(DEFAULT_WRITE_MFRA) .mutable_ready() .build(), glib::ParamSpecBoolean::builder("write-mehd") .nick("Write mehd box") .blurb("Write movie extends header box with the duration at the end of the stream (needs a header-update-mode enabled)") .default_value(DEFAULT_WRITE_MFRA) .mutable_ready() .build(), glib::ParamSpecUInt64::builder("interleave-bytes") .nick("Interleave Bytes") .blurb("Interleave between streams in bytes") .default_value(DEFAULT_INTERLEAVE_BYTES.unwrap_or(0)) .mutable_ready() .build(), glib::ParamSpecUInt64::builder("interleave-time") .nick("Interleave Time") .blurb("Interleave between streams in nanoseconds") .default_value(DEFAULT_INTERLEAVE_TIME.map(gst::ClockTime::nseconds).unwrap_or(u64::MAX)) .mutable_ready() .build(), glib::ParamSpecUInt::builder("movie-timescale") .nick("Movie Timescale") .blurb("Timescale to use for the movie (units per second, 0 is automatic)") .mutable_ready() .build(), ] }); &PROPERTIES } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "fragment-duration" => { let mut settings = self.settings.lock().unwrap(); let fragment_duration = value.get().expect("type checked upstream"); if settings.fragment_duration != fragment_duration { settings.fragment_duration = fragment_duration; let latency = settings .chunk_duration .unwrap_or(settings.fragment_duration); drop(settings); self.obj().set_latency(latency, None); } } "chunk-duration" => { let mut settings = self.settings.lock().unwrap(); let chunk_duration = value.get().expect("type checked upstream"); if settings.chunk_duration != chunk_duration { settings.chunk_duration = chunk_duration; let latency = settings .chunk_duration .unwrap_or(settings.fragment_duration); drop(settings); self.obj().set_latency(latency, None); } } "header-update-mode" => { let mut settings = self.settings.lock().unwrap(); settings.header_update_mode = value.get().expect("type checked upstream"); } "write-mfra" => { let mut settings = self.settings.lock().unwrap(); settings.write_mfra = value.get().expect("type checked upstream"); } "write-mehd" => { let mut settings = self.settings.lock().unwrap(); settings.write_mehd = value.get().expect("type checked upstream"); } "interleave-bytes" => { let mut settings = self.settings.lock().unwrap(); settings.interleave_bytes = match value.get().expect("type checked upstream") { 0 => None, v => Some(v), }; } "interleave-time" => { let mut settings = self.settings.lock().unwrap(); settings.interleave_time = match value.get().expect("type checked upstream") { Some(gst::ClockTime::ZERO) | None => None, v => v, }; } "movie-timescale" => { let mut settings = self.settings.lock().unwrap(); settings.movie_timescale = value.get().expect("type checked upstream"); } _ => unimplemented!(), } } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "fragment-duration" => { let settings = self.settings.lock().unwrap(); settings.fragment_duration.to_value() } "chunk-duration" => { let settings = self.settings.lock().unwrap(); settings.chunk_duration.to_value() } "header-update-mode" => { let settings = self.settings.lock().unwrap(); settings.header_update_mode.to_value() } "write-mfra" => { let settings = self.settings.lock().unwrap(); settings.write_mfra.to_value() } "write-mehd" => { let settings = self.settings.lock().unwrap(); settings.write_mehd.to_value() } "interleave-bytes" => { let settings = self.settings.lock().unwrap(); settings.interleave_bytes.unwrap_or(0).to_value() } "interleave-time" => { let settings = self.settings.lock().unwrap(); settings.interleave_time.to_value() } "movie-timescale" => { let settings = self.settings.lock().unwrap(); settings.movie_timescale.to_value() } _ => unimplemented!(), } } fn constructed(&self) { self.parent_constructed(); let obj = self.obj(); let class = obj.class(); for templ in class.pad_template_list().into_iter().filter(|templ| { templ.presence() == gst::PadPresence::Always && templ.direction() == gst::PadDirection::Sink }) { let sinkpad = gst::PadBuilder::::from_template(&templ, Some("sink")) .flags(gst::PadFlags::ACCEPT_INTERSECT) .build(); obj.add_pad(&sinkpad).unwrap(); } obj.set_latency(Settings::default().fragment_duration, None); } } impl GstObjectImpl for FMP4Mux {} impl ElementImpl for FMP4Mux { fn request_new_pad( &self, templ: &gst::PadTemplate, name: Option<&str>, caps: Option<&gst::Caps>, ) -> Option { let state = self.state.lock().unwrap(); if state.stream_header.is_some() { gst::error!( CAT, imp: self, "Can't request new pads after header was generated" ); return None; } self.parent_request_new_pad(templ, name, caps) } } impl AggregatorImpl for FMP4Mux { fn next_time(&self) -> Option { let state = self.state.lock().unwrap(); state.chunk_start_pts.opt_add(state.timeout_delay) } fn sink_query( &self, aggregator_pad: &gst_base::AggregatorPad, query: &mut gst::QueryRef, ) -> bool { use gst::QueryViewMut; gst::trace!(CAT, obj: aggregator_pad, "Handling query {:?}", query); match query.view_mut() { QueryViewMut::Caps(q) => { let allowed_caps = aggregator_pad .current_caps() .unwrap_or_else(|| aggregator_pad.pad_template_caps()); if let Some(filter_caps) = q.filter() { let res = filter_caps .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First); q.set_result(&res); } else { q.set_result(&allowed_caps); } true } _ => self.parent_sink_query(aggregator_pad, query), } } fn sink_event_pre_queue( &self, aggregator_pad: &gst_base::AggregatorPad, mut event: gst::Event, ) -> Result { use gst::EventView; gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event); match event.view() { EventView::Segment(ev) => { if ev.segment().format() != gst::Format::Time { gst::warning!( CAT, obj: aggregator_pad, "Received non-TIME segment, replacing with default TIME segment" ); let segment = gst::FormattedSegment::::new(); event = gst::event::Segment::builder(&segment) .seqnum(event.seqnum()) .build(); } self.parent_sink_event_pre_queue(aggregator_pad, event) } _ => self.parent_sink_event_pre_queue(aggregator_pad, event), } } fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool { use gst::EventView; gst::trace!(CAT, obj: aggregator_pad, "Handling event {:?}", event); match event.view() { EventView::Segment(ev) => { // Already fixed-up above to always be a TIME segment let segment = ev .segment() .clone() .downcast::() .expect("non-TIME segment"); gst::info!(CAT, obj: aggregator_pad, "Received segment {:?}", segment); // Only forward the segment event verbatim if this is a single stream variant. // Otherwise we have to produce a default segment and re-timestamp all buffers // with their running time. let aggregator = self.obj(); let class = aggregator.class(); if class.as_ref().variant.is_single_stream() { aggregator.update_segment(&segment); } self.parent_sink_event(aggregator_pad, event) } EventView::Tag(_ev) => { // TODO: Maybe store for putting into the headers of the next fragment? self.parent_sink_event(aggregator_pad, event) } _ => self.parent_sink_event(aggregator_pad, event), } } fn src_query(&self, query: &mut gst::QueryRef) -> bool { use gst::QueryViewMut; gst::trace!(CAT, imp: self, "Handling query {:?}", query); match query.view_mut() { QueryViewMut::Seeking(q) => { // We can't really handle seeking, it would break everything q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE); true } _ => self.parent_src_query(query), } } fn src_event(&self, event: gst::Event) -> bool { use gst::EventView; gst::trace!(CAT, imp: self, "Handling event {:?}", event); match event.view() { EventView::Seek(_ev) => false, _ => self.parent_src_event(event), } } fn flush(&self) -> Result { let mut state = self.state.lock().unwrap(); for stream in &mut state.streams { stream.queued_gops.clear(); stream.dts_offset = None; stream.current_position = gst::ClockTime::ZERO; stream.fragment_filled = false; stream.pre_queue.clear(); stream.running_time_utc_time_mapping = None; } state.current_offset = 0; state.fragment_offsets.clear(); drop(state); self.parent_flush() } fn stop(&self) -> Result<(), gst::ErrorMessage> { gst::trace!(CAT, imp: self, "Stopping"); let _ = self.parent_stop(); *self.state.lock().unwrap() = State::default(); Ok(()) } fn start(&self) -> Result<(), gst::ErrorMessage> { gst::trace!(CAT, imp: self, "Starting"); self.parent_start()?; // For non-single-stream variants configure a default segment that allows for negative // DTS so that we can correctly re-timestamp buffers with their running times. let aggregator = self.obj(); let class = aggregator.class(); if !class.as_ref().variant.is_single_stream() { let mut segment = gst::FormattedSegment::::new(); segment.set_start(SEGMENT_OFFSET); segment.set_position(SEGMENT_OFFSET); aggregator.update_segment(&segment); } *self.state.lock().unwrap() = State::default(); Ok(()) } fn negotiate(&self) -> bool { true } fn aggregate(&self, timeout: bool) -> Result { let settings = self.settings.lock().unwrap().clone(); let mut upstream_events = vec![]; let all_eos; let mut caps = None; let mut buffers = vec![]; { let mut state = self.state.lock().unwrap(); // Create streams if state.streams.is_empty() { self.create_streams(&mut state)?; } // Queue buffers from all streams that are not filled for the current fragment yet // // Always take a buffer from the stream with the earliest queued buffer to keep the // fill-level at all sinkpads in sync. let fragment_start_pts = state.fragment_start_pts; let chunk_start_pts = state.chunk_start_pts; while let Some((idx, stream)) = self.find_earliest_stream(&mut state, timeout, settings.fragment_duration)? { let pre_queued_buffer = Self::pop_buffer( self, &stream.sinkpad, &mut stream.pre_queue, &stream.running_time_utc_time_mapping, ); // Queue up the buffer and update GOP tracking state self.queue_gops(idx, stream, pre_queued_buffer)?; // Check if this stream is filled enough now. self.check_stream_filled( &settings, stream, fragment_start_pts, chunk_start_pts, false, ); } all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos()); if all_eos { gst::debug!(CAT, imp: self, "All streams are EOS now"); for stream in &mut state.streams { // Check if this stream is filled enough now that everything is EOS. self.check_stream_filled( &settings, stream, fragment_start_pts, chunk_start_pts, true, ); } } // Calculate the earliest PTS, i.e. the start of the first fragment, if not known yet. self.calculate_earliest_pts( &settings, &mut state, &mut upstream_events, all_eos, timeout, ); // Loop as long as new chunks can be drained. // Only the first iteration is considered a timeout. let mut timeout = timeout; loop { // If enough GOPs were queued, drain and create the output fragment or chunk let res = self.drain( &mut state, &settings, timeout, all_eos, &mut upstream_events, ); let mut buffer_list = match res { Ok((new_caps, buffer_list)) => { if caps.is_none() { caps = new_caps; } buffer_list } Err(err) => { if err == gst_base::AGGREGATOR_FLOW_NEED_DATA { assert!(!all_eos); assert!(timeout); gst::element_imp_warning!( self, gst::StreamError::Format, ["Longer GOPs than fragment duration"] ); state.timeout_delay += 1.seconds(); } // Although we had an error, push out everything that was produced so far drop(state); for (sinkpad, event) in upstream_events { sinkpad.push_event(event); } if let Some(caps) = caps { gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps); self.obj().set_src_caps(&caps); } for buffer_list in buffers { gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffer_list); self.obj().finish_buffer_list(buffer_list)?; } return Err(err); } }; // If nothing can't be drained anymore then break the loop, and if all streams are // EOS do EOS handling. if buffer_list.is_none() { if settings.write_mfra && all_eos { gst::debug!(CAT, imp: self, "Writing mfra box"); match boxes::create_mfra(&state.streams[0].caps, &state.fragment_offsets) { Ok(mut mfra) => { { let mfra = mfra.get_mut().unwrap(); // mfra is DELTA_UNIT like other buffers mfra.set_flags(gst::BufferFlags::DELTA_UNIT); } if buffer_list.is_none() { buffer_list = Some(gst::BufferList::new_sized(1)); } buffer_list.as_mut().unwrap().get_mut().unwrap().add(mfra); buffers.extend(buffer_list); } Err(err) => { gst::error!(CAT, imp: self, "Failed to create mfra box: {}", err); } } } break; } // Otherwise extend the list of bufferlists and check again if something can be // drained. buffers.extend(buffer_list); timeout = false; let fragment_start_pts = state.fragment_start_pts; let chunk_start_pts = state.chunk_start_pts; for stream in &mut state.streams { // Check if this stream is still filled enough now. self.check_stream_filled( &settings, stream, fragment_start_pts, chunk_start_pts, all_eos, ); } } } for (sinkpad, event) in upstream_events { sinkpad.push_event(event); } if let Some(caps) = caps { gst::debug!(CAT, imp: self, "Setting caps on source pad: {:?}", caps); self.obj().set_src_caps(&caps); } for buffer_list in buffers { gst::trace!(CAT, imp: self, "Pushing buffer list {:?}", buffer_list); self.obj().finish_buffer_list(buffer_list)?; } if !all_eos { return Ok(gst::FlowSuccess::Ok); } // Do remaining EOS handling after the end of the stream was pushed. gst::debug!(CAT, imp: self, "Doing EOS handling"); if settings.header_update_mode != super::HeaderUpdateMode::None { let updated_header = self.update_header(&mut self.state.lock().unwrap(), &settings, true); match updated_header { Ok(Some((buffer_list, caps))) => { match settings.header_update_mode { super::HeaderUpdateMode::None => unreachable!(), super::HeaderUpdateMode::Rewrite => { let mut q = gst::query::Seeking::new(gst::Format::Bytes); if self.obj().src_pad().peer_query(&mut q) && q.result().0 { let aggregator = self.obj(); aggregator.set_src_caps(&caps); // Seek to the beginning with a default bytes segment aggregator .update_segment( &gst::FormattedSegment::::new(), ); if let Err(err) = aggregator.finish_buffer_list(buffer_list) { gst::error!( CAT, imp: self, "Failed pushing updated header buffer downstream: {:?}", err, ); } } else { gst::error!( CAT, imp: self, "Can't rewrite header because downstream is not seekable" ); } } super::HeaderUpdateMode::Update => { let aggregator = self.obj(); aggregator.set_src_caps(&caps); if let Err(err) = aggregator.finish_buffer_list(buffer_list) { gst::error!( CAT, imp: self, "Failed pushing updated header buffer downstream: {:?}", err, ); } } } } Ok(None) => {} Err(err) => { gst::error!( CAT, imp: self, "Failed to generate updated header: {:?}", err ); } } } // Need to output new headers if started again after EOS self.state.lock().unwrap().sent_headers = false; Err(gst::FlowError::Eos) } } #[repr(C)] pub(crate) struct Class { parent: gst_base::ffi::GstAggregatorClass, variant: super::Variant, } unsafe impl ClassStruct for Class { type Type = FMP4Mux; } impl std::ops::Deref for Class { type Target = glib::Class; fn deref(&self) -> &Self::Target { unsafe { &*(&self.parent as *const _ as *const _) } } } unsafe impl IsSubclassable for super::FMP4Mux { fn class_init(class: &mut glib::Class) { Self::parent_class_init::(class); let class = class.as_mut(); class.variant = T::VARIANT; } } pub(crate) trait FMP4MuxImpl: AggregatorImpl { const VARIANT: super::Variant; } #[derive(Default)] pub(crate) struct ISOFMP4Mux; #[glib::object_subclass] impl ObjectSubclass for ISOFMP4Mux { const NAME: &'static str = "GstISOFMP4Mux"; type Type = super::ISOFMP4Mux; type ParentType = super::FMP4Mux; } impl ObjectImpl for ISOFMP4Mux { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![glib::ParamSpecBoolean::builder("offset-to-zero") .nick("Offset to Zero") .blurb("Offsets all streams so that the earliest stream starts at 0") .mutable_ready() .build()] }); &PROPERTIES } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { let obj = self.obj(); let fmp4mux = obj.upcast_ref::().imp(); match pspec.name() { "offset-to-zero" => { let settings = fmp4mux.settings.lock().unwrap(); settings.offset_to_zero.to_value() } _ => unimplemented!(), } } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { let obj = self.obj(); let fmp4mux = obj.upcast_ref::().imp(); match pspec.name() { "offset-to-zero" => { let mut settings = fmp4mux.settings.lock().unwrap(); settings.offset_to_zero = value.get().expect("type checked upstream"); } _ => unimplemented!(), } } } impl GstObjectImpl for ISOFMP4Mux {} impl ElementImpl for ISOFMP4Mux { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: Lazy = Lazy::new(|| { gst::subclass::ElementMetadata::new( "ISOFMP4Mux", "Codec/Muxer", "ISO fragmented MP4 muxer", "Sebastian Dröge ", ) }); Some(&*ELEMENT_METADATA) } fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: Lazy> = Lazy::new(|| { let src_pad_template = gst::PadTemplate::new( "src", gst::PadDirection::Src, gst::PadPresence::Always, &gst::Caps::builder("video/quicktime") .field("variant", "iso-fragmented") .build(), ) .unwrap(); let sink_pad_template = gst::PadTemplate::with_gtype( "sink_%u", gst::PadDirection::Sink, gst::PadPresence::Request, &[ gst::Structure::builder("video/x-h264") .field("stream-format", gst::List::new(["avc", "avc3"])) .field("alignment", "au") .field("width", gst::IntRange::new(1, u16::MAX as i32)) .field("height", gst::IntRange::new(1, u16::MAX as i32)) .build(), gst::Structure::builder("video/x-h265") .field("stream-format", gst::List::new(["hvc1", "hev1"])) .field("alignment", "au") .field("width", gst::IntRange::new(1, u16::MAX as i32)) .field("height", gst::IntRange::new(1, u16::MAX as i32)) .build(), gst::Structure::builder("video/x-vp9") .field("profile", gst::List::new(["0", "1", "2", "3"])) .field("chroma-format", gst::List::new(["4:2:0", "4:2:2", "4:4:4"])) .field("bit-depth-luma", gst::List::new([8u32, 10u32, 12u32])) .field("bit-depth-chroma", gst::List::new([8u32, 10u32, 12u32])) .field("width", gst::IntRange::new(1, u16::MAX as i32)) .field("height", gst::IntRange::new(1, u16::MAX as i32)) .build(), gst::Structure::builder("audio/mpeg") .field("mpegversion", 4i32) .field("stream-format", "raw") .field("channels", gst::IntRange::new(1, u16::MAX as i32)) .field("rate", gst::IntRange::new(1, i32::MAX)) .build(), gst::Structure::builder("audio/x-opus") .field("channel-mapping-family", gst::IntRange::new(0i32, 255)) .field("channels", gst::IntRange::new(1i32, 8)) .field("rate", gst::IntRange::new(1, i32::MAX)) .build(), ] .into_iter() .collect::(), super::FMP4MuxPad::static_type(), ) .unwrap(); vec![src_pad_template, sink_pad_template] }); PAD_TEMPLATES.as_ref() } } impl AggregatorImpl for ISOFMP4Mux {} impl FMP4MuxImpl for ISOFMP4Mux { const VARIANT: super::Variant = super::Variant::ISO; } #[derive(Default)] pub(crate) struct CMAFMux; #[glib::object_subclass] impl ObjectSubclass for CMAFMux { const NAME: &'static str = "GstCMAFMux"; type Type = super::CMAFMux; type ParentType = super::FMP4Mux; } impl ObjectImpl for CMAFMux {} impl GstObjectImpl for CMAFMux {} impl ElementImpl for CMAFMux { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: Lazy = Lazy::new(|| { gst::subclass::ElementMetadata::new( "CMAFMux", "Codec/Muxer", "CMAF fragmented MP4 muxer", "Sebastian Dröge ", ) }); Some(&*ELEMENT_METADATA) } fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: Lazy> = Lazy::new(|| { let src_pad_template = gst::PadTemplate::new( "src", gst::PadDirection::Src, gst::PadPresence::Always, &gst::Caps::builder("video/quicktime") .field("variant", "cmaf") .build(), ) .unwrap(); let sink_pad_template = gst::PadTemplate::with_gtype( "sink", gst::PadDirection::Sink, gst::PadPresence::Always, &[ gst::Structure::builder("video/x-h264") .field("stream-format", gst::List::new(["avc", "avc3"])) .field("alignment", "au") .field("width", gst::IntRange::new(1, u16::MAX as i32)) .field("height", gst::IntRange::new(1, u16::MAX as i32)) .build(), gst::Structure::builder("video/x-h265") .field("stream-format", gst::List::new(["hvc1", "hev1"])) .field("alignment", "au") .field("width", gst::IntRange::new(1, u16::MAX as i32)) .field("height", gst::IntRange::new(1, u16::MAX as i32)) .build(), gst::Structure::builder("audio/mpeg") .field("mpegversion", 4i32) .field("stream-format", "raw") .field("channels", gst::IntRange::new(1, u16::MAX as i32)) .field("rate", gst::IntRange::new(1, i32::MAX)) .build(), ] .into_iter() .collect::(), super::FMP4MuxPad::static_type(), ) .unwrap(); vec![src_pad_template, sink_pad_template] }); PAD_TEMPLATES.as_ref() } } impl AggregatorImpl for CMAFMux {} impl FMP4MuxImpl for CMAFMux { const VARIANT: super::Variant = super::Variant::CMAF; } #[derive(Default)] pub(crate) struct DASHMP4Mux; #[glib::object_subclass] impl ObjectSubclass for DASHMP4Mux { const NAME: &'static str = "GstDASHMP4Mux"; type Type = super::DASHMP4Mux; type ParentType = super::FMP4Mux; } impl ObjectImpl for DASHMP4Mux {} impl GstObjectImpl for DASHMP4Mux {} impl ElementImpl for DASHMP4Mux { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: Lazy = Lazy::new(|| { gst::subclass::ElementMetadata::new( "DASHMP4Mux", "Codec/Muxer", "DASH fragmented MP4 muxer", "Sebastian Dröge ", ) }); Some(&*ELEMENT_METADATA) } fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: Lazy> = Lazy::new(|| { let src_pad_template = gst::PadTemplate::new( "src", gst::PadDirection::Src, gst::PadPresence::Always, &gst::Caps::builder("video/quicktime") .field("variant", "iso-fragmented") .build(), ) .unwrap(); let sink_pad_template = gst::PadTemplate::with_gtype( "sink", gst::PadDirection::Sink, gst::PadPresence::Always, &[ gst::Structure::builder("video/x-h264") .field("stream-format", gst::List::new(["avc", "avc3"])) .field("alignment", "au") .field("width", gst::IntRange::::new(1, u16::MAX as i32)) .field("height", gst::IntRange::::new(1, u16::MAX as i32)) .build(), gst::Structure::builder("video/x-h265") .field("stream-format", gst::List::new(["hvc1", "hev1"])) .field("alignment", "au") .field("width", gst::IntRange::::new(1, u16::MAX as i32)) .field("height", gst::IntRange::::new(1, u16::MAX as i32)) .build(), gst::Structure::builder("video/x-vp9") .field("profile", gst::List::new(["0", "1", "2", "3"])) .field("chroma-format", gst::List::new(["4:2:0", "4:2:2", "4:4:4"])) .field("bit-depth-luma", gst::List::new([8u32, 10u32, 12u32])) .field("bit-depth-chroma", gst::List::new([8u32, 10u32, 12u32])) .field("width", gst::IntRange::new(1, u16::MAX as i32)) .field("height", gst::IntRange::new(1, u16::MAX as i32)) .build(), gst::Structure::builder("audio/mpeg") .field("mpegversion", 4i32) .field("stream-format", "raw") .field("channels", gst::IntRange::::new(1, u16::MAX as i32)) .field("rate", gst::IntRange::::new(1, i32::MAX)) .build(), gst::Structure::builder("audio/x-opus") .field("channel-mapping-family", gst::IntRange::new(0i32, 255)) .field("channels", gst::IntRange::new(1i32, 8)) .field("rate", gst::IntRange::new(1, i32::MAX)) .build(), ] .into_iter() .collect::(), super::FMP4MuxPad::static_type(), ) .unwrap(); vec![src_pad_template, sink_pad_template] }); PAD_TEMPLATES.as_ref() } } impl AggregatorImpl for DASHMP4Mux {} impl FMP4MuxImpl for DASHMP4Mux { const VARIANT: super::Variant = super::Variant::DASH; } #[derive(Default)] pub(crate) struct ONVIFFMP4Mux; #[glib::object_subclass] impl ObjectSubclass for ONVIFFMP4Mux { const NAME: &'static str = "GstONVIFFMP4Mux"; type Type = super::ONVIFFMP4Mux; type ParentType = super::FMP4Mux; } impl ObjectImpl for ONVIFFMP4Mux {} impl GstObjectImpl for ONVIFFMP4Mux {} impl ElementImpl for ONVIFFMP4Mux { fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { static ELEMENT_METADATA: Lazy = Lazy::new(|| { gst::subclass::ElementMetadata::new( "ONVIFFMP4Mux", "Codec/Muxer", "ONVIF fragmented MP4 muxer", "Sebastian Dröge ", ) }); Some(&*ELEMENT_METADATA) } fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: Lazy> = Lazy::new(|| { let src_pad_template = gst::PadTemplate::new( "src", gst::PadDirection::Src, gst::PadPresence::Always, &gst::Caps::builder("video/quicktime") .field("variant", "iso-fragmented") .build(), ) .unwrap(); let sink_pad_template = gst::PadTemplate::with_gtype( "sink_%u", gst::PadDirection::Sink, gst::PadPresence::Request, &[ gst::Structure::builder("video/x-h264") .field("stream-format", gst::List::new(["avc", "avc3"])) .field("alignment", "au") .field("width", gst::IntRange::::new(1, u16::MAX as i32)) .field("height", gst::IntRange::::new(1, u16::MAX as i32)) .build(), gst::Structure::builder("video/x-h265") .field("stream-format", gst::List::new(["hvc1", "hev1"])) .field("alignment", "au") .field("width", gst::IntRange::::new(1, u16::MAX as i32)) .field("height", gst::IntRange::::new(1, u16::MAX as i32)) .build(), gst::Structure::builder("image/jpeg") .field("width", gst::IntRange::::new(1, u16::MAX as i32)) .field("height", gst::IntRange::::new(1, u16::MAX as i32)) .build(), gst::Structure::builder("audio/mpeg") .field("mpegversion", 4i32) .field("stream-format", "raw") .field("channels", gst::IntRange::::new(1, u16::MAX as i32)) .field("rate", gst::IntRange::::new(1, i32::MAX)) .build(), gst::Structure::builder("audio/x-alaw") .field("channels", gst::IntRange::::new(1, 2)) .field("rate", gst::IntRange::::new(1, i32::MAX)) .build(), gst::Structure::builder("audio/x-mulaw") .field("channels", gst::IntRange::::new(1, 2)) .field("rate", gst::IntRange::::new(1, i32::MAX)) .build(), gst::Structure::builder("audio/x-adpcm") .field("layout", "g726") .field("channels", 1i32) .field("rate", 8000i32) .field("bitrate", gst::List::new([16000i32, 24000, 32000, 40000])) .build(), gst::Structure::builder("application/x-onvif-metadata") .field("parsed", true) .build(), ] .into_iter() .collect::(), super::FMP4MuxPad::static_type(), ) .unwrap(); vec![src_pad_template, sink_pad_template] }); PAD_TEMPLATES.as_ref() } } impl AggregatorImpl for ONVIFFMP4Mux {} impl FMP4MuxImpl for ONVIFFMP4Mux { const VARIANT: super::Variant = super::Variant::ONVIF; } #[derive(Default, Clone)] struct PadSettings { trak_timescale: u32, } #[derive(Default)] pub(crate) struct FMP4MuxPad { settings: Mutex, } #[glib::object_subclass] impl ObjectSubclass for FMP4MuxPad { const NAME: &'static str = "GstFMP4MuxPad"; type Type = super::FMP4MuxPad; type ParentType = gst_base::AggregatorPad; } impl ObjectImpl for FMP4MuxPad { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![glib::ParamSpecUInt::builder("trak-timescale") .nick("Track Timescale") .blurb("Timescale to use for the track (units per second, 0 is automatic)") .mutable_ready() .build()] }); &PROPERTIES } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "trak-timescale" => { let mut settings = self.settings.lock().unwrap(); settings.trak_timescale = value.get().expect("type checked upstream"); } _ => unimplemented!(), } } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "trak-timescale" => { let settings = self.settings.lock().unwrap(); settings.trak_timescale.to_value() } _ => unimplemented!(), } } } impl GstObjectImpl for FMP4MuxPad {} impl PadImpl for FMP4MuxPad {} impl AggregatorPadImpl for FMP4MuxPad { fn flush(&self, aggregator: &gst_base::Aggregator) -> Result { let mux = aggregator.downcast_ref::().unwrap(); let mut mux_state = mux.imp().state.lock().unwrap(); for stream in &mut mux_state.streams { if stream.sinkpad == *self.obj() { stream.queued_gops.clear(); stream.dts_offset = None; stream.current_position = gst::ClockTime::ZERO; stream.fragment_filled = false; stream.pre_queue.clear(); stream.running_time_utc_time_mapping = None; break; } } drop(mux_state); self.parent_flush(aggregator) } }