fmp4mux: Improve split-at-running-time handling

Recalculate the fragment end PTS more regularly to allow adding a new
split request for the currently active fragment.

Also directly discard split requests that are before the current
fragment start PTS.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2178>
This commit is contained in:
Sebastian Dröge 2025-04-06 13:25:30 +03:00 committed by GStreamer Marge Bot
parent a49a9243ea
commit 8bed1e156e

View file

@ -13,10 +13,10 @@ use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use anyhow::{bail, Context};
use std::cmp;
use std::collections::BTreeSet;
use std::collections::VecDeque;
use std::mem;
use std::ops::Bound::Excluded;
use std::sync::Mutex;
use crate::fmp4mux::obu::read_seq_header_obu_bytes;
@ -414,8 +414,8 @@ struct State {
/// If headers (ftyp / moov box) were sent.
sent_headers: bool,
/// Manually requested fragment boundaries
manual_fragment_boundaries: BTreeSet<gst::ClockTime>,
/// split-at-running-time requests
pending_split_at_running_time_requests: BTreeSet<gst::ClockTime>,
}
impl State {
@ -426,7 +426,7 @@ impl State {
self.current_offset = 0;
self.fragment_offsets.clear();
self.manual_fragment_boundaries.clear();
self.pending_split_at_running_time_requests.clear();
self.end_pts = None;
self.fragment_start_pts = None;
self.fragment_end_pts = None;
@ -1360,21 +1360,6 @@ impl FMP4Mux {
Ok(())
}
fn get_fragment_end_pts(
&self,
manual_fragment_boundaries: &BTreeSet<gst::ClockTime>,
settings: &Settings,
fragment_start_pts: gst::ClockTime,
) -> gst::ClockTime {
let fragment_end_pts = fragment_start_pts + settings.fragment_duration;
// If we have a manual fragment boundary set then use that
*manual_fragment_boundaries
.range((Excluded(fragment_start_pts), Excluded(fragment_end_pts)))
.next()
.unwrap_or(&fragment_end_pts)
}
/// Check if the stream is filled enough for the current chunk / fragment.
fn check_stream_filled(
&self,
@ -1633,6 +1618,56 @@ impl FMP4Mux {
}
}
/// Calculate / update the fragment end PTS.
///
/// This takes into account the configured fragment-duration and the pending
/// split-at-running-time requests and updates or sets the fragment end PTS accordingly.
///
/// Also cleans up all past split-at-running-time requests.
fn calculate_fragment_end_pts(&self, settings: &Settings, state: &mut State) {
use std::ops::Bound;
let Some(fragment_start_pts) = state.fragment_start_pts else {
return;
};
while let Some(boundary) = state.pending_split_at_running_time_requests.first() {
if *boundary > fragment_start_pts {
break;
}
let _ = state.pending_split_at_running_time_requests.pop_first();
}
let scheduled_fragment_end_pts = fragment_start_pts + settings.fragment_duration;
let earliest_requested_fragment_end_pts = state
.pending_split_at_running_time_requests
.range((Bound::Excluded(fragment_start_pts), Bound::Unbounded))
.next()
.copied();
let new_fragment_end_pts = if let Some(earliest_requested_fragment_end_pts) =
earliest_requested_fragment_end_pts
{
cmp::min(
scheduled_fragment_end_pts,
earliest_requested_fragment_end_pts,
)
} else {
scheduled_fragment_end_pts
};
if state.fragment_end_pts != Some(new_fragment_end_pts) {
gst::debug!(
CAT,
imp = self,
"Updating fragment end PTS from {} to {}",
state.fragment_end_pts.display(),
new_fragment_end_pts,
);
state.fragment_end_pts = Some(new_fragment_end_pts);
}
}
/// Calculate earliest PTS, i.e. PTS of the very first fragment.
///
/// This also sends a force-keyunit event for the start of the second fragment.
@ -1703,16 +1738,15 @@ impl FMP4Mux {
);
let fragment_start_pts = earliest_pts;
let fragment_end_pts =
self.get_fragment_end_pts(&state.manual_fragment_boundaries, settings, earliest_pts);
let chunk_start_pts = earliest_pts;
state.earliest_pts = Some(earliest_pts);
state.start_dts = start_dts;
state.fragment_start_pts = Some(fragment_start_pts);
state.fragment_end_pts = Some(fragment_end_pts);
state.chunk_start_pts = Some(chunk_start_pts);
self.calculate_fragment_end_pts(settings, state);
// Check if any of the streams are already filled enough for the first chunk/fragment.
for stream in &mut state.streams {
// Now send force-keyunit events for the second fragment start.
@ -2368,15 +2402,6 @@ impl FMP4Mux {
}
}
if fragment_filled {
while let Some(boundary) = state.manual_fragment_boundaries.first() {
if boundary > &fragment_end_pts {
break;
}
let _ = state.manual_fragment_boundaries.pop_first();
}
}
let trak_timescale = stream.sinkpad.imp().state.lock().unwrap().trak_timescale;
if gops.is_empty() {
gst::info!(CAT, obj = stream.sinkpad, "Draining no buffers",);
@ -2851,11 +2876,7 @@ impl FMP4Mux {
// Update for the start PTS of the next fragment / chunk
if fragment_filled || state.need_new_header {
state.fragment_start_pts = Some(chunk_end_pts);
state.fragment_end_pts = Some(self.get_fragment_end_pts(
&state.manual_fragment_boundaries,
settings,
chunk_end_pts,
));
self.calculate_fragment_end_pts(settings, state);
gst::info!(
CAT,
imp = self,
@ -3374,8 +3395,25 @@ impl ObjectImpl for FMP4Mux {
.expect("time arg")
.unwrap_or(gst::ClockTime::ZERO);
state.manual_fragment_boundaries.insert(time);
gst::debug!(CAT, obj = element, "New fragment split added at {:?}", time);
if let Some(fragment_start_pts) = state.fragment_start_pts {
if time < fragment_start_pts {
gst::warning!(
CAT,
obj = element,
"Ignoring split-at-running-time request for {time} before current fragment start {fragment_start_pts}",
);
return None;
}
}
gst::debug!(
CAT,
obj = element,
"New split-at-running-time request added at {time}",
);
state.pending_split_at_running_time_requests.insert(time);
None
})
@ -3909,6 +3947,8 @@ impl AggregatorImpl for FMP4Mux {
self.create_streams(&mut state)?;
}
self.calculate_fragment_end_pts(&settings, &mut state);
self.queue_available_buffers(&mut state, &settings, timeout)?;
all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos());