From 6a8f1bdc618479a57bb26dedc415e745fd025a8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Sat, 7 Dec 2024 11:39:12 +0200 Subject: [PATCH] mpegtslivesrc: Parse PES packets and check for reasonable PTS/DTS Part-of: --- net/mpegtslive/src/mpegtslive/imp.rs | 173 ++++++++++++++++++-- net/mpegtslive/src/mpegtslive/parser.rs | 203 +++++++++++++++++++++++- 2 files changed, 359 insertions(+), 17 deletions(-) diff --git a/net/mpegtslive/src/mpegtslive/imp.rs b/net/mpegtslive/src/mpegtslive/imp.rs index 255a74a6..0d167a59 100644 --- a/net/mpegtslive/src/mpegtslive/imp.rs +++ b/net/mpegtslive/src/mpegtslive/imp.rs @@ -28,10 +28,10 @@ use bitstream_io::{BigEndian, BitRead, BitReader}; use gst::{glib, prelude::*, subclass::prelude::*}; use std::{ + collections::BTreeMap, mem, ops::{Add, ControlFlow}, - sync::LazyLock, - sync::Mutex, + sync::{LazyLock, Mutex}, }; use super::parser::*; @@ -63,7 +63,7 @@ impl MpegTsPcr { fn new(value: u64) -> MpegTsPcr { MpegTsPcr { value: value % (Self::MAX + 1), - wraparound: value / (Self::MAX + 1), + wraparound: 1 + value / (Self::MAX + 1), } } @@ -126,8 +126,76 @@ impl MpegTsPcr { self.wraparound * (Self::MAX + 1) + self.value } - fn saturating_sub(self, other: MpegTsPcr) -> MpegTsPcr { - MpegTsPcr::new(self.to_units().saturating_sub(other.to_units())) + /// Calculates PTS relative to this PCR. + fn calculate_pts(self, imp: &MpegTsLiveSource, raw_pts: u64) -> Option { + // PTS and PCR wrap around at the same time as both are + // stored as 90kHz 33 bit value, with the PCR being extended + // by 8 bit 1/300 units which brings it to 27MHz. + // + // As such the same wraparound counter can be applied to the PTS + // for comparison purposes + + let pts = gst::ClockTime::from_nseconds( + raw_pts + .mul_div_floor(100_000, 9) + .expect("failed to convert"), + ); + + let pcr_offset = gst::ClockTime::from_nseconds( + (self.wraparound * (MpegTsPcr::MAX + 1)) + .mul_div_floor(1000, 27) + .expect("failed to convert"), + ); + let pts = pts + pcr_offset; + + let pcr = gst::ClockTime::from(self); + + let absdiff = pts.absdiff(pcr); + // Fast paths, no wraparounds and close to the PCR as it should (< 1s is required by T-STD) + let threshold = gst::ClockTime::from_mseconds(1500); + if absdiff <= threshold { + return Some(pts); + } + + // Three options now + + let pcr_wraparound = + gst::ClockTime::from_nseconds((MpegTsPcr::MAX + 1).mul_div_ceil(1000, 27).unwrap()); + + // 1) PTS has wrapped around already but PCR has not + if pts < pcr { + let pts = pts + pcr_wraparound; + if pts >= pcr && pts - pcr <= threshold { + return Some(pcr); + } + } + + // 2) PCR has wrapped around already but PTS has not + if pts > pcr { + let pts = pts - pcr_wraparound; + if pts <= pcr && pcr - pts <= threshold { + return Some(pcr); + } + } + + // 3) PTS makes no sense in relation to PCR + gst::warning!( + CAT, + imp = imp, + "PTS {} too far from last PCR {}", + gst::ClockTime::from_nseconds( + raw_pts + .mul_div_floor(100_000, 9) + .expect("failed to convert") + ), + gst::ClockTime::from_nseconds( + self.value + .mul_div_floor(1000, 27) + .expect("failed to convert") + ), + ); + + None } } @@ -161,6 +229,11 @@ impl From for MpegTsPcr { } } +#[derive(Default)] +struct Stream { + pes_parser: PESParser, +} + #[derive(Default)] struct State { // Controlled source element @@ -185,6 +258,8 @@ struct State { pmt_parser: SectionParser, // Currently selected PMT pmt: Option, + // Streams of currently selected PMT + streams: BTreeMap, } impl State { @@ -213,7 +288,10 @@ impl State { gst::trace!( CAT, imp = imp, - "pcr:{pcr}, observation_internal:{observation_internal}" + "pcr:{pcr} ({}), observation_internal:{observation_internal}", + gst::ClockTime::from_nseconds( + pcr.mul_div_floor(1000, 27).expect("failed to convert") + ), ); let mut handled_pcr = MpegTsPcr::new_with_reference(imp, pcr, &last_seen_pcr); @@ -229,8 +307,9 @@ impl State { cnum, cdenom, ); - let observation_external = - gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_external; + let observation_external = gst::ClockTime::from(new_pcr) + .saturating_sub(gst::ClockTime::from(base_pcr)) + + base_external; if expected_external.absdiff(observation_external) >= gst::ClockTime::SECOND { gst::warning!( CAT, @@ -249,11 +328,13 @@ impl State { imp = imp, "Adding new observation internal: {} -> external: {}", observation_internal, - gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_external, + gst::ClockTime::from(new_pcr).saturating_sub(gst::ClockTime::from(base_pcr)) + + base_external, ); imp.external_clock.add_observation( observation_internal, - gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_external, + gst::ClockTime::from(new_pcr).saturating_sub(gst::ClockTime::from(base_pcr)) + + base_external, ); } else { let (cinternal, cexternal, cnum, cdenom) = imp.external_clock.calibration(); @@ -267,7 +348,10 @@ impl State { gst::warning!( CAT, imp = imp, - "DISCONT detected, Picking new reference times (pcr:{pcr:#?}, observation_internal:{observation_internal}, base_external:{base_external}", + "DISCONT detected, Picking new reference times (pcr:{pcr} ({}), observation_internal:{observation_internal}, base_external:{base_external}", + gst::ClockTime::from_nseconds( + pcr.mul_div_floor(1000, 27).expect("failed to convert") + ), ); new_pcr = MpegTsPcr::new(pcr); self.base_pcr = Some(new_pcr); @@ -284,7 +368,10 @@ impl State { gst::debug!( CAT, imp = imp, - "Picking initial reference times (pcr:{pcr:#?}, observation_internal:{observation_internal}" + "Picking initial reference times (pcr:{pcr} ({}), observation_internal:{observation_internal}", + gst::ClockTime::from_nseconds( + pcr.mul_div_floor(1000, 27).expect("failed to convert") + ), ); new_pcr = MpegTsPcr::new(pcr); self.base_pcr = Some(new_pcr); @@ -341,6 +428,7 @@ impl State { self.pat = Some(selected_pat.clone()); self.pmt_parser.clear(); self.pmt = None; + self.streams.clear(); self.last_seen_pcr = None; } } @@ -378,6 +466,10 @@ impl State { && self.pmt.as_ref() != Some(&pmt) { gst::trace!(CAT, imp = imp, "Selecting PCR PID {}", pmt.pcr_pid); + self.streams.clear(); + for pid in &pmt.elementary_pids { + self.streams.insert(*pid, Stream::default()); + } self.pmt = Some(pmt); self.last_seen_pcr = None; } @@ -459,6 +551,55 @@ impl State { || self.pat.as_ref().map(|pat| pat.program_map_pid) == Some(header.pid) { self.handle_section(imp, &header, new_payload)?; + } else if let Some(stream) = self.streams.get_mut(&header.pid) { + stream.pes_parser.push(&header, new_payload); + + loop { + match stream.pes_parser.parse() { + Ok(Some((_pes_header, optional_pes_header))) => { + if let Some((raw_pts, last_seen_pcr)) = Option::zip( + optional_pes_header.and_then(|o| o.pts), + self.last_seen_pcr, + ) { + let pts = last_seen_pcr.calculate_pts(imp, raw_pts); + if let Some(pts) = pts { + gst::trace!( + CAT, + imp = imp, + "Got PES packet for PID {} with PTS {}", + header.pid, + pts.into_positive() + - gst::ClockTime::from(MpegTsPcr::new(0)), + ); + } else { + gst::warning!( + CAT, + imp = imp, + "DISCONT detected in PES PTS for PID {}, forwarding discont", + header.pid, + ); + + // We do not reset the PCR observations here but only + // forward a discontinuity downstream so the demuxer does + // not output any of these packets as they would have invalid + // timestamps + self.discont_pending = true; + } + } + } + Ok(None) => break, + Err(err) => { + dbg!(&header); + gst::warning!( + CAT, + imp = imp, + "Failed parsing PES packet for PID {}: {err:?}", + header.pid + ); + break; + } + } + } } // Skip everything else @@ -845,22 +986,22 @@ mod tests { // Smallest value let pcr = MpegTsPcr::new(0); assert_eq!(pcr.value, 0); - assert_eq!(pcr.wraparound, 0); + assert_eq!(pcr.wraparound, 1); // Biggest (non-wrapped) value let mut pcr = MpegTsPcr::new(MpegTsPcr::MAX); assert_eq!(pcr.value, MpegTsPcr::MAX); - assert_eq!(pcr.wraparound, 0); + assert_eq!(pcr.wraparound, 1); // a 33bit value overflows into 0 pcr = MpegTsPcr::new((1u64 << 33) * 300); assert_eq!(pcr.value, 0); - assert_eq!(pcr.wraparound, 1); + assert_eq!(pcr.wraparound, 2); // Adding one to biggest value overflows pcr = MpegTsPcr::new(MpegTsPcr::MAX + 1); assert_eq!(pcr.value, 0); - assert_eq!(pcr.wraparound, 1); + assert_eq!(pcr.wraparound, 2); } #[test] diff --git a/net/mpegtslive/src/mpegtslive/parser.rs b/net/mpegtslive/src/mpegtslive/parser.rs index c10571f2..b8c570df 100644 --- a/net/mpegtslive/src/mpegtslive/parser.rs +++ b/net/mpegtslive/src/mpegtslive/parser.rs @@ -9,7 +9,9 @@ #![allow(unused)] use anyhow::{bail, Context, Result}; -use bitstream_io::{BigEndian, BitRead, BitReader, FromBitStream}; +use bitstream_io::{ + BigEndian, BitRead, BitReader, ByteRead, ByteReader, FromBitStream, FromByteStream, +}; use smallvec::SmallVec; pub struct SectionParser { @@ -388,6 +390,205 @@ impl FromBitStream for ProgramMappingTable { } } +#[derive(Debug)] +pub struct PESParser { + /// Current value of the continuity counter + cc: Option, + /// Pending PES data + pending: Vec, + /// If we skip data until the next PUSI + waiting_for_pusi: bool, +} + +impl Default for PESParser { + fn default() -> Self { + Self { + cc: None, + pending: Vec::new(), + waiting_for_pusi: true, + } + } +} + +impl PESParser { + /// Push PES `payload`. + /// + /// After this call `parse()` until `None` is returned. + pub fn push(&mut self, header: &PacketHeader, payload: &[u8]) { + if header.pusi { + self.clear(); + } else if self.cc.map_or(true, |cc| (cc + 1) & 0xf != header.cc) { + self.clear(); + self.waiting_for_pusi = true; + // Not start of a payload and we didn't see the start, just return + return; + } else if self.waiting_for_pusi { + // Not start of a payload and we didn't see the start, just return + return; + } + self.cc = Some(header.cc); + + // Store payload for parsing, in case it's split over multiple packets + if header.pusi { + self.waiting_for_pusi = false; + } + self.pending.extend_from_slice(payload); + } + + /// Parse PES payload that is currently queued up. + /// + /// Call until `None` is returned, which means that more data is required to continue parsing. + /// + /// It is safe to call this again after an error. + pub fn parse(&mut self) -> Result)>> { + match self.parse_internal() { + Ok(res) => Ok(res), + Err(err) => { + self.clear(); + Err(err) + } + } + } + + fn parse_internal(&mut self) -> Result)>> { + // No payload to handle right now + if self.pending.is_empty() { + return Ok(None); + } + + let payload = self.pending.as_slice(); + + // Size of PES header + if payload.len() < 3 { + return Ok(None); + } + + let mut reader = ByteReader::endian(payload, BigEndian); + let header = reader.parse::().context("PES header")?; + + // Stream IDs without optional PES header + if header.stream_id == 0xbc + || header.stream_id == 0xbe + || header.stream_id == 0xbf + || (0xf0..=0xf2).contains(&header.stream_id) + || header.stream_id == 0xff + || header.stream_id == 0xf8 + { + // We only care about the header so stop parsing now + self.pending.clear(); + self.waiting_for_pusi = true; + return Ok(Some((header, None))); + } + + // Size of PES header + size of optional PES header + if payload.len() < 6 { + return Ok(None); + } + + let optional_pes_header_flags = + reader.read::().context("optional_pes_header_flags")?; + if optional_pes_header_flags & 0b1100_0000_0000_0000 != 0b1000_0000_0000_0000 { + bail!("Missing marker bits"); + } + if optional_pes_header_flags & 0b0000_0000_1100_0000 == 0b0000_0000_0100_0000 { + bail!("DTS without PTS is forbidden"); + } + + let optional_pes_header_length = + reader.read::().context("optional_pes_header_length")?; + + let remaining_length = reader.reader().len(); + if remaining_length < optional_pes_header_length as usize { + return Ok(None); + } + + fn read_pes_ts(r: &mut R) -> std::result::Result { + let mut ts = [0u8; 5]; + r.read_bytes(&mut ts).context("pes_ts")?; + + if ts[0] & 0x01 != 0x01 || ts[2] & 0x01 != 0x01 || ts[4] & 0x01 != 0x01 { + bail!("lost PTS sync"); + } + + let ts = ((ts[0] as u64 & 0x0e) << 29) + | ((ts[1] as u64) << 22) + | ((ts[2] as u64 & 0xfe) << 14) + | ((ts[3] as u64) << 7) + | ((ts[4] as u64 & 0xfe) >> 1); + + Ok(ts) + } + + let pts = if optional_pes_header_flags & 0b0000_0000_1000_0000 != 0 { + let pts = read_pes_ts(&mut reader).context("pts")?; + Some(pts) + } else { + None + }; + + let dts = if optional_pes_header_flags & 0b0000_0000_0100_0000 != 0 { + let dts = read_pes_ts(&mut reader).context("dts")?; + Some(dts) + } else { + None + }; + + let optional_pes_header = OptionalPESHeader { + flags: optional_pes_header_flags, + length: optional_pes_header_length, + pts, + dts, + }; + + // We only care about the header so stop parsing now + self.pending.clear(); + self.waiting_for_pusi = true; + + Ok(Some((header, Some(optional_pes_header)))) + } + + pub fn clear(&mut self) { + self.cc = None; + self.pending.clear(); + self.waiting_for_pusi = true; + } +} + +#[derive(Debug, Clone)] +pub struct PESHeader { + pub stream_id: u8, + pub length: u16, +} + +impl FromByteStream for PESHeader { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + let start_code = r.read::().context("packet_start_code")?; + + if start_code >> 8 != 0x00_00_01 { + bail!("Lost sync"); + } + + let stream_id = (start_code & 0xff) as u8; + let length = r.read::().context("packet_length")?; + + Ok(PESHeader { stream_id, length }) + } +} + +#[derive(Debug, Clone)] +pub struct OptionalPESHeader { + pub flags: u16, + pub length: u8, + pub pts: Option, + pub dts: Option, + // Add other fields as needed +} + #[derive(Debug, Clone)] pub struct PacketHeader { pub tei: bool,