diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index f9587f47..40365c04 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -5495,7 +5495,7 @@ "presence": "always" }, "src": { - "caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: tu\n", + "caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: obu\n", "direction": "src", "presence": "always" } diff --git a/net/rtp/Cargo.toml b/net/rtp/Cargo.toml index c12e40ca..0c2e29b7 100644 --- a/net/rtp/Cargo.toml +++ b/net/rtp/Cargo.toml @@ -11,7 +11,6 @@ rust-version = "1.63" [dependencies] bitstream-io = "1.3" gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", features = ["v1_20"] } -gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", features = ["v1_20"] } gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", features = ["v1_20"]} once_cell = "1.0" chrono = { version = "0.4", default-features = false } diff --git a/net/rtp/src/av1/depay/imp.rs b/net/rtp/src/av1/depay/imp.rs index 2312026e..b2a3c2dd 100644 --- a/net/rtp/src/av1/depay/imp.rs +++ b/net/rtp/src/av1/depay/imp.rs @@ -28,9 +28,6 @@ use crate::av1::common::{ #[derive(Debug, Default)] struct State { - /// used to store outgoing OBUs until the TU is complete - adapter: gst_base::UniqueAdapter, - last_timestamp: Option, /// if true, the last packet of a temporal unit has been received marked_packet: bool, @@ -51,8 +48,7 @@ static CAT: Lazy = Lazy::new(|| { ) }); -static TEMPORAL_DELIMITER: Lazy = - Lazy::new(|| gst::Memory::from_slice([0b0001_0010, 0])); +static TEMPORAL_DELIMITER: [u8; 2] = [0b0001_0010, 0]; impl RTPAv1Depay { fn reset(&self, state: &mut State) { @@ -109,7 +105,7 @@ impl ElementImpl for RTPAv1Depay { &gst::Caps::builder("video/x-av1") .field("parsed", true) .field("stream-format", "obu-stream") - .field("alignment", "tu") + .field("alignment", "obu") .build(), ) .unwrap(); @@ -185,10 +181,8 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { self.reset(&mut state); } - // number of bytes that can be used in the next outgoing buffer - let mut bytes_ready = 0; let mut reader = Cursor::new(payload); - let mut ready_obus = gst::Buffer::new(); + let mut ready_obus = Vec::new(); let aggr_header = { let mut byte = [0; 1]; @@ -214,18 +208,10 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { state.last_timestamp ); self.reset(&mut state); - return None; } - // all the currently stored bytes can be packed into the next outgoing buffer - bytes_ready = state.adapter.available(); - // the next temporal unit starts with a temporal delimiter OBU - ready_obus - .get_mut() - .unwrap() - .insert_memory(None, TEMPORAL_DELIMITER.clone()); - state.marked_packet = false; + ready_obus.extend_from_slice(&TEMPORAL_DELIMITER); } state.marked_packet = rtp.is_marker(); state.last_timestamp = Some(rtp.timestamp()); @@ -234,16 +220,17 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { let mut idx = 0; // handle leading OBU fragment - if let Some((obu, ref mut bytes)) = &mut state.obu_fragment { - if !aggr_header.leading_fragment { - gst::error!( - CAT, - imp: self, - "invalid packet: ignores unclosed OBU fragment" - ); - return None; - } + if state.obu_fragment.is_some() && !aggr_header.leading_fragment { + gst::error!( + CAT, + imp: self, + "invalid packet: ignores unclosed OBU fragment" + ); + self.reset(&mut state); + } + if let Some((obu, ref mut bytes)) = &mut state.obu_fragment { + assert!(aggr_header.leading_fragment); let (element_size, is_last_obu) = self.find_element_info(rtp, &mut reader, &aggr_header, idx)?; @@ -262,9 +249,11 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { obu.as_sized(size, leb_size) }; - let buffer = self.translate_obu(&mut Cursor::new(bytes.as_slice()), &full_obu)?; - - state.adapter.push(buffer); + self.translate_obu( + &mut Cursor::new(bytes.as_slice()), + &full_obu, + &mut ready_obus, + )?; state.obu_fragment = None; } } @@ -291,7 +280,10 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { .seek(SeekFrom::Current(element_size as i64)) .map_err(err_opt!(self, buf_read)) .ok()?; + idx += 1; + continue; } + // trailing OBU fragments are stored in the state if is_last_obu && aggr_header.trailing_fragment { let bytes_left = rtp.payload_size() - (reader.position() as u32); @@ -303,7 +295,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { state.obu_fragment = Some((obu, bytes)); } - // full OBUs elements are translated and appended to the adapter + // full OBUs elements are translated and appended to the ready OBUs else { let full_obu = { let size = element_size - obu.header_len; @@ -311,49 +303,48 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay { obu.as_sized(size, leb_size) }; - ready_obus.append(self.translate_obu(&mut reader, &full_obu)?); + self.translate_obu(&mut reader, &full_obu, &mut ready_obus)?; } idx += 1; } - state.adapter.push(ready_obus); - - if state.marked_packet { - if state.obu_fragment.is_some() { - gst::error!( - CAT, - imp: self, - concat!( - "invalid packet: has marker bit set, but ", - "last OBU is not yet complete" - ) - ); - self.reset(&mut state); - return None; - } - - bytes_ready = state.adapter.available(); - } - - // now push all the complete temporal units - if bytes_ready > 0 { + // now push all the complete OBUs + let buffer = if !ready_obus.is_empty() { gst::log!( CAT, imp: self, - "creating buffer containing {} bytes of data...", - bytes_ready + "creating buffer containing {} bytes of data (marker {})...", + ready_obus.len(), + state.marked_packet, ); - Some( - state - .adapter - .take_buffer(bytes_ready) - .map_err(err_opt!(self, buf_take)) - .ok()?, - ) + + let mut buffer = gst::Buffer::from_mut_slice(ready_obus); + { + let buffer = buffer.get_mut().unwrap(); + if state.marked_packet { + buffer.set_flags(gst::BufferFlags::MARKER); + } + } + + Some(buffer) } else { None + }; + + if state.marked_packet && state.obu_fragment.is_some() { + gst::error!( + CAT, + imp: self, + concat!( + "invalid packet: has marker bit set, but ", + "last OBU is not yet complete" + ) + ); + self.reset(&mut state); } + + buffer } } @@ -407,12 +398,15 @@ impl RTPAv1Depay { } /// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format - fn translate_obu(&self, reader: &mut Cursor<&[u8]>, obu: &SizedObu) -> Option { - let mut bytes = gst::Buffer::with_size(obu.full_size() as usize) - .map_err(err_opt!(self, buf_alloc)) - .ok()? - .into_mapped_buffer_writable() - .unwrap(); + fn translate_obu( + &self, + reader: &mut Cursor<&[u8]>, + obu: &SizedObu, + w: &mut Vec, + ) -> Option { + let pos = w.len(); + w.resize(pos + obu.full_size() as usize, 0); + let bytes = &mut w[pos..]; // write OBU header reader @@ -447,7 +441,7 @@ impl RTPAv1Depay { .map_err(err_opt!(self, buf_read)) .ok()?; - Some(bytes.into_buffer()) + Some(obu.full_size() as usize) } } @@ -512,14 +506,10 @@ mod tests { println!("running test {}...", idx); let mut reader = Cursor::new(rtp_bytes.as_slice()); - let actual = element.imp().translate_obu(&mut reader, &obu); + let mut actual = Vec::new(); + element.imp().translate_obu(&mut reader, &obu, &mut actual).unwrap(); assert_eq!(reader.position(), rtp_bytes.len() as u64); - assert!(actual.is_some()); - let actual = actual - .unwrap() - .into_mapped_buffer_readable() - .unwrap(); assert_eq!(actual.as_slice(), out_bytes.as_slice()); } } diff --git a/net/rtp/tests/rtpav1.rs b/net/rtp/tests/rtpav1.rs index d8623c52..8656d5be 100644 --- a/net/rtp/tests/rtpav1.rs +++ b/net/rtp/tests/rtpav1.rs @@ -57,7 +57,7 @@ fn test_depayloader() { ) ]; - let expected: [Vec; 2] = [ + let expected: [Vec; 3] = [ vec![ 0b0001_0010, 0, 0b0011_0010, 0b0000_0110, 1, 2, 3, 4, 5, 6, @@ -65,6 +65,8 @@ fn test_depayloader() { vec![ 0b0001_0010, 0, 0b0111_1010, 0b0000_0101, 1, 2, 3, 4, 5, + ], + vec![ 0b0011_0010, 0b0000_1010, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, ], ];