rtpav1depay: Fix error handling

Don't error out immediately on errors anymore but try again with the
next packet.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/289

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1072>
This commit is contained in:
Sebastian Dröge 2023-02-01 22:01:53 +02:00
parent ed4e9a50d5
commit 1756d7a516
2 changed files with 56 additions and 79 deletions

View file

@ -10,13 +10,13 @@
macro_rules! err_flow {
($imp:ident, read, $msg:literal) => {
|err| {
gst::element_imp_error!($imp, gst::ResourceError::Read, [$msg, err]);
gst::element_imp_warning!($imp, gst::ResourceError::Read, [$msg, err]);
gst::FlowError::Error
}
};
($imp:ident, write, $msg:literal) => {
|err| {
gst::element_imp_error!($imp, gst::ResourceError::Write, [$msg, err]);
gst::element_imp_warning!($imp, gst::ResourceError::Write, [$msg, err]);
gst::FlowError::Error
}
};
@ -44,51 +44,24 @@ macro_rules! err_flow {
($imp:ident, outbuf_alloc) => {
err_flow!($imp, write, "Failed to allocate output buffer: {}")
};
}
macro_rules! err_opt {
($imp:ident, read, $msg:literal) => {
|err| {
gst::element_imp_error!($imp, gst::ResourceError::Read, [$msg, err]);
Option::<()>::None
}
};
($imp:ident, write, $msg:literal) => {
|err| {
gst::element_imp_error!($imp, gst::ResourceError::Write, [$msg, err]);
Option::<()>::None
}
};
($imp:ident, buf_alloc) => {
err_opt!($imp, write, "Failed to allocate new buffer: {}")
};
($imp:ident, payload_buf) => {
err_opt!($imp, read, "Failed to get RTP payload buffer: {}")
};
($imp:ident, payload_map) => {
err_opt!($imp, read, "Failed to map payload as readable: {}")
};
($imp:ident, buf_take) => {
err_opt!($imp, read, "Failed to take buffer from adapter: {}")
err_flow!($imp, read, "Failed to get RTP payload buffer: {}")
};
($imp:ident, aggr_header_read) => {
err_opt!($imp, read, "Failed to read aggregation header: {}")
err_flow!($imp, read, "Failed to read aggregation header: {}")
};
($imp:ident, find_element) => {
err_flow!($imp, read, "Failed to find OBU element in packet: {}")
};
($imp:ident, leb_read) => {
err_opt!($imp, read, "Failed to read leb128 size field: {}")
err_flow!($imp, read, "Failed to read leb128 size field: {}")
};
($imp:ident, leb_write) => {
err_opt!($imp, read, "Failed to write leb128 size field: {}")
err_flow!($imp, read, "Failed to write leb128 size field: {}")
};
($imp:ident, obu_read) => {
err_opt!($imp, read, "Failed to read OBU header: {}")
};
($imp:ident, buf_read) => {
err_opt!($imp, read, "Failed to read RTP buffer: {}")
err_flow!($imp, read, "Failed to read OBU header: {}")
};
}
pub(crate) use err_flow;
pub(crate) use err_opt;

View file

@ -20,7 +20,7 @@ use bitstream_io::{BitReader, BitWriter};
use once_cell::sync::Lazy;
use crate::av1::common::{
err_opt, leb128_size, parse_leb128, write_leb128, AggregationHeader, ObuType, SizedObu,
err_flow, leb128_size, parse_leb128, write_leb128, AggregationHeader, ObuType, SizedObu,
UnsizedObu, CLOCK_RATE, ENDIANNESS,
};
@ -177,6 +177,20 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
&self,
rtp: &gst_rtp::RTPBuffer<gst_rtp::rtp_buffer::Readable>,
) -> Option<gst::Buffer> {
if let Err(err) = self.handle_rtp_packet(rtp) {
gst::warning!(CAT, imp: self, "Failed to handle RTP packet: {err:?}");
self.reset(&mut self.state.lock().unwrap());
}
None
}
}
impl RTPAv1Depay {
fn handle_rtp_packet(
&self,
rtp: &gst_rtp::RTPBuffer<gst_rtp::rtp_buffer::Readable>,
) -> Result<(), gst::FlowError> {
gst::log!(
CAT,
imp: self,
@ -185,7 +199,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
rtp.buffer().size(),
);
let payload = rtp.payload().map_err(err_opt!(self, payload_buf)).ok()?;
let payload = rtp.payload().map_err(err_flow!(self, payload_buf))?;
let mut state = self.state.lock().unwrap();
@ -201,8 +215,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
let mut byte = [0; 1];
reader
.read_exact(&mut byte)
.map_err(err_opt!(self, aggr_header_read))
.ok()?;
.map_err(err_flow!(self, aggr_header_read))?;
AggregationHeader::from(&byte)
};
@ -237,22 +250,22 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
gst::error!(
CAT,
imp: self,
"invalid packet: ignores unclosed OBU fragment"
"invalid packet: dropping unclosed OBU fragment"
);
self.reset(&mut state);
}
if let Some((obu, ref mut bytes)) = &mut state.obu_fragment {
assert!(aggr_header.leading_fragment);
let (element_size, is_last_obu) =
self.find_element_info(rtp, &mut reader, &aggr_header, idx)?;
let (element_size, is_last_obu) = self
.find_element_info(rtp, &mut reader, &aggr_header, idx)
.map_err(err_flow!(self, find_element))?;
let bytes_end = bytes.len();
bytes.resize(bytes_end + element_size as usize, 0);
reader
.read_exact(&mut bytes[bytes_end..])
.map_err(err_opt!(self, buf_read))
.ok()?;
.map_err(err_flow!(self, buf_read))?;
// if this OBU is complete, it can be appended to the adapter
if !(is_last_obu && aggr_header.trailing_fragment) {
@ -278,21 +291,17 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
let header_pos = reader.position();
let mut bitreader = BitReader::endian(&mut reader, ENDIANNESS);
let obu = UnsizedObu::parse(&mut bitreader)
.map_err(err_opt!(self, obu_read))
.ok()?;
let obu = UnsizedObu::parse(&mut bitreader).map_err(err_flow!(self, obu_read))?;
reader
.seek(SeekFrom::Start(header_pos))
.map_err(err_opt!(self, buf_read))
.ok()?;
.map_err(err_flow!(self, buf_read))?;
// ignore these OBU types
if matches!(obu.obu_type, ObuType::TemporalDelimiter | ObuType::TileList) {
reader
.seek(SeekFrom::Current(element_size as i64))
.map_err(err_opt!(self, buf_read))
.ok()?;
.map_err(err_flow!(self, buf_read))?;
idx += 1;
continue;
}
@ -303,8 +312,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
let mut bytes = vec![0; bytes_left as usize];
reader
.read_exact(bytes.as_mut_slice())
.map_err(err_opt!(self, buf_read))
.ok()?;
.map_err(err_flow!(self, buf_read))?;
state.obu_fragment = Some((obu, bytes));
}
@ -327,7 +335,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
gst::log!(
CAT,
imp: self,
"creating buffer containing {} bytes of data (marker {}, discont {})...",
"Creating buffer containing {} bytes of data (marker {}, discont {})...",
ready_obus.len(),
state.marked_packet,
state.needs_discont,
@ -358,17 +366,20 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
imp: self,
concat!(
"invalid packet: has marker bit set, but ",
"last OBU is not yet complete"
"last OBU is not yet complete. Dropping incomplete OBU."
)
);
self.reset(&mut state);
}
drop(state);
buffer
}
if let Some(buffer) = buffer {
self.obj().push(buffer)?;
}
Ok(())
}
impl RTPAv1Depay {
/// Find out the next OBU element's size, and if it is the last OBU in the packet.
/// The reader is expected to be at the first byte of the element,
/// or its preceding size field if present,
@ -379,7 +390,7 @@ impl RTPAv1Depay {
reader: &mut Cursor<&[u8]>,
aggr_header: &AggregationHeader,
index: u32,
) -> Option<(u32, bool)> {
) -> Result<(u32, bool), gst::FlowError> {
let element_size: u32;
let is_last_obu: bool;
@ -389,14 +400,11 @@ impl RTPAv1Depay {
rtp.payload_size() - (reader.position() as u32)
} else {
let mut bitreader = BitReader::endian(reader, ENDIANNESS);
parse_leb128(&mut bitreader)
.map_err(err_opt!(self, leb_read))
.ok()?
parse_leb128(&mut bitreader).map_err(err_flow!(self, leb_read))?
}
} else {
element_size = parse_leb128(&mut BitReader::endian(&mut *reader, ENDIANNESS))
.map_err(err_opt!(self, leb_read))
.ok()?;
.map_err(err_flow!(self, leb_read))?;
is_last_obu = match rtp
.payload_size()
.cmp(&(reader.position() as u32 + element_size))
@ -409,12 +417,12 @@ impl RTPAv1Depay {
imp: self,
"invalid packet: size field gives impossibly large OBU size"
);
return None;
return Err(gst::FlowError::Error);
}
};
}
Some((element_size, is_last_obu))
Ok((element_size, is_last_obu))
}
/// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format
@ -423,7 +431,7 @@ impl RTPAv1Depay {
reader: &mut Cursor<&[u8]>,
obu: &SizedObu,
w: &mut Vec<u8>,
) -> Option<usize> {
) -> Result<(), gst::FlowError> {
let pos = w.len();
w.resize(pos + obu.full_size() as usize, 0);
let bytes = &mut w[pos..];
@ -431,8 +439,7 @@ impl RTPAv1Depay {
// write OBU header
reader
.read_exact(&mut bytes[..obu.header_len as usize])
.map_err(err_opt!(self, buf_read))
.ok()?;
.map_err(err_flow!(self, buf_read))?;
// set `has_size_field`
bytes[0] |= 1 << 1;
@ -440,8 +447,7 @@ impl RTPAv1Depay {
// skip internal size field if present
if obu.has_size_field {
parse_leb128(&mut BitReader::endian(&mut *reader, ENDIANNESS))
.map_err(err_opt!(self, leb_read))
.ok()?;
.map_err(err_flow!(self, leb_read))?;
}
// write size field
@ -452,16 +458,14 @@ impl RTPAv1Depay {
),
obu.size,
)
.map_err(err_opt!(self, leb_write))
.ok()?;
.map_err(err_flow!(self, leb_write))?;
// write OBU payload
reader
.read_exact(&mut bytes[(obu.header_len + obu.leb_size) as usize..])
.map_err(err_opt!(self, buf_read))
.ok()?;
.map_err(err_flow!(self, buf_read))?;
Some(obu.full_size() as usize)
Ok(())
}
}
@ -584,7 +588,7 @@ mod tests {
println!("testing element {} with reader position {}...", obu_idx, reader.position());
let actual = element.imp().find_element_info(&rtp, &mut reader, &aggr_header, obu_idx as u32);
assert_eq!(actual, Some(expected));
assert_eq!(actual, Ok(expected));
element_size = actual.unwrap().0;
}
}