rtp: add mp4gpay

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1551>
This commit is contained in:
François Laignel 2024-04-24 18:33:12 +02:00 committed by GStreamer Marge Bot
parent b588ee59bc
commit 16b0a4d762
10 changed files with 1894 additions and 3 deletions

View file

@ -7003,6 +7003,60 @@
},
"rank": "marginal"
},
"rtpmp4gpay2": {
"author": "François Laignel <francois centricular com>",
"description": "Payload an MPEG-4 Generic elementary stream into RTP packets (RFC 3640)",
"hierarchy": [
"GstRtpMpeg4GenericPay",
"GstRtpBasePay2",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Codec/Payloader/Network/RTP",
"pad-templates": {
"sink": {
"caps": "video/mpeg:\n mpegversion: 4\n systemstream: false\naudio/mpeg:\n mpegversion: 4\n stream-format: raw\n",
"direction": "sink",
"presence": "always"
},
"src": {
"caps": "application/x-rtp:\n media: { (string)audio, (string)video }\n clock-rate: [ 1, 2147483647 ]\n encoding-name: MPEG4-GENERIC\n streamtype: { (string)4, (string)5 }\n mode: { (string)generic, (string)AAC-lbr, (string)AAC-hbr, (string)aac-hbr }\n",
"direction": "src",
"presence": "always"
}
},
"properties": {
"aggregate-mode": {
"blurb": "Whether to send out AUs immediately or aggregate them until a packet is full.",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "auto (-1)",
"mutable": "null",
"readable": true,
"type": "GstRtpMpeg4GenericPayAggregateMode",
"writable": true
},
"max-ptime": {
"blurb": "Maximum duration of the packet data in ns (-1 = unlimited up to MTU)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "18446744073709551615",
"max": "9223372036854775807",
"min": "-1",
"mutable": "playing",
"readable": true,
"type": "gint64",
"writable": true
}
},
"rank": "marginal"
},
"rtppcmadepay2": {
"author": "Sebastian Dröge <sebastian@centricular.com>",
"description": "Depayload A-law from RTP packets (RFC 3551)",
@ -7772,6 +7826,26 @@
}
}
},
"GstRtpMpeg4GenericPayAggregateMode": {
"kind": "enum",
"values": [
{
"desc": "Automatic: zero-latency if upstream is live, otherwise aggregate elementary streams until packet is full.",
"name": "auto",
"value": "-1"
},
{
"desc": "Zero Latency: always send out elementary streams right away, do not wait for more elementary streams to fill a packet.",
"name": "zero-latency",
"value": "0"
},
{
"desc": "Aggregate: collect elementary streams until we have a full packet or the max-ptime limit is hit (if set).",
"name": "aggregate",
"value": "1"
}
]
},
"GstRtpPcmauDepay2": {
"hierarchy": [
"GstRtpPcmauDepay2",
@ -11389,4 +11463,4 @@
"tracers": {},
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
}
}
}

View file

@ -62,6 +62,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
mp4a::pay::register(plugin)?;
mp4g::depay::register(plugin)?;
mp4g::pay::register(plugin)?;
pcmau::depay::register(plugin)?;
pcmau::pay::register(plugin)?;

View file

@ -39,6 +39,12 @@ pub enum MPEG4AudioParserError {
#[error("Wrong frame size. Required {required}, available {available}")]
WrongFrameSize { required: usize, available: usize },
#[error("Unsupported Profile {profile}")]
UnsupportedProfile { profile: String },
#[error("Unsupported Level {level} for Profile {profile}")]
UnsupportedLevel { level: String, profile: String },
}
impl MPEG4AudioParserError {
@ -168,6 +174,73 @@ impl FromBitStream for AudioSpecificConfig {
}
}
/// audioProfileLevelIndication - ISO/IEC 14496-3 (2009) table 1.14
pub struct ProfileLevel {
pub profile: String,
pub level: String,
pub id: u8,
}
impl ProfileLevel {
pub fn from_caps(s: &gst::StructureRef) -> anyhow::Result<ProfileLevel> {
// Note: could use an AudioSpecificConfig based approach
// similar to what is done in gst_codec_utils_aac_get_level
// from gst-plugins-base/gst-libs/gst/pbutils/codec-utils.c
use MPEG4AudioParserError::*;
let profile = s.get::<String>("profile").context("profile")?;
let level = s.get::<String>("level").context("level")?;
let id = match profile.to_lowercase().as_str() {
"lc" => {
// Assumed to be AAC Profile in table 1.14
match level.as_str() {
"1" => 0x28,
"2" => 0x29,
"4" => 0x2a,
"5" => 0x2b,
_ => Err(UnsupportedLevel {
level: level.clone(),
profile: profile.clone(),
})?,
}
}
"he-aac" | "he-aac-v1" => {
// High Efficiency AAC Profile in table 1.14
match level.as_str() {
"2" => 0x2c,
"3" => 0x2d,
"4" => 0x2e,
"5" => 0x2f,
_ => Err(UnsupportedLevel {
level: level.clone(),
profile: profile.clone(),
})?,
}
}
"he-aac-v2" => {
// High Efficiency AAC v2 Profile in table 1.14
match level.as_str() {
"2" => 0x30,
"3" => 0x31,
"4" => 0x32,
"5" => 0x33,
_ => Err(UnsupportedLevel {
level: level.clone(),
profile: profile.clone(),
})?,
}
}
_ => Err(UnsupportedProfile {
profile: profile.clone(),
})?,
};
Ok(ProfileLevel { profile, level, id })
}
}
#[derive(Debug)]
pub struct Subframes<'a> {
frame: gst::MappedBuffer<gst::buffer::Readable>,

View file

@ -1,17 +1,38 @@
//! Access Unit Header and its parser & writer.
use bitstream_io::{BitRead, FromBitStreamWith};
use bitstream_io::{BitRead, BitWrite, FromBitStreamWith, ToBitStreamWith};
use crate::mp4g::{AccessUnitIndex, ModeConfig};
use crate::utils::raw_2_comp_to_i32;
use crate::utils::{mask_valid_2_comp, raw_2_comp_to_i32};
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum AuHeaderError {
#[error("Unexpected zero-sized AU {}", .0)]
ZeroSizedAu(AccessUnitIndex),
#[error("Undefined mandatory size for AU {}", .0)]
UndefinedMandatorySize(AccessUnitIndex),
#[error("Inconsistent delta index {index}. Previous index: {prev_index}")]
InconsistentDeltaIndex {
index: AccessUnitIndex,
prev_index: AccessUnitIndex,
},
#[error("Unexpected CTS flag set for the first AU header {}", .0)]
CtsFlagSetInFirstAuHeader(AccessUnitIndex),
#[error("Out of range CTS-delta {cts_delta} for AU {index}")]
OutOfRangeSizeCtsDelta {
cts_delta: i32,
index: AccessUnitIndex,
},
#[error("Out of range DTS-delta {dts_delta} for AU {index}")]
OutOfRangeSizeDtsDelta {
dts_delta: i32,
index: AccessUnitIndex,
},
}
#[derive(Debug)]
@ -118,3 +139,97 @@ impl<'a> FromBitStreamWith<'a> for AuHeader {
Ok(this)
}
}
impl<'a> ToBitStreamWith<'a> for AuHeader {
type Context = AuHeaderContext<'a>;
type Error = anyhow::Error;
fn to_writer<W: BitWrite + ?Sized>(
&self,
w: &mut W,
ctx: &AuHeaderContext,
) -> Result<(), Self::Error> {
use anyhow::Context;
use AuHeaderError::*;
if ctx.config.size_len > 0 {
let Some(size) = self.size else {
return Err(UndefinedMandatorySize(self.index).into());
};
if size == 0 {
Err(ZeroSizedAu(self.index))?;
}
w.write(ctx.config.size_len as u32, size)
.context("AU-size")?;
}
match ctx.prev_index {
None => w
.write(ctx.config.index_len as u32, *self.index)
.context("AU-Index")?,
Some(prev_index) => {
let index_delta = self
.index
.checked_sub(*prev_index)
.and_then(|delta| delta.checked_sub(1))
.ok_or(InconsistentDeltaIndex {
index: self.index,
prev_index,
})
.context("AU-Index-delta")?;
w.write(ctx.config.index_delta_len as u32, index_delta)
.context("AU-Index-delta")?;
}
}
if ctx.config.cts_delta_len > 0 {
// § 3.2.1.1:
// > the CTS-flag field MUST have the value 0 in the first AU-header
// > the CTS-flag field SHOULD be 0 for any non-first fragment of an Access Unit
if ctx.prev_index.is_none() {
w.write_bit(false).context("CTS-flag")?;
} else if let Some(cts_delta) = self.cts_delta {
let Some(cts_delta) = mask_valid_2_comp(cts_delta, ctx.config.cts_delta_len) else {
return Err(OutOfRangeSizeCtsDelta {
cts_delta,
index: self.index,
}
.into());
};
w.write_bit(true).context("CTS-flag")?;
w.write(ctx.config.cts_delta_len as u32, cts_delta)
.context("CTS-delta")?;
} else {
w.write_bit(false).context("CTS-flag")?;
}
}
if ctx.config.dts_delta_len > 0 {
if let Some(dts_delta) = self.dts_delta {
let Some(dts_delta) = mask_valid_2_comp(dts_delta, ctx.config.dts_delta_len) else {
return Err(OutOfRangeSizeDtsDelta {
dts_delta,
index: self.index,
}
.into());
};
w.write_bit(true).context("DTS-flag")?;
w.write(ctx.config.dts_delta_len as u32, dts_delta)
.context("DTS-delta")?;
} else {
w.write_bit(false).context("DTS-flag")?;
}
}
if ctx.config.random_access_indication {
w.write_bit(self.maybe_random_access.unwrap_or(false))
.context("RAP-flag")?;
}
Ok(())
}
}

View file

@ -5,6 +5,10 @@ mod header;
pub use header::{AuHeader, AuHeaderContext};
mod mode;
pub use mode::ModeConfig;
pub mod pay;
#[cfg(test)]
mod tests;
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum Mpeg4GenericError {

View file

@ -1,5 +1,7 @@
//! MPEG-4 Generic mode.
use gst::caps::NoFeature;
use std::str::FromStr;
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
@ -64,6 +66,16 @@ impl ModeConfig {
Some(self.max_displacement)
}
/// Returns the max length in bits of the AU headers
pub fn max_header_bit_len(&self) -> usize {
self.size_len as usize
+ std::cmp::max(self.index_len, self.index_delta_len) as usize
+ self.cts_delta_len as usize
+ self.dts_delta_len as usize
+ if self.random_access_indication { 1 } else { 0 }
+ self.stream_state_indication as usize
}
pub fn from_caps(s: &gst::StructureRef) -> anyhow::Result<Self> {
use ModeError::*;
@ -138,4 +150,50 @@ impl ModeConfig {
},
}
}
pub fn add_to_caps(
&self,
builder: gst::caps::Builder<NoFeature>,
) -> Result<gst::caps::Builder<NoFeature>, ModeError> {
use ModeError::*;
if self.size_len != 0 && self.constant_size != 0 {
Err(BothAuSizeLenAndConstantSize)?;
}
if self.size_len == 0 && self.constant_size == 0 {
Err(NeitherAuSizeLenNorConstantSize)?;
}
if self.index_len > 0 && self.index_delta_len == 0 {
Err(MandatoryIndexDeltaLength)?;
}
if self.stream_state_indication > 0 {
panic!("AU Header Stream State not supported");
}
Ok(builder
.field("sizelength", self.size_len as i32)
.field("indexlength", self.index_len as i32)
.field("indexdeltalength", self.index_delta_len as i32)
.field("ctsdeltalength", self.cts_delta_len as i32)
.field("dtsdeltalength", self.dts_delta_len as i32)
.field(
"randomaccessindication",
if self.random_access_indication {
1u8
} else {
0u8
},
)
.field("streamstateindication", self.stream_state_indication as i32)
.field(
"auxiliarydatasizelength",
self.auxiliary_data_size_len as i32,
)
.field("constantsize", self.constant_size as i32)
.field("constantduration", self.constant_duration as i32)
.field("maxdisplacement", self.max_displacement as i32))
}
}

921
net/rtp/src/mp4g/pay/imp.rs Normal file
View file

@ -0,0 +1,921 @@
// GStreamer RTP MPEG-4 Generic Payloader
//
// Copyright (C) 2023-2024 François Laignel <francois centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/**
* SECTION:element-rtpmp4gpay2
* @see_also: rtpmp4gpay2, rtpmp4gpay, rtpmp4gpay, fdkaacenc, fdkaacdec, avenc_mpeg4, avdec_mpeg4
*
* Payload an MPEG-4 Generic elementary stream into RTP packets as per [RFC 3640][rfc-3640].
* Also see the [IANA media-type page for MPEG-4 Generic][iana-mpeg4-generic].
*
* [rfc-3640]: https://www.rfc-editor.org/rfc/rfc3640.html#section-4
* [iana-mpeg4-generic]: https://www.iana.org/assignments/media-types/application/mpeg4-generic
*
* ## Aggregation Modes
*
* The default aggregation mode is `auto`: If upstream is live, the payloader will send out
* AUs immediately, even if they don't completely fill a packet, in order to minimise
* latency. If upstream is not live, the payloader will by default aggregate AUs until
* it has completely filled an RTP packet as per the configured MTU size or the `max-ptime`
* property if it is set (it is not set by default).
*
* The aggregation mode can be controlled via the `aggregate-mode` property.
*
* ## Example pipeline
* |[
* gst-launch-1.0 audiotestsrc ! fdkaacenc ! rtpmp4gpay2 ! udpsink host=127.0.0.1 port=5004
* ]| This will encode an audio test signal to AAC and then payload the encoded audio
* into RTP packets and send them out via UDP to localhost (IPv4) port 5004.
* You can use the #rtpmp4gdepay2 or #rtpmp4gdepay elements to depayload such a stream, and
* the #fdkaacdec element to decode the depayloaded stream.
*
* Since: plugins-rs-0.13.0
*/
use atomic_refcell::AtomicRefCell;
use bitstream_io::{BigEndian, BitCounter, BitRead, BitReader, BitWrite, BitWriter};
use once_cell::sync::Lazy;
use gst::{glib, prelude::*, subclass::prelude::*};
use smallvec::SmallVec;
use std::collections::VecDeque;
use std::sync::Mutex;
use crate::basepay::{PacketToBufferRelation, RtpBasePay2Ext, RtpBasePay2Impl, RtpBasePay2ImplExt};
use super::RtpMpeg4GenericPayAggregateMode;
use crate::mp4a::parsers::{AudioSpecificConfig, ProfileLevel};
use crate::mp4g::{AccessUnitIndex, AuHeader, AuHeaderContext, ModeConfig};
const VOS_STARTCODE: u32 = 0x000001B0;
/// The size of the field representing the AU headers section len.
const HEADERS_LEN_SIZE: usize = 2;
/// Access Unit maximum header len in bytes.
/// This depends on the supported mode. In current implementation, 3 is the maximum.
const HEADER_MAX_LEN: usize = 3;
#[derive(Clone)]
struct Settings {
max_ptime: Option<gst::ClockTime>,
aggregate_mode: RtpMpeg4GenericPayAggregateMode,
}
impl Default for Settings {
fn default() -> Self {
Settings {
aggregate_mode: RtpMpeg4GenericPayAggregateMode::Auto,
max_ptime: None,
}
}
}
#[derive(Default)]
pub struct RtpMpeg4GenericPay {
state: AtomicRefCell<State>,
settings: Mutex<Settings>,
is_live: Mutex<Option<bool>>,
}
#[derive(Debug)]
struct AccessUnit {
id: u64,
pts: Option<gst::ClockTime>,
dts_delta: Option<i32>,
duration: Option<gst::ClockTime>,
maybe_random_access: Option<bool>,
buffer: gst::MappedBuffer<gst::buffer::Readable>,
}
#[derive(Default)]
struct State {
/// Configuration of current Mode.
mode: ModeConfig,
/// Maximum bit length needed to store an AU Header.
max_header_bit_len: usize,
/// Minimum MTU necessary to handle the outgoing packets.
min_mtu: usize,
/// Pending AU (we collect until ptime/max-ptime is hit or the packet is full)
pending_aus: VecDeque<AccessUnit>,
pending_size: usize,
pending_duration: Option<gst::ClockTime>,
clock_rate: u32,
/// Desired "packet time", i.e. packet duration, from the downstream caps, if set
ptime: Option<gst::ClockTime>,
max_ptime: Option<gst::ClockTime>,
}
impl State {
fn flush(&mut self) {
self.pending_aus.clear();
self.pending_size = 0;
self.pending_duration = None;
}
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rtpmp4gpay2",
gst::DebugColorFlags::empty(),
Some("RTP MPEG-4 Generic Payloader"),
)
});
#[glib::object_subclass]
impl ObjectSubclass for RtpMpeg4GenericPay {
const NAME: &'static str = "GstRtpMpeg4GenericPay";
type Type = super::RtpMpeg4GenericPay;
type ParentType = crate::basepay::RtpBasePay2;
}
impl ObjectImpl for RtpMpeg4GenericPay {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecEnum::builder_with_default(
"aggregate-mode",
Settings::default().aggregate_mode,
)
.nick("Aggregate Mode")
.blurb(
"Whether to send out AUs immediately or aggregate them until a packet is full.",
)
.build(),
// Using same type/semantics as C payloaders
glib::ParamSpecInt64::builder("max-ptime")
.nick("Maximum Packet Time")
.blurb("Maximum duration of the packet data in ns (-1 = unlimited up to MTU)")
.default_value(
Settings::default()
.max_ptime
.map(gst::ClockTime::nseconds)
.map(|x| x as i64)
.unwrap_or(-1),
)
.minimum(-1)
.maximum(i64::MAX)
.mutable_playing()
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"aggregate-mode" => {
settings.aggregate_mode = value
.get::<RtpMpeg4GenericPayAggregateMode>()
.expect("type checked upstream");
}
"max-ptime" => {
let new_max_ptime = match value.get::<i64>().unwrap() {
-1 => None,
v @ 0.. => Some(gst::ClockTime::from_nseconds(v as u64)),
_ => unreachable!(),
};
let changed = settings.max_ptime != new_max_ptime;
settings.max_ptime = new_max_ptime;
drop(settings);
if changed {
let _ = self
.obj()
.post_message(gst::message::Latency::builder().src(&*self.obj()).build());
}
}
_ => unimplemented!(),
};
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"aggregate-mode" => settings.aggregate_mode.to_value(),
"max-ptime" => (settings
.max_ptime
.map(gst::ClockTime::nseconds)
.map(|x| x as i64)
.unwrap_or(-1))
.to_value(),
_ => unimplemented!(),
}
}
}
impl GstObjectImpl for RtpMpeg4GenericPay {}
impl ElementImpl for RtpMpeg4GenericPay {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"RTP MPEG-4 Generic Payloader",
"Codec/Payloader/Network/RTP",
"Payload an MPEG-4 Generic elementary stream into RTP packets (RFC 3640)",
"François Laignel <francois centricular com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&gst::Caps::builder_full()
.structure(
gst::Structure::builder("video/mpeg")
.field("mpegversion", 4i32)
.field("systemstream", false)
.build(),
)
.structure(
gst::Structure::builder("audio/mpeg")
.field("mpegversion", 4i32)
.field("stream-format", "raw")
.build(),
)
.build(),
)
.unwrap();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&gst::Caps::builder("application/x-rtp")
// TODO "application" is also present in rtpmp4gpay caps template
// but it doesn't handle it in gst_rtp_mp4g_pay_setcaps
.field("media", gst::List::new(["audio", "video"]))
.field("clock-rate", gst::IntRange::new(1i32, i32::MAX))
.field("encoding-name", "MPEG4-GENERIC")
// Required string params:
.field("streamtype", gst::List::new(["4", "5"])) // 4 = video, 5 = audio
// "profile-level-id = [1,MAX], "
// "config = (string)"
.field(
"mode",
gst::List::new(["generic", "AAC-lbr", "AAC-hbr", "aac-hbr"]),
)
// Optional general parameters:
// "objecttype = [1,MAX], "
// "constantsize = [1,MAX], " // constant size of each AU
// "constantduration = [1,MAX], " // constant duration of each AU
// "maxdisplacement = [1,MAX], "
// "de-interleavebuffersize = [1,MAX], "
// Optional configuration parameters:
// "sizelength = [1, 32], "
// "indexlength = [1, 32], "
// "indexdeltalength = [1, 32], "
// "ctsdeltalength = [1, 32], "
// "dtsdeltalength = [1, 32], "
// "randomaccessindication = {0, 1}, "
// "streamstateindication = [0, 32], "
// "auxiliarydatasizelength = [0, 32]" )
.build(),
)
.unwrap();
vec![sink_pad_template, src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}
/// Returns the difference between `ClockTime`s `ct1` & `ct2` in RTP scale.
///
/// Returns `None` if at least one of the `ClockTime`s is `None`.
/// Returns `Some(None)` if an overflow occurred, error management is left to the caller.
/// Returns `Some(delta)` if the difference could be computed.
fn ct_delta_to_rtp(
ct1: Option<gst::ClockTime>,
ct0: Option<gst::ClockTime>,
clock_rate: u32,
) -> Option<Option<i32>> {
ct1.into_positive().opt_sub(ct0).map(|delta_ct| {
delta_ct
.into_inner_signed()
.try_into()
.ok()
.and_then(|delta_inner: i64| {
delta_inner
.mul_div_ceil(clock_rate as i64, *gst::ClockTime::SECOND as i64)
.and_then(|dts_delta| dts_delta.try_into().ok())
})
})
}
impl RtpBasePay2Impl for RtpMpeg4GenericPay {
const ALLOWED_META_TAGS: &'static [&'static str] = &["audio"];
fn set_sink_caps(&self, caps: &gst::Caps) -> bool {
let s = caps.structure(0).unwrap();
let codec_data = match s.get::<&gst::BufferRef>("codec_data") {
Ok(codec_data) => codec_data,
Err(err) => {
gst::error!(CAT, imp: self, "Error getting codec_data from Caps: {err}");
return false;
}
};
let Ok(codec_data) = codec_data.map_readable() else {
gst::error!(CAT, imp: self, "Failed to map codec_data as readable");
return false;
};
let codec_data_str = hex::encode(&codec_data);
let caps_builder = gst::Caps::builder("application/x-rtp")
.field("seqnum-base", self.obj().property::<u32>("seqnum") + 1)
.field("mpegversion", 4i32)
.field("encoding-name", "MPEG4-GENERIC")
.field("config", codec_data_str);
let (clock_rate, mode, caps_builder) = match s.name().as_str() {
"audio/mpeg" => {
let mut r = BitReader::endian(codec_data.as_slice(), BigEndian);
let config = match r.parse::<AudioSpecificConfig>() {
Ok(config) => config,
Err(err) => {
gst::error!(CAT, imp: self, "Error parsing audio codec_data: {err:#}");
return false;
}
};
if config.audio_object_type == 0 || config.audio_object_type > 6 {
gst::error!(CAT, imp: self, "Unsupported Audio Object Type {}", config.audio_object_type);
return false;
}
let profile_level = match ProfileLevel::from_caps(s) {
Ok(profile_level) => profile_level,
Err(err) => {
gst::error!(CAT, imp: self, "Error getting profile level from Caps: {err:#}");
return false;
}
};
gst::log!(CAT, imp: self, "Using audio codec_data {config:?}");
// AAC-hbr: also used by rtpmp4gpay
// RFC 3640 also defines AAC-lbr, with a maximum encoded buffer
// size of 63 bytes and which can't be fragmented. Only AAC-hbr
// is used because it is more flexible. We could implement AAC-lbr
// provided make sure the encoded buffers can't exceed the limit
// and add a flag to prevent fragmentation in `send_packets()`.
// See https://www.rfc-editor.org/rfc/rfc3640.html#section-3.3.5
let mode = ModeConfig {
size_len: 13,
index_len: 3,
index_delta_len: 3,
constant_duration: config.frame_len as u32,
..Default::default()
};
let caps_builder = mode
.add_to_caps(
caps_builder
.field("media", "audio")
.field("streamtype", "5")
.field("mode", "AAC-hbr")
.field("clock-rate", config.sampling_freq as i32)
.field("profile", &profile_level.profile)
.field("level", &profile_level.level)
.field("profile-level-id", profile_level.id)
.field("encoding-params", config.channel_conf as i32),
)
.expect("invalid audio mode");
(config.sampling_freq, mode, caps_builder)
}
"video/mpeg" => {
if codec_data.len() < 5 {
gst::error!(CAT, imp: self, "Error parsing video codec_data: too short");
return false;
}
let code = u32::from_be_bytes(codec_data[..4].try_into().unwrap());
let profile = if code == VOS_STARTCODE {
let profile = codec_data[4];
gst::log!(CAT, imp: self, "Using video codec_data profile {profile}");
profile
} else {
gst::warning!(CAT, imp: self, "Unexpected VOS startcode in video codec_data. Assuming profile '1'");
1
};
// Use a larger size_len than rtpmp4gpay
// otherwise some large AU can't be payloaded.
// rtpmp4gpay uses bit shifts to have the AU data size
// fit in 13 bits, resulting in an invalid size.
let mode = ModeConfig {
size_len: 16,
index_len: 3,
index_delta_len: 3,
cts_delta_len: 16,
dts_delta_len: 16,
random_access_indication: true,
..Default::default()
};
let caps_builder = mode
.add_to_caps(
caps_builder
.field("media", "video")
.field("streamtype", "4")
.field("mode", "generic")
.field("clock-rate", 90000i32)
.field("profile-level-id", profile as i32),
)
.expect("invalid video mode");
(90000, mode, caps_builder)
}
// TODO handle "application"
_ => unreachable!(),
};
self.obj().set_src_caps(&caps_builder.build());
let mut state = self.state.borrow_mut();
state.max_header_bit_len = mode.max_header_bit_len();
state.min_mtu = rtp_types::RtpPacket::MIN_RTP_PACKET_LEN
+ HEADERS_LEN_SIZE
+ (state.max_header_bit_len + 7) / 8
+ 1;
state.mode = mode;
state.clock_rate = clock_rate;
true
}
fn negotiate(&self, mut src_caps: gst::Caps) {
// Fixate as a first step
src_caps.fixate();
let s = src_caps.structure(0).unwrap();
// Negotiate ptime/maxptime with downstream and use them in combination with the
// properties. See https://www.iana.org/assignments/media-types/application/mpeg4-generic
let ptime = s
.get::<u32>("ptime")
.ok()
.map(u64::from)
.map(gst::ClockTime::from_mseconds);
let max_ptime = s
.get::<u32>("maxptime")
.ok()
.map(u64::from)
.map(gst::ClockTime::from_mseconds);
self.parent_negotiate(src_caps);
let mut state = self.state.borrow_mut();
state.ptime = ptime;
state.max_ptime = max_ptime;
drop(state);
}
// Encapsulation of MPEG-4 Generic Elementary Streams:
// https://www.rfc-editor.org/rfc/rfc3640
fn handle_buffer(
&self,
buffer: &gst::Buffer,
id: u64,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.borrow_mut();
let mut settings = self.settings.lock().unwrap();
gst::trace!(CAT, imp: self, "Handling buffer {id} duration {} pts {} dts {}, len {}",
buffer.duration().display(), buffer.pts().display(), buffer.dts().display(), buffer.size(),
);
let maybe_random_access = if state.mode.random_access_indication {
Some(!buffer.flags().contains(gst::BufferFlags::DELTA_UNIT))
} else {
None
};
let dts_delta = ct_delta_to_rtp(buffer.dts(), buffer.pts(), state.clock_rate).and_then(|dts_delta_res| {
if dts_delta_res.is_none() {
gst::warning!(CAT, imp: self, "Overflow computing DTS-delta between pts {} & dts {}",
buffer.dts().display(), buffer.pts().display(),
);
}
dts_delta_res
});
gst::trace!(CAT, imp: self,
"Pushing AU from buffer {id} dts_delta {dts_delta:?} random access {maybe_random_access:?}",
);
state.pending_aus.push_back(AccessUnit {
id,
duration: buffer.duration(),
pts: buffer.pts(),
dts_delta,
buffer: buffer.clone().into_mapped_buffer_readable().map_err(|_| {
gst::error!(CAT, imp: self, "Can't map incoming buffer readable");
gst::FlowError::Error
})?,
maybe_random_access,
});
state.pending_size += buffer.size();
state.pending_duration.opt_add_assign(buffer.duration());
// Make sure we have queried upstream liveness if needed
if settings.aggregate_mode == RtpMpeg4GenericPayAggregateMode::Auto {
self.ensure_upstream_liveness(&mut settings);
}
self.send_packets(&settings, &mut state, SendPacketMode::WhenReady)
}
fn drain(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
let settings = self.settings.lock().unwrap().clone();
let mut state = self.state.borrow_mut();
self.send_packets(&settings, &mut state, SendPacketMode::ForcePending)
}
fn flush(&self) {
self.state.borrow_mut().flush();
}
#[allow(clippy::single_match)]
fn src_query(&self, query: &mut gst::QueryRef) -> bool {
let res = self.parent_src_query(query);
if !res {
return false;
}
match query.view_mut() {
gst::QueryViewMut::Latency(query) => {
let settings = self.settings.lock().unwrap();
let (is_live, mut min, mut max) = query.result();
{
let mut live_guard = self.is_live.lock().unwrap();
if Some(is_live) != *live_guard {
gst::info!(CAT, imp: self, "Upstream is live: {is_live}");
*live_guard = Some(is_live);
}
}
if self.effective_aggregate_mode(&settings)
== RtpMpeg4GenericPayAggregateMode::Aggregate
{
if let Some(max_ptime) = settings.max_ptime {
min += max_ptime;
max.opt_add_assign(max_ptime);
} else if is_live {
gst::warning!(CAT, imp: self,
"Aggregating packets in live mode, but no max_ptime configured. \
Configured latency may be too low!"
);
}
query.set(is_live, min, max);
}
}
_ => (),
}
true
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
*self.state.borrow_mut() = State::default();
*self.is_live.lock().unwrap() = None;
self.parent_start()
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
*self.state.borrow_mut() = State::default();
*self.is_live.lock().unwrap() = None;
self.parent_stop()
}
}
#[derive(Debug, PartialEq)]
enum SendPacketMode {
WhenReady,
ForcePending,
}
impl RtpMpeg4GenericPay {
fn send_packets(
&self,
settings: &Settings,
state: &mut State,
send_mode: SendPacketMode,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let agg_mode = self.effective_aggregate_mode(settings);
if (self.obj().mtu() as usize) < state.min_mtu {
gst::error!(CAT, imp: self, "Insufficient mtu {} at least {} bytes needed", self.obj().mtu(), state.min_mtu);
return Err(gst::FlowError::Error);
}
let max_payload_size = self.obj().max_payload_size() as usize - HEADERS_LEN_SIZE;
let mut ctx = AuHeaderContext {
config: &state.mode,
prev_index: None,
};
let mut headers_buf = SmallVec::<[u8; 10 * HEADER_MAX_LEN]>::new();
let mut au_data_list = SmallVec::<[gst::MappedBuffer<gst::buffer::Readable>; 10]>::new();
// https://www.rfc-editor.org/rfc/rfc3640.html#section-3.1
// The M bit is set to 1 to indicate that the RTP packet payload
// contains either the final fragment of a fragmented Access Unit
// or one or more complete Access Units.
// Send out packets if there's enough data for one (or more), or if forced.
while let Some(front) = state.pending_aus.front() {
headers_buf.clear();
ctx.prev_index = None;
if front.buffer.len() + (state.max_header_bit_len + 7) / 8 > max_payload_size {
// AU needs to be fragmented
let au = state.pending_aus.pop_front().unwrap();
let mut data = au.buffer.as_slice();
state.pending_size = state.pending_size.saturating_sub(data.len());
let mut next_frag_offset = 0;
let mut is_final = false;
while !is_final {
let header = AuHeader {
// The size of the complete AU for all the fragments
size: Some(au.buffer.len() as u32),
// One AU fragment per packet
index: AccessUnitIndex::ZERO,
// CTS-delta SHOULD not be set for a fragment, see § 3.2.1.1
dts_delta: au.dts_delta,
maybe_random_access: au.maybe_random_access,
..Default::default()
};
headers_buf.clear();
let mut w = BitWriter::endian(&mut headers_buf, BigEndian);
let mut res = w.build_with(&header, &ctx);
if res.is_ok() {
// add final padding
res = w.write(7, 0).map_err(Into::into);
}
if let Err(err) = res {
gst::error!(CAT, imp: self, "Failed to write header for AU {} in buffer {}: {err:#}", header.index, au.id);
return Err(gst::FlowError::Error);
}
// Unfortunately BitWriter doesn't return the size written.
let mut c = BitCounter::<u32, BigEndian>::new();
c.build_with(&header, &ctx).unwrap();
let header_bit_len = c.written() as u16;
let left = au.buffer.len() - next_frag_offset;
let bytes_in_this_packet =
std::cmp::min(left, max_payload_size - (header_bit_len as usize + 7) / 8);
next_frag_offset += bytes_in_this_packet;
is_final = next_frag_offset >= au.buffer.len();
self.obj().queue_packet(
au.id.into(),
rtp_types::RtpPacketBuilder::new()
// AU-headers-length: only one 1 AU header here
.payload(header_bit_len.to_be_bytes().as_slice())
.payload(headers_buf.as_slice())
.payload(&data[0..bytes_in_this_packet])
.marker_bit(is_final),
)?;
data = &data[bytes_in_this_packet..];
}
continue;
}
// Will not fragment this AU
// We optimistically add average size/duration to send out packets as early as possible
// if we estimate that the next AU would likely overflow our accumulation limits.
let n_aus = state.pending_aus.len();
let avg_size = state.pending_size / n_aus;
let avg_duration = state.pending_duration.opt_div(n_aus as u64);
let max_ptime = settings
.max_ptime
.opt_min(state.max_ptime)
.opt_min(state.ptime);
let is_ready = send_mode == SendPacketMode::ForcePending
|| agg_mode != RtpMpeg4GenericPayAggregateMode::Aggregate
|| state.pending_size + avg_size + n_aus * (state.max_header_bit_len + 7) / 8
> max_payload_size
|| state
.pending_duration
.opt_add(avg_duration)
.opt_gt(max_ptime)
.unwrap_or(false);
gst::log!(CAT, imp: self,
"Pending: size {}, duration ~{:.3}, mode: {agg_mode:?} + {send_mode:?} => {}",
state.pending_size,
state.pending_duration.display(),
if is_ready { "ready" } else { "not ready, waiting for more data" },
);
if !is_ready {
break;
}
gst::trace!(CAT, imp: self, "Creating packet..");
let id = front.id;
let mut end_id = front.id;
let mut acc_duration = gst::ClockTime::ZERO;
let mut acc_size = 0;
let mut headers_len = 0;
let mut w = BitWriter::endian(&mut headers_buf, BigEndian);
let mut index = AccessUnitIndex::ZERO;
let mut previous_pts = None;
au_data_list.clear();
while let Some(front) = state.pending_aus.front() {
gst::trace!(CAT, imp: self, "{front:?}, accumulated size {acc_size} duration ~{acc_duration:.3}");
// If this AU would overflow the packet, bail out and send out what we have.
//
// Don't take into account the max_ptime for the first AU, since it could be
// lower than the AU duration in which case we would never payload anything.
//
// For the size check in bytes we know that the first AU will fit the mtu,
// because we already checked for the "AU needs to be fragmented" scenario above.
let cts_delta = if ctx.prev_index.is_none() {
// No CTS-delta for the first AU in the packet
None
} else {
ct_delta_to_rtp(front.pts, previous_pts, state.clock_rate).and_then(|dts_delta_res| {
if dts_delta_res.is_none() {
gst::warning!(CAT, imp: self, "Overflow computing CTS-delta between pts {} & previous pts {}",
front.pts.display(), previous_pts.display(),
);
}
dts_delta_res
})
};
previous_pts = front.pts;
let header = AuHeader {
size: Some(front.buffer.len() as u32),
index,
cts_delta,
dts_delta: front.dts_delta,
maybe_random_access: front.maybe_random_access,
..Default::default()
};
w.build_with(&header, &ctx).map_err(|err| {
gst::error!(CAT, imp: self, "Failed to write header for AU {} in buffer {}: {err:#}",
header.index, front.id,
);
gst::FlowError::Error
})?;
// Unfortunately BitWriter doesn't return the size written.
let mut c = BitCounter::<u32, BigEndian>::new();
c.build_with(&header, &ctx).unwrap();
let header_bit_len = c.written() as u16;
if acc_size + ((headers_len + header_bit_len) as usize + 7) / 8 + front.buffer.len()
> max_payload_size
|| (ctx.prev_index.is_some()
&& max_ptime
.opt_lt(acc_duration.opt_add(front.duration))
.unwrap_or(false))
{
break;
}
let au = state.pending_aus.pop_front().unwrap();
end_id = au.id;
acc_size += au.buffer.len();
acc_duration.opt_add_assign(au.duration);
state.pending_size -= au.buffer.len();
state.pending_duration.opt_saturating_sub(au.duration);
headers_len += header_bit_len;
au_data_list.push(au.buffer);
ctx.prev_index = Some(index);
index += 1;
}
// add final padding
if let Err(err) = w.write(7, 0) {
gst::error!(CAT, imp: self, "Failed to write padding for final AU {} in buffer {end_id}: {err}",
ctx.prev_index.expect("at least one AU"),
);
return Err(gst::FlowError::Error);
}
let headers_len = headers_len.to_be_bytes();
debug_assert_eq!(headers_len.len(), 2);
let mut packet = rtp_types::RtpPacketBuilder::new()
.marker_bit(true)
.payload(headers_len.as_slice())
.payload(headers_buf.as_slice());
for au_data in &au_data_list {
packet = packet.payload(au_data.as_slice());
}
self.obj()
.queue_packet(PacketToBufferRelation::Ids(id..=end_id), packet)?;
}
gst::log!(CAT, imp: self, "All done for now, {} pending AUs", state.pending_aus.len());
if send_mode == SendPacketMode::ForcePending {
self.obj().finish_pending_packets()?;
}
Ok(gst::FlowSuccess::Ok)
}
fn effective_aggregate_mode(&self, settings: &Settings) -> RtpMpeg4GenericPayAggregateMode {
match settings.aggregate_mode {
RtpMpeg4GenericPayAggregateMode::Auto => match self.is_live() {
Some(true) => RtpMpeg4GenericPayAggregateMode::ZeroLatency,
Some(false) => RtpMpeg4GenericPayAggregateMode::Aggregate,
None => RtpMpeg4GenericPayAggregateMode::ZeroLatency,
},
mode => mode,
}
}
fn is_live(&self) -> Option<bool> {
*self.is_live.lock().unwrap()
}
// Query upstream live-ness if needed, in case of aggregate-mode=auto
fn ensure_upstream_liveness(&self, settings: &mut Settings) {
if settings.aggregate_mode != RtpMpeg4GenericPayAggregateMode::Auto
|| self.is_live().is_some()
{
return;
}
let mut q = gst::query::Latency::new();
let is_live = if self.obj().sink_pad().peer_query(&mut q) {
let (is_live, _, _) = q.result();
is_live
} else {
false
};
*self.is_live.lock().unwrap() = Some(is_live);
gst::info!(CAT, imp: self, "Upstream is live: {is_live}");
}
}

View file

@ -0,0 +1,58 @@
// GStreamer RTP MPEG-4 Generic Payloader
//
// Copyright (C) 2023-2024 François Laignel <francois centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
pub mod imp;
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(i32)]
#[enum_type(name = "GstRtpMpeg4GenericPayAggregateMode")]
#[non_exhaustive]
pub(crate) enum RtpMpeg4GenericPayAggregateMode {
#[enum_value(
name = "Automatic: zero-latency if upstream is live, otherwise aggregate elementary streams until packet is full.",
nick = "auto"
)]
Auto = -1,
#[enum_value(
name = "Zero Latency: always send out elementary streams right away, do not wait for more elementary streams to fill a packet.",
nick = "zero-latency"
)]
ZeroLatency = 0,
#[enum_value(
name = "Aggregate: collect elementary streams until we have a full packet or the max-ptime limit is hit (if set).",
nick = "aggregate"
)]
Aggregate = 1,
}
glib::wrapper! {
pub struct RtpMpeg4GenericPay(ObjectSubclass<imp::RtpMpeg4GenericPay>)
@extends crate::basepay::RtpBasePay2, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
#[cfg(feature = "doc")]
{
RtpMpeg4GenericPayAggregateMode::static_type()
.mark_as_plugin_api(gst::PluginAPIFlags::empty());
}
gst::Element::register(
Some(plugin),
"rtpmp4gpay2",
gst::Rank::MARGINAL,
RtpMpeg4GenericPay::static_type(),
)
}

527
net/rtp/src/mp4g/tests.rs Normal file
View file

@ -0,0 +1,527 @@
// SPDX-License-Identifier: MPL-2.0
use crate::tests::{run_test_pipeline, ExpectedBuffer, ExpectedPacket, Source};
use gst::prelude::*;
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
crate::plugin_register_static().expect("rtpmp4g test");
});
}
#[test]
fn aac_hbr_not_fragmented() {
init();
let src =
"audiotestsrc num-buffers=100 ! audio/x-raw,rate=48000,channels=2 ! fdkaacenc ! aacparse";
let pay = "rtpmp4gpay2";
let depay = "rtpmp4gdepay2";
let mut expected_pay = Vec::with_capacity(102);
for i in 0..102 {
let position = i * 1024;
expected_pay.push(vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_nseconds(
position
.mul_div_floor(*gst::ClockTime::SECOND, 48_000)
.unwrap(),
))
.flags(if i == 0 {
gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER
} else {
gst::BufferFlags::MARKER
})
.rtp_time((position & 0xffff_ffff) as u32)
.build()]);
}
let mut expected_depay = Vec::with_capacity(102);
for i in 0..102 {
let position = i * 1024;
expected_depay.push(vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_nseconds(
position
.mul_div_floor(*gst::ClockTime::SECOND, 48_000)
.unwrap(),
))
.flags(if i == 0 {
gst::BufferFlags::DISCONT
} else {
gst::BufferFlags::empty()
})
.build()]);
}
run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay);
}
#[test]
fn aac_hbr_fragmented() {
init();
let src =
"audiotestsrc num-buffers=100 ! audio/x-raw,rate=48000,channels=1 ! fdkaacenc ! aacparse";
let pay = "rtpmp4gpay2 mtu=288";
let depay = "rtpmp4gdepay2";
let mut expected_pay = Vec::with_capacity(102);
for i in 0..102 {
let position = i * 1024;
let pts = gst::ClockTime::from_nseconds(
position
.mul_div_floor(*gst::ClockTime::SECOND, 48_000)
.unwrap(),
);
let rtp_time = (position & 0xffff_ffff) as u32;
expected_pay.push(vec![
ExpectedPacket::builder()
.pts(pts)
.flags(if i == 0 {
gst::BufferFlags::DISCONT
} else {
gst::BufferFlags::empty()
})
.rtp_time(rtp_time)
.marker_bit(false)
.build(),
ExpectedPacket::builder()
.pts(pts)
.flags(gst::BufferFlags::MARKER)
.rtp_time(rtp_time)
.build(),
]);
}
let mut expected_depay = Vec::with_capacity(102);
for i in 0..102 {
let position = i * 1024;
expected_depay.push(vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_nseconds(
position
.mul_div_floor(*gst::ClockTime::SECOND, 48_000)
.unwrap(),
))
.flags(if i == 0 {
gst::BufferFlags::DISCONT
} else {
gst::BufferFlags::empty()
})
.build()]);
}
run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay);
}
#[test]
fn generic_not_fragmented() {
const BUFFER_NB: usize = 4;
const BUFFER_SIZE: usize = 600;
const MTU: usize = 1400;
const PACKETS_PER_BUFFER: usize = MTU / BUFFER_SIZE;
const RTP_CLOCK_RATE: u64 = 90_000;
const FRAME_RATE: u64 = 30;
init();
let codec_data = gst::Buffer::from_slice([0x00, 0x00, 0x01, 0xb0, 0x01]);
let caps = gst::Caps::builder("video/mpeg")
.field("mpegversion", 4i32)
.field("systemstream", false)
.field("codec_data", codec_data)
.build();
let pos_to_pts = |pos: usize| {
1000.hours()
+ (pos as u64)
.mul_div_floor(*gst::ClockTime::SECOND, FRAME_RATE)
.map(gst::ClockTime::from_nseconds)
.unwrap()
};
let pos_to_rtp = |pos: usize| {
((pos as u64)
.mul_div_ceil(RTP_CLOCK_RATE, FRAME_RATE)
.unwrap()
& 0xffff_ffff) as u32
};
let duration =
gst::ClockTime::from_nseconds(1.mul_div_ceil(*gst::ClockTime::SECOND, FRAME_RATE).unwrap());
let mut buffers = Vec::with_capacity(BUFFER_NB);
for pos in 0..BUFFER_NB {
let mut buffer = gst::Buffer::with_size(BUFFER_SIZE).unwrap();
{
let buffer = buffer.get_mut().unwrap();
let pts = pos_to_pts(pos);
buffer.set_pts(pts);
buffer.set_dts(match pos {
0 => pts,
1 | 2 => pos_to_pts(pos + 1),
3 => pos_to_pts(pos - 2),
_ => unreachable!(),
});
buffer.set_duration(duration);
if pos == 0 {
buffer.set_flags(gst::BufferFlags::DISCONT);
} else {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
buffers.push(buffer);
}
let pay = format!("rtpmp4gpay2 mtu={MTU}");
let depay = "rtpmp4gdepay2";
let mut expected_pay = Vec::with_capacity(BUFFER_NB);
for i in 0..PACKETS_PER_BUFFER {
expected_pay.push(vec![ExpectedPacket::builder()
.pts(pos_to_pts(i * PACKETS_PER_BUFFER))
.flags(if i == 0 {
gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER
} else {
gst::BufferFlags::MARKER
})
.rtp_time(pos_to_rtp(i * PACKETS_PER_BUFFER))
.build()]);
}
let mut expected_depay = Vec::with_capacity(BUFFER_NB);
for i in 0..BUFFER_NB {
expected_depay.push(vec![ExpectedBuffer::builder()
.pts(
pos_to_pts(i)
+ if i == 3 {
11110.nseconds()
} else {
0.nseconds()
},
)
.dts(match i {
0 => pos_to_pts(0),
1 => pos_to_pts(1 + 1),
2 => pos_to_pts(2 + 1) + 11110.nseconds(),
3 => pos_to_pts(3 - 2) + 11111.nseconds(),
_ => unreachable!(),
})
.flags(if i == 0 {
gst::BufferFlags::DISCONT
} else {
gst::BufferFlags::DELTA_UNIT
})
.build()]);
}
run_test_pipeline(
Source::Buffers(caps, buffers),
&pay,
depay,
expected_pay,
expected_depay,
);
}
#[test]
fn generic_fragmented() {
const BUFFER_NB: usize = 4;
const BUFFER_SIZE: usize = 2000;
const MTU: usize = 1400;
// Enough overhead in the MTU to use this approximation:
const FRAGMENTS_PER_BUFFER: usize = (BUFFER_SIZE + MTU - 1) / MTU;
const RTP_CLOCK_RATE: u64 = 90_000;
const LAST_FRAGMENT: usize = FRAGMENTS_PER_BUFFER - 1;
const FRAME_RATE: u64 = 30;
init();
let codec_data = gst::Buffer::from_slice([0x00, 0x00, 0x01, 0xb0, 0x01]);
let caps = gst::Caps::builder("video/mpeg")
.field("mpegversion", 4i32)
.field("systemstream", false)
.field("codec_data", codec_data)
.build();
let pos_to_pts = |pos: usize| {
1000.hours()
+ (pos as u64)
.mul_div_floor(*gst::ClockTime::SECOND, FRAME_RATE)
.map(gst::ClockTime::from_nseconds)
.unwrap()
};
let pos_to_rtp = |pos: usize| {
((pos as u64)
.mul_div_ceil(RTP_CLOCK_RATE, FRAME_RATE)
.unwrap()
& 0xffff_ffff) as u32
};
let duration =
gst::ClockTime::from_nseconds(1.mul_div_ceil(*gst::ClockTime::SECOND, FRAME_RATE).unwrap());
let mut buffers = Vec::with_capacity(BUFFER_NB);
for pos in 0..BUFFER_NB {
let mut buffer = gst::Buffer::with_size(BUFFER_SIZE).unwrap();
{
let buffer = buffer.get_mut().unwrap();
let pts = pos_to_pts(pos);
buffer.set_pts(pts);
buffer.set_dts(match pos {
0 => pts,
1 | 2 => pos_to_pts(pos + 1),
3 => pos_to_pts(pos - 2),
_ => unreachable!(),
});
buffer.set_duration(duration);
if pos == 0 {
buffer.set_flags(gst::BufferFlags::DISCONT);
} else {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
buffers.push(buffer);
}
let pay = format!("rtpmp4gpay2 mtu={MTU}");
let depay = "rtpmp4gdepay2";
let mut expected_pay = Vec::with_capacity(BUFFER_NB);
for i in 0..BUFFER_NB {
expected_pay.push({
let mut packets = Vec::with_capacity(FRAGMENTS_PER_BUFFER);
for frag in 0..FRAGMENTS_PER_BUFFER {
packets.push(
ExpectedPacket::builder()
.pts(pos_to_pts(i))
.flags(match (i, frag) {
(0, 0) => gst::BufferFlags::DISCONT,
(_, LAST_FRAGMENT) => gst::BufferFlags::MARKER,
_ => gst::BufferFlags::empty(),
})
.rtp_time(pos_to_rtp(i))
.marker_bit(frag == LAST_FRAGMENT)
.build(),
);
}
packets
});
}
let mut expected_depay = Vec::with_capacity(BUFFER_NB);
for i in 0..BUFFER_NB {
expected_depay.push(vec![ExpectedBuffer::builder()
.pts(pos_to_pts(i))
.dts(match i {
0 => pos_to_pts(0),
1 => pos_to_pts(1 + 1),
2 => pos_to_pts(2 + 1) + 11110.nseconds(),
3 => pos_to_pts(3 - 2) + 1.nseconds(),
_ => unreachable!(),
})
.size(BUFFER_SIZE)
.flags(if i == 0 {
gst::BufferFlags::DISCONT
} else {
gst::BufferFlags::DELTA_UNIT
})
.build()]);
}
run_test_pipeline(
Source::Buffers(caps, buffers),
&pay,
depay,
expected_pay,
expected_depay,
);
}
#[test]
fn generic_variable_au_size() {
const MTU: usize = 1400;
const AU_NB: usize = 5;
const SMALL_AU_SIZE: usize = 500;
const LARGE_AU_SIZE: usize = 2000;
const FRAGMENTS_PER_LARGE_BUFFER: usize = (LARGE_AU_SIZE + MTU - 1) / MTU;
const LAST_FRAGMENT: usize = FRAGMENTS_PER_LARGE_BUFFER - 1;
const RTP_CLOCK_RATE: u64 = 90_000;
const FRAME_RATE: u64 = 30;
init();
let codec_data = gst::Buffer::from_slice([0x00, 0x00, 0x01, 0xb0, 0x01]);
let caps = gst::Caps::builder("video/mpeg")
.field("mpegversion", 4i32)
.field("systemstream", false)
.field("codec_data", codec_data)
.build();
let pos_to_pts = |pos: usize| {
1000.hours()
+ (pos as u64)
.mul_div_floor(*gst::ClockTime::SECOND, FRAME_RATE)
.map(gst::ClockTime::from_nseconds)
.unwrap()
};
let pos_to_rtp = |pos: usize| {
((pos as u64)
.mul_div_ceil(RTP_CLOCK_RATE, FRAME_RATE)
.unwrap()
& 0xffff_ffff) as u32
};
let duration =
gst::ClockTime::from_nseconds(1.mul_div_ceil(*gst::ClockTime::SECOND, FRAME_RATE).unwrap());
let is_large_au = |pos| pos % 4 == 0;
let au_size = |pos| {
if is_large_au(pos) {
LARGE_AU_SIZE
} else {
SMALL_AU_SIZE
}
};
let mut buffers = Vec::with_capacity(AU_NB);
for pos in 0..AU_NB {
let mut buffer = gst::Buffer::with_size(au_size(pos)).unwrap();
{
let buffer = buffer.get_mut().unwrap();
let pts = pos_to_pts(pos);
buffer.set_pts(pts);
buffer.set_dts(match pos % 4 {
0 => pts,
1 | 2 => pos_to_pts(pos + 1),
3 => pos_to_pts(pos - 2),
_ => unreachable!(),
});
buffer.set_duration(duration);
if pos == 0 {
buffer.set_flags(gst::BufferFlags::DISCONT);
} else {
buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
}
}
buffers.push(buffer);
}
let pay = format!("rtpmp4gpay2 mtu={MTU}");
let depay = "rtpmp4gdepay2";
let mut expected_pay = Vec::with_capacity(AU_NB);
let mut pending_size = 0;
let mut pending_packet = None;
for i in 0..AU_NB {
let size = au_size(i);
if size > MTU {
// Incoming AU to fragment
let mut packet_list = Vec::with_capacity(3);
if let Some(pending) = pending_packet.take() {
// and there are pending AUs => push them first
packet_list.push(pending);
pending_size = 0;
}
// Then push the fragments for current AU
for f in 0..FRAGMENTS_PER_LARGE_BUFFER {
packet_list.push(
ExpectedPacket::builder()
.pts(pos_to_pts(i))
.flags(match (i, f) {
(0, 0) => gst::BufferFlags::DISCONT,
(_, 0) => gst::BufferFlags::empty(),
(_, LAST_FRAGMENT) => gst::BufferFlags::MARKER,
_ => unreachable!(),
})
.rtp_time(pos_to_rtp(i))
.marker_bit(f == LAST_FRAGMENT)
.build(),
)
}
expected_pay.push(packet_list);
} else {
let must_push =
if i + 1 < AU_NB && pending_size + size + au_size(i + 1) > MTU || i + 1 == AU_NB {
// Next will overflow => push now
// or last AU and not a fragmented one, will be pushed with time deadline
true
} else {
false
};
if must_push {
if let Some(pending) = pending_packet.take() {
expected_pay.push(vec![pending]);
pending_size = 0;
} else {
// Last AU
expected_pay.push(vec![ExpectedPacket::builder()
.pts(pos_to_pts(i))
.flags(gst::BufferFlags::MARKER)
.rtp_time(pos_to_rtp(i))
.build()]);
}
} else if pending_packet.is_none() {
// Wait for more payload
pending_packet = Some(
ExpectedPacket::builder()
.pts(pos_to_pts(i))
.flags(gst::BufferFlags::MARKER)
.rtp_time(pos_to_rtp(i))
.build(),
);
pending_size = size;
} else {
// There's already a pending packet
pending_size += size;
}
}
}
let mut expected_depay = Vec::with_capacity(AU_NB);
for i in 0..AU_NB {
expected_depay.push(vec![ExpectedBuffer::builder()
.pts(pos_to_pts(i))
.dts(match i % 4 {
0 => pos_to_pts(0),
1 => pos_to_pts(1 + 1),
2 => pos_to_pts(2 + 1) + 11110.nseconds(),
3 => pos_to_pts(3 - 2) + 11111.nseconds(),
_ => unreachable!(),
})
.size(au_size(i))
.flags(if i == 0 {
gst::BufferFlags::DISCONT
} else {
gst::BufferFlags::DELTA_UNIT
})
.build()]);
}
run_test_pipeline(
Source::Buffers(caps, buffers),
&pay,
depay,
expected_pay,
expected_depay,
);
}

View file

@ -15,6 +15,7 @@ pub fn seqnum_distance(seqnum1: u16, seqnum2: u16) -> i16 {
/// # Panic
///
/// Panics if `bit_len` > 32.
#[inline]
pub fn raw_2_comp_to_i32(val: u32, bit_len: u8) -> i32 {
assert!(bit_len <= 32);
@ -26,6 +27,37 @@ pub fn raw_2_comp_to_i32(val: u32, bit_len: u8) -> i32 {
}
}
/// Masks the provided `i32` value to be used as a two's complement of len `bit_len`,
/// so the resulting value can be passed to APIs which check the bit range.
///
/// Returns `None` the `i32` value exceeds the range of a two's complement
/// of len `bit_len`.
///
/// # Panic
///
/// Panics if `bit_len` > 32.
#[inline]
pub fn mask_valid_2_comp(val: i32, bit_len: u8) -> Option<i32> {
let bit_len = bit_len as u32;
if bit_len == i32::BITS {
return Some(val);
}
assert!(bit_len < i32::BITS);
let overhead = i32::BITS - bit_len;
let leading_zeros = val.leading_zeros();
if leading_zeros > 0 && leading_zeros < overhead
|| leading_zeros == 0 && val.leading_ones() < overhead
{
return None;
}
Some(((1 << bit_len) - 1) & val)
}
/// Defines a comparable new type `$typ` on a `[std::num::Wrapping]::<u32>`.
///
/// The new type will wrap-around on additions and substractions and it comparison
@ -332,6 +364,34 @@ mod tests {
assert_eq!(raw_2_comp_to_i32(0x8000_0000, BITS), i32::MIN);
}
#[test]
fn mask_valid_2_comp_ok() {
const BITS: u8 = i32::BITS as u8;
assert_eq!(mask_valid_2_comp(0, BITS), Some(0));
assert_eq!(mask_valid_2_comp(-1, BITS), Some(-1));
assert_eq!(mask_valid_2_comp(i32::MIN, BITS), Some(i32::MIN));
assert_eq!(mask_valid_2_comp(i32::MAX, BITS), Some(i32::MAX));
assert_eq!(mask_valid_2_comp(0, 6), Some(0));
assert_eq!(mask_valid_2_comp(0x2f, 6), Some(0x2f)); // -1i6
assert_eq!(mask_valid_2_comp(0x20, 6), Some(0x20)); // i6::MIN
assert_eq!(mask_valid_2_comp(0x1f, 6), Some(0x1f)); // i6::MAX
assert_eq!(mask_valid_2_comp(0x1f, 5), Some(0x1f)); // i6::MAX => -1i5
}
#[test]
fn mask_valid_2_comp_ko() {
const BITS: u8 = i32::BITS as u8;
assert_eq!(mask_valid_2_comp(0, BITS), Some(0));
assert_eq!(mask_valid_2_comp(-1, BITS), Some(-1));
assert_eq!(mask_valid_2_comp(i32::MIN, BITS), Some(i32::MIN));
assert_eq!(mask_valid_2_comp(i32::MAX, BITS), Some(i32::MAX));
assert_eq!(mask_valid_2_comp(0, 5), Some(0));
assert!(mask_valid_2_comp(0x2f, 5).is_none()); // -1i6
assert!(mask_valid_2_comp(0x20, 5).is_none()); // i6::MIN
}
#[test]
fn wrapping_u32_basics() {
let zero = MyWrapper::ZERO;