rtpav1depay: Don't output full TUs but just OBUs as they come

Simplifies state tracking and potentially reduces latency as it's not
necessary to wait until all fragments of an OBU are received.

The last OBU of a TU is marked with the marker flag to allow parsers to
detect this without first seeing the beginning of the next TU.

Also use a simple `Vec` for collecting complete OBUs instead of a
`gst_base::Adapter` as this reduces the number of allocations.

And also handle invalid packets a little bit more gracefully.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/244

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1072>
This commit is contained in:
Sebastian Dröge 2023-02-01 17:30:48 +02:00
parent 27128a476c
commit d6cb9d72d8
4 changed files with 69 additions and 78 deletions

View file

@ -5498,7 +5498,7 @@
"presence": "always" "presence": "always"
}, },
"src": { "src": {
"caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: tu\n", "caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: obu\n",
"direction": "src", "direction": "src",
"presence": "always" "presence": "always"
} }

View file

@ -11,7 +11,6 @@ rust-version = "1.63"
[dependencies] [dependencies]
bitstream-io = "1.3" bitstream-io = "1.3"
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"]} gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"]}
once_cell = "1.0" once_cell = "1.0"
chrono = { version = "0.4", default-features = false } chrono = { version = "0.4", default-features = false }

View file

@ -28,9 +28,6 @@ use crate::av1::common::{
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct State { struct State {
/// used to store outgoing OBUs until the TU is complete
adapter: gst_base::UniqueAdapter,
last_timestamp: Option<u32>, last_timestamp: Option<u32>,
/// if true, the last packet of a temporal unit has been received /// if true, the last packet of a temporal unit has been received
marked_packet: bool, marked_packet: bool,
@ -51,8 +48,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
) )
}); });
static TEMPORAL_DELIMITER: Lazy<gst::Memory> = static TEMPORAL_DELIMITER: [u8; 2] = [0b0001_0010, 0];
Lazy::new(|| gst::Memory::from_slice([0b0001_0010, 0]));
impl RTPAv1Depay { impl RTPAv1Depay {
fn reset(&self, state: &mut State) { fn reset(&self, state: &mut State) {
@ -109,7 +105,7 @@ impl ElementImpl for RTPAv1Depay {
&gst::Caps::builder("video/x-av1") &gst::Caps::builder("video/x-av1")
.field("parsed", true) .field("parsed", true)
.field("stream-format", "obu-stream") .field("stream-format", "obu-stream")
.field("alignment", "tu") .field("alignment", "obu")
.build(), .build(),
) )
.unwrap(); .unwrap();
@ -185,10 +181,8 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
self.reset(&mut state); self.reset(&mut state);
} }
// number of bytes that can be used in the next outgoing buffer
let mut bytes_ready = 0;
let mut reader = Cursor::new(payload); let mut reader = Cursor::new(payload);
let mut ready_obus = gst::Buffer::new(); let mut ready_obus = Vec::new();
let aggr_header = { let aggr_header = {
let mut byte = [0; 1]; let mut byte = [0; 1];
@ -214,18 +208,10 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
state.last_timestamp state.last_timestamp
); );
self.reset(&mut state); self.reset(&mut state);
return None;
} }
// all the currently stored bytes can be packed into the next outgoing buffer
bytes_ready = state.adapter.available();
// the next temporal unit starts with a temporal delimiter OBU // the next temporal unit starts with a temporal delimiter OBU
ready_obus ready_obus.extend_from_slice(&TEMPORAL_DELIMITER);
.get_mut()
.unwrap()
.insert_memory(None, TEMPORAL_DELIMITER.clone());
state.marked_packet = false;
} }
state.marked_packet = rtp.is_marker(); state.marked_packet = rtp.is_marker();
state.last_timestamp = Some(rtp.timestamp()); state.last_timestamp = Some(rtp.timestamp());
@ -234,16 +220,17 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
let mut idx = 0; let mut idx = 0;
// handle leading OBU fragment // handle leading OBU fragment
if let Some((obu, ref mut bytes)) = &mut state.obu_fragment { if state.obu_fragment.is_some() && !aggr_header.leading_fragment {
if !aggr_header.leading_fragment {
gst::error!( gst::error!(
CAT, CAT,
imp: self, imp: self,
"invalid packet: ignores unclosed OBU fragment" "invalid packet: ignores unclosed OBU fragment"
); );
return None; self.reset(&mut state);
} }
if let Some((obu, ref mut bytes)) = &mut state.obu_fragment {
assert!(aggr_header.leading_fragment);
let (element_size, is_last_obu) = let (element_size, is_last_obu) =
self.find_element_info(rtp, &mut reader, &aggr_header, idx)?; self.find_element_info(rtp, &mut reader, &aggr_header, idx)?;
@ -262,9 +249,11 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
obu.as_sized(size, leb_size) obu.as_sized(size, leb_size)
}; };
let buffer = self.translate_obu(&mut Cursor::new(bytes.as_slice()), &full_obu)?; self.translate_obu(
&mut Cursor::new(bytes.as_slice()),
state.adapter.push(buffer); &full_obu,
&mut ready_obus,
)?;
state.obu_fragment = None; state.obu_fragment = None;
} }
} }
@ -291,7 +280,10 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
.seek(SeekFrom::Current(element_size as i64)) .seek(SeekFrom::Current(element_size as i64))
.map_err(err_opt!(self, buf_read)) .map_err(err_opt!(self, buf_read))
.ok()?; .ok()?;
idx += 1;
continue;
} }
// trailing OBU fragments are stored in the state // trailing OBU fragments are stored in the state
if is_last_obu && aggr_header.trailing_fragment { if is_last_obu && aggr_header.trailing_fragment {
let bytes_left = rtp.payload_size() - (reader.position() as u32); let bytes_left = rtp.payload_size() - (reader.position() as u32);
@ -303,7 +295,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
state.obu_fragment = Some((obu, bytes)); state.obu_fragment = Some((obu, bytes));
} }
// full OBUs elements are translated and appended to the adapter // full OBUs elements are translated and appended to the ready OBUs
else { else {
let full_obu = { let full_obu = {
let size = element_size - obu.header_len; let size = element_size - obu.header_len;
@ -311,16 +303,36 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
obu.as_sized(size, leb_size) obu.as_sized(size, leb_size)
}; };
ready_obus.append(self.translate_obu(&mut reader, &full_obu)?); self.translate_obu(&mut reader, &full_obu, &mut ready_obus)?;
} }
idx += 1; idx += 1;
} }
state.adapter.push(ready_obus); // now push all the complete OBUs
let buffer = if !ready_obus.is_empty() {
gst::log!(
CAT,
imp: self,
"creating buffer containing {} bytes of data (marker {})...",
ready_obus.len(),
state.marked_packet,
);
let mut buffer = gst::Buffer::from_mut_slice(ready_obus);
{
let buffer = buffer.get_mut().unwrap();
if state.marked_packet { if state.marked_packet {
if state.obu_fragment.is_some() { buffer.set_flags(gst::BufferFlags::MARKER);
}
}
Some(buffer)
} else {
None
};
if state.marked_packet && state.obu_fragment.is_some() {
gst::error!( gst::error!(
CAT, CAT,
imp: self, imp: self,
@ -330,30 +342,9 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
) )
); );
self.reset(&mut state); self.reset(&mut state);
return None;
} }
bytes_ready = state.adapter.available(); buffer
}
// now push all the complete temporal units
if bytes_ready > 0 {
gst::log!(
CAT,
imp: self,
"creating buffer containing {} bytes of data...",
bytes_ready
);
Some(
state
.adapter
.take_buffer(bytes_ready)
.map_err(err_opt!(self, buf_take))
.ok()?,
)
} else {
None
}
} }
} }
@ -407,12 +398,15 @@ impl RTPAv1Depay {
} }
/// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format /// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format
fn translate_obu(&self, reader: &mut Cursor<&[u8]>, obu: &SizedObu) -> Option<gst::Buffer> { fn translate_obu(
let mut bytes = gst::Buffer::with_size(obu.full_size() as usize) &self,
.map_err(err_opt!(self, buf_alloc)) reader: &mut Cursor<&[u8]>,
.ok()? obu: &SizedObu,
.into_mapped_buffer_writable() w: &mut Vec<u8>,
.unwrap(); ) -> Option<usize> {
let pos = w.len();
w.resize(pos + obu.full_size() as usize, 0);
let bytes = &mut w[pos..];
// write OBU header // write OBU header
reader reader
@ -447,7 +441,7 @@ impl RTPAv1Depay {
.map_err(err_opt!(self, buf_read)) .map_err(err_opt!(self, buf_read))
.ok()?; .ok()?;
Some(bytes.into_buffer()) Some(obu.full_size() as usize)
} }
} }
@ -512,14 +506,10 @@ mod tests {
println!("running test {idx}..."); println!("running test {idx}...");
let mut reader = Cursor::new(rtp_bytes.as_slice()); let mut reader = Cursor::new(rtp_bytes.as_slice());
let actual = element.imp().translate_obu(&mut reader, &obu); let mut actual = Vec::new();
element.imp().translate_obu(&mut reader, &obu, &mut actual).unwrap();
assert_eq!(reader.position(), rtp_bytes.len() as u64); assert_eq!(reader.position(), rtp_bytes.len() as u64);
assert!(actual.is_some());
let actual = actual
.unwrap()
.into_mapped_buffer_readable()
.unwrap();
assert_eq!(actual.as_slice(), out_bytes.as_slice()); assert_eq!(actual.as_slice(), out_bytes.as_slice());
} }
} }

View file

@ -57,7 +57,7 @@ fn test_depayloader() {
) )
]; ];
let expected: [Vec<u8>; 2] = [ let expected: [Vec<u8>; 3] = [
vec![ vec![
0b0001_0010, 0, 0b0001_0010, 0,
0b0011_0010, 0b0000_0110, 1, 2, 3, 4, 5, 6, 0b0011_0010, 0b0000_0110, 1, 2, 3, 4, 5, 6,
@ -65,6 +65,8 @@ fn test_depayloader() {
vec![ vec![
0b0001_0010, 0, 0b0001_0010, 0,
0b0111_1010, 0b0000_0101, 1, 2, 3, 4, 5, 0b0111_1010, 0b0000_0101, 1, 2, 3, 4, 5,
],
vec![
0b0011_0010, 0b0000_1010, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0b0011_0010, 0b0000_1010, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
], ],
]; ];