mpegtslivesrc: Parse PES packets and check for reasonable PTS/DTS

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1977>
This commit is contained in:
Sebastian Dröge 2024-12-07 11:39:12 +02:00 committed by GStreamer Marge Bot
parent 44978159a3
commit 6a8f1bdc61
2 changed files with 359 additions and 17 deletions

View file

@ -28,10 +28,10 @@ use bitstream_io::{BigEndian, BitRead, BitReader};
use gst::{glib, prelude::*, subclass::prelude::*};
use std::{
collections::BTreeMap,
mem,
ops::{Add, ControlFlow},
sync::LazyLock,
sync::Mutex,
sync::{LazyLock, Mutex},
};
use super::parser::*;
@ -63,7 +63,7 @@ impl MpegTsPcr {
fn new(value: u64) -> MpegTsPcr {
MpegTsPcr {
value: value % (Self::MAX + 1),
wraparound: value / (Self::MAX + 1),
wraparound: 1 + value / (Self::MAX + 1),
}
}
@ -126,8 +126,76 @@ impl MpegTsPcr {
self.wraparound * (Self::MAX + 1) + self.value
}
fn saturating_sub(self, other: MpegTsPcr) -> MpegTsPcr {
MpegTsPcr::new(self.to_units().saturating_sub(other.to_units()))
/// Calculates PTS relative to this PCR.
fn calculate_pts(self, imp: &MpegTsLiveSource, raw_pts: u64) -> Option<gst::ClockTime> {
// PTS and PCR wrap around at the same time as both are
// stored as 90kHz 33 bit value, with the PCR being extended
// by 8 bit 1/300 units which brings it to 27MHz.
//
// As such the same wraparound counter can be applied to the PTS
// for comparison purposes
let pts = gst::ClockTime::from_nseconds(
raw_pts
.mul_div_floor(100_000, 9)
.expect("failed to convert"),
);
let pcr_offset = gst::ClockTime::from_nseconds(
(self.wraparound * (MpegTsPcr::MAX + 1))
.mul_div_floor(1000, 27)
.expect("failed to convert"),
);
let pts = pts + pcr_offset;
let pcr = gst::ClockTime::from(self);
let absdiff = pts.absdiff(pcr);
// Fast paths, no wraparounds and close to the PCR as it should (< 1s is required by T-STD)
let threshold = gst::ClockTime::from_mseconds(1500);
if absdiff <= threshold {
return Some(pts);
}
// Three options now
let pcr_wraparound =
gst::ClockTime::from_nseconds((MpegTsPcr::MAX + 1).mul_div_ceil(1000, 27).unwrap());
// 1) PTS has wrapped around already but PCR has not
if pts < pcr {
let pts = pts + pcr_wraparound;
if pts >= pcr && pts - pcr <= threshold {
return Some(pcr);
}
}
// 2) PCR has wrapped around already but PTS has not
if pts > pcr {
let pts = pts - pcr_wraparound;
if pts <= pcr && pcr - pts <= threshold {
return Some(pcr);
}
}
// 3) PTS makes no sense in relation to PCR
gst::warning!(
CAT,
imp = imp,
"PTS {} too far from last PCR {}",
gst::ClockTime::from_nseconds(
raw_pts
.mul_div_floor(100_000, 9)
.expect("failed to convert")
),
gst::ClockTime::from_nseconds(
self.value
.mul_div_floor(1000, 27)
.expect("failed to convert")
),
);
None
}
}
@ -161,6 +229,11 @@ impl From<gst::ClockTime> for MpegTsPcr {
}
}
#[derive(Default)]
struct Stream {
pes_parser: PESParser,
}
#[derive(Default)]
struct State {
// Controlled source element
@ -185,6 +258,8 @@ struct State {
pmt_parser: SectionParser,
// Currently selected PMT
pmt: Option<ProgramMappingTable>,
// Streams of currently selected PMT
streams: BTreeMap<u16, Stream>,
}
impl State {
@ -213,7 +288,10 @@ impl State {
gst::trace!(
CAT,
imp = imp,
"pcr:{pcr}, observation_internal:{observation_internal}"
"pcr:{pcr} ({}), observation_internal:{observation_internal}",
gst::ClockTime::from_nseconds(
pcr.mul_div_floor(1000, 27).expect("failed to convert")
),
);
let mut handled_pcr = MpegTsPcr::new_with_reference(imp, pcr, &last_seen_pcr);
@ -229,8 +307,9 @@ impl State {
cnum,
cdenom,
);
let observation_external =
gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_external;
let observation_external = gst::ClockTime::from(new_pcr)
.saturating_sub(gst::ClockTime::from(base_pcr))
+ base_external;
if expected_external.absdiff(observation_external) >= gst::ClockTime::SECOND {
gst::warning!(
CAT,
@ -249,11 +328,13 @@ impl State {
imp = imp,
"Adding new observation internal: {} -> external: {}",
observation_internal,
gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_external,
gst::ClockTime::from(new_pcr).saturating_sub(gst::ClockTime::from(base_pcr))
+ base_external,
);
imp.external_clock.add_observation(
observation_internal,
gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_external,
gst::ClockTime::from(new_pcr).saturating_sub(gst::ClockTime::from(base_pcr))
+ base_external,
);
} else {
let (cinternal, cexternal, cnum, cdenom) = imp.external_clock.calibration();
@ -267,7 +348,10 @@ impl State {
gst::warning!(
CAT,
imp = imp,
"DISCONT detected, Picking new reference times (pcr:{pcr:#?}, observation_internal:{observation_internal}, base_external:{base_external}",
"DISCONT detected, Picking new reference times (pcr:{pcr} ({}), observation_internal:{observation_internal}, base_external:{base_external}",
gst::ClockTime::from_nseconds(
pcr.mul_div_floor(1000, 27).expect("failed to convert")
),
);
new_pcr = MpegTsPcr::new(pcr);
self.base_pcr = Some(new_pcr);
@ -284,7 +368,10 @@ impl State {
gst::debug!(
CAT,
imp = imp,
"Picking initial reference times (pcr:{pcr:#?}, observation_internal:{observation_internal}"
"Picking initial reference times (pcr:{pcr} ({}), observation_internal:{observation_internal}",
gst::ClockTime::from_nseconds(
pcr.mul_div_floor(1000, 27).expect("failed to convert")
),
);
new_pcr = MpegTsPcr::new(pcr);
self.base_pcr = Some(new_pcr);
@ -341,6 +428,7 @@ impl State {
self.pat = Some(selected_pat.clone());
self.pmt_parser.clear();
self.pmt = None;
self.streams.clear();
self.last_seen_pcr = None;
}
}
@ -378,6 +466,10 @@ impl State {
&& self.pmt.as_ref() != Some(&pmt)
{
gst::trace!(CAT, imp = imp, "Selecting PCR PID {}", pmt.pcr_pid);
self.streams.clear();
for pid in &pmt.elementary_pids {
self.streams.insert(*pid, Stream::default());
}
self.pmt = Some(pmt);
self.last_seen_pcr = None;
}
@ -459,6 +551,55 @@ impl State {
|| self.pat.as_ref().map(|pat| pat.program_map_pid) == Some(header.pid)
{
self.handle_section(imp, &header, new_payload)?;
} else if let Some(stream) = self.streams.get_mut(&header.pid) {
stream.pes_parser.push(&header, new_payload);
loop {
match stream.pes_parser.parse() {
Ok(Some((_pes_header, optional_pes_header))) => {
if let Some((raw_pts, last_seen_pcr)) = Option::zip(
optional_pes_header.and_then(|o| o.pts),
self.last_seen_pcr,
) {
let pts = last_seen_pcr.calculate_pts(imp, raw_pts);
if let Some(pts) = pts {
gst::trace!(
CAT,
imp = imp,
"Got PES packet for PID {} with PTS {}",
header.pid,
pts.into_positive()
- gst::ClockTime::from(MpegTsPcr::new(0)),
);
} else {
gst::warning!(
CAT,
imp = imp,
"DISCONT detected in PES PTS for PID {}, forwarding discont",
header.pid,
);
// We do not reset the PCR observations here but only
// forward a discontinuity downstream so the demuxer does
// not output any of these packets as they would have invalid
// timestamps
self.discont_pending = true;
}
}
}
Ok(None) => break,
Err(err) => {
dbg!(&header);
gst::warning!(
CAT,
imp = imp,
"Failed parsing PES packet for PID {}: {err:?}",
header.pid
);
break;
}
}
}
}
// Skip everything else
@ -845,22 +986,22 @@ mod tests {
// Smallest value
let pcr = MpegTsPcr::new(0);
assert_eq!(pcr.value, 0);
assert_eq!(pcr.wraparound, 0);
assert_eq!(pcr.wraparound, 1);
// Biggest (non-wrapped) value
let mut pcr = MpegTsPcr::new(MpegTsPcr::MAX);
assert_eq!(pcr.value, MpegTsPcr::MAX);
assert_eq!(pcr.wraparound, 0);
assert_eq!(pcr.wraparound, 1);
// a 33bit value overflows into 0
pcr = MpegTsPcr::new((1u64 << 33) * 300);
assert_eq!(pcr.value, 0);
assert_eq!(pcr.wraparound, 1);
assert_eq!(pcr.wraparound, 2);
// Adding one to biggest value overflows
pcr = MpegTsPcr::new(MpegTsPcr::MAX + 1);
assert_eq!(pcr.value, 0);
assert_eq!(pcr.wraparound, 1);
assert_eq!(pcr.wraparound, 2);
}
#[test]

View file

@ -9,7 +9,9 @@
#![allow(unused)]
use anyhow::{bail, Context, Result};
use bitstream_io::{BigEndian, BitRead, BitReader, FromBitStream};
use bitstream_io::{
BigEndian, BitRead, BitReader, ByteRead, ByteReader, FromBitStream, FromByteStream,
};
use smallvec::SmallVec;
pub struct SectionParser {
@ -388,6 +390,205 @@ impl FromBitStream for ProgramMappingTable {
}
}
#[derive(Debug)]
pub struct PESParser {
/// Current value of the continuity counter
cc: Option<u8>,
/// Pending PES data
pending: Vec<u8>,
/// If we skip data until the next PUSI
waiting_for_pusi: bool,
}
impl Default for PESParser {
fn default() -> Self {
Self {
cc: None,
pending: Vec::new(),
waiting_for_pusi: true,
}
}
}
impl PESParser {
/// Push PES `payload`.
///
/// After this call `parse()` until `None` is returned.
pub fn push(&mut self, header: &PacketHeader, payload: &[u8]) {
if header.pusi {
self.clear();
} else if self.cc.map_or(true, |cc| (cc + 1) & 0xf != header.cc) {
self.clear();
self.waiting_for_pusi = true;
// Not start of a payload and we didn't see the start, just return
return;
} else if self.waiting_for_pusi {
// Not start of a payload and we didn't see the start, just return
return;
}
self.cc = Some(header.cc);
// Store payload for parsing, in case it's split over multiple packets
if header.pusi {
self.waiting_for_pusi = false;
}
self.pending.extend_from_slice(payload);
}
/// Parse PES payload that is currently queued up.
///
/// Call until `None` is returned, which means that more data is required to continue parsing.
///
/// It is safe to call this again after an error.
pub fn parse(&mut self) -> Result<Option<(PESHeader, Option<OptionalPESHeader>)>> {
match self.parse_internal() {
Ok(res) => Ok(res),
Err(err) => {
self.clear();
Err(err)
}
}
}
fn parse_internal(&mut self) -> Result<Option<(PESHeader, Option<OptionalPESHeader>)>> {
// No payload to handle right now
if self.pending.is_empty() {
return Ok(None);
}
let payload = self.pending.as_slice();
// Size of PES header
if payload.len() < 3 {
return Ok(None);
}
let mut reader = ByteReader::endian(payload, BigEndian);
let header = reader.parse::<PESHeader>().context("PES header")?;
// Stream IDs without optional PES header
if header.stream_id == 0xbc
|| header.stream_id == 0xbe
|| header.stream_id == 0xbf
|| (0xf0..=0xf2).contains(&header.stream_id)
|| header.stream_id == 0xff
|| header.stream_id == 0xf8
{
// We only care about the header so stop parsing now
self.pending.clear();
self.waiting_for_pusi = true;
return Ok(Some((header, None)));
}
// Size of PES header + size of optional PES header
if payload.len() < 6 {
return Ok(None);
}
let optional_pes_header_flags =
reader.read::<u16>().context("optional_pes_header_flags")?;
if optional_pes_header_flags & 0b1100_0000_0000_0000 != 0b1000_0000_0000_0000 {
bail!("Missing marker bits");
}
if optional_pes_header_flags & 0b0000_0000_1100_0000 == 0b0000_0000_0100_0000 {
bail!("DTS without PTS is forbidden");
}
let optional_pes_header_length =
reader.read::<u8>().context("optional_pes_header_length")?;
let remaining_length = reader.reader().len();
if remaining_length < optional_pes_header_length as usize {
return Ok(None);
}
fn read_pes_ts<R: ByteRead + ?Sized>(r: &mut R) -> std::result::Result<u64, anyhow::Error> {
let mut ts = [0u8; 5];
r.read_bytes(&mut ts).context("pes_ts")?;
if ts[0] & 0x01 != 0x01 || ts[2] & 0x01 != 0x01 || ts[4] & 0x01 != 0x01 {
bail!("lost PTS sync");
}
let ts = ((ts[0] as u64 & 0x0e) << 29)
| ((ts[1] as u64) << 22)
| ((ts[2] as u64 & 0xfe) << 14)
| ((ts[3] as u64) << 7)
| ((ts[4] as u64 & 0xfe) >> 1);
Ok(ts)
}
let pts = if optional_pes_header_flags & 0b0000_0000_1000_0000 != 0 {
let pts = read_pes_ts(&mut reader).context("pts")?;
Some(pts)
} else {
None
};
let dts = if optional_pes_header_flags & 0b0000_0000_0100_0000 != 0 {
let dts = read_pes_ts(&mut reader).context("dts")?;
Some(dts)
} else {
None
};
let optional_pes_header = OptionalPESHeader {
flags: optional_pes_header_flags,
length: optional_pes_header_length,
pts,
dts,
};
// We only care about the header so stop parsing now
self.pending.clear();
self.waiting_for_pusi = true;
Ok(Some((header, Some(optional_pes_header))))
}
pub fn clear(&mut self) {
self.cc = None;
self.pending.clear();
self.waiting_for_pusi = true;
}
}
#[derive(Debug, Clone)]
pub struct PESHeader {
pub stream_id: u8,
pub length: u16,
}
impl FromByteStream for PESHeader {
type Error = anyhow::Error;
fn from_reader<R: ByteRead + ?Sized>(r: &mut R) -> std::result::Result<Self, Self::Error>
where
Self: Sized,
{
let start_code = r.read::<u32>().context("packet_start_code")?;
if start_code >> 8 != 0x00_00_01 {
bail!("Lost sync");
}
let stream_id = (start_code & 0xff) as u8;
let length = r.read::<u16>().context("packet_length")?;
Ok(PESHeader { stream_id, length })
}
}
#[derive(Debug, Clone)]
pub struct OptionalPESHeader {
pub flags: u16,
pub length: u8,
pub pts: Option<u64>,
pub dts: Option<u64>,
// Add other fields as needed
}
#[derive(Debug, Clone)]
pub struct PacketHeader {
pub tei: bool,