rtpav1depay: Don't output full TUs but just OBUs as they come

Simplifies state tracking and potentially reduces latency as it's not
necessary to wait until all fragments of an OBU are received.

The last OBU of a TU is marked with the marker flag to allow parsers to
detect this without first seeing the beginning of the next TU.

Also use a simple `Vec` for collecting complete OBUs instead of a
`gst_base::Adapter` as this reduces the number of allocations.

And also handle invalid packets a little bit more gracefully.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/244

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1086>
This commit is contained in:
Sebastian Dröge 2023-02-01 17:30:48 +02:00
parent 402d96b80c
commit 3520fc67de
4 changed files with 69 additions and 78 deletions

View file

@ -5495,7 +5495,7 @@
"presence": "always"
},
"src": {
"caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: tu\n",
"caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: obu\n",
"direction": "src",
"presence": "always"
}

View file

@ -11,7 +11,6 @@ rust-version = "1.63"
[dependencies]
bitstream-io = "1.3"
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", features = ["v1_20"] }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", features = ["v1_20"] }
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.19", version = "0.19", features = ["v1_20"]}
once_cell = "1.0"
chrono = { version = "0.4", default-features = false }

View file

@ -28,9 +28,6 @@ use crate::av1::common::{
#[derive(Debug, Default)]
struct State {
/// used to store outgoing OBUs until the TU is complete
adapter: gst_base::UniqueAdapter,
last_timestamp: Option<u32>,
/// if true, the last packet of a temporal unit has been received
marked_packet: bool,
@ -51,8 +48,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
static TEMPORAL_DELIMITER: Lazy<gst::Memory> =
Lazy::new(|| gst::Memory::from_slice([0b0001_0010, 0]));
static TEMPORAL_DELIMITER: [u8; 2] = [0b0001_0010, 0];
impl RTPAv1Depay {
fn reset(&self, state: &mut State) {
@ -109,7 +105,7 @@ impl ElementImpl for RTPAv1Depay {
&gst::Caps::builder("video/x-av1")
.field("parsed", true)
.field("stream-format", "obu-stream")
.field("alignment", "tu")
.field("alignment", "obu")
.build(),
)
.unwrap();
@ -185,10 +181,8 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
self.reset(&mut state);
}
// number of bytes that can be used in the next outgoing buffer
let mut bytes_ready = 0;
let mut reader = Cursor::new(payload);
let mut ready_obus = gst::Buffer::new();
let mut ready_obus = Vec::new();
let aggr_header = {
let mut byte = [0; 1];
@ -214,18 +208,10 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
state.last_timestamp
);
self.reset(&mut state);
return None;
}
// all the currently stored bytes can be packed into the next outgoing buffer
bytes_ready = state.adapter.available();
// the next temporal unit starts with a temporal delimiter OBU
ready_obus
.get_mut()
.unwrap()
.insert_memory(None, TEMPORAL_DELIMITER.clone());
state.marked_packet = false;
ready_obus.extend_from_slice(&TEMPORAL_DELIMITER);
}
state.marked_packet = rtp.is_marker();
state.last_timestamp = Some(rtp.timestamp());
@ -234,16 +220,17 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
let mut idx = 0;
// handle leading OBU fragment
if let Some((obu, ref mut bytes)) = &mut state.obu_fragment {
if !aggr_header.leading_fragment {
gst::error!(
CAT,
imp: self,
"invalid packet: ignores unclosed OBU fragment"
);
return None;
}
if state.obu_fragment.is_some() && !aggr_header.leading_fragment {
gst::error!(
CAT,
imp: self,
"invalid packet: ignores unclosed OBU fragment"
);
self.reset(&mut state);
}
if let Some((obu, ref mut bytes)) = &mut state.obu_fragment {
assert!(aggr_header.leading_fragment);
let (element_size, is_last_obu) =
self.find_element_info(rtp, &mut reader, &aggr_header, idx)?;
@ -262,9 +249,11 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
obu.as_sized(size, leb_size)
};
let buffer = self.translate_obu(&mut Cursor::new(bytes.as_slice()), &full_obu)?;
state.adapter.push(buffer);
self.translate_obu(
&mut Cursor::new(bytes.as_slice()),
&full_obu,
&mut ready_obus,
)?;
state.obu_fragment = None;
}
}
@ -291,7 +280,10 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
.seek(SeekFrom::Current(element_size as i64))
.map_err(err_opt!(self, buf_read))
.ok()?;
idx += 1;
continue;
}
// trailing OBU fragments are stored in the state
if is_last_obu && aggr_header.trailing_fragment {
let bytes_left = rtp.payload_size() - (reader.position() as u32);
@ -303,7 +295,7 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
state.obu_fragment = Some((obu, bytes));
}
// full OBUs elements are translated and appended to the adapter
// full OBUs elements are translated and appended to the ready OBUs
else {
let full_obu = {
let size = element_size - obu.header_len;
@ -311,49 +303,48 @@ impl RTPBaseDepayloadImpl for RTPAv1Depay {
obu.as_sized(size, leb_size)
};
ready_obus.append(self.translate_obu(&mut reader, &full_obu)?);
self.translate_obu(&mut reader, &full_obu, &mut ready_obus)?;
}
idx += 1;
}
state.adapter.push(ready_obus);
if state.marked_packet {
if state.obu_fragment.is_some() {
gst::error!(
CAT,
imp: self,
concat!(
"invalid packet: has marker bit set, but ",
"last OBU is not yet complete"
)
);
self.reset(&mut state);
return None;
}
bytes_ready = state.adapter.available();
}
// now push all the complete temporal units
if bytes_ready > 0 {
// now push all the complete OBUs
let buffer = if !ready_obus.is_empty() {
gst::log!(
CAT,
imp: self,
"creating buffer containing {} bytes of data...",
bytes_ready
"creating buffer containing {} bytes of data (marker {})...",
ready_obus.len(),
state.marked_packet,
);
Some(
state
.adapter
.take_buffer(bytes_ready)
.map_err(err_opt!(self, buf_take))
.ok()?,
)
let mut buffer = gst::Buffer::from_mut_slice(ready_obus);
{
let buffer = buffer.get_mut().unwrap();
if state.marked_packet {
buffer.set_flags(gst::BufferFlags::MARKER);
}
}
Some(buffer)
} else {
None
};
if state.marked_packet && state.obu_fragment.is_some() {
gst::error!(
CAT,
imp: self,
concat!(
"invalid packet: has marker bit set, but ",
"last OBU is not yet complete"
)
);
self.reset(&mut state);
}
buffer
}
}
@ -407,12 +398,15 @@ impl RTPAv1Depay {
}
/// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format
fn translate_obu(&self, reader: &mut Cursor<&[u8]>, obu: &SizedObu) -> Option<gst::Buffer> {
let mut bytes = gst::Buffer::with_size(obu.full_size() as usize)
.map_err(err_opt!(self, buf_alloc))
.ok()?
.into_mapped_buffer_writable()
.unwrap();
fn translate_obu(
&self,
reader: &mut Cursor<&[u8]>,
obu: &SizedObu,
w: &mut Vec<u8>,
) -> Option<usize> {
let pos = w.len();
w.resize(pos + obu.full_size() as usize, 0);
let bytes = &mut w[pos..];
// write OBU header
reader
@ -447,7 +441,7 @@ impl RTPAv1Depay {
.map_err(err_opt!(self, buf_read))
.ok()?;
Some(bytes.into_buffer())
Some(obu.full_size() as usize)
}
}
@ -512,14 +506,10 @@ mod tests {
println!("running test {}...", idx);
let mut reader = Cursor::new(rtp_bytes.as_slice());
let actual = element.imp().translate_obu(&mut reader, &obu);
let mut actual = Vec::new();
element.imp().translate_obu(&mut reader, &obu, &mut actual).unwrap();
assert_eq!(reader.position(), rtp_bytes.len() as u64);
assert!(actual.is_some());
let actual = actual
.unwrap()
.into_mapped_buffer_readable()
.unwrap();
assert_eq!(actual.as_slice(), out_bytes.as_slice());
}
}

View file

@ -57,7 +57,7 @@ fn test_depayloader() {
)
];
let expected: [Vec<u8>; 2] = [
let expected: [Vec<u8>; 3] = [
vec![
0b0001_0010, 0,
0b0011_0010, 0b0000_0110, 1, 2, 3, 4, 5, 6,
@ -65,6 +65,8 @@ fn test_depayloader() {
vec![
0b0001_0010, 0,
0b0111_1010, 0b0000_0101, 1, 2, 3, 4, 5,
],
vec![
0b0011_0010, 0b0000_1010, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
],
];