diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index a5300fe8..3c57c068 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -7003,6 +7003,60 @@ }, "rank": "marginal" }, + "rtpmp4gpay2": { + "author": "François Laignel ", + "description": "Payload an MPEG-4 Generic elementary stream into RTP packets (RFC 3640)", + "hierarchy": [ + "GstRtpMpeg4GenericPay", + "GstRtpBasePay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Payloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "video/mpeg:\n mpegversion: 4\n systemstream: false\naudio/mpeg:\n mpegversion: 4\n stream-format: raw\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "application/x-rtp:\n media: { (string)audio, (string)video }\n clock-rate: [ 1, 2147483647 ]\n encoding-name: MPEG4-GENERIC\n streamtype: { (string)4, (string)5 }\n mode: { (string)generic, (string)AAC-lbr, (string)AAC-hbr, (string)aac-hbr }\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "aggregate-mode": { + "blurb": "Whether to send out AUs immediately or aggregate them until a packet is full.", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "auto (-1)", + "mutable": "null", + "readable": true, + "type": "GstRtpMpeg4GenericPayAggregateMode", + "writable": true + }, + "max-ptime": { + "blurb": "Maximum duration of the packet data in ns (-1 = unlimited up to MTU)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "18446744073709551615", + "max": "9223372036854775807", + "min": "-1", + "mutable": "playing", + "readable": true, + "type": "gint64", + "writable": true + } + }, + "rank": "marginal" + }, "rtppcmadepay2": { "author": "Sebastian Dröge ", "description": "Depayload A-law from RTP packets (RFC 3551)", 
@@ -7772,6 +7826,26 @@ } } }, + "GstRtpMpeg4GenericPayAggregateMode": { + "kind": "enum", + "values": [ + { + "desc": "Automatic: zero-latency if upstream is live, otherwise aggregate elementary streams until packet is full.", + "name": "auto", + "value": "-1" + }, + { + "desc": "Zero Latency: always send out elementary streams right away, do not wait for more elementary streams to fill a packet.", + "name": "zero-latency", + "value": "0" + }, + { + "desc": "Aggregate: collect elementary streams until we have a full packet or the max-ptime limit is hit (if set).", + "name": "aggregate", + "value": "1" + } + ] + }, "GstRtpPcmauDepay2": { "hierarchy": [ "GstRtpPcmauDepay2", @@ -11389,4 +11463,4 @@ "tracers": {}, "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" } -} +} \ No newline at end of file diff --git a/net/rtp/src/lib.rs b/net/rtp/src/lib.rs index 19fbc65f..c57d484d 100644 --- a/net/rtp/src/lib.rs +++ b/net/rtp/src/lib.rs @@ -62,6 +62,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { mp4a::pay::register(plugin)?; mp4g::depay::register(plugin)?; + mp4g::pay::register(plugin)?; pcmau::depay::register(plugin)?; pcmau::pay::register(plugin)?; diff --git a/net/rtp/src/mp4a/parsers.rs b/net/rtp/src/mp4a/parsers.rs index ecb670eb..3b2246fe 100644 --- a/net/rtp/src/mp4a/parsers.rs +++ b/net/rtp/src/mp4a/parsers.rs @@ -39,6 +39,12 @@ pub enum MPEG4AudioParserError { #[error("Wrong frame size. 
Required {required}, available {available}")] WrongFrameSize { required: usize, available: usize }, + + #[error("Unsupported Profile {profile}")] + UnsupportedProfile { profile: String }, + + #[error("Unsupported Level {level} for Profile {profile}")] + UnsupportedLevel { level: String, profile: String }, } impl MPEG4AudioParserError { @@ -168,6 +174,73 @@ impl FromBitStream for AudioSpecificConfig { } } +/// audioProfileLevelIndication - ISO/IEC 14496-3 (2009) table 1.14 +pub struct ProfileLevel { + pub profile: String, + pub level: String, + pub id: u8, +} + +impl ProfileLevel { + pub fn from_caps(s: &gst::StructureRef) -> anyhow::Result { + // Note: could use an AudioSpecificConfig based approach + // similar to what is done in gst_codec_utils_aac_get_level + // from gst-plugins-base/gst-libs/gst/pbutils/codec-utils.c + + use MPEG4AudioParserError::*; + + let profile = s.get::("profile").context("profile")?; + let level = s.get::("level").context("level")?; + + let id = match profile.to_lowercase().as_str() { + "lc" => { + // Assumed to be AAC Profile in table 1.14 + match level.as_str() { + "1" => 0x28, + "2" => 0x29, + "4" => 0x2a, + "5" => 0x2b, + _ => Err(UnsupportedLevel { + level: level.clone(), + profile: profile.clone(), + })?, + } + } + "he-aac" | "he-aac-v1" => { + // High Efficiency AAC Profile in table 1.14 + match level.as_str() { + "2" => 0x2c, + "3" => 0x2d, + "4" => 0x2e, + "5" => 0x2f, + _ => Err(UnsupportedLevel { + level: level.clone(), + profile: profile.clone(), + })?, + } + } + "he-aac-v2" => { + // High Efficiency AAC v2 Profile in table 1.14 + match level.as_str() { + "2" => 0x30, + "3" => 0x31, + "4" => 0x32, + "5" => 0x33, + _ => Err(UnsupportedLevel { + level: level.clone(), + profile: profile.clone(), + })?, + } + } + _ => Err(UnsupportedProfile { + profile: profile.clone(), + })?, + }; + + Ok(ProfileLevel { profile, level, id }) + } +} + #[derive(Debug)] pub struct Subframes<'a> { frame: gst::MappedBuffer, diff --git 
a/net/rtp/src/mp4g/header.rs b/net/rtp/src/mp4g/header.rs index 113d8f2b..fe2e54d0 100644 --- a/net/rtp/src/mp4g/header.rs +++ b/net/rtp/src/mp4g/header.rs @@ -1,17 +1,38 @@ //! Access Unit Header and its parser & writer. -use bitstream_io::{BitRead, FromBitStreamWith}; +use bitstream_io::{BitRead, BitWrite, FromBitStreamWith, ToBitStreamWith}; use crate::mp4g::{AccessUnitIndex, ModeConfig}; -use crate::utils::raw_2_comp_to_i32; +use crate::utils::{mask_valid_2_comp, raw_2_comp_to_i32}; #[derive(thiserror::Error, Debug, PartialEq, Eq)] pub enum AuHeaderError { #[error("Unexpected zero-sized AU {}", .0)] ZeroSizedAu(AccessUnitIndex), + #[error("Undefined mandatory size for AU {}", .0)] + UndefinedMandatorySize(AccessUnitIndex), + + #[error("Inconsistent delta index {index}. Previous index: {prev_index}")] + InconsistentDeltaIndex { + index: AccessUnitIndex, + prev_index: AccessUnitIndex, + }, + #[error("Unexpected CTS flag set for the first AU header {}", .0)] CtsFlagSetInFirstAuHeader(AccessUnitIndex), + + #[error("Out of range CTS-delta {cts_delta} for AU {index}")] + OutOfRangeSizeCtsDelta { + cts_delta: i32, + index: AccessUnitIndex, + }, + + #[error("Out of range DTS-delta {dts_delta} for AU {index}")] + OutOfRangeSizeDtsDelta { + dts_delta: i32, + index: AccessUnitIndex, + }, } #[derive(Debug)] @@ -118,3 +139,97 @@ impl<'a> FromBitStreamWith<'a> for AuHeader { Ok(this) } } + +impl<'a> ToBitStreamWith<'a> for AuHeader { + type Context = AuHeaderContext<'a>; + type Error = anyhow::Error; + + fn to_writer( + &self, + w: &mut W, + ctx: &AuHeaderContext, + ) -> Result<(), Self::Error> { + use anyhow::Context; + use AuHeaderError::*; + + if ctx.config.size_len > 0 { + let Some(size) = self.size else { + return Err(UndefinedMandatorySize(self.index).into()); + }; + + if size == 0 { + Err(ZeroSizedAu(self.index))?; + } + + w.write(ctx.config.size_len as u32, size) + .context("AU-size")?; + } + + match ctx.prev_index { + None => w + .write(ctx.config.index_len as u32, 
*self.index) + .context("AU-Index")?, + Some(prev_index) => { + let index_delta = self + .index + .checked_sub(*prev_index) + .and_then(|delta| delta.checked_sub(1)) + .ok_or(InconsistentDeltaIndex { + index: self.index, + prev_index, + }) + .context("AU-Index-delta")?; + w.write(ctx.config.index_delta_len as u32, index_delta) + .context("AU-Index-delta")?; + } + } + + if ctx.config.cts_delta_len > 0 { + // § 3.2.1.1: + // > the CTS-flag field MUST have the value 0 in the first AU-header + // > the CTS-flag field SHOULD be 0 for any non-first fragment of an Access Unit + if ctx.prev_index.is_none() { + w.write_bit(false).context("CTS-flag")?; + } else if let Some(cts_delta) = self.cts_delta { + let Some(cts_delta) = mask_valid_2_comp(cts_delta, ctx.config.cts_delta_len) else { + return Err(OutOfRangeSizeCtsDelta { + cts_delta, + index: self.index, + } + .into()); + }; + + w.write_bit(true).context("CTS-flag")?; + w.write(ctx.config.cts_delta_len as u32, cts_delta) + .context("CTS-delta")?; + } else { + w.write_bit(false).context("CTS-flag")?; + } + } + + if ctx.config.dts_delta_len > 0 { + if let Some(dts_delta) = self.dts_delta { + let Some(dts_delta) = mask_valid_2_comp(dts_delta, ctx.config.dts_delta_len) else { + return Err(OutOfRangeSizeDtsDelta { + dts_delta, + index: self.index, + } + .into()); + }; + + w.write_bit(true).context("DTS-flag")?; + w.write(ctx.config.dts_delta_len as u32, dts_delta) + .context("DTS-delta")?; + } else { + w.write_bit(false).context("DTS-flag")?; + } + } + + if ctx.config.random_access_indication { + w.write_bit(self.maybe_random_access.unwrap_or(false)) + .context("RAP-flag")?; + } + + Ok(()) + } +} diff --git a/net/rtp/src/mp4g/mod.rs b/net/rtp/src/mp4g/mod.rs index 7a9c8309..5eac6c77 100644 --- a/net/rtp/src/mp4g/mod.rs +++ b/net/rtp/src/mp4g/mod.rs @@ -5,6 +5,10 @@ mod header; pub use header::{AuHeader, AuHeaderContext}; mod mode; pub use mode::ModeConfig; +pub mod pay; + +#[cfg(test)] +mod tests; #[derive(thiserror::Error, 
Debug, PartialEq, Eq)] pub enum Mpeg4GenericError { diff --git a/net/rtp/src/mp4g/mode.rs b/net/rtp/src/mp4g/mode.rs index f2b1c571..663518d2 100644 --- a/net/rtp/src/mp4g/mode.rs +++ b/net/rtp/src/mp4g/mode.rs @@ -1,5 +1,7 @@ //! MPEG-4 Generic mode. +use gst::caps::NoFeature; + use std::str::FromStr; #[derive(thiserror::Error, Debug, PartialEq, Eq)] @@ -64,6 +66,16 @@ impl ModeConfig { Some(self.max_displacement) } + /// Returns the max length in bits of the AU headers + pub fn max_header_bit_len(&self) -> usize { + self.size_len as usize + + std::cmp::max(self.index_len, self.index_delta_len) as usize + + self.cts_delta_len as usize + + self.dts_delta_len as usize + + if self.random_access_indication { 1 } else { 0 } + + self.stream_state_indication as usize + } + pub fn from_caps(s: &gst::StructureRef) -> anyhow::Result { use ModeError::*; @@ -138,4 +150,50 @@ impl ModeConfig { }, } } + + pub fn add_to_caps( + &self, + builder: gst::caps::Builder, + ) -> Result, ModeError> { + use ModeError::*; + + if self.size_len != 0 && self.constant_size != 0 { + Err(BothAuSizeLenAndConstantSize)?; + } + + if self.size_len == 0 && self.constant_size == 0 { + Err(NeitherAuSizeLenNorConstantSize)?; + } + + if self.index_len > 0 && self.index_delta_len == 0 { + Err(MandatoryIndexDeltaLength)?; + } + + if self.stream_state_indication > 0 { + panic!("AU Header Stream State not supported"); + } + + Ok(builder + .field("sizelength", self.size_len as i32) + .field("indexlength", self.index_len as i32) + .field("indexdeltalength", self.index_delta_len as i32) + .field("ctsdeltalength", self.cts_delta_len as i32) + .field("dtsdeltalength", self.dts_delta_len as i32) + .field( + "randomaccessindication", + if self.random_access_indication { + 1u8 + } else { + 0u8 + }, + ) + .field("streamstateindication", self.stream_state_indication as i32) + .field( + "auxiliarydatasizelength", + self.auxiliary_data_size_len as i32, + ) + .field("constantsize", self.constant_size as i32) + 
.field("constantduration", self.constant_duration as i32) + .field("maxdisplacement", self.max_displacement as i32)) + } } diff --git a/net/rtp/src/mp4g/pay/imp.rs b/net/rtp/src/mp4g/pay/imp.rs new file mode 100644 index 00000000..78f27457 --- /dev/null +++ b/net/rtp/src/mp4g/pay/imp.rs @@ -0,0 +1,921 @@ +// GStreamer RTP MPEG-4 Generic Payloader +// +// Copyright (C) 2023-2024 François Laignel +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-rtpmp4gpay2 + * @see_also: rtpmp4gdepay2, rtpmp4gpay, rtpmp4gdepay, fdkaacenc, fdkaacdec, avenc_mpeg4, avdec_mpeg4 + * + * Payload an MPEG-4 Generic elementary stream into RTP packets as per [RFC 3640][rfc-3640]. + * Also see the [IANA media-type page for MPEG-4 Generic][iana-mpeg4-generic]. + * + * [rfc-3640]: https://www.rfc-editor.org/rfc/rfc3640.html#section-4 + * [iana-mpeg4-generic]: https://www.iana.org/assignments/media-types/application/mpeg4-generic + * + * ## Aggregation Modes + * + * The default aggregation mode is `auto`: If upstream is live, the payloader will send out + * AUs immediately, even if they don't completely fill a packet, in order to minimise + * latency. If upstream is not live, the payloader will by default aggregate AUs until + * it has completely filled an RTP packet as per the configured MTU size or the `max-ptime` + * property if it is set (it is not set by default). + * + * The aggregation mode can be controlled via the `aggregate-mode` property. + * + * ## Example pipeline + * |[ + * gst-launch-1.0 audiotestsrc ! fdkaacenc ! rtpmp4gpay2 ! udpsink host=127.0.0.1 port=5004 + * ]| This will encode an audio test signal to AAC and then payload the encoded audio + * into RTP packets and send them out via UDP to localhost (IPv4) port 5004. 
+ * You can use the #rtpmp4gdepay2 or #rtpmp4gdepay elements to depayload such a stream, and + * the #fdkaacdec element to decode the depayloaded stream. + * + * Since: plugins-rs-0.13.0 + */ +use atomic_refcell::AtomicRefCell; +use bitstream_io::{BigEndian, BitCounter, BitRead, BitReader, BitWrite, BitWriter}; +use once_cell::sync::Lazy; + +use gst::{glib, prelude::*, subclass::prelude::*}; +use smallvec::SmallVec; + +use std::collections::VecDeque; +use std::sync::Mutex; + +use crate::basepay::{PacketToBufferRelation, RtpBasePay2Ext, RtpBasePay2Impl, RtpBasePay2ImplExt}; + +use super::RtpMpeg4GenericPayAggregateMode; +use crate::mp4a::parsers::{AudioSpecificConfig, ProfileLevel}; +use crate::mp4g::{AccessUnitIndex, AuHeader, AuHeaderContext, ModeConfig}; + +const VOS_STARTCODE: u32 = 0x000001B0; + +/// The size of the field representing the AU headers section len. +const HEADERS_LEN_SIZE: usize = 2; + +/// Access Unit maximum header len in bytes. +/// This depends on the supported mode. In current implementation, 3 is the maximum. +const HEADER_MAX_LEN: usize = 3; + +#[derive(Clone)] +struct Settings { + max_ptime: Option, + aggregate_mode: RtpMpeg4GenericPayAggregateMode, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + aggregate_mode: RtpMpeg4GenericPayAggregateMode::Auto, + max_ptime: None, + } + } +} + +#[derive(Default)] +pub struct RtpMpeg4GenericPay { + state: AtomicRefCell, + settings: Mutex, + is_live: Mutex>, +} + +#[derive(Debug)] +struct AccessUnit { + id: u64, + pts: Option, + dts_delta: Option, + duration: Option, + maybe_random_access: Option, + buffer: gst::MappedBuffer, +} + +#[derive(Default)] +struct State { + /// Configuration of current Mode. + mode: ModeConfig, + + /// Maximum bit length needed to store an AU Header. + max_header_bit_len: usize, + + /// Minimum MTU necessary to handle the outgoing packets. 
+ min_mtu: usize, + + /// Pending AU (we collect until ptime/max-ptime is hit or the packet is full) + pending_aus: VecDeque, + pending_size: usize, + pending_duration: Option, + clock_rate: u32, + + /// Desired "packet time", i.e. packet duration, from the downstream caps, if set + ptime: Option, + max_ptime: Option, +} + +impl State { + fn flush(&mut self) { + self.pending_aus.clear(); + self.pending_size = 0; + self.pending_duration = None; + } +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpmp4gpay2", + gst::DebugColorFlags::empty(), + Some("RTP MPEG-4 Generic Payloader"), + ) +}); + +#[glib::object_subclass] +impl ObjectSubclass for RtpMpeg4GenericPay { + const NAME: &'static str = "GstRtpMpeg4GenericPay"; + type Type = super::RtpMpeg4GenericPay; + type ParentType = crate::basepay::RtpBasePay2; +} + +impl ObjectImpl for RtpMpeg4GenericPay { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecEnum::builder_with_default( + "aggregate-mode", + Settings::default().aggregate_mode, + ) + .nick("Aggregate Mode") + .blurb( + "Whether to send out AUs immediately or aggregate them until a packet is full.", + ) + .build(), + // Using same type/semantics as C payloaders + glib::ParamSpecInt64::builder("max-ptime") + .nick("Maximum Packet Time") + .blurb("Maximum duration of the packet data in ns (-1 = unlimited up to MTU)") + .default_value( + Settings::default() + .max_ptime + .map(gst::ClockTime::nseconds) + .map(|x| x as i64) + .unwrap_or(-1), + ) + .minimum(-1) + .maximum(i64::MAX) + .mutable_playing() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + + match pspec.name() { + "aggregate-mode" => { + settings.aggregate_mode = value + .get::() + .expect("type checked upstream"); + } + "max-ptime" => { + let new_max_ptime = match 
value.get::().unwrap() { + -1 => None, + v @ 0.. => Some(gst::ClockTime::from_nseconds(v as u64)), + _ => unreachable!(), + }; + let changed = settings.max_ptime != new_max_ptime; + settings.max_ptime = new_max_ptime; + drop(settings); + + if changed { + let _ = self + .obj() + .post_message(gst::message::Latency::builder().src(&*self.obj()).build()); + } + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + + match pspec.name() { + "aggregate-mode" => settings.aggregate_mode.to_value(), + "max-ptime" => (settings + .max_ptime + .map(gst::ClockTime::nseconds) + .map(|x| x as i64) + .unwrap_or(-1)) + .to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for RtpMpeg4GenericPay {} + +impl ElementImpl for RtpMpeg4GenericPay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP MPEG-4 Generic Payloader", + "Codec/Payloader/Network/RTP", + "Payload an MPEG-4 Generic elementary stream into RTP packets (RFC 3640)", + "François Laignel ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder_full() + .structure( + gst::Structure::builder("video/mpeg") + .field("mpegversion", 4i32) + .field("systemstream", false) + .build(), + ) + .structure( + gst::Structure::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .build(), + ) + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("application/x-rtp") + // TODO "application" is also present in rtpmp4gpay caps template + // but 
it doesn't handle it in gst_rtp_mp4g_pay_setcaps + .field("media", gst::List::new(["audio", "video"])) + .field("clock-rate", gst::IntRange::new(1i32, i32::MAX)) + .field("encoding-name", "MPEG4-GENERIC") + // Required string params: + .field("streamtype", gst::List::new(["4", "5"])) // 4 = video, 5 = audio + // "profile-level-id = [1,MAX], " + // "config = (string)" + .field( + "mode", + gst::List::new(["generic", "AAC-lbr", "AAC-hbr", "aac-hbr"]), + ) + // Optional general parameters: + // "objecttype = [1,MAX], " + // "constantsize = [1,MAX], " // constant size of each AU + // "constantduration = [1,MAX], " // constant duration of each AU + // "maxdisplacement = [1,MAX], " + // "de-interleavebuffersize = [1,MAX], " + // Optional configuration parameters: + // "sizelength = [1, 32], " + // "indexlength = [1, 32], " + // "indexdeltalength = [1, 32], " + // "ctsdeltalength = [1, 32], " + // "dtsdeltalength = [1, 32], " + // "randomaccessindication = {0, 1}, " + // "streamstateindication = [0, 32], " + // "auxiliarydatasizelength = [0, 32]" ) + .build(), + ) + .unwrap(); + + vec![sink_pad_template, src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +/// Returns the difference between `ClockTime`s `ct1` & `ct0` in RTP scale. +/// +/// Returns `None` if at least one of the `ClockTime`s is `None`. +/// Returns `Some(None)` if an overflow occurred, error management is left to the caller. +/// Returns `Some(delta)` if the difference could be computed. 
+fn ct_delta_to_rtp( + ct1: Option, + ct0: Option, + clock_rate: u32, +) -> Option> { + ct1.into_positive().opt_sub(ct0).map(|delta_ct| { + delta_ct + .into_inner_signed() + .try_into() + .ok() + .and_then(|delta_inner: i64| { + delta_inner + .mul_div_ceil(clock_rate as i64, *gst::ClockTime::SECOND as i64) + .and_then(|dts_delta| dts_delta.try_into().ok()) + }) + }) +} + +impl RtpBasePay2Impl for RtpMpeg4GenericPay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["audio"]; + + fn set_sink_caps(&self, caps: &gst::Caps) -> bool { + let s = caps.structure(0).unwrap(); + + let codec_data = match s.get::<&gst::BufferRef>("codec_data") { + Ok(codec_data) => codec_data, + Err(err) => { + gst::error!(CAT, imp: self, "Error getting codec_data from Caps: {err}"); + return false; + } + }; + + let Ok(codec_data) = codec_data.map_readable() else { + gst::error!(CAT, imp: self, "Failed to map codec_data as readable"); + return false; + }; + + let codec_data_str = hex::encode(&codec_data); + + let caps_builder = gst::Caps::builder("application/x-rtp") + .field("seqnum-base", self.obj().property::("seqnum") + 1) + .field("mpegversion", 4i32) + .field("encoding-name", "MPEG4-GENERIC") + .field("config", codec_data_str); + + let (clock_rate, mode, caps_builder) = match s.name().as_str() { + "audio/mpeg" => { + let mut r = BitReader::endian(codec_data.as_slice(), BigEndian); + let config = match r.parse::() { + Ok(config) => config, + Err(err) => { + gst::error!(CAT, imp: self, "Error parsing audio codec_data: {err:#}"); + return false; + } + }; + + if config.audio_object_type == 0 || config.audio_object_type > 6 { + gst::error!(CAT, imp: self, "Unsupported Audio Object Type {}", config.audio_object_type); + return false; + } + + let profile_level = match ProfileLevel::from_caps(s) { + Ok(profile_level) => profile_level, + Err(err) => { + gst::error!(CAT, imp: self, "Error getting profile level from Caps: {err:#}"); + return false; + } + }; + + gst::log!(CAT, imp: self, 
"Using audio codec_data {config:?}"); + + // AAC-hbr: also used by rtpmp4gpay + // RFC 3640 also defines AAC-lbr, with a maximum encoded buffer + // size of 63 bytes and which can't be fragmented. Only AAC-hbr + // is used because it is more flexible. We could implement AAC-lbr + // provided make sure the encoded buffers can't exceed the limit + // and add a flag to prevent fragmentation in `send_packets()`. + // See https://www.rfc-editor.org/rfc/rfc3640.html#section-3.3.5 + let mode = ModeConfig { + size_len: 13, + index_len: 3, + index_delta_len: 3, + constant_duration: config.frame_len as u32, + ..Default::default() + }; + + let caps_builder = mode + .add_to_caps( + caps_builder + .field("media", "audio") + .field("streamtype", "5") + .field("mode", "AAC-hbr") + .field("clock-rate", config.sampling_freq as i32) + .field("profile", &profile_level.profile) + .field("level", &profile_level.level) + .field("profile-level-id", profile_level.id) + .field("encoding-params", config.channel_conf as i32), + ) + .expect("invalid audio mode"); + + (config.sampling_freq, mode, caps_builder) + } + "video/mpeg" => { + if codec_data.len() < 5 { + gst::error!(CAT, imp: self, "Error parsing video codec_data: too short"); + return false; + } + + let code = u32::from_be_bytes(codec_data[..4].try_into().unwrap()); + let profile = if code == VOS_STARTCODE { + let profile = codec_data[4]; + gst::log!(CAT, imp: self, "Using video codec_data profile {profile}"); + + profile + } else { + gst::warning!(CAT, imp: self, "Unexpected VOS startcode in video codec_data. Assuming profile '1'"); + + 1 + }; + + // Use a larger size_len than rtpmp4gpay + // otherwise some large AU can't be payloaded. + // rtpmp4gpay uses bit shifts to have the AU data size + // fit in 13 bits, resulting in an invalid size. 
+ let mode = ModeConfig { + size_len: 16, + index_len: 3, + index_delta_len: 3, + cts_delta_len: 16, + dts_delta_len: 16, + random_access_indication: true, + ..Default::default() + }; + + let caps_builder = mode + .add_to_caps( + caps_builder + .field("media", "video") + .field("streamtype", "4") + .field("mode", "generic") + .field("clock-rate", 90000i32) + .field("profile-level-id", profile as i32), + ) + .expect("invalid video mode"); + + (90000, mode, caps_builder) + } + // TODO handle "application" + _ => unreachable!(), + }; + + self.obj().set_src_caps(&caps_builder.build()); + + let mut state = self.state.borrow_mut(); + state.max_header_bit_len = mode.max_header_bit_len(); + state.min_mtu = rtp_types::RtpPacket::MIN_RTP_PACKET_LEN + + HEADERS_LEN_SIZE + + (state.max_header_bit_len + 7) / 8 + + 1; + state.mode = mode; + state.clock_rate = clock_rate; + + true + } + + fn negotiate(&self, mut src_caps: gst::Caps) { + // Fixate as a first step + src_caps.fixate(); + + let s = src_caps.structure(0).unwrap(); + + // Negotiate ptime/maxptime with downstream and use them in combination with the + // properties. 
See https://www.iana.org/assignments/media-types/application/mpeg4-generic + let ptime = s + .get::("ptime") + .ok() + .map(u64::from) + .map(gst::ClockTime::from_mseconds); + + let max_ptime = s + .get::("maxptime") + .ok() + .map(u64::from) + .map(gst::ClockTime::from_mseconds); + + self.parent_negotiate(src_caps); + + let mut state = self.state.borrow_mut(); + state.ptime = ptime; + state.max_ptime = max_ptime; + drop(state); + } + + // Encapsulation of MPEG-4 Generic Elementary Streams: + // https://www.rfc-editor.org/rfc/rfc3640 + fn handle_buffer( + &self, + buffer: &gst::Buffer, + id: u64, + ) -> Result { + let mut state = self.state.borrow_mut(); + let mut settings = self.settings.lock().unwrap(); + + gst::trace!(CAT, imp: self, "Handling buffer {id} duration {} pts {} dts {}, len {}", + buffer.duration().display(), buffer.pts().display(), buffer.dts().display(), buffer.size(), + ); + + let maybe_random_access = if state.mode.random_access_indication { + Some(!buffer.flags().contains(gst::BufferFlags::DELTA_UNIT)) + } else { + None + }; + + let dts_delta = ct_delta_to_rtp(buffer.dts(), buffer.pts(), state.clock_rate).and_then(|dts_delta_res| { + if dts_delta_res.is_none() { + gst::warning!(CAT, imp: self, "Overflow computing DTS-delta between pts {} & dts {}", + buffer.dts().display(), buffer.pts().display(), + ); + } + + dts_delta_res + }); + + gst::trace!(CAT, imp: self, + "Pushing AU from buffer {id} dts_delta {dts_delta:?} random access {maybe_random_access:?}", + ); + + state.pending_aus.push_back(AccessUnit { + id, + duration: buffer.duration(), + pts: buffer.pts(), + dts_delta, + buffer: buffer.clone().into_mapped_buffer_readable().map_err(|_| { + gst::error!(CAT, imp: self, "Can't map incoming buffer readable"); + gst::FlowError::Error + })?, + maybe_random_access, + }); + + state.pending_size += buffer.size(); + state.pending_duration.opt_add_assign(buffer.duration()); + + // Make sure we have queried upstream liveness if needed + if 
settings.aggregate_mode == RtpMpeg4GenericPayAggregateMode::Auto { + self.ensure_upstream_liveness(&mut settings); + } + + self.send_packets(&settings, &mut state, SendPacketMode::WhenReady) + } + + fn drain(&self) -> Result { + let settings = self.settings.lock().unwrap().clone(); + let mut state = self.state.borrow_mut(); + + self.send_packets(&settings, &mut state, SendPacketMode::ForcePending) + } + + fn flush(&self) { + self.state.borrow_mut().flush(); + } + + #[allow(clippy::single_match)] + fn src_query(&self, query: &mut gst::QueryRef) -> bool { + let res = self.parent_src_query(query); + if !res { + return false; + } + + match query.view_mut() { + gst::QueryViewMut::Latency(query) => { + let settings = self.settings.lock().unwrap(); + + let (is_live, mut min, mut max) = query.result(); + + { + let mut live_guard = self.is_live.lock().unwrap(); + + if Some(is_live) != *live_guard { + gst::info!(CAT, imp: self, "Upstream is live: {is_live}"); + *live_guard = Some(is_live); + } + } + + if self.effective_aggregate_mode(&settings) + == RtpMpeg4GenericPayAggregateMode::Aggregate + { + if let Some(max_ptime) = settings.max_ptime { + min += max_ptime; + max.opt_add_assign(max_ptime); + } else if is_live { + gst::warning!(CAT, imp: self, + "Aggregating packets in live mode, but no max_ptime configured. \ + Configured latency may be too low!" 
+ ); + } + query.set(is_live, min, max); + } + } + _ => (), + } + + true + } + + fn start(&self) -> Result<(), gst::ErrorMessage> { + *self.state.borrow_mut() = State::default(); + *self.is_live.lock().unwrap() = None; + + self.parent_start() + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + *self.state.borrow_mut() = State::default(); + *self.is_live.lock().unwrap() = None; + + self.parent_stop() + } +} + +#[derive(Debug, PartialEq)] +enum SendPacketMode { + WhenReady, + ForcePending, +} + +impl RtpMpeg4GenericPay { + fn send_packets( + &self, + settings: &Settings, + state: &mut State, + send_mode: SendPacketMode, + ) -> Result { + let agg_mode = self.effective_aggregate_mode(settings); + + if (self.obj().mtu() as usize) < state.min_mtu { + gst::error!(CAT, imp: self, "Insufficient mtu {} at least {} bytes needed", self.obj().mtu(), state.min_mtu); + return Err(gst::FlowError::Error); + } + + let max_payload_size = self.obj().max_payload_size() as usize - HEADERS_LEN_SIZE; + + let mut ctx = AuHeaderContext { + config: &state.mode, + prev_index: None, + }; + let mut headers_buf = SmallVec::<[u8; 10 * HEADER_MAX_LEN]>::new(); + let mut au_data_list = SmallVec::<[gst::MappedBuffer; 10]>::new(); + + // https://www.rfc-editor.org/rfc/rfc3640.html#section-3.1 + // The M bit is set to 1 to indicate that the RTP packet payload + // contains either the final fragment of a fragmented Access Unit + // or one or more complete Access Units. + + // Send out packets if there's enough data for one (or more), or if forced. 
+ while let Some(front) = state.pending_aus.front() { + headers_buf.clear(); + ctx.prev_index = None; + + if front.buffer.len() + (state.max_header_bit_len + 7) / 8 > max_payload_size { + // AU needs to be fragmented + let au = state.pending_aus.pop_front().unwrap(); + let mut data = au.buffer.as_slice(); + state.pending_size = state.pending_size.saturating_sub(data.len()); + let mut next_frag_offset = 0; + let mut is_final = false; + + while !is_final { + let header = AuHeader { + // The size of the complete AU for all the fragments + size: Some(au.buffer.len() as u32), + // One AU fragment per packet + index: AccessUnitIndex::ZERO, + // CTS-delta SHOULD not be set for a fragment, see § 3.2.1.1 + dts_delta: au.dts_delta, + maybe_random_access: au.maybe_random_access, + ..Default::default() + }; + + headers_buf.clear(); + let mut w = BitWriter::endian(&mut headers_buf, BigEndian); + let mut res = w.build_with(&header, &ctx); + if res.is_ok() { + // add final padding + res = w.write(7, 0).map_err(Into::into); + } + if let Err(err) = res { + gst::error!(CAT, imp: self, "Failed to write header for AU {} in buffer {}: {err:#}", header.index, au.id); + return Err(gst::FlowError::Error); + } + + // Unfortunately BitWriter doesn't return the size written. 
+ let mut c = BitCounter::::new(); + c.build_with(&header, &ctx).unwrap(); + let header_bit_len = c.written() as u16; + + let left = au.buffer.len() - next_frag_offset; + let bytes_in_this_packet = + std::cmp::min(left, max_payload_size - (header_bit_len as usize + 7) / 8); + + next_frag_offset += bytes_in_this_packet; + is_final = next_frag_offset >= au.buffer.len(); + + self.obj().queue_packet( + au.id.into(), + rtp_types::RtpPacketBuilder::new() + // AU-headers-length: only one 1 AU header here + .payload(header_bit_len.to_be_bytes().as_slice()) + .payload(headers_buf.as_slice()) + .payload(&data[0..bytes_in_this_packet]) + .marker_bit(is_final), + )?; + + data = &data[bytes_in_this_packet..]; + } + + continue; + } + + // Will not fragment this AU + + // We optimistically add average size/duration to send out packets as early as possible + // if we estimate that the next AU would likely overflow our accumulation limits. + let n_aus = state.pending_aus.len(); + let avg_size = state.pending_size / n_aus; + let avg_duration = state.pending_duration.opt_div(n_aus as u64); + + let max_ptime = settings + .max_ptime + .opt_min(state.max_ptime) + .opt_min(state.ptime); + + let is_ready = send_mode == SendPacketMode::ForcePending + || agg_mode != RtpMpeg4GenericPayAggregateMode::Aggregate + || state.pending_size + avg_size + n_aus * (state.max_header_bit_len + 7) / 8 + > max_payload_size + || state + .pending_duration + .opt_add(avg_duration) + .opt_gt(max_ptime) + .unwrap_or(false); + + gst::log!(CAT, imp: self, + "Pending: size {}, duration ~{:.3}, mode: {agg_mode:?} + {send_mode:?} => {}", + state.pending_size, + state.pending_duration.display(), + if is_ready { "ready" } else { "not ready, waiting for more data" }, + ); + + if !is_ready { + break; + } + + gst::trace!(CAT, imp: self, "Creating packet.."); + + let id = front.id; + let mut end_id = front.id; + + let mut acc_duration = gst::ClockTime::ZERO; + let mut acc_size = 0; + + let mut headers_len = 0; + + let mut 
w = BitWriter::endian(&mut headers_buf, BigEndian); + let mut index = AccessUnitIndex::ZERO; + let mut previous_pts = None; + + au_data_list.clear(); + + while let Some(front) = state.pending_aus.front() { + gst::trace!(CAT, imp: self, "{front:?}, accumulated size {acc_size} duration ~{acc_duration:.3}"); + + // If this AU would overflow the packet, bail out and send out what we have. + // + // Don't take into account the max_ptime for the first AU, since it could be + // lower than the AU duration in which case we would never payload anything. + // + // For the size check in bytes we know that the first AU will fit the mtu, + // because we already checked for the "AU needs to be fragmented" scenario above. + + let cts_delta = if ctx.prev_index.is_none() { + // No CTS-delta for the first AU in the packet + None + } else { + ct_delta_to_rtp(front.pts, previous_pts, state.clock_rate).and_then(|dts_delta_res| { + if dts_delta_res.is_none() { + gst::warning!(CAT, imp: self, "Overflow computing CTS-delta between pts {} & previous pts {}", + front.pts.display(), previous_pts.display(), + ); + } + + dts_delta_res + }) + }; + + previous_pts = front.pts; + + let header = AuHeader { + size: Some(front.buffer.len() as u32), + index, + cts_delta, + dts_delta: front.dts_delta, + maybe_random_access: front.maybe_random_access, + ..Default::default() + }; + + w.build_with(&header, &ctx).map_err(|err| { + gst::error!(CAT, imp: self, "Failed to write header for AU {} in buffer {}: {err:#}", + header.index, front.id, + ); + gst::FlowError::Error + })?; + + // Unfortunately BitWriter doesn't return the size written. 
+ let mut c = BitCounter::::new(); + c.build_with(&header, &ctx).unwrap(); + let header_bit_len = c.written() as u16; + + if acc_size + ((headers_len + header_bit_len) as usize + 7) / 8 + front.buffer.len() + > max_payload_size + || (ctx.prev_index.is_some() + && max_ptime + .opt_lt(acc_duration.opt_add(front.duration)) + .unwrap_or(false)) + { + break; + } + + let au = state.pending_aus.pop_front().unwrap(); + + end_id = au.id; + acc_size += au.buffer.len(); + acc_duration.opt_add_assign(au.duration); + + state.pending_size -= au.buffer.len(); + state.pending_duration.opt_saturating_sub(au.duration); + + headers_len += header_bit_len; + au_data_list.push(au.buffer); + + ctx.prev_index = Some(index); + index += 1; + } + + // add final padding + if let Err(err) = w.write(7, 0) { + gst::error!(CAT, imp: self, "Failed to write padding for final AU {} in buffer {end_id}: {err}", + ctx.prev_index.expect("at least one AU"), + ); + return Err(gst::FlowError::Error); + } + + let headers_len = headers_len.to_be_bytes(); + debug_assert_eq!(headers_len.len(), 2); + + let mut packet = rtp_types::RtpPacketBuilder::new() + .marker_bit(true) + .payload(headers_len.as_slice()) + .payload(headers_buf.as_slice()); + + for au_data in &au_data_list { + packet = packet.payload(au_data.as_slice()); + } + + self.obj() + .queue_packet(PacketToBufferRelation::Ids(id..=end_id), packet)?; + } + + gst::log!(CAT, imp: self, "All done for now, {} pending AUs", state.pending_aus.len()); + + if send_mode == SendPacketMode::ForcePending { + self.obj().finish_pending_packets()?; + } + + Ok(gst::FlowSuccess::Ok) + } + + fn effective_aggregate_mode(&self, settings: &Settings) -> RtpMpeg4GenericPayAggregateMode { + match settings.aggregate_mode { + RtpMpeg4GenericPayAggregateMode::Auto => match self.is_live() { + Some(true) => RtpMpeg4GenericPayAggregateMode::ZeroLatency, + Some(false) => RtpMpeg4GenericPayAggregateMode::Aggregate, + None => RtpMpeg4GenericPayAggregateMode::ZeroLatency, + }, + mode => 
mode, + } + } + + fn is_live(&self) -> Option { + *self.is_live.lock().unwrap() + } + + // Query upstream live-ness if needed, in case of aggregate-mode=auto + fn ensure_upstream_liveness(&self, settings: &mut Settings) { + if settings.aggregate_mode != RtpMpeg4GenericPayAggregateMode::Auto + || self.is_live().is_some() + { + return; + } + + let mut q = gst::query::Latency::new(); + let is_live = if self.obj().sink_pad().peer_query(&mut q) { + let (is_live, _, _) = q.result(); + is_live + } else { + false + }; + + *self.is_live.lock().unwrap() = Some(is_live); + + gst::info!(CAT, imp: self, "Upstream is live: {is_live}"); + } +} diff --git a/net/rtp/src/mp4g/pay/mod.rs b/net/rtp/src/mp4g/pay/mod.rs new file mode 100644 index 00000000..25b94767 --- /dev/null +++ b/net/rtp/src/mp4g/pay/mod.rs @@ -0,0 +1,58 @@ +// GStreamer RTP MPEG-4 Generic Payloader +// +// Copyright (C) 2023-2024 François Laignel +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] +#[repr(i32)] +#[enum_type(name = "GstRtpMpeg4GenericPayAggregateMode")] +#[non_exhaustive] +pub(crate) enum RtpMpeg4GenericPayAggregateMode { + #[enum_value( + name = "Automatic: zero-latency if upstream is live, otherwise aggregate elementary streams until packet is full.", + nick = "auto" + )] + Auto = -1, + + #[enum_value( + name = "Zero Latency: always send out elementary streams right away, do not wait for more elementary streams to fill a packet.", + nick = "zero-latency" + )] + ZeroLatency = 0, + + #[enum_value( + name = "Aggregate: collect elementary streams until we have a full packet or the max-ptime limit is hit (if set).", + nick = "aggregate" + )] + Aggregate = 1, +} + +glib::wrapper! 
{ + pub struct RtpMpeg4GenericPay(ObjectSubclass) + @extends crate::basepay::RtpBasePay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + { + RtpMpeg4GenericPayAggregateMode::static_type() + .mark_as_plugin_api(gst::PluginAPIFlags::empty()); + } + + gst::Element::register( + Some(plugin), + "rtpmp4gpay2", + gst::Rank::MARGINAL, + RtpMpeg4GenericPay::static_type(), + ) +} diff --git a/net/rtp/src/mp4g/tests.rs b/net/rtp/src/mp4g/tests.rs new file mode 100644 index 00000000..b2020778 --- /dev/null +++ b/net/rtp/src/mp4g/tests.rs @@ -0,0 +1,527 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::tests::{run_test_pipeline, ExpectedBuffer, ExpectedPacket, Source}; +use gst::prelude::*; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + crate::plugin_register_static().expect("rtpmp4g test"); + }); +} + +#[test] +fn aac_hbr_not_fragmented() { + init(); + + let src = + "audiotestsrc num-buffers=100 ! audio/x-raw,rate=48000,channels=2 ! fdkaacenc ! 
/// Payloads mono 48 kHz AAC with `mtu=288` and checks the pay/depay round trip.
///
/// For each of the 102 expected access units the payloader must emit two packets
/// sharing the same PTS and RTP timestamp: a first one without the marker bit and a
/// second one flagged `MARKER`. The depayloader must emit one buffer per access unit.
#[test]
fn aac_hbr_fragmented() {
    init();

    let src =
        "audiotestsrc num-buffers=100 ! audio/x-raw,rate=48000,channels=1 ! fdkaacenc ! aacparse";
    let pay = "rtpmp4gpay2 mtu=288";
    let depay = "rtpmp4gdepay2";

    let mut expected_pay = Vec::with_capacity(102);
    let mut expected_depay = Vec::with_capacity(102);

    // Both expectations are derived from the same per-frame position, so they are
    // built side by side.
    for frame in 0..102 {
        // The position advances by 1024 per access unit and is scaled by 1/48_000 s.
        let position = frame * 1024;

        let pts = gst::ClockTime::from_nseconds(
            position
                .mul_div_floor(*gst::ClockTime::SECOND, 48_000)
                .unwrap(),
        );
        let rtp_time = (position & 0xffff_ffff) as u32;

        // Only the very first packet / buffer carries DISCONT.
        let leading_flags = if frame == 0 {
            gst::BufferFlags::DISCONT
        } else {
            gst::BufferFlags::empty()
        };

        let first_fragment = ExpectedPacket::builder()
            .pts(pts)
            .flags(leading_flags)
            .rtp_time(rtp_time)
            .marker_bit(false)
            .build();
        let last_fragment = ExpectedPacket::builder()
            .pts(pts)
            .flags(gst::BufferFlags::MARKER)
            .rtp_time(rtp_time)
            .build();
        expected_pay.push(vec![first_fragment, last_fragment]);

        expected_depay.push(vec![ExpectedBuffer::builder()
            .pts(pts)
            .flags(leading_flags)
            .build()]);
    }

    run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay);
}
/// Round-trips four 2000-byte `video/mpeg` buffers through the payloader with an MTU
/// of 1400, so that every buffer has to be split into `FRAGMENTS_PER_BUFFER` packets.
///
/// Expected packets of one buffer share its PTS and RTP timestamp; only the last
/// fragment has the marker bit / `MARKER` flag, and only the very first packet has
/// `DISCONT`. Each depayloaded buffer must have the original size again.
#[test]
fn generic_fragmented() {
    const BUFFER_NB: usize = 4;
    const BUFFER_SIZE: usize = 2000;
    const MTU: usize = 1400;
    // Enough overhead in the MTU to use this approximation:
    const FRAGMENTS_PER_BUFFER: usize = (BUFFER_SIZE + MTU - 1) / MTU;
    const LAST_FRAGMENT: usize = FRAGMENTS_PER_BUFFER - 1;
    const RTP_CLOCK_RATE: u64 = 90_000;
    const FRAME_RATE: u64 = 30;

    init();

    let caps = gst::Caps::builder("video/mpeg")
        .field("mpegversion", 4i32)
        .field("systemstream", false)
        .field(
            "codec_data",
            gst::Buffer::from_slice([0x00, 0x00, 0x01, 0xb0, 0x01]),
        )
        .build();

    // Frame position -> PTS, offset by a 1000 h base.
    let pos_to_pts = |pos: usize| {
        1000.hours()
            + (pos as u64)
                .mul_div_floor(*gst::ClockTime::SECOND, FRAME_RATE)
                .map(gst::ClockTime::from_nseconds)
                .unwrap()
    };

    // Frame position -> RTP timestamp, truncated to 32 bits.
    let pos_to_rtp = |pos: usize| {
        let ticks = (pos as u64)
            .mul_div_ceil(RTP_CLOCK_RATE, FRAME_RATE)
            .unwrap();
        (ticks & 0xffff_ffff) as u32
    };

    let duration =
        gst::ClockTime::from_nseconds(1.mul_div_ceil(*gst::ClockTime::SECOND, FRAME_RATE).unwrap());

    // Input buffers: the DTS deliberately differs from the PTS for positions 1..=3.
    let buffers: Vec<_> = (0..BUFFER_NB)
        .map(|pos| {
            let pts = pos_to_pts(pos);
            let dts = match pos {
                0 => pts,
                1 | 2 => pos_to_pts(pos + 1),
                3 => pos_to_pts(pos - 2),
                _ => unreachable!(),
            };
            let flags = if pos == 0 {
                gst::BufferFlags::DISCONT
            } else {
                gst::BufferFlags::DELTA_UNIT
            };

            let mut buffer = gst::Buffer::with_size(BUFFER_SIZE).unwrap();
            {
                let buffer = buffer.get_mut().unwrap();
                buffer.set_pts(pts);
                buffer.set_dts(dts);
                buffer.set_duration(duration);
                buffer.set_flags(flags);
            }

            buffer
        })
        .collect();

    let pay = format!("rtpmp4gpay2 mtu={MTU}");
    let depay = "rtpmp4gdepay2";

    // One inner list per input buffer, one expected packet per fragment.
    let expected_pay: Vec<Vec<_>> = (0..BUFFER_NB)
        .map(|i| {
            (0..FRAGMENTS_PER_BUFFER)
                .map(|frag| {
                    let flags = match (i, frag) {
                        (0, 0) => gst::BufferFlags::DISCONT,
                        (_, LAST_FRAGMENT) => gst::BufferFlags::MARKER,
                        _ => gst::BufferFlags::empty(),
                    };

                    ExpectedPacket::builder()
                        .pts(pos_to_pts(i))
                        .flags(flags)
                        .rtp_time(pos_to_rtp(i))
                        .marker_bit(frag == LAST_FRAGMENT)
                        .build()
                })
                .collect()
        })
        .collect();

    // One reassembled buffer per input buffer.
    let expected_depay: Vec<Vec<_>> = (0..BUFFER_NB)
        .map(|i| {
            let dts = match i {
                0 => pos_to_pts(0),
                1 => pos_to_pts(1 + 1),
                2 => pos_to_pts(2 + 1) + 11110.nseconds(),
                3 => pos_to_pts(3 - 2) + 1.nseconds(),
                _ => unreachable!(),
            };
            let flags = if i == 0 {
                gst::BufferFlags::DISCONT
            } else {
                gst::BufferFlags::DELTA_UNIT
            };

            vec![ExpectedBuffer::builder()
                .pts(pos_to_pts(i))
                .dts(dts)
                .size(BUFFER_SIZE)
                .flags(flags)
                .build()]
        })
        .collect();

    run_test_pipeline(
        Source::Buffers(caps, buffers),
        &pay,
        depay,
        expected_pay,
        expected_depay,
    );
}
buffer.set_pts(pts); + buffer.set_dts(match pos % 4 { + 0 => pts, + 1 | 2 => pos_to_pts(pos + 1), + 3 => pos_to_pts(pos - 2), + _ => unreachable!(), + }); + buffer.set_duration(duration); + if pos == 0 { + buffer.set_flags(gst::BufferFlags::DISCONT); + } else { + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + } + + buffers.push(buffer); + } + + let pay = format!("rtpmp4gpay2 mtu={MTU}"); + let depay = "rtpmp4gdepay2"; + + let mut expected_pay = Vec::with_capacity(AU_NB); + let mut pending_size = 0; + let mut pending_packet = None; + for i in 0..AU_NB { + let size = au_size(i); + if size > MTU { + // Incoming AU to fragment + let mut packet_list = Vec::with_capacity(3); + + if let Some(pending) = pending_packet.take() { + // and there are pending AUs => push them first + packet_list.push(pending); + pending_size = 0; + } + + // Then push the fragments for current AU + for f in 0..FRAGMENTS_PER_LARGE_BUFFER { + packet_list.push( + ExpectedPacket::builder() + .pts(pos_to_pts(i)) + .flags(match (i, f) { + (0, 0) => gst::BufferFlags::DISCONT, + (_, 0) => gst::BufferFlags::empty(), + (_, LAST_FRAGMENT) => gst::BufferFlags::MARKER, + _ => unreachable!(), + }) + .rtp_time(pos_to_rtp(i)) + .marker_bit(f == LAST_FRAGMENT) + .build(), + ) + } + + expected_pay.push(packet_list); + } else { + let must_push = + if i + 1 < AU_NB && pending_size + size + au_size(i + 1) > MTU || i + 1 == AU_NB { + // Next will overflow => push now + // or last AU and not a fragmented one, will be pushed with time deadline + true + } else { + false + }; + + if must_push { + if let Some(pending) = pending_packet.take() { + expected_pay.push(vec![pending]); + pending_size = 0; + } else { + // Last AU + expected_pay.push(vec![ExpectedPacket::builder() + .pts(pos_to_pts(i)) + .flags(gst::BufferFlags::MARKER) + .rtp_time(pos_to_rtp(i)) + .build()]); + } + } else if pending_packet.is_none() { + // Wait for more payload + pending_packet = Some( + ExpectedPacket::builder() + .pts(pos_to_pts(i)) + 
/// Masks the provided `i32` value to be used as a two's complement of len `bit_len`,
/// so the resulting value can be passed to APIs which check the bit range.
///
/// The value is accepted when the `32 - bit_len` most significant bits of `val` are
/// all zeros or all ones, i.e. when dropping them loses no information. The returned
/// value keeps only the `bit_len` least significant bits of `val`.
///
/// Returns `None` if the `i32` value exceeds the range of a two's complement
/// of len `bit_len`.
///
/// # Panics
///
/// Panics if `bit_len` > 32.
#[inline]
pub fn mask_valid_2_comp(val: i32, bit_len: u8) -> Option<i32> {
    let bit_len = u32::from(bit_len);

    // Full width: every `i32` is valid and there is nothing to mask.
    if bit_len == i32::BITS {
        return Some(val);
    }

    assert!(bit_len < i32::BITS);

    // Number of high bits that must be a pure sign / zero extension.
    let overhead = i32::BITS - bit_len;

    let leading_zeros = val.leading_zeros();
    if (leading_zeros > 0 && leading_zeros < overhead)
        || (leading_zeros == 0 && val.leading_ones() < overhead)
    {
        return None;
    }

    // Build the mask as `u32`: with `bit_len == 31`, `(1i32 << 31) - 1` would be
    // `i32::MIN - 1`, which overflows (and panics when overflow checks are enabled).
    let mask = ((1u32 << bit_len) - 1) as i32;

    Some(mask & val)
}