mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-02-24 16:46:25 +00:00
mpegtslivesrc: Take adaptation field discontinuity flag into account
Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/653 Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2105>
This commit is contained in:
parent
16bbe13f38
commit
632fe83be3
2 changed files with 66 additions and 10 deletions
|
@ -401,11 +401,12 @@ impl State {
|
|||
&mut self,
|
||||
imp: &MpegTsLiveSource,
|
||||
header: &PacketHeader,
|
||||
adaptation_field: Option<&AdaptionField>,
|
||||
payload: &[u8],
|
||||
) -> Result<()> {
|
||||
// Read PAT or our selected program's PMT
|
||||
if header.pid == 0x00_00 {
|
||||
self.pat_parser.push(header, payload);
|
||||
self.pat_parser.push(header, adaptation_field, payload);
|
||||
|
||||
loop {
|
||||
match self.pat_parser.parse() {
|
||||
|
@ -461,7 +462,7 @@ impl State {
|
|||
}
|
||||
}
|
||||
} else if self.pat.as_ref().map(|pat| pat.program_map_pid) == Some(header.pid) {
|
||||
self.pmt_parser.push(header, payload);
|
||||
self.pmt_parser.push(header, adaptation_field, payload);
|
||||
|
||||
loop {
|
||||
match self.pmt_parser.parse() {
|
||||
|
@ -528,6 +529,8 @@ impl State {
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
let mut adaptation_field = None;
|
||||
|
||||
// Read adaptation field if present
|
||||
if header.afc & 0x2 != 0 {
|
||||
let length = reader.read_to::<u8>().context("af_length")? as usize;
|
||||
|
@ -543,10 +546,22 @@ impl State {
|
|||
// Parse adaption field and update PCR if it's the PID of our selected program
|
||||
if self.pmt.as_ref().map(|pmt| pmt.pcr_pid) == Some(header.pid) {
|
||||
let mut af_reader = BitReader::endian(af, BigEndian);
|
||||
let adaptation_field = af_reader.parse::<AdaptionField>().context("af")?;
|
||||
let af = af_reader.parse::<AdaptionField>().context("af")?;
|
||||
|
||||
// PCR present
|
||||
if let Some(pcr) = adaptation_field.pcr {
|
||||
if let Some(pcr) = af.pcr {
|
||||
if af.discontinuity_flag {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp = imp,
|
||||
"Discontinuity signalled, resetting PCR observations"
|
||||
);
|
||||
|
||||
self.base_pcr = None;
|
||||
self.base_external = None;
|
||||
self.last_seen_pcr = None;
|
||||
}
|
||||
|
||||
if let Some(monotonic_time) = monotonic_time {
|
||||
self.store_observation(imp, pcr, monotonic_time);
|
||||
} else {
|
||||
|
@ -556,7 +571,11 @@ impl State {
|
|||
"Can't handle PCR without packet capture time"
|
||||
);
|
||||
}
|
||||
} else if af.discontinuity_flag {
|
||||
gst::debug!(CAT, imp = imp, "Discontinuity signalled");
|
||||
}
|
||||
|
||||
adaptation_field = Some(af);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -568,9 +587,23 @@ impl State {
|
|||
if header.pid == 0x00_00
|
||||
|| self.pat.as_ref().map(|pat| pat.program_map_pid) == Some(header.pid)
|
||||
{
|
||||
self.handle_section(imp, &header, new_payload)?;
|
||||
self.handle_section(imp, &header, adaptation_field.as_ref(), new_payload)?;
|
||||
} else if let Some(stream) = self.streams.get_mut(&header.pid) {
|
||||
stream.pes_parser.push(&header, new_payload);
|
||||
if adaptation_field
|
||||
.as_ref()
|
||||
.is_some_and(|af| af.discontinuity_flag)
|
||||
{
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp = imp,
|
||||
"Discontinuity signalled for PID {}, forwarding discont",
|
||||
header.pid,
|
||||
);
|
||||
}
|
||||
|
||||
stream
|
||||
.pes_parser
|
||||
.push(&header, adaptation_field.as_ref(), new_payload);
|
||||
|
||||
loop {
|
||||
match stream.pes_parser.parse() {
|
||||
|
|
|
@ -40,9 +40,18 @@ impl SectionParser {
|
|||
/// Push PSI `payload`.
|
||||
///
|
||||
/// After this call `parse()` until `None` is returned.
|
||||
pub fn push(&mut self, header: &PacketHeader, payload: &[u8]) {
|
||||
pub fn push(
|
||||
&mut self,
|
||||
header: &PacketHeader,
|
||||
adaptation_field: Option<&AdaptionField>,
|
||||
|
||||
payload: &[u8],
|
||||
) {
|
||||
if header.pusi {
|
||||
self.clear();
|
||||
} else if adaptation_field.is_some_and(|af| af.discontinuity_flag) {
|
||||
// discontinuity_flag only defines that there is an expected discountinuity in the
|
||||
// continuity counter but the actual data is continuous
|
||||
} else if self.cc.map_or(true, |cc| (cc + 1) & 0xf != header.cc) {
|
||||
self.clear();
|
||||
self.waiting_for_pusi = true;
|
||||
|
@ -414,9 +423,18 @@ impl PESParser {
|
|||
/// Push PES `payload`.
|
||||
///
|
||||
/// After this call `parse()` until `None` is returned.
|
||||
pub fn push(&mut self, header: &PacketHeader, payload: &[u8]) {
|
||||
pub fn push(
|
||||
&mut self,
|
||||
header: &PacketHeader,
|
||||
adaptation_field: Option<&AdaptionField>,
|
||||
|
||||
payload: &[u8],
|
||||
) {
|
||||
if header.pusi {
|
||||
self.clear();
|
||||
} else if adaptation_field.is_some_and(|af| af.discontinuity_flag) {
|
||||
// discontinuity_flag only defines that there is an expected discountinuity in the
|
||||
// continuity counter but the actual data is continuous
|
||||
} else if self.cc.map_or(true, |cc| (cc + 1) & 0xf != header.cc) {
|
||||
self.clear();
|
||||
self.waiting_for_pusi = true;
|
||||
|
@ -634,6 +652,7 @@ impl FromBitStream for PacketHeader {
|
|||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AdaptionField {
|
||||
pub discontinuity_flag: bool,
|
||||
pub pcr: Option<u64>,
|
||||
// Add other fields as needed
|
||||
}
|
||||
|
@ -645,7 +664,8 @@ impl FromBitStream for AdaptionField {
|
|||
where
|
||||
Self: Sized,
|
||||
{
|
||||
r.skip(3).context("flags")?;
|
||||
let discontinuity_flag = r.read_bit().context("discontinuity_flag")?;
|
||||
r.skip(2).context("flags")?;
|
||||
let pcr_present = r.read_bit().context("pcr_present")?;
|
||||
r.skip(4).context("flags")?;
|
||||
|
||||
|
@ -661,6 +681,9 @@ impl FromBitStream for AdaptionField {
|
|||
|
||||
// Skip all other parts of the adaptation field for now
|
||||
|
||||
Ok(AdaptionField { pcr })
|
||||
Ok(AdaptionField {
|
||||
discontinuity_flag,
|
||||
pcr,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue