// Copyright (C) 2022 Sebastian Dröge // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at // . // // SPDX-License-Identifier: MPL-2.0 use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; use gst_base::prelude::*; use gst_base::subclass::prelude::*; use std::collections::VecDeque; use std::sync::Mutex; use once_cell::sync::Lazy; use super::boxes; /// Offset between NTP and UNIX epoch in seconds. /// NTP = UNIX + NTP_UNIX_OFFSET. const NTP_UNIX_OFFSET: u64 = 2_208_988_800; /// Reference timestamp meta caps for NTP timestamps. static NTP_CAPS: Lazy = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build()); /// Reference timestamp meta caps for UNIX timestamps. static UNIX_CAPS: Lazy = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build()); /// Returns the UTC time of the buffer in the UNIX epoch. fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option { buffer .iter_meta::() .find_map(|meta| { if meta.reference().can_intersect(&UNIX_CAPS) { Some(meta.timestamp()) } else if meta.reference().can_intersect(&NTP_CAPS) { meta.timestamp().checked_sub(NTP_UNIX_OFFSET.seconds()) } else { None } }) } /// Converts a running time to an UTC time. 
/// Maps `running_time` into the UTC (UNIX) clock domain.
///
/// `running_time_utc_time_mapping` is a `(running time, UTC time)` anchor pair;
/// the result is `utc - anchor_running_time + running_time`, or `None` if any
/// step underflows/overflows or the result is negative.
fn running_time_to_utc_time(
    running_time: impl Into<gst::Signed<gst::ClockTime>>,
    running_time_utc_time_mapping: (
        impl Into<gst::Signed<gst::ClockTime>>,
        impl Into<gst::Signed<gst::ClockTime>>,
    ),
) -> Option<gst::ClockTime> {
    running_time_utc_time_mapping
        .1
        .into()
        .checked_sub(running_time_utc_time_mapping.0.into())
        .and_then(|res| res.checked_add(running_time.into()))
        // Only a non-negative UTC time is representable as a ClockTime.
        .and_then(|res| res.positive())
}

static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    gst::DebugCategory::new(
        "mp4mux",
        gst::DebugColorFlags::empty(),
        Some("MP4Mux Element"),
    )
});

const DEFAULT_INTERLEAVE_BYTES: Option<u64> = None;
const DEFAULT_INTERLEAVE_TIME: Option<gst::ClockTime> = Some(gst::ClockTime::from_mseconds(500));

/// Element properties snapshot; cloned at the start of `aggregate()`.
#[derive(Debug, Clone)]
struct Settings {
    interleave_bytes: Option<u64>,
    interleave_time: Option<gst::ClockTime>,
    movie_timescale: u32,
}

impl Default for Settings {
    fn default() -> Self {
        Settings {
            interleave_bytes: DEFAULT_INTERLEAVE_BYTES,
            interleave_time: DEFAULT_INTERLEAVE_TIME,
            movie_timescale: 0,
        }
    }
}

#[derive(Debug)]
struct PendingBuffer {
    buffer: gst::Buffer,
    // DTS or PTS running time, signed because running time can be negative.
    timestamp: gst::Signed<gst::ClockTime>,
    pts: gst::ClockTime,
    composition_time_offset: Option<i64>,
    duration: Option<gst::ClockTime>,
}

struct Stream {
    /// Sink pad for this stream.
    sinkpad: super::MP4MuxPad,

    /// Pre-queue for ONVIF variant to timestamp all buffers with their UTC time.
    pre_queue: VecDeque<(gst::FormattedSegment<gst::ClockTime>, gst::Buffer)>,

    /// Currently configured caps for this stream.
    caps: gst::Caps,
    /// Whether this stream is intra-only and has frame reordering.
    delta_frames: super::DeltaFrames,

    /// Already written out chunks with their samples for this stream
    chunks: Vec<super::Chunk>,

    /// Queued time in the latest chunk.
    queued_chunk_time: gst::ClockTime,
    /// Queue bytes in the latest chunk.
    queued_chunk_bytes: u64,

    /// Currently pending buffer, DTS or PTS running time and duration
    ///
    /// If the duration is set then the next buffer is already queued up and the duration was
    /// calculated based on that.
    pending_buffer: Option<PendingBuffer>,

    /// Start DTS.
    start_dts: Option<gst::Signed<gst::ClockTime>>,
    /// Earliest PTS.
    earliest_pts: Option<gst::ClockTime>,
    /// Current end PTS.
end_pts: Option, /// In ONVIF mode, the mapping between running time and UTC time (UNIX) running_time_utc_time_mapping: Option<(gst::Signed, gst::ClockTime)>, } #[derive(Default)] struct State { /// List of streams when the muxer was started. streams: Vec, /// Index of stream that is currently selected to fill a chunk. current_stream_idx: Option, /// Current writing offset since the beginning of the stream. current_offset: u64, /// Offset of the `mdat` box from the beginning of the stream. mdat_offset: Option, /// Size of the `mdat` as written so far. mdat_size: u64, } #[derive(Default)] pub(crate) struct MP4Mux { state: Mutex, settings: Mutex, } impl MP4Mux { /// Checks if a buffer is valid according to the stream configuration. fn check_buffer( buffer: &gst::BufferRef, sinkpad: &super::MP4MuxPad, delta_frames: super::DeltaFrames, ) -> Result<(), gst::FlowError> { if delta_frames.requires_dts() && buffer.dts().is_none() { gst::error!(CAT, obj: sinkpad, "Require DTS for video streams"); return Err(gst::FlowError::Error); } if buffer.pts().is_none() { gst::error!(CAT, obj: sinkpad, "Require timestamped buffers"); return Err(gst::FlowError::Error); } if delta_frames.intra_only() && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { gst::error!(CAT, obj: sinkpad, "Intra-only stream with delta units"); return Err(gst::FlowError::Error); } Ok(()) } fn peek_buffer( &self, sinkpad: &super::MP4MuxPad, delta_frames: super::DeltaFrames, pre_queue: &mut VecDeque<(gst::FormattedSegment, gst::Buffer)>, running_time_utc_time_mapping: &Option<(gst::Signed, gst::ClockTime)>, ) -> Result, gst::Buffer)>, gst::FlowError> { if let Some((segment, buffer)) = pre_queue.front() { return Ok(Some((segment.clone(), buffer.clone()))); } let mut buffer = match sinkpad.peek_buffer() { None => return Ok(None), Some(buffer) => buffer, }; Self::check_buffer(&buffer, sinkpad, delta_frames)?; let mut segment = match sinkpad.segment().downcast::().ok() { Some(segment) => segment, None => { 
                gst::error!(CAT, obj: sinkpad, "Got buffer before segment");
                return Err(gst::FlowError::Error);
            }
        };

        // For ONVIF we need to re-timestamp the buffer with its UTC time.
        //
        // We can only possibly end up here after the running-time UTC mapping is known.
        //
        // After re-timestamping, put the buffer into the pre-queue so re-timestamping only has to
        // happen once.
        if self.obj().class().as_ref().variant == super::Variant::ONVIF {
            // Mapping must exist by now (established in pop_buffer) — unwrap is the invariant.
            let running_time_utc_time_mapping = running_time_utc_time_mapping.unwrap();

            // PTS presence was validated by check_buffer() above.
            let pts_position = buffer.pts().unwrap();
            let dts_position = buffer.dts();

            let pts = segment.to_running_time_full(pts_position).unwrap();
            let dts = dts_position
                .map(|dts_position| segment.to_running_time_full(dts_position).unwrap());

            let utc_time = match get_utc_time_from_buffer(&buffer) {
                None => {
                    // Calculate from the mapping
                    running_time_to_utc_time(pts, running_time_utc_time_mapping).ok_or_else(
                        || {
                            gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time");
                            gst::FlowError::Error
                        },
                    )?
} Some(utc_time) => utc_time, }; gst::trace!( CAT, obj: sinkpad, "Mapped PTS running time {pts} to UTC time {utc_time}" ); { let buffer = buffer.make_mut(); buffer.set_pts(utc_time); if let Some(dts) = dts { let dts_utc_time = running_time_to_utc_time(dts, (pts, utc_time)).ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); gst::FlowError::Error })?; gst::trace!( CAT, obj: sinkpad, "Mapped DTS running time {dts} to UTC time {dts_utc_time}" ); buffer.set_dts(dts_utc_time); } } segment = gst::FormattedSegment::default(); // Drop current buffer as it is now queued sinkpad.drop_buffer(); pre_queue.push_back((segment.clone(), buffer.clone())); } Ok(Some((segment, buffer))) } fn pop_buffer( &self, sinkpad: &super::MP4MuxPad, delta_frames: super::DeltaFrames, pre_queue: &mut VecDeque<(gst::FormattedSegment, gst::Buffer)>, running_time_utc_time_mapping: &mut Option<(gst::Signed, gst::ClockTime)>, ) -> Result, gst::Buffer)>, gst::FlowError> { // In ONVIF mode we need to get UTC times for each buffer and synchronize based on that. // Queue up to 6s of data to get the first UTC time and then backdate. 
if self.obj().class().as_ref().variant == super::Variant::ONVIF && running_time_utc_time_mapping.is_none() { if let Some((last, first)) = Option::zip(pre_queue.back(), pre_queue.front()) { // Existence of PTS/DTS checked below let (last, first) = if delta_frames.requires_dts() { ( last.0.to_running_time_full(last.1.dts()).unwrap(), first.0.to_running_time_full(first.1.dts()).unwrap(), ) } else { ( last.0.to_running_time_full(last.1.pts()).unwrap(), first.0.to_running_time_full(first.1.pts()).unwrap(), ) }; if last.saturating_sub(first) > gst::Signed::Positive(gst::ClockTime::from_seconds(6)) { gst::error!( CAT, obj: sinkpad, "Got no UTC time in the first 6s of the stream" ); return Err(gst::FlowError::Error); } } let buffer = match sinkpad.pop_buffer() { None => { if sinkpad.is_eos() { gst::error!(CAT, obj: sinkpad, "Got no UTC time before EOS"); return Err(gst::FlowError::Error); } else { return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } } Some(buffer) => buffer, }; Self::check_buffer(&buffer, sinkpad, delta_frames)?; let segment = match sinkpad.segment().downcast::().ok() { Some(segment) => segment, None => { gst::error!(CAT, obj: sinkpad, "Got buffer before segment"); return Err(gst::FlowError::Error); } }; let utc_time = match get_utc_time_from_buffer(&buffer) { Some(utc_time) => utc_time, None => { pre_queue.push_back((segment, buffer)); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } }; let running_time = segment.to_running_time_full(buffer.pts().unwrap()).unwrap(); gst::info!( CAT, obj: sinkpad, "Got initial UTC time {utc_time} at PTS running time {running_time}", ); let mapping = (running_time, utc_time); *running_time_utc_time_mapping = Some(mapping); // Push the buffer onto the pre-queue and re-timestamp it and all other buffers // based on the mapping above. 
pre_queue.push_back((segment, buffer)); for (segment, buffer) in pre_queue.iter_mut() { let buffer = buffer.make_mut(); let pts = segment.to_running_time_full(buffer.pts().unwrap()).unwrap(); let pts_utc_time = running_time_to_utc_time(pts, mapping).ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time"); gst::FlowError::Error })?; gst::trace!( CAT, obj: sinkpad, "Mapped PTS running time {pts} to UTC time {pts_utc_time}" ); buffer.set_pts(pts_utc_time); if let Some(dts) = buffer.dts() { let dts = segment.to_running_time_full(dts).unwrap(); let dts_utc_time = running_time_to_utc_time(dts, mapping).ok_or_else(|| { gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time"); gst::FlowError::Error })?; gst::trace!( CAT, obj: sinkpad, "Mapped DTS running time {dts} to UTC time {dts_utc_time}" ); buffer.set_dts(dts_utc_time); } *segment = gst::FormattedSegment::default(); } // Fall through below and pop the first buffer finally } if let Some((segment, buffer)) = pre_queue.pop_front() { return Ok(Some((segment, buffer))); } // If the mapping is set, then we would get the buffer always from the pre-queue: // - either it was set before already, in which case the next buffer would've been peeked // for calculating the duration to the previous buffer, and then put into the pre-queue // - or this is the very first buffer and we just put it into the queue overselves above if self.obj().class().as_ref().variant == super::Variant::ONVIF { if sinkpad.is_eos() { return Ok(None); } unreachable!(); } let buffer = match sinkpad.pop_buffer() { None => return Ok(None), Some(buffer) => buffer, }; Self::check_buffer(&buffer, sinkpad, delta_frames)?; let segment = match sinkpad.segment().downcast::().ok() { Some(segment) => segment, None => { gst::error!(CAT, obj: sinkpad, "Got buffer before segment"); return Err(gst::FlowError::Error); } }; Ok(Some((segment, buffer))) } /// Queue a buffer and calculate its duration. 
/// /// Returns `Ok(())` if a buffer with duration is known or if the stream is EOS and a buffer is /// queued, i.e. if this stream is ready to be processed. /// /// Returns `Err(Eos)` if nothing is queued and the stream is EOS. /// /// Returns `Err(AGGREGATOR_FLOW_NEED_DATA)` if more data is needed. /// /// Returns `Err(Error)` on errors. fn queue_buffer(&self, stream: &mut Stream) -> Result<(), gst::FlowError> { // Loop up to two times here to first retrieve the current buffer and then potentially // already calculate its duration based on the next queued buffer. loop { match stream.pending_buffer { Some(PendingBuffer { duration: Some(_), .. }) => return Ok(()), Some(PendingBuffer { timestamp, pts, ref buffer, ref mut duration, .. }) => { // Already have a pending buffer but no duration, so try to get that now let (segment, buffer) = match self.peek_buffer( &stream.sinkpad, stream.delta_frames, &mut stream.pre_queue, &stream.running_time_utc_time_mapping, )? { Some(res) => res, None => { if stream.sinkpad.is_eos() { let dur = buffer.duration().unwrap_or(gst::ClockTime::ZERO); gst::trace!( CAT, obj: stream.sinkpad, "Stream is EOS, using {dur} as duration for queued buffer", ); let pts = pts + dur; if stream.end_pts.map_or(true, |end_pts| end_pts < pts) { gst::trace!(CAT, obj: stream.sinkpad, "Stream end PTS {pts}"); stream.end_pts = Some(pts); } *duration = Some(dur); return Ok(()); } else { gst::trace!(CAT, obj: stream.sinkpad, "Stream has no buffer queued"); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } } }; // Was checked above let pts_position = buffer.pts().unwrap(); let next_timestamp_position = if stream.delta_frames.requires_dts() { // Was checked above buffer.dts().unwrap() } else { pts_position }; let next_timestamp = segment .to_running_time_full(next_timestamp_position) .unwrap(); gst::trace!( CAT, obj: stream.sinkpad, "Stream has buffer with timestamp {next_timestamp} queued", ); let dur = next_timestamp .saturating_sub(timestamp) .positive() 
.unwrap_or_else(|| { gst::warning!( CAT, obj: stream.sinkpad, "Stream timestamps going backwards {next_timestamp} < {timestamp}", ); gst::ClockTime::ZERO }); gst::trace!( CAT, obj: stream.sinkpad, "Using {dur} as duration for queued buffer", ); let pts = pts + dur; if stream.end_pts.map_or(true, |end_pts| end_pts < pts) { gst::trace!(CAT, obj: stream.sinkpad, "Stream end PTS {pts}"); stream.end_pts = Some(pts); } *duration = Some(dur); return Ok(()); } None => { // Have no buffer queued at all yet let (segment, buffer) = match self.pop_buffer( &stream.sinkpad, stream.delta_frames, &mut stream.pre_queue, &mut stream.running_time_utc_time_mapping, )? { Some(res) => res, None => { if stream.sinkpad.is_eos() { gst::trace!( CAT, obj: stream.sinkpad, "Stream is EOS", ); return Err(gst::FlowError::Eos); } else { gst::trace!(CAT, obj: stream.sinkpad, "Stream has no buffer queued"); return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); } } }; // Was checked above let pts_position = buffer.pts().unwrap(); let dts_position = buffer.dts(); let pts = segment.to_running_time_full(pts_position).unwrap() .positive().unwrap_or_else(|| { gst::error!(CAT, obj: stream.sinkpad, "Stream has negative PTS running time"); gst::ClockTime::ZERO }); let dts = dts_position .map(|dts_position| segment.to_running_time_full(dts_position).unwrap()); let timestamp = if stream.delta_frames.requires_dts() { // Was checked above let dts = dts.unwrap(); if stream.start_dts.is_none() { gst::debug!(CAT, obj: stream.sinkpad, "Stream start DTS {dts}"); stream.start_dts = Some(dts); } dts } else { gst::Signed::Positive(pts) }; if stream .earliest_pts .map_or(true, |earliest_pts| earliest_pts > pts) { gst::debug!(CAT, obj: stream.sinkpad, "Stream earliest PTS {pts}"); stream.earliest_pts = Some(pts); } let composition_time_offset = if stream.delta_frames.requires_dts() { let pts = gst::Signed::Positive(pts); let dts = dts.unwrap(); // set above Some(i64::try_from((pts - dts).nseconds()).map_err(|_| { 
gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference"); gst::FlowError::Error })?) } else { None }; gst::trace!( CAT, obj: stream.sinkpad, "Stream has buffer of size {} with timestamp {timestamp} pending", buffer.size(), ); stream.pending_buffer = Some(PendingBuffer { buffer, timestamp, pts, composition_time_offset, duration: None, }); } } } } fn find_earliest_stream( &self, settings: &Settings, state: &mut State, ) -> Result, gst::FlowError> { if let Some(current_stream_idx) = state.current_stream_idx { // If a stream was previously selected, check if another buffer from // this stream can be consumed or if that would exceed the interleave. let single_stream = state.streams.len() == 1; let stream = &mut state.streams[current_stream_idx]; match self.queue_buffer(stream) { Ok(_) => { assert!(matches!( stream.pending_buffer, Some(PendingBuffer { duration: Some(_), .. }) )); if single_stream || (settings.interleave_bytes.map_or(true, |interleave_bytes| { interleave_bytes >= stream.queued_chunk_bytes }) && settings.interleave_time.map_or(true, |interleave_time| { interleave_time >= stream.queued_chunk_time })) { gst::trace!(CAT, obj: stream.sinkpad, "Continuing current chunk: single stream {single_stream}, or {} >= {} and {} >= {}", gst::format::Bytes::from_u64(stream.queued_chunk_bytes), settings.interleave_bytes.map(gst::format::Bytes::from_u64).display(), stream.queued_chunk_time, settings.interleave_time.display(), ); return Ok(Some(current_stream_idx)); } state.current_stream_idx = None; gst::debug!(CAT, obj: stream.sinkpad, "Switching to next chunk: {} < {} and {} < {}", gst::format::Bytes::from_u64(stream.queued_chunk_bytes), settings.interleave_bytes.map(gst::format::Bytes::from_u64).display(), stream.queued_chunk_time, settings.interleave_time.display(), ); } Err(gst::FlowError::Eos) => { gst::debug!(CAT, obj: stream.sinkpad, "Stream is EOS, switching to next stream"); state.current_stream_idx = None; } Err(err) => { return Err(err); } } } // 
Otherwise find the next earliest stream here let mut earliest_stream = None; let mut all_have_data_or_eos = true; let mut all_eos = true; for (idx, stream) in state.streams.iter_mut().enumerate() { // First queue a buffer on each stream and try to get the duration match self.queue_buffer(stream) { Ok(_) => { assert!(matches!( stream.pending_buffer, Some(PendingBuffer { duration: Some(_), .. }) )); let timestamp = stream.pending_buffer.as_ref().unwrap().timestamp; gst::trace!(CAT, obj: stream.sinkpad, "Stream at timestamp {timestamp}", ); all_eos = false; if earliest_stream .as_ref() .map_or(true, |(_idx, _stream, earliest_timestamp)| { *earliest_timestamp > timestamp }) { earliest_stream = Some((idx, stream, timestamp)); } } Err(gst::FlowError::Eos) => { all_eos &= true; continue; } Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => { all_have_data_or_eos = false; continue; } Err(err) => { return Err(err); } } } if !all_have_data_or_eos { gst::trace!(CAT, imp: self, "Not all streams have a buffer or are EOS"); Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) } else if all_eos { gst::info!(CAT, imp: self, "All streams are EOS"); Err(gst::FlowError::Eos) } else if let Some((idx, stream, earliest_timestamp)) = earliest_stream { gst::debug!( CAT, obj: stream.sinkpad, "Stream is earliest stream with timestamp {earliest_timestamp}", ); gst::debug!( CAT, obj: stream.sinkpad, "Starting new chunk at offset {}", state.current_offset, ); stream.chunks.push(super::Chunk { offset: state.current_offset, samples: Vec::new(), }); stream.queued_chunk_time = gst::ClockTime::ZERO; stream.queued_chunk_bytes = 0; state.current_stream_idx = Some(idx); Ok(Some(idx)) } else { unreachable!() } } fn drain_buffers( &self, settings: &Settings, state: &mut State, buffers: &mut gst::BufferListRef, ) -> Result<(), gst::FlowError> { // Now we can start handling buffers while let Some(idx) = self.find_earliest_stream(settings, state)? 
{ let stream = &mut state.streams[idx]; let buffer = stream.pending_buffer.take().unwrap(); if buffer.buffer.flags().contains(gst::BufferFlags::GAP) && buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE) && buffer.buffer.size() == 0 { gst::trace!(CAT, obj: stream.sinkpad, "Skipping gap buffer {buffer:?}"); // If a new chunk was just started for the gap buffer, don't bother and get rid // of this chunk again for now and search for the next stream. if let Some(chunk) = stream.chunks.last() { if chunk.samples.is_empty() { let _ = stream.chunks.pop(); state.current_stream_idx = None; } else { // Add duration of the gap to the current chunk stream.queued_chunk_time += buffer.duration.unwrap(); } } // Add the duration of the gap buffer to the previously written out sample. if let Some(previous_sample) = stream.chunks.last_mut().and_then(|c| c.samples.last_mut()) { gst::trace!(CAT, obj: stream.sinkpad, "Adding gap duration {} to previous sample", buffer.duration.unwrap()); previous_sample.duration += buffer.duration.unwrap(); } else { gst::trace!(CAT, obj: stream.sinkpad, "Resetting stream start time because it started with a gap"); // If there was no previous sample yet then the next sample needs to start // earlier or alternatively we change the start PTS. We do the latter here // as otherwise the first sample would be displayed too early. 
stream.earliest_pts = None; stream.start_dts = None; stream.end_pts = None; } continue; } gst::trace!(CAT, obj: stream.sinkpad, "Handling buffer {buffer:?} at offset {}", state.current_offset); let duration = buffer.duration.unwrap(); let composition_time_offset = buffer.composition_time_offset; let mut buffer = buffer.buffer; stream.queued_chunk_time += duration; stream.queued_chunk_bytes += buffer.size() as u64; stream .chunks .last_mut() .unwrap() .samples .push(super::Sample { sync_point: !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT), duration, composition_time_offset, size: buffer.size() as u32, }); { let buffer = buffer.make_mut(); buffer.set_dts(None); buffer.set_pts(None); buffer.set_duration(duration); buffer.unset_flags(gst::BufferFlags::all()); } state.current_offset += buffer.size() as u64; state.mdat_size += buffer.size() as u64; buffers.add(buffer); } Ok(()) } fn create_streams(&self, state: &mut State) -> Result<(), gst::FlowError> { gst::info!(CAT, imp: self, "Creating streams"); for pad in self .obj() .sink_pads() .into_iter() .map(|pad| pad.downcast::().unwrap()) { let caps = match pad.current_caps() { Some(caps) => caps, None => { gst::warning!(CAT, obj: pad, "Skipping pad without caps"); continue; } }; gst::info!(CAT, obj: pad, "Configuring caps {caps:?}"); let s = caps.structure(0).unwrap(); let mut delta_frames = super::DeltaFrames::IntraOnly; match s.name().as_str() { "video/x-h264" | "video/x-h265" => { if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { gst::error!(CAT, obj: pad, "Received caps without codec_data"); return Err(gst::FlowError::NotNegotiated); } delta_frames = super::DeltaFrames::Bidirectional; } "video/x-vp9" => { if !s.has_field_with_type("colorimetry", str::static_type()) { gst::error!(CAT, obj: pad, "Received caps without colorimetry"); return Err(gst::FlowError::NotNegotiated); } delta_frames = super::DeltaFrames::PredictiveOnly; } "image/jpeg" => (), "audio/mpeg" => { if 
!s.has_field_with_type("codec_data", gst::Buffer::static_type()) { gst::error!(CAT, obj: pad, "Received caps without codec_data"); return Err(gst::FlowError::NotNegotiated); } } "audio/x-opus" => { if let Some(header) = s .get::("streamheader") .ok() .and_then(|a| a.get(0).and_then(|v| v.get::().ok())) { if gst_pbutils::codec_utils_opus_parse_header(&header, None).is_err() { gst::error!(CAT, obj: pad, "Received invalid Opus header"); return Err(gst::FlowError::NotNegotiated); } } else if gst_pbutils::codec_utils_opus_parse_caps(&caps, None).is_err() { gst::error!(CAT, obj: pad, "Received invalid Opus caps"); return Err(gst::FlowError::NotNegotiated); } } "audio/x-alaw" | "audio/x-mulaw" => (), "audio/x-adpcm" => (), "application/x-onvif-metadata" => (), _ => unreachable!(), } state.streams.push(Stream { sinkpad: pad, pre_queue: VecDeque::new(), caps, delta_frames, chunks: Vec::new(), pending_buffer: None, queued_chunk_time: gst::ClockTime::ZERO, queued_chunk_bytes: 0, start_dts: None, earliest_pts: None, end_pts: None, running_time_utc_time_mapping: None, }); } if state.streams.is_empty() { gst::error!(CAT, imp: self, "No streams available"); return Err(gst::FlowError::Error); } // Sort video streams first and then audio streams and then metadata streams, and each group by pad name. 
state.streams.sort_by(|a, b| { let order_of_caps = |caps: &gst::CapsRef| { let s = caps.structure(0).unwrap(); if s.name().starts_with("video/") { 0 } else if s.name().starts_with("audio/") { 1 } else if s.name().starts_with("application/x-onvif-metadata") { 2 } else { unimplemented!(); } }; let st_a = order_of_caps(&a.caps); let st_b = order_of_caps(&b.caps); if st_a == st_b { return a.sinkpad.name().cmp(&b.sinkpad.name()); } st_a.cmp(&st_b) }); Ok(()) } } #[glib::object_subclass] impl ObjectSubclass for MP4Mux { const NAME: &'static str = "GstRsMP4Mux"; type Type = super::MP4Mux; type ParentType = gst_base::Aggregator; type Class = Class; } impl ObjectImpl for MP4Mux { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { vec![ glib::ParamSpecUInt64::builder("interleave-bytes") .nick("Interleave Bytes") .blurb("Interleave between streams in bytes") .default_value(DEFAULT_INTERLEAVE_BYTES.unwrap_or(0)) .mutable_ready() .build(), glib::ParamSpecUInt64::builder("interleave-time") .nick("Interleave Time") .blurb("Interleave between streams in nanoseconds") .default_value( DEFAULT_INTERLEAVE_TIME .map(gst::ClockTime::nseconds) .unwrap_or(u64::MAX), ) .mutable_ready() .build(), glib::ParamSpecUInt::builder("movie-timescale") .nick("Movie Timescale") .blurb("Timescale to use for the movie (units per second, 0 is automatic)") .mutable_ready() .build(), ] }); &PROPERTIES } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "interleave-bytes" => { let mut settings = self.settings.lock().unwrap(); settings.interleave_bytes = match value.get().expect("type checked upstream") { 0 => None, v => Some(v), }; } "interleave-time" => { let mut settings = self.settings.lock().unwrap(); settings.interleave_time = match value.get().expect("type checked upstream") { Some(gst::ClockTime::ZERO) | None => None, v => v, }; } "movie-timescale" => { let mut settings = self.settings.lock().unwrap(); 
settings.movie_timescale = value.get().expect("type checked upstream"); } _ => unimplemented!(), } } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "interleave-bytes" => { let settings = self.settings.lock().unwrap(); settings.interleave_bytes.unwrap_or(0).to_value() } "interleave-time" => { let settings = self.settings.lock().unwrap(); settings.interleave_time.to_value() } "movie-timescale" => { let settings = self.settings.lock().unwrap(); settings.movie_timescale.to_value() } _ => unimplemented!(), } } } impl GstObjectImpl for MP4Mux {} impl ElementImpl for MP4Mux { fn request_new_pad( &self, templ: &gst::PadTemplate, name: Option<&str>, caps: Option<&gst::Caps>, ) -> Option { let state = self.state.lock().unwrap(); if !state.streams.is_empty() { gst::error!( CAT, imp: self, "Can't request new pads after stream was started" ); return None; } self.parent_request_new_pad(templ, name, caps) } } impl AggregatorImpl for MP4Mux { fn next_time(&self) -> Option { None } fn sink_query( &self, aggregator_pad: &gst_base::AggregatorPad, query: &mut gst::QueryRef, ) -> bool { use gst::QueryViewMut; gst::trace!(CAT, obj: aggregator_pad, "Handling query {query:?}"); match query.view_mut() { QueryViewMut::Caps(q) => { let allowed_caps = aggregator_pad .current_caps() .unwrap_or_else(|| aggregator_pad.pad_template_caps()); if let Some(filter_caps) = q.filter() { let res = filter_caps .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First); q.set_result(&res); } else { q.set_result(&allowed_caps); } true } _ => self.parent_sink_query(aggregator_pad, query), } } fn sink_event_pre_queue( &self, aggregator_pad: &gst_base::AggregatorPad, mut event: gst::Event, ) -> Result { use gst::EventView; gst::trace!(CAT, obj: aggregator_pad, "Handling event {event:?}"); match event.view() { EventView::Segment(ev) => { if ev.segment().format() != gst::Format::Time { gst::warning!( CAT, obj: aggregator_pad, "Received non-TIME segment, 
replacing with default TIME segment" ); let segment = gst::FormattedSegment::::new(); event = gst::event::Segment::builder(&segment) .seqnum(event.seqnum()) .build(); } self.parent_sink_event_pre_queue(aggregator_pad, event) } _ => self.parent_sink_event_pre_queue(aggregator_pad, event), } } fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool { use gst::EventView; gst::trace!(CAT, obj: aggregator_pad, "Handling event {event:?}"); match event.view() { EventView::Tag(_ev) => { // TODO: Maybe store for putting into the header at the end? self.parent_sink_event(aggregator_pad, event) } _ => self.parent_sink_event(aggregator_pad, event), } } fn src_query(&self, query: &mut gst::QueryRef) -> bool { use gst::QueryViewMut; gst::trace!(CAT, imp: self, "Handling query {query:?}"); match query.view_mut() { QueryViewMut::Seeking(q) => { // We can't really handle seeking, it would break everything q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE); true } _ => self.parent_src_query(query), } } fn src_event(&self, event: gst::Event) -> bool { use gst::EventView; gst::trace!(CAT, imp: self, "Handling event {event:?}"); match event.view() { EventView::Seek(_ev) => false, _ => self.parent_src_event(event), } } fn flush(&self) -> Result { gst::info!(CAT, imp: self, "Flushing"); let mut state = self.state.lock().unwrap(); for stream in &mut state.streams { stream.pending_buffer = None; stream.pre_queue.clear(); stream.running_time_utc_time_mapping = None; } drop(state); self.parent_flush() } fn stop(&self) -> Result<(), gst::ErrorMessage> { gst::trace!(CAT, imp: self, "Stopping"); let _ = self.parent_stop(); *self.state.lock().unwrap() = State::default(); Ok(()) } fn start(&self) -> Result<(), gst::ErrorMessage> { gst::trace!(CAT, imp: self, "Starting"); self.parent_start()?; // Always output a BYTES segment let segment = gst::FormattedSegment::::new(); self.obj().update_segment(&segment); *self.state.lock().unwrap() = State::default(); 
// NOTE(review): this chunk appears to have lost its newlines and several
// generic parameter lists to text extraction (e.g. the bare `Result` return
// types and `FormattedSegment::::new()` below). Tokens are kept exactly as
// found — confirm against the upstream gst-plugins-rs mp4mux sources.

// Tail of a function whose start lies before this chunk.
Ok(())
    }

    /// Negotiation is a no-op here: the source caps are created and set
    /// explicitly inside `aggregate()` once the streams are known.
    fn negotiate(&self) -> bool {
        true
    }

    /// Drains pending buffers from all streams and writes out the MP4 structure.
    ///
    /// On the first call this collects all streams, then emits the `ftyp` box
    /// and an `mdat` box header with a placeholder size — which is why a
    /// seekable downstream is required: the header is rewritten at EOS. On EOS
    /// the `moov` box is appended and the `mdat` header is rewritten with the
    /// final size.
    fn aggregate(&self, _timeout: bool) -> Result {
        let settings = self.settings.lock().unwrap().clone();
        let mut state = self.state.lock().unwrap();

        let mut buffers = gst::BufferList::new();
        let mut caps = None;

        // If no streams were created yet, collect all streams now and write the mdat.
        if state.streams.is_empty() {
            // First check if downstream is seekable. If not we can't rewrite the mdat box header!
            // (The state lock is dropped around the query to avoid holding it
            // across the downstream peer query.)
            drop(state);

            let mut q = gst::query::Seeking::new(gst::Format::Bytes);
            if self.obj().src_pad().peer_query(&mut q) {
                if !q.result().0 {
                    gst::element_imp_error!(
                        self,
                        gst::StreamError::Mux,
                        ["Downstream is not seekable"]
                    );
                    return Err(gst::FlowError::Error);
                }
            } else {
                // Can't query downstream, have to assume downstream is seekable
                gst::warning!(CAT, imp: self, "Can't query downstream for seekability");
            }

            state = self.state.lock().unwrap();

            self.create_streams(&mut state)?;

            // Create caps now to be sent before any buffers
            caps = Some(
                gst::Caps::builder("video/quicktime")
                    .field("variant", "iso")
                    .build(),
            );

            gst::info!(
                CAT,
                imp: self,
                "Creating ftyp box at offset {}",
                state.current_offset
            );

            // ... and then create the ftyp box plus mdat box header so we can start outputting
            // actual data
            let ftyp = boxes::create_ftyp(self.obj().class().as_ref().variant).map_err(|err| {
                gst::error!(CAT, imp: self, "Failed to create ftyp box: {err}");
                gst::FlowError::Error
            })?;
            state.current_offset += ftyp.size() as u64;
            buffers.get_mut().unwrap().add(ftyp);

            gst::info!(
                CAT,
                imp: self,
                "Creating mdat box header at offset {}",
                state.current_offset
            );
            // Remember where the placeholder header went so it can be
            // rewritten with the real size at EOS.
            state.mdat_offset = Some(state.current_offset);
            let mdat = boxes::create_mdat_header(None).map_err(|err| {
                gst::error!(CAT, imp: self, "Failed to create mdat box header: {err}");
                gst::FlowError::Error
            })?;
            state.current_offset += mdat.size() as u64;
            state.mdat_size = 0;
            buffers.get_mut().unwrap().add(mdat);
        }

        // EOS and NEED_DATA are remembered in `res` instead of returned
        // immediately so the caps/buffers below are still pushed; every other
        // error aborts right away.
        let res = match self.drain_buffers(&settings, &mut state, buffers.get_mut().unwrap()) {
            Ok(_) => Ok(gst::FlowSuccess::Ok),
            Err(err @ gst::FlowError::Eos) | Err(err @ gst_base::AGGREGATOR_FLOW_NEED_DATA) => {
                Err(err)
            }
            Err(err) => return Err(err),
        };

        if res == Err(gst::FlowError::Eos) {
            // Create moov box now and append it to the buffers
            gst::info!(
                CAT,
                imp: self,
                "Creating moov box now, mdat ends at offset {} with size {}",
                state.current_offset,
                state.mdat_size
            );

            let mut streams = Vec::with_capacity(state.streams.len());
            for stream in state.streams.drain(..) {
                let pad_settings = stream.sinkpad.imp().settings.lock().unwrap().clone();
                // Streams that never produced both an earliest and an end PTS
                // are empty and are skipped entirely.
                let (earliest_pts, end_pts) = match Option::zip(stream.earliest_pts, stream.end_pts)
                {
                    Some(res) => res,
                    None => continue, // empty stream
                };

                streams.push(super::Stream {
                    caps: stream.caps.clone(),
                    delta_frames: stream.delta_frames,
                    trak_timescale: pad_settings.trak_timescale,
                    start_dts: stream.start_dts,
                    earliest_pts,
                    end_pts,
                    chunks: stream.chunks,
                });
            }

            let moov = boxes::create_moov(super::Header {
                variant: self.obj().class().as_ref().variant,
                movie_timescale: settings.movie_timescale,
                streams,
            })
            .map_err(|err| {
                gst::error!(CAT, imp: self, "Failed to create moov box: {err}");
                gst::FlowError::Error
            })?;
            state.current_offset += moov.size() as u64;
            buffers.get_mut().unwrap().add(moov);
        }

        drop(state);

        // Caps were only created on the very first aggregate call above; send
        // them before any buffers.
        if let Some(ref caps) = caps {
            self.obj().set_src_caps(caps);
        }

        if !buffers.is_empty() {
            if let Err(err) = self.obj().finish_buffer_list(buffers) {
                gst::error!(CAT, imp: self, "Failed pushing buffers: {err:?}");
                return Err(err);
            }
        }

        if res == Err(gst::FlowError::Eos) {
            // Everything is written out now: seek back via a bytes segment and
            // rewrite the mdat box header with the real payload size.
            let mut state = self.state.lock().unwrap();

            if let Some(mdat_offset) = state.mdat_offset {
                gst::info!(
                    CAT,
                    imp: self,
                    "Rewriting mdat box header at offset {mdat_offset} with size {} now",
                    state.mdat_size,
                );
                let mut segment = gst::FormattedSegment::::new();
                segment.set_start(gst::format::Bytes::from_u64(mdat_offset));
                state.current_offset = mdat_offset;
                let mdat = boxes::create_mdat_header(Some(state.mdat_size)).map_err(|err| {
                    gst::error!(CAT, imp: self, "Failed to create mdat box header: {err}");
                    gst::FlowError::Error
                })?;
                drop(state);

                self.obj().update_segment(&segment);
                // A failure here is only logged: the payload itself was
                // already pushed successfully.
                if let Err(err) = self.obj().finish_buffer(mdat) {
                    gst::error!(
                        CAT,
                        imp: self,
                        "Failed pushing updated mdat box header buffer downstream: {err:?}",
                    );
                }
            }
        }

        res
    }
}

/// Class struct mirroring `GstAggregatorClass` with the muxer `variant`
/// (ISO vs. ONVIF) appended so each subclass can record which one it is.
#[repr(C)]
pub(crate) struct Class {
    parent: gst_base::ffi::GstAggregatorClass,
    variant: super::Variant,
}

unsafe impl ClassStruct for Class {
    type Type = MP4Mux;
}

impl std::ops::Deref for
Class {
    type Target = glib::Class;

    fn deref(&self) -> &Self::Target {
        // SAFETY is claimed via layout: `Class` is `#[repr(C)]` with `parent`
        // as its first field, so reinterpreting the pointer yields a valid
        // parent-class reference.
        unsafe { &*(&self.parent as *const _ as *const _) }
    }
}

// Copies the subclass' VARIANT constant into the class struct at class_init
// time.
// NOTE(review): the impl's generic parameters look stripped by extraction —
// `T::VARIANT` and `parent_class_init::()` are used but no `T` is declared
// here; confirm against the upstream sources.
unsafe impl IsSubclassable for super::MP4Mux {
    fn class_init(class: &mut glib::Class) {
        Self::parent_class_init::(class);
        let class = class.as_mut();
        class.variant = T::VARIANT;
    }
}

/// Implemented by the concrete muxer subclasses to declare their variant.
pub(crate) trait MP4MuxImpl: AggregatorImpl {
    const VARIANT: super::Variant;
}

/// Plain ISO MP4 muxer subclass.
#[derive(Default)]
pub(crate) struct ISOMP4Mux;

#[glib::object_subclass]
impl ObjectSubclass for ISOMP4Mux {
    const NAME: &'static str = "GstISOMP4Mux";
    type Type = super::ISOMP4Mux;
    type ParentType = super::MP4Mux;
}

impl ObjectImpl for ISOMP4Mux {}

impl GstObjectImpl for ISOMP4Mux {}

impl ElementImpl for ISOMP4Mux {
    fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
        static ELEMENT_METADATA: Lazy = Lazy::new(|| {
            gst::subclass::ElementMetadata::new(
                "ISOMP4Mux",
                "Codec/Muxer",
                "ISO MP4 muxer",
                "Sebastian Dröge ",
            )
        });

        Some(&*ELEMENT_METADATA)
    }

    fn pad_templates() -> &'static [gst::PadTemplate] {
        static PAD_TEMPLATES: Lazy> = Lazy::new(|| {
            // Fixed source pad producing the muxed MP4 stream.
            let src_pad_template = gst::PadTemplate::new(
                "src",
                gst::PadDirection::Src,
                gst::PadPresence::Always,
                &gst::Caps::builder("video/quicktime")
                    .field("variant", "iso")
                    .build(),
            )
            .unwrap();

            // Request sink pads (instantiated as MP4MuxPad) accepting the
            // formats listed here: H.264/H.265 (AU aligned), VP9, AAC, Opus.
            let sink_pad_template = gst::PadTemplate::with_gtype(
                "sink_%u",
                gst::PadDirection::Sink,
                gst::PadPresence::Request,
                &[
                    gst::Structure::builder("video/x-h264")
                        .field("stream-format", gst::List::new(["avc", "avc3"]))
                        .field("alignment", "au")
                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
                        .build(),
                    gst::Structure::builder("video/x-h265")
                        .field("stream-format", gst::List::new(["hvc1", "hev1"]))
                        .field("alignment", "au")
                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
                        .build(),
                    gst::Structure::builder("video/x-vp9")
                        .field("profile", gst::List::new(["0", "1", "2", "3"]))
                        .field("chroma-format", gst::List::new(["4:2:0", "4:2:2", "4:4:4"]))
                        .field("bit-depth-luma", gst::List::new([8u32, 10u32, 12u32]))
                        .field("bit-depth-chroma", gst::List::new([8u32, 10u32, 12u32]))
                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
                        .build(),
                    gst::Structure::builder("audio/mpeg")
                        .field("mpegversion", 4i32)
                        .field("stream-format", "raw")
                        .field("channels", gst::IntRange::new(1, u16::MAX as i32))
                        .field("rate", gst::IntRange::new(1, i32::MAX))
                        .build(),
                    gst::Structure::builder("audio/x-opus")
                        .field("channel-mapping-family", gst::IntRange::new(0i32, 255))
                        .field("channels", gst::IntRange::new(1i32, 8))
                        .field("rate", gst::IntRange::new(1, i32::MAX))
                        .build(),
                ]
                .into_iter()
                .collect::(),
                super::MP4MuxPad::static_type(),
            )
            .unwrap();

            vec![src_pad_template, sink_pad_template]
        });

        PAD_TEMPLATES.as_ref()
    }
}

impl AggregatorImpl for ISOMP4Mux {}

impl MP4MuxImpl for ISOMP4Mux {
    const VARIANT: super::Variant = super::Variant::ISO;
}

/// ONVIF-flavoured MP4 muxer subclass.
#[derive(Default)]
pub(crate) struct ONVIFMP4Mux;

#[glib::object_subclass]
impl ObjectSubclass for ONVIFMP4Mux {
    const NAME: &'static str = "GstONVIFMP4Mux";
    type Type = super::ONVIFMP4Mux;
    type ParentType = super::MP4Mux;
}

impl ObjectImpl for ONVIFMP4Mux {}

impl GstObjectImpl for ONVIFMP4Mux {}

impl ElementImpl for ONVIFMP4Mux {
    fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
        static ELEMENT_METADATA: Lazy = Lazy::new(|| {
            gst::subclass::ElementMetadata::new(
                "ONVIFMP4Mux",
                "Codec/Muxer",
                "ONVIF MP4 muxer",
                "Sebastian Dröge ",
            )
        });

        Some(&*ELEMENT_METADATA)
    }

    fn pad_templates() -> &'static [gst::PadTemplate] {
        static PAD_TEMPLATES: Lazy> = Lazy::new(|| {
            let src_pad_template = gst::PadTemplate::new(
                "src",
                gst::PadDirection::Src,
                gst::PadPresence::Always,
                &gst::Caps::builder("video/quicktime")
                    .field("variant", "iso")
                    .build(),
            )
            .unwrap();

            let sink_pad_template = gst::PadTemplate::with_gtype(
                "sink_%u",
                gst::PadDirection::Sink,
                gst::PadPresence::Request,
                &[
                    gst::Structure::builder("video/x-h264")
// Continuation of the ONVIF sink pad template caps list started above.
// NOTE(review): the `::::` turbofish forms below (e.g. `gst::IntRange::::new`)
// look like generic arguments stripped by text extraction — confirm upstream.
                        .field("stream-format", gst::List::new(["avc", "avc3"]))
                        .field("alignment", "au")
                        .field("width", gst::IntRange::::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::::new(1, u16::MAX as i32))
                        .build(),
                    gst::Structure::builder("video/x-h265")
                        .field("stream-format", gst::List::new(["hvc1", "hev1"]))
                        .field("alignment", "au")
                        .field("width", gst::IntRange::::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::::new(1, u16::MAX as i32))
                        .build(),
                    gst::Structure::builder("image/jpeg")
                        .field("width", gst::IntRange::::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::::new(1, u16::MAX as i32))
                        .build(),
                    gst::Structure::builder("audio/mpeg")
                        .field("mpegversion", 4i32)
                        .field("stream-format", "raw")
                        .field("channels", gst::IntRange::::new(1, u16::MAX as i32))
                        .field("rate", gst::IntRange::::new(1, i32::MAX))
                        .build(),
                    gst::Structure::builder("audio/x-alaw")
                        .field("channels", gst::IntRange::::new(1, 2))
                        .field("rate", gst::IntRange::::new(1, i32::MAX))
                        .build(),
                    gst::Structure::builder("audio/x-mulaw")
                        .field("channels", gst::IntRange::::new(1, 2))
                        .field("rate", gst::IntRange::::new(1, i32::MAX))
                        .build(),
                    gst::Structure::builder("audio/x-adpcm")
                        .field("layout", "g726")
                        .field("channels", 1i32)
                        .field("rate", 8000i32)
                        .field("bitrate", gst::List::new([16000i32, 24000, 32000, 40000]))
                        .build(),
                    gst::Structure::builder("application/x-onvif-metadata")
                        .field("parsed", true)
                        .build(),
                ]
                .into_iter()
                .collect::(),
                super::MP4MuxPad::static_type(),
            )
            .unwrap();

            vec![src_pad_template, sink_pad_template]
        });

        PAD_TEMPLATES.as_ref()
    }
}

impl AggregatorImpl for ONVIFMP4Mux {}

impl MP4MuxImpl for ONVIFMP4Mux {
    const VARIANT: super::Variant = super::Variant::ONVIF;
}

/// Per-pad settings, kept behind the pad's mutex.
#[derive(Default, Clone)]
struct PadSettings {
    // Track timescale in units per second; per the property blurb below,
    // 0 means "automatic".
    trak_timescale: u32,
}

/// Sink pad subclass carrying the per-track settings.
#[derive(Default)]
pub(crate) struct MP4MuxPad {
    settings: Mutex,
}

#[glib::object_subclass]
impl ObjectSubclass for MP4MuxPad {
    const NAME: &'static str = "GstRsMP4MuxPad";
    type Type = super::MP4MuxPad;
    type ParentType = gst_base::AggregatorPad;
}

impl ObjectImpl for MP4MuxPad {
    fn properties() -> &'static [glib::ParamSpec] {
        static PROPERTIES: Lazy> = Lazy::new(|| {
            vec![glib::ParamSpecUInt::builder("trak-timescale")
                .nick("Track Timescale")
                .blurb("Timescale to use for the track (units per second, 0 is automatic)")
                .mutable_ready()
                .build()]
        });

        &PROPERTIES
    }

    fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
        match pspec.name() {
            "trak-timescale" => {
                let mut settings = self.settings.lock().unwrap();
                settings.trak_timescale = value.get().expect("type checked upstream");
            }
            _ => unimplemented!(),
        }
    }

    fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
        match pspec.name() {
            "trak-timescale" => {
                let settings = self.settings.lock().unwrap();
                settings.trak_timescale.to_value()
            }
            _ => unimplemented!(),
        }
    }
}

impl GstObjectImpl for MP4MuxPad {}

impl PadImpl for MP4MuxPad {}

impl AggregatorPadImpl for MP4MuxPad {
    /// Drops this pad's queued stream state on flush, then chains up.
    fn flush(&self, aggregator: &gst_base::Aggregator) -> Result {
        let mux = aggregator.downcast_ref::().unwrap();
        let mut mux_state = mux.imp().state.lock().unwrap();

        gst::info!(CAT, imp: self, "Flushing");

        // Only reset the stream belonging to this pad; other streams keep
        // their queued data.
        for stream in &mut mux_state.streams {
            if stream.sinkpad == *self.obj() {
                stream.pending_buffer = None;
                stream.pre_queue.clear();
                stream.running_time_utc_time_mapping = None;
                break;
            }
        }
        drop(mux_state);

        self.parent_flush(aggregator)
    }
}