diff --git a/net/mpegtslive/src/mpegtslive/imp.rs b/net/mpegtslive/src/mpegtslive/imp.rs index 504d39fc..960b4055 100644 --- a/net/mpegtslive/src/mpegtslive/imp.rs +++ b/net/mpegtslive/src/mpegtslive/imp.rs @@ -165,9 +165,10 @@ struct MpegTSLiveSourceState { } impl MpegTSLiveSourceState { - fn store_observation(&mut self, pcr: u64, monotonic_time: gst::ClockTime) { - // Grab time of our clock and controlled clock - + /// Grab time of our clock and controlled clock + /// + /// Returns `true` on PCR discontinuities. + fn store_observation(&mut self, pcr: u64, monotonic_time: gst::ClockTime) -> bool { // If this is the first PCR we observe: // * Remember the PCR *and* the associated monotonic clock value when capture // * `base_pcr` `base_monotonic` @@ -178,6 +179,8 @@ impl MpegTSLiveSourceState { // * Store (observation_monotonic, buffer_pts) let new_pcr: MpegTsPcr; + let mut discont = false; + if let (Some(base_pcr), Some(base_monotonic), Some(last_seen_pcr)) = (self.base_pcr, self.base_monotonic, self.last_seen_pcr) { @@ -195,18 +198,24 @@ impl MpegTSLiveSourceState { monotonic_time, ); } else { - gst::debug!(CAT, "DISCONT detected, Picking new reference times (pcr:{pcr:#?}, monotonic:{monotonic_time}"); + gst::error!(CAT, "DISCONT detected, Picking new reference times (pcr:{pcr:#?}, monotonic:{monotonic_time}"); new_pcr = MpegTsPcr::new(pcr); self.base_pcr = Some(new_pcr); self.base_monotonic = Some(monotonic_time); + discont = true; } } else { - gst::debug!(CAT, "Picking new reference times (pcr:{pcr:#?}, monotonic:{monotonic_time} for discontinuity"); + gst::debug!( + CAT, + "Picking initial reference times (pcr:{pcr:#?}, monotonic:{monotonic_time}" + ); new_pcr = MpegTsPcr::new(pcr); self.base_pcr = Some(new_pcr); self.base_monotonic = Some(monotonic_time); } self.last_seen_pcr = Some(new_pcr); + + discont } } @@ -309,7 +318,13 @@ impl MpegTsLiveSource { if let (Some(monotonic_time), Some(raw_pcr)) = (monotonic_time, get_pcr_from_buffer(self, &buffer)) { - state.store_observation(raw_pcr, monotonic_time) + if state.store_observation(raw_pcr, monotonic_time) { + // FIXME: Handle PCR discontinuities more gracefully. This requires replacing the + // clock and making sure that the application selects the new clock, and making + // sure that tsdemux detects it correctly too. + gst::element_imp_error!(self, gst::StreamError::Format, ["PCR discontinuity"]); + return Err(gst::FlowError::Error); + } }; // Update buffer timestamp if present @@ -338,6 +353,7 @@ impl MpegTsLiveSource { // The last monotonic time let mut monotonic_time = None; + let mut discont = false; bufferlist.make_mut().foreach_mut(|mut buffer, _idx| { let this_buffer_timestamp = buffer.dts_or_pts(); @@ -353,7 +369,7 @@ impl MpegTsLiveSource { if let (Some(monotonic_time), Some(raw_pcr)) = (monotonic_time, get_pcr_from_buffer(self, &buffer)) { - state.store_observation(raw_pcr, monotonic_time) + discont = state.store_observation(raw_pcr, monotonic_time); }; // Update buffer timestamp if present @@ -370,6 +386,15 @@ impl MpegTsLiveSource { }; ControlFlow::Continue(Some(buffer)) }); + + if discont { + // FIXME: Handle PCR discontinuities more gracefully. This requires replacing the + // clock and making sure that the application selects the new clock, and making + // sure that tsdemux detects it correctly too. + gst::element_imp_error!(self, gst::StreamError::Format, ["PCR discontinuity"]); + return Err(gst::FlowError::Error); + } + gst::ProxyPad::chain_list_default(pad, Some(&*self.obj()), bufferlist) } }