rtp: av1depay: Parse internal size fields of OBUs and handle them

They're not recommended by the spec to include in the RTP packets but it
is valid to include them. Pion is including them.

When parsing the size fields also make sure to only take that much of a
payload unit and to skip any trailing data (which should not exist in
the first place).

Pion is also currently storing multiple OBUs in a single payload unit,
which is not allowed by the spec but can be easily handled with this
code now.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/560

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1621>
This commit is contained in:
Sebastian Dröge 2024-06-13 14:36:57 +03:00
parent dd4c466265
commit c821130947
2 changed files with 136 additions and 53 deletions

View file

@ -20,6 +20,8 @@ pub struct UnsizedObu {
pub header_len: u32,
/// indicates that only part of this OBU has been processed so far
pub is_fragment: bool,
/// OBU size field, if existing
pub size: Option<(u32, u32)>,
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
@ -81,13 +83,7 @@ impl UnsizedObu {
let obu_type = reader.read::<u8>(4)?.into();
let has_extension = reader.read_bit()?;
// make sure there is no size field
if reader.read_bit()? {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"did not expect size field",
));
}
let has_size_field = reader.read_bit()?;
// ignore the reserved bit
let _ = reader.read_bit()?;
@ -100,12 +96,19 @@ impl UnsizedObu {
reader.byte_align();
let size = if has_size_field {
Some(parse_leb128(reader)?)
} else {
None
};
Ok(Self {
obu_type,
has_extension,
temporal_id,
spatial_id,
header_len: has_extension as u32 + 1,
size,
is_fragment: false,
})
}
@ -115,7 +118,7 @@ impl UnsizedObu {
SizedObu {
obu_type: self.obu_type,
has_extension: self.has_extension,
has_size_field: false,
has_size_field: self.size.is_some(),
temporal_id: self.temporal_id,
spatial_id: self.spatial_id,
size,
@ -259,6 +262,7 @@ mod tests {
spatial_id: 0,
header_len: 1,
is_fragment: false,
size: None,
},
vec![0b0001_0000],
),
@ -283,6 +287,7 @@ mod tests {
spatial_id: 0,
header_len: 1,
is_fragment: false,
size: None,
},
vec![0b0111_1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
),
@ -307,6 +312,7 @@ mod tests {
spatial_id: 3,
header_len: 2,
is_fragment: false,
size: None,
},
vec![0b0011_0100, 0b1001_1000, 1, 2, 3, 4, 5],
),

View file

@ -24,8 +24,6 @@ use crate::av1::common::{
UnsizedObu, CLOCK_RATE, ENDIANNESS,
};
// TODO: handle internal size fields in RTP OBUs
#[derive(Debug)]
struct State {
last_timestamp: Option<u32>,
@ -36,7 +34,7 @@ struct State {
/// if we saw a valid OBU since the last reset
found_valid_obu: bool,
/// holds data for a fragment
obu_fragment: Option<(UnsizedObu, Vec<u8>)>,
obu_fragment: Option<Vec<u8>>,
}
impl Default for State {
@ -221,6 +219,8 @@ impl RTPAv1Depay {
AggregationHeader::from(&byte)
};
gst::trace!(CAT, imp: self, "Aggregation header {aggr_header:?}");
// handle new temporal units
if state.marked_packet || state.last_timestamp != Some(rtp.timestamp()) {
if state.last_timestamp.is_some() && state.obu_fragment.is_some() {
@ -257,7 +257,7 @@ impl RTPAv1Depay {
self.reset(&mut state);
}
if let Some((obu, ref mut bytes)) = &mut state.obu_fragment {
if let Some(ref mut bytes) = &mut state.obu_fragment {
assert!(aggr_header.leading_fragment);
let (element_size, is_last_obu) = self
.find_element_info(rtp, &mut reader, &aggr_header, idx)
@ -269,20 +269,10 @@ impl RTPAv1Depay {
.read_exact(&mut bytes[bytes_end..])
.map_err(err_flow!(self, buf_read))?;
// if this OBU is complete, it can be appended to the adapter
if !(is_last_obu && aggr_header.trailing_fragment) {
let full_obu = {
let size = bytes.len() as u32 - obu.header_len;
let leb_size = leb128_size(size) as u32;
obu.as_sized(size, leb_size)
};
self.translate_obu(
&mut Cursor::new(bytes.as_slice()),
&full_obu,
&mut ready_obus,
)?;
state.obu_fragment = None;
// if this OBU is complete, it can be output
if !is_last_obu || !aggr_header.trailing_fragment {
let obu_fragment = state.obu_fragment.take().unwrap();
self.translate_obus(&mut state, &mut Cursor::new(&obu_fragment), &mut ready_obus)?;
}
idx += 1;
@ -308,25 +298,6 @@ impl RTPAv1Depay {
continue;
}
let header_pos = reader.position();
let mut bitreader = BitReader::endian(&mut reader, ENDIANNESS);
let obu = UnsizedObu::parse(&mut bitreader).map_err(err_flow!(self, obu_read))?;
reader
.seek(SeekFrom::Start(header_pos))
.map_err(err_flow!(self, buf_read))?;
state.found_valid_obu = true;
// ignore these OBU types
if matches!(obu.obu_type, ObuType::TemporalDelimiter | ObuType::TileList) {
reader
.seek(SeekFrom::Current(element_size as i64))
.map_err(err_flow!(self, buf_read))?;
idx += 1;
continue;
}
// trailing OBU fragments are stored in the state
if is_last_obu && aggr_header.trailing_fragment {
let bytes_left = rtp.payload_size() - (reader.position() as u32);
@ -335,17 +306,30 @@ impl RTPAv1Depay {
.read_exact(bytes.as_mut_slice())
.map_err(err_flow!(self, buf_read))?;
state.obu_fragment = Some((obu, bytes));
state.obu_fragment = Some(bytes);
}
// full OBUs elements are translated and appended to the ready OBUs
else {
let full_obu = {
let size = element_size - obu.header_len;
let leb_size = leb128_size(size) as u32;
obu.as_sized(size, leb_size)
};
let remaining_slice = &reader.get_ref()[reader.position() as usize..];
if remaining_slice.len() < element_size as usize {
gst::error!(
CAT,
imp: self,
"invalid packet: not enough data left for OBU {idx} (needed {element_size}, have {})",
remaining_slice.len(),
);
self.reset(&mut state);
break;
}
self.translate_obus(
&mut state,
&mut Cursor::new(&remaining_slice[..element_size as usize]),
&mut ready_obus,
)?;
self.translate_obu(&mut reader, &full_obu, &mut ready_obus)?;
reader
.seek(SeekFrom::Current(element_size as i64))
.map_err(err_flow!(self, buf_read))?;
}
idx += 1;
@ -444,7 +428,8 @@ impl RTPAv1Depay {
Ok((element_size, is_last_obu))
}
/// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format
/// Using a single OBU element from one or more RTP packets, construct a buffer containing that
/// OBU in AV1 bitstream format with size field
fn translate_obu(
&self,
reader: &mut Cursor<&[u8]>,
@ -486,6 +471,98 @@ impl RTPAv1Depay {
Ok(())
}
/// Using a complete payload unit from one or more RTP packets, construct a buffer containing
/// the contained OBU(s) in AV1 bitstream format with size field.
///
/// Theoretically this should only contain a single OBU but Pion is sometimes putting multiple
/// OBUs into one payload unit and we can easily support this here despite it not being allowed
/// by the specification.
fn translate_obus(
&self,
state: &mut State,
reader: &mut Cursor<&[u8]>,
w: &mut Vec<u8>,
) -> Result<(), gst::FlowError> {
let mut first = true;
while (reader.position() as usize) < reader.get_ref().len() {
let header_pos = reader.position();
let mut bitreader = BitReader::endian(&mut *reader, ENDIANNESS);
let obu = match UnsizedObu::parse(&mut bitreader).map_err(err_flow!(self, obu_read)) {
Ok(obu) => obu,
Err(err) => {
if first {
return Err(err);
} else {
gst::warning!(CAT, imp: self, "Trailing payload unit is not a valid OBU");
return Ok(());
}
}
};
reader
.seek(SeekFrom::Start(header_pos))
.map_err(err_flow!(self, buf_read))?;
gst::trace!(CAT, imp: self, "Handling OBU {obu:?}");
let remaining_slice = &reader.get_ref()[reader.position() as usize..];
let element_size = if let Some((size, leb_size)) = obu.size {
let size = (size + leb_size + obu.header_len) as usize;
if size > remaining_slice.len() {
if first {
gst::warning!(CAT, imp: self, "Payload unit starts with an incomplete OBU");
return Err(gst::FlowError::Error);
} else {
gst::warning!(CAT, imp: self, "Trailing payload unit is an incomplete OBU");
return Ok(());
}
}
if !first {
gst::debug!(CAT, imp: self, "Multiple OBUs in a single payload unit");
}
size
} else {
remaining_slice.len()
};
state.found_valid_obu = true;
first = false;
// ignore these OBU types
if matches!(obu.obu_type, ObuType::TemporalDelimiter | ObuType::TileList) {
gst::trace!(CAT, imp: self, "Dropping {:?} of size {element_size}", obu.obu_type);
reader
.seek(SeekFrom::Current(element_size as i64))
.map_err(err_flow!(self, buf_read))?;
continue;
}
let full_obu = {
if let Some((size, leb_size)) = obu.size {
obu.as_sized(size, leb_size)
} else {
let size = element_size as u32 - obu.header_len;
let leb_size = leb128_size(size) as u32;
obu.as_sized(size, leb_size)
}
};
self.translate_obu(
&mut Cursor::new(&remaining_slice[..element_size]),
&full_obu,
w,
)?;
reader
.seek(SeekFrom::Current(element_size as i64))
.map_err(err_flow!(self, buf_read))?;
}
Ok(())
}
}
#[cfg(test)]