fmp4mux: Refactor timestamp handling for writing out data

Pass less data around and calculate the values we need to work with
earlier.
This commit is contained in:
Sebastian Dröge 2022-06-30 16:29:09 +03:00
parent 0a02d8c096
commit 4ef0e26762
3 changed files with 230 additions and 264 deletions

View file

@ -330,9 +330,9 @@ fn cmaf_brands_from_caps(caps: &gst::CapsRef, compatible_brands: &mut Vec<&'stat
} }
} }
fn brands_from_variant_and_caps( fn brands_from_variant_and_caps<'a>(
variant: super::Variant, variant: super::Variant,
caps: &[&gst::Caps], mut caps: impl Iterator<Item = &'a gst::Caps>,
) -> (&'static [u8; 4], Vec<&'static [u8; 4]>) { ) -> (&'static [u8; 4], Vec<&'static [u8; 4]>) {
match variant { match variant {
super::Variant::ISO | super::Variant::ONVIF => (b"iso6", vec![b"iso6"]), super::Variant::ISO | super::Variant::ONVIF => (b"iso6", vec![b"iso6"]),
@ -343,8 +343,8 @@ fn brands_from_variant_and_caps(
super::Variant::CMAF => { super::Variant::CMAF => {
let mut compatible_brands = vec![b"iso6", b"cmfc"]; let mut compatible_brands = vec![b"iso6", b"cmfc"];
assert_eq!(caps.len(), 1); cmaf_brands_from_caps(caps.next().unwrap(), &mut compatible_brands);
cmaf_brands_from_caps(caps[0], &mut compatible_brands); assert_eq!(caps.next(), None);
(b"cmf2", compatible_brands) (b"cmf2", compatible_brands)
} }
@ -355,7 +355,8 @@ fn brands_from_variant_and_caps(
pub(super) fn create_fmp4_header(cfg: super::HeaderConfiguration) -> Result<gst::Buffer, Error> { pub(super) fn create_fmp4_header(cfg: super::HeaderConfiguration) -> Result<gst::Buffer, Error> {
let mut v = vec![]; let mut v = vec![];
let (brand, compatible_brands) = brands_from_variant_and_caps(cfg.variant, cfg.caps); let (brand, compatible_brands) =
brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| s.1));
write_box(&mut v, b"ftyp", |v| { write_box(&mut v, b"ftyp", |v| {
// major brand // major brand
@ -384,7 +385,7 @@ fn write_moov(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), E
write_full_box(v, b"mvhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { write_full_box(v, b"mvhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
write_mvhd(v, cfg, creation_time) write_mvhd(v, cfg, creation_time)
})?; })?;
for (idx, caps) in cfg.caps.iter().enumerate() { for (idx, (pad, caps)) in cfg.streams.iter().enumerate() {
write_box(v, b"trak", |v| { write_box(v, b"trak", |v| {
let mut references = vec![]; let mut references = vec![];
@ -393,7 +394,7 @@ fn write_moov(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), E
&& caps.structure(0).unwrap().name() == "application/x-onvif-metadata" && caps.structure(0).unwrap().name() == "application/x-onvif-metadata"
{ {
// Find the first video track // Find the first video track
for (idx, caps) in cfg.caps.iter().enumerate() { for (idx, (_pad, caps)) in cfg.streams.iter().enumerate() {
let s = caps.structure(0).unwrap(); let s = caps.structure(0).unwrap();
if matches!(s.name(), "video/x-h264" | "video/x-h265" | "image/jpeg") { if matches!(s.name(), "video/x-h264" | "video/x-h265" | "image/jpeg") {
@ -406,7 +407,7 @@ fn write_moov(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), E
} }
} }
write_trak(v, cfg, idx, caps, creation_time, &references) write_trak(v, cfg, idx, pad, caps, creation_time, &references)
})?; })?;
} }
write_box(v, b"mvex", |v| write_mvex(v, cfg))?; write_box(v, b"mvex", |v| write_mvex(v, cfg))?;
@ -453,7 +454,7 @@ fn write_mvhd(
// Modification time // Modification time
v.extend(creation_time.to_be_bytes()); v.extend(creation_time.to_be_bytes());
// Timescale: uses the reference track timescale // Timescale: uses the reference track timescale
v.extend(caps_to_timescale(cfg.caps[0]).to_be_bytes()); v.extend(caps_to_timescale(cfg.streams[0].1).to_be_bytes());
// Duration // Duration
v.extend(0u64.to_be_bytes()); v.extend(0u64.to_be_bytes());
@ -485,7 +486,7 @@ fn write_mvhd(
v.extend([0u8; 6 * 4]); v.extend([0u8; 6 * 4]);
// Next track id // Next track id
v.extend((cfg.caps.len() as u32 + 1).to_be_bytes()); v.extend((cfg.streams.len() as u32 + 1).to_be_bytes());
Ok(()) Ok(())
} }
@ -503,6 +504,7 @@ fn write_trak(
v: &mut Vec<u8>, v: &mut Vec<u8>,
cfg: &super::HeaderConfiguration, cfg: &super::HeaderConfiguration,
idx: usize, idx: usize,
_pad: &gst_base::AggregatorPad,
caps: &gst::CapsRef, caps: &gst::CapsRef,
creation_time: u64, creation_time: u64,
references: &[TrackReference], references: &[TrackReference],
@ -1387,7 +1389,7 @@ fn write_mvex(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), E
} }
} }
for (idx, _caps) in cfg.caps.iter().enumerate() { for (idx, (_pad, _caps)) in cfg.streams.iter().enumerate() {
write_full_box(v, b"trex", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| { write_full_box(v, b"trex", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
write_trex(v, cfg, idx) write_trex(v, cfg, idx)
})?; })?;
@ -1398,7 +1400,7 @@ fn write_mvex(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), E
fn write_mehd(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> { fn write_mehd(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> {
// Use the reference track timescale // Use the reference track timescale
let timescale = caps_to_timescale(cfg.caps[0]); let timescale = caps_to_timescale(cfg.streams[0].1);
let duration = cfg let duration = cfg
.duration .duration
@ -1440,7 +1442,8 @@ pub(super) fn create_fmp4_fragment_header(
) -> Result<(gst::Buffer, u64), Error> { ) -> Result<(gst::Buffer, u64), Error> {
let mut v = vec![]; let mut v = vec![];
let (brand, compatible_brands) = brands_from_variant_and_caps(cfg.variant, cfg.caps); let (brand, compatible_brands) =
brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| s.1));
write_box(&mut v, b"styp", |v| { write_box(&mut v, b"styp", |v| {
// major brand // major brand
@ -1492,14 +1495,15 @@ fn write_moof(
})?; })?;
let mut data_offset_offsets = vec![]; let mut data_offset_offsets = vec![];
for (idx, caps) in cfg.caps.iter().enumerate() { for (idx, (_pad, caps, timing_info)) in cfg.streams.iter().enumerate() {
// Skip tracks without any buffers for this fragment. // Skip tracks without any buffers for this fragment.
if cfg.timing_infos[idx].is_none() { let timing_info = match timing_info {
continue; None => continue,
} Some(ref timing_info) => timing_info,
};
write_box(v, b"traf", |v| { write_box(v, b"traf", |v| {
write_traf(v, cfg, &mut data_offset_offsets, idx, caps) write_traf(v, cfg, &mut data_offset_offsets, idx, caps, timing_info)
})?; })?;
} }
@ -1513,8 +1517,11 @@ fn write_mfhd(v: &mut Vec<u8>, cfg: &super::FragmentHeaderConfiguration) -> Resu
} }
#[allow(clippy::identity_op)] #[allow(clippy::identity_op)]
fn sample_flags_from_buffer(buffer: &gst::BufferRef, intra_only: bool) -> u32 { fn sample_flags_from_buffer(
if intra_only { timing_info: &super::FragmentTimingInfo,
buffer: &gst::BufferRef,
) -> u32 {
if timing_info.intra_only {
(0b00u32 << (16 + 10)) | // leading: unknown (0b00u32 << (16 + 10)) | // leading: unknown
(0b10u32 << (16 + 8)) | // depends: no (0b10u32 << (16 + 8)) | // depends: no
(0b10u32 << (16 + 6)) | // depended: no (0b10u32 << (16 + 6)) | // depended: no
@ -1549,47 +1556,6 @@ fn sample_flags_from_buffer(buffer: &gst::BufferRef, intra_only: bool) -> u32 {
} }
} }
fn composition_time_offset_from_pts_dts(
pts: gst::ClockTime,
dts: Option<gst::ClockTime>,
timescale: u32,
) -> Result<i32, Error> {
let (_, pts, dts) = timestamp_from_pts_dts(pts, dts, true, timescale)?;
let dts = dts.expect("no DTS");
let diff = if pts > dts {
i32::try_from((pts - dts) as i64).context("pts-dts diff too big")?
} else {
let diff = dts - pts;
i32::try_from(-(diff as i64)).context("pts-dts diff too big")?
};
Ok(diff)
}
fn timestamp_from_pts_dts(
pts: gst::ClockTime,
dts: Option<gst::ClockTime>,
check_dts: bool,
timescale: u32,
) -> Result<(u64, u64, Option<u64>), Error> {
let pts = pts
.nseconds()
.mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
.context("too big PTS")?;
if check_dts {
let dts = dts.expect("no DTS");
let dts = dts
.nseconds()
.mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
.context("too big DTS")?;
Ok((dts, pts, Some(dts)))
} else {
Ok((pts, pts, None))
}
}
const DEFAULT_SAMPLE_DURATION_PRESENT: u32 = 0x08; const DEFAULT_SAMPLE_DURATION_PRESENT: u32 = 0x08;
const DEFAULT_SAMPLE_SIZE_PRESENT: u32 = 0x10; const DEFAULT_SAMPLE_SIZE_PRESENT: u32 = 0x10;
const DEFAULT_SAMPLE_FLAGS_PRESENT: u32 = 0x20; const DEFAULT_SAMPLE_FLAGS_PRESENT: u32 = 0x20;
@ -1606,8 +1572,7 @@ const SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT: u32 = 0x8_00;
fn analyze_buffers( fn analyze_buffers(
cfg: &super::FragmentHeaderConfiguration, cfg: &super::FragmentHeaderConfiguration,
idx: usize, idx: usize,
check_dts: bool, timing_info: &super::FragmentTimingInfo,
intra_only: bool,
timescale: u32, timescale: u32,
) -> Result< ) -> Result<
( (
@ -1629,7 +1594,6 @@ fn analyze_buffers(
let mut tf_flags = DEFAULT_BASE_IS_MOOF; let mut tf_flags = DEFAULT_BASE_IS_MOOF;
let mut tr_flags = DATA_OFFSET_PRESENT; let mut tr_flags = DATA_OFFSET_PRESENT;
let mut last_timestamp = None;
let mut duration = None; let mut duration = None;
let mut size = None; let mut size = None;
let mut first_buffer_flags = None; let mut first_buffer_flags = None;
@ -1640,8 +1604,9 @@ fn analyze_buffers(
for Buffer { for Buffer {
idx: _idx, idx: _idx,
buffer, buffer,
pts, timestamp: _timestamp,
dts, duration: sample_duration,
composition_time_offset,
} in cfg.buffers.iter().filter(|b| b.idx == idx) } in cfg.buffers.iter().filter(|b| b.idx == idx)
{ {
if size.is_none() { if size.is_none() {
@ -1651,27 +1616,22 @@ fn analyze_buffers(
tr_flags |= SAMPLE_SIZE_PRESENT; tr_flags |= SAMPLE_SIZE_PRESENT;
} }
{ let sample_duration = u32::try_from(
let (current_timestamp, _pts, _dts) = sample_duration
timestamp_from_pts_dts(*pts, *dts, check_dts, timescale)?; .nseconds()
.mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
.context("too big sample duration")?,
)
.context("too big sample duration")?;
if let Some(prev_timestamp) = last_timestamp { if duration.is_none() {
let dur = u32::try_from(current_timestamp.saturating_sub(prev_timestamp)) duration = Some(sample_duration);
.context("too big sample duration")?; }
last_timestamp = Some(current_timestamp); if Some(sample_duration) != duration {
tr_flags |= SAMPLE_DURATION_PRESENT;
if duration.is_none() {
duration = Some(dur);
}
if Some(dur) != duration {
tr_flags |= SAMPLE_DURATION_PRESENT;
}
} else {
last_timestamp = Some(current_timestamp);
}
} }
let f = sample_flags_from_buffer(buffer, intra_only); let f = sample_flags_from_buffer(timing_info, buffer);
if first_buffer_flags.is_none() { if first_buffer_flags.is_none() {
first_buffer_flags = Some(f); first_buffer_flags = Some(f);
} else { } else {
@ -1686,43 +1646,17 @@ fn analyze_buffers(
tr_flags |= SAMPLE_FLAGS_PRESENT; tr_flags |= SAMPLE_FLAGS_PRESENT;
} }
if check_dts { if let Some(composition_time_offset) = *composition_time_offset {
let diff = composition_time_offset_from_pts_dts(*pts, *dts, timescale)?; assert!(!timing_info.intra_only);
if diff != 0 { if composition_time_offset != 0 {
tr_flags |= SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT; tr_flags |= SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT;
} }
if diff < 0 { if composition_time_offset < 0 {
negative_composition_time_offsets = true; negative_composition_time_offsets = true;
} }
} }
} }
// Check duration of the last buffer against end_pts / end_dts
{
let timing_info = cfg.timing_infos[idx].as_ref().unwrap();
let current_timestamp = if check_dts {
timing_info.end_dts.expect("no end DTS")
} else {
timing_info.end_pts
};
let current_timestamp = current_timestamp
.nseconds()
.mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
.context("too big timestamp")?;
if let Some(prev_timestamp) = last_timestamp {
let dur = u32::try_from(current_timestamp.saturating_sub(prev_timestamp))
.context("too big sample duration")?;
if duration.is_none() {
duration = Some(dur);
}
if Some(dur) != duration {
tr_flags |= SAMPLE_DURATION_PRESENT;
}
}
}
if (tr_flags & SAMPLE_SIZE_PRESENT) == 0 { if (tr_flags & SAMPLE_SIZE_PRESENT) == 0 {
tf_flags |= DEFAULT_SAMPLE_SIZE_PRESENT; tf_flags |= DEFAULT_SAMPLE_SIZE_PRESENT;
} else { } else {
@ -1765,21 +1699,10 @@ fn write_traf(
data_offset_offsets: &mut Vec<usize>, data_offset_offsets: &mut Vec<usize>,
idx: usize, idx: usize,
caps: &gst::CapsRef, caps: &gst::CapsRef,
timing_info: &super::FragmentTimingInfo,
) -> Result<(), Error> { ) -> Result<(), Error> {
let s = caps.structure(0).unwrap();
let timescale = caps_to_timescale(caps); let timescale = caps_to_timescale(caps);
let check_dts = matches!(s.name(), "video/x-h264" | "video/x-h265");
let intra_only = matches!(
s.name(),
"audio/mpeg"
| "audio/x-alaw"
| "audio/x-mulaw"
| "audio/x-adpcm"
| "image/jpeg"
| "application/x-onvif-metadata"
);
// Analyze all buffers to know what values can be put into the tfhd for all samples and what // Analyze all buffers to know what values can be put into the tfhd for all samples and what
// has to be stored for every single sample // has to be stored for every single sample
let ( let (
@ -1789,7 +1712,7 @@ fn write_traf(
default_duration, default_duration,
default_flags, default_flags,
negative_composition_time_offsets, negative_composition_time_offsets,
) = analyze_buffers(cfg, idx, check_dts, intra_only, timescale)?; ) = analyze_buffers(cfg, idx, timing_info, timescale)?;
assert!((tf_flags & DEFAULT_SAMPLE_SIZE_PRESENT == 0) ^ default_size.is_some()); assert!((tf_flags & DEFAULT_SAMPLE_SIZE_PRESENT == 0) ^ default_size.is_some());
assert!((tf_flags & DEFAULT_SAMPLE_DURATION_PRESENT == 0) ^ default_duration.is_some()); assert!((tf_flags & DEFAULT_SAMPLE_DURATION_PRESENT == 0) ^ default_duration.is_some());
@ -1799,7 +1722,7 @@ fn write_traf(
write_tfhd(v, cfg, idx, default_size, default_duration, default_flags) write_tfhd(v, cfg, idx, default_size, default_duration, default_flags)
})?; })?;
write_full_box(v, b"tfdt", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| { write_full_box(v, b"tfdt", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
write_tfdt(v, cfg, idx, timescale) write_tfdt(v, cfg, idx, timing_info, timescale)
})?; })?;
let mut current_data_offset = 0; let mut current_data_offset = 0;
@ -1814,24 +1737,6 @@ fn write_traf(
continue; continue;
} }
let last_end_timestamp =
if let Some(Buffer { pts, dts, .. }) = iter.as_slice().iter().find(|b| b.idx == idx) {
timestamp_from_pts_dts(*pts, *dts, check_dts, timescale)
.map(|(current_timestamp, _pts, _dts)| current_timestamp)?
} else {
let timing_info = cfg.timing_infos[idx].as_ref().unwrap();
let last_end_timestamp = if check_dts {
timing_info.end_dts.expect("no end DTS")
} else {
timing_info.end_pts
};
last_end_timestamp
.nseconds()
.mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
.context("too big timestamp")?
};
let data_offset_offset = write_full_box( let data_offset_offset = write_full_box(
v, v,
b"trun", b"trun",
@ -1847,11 +1752,9 @@ fn write_traf(
cfg, cfg,
current_data_offset, current_data_offset,
tr_flags, tr_flags,
check_dts,
intra_only,
timescale, timescale,
timing_info,
run, run,
last_end_timestamp,
) )
}, },
)?; )?;
@ -1898,15 +1801,13 @@ fn write_tfhd(
fn write_tfdt( fn write_tfdt(
v: &mut Vec<u8>, v: &mut Vec<u8>,
cfg: &super::FragmentHeaderConfiguration, _cfg: &super::FragmentHeaderConfiguration,
idx: usize, _idx: usize,
timing_info: &super::FragmentTimingInfo,
timescale: u32, timescale: u32,
) -> Result<(), Error> { ) -> Result<(), Error> {
let timing_info = cfg.timing_infos[idx].as_ref().unwrap();
let base_time = timing_info let base_time = timing_info
.start_dts .start_time
.unwrap_or(timing_info.earliest_pts)
.mul_div_floor(timescale as u64, gst::ClockTime::SECOND.nseconds()) .mul_div_floor(timescale as u64, gst::ClockTime::SECOND.nseconds())
.context("base time overflow")?; .context("base time overflow")?;
@ -1921,11 +1822,9 @@ fn write_trun(
_cfg: &super::FragmentHeaderConfiguration, _cfg: &super::FragmentHeaderConfiguration,
current_data_offset: u32, current_data_offset: u32,
tr_flags: u32, tr_flags: u32,
check_dts: bool,
intra_only: bool,
timescale: u32, timescale: u32,
timing_info: &super::FragmentTimingInfo,
buffers: &[Buffer], buffers: &[Buffer],
last_end_timestamp: u64,
) -> Result<usize, Error> { ) -> Result<usize, Error> {
// Sample count // Sample count
v.extend((buffers.len() as u32).to_be_bytes()); v.extend((buffers.len() as u32).to_be_bytes());
@ -1935,37 +1834,27 @@ fn write_trun(
v.extend(current_data_offset.to_be_bytes()); v.extend(current_data_offset.to_be_bytes());
if (tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) != 0 { if (tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) != 0 {
v.extend(sample_flags_from_buffer(&buffers[0].buffer, intra_only).to_be_bytes()); v.extend(sample_flags_from_buffer(timing_info, &buffers[0].buffer).to_be_bytes());
} }
for ( for Buffer {
Buffer { idx: _idx,
idx: _idx, ref buffer,
buffer, timestamp: _timestamp,
pts, duration,
dts, composition_time_offset,
}, } in buffers.iter()
next_timestamp, {
) in Iterator::zip(
buffers.iter(),
buffers
.iter()
.skip(1)
.map(|Buffer { pts, dts, .. }| {
timestamp_from_pts_dts(*pts, *dts, check_dts, timescale)
.map(|(current_timestamp, _pts, _dts)| current_timestamp)
})
.chain(Some(Ok(last_end_timestamp))),
) {
let next_timestamp = next_timestamp?;
if (tr_flags & SAMPLE_DURATION_PRESENT) != 0 { if (tr_flags & SAMPLE_DURATION_PRESENT) != 0 {
// Sample duration // Sample duration
let (current_timestamp, _pts, _dts) = let sample_duration = u32::try_from(
timestamp_from_pts_dts(*pts, *dts, check_dts, timescale)?; duration
let dur = u32::try_from(next_timestamp.saturating_sub(current_timestamp)) .nseconds()
.context("too big sample duration")?; .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
v.extend(dur.to_be_bytes()); .context("too big sample duration")?,
)
.context("too big sample duration")?;
v.extend(sample_duration.to_be_bytes());
} }
if (tr_flags & SAMPLE_SIZE_PRESENT) != 0 { if (tr_flags & SAMPLE_SIZE_PRESENT) != 0 {
@ -1977,12 +1866,19 @@ fn write_trun(
assert!((tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) == 0); assert!((tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) == 0);
// Sample flags // Sample flags
v.extend(sample_flags_from_buffer(buffer, intra_only).to_be_bytes()); v.extend(sample_flags_from_buffer(timing_info, buffer).to_be_bytes());
} }
if (tr_flags & SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT) != 0 { if (tr_flags & SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT) != 0 {
// Sample composition time offset // Sample composition time offset
v.extend(composition_time_offset_from_pts_dts(*pts, *dts, timescale)?.to_be_bytes()); let composition_time_offset = i32::try_from(
composition_time_offset
.unwrap_or(0)
.mul_div_round(timescale as i64, gst::ClockTime::SECOND.nseconds() as i64)
.context("too big composition time offset")?,
)
.context("too big composition time offset")?;
v.extend(composition_time_offset.to_be_bytes());
} }
} }
@ -2065,10 +1961,6 @@ impl<'a, T: 'a, P> GroupBy<'a, T, P> {
fn new(slice: &'a [T], predicate: P) -> Self { fn new(slice: &'a [T], predicate: P) -> Self {
GroupBy { slice, predicate } GroupBy { slice, predicate }
} }
fn as_slice(&'a self) -> &'a [T] {
self.slice
}
} }
impl<'a, T: 'a, P> Iterator for GroupBy<'a, T, P> impl<'a, T: 'a, P> Iterator for GroupBy<'a, T, P>

View file

@ -61,6 +61,12 @@ impl Default for Settings {
} }
} }
struct GopBuffer {
buffer: gst::Buffer,
pts: gst::ClockTime,
dts: Option<gst::ClockTime>,
}
struct Gop { struct Gop {
// Running times // Running times
start_pts: gst::ClockTime, start_pts: gst::ClockTime,
@ -80,7 +86,7 @@ struct Gop {
start_dts_position: Option<gst::ClockTime>, start_dts_position: Option<gst::ClockTime>,
// Buffer, PTS running time, DTS running time // Buffer, PTS running time, DTS running time
buffers: Vec<Buffer>, buffers: Vec<GopBuffer>,
} }
struct Stream { struct Stream {
@ -129,10 +135,11 @@ pub(crate) struct FMP4Mux {
} }
impl FMP4Mux { impl FMP4Mux {
fn queue_input( // Queue incoming buffers as individual GOPs.
fn queue_gops(
&self, &self,
element: &super::FMP4Mux, element: &super::FMP4Mux,
idx: usize, _idx: usize,
stream: &mut Stream, stream: &mut Stream,
segment: &gst::FormattedSegment<gst::ClockTime>, segment: &gst::FormattedSegment<gst::ClockTime>,
mut buffer: gst::Buffer, mut buffer: gst::Buffer,
@ -296,12 +303,7 @@ impl FMP4Mux {
end_pts, end_pts,
end_dts, end_dts,
final_end_pts: false, final_end_pts: false,
buffers: vec![Buffer { buffers: vec![GopBuffer { buffer, pts, dts }],
idx,
buffer,
pts,
dts,
}],
}; };
stream.queued_gops.push_front(gop); stream.queued_gops.push_front(gop);
@ -348,8 +350,7 @@ impl FMP4Mux {
gop.end_pts = std::cmp::max(gop.end_pts, end_pts); gop.end_pts = std::cmp::max(gop.end_pts, end_pts);
gop.end_dts = Some(std::cmp::max(gop.end_dts.expect("no end DTS"), end_dts)); gop.end_dts = Some(std::cmp::max(gop.end_dts.expect("no end DTS"), end_dts));
gop.buffers.push(Buffer { gop.buffers.push(GopBuffer {
idx,
buffer, buffer,
pts, pts,
dts: Some(dts), dts: Some(dts),
@ -518,15 +519,14 @@ impl FMP4Mux {
} }
let mut drain_buffers = Vec::with_capacity(state.streams.len()); let mut drain_buffers = Vec::with_capacity(state.streams.len());
let mut timing_infos = Vec::with_capacity(state.streams.len()); let mut streams = Vec::with_capacity(state.streams.len());
let mut caps = Vec::with_capacity(state.streams.len());
let mut min_earliest_pts_position = None; let mut min_earliest_pts_position = None;
let mut min_earliest_pts = None; let mut min_earliest_pts = None;
let mut min_start_dts_position = None; let mut min_start_dts_position = None;
let mut max_end_pts = None; let mut max_end_pts = None;
for stream in &mut state.streams { for (idx, stream) in state.streams.iter_mut().enumerate() {
assert!( assert!(
timeout timeout
|| at_eos || at_eos
@ -558,7 +558,7 @@ impl FMP4Mux {
stream.fragment_filled = false; stream.fragment_filled = false;
if gops.is_empty() { if gops.is_empty() {
timing_infos.push(None); streams.push((&stream.sinkpad, &stream.caps, None));
} else { } else {
let first_gop = gops.first().unwrap(); let first_gop = gops.first().unwrap();
let last_gop = gops.last().unwrap(); let last_gop = gops.last().unwrap();
@ -567,7 +567,6 @@ impl FMP4Mux {
let start_dts = first_gop.start_dts; let start_dts = first_gop.start_dts;
let start_dts_position = first_gop.start_dts_position; let start_dts_position = first_gop.start_dts_position;
let end_pts = last_gop.end_pts; let end_pts = last_gop.end_pts;
let end_dts = last_gop.end_dts;
let dts_offset = stream.dts_offset; let dts_offset = stream.dts_offset;
if min_earliest_pts.map_or(true, |min| min > earliest_pts) { if min_earliest_pts.map_or(true, |min| min > earliest_pts) {
@ -616,30 +615,93 @@ impl FMP4Mux {
.unwrap_or(gst::ClockTime::ZERO) .unwrap_or(gst::ClockTime::ZERO)
); );
timing_infos.push(Some(super::FragmentTimingInfo { let start_time = if stream.intra_only {
earliest_pts, earliest_pts
start_dts, } else {
end_pts, start_dts.unwrap()
end_dts, };
dts_offset,
}));
}
caps.push(&stream.caps); streams.push((
&stream.sinkpad,
&stream.caps,
Some(super::FragmentTimingInfo {
start_time,
intra_only: stream.intra_only,
}),
));
let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum()); let mut buffers =
for gop in gops { VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
for buffer in gop.buffers {
buffers.push_back(buffer); for gop in gops {
let mut gop_buffers = gop.buffers.into_iter().peekable();
while let Some(buffer) = gop_buffers.next() {
let timestamp = if stream.intra_only {
buffer.pts
} else {
buffer.dts.unwrap()
};
let end_timestamp = match gop_buffers.peek() {
Some(ref buffer) => {
if stream.intra_only {
buffer.pts
} else {
buffer.dts.unwrap()
}
}
None => {
if stream.intra_only {
gop.end_pts
} else {
gop.end_dts.unwrap()
}
}
};
let duration = end_timestamp.saturating_sub(timestamp);
let composition_time_offset = if stream.intra_only {
None
} else {
let pts = buffer.pts;
let dts = buffer.dts.unwrap();
if pts > dts {
Some(
i64::try_from((pts - dts).nseconds())
.map_err(|_| {
gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
gst::FlowError::Error
})?,
)
} else {
let diff = i64::try_from((dts - pts).nseconds())
.map_err(|_| {
gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
gst::FlowError::Error
})?;
Some(-diff)
}
};
buffers.push_back(Buffer {
idx,
buffer: buffer.buffer,
timestamp,
duration,
composition_time_offset,
});
}
} }
drain_buffers.push(buffers);
} }
drain_buffers.push(buffers);
} }
// Interleave buffers according to the settings into a single vec // Interleave buffers according to the settings into a single vec
let mut interleaved_buffers = let mut interleaved_buffers =
Vec::with_capacity(drain_buffers.iter().map(|bs| bs.len()).sum()); Vec::with_capacity(drain_buffers.iter().map(|bs| bs.len()).sum());
while let Some((idx, bs)) = while let Some((_idx, bs)) =
drain_buffers drain_buffers
.iter_mut() .iter_mut()
.enumerate() .enumerate()
@ -651,7 +713,7 @@ impl FMP4Mux {
(Some(a), Some(b)) => (a, b), (Some(a), Some(b)) => (a, b),
}; };
match a.dts.unwrap_or(a.pts).cmp(&b.dts.unwrap_or(b.pts)) { match a.timestamp.cmp(&b.timestamp) {
std::cmp::Ordering::Equal => a_idx.cmp(b_idx), std::cmp::Ordering::Equal => a_idx.cmp(b_idx),
cmp => cmp, cmp => cmp,
} }
@ -662,7 +724,7 @@ impl FMP4Mux {
// No more buffers now // No more buffers now
break; break;
} }
Some(buf) => buf.dts.unwrap_or(buf.pts), Some(buf) => buf.timestamp,
}; };
let mut current_end_time = start_time; let mut current_end_time = start_time;
let mut dequeued_bytes = 0; let mut dequeued_bytes = 0;
@ -675,13 +737,7 @@ impl FMP4Mux {
}) })
{ {
if let Some(buffer) = bs.pop_front() { if let Some(buffer) = bs.pop_front() {
current_end_time = match bs.front() { current_end_time = buffer.timestamp + buffer.duration;
Some(next_buffer) => next_buffer.dts.unwrap_or(next_buffer.pts),
None => {
let timing_info = timing_infos[idx].as_ref().unwrap();
timing_info.end_dts.unwrap_or(timing_info.end_pts)
}
};
dequeued_bytes += buffer.buffer.size() as u64; dequeued_bytes += buffer.buffer.size() as u64;
interleaved_buffers.push(buffer); interleaved_buffers.push(buffer);
} else { } else {
@ -739,8 +795,7 @@ impl FMP4Mux {
boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration { boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
variant: class.as_ref().variant, variant: class.as_ref().variant,
sequence_number, sequence_number,
caps: caps.as_slice(), streams: streams.as_slice(),
timing_infos: timing_infos.as_slice(),
buffers: interleaved_buffers.as_slice(), buffers: interleaved_buffers.as_slice(),
}) })
.map_err(|err| { .map_err(|err| {
@ -803,9 +858,9 @@ impl FMP4Mux {
// Write mfra only for the main stream, and if there are no buffers for the main stream // Write mfra only for the main stream, and if there are no buffers for the main stream
// in this segment then don't write anything. // in this segment then don't write anything.
if let Some(Some(ref timing_info)) = timing_infos.get(0) { if let Some((_pad, _caps, Some(ref timing_info))) = streams.get(0) {
state.fragment_offsets.push(super::FragmentOffset { state.fragment_offsets.push(super::FragmentOffset {
time: timing_info.earliest_pts, time: timing_info.start_time,
offset: moof_offset, offset: moof_offset,
}); });
} }
@ -818,7 +873,7 @@ impl FMP4Mux {
} }
if settings.write_mfra && at_eos { if settings.write_mfra && at_eos {
match boxes::create_mfra(caps[0], &state.fragment_offsets) { match boxes::create_mfra(streams[0].1, &state.fragment_offsets) {
Ok(mut mfra) => { Ok(mut mfra) => {
{ {
let mfra = mfra.get_mut().unwrap(); let mfra = mfra.get_mut().unwrap();
@ -962,12 +1017,17 @@ impl FMP4Mux {
.ok() .ok()
.flatten(); .flatten();
let caps = state.streams.iter().map(|s| &s.caps).collect::<Vec<_>>(); let streams = state
.streams
.iter()
.map(|s| (&s.sinkpad, &s.caps))
.collect::<Vec<_>>();
let mut buffer = boxes::create_fmp4_header(super::HeaderConfiguration { let mut buffer = boxes::create_fmp4_header(super::HeaderConfiguration {
element,
variant, variant,
update: at_eos, update: at_eos,
caps: caps.as_slice(), streams: streams.as_slice(),
write_mehd: settings.write_mehd, write_mehd: settings.write_mehd,
duration: if at_eos { duration } else { None }, duration: if at_eos { duration } else { None },
}) })
@ -1404,10 +1464,13 @@ impl AggregatorImpl for FMP4Mux {
let buffers = { let buffers = {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
// Create streams, stream header in the beginning and set output caps. // Create streams
if state.stream_header.is_none() { if state.streams.is_empty() {
self.create_streams(aggregator, &mut state)?; self.create_streams(aggregator, &mut state)?;
}
// Stream header in the beginning and set output caps.
if state.stream_header.is_none() {
let (_, caps) = self let (_, caps) = self
.update_header(aggregator, &mut state, &settings, false)? .update_header(aggregator, &mut state, &settings, false)?
.unwrap(); .unwrap();
@ -1452,7 +1515,7 @@ impl AggregatorImpl for FMP4Mux {
let pts = buffer.pts(); let pts = buffer.pts();
// Queue up the buffer and update GOP tracking state // Queue up the buffer and update GOP tracking state
self.queue_input(aggregator, idx, stream, &segment, buffer)?; self.queue_gops(aggregator, idx, stream, &segment, buffer)?;
// If we have a PTS with this buffer, check if a new force-keyunit event for the next // If we have a PTS with this buffer, check if a new force-keyunit event for the next
// fragment start has to be created // fragment start has to be created

View file

@ -61,46 +61,57 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
Ok(()) Ok(())
} }
#[derive(Debug)]
pub(crate) struct Buffer {
/// Track index
idx: usize,
buffer: gst::Buffer,
// Running times
pts: gst::ClockTime,
dts: Option<gst::ClockTime>,
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct HeaderConfiguration<'a> { pub(crate) struct HeaderConfiguration<'a> {
variant: Variant, variant: Variant,
#[allow(dead_code)]
element: &'a FMP4Mux,
update: bool, update: bool,
/// First caps must be the video/reference stream. Must be in the order the tracks are going to /// First caps must be the video/reference stream. Must be in the order the tracks are going to
/// be used later for the fragments too. /// be used later for the fragments too.
caps: &'a [&'a gst::Caps], streams: &'a [(&'a gst_base::AggregatorPad, &'a gst::Caps)],
write_mehd: bool, write_mehd: bool,
duration: Option<gst::ClockTime>, duration: Option<gst::ClockTime>,
} }
#[derive(Debug)]
pub(crate) struct FragmentTimingInfo {
earliest_pts: gst::ClockTime,
start_dts: Option<gst::ClockTime>,
end_pts: gst::ClockTime,
end_dts: Option<gst::ClockTime>,
#[allow(dead_code)]
dts_offset: Option<gst::ClockTime>,
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct FragmentHeaderConfiguration<'a> { pub(crate) struct FragmentHeaderConfiguration<'a> {
variant: Variant, variant: Variant,
sequence_number: u32, sequence_number: u32,
caps: &'a [&'a gst::Caps], streams: &'a [(
timing_infos: &'a [Option<FragmentTimingInfo>], &'a gst_base::AggregatorPad,
&'a gst::Caps,
Option<FragmentTimingInfo>,
)],
buffers: &'a [Buffer], buffers: &'a [Buffer],
} }
#[derive(Debug)]
pub(crate) struct FragmentTimingInfo {
/// Start time of this fragment
start_time: gst::ClockTime,
/// Set if this is an intra-only stream
intra_only: bool,
}
#[derive(Debug)]
pub(crate) struct Buffer {
/// Track index
idx: usize,
/// Actual buffer
buffer: gst::Buffer,
/// Timestamp
timestamp: gst::ClockTime,
/// Sample duration
duration: gst::ClockTime,
/// Composition time offset
composition_time_offset: Option<i64>,
}
#[allow(clippy::upper_case_acronyms)] #[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Variant { pub(crate) enum Variant {