mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-24 09:58:13 +00:00
fmp4mux: Handle EOS correctly if it happens before a fragment start time was determined
Whatever earliest time we have at that point is going to be the start time. Also handle the case correctly where all inputs are EOS before any buffers were received at all. Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/270 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/986>
This commit is contained in:
parent
9491c77540
commit
93ba677b18
1 changed files with 49 additions and 43 deletions
|
@ -958,12 +958,11 @@ impl FMP4Mux {
|
|||
//
|
||||
// If the first stream is already EOS then the next stream that is not EOS yet will be
|
||||
// taken in its place.
|
||||
let fragment_start_pts = state.fragment_start_pts.unwrap();
|
||||
gst::info!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Starting to drain at {}",
|
||||
fragment_start_pts
|
||||
state.fragment_start_pts.display()
|
||||
);
|
||||
|
||||
for (idx, stream) in state.streams.iter_mut().enumerate() {
|
||||
|
@ -976,40 +975,44 @@ impl FMP4Mux {
|
|||
|| stream.queued_gops.get(1).map(|gop| gop.final_earliest_pts) == Some(true)
|
||||
);
|
||||
|
||||
// Drain all complete GOPs until at most one fragment duration was dequeued for the
|
||||
// first stream, or until the dequeued duration of the first stream.
|
||||
let mut gops = Vec::with_capacity(stream.queued_gops.len());
|
||||
let dequeue_end_pts =
|
||||
fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration);
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: stream.sinkpad,
|
||||
"Draining up to end PTS {} / duration {}",
|
||||
dequeue_end_pts,
|
||||
dequeue_end_pts - fragment_start_pts
|
||||
);
|
||||
if !stream.queued_gops.is_empty() {
|
||||
let fragment_start_pts = state.fragment_start_pts.unwrap();
|
||||
|
||||
while let Some(gop) = stream.queued_gops.back() {
|
||||
// If this GOP is not complete then we can't pop it yet.
|
||||
//
|
||||
// If there was no complete GOP at all yet then it might be bigger than the
|
||||
// fragment duration. In this case we might not be able to handle the latency
|
||||
// requirements in a live pipeline.
|
||||
if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() {
|
||||
break;
|
||||
// Drain all complete GOPs until at most one fragment duration was dequeued for the
|
||||
// first stream, or until the dequeued duration of the first stream.
|
||||
let dequeue_end_pts =
|
||||
fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration);
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: stream.sinkpad,
|
||||
"Draining up to end PTS {} / duration {}",
|
||||
dequeue_end_pts,
|
||||
dequeue_end_pts - fragment_start_pts
|
||||
);
|
||||
|
||||
while let Some(gop) = stream.queued_gops.back() {
|
||||
// If this GOP is not complete then we can't pop it yet.
|
||||
//
|
||||
// If there was no complete GOP at all yet then it might be bigger than the
|
||||
// fragment duration. In this case we might not be able to handle the latency
|
||||
// requirements in a live pipeline.
|
||||
if !gop.final_end_pts && !at_eos && !stream.sinkpad.is_eos() {
|
||||
break;
|
||||
}
|
||||
|
||||
// If this GOP starts after the fragment end then don't dequeue it yet unless this is
|
||||
// the first stream and no GOPs were dequeued at all yet. This would mean that the
|
||||
// GOP is bigger than the fragment duration.
|
||||
if !at_eos
|
||||
&& gop.end_pts > dequeue_end_pts
|
||||
&& (fragment_end_pts.is_some() || !gops.is_empty())
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
gops.push(stream.queued_gops.pop_back().unwrap());
|
||||
}
|
||||
|
||||
// If this GOP starts after the fragment end then don't dequeue it yet unless this is
|
||||
// the first stream and no GOPs were dequeued at all yet. This would mean that the
|
||||
// GOP is bigger than the fragment duration.
|
||||
if !at_eos
|
||||
&& gop.end_pts > dequeue_end_pts
|
||||
&& (fragment_end_pts.is_some() || !gops.is_empty())
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
gops.push(stream.queued_gops.pop_back().unwrap());
|
||||
}
|
||||
stream.fragment_filled = false;
|
||||
|
||||
|
@ -2183,6 +2186,11 @@ impl AggregatorImpl for FMP4Mux {
|
|||
}
|
||||
}
|
||||
|
||||
all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos());
|
||||
if all_eos {
|
||||
gst::debug!(CAT, imp: self, "All streams are EOS now");
|
||||
}
|
||||
|
||||
// Calculate the earliest PTS after queueing input if we can now.
|
||||
if state.earliest_pts.is_none() {
|
||||
let mut earliest_pts = None;
|
||||
|
@ -2191,12 +2199,15 @@ impl AggregatorImpl for FMP4Mux {
|
|||
for stream in &state.streams {
|
||||
let (stream_earliest_pts, stream_start_dts) = match stream.queued_gops.back() {
|
||||
None => {
|
||||
earliest_pts = None;
|
||||
start_dts = None;
|
||||
break;
|
||||
if !all_eos && !timeout {
|
||||
earliest_pts = None;
|
||||
start_dts = None;
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Some(oldest_gop) => {
|
||||
if !timeout && !oldest_gop.final_earliest_pts {
|
||||
if !all_eos && !timeout && !oldest_gop.final_earliest_pts {
|
||||
earliest_pts = None;
|
||||
start_dts = None;
|
||||
break;
|
||||
|
@ -2221,7 +2232,7 @@ impl AggregatorImpl for FMP4Mux {
|
|||
gst::info!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Got earliest PTS {}, start DTS {}",
|
||||
"Got earliest PTS {}, start DTS {} (timeout: {timeout}, all eos: {all_eos})",
|
||||
earliest_pts,
|
||||
start_dts.display()
|
||||
);
|
||||
|
@ -2303,11 +2314,6 @@ impl AggregatorImpl for FMP4Mux {
|
|||
}
|
||||
}
|
||||
|
||||
all_eos = state.streams.iter().all(|stream| stream.sinkpad.is_eos());
|
||||
if all_eos {
|
||||
gst::debug!(CAT, imp: self, "All streams are EOS now");
|
||||
}
|
||||
|
||||
// If enough GOPs were queued, drain and create the output fragment
|
||||
match self.drain(
|
||||
&mut state,
|
||||
|
|
Loading…
Reference in a new issue