diff --git a/net/rtp/src/av1/common/error.rs b/net/rtp/src/av1/common/error.rs index 29d1026a..3867bb07 100644 --- a/net/rtp/src/av1/common/error.rs +++ b/net/rtp/src/av1/common/error.rs @@ -10,13 +10,13 @@ macro_rules! err_flow { ($imp:ident, read, $msg:literal) => { |err| { - gst::element_imp_error!($imp, gst::ResourceError::Read, [$msg, err]); + gst::element_imp_warning!($imp, gst::ResourceError::Read, [$msg, err]); gst::FlowError::Error } }; ($imp:ident, write, $msg:literal) => { |err| { - gst::element_imp_error!($imp, gst::ResourceError::Write, [$msg, err]); + gst::element_imp_warning!($imp, gst::ResourceError::Write, [$msg, err]); gst::FlowError::Error } }; @@ -44,51 +44,24 @@ macro_rules! err_flow { ($imp:ident, outbuf_alloc) => { err_flow!($imp, write, "Failed to allocate output buffer: {}") }; -} - -macro_rules! err_opt { - ($imp:ident, read, $msg:literal) => { - |err| { - gst::element_imp_error!($imp, gst::ResourceError::Read, [$msg, err]); - Option::<()>::None - } - }; - ($imp:ident, write, $msg:literal) => { - |err| { - gst::element_imp_error!($imp, gst::ResourceError::Write, [$msg, err]); - Option::<()>::None - } - }; - - ($imp:ident, buf_alloc) => { - err_opt!($imp, write, "Failed to allocate new buffer: {}") - }; - ($imp:ident, payload_buf) => { - err_opt!($imp, read, "Failed to get RTP payload buffer: {}") - }; - ($imp:ident, payload_map) => { - err_opt!($imp, read, "Failed to map payload as readable: {}") - }; - ($imp:ident, buf_take) => { - err_opt!($imp, read, "Failed to take buffer from adapter: {}") + err_flow!($imp, read, "Failed to get RTP payload buffer: {}") }; ($imp:ident, aggr_header_read) => { - err_opt!($imp, read, "Failed to read aggregation header: {}") + err_flow!($imp, read, "Failed to read aggregation header: {}") + }; + ($imp:ident, find_element) => { + err_flow!($imp, read, "Failed to find OBU element in packet: {}") }; ($imp:ident, leb_read) => { - err_opt!($imp, read, "Failed to read leb128 size field: {}") + err_flow!($imp, read, "Failed to read leb128 size field: {}") }; ($imp:ident, leb_write) => { - err_opt!($imp, read, "Failed to write leb128 size field: {}") + err_flow!($imp, read, "Failed to write leb128 size field: {}") }; ($imp:ident, obu_read) => { - err_opt!($imp, read, "Failed to read OBU header: {}") - }; - ($imp:ident, buf_read) => { - err_opt!($imp, read, "Failed to read RTP buffer: {}") + err_flow!($imp, read, "Failed to read OBU header: {}") }; } pub(crate) use err_flow; -pub(crate) use err_opt; diff --git a/net/rtp/src/av1/depay/imp.rs b/net/rtp/src/av1/depay/imp.rs index 5ae3e305..b9b0ba3d 100644 --- a/net/rtp/src/av1/depay/imp.rs +++ b/net/rtp/src/av1/depay/imp.rs @@ -20,7 +20,7 @@ use bitstream_io::{BitReader, BitWriter}; use once_cell::sync::Lazy; use crate::av1::common::{ - err_opt, leb128_size, parse_leb128, write_leb128, AggregationHeader, ObuType, SizedObu, + err_flow, leb128_size, parse_leb128, write_leb128, AggregationHeader, ObuType, SizedObu, UnsizedObu, CLOCK_RATE, ENDIANNESS, }; @@ -177,6 +177,20 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { &self, rtp: &gst_rtp::RTPBuffer, ) -> Option { + if let Err(err) = self.handle_rtp_packet(rtp) { + gst::warning!(CAT, imp: self, "Failed to handle RTP packet: {err:?}"); + self.reset(&mut self.state.lock().unwrap()); + } + + None + } +} + +impl RTPAv1Depay { + fn handle_rtp_packet( + &self, + rtp: &gst_rtp::RTPBuffer, + ) -> Result<(), gst::FlowError> { gst::log!( CAT, imp: self, @@ -185,7 +199,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { rtp.buffer().size(), ); - let payload = rtp.payload().map_err(err_opt!(self, payload_buf)).ok()?; + let payload = rtp.payload().map_err(err_flow!(self, payload_buf))?; let mut state = self.state.lock().unwrap(); @@ -201,8 +215,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { let mut byte = [0; 1]; reader .read_exact(&mut byte) - .map_err(err_opt!(self, aggr_header_read)) - .ok()?; + .map_err(err_flow!(self, aggr_header_read))?; AggregationHeader::from(&byte) }; @@ -237,22 +250,22 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { gst::error!( CAT, imp: self, - "invalid packet: ignores unclosed OBU fragment" + "invalid packet: dropping unclosed OBU fragment" ); self.reset(&mut state); } if let Some((obu, ref mut bytes)) = &mut state.obu_fragment { assert!(aggr_header.leading_fragment); - let (element_size, is_last_obu) = - self.find_element_info(rtp, &mut reader, &aggr_header, idx)?; + let (element_size, is_last_obu) = self + .find_element_info(rtp, &mut reader, &aggr_header, idx) + .map_err(err_flow!(self, find_element))?; let bytes_end = bytes.len(); bytes.resize(bytes_end + element_size as usize, 0); reader .read_exact(&mut bytes[bytes_end..]) - .map_err(err_opt!(self, buf_read)) - .ok()?; + .map_err(err_flow!(self, buf_read))?; // if this OBU is complete, it can be appended to the adapter if !(is_last_obu && aggr_header.trailing_fragment) { @@ -278,21 +291,17 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { let header_pos = reader.position(); let mut bitreader = BitReader::endian(&mut reader, ENDIANNESS); - let obu = UnsizedObu::parse(&mut bitreader) - .map_err(err_opt!(self, obu_read)) - .ok()?; + let obu = UnsizedObu::parse(&mut bitreader).map_err(err_flow!(self, obu_read))?; reader .seek(SeekFrom::Start(header_pos)) - .map_err(err_opt!(self, buf_read)) - .ok()?; + .map_err(err_flow!(self, buf_read))?; // ignore these OBU types if matches!(obu.obu_type, ObuType::TemporalDelimiter | ObuType::TileList) { reader .seek(SeekFrom::Current(element_size as i64)) - .map_err(err_opt!(self, buf_read)) - .ok()?; + .map_err(err_flow!(self, buf_read))?; idx += 1; continue; } @@ -303,8 +312,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { let mut bytes = vec![0; bytes_left as usize]; reader .read_exact(bytes.as_mut_slice()) - .map_err(err_opt!(self, buf_read)) - .ok()?; + .map_err(err_flow!(self, buf_read))?; state.obu_fragment = Some((obu, bytes)); } @@ -327,7 +335,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { gst::log!( CAT, imp: self, - "creating buffer containing {} bytes of data (marker {}, discont {})...", + "Creating buffer containing {} bytes of data (marker {}, discont {})...", ready_obus.len(), state.marked_packet, state.needs_discont, @@ -358,17 +366,20 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { imp: self, concat!( "invalid packet: has marker bit set, but ", - "last OBU is not yet complete" + "last OBU is not yet complete. Dropping incomplete OBU." ) ); self.reset(&mut state); } + drop(state); - buffer + if let Some(buffer) = buffer { + self.obj().push(buffer)?; + } + + Ok(()) } -} -impl RTPAv1Depay { /// Find out the next OBU element's size, and if it is the last OBU in the packet. /// The reader is expected to be at the first byte of the element, /// or its preceding size field if present, @@ -379,7 +390,7 @@ impl RTPAv1Depay { reader: &mut Cursor<&[u8]>, aggr_header: &AggregationHeader, index: u32, - ) -> Option<(u32, bool)> { + ) -> Result<(u32, bool), gst::FlowError> { let element_size: u32; let is_last_obu: bool; @@ -389,14 +400,11 @@ impl RTPAv1Depay { rtp.payload_size() - (reader.position() as u32) } else { let mut bitreader = BitReader::endian(reader, ENDIANNESS); - parse_leb128(&mut bitreader) - .map_err(err_opt!(self, leb_read)) - .ok()? + parse_leb128(&mut bitreader).map_err(err_flow!(self, leb_read))? } } else { element_size = parse_leb128(&mut BitReader::endian(&mut *reader, ENDIANNESS)) - .map_err(err_opt!(self, leb_read)) - .ok()?; + .map_err(err_flow!(self, leb_read))?; is_last_obu = match rtp .payload_size() .cmp(&(reader.position() as u32 + element_size)) @@ -409,12 +417,12 @@ impl RTPAv1Depay { imp: self, "invalid packet: size field gives impossibly large OBU size" ); - return None; + return Err(gst::FlowError::Error); } }; } - Some((element_size, is_last_obu)) + Ok((element_size, is_last_obu)) } /// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format @@ -423,7 +431,7 @@ impl RTPAv1Depay { reader: &mut Cursor<&[u8]>, obu: &SizedObu, w: &mut Vec, - ) -> Option { + ) -> Result<(), gst::FlowError> { let pos = w.len(); w.resize(pos + obu.full_size() as usize, 0); let bytes = &mut w[pos..]; @@ -431,8 +439,7 @@ impl RTPAv1Depay { // write OBU header reader .read_exact(&mut bytes[..obu.header_len as usize]) - .map_err(err_opt!(self, buf_read)) - .ok()?; + .map_err(err_flow!(self, buf_read))?; // set `has_size_field` bytes[0] |= 1 << 1; @@ -440,8 +447,7 @@ impl RTPAv1Depay { // skip internal size field if present if obu.has_size_field { parse_leb128(&mut BitReader::endian(&mut *reader, ENDIANNESS)) - .map_err(err_opt!(self, leb_read)) - .ok()?; + .map_err(err_flow!(self, leb_read))?; } // write size field @@ -452,16 +458,14 @@ impl RTPAv1Depay { ), obu.size, ) - .map_err(err_opt!(self, leb_write)) - .ok()?; + .map_err(err_flow!(self, leb_write))?; // write OBU payload reader .read_exact(&mut bytes[(obu.header_len + obu.leb_size) as usize..]) - .map_err(err_opt!(self, buf_read)) - .ok()?; + .map_err(err_flow!(self, buf_read))?; - Some(obu.full_size() as usize) + Ok(()) } } @@ -584,7 +588,7 @@ mod tests { println!("testing element {} with reader position {}...", obu_idx, reader.position()); let actual = element.imp().find_element_info(&rtp, &mut reader, &aggr_header, obu_idx as u32); - assert_eq!(actual, Some(expected)); + assert_eq!(actual, Ok(expected)); element_size = actual.unwrap().0; } }