diff --git a/generic/fmp4/src/fmp4mux/imp.rs b/generic/fmp4/src/fmp4mux/imp.rs index 85b21230..634747b9 100644 --- a/generic/fmp4/src/fmp4mux/imp.rs +++ b/generic/fmp4/src/fmp4mux/imp.rs @@ -20,6 +20,9 @@ use once_cell::sync::Lazy; use super::boxes; use super::Buffer; +/// Offset for the segment in non-single-stream variants. +const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000); + static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "fmp4mux", @@ -32,6 +35,8 @@ const DEFAULT_FRAGMENT_DURATION: gst::ClockTime = gst::ClockTime::from_seconds(1 const DEFAULT_HEADER_UPDATE_MODE: super::HeaderUpdateMode = super::HeaderUpdateMode::None; const DEFAULT_WRITE_MFRA: bool = false; const DEFAULT_WRITE_MEHD: bool = false; +const DEFAULT_INTERLEAVE_BYTES: Option = None; +const DEFAULT_INTERLEAVE_TIME: Option = Some(gst::ClockTime::from_mseconds(250)); #[derive(Debug, Clone)] struct Settings { @@ -39,6 +44,8 @@ struct Settings { header_update_mode: super::HeaderUpdateMode, write_mfra: bool, write_mehd: bool, + interleave_bytes: Option, + interleave_time: Option, } impl Default for Settings { @@ -48,6 +55,8 @@ impl Default for Settings { header_update_mode: DEFAULT_HEADER_UPDATE_MODE, write_mfra: DEFAULT_WRITE_MFRA, write_mehd: DEFAULT_WRITE_MEHD, + interleave_bytes: DEFAULT_INTERLEAVE_BYTES, + interleave_time: DEFAULT_INTERLEAVE_TIME, } } } @@ -61,6 +70,8 @@ struct Gop { final_earliest_pts: bool, // PTS plus duration of last buffer, or start of next GOP end_pts: gst::ClockTime, + // Once this is known to be the final end PTS/DTS + final_end_pts: bool, // DTS plus duration of last buffer, or start of next GOP end_dts: Option, @@ -72,24 +83,30 @@ struct Gop { buffers: Vec, } +struct Stream { + sinkpad: gst_base::AggregatorPad, + + caps: gst::Caps, + intra_only: bool, + + queued_gops: VecDeque, + fragment_filled: bool, + + // Difference between the first DTS and 0 in case of negative DTS + dts_offset: Option, + + last_force_keyunit_time: Option, +} + #[derive(Default)] struct State { - caps: Option, - intra_only: bool, + streams: Vec, // Created once we received caps and kept up to date with the caps, // sent as part of the buffer list for the first fragment. stream_header: Option, sequence_number: u32, - queued_gops: VecDeque, - // Duration of all GOPs except for the newest one that is still being filled - queued_duration: gst::ClockTime, - - // Difference between the first DTS and 0 in case of negative DTS - dts_offset: Option, - - last_force_keyunit_time: Option, // Fragment tracking for mfra current_offset: u64, @@ -99,11 +116,14 @@ struct State { earliest_pts: Option, end_pts: Option, - generated_headers: bool, + // Start PTS of the current fragment + fragment_start_pts: Option, + + sent_headers: bool, } +#[derive(Default)] pub(crate) struct FMP4Mux { - sinkpad: gst_base::AggregatorPad, state: Mutex, settings: Mutex, } @@ -112,91 +132,91 @@ impl FMP4Mux { fn queue_input( &self, element: &super::FMP4Mux, - state: &mut State, + idx: usize, + stream: &mut Stream, segment: &gst::FormattedSegment, - buffer: gst::Buffer, + mut buffer: gst::Buffer, ) -> Result<(), gst::FlowError> { - gst::trace!(CAT, obj: element, "Handling buffer {:?}", buffer); + assert!(!stream.fragment_filled); - if state.caps.is_none() { - gst::error!(CAT, obj: element, "Got buffer before caps"); - return Err(gst::FlowError::NotNegotiated); - } + gst::trace!(CAT, obj: &stream.sinkpad, "Handling buffer {:?}", buffer); - let intra_only = state.intra_only; + let intra_only = stream.intra_only; if !intra_only && buffer.dts().is_none() { - gst::error!(CAT, obj: element, "Require DTS for video streams"); + gst::error!(CAT, obj: &stream.sinkpad, "Require DTS for video streams"); return Err(gst::FlowError::Error); } if intra_only && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { - gst::error!(CAT, obj: element, "Intra-only stream with delta units"); + gst::error!(CAT, obj: &stream.sinkpad, "Intra-only stream with delta units"); return Err(gst::FlowError::Error); } - let pts = buffer.pts().ok_or_else(|| { - gst::error!(CAT, obj: element, "Require timestamped buffers"); + let pts_position = buffer.pts().ok_or_else(|| { + gst::error!(CAT, obj: &stream.sinkpad, "Require timestamped buffers"); gst::FlowError::Error })?; let duration = buffer.duration(); - let end_pts = duration.opt_add(pts).unwrap_or(pts); + let end_pts_position = duration.map_or(pts_position, |duration| pts_position + duration); - let pts = match segment.to_running_time_full(pts) { + let pts = match segment.to_running_time_full(pts_position) { (_, None) => { - gst::error!(CAT, obj: element, "Couldn't convert PTS to running time"); + gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert PTS to running time"); return Err(gst::FlowError::Error); } (pts_signum, _) if pts_signum < 0 => { - gst::error!(CAT, obj: element, "Negative PTSs are not supported"); + gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported"); return Err(gst::FlowError::Error); } (_, Some(pts)) => pts, }; - let end_pts = match segment.to_running_time_full(end_pts) { + let end_pts = match segment.to_running_time_full(end_pts_position) { (_, None) => { gst::error!( CAT, - obj: element, + obj: &stream.sinkpad, "Couldn't convert end PTS to running time" ); return Err(gst::FlowError::Error); } (pts_signum, _) if pts_signum < 0 => { - gst::error!(CAT, obj: element, "Negative PTSs are not supported"); + gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported"); return Err(gst::FlowError::Error); } (_, Some(pts)) => pts, }; - let (dts, end_dts) = if intra_only { - (None, None) + let (dts_position, dts, end_dts) = if intra_only { + (None, None, None) } else { - // with the dts_offset by having negative composition time offsets in the `trun` box. - let dts = buffer.dts().expect("not DTS"); - let end_dts = duration.opt_add(dts).unwrap_or(dts); + // Negative DTS are handled via the dts_offset and by having negative composition time + // offsets in the `trun` box. The smallest DTS here is shifted to zero. + let dts_position = buffer.dts().expect("not DTS"); + let end_dts_position = + duration.map_or(dts_position, |duration| dts_position + duration); - let dts = match segment.to_running_time_full(dts) { + let dts = match segment.to_running_time_full(dts_position) { (_, None) => { - gst::error!(CAT, obj: element, "Couldn't convert DTS to running time"); + gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert DTS to running time"); return Err(gst::FlowError::Error); } (pts_signum, Some(dts)) if pts_signum < 0 => { - if state.dts_offset.is_none() { - state.dts_offset = Some(dts); + if stream.dts_offset.is_none() { + stream.dts_offset = Some(dts); } - let dts_offset = state.dts_offset.unwrap(); + let dts_offset = stream.dts_offset.unwrap(); if dts > dts_offset { - gst::warning!(CAT, obj: element, "DTS before first DTS"); + gst::warning!(CAT, obj: &stream.sinkpad, "DTS before first DTS"); gst::ClockTime::ZERO } else { dts_offset - dts } } (_, Some(dts)) => { - if let Some(dts_offset) = state.dts_offset { + if let Some(dts_offset) = stream.dts_offset { dts + dts_offset } else { dts @@ -204,70 +224,91 @@ impl FMP4Mux { } }; - let end_dts = match segment.to_running_time_full(end_dts) { + let end_dts = match segment.to_running_time_full(end_dts_position) { (_, None) => { gst::error!( CAT, - obj: element, + obj: &stream.sinkpad, "Couldn't convert end DTS to running time" ); return Err(gst::FlowError::Error); } (pts_signum, Some(dts)) if pts_signum < 0 => { - if state.dts_offset.is_none() { - state.dts_offset = Some(dts); + if stream.dts_offset.is_none() { + stream.dts_offset = Some(dts); } - let dts_offset = state.dts_offset.unwrap(); + let dts_offset = stream.dts_offset.unwrap(); if dts > dts_offset { - gst::warning!(CAT, obj: element, "End DTS before first DTS"); + gst::warning!(CAT, obj: &stream.sinkpad, "End DTS before first DTS"); gst::ClockTime::ZERO } else { dts_offset - dts } } (_, Some(dts)) => { - if let Some(dts_offset) = state.dts_offset { + if let Some(dts_offset) = stream.dts_offset { dts + dts_offset } else { dts } } }; - (Some(dts), Some(end_dts)) + (Some(dts_position), Some(dts), Some(end_dts)) + }; + + // If this is a multi-stream element then we need to update the PTS/DTS positions according + // to the output segment, specifically to re-timestamp them with the running time and + // adjust for the segment shift to compensate for negative DTS. + let class = element.class(); + let (pts_position, dts_position) = if class.as_ref().variant.is_single_stream() { + (pts_position, dts_position) + } else { + let pts_position = pts + SEGMENT_OFFSET; + let dts_position = dts.map(|dts| { + dts + SEGMENT_OFFSET - stream.dts_offset.unwrap_or(gst::ClockTime::ZERO) + }); + + let buffer = buffer.make_mut(); + buffer.set_pts(pts_position); + buffer.set_dts(dts_position); + + (pts_position, dts_position) }; if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) { gst::debug!( CAT, - obj: element, - "Starting new GOP at PTS {} DTS {}", + obj: &stream.sinkpad, + "Starting new GOP at PTS {} DTS {} (DTS offset {})", pts, - dts.display() + dts.display(), + stream.dts_offset.display(), ); let gop = Gop { start_pts: pts, start_dts: dts, - start_dts_position: if intra_only { None } else { buffer.dts() }, + start_dts_position: if intra_only { None } else { dts_position }, earliest_pts: pts, - earliest_pts_position: buffer.pts().expect("no PTS"), + earliest_pts_position: pts_position, final_earliest_pts: intra_only, end_pts, end_dts, + final_end_pts: false, buffers: vec![Buffer { - idx: 0, + idx, buffer, pts, dts, }], }; - state.queued_gops.push_front(gop); + stream.queued_gops.push_front(gop); - if let Some(prev_gop) = state.queued_gops.get_mut(1) { + if let Some(prev_gop) = stream.queued_gops.get_mut(1) { gst::debug!( CAT, - obj: element, + obj: &stream.sinkpad, "Updating previous GOP starting at PTS {} to end PTS {} DTS {}", prev_gop.earliest_pts, pts, @@ -276,51 +317,39 @@ impl FMP4Mux { prev_gop.end_pts = pts; prev_gop.end_dts = dts; + if intra_only { + prev_gop.final_end_pts = true; + } + if !prev_gop.final_earliest_pts { // Don't bother logging this for intra-only streams as it would be for every // single buffer. if !intra_only { gst::debug!( CAT, - obj: element, + obj: &stream.sinkpad, "Previous GOP has final earliest PTS at {}", prev_gop.earliest_pts ); } prev_gop.final_earliest_pts = true; - - state.queued_duration = - prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts; - gst::debug!( - CAT, - obj: element, - "Queued duration updated to {}", - state.queued_duration - ); - } else if intra_only { - state.queued_duration = - prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts; - gst::debug!( - CAT, - obj: element, - "Queued duration updated to {}", - state.queued_duration - ); + if let Some(prev_prev_gop) = stream.queued_gops.get_mut(2) { + prev_prev_gop.final_end_pts = true; + } } } - } else if let Some(gop) = state.queued_gops.front_mut() { + } else if let Some(gop) = stream.queued_gops.front_mut() { assert!(!intra_only); // We require DTS for non-intra-only streams let dts = dts.unwrap(); let end_dts = end_dts.unwrap(); - let pts_position = buffer.pts().expect("no PTS"); gop.end_pts = std::cmp::max(gop.end_pts, end_pts); gop.end_dts = Some(std::cmp::max(gop.end_dts.expect("no end DTS"), end_dts)); gop.buffers.push(Buffer { - idx: 0, + idx, buffer, pts, dts: Some(dts), @@ -329,7 +358,7 @@ impl FMP4Mux { if gop.earliest_pts > pts && !gop.final_earliest_pts { gst::debug!( CAT, - obj: element, + obj: &stream.sinkpad, "Updating current GOP earliest PTS from {} to {}", gop.earliest_pts, pts @@ -337,10 +366,10 @@ impl FMP4Mux { gop.earliest_pts = pts; gop.earliest_pts_position = pts_position; - if let Some(prev_gop) = state.queued_gops.get_mut(1) { + if let Some(prev_gop) = stream.queued_gops.get_mut(1) { gst::debug!( CAT, - obj: element, + obj: &stream.sinkpad, "Updating previous GOP starting PTS {} end time from {} to {}", pts, prev_gop.end_pts, @@ -350,7 +379,7 @@ impl FMP4Mux { } } - let gop = state.queued_gops.front_mut().unwrap(); + let gop = stream.queued_gops.front_mut().unwrap(); // The earliest PTS is known when the current DTS is bigger or equal to the first // PTS that was observed in this GOP. If there was another frame later that had a @@ -359,89 +388,111 @@ impl FMP4Mux { if gop.start_pts <= dts && !gop.final_earliest_pts { gst::debug!( CAT, - obj: element, + obj: &stream.sinkpad, "GOP has final earliest PTS at {}", gop.earliest_pts ); gop.final_earliest_pts = true; - if let Some(prev_gop) = state.queued_gops.get_mut(1) { - state.queued_duration = - prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts; - gst::debug!( - CAT, - obj: element, - "Queued duration updated to {}", - state.queued_duration - ); + if let Some(prev_gop) = stream.queued_gops.get_mut(1) { + prev_gop.final_end_pts = true; } } } else { gst::warning!( CAT, - obj: element, + obj: &stream.sinkpad, "Waiting for keyframe at the beginning of the stream" ); } + if let Some((prev_gop, first_gop)) = Option::zip( + stream.queued_gops.iter().find(|gop| gop.final_end_pts), + stream.queued_gops.back(), + ) { + gst::debug!( + CAT, + obj: &stream.sinkpad, + "Queued full GOPs duration updated to {}", + prev_gop.end_pts - first_gop.earliest_pts, + ); + } + + gst::debug!( + CAT, + obj: &stream.sinkpad, + "Queued duration updated to {}", + Option::zip(stream.queued_gops.front(), stream.queued_gops.back()) + .map(|(end, start)| end.end_pts - start.start_pts) + .unwrap_or(gst::ClockTime::ZERO) + ); + Ok(()) } + fn create_initial_force_keyunit_event( + &self, + _element: &super::FMP4Mux, + stream: &mut Stream, + settings: &Settings, + earliest_pts: gst::ClockTime, + ) -> Result, gst::FlowError> { + assert!(stream.last_force_keyunit_time.is_none()); + + // If we never sent a force-keyunit event then send one now. + let fku_running_time = earliest_pts + settings.fragment_duration; + gst::debug!( + CAT, + obj: &stream.sinkpad, + "Sending first force-keyunit event for running time {}", + fku_running_time + ); + stream.last_force_keyunit_time = Some(fku_running_time); + + return Ok(Some( + gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fku_running_time) + .all_headers(true) + .build(), + )); + } + fn create_force_keyunit_event( &self, - element: &super::FMP4Mux, - state: &mut State, + _element: &super::FMP4Mux, + stream: &mut Stream, settings: &Settings, segment: &gst::FormattedSegment, pts: gst::ClockTime, ) -> Result, gst::FlowError> { // If we never sent a force-keyunit event then wait until the earliest PTS of the first GOP - // is known and send one now. + // is known and send it then. // // Otherwise if the current PTS is a fragment duration in the future, send the next one // now. - let oldest_gop = state.queued_gops.back().unwrap(); - let earliest_pts = oldest_gop.earliest_pts; let pts = segment.to_running_time(pts).expect("no running time"); - if state.last_force_keyunit_time.is_none() && oldest_gop.final_earliest_pts { - let fku_running_time = earliest_pts + settings.fragment_duration; - gst::debug!( - CAT, - obj: element, - "Sending first force-keyunit event for running time {}", - fku_running_time - ); - state.last_force_keyunit_time = Some(fku_running_time); + let last_force_keyunit_time = match stream.last_force_keyunit_time { + None => return Ok(None), + Some(last_force_keyunit_time) if last_force_keyunit_time > pts => return Ok(None), + Some(last_force_keyunit_time) => last_force_keyunit_time, + }; - return Ok(Some( - gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fku_running_time) - .all_headers(true) - .build(), - )); - } else if state.last_force_keyunit_time.is_some() - && state.last_force_keyunit_time <= Some(pts) - { - let fku_running_time = - state.last_force_keyunit_time.unwrap() + settings.fragment_duration; - gst::debug!( - CAT, - obj: element, - "Sending force-keyunit event for running time {}", - fku_running_time - ); - state.last_force_keyunit_time = Some(fku_running_time); + let fku_running_time = last_force_keyunit_time + settings.fragment_duration; + gst::debug!( + CAT, + obj: &stream.sinkpad, + "Sending force-keyunit event for running time {}", + fku_running_time + ); + stream.last_force_keyunit_time = Some(fku_running_time); - return Ok(Some( - gst_video::UpstreamForceKeyUnitEvent::builder() - .running_time(fku_running_time) - .all_headers(true) - .build(), - )); - } - - Ok(None) + Ok(Some( + gst_video::UpstreamForceKeyUnitEvent::builder() + .running_time(fku_running_time) + .all_headers(true) + .build(), + )) } fn drain( @@ -453,75 +504,190 @@ impl FMP4Mux { ) -> Result, gst::FlowError> { let class = element.class(); - if state.queued_duration < settings.fragment_duration && !at_eos { - return Ok(None); + if at_eos { + gst::info!(CAT, obj: element, "Draining at EOS"); + } else { + for stream in &state.streams { + if !stream.fragment_filled && !stream.sinkpad.is_eos() { + return Ok(None); + } + } } - assert!(at_eos || state.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)); + let mut drain_buffers = Vec::with_capacity(state.streams.len()); + let mut timing_infos = Vec::with_capacity(state.streams.len()); + let mut caps = Vec::with_capacity(state.streams.len()); - // At EOS, finalize all GOPs and drain them out. Otherwise if the queued duration is - // equal to the fragment duration then drain out all complete GOPs, otherwise all - // except for the newest complete GOP. - let drain_gops = if at_eos { - gst::info!(CAT, obj: element, "Draining at EOS"); - state.queued_duration = gst::ClockTime::ZERO; - state - .queued_gops - .drain(..) - .map(|mut gop| { - gop.final_earliest_pts = true; - gop - }) - .collect::>() - } else if state.queued_duration == settings.fragment_duration - || state.queued_gops.len() == 2 - { - state.queued_duration = gst::ClockTime::ZERO; - state.queued_gops.drain(1..).collect::>() - } else { - let gops = state.queued_gops.drain(2..).collect::>(); + let mut min_earliest_pts_position = None; + let mut min_earliest_pts = None; + let mut min_start_dts_position = None; + let mut max_end_pts = None; - let gop = state.queued_gops.front().unwrap(); - if gop.final_earliest_pts { - let prev_gop = state.queued_gops.get(1).unwrap(); - state.queued_duration = - prev_gop.end_pts - state.queued_gops.back().unwrap().earliest_pts; + for stream in &mut state.streams { + assert!( + at_eos + || stream.sinkpad.is_eos() + || stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true) + ); + + // At EOS, finalize all GOPs and drain them out. Otherwise if the queued duration is + // equal to the fragment duration then drain out all complete GOPs, otherwise all + // except for the newest complete GOP. + let gops = if at_eos || stream.sinkpad.is_eos() { + stream.queued_gops.drain(..).rev().collect::>() } else { - state.queued_duration = gst::ClockTime::ZERO; + let mut gops = vec![]; + + let fragment_start_pts = state.fragment_start_pts.unwrap(); + while let Some(gop) = stream.queued_gops.pop_back() { + assert!(gop.final_end_pts); + + let end_pts = gop.end_pts; + gops.push(gop); + if end_pts.saturating_sub(fragment_start_pts) >= settings.fragment_duration { + break; + } + } + + gops + }; + stream.fragment_filled = false; + + if gops.is_empty() { + timing_infos.push(None); + } else { + let first_gop = gops.first().unwrap(); + let last_gop = gops.last().unwrap(); + let earliest_pts = first_gop.earliest_pts; + let earliest_pts_position = first_gop.earliest_pts_position; + let start_dts = first_gop.start_dts; + let start_dts_position = first_gop.start_dts_position; + let end_pts = last_gop.end_pts; + let end_dts = last_gop.end_dts; + let dts_offset = stream.dts_offset; + + if min_earliest_pts.map_or(true, |min| min > earliest_pts) { + min_earliest_pts = Some(earliest_pts); + } + if min_earliest_pts_position.map_or(true, |min| min > earliest_pts_position) { + min_earliest_pts_position = Some(earliest_pts_position); + } + if let Some(start_dts_position) = start_dts_position { + if min_start_dts_position.map_or(true, |min| min > start_dts_position) { + min_start_dts_position = Some(start_dts_position); + } + } + if max_end_pts.map_or(true, |max| max < end_pts) { + max_end_pts = Some(end_pts); + } + + gst::info!( + CAT, + obj: &stream.sinkpad, + "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}", + end_pts - earliest_pts, + earliest_pts, + start_dts.display(), + dts_offset.display(), + ); + + if let Some((prev_gop, first_gop)) = Option::zip( + stream.queued_gops.iter().find(|gop| gop.final_end_pts), + stream.queued_gops.back(), + ) { + gst::debug!( + CAT, + obj: &stream.sinkpad, + "Queued full GOPs duration updated to {}", + prev_gop.end_pts - first_gop.earliest_pts, + ); + } + + gst::debug!( + CAT, + obj: &stream.sinkpad, + "Queued duration updated to {}", + Option::zip(stream.queued_gops.front(), stream.queued_gops.back()) + .map(|(end, start)| end.end_pts - start.start_pts) + .unwrap_or(gst::ClockTime::ZERO) + ); + + timing_infos.push(Some(super::FragmentTimingInfo { + earliest_pts, + start_dts, + end_pts, + end_dts, + dts_offset, + })); } - gops - }; + caps.push(&stream.caps); + + let mut buffers = VecDeque::with_capacity(gops.iter().map(|g| g.buffers.len()).sum()); + for gop in gops { + for buffer in gop.buffers { + buffers.push_back(buffer); + } + } + drain_buffers.push(buffers); + } + + // Interleave buffers according to the settings into a single vec + let mut interleaved_buffers = + Vec::with_capacity(drain_buffers.iter().map(|bs| bs.len()).sum()); + while drain_buffers.iter().any(|bs| !bs.is_empty()) { + for (idx, bs) in drain_buffers.iter_mut().enumerate() { + let mut dequeued_time = gst::ClockTime::ZERO; + let mut dequeued_bytes = 0; + + while settings + .interleave_bytes + .map_or(true, |max_bytes| dequeued_bytes <= max_bytes) + && settings + .interleave_time + .map_or(true, |max_time| dequeued_time <= max_time) + { + if let Some(buffer) = bs.pop_front() { + dequeued_time += match bs.front() { + Some(next_buffer) => match Option::zip(next_buffer.dts, buffer.dts) { + Some((b, a)) => b - a, + None => next_buffer.pts - buffer.pts, + }, + None => { + let timing_info = timing_infos[idx].as_ref().unwrap(); + match Option::zip(timing_info.end_dts, buffer.dts) { + Some((b, a)) => b - a, + None => timing_info.end_pts - buffer.pts, + } + } + }; + dequeued_bytes += buffer.buffer.size() as u64; + interleaved_buffers.push(buffer); + } else { + // No buffers left in this stream, go to next stream + break; + } + } + } + } let mut buffer_list = None; - if !drain_gops.is_empty() { - let earliest_pts = drain_gops.last().unwrap().earliest_pts; - let earliest_pts_position = drain_gops.last().unwrap().earliest_pts_position; - let start_dts = drain_gops.last().unwrap().start_dts; - let start_dts_position = drain_gops.last().unwrap().start_dts_position; - let end_pts = drain_gops[0].end_pts; - let end_dts = drain_gops[0].end_dts; - let dts_offset = state.dts_offset; - - gst::info!( - CAT, - obj: element, - "Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}", - end_pts - earliest_pts, - earliest_pts, - start_dts.display(), - dts_offset.display(), - ); + if interleaved_buffers.is_empty() { + assert!(at_eos); + } else { + let min_earliest_pts_position = min_earliest_pts_position.unwrap(); + let min_earliest_pts = min_earliest_pts.unwrap(); + let max_end_pts = max_end_pts.unwrap(); let mut fmp4_header = None; - if !state.generated_headers { + if !state.sent_headers { let mut buffer = state.stream_header.as_ref().unwrap().copy(); { let buffer = buffer.get_mut().unwrap(); - buffer.set_pts(earliest_pts_position); - buffer.set_dts(start_dts_position); + buffer.set_pts(min_earliest_pts_position); + buffer.set_dts(min_start_dts_position); // Header is DISCONT|HEADER buffer.set_flags(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER); @@ -529,16 +695,9 @@ impl FMP4Mux { fmp4_header = Some(buffer); - state.earliest_pts = Some(earliest_pts); - state.generated_headers = true; + state.sent_headers = true; } - let mut buffers = drain_gops - .into_iter() - .rev() - .flat_map(|gop| gop.buffers) - .collect::>(); - // TODO: Write prft boxes before moof // TODO: Write sidx boxes before moof and rewrite once offsets are known @@ -551,15 +710,9 @@ impl FMP4Mux { boxes::create_fmp4_fragment_header(super::FragmentHeaderConfiguration { variant: class.as_ref().variant, sequence_number, - caps: &[state.caps.as_ref().unwrap()], - timing_infos: &[Some(super::FragmentTimingInfo { - earliest_pts, - start_dts, - end_pts, - end_dts, - dts_offset, - })], - buffers: &buffers, + caps: caps.as_slice(), + timing_infos: timing_infos.as_slice(), + buffers: interleaved_buffers.as_slice(), }) .map_err(|err| { gst::error!( @@ -573,9 +726,9 @@ impl FMP4Mux { { let buffer = fmp4_fragment_header.get_mut().unwrap(); - buffer.set_pts(earliest_pts_position); - buffer.set_dts(start_dts_position); - buffer.set_duration(end_pts.checked_sub(earliest_pts)); + buffer.set_pts(min_earliest_pts_position); + buffer.set_dts(min_start_dts_position); + buffer.set_duration(max_end_pts.checked_sub(min_earliest_pts)); // Fragment header is HEADER buffer.set_flags(gst::BufferFlags::HEADER); @@ -583,17 +736,20 @@ impl FMP4Mux { // Copy metas from the first actual buffer to the fragment header. This allows // getting things like the reference timestamp meta or the timecode meta to identify // the fragment. - let _ = buffers[0] - .buffer - .copy_into(buffer, gst::BufferCopyFlags::META, 0, None); + let _ = interleaved_buffers[0].buffer.copy_into( + buffer, + gst::BufferCopyFlags::META, + 0, + None, + ); } let moof_offset = state.current_offset + fmp4_header.as_ref().map(|h| h.size()).unwrap_or(0) as u64 + moof_offset; - let buffers_len = buffers.len(); - for (idx, buffer) in buffers.iter_mut().enumerate() { + let buffers_len = interleaved_buffers.len(); + for (idx, buffer) in interleaved_buffers.iter_mut().enumerate() { // Fix up buffer flags, all other buffers are DELTA_UNIT let buffer_ref = buffer.buffer.make_mut(); buffer_ref.unset_flags(gst::BufferFlags::all()); @@ -609,29 +765,31 @@ impl FMP4Mux { fmp4_header .into_iter() .chain(Some(fmp4_fragment_header)) - .chain(buffers.into_iter().map(|buffer| buffer.buffer)) + .chain(interleaved_buffers.into_iter().map(|buffer| buffer.buffer)) .inspect(|b| { state.current_offset += b.size() as u64; }) .collect::(), ); - state.fragment_offsets.push(super::FragmentOffset { - time: earliest_pts, - offset: moof_offset, - }); - state.end_pts = Some(end_pts); + // Write mfra only for the main stream, and if there are no buffers for the main stream + // in this segment then don't write anything. + if let Some(Some(ref timing_info)) = timing_infos.get(0) { + state.fragment_offsets.push(super::FragmentOffset { + time: timing_info.earliest_pts, + offset: moof_offset, + }); + } + state.end_pts = Some(max_end_pts); - gst::debug!( - CAT, - obj: element, - "Queued duration updated to {} after draining", - state.queued_duration - ); + // Update for the start PTS of the next fragment + state.fragment_start_pts = state + .fragment_start_pts + .map(|start| start + settings.fragment_duration); } if settings.write_mfra && at_eos { - match boxes::create_mfra(state.caps.as_ref().unwrap(), &state.fragment_offsets) { + match boxes::create_mfra(caps[0], &state.fragment_offsets) { Ok(mut mfra) => { { let mfra = mfra.get_mut().unwrap(); @@ -656,6 +814,93 @@ impl FMP4Mux { Ok(buffer_list) } + fn create_streams( + &self, + element: &super::FMP4Mux, + state: &mut State, + ) -> Result<(), gst::FlowError> { + for pad in element + .sink_pads() + .into_iter() + .map(|pad| pad.downcast::().unwrap()) + { + let caps = match pad.current_caps() { + Some(caps) => caps, + None => { + gst::warning!(CAT, obj: &pad, "Skipping pad without caps"); + continue; + } + }; + + gst::info!(CAT, obj: &pad, "Configuring caps {:?}", caps); + + let s = caps.structure(0).unwrap(); + + let mut intra_only = false; + match s.name() { + "video/x-h264" | "video/x-h265" => { + if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { + gst::error!(CAT, obj: &pad, "Received caps without codec_data"); + return Err(gst::FlowError::NotNegotiated); + } + } + "audio/mpeg" => { + if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { + gst::error!(CAT, obj: &pad, "Received caps without codec_data"); + return Err(gst::FlowError::NotNegotiated); + } + intra_only = true; + } + _ => unreachable!(), + } + + state.streams.push(Stream { + sinkpad: pad, + caps, + intra_only, + queued_gops: VecDeque::new(), + fragment_filled: false, + dts_offset: None, + last_force_keyunit_time: None, + }); + } + + if state.streams.is_empty() { + gst::error!(CAT, obj: element, "No streams available"); + return Err(gst::FlowError::Error); + } + + // Sort video streams first and then audio streams, and each group by pad name. + state.streams.sort_by(|a, b| { + let stream_type_of_caps = |caps: &gst::CapsRef| { + let s = caps.structure(0).unwrap(); + + if s.name().starts_with("video/") { + gst::StreamType::VIDEO + } else if s.name().starts_with("audio/") { + gst::StreamType::AUDIO + } else { + unimplemented!(); + } + }; + + let st_a = stream_type_of_caps(&a.caps); + let st_b = stream_type_of_caps(&b.caps); + + if st_a == st_b { + return a.sinkpad.name().cmp(&b.sinkpad.name()); + } + + if st_a == gst::StreamType::VIDEO { + std::cmp::Ordering::Less + } else { + std::cmp::Ordering::Greater + } + }); + + Ok(()) + } + fn update_header( &self, element: &super::FMP4Mux, @@ -670,7 +915,7 @@ impl FMP4Mux { return Ok(None); } - assert!(!at_eos || state.queued_gops.is_empty()); + assert!(!at_eos || state.streams.iter().all(|s| s.queued_gops.is_empty())); let duration = state .end_pts @@ -678,10 +923,12 @@ impl FMP4Mux { .ok() .flatten(); + let caps = state.streams.iter().map(|s| &s.caps).collect::>(); + let mut buffer = boxes::create_fmp4_header(super::HeaderConfiguration { variant, update: at_eos, - caps: &[state.caps.as_ref().unwrap()], + caps: caps.as_slice(), write_mehd: settings.write_mehd, duration: if at_eos { duration } else { None }, }) @@ -727,20 +974,6 @@ impl ObjectSubclass for FMP4Mux { type Type = super::FMP4Mux; type ParentType = gst_base::Aggregator; type Class = Class; - - fn with_class(klass: &Self::Class) -> Self { - let templ = klass.pad_template("sink").unwrap(); - let sinkpad = - gst::PadBuilder::::from_template(&templ, Some("sink")) - .flags(gst::PadFlags::ACCEPT_INTERSECT) - .build(); - - Self { - sinkpad, - settings: Mutex::default(), - state: Mutex::default(), - } - } } impl ObjectImpl for FMP4Mux { @@ -779,6 +1012,25 @@ impl ObjectImpl for FMP4Mux { DEFAULT_WRITE_MFRA, glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, ), + glib::ParamSpecUInt64::new( + "interleave-bytes", + "Interleave Bytes", + "Interleave between streams in bytes", + 0, + u64::MAX, + DEFAULT_INTERLEAVE_BYTES.unwrap_or(0), + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpecUInt64::new( + "interleave-time", + "Interleave Time", + "Interleave between streams in nanoseconds", + 0, + u64::MAX, + DEFAULT_INTERLEAVE_TIME.map(gst::ClockTime::nseconds).unwrap_or(u64::MAX), + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + ] }); @@ -818,6 +1070,22 @@ impl ObjectImpl for FMP4Mux { settings.write_mehd = value.get().expect("type checked upstream"); } + "interleave-bytes" => { + let mut settings = self.settings.lock().unwrap(); + settings.interleave_bytes = match value.get().expect("type checked upstream") { + 0 => None, + v => Some(v), + }; + } + + "interleave-time" => { + let mut settings = self.settings.lock().unwrap(); + settings.interleave_time = match value.get().expect("type checked upstream") { + Some(gst::ClockTime::ZERO) | None => None, + v => v, + }; + } + _ => unimplemented!(), } } @@ -844,6 +1112,16 @@ impl ObjectImpl for FMP4Mux { settings.write_mehd.to_value() } + "interleave-bytes" => { + let settings = self.settings.lock().unwrap(); + settings.interleave_bytes.unwrap_or(0).to_value() + } + + "interleave-time" => { + let settings = self.settings.lock().unwrap(); + settings.interleave_time.to_value() + } + _ => unimplemented!(), } } @@ -851,7 +1129,19 @@ impl ObjectImpl for FMP4Mux { fn constructed(&self, obj: &Self::Type) { self.parent_constructed(obj); - obj.add_pad(&self.sinkpad).unwrap(); + let class = obj.class(); + for templ in class.pad_template_list().filter(|templ| { + templ.presence() == gst::PadPresence::Always + && templ.direction() == gst::PadDirection::Sink + }) { + let sinkpad = + gst::PadBuilder::::from_template(&templ, Some("sink")) + .flags(gst::PadFlags::ACCEPT_INTERSECT) + .build(); + + obj.add_pad(&sinkpad).unwrap(); + } + obj.set_latency(Settings::default().fragment_duration, None); } } @@ -859,16 +1149,24 @@ impl ObjectImpl for FMP4Mux { impl GstObjectImpl for FMP4Mux {} impl ElementImpl for FMP4Mux { - fn release_pad(&self, _element: &Self::Type, _pad: &gst::Pad) {} - fn request_new_pad( &self, - _element: &Self::Type, - _templ: &gst::PadTemplate, - _name: Option, - _caps: Option<&gst::Caps>, + element: &Self::Type, + templ: &gst::PadTemplate, + name: Option, + caps: Option<&gst::Caps>, ) -> Option { - None + let state = self.state.lock().unwrap(); + if state.stream_header.is_some() { + gst::error!( + CAT, + obj: element, + "Can't request new pads after header was generated" + ); + return None; + } + + self.parent_request_new_pad(element, templ, name, caps) } } @@ -885,14 +1183,9 @@ impl AggregatorImpl for FMP4Mux { match query.view_mut() { QueryViewMut::Caps(q) => { - let state = self.state.lock().unwrap(); - - let allowed_caps = if let Some(ref caps) = state.caps { - // TODO: Maybe allow codec_data changes and similar? - caps.clone() - } else { - aggregator_pad.pad_template_caps() - }; + let allowed_caps = aggregator_pad + .current_caps() + .unwrap_or_else(|| aggregator_pad.pad_template_caps()); if let Some(filter_caps) = q.filter() { let res = filter_caps @@ -956,59 +1249,14 @@ impl AggregatorImpl for FMP4Mux { .downcast::() .expect("non-TIME segment"); gst::info!(CAT, obj: aggregator_pad, "Received segment {:?}", segment); - aggregator.update_segment(&segment); - self.parent_sink_event(aggregator, aggregator_pad, event) - } - EventView::Caps(ev) => { - let caps = ev.caps_owned(); - - gst::info!(CAT, obj: aggregator_pad, "Received caps {:?}", caps); - let caps = { - let settings = self.settings.lock().unwrap().clone(); - let mut state = self.state.lock().unwrap(); - - let s = caps.structure(0).unwrap(); - - match s.name() { - "video/x-h264" | "video/x-h265" => { - if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { - gst::error!( - CAT, - obj: aggregator_pad, - "Received caps without codec_data" - ); - return false; - } - } - "audio/mpeg" => { - if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) { - gst::error!( - CAT, - obj: aggregator_pad, - "Received caps without codec_data" - ); - return false; - } - state.intra_only = true; - } - _ => unreachable!(), - } - - state.caps = Some(caps); - - let (_, caps) = - match self.update_header(aggregator, &mut state, &settings, false) { - Ok(Some(res)) => res, - _ => { - return false; - } - }; - - caps - }; - - aggregator.set_src_caps(&caps); + // Only forward the segment event verbatim if this is a single stream variant. + // Otherwise we have to produce a default segment and re-timestamp all buffers + // with their running time. + let class = aggregator.class(); + if class.as_ref().variant.is_single_stream() { + aggregator.update_segment(&segment); + } self.parent_sink_event(aggregator, aggregator_pad, event) } @@ -1057,10 +1305,13 @@ impl AggregatorImpl for FMP4Mux { let mut state = self.state.lock().unwrap(); - state.queued_gops.clear(); - state.queued_duration = gst::ClockTime::ZERO; - state.dts_offset = None; - state.last_force_keyunit_time = None; + for stream in &mut state.streams { + stream.queued_gops.clear(); + stream.dts_offset = None; + stream.last_force_keyunit_time = None; + stream.fragment_filled = false; + } + state.current_offset = 0; state.fragment_offsets.clear(); @@ -1071,7 +1322,9 @@ impl AggregatorImpl for FMP4Mux { gst::trace!(CAT, obj: aggregator, "Stopping"); let _ = self.parent_stop(aggregator); + *self.state.lock().unwrap() = State::default(); + Ok(()) } @@ -1079,7 +1332,19 @@ impl AggregatorImpl for FMP4Mux { gst::trace!(CAT, obj: aggregator, "Starting"); self.parent_start(aggregator)?; + + // For non-single-stream variants configure a default segment that allows for negative + // DTS so that we can correctly re-timestamp buffers with their running times. + let class = aggregator.class(); + if !class.as_ref().variant.is_single_stream() { + let mut segment = gst::FormattedSegment::::new(); + segment.set_start(SEGMENT_OFFSET); + segment.set_position(SEGMENT_OFFSET); + aggregator.update_segment(&segment); + } + *self.state.lock().unwrap() = State::default(); + Ok(()) } @@ -1094,52 +1359,159 @@ impl AggregatorImpl for FMP4Mux { ) -> Result { let settings = self.settings.lock().unwrap().clone(); - let is_eos; + let mut all_eos = true; let mut upstream_events = vec![]; let buffers = { let mut state = self.state.lock().unwrap(); - let segment = match self - .sinkpad - .segment() - .clone() - .downcast::() - .ok() - { - Some(segment) => segment, - None => { - gst::error!(CAT, obj: aggregator, "Got buffer before segment"); - return Err(gst::FlowError::Error); + // Create streams, stream header in the beginning and set output caps. + if state.stream_header.is_none() { + self.create_streams(aggregator, &mut state)?; + + let (_, caps) = self + .update_header(aggregator, &mut state, &settings, false)? + .unwrap(); + + drop(state); + aggregator.set_src_caps(&caps); + state = self.state.lock().unwrap(); + } + + // Queue buffers from all streams that are not filled for the current fragment yet + let fragment_start_pts = state.fragment_start_pts; + for (idx, stream) in state.streams.iter_mut().enumerate() { + if stream.fragment_filled { + let buffer = stream.sinkpad.peek_buffer(); + all_eos &= buffer.is_none() && stream.sinkpad.is_eos(); + + continue; } - }; - let buffer = self.sinkpad.pop_buffer(); - is_eos = buffer.is_none() && self.sinkpad.is_eos(); + let buffer = stream.sinkpad.pop_buffer(); + all_eos &= buffer.is_none() && stream.sinkpad.is_eos(); + + let buffer = match buffer { + None => continue, + Some(buffer) => buffer, + }; + + let segment = match stream + .sinkpad + .segment() + .clone() + .downcast::() + .ok() + { + Some(segment) => segment, + None => { + gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment"); + return Err(gst::FlowError::Error); + } + }; - if let Some(buffer) = buffer { let pts = buffer.pts(); // Queue up the buffer and update GOP tracking state - self.queue_input(aggregator, &mut state, &segment, buffer)?; + self.queue_input(aggregator, idx, stream, &segment, buffer)?; // If we have a PTS with this buffer, check if a new force-keyunit event for the next // fragment start has to be created if let Some(pts) = pts { - if let Some(event) = self.create_force_keyunit_event( - aggregator, &mut state, &settings, &segment, pts, - )? { - upstream_events.push(event); + if let Some(event) = self + .create_force_keyunit_event(aggregator, stream, &settings, &segment, pts)? + { + upstream_events.push((stream.sinkpad.clone(), event)); + } + } + + // Check if this stream is filled enough now. + if let Some((queued_end_pts, fragment_start_pts)) = Option::zip( + stream + .queued_gops + .iter() + .find(|gop| gop.final_end_pts) + .map(|gop| gop.end_pts), + fragment_start_pts, + ) { + if queued_end_pts.saturating_sub(fragment_start_pts) + >= settings.fragment_duration + { + gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment"); + stream.fragment_filled = true; + } + } + } + + if all_eos { + gst::debug!(CAT, obj: aggregator, "All streams are EOS now"); + } + + // Calculate the earliest PTS after queueing input if we can now. + if state.earliest_pts.is_none() { + let mut earliest_pts = None; + + for stream in &state.streams { + let stream_earliest_pts = match stream.queued_gops.back() { + None => { + earliest_pts = None; + break; + } + Some(oldest_gop) => { + if !oldest_gop.final_earliest_pts { + earliest_pts = None; + break; + } + + oldest_gop.earliest_pts + } + }; + + if earliest_pts.map_or(true, |earliest_pts| earliest_pts > stream_earliest_pts) + { + earliest_pts = Some(stream_earliest_pts); + } + } + + if let Some(earliest_pts) = earliest_pts { + gst::info!(CAT, obj: aggregator, "Got earliest PTS {}", earliest_pts); + state.earliest_pts = Some(earliest_pts); + state.fragment_start_pts = Some(earliest_pts); + + for stream in &mut state.streams { + if let Some(event) = self.create_initial_force_keyunit_event( + aggregator, + stream, + &settings, + earliest_pts, + )? { + upstream_events.push((stream.sinkpad.clone(), event)); + } + + // Check if this stream is filled enough now. + if let Some(queued_end_pts) = stream + .queued_gops + .iter() + .find(|gop| gop.final_end_pts) + .map(|gop| gop.end_pts) + { + if queued_end_pts.saturating_sub(earliest_pts) + >= settings.fragment_duration + { + gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment"); + stream.fragment_filled = true; + } + } } } } // If enough GOPs were queued, drain and create the output fragment - self.drain(aggregator, &mut state, &settings, is_eos)? + self.drain(aggregator, &mut state, &settings, all_eos)? }; - for event in upstream_events { - self.sinkpad.push_event(event); + for (sinkpad, event) in upstream_events { + sinkpad.push_event(event); } if let Some(buffers) = buffers { @@ -1147,7 +1519,9 @@ impl AggregatorImpl for FMP4Mux { aggregator.finish_buffer_list(buffers)?; } - if is_eos { + if all_eos { + gst::debug!(CAT, obj: aggregator, "Doing EOS handling"); + if settings.header_update_mode != super::HeaderUpdateMode::None { let updated_header = self.update_header( aggregator, @@ -1163,14 +1537,14 @@ impl AggregatorImpl for FMP4Mux { let src_pad = aggregator.src_pad(); let mut q = gst::query::Seeking::new(gst::Format::Bytes); if src_pad.peer_query(&mut q) && q.result().0 { + aggregator.set_src_caps(&caps); + // Seek to the beginning with a default bytes segment aggregator .update_segment( &gst::FormattedSegment::::new(), ); - aggregator.set_src_caps(&caps); - if let Err(err) = aggregator.finish_buffer_list(buffer_list) { gst::error!( CAT, @@ -1212,8 +1586,8 @@ impl AggregatorImpl for FMP4Mux { } } - // Need to generate new headers if started again after EOS - self.state.lock().unwrap().generated_headers = false; + // Need to output new headers if started again after EOS + self.state.lock().unwrap().sent_headers = false; Err(gst::FlowError::Eos) } else { @@ -1294,9 +1668,9 @@ impl ElementImpl for ISOFMP4Mux { .unwrap(); let sink_pad_template = gst::PadTemplate::new( - "sink", + "sink_%u", gst::PadDirection::Sink, - gst::PadPresence::Always, + gst::PadPresence::Request, &[ gst::Structure::builder("video/x-h264") .field("stream-format", gst::List::new(["avc", "avc3"])) diff --git a/generic/fmp4/src/fmp4mux/mod.rs b/generic/fmp4/src/fmp4mux/mod.rs index b3d23e25..98e328a3 100644 --- a/generic/fmp4/src/fmp4mux/mod.rs +++ b/generic/fmp4/src/fmp4mux/mod.rs @@ -99,6 +99,15 @@ pub(crate) enum Variant { DASH, } +impl Variant { + pub(crate) fn is_single_stream(self) -> bool { + match self { + Variant::ISO => false, + Variant::CMAF | Variant::DASH => true, + } + } +} + #[derive(Debug)] pub(crate) struct FragmentOffset { time: gst::ClockTime, diff --git a/generic/fmp4/tests/tests.rs b/generic/fmp4/tests/tests.rs index 6c871ccc..c9221e70 100644 --- a/generic/fmp4/tests/tests.rs +++ b/generic/fmp4/tests/tests.rs @@ -6,6 +6,8 @@ // SPDX-License-Identifier: MPL-2.0 // +use gst::prelude::*; + fn init() { use std::sync::Once; static INIT: Once = Once::new(); @@ -16,12 +18,18 @@ fn init() { }); } -#[test] -fn test_buffer_flags() { - init(); +fn test_buffer_flags_single_stream(cmaf: bool) { + let mut h = if cmaf { + gst_check::Harness::new("cmafmux") + } else { + gst_check::Harness::with_padnames("isofmp4mux", Some("sink_0"), Some("src")) + }; // 5s fragment duration - let mut h = gst_check::Harness::new_parse("isofmp4mux fragment-duration=5000000000"); + h.element() + .unwrap() + .set_property("fragment-duration", gst::ClockTime::from_seconds(5)); + h.set_src_caps( gst::Caps::builder("video/x-h264") .field("width", 1920i32) @@ -34,6 +42,12 @@ fn test_buffer_flags() { ); h.play(); + let output_offset = if cmaf { + gst::ClockTime::ZERO + } else { + gst::ClockTime::from_seconds(60 * 60 * 1000) + }; + // Push 7 buffers of 1s each, 1st and last buffer without DELTA_UNIT flag for i in 0..7 { let mut buffer = gst::Buffer::with_size(1).unwrap(); @@ -75,13 +89,19 @@ fn test_buffer_flags() { header.flags(), gst::BufferFlags::HEADER | gst::BufferFlags::DISCONT ); - assert_eq!(header.pts(), Some(gst::ClockTime::ZERO)); - assert_eq!(header.dts(), Some(gst::ClockTime::ZERO)); + assert_eq!(header.pts(), Some(gst::ClockTime::ZERO + output_offset)); + assert_eq!(header.dts(), Some(gst::ClockTime::ZERO + output_offset)); let fragment_header = h.pull().unwrap(); assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!(fragment_header.pts(), Some(gst::ClockTime::ZERO)); - assert_eq!(fragment_header.dts(), Some(gst::ClockTime::ZERO)); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::ZERO + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::ZERO + output_offset) + ); assert_eq!( fragment_header.duration(), Some(gst::ClockTime::from_seconds(5)) @@ -97,8 +117,14 @@ fn test_buffer_flags() { } else { assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); } - assert_eq!(buffer.pts(), Some(gst::ClockTime::from_seconds(i))); - assert_eq!(buffer.dts(), Some(gst::ClockTime::from_seconds(i))); + assert_eq!( + buffer.pts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + assert_eq!( + buffer.dts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); } @@ -106,8 +132,14 @@ fn test_buffer_flags() { let fragment_header = h.pull().unwrap(); assert_eq!(fragment_header.flags(), gst::BufferFlags::HEADER); - assert_eq!(fragment_header.pts(), Some(gst::ClockTime::from_seconds(5))); - assert_eq!(fragment_header.dts(), Some(gst::ClockTime::from_seconds(5))); + assert_eq!( + fragment_header.pts(), + Some(gst::ClockTime::from_seconds(5) + output_offset) + ); + assert_eq!( + fragment_header.dts(), + Some(gst::ClockTime::from_seconds(5) + output_offset) + ); assert_eq!( fragment_header.duration(), Some(gst::ClockTime::from_seconds(2)) @@ -123,8 +155,14 @@ fn test_buffer_flags() { } else { assert_eq!(buffer.flags(), gst::BufferFlags::DELTA_UNIT); } - assert_eq!(buffer.pts(), Some(gst::ClockTime::from_seconds(i))); - assert_eq!(buffer.dts(), Some(gst::ClockTime::from_seconds(i))); + assert_eq!( + buffer.pts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); + assert_eq!( + buffer.dts(), + Some(gst::ClockTime::from_seconds(i) + output_offset) + ); assert_eq!(buffer.duration(), Some(gst::ClockTime::SECOND)); } @@ -137,3 +175,17 @@ fn test_buffer_flags() { let ev = h.pull_event().unwrap(); assert_eq!(ev.type_(), gst::EventType::Eos); } + +#[test] +fn test_buffer_flags_single_stream_cmaf() { + init(); + + test_buffer_flags_single_stream(true); +} + +#[test] +fn test_buffer_flags_single_stream_iso() { + init(); + + test_buffer_flags_single_stream(false); +}