mpegtslivesrc: Handle PCR discontinuities as errors for now

More work is needed to make this work seemlessly and right now it would
simply cause invalid timestamps to be created.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1717>
This commit is contained in:
Sebastian Dröge 2024-08-14 11:02:43 +03:00 committed by GStreamer Marge Bot
parent ede82ca5b4
commit 102185d09d

View file

@ -165,9 +165,10 @@ struct MpegTSLiveSourceState {
}
impl MpegTSLiveSourceState {
fn store_observation(&mut self, pcr: u64, monotonic_time: gst::ClockTime) {
// Grab time of our clock and controlled clock
/// Grab time of our clock and controlled clock
///
/// Returns `true` on PCR discontinuities.
fn store_observation(&mut self, pcr: u64, monotonic_time: gst::ClockTime) -> bool {
// If this is the first PCR we observe:
// * Remember the PCR *and* the associated monotonic clock value when capture
// * `base_pcr` `base_monotonic`
@ -178,6 +179,8 @@ impl MpegTSLiveSourceState {
// * Store (observation_monotonic, buffer_pts)
let new_pcr: MpegTsPcr;
let mut discont = false;
if let (Some(base_pcr), Some(base_monotonic), Some(last_seen_pcr)) =
(self.base_pcr, self.base_monotonic, self.last_seen_pcr)
{
@ -195,18 +198,24 @@ impl MpegTSLiveSourceState {
monotonic_time,
);
} else {
gst::debug!(CAT, "DISCONT detected, Picking new reference times (pcr:{pcr:#?}, monotonic:{monotonic_time}");
gst::error!(CAT, "DISCONT detected, Picking new reference times (pcr:{pcr:#?}, monotonic:{monotonic_time}");
new_pcr = MpegTsPcr::new(pcr);
self.base_pcr = Some(new_pcr);
self.base_monotonic = Some(monotonic_time);
discont = true;
}
} else {
gst::debug!(CAT, "Picking new reference times (pcr:{pcr:#?}, monotonic:{monotonic_time} for discontinuity");
gst::debug!(
CAT,
"Picking initial reference times (pcr:{pcr:#?}, monotonic:{monotonic_time}"
);
new_pcr = MpegTsPcr::new(pcr);
self.base_pcr = Some(new_pcr);
self.base_monotonic = Some(monotonic_time);
}
self.last_seen_pcr = Some(new_pcr);
discont
}
}
@ -309,7 +318,13 @@ impl MpegTsLiveSource {
if let (Some(monotonic_time), Some(raw_pcr)) =
(monotonic_time, get_pcr_from_buffer(self, &buffer))
{
state.store_observation(raw_pcr, monotonic_time)
if state.store_observation(raw_pcr, monotonic_time) {
// FIXME: Handle PCR discontinuities more gracefully. This requires replacing the
// clock and making sure that the application selects the new clock, and making
// sure that tsdemux detects it correctly too.
gst::element_imp_error!(self, gst::StreamError::Format, ["PCR discontinuity"]);
return Err(gst::FlowError::Error);
}
};
// Update buffer timestamp if present
@ -338,6 +353,7 @@ impl MpegTsLiveSource {
// The last monotonic time
let mut monotonic_time = None;
let mut discont = false;
bufferlist.make_mut().foreach_mut(|mut buffer, _idx| {
let this_buffer_timestamp = buffer.dts_or_pts();
@ -353,7 +369,7 @@ impl MpegTsLiveSource {
if let (Some(monotonic_time), Some(raw_pcr)) =
(monotonic_time, get_pcr_from_buffer(self, &buffer))
{
state.store_observation(raw_pcr, monotonic_time)
discont = state.store_observation(raw_pcr, monotonic_time);
};
// Update buffer timestamp if present
@ -370,6 +386,15 @@ impl MpegTsLiveSource {
};
ControlFlow::Continue(Some(buffer))
});
if discont {
// FIXME: Handle PCR discontinuities more gracefully. This requires replacing the
// clock and making sure that the application selects the new clock, and making
// sure that tsdemux detects it correctly too.
gst::element_imp_error!(self, gst::StreamError::Format, ["PCR discontinuity"]);
return Err(gst::FlowError::Error);
}
gst::ProxyPad::chain_list_default(pad, Some(&*self.obj()), bufferlist)
}
}