diff --git a/generic/fmp4/src/fmp4mux/boxes.rs b/generic/fmp4/src/fmp4mux/boxes.rs
index d7fca79c..597ed808 100644
--- a/generic/fmp4/src/fmp4mux/boxes.rs
+++ b/generic/fmp4/src/fmp4mux/boxes.rs
@@ -330,9 +330,9 @@ fn cmaf_brands_from_caps(caps: &gst::CapsRef, compatible_brands: &mut Vec<&'stat
     }
 }
 
-fn brands_from_variant_and_caps(
+fn brands_from_variant_and_caps<'a>(
     variant: super::Variant,
-    caps: &[&gst::Caps],
+    mut caps: impl Iterator<Item = &'a gst::Caps>,
 ) -> (&'static [u8; 4], Vec<&'static [u8; 4]>) {
     match variant {
         super::Variant::ISO | super::Variant::ONVIF => (b"iso6", vec![b"iso6"]),
@@ -343,8 +343,8 @@ fn brands_from_variant_and_caps(
         super::Variant::CMAF => {
             let mut compatible_brands = vec![b"iso6", b"cmfc"];
 
-            assert_eq!(caps.len(), 1);
-            cmaf_brands_from_caps(caps[0], &mut compatible_brands);
+            cmaf_brands_from_caps(caps.next().unwrap(), &mut compatible_brands);
+            assert_eq!(caps.next(), None);
 
             (b"cmf2", compatible_brands)
         }
@@ -355,7 +355,8 @@
 pub(super) fn create_fmp4_header(cfg: super::HeaderConfiguration) -> Result<gst::Buffer, Error> {
     let mut v = vec![];
 
-    let (brand, compatible_brands) = brands_from_variant_and_caps(cfg.variant, cfg.caps);
+    let (brand, compatible_brands) =
+        brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| s.1));
 
     write_box(&mut v, b"ftyp", |v| {
         // major brand
@@ -384,7 +385,7 @@ fn write_moov(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), E
     write_full_box(v, b"mvhd", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
         write_mvhd(v, cfg, creation_time)
     })?;
-    for (idx, caps) in cfg.caps.iter().enumerate() {
+    for (idx, (pad, caps)) in cfg.streams.iter().enumerate() {
         write_box(v, b"trak", |v| {
             let mut references = vec![];
 
@@ -393,7 +394,7 @@ fn write_moov(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), E
                 && caps.structure(0).unwrap().name() == "application/x-onvif-metadata"
             {
                 // Find the first video track
-                for (idx, caps) in cfg.caps.iter().enumerate() {
+                for (idx, (_pad, caps)) in cfg.streams.iter().enumerate() {
                     let s = caps.structure(0).unwrap();
 
                     if matches!(s.name(), "video/x-h264" | "video/x-h265" | "image/jpeg") {
@@ -406,7 +407,7 @@ fn write_moov(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), E
                 }
             }
 
-            write_trak(v, cfg, idx, caps, creation_time, &references)
+            write_trak(v, cfg, idx, pad, caps, creation_time, &references)
         })?;
     }
     write_box(v, b"mvex", |v| write_mvex(v, cfg))?;
@@ -453,7 +454,7 @@ fn write_mvhd(
     // Modification time
     v.extend(creation_time.to_be_bytes());
     // Timescale: uses the reference track timescale
-    v.extend(caps_to_timescale(cfg.caps[0]).to_be_bytes());
+    v.extend(caps_to_timescale(cfg.streams[0].1).to_be_bytes());
 
     // Duration
     v.extend(0u64.to_be_bytes());
@@ -485,7 +486,7 @@ fn write_mvhd(
     v.extend([0u8; 6 * 4]);
 
     // Next track id
-    v.extend((cfg.caps.len() as u32 + 1).to_be_bytes());
+    v.extend((cfg.streams.len() as u32 + 1).to_be_bytes());
 
     Ok(())
 }
@@ -503,6 +504,7 @@ fn write_trak(
     v: &mut Vec<u8>,
     cfg: &super::HeaderConfiguration,
     idx: usize,
+    _pad: &gst_base::AggregatorPad,
     caps: &gst::CapsRef,
     creation_time: u64,
     references: &[TrackReference],
@@ -1387,7 +1389,7 @@ fn write_mvex(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), E
         }
     }
 
-    for (idx, _caps) in cfg.caps.iter().enumerate() {
+    for (idx, (_pad, _caps)) in cfg.streams.iter().enumerate() {
        write_full_box(v, b"trex", FULL_BOX_VERSION_0, FULL_BOX_FLAGS_NONE, |v| {
            write_trex(v, cfg, idx)
        })?;
@@ -1398,7 +1400,7 @@ fn write_mvex(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), E
 }
 
 fn write_mehd(v: &mut Vec<u8>, cfg: &super::HeaderConfiguration) -> Result<(), Error> {
     // Use the reference track timescale
-    let timescale = caps_to_timescale(cfg.caps[0]);
+    let timescale = caps_to_timescale(cfg.streams[0].1);
     let duration = cfg
         .duration
@@ -1440,7 +1442,8 @@ pub(super) fn create_fmp4_fragment_header(
 ) -> Result<(gst::Buffer, u64), Error> {
     let mut v = vec![];
 
-    let (brand, compatible_brands) = brands_from_variant_and_caps(cfg.variant, cfg.caps);
+    let (brand, compatible_brands) =
+        brands_from_variant_and_caps(cfg.variant, cfg.streams.iter().map(|s| s.1));
 
     write_box(&mut v, b"styp", |v| {
         // major brand
@@ -1492,14 +1495,15 @@ fn write_moof(
     })?;
 
     let mut data_offset_offsets = vec![];
-    for (idx, caps) in cfg.caps.iter().enumerate() {
+    for (idx, (_pad, caps, timing_info)) in cfg.streams.iter().enumerate() {
         // Skip tracks without any buffers for this fragment.
-        if cfg.timing_infos[idx].is_none() {
-            continue;
-        }
+        let timing_info = match timing_info {
+            None => continue,
+            Some(ref timing_info) => timing_info,
+        };
 
         write_box(v, b"traf", |v| {
-            write_traf(v, cfg, &mut data_offset_offsets, idx, caps)
+            write_traf(v, cfg, &mut data_offset_offsets, idx, caps, timing_info)
         })?;
     }
 
@@ -1513,8 +1517,11 @@ fn write_mfhd(v: &mut Vec<u8>, cfg: &super::FragmentHeaderConfiguration) -> Resu
 }
 
 #[allow(clippy::identity_op)]
-fn sample_flags_from_buffer(buffer: &gst::BufferRef, intra_only: bool) -> u32 {
-    if intra_only {
+fn sample_flags_from_buffer(
+    timing_info: &super::FragmentTimingInfo,
+    buffer: &gst::BufferRef,
+) -> u32 {
+    if timing_info.intra_only {
         (0b00u32 << (16 + 10)) | // leading: unknown
         (0b10u32 << (16 + 8)) | // depends: no
         (0b10u32 << (16 + 6)) | // depended: no
@@ -1549,47 +1556,6 @@ fn sample_flags_from_buffer(buffer: &gst::BufferRef, intra_only: bool) -> u32 {
     }
 }
 
-fn composition_time_offset_from_pts_dts(
-    pts: gst::ClockTime,
-    dts: Option<gst::ClockTime>,
-    timescale: u32,
-) -> Result<i32, Error> {
-    let (_, pts, dts) = timestamp_from_pts_dts(pts, dts, true, timescale)?;
-    let dts = dts.expect("no DTS");
-
-    let diff = if pts > dts {
-        i32::try_from((pts - dts) as i64).context("pts-dts diff too big")?
-    } else {
-        let diff = dts - pts;
-        i32::try_from(-(diff as i64)).context("pts-dts diff too big")?
-    };
-
-    Ok(diff)
-}
-
-fn timestamp_from_pts_dts(
-    pts: gst::ClockTime,
-    dts: Option<gst::ClockTime>,
-    check_dts: bool,
-    timescale: u32,
-) -> Result<(u64, u64, Option<u64>), Error> {
-    let pts = pts
-        .nseconds()
-        .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
-        .context("too big PTS")?;
-
-    if check_dts {
-        let dts = dts.expect("no DTS");
-        let dts = dts
-            .nseconds()
-            .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
-            .context("too big DTS")?;
-        Ok((dts, pts, Some(dts)))
-    } else {
-        Ok((pts, pts, None))
-    }
-}
-
 const DEFAULT_SAMPLE_DURATION_PRESENT: u32 = 0x08;
 const DEFAULT_SAMPLE_SIZE_PRESENT: u32 = 0x10;
 const DEFAULT_SAMPLE_FLAGS_PRESENT: u32 = 0x20;
@@ -1606,8 +1572,7 @@ const SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT: u32 = 0x8_00;
 fn analyze_buffers(
     cfg: &super::FragmentHeaderConfiguration,
     idx: usize,
-    check_dts: bool,
-    intra_only: bool,
+    timing_info: &super::FragmentTimingInfo,
     timescale: u32,
 ) -> Result<
     (
@@ -1629,7 +1594,6 @@ fn analyze_buffers(
     let mut tf_flags = DEFAULT_BASE_IS_MOOF;
     let mut tr_flags = DATA_OFFSET_PRESENT;
 
-    let mut last_timestamp = None;
     let mut duration = None;
     let mut size = None;
     let mut first_buffer_flags = None;
@@ -1640,8 +1604,9 @@ fn analyze_buffers(
     for Buffer {
         idx: _idx,
         buffer,
-        pts,
-        dts,
+        timestamp: _timestamp,
+        duration: sample_duration,
+        composition_time_offset,
     } in cfg.buffers.iter().filter(|b| b.idx == idx)
     {
         if size.is_none() {
@@ -1651,27 +1616,22 @@ fn analyze_buffers(
             tr_flags |= SAMPLE_SIZE_PRESENT;
         }
 
-        {
-            let (current_timestamp, _pts, _dts) =
-                timestamp_from_pts_dts(*pts, *dts, check_dts, timescale)?;
+        let sample_duration = u32::try_from(
+            sample_duration
+                .nseconds()
+                .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
+                .context("too big sample duration")?,
+        )
+        .context("too big sample duration")?;
 
-            if let Some(prev_timestamp) = last_timestamp {
-                let dur = u32::try_from(current_timestamp.saturating_sub(prev_timestamp))
-                    .context("too big sample duration")?;
-                last_timestamp = Some(current_timestamp);
-
-                if duration.is_none() {
-                    duration = Some(dur);
-                }
-                if Some(dur) != duration {
-                    tr_flags |= SAMPLE_DURATION_PRESENT;
-                }
-            } else {
-                last_timestamp = Some(current_timestamp);
-            }
+        if duration.is_none() {
+            duration = Some(sample_duration);
+        }
+        if Some(sample_duration) != duration {
+            tr_flags |= SAMPLE_DURATION_PRESENT;
         }
 
-        let f = sample_flags_from_buffer(buffer, intra_only);
+        let f = sample_flags_from_buffer(timing_info, buffer);
         if first_buffer_flags.is_none() {
             first_buffer_flags = Some(f);
         } else {
@@ -1686,43 +1646,17 @@ fn analyze_buffers(
             tr_flags |= SAMPLE_FLAGS_PRESENT;
         }
 
-        if check_dts {
-            let diff = composition_time_offset_from_pts_dts(*pts, *dts, timescale)?;
-            if diff != 0 {
+        if let Some(composition_time_offset) = *composition_time_offset {
+            assert!(!timing_info.intra_only);
+            if composition_time_offset != 0 {
                tr_flags |= SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT;
            }
-            if diff < 0 {
+            if composition_time_offset < 0 {
                negative_composition_time_offsets = true;
            }
        }
    }
 
-    // Check duration of the last buffer against end_pts / end_dts
-    {
-        let timing_info = cfg.timing_infos[idx].as_ref().unwrap();
-        let current_timestamp = if check_dts {
-            timing_info.end_dts.expect("no end DTS")
-        } else {
-            timing_info.end_pts
-        };
-        let current_timestamp = current_timestamp
-            .nseconds()
-            .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
-            .context("too big timestamp")?;
-
-        if let Some(prev_timestamp) = last_timestamp {
-            let dur = u32::try_from(current_timestamp.saturating_sub(prev_timestamp))
-                .context("too big sample duration")?;
-
-            if duration.is_none() {
-                duration = Some(dur);
-            }
-            if Some(dur) != duration {
-                tr_flags |= SAMPLE_DURATION_PRESENT;
-            }
-        }
-    }
-
     if (tr_flags & SAMPLE_SIZE_PRESENT) == 0 {
         tf_flags |= DEFAULT_SAMPLE_SIZE_PRESENT;
     } else {
@@ -1765,21 +1699,10 @@ fn write_traf(
     data_offset_offsets: &mut Vec<usize>,
     idx: usize,
     caps: &gst::CapsRef,
+    timing_info: &super::FragmentTimingInfo,
 ) -> Result<(), Error> {
-    let s = caps.structure(0).unwrap();
     let timescale = caps_to_timescale(caps);
 
-    let check_dts = matches!(s.name(), "video/x-h264" | "video/x-h265");
-    let intra_only = matches!(
-        s.name(),
-        "audio/mpeg"
-            | "audio/x-alaw"
-            | "audio/x-mulaw"
-            | "audio/x-adpcm"
-            | "image/jpeg"
-            | "application/x-onvif-metadata"
-    );
-
     // Analyze all buffers to know what values can be put into the tfhd for all samples and what
     // has to be stored for every single sample
     let (
@@ -1789,7 +1712,7 @@ fn write_traf(
         default_duration,
         default_flags,
         negative_composition_time_offsets,
-    ) = analyze_buffers(cfg, idx, check_dts, intra_only, timescale)?;
+    ) = analyze_buffers(cfg, idx, timing_info, timescale)?;
 
     assert!((tf_flags & DEFAULT_SAMPLE_SIZE_PRESENT == 0) ^ default_size.is_some());
     assert!((tf_flags & DEFAULT_SAMPLE_DURATION_PRESENT == 0) ^ default_duration.is_some());
@@ -1799,7 +1722,7 @@ fn write_traf(
         write_tfhd(v, cfg, idx, default_size, default_duration, default_flags)
     })?;
     write_full_box(v, b"tfdt", FULL_BOX_VERSION_1, FULL_BOX_FLAGS_NONE, |v| {
-        write_tfdt(v, cfg, idx, timescale)
+        write_tfdt(v, cfg, idx, timing_info, timescale)
     })?;
 
     let mut current_data_offset = 0;
@@ -1814,24 +1737,6 @@ fn write_traf(
             continue;
         }
 
-        let last_end_timestamp =
-            if let Some(Buffer { pts, dts, .. }) = iter.as_slice().iter().find(|b| b.idx == idx) {
-                timestamp_from_pts_dts(*pts, *dts, check_dts, timescale)
-                    .map(|(current_timestamp, _pts, _dts)| current_timestamp)?
-            } else {
-                let timing_info = cfg.timing_infos[idx].as_ref().unwrap();
-                let last_end_timestamp = if check_dts {
-                    timing_info.end_dts.expect("no end DTS")
-                } else {
-                    timing_info.end_pts
-                };
-
-                last_end_timestamp
-                    .nseconds()
-                    .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
-                    .context("too big timestamp")?
-            };
-
         let data_offset_offset = write_full_box(
             v,
             b"trun",
@@ -1847,11 +1752,9 @@ fn write_traf(
                     cfg,
                     current_data_offset,
                     tr_flags,
-                    check_dts,
-                    intra_only,
                     timescale,
+                    timing_info,
                     run,
-                    last_end_timestamp,
                 )
             },
         )?;
@@ -1898,15 +1801,13 @@ fn write_tfhd(
 
 fn write_tfdt(
     v: &mut Vec<u8>,
-    cfg: &super::FragmentHeaderConfiguration,
-    idx: usize,
+    _cfg: &super::FragmentHeaderConfiguration,
+    _idx: usize,
+    timing_info: &super::FragmentTimingInfo,
     timescale: u32,
 ) -> Result<(), Error> {
-    let timing_info = cfg.timing_infos[idx].as_ref().unwrap();
-
     let base_time = timing_info
-        .start_dts
-        .unwrap_or(timing_info.earliest_pts)
+        .start_time
         .mul_div_floor(timescale as u64, gst::ClockTime::SECOND.nseconds())
         .context("base time overflow")?;
 
@@ -1921,11 +1822,9 @@ fn write_trun(
     _cfg: &super::FragmentHeaderConfiguration,
     current_data_offset: u32,
     tr_flags: u32,
-    check_dts: bool,
-    intra_only: bool,
     timescale: u32,
+    timing_info: &super::FragmentTimingInfo,
     buffers: &[Buffer],
-    last_end_timestamp: u64,
 ) -> Result<usize, Error> {
     // Sample count
     v.extend((buffers.len() as u32).to_be_bytes());
@@ -1935,37 +1834,27 @@ fn write_trun(
     v.extend(current_data_offset.to_be_bytes());
 
     if (tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) != 0 {
-        v.extend(sample_flags_from_buffer(&buffers[0].buffer, intra_only).to_be_bytes());
+        v.extend(sample_flags_from_buffer(timing_info, &buffers[0].buffer).to_be_bytes());
     }
 
-    for (
-        Buffer {
-            idx: _idx,
-            buffer,
-            pts,
-            dts,
-        },
-        next_timestamp,
-    ) in Iterator::zip(
-        buffers.iter(),
-        buffers
-            .iter()
-            .skip(1)
-            .map(|Buffer { pts, dts, .. }| {
-                timestamp_from_pts_dts(*pts, *dts, check_dts, timescale)
-                    .map(|(current_timestamp, _pts, _dts)| current_timestamp)
-            })
-            .chain(Some(Ok(last_end_timestamp))),
-    ) {
-        let next_timestamp = next_timestamp?;
-
+    for Buffer {
+        idx: _idx,
+        ref buffer,
+        timestamp: _timestamp,
+        duration,
+        composition_time_offset,
+    } in buffers.iter()
+    {
         if (tr_flags & SAMPLE_DURATION_PRESENT) != 0 {
             // Sample duration
-            let (current_timestamp, _pts, _dts) =
-                timestamp_from_pts_dts(*pts, *dts, check_dts, timescale)?;
-            let dur = u32::try_from(next_timestamp.saturating_sub(current_timestamp))
-                .context("too big sample duration")?;
-            v.extend(dur.to_be_bytes());
+            let sample_duration = u32::try_from(
+                duration
+                    .nseconds()
+                    .mul_div_round(timescale as u64, gst::ClockTime::SECOND.nseconds())
+                    .context("too big sample duration")?,
+            )
+            .context("too big sample duration")?;
+            v.extend(sample_duration.to_be_bytes());
         }
 
         if (tr_flags & SAMPLE_SIZE_PRESENT) != 0 {
@@ -1977,12 +1866,19 @@ fn write_trun(
             assert!((tr_flags & FIRST_SAMPLE_FLAGS_PRESENT) == 0);
 
             // Sample flags
-            v.extend(sample_flags_from_buffer(buffer, intra_only).to_be_bytes());
+            v.extend(sample_flags_from_buffer(timing_info, buffer).to_be_bytes());
         }
 
         if (tr_flags & SAMPLE_COMPOSITION_TIME_OFFSET_PRESENT) != 0 {
             // Sample composition time offset
-            v.extend(composition_time_offset_from_pts_dts(*pts, *dts, timescale)?.to_be_bytes());
+            let composition_time_offset = i32::try_from(
+                composition_time_offset
+                    .unwrap_or(0)
+                    .mul_div_round(timescale as i64, gst::ClockTime::SECOND.nseconds() as i64)
+                    .context("too big composition time offset")?,
+            )
+            .context("too big composition time offset")?;
+            v.extend(composition_time_offset.to_be_bytes());
         }
     }
 
@@ -2065,10 +1961,6 @@ impl<'a, T: 'a, P> GroupBy<'a, T, P> {
     fn new(slice: &'a [T], predicate: P) -> Self {
         GroupBy { slice, predicate }
     }
-
-    fn as_slice(&'a self) -> &'a [T] {
-        self.slice
-    }
 }
 
 impl<'a, T: 'a, P> Iterator for GroupBy<'a, T, P>
diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs
index 0fef270c..eb10d567 100644
--- a/generic/fmp4/src/fmp4mux/imp.rs
+++ b/generic/fmp4/src/fmp4mux/imp.rs
@@ -61,6 +61,12 @@ impl Default for Settings {
     }
 }
 
+struct GopBuffer {
+    buffer: gst::Buffer,
+    pts: gst::ClockTime,
+    dts: Option<gst::ClockTime>,
+}
+
 struct Gop {
     // Running times
     start_pts: gst::ClockTime,
@@ -80,7 +86,7 @@ struct Gop {
     start_dts_position: Option<gst::ClockTime>,
 
     // Buffer, PTS running time, DTS running time
-    buffers: Vec<Buffer>,
+    buffers: Vec<GopBuffer>,
 }
 
 struct Stream {
@@ -129,10 +135,11 @@ pub(crate) struct FMP4Mux {
 }
 
 impl FMP4Mux {
-    fn queue_input(
+    // Queue incoming buffers as individual GOPs.
+    fn queue_gops(
         &self,
         element: &super::FMP4Mux,
-        idx: usize,
+        _idx: usize,
         stream: &mut Stream,
         segment: &gst::FormattedSegment<gst::ClockTime>,
         mut buffer: gst::Buffer,
@@ -296,12 +303,7 @@ impl FMP4Mux {
                 end_pts,
                 end_dts,
                 final_end_pts: false,
-                buffers: vec![Buffer {
-                    idx,
-                    buffer,
-                    pts,
-                    dts,
-                }],
+                buffers: vec![GopBuffer { buffer, pts, dts }],
             };
             stream.queued_gops.push_front(gop);
 
@@ -348,8 +350,7 @@ impl FMP4Mux {
                 gop.end_pts = std::cmp::max(gop.end_pts, end_pts);
                 gop.end_dts = Some(std::cmp::max(gop.end_dts.expect("no end DTS"), end_dts));
 
-                gop.buffers.push(Buffer {
-                    idx,
+                gop.buffers.push(GopBuffer {
                     buffer,
                     pts,
                     dts: Some(dts),
@@ -518,15 +519,14 @@ impl FMP4Mux {
         }
 
         let mut drain_buffers = Vec::with_capacity(state.streams.len());
-        let mut timing_infos = Vec::with_capacity(state.streams.len());
-        let mut caps = Vec::with_capacity(state.streams.len());
+        let mut streams = Vec::with_capacity(state.streams.len());
 
         let mut min_earliest_pts_position = None;
         let mut min_earliest_pts = None;
         let mut min_start_dts_position = None;
         let mut max_end_pts = None;
 
-        for stream in &mut state.streams {
+        for (idx, stream) in state.streams.iter_mut().enumerate() {
             assert!(
                 timeout
                     || at_eos
@@ -558,7 +558,7 @@ impl FMP4Mux {
             stream.fragment_filled = false;
 
             if gops.is_empty() {
-                timing_infos.push(None);
+                streams.push((&stream.sinkpad, &stream.caps, None));
             } else {
                 let first_gop = gops.first().unwrap();
                 let last_gop = gops.last().unwrap();
@@ -567,7 +567,6 @@ impl FMP4Mux {
                 let start_dts = first_gop.start_dts;
                 let start_dts_position = first_gop.start_dts_position;
                 let end_pts = last_gop.end_pts;
-                let end_dts = last_gop.end_dts;
                 let dts_offset = stream.dts_offset;
 
                 if min_earliest_pts.map_or(true, |min| min > earliest_pts) {
@@ -616,30 +615,93 @@ impl FMP4Mux {
                         .unwrap_or(gst::ClockTime::ZERO)
                 );
 
-                timing_infos.push(Some(super::FragmentTimingInfo {
-                    earliest_pts,
-                    start_dts,
-                    end_pts,
-                    end_dts,
-                    dts_offset,
-                }));
-            }
+                let start_time = if stream.intra_only {
+                    earliest_pts
+                } else {
+                    start_dts.unwrap()
+                };
 
-            caps.push(&stream.caps);
+                streams.push((
+                    &stream.sinkpad,
+                    &stream.caps,
+                    Some(super::FragmentTimingInfo {
+                        start_time,
+                        intra_only: stream.intra_only,
+                    }),
+                ));
 
-            let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
-            for gop in gops {
-                for buffer in gop.buffers {
-                    buffers.push_back(buffer);
+                let mut buffers =
+                    VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum());
+
+                for gop in gops {
+                    let mut gop_buffers = gop.buffers.into_iter().peekable();
+                    while let Some(buffer) = gop_buffers.next() {
+                        let timestamp = if stream.intra_only {
+                            buffer.pts
+                        } else {
+                            buffer.dts.unwrap()
+                        };
+
+                        let end_timestamp = match gop_buffers.peek() {
+                            Some(ref buffer) => {
+                                if stream.intra_only {
+                                    buffer.pts
+                                } else {
+                                    buffer.dts.unwrap()
+                                }
+                            }
+                            None => {
+                                if stream.intra_only {
+                                    gop.end_pts
+                                } else {
+                                    gop.end_dts.unwrap()
+                                }
+                            }
+                        };
+
+                        let duration = end_timestamp.saturating_sub(timestamp);
+
+                        let composition_time_offset = if stream.intra_only {
+                            None
+                        } else {
+                            let pts = buffer.pts;
+                            let dts = buffer.dts.unwrap();
+
+                            if pts > dts {
+                                Some(
+                                    i64::try_from((pts - dts).nseconds())
+                                        .map_err(|_| {
+                                            gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
+                                            gst::FlowError::Error
+                                        })?,
+                                )
+                            } else {
+                                let diff = i64::try_from((dts - pts).nseconds())
+                                    .map_err(|_| {
+                                        gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
+                                        gst::FlowError::Error
+                                    })?;
+                                Some(-diff)
+                            }
+                        };
+
+                        buffers.push_back(Buffer {
+                            idx,
+                            buffer: buffer.buffer,
+                            timestamp,
+                            duration,
+                            composition_time_offset,
+                        });
+                    }
                 }
+                drain_buffers.push(buffers);
             }
-            drain_buffers.push(buffers);
         }
 
         // Interleave buffers according to the settings into a single vec
         let mut interleaved_buffers =
             Vec::with_capacity(drain_buffers.iter().map(|bs| bs.len()).sum());
-        while let Some((idx, bs)) =
+        while let Some((_idx, bs)) =
             drain_buffers
                 .iter_mut()
                 .enumerate()
@@ -651,7 +713,7 @@ impl FMP4Mux {
                         (Some(a), Some(b)) => (a, b),
                     };
 
-                    match a.dts.unwrap_or(a.pts).cmp(&b.dts.unwrap_or(b.pts)) {
+                    match a.timestamp.cmp(&b.timestamp) {
                         std::cmp::Ordering::Equal => a_idx.cmp(b_idx),
                         cmp => cmp,
                     }
@@ -662,7 +724,7 @@ impl FMP4Mux {
                 // No more buffers now
                 break;
             }
-            Some(buf) => buf.dts.unwrap_or(buf.pts),
+            Some(buf) => buf.timestamp,
         };
         let mut current_end_time = start_time;
         let mut dequeued_bytes = 0;
@@ -675,13 +737,7 @@ impl FMP4Mux {
            })
        {
            if let Some(buffer) = bs.pop_front() {
-                current_end_time = match bs.front() {
-                    Some(next_buffer) => next_buffer.dts.unwrap_or(next_buffer.pts),
-                    None => {
-                        let timing_info = timing_infos[idx].as_ref().unwrap();
-                        timing_info.end_dts.unwrap_or(timing_info.end_pts)
-                    }
-                };
+                current_end_time = buffer.timestamp + buffer.duration;
                dequeued_bytes += buffer.buffer.size() as u64;
                interleaved_buffers.push(buffer);
            } else {
@@ -739,8 +795,7 @@ impl FMP4Mux {
             boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration {
                 variant: class.as_ref().variant,
                 sequence_number,
-                caps: caps.as_slice(),
-                timing_infos: timing_infos.as_slice(),
+                streams: streams.as_slice(),
                 buffers: interleaved_buffers.as_slice(),
             })
             .map_err(|err| {
@@ -803,9 +858,9 @@ impl FMP4Mux {
 
         // Write mfra only for the main stream, and if there are no buffers for the main stream
         // in this segment then don't write anything.
-        if let Some(Some(ref timing_info)) = timing_infos.get(0) {
+        if let Some((_pad, _caps, Some(ref timing_info))) = streams.get(0) {
             state.fragment_offsets.push(super::FragmentOffset {
-                time: timing_info.earliest_pts,
+                time: timing_info.start_time,
                 offset: moof_offset,
             });
         }
@@ -818,7 +873,7 @@ impl FMP4Mux {
         }
 
         if settings.write_mfra && at_eos {
-            match boxes::create_mfra(caps[0], &state.fragment_offsets) {
+            match boxes::create_mfra(streams[0].1, &state.fragment_offsets) {
                 Ok(mut mfra) => {
                     {
                         let mfra = mfra.get_mut().unwrap();
@@ -962,12 +1017,17 @@ impl FMP4Mux {
             .ok()
             .flatten();
 
-        let caps = state.streams.iter().map(|s| &s.caps).collect::<Vec<_>>();
+        let streams = state
+            .streams
+            .iter()
+            .map(|s| (&s.sinkpad, &s.caps))
+            .collect::<Vec<_>>();
 
         let mut buffer = boxes::create_fmp4_header(super::HeaderConfiguration {
+            element,
             variant,
             update: at_eos,
-            caps: caps.as_slice(),
+            streams: streams.as_slice(),
             write_mehd: settings.write_mehd,
             duration: if at_eos { duration } else { None },
         })
@@ -1404,10 +1464,13 @@ impl AggregatorImpl for FMP4Mux {
         let buffers = {
             let mut state = self.state.lock().unwrap();
 
-            // Create streams, stream header in the beginning and set output caps.
-            if state.stream_header.is_none() {
+            // Create streams
+            if state.streams.is_empty() {
                 self.create_streams(aggregator, &mut state)?;
+            }
 
+            // Stream header in the beginning and set output caps.
+            if state.stream_header.is_none() {
                 let (_, caps) = self
                     .update_header(aggregator, &mut state, &settings, false)?
                     .unwrap();
@@ -1452,7 +1515,7 @@ impl AggregatorImpl for FMP4Mux {
             let pts = buffer.pts();
 
             // Queue up the buffer and update GOP tracking state
-            self.queue_input(aggregator, idx, stream, &segment, buffer)?;
+            self.queue_gops(aggregator, idx, stream, &segment, buffer)?;
 
             // If we have a PTS with this buffer, check if a new force-keyunit event for the next
             // fragment start has to be created
diff --git a/generic/fmp4/src/fmp4mux/mod.rs b/generic/fmp4/src/fmp4mux/mod.rs
index a88db6bf..50b1a419 100644
--- a/generic/fmp4/src/fmp4mux/mod.rs
+++ b/generic/fmp4/src/fmp4mux/mod.rs
@@ -61,46 +61,57 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
     Ok(())
 }
 
-#[derive(Debug)]
-pub(crate) struct Buffer {
-    /// Track index
-    idx: usize,
-    buffer: gst::Buffer,
-    // Running times
-    pts: gst::ClockTime,
-    dts: Option<gst::ClockTime>,
-}
-
 #[derive(Debug)]
 pub(crate) struct HeaderConfiguration<'a> {
     variant: Variant,
+    #[allow(dead_code)]
+    element: &'a FMP4Mux,
     update: bool,
     /// First caps must be the video/reference stream. Must be in the order the tracks are going to
     /// be used later for the fragments too.
-    caps: &'a [&'a gst::Caps],
+    streams: &'a [(&'a gst_base::AggregatorPad, &'a gst::Caps)],
     write_mehd: bool,
     duration: Option<gst::ClockTime>,
 }
 
-#[derive(Debug)]
-pub(crate) struct FragmentTimingInfo {
-    earliest_pts: gst::ClockTime,
-    start_dts: Option<gst::ClockTime>,
-    end_pts: gst::ClockTime,
-    end_dts: Option<gst::ClockTime>,
-    #[allow(dead_code)]
-    dts_offset: Option<gst::ClockTime>,
-}
-
 #[derive(Debug)]
 pub(crate) struct FragmentHeaderConfiguration<'a> {
     variant: Variant,
     sequence_number: u32,
-    caps: &'a [&'a gst::Caps],
-    timing_infos: &'a [Option<FragmentTimingInfo>],
+    streams: &'a [(
+        &'a gst_base::AggregatorPad,
+        &'a gst::Caps,
+        Option<FragmentTimingInfo>,
+    )],
     buffers: &'a [Buffer],
 }
 
+#[derive(Debug)]
+pub(crate) struct FragmentTimingInfo {
+    /// Start time of this fragment
+    start_time: gst::ClockTime,
+    /// Set if this is an intra-only stream
+    intra_only: bool,
+}
+
+#[derive(Debug)]
+pub(crate) struct Buffer {
+    /// Track index
+    idx: usize,
+
+    /// Actual buffer
+    buffer: gst::Buffer,
+
+    /// Timestamp
+    timestamp: gst::ClockTime,
+
+    /// Sample duration
+    duration: gst::ClockTime,
+
+    /// Composition time offset
+    composition_time_offset: Option<i64>,
+}
+
 #[allow(clippy::upper_case_acronyms)]
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub(crate) enum Variant {