diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index f2822a00..1f0fe67d 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6951,6 +6951,32 @@ }, "rank": "marginal" }, + "rtpmp4apay2": { + "author": "François Laignel ", + "description": "Payload an MPEG-4 Audio bitstream (e.g. AAC) into RTP packets (RFC 3016)", + "hierarchy": [ + "GstRtpMpeg4AudioPay", + "GstRtpBasePay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Payloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "audio/mpeg:\n mpegversion: 4\n framed: true\n stream-format: raw\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "application/x-rtp:\n media: audio\n payload: [ 96, 127 ]\n clock-rate: [ 1, 2147483647 ]\n encoding-name: MP4A-LATM\n", + "direction": "src", + "presence": "always" + } + }, + "rank": "marginal" + }, "rtppcmadepay2": { "author": "Sebastian Dröge ", "description": "Depayload A-law from RTP packets (RFC 3551)", diff --git a/net/rtp/src/lib.rs b/net/rtp/src/lib.rs index ce789c15..1cc0a161 100644 --- a/net/rtp/src/lib.rs +++ b/net/rtp/src/lib.rs @@ -57,6 +57,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { mp2t::pay::register(plugin)?; mp4a::depay::register(plugin)?; + mp4a::pay::register(plugin)?; pcmau::depay::register(plugin)?; pcmau::pay::register(plugin)?; diff --git a/net/rtp/src/mp4a/mod.rs b/net/rtp/src/mp4a/mod.rs index 4c2f493a..db213c6b 100644 --- a/net/rtp/src/mp4a/mod.rs +++ b/net/rtp/src/mp4a/mod.rs @@ -4,4 +4,8 @@ const DEFAULT_CLOCK_RATE: u32 = 90000; const ENCODING_NAME: &str = "MP4A-LATM"; pub mod depay; -mod parsers; +pub mod parsers; +pub mod pay; + +#[cfg(test)] +mod tests; diff --git a/net/rtp/src/mp4a/pay/imp.rs b/net/rtp/src/mp4a/pay/imp.rs new file mode 100644 index 00000000..bc332b54 --- /dev/null +++ b/net/rtp/src/mp4a/pay/imp.rs @@ -0,0 +1,288 @@ +// GStreamer RTP MPEG-4 Audio Payloader +// +// Copyright (C) 2023 François Laignel +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-rtpmp4apay2 + * @see_also: rtpmp4apay2, rtpmp4apay, fdkaacenc + * + * Payload an MPEG-4 Audio bitstream into RTP packets as per [RFC 3016][rfc-3016]. + * Also see the [IANA media-type page for MPEG-4 Advanced Audio Coding][iana-aac]. + * + * [rfc-3016]: https://www.rfc-editor.org/rfc/rfc3016.html#section-4 + * [iana-aac]: https://www.iana.org/assignments/media-types/audio/aac + * + * ## Example pipeline + * + * |[ + * gst-launch-1.0 audiotestsrc ! fdkaacenc ! rtpmp4apay2 ! udpsink host=127.0.0.1 port=5004 + * ]| This will encode an audio test signal to AAC and then payload the encoded audio + * into RTP packets and send them out via UDP to localhost (IPv4) port 5004. + * You can use the #rtpmp4adepay2 or #rtpmp4adepay elements to depayload such a stream, and + * the #fdkaacdec element to decode the depayloaded stream. + * + * Since: plugins-rs-0.13.0 + */ +use bitstream_io::{BigEndian, BitRead, BitReader, BitWrite, BitWriter}; +use once_cell::sync::Lazy; +use smallvec::SmallVec; + +use gst::{glib, subclass::prelude::*}; + +use crate::basepay::{RtpBasePay2Ext, RtpBasePay2Impl}; + +use crate::mp4a::parsers::AudioSpecificConfig; +use crate::mp4a::ENCODING_NAME; + +#[derive(Default)] +pub struct RtpMpeg4AudioPay; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpmp4apay2", + gst::DebugColorFlags::empty(), + Some("RTP MPEG-4 Audio Payloader"), + ) +}); + +#[glib::object_subclass] +impl ObjectSubclass for RtpMpeg4AudioPay { + const NAME: &'static str = "GstRtpMpeg4AudioPay"; + type Type = super::RtpMpeg4AudioPay; + type ParentType = crate::basepay::RtpBasePay2; +} + +impl ObjectImpl for RtpMpeg4AudioPay {} +impl GstObjectImpl for RtpMpeg4AudioPay {} + +impl ElementImpl for RtpMpeg4AudioPay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP MPEG-4 Audio Payloader", + "Codec/Payloader/Network/RTP", + "Payload an MPEG-4 Audio bitstream (e.g. AAC) into RTP packets (RFC 3016)", + "François Laignel ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("framed", true) + .field("stream-format", "raw") + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("application/x-rtp") + .field("media", "audio") + .field("payload", gst::IntRange::new(96i32, 127i32)) + .field("clock-rate", gst::IntRange::new(1i32, i32::MAX)) + .field("encoding-name", ENCODING_NAME) + /* All optional parameters + * + * "profile-level-id=[1,MAX]" + * "cpresent=" + * "config=" + */ + .build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +#[derive(Debug)] +struct ConfigWithCodecData { + audio_config: AudioSpecificConfig, + config_data: SmallVec<[u8; 4]>, +} + +impl ConfigWithCodecData { + fn from_codec_data(s: &gst::StructureRef) -> anyhow::Result { + use anyhow::Context; + + let codec_data = s + .get::("codec_data") + .context("codec_data field")?; + let codec_data_ref = codec_data.map_readable().context("mapping codec_data")?; + + if codec_data_ref.size() != 2 { + anyhow::bail!("Unsupported size {} for codec_data", codec_data_ref.size()); + } + + let mut r = BitReader::endian(codec_data_ref.as_slice(), BigEndian); + let audio_config = r.parse::()?; + + let mut config_data = SmallVec::new(); + let mut w = BitWriter::endian(&mut config_data, BigEndian); + + // StreamMuxConfig - ISO/IEC 14496-3 sub 1 table 1.21 + // audioMuxVersion == 0 (1 bit) + // allStreamsSameTimeFraming == 1 (1 bit) + // numSubFrames == 0 means 1 subframe (6 bits) + // numProgram == 0 means 1 program (4 bits) + // numLayer == 0 means 1 layer (3 bits) + + w.write(1, 0).unwrap(); + w.write_bit(true).unwrap(); + w.write(13, 0).unwrap(); + // 1 bit missing for byte alignment + + // Append AudioSpecificConfig for prog 1 layer 1 (from codec_data) + for byte in codec_data_ref.as_slice() { + w.write(8, *byte).context("appending codec_data")? + } + + // Padding + w.write(7, 0).unwrap(); + + Ok(ConfigWithCodecData { + audio_config, + config_data, + }) + } +} + +impl RtpBasePay2Impl for RtpMpeg4AudioPay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["audio"]; + + fn set_sink_caps(&self, caps: &gst::Caps) -> bool { + let s = caps.structure(0).unwrap(); + + let (config, config_data) = match ConfigWithCodecData::from_codec_data(s) { + Ok(c) => (c.audio_config, c.config_data), + Err(err) => { + gst::error!(CAT, imp: self, "Unusable codec_data: {err:#}"); + return false; + } + }; + + let rate = if let Ok(rate) = s.get::("rate") { + rate + } else { + config.sampling_freq as i32 + }; + + self.obj().set_src_caps( + &gst::Caps::builder("application/x-rtp") + .field("media", "audio") + .field("encoding-name", ENCODING_NAME) + .field("clock-rate", rate) + .field("profile-level-id", config.audio_object_type) + .field("cpresent", 0) + .field("config", hex::encode(config_data)) + .build(), + ); + + true + } + + // Encapsulation of MPEG-4 Audio bitstream: + // https://www.rfc-editor.org/rfc/rfc3016.html#section-4 + // + // We either put 1 whole AAC frame into a single RTP packet, + // or fragment a single AAC frame over multiple RTP packets. + // + fn handle_buffer( + &self, + buffer: &gst::Buffer, + id: u64, + ) -> Result { + if buffer.size() == 0 { + gst::info!(CAT, imp: self, "Dropping empty buffer {id}"); + self.obj().drop_buffers(..=id); + + return Ok(gst::FlowSuccess::Ok); + } + + let Ok(buffer_ref) = buffer.map_readable() else { + gst::error!(CAT, imp: self, "Failed to map buffer {id} readable"); + + return Err(gst::FlowError::Error); + }; + + let max_payload_size = self.obj().max_payload_size() as usize; + + let mut size_prefix = SmallVec::<[u8; 3]>::new(); + let mut rem_size = buffer_ref.size(); + while rem_size > 0xff { + size_prefix.push(0xff); + rem_size >>= 8; + } + size_prefix.push(rem_size as u8); + + if max_payload_size < size_prefix.len() { + gst::error!( + CAT, + imp: self, + "Insufficient max-payload-size {} for buffer {id} at least {} bytes needed", + self.obj().max_payload_size(), + size_prefix.len() + 1, + ); + self.obj().drop_buffers(..=id); + + return Err(gst::FlowError::Error); + } + + let mut rem_data = buffer_ref.as_slice(); + let mut is_first = true; + while !rem_data.is_empty() { + let mut packet = rtp_types::RtpPacketBuilder::new(); + + let chunk_size = if is_first { + packet = packet.payload(size_prefix.as_slice()); + + std::cmp::min(rem_data.len(), max_payload_size - size_prefix.len()) + } else { + std::cmp::min(rem_data.len(), max_payload_size) + }; + + let payload = &rem_data[..chunk_size]; + rem_data = &rem_data[chunk_size..]; + + // The marker bit indicates audioMuxElement boundaries. + // It is set to one to indicate that the RTP packet contains a complete + // audioMuxElement or the last fragment of an audioMuxElement. + let marker = rem_data.is_empty(); + + gst::log!(CAT, imp: self, "Queuing {}packet with size {} for {}buffer {id}", + if marker { "marked " } else { "" }, + payload.len(), + if !marker || !is_first { "fragmented " } else { "" }, + ); + + self.obj() + .queue_packet(id.into(), packet.payload(payload).marker_bit(marker))?; + + is_first = false; + } + + self.obj().finish_pending_packets()?; + + Ok(gst::FlowSuccess::Ok) + } +} diff --git a/net/rtp/src/mp4a/pay/mod.rs b/net/rtp/src/mp4a/pay/mod.rs new file mode 100644 index 00000000..ee3b512c --- /dev/null +++ b/net/rtp/src/mp4a/pay/mod.rs @@ -0,0 +1,28 @@ +// GStreamer RTP MPEG-4 Audio Payloader +// +// Copyright (C) 2023 François Laignel +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct RtpMpeg4AudioPay(ObjectSubclass) + @extends crate::basepay::RtpBasePay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtpmp4apay2", + gst::Rank::MARGINAL, + RtpMpeg4AudioPay::static_type(), + ) +} diff --git a/net/rtp/src/mp4a/tests.rs b/net/rtp/src/mp4a/tests.rs new file mode 100644 index 00000000..cc24e078 --- /dev/null +++ b/net/rtp/src/mp4a/tests.rs @@ -0,0 +1,121 @@ +// SPDX-License-Identifier: MPL-2.0 + +use crate::tests::{run_test_pipeline, ExpectedBuffer, ExpectedPacket, Source}; +use gst::prelude::*; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + crate::plugin_register_static().expect("rtpmp4a test"); + }); +} + +#[test] +fn mp4a_one_frame_per_packet() { + init(); + + let src = "audiotestsrc num-buffers=100 ! audio/x-raw,rate=48000,channels=2 ! fdkaacenc"; + let pay = "rtpmp4apay2"; + let depay = "rtpmp4adepay2"; + + let mut expected_pay = Vec::with_capacity(102); + for i in 0..102 { + let position = i * 1024; + + expected_pay.push(vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_nseconds( + position + .mul_div_floor(*gst::ClockTime::SECOND, 48_000) + .unwrap(), + )) + .flags(if i == 0 { + gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER + } else { + gst::BufferFlags::MARKER + }) + .rtp_time((position & 0xffff_ffff) as u32) + .build()]); + } + + let mut expected_depay = Vec::with_capacity(101); + for i in 0..101 { + let position = (i + 1) * 1024; + + expected_depay.push(vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_nseconds( + position + .mul_div_floor(*gst::ClockTime::SECOND, 48_000) + .unwrap(), + )) + .flags(if i == 0 { + gst::BufferFlags::DISCONT + } else { + gst::BufferFlags::empty() + }) + .build()]); + } + + run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay); +} + +#[test] +fn mp4a_fragmented() { + init(); + + let src = "audiotestsrc num-buffers=100 ! audio/x-raw,rate=48000,channels=1 ! fdkaacenc"; + let pay = "rtpmp4apay2 mtu=288"; + let depay = "rtpmp4adepay2"; + + let mut expected_pay = Vec::with_capacity(102); + for i in 0..102 { + let position = i * 1024; + + let pts = gst::ClockTime::from_nseconds( + position + .mul_div_floor(*gst::ClockTime::SECOND, 48_000) + .unwrap(), + ); + let rtp_time = (position & 0xffff_ffff) as u32; + + expected_pay.push(vec![ + ExpectedPacket::builder() + .pts(pts) + .flags(if i == 0 { + gst::BufferFlags::DISCONT + } else { + gst::BufferFlags::empty() + }) + .rtp_time(rtp_time) + .marker_bit(false) + .build(), + ExpectedPacket::builder() + .pts(pts) + .flags(gst::BufferFlags::MARKER) + .rtp_time(rtp_time) + .build(), + ]); + } + + let mut expected_depay = Vec::with_capacity(101); + for i in 0..101 { + let position = (i + 1) * 1024; + + expected_depay.push(vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_nseconds( + position + .mul_div_floor(*gst::ClockTime::SECOND, 48_000) + .unwrap(), + )) + .flags(if i == 0 { + gst::BufferFlags::DISCONT + } else { + gst::BufferFlags::empty() + }) + .build()]); + } + + run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay); +}