From 3520fc67de7404502ca89ef192921232aa60d81f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 1 Feb 2023 17:30:48 +0200 Subject: [PATCH] rtpav1depay: Don't output full TUs but just OBUs as they come Simplifies state tracking and potentially reduces latency as it's not necessary to wait until all fragments of an OBU are received. The last OBU of a TU is marked with the marker flag to allow parsers to detect this without first seeing the beginning of the next TU. Also use a simple `Vec` for collecting complete OBUs instead of a `gst_base::Adapter` as this reduces the number of allocations. And also handle invalid packets a little bit more gracefully. Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/244 Part-of: --- docs/plugins/gst_plugins_cache.json | 2 +- net/rtp/Cargo.toml | 1 - net/rtp/src/av1/depay/imp.rs | 140 +++++++++++++--------------- net/rtp/tests/rtpav1.rs | 4 +- 4 files changed, 69 insertions(+), 78 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index f9587f47..40365c04 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -5495,7 +5495,7 @@ "presence": "always" }, "src": { - "caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: tu\n", + "caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: obu\n", "direction": "src", "presence": "always" } diff --git a/net/rtp/Cargo.toml b/net/rtp/Cargo.toml index c12e40ca..0c2e29b7 100644 --- a/net/rtp/Cargo.toml +++ b/net/rtp/Cargo.toml @@ -11,7 +11,6 @@ rust-version = "1.63" [dependencies] bitstream-io = "1.3" gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", features = ["v1_20"] } -gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", features = ["v1_20"] } gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", features = ["v1_20"]} once_cell = "1.0" chrono = { version = "0.4", default-features = false } diff --git a/net/rtp/src/av1/depay/imp.rs b/net/rtp/src/av1/depay/imp.rs index 2312026e..b2a3c2dd 100644 --- a/net/rtp/src/av1/depay/imp.rs +++ b/net/rtp/src/av1/depay/imp.rs @@ -28,9 +28,6 @@ use crate::av1::common::{ #[derive(Debug, Default)] struct State { - /// used to store outgoing OBUs until the TU is complete - adapter: gst_base::UniqueAdapter, - last_timestamp: Option, /// if true, the last packet of a temporal unit has been received marked_packet: bool, @@ -51,8 +48,7 @@ static CAT: Lazy = Lazy::new(|| { ) }); -static TEMPORAL_DELIMITER: Lazy = - Lazy::new(|| gst::Memory::from_slice([0b0001_0010, 0])); +static TEMPORAL_DELIMITER: [u8; 2] = [0b0001_0010, 0]; impl RTPAv1Depay { fn reset(&self, state: &mut State) { @@ -109,7 +105,7 @@ impl ElementImpl for RTPAv1Depay { &gst::Caps::builder("video/x-av1") .field("parsed", true) .field("stream-format", "obu-stream") - .field("alignment", "tu") + .field("alignment", "obu") .build(), ) .unwrap(); @@ -185,10 +181,8 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { self.reset(&mut state); } - // number of bytes that can be used in the next outgoing buffer - let mut bytes_ready = 0; let mut reader = Cursor::new(payload); - let mut ready_obus = gst::Buffer::new(); + let mut ready_obus = Vec::new(); let aggr_header = { let mut byte = [0; 1]; @@ -214,18 +208,10 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { state.last_timestamp ); self.reset(&mut state); - return None; } - // all the currently stored bytes can be packed into the next outgoing buffer - bytes_ready = state.adapter.available(); - // the next temporal unit starts with a temporal delimiter OBU - ready_obus - .get_mut() - .unwrap() - .insert_memory(None, TEMPORAL_DELIMITER.clone()); - state.marked_packet = false; + ready_obus.extend_from_slice(&TEMPORAL_DELIMITER); } state.marked_packet = rtp.is_marker(); state.last_timestamp = Some(rtp.timestamp()); @@ -234,16 +220,17 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { let mut idx = 0; // handle leading OBU fragment - if let Some((obu, ref mut bytes)) = &mut state.obu_fragment { - if !aggr_header.leading_fragment { - gst::error!( - CAT, - imp: self, - "invalid packet: ignores unclosed OBU fragment" - ); - return None; - } + if state.obu_fragment.is_some() && !aggr_header.leading_fragment { + gst::error!( + CAT, + imp: self, + "invalid packet: ignores unclosed OBU fragment" + ); + self.reset(&mut state); + } + if let Some((obu, ref mut bytes)) = &mut state.obu_fragment { + assert!(aggr_header.leading_fragment); let (element_size, is_last_obu) = self.find_element_info(rtp, &mut reader, &aggr_header, idx)?; @@ -262,9 +249,11 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { obu.as_sized(size, leb_size) }; - let buffer = self.translate_obu(&mut Cursor::new(bytes.as_slice()), &full_obu)?; - - state.adapter.push(buffer); + self.translate_obu( + &mut Cursor::new(bytes.as_slice()), + &full_obu, + &mut ready_obus, + )?; state.obu_fragment = None; } } @@ -291,7 +280,10 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { .seek(SeekFrom::Current(element_size as i64)) .map_err(err_opt!(self, buf_read)) .ok()?; + idx += 1; + continue; } + // trailing OBU fragments are stored in the state if is_last_obu && aggr_header.trailing_fragment { let bytes_left = rtp.payload_size() - (reader.position() as u32); @@ -303,7 +295,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { state.obu_fragment = Some((obu, bytes)); } - // full OBUs elements are translated and appended to the adapter + // full OBUs elements are translated and appended to the ready OBUs else { let full_obu = { let size = element_size - obu.header_len; @@ -311,49 +303,48 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { obu.as_sized(size, leb_size) }; - ready_obus.append(self.translate_obu(&mut reader, &full_obu)?); + self.translate_obu(&mut reader, &full_obu, &mut ready_obus)?; } idx += 1; } - state.adapter.push(ready_obus); - - if state.marked_packet { - if state.obu_fragment.is_some() { - gst::error!( - CAT, - imp: self, - concat!( - "invalid packet: has marker bit set, but ", - "last OBU is not yet complete" - ) - ); - self.reset(&mut state); - return None; - } - - bytes_ready = state.adapter.available(); - } - - // now push all the complete temporal units - if bytes_ready > 0 { + // now push all the complete OBUs + let buffer = if !ready_obus.is_empty() { gst::log!( CAT, imp: self, - "creating buffer containing {} bytes of data...", - bytes_ready + "creating buffer containing {} bytes of data (marker {})...", + ready_obus.len(), + state.marked_packet, ); - Some( - state - .adapter - .take_buffer(bytes_ready) - .map_err(err_opt!(self, buf_take)) - .ok()?, - ) + + let mut buffer = gst::Buffer::from_mut_slice(ready_obus); + { + let buffer = buffer.get_mut().unwrap(); + if state.marked_packet { + buffer.set_flags(gst::BufferFlags::MARKER); + } + } + + Some(buffer) } else { None + }; + + if state.marked_packet && state.obu_fragment.is_some() { + gst::error!( + CAT, + imp: self, + concat!( + "invalid packet: has marker bit set, but ", + "last OBU is not yet complete" + ) + ); + self.reset(&mut state); } + + buffer } } @@ -407,12 +398,15 @@ impl RTPAv1Depay { } /// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format - fn translate_obu(&self, reader: &mut Cursor<&[u8]>, obu: &SizedObu) -> Option { - let mut bytes = gst::Buffer::with_size(obu.full_size() as usize) - .map_err(err_opt!(self, buf_alloc)) - .ok()? - .into_mapped_buffer_writable() - .unwrap(); + fn translate_obu( + &self, + reader: &mut Cursor<&[u8]>, + obu: &SizedObu, + w: &mut Vec, + ) -> Option { + let pos = w.len(); + w.resize(pos + obu.full_size() as usize, 0); + let bytes = &mut w[pos..]; // write OBU header reader @@ -447,7 +441,7 @@ impl RTPAv1Depay { .map_err(err_opt!(self, buf_read)) .ok()?; - Some(bytes.into_buffer()) + Some(obu.full_size() as usize) } } @@ -512,14 +506,10 @@ mod tests { println!("running test {}...", idx); let mut reader = Cursor::new(rtp_bytes.as_slice()); - let actual = element.imp().translate_obu(&mut reader, &obu); + let mut actual = Vec::new(); + element.imp().translate_obu(&mut reader, &obu, &mut actual).unwrap(); assert_eq!(reader.position(), rtp_bytes.len() as u64); - assert!(actual.is_some()); - let actual = actual - .unwrap() - .into_mapped_buffer_readable() - .unwrap(); assert_eq!(actual.as_slice(), out_bytes.as_slice()); } } diff --git a/net/rtp/tests/rtpav1.rs b/net/rtp/tests/rtpav1.rs index d8623c52..8656d5be 100644 --- a/net/rtp/tests/rtpav1.rs +++ b/net/rtp/tests/rtpav1.rs @@ -57,7 +57,7 @@ fn test_depayloader() { ) ]; - let expected: [Vec; 2] = [ + let expected: [Vec; 3] = [ vec![ 0b0001_0010, 0, 0b0011_0010, 0b0000_0110, 1, 2, 3, 4, 5, 6, @@ -65,6 +65,8 @@ fn test_depayloader() { vec![ 0b0001_0010, 0, 0b0111_1010, 0b0000_0101, 1, 2, 3, 4, 5, + ], + vec![ 0b0011_0010, 0b0000_1010, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ], ];