2022-11-07 18:41:50 +00:00
|
|
|
// Copyright (C) 2022 Sebastian Dröge <sebastian@centricular.com>
|
|
|
|
//
|
|
|
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
|
|
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
|
|
|
// <https://mozilla.org/MPL/2.0/>.
|
|
|
|
//
|
|
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
|
|
|
|
use gst::glib;
|
|
|
|
use gst::prelude::*;
|
|
|
|
use gst::subclass::prelude::*;
|
|
|
|
use gst_base::prelude::*;
|
|
|
|
use gst_base::subclass::prelude::*;
|
|
|
|
|
2022-11-09 07:02:35 +00:00
|
|
|
use std::collections::VecDeque;
|
2022-11-07 18:41:50 +00:00
|
|
|
use std::sync::Mutex;
|
|
|
|
|
|
|
|
use once_cell::sync::Lazy;
|
|
|
|
|
|
|
|
use super::boxes;
|
|
|
|
|
2022-11-09 07:02:35 +00:00
|
|
|
/// Offset between NTP and UNIX epoch in seconds.
/// NTP = UNIX + NTP_UNIX_OFFSET.
///
/// 2_208_988_800s = 70 years (including 17 leap days) between 1900-01-01 and 1970-01-01.
const NTP_UNIX_OFFSET: u64 = 2_208_988_800;
|
|
|
|
|
|
|
|
/// Reference timestamp meta caps for NTP timestamps.
static NTP_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-ntp").build());
|
|
|
|
|
|
|
|
/// Reference timestamp meta caps for UNIX timestamps.
static UNIX_CAPS: Lazy<gst::Caps> = Lazy::new(|| gst::Caps::builder("timestamp/x-unix").build());
|
|
|
|
|
|
|
|
/// Returns the UTC time of the buffer in the UNIX epoch.
|
|
|
|
fn get_utc_time_from_buffer(buffer: &gst::BufferRef) -> Option<gst::ClockTime> {
|
|
|
|
buffer
|
|
|
|
.iter_meta::<gst::ReferenceTimestampMeta>()
|
|
|
|
.find_map(|meta| {
|
|
|
|
if meta.reference().can_intersect(&UNIX_CAPS) {
|
|
|
|
Some(meta.timestamp())
|
|
|
|
} else if meta.reference().can_intersect(&NTP_CAPS) {
|
|
|
|
meta.timestamp().checked_sub(NTP_UNIX_OFFSET.seconds())
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2022-11-18 10:18:40 +00:00
|
|
|
/// Converts a running time to an UTC time.
|
|
|
|
fn running_time_to_utc_time(
|
|
|
|
running_time: impl Into<gst::Signed<gst::ClockTime>>,
|
|
|
|
running_time_utc_time_mapping: (
|
|
|
|
impl Into<gst::Signed<gst::ClockTime>>,
|
|
|
|
impl Into<gst::Signed<gst::ClockTime>>,
|
|
|
|
),
|
|
|
|
) -> Option<gst::ClockTime> {
|
|
|
|
running_time_utc_time_mapping
|
|
|
|
.1
|
|
|
|
.into()
|
|
|
|
.checked_sub(running_time_utc_time_mapping.0.into())
|
|
|
|
.and_then(|res| res.checked_add(running_time.into()))
|
|
|
|
.and_then(|res| res.positive())
|
|
|
|
}
|
|
|
|
|
2022-11-07 18:41:50 +00:00
|
|
|
/// Debug category used for all logging of this element.
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    gst::DebugCategory::new(
        "mp4mux",
        gst::DebugColorFlags::empty(),
        Some("MP4Mux Element"),
    )
});
|
|
|
|
|
|
|
|
/// Default for the `interleave-bytes` setting: no byte-based interleave limit.
const DEFAULT_INTERLEAVE_BYTES: Option<u64> = None;
/// Default for the `interleave-time` setting: switch chunks after 500ms.
const DEFAULT_INTERLEAVE_TIME: Option<gst::ClockTime> = Some(gst::ClockTime::from_mseconds(500));
|
|
|
|
|
|
|
|
/// Element settings, set through the corresponding object properties.
#[derive(Debug, Clone)]
struct Settings {
    // Interleave between streams in bytes (`interleave-bytes` property); `None` means
    // no byte-based limit.
    interleave_bytes: Option<u64>,
    // Interleave between streams in time (`interleave-time` property); `None` means
    // no time-based limit.
    interleave_time: Option<gst::ClockTime>,
    // Timescale to use for the movie (units per second, 0 is automatic).
    movie_timescale: u32,
}
|
|
|
|
|
|
|
|
impl Default for Settings {
    // Mirrors the defaults of the corresponding object properties.
    fn default() -> Self {
        Settings {
            interleave_bytes: DEFAULT_INTERLEAVE_BYTES,
            interleave_time: DEFAULT_INTERLEAVE_TIME,
            movie_timescale: 0,
        }
    }
}
|
|
|
|
|
2022-11-09 07:02:35 +00:00
|
|
|
/// A buffer taken from a sink pad that was not yet written out, together with the
/// timing information needed to turn it into a sample.
#[derive(Debug)]
struct PendingBuffer {
    buffer: gst::Buffer,
    // DTS running time if the stream requires DTS, otherwise the PTS running time.
    timestamp: gst::Signed<gst::ClockTime>,
    // PTS running time, clamped to zero if negative.
    pts: gst::ClockTime,
    // PTS - DTS in nanoseconds (may be negative), only set for streams requiring DTS.
    composition_time_offset: Option<i64>,
    // Filled in once the following buffer (or EOS) is known; a pending buffer is only
    // ready for output when this is `Some`.
    duration: Option<gst::ClockTime>,
}
|
|
|
|
|
|
|
|
/// Per-stream muxing state, one per sink pad.
struct Stream {
    /// Sink pad for this stream.
    sinkpad: super::MP4MuxPad,

    /// Pre-queue for ONVIF variant to timestamp all buffers with their UTC time.
    pre_queue: VecDeque<(gst::FormattedSegment<gst::ClockTime>, gst::Buffer)>,

    /// Currently configured caps for this stream.
    caps: gst::Caps,
    /// Whether this stream is intra-only and has frame reordering.
    delta_frames: super::DeltaFrames,

    /// Already written out chunks with their samples for this stream.
    chunks: Vec<super::Chunk>,

    /// Queued time in the latest chunk.
    queued_chunk_time: gst::ClockTime,
    /// Queued bytes in the latest chunk.
    queued_chunk_bytes: u64,

    /// Currently pending buffer, DTS or PTS running time and duration.
    ///
    /// If the duration is set then the next buffer is already queued up and the duration was
    /// calculated based on that.
    pending_buffer: Option<PendingBuffer>,

    /// Start DTS.
    start_dts: Option<gst::Signed<gst::ClockTime>>,

    /// Earliest PTS.
    earliest_pts: Option<gst::ClockTime>,
    /// Current end PTS.
    end_pts: Option<gst::ClockTime>,

    /// In ONVIF mode, the mapping between running time and UTC time (UNIX).
    running_time_utc_time_mapping: Option<(gst::Signed<gst::ClockTime>, gst::ClockTime)>,
}
|
|
|
|
|
|
|
|
/// Global muxing state.
#[derive(Default)]
struct State {
    /// List of streams when the muxer was started.
    streams: Vec<Stream>,

    /// Index of stream that is currently selected to fill a chunk.
    current_stream_idx: Option<usize>,

    /// Current writing offset since the beginning of the stream.
    current_offset: u64,

    /// Offset of the `mdat` box from the beginning of the stream.
    mdat_offset: Option<u64>,

    /// Size of the `mdat` as written so far.
    mdat_size: u64,
}
|
|
|
|
|
|
|
|
/// MP4 muxer element implementation.
#[derive(Default)]
pub(crate) struct MP4Mux {
    // Muxing state shared between the aggregator callbacks.
    state: Mutex<State>,
    // Snapshot of the element's property values.
    settings: Mutex<Settings>,
}
|
|
|
|
|
|
|
|
impl MP4Mux {
|
2022-11-09 07:02:35 +00:00
|
|
|
/// Checks if a buffer is valid according to the stream configuration.
|
|
|
|
fn check_buffer(
|
|
|
|
buffer: &gst::BufferRef,
|
|
|
|
sinkpad: &super::MP4MuxPad,
|
|
|
|
delta_frames: super::DeltaFrames,
|
|
|
|
) -> Result<(), gst::FlowError> {
|
|
|
|
if delta_frames.requires_dts() && buffer.dts().is_none() {
|
|
|
|
gst::error!(CAT, obj: sinkpad, "Require DTS for video streams");
|
|
|
|
return Err(gst::FlowError::Error);
|
|
|
|
}
|
|
|
|
|
|
|
|
if buffer.pts().is_none() {
|
|
|
|
gst::error!(CAT, obj: sinkpad, "Require timestamped buffers");
|
|
|
|
return Err(gst::FlowError::Error);
|
|
|
|
}
|
|
|
|
|
|
|
|
if delta_frames.intra_only() && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
|
|
|
|
gst::error!(CAT, obj: sinkpad, "Intra-only stream with delta units");
|
|
|
|
return Err(gst::FlowError::Error);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Peeks at the next buffer of `sinkpad` without consuming it from the pad.
///
/// Returns the buffer together with the segment it should be interpreted in, or
/// `Ok(None)` if no buffer is queued on the pad.
///
/// In ONVIF mode the buffer is re-timestamped to UTC time (which requires the
/// running-time/UTC mapping to be known already, see `pop_buffer`) and moved into
/// `pre_queue`, so the conversion only happens once per buffer.
fn peek_buffer(
    &self,
    sinkpad: &super::MP4MuxPad,
    delta_frames: super::DeltaFrames,
    pre_queue: &mut VecDeque<(gst::FormattedSegment<gst::ClockTime>, gst::Buffer)>,
    running_time_utc_time_mapping: &Option<(gst::Signed<gst::ClockTime>, gst::ClockTime)>,
) -> Result<Option<(gst::FormattedSegment<gst::ClockTime>, gst::Buffer)>, gst::FlowError> {
    // Anything already pre-queued was re-timestamped before; hand it out directly.
    if let Some((segment, buffer)) = pre_queue.front() {
        return Ok(Some((segment.clone(), buffer.clone())));
    }

    let mut buffer = match sinkpad.peek_buffer() {
        None => return Ok(None),
        Some(buffer) => buffer,
    };

    Self::check_buffer(&buffer, sinkpad, delta_frames)?;

    let mut segment = match sinkpad.segment().downcast::<gst::ClockTime>().ok() {
        Some(segment) => segment,
        None => {
            gst::error!(CAT, obj: sinkpad, "Got buffer before segment");
            return Err(gst::FlowError::Error);
        }
    };

    // For ONVIF we need to re-timestamp the buffer with its UTC time.
    // We can only possibly end up here after the running-time UTC mapping is known.
    //
    // After re-timestamping, put the buffer into the pre-queue so re-timestamping only has to
    // happen once.
    if self.obj().class().as_ref().variant == super::Variant::ONVIF {
        let running_time_utc_time_mapping = running_time_utc_time_mapping.unwrap();

        // PTS existence was checked in check_buffer() above.
        let pts_position = buffer.pts().unwrap();
        let dts_position = buffer.dts();

        let pts = segment.to_running_time_full(pts_position).unwrap();

        let dts = dts_position
            .map(|dts_position| segment.to_running_time_full(dts_position).unwrap());

        // Prefer a UTC time attached to the buffer itself; otherwise derive it from
        // the stream's running-time/UTC mapping.
        let utc_time = match get_utc_time_from_buffer(&buffer) {
            None => {
                // Calculate from the mapping
                running_time_to_utc_time(pts, running_time_utc_time_mapping).ok_or_else(
                    || {
                        gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time");
                        gst::FlowError::Error
                    },
                )?
            }
            Some(utc_time) => utc_time,
        };

        gst::trace!(
            CAT,
            obj: sinkpad,
            "Mapped PTS running time {pts} to UTC time {utc_time}"
        );

        {
            let buffer = buffer.make_mut();
            buffer.set_pts(utc_time);

            if let Some(dts) = dts {
                // The DTS is shifted by the same offset as the PTS, using the freshly
                // computed (PTS running time, PTS UTC time) pair as the mapping.
                let dts_utc_time =
                    running_time_to_utc_time(dts, (pts, utc_time)).ok_or_else(|| {
                        gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time");
                        gst::FlowError::Error
                    })?;
                gst::trace!(
                    CAT,
                    obj: sinkpad,
                    "Mapped DTS running time {dts} to UTC time {dts_utc_time}"
                );
                buffer.set_dts(dts_utc_time);
            }
        }

        // Timestamps are UTC now, so an identity segment applies from here on.
        segment = gst::FormattedSegment::default();

        // Drop current buffer as it is now queued
        sinkpad.drop_buffer();
        pre_queue.push_back((segment.clone(), buffer.clone()));
    }

    Ok(Some((segment, buffer)))
}
|
|
|
|
|
|
|
|
/// Pops the next buffer of `sinkpad`, together with its segment.
///
/// In ONVIF mode this also establishes the running-time/UTC mapping from the first
/// buffer that carries a UTC reference timestamp meta: buffers are pre-queued until one
/// is found (up to 6s worth), then all pre-queued buffers are re-timestamped to UTC and
/// handed out from the pre-queue.
///
/// Returns `Ok(None)` if no buffer is available, `Err(AGGREGATOR_FLOW_NEED_DATA)` if
/// more input is required first, `Err(Error)` on fatal errors.
fn pop_buffer(
    &self,
    sinkpad: &super::MP4MuxPad,
    delta_frames: super::DeltaFrames,
    pre_queue: &mut VecDeque<(gst::FormattedSegment<gst::ClockTime>, gst::Buffer)>,
    running_time_utc_time_mapping: &mut Option<(gst::Signed<gst::ClockTime>, gst::ClockTime)>,
) -> Result<Option<(gst::FormattedSegment<gst::ClockTime>, gst::Buffer)>, gst::FlowError> {
    // In ONVIF mode we need to get UTC times for each buffer and synchronize based on that.
    // Queue up to 6s of data to get the first UTC time and then backdate.
    if self.obj().class().as_ref().variant == super::Variant::ONVIF
        && running_time_utc_time_mapping.is_none()
    {
        // Bail out if more than 6s of data is already pre-queued without a UTC time.
        if let Some((last, first)) = Option::zip(pre_queue.back(), pre_queue.front()) {
            // Existence of PTS/DTS checked below
            let (last, first) = if delta_frames.requires_dts() {
                (
                    last.0.to_running_time_full(last.1.dts()).unwrap(),
                    first.0.to_running_time_full(first.1.dts()).unwrap(),
                )
            } else {
                (
                    last.0.to_running_time_full(last.1.pts()).unwrap(),
                    first.0.to_running_time_full(first.1.pts()).unwrap(),
                )
            };

            if last.saturating_sub(first)
                > gst::Signed::Positive(gst::ClockTime::from_seconds(6))
            {
                gst::error!(
                    CAT,
                    obj: sinkpad,
                    "Got no UTC time in the first 6s of the stream"
                );
                return Err(gst::FlowError::Error);
            }
        }

        let buffer = match sinkpad.pop_buffer() {
            None => {
                if sinkpad.is_eos() {
                    gst::error!(CAT, obj: sinkpad, "Got no UTC time before EOS");
                    return Err(gst::FlowError::Error);
                } else {
                    return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
                }
            }
            Some(buffer) => buffer,
        };

        Self::check_buffer(&buffer, sinkpad, delta_frames)?;

        let segment = match sinkpad.segment().downcast::<gst::ClockTime>().ok() {
            Some(segment) => segment,
            None => {
                gst::error!(CAT, obj: sinkpad, "Got buffer before segment");
                return Err(gst::FlowError::Error);
            }
        };

        // No UTC time on this buffer either: keep it and wait for more data.
        let utc_time = match get_utc_time_from_buffer(&buffer) {
            Some(utc_time) => utc_time,
            None => {
                pre_queue.push_back((segment, buffer));
                return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
            }
        };

        let running_time = segment.to_running_time_full(buffer.pts().unwrap()).unwrap();
        gst::info!(
            CAT,
            obj: sinkpad,
            "Got initial UTC time {utc_time} at PTS running time {running_time}",
        );

        let mapping = (running_time, utc_time);
        *running_time_utc_time_mapping = Some(mapping);

        // Push the buffer onto the pre-queue and re-timestamp it and all other buffers
        // based on the mapping above.
        pre_queue.push_back((segment, buffer));

        for (segment, buffer) in pre_queue.iter_mut() {
            let buffer = buffer.make_mut();

            let pts = segment.to_running_time_full(buffer.pts().unwrap()).unwrap();
            let pts_utc_time = running_time_to_utc_time(pts, mapping).ok_or_else(|| {
                gst::error!(CAT, obj: sinkpad, "Stream has negative PTS UTC time");
                gst::FlowError::Error
            })?;
            gst::trace!(
                CAT,
                obj: sinkpad,
                "Mapped PTS running time {pts} to UTC time {pts_utc_time}"
            );
            buffer.set_pts(pts_utc_time);

            if let Some(dts) = buffer.dts() {
                let dts = segment.to_running_time_full(dts).unwrap();
                let dts_utc_time = running_time_to_utc_time(dts, mapping).ok_or_else(|| {
                    gst::error!(CAT, obj: sinkpad, "Stream has negative DTS UTC time");
                    gst::FlowError::Error
                })?;
                gst::trace!(
                    CAT,
                    obj: sinkpad,
                    "Mapped DTS running time {dts} to UTC time {dts_utc_time}"
                );
                buffer.set_dts(dts_utc_time);
            }

            // Timestamps are UTC now, so an identity segment applies from here on.
            *segment = gst::FormattedSegment::default();
        }

        // Fall through below and pop the first buffer finally
    }

    if let Some((segment, buffer)) = pre_queue.pop_front() {
        return Ok(Some((segment, buffer)));
    }

    // If the mapping is set, then we would get the buffer always from the pre-queue:
    // - either it was set before already, in which case the next buffer would've been peeked
    //   for calculating the duration to the previous buffer, and then put into the pre-queue
    // - or this is the very first buffer and we just put it into the queue ourselves above
    if self.obj().class().as_ref().variant == super::Variant::ONVIF {
        if sinkpad.is_eos() {
            return Ok(None);
        }
        unreachable!();
    }

    // Non-ONVIF path: pop directly from the pad.
    let buffer = match sinkpad.pop_buffer() {
        None => return Ok(None),
        Some(buffer) => buffer,
    };

    Self::check_buffer(&buffer, sinkpad, delta_frames)?;

    let segment = match sinkpad.segment().downcast::<gst::ClockTime>().ok() {
        Some(segment) => segment,
        None => {
            gst::error!(CAT, obj: sinkpad, "Got buffer before segment");
            return Err(gst::FlowError::Error);
        }
    };

    Ok(Some((segment, buffer)))
}
|
|
|
|
|
2022-11-07 18:41:50 +00:00
|
|
|
/// Queue a buffer and calculate its duration.
///
/// Returns `Ok(())` if a buffer with duration is known or if the stream is EOS and a buffer is
/// queued, i.e. if this stream is ready to be processed.
///
/// Returns `Err(Eos)` if nothing is queued and the stream is EOS.
///
/// Returns `Err(AGGREGATOR_FLOW_NEED_DATA)` if more data is needed.
///
/// Returns `Err(Error)` on errors.
fn queue_buffer(&self, stream: &mut Stream) -> Result<(), gst::FlowError> {
    // Loop up to two times here to first retrieve the current buffer and then potentially
    // already calculate its duration based on the next queued buffer.
    loop {
        match stream.pending_buffer {
            // Pending buffer with a known duration: ready, nothing left to do.
            Some(PendingBuffer {
                duration: Some(_), ..
            }) => return Ok(()),
            Some(PendingBuffer {
                timestamp,
                pts,
                ref buffer,
                ref mut duration,
                ..
            }) => {
                // Already have a pending buffer but no duration, so try to get that now
                // by peeking at the following buffer.
                let (segment, buffer) = match self.peek_buffer(
                    &stream.sinkpad,
                    stream.delta_frames,
                    &mut stream.pre_queue,
                    &stream.running_time_utc_time_mapping,
                )? {
                    Some(res) => res,
                    None => {
                        if stream.sinkpad.is_eos() {
                            // No next buffer will ever come: fall back to the buffer's
                            // own duration (or zero).
                            let dur = buffer.duration().unwrap_or(gst::ClockTime::ZERO);
                            gst::trace!(
                                CAT,
                                obj: stream.sinkpad,
                                "Stream is EOS, using {dur} as duration for queued buffer",
                            );

                            let pts = pts + dur;
                            if stream.end_pts.map_or(true, |end_pts| end_pts < pts) {
                                gst::trace!(CAT, obj: stream.sinkpad, "Stream end PTS {pts}");
                                stream.end_pts = Some(pts);
                            }

                            *duration = Some(dur);

                            return Ok(());
                        } else {
                            gst::trace!(CAT, obj: stream.sinkpad, "Stream has no buffer queued");
                            return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
                        }
                    }
                };

                // Was checked above
                let pts_position = buffer.pts().unwrap();
                let next_timestamp_position = if stream.delta_frames.requires_dts() {
                    // Was checked above
                    buffer.dts().unwrap()
                } else {
                    pts_position
                };

                let next_timestamp = segment
                    .to_running_time_full(next_timestamp_position)
                    .unwrap();

                gst::trace!(
                    CAT,
                    obj: stream.sinkpad,
                    "Stream has buffer with timestamp {next_timestamp} queued",
                );

                // Duration = distance to the next buffer; clamp to zero if timestamps
                // go backwards instead of failing.
                let dur = next_timestamp
                    .saturating_sub(timestamp)
                    .positive()
                    .unwrap_or_else(|| {
                        gst::warning!(
                            CAT,
                            obj: stream.sinkpad,
                            "Stream timestamps going backwards {next_timestamp} < {timestamp}",
                        );
                        gst::ClockTime::ZERO
                    });

                gst::trace!(
                    CAT,
                    obj: stream.sinkpad,
                    "Using {dur} as duration for queued buffer",
                );

                let pts = pts + dur;
                if stream.end_pts.map_or(true, |end_pts| end_pts < pts) {
                    gst::trace!(CAT, obj: stream.sinkpad, "Stream end PTS {pts}");
                    stream.end_pts = Some(pts);
                }

                *duration = Some(dur);

                return Ok(());
            }
            None => {
                // Have no buffer queued at all yet

                let (segment, buffer) = match self.pop_buffer(
                    &stream.sinkpad,
                    stream.delta_frames,
                    &mut stream.pre_queue,
                    &mut stream.running_time_utc_time_mapping,
                )? {
                    Some(res) => res,
                    None => {
                        if stream.sinkpad.is_eos() {
                            gst::trace!(
                                CAT,
                                obj: stream.sinkpad,
                                "Stream is EOS",
                            );

                            return Err(gst::FlowError::Eos);
                        } else {
                            gst::trace!(CAT, obj: stream.sinkpad, "Stream has no buffer queued");
                            return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
                        }
                    }
                };

                // Was checked above
                let pts_position = buffer.pts().unwrap();
                let dts_position = buffer.dts();

                // Clamp negative PTS running times to zero.
                let pts = segment.to_running_time_full(pts_position).unwrap()
                    .positive().unwrap_or_else(|| {
                        gst::error!(CAT, obj: stream.sinkpad, "Stream has negative PTS running time");
                        gst::ClockTime::ZERO
                    });

                let dts = dts_position
                    .map(|dts_position| segment.to_running_time_full(dts_position).unwrap());

                // Samples are ordered by DTS when the stream requires one, by PTS otherwise.
                let timestamp = if stream.delta_frames.requires_dts() {
                    // Was checked above
                    let dts = dts.unwrap();

                    if stream.start_dts.is_none() {
                        gst::debug!(CAT, obj: stream.sinkpad, "Stream start DTS {dts}");
                        stream.start_dts = Some(dts);
                    }

                    dts
                } else {
                    gst::Signed::Positive(pts)
                };

                if stream
                    .earliest_pts
                    .map_or(true, |earliest_pts| earliest_pts > pts)
                {
                    gst::debug!(CAT, obj: stream.sinkpad, "Stream earliest PTS {pts}");
                    stream.earliest_pts = Some(pts);
                }

                // Signed PTS-DTS difference in nanoseconds for streams with reordering.
                let composition_time_offset = if stream.delta_frames.requires_dts() {
                    let pts = gst::Signed::Positive(pts);
                    let dts = dts.unwrap(); // set above

                    if pts > dts {
                        Some(i64::try_from((pts - dts).nseconds().positive().unwrap()).map_err(|_| {
                            gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
                            gst::FlowError::Error
                        })?)
                    } else {
                        let diff = i64::try_from((dts - pts).nseconds().positive().unwrap()).map_err(|_| {
                            gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
                            gst::FlowError::Error
                        })?;
                        Some(-diff)
                    }
                } else {
                    None
                };

                gst::trace!(
                    CAT,
                    obj: stream.sinkpad,
                    "Stream has buffer of size {} with timestamp {timestamp} pending",
                    buffer.size(),
                );

                // Store it and loop once more to try to compute its duration.
                stream.pending_buffer = Some(PendingBuffer {
                    buffer,
                    timestamp,
                    pts,
                    composition_time_offset,
                    duration: None,
                });
            }
        }
    }
}
|
|
|
|
|
|
|
|
fn find_earliest_stream(
|
|
|
|
&self,
|
|
|
|
settings: &Settings,
|
|
|
|
state: &mut State,
|
|
|
|
) -> Result<Option<usize>, gst::FlowError> {
|
|
|
|
if let Some(current_stream_idx) = state.current_stream_idx {
|
|
|
|
// If a stream was previously selected, check if another buffer from
|
|
|
|
// this stream can be consumed or if that would exceed the interleave.
|
|
|
|
|
|
|
|
let single_stream = state.streams.len() == 1;
|
|
|
|
let stream = &mut state.streams[current_stream_idx];
|
|
|
|
|
|
|
|
match self.queue_buffer(stream) {
|
|
|
|
Ok(_) => {
|
|
|
|
assert!(matches!(
|
|
|
|
stream.pending_buffer,
|
|
|
|
Some(PendingBuffer {
|
|
|
|
duration: Some(_),
|
|
|
|
..
|
|
|
|
})
|
|
|
|
));
|
|
|
|
|
|
|
|
if single_stream
|
|
|
|
|| (settings.interleave_bytes.map_or(true, |interleave_bytes| {
|
|
|
|
interleave_bytes >= stream.queued_chunk_bytes
|
|
|
|
}) && settings.interleave_time.map_or(true, |interleave_time| {
|
|
|
|
interleave_time >= stream.queued_chunk_time
|
|
|
|
}))
|
|
|
|
{
|
|
|
|
gst::trace!(CAT,
|
|
|
|
obj: stream.sinkpad,
|
2022-11-09 07:02:35 +00:00
|
|
|
"Continuing current chunk: single stream {single_stream}, or {} >= {} and {} >= {}",
|
2022-11-07 18:41:50 +00:00
|
|
|
gst::format::Bytes::from_u64(stream.queued_chunk_bytes),
|
|
|
|
settings.interleave_bytes.map(gst::format::Bytes::from_u64).display(),
|
|
|
|
stream.queued_chunk_time, settings.interleave_time.display(),
|
|
|
|
);
|
|
|
|
return Ok(Some(current_stream_idx));
|
|
|
|
}
|
|
|
|
|
|
|
|
state.current_stream_idx = None;
|
|
|
|
gst::debug!(CAT,
|
|
|
|
obj: stream.sinkpad,
|
|
|
|
"Switching to next chunk: {} < {} and {} < {}",
|
|
|
|
gst::format::Bytes::from_u64(stream.queued_chunk_bytes),
|
|
|
|
settings.interleave_bytes.map(gst::format::Bytes::from_u64).display(),
|
|
|
|
stream.queued_chunk_time, settings.interleave_time.display(),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
Err(gst::FlowError::Eos) => {
|
|
|
|
gst::debug!(CAT, obj: stream.sinkpad, "Stream is EOS, switching to next stream");
|
|
|
|
state.current_stream_idx = None;
|
|
|
|
}
|
|
|
|
Err(err) => {
|
|
|
|
return Err(err);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Otherwise find the next earliest stream here
|
|
|
|
let mut earliest_stream = None;
|
|
|
|
let mut all_have_data_or_eos = true;
|
|
|
|
let mut all_eos = true;
|
|
|
|
|
|
|
|
for (idx, stream) in state.streams.iter_mut().enumerate() {
|
|
|
|
// First queue a buffer on each stream and try to get the duration
|
|
|
|
|
|
|
|
match self.queue_buffer(stream) {
|
|
|
|
Ok(_) => {
|
|
|
|
assert!(matches!(
|
|
|
|
stream.pending_buffer,
|
|
|
|
Some(PendingBuffer {
|
|
|
|
duration: Some(_),
|
|
|
|
..
|
|
|
|
})
|
|
|
|
));
|
|
|
|
|
|
|
|
let timestamp = stream.pending_buffer.as_ref().unwrap().timestamp;
|
|
|
|
|
|
|
|
gst::trace!(CAT,
|
|
|
|
obj: stream.sinkpad,
|
|
|
|
"Stream at timestamp {timestamp}",
|
|
|
|
);
|
|
|
|
|
|
|
|
all_eos = false;
|
|
|
|
|
|
|
|
if earliest_stream
|
|
|
|
.as_ref()
|
|
|
|
.map_or(true, |(_idx, _stream, earliest_timestamp)| {
|
|
|
|
*earliest_timestamp > timestamp
|
|
|
|
})
|
|
|
|
{
|
|
|
|
earliest_stream = Some((idx, stream, timestamp));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(gst::FlowError::Eos) => {
|
|
|
|
all_eos &= true;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
Err(gst_base::AGGREGATOR_FLOW_NEED_DATA) => {
|
|
|
|
all_have_data_or_eos = false;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
Err(err) => {
|
|
|
|
return Err(err);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !all_have_data_or_eos {
|
|
|
|
gst::trace!(CAT, imp: self, "Not all streams have a buffer or are EOS");
|
|
|
|
Err(gst_base::AGGREGATOR_FLOW_NEED_DATA)
|
|
|
|
} else if all_eos {
|
|
|
|
gst::info!(CAT, imp: self, "All streams are EOS");
|
|
|
|
Err(gst::FlowError::Eos)
|
|
|
|
} else if let Some((idx, stream, earliest_timestamp)) = earliest_stream {
|
|
|
|
gst::debug!(
|
|
|
|
CAT,
|
|
|
|
obj: stream.sinkpad,
|
|
|
|
"Stream is earliest stream with timestamp {earliest_timestamp}",
|
|
|
|
);
|
|
|
|
|
|
|
|
gst::debug!(
|
|
|
|
CAT,
|
|
|
|
obj: stream.sinkpad,
|
|
|
|
"Starting new chunk at offset {}",
|
|
|
|
state.current_offset,
|
|
|
|
);
|
|
|
|
|
|
|
|
stream.chunks.push(super::Chunk {
|
|
|
|
offset: state.current_offset,
|
|
|
|
samples: Vec::new(),
|
|
|
|
});
|
|
|
|
stream.queued_chunk_time = gst::ClockTime::ZERO;
|
|
|
|
stream.queued_chunk_bytes = 0;
|
|
|
|
|
|
|
|
state.current_stream_idx = Some(idx);
|
|
|
|
Ok(Some(idx))
|
|
|
|
} else {
|
|
|
|
unreachable!()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Drains ready buffers from all streams into `buffers`, respecting the interleave
/// settings, and records each one as a sample in its stream's current chunk.
///
/// Updates the global write offset and `mdat` size accordingly. Empty gap buffers are
/// accounted into the chunk timing but not written out.
fn drain_buffers(
    &self,
    settings: &Settings,
    state: &mut State,
    buffers: &mut gst::BufferListRef,
) -> Result<(), gst::FlowError> {
    // Now we can start handling buffers
    while let Some(idx) = self.find_earliest_stream(settings, state)? {
        let stream = &mut state.streams[idx];
        let buffer = stream.pending_buffer.take().unwrap();

        // Zero-sized droppable gap buffers carry no sample data; skip them entirely.
        if buffer.buffer.flags().contains(gst::BufferFlags::GAP)
            && buffer.buffer.flags().contains(gst::BufferFlags::DROPPABLE)
            && buffer.buffer.size() == 0
        {
            gst::trace!(CAT, obj: stream.sinkpad, "Skipping gap buffer {buffer:?}");
            continue;
        }

        gst::trace!(CAT, obj: stream.sinkpad, "Handling buffer {buffer:?} at offset {}", state.current_offset);

        // Duration is guaranteed set by find_earliest_stream()/queue_buffer().
        let duration = buffer.duration.unwrap();
        let composition_time_offset = buffer.composition_time_offset;
        let mut buffer = buffer.buffer;

        stream.queued_chunk_time += duration;
        stream.queued_chunk_bytes += buffer.size() as u64;

        // Record the sample in the chunk that find_earliest_stream() selected.
        stream
            .chunks
            .last_mut()
            .unwrap()
            .samples
            .push(super::Sample {
                sync_point: !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT),
                duration,
                composition_time_offset,
                size: buffer.size() as u32,
            });

        {
            // Timing now lives in the sample tables; strip it from the buffer that is
            // written into the mdat.
            let buffer = buffer.make_mut();
            buffer.set_dts(None);
            buffer.set_pts(None);
            buffer.set_duration(duration);
            buffer.unset_flags(gst::BufferFlags::all());
        }

        state.current_offset += buffer.size() as u64;
        state.mdat_size += buffer.size() as u64;
        buffers.add(buffer);
    }

    Ok(())
}
|
|
|
|
|
|
|
|
/// Creates the per-pad stream state from the current sink pads and their caps.
///
/// Pads without caps are skipped; caps are validated per format (required
/// `codec_data`, `colorimetry`, Opus headers, ...). Fails with `NotNegotiated` on
/// invalid caps and `Error` when no usable stream remains. Streams are finally
/// sorted: video first, then audio, then ONVIF metadata, each group by pad name.
fn create_streams(&self, state: &mut State) -> Result<(), gst::FlowError> {
    gst::info!(CAT, imp: self, "Creating streams");

    for pad in self
        .obj()
        .sink_pads()
        .into_iter()
        .map(|pad| pad.downcast::<super::MP4MuxPad>().unwrap())
    {
        let caps = match pad.current_caps() {
            Some(caps) => caps,
            None => {
                gst::warning!(CAT, obj: pad, "Skipping pad without caps");
                continue;
            }
        };

        gst::info!(CAT, obj: pad, "Configuring caps {caps:?}");

        let s = caps.structure(0).unwrap();

        // Per-format caps validation; also determines the frame-reordering mode.
        let mut delta_frames = super::DeltaFrames::IntraOnly;
        match s.name() {
            "video/x-h264" | "video/x-h265" => {
                if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
                    gst::error!(CAT, obj: pad, "Received caps without codec_data");
                    return Err(gst::FlowError::NotNegotiated);
                }
                delta_frames = super::DeltaFrames::Bidirectional;
            }
            "video/x-vp9" => {
                if !s.has_field_with_type("colorimetry", str::static_type()) {
                    gst::error!(CAT, obj: pad, "Received caps without colorimetry");
                    return Err(gst::FlowError::NotNegotiated);
                }
                delta_frames = super::DeltaFrames::PredictiveOnly;
            }
            "image/jpeg" => (),
            "audio/mpeg" => {
                if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
                    gst::error!(CAT, obj: pad, "Received caps without codec_data");
                    return Err(gst::FlowError::NotNegotiated);
                }
            }
            "audio/x-opus" => {
                // Validate either the streamheader buffer or the caps fields.
                if let Some(header) = s
                    .get::<gst::ArrayRef>("streamheader")
                    .ok()
                    .and_then(|a| a.get(0).and_then(|v| v.get::<gst::Buffer>().ok()))
                {
                    if gst_pbutils::codec_utils_opus_parse_header(&header, None).is_err() {
                        gst::error!(CAT, obj: pad, "Received invalid Opus header");
                        return Err(gst::FlowError::NotNegotiated);
                    }
                } else if gst_pbutils::codec_utils_opus_parse_caps(&caps, None).is_err() {
                    gst::error!(CAT, obj: pad, "Received invalid Opus caps");
                    return Err(gst::FlowError::NotNegotiated);
                }
            }
            "audio/x-alaw" | "audio/x-mulaw" => (),
            "audio/x-adpcm" => (),
            "application/x-onvif-metadata" => (),
            // Caps are restricted by the pad template, so anything else is a bug.
            _ => unreachable!(),
        }

        state.streams.push(Stream {
            sinkpad: pad,
            pre_queue: VecDeque::new(),
            caps,
            delta_frames,
            chunks: Vec::new(),
            pending_buffer: None,
            queued_chunk_time: gst::ClockTime::ZERO,
            queued_chunk_bytes: 0,
            start_dts: None,
            earliest_pts: None,
            end_pts: None,
            running_time_utc_time_mapping: None,
        });
    }

    if state.streams.is_empty() {
        gst::error!(CAT, imp: self, "No streams available");
        return Err(gst::FlowError::Error);
    }

    // Sort video streams first and then audio streams and then metadata streams, and each group by pad name.
    state.streams.sort_by(|a, b| {
        let order_of_caps = |caps: &gst::CapsRef| {
            let s = caps.structure(0).unwrap();

            if s.name().starts_with("video/") {
                0
            } else if s.name().starts_with("audio/") {
                1
            } else if s.name().starts_with("application/x-onvif-metadata") {
                2
            } else {
                unimplemented!();
            }
        };

        let st_a = order_of_caps(&a.caps);
        let st_b = order_of_caps(&b.caps);

        if st_a == st_b {
            return a.sinkpad.name().cmp(&b.sinkpad.name());
        }

        st_a.cmp(&st_b)
    });

    Ok(())
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// GObject type registration for the `MP4Mux` base class. The actual element
// behavior lives in the `ObjectImpl`/`ElementImpl`/`AggregatorImpl` blocks below.
#[glib::object_subclass]
impl ObjectSubclass for MP4Mux {
    const NAME: &'static str = "GstRsMP4Mux";
    type Type = super::MP4Mux;
    type ParentType = gst_base::Aggregator;
    // Custom class struct so each concrete subclass can record its MP4 variant
    // at class-init time (see `Class` / `IsSubclassable` below).
    type Class = Class;
}
|
|
|
|
|
|
|
|
impl ObjectImpl for MP4Mux {
|
|
|
|
fn properties() -> &'static [glib::ParamSpec] {
|
|
|
|
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
|
|
|
vec![
|
|
|
|
glib::ParamSpecUInt64::builder("interleave-bytes")
|
|
|
|
.nick("Interleave Bytes")
|
|
|
|
.blurb("Interleave between streams in bytes")
|
|
|
|
.default_value(DEFAULT_INTERLEAVE_BYTES.unwrap_or(0))
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecUInt64::builder("interleave-time")
|
|
|
|
.nick("Interleave Time")
|
|
|
|
.blurb("Interleave between streams in nanoseconds")
|
|
|
|
.default_value(
|
|
|
|
DEFAULT_INTERLEAVE_TIME
|
|
|
|
.map(gst::ClockTime::nseconds)
|
|
|
|
.unwrap_or(u64::MAX),
|
|
|
|
)
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecUInt::builder("movie-timescale")
|
|
|
|
.nick("Movie Timescale")
|
|
|
|
.blurb("Timescale to use for the movie (units per second, 0 is automatic)")
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
]
|
|
|
|
});
|
|
|
|
|
|
|
|
&PROPERTIES
|
|
|
|
}
|
|
|
|
|
|
|
|
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
|
|
|
|
match pspec.name() {
|
|
|
|
"interleave-bytes" => {
|
|
|
|
let mut settings = self.settings.lock().unwrap();
|
|
|
|
settings.interleave_bytes = match value.get().expect("type checked upstream") {
|
|
|
|
0 => None,
|
|
|
|
v => Some(v),
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
"interleave-time" => {
|
|
|
|
let mut settings = self.settings.lock().unwrap();
|
|
|
|
settings.interleave_time = match value.get().expect("type checked upstream") {
|
|
|
|
Some(gst::ClockTime::ZERO) | None => None,
|
|
|
|
v => v,
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
"movie-timescale" => {
|
|
|
|
let mut settings = self.settings.lock().unwrap();
|
|
|
|
settings.movie_timescale = value.get().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
|
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
|
|
|
match pspec.name() {
|
|
|
|
"interleave-bytes" => {
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
settings.interleave_bytes.unwrap_or(0).to_value()
|
|
|
|
}
|
|
|
|
|
|
|
|
"interleave-time" => {
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
settings.interleave_time.to_value()
|
|
|
|
}
|
|
|
|
|
|
|
|
"movie-timescale" => {
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
settings.movie_timescale.to_value()
|
|
|
|
}
|
|
|
|
|
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// No GstObject behavior overridden.
impl GstObjectImpl for MP4Mux {}
|
|
|
|
|
|
|
|
impl ElementImpl for MP4Mux {
|
|
|
|
fn request_new_pad(
|
|
|
|
&self,
|
|
|
|
templ: &gst::PadTemplate,
|
|
|
|
name: Option<&str>,
|
|
|
|
caps: Option<&gst::Caps>,
|
|
|
|
) -> Option<gst::Pad> {
|
|
|
|
let state = self.state.lock().unwrap();
|
|
|
|
if !state.streams.is_empty() {
|
|
|
|
gst::error!(
|
|
|
|
CAT,
|
|
|
|
imp: self,
|
2022-11-09 07:02:35 +00:00
|
|
|
"Can't request new pads after stream was started"
|
2022-11-07 18:41:50 +00:00
|
|
|
);
|
|
|
|
return None;
|
|
|
|
}
|
|
|
|
|
|
|
|
self.parent_request_new_pad(templ, name, caps)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl AggregatorImpl for MP4Mux {
    /// No timeout-based aggregation: we only aggregate when data is available.
    fn next_time(&self) -> Option<gst::ClockTime> {
        None
    }

    /// Answers CAPS queries on sink pads with the pad's current caps (caps are
    /// fixed once set), falling back to the pad template caps otherwise.
    fn sink_query(
        &self,
        aggregator_pad: &gst_base::AggregatorPad,
        query: &mut gst::QueryRef,
    ) -> bool {
        use gst::QueryViewMut;

        gst::trace!(CAT, obj: aggregator_pad, "Handling query {query:?}");

        match query.view_mut() {
            QueryViewMut::Caps(q) => {
                let allowed_caps = aggregator_pad
                    .current_caps()
                    .unwrap_or_else(|| aggregator_pad.pad_template_caps());

                if let Some(filter_caps) = q.filter() {
                    // `First` keeps the filter's preference order in the result.
                    let res = filter_caps
                        .intersect_with_mode(&allowed_caps, gst::CapsIntersectMode::First);
                    q.set_result(&res);
                } else {
                    q.set_result(&allowed_caps);
                }

                true
            }
            _ => self.parent_sink_query(aggregator_pad, query),
        }
    }

    /// Normalizes incoming segments: anything that is not a TIME segment is
    /// replaced by a default TIME segment (keeping the event's seqnum) before
    /// the event is queued.
    fn sink_event_pre_queue(
        &self,
        aggregator_pad: &gst_base::AggregatorPad,
        mut event: gst::Event,
    ) -> Result<gst::FlowSuccess, gst::FlowError> {
        use gst::EventView;

        gst::trace!(CAT, obj: aggregator_pad, "Handling event {event:?}");

        match event.view() {
            EventView::Segment(ev) => {
                if ev.segment().format() != gst::Format::Time {
                    gst::warning!(
                        CAT,
                        obj: aggregator_pad,
                        "Received non-TIME segment, replacing with default TIME segment"
                    );
                    let segment = gst::FormattedSegment::<gst::ClockTime>::new();
                    event = gst::event::Segment::builder(&segment)
                        .seqnum(event.seqnum())
                        .build();
                }
                self.parent_sink_event_pre_queue(aggregator_pad, event)
            }
            _ => self.parent_sink_event_pre_queue(aggregator_pad, event),
        }
    }

    /// Serialized sink event handling; tags are currently just forwarded.
    fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool {
        use gst::EventView;

        gst::trace!(CAT, obj: aggregator_pad, "Handling event {event:?}");

        match event.view() {
            EventView::Tag(_ev) => {
                // TODO: Maybe store for putting into the header at the end?

                self.parent_sink_event(aggregator_pad, event)
            }
            _ => self.parent_sink_event(aggregator_pad, event),
        }
    }

    /// Rejects seeking queries: the output is written sequentially and the
    /// header is rewritten at the end, so seeking would break everything.
    fn src_query(&self, query: &mut gst::QueryRef) -> bool {
        use gst::QueryViewMut;

        gst::trace!(CAT, imp: self, "Handling query {query:?}");

        match query.view_mut() {
            QueryViewMut::Seeking(q) => {
                // We can't really handle seeking, it would break everything
                q.set(false, gst::ClockTime::ZERO, gst::ClockTime::NONE);
                true
            }
            _ => self.parent_src_query(query),
        }
    }

    /// Rejects seek events for the same reason as above.
    fn src_event(&self, event: gst::Event) -> bool {
        use gst::EventView;

        gst::trace!(CAT, imp: self, "Handling event {event:?}");

        match event.view() {
            EventView::Seek(_ev) => false,
            _ => self.parent_src_event(event),
        }
    }

    /// Drops all per-stream pending data on flush; chunk/offset bookkeeping is
    /// intentionally kept (the file written so far stays valid).
    fn flush(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
        gst::info!(CAT, imp: self, "Flushing");

        let mut state = self.state.lock().unwrap();
        for stream in &mut state.streams {
            stream.pending_buffer = None;
            stream.pre_queue.clear();
            stream.running_time_utc_time_mapping = None;
        }
        drop(state);

        self.parent_flush()
    }

    /// Resets the muxer state on stop, ignoring any parent stop error.
    fn stop(&self) -> Result<(), gst::ErrorMessage> {
        gst::trace!(CAT, imp: self, "Stopping");

        let _ = self.parent_stop();

        *self.state.lock().unwrap() = State::default();

        Ok(())
    }

    /// Prepares for a new file: BYTES segment downstream and fresh state.
    fn start(&self) -> Result<(), gst::ErrorMessage> {
        gst::trace!(CAT, imp: self, "Starting");

        self.parent_start()?;

        // Always output a BYTES segment
        let segment = gst::FormattedSegment::<gst::format::Bytes>::new();
        self.obj().update_segment(&segment);

        *self.state.lock().unwrap() = State::default();

        Ok(())
    }

    /// Src caps are set manually in `aggregate()`, so negotiation is a no-op.
    fn negotiate(&self) -> bool {
        true
    }

    /// Main muxing loop.
    ///
    /// On the first call: verifies downstream seekability (needed to rewrite
    /// the mdat header at EOS), collects the streams, and emits ftyp + a
    /// placeholder mdat header. On every call: drains queued buffers into the
    /// mdat. On EOS: appends the moov box and rewrites the mdat header with
    /// the final size by seeking back via a BYTES segment.
    fn aggregate(&self, _timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> {
        let settings = self.settings.lock().unwrap().clone();
        let mut state = self.state.lock().unwrap();

        let mut buffers = gst::BufferList::new();
        let mut caps = None;

        // If no streams were created yet, collect all streams now and write the mdat.
        if state.streams.is_empty() {
            // First check if downstream is seekable. If not we can't rewrite the mdat box header!
            // The state lock must be released while querying downstream.
            drop(state);

            let mut q = gst::query::Seeking::new(gst::Format::Bytes);
            if self.obj().src_pad().peer_query(&mut q) {
                if !q.result().0 {
                    gst::element_imp_error!(
                        self,
                        gst::StreamError::Mux,
                        ["Downstream is not seekable"]
                    );
                    return Err(gst::FlowError::Error);
                }
            } else {
                // Can't query downstream, have to assume downstream is seekable
                gst::warning!(CAT, imp: self, "Can't query downstream for seekability");
            }

            state = self.state.lock().unwrap();
            self.create_streams(&mut state)?;

            // Create caps now to be sent before any buffers
            caps = Some(
                gst::Caps::builder("video/quicktime")
                    .field("variant", "iso")
                    .build(),
            );

            gst::info!(
                CAT,
                imp: self,
                "Creating ftyp box at offset {}",
                state.current_offset
            );

            // ... and then create the ftyp box plus mdat box header so we can start outputting
            // actual data
            let ftyp = boxes::create_ftyp(self.obj().class().as_ref().variant).map_err(|err| {
                gst::error!(CAT, imp: self, "Failed to create ftyp box: {err}");
                gst::FlowError::Error
            })?;
            state.current_offset += ftyp.size() as u64;
            buffers.get_mut().unwrap().add(ftyp);

            gst::info!(
                CAT,
                imp: self,
                "Creating mdat box header at offset {}",
                state.current_offset
            );
            // Remember where the mdat header starts so it can be rewritten with
            // the real size at EOS.
            state.mdat_offset = Some(state.current_offset);
            let mdat = boxes::create_mdat_header(None).map_err(|err| {
                gst::error!(CAT, imp: self, "Failed to create mdat box header: {err}");
                gst::FlowError::Error
            })?;
            state.current_offset += mdat.size() as u64;
            state.mdat_size = 0;
            buffers.get_mut().unwrap().add(mdat);
        }

        // EOS and NEED_DATA are remembered and handled below; any other error
        // aborts immediately.
        let res = match self.drain_buffers(&settings, &mut state, buffers.get_mut().unwrap()) {
            Ok(_) => Ok(gst::FlowSuccess::Ok),
            Err(err @ gst::FlowError::Eos) | Err(err @ gst_base::AGGREGATOR_FLOW_NEED_DATA) => {
                Err(err)
            }
            Err(err) => return Err(err),
        };

        if res == Err(gst::FlowError::Eos) {
            // Create moov box now and append it to the buffers

            gst::info!(
                CAT,
                imp: self,
                "Creating moov box now, mdat ends at offset {} with size {}",
                state.current_offset,
                state.mdat_size
            );

            let mut streams = Vec::with_capacity(state.streams.len());
            for stream in state.streams.drain(..) {
                let pad_settings = stream.sinkpad.imp().settings.lock().unwrap().clone();
                let (earliest_pts, end_pts) = match Option::zip(stream.earliest_pts, stream.end_pts)
                {
                    Some(res) => res,
                    None => continue, // empty stream
                };

                streams.push(super::Stream {
                    caps: stream.caps.clone(),
                    delta_frames: stream.delta_frames,
                    trak_timescale: pad_settings.trak_timescale,
                    start_dts: stream.start_dts,
                    earliest_pts,
                    end_pts,
                    chunks: stream.chunks,
                });
            }

            let moov = boxes::create_moov(super::Header {
                variant: self.obj().class().as_ref().variant,
                movie_timescale: settings.movie_timescale,
                streams,
            })
            .map_err(|err| {
                gst::error!(CAT, imp: self, "Failed to create moov box: {err}");
                gst::FlowError::Error
            })?;
            state.current_offset += moov.size() as u64;
            buffers.get_mut().unwrap().add(moov);
        }

        drop(state);

        // Caps must go out before the first buffers.
        if let Some(ref caps) = caps {
            self.obj().set_src_caps(caps);
        }

        if !buffers.is_empty() {
            if let Err(err) = self.obj().finish_buffer_list(buffers) {
                gst::error!(CAT, imp: self, "Failed pushing buffers: {err:?}");
                return Err(err);
            }
        }

        if res == Err(gst::FlowError::Eos) {
            let mut state = self.state.lock().unwrap();

            if let Some(mdat_offset) = state.mdat_offset {
                gst::info!(
                    CAT,
                    imp: self,
                    "Rewriting mdat box header at offset {mdat_offset} with size {} now",
                    state.mdat_size,
                );
                // Seek downstream back to the mdat header via a BYTES segment
                // and overwrite the placeholder with the final size.
                let mut segment = gst::FormattedSegment::<gst::format::Bytes>::new();
                segment.set_start(gst::format::Bytes::from_u64(mdat_offset));
                state.current_offset = mdat_offset;
                let mdat = boxes::create_mdat_header(Some(state.mdat_size)).map_err(|err| {
                    gst::error!(CAT, imp: self, "Failed to create mdat box header: {err}");
                    gst::FlowError::Error
                })?;
                drop(state);

                self.obj().update_segment(&segment);
                // A push failure here is only logged: the file data itself was
                // already written out.
                if let Err(err) = self.obj().finish_buffer(mdat) {
                    gst::error!(
                        CAT,
                        imp: self,
                        "Failed pushing updated mdat box header buffer downstream: {err:?}",
                    );
                }
            }
        }

        res
    }
}
|
|
|
|
|
|
|
|
/// Class struct for `MP4Mux`: the parent Aggregator class struct extended with
/// the MP4 variant, filled in per-subclass by `IsSubclassable::class_init`.
#[repr(C)]
pub(crate) struct Class {
    parent: gst_base::ffi::GstAggregatorClass,
    variant: super::Variant,
}
|
|
|
|
|
|
|
|
// SAFETY: `Class` is `#[repr(C)]` with the parent class struct as its first
// field, as GObject class structs require.
unsafe impl ClassStruct for Class {
    type Type = MP4Mux;
}
|
|
|
|
|
|
|
|
impl std::ops::Deref for Class {
    type Target = glib::Class<gst_base::Aggregator>;

    fn deref(&self) -> &Self::Target {
        // SAFETY: `parent` is the first field of this `#[repr(C)]` struct, so a
        // pointer to it is a valid pointer to the parent class type.
        unsafe { &*(&self.parent as *const _ as *const _) }
    }
}
|
|
|
|
|
|
|
|
unsafe impl<T: MP4MuxImpl> IsSubclassable<T> for super::MP4Mux {
    fn class_init(class: &mut glib::Class<Self>) {
        Self::parent_class_init::<T>(class);

        // Record the subclass' variant (ISO/ONVIF) in the class struct so the
        // shared muxing code can query it via `obj.class().as_ref().variant`.
        let class = class.as_mut();
        class.variant = T::VARIANT;
    }
}
|
|
|
|
|
|
|
|
/// Trait implemented by concrete muxer subclasses to declare which MP4 variant
/// they produce.
pub(crate) trait MP4MuxImpl: AggregatorImpl {
    const VARIANT: super::Variant;
}
|
|
|
|
|
|
|
|
/// Plain ISO BMFF variant of the muxer.
#[derive(Default)]
pub(crate) struct ISOMP4Mux;

#[glib::object_subclass]
impl ObjectSubclass for ISOMP4Mux {
    const NAME: &'static str = "GstISOMP4Mux";
    type Type = super::ISOMP4Mux;
    type ParentType = super::MP4Mux;
}
|
|
|
|
|
|
|
|
// All behavior is inherited from the `MP4Mux` base class.
impl ObjectImpl for ISOMP4Mux {}

impl GstObjectImpl for ISOMP4Mux {}
|
|
|
|
|
|
|
|
impl ElementImpl for ISOMP4Mux {
    fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
        static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
            gst::subclass::ElementMetadata::new(
                "ISOMP4Mux",
                "Codec/Muxer",
                "ISO MP4 muxer",
                "Sebastian Dröge <sebastian@centricular.com>",
            )
        });

        Some(&*ELEMENT_METADATA)
    }

    /// One always-present src pad producing ISO MP4, and request sink pads
    /// (`MP4MuxPad` instances) accepting H.264/H.265/VP9 video and AAC/Opus
    /// audio. Width/height are capped at u16::MAX as stored in the headers.
    fn pad_templates() -> &'static [gst::PadTemplate] {
        static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
            let src_pad_template = gst::PadTemplate::new(
                "src",
                gst::PadDirection::Src,
                gst::PadPresence::Always,
                &gst::Caps::builder("video/quicktime")
                    .field("variant", "iso")
                    .build(),
            )
            .unwrap();

            let sink_pad_template = gst::PadTemplate::with_gtype(
                "sink_%u",
                gst::PadDirection::Sink,
                gst::PadPresence::Request,
                &[
                    // AVC (codec_data in caps) or AVC3 (in-band SPS/PPS),
                    // aligned to complete access units.
                    gst::Structure::builder("video/x-h264")
                        .field("stream-format", gst::List::new(["avc", "avc3"]))
                        .field("alignment", "au")
                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
                        .build(),
                    gst::Structure::builder("video/x-h265")
                        .field("stream-format", gst::List::new(["hvc1", "hev1"]))
                        .field("alignment", "au")
                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
                        .build(),
                    gst::Structure::builder("video/x-vp9")
                        .field("profile", gst::List::new(["0", "1", "2", "3"]))
                        .field("chroma-format", gst::List::new(["4:2:0", "4:2:2", "4:4:4"]))
                        .field("bit-depth-luma", gst::List::new([8u32, 10u32, 12u32]))
                        .field("bit-depth-chroma", gst::List::new([8u32, 10u32, 12u32]))
                        .field("width", gst::IntRange::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::new(1, u16::MAX as i32))
                        .build(),
                    // Raw AAC (mpegversion 4, stream-format raw).
                    gst::Structure::builder("audio/mpeg")
                        .field("mpegversion", 4i32)
                        .field("stream-format", "raw")
                        .field("channels", gst::IntRange::new(1, u16::MAX as i32))
                        .field("rate", gst::IntRange::new(1, i32::MAX))
                        .build(),
                    gst::Structure::builder("audio/x-opus")
                        .field("channel-mapping-family", gst::IntRange::new(0i32, 255))
                        .field("channels", gst::IntRange::new(1i32, 8))
                        .field("rate", gst::IntRange::new(1, i32::MAX))
                        .build(),
                ]
                .into_iter()
                .collect::<gst::Caps>(),
                super::MP4MuxPad::static_type(),
            )
            .unwrap();

            vec![src_pad_template, sink_pad_template]
        });

        PAD_TEMPLATES.as_ref()
    }
}
|
|
|
|
|
|
|
|
impl AggregatorImpl for ISOMP4Mux {}

// Marks this subclass as producing the plain ISO variant.
impl MP4MuxImpl for ISOMP4Mux {
    const VARIANT: super::Variant = super::Variant::ISO;
}
|
|
|
|
|
2022-11-09 07:02:35 +00:00
|
|
|
/// ONVIF variant of the muxer (adds support for ONVIF metadata streams and
/// the codecs used by ONVIF devices).
#[derive(Default)]
pub(crate) struct ONVIFMP4Mux;

#[glib::object_subclass]
impl ObjectSubclass for ONVIFMP4Mux {
    const NAME: &'static str = "GstONVIFMP4Mux";
    type Type = super::ONVIFMP4Mux;
    type ParentType = super::MP4Mux;
}
|
|
|
|
|
|
|
|
// All behavior is inherited from the `MP4Mux` base class.
impl ObjectImpl for ONVIFMP4Mux {}

impl GstObjectImpl for ONVIFMP4Mux {}
|
|
|
|
|
|
|
|
impl ElementImpl for ONVIFMP4Mux {
    fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
        static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
            gst::subclass::ElementMetadata::new(
                "ONVIFMP4Mux",
                "Codec/Muxer",
                "ONVIF MP4 muxer",
                "Sebastian Dröge <sebastian@centricular.com>",
            )
        });

        Some(&*ELEMENT_METADATA)
    }

    /// Same always-present src pad as the ISO variant, but the request sink
    /// pads accept the ONVIF codec set: H.264/H.265/JPEG video, AAC/A-law/
    /// µ-law/G.726 audio, and parsed ONVIF XML metadata streams.
    fn pad_templates() -> &'static [gst::PadTemplate] {
        static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
            let src_pad_template = gst::PadTemplate::new(
                "src",
                gst::PadDirection::Src,
                gst::PadPresence::Always,
                &gst::Caps::builder("video/quicktime")
                    .field("variant", "iso")
                    .build(),
            )
            .unwrap();

            let sink_pad_template = gst::PadTemplate::with_gtype(
                "sink_%u",
                gst::PadDirection::Sink,
                gst::PadPresence::Request,
                &[
                    gst::Structure::builder("video/x-h264")
                        .field("stream-format", gst::List::new(["avc", "avc3"]))
                        .field("alignment", "au")
                        .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32))
                        .build(),
                    gst::Structure::builder("video/x-h265")
                        .field("stream-format", gst::List::new(["hvc1", "hev1"]))
                        .field("alignment", "au")
                        .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32))
                        .build(),
                    gst::Structure::builder("image/jpeg")
                        .field("width", gst::IntRange::<i32>::new(1, u16::MAX as i32))
                        .field("height", gst::IntRange::<i32>::new(1, u16::MAX as i32))
                        .build(),
                    // Raw AAC.
                    gst::Structure::builder("audio/mpeg")
                        .field("mpegversion", 4i32)
                        .field("stream-format", "raw")
                        .field("channels", gst::IntRange::<i32>::new(1, u16::MAX as i32))
                        .field("rate", gst::IntRange::<i32>::new(1, i32::MAX))
                        .build(),
                    gst::Structure::builder("audio/x-alaw")
                        .field("channels", gst::IntRange::<i32>::new(1, 2))
                        .field("rate", gst::IntRange::<i32>::new(1, i32::MAX))
                        .build(),
                    gst::Structure::builder("audio/x-mulaw")
                        .field("channels", gst::IntRange::<i32>::new(1, 2))
                        .field("rate", gst::IntRange::<i32>::new(1, i32::MAX))
                        .build(),
                    // G.726 ADPCM: fixed mono/8 kHz, one of four bitrates.
                    gst::Structure::builder("audio/x-adpcm")
                        .field("layout", "g726")
                        .field("channels", 1i32)
                        .field("rate", 8000i32)
                        .field("bitrate", gst::List::new([16000i32, 24000, 32000, 40000]))
                        .build(),
                    gst::Structure::builder("application/x-onvif-metadata")
                        .field("parsed", true)
                        .build(),
                ]
                .into_iter()
                .collect::<gst::Caps>(),
                super::MP4MuxPad::static_type(),
            )
            .unwrap();

            vec![src_pad_template, sink_pad_template]
        });

        PAD_TEMPLATES.as_ref()
    }
}
|
|
|
|
|
|
|
|
impl AggregatorImpl for ONVIFMP4Mux {}

// Marks this subclass as producing the ONVIF variant.
impl MP4MuxImpl for ONVIFMP4Mux {
    const VARIANT: super::Variant = super::Variant::ONVIF;
}
|
|
|
|
|
2022-11-07 18:41:50 +00:00
|
|
|
/// Per-pad settings, set via the pad's GObject properties.
#[derive(Default, Clone)]
struct PadSettings {
    // Track timescale in units per second; 0 selects an automatic value.
    trak_timescale: u32,
}
|
|
|
|
|
|
|
|
/// Sink pad subclass carrying per-track settings.
#[derive(Default)]
pub(crate) struct MP4MuxPad {
    settings: Mutex<PadSettings>,
}
|
|
|
|
|
|
|
|
// GObject type registration for the custom sink pad type used by the
// `sink_%u` pad templates above.
#[glib::object_subclass]
impl ObjectSubclass for MP4MuxPad {
    const NAME: &'static str = "GstRsMP4MuxPad";
    type Type = super::MP4MuxPad;
    type ParentType = gst_base::AggregatorPad;
}
|
|
|
|
|
|
|
|
impl ObjectImpl for MP4MuxPad {
|
|
|
|
fn properties() -> &'static [glib::ParamSpec] {
|
|
|
|
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
|
|
|
vec![glib::ParamSpecUInt::builder("trak-timescale")
|
|
|
|
.nick("Track Timescale")
|
|
|
|
.blurb("Timescale to use for the track (units per second, 0 is automatic)")
|
|
|
|
.mutable_ready()
|
|
|
|
.build()]
|
|
|
|
});
|
|
|
|
|
|
|
|
&PROPERTIES
|
|
|
|
}
|
|
|
|
|
|
|
|
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
|
|
|
|
match pspec.name() {
|
|
|
|
"trak-timescale" => {
|
|
|
|
let mut settings = self.settings.lock().unwrap();
|
|
|
|
settings.trak_timescale = value.get().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
|
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
|
|
|
match pspec.name() {
|
|
|
|
"trak-timescale" => {
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
settings.trak_timescale.to_value()
|
|
|
|
}
|
|
|
|
|
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// No GstObject/Pad behavior overridden.
impl GstObjectImpl for MP4MuxPad {}

impl PadImpl for MP4MuxPad {}
|
|
|
|
|
|
|
|
impl AggregatorPadImpl for MP4MuxPad {
|
|
|
|
fn flush(&self, aggregator: &gst_base::Aggregator) -> Result<gst::FlowSuccess, gst::FlowError> {
|
|
|
|
let mux = aggregator.downcast_ref::<super::MP4Mux>().unwrap();
|
|
|
|
let mut mux_state = mux.imp().state.lock().unwrap();
|
|
|
|
|
2022-11-09 07:02:35 +00:00
|
|
|
gst::info!(CAT, imp: self, "Flushing");
|
|
|
|
|
2022-11-07 18:41:50 +00:00
|
|
|
for stream in &mut mux_state.streams {
|
|
|
|
if stream.sinkpad == *self.obj() {
|
|
|
|
stream.pending_buffer = None;
|
2022-11-09 07:02:35 +00:00
|
|
|
stream.pre_queue.clear();
|
|
|
|
stream.running_time_utc_time_mapping = None;
|
2022-11-07 18:41:50 +00:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
drop(mux_state);
|
|
|
|
|
|
|
|
self.parent_flush(aggregator)
|
|
|
|
}
|
|
|
|
}
|