From 41ddbd8706b6579bf4d4274f74a66ea4fa5229f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 25 Oct 2024 13:41:06 +0300 Subject: [PATCH] mpegtslivesrc: Parse PAT/PMT and only handle PCRs from the first program This matches default behaviour of tsdemux and makes sure we're not jumping between different PCRs if there are multiple. At a later time, program selection could be implemented. Part-of: --- net/mpegtslive/src/mpegtslive/imp.rs | 688 +++++++++++++++++++++------ 1 file changed, 550 insertions(+), 138 deletions(-) diff --git a/net/mpegtslive/src/mpegtslive/imp.rs b/net/mpegtslive/src/mpegtslive/imp.rs index bb831aea..2cacca55 100644 --- a/net/mpegtslive/src/mpegtslive/imp.rs +++ b/net/mpegtslive/src/mpegtslive/imp.rs @@ -23,10 +23,11 @@ */ use anyhow::Context; use anyhow::{bail, Result}; -use bitstream_io::{BigEndian, BitRead, BitReader}; +use bitstream_io::{BigEndian, BitRead, BitReader, FromBitStream}; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; +use std::mem; use std::ops::Add; use std::ops::ControlFlow; use std::sync::Mutex; @@ -158,31 +159,231 @@ impl From for MpegTsPcr { } } -struct MpegTSLiveSourceState { +#[derive(Default)] +struct State { // Controlled source element source: Option, - // Clock we control and expose - external_clock: gst::SystemClock, - // Last observed PCR (for handling wraparound) last_seen_pcr: Option, // First observed PCR and associated timestamp base_pcr: Option, base_monotonic: Option, + + // If the next outgoing packet should have the discont flag set + discont_pending: bool, + + // Continuity counter for PAT PID + pat_cc: Option, + // Pending PAT payload data from last PAT packet + pat_pending: Vec, + // Pending data starts on pointer field, otherwise on table header + pat_pending_pusi: bool, + // PID used for the PMT of the selected program + pmt_pid: Option, + // Program number of the selected program + pmt_program_num: Option, + // Continuity counter for PMT PID + pmt_cc: Option, + // Pending PMT payload data from last PMT packet + pmt_pending: Vec, + // Pending data starts on pointer fiel, otherwise on table header + pmt_pending_pusi: bool, + // PID used for the PCR of the selected program + pcr_pid: Option, } -impl MpegTSLiveSourceState { - /// Grab time of our clock and controlled clock - /// - /// Returns `true` on PCR discontinuities. +#[derive(Debug)] +#[allow(unused)] +struct PacketHeader { + tei: bool, + pusi: bool, + tp: bool, + pid: u16, + tsc: u8, + afc: u8, + cc: u8, +} + +impl FromBitStream for PacketHeader { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + if r.read_to::().context("sync_byte")? != 0x47 { + bail!("Lost sync"); + } + + let tei = r.read_bit().context("tei")?; + let pusi = r.read_bit().context("pusi")?; + let tp = r.read_bit().context("tp")?; + let pid = r.read::(13).context("pid")?; + + let tsc = r.read::(2).context("tsc")?; + let afc = r.read::(2).context("afc")?; + let cc = r.read::(4).context("cc")?; + + Ok(PacketHeader { + tei, + pusi, + tp, + pid, + tsc, + afc, + cc, + }) + } +} + +#[derive(Debug)] +struct AdaptionField { + pcr: Option, + // Add other fields as needed +} + +impl FromBitStream for AdaptionField { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + r.skip(3).context("flags")?; + let pcr_present = r.read_bit().context("pcr_present")?; + r.skip(4).context("flags")?; + + // PCR present + let pcr = if pcr_present { + let pcr = r.read::(33).context("pcr_base")? * 300; + r.skip(6).context("pcr_reserved")?; + let pcr = pcr + r.read::(9).context("pcr_extension")? % 300; + Some(pcr) + } else { + None + }; + + // Skip all other parts of the adaptation field for now + + Ok(AdaptionField { pcr }) + } +} + +#[derive(Debug)] +struct TableHeader { + table_id: u8, + section_syntax_indicator: bool, + section_length: u16, +} + +impl FromBitStream for TableHeader { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + let table_id = r.read_to::().context("table_id")?; + let section_syntax_indicator = r.read_bit().context("table_syntax_indicator")?; + r.skip(5).context("reserved")?; + let section_length = r.read::(10).context("section_length")?; + + Ok(TableHeader { + table_id, + section_syntax_indicator, + section_length, + }) + } +} + +#[derive(Debug)] +#[allow(unused)] +struct TableSyntaxSection { + table_id_extension: u16, + version_number: u8, + current_next_indicator: bool, + section_number: u8, + last_section_number: u8, +} + +impl FromBitStream for TableSyntaxSection { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + let table_id_extension = r.read_to::().context("table_id_extension")?; + r.skip(2).context("reserved")?; + let version_number = r.read::(5).context("version_number")?; + let current_next_indicator = r.read_bit().context("current_next_indicator")?; + let section_number = r.read_to::().context("section_number")?; + let last_section_number = r.read_to::().context("last_section_number")?; + + Ok(TableSyntaxSection { + table_id_extension, + version_number, + current_next_indicator, + section_number, + last_section_number, + }) + } +} + +#[derive(Debug)] +struct ProgramAccessTable { + program_num: u16, + program_map_pid: u16, +} + +impl FromBitStream for ProgramAccessTable { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + let program_num = r.read_to::().context("program_num")?; + r.skip(3).context("reserved")?; + let program_map_pid = r.read::(13).context("program_map_pid")?; + + Ok(ProgramAccessTable { + program_num, + program_map_pid, + }) + } +} + +#[derive(Debug)] +struct ProgramMappingTable { + pcr_pid: u16, + // Add other fields as needed +} + +impl FromBitStream for ProgramMappingTable { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + r.skip(3).context("reserved")?; + let pcr_pid = r.read::(13).context("pcr_pid")?; + + Ok(ProgramMappingTable { pcr_pid }) + } +} + +impl State { + /// Store PCR / monotonic time observation fn store_observation( &mut self, imp: &MpegTsLiveSource, pcr: u64, monotonic_time: gst::ClockTime, - ) -> bool { + ) { // If this is the first PCR we observe: // * Remember the PCR *and* the associated monotonic clock value when capture // * `base_pcr` `base_monotonic` @@ -193,7 +394,6 @@ impl MpegTSLiveSourceState { // * Store (observation_monotonic, buffer_pts) let new_pcr: MpegTsPcr; - let mut discont = false; if let (Some(base_pcr), Some(base_monotonic), Some(last_seen_pcr)) = (self.base_pcr, self.base_monotonic, self.last_seen_pcr) @@ -204,7 +404,7 @@ impl MpegTSLiveSourceState { if let Some(new_pcr) = handled_pcr { // First check if this is more than 1s off from the current clock calibration and // if so consider it a discontinuity too. - let (internal, external, num, denom) = self.external_clock.calibration(); + let (internal, external, num, denom) = imp.external_clock.calibration(); let expected_external = gst::Clock::adjust_with_calibration( monotonic_time, @@ -235,12 +435,12 @@ impl MpegTSLiveSourceState { gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_monotonic, monotonic_time, ); - self.external_clock.add_observation( + imp.external_clock.add_observation( monotonic_time, gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_monotonic, ); } else { - let (internal, external, num, denom) = self.external_clock.calibration(); + let (internal, external, num, denom) = imp.external_clock.calibration(); let scaled_monotonic = gst::Clock::adjust_with_calibration( monotonic_time, internal, @@ -248,11 +448,15 @@ impl MpegTSLiveSourceState { num, denom, ); - gst::warning!(CAT, imp = imp, "DISCONT detected, Picking new reference times (pcr:{pcr:#?}, monotonic:{monotonic_time}, scaled monotonic:{scaled_monotonic}"); + gst::warning!( + CAT, + imp = imp, + "DISCONT detected, Picking new reference times (pcr:{pcr:#?}, monotonic:{monotonic_time}, scaled monotonic:{scaled_monotonic}", + ); new_pcr = MpegTsPcr::new(pcr); self.base_pcr = Some(new_pcr); self.base_monotonic = Some(monotonic_time); - discont = true; + self.discont_pending = true; } } else { gst::debug!( @@ -263,10 +467,305 @@ impl MpegTSLiveSourceState { new_pcr = MpegTsPcr::new(pcr); self.base_pcr = Some(new_pcr); self.base_monotonic = Some(monotonic_time); + self.discont_pending = true; } self.last_seen_pcr = Some(new_pcr); + } - discont + /// Parses an MPEG-TS section and updates the internal state + fn handle_section( + &mut self, + imp: &MpegTsLiveSource, + header: &PacketHeader, + table_header: &TableHeader, + slice: &[u8], + ) -> Result<()> { + gst::trace!( + CAT, + imp = imp, + "Parsing section with header {table_header:?}" + ); + + // Skip non-PAT/PMT + if table_header.table_id != 0x00 && table_header.table_id != 0x02 + || !table_header.section_syntax_indicator + { + return Ok(()); + } + + let mut section_reader = BitReader::endian(slice, BigEndian); + + let table_syntax_section = section_reader + .parse::() + .context("section")?; + + gst::trace!( + CAT, + imp = imp, + "Parsing section with table syntax section {table_syntax_section:?}" + ); + + if header.pid == 0x00_00 && table_header.table_id == 0x00 { + // PAT + let remaining_length = section_reader.reader().unwrap().len(); + if remaining_length < 4 { + bail!("too short PAT"); + } + let n_pats = (remaining_length - 4) / 4; + if n_pats == 0 { + gst::warning!(CAT, imp = imp, "No programs in PAT"); + return Ok(()); + } + + let mut first = true; + let mut warned = false; + for idx in 0..n_pats { + let pat = section_reader + .parse::() + .context("pat")?; + gst::trace!(CAT, imp = imp, "Parsed PAT {idx}: {pat:?}"); + if pat.program_map_pid == 0 { + // Skip NIT + } else if first { + first = false; + // Our program we select + if Option::zip(self.pmt_pid, self.pmt_program_num) + .map_or(true, |(pid, prog_num)| { + pid != pat.program_map_pid || prog_num != pat.program_num + }) + { + gst::trace!( + CAT, + imp = imp, + "Selecting program with PID {} and program number {}", + pat.program_map_pid, + pat.program_num, + ); + self.pmt_pid = Some(pat.program_map_pid); + self.pmt_program_num = Some(pat.program_num); + self.pmt_pending.clear(); + self.pmt_cc = None; + self.pcr_pid = None; + self.last_seen_pcr = None; + } + } else { + // Other programs we ignore + if !warned { + gst::warning!( + CAT, + imp = imp, + "MPEG-TS stream with multiple programs - timing will be wrong for all but first program", + ); + warned = true; + } + } + } + } else if Some(header.pid) == self.pmt_pid + && Some(table_syntax_section.table_id_extension) == self.pmt_program_num + && table_header.table_id == 0x02 + { + // PMT + let pmt = section_reader + .parse::() + .context("pmt")?; + gst::trace!( + CAT, + imp = imp, + "Parsed PMT for selected program number {}: {pmt:?}", + table_syntax_section.table_id_extension + ); + if self.pcr_pid.map_or(true, |pcr_pid| pcr_pid != pmt.pcr_pid) { + self.pcr_pid = Some(pmt.pcr_pid); + self.last_seen_pcr = None; + } + } + + Ok(()) + } + + /// Parses an MPEG-TS packet and updates the internal state + fn handle_packet( + &mut self, + imp: &MpegTsLiveSource, + slice: &[u8], + monotonic_time: Option, + ) -> Result<()> { + let mut reader = BitReader::endian(slice, BigEndian); + + let header = reader.parse::().context("packet_header")?; + + // Skip corrupted packets + if header.tei { + return Ok(()); + } + + // Skip scrambled packets + if header.tsc != 0 { + return Ok(()); + } + + // Read adaptation field if present + if header.afc & 0x2 != 0 { + let length = reader.read_to::().context("af_length")? as usize; + let af = *reader.reader().unwrap(); + if af.len() < length { + bail!("too short adaptation field"); + } + let af = &af[..length]; + reader.skip(8 * length as u32).context("af")?; + + // Parse adaption field and update PCR if it's the PID of our selected program + if self.pcr_pid == Some(header.pid) { + let mut af_reader = BitReader::endian(af, BigEndian); + let adaptation_field = af_reader.parse::().context("af")?; + + // PCR present + if let Some(pcr) = adaptation_field.pcr { + if let Some(monotonic_time) = monotonic_time { + self.store_observation(imp, pcr, monotonic_time); + } else { + gst::warning!( + CAT, + imp = imp, + "Can't handle PCR without packet capture time" + ); + } + } + } + } + + // Read payload if payload if present + if header.afc & 0x1 != 0 { + let new_payload = *reader.reader().unwrap(); + + // Read PAT or our selected program's PMT + if header.pid == 0x00_00 || self.pmt_pid == Some(header.pid) { + let (cc, mut pending, pending_pusi) = if header.pid == 0x00_00 { + ( + &mut self.pat_cc, + mem::take(&mut self.pat_pending), + self.pat_pending_pusi, + ) + } else { + ( + &mut self.pmt_cc, + mem::take(&mut self.pmt_pending), + self.pmt_pending_pusi, + ) + }; + + // Clear any pending data if necessary + if header.pusi || cc.map_or(true, |cc| (cc + 1) & 0xf != header.cc) { + pending.clear(); + } + *cc = Some(header.cc); + + // Skip packet if this is not the start of a section + if !header.pusi && pending.is_empty() { + return Ok(()); + } + + // Store payload for parsing, in case it's split over multiple packets + pending.extend_from_slice(new_payload); + + // No payload + if pending.is_empty() { + return Ok(()); + } + + let payload = pending.as_slice(); + let mut pusi = header.pusi || pending_pusi; + let mut payload_reader = BitReader::endian(payload, BigEndian); + loop { + let remaining_payload = payload_reader.reader().unwrap(); + + let table_header; + if pusi { + assert!(!remaining_payload.is_empty()); + let pointer_field = remaining_payload[0] as usize; + // Need more data + if payload_reader.reader().unwrap().len() < 1 + pointer_field + 3 { + break; + } + + // Skip padding + payload_reader.skip(8 + 8 * pointer_field as u32).unwrap(); + pusi = false; + // Peek table header, payload_reader stays at beginning of section header + table_header = payload_reader.clone().parse::().unwrap(); + } else if remaining_payload.len() < 3 { + // Need more data for table header + break; + } else { + // Peek table header, payload_reader stays at beginning of section header + table_header = payload_reader.clone().parse::().unwrap(); + } + + // Need more data for this section. payload_reader is still at beginning of + // section header so require 3 extra bytes + let remaining_length = payload_reader.reader().unwrap().len(); + if remaining_length < 3 + table_header.section_length as usize { + break; + } + + // Skip table header + payload_reader.skip(8 * 3).unwrap(); + let section = + &payload_reader.reader().unwrap()[..table_header.section_length as usize]; + // Skip whole section so the reader is at the beginning of the next section header + payload_reader + .skip(8 * table_header.section_length as u32) + .unwrap(); + + if let Err(err) = self.handle_section(imp, &header, &table_header, section) { + gst::warning!( + CAT, + imp = imp, + "Failed parsing section {table_header:?}: {err:?}" + ); + } + } + + // Skip all already parsed sections + let remaining_length = payload_reader.reader().unwrap().len(); + let new_pending_range = (pending.len() - remaining_length)..pending.len(); + pending.copy_within(new_pending_range, 0); + pending.resize(remaining_length, 0u8); + if header.pid == 0x00_00 { + self.pat_pending = pending; + self.pat_pending_pusi = pusi; + } else { + self.pmt_pending = pending; + self.pmt_pending_pusi = pusi; + } + } + + // Skip everything else + } + + Ok(()) + } + + fn handle_buffer( + &mut self, + imp: &MpegTsLiveSource, + buffer: &gst::Buffer, + monotonic_time: Option, + ) -> Result<()> { + let Ok(map) = buffer.map_readable() else { + return Ok(()); + }; + + // Find sync byte + let Some(pos) = map.iter().position(|&b| b == 0x47) else { + bail!("Couldn't find sync byte"); + }; + + for chunk in map[pos..].chunks_exact(188) { + self.handle_packet(imp, chunk, monotonic_time) + .context("handling buffer")?; + } + Ok(()) } } @@ -277,76 +776,10 @@ pub struct MpegTsLiveSource { // Clock set on source element internal_clock: gst::SystemClock, - state: Mutex, -} + // Clock we control and expose + external_clock: gst::SystemClock, -fn find_pcr(slice: &[u8], imp: &MpegTsLiveSource) -> Result> { - // Find sync byte - let Some(pos) = slice.iter().position(|&b| b == 0x47) else { - bail!("Couldn't find sync byte"); - }; - let mut buffer_pcr = None; - - for chunk in slice[pos..].chunks_exact(188) { - if chunk[0] != 0x47 { - gst::error!(CAT, imp = imp, "Lost sync"); - break; - } - let mut reader = BitReader::endian(chunk, BigEndian); - // Sync Byte - reader.skip(8)?; - // Transport Error Indicator - if reader.read_bit()? { - continue; - }; - // PUSI and transport priority - reader.skip(2).context("PUSI and transport priority")?; - // PID - let pid = reader.read::(13).expect("PID"); - // transport scrambling control - reader.skip(2)?; - // Adaptation field present - let af_present = reader.read_bit().context("Adaptation field present")?; - reader.skip(5)?; - if af_present { - // adaptation_field_length - if reader.read::(8).context("adaptation field length")? >= 7 { - reader.skip(3)?; - let pcr_present = reader.read_bit().context("pcr_present")?; - reader.skip(4)?; - if pcr_present { - let pcr_base = reader.read::(33).context("PCR_base")?; - reader.skip(6)?; - let pcr_ext = reader.read::(9).context("PCR_ext")?; - let pcr = pcr_base * 300 + pcr_ext; - gst::debug!(CAT, imp = imp, "PID {pid} PCR {pcr}"); - buffer_pcr = Some(pcr); - break; - } - } - } - } - - Ok(buffer_pcr) -} - -fn get_pcr_from_buffer(imp: &MpegTsLiveSource, buffer: &gst::Buffer) -> Option { - let Ok(range) = buffer.map_readable() else { - return None; - }; - let buffer_pcr = match find_pcr(range.as_slice(), imp) { - Ok(pcr) => pcr, - Err(err) => { - gst::error!(CAT, imp = imp, "Failed parsing MPEG-TS packets: {err}"); - return None; - } - }; - - let Some(raw_pcr) = buffer_pcr else { - gst::debug!(CAT, imp = imp, "No PCR observed in {:?}", buffer); - return None; - }; - Some(raw_pcr) + state: Mutex, } impl MpegTsLiveSource { @@ -366,19 +799,20 @@ impl MpegTsLiveSource { monotonic_time = Some(pts + base_time); }; - if let (Some(monotonic_time), Some(raw_pcr)) = - (monotonic_time, get_pcr_from_buffer(self, &buffer)) - { - if state.store_observation(self, raw_pcr, monotonic_time) { - let buffer = buffer.make_mut(); - buffer.set_flags(gst::BufferFlags::DISCONT); - } - }; + // Parse packets + if let Err(err) = state.handle_buffer(self, &buffer, monotonic_time) { + gst::warning!(CAT, imp = self, "Failed handling buffer: {err:?}"); + } + + if mem::take(&mut state.discont_pending) { + let buffer = buffer.make_mut(); + buffer.set_flags(gst::BufferFlags::DISCONT); + } // Update buffer timestamp if present if let Some(pts) = buffer_timestamp { let buffer = buffer.make_mut(); - let new_pts = state + let new_pts = self .external_clock .adjust_unlocked(pts + base_time) .expect("Couldn't adjust {pts}") @@ -417,20 +851,20 @@ impl MpegTsLiveSource { monotonic_time = Some(pts + base_time); }; - // Store observation if pcr is present - if let (Some(monotonic_time), Some(raw_pcr)) = - (monotonic_time, get_pcr_from_buffer(self, &buffer)) - { - if state.store_observation(self, raw_pcr, monotonic_time) { - let buffer = buffer.make_mut(); - buffer.set_flags(gst::BufferFlags::DISCONT); - } - }; + // Parse packets + if let Err(err) = state.handle_buffer(self, &buffer, monotonic_time) { + gst::warning!(CAT, imp = self, "Failed handling buffer: {err:?}"); + } + + if mem::take(&mut state.discont_pending) { + let buffer = buffer.make_mut(); + buffer.set_flags(gst::BufferFlags::DISCONT); + } // Update buffer timestamp if present if let Some(pts) = this_buffer_timestamp { let buffer = buffer.make_mut(); - let new_pts = state + let new_pts = self .external_clock .adjust_unlocked(pts + base_time) .expect("Couldn't adjust {pts}") @@ -495,13 +929,8 @@ impl ObjectSubclass for MpegTsLiveSource { Self { srcpad, internal_clock, - state: Mutex::new(MpegTSLiveSourceState { - source: None, - external_clock, - last_seen_pcr: None, - base_pcr: None, - base_monotonic: None, - }), + external_clock, + state: Mutex::new(State::default()), } } } @@ -565,8 +994,7 @@ impl ObjectImpl for MpegTsLiveSource { } } "window-size" => { - let state = self.state.lock().unwrap(); - state.external_clock.set_window_size(value.get().unwrap()); + self.external_clock.set_window_size(value.get().unwrap()); } _ => unimplemented!(), } @@ -575,13 +1003,7 @@ impl ObjectImpl for MpegTsLiveSource { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "source" => self.state.lock().unwrap().source.to_value(), - "window-size" => self - .state - .lock() - .unwrap() - .external_clock - .window_size() - .to_value(), + "window-size" => self.external_clock.window_size().to_value(), _ => unimplemented!(), } } @@ -629,8 +1051,7 @@ impl ElementImpl for MpegTsLiveSource { gst::error!(CAT, "We can only control live sources"); return Err(gst::StateChangeError); } else if transition == gst::StateChange::PausedToReady { - let mut state = self.state.lock().expect("Could get state"); - state.external_clock.set_calibration( + self.external_clock.set_calibration( gst::ClockTime::from_nseconds(0), gst::ClockTime::from_nseconds(0), 1, @@ -638,12 +1059,10 @@ impl ElementImpl for MpegTsLiveSource { ); // Hack to flush out observations, we set the window-size to the // same value - state - .external_clock - .set_window_size(state.external_clock.window_size()); - state.last_seen_pcr = None; - state.base_monotonic = None; - state.base_pcr = None; + self.external_clock + .set_window_size(self.external_clock.window_size()); + + *self.state.lock().unwrap() = State::default(); } Ok(ret) } @@ -651,13 +1070,7 @@ impl ElementImpl for MpegTsLiveSource { fn set_clock(&self, clock: Option<&gst::Clock>) -> bool { // We only accept our clock if let Some(proposed) = clock { - if *proposed - != self - .state - .lock() - .expect("Couldn't get state") - .external_clock - { + if *proposed != self.external_clock { return false; } } @@ -665,8 +1078,7 @@ impl ElementImpl for MpegTsLiveSource { } fn provide_clock(&self) -> Option { - let state = self.state.lock().expect("Couldn't get state"); - Some(state.external_clock.clone().upcast()) + Some(self.external_clock.clone().upcast()) } fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {