rtp: add mp4apay

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1551>
This commit is contained in:
François Laignel 2024-04-24 14:29:14 +02:00 committed by GStreamer Marge Bot
parent 812fe0a9bd
commit 5466cafc24
6 changed files with 469 additions and 1 deletions

View file

@ -6951,6 +6951,32 @@
},
"rank": "marginal"
},
"rtpmp4apay2": {
"author": "François Laignel <francois centricular com>",
"description": "Payload an MPEG-4 Audio bitstream (e.g. AAC) into RTP packets (RFC 3016)",
"hierarchy": [
"GstRtpMpeg4AudioPay",
"GstRtpBasePay2",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Codec/Payloader/Network/RTP",
"pad-templates": {
"sink": {
"caps": "audio/mpeg:\n mpegversion: 4\n framed: true\n stream-format: raw\n",
"direction": "sink",
"presence": "always"
},
"src": {
"caps": "application/x-rtp:\n media: audio\n payload: [ 96, 127 ]\n clock-rate: [ 1, 2147483647 ]\n encoding-name: MP4A-LATM\n",
"direction": "src",
"presence": "always"
}
},
"rank": "marginal"
},
"rtppcmadepay2": {
"author": "Sebastian Dröge <sebastian@centricular.com>",
"description": "Depayload A-law from RTP packets (RFC 3551)",

View file

@ -57,6 +57,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
mp2t::pay::register(plugin)?;
mp4a::depay::register(plugin)?;
mp4a::pay::register(plugin)?;
pcmau::depay::register(plugin)?;
pcmau::pay::register(plugin)?;

View file

@ -4,4 +4,8 @@ const DEFAULT_CLOCK_RATE: u32 = 90000;
const ENCODING_NAME: &str = "MP4A-LATM";
pub mod depay;
mod parsers;
pub mod parsers;
pub mod pay;
#[cfg(test)]
mod tests;

288
net/rtp/src/mp4a/pay/imp.rs Normal file
View file

@ -0,0 +1,288 @@
// GStreamer RTP MPEG-4 Audio Payloader
//
// Copyright (C) 2023 François Laignel <francois centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/**
* SECTION:element-rtpmp4apay2
* @see_also: rtpmp4apay2, rtpmp4apay, fdkaacenc
*
* Payload an MPEG-4 Audio bitstream into RTP packets as per [RFC 3016][rfc-3016].
* Also see the [IANA media-type page for MPEG-4 Advanced Audio Coding][iana-aac].
*
* [rfc-3016]: https://www.rfc-editor.org/rfc/rfc3016.html#section-4
* [iana-aac]: https://www.iana.org/assignments/media-types/audio/aac
*
* ## Example pipeline
*
* |[
* gst-launch-1.0 audiotestsrc ! fdkaacenc ! rtpmp4apay2 ! udpsink host=127.0.0.1 port=5004
* ]| This will encode an audio test signal to AAC and then payload the encoded audio
* into RTP packets and send them out via UDP to localhost (IPv4) port 5004.
* You can use the #rtpmp4adepay2 or #rtpmp4adepay elements to depayload such a stream, and
* the #fdkaacdec element to decode the depayloaded stream.
*
* Since: plugins-rs-0.13.0
*/
use bitstream_io::{BigEndian, BitRead, BitReader, BitWrite, BitWriter};
use once_cell::sync::Lazy;
use smallvec::SmallVec;
use gst::{glib, subclass::prelude::*};
use crate::basepay::{RtpBasePay2Ext, RtpBasePay2Impl};
use crate::mp4a::parsers::AudioSpecificConfig;
use crate::mp4a::ENCODING_NAME;
#[derive(Default)]
pub struct RtpMpeg4AudioPay;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rtpmp4apay2",
gst::DebugColorFlags::empty(),
Some("RTP MPEG-4 Audio Payloader"),
)
});
#[glib::object_subclass]
impl ObjectSubclass for RtpMpeg4AudioPay {
const NAME: &'static str = "GstRtpMpeg4AudioPay";
type Type = super::RtpMpeg4AudioPay;
type ParentType = crate::basepay::RtpBasePay2;
}
impl ObjectImpl for RtpMpeg4AudioPay {}
impl GstObjectImpl for RtpMpeg4AudioPay {}
impl ElementImpl for RtpMpeg4AudioPay {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"RTP MPEG-4 Audio Payloader",
"Codec/Payloader/Network/RTP",
"Payload an MPEG-4 Audio bitstream (e.g. AAC) into RTP packets (RFC 3016)",
"François Laignel <francois centricular com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&gst::Caps::builder("audio/mpeg")
.field("mpegversion", 4i32)
.field("framed", true)
.field("stream-format", "raw")
.build(),
)
.unwrap();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&gst::Caps::builder("application/x-rtp")
.field("media", "audio")
.field("payload", gst::IntRange::new(96i32, 127i32))
.field("clock-rate", gst::IntRange::new(1i32, i32::MAX))
.field("encoding-name", ENCODING_NAME)
/* All optional parameters
*
* "profile-level-id=[1,MAX]"
* "cpresent="
* "config="
*/
.build(),
)
.unwrap();
vec![src_pad_template, sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}
#[derive(Debug)]
struct ConfigWithCodecData {
audio_config: AudioSpecificConfig,
config_data: SmallVec<[u8; 4]>,
}
impl ConfigWithCodecData {
fn from_codec_data(s: &gst::StructureRef) -> anyhow::Result<ConfigWithCodecData> {
use anyhow::Context;
let codec_data = s
.get::<gst::Buffer>("codec_data")
.context("codec_data field")?;
let codec_data_ref = codec_data.map_readable().context("mapping codec_data")?;
if codec_data_ref.size() != 2 {
anyhow::bail!("Unsupported size {} for codec_data", codec_data_ref.size());
}
let mut r = BitReader::endian(codec_data_ref.as_slice(), BigEndian);
let audio_config = r.parse::<AudioSpecificConfig>()?;
let mut config_data = SmallVec::new();
let mut w = BitWriter::endian(&mut config_data, BigEndian);
// StreamMuxConfig - ISO/IEC 14496-3 sub 1 table 1.21
// audioMuxVersion == 0 (1 bit)
// allStreamsSameTimeFraming == 1 (1 bit)
// numSubFrames == 0 means 1 subframe (6 bits)
// numProgram == 0 means 1 program (4 bits)
// numLayer == 0 means 1 layer (3 bits)
w.write(1, 0).unwrap();
w.write_bit(true).unwrap();
w.write(13, 0).unwrap();
// 1 bit missing for byte alignment
// Append AudioSpecificConfig for prog 1 layer 1 (from codec_data)
for byte in codec_data_ref.as_slice() {
w.write(8, *byte).context("appending codec_data")?
}
// Padding
w.write(7, 0).unwrap();
Ok(ConfigWithCodecData {
audio_config,
config_data,
})
}
}
impl RtpBasePay2Impl for RtpMpeg4AudioPay {
const ALLOWED_META_TAGS: &'static [&'static str] = &["audio"];
fn set_sink_caps(&self, caps: &gst::Caps) -> bool {
let s = caps.structure(0).unwrap();
let (config, config_data) = match ConfigWithCodecData::from_codec_data(s) {
Ok(c) => (c.audio_config, c.config_data),
Err(err) => {
gst::error!(CAT, imp: self, "Unusable codec_data: {err:#}");
return false;
}
};
let rate = if let Ok(rate) = s.get::<i32>("rate") {
rate
} else {
config.sampling_freq as i32
};
self.obj().set_src_caps(
&gst::Caps::builder("application/x-rtp")
.field("media", "audio")
.field("encoding-name", ENCODING_NAME)
.field("clock-rate", rate)
.field("profile-level-id", config.audio_object_type)
.field("cpresent", 0)
.field("config", hex::encode(config_data))
.build(),
);
true
}
// Encapsulation of MPEG-4 Audio bitstream:
// https://www.rfc-editor.org/rfc/rfc3016.html#section-4
//
// We either put 1 whole AAC frame into a single RTP packet,
// or fragment a single AAC frame over multiple RTP packets.
//
fn handle_buffer(
&self,
buffer: &gst::Buffer,
id: u64,
) -> Result<gst::FlowSuccess, gst::FlowError> {
if buffer.size() == 0 {
gst::info!(CAT, imp: self, "Dropping empty buffer {id}");
self.obj().drop_buffers(..=id);
return Ok(gst::FlowSuccess::Ok);
}
let Ok(buffer_ref) = buffer.map_readable() else {
gst::error!(CAT, imp: self, "Failed to map buffer {id} readable");
return Err(gst::FlowError::Error);
};
let max_payload_size = self.obj().max_payload_size() as usize;
let mut size_prefix = SmallVec::<[u8; 3]>::new();
let mut rem_size = buffer_ref.size();
while rem_size > 0xff {
size_prefix.push(0xff);
rem_size >>= 8;
}
size_prefix.push(rem_size as u8);
if max_payload_size < size_prefix.len() {
gst::error!(
CAT,
imp: self,
"Insufficient max-payload-size {} for buffer {id} at least {} bytes needed",
self.obj().max_payload_size(),
size_prefix.len() + 1,
);
self.obj().drop_buffers(..=id);
return Err(gst::FlowError::Error);
}
let mut rem_data = buffer_ref.as_slice();
let mut is_first = true;
while !rem_data.is_empty() {
let mut packet = rtp_types::RtpPacketBuilder::new();
let chunk_size = if is_first {
packet = packet.payload(size_prefix.as_slice());
std::cmp::min(rem_data.len(), max_payload_size - size_prefix.len())
} else {
std::cmp::min(rem_data.len(), max_payload_size)
};
let payload = &rem_data[..chunk_size];
rem_data = &rem_data[chunk_size..];
// The marker bit indicates audioMuxElement boundaries.
// It is set to one to indicate that the RTP packet contains a complete
// audioMuxElement or the last fragment of an audioMuxElement.
let marker = rem_data.is_empty();
gst::log!(CAT, imp: self, "Queuing {}packet with size {} for {}buffer {id}",
if marker { "marked " } else { "" },
payload.len(),
if !marker || !is_first { "fragmented " } else { "" },
);
self.obj()
.queue_packet(id.into(), packet.payload(payload).marker_bit(marker))?;
is_first = false;
}
self.obj().finish_pending_packets()?;
Ok(gst::FlowSuccess::Ok)
}
}

View file

@ -0,0 +1,28 @@
// GStreamer RTP MPEG-4 Audio Payloader
//
// Copyright (C) 2023 François Laignel <francois centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
pub mod imp;
glib::wrapper! {
pub struct RtpMpeg4AudioPay(ObjectSubclass<imp::RtpMpeg4AudioPay>)
@extends crate::basepay::RtpBasePay2, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rtpmp4apay2",
gst::Rank::MARGINAL,
RtpMpeg4AudioPay::static_type(),
)
}

121
net/rtp/src/mp4a/tests.rs Normal file
View file

@ -0,0 +1,121 @@
// SPDX-License-Identifier: MPL-2.0
use crate::tests::{run_test_pipeline, ExpectedBuffer, ExpectedPacket, Source};
use gst::prelude::*;
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
crate::plugin_register_static().expect("rtpmp4a test");
});
}
#[test]
fn mp4a_one_frame_per_packet() {
init();
let src = "audiotestsrc num-buffers=100 ! audio/x-raw,rate=48000,channels=2 ! fdkaacenc";
let pay = "rtpmp4apay2";
let depay = "rtpmp4adepay2";
let mut expected_pay = Vec::with_capacity(102);
for i in 0..102 {
let position = i * 1024;
expected_pay.push(vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_nseconds(
position
.mul_div_floor(*gst::ClockTime::SECOND, 48_000)
.unwrap(),
))
.flags(if i == 0 {
gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER
} else {
gst::BufferFlags::MARKER
})
.rtp_time((position & 0xffff_ffff) as u32)
.build()]);
}
let mut expected_depay = Vec::with_capacity(101);
for i in 0..101 {
let position = (i + 1) * 1024;
expected_depay.push(vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_nseconds(
position
.mul_div_floor(*gst::ClockTime::SECOND, 48_000)
.unwrap(),
))
.flags(if i == 0 {
gst::BufferFlags::DISCONT
} else {
gst::BufferFlags::empty()
})
.build()]);
}
run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay);
}
#[test]
fn mp4a_fragmented() {
init();
let src = "audiotestsrc num-buffers=100 ! audio/x-raw,rate=48000,channels=1 ! fdkaacenc";
let pay = "rtpmp4apay2 mtu=288";
let depay = "rtpmp4adepay2";
let mut expected_pay = Vec::with_capacity(102);
for i in 0..102 {
let position = i * 1024;
let pts = gst::ClockTime::from_nseconds(
position
.mul_div_floor(*gst::ClockTime::SECOND, 48_000)
.unwrap(),
);
let rtp_time = (position & 0xffff_ffff) as u32;
expected_pay.push(vec![
ExpectedPacket::builder()
.pts(pts)
.flags(if i == 0 {
gst::BufferFlags::DISCONT
} else {
gst::BufferFlags::empty()
})
.rtp_time(rtp_time)
.marker_bit(false)
.build(),
ExpectedPacket::builder()
.pts(pts)
.flags(gst::BufferFlags::MARKER)
.rtp_time(rtp_time)
.build(),
]);
}
let mut expected_depay = Vec::with_capacity(101);
for i in 0..101 {
let position = (i + 1) * 1024;
expected_depay.push(vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_nseconds(
position
.mul_div_floor(*gst::ClockTime::SECOND, 48_000)
.unwrap(),
))
.flags(if i == 0 {
gst::BufferFlags::DISCONT
} else {
gst::BufferFlags::empty()
})
.build()]);
}
run_test_pipeline(Source::Bin(src), pay, depay, expected_pay, expected_depay);
}