Add RTP de/payloader elements for AV1

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/881
This commit is contained in:
Vivienne Watermeier 2022-05-11 15:16:36 +02:00 committed by Sebastian Dröge
parent 2bf5f0bf67
commit 8d73b5008a
16 changed files with 2596 additions and 1 deletions

View file

@ -16,6 +16,7 @@ members = [
"net/hlssink3",
"net/onvif",
"net/reqwest",
"net/rtpav1",
"net/aws",
"net/webrtc-http",
"utils/fallbackswitch",
@ -53,6 +54,7 @@ default-members = [
"net/onvif",
"net/raptorq",
"net/reqwest",
"net/rtpav1",
"net/aws",
"net/webrtc-http",
"utils/fallbackswitch",

View file

@ -5004,6 +5004,72 @@
"tracers": {},
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
},
"rtpav1": {
"description": "AV1 RTP (De)payloader Plugins",
"elements": {
"rtpav1depay": {
"author": "Vivienne Watermeier <vwatermeier@igalia.com>",
"description": "Depayload AV1 from RTP packets",
"hierarchy": [
"GstRtpAv1Depay",
"GstRTPBaseDepayload",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Codec/Depayloader/Network/RTP",
"long-name": "RTP AV1 Depayloader",
"pad-templates": {
"sink": {
"caps": "application/x-rtp:\n media: video\n payload: [ 96, 127 ]\n clock-rate: 90000\n encoding-name: AV1\n",
"direction": "sink",
"presence": "always"
},
"src": {
"caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: tu\n",
"direction": "src",
"presence": "always"
}
},
"rank": "marginal"
},
"rtpav1pay": {
"author": "Vivienne Watermeier <vwatermeier@igalia.com>",
"description": "Payload AV1 as RTP packets",
"hierarchy": [
"GstRtpAv1Pay",
"GstRTPBasePayload",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Codec/Payloader/Network/RTP",
"long-name": "RTP AV1 payloader",
"pad-templates": {
"sink": {
"caps": "video/x-av1:\n parsed: true\n stream-format: obu-stream\n alignment: obu\n",
"direction": "sink",
"presence": "always"
},
"src": {
"caps": "application/x-rtp:\n media: video\n payload: [ 96, 127 ]\n clock-rate: 90000\n encoding-name: AV1\n",
"direction": "src",
"presence": "always"
}
},
"rank": "marginal"
}
},
"filename": "gstrtpav1",
"license": "MPL",
"other-types": {},
"package": "gst-plugin-rtpav1",
"source": "gst-plugin-rtpav1",
"tracers": {},
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
},
"sodium": {
"description": "libsodium-based file encryption and decryption",
"elements": {

View file

@ -67,7 +67,8 @@ plugins = {
'gst-plugin-textahead': 'libgsttextahead',
'gst-plugin-onvif': 'libgstrsonvif',
'gst-plugin-tracers': 'libgstrstracers',
'gst-plugin-webrtchttp': 'libgstwebrtchttp'
'gst-plugin-webrtchttp': 'libgstwebrtchttp',
'gst-plugin-rtpav1': 'libgstrtpav1',
}
extra_env = {}

45
net/rtpav1/Cargo.toml Normal file
View file

@ -0,0 +1,45 @@
[package]
name = "gst-plugin-rtpav1"
version = "0.1.0"
authors = ["Vivienne Watermeier <vwatermeier@igalia.com>"]
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
license = "MPL-2.0"
edition = "2021"
description = "AV1 RTP (De)payloader Plugins"
rust-version = "1.63"
[dependencies]
bitstream-io = "1.3"
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"]}
once_cell = "1.0"
[dev-dependencies]
gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
[build-dependencies]
gst-plugin-version-helper = { git = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" }
[lib]
name = "gstrtpav1"
crate-type = ["cdylib", "rlib"]
path = "src/lib.rs"
[features]
static = []
capi = []
doc = []
[package.metadata.capi]
min_version = "0.8.0"
[package.metadata.capi.header]
enabled = false
[package.metadata.capi.library]
install_subdir = "gstreamer-1.0"
versioning = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gobject-2.0, glib-2.0, gmodule-2.0"

12
net/rtpav1/build.rs Normal file
View file

@ -0,0 +1,12 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
fn main() {
gst_plugin_version_helper::info()
}

View file

@ -0,0 +1,99 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
#[derive(Default, Debug, PartialEq, Eq, Clone, Copy)]
pub struct AggregationHeader {
pub leading_fragment: bool,
pub trailing_fragment: bool,
pub obu_count: Option<u8>,
pub start_of_seq: bool,
}
impl From<u8> for AggregationHeader {
fn from(byte: u8) -> Self {
Self {
leading_fragment: byte & (1 << 7) != 0,
trailing_fragment: byte & (1 << 6) != 0,
obu_count: match (byte >> 4) & 0b11 {
0 => None,
n => Some(n),
},
start_of_seq: byte & (1 << 3) != 0,
}
}
}
impl From<&[u8; 1]> for AggregationHeader {
fn from(slice: &[u8; 1]) -> Self {
AggregationHeader::from(slice[0])
}
}
impl From<AggregationHeader> for u8 {
fn from(aggr: AggregationHeader) -> Self {
let mut byte = 0;
byte |= (aggr.leading_fragment as u8) << 7;
byte |= (aggr.trailing_fragment as u8) << 6;
byte |= (aggr.start_of_seq as u8) << 3;
if let Some(n) = aggr.obu_count {
assert!(n < 0b100, "OBU count out of range");
byte |= n << 4;
}
byte
}
}
#[cfg(test)]
mod tests {
use crate::common::*;
const HEADERS: [(u8, AggregationHeader); 3] = [
(
0b01011000,
AggregationHeader {
leading_fragment: false,
trailing_fragment: true,
obu_count: Some(1),
start_of_seq: true,
},
),
(
0b11110000,
AggregationHeader {
leading_fragment: true,
trailing_fragment: true,
obu_count: Some(3),
start_of_seq: false,
},
),
(
0b10000000,
AggregationHeader {
leading_fragment: true,
trailing_fragment: false,
obu_count: None,
start_of_seq: false,
},
),
];
#[test]
fn test_aggr_header() {
for (idx, (byte, header)) in HEADERS.into_iter().enumerate() {
println!("running test {}...", idx);
assert_eq!(byte, header.into());
assert_eq!(header, byte.into());
assert_eq!(header, (&[byte; 1]).into());
}
}
}

View file

@ -0,0 +1,98 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
macro_rules! err_flow {
($element:ident, read, $msg:literal) => {
|err| {
gst::element_error!($element, ResourceError::Read, [$msg, err]);
FlowError::Error
}
};
($element:ident, write, $msg:literal) => {
|err| {
gst::element_error!($element, ResourceError::Write, [$msg, err]);
FlowError::Error
}
};
($element:ident, buf_read) => {
err_flow!($element, read, "Failed to read buffer: {}")
};
($element:ident, aggr_header_write) => {
err_flow!(
$element,
write,
"Failed to write aggregation header to the payload: {}"
)
};
($element:ident, leb_write) => {
err_flow!(
$element,
write,
"Failed to write leb128 size field to the payload: {}"
)
};
($element:ident, obu_write) => {
err_flow!(
$element,
write,
"Failed to write OBU bytes to the payload: {}"
)
};
($element:ident, outbuf_alloc) => {
err_flow!($element, write, "Failed to allocate output buffer: {}")
};
}
macro_rules! err_opt {
($element:ident, read, $msg:literal) => {
|err| {
gst::element_error!($element, ResourceError::Read, [$msg, err]);
Option::<()>::None
}
};
($element:ident, write, $msg:literal) => {
|err| {
gst::element_error!($element, ResourceError::Write, [$msg, err]);
Option::<()>::None
}
};
($element:ident, buf_alloc) => {
err_opt!($element, write, "Failed to allocate new buffer: {}")
};
($element:ident, payload_buf) => {
err_opt!($element, read, "Failed to get RTP payload buffer: {}")
};
($element:ident, payload_map) => {
err_opt!($element, read, "Failed to map payload as readable: {}")
};
($element:ident, buf_take) => {
err_opt!($element, read, "Failed to take buffer from adapter: {}")
};
($element:ident, aggr_header_read) => {
err_opt!($element, read, "Failed to read aggregation header: {}")
};
($element:ident, leb_read) => {
err_opt!($element, read, "Failed to read leb128 size field: {}")
};
($element:ident, leb_write) => {
err_opt!($element, read, "Failed to write leb128 size field: {}")
};
($element:ident, obu_read) => {
err_opt!($element, read, "Failed to read OBU header: {}")
};
($element:ident, buf_read) => {
err_opt!($element, read, "Failed to read RTP buffer: {}")
};
}
pub(crate) use err_flow;
pub(crate) use err_opt;

View file

@ -0,0 +1,102 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
#![allow(non_camel_case_types)]
use bitstream_io::{BitRead, BitReader, BitWrite, BitWriter, Endianness};
use std::io::{self, Read, Seek, Write};
pub fn parse_leb128<R, E>(reader: &mut BitReader<R, E>) -> io::Result<u32>
where
R: Read + Seek,
E: Endianness,
{
let mut value = 0;
for i in 0..8 {
let byte = reader.read::<u32>(8)?;
value |= (byte & 0x7f) << (i * 7);
if byte & 0x80 == 0 {
break;
}
}
reader.byte_align();
Ok(value)
}
pub fn write_leb128<W, E>(writer: &mut BitWriter<W, E>, mut value: u32) -> io::Result<()>
where
W: Write + Seek,
E: Endianness,
{
loop {
writer.write_bit(value > 0x7f)?;
writer.write(7, value & 0x7f)?;
value >>= 7;
if value == 0 {
writer.byte_align()?;
return Ok(());
}
}
}
pub fn leb128_size(mut value: u32) -> u8 {
let mut bytes = 1;
loop {
value >>= 7;
if value == 0 {
return bytes;
}
bytes += 1;
}
}
#[cfg(test)]
mod tests {
use super::*;
use bitstream_io::{BigEndian, BitReader, BitWrite, BitWriter};
use std::io::Cursor;
#[test]
fn test_leb128() {
const TEST_CASES: [(u32, &[u8]); 8] = [
(0, &[0x00]),
(1, &[0x01]),
(2, &[0x02]),
(3, &[0x03]),
(123, &[0x7b]),
(2468, &[0xa4, 0x13]),
(987654, &[0x86, 0xa4, 0x3c]),
(u32::MAX, &[0xff, 0xff, 0xff, 0xff, 0x0f]),
];
for (value, encoding) in TEST_CASES {
println!("testing: value={}", value);
let mut reader = BitReader::endian(Cursor::new(&encoding), BigEndian);
assert_eq!(value, parse_leb128(&mut reader).unwrap());
assert_eq!(
encoding.len() as u64 * 8,
reader.position_in_bits().unwrap()
);
let mut writer = BitWriter::endian(Cursor::new(Vec::new()), BigEndian);
write_leb128(&mut writer, value).unwrap();
writer.byte_align().unwrap();
let mut data = writer.into_writer();
data.set_position(0);
let mut reader = BitReader::endian(data, BigEndian);
assert_eq!(value, parse_leb128(&mut reader).unwrap());
}
}
}

View file

@ -0,0 +1,23 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use bitstream_io::BigEndian;
pub const CLOCK_RATE: u32 = 90000;
pub const ENDIANNESS: BigEndian = BigEndian;
mod aggr_header;
mod error;
mod integers;
mod obu;
pub use aggr_header::*;
pub(crate) use error::*;
pub use integers::*;
pub use obu::*;

View file

@ -0,0 +1,418 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use crate::common::{leb128_size, parse_leb128};
use bitstream_io::{BitRead, BitReader, Endianness};
use std::io::{self, Read, Seek};
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct UnsizedObu {
pub obu_type: ObuType,
pub has_extension: bool,
pub temporal_id: u8,
pub spatial_id: u8,
pub header_len: u32,
/// indicates that only part of this OBU has been processed so far
pub is_fragment: bool,
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct SizedObu {
pub obu_type: ObuType,
pub has_extension: bool,
/// If the OBU header is followed by a leb128 size field.
pub has_size_field: bool,
pub temporal_id: u8,
pub spatial_id: u8,
/// size of the OBU payload in bytes.
/// This may refer to different sizes in different contexts, not always
/// to the entire OBU payload as it is in the AV1 bitstream.
pub size: u32,
/// the number of bytes the leb128 size field will take up
/// when written with write_leb128().
/// This does not imply `has_size_field`, and does not necessarily match with
/// the length of the internal size field if present.
pub leb_size: u32,
pub header_len: u32,
/// indicates that only part of this OBU has been processed so far
pub is_fragment: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObuType {
Reserved,
SequenceHeader,
TemporalDelimiter,
FrameHeader,
TileGroup,
Metadata,
Frame,
RedundantFrameHeader,
TileList,
Padding,
}
impl Default for ObuType {
fn default() -> Self {
Self::Reserved
}
}
impl UnsizedObu {
pub fn parse<R, E>(reader: &mut BitReader<R, E>) -> io::Result<Self>
where
R: Read + Seek,
E: Endianness,
{
// check the forbidden bit
if reader.read_bit()? {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"forbidden bit in OBU header is set",
));
}
let obu_type = reader.read::<u8>(4)?.into();
let has_extension = reader.read_bit()?;
// make sure there is no size field
if reader.read_bit()? {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"did not expect size field",
));
}
// ignore the reserved bit
let _ = reader.read_bit()?;
let (temporal_id, spatial_id) = if has_extension {
(reader.read::<u8>(3)?, reader.read::<u8>(2)?)
} else {
(0, 0)
};
reader.byte_align();
Ok(Self {
obu_type,
has_extension,
temporal_id,
spatial_id,
header_len: has_extension as u32 + 1,
is_fragment: false,
})
}
/// Convert to a `SizedObu` without internal size field and the given sizes.
pub fn as_sized(&self, size: u32, leb_size: u32) -> SizedObu {
SizedObu {
obu_type: self.obu_type,
has_extension: self.has_extension,
has_size_field: false,
temporal_id: self.temporal_id,
spatial_id: self.spatial_id,
size,
leb_size,
header_len: self.header_len,
is_fragment: self.is_fragment,
}
}
}
impl SizedObu {
/// Parse an OBU header and size field. If the OBU is not expected to contain
/// a size field, but the size is known from external information,
/// parse as an `UnsizedObu` and use `to_sized`.
pub fn parse<R, E>(reader: &mut BitReader<R, E>) -> io::Result<Self>
where
R: Read + Seek,
E: Endianness,
{
// check the forbidden bit
if reader.read_bit()? {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"forbidden bit in OBU header is set",
));
}
let obu_type = reader.read::<u8>(4)?.into();
let has_extension = reader.read_bit()?;
// require a size field
if !reader.read_bit()? {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"expected a size field",
));
}
// ignore the reserved bit
let _ = reader.read_bit()?;
let (temporal_id, spatial_id) = if has_extension {
(reader.read::<u8>(3)?, reader.read::<u8>(2)?)
} else {
(0, 0)
};
reader.byte_align();
let size = parse_leb128(reader)?;
let leb_size = leb128_size(size) as u32;
Ok(Self {
obu_type,
has_extension,
has_size_field: true,
temporal_id,
spatial_id,
size,
leb_size,
header_len: has_extension as u32 + 1,
is_fragment: false,
})
}
/// The amount of bytes this OBU will take up, including the space needed for
/// its leb128 size field.
pub fn full_size(&self) -> u32 {
self.size + self.leb_size + self.header_len
}
/// The amount of bytes this OBU will take up without a leb128 size field.
pub fn partial_size(&self) -> u32 {
self.size + self.header_len
}
}
impl From<u8> for ObuType {
fn from(n: u8) -> Self {
assert!(n < 16);
match n {
1 => Self::SequenceHeader,
2 => Self::TemporalDelimiter,
3 => Self::FrameHeader,
4 => Self::TileGroup,
5 => Self::Metadata,
6 => Self::Frame,
7 => Self::RedundantFrameHeader,
8 => Self::TileList,
15 => Self::Padding,
_ => Self::Reserved,
}
}
}
impl From<ObuType> for u8 {
fn from(ty: ObuType) -> Self {
match ty {
ObuType::Reserved => 0,
ObuType::SequenceHeader => 1,
ObuType::TemporalDelimiter => 2,
ObuType::FrameHeader => 3,
ObuType::TileGroup => 4,
ObuType::Metadata => 5,
ObuType::Frame => 6,
ObuType::RedundantFrameHeader => 7,
ObuType::TileList => 8,
ObuType::Padding => 15,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bitstream_io::{BigEndian, BitRead, BitReader};
use once_cell::sync::Lazy;
use std::io::Cursor;
#[allow(clippy::type_complexity)]
static OBUS: Lazy<Vec<(SizedObu, Vec<u8>, u64, UnsizedObu, Vec<u8>)>> = Lazy::new(|| {
vec![
(
SizedObu {
obu_type: ObuType::TemporalDelimiter,
has_extension: false,
has_size_field: true,
temporal_id: 0,
spatial_id: 0,
size: 0,
leb_size: 1,
header_len: 1,
is_fragment: false,
},
vec![0b0001_0010, 0b0000_0000],
2,
UnsizedObu {
obu_type: ObuType::TemporalDelimiter,
has_extension: false,
temporal_id: 0,
spatial_id: 0,
header_len: 1,
is_fragment: false,
},
vec![0b0001_0000],
),
(
SizedObu {
obu_type: ObuType::Padding,
has_extension: false,
has_size_field: true,
temporal_id: 0,
spatial_id: 0,
size: 10,
leb_size: 1,
header_len: 1,
is_fragment: false,
},
vec![0b0111_1010, 0b0000_1010, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
2,
UnsizedObu {
obu_type: ObuType::Padding,
has_extension: false,
temporal_id: 0,
spatial_id: 0,
header_len: 1,
is_fragment: false,
},
vec![0b0111_1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
),
(
SizedObu {
obu_type: ObuType::Frame,
has_extension: true,
has_size_field: true,
temporal_id: 4,
spatial_id: 3,
size: 5,
leb_size: 1,
header_len: 2,
is_fragment: false,
},
vec![0b0011_0110, 0b1001_1000, 0b0000_0101, 1, 2, 3, 4, 5],
3,
UnsizedObu {
obu_type: ObuType::Frame,
has_extension: true,
temporal_id: 4,
spatial_id: 3,
header_len: 2,
is_fragment: false,
},
vec![0b0011_0100, 0b1001_1000, 1, 2, 3, 4, 5],
),
]
});
#[test]
fn test_parse() {
for (idx, (sized_obu, sized_bytes, expected_position, unsized_obu, unsized_bytes)) in
(*OBUS).iter().enumerate()
{
println!("running test {}...", idx);
{
println!(" parsing sized...");
let mut reader = BitReader::endian(Cursor::new(&sized_bytes), BigEndian);
assert_eq!(SizedObu::parse(&mut reader).unwrap(), *sized_obu);
assert!(reader.byte_aligned());
assert_eq!(reader.into_reader().position(), *expected_position);
};
{
println!(" parsing unsized...");
let mut reader = BitReader::endian(Cursor::new(&unsized_bytes), BigEndian);
assert_eq!(UnsizedObu::parse(&mut reader).unwrap(), *unsized_obu);
assert!(reader.byte_aligned());
assert_eq!(
reader.into_reader().position(),
unsized_obu.header_len as u64
);
}
}
}
#[test]
fn test_conversion() {
for (idx, (sized_obu, _, _, unsized_obu, _)) in (*OBUS).iter().enumerate() {
println!("running test {}...", idx);
assert_eq!(
unsized_obu.as_sized(sized_obu.size, sized_obu.leb_size),
SizedObu {
has_size_field: false,
..*sized_obu
},
);
}
}
#[test]
fn test_parse_rtp_obu() {
let obus = [
(
SizedObu {
obu_type: ObuType::TemporalDelimiter,
has_extension: false,
has_size_field: false,
temporal_id: 0,
spatial_id: 0,
size: 0,
leb_size: 1,
header_len: 1,
is_fragment: false,
},
vec![0b0001_0000],
),
(
SizedObu {
obu_type: ObuType::Padding,
has_extension: false,
has_size_field: false,
temporal_id: 0,
spatial_id: 0,
size: 10,
leb_size: 1,
header_len: 1,
is_fragment: false,
},
vec![0b0111_1000, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
),
(
SizedObu {
obu_type: ObuType::Frame,
has_extension: true,
has_size_field: false,
temporal_id: 4,
spatial_id: 3,
size: 5,
leb_size: 1,
header_len: 2,
is_fragment: false,
},
vec![0b0011_0100, 0b1001_1000, 1, 2, 3, 4, 5],
),
];
for (idx, (sized_obu, rtp_bytes)) in obus.into_iter().enumerate() {
println!("running test {}...", idx);
let mut reader = BitReader::endian(Cursor::new(&rtp_bytes), BigEndian);
let unsized_obu = UnsizedObu::parse(&mut reader).unwrap();
assert_eq!(
unsized_obu.as_sized(sized_obu.size, sized_obu.leb_size),
sized_obu
);
}
}
}

585
net/rtpav1/src/depay/imp.rs Normal file
View file

@ -0,0 +1,585 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::{
glib,
subclass::{prelude::*, ElementMetadata},
Buffer, BufferFlags, Caps, DebugCategory, DebugColorFlags, IntRange, Memory, PadDirection,
PadPresence, PadTemplate, ResourceError, StateChange, StateChangeError, StateChangeSuccess,
};
use gst_base::UniqueAdapter;
use gst_rtp::{
rtp_buffer::{RTPBuffer, Readable},
subclass::prelude::*,
};
use std::{
cmp::Ordering,
io::{Cursor, Read, Seek, SeekFrom},
sync::{Mutex, MutexGuard},
};
use bitstream_io::{BitReader, BitWriter};
use once_cell::sync::Lazy;
use crate::common::{
err_opt, leb128_size, parse_leb128, write_leb128, AggregationHeader, ObuType, SizedObu,
UnsizedObu, CLOCK_RATE, ENDIANNESS,
};
// TODO: handle internal size fields in RTP OBUs
#[derive(Debug, Default)]
struct State {
/// used to store outgoing OBUs until the TU is complete
adapter: UniqueAdapter,
last_timestamp: Option<u32>,
/// if true, the last packet of a temporal unit has been received
marked_packet: bool,
/// holds data for a fragment
obu_fragment: Option<(UnsizedObu, Vec<u8>)>,
}
#[derive(Debug, Default)]
pub struct RTPAv1Depay {
state: Mutex<State>,
}
static CAT: Lazy<DebugCategory> = Lazy::new(|| {
DebugCategory::new(
"rtpav1depay",
DebugColorFlags::empty(),
Some("RTP AV1 Depayloader"),
)
});
static TEMPORAL_DELIMITER: Lazy<Memory> = Lazy::new(|| Memory::from_slice(&[0b0001_0010, 0]));
impl RTPAv1Depay {
fn reset<'s>(
&'s self,
element: &<Self as ObjectSubclass>::Type,
state: &mut MutexGuard<'s, State>,
) {
gst::debug!(CAT, obj: element, "resetting state");
**state = State::default()
}
}
#[glib::object_subclass]
impl ObjectSubclass for RTPAv1Depay {
const NAME: &'static str = "GstRtpAv1Depay";
type Type = super::RTPAv1Depay;
type ParentType = gst_rtp::RTPBaseDepayload;
}
impl ObjectImpl for RTPAv1Depay {}
impl GstObjectImpl for RTPAv1Depay {}
impl ElementImpl for RTPAv1Depay {
fn metadata() -> Option<&'static ElementMetadata> {
static ELEMENT_METADATA: Lazy<ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"RTP AV1 Depayloader",
"Codec/Depayloader/Network/RTP",
"Depayload AV1 from RTP packets",
"Vivienne Watermeier <vwatermeier@igalia.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<PadTemplate>> = Lazy::new(|| {
let sink_pad_template = PadTemplate::new(
"sink",
PadDirection::Sink,
PadPresence::Always,
&Caps::builder("application/x-rtp")
.field("media", "video")
.field("payload", IntRange::new(96, 127))
.field("clock-rate", CLOCK_RATE as i32)
.field("encoding-name", "AV1")
.build(),
)
.unwrap();
let src_pad_template = PadTemplate::new(
"src",
PadDirection::Src,
PadPresence::Always,
&Caps::builder("video/x-av1")
.field("parsed", true)
.field("stream-format", "obu-stream")
.field("alignment", "tu")
.build(),
)
.unwrap();
vec![src_pad_template, sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
element: &Self::Type,
transition: StateChange,
) -> Result<StateChangeSuccess, StateChangeError> {
gst::debug!(CAT, obj: element, "changing state: {}", transition);
if matches!(transition, StateChange::ReadyToPaused) {
let mut state = self.state.lock().unwrap();
self.reset(element, &mut state);
}
let ret = self.parent_change_state(element, transition);
if matches!(transition, StateChange::PausedToReady) {
let mut state = self.state.lock().unwrap();
self.reset(element, &mut state);
}
ret
}
}
impl RTPBaseDepayloadImpl for RTPAv1Depay {
fn process_rtp_packet(
&self,
element: &Self::Type,
rtp: &RTPBuffer<Readable>,
) -> Option<Buffer> {
gst::log!(
CAT,
obj: element,
"processing RTP packet with payload type {} and size {}",
rtp.payload_type(),
rtp.buffer().size(),
);
let payload = rtp
.payload_buffer()
.map_err(err_opt!(element, payload_buf))
.ok()?;
let payload_map = payload
.map_readable()
.map_err(err_opt!(element, payload_map))
.ok()?;
let mut state = self.state.lock().unwrap();
if rtp.buffer().flags().contains(BufferFlags::DISCONT) {
gst::debug!(CAT, obj: element, "buffer discontinuity");
self.reset(element, &mut state);
}
// number of bytes that can be used in the next outgoing buffer
let mut bytes_ready = 0;
let mut reader = Cursor::new(payload_map.as_ref());
let mut ready_obus = Buffer::new();
let aggr_header = {
let mut byte = [0; 1];
reader
.read_exact(&mut byte)
.map_err(err_opt!(element, aggr_header_read))
.ok()?;
AggregationHeader::from(&byte)
};
// handle new temporal units
if state.marked_packet || state.last_timestamp != Some(rtp.timestamp()) {
if state.last_timestamp.is_some() && state.obu_fragment.is_some() {
gst::error!(
CAT,
obj: element,
concat!(
"invalid packet: packet is part of a new TU but ",
"the previous TU still has an incomplete OBU",
"marked_packet: {}, last_timestamp: {:?}"
),
state.marked_packet,
state.last_timestamp
);
self.reset(element, &mut state);
return None;
}
// all the currently stored bytes can be packed into the next outgoing buffer
bytes_ready = state.adapter.available();
// the next temporal unit starts with a temporal delimiter OBU
ready_obus
.get_mut()
.unwrap()
.insert_memory(None, TEMPORAL_DELIMITER.clone());
state.marked_packet = false;
}
state.marked_packet = rtp.is_marker();
state.last_timestamp = Some(rtp.timestamp());
// parse and prepare the received OBUs
let mut idx = 0;
// handle leading OBU fragment
if let Some((obu, ref mut bytes)) = &mut state.obu_fragment {
if !aggr_header.leading_fragment {
gst::error!(
CAT,
obj: element,
"invalid packet: ignores unclosed OBU fragment"
);
return None;
}
let (element_size, is_last_obu) =
find_element_info(element, rtp, &mut reader, &aggr_header, idx)?;
let bytes_end = bytes.len();
bytes.resize(bytes_end + element_size as usize, 0);
reader
.read_exact(&mut bytes[bytes_end..])
.map_err(err_opt!(element, buf_read))
.ok()?;
// if this OBU is complete, it can be appended to the adapter
if !(is_last_obu && aggr_header.trailing_fragment) {
let full_obu = {
let size = bytes.len() as u32 - obu.header_len;
let leb_size = leb128_size(size) as u32;
obu.as_sized(size, leb_size)
};
let buffer = translate_obu(element, &mut Cursor::new(bytes.as_slice()), &full_obu)?;
state.adapter.push(buffer);
state.obu_fragment = None;
}
}
// handle other OBUs, including trailing fragments
while reader.position() < rtp.payload_size() as u64 {
let (element_size, is_last_obu) =
find_element_info(element, rtp, &mut reader, &aggr_header, idx)?;
let header_pos = reader.position();
let mut bitreader = BitReader::endian(&mut reader, ENDIANNESS);
let obu = UnsizedObu::parse(&mut bitreader)
.map_err(err_opt!(element, obu_read))
.ok()?;
reader
.seek(SeekFrom::Start(header_pos))
.map_err(err_opt!(element, buf_read))
.ok()?;
// ignore these OBU types
if matches!(obu.obu_type, ObuType::TemporalDelimiter | ObuType::TileList) {
reader
.seek(SeekFrom::Current(element_size as i64))
.map_err(err_opt!(element, buf_read))
.ok()?;
}
// trailing OBU fragments are stored in the state
if is_last_obu && aggr_header.trailing_fragment {
let bytes_left = rtp.payload_size() - (reader.position() as u32);
let mut bytes = vec![0; bytes_left as usize];
reader
.read_exact(bytes.as_mut_slice())
.map_err(err_opt!(element, buf_read))
.ok()?;
state.obu_fragment = Some((obu, bytes));
}
// full OBUs elements are translated and appended to the adapter
else {
let full_obu = {
let size = element_size - obu.header_len;
let leb_size = leb128_size(size) as u32;
obu.as_sized(size, leb_size)
};
ready_obus.append(translate_obu(element, &mut reader, &full_obu)?);
}
idx += 1;
}
state.adapter.push(ready_obus);
if state.marked_packet {
if state.obu_fragment.is_some() {
gst::error!(
CAT,
obj: element,
concat!(
"invalid packet: has marker bit set, but ",
"last OBU is not yet complete"
)
);
self.reset(element, &mut state);
return None;
}
bytes_ready = state.adapter.available();
}
// now push all the complete temporal units
if bytes_ready > 0 {
gst::log!(
CAT,
obj: element,
"creating buffer containing {} bytes of data...",
bytes_ready
);
Some(
state
.adapter
.take_buffer(bytes_ready)
.map_err(err_opt!(element, buf_take))
.ok()?,
)
} else {
None
}
}
}
/// Find out the next OBU element's size, and if it is the last OBU in the packet.
/// The reader is expected to be at the first byte of the element,
/// or its preceding size field if present,
/// and will be at the first byte past the element's size field afterwards.
fn find_element_info(
element: &<RTPAv1Depay as ObjectSubclass>::Type,
rtp: &RTPBuffer<Readable>,
reader: &mut Cursor<&[u8]>,
aggr_header: &AggregationHeader,
index: u32,
) -> Option<(u32, bool)> {
let element_size: u32;
let is_last_obu: bool;
if let Some(count) = aggr_header.obu_count {
is_last_obu = index + 1 == count as u32;
element_size = if is_last_obu {
rtp.payload_size() - (reader.position() as u32)
} else {
let mut bitreader = BitReader::endian(reader, ENDIANNESS);
parse_leb128(&mut bitreader)
.map_err(err_opt!(element, leb_read))
.ok()? as u32
}
} else {
element_size = parse_leb128(&mut BitReader::endian(&mut *reader, ENDIANNESS))
.map_err(err_opt!(element, leb_read))
.ok()? as u32;
is_last_obu = match rtp
.payload_size()
.cmp(&(reader.position() as u32 + element_size))
{
Ordering::Greater => false,
Ordering::Equal => true,
Ordering::Less => {
gst::error!(
CAT,
obj: element,
"invalid packet: size field gives impossibly large OBU size"
);
return None;
}
};
}
Some((element_size, is_last_obu))
}
/// Using OBU data from an RTP packet, construct a buffer containing that OBU in AV1 bitstream format
fn translate_obu(
element: &<RTPAv1Depay as ObjectSubclass>::Type,
reader: &mut Cursor<&[u8]>,
obu: &SizedObu,
) -> Option<Buffer> {
let mut bytes = Buffer::with_size(obu.full_size() as usize)
.map_err(err_opt!(element, buf_alloc))
.ok()?
.into_mapped_buffer_writable()
.unwrap();
// write OBU header
reader
.read_exact(&mut bytes[..obu.header_len as usize])
.map_err(err_opt!(element, buf_read))
.ok()?;
// set `has_size_field`
bytes[0] |= 1 << 1;
// skip internal size field if present
if obu.has_size_field {
parse_leb128(&mut BitReader::endian(&mut *reader, ENDIANNESS))
.map_err(err_opt!(element, leb_read))
.ok()?;
}
// write size field
write_leb128(
&mut BitWriter::endian(
Cursor::new(&mut bytes[obu.header_len as usize..]),
ENDIANNESS,
),
obu.size,
)
.map_err(err_opt!(element, leb_write))
.ok()?;
// write OBU payload
reader
.read_exact(&mut bytes[(obu.header_len + obu.leb_size) as usize..])
.map_err(err_opt!(element, buf_read))
.ok()?;
Some(bytes.into_buffer())
}
#[cfg(test)]
#[rustfmt::skip]
mod tests {
use super::*;
use std::io::Cursor;
use gst::buffer::Buffer;
use gst_rtp::rtp_buffer::RTPBufferExt;
#[test]
fn test_translate_obu() {
gst::init().unwrap();
let test_data = [
(
SizedObu {
obu_type: ObuType::TemporalDelimiter,
has_extension: false,
has_size_field: false,
temporal_id: 0,
spatial_id: 0,
size: 0,
leb_size: 1,
header_len: 1,
is_fragment: false,
},
vec![0b0001_0000],
vec![0b0001_0010, 0],
), (
SizedObu {
obu_type: ObuType::Frame,
has_extension: true,
has_size_field: false,
temporal_id: 3,
spatial_id: 2,
size: 5,
leb_size: 1,
header_len: 2,
is_fragment: false,
},
vec![0b0011_0100, 0b0111_0000, 1, 2, 3, 4, 5],
vec![0b0011_0110, 0b0111_0000, 0b0000_0101, 1, 2, 3, 4, 5],
), (
SizedObu {
obu_type: ObuType::Frame,
has_extension: true,
has_size_field: true,
temporal_id: 3,
spatial_id: 2,
size: 5,
leb_size: 1,
header_len: 2,
is_fragment: false,
},
vec![0b0011_0100, 0b0111_0000, 0b0000_0101, 1, 2, 3, 4, 5],
vec![0b0011_0110, 0b0111_0000, 0b0000_0101, 1, 2, 3, 4, 5],
)
];
let element = <RTPAv1Depay as ObjectSubclass>::Type::new();
for (idx, (obu, rtp_bytes, out_bytes)) in test_data.into_iter().enumerate() {
println!("running test {}...", idx);
let mut reader = Cursor::new(rtp_bytes.as_slice());
let actual = translate_obu(&element, &mut reader, &obu);
assert_eq!(reader.position(), rtp_bytes.len() as u64);
assert!(actual.is_some());
let actual = actual
.unwrap()
.into_mapped_buffer_readable()
.unwrap();
assert_eq!(actual.as_slice(), out_bytes.as_slice());
}
}
#[test]
#[allow(clippy::type_complexity)]
fn test_find_element_info() {
gst::init().unwrap();
let test_data: [(Vec<(u32, bool)>, u32, Vec<u8>, AggregationHeader); 4] = [
(
vec![(1, false)], // expected results
100, // RTP payload size
vec![0b0000_0001, 0b0001_0000],
AggregationHeader { obu_count: None, ..AggregationHeader::default() },
), (
vec![(5, true)],
5,
vec![0b0111_1000, 0, 0, 0, 0],
AggregationHeader { obu_count: Some(1), ..AggregationHeader::default() },
), (
vec![(7, true)],
8,
vec![0b0000_0111, 0b0011_0110, 0b0010_1000, 0b0000_1010, 1, 2, 3, 4],
AggregationHeader { obu_count: None, ..AggregationHeader::default() },
), (
vec![(6, false), (4, true)],
11,
vec![0b0000_0110, 0b0111_1000, 1, 2, 3, 4, 5, 0b0011_0000, 1, 2, 3],
AggregationHeader { obu_count: Some(2), ..AggregationHeader::default() },
)
];
let element = <RTPAv1Depay as ObjectSubclass>::Type::new();
for (idx, (
info,
payload_size,
rtp_bytes,
aggr_header,
)) in test_data.into_iter().enumerate() {
println!("running test {}...", idx);
let buffer = Buffer::new_rtp_with_sizes(payload_size, 0, 0).unwrap();
let rtp = RTPBuffer::from_buffer_readable(&buffer).unwrap();
let mut reader = Cursor::new(rtp_bytes.as_slice());
let mut element_size = 0;
for (obu_idx, expected) in info.into_iter().enumerate() {
if element_size != 0 {
reader.seek(SeekFrom::Current(element_size as i64)).unwrap();
}
println!("testing element {} with reader position {}...", obu_idx, reader.position());
let actual = find_element_info(&element, &rtp, &mut reader, &aggr_header, obu_idx as u32);
assert_eq!(actual, Some(expected));
element_size = actual.unwrap().0;
}
}
}
}

View file

@ -0,0 +1,35 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
#![allow(clippy::new_without_default)]
use glib::Object;
use gst::glib;
use gst::prelude::*;
pub mod imp;
glib::wrapper! {
pub struct RTPAv1Depay(ObjectSubclass<imp::RTPAv1Depay>)
@extends gst_rtp::RTPBaseDepayload, gst::Element, gst::Object;
}
impl RTPAv1Depay {
pub fn new() -> Self {
Object::new(&[]).expect("Failed to create AV1 depayloader")
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rtpav1depay",
gst::Rank::Marginal,
RTPAv1Depay::static_type(),
)
}

32
net/rtpav1/src/lib.rs Normal file
View file

@ -0,0 +1,32 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
mod common;
pub mod depay;
pub mod pay;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
depay::register(plugin)?;
pay::register(plugin)?;
Ok(())
}
gst::plugin_define!(
rtpav1,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
"MPL",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);

818
net/rtpav1/src/pay/imp.rs Normal file
View file

@ -0,0 +1,818 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::{
glib,
subclass::{prelude::*, ElementMetadata},
Buffer, BufferFlags, Caps, ClockTime, DebugCategory, DebugColorFlags, Event, EventType,
FlowError, FlowSuccess, IntRange, LoggableError, PadDirection, PadPresence, PadTemplate,
ResourceError, StateChange, StateChangeError, StateChangeSuccess,
};
use gst_rtp::{prelude::*, rtp_buffer::RTPBuffer, subclass::prelude::*, RTPBasePayload};
use std::{
io::{Cursor, Read, Seek, SeekFrom, Write},
sync::{Mutex, MutexGuard},
};
use bitstream_io::{BitReader, BitWriter};
use once_cell::sync::Lazy;
use crate::common::{
err_flow, leb128_size, write_leb128, ObuType, SizedObu, CLOCK_RATE, ENDIANNESS,
};
static CAT: Lazy<DebugCategory> = Lazy::new(|| {
DebugCategory::new(
"rtpav1pay",
DebugColorFlags::empty(),
Some("RTP AV1 Payloader"),
)
});
// TODO: properly handle `max_ptime` and `min_ptime`
/// Information about the OBUs intended to be grouped into one packet
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
struct PacketOBUData {
obu_count: usize,
payload_size: u32,
last_obu_fragment_size: Option<u32>,
omit_last_size_field: bool,
ends_temporal_unit: bool,
}
/// Temporary information held between invocations of `consider_new_packet()`
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
struct TempPacketData {
payload_limit: u32,
required_ids: Option<(u8, u8)>,
/// bytes used for an OBUs size field will only be added to the total
/// once its known for sure it will be placed in the packet
pending_bytes: u32,
packet: PacketOBUData,
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct ObuData {
info: SizedObu,
bytes: Vec<u8>,
dts: Option<ClockTime>,
pts: Option<ClockTime>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct State {
/// Holds header information and raw bytes for all received OBUs,
/// as well as DTS and PTS
//obus: Vec<(SizedObu, Vec<u8>, Option<ClockTime>, Option<ClockTime>)>,
obus: Vec<ObuData>,
/// Indicates that the first element in the Buffer is an OBU fragment,
/// left over from the previous RTP packet
open_obu_fragment: bool,
/// Indicates the next constructed packet will be the first in its sequence
/// (Corresponds to `N` field in the aggregation header)
first_packet_in_seq: bool,
temp_packet_data: Option<TempPacketData>,
}
#[derive(Debug, Default)]
pub struct RTPAv1Pay {
state: Mutex<State>,
}
impl Default for State {
fn default() -> Self {
Self {
obus: Vec::new(),
open_obu_fragment: false,
first_packet_in_seq: true,
temp_packet_data: None,
}
}
}
impl RTPAv1Pay {
fn reset<'s>(
&'s self,
element: &<Self as ObjectSubclass>::Type,
state: &mut MutexGuard<'s, State>,
) {
gst::debug!(CAT, obj: element, "resetting state");
state.obus.clear();
}
/// Parses new OBUs, stores them in the state,
/// and constructs and sends new RTP packets when appropriate.
fn handle_new_obus<'s>(
&'s self,
element: &<Self as ObjectSubclass>::Type,
state: &mut MutexGuard<'s, State>,
data: &[u8],
dts: Option<ClockTime>,
pts: Option<ClockTime>,
) -> Result<FlowSuccess, FlowError> {
let mut reader = Cursor::new(data);
while reader.position() < data.len() as u64 {
let obu_start = reader.position();
let obu = SizedObu::parse(&mut BitReader::endian(&mut reader, ENDIANNESS))
.map_err(err_flow!(element, buf_read))?;
// tile lists and temporal delimiters should not be transmitted,
// see section 5 of the RTP AV1 spec
match obu.obu_type {
// completely ignore tile lists
ObuType::TileList => {
gst::log!(CAT, obj: element, "ignoring tile list OBU");
reader
.seek(SeekFrom::Current(
(obu.header_len + obu.leb_size + obu.size) as i64,
))
.map_err(err_flow!(element, buf_read))?;
}
// keep these OBUs around for now so we know where temporal units end
ObuType::TemporalDelimiter => {
if obu.size != 0 {
gst::element_error!(
element,
ResourceError::Read,
["temporal delimiter OBUs should have empty payload"]
);
return Err(FlowError::Error);
}
state.obus.push(ObuData {
info: obu,
bytes: Vec::new(),
dts,
pts,
});
}
_ => {
let bytes_total = (obu.header_len + obu.size) as usize;
let mut bytes = vec![0; bytes_total];
// read header
reader
.seek(SeekFrom::Start(obu_start))
.map_err(err_flow!(element, buf_read))?;
reader
.read_exact(&mut bytes[0..(obu.header_len as usize)])
.map_err(err_flow!(element, buf_read))?;
// skip size field
bytes[0] &= !2_u8; // set `has_size_field` to 0
reader
.seek(SeekFrom::Current(obu.leb_size as i64))
.map_err(err_flow!(element, buf_read))?;
// read OBU bytes
reader
.read_exact(&mut bytes[(obu.header_len as usize)..bytes_total])
.map_err(err_flow!(element, buf_read))?;
state.obus.push(ObuData {
info: obu,
bytes,
dts,
pts,
});
}
}
}
while let Some(packet_data) = self.consider_new_packet(element, state, false) {
self.push_new_packet(element, state, packet_data)?;
}
Ok(FlowSuccess::Ok)
}
/// Look at the size the currently stored OBUs would require,
/// as well as their temportal IDs to decide if it is time to construct a
/// new packet, and what OBUs to include in it.
///
/// If `true` is passed for `force`, packets of any size will be accepted,
/// which is used in flushing the last OBUs after receiving an EOS for example.
fn consider_new_packet<'s>(
&'s self,
element: &<Self as ObjectSubclass>::Type,
state: &mut MutexGuard<'s, State>,
force: bool,
) -> Option<PacketOBUData> {
gst::trace!(
CAT,
obj: element,
"{} new packet, currently storing {} OBUs",
if force { "forcing" } else { "considering" },
state.obus.len()
);
let mut data = state.temp_packet_data.take().unwrap_or_else(|| {
TempPacketData {
payload_limit: RTPBuffer::calc_payload_len(element.mtu(), 0, 0),
packet: PacketOBUData {
payload_size: 1, // 1 byte is used for the aggregation header
omit_last_size_field: true,
..PacketOBUData::default()
},
..TempPacketData::default()
}
});
let mut packet = data.packet;
// figure out how many OBUs we can fit into this packet
while packet.obu_count < state.obus.len() {
// for OBUs with extension headers, spatial and temporal IDs must be equal
// to all other such OBUs in the packet
let matching_obu_ids = |obu: &SizedObu, data: &mut TempPacketData| -> bool {
if let Some((sid, tid)) = data.required_ids {
sid == obu.spatial_id && tid == obu.temporal_id
} else {
data.required_ids = Some((obu.spatial_id, obu.temporal_id));
true
}
};
let current = state.obus[packet.obu_count].info;
// should this packet be finished here?
if current.obu_type == ObuType::TemporalDelimiter {
// remove the temporal delimiter, it is not supposed to be transmitted
gst::log!(CAT, obj: element, "ignoring temporal delimiter OBU");
state.obus.remove(packet.obu_count);
if packet.obu_count > 0 {
packet.ends_temporal_unit = true;
if packet.obu_count > 3 {
packet.payload_size += data.pending_bytes;
packet.omit_last_size_field = false;
}
return Some(packet);
} else {
continue;
}
} else if packet.payload_size >= data.payload_limit
|| (packet.obu_count > 0 && current.obu_type == ObuType::SequenceHeader)
|| !matching_obu_ids(&state.obus[packet.obu_count].info, &mut data)
{
if packet.obu_count > 3 {
packet.payload_size += data.pending_bytes;
packet.omit_last_size_field = false;
}
return Some(packet);
}
// would the full OBU fit?
if packet.payload_size + data.pending_bytes + current.full_size() <= data.payload_limit
{
packet.obu_count += 1;
packet.payload_size += current.partial_size() + data.pending_bytes;
data.pending_bytes = current.leb_size;
}
// would it fit without the size field?
else if packet.obu_count < 3
&& packet.payload_size + data.pending_bytes + current.partial_size()
<= data.payload_limit
{
packet.obu_count += 1;
packet.payload_size += current.partial_size() + data.pending_bytes;
return Some(packet);
}
// otherwise consider putting an OBU fragment
else {
let leb_size = if packet.obu_count < 3 {
0
} else {
// assume the biggest possible OBU fragment,
// so if anything the size field will be smaller than expected
leb128_size(data.payload_limit - packet.payload_size) as u32
};
// is there even enough space to bother?
if packet.payload_size + data.pending_bytes + leb_size + current.header_len
< data.payload_limit
{
packet.obu_count += 1;
packet.last_obu_fragment_size = Some(
data.payload_limit - packet.payload_size - data.pending_bytes - leb_size,
);
packet.payload_size = data.payload_limit;
packet.omit_last_size_field = leb_size == 0;
} else if packet.obu_count > 3 {
packet.payload_size += data.pending_bytes;
}
return Some(packet);
}
}
if force && packet.obu_count > 0 {
if packet.obu_count > 3 {
packet.payload_size += data.pending_bytes;
packet.omit_last_size_field = false;
}
Some(packet)
} else {
// if we ran out of OBUs with space in the packet to spare, wait a bit longer
data.packet = packet;
state.temp_packet_data = Some(data);
None
}
}
/// Given the information returned by consider_new_packet(), construct and push
/// new RTP packet, filled with those OBUs.
fn push_new_packet<'s>(
&'s self,
element: &<Self as ObjectSubclass>::Type,
state: &mut MutexGuard<'s, State>,
packet: PacketOBUData,
) -> Result<FlowSuccess, FlowError> {
gst::log!(
CAT,
obj: element,
"constructing new RTP packet with {} OBUs",
packet.obu_count
);
// prepare the outgoing buffer
let mut outbuf = Buffer::new_rtp_with_sizes(packet.payload_size, 0, 0).map_err(|err| {
gst::element_error!(
element,
ResourceError::Write,
["Failed to allocate output buffer: {}", err]
);
FlowError::Error
})?;
{
// this block enforces that outbuf_mut is dropped before pushing outbuf
let outbuf_mut = outbuf
.get_mut()
.expect("Failed to get mutable reference to outbuf");
outbuf_mut.set_dts(state.obus[0].dts);
outbuf_mut.set_pts(state.obus[0].pts);
let mut rtp =
RTPBuffer::from_buffer_writable(outbuf_mut).expect("Failed to create RTPBuffer");
rtp.set_marker(packet.ends_temporal_unit);
let payload = rtp
.payload_mut()
.expect("Failed to get mutable reference to RTP payload");
let mut writer = Cursor::new(payload);
{
// construct aggregation header
let w = if packet.omit_last_size_field && packet.obu_count < 4 {
packet.obu_count
} else {
0
};
let aggr_header: [u8; 1] = [
(state.open_obu_fragment as u8) << 7 | // Z
((packet.last_obu_fragment_size != None) as u8) << 6 | // Y
(w as u8) << 4 | // W
(state.first_packet_in_seq as u8) << 3 // N
; 1];
writer
.write(&aggr_header)
.map_err(err_flow!(element, aggr_header_write))?;
state.first_packet_in_seq = false;
}
// append OBUs to the buffer
for _ in 1..packet.obu_count {
let obu = &state.obus[0];
write_leb128(
&mut BitWriter::endian(&mut writer, ENDIANNESS),
obu.info.size + obu.info.header_len,
)
.map_err(err_flow!(element, leb_write))?;
writer
.write(&obu.bytes)
.map_err(err_flow!(element, obu_write))?;
state.obus.remove(0);
}
state.open_obu_fragment = false;
{
// do the last OBU separately
// in this instance `obu_size` includes the header length
let obu_size = if let Some(size) = packet.last_obu_fragment_size {
state.open_obu_fragment = true;
size
} else {
state.obus[0].bytes.len() as u32
};
if !packet.omit_last_size_field {
write_leb128(&mut BitWriter::endian(&mut writer, ENDIANNESS), obu_size)
.map_err(err_flow!(element, leb_write))?;
}
// if this OBU is not a fragment, handle it as usual
if packet.last_obu_fragment_size == None {
writer
.write(&state.obus[0].bytes)
.map_err(err_flow!(element, obu_write))?;
state.obus.remove(0);
}
// otherwise write only a slice, and update the element
// to only contain the unwritten bytes
else {
writer
.write(&state.obus[0].bytes[0..obu_size as usize])
.map_err(err_flow!(element, obu_write))?;
let new_size = state.obus[0].bytes.len() as u32 - obu_size;
state.obus[0] = ObuData {
info: SizedObu {
size: new_size,
header_len: 0,
leb_size: leb128_size(new_size) as u32,
is_fragment: true,
..state.obus[0].info
},
bytes: Vec::from(
&state.obus[0].bytes[obu_size as usize..state.obus[0].bytes.len()],
),
..state.obus[0]
};
}
}
}
gst::log!(
CAT,
obj: element,
"pushing RTP packet of size {}",
outbuf.size()
);
element.push(outbuf)
}
}
#[glib::object_subclass]
impl ObjectSubclass for RTPAv1Pay {
const NAME: &'static str = "GstRtpAv1Pay";
type Type = super::RTPAv1Pay;
type ParentType = RTPBasePayload;
}
impl ObjectImpl for RTPAv1Pay {}
impl GstObjectImpl for RTPAv1Pay {}
impl ElementImpl for RTPAv1Pay {
fn metadata() -> Option<&'static ElementMetadata> {
static ELEMENT_METADATA: Lazy<ElementMetadata> = Lazy::new(|| {
ElementMetadata::new(
"RTP AV1 payloader",
"Codec/Payloader/Network/RTP",
"Payload AV1 as RTP packets",
"Vivienne Watermeier <vwatermeier@igalia.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<PadTemplate>> = Lazy::new(|| {
let sink_pad_template = PadTemplate::new(
"sink",
PadDirection::Sink,
PadPresence::Always,
&Caps::builder("video/x-av1")
.field("parsed", true)
.field("stream-format", "obu-stream")
.field("alignment", "obu")
.build(),
)
.unwrap();
let src_pad_template = PadTemplate::new(
"src",
PadDirection::Src,
PadPresence::Always,
&Caps::builder("application/x-rtp")
.field("media", "video")
.field("payload", IntRange::new(96, 127))
.field("clock-rate", CLOCK_RATE as i32)
.field("encoding-name", "AV1")
.build(),
)
.unwrap();
vec![src_pad_template, sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
element: &Self::Type,
transition: StateChange,
) -> Result<StateChangeSuccess, StateChangeError> {
gst::debug!(CAT, obj: element, "changing state: {}", transition);
if matches!(transition, StateChange::ReadyToPaused) {
let mut state = self.state.lock().unwrap();
self.reset(element, &mut state);
}
let ret = self.parent_change_state(element, transition);
if matches!(transition, StateChange::PausedToReady) {
let mut state = self.state.lock().unwrap();
self.reset(element, &mut state);
}
ret
}
}
impl RTPBasePayloadImpl for RTPAv1Pay {
fn set_caps(&self, element: &Self::Type, _caps: &Caps) -> Result<(), LoggableError> {
element.set_options("video", true, "AV1", CLOCK_RATE);
gst::debug!(CAT, obj: element, "setting caps");
Ok(())
}
fn handle_buffer(
&self,
element: &Self::Type,
buffer: Buffer,
) -> Result<FlowSuccess, FlowError> {
gst::trace!(
CAT,
obj: element,
"received buffer of size {}",
buffer.size()
);
let mut state = self.state.lock().unwrap();
if buffer.flags().contains(BufferFlags::DISCONT) {
gst::debug!(CAT, obj: element, "buffer discontinuity");
self.reset(element, &mut state);
}
let dts = buffer.dts();
let pts = buffer.pts();
let buffer = buffer.into_mapped_buffer_readable().map_err(|_| {
gst::element_error!(
element,
ResourceError::Read,
["Failed to map buffer readable"]
);
FlowError::Error
})?;
self.handle_new_obus(element, &mut state, buffer.as_slice(), dts, pts)
}
fn sink_event(&self, element: &Self::Type, event: Event) -> bool {
gst::log!(CAT, obj: element, "sink event: {}", event.type_());
if matches!(event.type_(), EventType::Eos) {
let mut state = self.state.lock().unwrap();
// flush all remaining OBUs
while let Some(packet_data) = self.consider_new_packet(element, &mut state, true) {
if self
.push_new_packet(element, &mut state, packet_data)
.is_err()
{
break;
}
}
}
self.parent_sink_event(element, event)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::*;
#[test]
fn test_consider_new_packet() {
gst::init().unwrap();
let base_obu = SizedObu {
has_extension: false,
has_size_field: true,
leb_size: 1,
header_len: 1,
is_fragment: false,
..SizedObu::default()
};
let input_data = [
(
false, // force argument
State {
// payloader state
obus: vec![
ObuData {
info: SizedObu {
obu_type: ObuType::Padding,
size: 3,
..base_obu
},
bytes: vec![1, 2, 3],
..ObuData::default()
},
ObuData {
info: SizedObu {
obu_type: ObuType::Frame,
size: 4,
..base_obu
},
bytes: vec![1, 2, 3, 4],
..ObuData::default()
},
ObuData {
info: SizedObu {
obu_type: ObuType::Frame,
size: 5,
..base_obu
},
bytes: vec![1, 2, 3, 4, 5],
..ObuData::default()
},
ObuData {
// last two OBUs should not be counted
info: SizedObu {
obu_type: ObuType::TemporalDelimiter,
size: 0,
..base_obu
},
..ObuData::default()
},
ObuData {
info: SizedObu {
obu_type: ObuType::Frame,
size: 10,
..base_obu
},
bytes: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
..ObuData::default()
},
],
..State::default()
},
),
(
true,
State {
obus: vec![
ObuData {
info: SizedObu {
obu_type: ObuType::TemporalDelimiter,
size: 0,
..base_obu
},
..ObuData::default()
},
ObuData {
info: SizedObu {
obu_type: ObuType::Frame,
size: 7,
..base_obu
},
bytes: vec![1, 2, 3, 4, 5, 6, 7],
..ObuData::default()
},
ObuData {
info: SizedObu {
obu_type: ObuType::Padding,
size: 6,
..base_obu
},
bytes: vec![1, 2, 3, 4, 5, 6],
..ObuData::default()
},
ObuData {
info: SizedObu {
obu_type: ObuType::Frame,
size: 9,
..base_obu
},
bytes: vec![1, 2, 3, 4, 5, 6, 7, 8, 9],
..ObuData::default()
},
ObuData {
info: SizedObu {
obu_type: ObuType::Frame,
size: 3,
..base_obu
},
bytes: vec![1, 2, 3],
..ObuData::default()
},
],
..State::default()
},
),
(
false,
State {
obus: vec![ObuData {
info: SizedObu {
obu_type: ObuType::Frame,
size: 4,
..base_obu
},
bytes: vec![1, 2, 3, 4],
..ObuData::default()
}],
..State::default()
},
),
];
let results = [
(
Some(PacketOBUData {
obu_count: 3,
payload_size: 18,
last_obu_fragment_size: None,
omit_last_size_field: true,
ends_temporal_unit: true,
}),
State {
obus: vec![
input_data[0].1.obus[0].clone(),
input_data[0].1.obus[1].clone(),
input_data[0].1.obus[2].clone(),
input_data[0].1.obus[4].clone(),
],
..input_data[0].1
},
),
(
Some(PacketOBUData {
obu_count: 4,
payload_size: 34,
last_obu_fragment_size: None,
omit_last_size_field: false,
ends_temporal_unit: false,
}),
State {
obus: input_data[1].1.obus[1..].to_owned(),
..input_data[1].1
},
),
(None, input_data[2].1.clone()),
];
let element = <RTPAv1Pay as ObjectSubclass>::Type::new();
let pay = element.imp();
for idx in 0..input_data.len() {
println!("running test {}...", idx);
let mut state = pay.state.lock().unwrap();
*state = input_data[idx].1.clone();
assert_eq!(
pay.consider_new_packet(&element, &mut state, input_data[idx].0),
results[idx].0,
);
assert_eq!(state.obus, results[idx].1.obus);
assert_eq!(state.open_obu_fragment, results[idx].1.open_obu_fragment);
assert_eq!(
state.first_packet_in_seq,
results[idx].1.first_packet_in_seq
);
}
}
}

35
net/rtpav1/src/pay/mod.rs Normal file
View file

@ -0,0 +1,35 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
#![allow(clippy::new_without_default)]
use glib::Object;
use gst::glib;
use gst::prelude::*;
pub mod imp;
glib::wrapper! {
pub struct RTPAv1Pay(ObjectSubclass<imp::RTPAv1Pay>)
@extends gst_rtp::RTPBasePayload, gst::Element, gst::Object;
}
impl RTPAv1Pay {
pub fn new() -> Self {
Object::new(&[]).expect("Failed to create AV1 payloader")
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rtpav1pay",
gst::Rank::Marginal,
RTPAv1Pay::static_type(),
)
}

224
net/rtpav1/tests/rtpav1.rs Normal file
View file

@ -0,0 +1,224 @@
//
// Copyright (C) 2022 Vivienne Watermeier <vwatermeier@igalia.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::{event::Eos, prelude::*, Buffer, Caps, ClockTime};
use gst_check::Harness;
use gst_rtp::{rtp_buffer::RTPBufferExt, RTPBuffer};
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstrtpav1::plugin_register_static().expect("rtpav1 test");
});
}
#[test]
#[rustfmt::skip]
fn test_depayloader() {
let test_packets: [(Vec<u8>, bool, u32); 4] = [
( // simple packet, complete TU
vec![ // RTP payload
0b0001_1000,
0b0011_0000, 1, 2, 3, 4, 5, 6,
],
true, // marker bit
100_000, // timestamp
), ( // 2 OBUs, last is fragmented
vec![
0b0110_0000,
0b0000_0110, 0b0111_1000, 1, 2, 3, 4, 5,
0b0011_0000, 1, 2, 3,
],
false,
190_000,
), ( // continuation of the last OBU
vec![
0b1100_0000,
0b0000_0100, 4, 5, 6, 7,
],
false,
190_000,
), ( // finishing the OBU fragment
vec![
0b1001_0000,
8, 9, 10,
],
true,
190_000,
)
];
let expected: [Vec<u8>; 2] = [
vec![
0b0001_0010, 0,
0b0011_0010, 0b0000_0110, 1, 2, 3, 4, 5, 6,
],
vec![
0b0001_0010, 0,
0b0111_1010, 0b0000_0101, 1, 2, 3, 4, 5,
0b0011_0010, 0b0000_1010, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
],
];
init();
let mut h = Harness::new("rtpav1depay");
h.play();
let caps = Caps::builder("application/x-rtp")
.field("media", "video")
.field("payload", 96)
.field("clock-rate", 90000)
.field("encoding-name", "AV1")
.build();
h.set_src_caps(caps);
for (idx, (bytes, marker, timestamp)) in test_packets.iter().enumerate() {
let mut buf = Buffer::new_rtp_with_sizes(bytes.len() as u32, 0, 0).unwrap();
{
let buf_mut = buf.get_mut().unwrap();
let mut rtp_mut = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
rtp_mut.set_marker(*marker);
rtp_mut.set_timestamp(*timestamp);
rtp_mut.set_payload_type(96);
rtp_mut.set_seq(idx as u16);
rtp_mut.payload_mut().unwrap().copy_from_slice(bytes);
}
h.push(buf).unwrap();
}
h.push_event(Eos::new());
for (idx, ex) in expected.iter().enumerate() {
println!("checking buffer {}...", idx);
let buffer = h.pull().unwrap();
let actual = buffer.into_mapped_buffer_readable().unwrap();
assert_eq!(actual.as_slice(), ex.as_slice());
}
}
#[test]
#[rustfmt::skip]
fn test_payloader() {
let test_buffers: [(u64, Vec<u8>); 3] = [
(
0,
vec![ // this should result in exactly 25 bytes for the RTP payload
0b0001_0010, 0,
0b0011_0010, 0b0000_1100, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
0b0011_0010, 0b0000_1001, 1, 2, 3, 4, 5, 6, 7, 8, 9,
],
), (
0,
vec![ // these all have to go in separate packets since their IDs mismatch
0b0111_1010, 0b0000_0100, 1, 2, 3, 4,
0b0011_0110, 0b0010_1000, 0b0000_0101, 1, 2, 3, 4, 5,
0b0011_0110, 0b0100_1000, 0b0000_0001, 1,
],
), (
1_000_000_000,
vec![
0b0001_0010, 0,
0b0011_0010, 0b0000_0100, 1, 2, 3, 4,
]
)
];
let expected = [
(
false, // marker bit
0, // relative RTP timestamp
vec![ // payload bytes
0b0010_1000,
0b0000_1101, 0b0011_0000, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
0b0011_0000, 1, 2, 3, 4, 5, 6, 7, 8, 9,
],
), (
false,
0,
vec![
0b0001_0000,
0b0111_1000, 1, 2, 3, 4,
]
), (
false,
0,
vec![
0b0001_0000,
0b0011_0100, 0b0010_1000, 1, 2, 3, 4, 5,
]
), (
true,
0,
vec![
0b0001_0000,
0b0011_0100, 0b0100_1000, 1,
]
), (
false,
90_000,
vec![
0b0001_0000,
0b0011_0000, 1, 2, 3, 4,
]
)
];
init();
let mut h = Harness::new("rtpav1pay");
{
let pay = h.element().unwrap();
pay.set_property(
"mtu",
RTPBuffer::calc_packet_len(25, 0, 0)
);
}
h.play();
let caps = Caps::builder("video/x-av1")
.field("parsed", true)
.field("stream-format", "obu-stream")
.field("alignment", "obu")
.build();
h.set_src_caps(caps);
for (pts, bytes) in &test_buffers {
let mut buffer = Buffer::with_size(bytes.len())
.unwrap()
.into_mapped_buffer_writable()
.unwrap();
buffer.copy_from_slice(bytes);
let mut buffer = buffer.into_buffer();
buffer.get_mut().unwrap().set_pts(ClockTime::try_from(*pts).unwrap());
h.push(buffer).unwrap();
}
h.push_event(Eos::new());
let mut base_ts = None;
for (idx, (marker, ts_offset, payload)) in expected.iter().enumerate() {
println!("checking packet {}...", idx);
let buffer = h.pull().unwrap();
let packet = RTPBuffer::from_buffer_readable(&buffer).unwrap();
if base_ts.is_none() {
base_ts = Some(packet.timestamp());
}
assert_eq!(packet.payload().unwrap(), payload.as_slice());
assert_eq!(packet.is_marker(), *marker);
assert_eq!(packet.timestamp(), base_ts.unwrap() + ts_offset);
}
}