From 44978159a3ddb8da333fa97e51eefef127669e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 6 Dec 2024 14:34:02 +0200 Subject: [PATCH] mpegtslivesrc: Refactor section parser Part-of: --- Cargo.lock | 1 + net/mpegtslive/Cargo.toml | 1 + net/mpegtslive/src/mpegtslive/imp.rs | 508 +++++------------------- net/mpegtslive/src/mpegtslive/mod.rs | 1 + net/mpegtslive/src/mpegtslive/parser.rs | 465 ++++++++++++++++++++++ 5 files changed, 577 insertions(+), 399 deletions(-) create mode 100644 net/mpegtslive/src/mpegtslive/parser.rs diff --git a/Cargo.lock b/Cargo.lock index 6205dbd5..10e36f46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2824,6 +2824,7 @@ dependencies = [ "bitstream-io", "gst-plugin-version-helper", "gstreamer", + "smallvec", ] [[package]] diff --git a/net/mpegtslive/Cargo.toml b/net/mpegtslive/Cargo.toml index ce3238c8..799ad6c8 100644 --- a/net/mpegtslive/Cargo.toml +++ b/net/mpegtslive/Cargo.toml @@ -12,6 +12,7 @@ rust-version.workspace = true gst.workspace = true bitstream-io = "2.3" anyhow = "1" +smallvec = "1" [dev-dependencies] diff --git a/net/mpegtslive/src/mpegtslive/imp.rs b/net/mpegtslive/src/mpegtslive/imp.rs index f37ff72d..255a74a6 100644 --- a/net/mpegtslive/src/mpegtslive/imp.rs +++ b/net/mpegtslive/src/mpegtslive/imp.rs @@ -21,18 +21,20 @@ * * Since: plugins-rs-0.13.0 */ -use anyhow::Context; -use anyhow::{bail, Result}; -use bitstream_io::{BigEndian, BitRead, BitReader, FromBitStream}; -use gst::glib; -use gst::prelude::*; -use gst::subclass::prelude::*; -use std::mem; -use std::ops::Add; -use std::ops::ControlFlow; -use std::sync::Mutex; +use anyhow::{bail, Context, Result}; -use std::sync::LazyLock; +use bitstream_io::{BigEndian, BitRead, BitReader}; + +use gst::{glib, prelude::*, subclass::prelude::*}; + +use std::{ + mem, + ops::{Add, ControlFlow}, + sync::LazyLock, + sync::Mutex, +}; + +use super::parser::*; static CAT: LazyLock = LazyLock::new(|| { gst::DebugCategory::new( @@ -174,206 +176,15 @@ struct State { // If the next outgoing packet should have the discont flag set discont_pending: bool, - // Continuity counter for PAT PID - pat_cc: Option, - // Pending PAT payload data from last PAT packet - pat_pending: Vec, - // Pending data starts on pointer field, otherwise on table header - pat_pending_pusi: bool, - // PID used for the PMT of the selected program - pmt_pid: Option, - // Program number of the selected program - pmt_program_num: Option, - // Continuity counter for PMT PID - pmt_cc: Option, - // Pending PMT payload data from last PMT packet - pmt_pending: Vec, - // Pending data starts on pointer fiel, otherwise on table header - pmt_pending_pusi: bool, - // PID used for the PCR of the selected program - pcr_pid: Option, -} + // Section parser for PAT + pat_parser: SectionParser, + // Current PAT, first program is the selected one + pat: Option, -#[derive(Debug)] -#[allow(unused)] -struct PacketHeader { - tei: bool, - pusi: bool, - tp: bool, - pid: u16, - tsc: u8, - afc: u8, - cc: u8, -} - -impl FromBitStream for PacketHeader { - type Error = anyhow::Error; - - fn from_reader(r: &mut R) -> std::result::Result - where - Self: Sized, - { - if r.read_to::().context("sync_byte")? != 0x47 { - bail!("Lost sync"); - } - - let tei = r.read_bit().context("tei")?; - let pusi = r.read_bit().context("pusi")?; - let tp = r.read_bit().context("tp")?; - let pid = r.read::(13).context("pid")?; - - let tsc = r.read::(2).context("tsc")?; - let afc = r.read::(2).context("afc")?; - let cc = r.read::(4).context("cc")?; - - Ok(PacketHeader { - tei, - pusi, - tp, - pid, - tsc, - afc, - cc, - }) - } -} - -#[derive(Debug)] -struct AdaptionField { - pcr: Option, - // Add other fields as needed -} - -impl FromBitStream for AdaptionField { - type Error = anyhow::Error; - - fn from_reader(r: &mut R) -> std::result::Result - where - Self: Sized, - { - r.skip(3).context("flags")?; - let pcr_present = r.read_bit().context("pcr_present")?; - r.skip(4).context("flags")?; - - // PCR present - let pcr = if pcr_present { - let pcr = r.read::(33).context("pcr_base")? * 300; - r.skip(6).context("pcr_reserved")?; - let pcr = pcr + r.read::(9).context("pcr_extension")? % 300; - Some(pcr) - } else { - None - }; - - // Skip all other parts of the adaptation field for now - - Ok(AdaptionField { pcr }) - } -} - -#[derive(Debug)] -struct TableHeader { - table_id: u8, - section_syntax_indicator: bool, - section_length: u16, -} - -impl FromBitStream for TableHeader { - type Error = anyhow::Error; - - fn from_reader(r: &mut R) -> std::result::Result - where - Self: Sized, - { - let table_id = r.read_to::().context("table_id")?; - let section_syntax_indicator = r.read_bit().context("table_syntax_indicator")?; - r.skip(5).context("reserved")?; - let section_length = r.read::(10).context("section_length")?; - - Ok(TableHeader { - table_id, - section_syntax_indicator, - section_length, - }) - } -} - -#[derive(Debug)] -#[allow(unused)] -struct TableSyntaxSection { - table_id_extension: u16, - version_number: u8, - current_next_indicator: bool, - section_number: u8, - last_section_number: u8, -} - -impl FromBitStream for TableSyntaxSection { - type Error = anyhow::Error; - - fn from_reader(r: &mut R) -> std::result::Result - where - Self: Sized, - { - let table_id_extension = r.read_to::().context("table_id_extension")?; - r.skip(2).context("reserved")?; - let version_number = r.read::(5).context("version_number")?; - let current_next_indicator = r.read_bit().context("current_next_indicator")?; - let section_number = r.read_to::().context("section_number")?; - let last_section_number = r.read_to::().context("last_section_number")?; - - Ok(TableSyntaxSection { - table_id_extension, - version_number, - current_next_indicator, - section_number, - last_section_number, - }) - } -} - -#[derive(Debug)] -struct ProgramAccessTable { - program_num: u16, - program_map_pid: u16, -} - -impl FromBitStream for ProgramAccessTable { - type Error = anyhow::Error; - - fn from_reader(r: &mut R) -> std::result::Result - where - Self: Sized, - { - let program_num = r.read_to::().context("program_num")?; - r.skip(3).context("reserved")?; - let program_map_pid = r.read::(13).context("program_map_pid")?; - - Ok(ProgramAccessTable { - program_num, - program_map_pid, - }) - } -} - -#[derive(Debug)] -struct ProgramMappingTable { - pcr_pid: u16, - // Add other fields as needed -} - -impl FromBitStream for ProgramMappingTable { - type Error = anyhow::Error; - - fn from_reader(r: &mut R) -> std::result::Result - where - Self: Sized, - { - r.skip(3).context("reserved")?; - let pcr_pid = r.read::(13).context("pcr_pid")?; - - Ok(ProgramMappingTable { pcr_pid }) - } + // Section parser for PMT + pmt_parser: SectionParser, + // Currently selected PMT + pmt: Option, } impl State { @@ -483,111 +294,106 @@ impl State { self.last_seen_pcr = Some(new_pcr); } - /// Parses an MPEG-TS section and updates the internal state + /// Parses and handles a section fn handle_section( &mut self, imp: &MpegTsLiveSource, header: &PacketHeader, - table_header: &TableHeader, - slice: &[u8], + payload: &[u8], ) -> Result<()> { - gst::trace!( - CAT, - imp = imp, - "Parsing section with header {table_header:?}" - ); + // Read PAT or our selected program's PMT + if header.pid == 0x00_00 { + self.pat_parser.push(header, payload); - // Skip non-PAT/PMT - if table_header.table_id != 0x00 && table_header.table_id != 0x02 - || !table_header.section_syntax_indicator - { - return Ok(()); - } - - let mut section_reader = BitReader::endian(slice, BigEndian); - - let table_syntax_section = section_reader - .parse::() - .context("section")?; - - gst::trace!( - CAT, - imp = imp, - "Parsing section with table syntax section {table_syntax_section:?}" - ); - - if header.pid == 0x00_00 && table_header.table_id == 0x00 { - // PAT - let remaining_length = section_reader.reader().unwrap().len(); - if remaining_length < 4 { - bail!("too short PAT"); - } - let n_pats = (remaining_length - 4) / 4; - if n_pats == 0 { - gst::warning!(CAT, imp = imp, "No programs in PAT"); - return Ok(()); - } - - let mut first = true; - let mut warned = false; - for idx in 0..n_pats { - let pat = section_reader - .parse::() - .context("pat")?; - gst::trace!(CAT, imp = imp, "Parsed PAT {idx}: {pat:?}"); - if pat.program_map_pid == 0 { - // Skip NIT - } else if first { - first = false; - // Our program we select - if Option::zip(self.pmt_pid, self.pmt_program_num) - .map_or(true, |(pid, prog_num)| { - pid != pat.program_map_pid || prog_num != pat.program_num - }) - { + loop { + match self.pat_parser.parse() { + Ok(Some(Section::ProgramAccessTable { + table_header, + table_syntax_section, + pat, + })) => { gst::trace!( CAT, imp = imp, - "Selecting program with PID {} and program number {}", - pat.program_map_pid, - pat.program_num, + "Parsed PAT: {table_header:?} {table_syntax_section:?} {pat:?}" ); - self.pmt_pid = Some(pat.program_map_pid); - self.pmt_program_num = Some(pat.program_num); - self.pmt_pending.clear(); - self.pmt_cc = None; - self.pcr_pid = None; - self.last_seen_pcr = None; + + if pat.is_empty() { + gst::warning!(CAT, imp = imp, "No programs in PAT"); + continue; + } else if pat.len() > 1 { + gst::warning!( + CAT, + imp = imp, + "MPEG-TS stream with multiple programs - timing will be wrong for all but first program", + ); + } + + let selected_pat = &pat[0]; + if header.pid == 0x00_00 && Some(selected_pat) != self.pat.as_ref() { + gst::trace!( + CAT, + imp = imp, + "Selecting program with PID {} and program number {}", + selected_pat.program_map_pid, + selected_pat.program_num, + ); + self.pat = Some(selected_pat.clone()); + self.pmt_parser.clear(); + self.pmt = None; + self.last_seen_pcr = None; + } } - } else { - // Other programs we ignore - if !warned { - gst::warning!( + Ok(Some(section)) => { + gst::trace!( CAT, imp = imp, - "MPEG-TS stream with multiple programs - timing will be wrong for all but first program", + "Parsed unhandled section {section:?} on PAT PID" ); - warned = true; + } + Ok(None) => break, + Err(err) => { + gst::warning!(CAT, imp = imp, "Failed parsing section: {err:?}"); } } } - } else if Some(header.pid) == self.pmt_pid - && Some(table_syntax_section.table_id_extension) == self.pmt_program_num - && table_header.table_id == 0x02 - { - // PMT - let pmt = section_reader - .parse::() - .context("pmt")?; - gst::trace!( - CAT, - imp = imp, - "Parsed PMT for selected program number {}: {pmt:?}", - table_syntax_section.table_id_extension - ); - if self.pcr_pid.map_or(true, |pcr_pid| pcr_pid != pmt.pcr_pid) { - self.pcr_pid = Some(pmt.pcr_pid); - self.last_seen_pcr = None; + } else if self.pat.as_ref().map(|pat| pat.program_map_pid) == Some(header.pid) { + self.pmt_parser.push(header, payload); + + loop { + match self.pmt_parser.parse() { + Ok(Some(Section::ProgramMappingTable { + table_header, + table_syntax_section, + pmt, + })) => { + gst::trace!( + CAT, + imp = imp, + "Parsed PMT: {table_header:?} {table_syntax_section:?} {pmt:?}" + ); + + if self.pat.as_ref().map(|pat| pat.program_num) + == Some(table_syntax_section.table_id_extension) + && self.pmt.as_ref() != Some(&pmt) + { + gst::trace!(CAT, imp = imp, "Selecting PCR PID {}", pmt.pcr_pid); + self.pmt = Some(pmt); + self.last_seen_pcr = None; + } + } + Ok(Some(section)) => { + gst::trace!( + CAT, + imp = imp, + "Parsed unhandled section {section:?} on PMT PID" + ); + } + Ok(None) => break, + Err(err) => { + gst::warning!(CAT, imp = imp, "Failed parsing section: {err:?}"); + } + } } } @@ -626,7 +432,7 @@ impl State { reader.skip(8 * length as u32).context("af")?; // Parse adaption field and update PCR if it's the PID of our selected program - if self.pcr_pid == Some(header.pid) { + if self.pmt.as_ref().map(|pmt| pmt.pcr_pid) == Some(header.pid) { let mut af_reader = BitReader::endian(af, BigEndian); let adaptation_field = af_reader.parse::().context("af")?; @@ -649,106 +455,10 @@ impl State { if header.afc & 0x1 != 0 { let new_payload = *reader.reader().unwrap(); - // Read PAT or our selected program's PMT - if header.pid == 0x00_00 || self.pmt_pid == Some(header.pid) { - let (cc, mut pending, pending_pusi) = if header.pid == 0x00_00 { - ( - &mut self.pat_cc, - mem::take(&mut self.pat_pending), - self.pat_pending_pusi, - ) - } else { - ( - &mut self.pmt_cc, - mem::take(&mut self.pmt_pending), - self.pmt_pending_pusi, - ) - }; - - // Clear any pending data if necessary - if header.pusi || cc.map_or(true, |cc| (cc + 1) & 0xf != header.cc) { - pending.clear(); - } - *cc = Some(header.cc); - - // Skip packet if this is not the start of a section - if !header.pusi && pending.is_empty() { - return Ok(()); - } - - // Store payload for parsing, in case it's split over multiple packets - pending.extend_from_slice(new_payload); - - // No payload - if pending.is_empty() { - return Ok(()); - } - - let payload = pending.as_slice(); - let mut pusi = header.pusi || pending_pusi; - let mut payload_reader = BitReader::endian(payload, BigEndian); - loop { - let remaining_payload = payload_reader.reader().unwrap(); - - let table_header; - if pusi { - assert!(!remaining_payload.is_empty()); - let pointer_field = remaining_payload[0] as usize; - // Need more data - if payload_reader.reader().unwrap().len() < 1 + pointer_field + 3 { - break; - } - - // Skip padding - payload_reader.skip(8 + 8 * pointer_field as u32).unwrap(); - pusi = false; - // Peek table header, payload_reader stays at beginning of section header - table_header = payload_reader.clone().parse::().unwrap(); - } else if remaining_payload.len() < 3 { - // Need more data for table header - break; - } else { - // Peek table header, payload_reader stays at beginning of section header - table_header = payload_reader.clone().parse::().unwrap(); - } - - // Need more data for this section. payload_reader is still at beginning of - // section header so require 3 extra bytes - let remaining_length = payload_reader.reader().unwrap().len(); - if remaining_length < 3 + table_header.section_length as usize { - break; - } - - // Skip table header - payload_reader.skip(8 * 3).unwrap(); - let section = - &payload_reader.reader().unwrap()[..table_header.section_length as usize]; - // Skip whole section so the reader is at the beginning of the next section header - payload_reader - .skip(8 * table_header.section_length as u32) - .unwrap(); - - if let Err(err) = self.handle_section(imp, &header, &table_header, section) { - gst::warning!( - CAT, - imp = imp, - "Failed parsing section {table_header:?}: {err:?}" - ); - } - } - - // Skip all already parsed sections - let remaining_length = payload_reader.reader().unwrap().len(); - let new_pending_range = (pending.len() - remaining_length)..pending.len(); - pending.copy_within(new_pending_range, 0); - pending.resize(remaining_length, 0u8); - if header.pid == 0x00_00 { - self.pat_pending = pending; - self.pat_pending_pusi = pusi; - } else { - self.pmt_pending = pending; - self.pmt_pending_pusi = pusi; - } + if header.pid == 0x00_00 + || self.pat.as_ref().map(|pat| pat.program_map_pid) == Some(header.pid) + { + self.handle_section(imp, &header, new_payload)?; } // Skip everything else diff --git a/net/mpegtslive/src/mpegtslive/mod.rs b/net/mpegtslive/src/mpegtslive/mod.rs index 379dc73e..32fa8090 100644 --- a/net/mpegtslive/src/mpegtslive/mod.rs +++ b/net/mpegtslive/src/mpegtslive/mod.rs @@ -10,6 +10,7 @@ use gst::glib; use gst::prelude::*; mod imp; +mod parser; glib::wrapper! { pub struct MpegTsLiveSource(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; diff --git a/net/mpegtslive/src/mpegtslive/parser.rs b/net/mpegtslive/src/mpegtslive/parser.rs new file mode 100644 index 00000000..c10571f2 --- /dev/null +++ b/net/mpegtslive/src/mpegtslive/parser.rs @@ -0,0 +1,465 @@ +// Copyright (C) 2024 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +#![allow(unused)] + +use anyhow::{bail, Context, Result}; +use bitstream_io::{BigEndian, BitRead, BitReader, FromBitStream}; +use smallvec::SmallVec; + +pub struct SectionParser { + /// Current value of the continuity counter + cc: Option, + /// Pending PSI data + pending: Vec, + /// If we skip data until the next PUSI + waiting_for_pusi: bool, + /// If pending starts on PUSI, i.e. still contains potential padding + pending_starts_on_pusi: bool, +} + +impl Default for SectionParser { + fn default() -> Self { + Self { + cc: None, + pending: Vec::new(), + waiting_for_pusi: true, + pending_starts_on_pusi: false, + } + } +} + +impl SectionParser { + /// Push PSI `payload`. + /// + /// After this call `parse()` until `None` is returned. + pub fn push(&mut self, header: &PacketHeader, payload: &[u8]) { + if header.pusi { + self.clear(); + } else if self.cc.map_or(true, |cc| (cc + 1) & 0xf != header.cc) { + self.clear(); + self.waiting_for_pusi = true; + // Not start of a payload and we didn't see the start, just return + return; + } else if self.waiting_for_pusi { + // Not start of a payload and we didn't see the start, just return + return; + } + self.cc = Some(header.cc); + + // Store payload for parsing, in case it's split over multiple packets + if header.pusi { + self.waiting_for_pusi = false; + self.pending_starts_on_pusi = true; + } + self.pending.extend_from_slice(payload); + } + + /// Parse PSI payload that is currently queued up. + /// + /// Call until `None` is returned, which means that more data is required to continue parsing. + /// + /// It's safe to call this again after errors. + pub fn parse(&mut self) -> Result> { + // No payload to handle right now + if self.pending.is_empty() { + return Ok(None); + } + + let payload = self.pending.as_slice(); + + // Skip padding first + if self.pending_starts_on_pusi { + let pointer_field = payload[0] as usize; + // Need more data + if payload.len() < 1 + pointer_field { + return Ok(None); + } + + // Skip padding + self.pending.copy_within(1 + pointer_field.., 0); + let new_length = self.pending.len() - 1 - pointer_field; + self.pending.resize(new_length, 0u8); + self.pending_starts_on_pusi = false; + } + + let payload = self.pending.as_slice(); + if payload.len() < 3 { + // Need more data for table header + return Ok(None); + } + + // Parse table header, payload_reader stays at beginning of section header + let mut payload_reader = BitReader::endian(payload, BigEndian); + let table_header = match payload_reader + .parse::() + .context("table_header") + { + Ok(table_header) => table_header, + Err(err) => { + self.clear(); + return Err(err); + } + }; + + // Need more data for this section, don't update pending + let remaining_length = payload_reader.reader().unwrap().len(); + if remaining_length < table_header.section_length as usize { + return Ok(None); + } + + let section = &payload_reader.reader().unwrap()[..table_header.section_length as usize]; + // Skip whole section so the reader is at the beginning of the next section header + payload_reader + .skip(8 * table_header.section_length as u32) + .unwrap(); + + let section = Self::parse_section(&table_header, section); + + // Skip parsed section, even in case of parsing error + let remaining_length = payload_reader.reader().unwrap().len(); + let new_pending_range = (self.pending.len() - remaining_length)..; + self.pending.copy_within(new_pending_range, 0); + self.pending.resize(remaining_length, 0u8); + + section + .map(Some) + .map_err(|err| err.context(format!("section with table header {table_header:?}"))) + } + + fn parse_section(table_header: &TableHeader, section: &[u8]) -> Result
{ + let mut section_reader = BitReader::endian(section, BigEndian); + + // TODO: If TSS is available one could check the CRC32 at the end of the section + let table_syntax_section = if table_header.section_syntax_indicator { + Some( + section_reader + .parse::() + .context("section")?, + ) + } else { + None + }; + + let section = match table_header.table_id { + // PAT + 0x00 => { + let Some(table_syntax_section) = table_syntax_section else { + bail!("PAT without TSS"); + }; + + let remaining_length = section_reader.reader().unwrap().len(); + if remaining_length < 4 { + bail!("too short PAT"); + } + let n_pats = (remaining_length - 4) / 4; + let mut pat = SmallVec::with_capacity(n_pats); + for _ in 0..n_pats { + pat.push( + section_reader + .parse::() + .context("pat_entry")?, + ); + } + + Section::ProgramAccessTable { + table_header: table_header.clone(), + table_syntax_section, + pat, + } + } + // PAT + 0x02 => { + let Some(table_syntax_section) = table_syntax_section else { + bail!("PMT without TSS"); + }; + let pmt = section_reader + .parse::() + .context("pmt")?; + + Section::ProgramMappingTable { + table_header: table_header.clone(), + table_syntax_section, + pmt, + } + } + // Unknown + _ => Section::Unknown { + table_header: table_header.clone(), + table_syntax_section, + }, + }; + + Ok(section) + } + + pub fn clear(&mut self) { + self.cc = None; + self.pending.clear(); + self.pending_starts_on_pusi = false; + self.waiting_for_pusi = true; + } +} + +#[derive(Debug, Clone)] +pub enum Section { + ProgramAccessTable { + table_header: TableHeader, + table_syntax_section: TableSyntaxSection, + pat: SmallVec<[ProgramAccessTable; 4]>, + }, + ProgramMappingTable { + table_header: TableHeader, + table_syntax_section: TableSyntaxSection, + pmt: ProgramMappingTable, + }, + Unknown { + table_header: TableHeader, + table_syntax_section: Option, + }, +} + +#[derive(Debug, Clone)] +pub struct TableHeader { + pub table_id: u8, + pub section_syntax_indicator: bool, + pub section_length: u16, +} + +impl FromBitStream for TableHeader { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + let table_id = r.read_to::().context("table_id")?; + let section_syntax_indicator = r.read_bit().context("table_syntax_indicator")?; + r.skip(5).context("reserved")?; + let section_length = r.read::(10).context("section_length")?; + + Ok(TableHeader { + table_id, + section_syntax_indicator, + section_length, + }) + } +} + +#[derive(Debug, Clone)] +pub struct TableSyntaxSection { + pub table_id_extension: u16, + pub version_number: u8, + pub current_next_indicator: bool, + pub section_number: u8, + pub last_section_number: u8, +} + +impl FromBitStream for TableSyntaxSection { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + let table_id_extension = r.read_to::().context("table_id_extension")?; + r.skip(2).context("reserved")?; + let version_number = r.read::(5).context("version_number")?; + let current_next_indicator = r.read_bit().context("current_next_indicator")?; + let section_number = r.read_to::().context("section_number")?; + let last_section_number = r.read_to::().context("last_section_number")?; + + Ok(TableSyntaxSection { + table_id_extension, + version_number, + current_next_indicator, + section_number, + last_section_number, + }) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct ProgramAccessTable { + pub program_num: u16, + pub program_map_pid: u16, +} + +impl FromBitStream for ProgramAccessTable { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + let program_num = r.read_to::().context("program_num")?; + r.skip(3).context("reserved")?; + let program_map_pid = r.read::(13).context("program_map_pid")?; + + Ok(ProgramAccessTable { + program_num, + program_map_pid, + }) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct ProgramMappingTable { + pub pcr_pid: u16, + pub elementary_pids: SmallVec<[u16; 16]>, + // Add other fields as needed +} + +impl FromBitStream for ProgramMappingTable { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + r.skip(3).context("reserved")?; + let pcr_pid = r.read::(13).context("pcr_pid")?; + r.skip(4).context("reserved")?; + r.skip(2).context("program_info_length_unused")?; + + let program_info_length = r.read::(10).context("program_info_length")?; + r.skip(8 * program_info_length as u32) + .context("program_descriptors")?; + + fn try_read Result, T>( + r: &mut R, + op: F, + ) -> Result, std::io::Error> { + match op(r) { + Ok(v) => Ok(Some(v)), + Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None), + Err(err) => Err(err), + } + } + + let mut elementary_pids = SmallVec::new(); + loop { + let Some(_stream_type) = try_read(r, |r| r.read_to::()).context("stream_type")? + else { + break; + }; + let Some(_) = try_read(r, |r| r.skip(3)).context("reserved_bits")? else { + break; + }; + + let Some(elementary_pid) = + try_read(r, |r| r.read::(13)).context("elementary_pid")? + else { + break; + }; + + let Some(_) = try_read(r, |r| r.skip(4)).context("reserved_bits")? else { + break; + }; + + let Some(_) = try_read(r, |r| r.skip(2)).context("es_info_length_unused_bits")? else { + break; + }; + + let Some(es_info_length) = + try_read(r, |r| r.read::(10)).context("es_info_length")? + else { + break; + }; + + let Some(_) = + try_read(r, |r| r.skip(8 * es_info_length as u32)).context("es_descriptors")? + else { + break; + }; + + elementary_pids.push(elementary_pid); + } + + Ok(ProgramMappingTable { + pcr_pid, + elementary_pids, + }) + } +} + +#[derive(Debug, Clone)] +pub struct PacketHeader { + pub tei: bool, + pub pusi: bool, + pub tp: bool, + pub pid: u16, + pub tsc: u8, + pub afc: u8, + pub cc: u8, +} + +impl FromBitStream for PacketHeader { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + if r.read_to::().context("sync_byte")? != 0x47 { + bail!("Lost sync"); + } + + let tei = r.read_bit().context("tei")?; + let pusi = r.read_bit().context("pusi")?; + let tp = r.read_bit().context("tp")?; + let pid = r.read::(13).context("pid")?; + + let tsc = r.read::(2).context("tsc")?; + let afc = r.read::(2).context("afc")?; + let cc = r.read::(4).context("cc")?; + + Ok(PacketHeader { + tei, + pusi, + tp, + pid, + tsc, + afc, + cc, + }) + } +} + +#[derive(Debug, Clone)] +pub struct AdaptionField { + pub pcr: Option, + // Add other fields as needed +} + +impl FromBitStream for AdaptionField { + type Error = anyhow::Error; + + fn from_reader(r: &mut R) -> std::result::Result + where + Self: Sized, + { + r.skip(3).context("flags")?; + let pcr_present = r.read_bit().context("pcr_present")?; + r.skip(4).context("flags")?; + + // PCR present + let pcr = if pcr_present { + let pcr = r.read::(33).context("pcr_base")? * 300; + r.skip(6).context("pcr_reserved")?; + let pcr = pcr + r.read::(9).context("pcr_extension")? % 300; + Some(pcr) + } else { + None + }; + + // Skip all other parts of the adaptation field for now + + Ok(AdaptionField { pcr }) + } +}