From 632fe83be36c2556e407cce53e585a46803900f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 17 Feb 2025 14:22:27 +0200 Subject: [PATCH] mpegtslivesrc: Take adaptation field discontinuity flag into account Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/653 Part-of: --- net/mpegtslive/src/mpegtslive/imp.rs | 45 +++++++++++++++++++++---- net/mpegtslive/src/mpegtslive/parser.rs | 31 ++++++++++++++--- 2 files changed, 66 insertions(+), 10 deletions(-) diff --git a/net/mpegtslive/src/mpegtslive/imp.rs b/net/mpegtslive/src/mpegtslive/imp.rs index 77e72d414..292d0fc60 100644 --- a/net/mpegtslive/src/mpegtslive/imp.rs +++ b/net/mpegtslive/src/mpegtslive/imp.rs @@ -401,11 +401,12 @@ impl State { &mut self, imp: &MpegTsLiveSource, header: &PacketHeader, + adaptation_field: Option<&AdaptionField>, payload: &[u8], ) -> Result<()> { // Read PAT or our selected program's PMT if header.pid == 0x00_00 { - self.pat_parser.push(header, payload); + self.pat_parser.push(header, adaptation_field, payload); loop { match self.pat_parser.parse() { @@ -461,7 +462,7 @@ impl State { } } } else if self.pat.as_ref().map(|pat| pat.program_map_pid) == Some(header.pid) { - self.pmt_parser.push(header, payload); + self.pmt_parser.push(header, adaptation_field, payload); loop { match self.pmt_parser.parse() { @@ -528,6 +529,8 @@ impl State { return Ok(()); } + let mut adaptation_field = None; + // Read adaptation field if present if header.afc & 0x2 != 0 { let length = reader.read_to::().context("af_length")? as usize; @@ -543,10 +546,22 @@ impl State { // Parse adaption field and update PCR if it's the PID of our selected program if self.pmt.as_ref().map(|pmt| pmt.pcr_pid) == Some(header.pid) { let mut af_reader = BitReader::endian(af, BigEndian); - let adaptation_field = af_reader.parse::().context("af")?; + let af = af_reader.parse::().context("af")?; // PCR present - if let Some(pcr) = adaptation_field.pcr { + if let Some(pcr) = af.pcr { + if af.discontinuity_flag { + gst::debug!( + CAT, + imp = imp, + "Discontinuity signalled, resetting PCR observations" + ); + + self.base_pcr = None; + self.base_external = None; + self.last_seen_pcr = None; + } + if let Some(monotonic_time) = monotonic_time { self.store_observation(imp, pcr, monotonic_time); } else { @@ -556,7 +571,11 @@ impl State { "Can't handle PCR without packet capture time" ); } + } else if af.discontinuity_flag { + gst::debug!(CAT, imp = imp, "Discontinuity signalled"); } + + adaptation_field = Some(af); } } } @@ -568,9 +587,23 @@ impl State { if header.pid == 0x00_00 || self.pat.as_ref().map(|pat| pat.program_map_pid) == Some(header.pid) { - self.handle_section(imp, &header, new_payload)?; + self.handle_section(imp, &header, adaptation_field.as_ref(), new_payload)?; } else if let Some(stream) = self.streams.get_mut(&header.pid) { - stream.pes_parser.push(&header, new_payload); + if adaptation_field + .as_ref() + .is_some_and(|af| af.discontinuity_flag) + { + gst::debug!( + CAT, + imp = imp, + "Discontinuity signalled for PID {}, forwarding discont", + header.pid, + ); + } + + stream + .pes_parser + .push(&header, adaptation_field.as_ref(), new_payload); loop { match stream.pes_parser.parse() { diff --git a/net/mpegtslive/src/mpegtslive/parser.rs b/net/mpegtslive/src/mpegtslive/parser.rs index b8c570df7..a4b8338bc 100644 --- a/net/mpegtslive/src/mpegtslive/parser.rs +++ b/net/mpegtslive/src/mpegtslive/parser.rs @@ -40,9 +40,18 @@ impl SectionParser { /// Push PSI `payload`. /// /// After this call `parse()` until `None` is returned. - pub fn push(&mut self, header: &PacketHeader, payload: &[u8]) { + pub fn push( + &mut self, + header: &PacketHeader, + adaptation_field: Option<&AdaptionField>, + + payload: &[u8], + ) { if header.pusi { self.clear(); + } else if adaptation_field.is_some_and(|af| af.discontinuity_flag) { + // discontinuity_flag only defines that there is an expected discountinuity in the + // continuity counter but the actual data is continuous } else if self.cc.map_or(true, |cc| (cc + 1) & 0xf != header.cc) { self.clear(); self.waiting_for_pusi = true; @@ -414,9 +423,18 @@ impl PESParser { /// Push PES `payload`. /// /// After this call `parse()` until `None` is returned. - pub fn push(&mut self, header: &PacketHeader, payload: &[u8]) { + pub fn push( + &mut self, + header: &PacketHeader, + adaptation_field: Option<&AdaptionField>, + + payload: &[u8], + ) { if header.pusi { self.clear(); + } else if adaptation_field.is_some_and(|af| af.discontinuity_flag) { + // discontinuity_flag only defines that there is an expected discountinuity in the + // continuity counter but the actual data is continuous } else if self.cc.map_or(true, |cc| (cc + 1) & 0xf != header.cc) { self.clear(); self.waiting_for_pusi = true; @@ -634,6 +652,7 @@ impl FromBitStream for PacketHeader { #[derive(Debug, Clone)] pub struct AdaptionField { + pub discontinuity_flag: bool, pub pcr: Option, // Add other fields as needed } @@ -645,7 +664,8 @@ impl FromBitStream for AdaptionField { where Self: Sized, { - r.skip(3).context("flags")?; + let discontinuity_flag = r.read_bit().context("discontinuity_flag")?; + r.skip(2).context("flags")?; let pcr_present = r.read_bit().context("pcr_present")?; r.skip(4).context("flags")?; @@ -661,6 +681,9 @@ impl FromBitStream for AdaptionField { // Skip all other parts of the adaptation field for now - Ok(AdaptionField { pcr }) + Ok(AdaptionField { + discontinuity_flag, + pcr, + }) } }