mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-12-22 18:16:28 +00:00
mpegtslivesrc: Parse PAT/PMT and only handle PCRs from the first program
This matches default behaviour of tsdemux and makes sure we're not jumping between different PCRs if there are multiple. At a later time, program selection could be implemented. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1887>
This commit is contained in:
parent
5010ee872d
commit
41ddbd8706
1 changed files with 550 additions and 138 deletions
|
@ -23,10 +23,11 @@
|
||||||
*/
|
*/
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{bail, Result};
|
||||||
use bitstream_io::{BigEndian, BitRead, BitReader};
|
use bitstream_io::{BigEndian, BitRead, BitReader, FromBitStream};
|
||||||
use gst::glib;
|
use gst::glib;
|
||||||
use gst::prelude::*;
|
use gst::prelude::*;
|
||||||
use gst::subclass::prelude::*;
|
use gst::subclass::prelude::*;
|
||||||
|
use std::mem;
|
||||||
use std::ops::Add;
|
use std::ops::Add;
|
||||||
use std::ops::ControlFlow;
|
use std::ops::ControlFlow;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
|
@ -158,31 +159,231 @@ impl From<gst::ClockTime> for MpegTsPcr {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct MpegTSLiveSourceState {
|
#[derive(Default)]
|
||||||
|
struct State {
|
||||||
// Controlled source element
|
// Controlled source element
|
||||||
source: Option<gst::Element>,
|
source: Option<gst::Element>,
|
||||||
|
|
||||||
// Clock we control and expose
|
|
||||||
external_clock: gst::SystemClock,
|
|
||||||
|
|
||||||
// Last observed PCR (for handling wraparound)
|
// Last observed PCR (for handling wraparound)
|
||||||
last_seen_pcr: Option<MpegTsPcr>,
|
last_seen_pcr: Option<MpegTsPcr>,
|
||||||
|
|
||||||
// First observed PCR and associated timestamp
|
// First observed PCR and associated timestamp
|
||||||
base_pcr: Option<MpegTsPcr>,
|
base_pcr: Option<MpegTsPcr>,
|
||||||
base_monotonic: Option<gst::ClockTime>,
|
base_monotonic: Option<gst::ClockTime>,
|
||||||
|
|
||||||
|
// If the next outgoing packet should have the discont flag set
|
||||||
|
discont_pending: bool,
|
||||||
|
|
||||||
|
// Continuity counter for PAT PID
|
||||||
|
pat_cc: Option<u8>,
|
||||||
|
// Pending PAT payload data from last PAT packet
|
||||||
|
pat_pending: Vec<u8>,
|
||||||
|
// Pending data starts on pointer field, otherwise on table header
|
||||||
|
pat_pending_pusi: bool,
|
||||||
|
// PID used for the PMT of the selected program
|
||||||
|
pmt_pid: Option<u16>,
|
||||||
|
// Program number of the selected program
|
||||||
|
pmt_program_num: Option<u16>,
|
||||||
|
// Continuity counter for PMT PID
|
||||||
|
pmt_cc: Option<u8>,
|
||||||
|
// Pending PMT payload data from last PMT packet
|
||||||
|
pmt_pending: Vec<u8>,
|
||||||
|
// Pending data starts on pointer fiel, otherwise on table header
|
||||||
|
pmt_pending_pusi: bool,
|
||||||
|
// PID used for the PCR of the selected program
|
||||||
|
pcr_pid: Option<u16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MpegTSLiveSourceState {
|
#[derive(Debug)]
|
||||||
/// Grab time of our clock and controlled clock
|
#[allow(unused)]
|
||||||
///
|
struct PacketHeader {
|
||||||
/// Returns `true` on PCR discontinuities.
|
tei: bool,
|
||||||
|
pusi: bool,
|
||||||
|
tp: bool,
|
||||||
|
pid: u16,
|
||||||
|
tsc: u8,
|
||||||
|
afc: u8,
|
||||||
|
cc: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromBitStream for PacketHeader {
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
|
fn from_reader<R: BitRead + ?Sized>(r: &mut R) -> std::result::Result<Self, Self::Error>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
if r.read_to::<u8>().context("sync_byte")? != 0x47 {
|
||||||
|
bail!("Lost sync");
|
||||||
|
}
|
||||||
|
|
||||||
|
let tei = r.read_bit().context("tei")?;
|
||||||
|
let pusi = r.read_bit().context("pusi")?;
|
||||||
|
let tp = r.read_bit().context("tp")?;
|
||||||
|
let pid = r.read::<u16>(13).context("pid")?;
|
||||||
|
|
||||||
|
let tsc = r.read::<u8>(2).context("tsc")?;
|
||||||
|
let afc = r.read::<u8>(2).context("afc")?;
|
||||||
|
let cc = r.read::<u8>(4).context("cc")?;
|
||||||
|
|
||||||
|
Ok(PacketHeader {
|
||||||
|
tei,
|
||||||
|
pusi,
|
||||||
|
tp,
|
||||||
|
pid,
|
||||||
|
tsc,
|
||||||
|
afc,
|
||||||
|
cc,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct AdaptionField {
|
||||||
|
pcr: Option<u64>,
|
||||||
|
// Add other fields as needed
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromBitStream for AdaptionField {
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
|
fn from_reader<R: BitRead + ?Sized>(r: &mut R) -> std::result::Result<Self, Self::Error>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
r.skip(3).context("flags")?;
|
||||||
|
let pcr_present = r.read_bit().context("pcr_present")?;
|
||||||
|
r.skip(4).context("flags")?;
|
||||||
|
|
||||||
|
// PCR present
|
||||||
|
let pcr = if pcr_present {
|
||||||
|
let pcr = r.read::<u64>(33).context("pcr_base")? * 300;
|
||||||
|
r.skip(6).context("pcr_reserved")?;
|
||||||
|
let pcr = pcr + r.read::<u64>(9).context("pcr_extension")? % 300;
|
||||||
|
Some(pcr)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
// Skip all other parts of the adaptation field for now
|
||||||
|
|
||||||
|
Ok(AdaptionField { pcr })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct TableHeader {
|
||||||
|
table_id: u8,
|
||||||
|
section_syntax_indicator: bool,
|
||||||
|
section_length: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromBitStream for TableHeader {
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
|
fn from_reader<R: BitRead + ?Sized>(r: &mut R) -> std::result::Result<Self, Self::Error>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
let table_id = r.read_to::<u8>().context("table_id")?;
|
||||||
|
let section_syntax_indicator = r.read_bit().context("table_syntax_indicator")?;
|
||||||
|
r.skip(5).context("reserved")?;
|
||||||
|
let section_length = r.read::<u16>(10).context("section_length")?;
|
||||||
|
|
||||||
|
Ok(TableHeader {
|
||||||
|
table_id,
|
||||||
|
section_syntax_indicator,
|
||||||
|
section_length,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
#[allow(unused)]
|
||||||
|
struct TableSyntaxSection {
|
||||||
|
table_id_extension: u16,
|
||||||
|
version_number: u8,
|
||||||
|
current_next_indicator: bool,
|
||||||
|
section_number: u8,
|
||||||
|
last_section_number: u8,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromBitStream for TableSyntaxSection {
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
|
fn from_reader<R: BitRead + ?Sized>(r: &mut R) -> std::result::Result<Self, Self::Error>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
let table_id_extension = r.read_to::<u16>().context("table_id_extension")?;
|
||||||
|
r.skip(2).context("reserved")?;
|
||||||
|
let version_number = r.read::<u8>(5).context("version_number")?;
|
||||||
|
let current_next_indicator = r.read_bit().context("current_next_indicator")?;
|
||||||
|
let section_number = r.read_to::<u8>().context("section_number")?;
|
||||||
|
let last_section_number = r.read_to::<u8>().context("last_section_number")?;
|
||||||
|
|
||||||
|
Ok(TableSyntaxSection {
|
||||||
|
table_id_extension,
|
||||||
|
version_number,
|
||||||
|
current_next_indicator,
|
||||||
|
section_number,
|
||||||
|
last_section_number,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct ProgramAccessTable {
|
||||||
|
program_num: u16,
|
||||||
|
program_map_pid: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromBitStream for ProgramAccessTable {
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
|
fn from_reader<R: BitRead + ?Sized>(r: &mut R) -> std::result::Result<Self, Self::Error>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
let program_num = r.read_to::<u16>().context("program_num")?;
|
||||||
|
r.skip(3).context("reserved")?;
|
||||||
|
let program_map_pid = r.read::<u16>(13).context("program_map_pid")?;
|
||||||
|
|
||||||
|
Ok(ProgramAccessTable {
|
||||||
|
program_num,
|
||||||
|
program_map_pid,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct ProgramMappingTable {
|
||||||
|
pcr_pid: u16,
|
||||||
|
// Add other fields as needed
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromBitStream for ProgramMappingTable {
|
||||||
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
|
fn from_reader<R: BitRead + ?Sized>(r: &mut R) -> std::result::Result<Self, Self::Error>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
r.skip(3).context("reserved")?;
|
||||||
|
let pcr_pid = r.read::<u16>(13).context("pcr_pid")?;
|
||||||
|
|
||||||
|
Ok(ProgramMappingTable { pcr_pid })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl State {
|
||||||
|
/// Store PCR / monotonic time observation
|
||||||
fn store_observation(
|
fn store_observation(
|
||||||
&mut self,
|
&mut self,
|
||||||
imp: &MpegTsLiveSource,
|
imp: &MpegTsLiveSource,
|
||||||
pcr: u64,
|
pcr: u64,
|
||||||
monotonic_time: gst::ClockTime,
|
monotonic_time: gst::ClockTime,
|
||||||
) -> bool {
|
) {
|
||||||
// If this is the first PCR we observe:
|
// If this is the first PCR we observe:
|
||||||
// * Remember the PCR *and* the associated monotonic clock value when capture
|
// * Remember the PCR *and* the associated monotonic clock value when capture
|
||||||
// * `base_pcr` `base_monotonic`
|
// * `base_pcr` `base_monotonic`
|
||||||
|
@ -193,7 +394,6 @@ impl MpegTSLiveSourceState {
|
||||||
// * Store (observation_monotonic, buffer_pts)
|
// * Store (observation_monotonic, buffer_pts)
|
||||||
|
|
||||||
let new_pcr: MpegTsPcr;
|
let new_pcr: MpegTsPcr;
|
||||||
let mut discont = false;
|
|
||||||
|
|
||||||
if let (Some(base_pcr), Some(base_monotonic), Some(last_seen_pcr)) =
|
if let (Some(base_pcr), Some(base_monotonic), Some(last_seen_pcr)) =
|
||||||
(self.base_pcr, self.base_monotonic, self.last_seen_pcr)
|
(self.base_pcr, self.base_monotonic, self.last_seen_pcr)
|
||||||
|
@ -204,7 +404,7 @@ impl MpegTSLiveSourceState {
|
||||||
if let Some(new_pcr) = handled_pcr {
|
if let Some(new_pcr) = handled_pcr {
|
||||||
// First check if this is more than 1s off from the current clock calibration and
|
// First check if this is more than 1s off from the current clock calibration and
|
||||||
// if so consider it a discontinuity too.
|
// if so consider it a discontinuity too.
|
||||||
let (internal, external, num, denom) = self.external_clock.calibration();
|
let (internal, external, num, denom) = imp.external_clock.calibration();
|
||||||
|
|
||||||
let expected_external = gst::Clock::adjust_with_calibration(
|
let expected_external = gst::Clock::adjust_with_calibration(
|
||||||
monotonic_time,
|
monotonic_time,
|
||||||
|
@ -235,12 +435,12 @@ impl MpegTSLiveSourceState {
|
||||||
gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_monotonic,
|
gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_monotonic,
|
||||||
monotonic_time,
|
monotonic_time,
|
||||||
);
|
);
|
||||||
self.external_clock.add_observation(
|
imp.external_clock.add_observation(
|
||||||
monotonic_time,
|
monotonic_time,
|
||||||
gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_monotonic,
|
gst::ClockTime::from(new_pcr.saturating_sub(base_pcr)) + base_monotonic,
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
let (internal, external, num, denom) = self.external_clock.calibration();
|
let (internal, external, num, denom) = imp.external_clock.calibration();
|
||||||
let scaled_monotonic = gst::Clock::adjust_with_calibration(
|
let scaled_monotonic = gst::Clock::adjust_with_calibration(
|
||||||
monotonic_time,
|
monotonic_time,
|
||||||
internal,
|
internal,
|
||||||
|
@ -248,11 +448,15 @@ impl MpegTSLiveSourceState {
|
||||||
num,
|
num,
|
||||||
denom,
|
denom,
|
||||||
);
|
);
|
||||||
gst::warning!(CAT, imp = imp, "DISCONT detected, Picking new reference times (pcr:{pcr:#?}, monotonic:{monotonic_time}, scaled monotonic:{scaled_monotonic}");
|
gst::warning!(
|
||||||
|
CAT,
|
||||||
|
imp = imp,
|
||||||
|
"DISCONT detected, Picking new reference times (pcr:{pcr:#?}, monotonic:{monotonic_time}, scaled monotonic:{scaled_monotonic}",
|
||||||
|
);
|
||||||
new_pcr = MpegTsPcr::new(pcr);
|
new_pcr = MpegTsPcr::new(pcr);
|
||||||
self.base_pcr = Some(new_pcr);
|
self.base_pcr = Some(new_pcr);
|
||||||
self.base_monotonic = Some(monotonic_time);
|
self.base_monotonic = Some(monotonic_time);
|
||||||
discont = true;
|
self.discont_pending = true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
gst::debug!(
|
gst::debug!(
|
||||||
|
@ -263,10 +467,305 @@ impl MpegTSLiveSourceState {
|
||||||
new_pcr = MpegTsPcr::new(pcr);
|
new_pcr = MpegTsPcr::new(pcr);
|
||||||
self.base_pcr = Some(new_pcr);
|
self.base_pcr = Some(new_pcr);
|
||||||
self.base_monotonic = Some(monotonic_time);
|
self.base_monotonic = Some(monotonic_time);
|
||||||
|
self.discont_pending = true;
|
||||||
}
|
}
|
||||||
self.last_seen_pcr = Some(new_pcr);
|
self.last_seen_pcr = Some(new_pcr);
|
||||||
|
}
|
||||||
|
|
||||||
discont
|
/// Parses an MPEG-TS section and updates the internal state
|
||||||
|
fn handle_section(
|
||||||
|
&mut self,
|
||||||
|
imp: &MpegTsLiveSource,
|
||||||
|
header: &PacketHeader,
|
||||||
|
table_header: &TableHeader,
|
||||||
|
slice: &[u8],
|
||||||
|
) -> Result<()> {
|
||||||
|
gst::trace!(
|
||||||
|
CAT,
|
||||||
|
imp = imp,
|
||||||
|
"Parsing section with header {table_header:?}"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Skip non-PAT/PMT
|
||||||
|
if table_header.table_id != 0x00 && table_header.table_id != 0x02
|
||||||
|
|| !table_header.section_syntax_indicator
|
||||||
|
{
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut section_reader = BitReader::endian(slice, BigEndian);
|
||||||
|
|
||||||
|
let table_syntax_section = section_reader
|
||||||
|
.parse::<TableSyntaxSection>()
|
||||||
|
.context("section")?;
|
||||||
|
|
||||||
|
gst::trace!(
|
||||||
|
CAT,
|
||||||
|
imp = imp,
|
||||||
|
"Parsing section with table syntax section {table_syntax_section:?}"
|
||||||
|
);
|
||||||
|
|
||||||
|
if header.pid == 0x00_00 && table_header.table_id == 0x00 {
|
||||||
|
// PAT
|
||||||
|
let remaining_length = section_reader.reader().unwrap().len();
|
||||||
|
if remaining_length < 4 {
|
||||||
|
bail!("too short PAT");
|
||||||
|
}
|
||||||
|
let n_pats = (remaining_length - 4) / 4;
|
||||||
|
if n_pats == 0 {
|
||||||
|
gst::warning!(CAT, imp = imp, "No programs in PAT");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut first = true;
|
||||||
|
let mut warned = false;
|
||||||
|
for idx in 0..n_pats {
|
||||||
|
let pat = section_reader
|
||||||
|
.parse::<ProgramAccessTable>()
|
||||||
|
.context("pat")?;
|
||||||
|
gst::trace!(CAT, imp = imp, "Parsed PAT {idx}: {pat:?}");
|
||||||
|
if pat.program_map_pid == 0 {
|
||||||
|
// Skip NIT
|
||||||
|
} else if first {
|
||||||
|
first = false;
|
||||||
|
// Our program we select
|
||||||
|
if Option::zip(self.pmt_pid, self.pmt_program_num)
|
||||||
|
.map_or(true, |(pid, prog_num)| {
|
||||||
|
pid != pat.program_map_pid || prog_num != pat.program_num
|
||||||
|
})
|
||||||
|
{
|
||||||
|
gst::trace!(
|
||||||
|
CAT,
|
||||||
|
imp = imp,
|
||||||
|
"Selecting program with PID {} and program number {}",
|
||||||
|
pat.program_map_pid,
|
||||||
|
pat.program_num,
|
||||||
|
);
|
||||||
|
self.pmt_pid = Some(pat.program_map_pid);
|
||||||
|
self.pmt_program_num = Some(pat.program_num);
|
||||||
|
self.pmt_pending.clear();
|
||||||
|
self.pmt_cc = None;
|
||||||
|
self.pcr_pid = None;
|
||||||
|
self.last_seen_pcr = None;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Other programs we ignore
|
||||||
|
if !warned {
|
||||||
|
gst::warning!(
|
||||||
|
CAT,
|
||||||
|
imp = imp,
|
||||||
|
"MPEG-TS stream with multiple programs - timing will be wrong for all but first program",
|
||||||
|
);
|
||||||
|
warned = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if Some(header.pid) == self.pmt_pid
|
||||||
|
&& Some(table_syntax_section.table_id_extension) == self.pmt_program_num
|
||||||
|
&& table_header.table_id == 0x02
|
||||||
|
{
|
||||||
|
// PMT
|
||||||
|
let pmt = section_reader
|
||||||
|
.parse::<ProgramMappingTable>()
|
||||||
|
.context("pmt")?;
|
||||||
|
gst::trace!(
|
||||||
|
CAT,
|
||||||
|
imp = imp,
|
||||||
|
"Parsed PMT for selected program number {}: {pmt:?}",
|
||||||
|
table_syntax_section.table_id_extension
|
||||||
|
);
|
||||||
|
if self.pcr_pid.map_or(true, |pcr_pid| pcr_pid != pmt.pcr_pid) {
|
||||||
|
self.pcr_pid = Some(pmt.pcr_pid);
|
||||||
|
self.last_seen_pcr = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parses an MPEG-TS packet and updates the internal state
|
||||||
|
fn handle_packet(
|
||||||
|
&mut self,
|
||||||
|
imp: &MpegTsLiveSource,
|
||||||
|
slice: &[u8],
|
||||||
|
monotonic_time: Option<gst::ClockTime>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut reader = BitReader::endian(slice, BigEndian);
|
||||||
|
|
||||||
|
let header = reader.parse::<PacketHeader>().context("packet_header")?;
|
||||||
|
|
||||||
|
// Skip corrupted packets
|
||||||
|
if header.tei {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip scrambled packets
|
||||||
|
if header.tsc != 0 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read adaptation field if present
|
||||||
|
if header.afc & 0x2 != 0 {
|
||||||
|
let length = reader.read_to::<u8>().context("af_length")? as usize;
|
||||||
|
let af = *reader.reader().unwrap();
|
||||||
|
if af.len() < length {
|
||||||
|
bail!("too short adaptation field");
|
||||||
|
}
|
||||||
|
let af = &af[..length];
|
||||||
|
reader.skip(8 * length as u32).context("af")?;
|
||||||
|
|
||||||
|
// Parse adaption field and update PCR if it's the PID of our selected program
|
||||||
|
if self.pcr_pid == Some(header.pid) {
|
||||||
|
let mut af_reader = BitReader::endian(af, BigEndian);
|
||||||
|
let adaptation_field = af_reader.parse::<AdaptionField>().context("af")?;
|
||||||
|
|
||||||
|
// PCR present
|
||||||
|
if let Some(pcr) = adaptation_field.pcr {
|
||||||
|
if let Some(monotonic_time) = monotonic_time {
|
||||||
|
self.store_observation(imp, pcr, monotonic_time);
|
||||||
|
} else {
|
||||||
|
gst::warning!(
|
||||||
|
CAT,
|
||||||
|
imp = imp,
|
||||||
|
"Can't handle PCR without packet capture time"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read payload if payload if present
|
||||||
|
if header.afc & 0x1 != 0 {
|
||||||
|
let new_payload = *reader.reader().unwrap();
|
||||||
|
|
||||||
|
// Read PAT or our selected program's PMT
|
||||||
|
if header.pid == 0x00_00 || self.pmt_pid == Some(header.pid) {
|
||||||
|
let (cc, mut pending, pending_pusi) = if header.pid == 0x00_00 {
|
||||||
|
(
|
||||||
|
&mut self.pat_cc,
|
||||||
|
mem::take(&mut self.pat_pending),
|
||||||
|
self.pat_pending_pusi,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
(
|
||||||
|
&mut self.pmt_cc,
|
||||||
|
mem::take(&mut self.pmt_pending),
|
||||||
|
self.pmt_pending_pusi,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Clear any pending data if necessary
|
||||||
|
if header.pusi || cc.map_or(true, |cc| (cc + 1) & 0xf != header.cc) {
|
||||||
|
pending.clear();
|
||||||
|
}
|
||||||
|
*cc = Some(header.cc);
|
||||||
|
|
||||||
|
// Skip packet if this is not the start of a section
|
||||||
|
if !header.pusi && pending.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store payload for parsing, in case it's split over multiple packets
|
||||||
|
pending.extend_from_slice(new_payload);
|
||||||
|
|
||||||
|
// No payload
|
||||||
|
if pending.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let payload = pending.as_slice();
|
||||||
|
let mut pusi = header.pusi || pending_pusi;
|
||||||
|
let mut payload_reader = BitReader::endian(payload, BigEndian);
|
||||||
|
loop {
|
||||||
|
let remaining_payload = payload_reader.reader().unwrap();
|
||||||
|
|
||||||
|
let table_header;
|
||||||
|
if pusi {
|
||||||
|
assert!(!remaining_payload.is_empty());
|
||||||
|
let pointer_field = remaining_payload[0] as usize;
|
||||||
|
// Need more data
|
||||||
|
if payload_reader.reader().unwrap().len() < 1 + pointer_field + 3 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip padding
|
||||||
|
payload_reader.skip(8 + 8 * pointer_field as u32).unwrap();
|
||||||
|
pusi = false;
|
||||||
|
// Peek table header, payload_reader stays at beginning of section header
|
||||||
|
table_header = payload_reader.clone().parse::<TableHeader>().unwrap();
|
||||||
|
} else if remaining_payload.len() < 3 {
|
||||||
|
// Need more data for table header
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// Peek table header, payload_reader stays at beginning of section header
|
||||||
|
table_header = payload_reader.clone().parse::<TableHeader>().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Need more data for this section. payload_reader is still at beginning of
|
||||||
|
// section header so require 3 extra bytes
|
||||||
|
let remaining_length = payload_reader.reader().unwrap().len();
|
||||||
|
if remaining_length < 3 + table_header.section_length as usize {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip table header
|
||||||
|
payload_reader.skip(8 * 3).unwrap();
|
||||||
|
let section =
|
||||||
|
&payload_reader.reader().unwrap()[..table_header.section_length as usize];
|
||||||
|
// Skip whole section so the reader is at the beginning of the next section header
|
||||||
|
payload_reader
|
||||||
|
.skip(8 * table_header.section_length as u32)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
if let Err(err) = self.handle_section(imp, &header, &table_header, section) {
|
||||||
|
gst::warning!(
|
||||||
|
CAT,
|
||||||
|
imp = imp,
|
||||||
|
"Failed parsing section {table_header:?}: {err:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip all already parsed sections
|
||||||
|
let remaining_length = payload_reader.reader().unwrap().len();
|
||||||
|
let new_pending_range = (pending.len() - remaining_length)..pending.len();
|
||||||
|
pending.copy_within(new_pending_range, 0);
|
||||||
|
pending.resize(remaining_length, 0u8);
|
||||||
|
if header.pid == 0x00_00 {
|
||||||
|
self.pat_pending = pending;
|
||||||
|
self.pat_pending_pusi = pusi;
|
||||||
|
} else {
|
||||||
|
self.pmt_pending = pending;
|
||||||
|
self.pmt_pending_pusi = pusi;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip everything else
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_buffer(
|
||||||
|
&mut self,
|
||||||
|
imp: &MpegTsLiveSource,
|
||||||
|
buffer: &gst::Buffer,
|
||||||
|
monotonic_time: Option<gst::ClockTime>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let Ok(map) = buffer.map_readable() else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
// Find sync byte
|
||||||
|
let Some(pos) = map.iter().position(|&b| b == 0x47) else {
|
||||||
|
bail!("Couldn't find sync byte");
|
||||||
|
};
|
||||||
|
|
||||||
|
for chunk in map[pos..].chunks_exact(188) {
|
||||||
|
self.handle_packet(imp, chunk, monotonic_time)
|
||||||
|
.context("handling buffer")?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -277,76 +776,10 @@ pub struct MpegTsLiveSource {
|
||||||
// Clock set on source element
|
// Clock set on source element
|
||||||
internal_clock: gst::SystemClock,
|
internal_clock: gst::SystemClock,
|
||||||
|
|
||||||
state: Mutex<MpegTSLiveSourceState>,
|
// Clock we control and expose
|
||||||
}
|
external_clock: gst::SystemClock,
|
||||||
|
|
||||||
fn find_pcr(slice: &[u8], imp: &MpegTsLiveSource) -> Result<Option<u64>> {
|
state: Mutex<State>,
|
||||||
// Find sync byte
|
|
||||||
let Some(pos) = slice.iter().position(|&b| b == 0x47) else {
|
|
||||||
bail!("Couldn't find sync byte");
|
|
||||||
};
|
|
||||||
let mut buffer_pcr = None;
|
|
||||||
|
|
||||||
for chunk in slice[pos..].chunks_exact(188) {
|
|
||||||
if chunk[0] != 0x47 {
|
|
||||||
gst::error!(CAT, imp = imp, "Lost sync");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
let mut reader = BitReader::endian(chunk, BigEndian);
|
|
||||||
// Sync Byte
|
|
||||||
reader.skip(8)?;
|
|
||||||
// Transport Error Indicator
|
|
||||||
if reader.read_bit()? {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
// PUSI and transport priority
|
|
||||||
reader.skip(2).context("PUSI and transport priority")?;
|
|
||||||
// PID
|
|
||||||
let pid = reader.read::<u16>(13).expect("PID");
|
|
||||||
// transport scrambling control
|
|
||||||
reader.skip(2)?;
|
|
||||||
// Adaptation field present
|
|
||||||
let af_present = reader.read_bit().context("Adaptation field present")?;
|
|
||||||
reader.skip(5)?;
|
|
||||||
if af_present {
|
|
||||||
// adaptation_field_length
|
|
||||||
if reader.read::<u8>(8).context("adaptation field length")? >= 7 {
|
|
||||||
reader.skip(3)?;
|
|
||||||
let pcr_present = reader.read_bit().context("pcr_present")?;
|
|
||||||
reader.skip(4)?;
|
|
||||||
if pcr_present {
|
|
||||||
let pcr_base = reader.read::<u64>(33).context("PCR_base")?;
|
|
||||||
reader.skip(6)?;
|
|
||||||
let pcr_ext = reader.read::<u64>(9).context("PCR_ext")?;
|
|
||||||
let pcr = pcr_base * 300 + pcr_ext;
|
|
||||||
gst::debug!(CAT, imp = imp, "PID {pid} PCR {pcr}");
|
|
||||||
buffer_pcr = Some(pcr);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(buffer_pcr)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_pcr_from_buffer(imp: &MpegTsLiveSource, buffer: &gst::Buffer) -> Option<u64> {
|
|
||||||
let Ok(range) = buffer.map_readable() else {
|
|
||||||
return None;
|
|
||||||
};
|
|
||||||
let buffer_pcr = match find_pcr(range.as_slice(), imp) {
|
|
||||||
Ok(pcr) => pcr,
|
|
||||||
Err(err) => {
|
|
||||||
gst::error!(CAT, imp = imp, "Failed parsing MPEG-TS packets: {err}");
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let Some(raw_pcr) = buffer_pcr else {
|
|
||||||
gst::debug!(CAT, imp = imp, "No PCR observed in {:?}", buffer);
|
|
||||||
return None;
|
|
||||||
};
|
|
||||||
Some(raw_pcr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MpegTsLiveSource {
|
impl MpegTsLiveSource {
|
||||||
|
@ -366,19 +799,20 @@ impl MpegTsLiveSource {
|
||||||
monotonic_time = Some(pts + base_time);
|
monotonic_time = Some(pts + base_time);
|
||||||
};
|
};
|
||||||
|
|
||||||
if let (Some(monotonic_time), Some(raw_pcr)) =
|
// Parse packets
|
||||||
(monotonic_time, get_pcr_from_buffer(self, &buffer))
|
if let Err(err) = state.handle_buffer(self, &buffer, monotonic_time) {
|
||||||
{
|
gst::warning!(CAT, imp = self, "Failed handling buffer: {err:?}");
|
||||||
if state.store_observation(self, raw_pcr, monotonic_time) {
|
}
|
||||||
let buffer = buffer.make_mut();
|
|
||||||
buffer.set_flags(gst::BufferFlags::DISCONT);
|
if mem::take(&mut state.discont_pending) {
|
||||||
}
|
let buffer = buffer.make_mut();
|
||||||
};
|
buffer.set_flags(gst::BufferFlags::DISCONT);
|
||||||
|
}
|
||||||
|
|
||||||
// Update buffer timestamp if present
|
// Update buffer timestamp if present
|
||||||
if let Some(pts) = buffer_timestamp {
|
if let Some(pts) = buffer_timestamp {
|
||||||
let buffer = buffer.make_mut();
|
let buffer = buffer.make_mut();
|
||||||
let new_pts = state
|
let new_pts = self
|
||||||
.external_clock
|
.external_clock
|
||||||
.adjust_unlocked(pts + base_time)
|
.adjust_unlocked(pts + base_time)
|
||||||
.expect("Couldn't adjust {pts}")
|
.expect("Couldn't adjust {pts}")
|
||||||
|
@ -417,20 +851,20 @@ impl MpegTsLiveSource {
|
||||||
monotonic_time = Some(pts + base_time);
|
monotonic_time = Some(pts + base_time);
|
||||||
};
|
};
|
||||||
|
|
||||||
// Store observation if pcr is present
|
// Parse packets
|
||||||
if let (Some(monotonic_time), Some(raw_pcr)) =
|
if let Err(err) = state.handle_buffer(self, &buffer, monotonic_time) {
|
||||||
(monotonic_time, get_pcr_from_buffer(self, &buffer))
|
gst::warning!(CAT, imp = self, "Failed handling buffer: {err:?}");
|
||||||
{
|
}
|
||||||
if state.store_observation(self, raw_pcr, monotonic_time) {
|
|
||||||
let buffer = buffer.make_mut();
|
if mem::take(&mut state.discont_pending) {
|
||||||
buffer.set_flags(gst::BufferFlags::DISCONT);
|
let buffer = buffer.make_mut();
|
||||||
}
|
buffer.set_flags(gst::BufferFlags::DISCONT);
|
||||||
};
|
}
|
||||||
|
|
||||||
// Update buffer timestamp if present
|
// Update buffer timestamp if present
|
||||||
if let Some(pts) = this_buffer_timestamp {
|
if let Some(pts) = this_buffer_timestamp {
|
||||||
let buffer = buffer.make_mut();
|
let buffer = buffer.make_mut();
|
||||||
let new_pts = state
|
let new_pts = self
|
||||||
.external_clock
|
.external_clock
|
||||||
.adjust_unlocked(pts + base_time)
|
.adjust_unlocked(pts + base_time)
|
||||||
.expect("Couldn't adjust {pts}")
|
.expect("Couldn't adjust {pts}")
|
||||||
|
@ -495,13 +929,8 @@ impl ObjectSubclass for MpegTsLiveSource {
|
||||||
Self {
|
Self {
|
||||||
srcpad,
|
srcpad,
|
||||||
internal_clock,
|
internal_clock,
|
||||||
state: Mutex::new(MpegTSLiveSourceState {
|
external_clock,
|
||||||
source: None,
|
state: Mutex::new(State::default()),
|
||||||
external_clock,
|
|
||||||
last_seen_pcr: None,
|
|
||||||
base_pcr: None,
|
|
||||||
base_monotonic: None,
|
|
||||||
}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -565,8 +994,7 @@ impl ObjectImpl for MpegTsLiveSource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"window-size" => {
|
"window-size" => {
|
||||||
let state = self.state.lock().unwrap();
|
self.external_clock.set_window_size(value.get().unwrap());
|
||||||
state.external_clock.set_window_size(value.get().unwrap());
|
|
||||||
}
|
}
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
|
@ -575,13 +1003,7 @@ impl ObjectImpl for MpegTsLiveSource {
|
||||||
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
||||||
match pspec.name() {
|
match pspec.name() {
|
||||||
"source" => self.state.lock().unwrap().source.to_value(),
|
"source" => self.state.lock().unwrap().source.to_value(),
|
||||||
"window-size" => self
|
"window-size" => self.external_clock.window_size().to_value(),
|
||||||
.state
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.external_clock
|
|
||||||
.window_size()
|
|
||||||
.to_value(),
|
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -629,8 +1051,7 @@ impl ElementImpl for MpegTsLiveSource {
|
||||||
gst::error!(CAT, "We can only control live sources");
|
gst::error!(CAT, "We can only control live sources");
|
||||||
return Err(gst::StateChangeError);
|
return Err(gst::StateChangeError);
|
||||||
} else if transition == gst::StateChange::PausedToReady {
|
} else if transition == gst::StateChange::PausedToReady {
|
||||||
let mut state = self.state.lock().expect("Could get state");
|
self.external_clock.set_calibration(
|
||||||
state.external_clock.set_calibration(
|
|
||||||
gst::ClockTime::from_nseconds(0),
|
gst::ClockTime::from_nseconds(0),
|
||||||
gst::ClockTime::from_nseconds(0),
|
gst::ClockTime::from_nseconds(0),
|
||||||
1,
|
1,
|
||||||
|
@ -638,12 +1059,10 @@ impl ElementImpl for MpegTsLiveSource {
|
||||||
);
|
);
|
||||||
// Hack to flush out observations, we set the window-size to the
|
// Hack to flush out observations, we set the window-size to the
|
||||||
// same value
|
// same value
|
||||||
state
|
self.external_clock
|
||||||
.external_clock
|
.set_window_size(self.external_clock.window_size());
|
||||||
.set_window_size(state.external_clock.window_size());
|
|
||||||
state.last_seen_pcr = None;
|
*self.state.lock().unwrap() = State::default();
|
||||||
state.base_monotonic = None;
|
|
||||||
state.base_pcr = None;
|
|
||||||
}
|
}
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
@ -651,13 +1070,7 @@ impl ElementImpl for MpegTsLiveSource {
|
||||||
fn set_clock(&self, clock: Option<&gst::Clock>) -> bool {
|
fn set_clock(&self, clock: Option<&gst::Clock>) -> bool {
|
||||||
// We only accept our clock
|
// We only accept our clock
|
||||||
if let Some(proposed) = clock {
|
if let Some(proposed) = clock {
|
||||||
if *proposed
|
if *proposed != self.external_clock {
|
||||||
!= self
|
|
||||||
.state
|
|
||||||
.lock()
|
|
||||||
.expect("Couldn't get state")
|
|
||||||
.external_clock
|
|
||||||
{
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -665,8 +1078,7 @@ impl ElementImpl for MpegTsLiveSource {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn provide_clock(&self) -> Option<gst::Clock> {
|
fn provide_clock(&self) -> Option<gst::Clock> {
|
||||||
let state = self.state.lock().expect("Couldn't get state");
|
Some(self.external_clock.clone().upcast())
|
||||||
Some(state.external_clock.clone().upcast())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
||||||
|
|
Loading…
Reference in a new issue