rtp: Add AMR NB/WB RTP payloader/depayloader

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2016>
This commit is contained in:
Sebastian Dröge 2023-12-25 13:14:34 +02:00 committed by GStreamer Marge Bot
parent e6921da4cb
commit 81ff664666
11 changed files with 2355 additions and 0 deletions

View file

@ -9738,6 +9738,114 @@
},
"rank": "marginal"
},
"rtpamrdepay2": {
"author": "Sebastian Dröge <sebastian@centricular.com>",
"description": "Depayload an AMR audio stream from RTP packets (RFC 3267)",
"hierarchy": [
"GstRtpAmrDepay2",
"GstRtpBaseDepay2",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Codec/Depayloader/Network/RTP",
"pad-templates": {
"sink": {
"caps": "application/x-rtp:\n media: audio\n encoding-name: AMR\n clock-rate: 8000\napplication/x-rtp:\n media: audio\n encoding-name: AMR-WB\n clock-rate: 16000\n",
"direction": "sink",
"presence": "always"
},
"src": {
"caps": "audio/AMR:\n channels: 1\n rate: 8000\naudio/AMR-WB:\n channels: 1\n rate: 16000\n",
"direction": "src",
"presence": "always"
}
},
"rank": "marginal"
},
"rtpamrpay2": {
"author": "Sebastian Dröge <sebastian@centricular.com>",
"description": "Payload an AMR audio stream into RTP packets (RFC 3267)",
"hierarchy": [
"GstRtpAmrPay2",
"GstRtpBasePay2",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Codec/Payloader/Network/RTP",
"pad-templates": {
"sink": {
"caps": "audio/AMR:\n channels: 1\n rate: 8000\naudio/AMR-WB:\n channels: 1\n rate: 16000\n",
"direction": "sink",
"presence": "always"
},
"src": {
"caps": "application/x-rtp:\n media: audio\n encoding-name: AMR\n clock-rate: 8000\nencoding-params: 1\n octet-align: { (string)0, (string)1 }\n crc: 0\n robust-sorting: 0\n interleaving: 0\napplication/x-rtp:\n media: audio\n encoding-name: AMR-WB\n clock-rate: 16000\nencoding-params: 1\n octet-align: { (string)0, (string)1 }\n crc: 0\n robust-sorting: 0\n interleaving: 0\n",
"direction": "src",
"presence": "always"
}
},
"properties": {
"aggregate-mode": {
"blurb": "Whether to send out audio frames immediately or aggregate them until a packet is full.",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "auto (-1)",
"mutable": "null",
"readable": true,
"type": "GstRtpAmrPayAggregateMode",
"writable": true
},
"alignment-threshold": {
"blurb": "Timestamp alignment threshold in nanoseconds",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "40000000",
"max": "18446744073709551615",
"min": "0",
"mutable": "playing",
"readable": true,
"type": "guint64",
"writable": true
},
"discont-wait": {
"blurb": "Window of time in nanoseconds to wait before creating a discontinuity",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1000000000",
"max": "18446744073709551615",
"min": "0",
"mutable": "playing",
"readable": true,
"type": "guint64",
"writable": true
},
"max-ptime": {
"blurb": "Maximum duration of the packet data in ns (-1 = unlimited up to MTU)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "18446744073709551615",
"max": "9223372036854775807",
"min": "-1",
"mutable": "playing",
"readable": true,
"type": "gint64",
"writable": true
}
},
"rank": "marginal"
},
"rtpav1depay": {
"author": "Vivienne Watermeier <vwatermeier@igalia.com>",
"description": "Depayload AV1 from RTP packets",
@ -10899,6 +11007,26 @@
}
]
},
"GstRtpAmrPayAggregateMode": {
"kind": "enum",
"values": [
{
"desc": "Automatic: zero-latency if upstream is live, otherwise aggregate frames until packet is full.",
"name": "auto",
"value": "-1"
},
{
"desc": "Zero Latency: always send out frames right away, do not wait for more frames to fill a packet.",
"name": "zero-latency",
"value": "0"
},
{
"desc": "Aggregate: collect audio frames until we have a full packet or the max-ptime limit is hit (if set).",
"name": "aggregate",
"value": "1"
}
]
},
"GstRtpBaseAudioPay2": {
"hierarchy": [
"GstRtpBaseAudioPay2",

View file

@ -0,0 +1,371 @@
// Copyright (C) 2023 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use std::io;
use atomic_refcell::AtomicRefCell;
use bitstream_io::{BigEndian, BitRead as _, BitReader, ByteRead as _, ByteReader};
/**
* SECTION:element-rtpamrdepay2
* @see_also: rtpamrpay2, rtpamrpay, rtpamrdepay, amrnbdec, amrnbenc, amrwbdec, voamrwbenc
*
* Extracts an AMR audio stream from RTP packets as per [RFC 3267][rfc-3267].
*
* [rfc-3267]: https://datatracker.ietf.org/doc/html/rfc3267
*
* ## Example pipeline
*
* |[
* gst-launch-1.0 udpsrc caps='application/x-rtp, media=audio, clock-rate=8000, encoding-name=AMR, octet-align=(string)1' ! rtpjitterbuffer latency=50 ! rtpamrdepay2 ! amrnbdec ! audioconvert ! audioresample ! autoaudiosink
* ]| This will depayload an incoming RTP AMR NB audio stream. You can use the #amrnbenc and
* #rtpamrpay2 elements to create such an RTP stream.
*
* Since: 0.14
*/
use gst::{glib, subclass::prelude::*};
use std::sync::LazyLock;
use crate::{
amr::payload_header::{
PayloadConfiguration, PayloadHeader, NB_FRAME_SIZES, NB_FRAME_SIZES_BYTES, WB_FRAME_SIZES,
WB_FRAME_SIZES_BYTES,
},
basedepay::{RtpBaseDepay2Ext, RtpBaseDepay2Impl},
};
#[derive(Default)]
struct State {
wide_band: bool,
has_crc: bool,
bandwidth_efficient: bool,
}
#[derive(Default)]
pub struct RtpAmrDepay {
state: AtomicRefCell<State>,
}
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"rtpamrdepay2",
gst::DebugColorFlags::empty(),
Some("RTP AMR Depayloader"),
)
});
#[glib::object_subclass]
impl ObjectSubclass for RtpAmrDepay {
const NAME: &'static str = "GstRtpAmrDepay2";
type Type = super::RtpAmrDepay;
type ParentType = crate::basedepay::RtpBaseDepay2;
}
impl ObjectImpl for RtpAmrDepay {}
impl GstObjectImpl for RtpAmrDepay {}
impl ElementImpl for RtpAmrDepay {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> = LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"RTP AMR Depayloader",
"Codec/Depayloader/Network/RTP",
"Depayload an AMR audio stream from RTP packets (RFC 3267)",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| {
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&gst::Caps::builder_full()
.structure(
gst::Structure::builder("application/x-rtp")
.field("media", "audio")
.field("encoding-name", "AMR")
.field("clock-rate", 8_000i32)
.build(),
)
.structure(
gst::Structure::builder("application/x-rtp")
.field("media", "audio")
.field("encoding-name", "AMR-WB")
.field("clock-rate", 16_000i32)
.build(),
)
.build(),
)
.unwrap();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&gst::Caps::builder_full()
.structure(
gst::Structure::builder("audio/AMR")
.field("channels", 1i32)
.field("rate", 8_000i32)
.build(),
)
.structure(
gst::Structure::builder("audio/AMR-WB")
.field("channels", 1i32)
.field("rate", 16_000i32)
.build(),
)
.build(),
)
.unwrap();
vec![src_pad_template, sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}
impl RtpBaseDepay2Impl for RtpAmrDepay {
const ALLOWED_META_TAGS: &'static [&'static str] = &["audio"];
fn start(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.borrow_mut();
*state = State::default();
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.borrow_mut();
*state = State::default();
Ok(())
}
fn set_sink_caps(&self, caps: &gst::Caps) -> bool {
let s = caps.structure(0).unwrap();
let encoding_name = s.get::<&str>("encoding-name").unwrap();
// We currently only support
//
// octet-align={"0", "1" }, default is "0"
// robust-sorting="0", default
// interleaving="0", default
// encoding-params="1", (channels), default
// crc={"0", "1"}, default "0"
if s.get::<&str>("robust-sorting")
.ok()
.map_or(false, |s| s != "0")
{
gst::error!(CAT, imp = self, "Only robust-sorting=0 supported");
return false;
}
if s.get::<&str>("interleaving")
.ok()
.map_or(false, |s| s != "0")
{
gst::error!(CAT, imp = self, "Only interleaving=0 supported");
return false;
}
if s.get::<&str>("encoding-params")
.ok()
.map_or(false, |s| s != "1")
{
gst::error!(CAT, imp = self, "Only encoding-params=1 supported");
return false;
}
let mut state = self.state.borrow_mut();
let has_crc = s.get::<&str>("crc").ok().map_or(false, |s| s != "0");
let bandwidth_efficient = s.get::<&str>("octet-align").ok().map_or(true, |s| s != "1");
if bandwidth_efficient && has_crc {
gst::error!(
CAT,
imp = self,
"CRC not supported in bandwidth-efficient mode"
);
return false;
}
let wide_band = match encoding_name {
"AMR" => false,
"AMR-WB" => true,
_ => unreachable!(),
};
state.has_crc = has_crc;
state.wide_band = wide_band;
state.bandwidth_efficient = bandwidth_efficient;
let src_caps = gst::Caps::builder(if wide_band {
"audio/AMR-WB"
} else {
"audio/AMR"
})
.field("channels", 1i32)
.field("rate", if wide_band { 16_000i32 } else { 8_000i32 })
.build();
self.obj().set_src_caps(&src_caps);
true
}
fn handle_packet(
&self,
packet: &crate::basedepay::Packet,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let payload = packet.payload();
let state = self.state.borrow();
let payload_configuration = PayloadConfiguration {
has_crc: state.has_crc,
wide_band: state.wide_band,
};
let mut out_data;
let mut num_packets = 0;
let mut cursor = io::Cursor::new(payload);
if state.bandwidth_efficient {
let frame_sizes = if state.wide_band {
WB_FRAME_SIZES.as_slice()
} else {
NB_FRAME_SIZES.as_slice()
};
let mut r = BitReader::endian(&mut cursor, BigEndian);
let payload_header = match r.parse_with::<PayloadHeader>(&payload_configuration) {
Ok(payload_header) => payload_header,
Err(err) => {
gst::error!(CAT, imp = self, "Failed parsing payload header: {err}");
self.obj().drop_packet(packet);
return Ok(gst::FlowSuccess::Ok);
}
};
gst::trace!(CAT, imp = self, "Parsed payload header {payload_header:?}");
out_data = Vec::with_capacity(payload_header.buffer_size(state.wide_band));
'entries: for toc_entry in &payload_header.toc_entries {
let prev_len = out_data.len();
out_data.push(toc_entry.frame_header());
if let Some(&frame_size) = frame_sizes.get(toc_entry.frame_type as usize) {
let mut frame_size = frame_size as u32;
while frame_size > 8 {
match r.read_to::<u8>() {
Ok(b) => {
out_data.push(b);
}
Err(_) => {
gst::warning!(CAT, imp = self, "Short packet");
out_data.truncate(prev_len);
break 'entries;
}
}
frame_size -= 8;
}
if frame_size > 0 {
match r.read::<u8>(frame_size) {
Ok(b) => {
out_data.push(b << (8 - frame_size));
}
Err(_) => {
gst::warning!(CAT, imp = self, "Short packet");
out_data.truncate(prev_len);
break 'entries;
}
}
}
}
num_packets += 1;
}
} else {
let frame_sizes = if state.wide_band {
WB_FRAME_SIZES_BYTES.as_slice()
} else {
NB_FRAME_SIZES_BYTES.as_slice()
};
let mut r = ByteReader::endian(&mut cursor, BigEndian);
let payload_header = match r.parse_with::<PayloadHeader>(&payload_configuration) {
Ok(payload_header) => payload_header,
Err(err) => {
gst::error!(CAT, imp = self, "Failed parsing payload header: {err}");
self.obj().drop_packet(packet);
return Ok(gst::FlowSuccess::Ok);
}
};
gst::trace!(CAT, imp = self, "Parsed payload header {payload_header:?}");
out_data = Vec::with_capacity(payload_header.buffer_size(state.wide_band));
let payload_start = cursor.position() as usize;
let mut data = &payload[payload_start..];
for toc_entry in &payload_header.toc_entries {
if let Some(&frame_size) = frame_sizes.get(toc_entry.frame_type as usize) {
let frame_size = frame_size as usize;
if data.len() < frame_size {
gst::warning!(CAT, imp = self, "Short packet");
break;
}
out_data.push(toc_entry.frame_header());
out_data.extend_from_slice(&data[..frame_size]);
data = &data[frame_size..];
}
num_packets += 1;
}
}
gst::trace!(
CAT,
imp = self,
"Finishing buffer of {} bytes with {num_packets} packets",
out_data.len()
);
if !out_data.is_empty() {
let mut outbuf = gst::Buffer::from_mut_slice(out_data);
{
let outbuf = outbuf.get_mut().unwrap();
outbuf.set_duration(gst::ClockTime::from_mseconds(20) * num_packets);
if packet.marker_bit() {
outbuf.set_flags(gst::BufferFlags::RESYNC);
}
}
self.obj().queue_buffer(packet.into(), outbuf)?;
}
Ok(gst::FlowSuccess::Ok)
}
}

View file

@ -0,0 +1,26 @@
// Copyright (C) 2023 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
pub mod imp;
glib::wrapper! {
pub struct RtpAmrDepay(ObjectSubclass<imp::RtpAmrDepay>)
@extends crate::basedepay::RtpBaseDepay2, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rtpamrdepay2",
gst::Rank::MARGINAL,
RtpAmrDepay::static_type(),
)
}

13
net/rtp/src/amr/mod.rs Normal file
View file

@ -0,0 +1,13 @@
// Copyright (C) 2023 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
pub mod depay;
pub mod pay;
mod payload_header;
#[cfg(test)]
mod tests;

866
net/rtp/src/amr/pay/imp.rs Normal file
View file

@ -0,0 +1,866 @@
// Copyright (C) 2023 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use std::{collections::VecDeque, sync::Mutex};
use anyhow::anyhow;
/**
* SECTION:element-rtpamrpay2
* @see_also: rtpamrdepay2, rtpamrpay, rtpamrdepay, amrnbdec, amrnbenc, amrwbdec, voamrwbenc
*
* Payloads an AMR audio stream into RTP packets as per [RFC 3267][rfc-3267].
*
* [rfc-3267]: https://datatracker.ietf.org/doc/html/rfc3267
*
* ## Example pipeline
*
* |[
* gst-launch-1.0 audiotestsrc wave=ticks ! amrnbenc ! rtpamrpay2 ! udpsink host=127.0.0.1 port=5004
* ]| This will encode an audio test signal as AMR NB audio and payload it as RTP and send it out
* over UDP to localhost port 5004.
*
* Since: 0.14
*/
use atomic_refcell::AtomicRefCell;
use bitstream_io::{BigEndian, BitWrite as _, BitWriter, ByteWrite, ByteWriter};
use gst::{glib, prelude::*, subclass::prelude::*};
use smallvec::SmallVec;
use std::sync::LazyLock;
use crate::{
amr::payload_header::{
PayloadConfiguration, PayloadHeader, TocEntry, NB_FRAME_SIZES, NB_FRAME_SIZES_BYTES,
WB_FRAME_SIZES, WB_FRAME_SIZES_BYTES,
},
audio_discont::{AudioDiscont, AudioDiscontConfiguration},
basepay::{
PacketToBufferRelation, RtpBasePay2Ext, RtpBasePay2Impl, RtpBasePay2ImplExt,
TimestampOffset,
},
};
struct QueuedBuffer {
/// ID of the buffer.
id: u64,
/// The mapped buffer itself.
buffer: gst::MappedBuffer<gst::buffer::Readable>,
/// Number of frames in this buffer.
num_frames: usize,
/// Offset (in frames) into the buffer if some frames were consumed already.
offset: usize,
}
#[derive(Default)]
struct State {
/// AMR NB or WB?
wide_band: bool,
/// Whether octet-align is set or not
bandwidth_efficient: bool,
/// Currently queued buffers
queued_buffers: VecDeque<QueuedBuffer>,
/// Queued bytes
queued_bytes: usize,
/// Queued frames
queued_frames: usize,
/// Full queued frames, including already forwarded frames.
full_queued_frames: usize,
/// Desired "packet time", i.e. packet duration, from the caps, if set.
ptime: Option<gst::ClockTime>,
max_ptime: Option<gst::ClockTime>,
audio_discont: AudioDiscont,
}
#[derive(Clone)]
struct Settings {
max_ptime: Option<gst::ClockTime>,
aggregate_mode: super::AggregateMode,
audio_discont: AudioDiscontConfiguration,
}
#[derive(Default)]
pub struct RtpAmrPay {
state: AtomicRefCell<State>,
settings: Mutex<Settings>,
is_live: Mutex<Option<bool>>,
}
impl Default for Settings {
fn default() -> Self {
Settings {
max_ptime: None,
aggregate_mode: super::AggregateMode::Auto,
audio_discont: AudioDiscontConfiguration::default(),
}
}
}
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"rtpamrpay2",
gst::DebugColorFlags::empty(),
Some("RTP AMR Payloader"),
)
});
#[glib::object_subclass]
impl ObjectSubclass for RtpAmrPay {
const NAME: &'static str = "GstRtpAmrPay2";
type Type = super::RtpAmrPay;
type ParentType = crate::basepay::RtpBasePay2;
}
impl ObjectImpl for RtpAmrPay {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
let mut properties = vec![
glib::ParamSpecEnum::builder_with_default("aggregate-mode", Settings::default().aggregate_mode)
.nick("Aggregate Mode")
.blurb("Whether to send out audio frames immediately or aggregate them until a packet is full.")
.build(),
// Using same type/semantics as C payloaders
glib::ParamSpecInt64::builder("max-ptime")
.nick("Maximum Packet Time")
.blurb("Maximum duration of the packet data in ns (-1 = unlimited up to MTU)")
.default_value(
Settings::default()
.max_ptime
.map(gst::ClockTime::nseconds)
.map(|x| x as i64)
.unwrap_or(-1),
)
.minimum(-1)
.maximum(i64::MAX)
.mutable_playing()
.build(),
];
properties.extend_from_slice(&AudioDiscontConfiguration::create_pspecs());
properties
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
if self
.settings
.lock()
.unwrap()
.audio_discont
.set_property(value, pspec)
{
return;
}
match pspec.name() {
"aggregate-mode" => {
self.settings.lock().unwrap().aggregate_mode = value.get().unwrap();
}
"max-ptime" => {
let v = value.get::<i64>().unwrap();
self.settings.lock().unwrap().max_ptime =
(v != -1).then_some(gst::ClockTime::from_nseconds(v as u64));
}
_ => unimplemented!(),
};
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
if let Some(value) = self.settings.lock().unwrap().audio_discont.property(pspec) {
return value;
}
match pspec.name() {
"aggregate-mode" => self.settings.lock().unwrap().aggregate_mode.to_value(),
"max-ptime" => (self
.settings
.lock()
.unwrap()
.max_ptime
.map(gst::ClockTime::nseconds)
.map(|x| x as i64)
.unwrap_or(-1))
.to_value(),
_ => unimplemented!(),
}
}
}
impl GstObjectImpl for RtpAmrPay {}
impl ElementImpl for RtpAmrPay {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> = LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"RTP AMR Payloader",
"Codec/Payloader/Network/RTP",
"Payload an AMR audio stream into RTP packets (RFC 3267)",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| {
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&gst::Caps::builder_full()
.structure(
gst::Structure::builder("audio/AMR")
.field("channels", 1i32)
.field("rate", 8_000i32)
.build(),
)
.structure(
gst::Structure::builder("audio/AMR-WB")
.field("channels", 1i32)
.field("rate", 16_000i32)
.build(),
)
.build(),
)
.unwrap();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&gst::Caps::builder_full()
.structure(
gst::Structure::builder("application/x-rtp")
.field("media", "audio")
.field("encoding-name", "AMR")
.field("clock-rate", 8_000i32)
.field("encoding-params", "1")
.field("octet-align", gst::List::new(["0", "1"]))
.field("crc", "0")
.field("robust-sorting", "0")
.field("interleaving", "0")
.build(),
)
.structure(
gst::Structure::builder("application/x-rtp")
.field("media", "audio")
.field("encoding-name", "AMR-WB")
.field("clock-rate", 16_000i32)
.field("encoding-params", "1")
.field("octet-align", gst::List::new(["0", "1"]))
.field("crc", "0")
.field("robust-sorting", "0")
.field("interleaving", "0")
.build(),
)
.build(),
)
.unwrap();
vec![src_pad_template, sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}
impl RtpBasePay2Impl for RtpAmrPay {
const ALLOWED_META_TAGS: &'static [&'static str] = &["audio"];
fn start(&self) -> Result<(), gst::ErrorMessage> {
*self.state.borrow_mut() = State::default();
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
*self.state.borrow_mut() = State::default();
Ok(())
}
fn set_sink_caps(&self, caps: &gst::Caps) -> bool {
let s = caps.structure(0).unwrap();
let wide_band = s.name() == "audio/AMR-WB";
let src_templ_caps = self.obj().src_pad().pad_template_caps();
let src_caps = src_templ_caps
.iter()
.find(|s| {
(s.get::<&str>("encoding-name") == Ok("AMR") && !wide_band)
|| (s.get::<&str>("encoding-name") == Ok("AMR-WB") && wide_band)
})
.map(|s| gst::Caps::from(s.to_owned()))
.unwrap();
gst::debug!(CAT, imp = self, "Setting caps {src_caps:?}");
self.obj().set_src_caps(&src_caps);
let mut state = self.state.borrow_mut();
state.wide_band = wide_band;
true
}
fn negotiate(&self, mut src_caps: gst::Caps) {
src_caps.truncate();
// Prefer octet-aligned streams.
{
let src_caps = src_caps.make_mut();
let s = src_caps.structure_mut(0).unwrap();
s.fixate_field_str("octet-align", "1");
}
// Fixate as the first step
src_caps.fixate();
let s = src_caps.structure(0).unwrap();
let bandwidth_efficient = s.get::<&str>("octet-align") != Ok("1");
let ptime = s
.get::<u32>("ptime")
.ok()
.map(u64::from)
.map(gst::ClockTime::from_mseconds);
let max_ptime = s
.get::<u32>("maxptime")
.ok()
.map(u64::from)
.map(gst::ClockTime::from_mseconds);
self.parent_negotiate(src_caps);
let mut state = self.state.borrow_mut();
state.bandwidth_efficient = bandwidth_efficient;
state.ptime = ptime;
state.max_ptime = max_ptime;
drop(state);
}
fn drain(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
let settings = self.settings.lock().unwrap().clone();
let mut state = self.state.borrow_mut();
self.drain_packets(&settings, &mut state, true)
}
fn flush(&self) {
let mut state = self.state.borrow_mut();
state.queued_buffers.clear();
state.queued_bytes = 0;
state.queued_frames = 0;
state.full_queued_frames = 0;
state.audio_discont.reset();
}
fn handle_buffer(
&self,
buffer: &gst::Buffer,
id: u64,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let settings = self.settings.lock().unwrap().clone();
let mut state = self.state.borrow_mut();
let buffer = buffer.clone().into_mapped_buffer_readable().map_err(|_| {
gst::error!(CAT, imp = self, "Can't map buffer readable");
gst::FlowError::Error
})?;
let pts = buffer.buffer().pts().unwrap();
let mut num_frames = 0;
let iter = AmrIter {
data: &buffer,
wide_band: state.wide_band,
};
for item in iter {
if let Err(err) = item {
gst::error!(CAT, imp = self, "Invalid AMR buffer: {err}");
return Err(gst::FlowError::Error);
}
num_frames += 1;
}
let rate = if state.wide_band { 16_000 } else { 8_000 };
let num_samples = num_frames * if state.wide_band { 320 } else { 160 };
let discont = state.audio_discont.process_input(
&settings.audio_discont,
buffer.buffer().flags().contains(gst::BufferFlags::DISCONT),
rate,
pts,
num_samples,
);
if discont {
if state.audio_discont.base_pts().is_some() {
gst::debug!(CAT, imp = self, "Draining because of discontinuity");
self.drain_packets(&settings, &mut state, true)?;
}
state.audio_discont.resync(pts, num_samples);
}
state.queued_bytes += buffer.buffer().size();
state.queued_frames += num_frames;
state.full_queued_frames += num_frames;
state.queued_buffers.push_back(QueuedBuffer {
id,
buffer,
num_frames,
offset: 0,
});
// Make sure we have queried upstream liveness if needed
if settings.aggregate_mode == super::AggregateMode::Auto {
self.ensure_upstream_liveness(&settings);
}
self.drain_packets(&settings, &mut state, false)
}
#[allow(clippy::single_match)]
fn sink_query(&self, query: &mut gst::QueryRef) -> bool {
match query.view_mut() {
gst::QueryViewMut::Caps(query) => {
let src_tmpl_caps = self.obj().src_pad().pad_template_caps();
let peer_caps = self.obj().src_pad().peer_query_caps(Some(&src_tmpl_caps));
if peer_caps.is_empty() {
query.set_result(&peer_caps);
return true;
}
let rtp_amr_nb_caps = gst::Caps::builder("application/x-rtp")
.field("encoding-name", "AMR")
.build();
let rtp_amr_wb_caps = gst::Caps::builder("application/x-rtp")
.field("encoding-name", "AMR-WB")
.build();
let sink_templ_caps = self.obj().sink_pad().pad_template_caps();
let amr_nb_supported = peer_caps.can_intersect(&rtp_amr_nb_caps);
let amr_wb_supported = peer_caps.can_intersect(&rtp_amr_wb_caps);
let mut ret_caps_builder = gst::Caps::builder_full();
for s in sink_templ_caps.iter() {
if (s.name() == "audio/AMR" && amr_nb_supported)
|| (s.name() == "audio/AMR-WB" && amr_wb_supported)
{
ret_caps_builder = ret_caps_builder.structure(s.to_owned());
}
}
let mut ret_caps = ret_caps_builder.build();
if let Some(filter) = query.filter() {
ret_caps = ret_caps.intersect_with_mode(filter, gst::CapsIntersectMode::First);
}
query.set_result(&ret_caps);
return true;
}
_ => (),
}
self.parent_sink_query(query)
}
#[allow(clippy::single_match)]
fn src_query(&self, query: &mut gst::QueryRef) -> bool {
let res = self.parent_src_query(query);
if !res {
return false;
}
match query.view_mut() {
gst::QueryViewMut::Latency(query) => {
let settings = self.settings.lock().unwrap();
let (is_live, mut min, mut max) = query.result();
{
let mut live_guard = self.is_live.lock().unwrap();
if Some(is_live) != *live_guard {
gst::info!(CAT, imp = self, "Upstream is live: {is_live}");
*live_guard = Some(is_live);
}
}
if self.effective_aggregate_mode(&settings) == super::AggregateMode::Aggregate {
if let Some(max_ptime) = settings.max_ptime {
min += max_ptime;
max.opt_add_assign(max_ptime);
} else if is_live {
gst::warning!(
CAT,
imp = self,
"Aggregating packets in live mode, but no max_ptime configured. \
Configured latency may be too low!"
);
}
query.set(is_live, min, max);
}
}
_ => (),
}
true
}
}
impl RtpAmrPay {
fn drain_packets(
&self,
settings: &Settings,
state: &mut State,
drain: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let agg_mode = self.effective_aggregate_mode(settings);
let max_payload_size = self.obj().max_payload_size() as usize - 1;
let max_ptime = self.calc_effective_max_ptime(settings, state);
let payload_configuration = PayloadConfiguration {
has_crc: false,
wide_band: state.wide_band,
};
// Send out packets if there's enough data for one (or more), or if draining.
while let Some(first) = state.queued_buffers.front() {
let num_buffers = state.queued_buffers.len();
let queued_bytes = state.queued_bytes;
let queued_frames = state.queued_frames;
let queued_duration =
(gst::ClockTime::from_mseconds(20) * queued_frames as u64).nseconds();
// We optimistically add average size/duration to send out packets as early as possible
// if we estimate that the next buffer would likely overflow our accumulation limits.
//
// Th duration is based on the full buffer content as we'd have to wait not just 20ms until
// the next buffer but the average number of frames per buffer times 20ms.
let full_queued_frames = state.full_queued_frames;
let full_queued_duration =
(gst::ClockTime::from_mseconds(20) * full_queued_frames as u64).nseconds();
let avg_bytes = queued_bytes / queued_frames;
let avg_duration = full_queued_duration / num_buffers as u64;
let is_ready = drain
|| agg_mode != super::AggregateMode::Aggregate
|| queued_bytes + avg_bytes > max_payload_size
|| (max_ptime.map_or(false, |max_ptime| {
queued_duration + avg_duration > max_ptime
}));
gst::log!(
CAT,
imp = self,
"Queued: bytes {}, duration ~{}ms, mode: {:?} + drain {} => ready: {}",
queued_bytes,
queued_duration / 1_000_000,
agg_mode,
drain,
is_ready
);
if !is_ready {
gst::log!(CAT, imp = self, "Not ready yet, waiting for more data");
break;
}
gst::trace!(CAT, imp = self, "Creating packet..");
let mut payload_header = PayloadHeader {
cmr: 15,
toc_entries: SmallVec::new(),
crc: SmallVec::new(),
};
let mut frame_payloads = SmallVec::<[&[u8]; 16]>::new();
let start_offset = first.offset;
let start_id = first.id;
let mut end_id = start_id;
let mut acc_duration = 0;
let mut acc_size = 0;
let discont = state.audio_discont.next_output_offset().is_none();
for buffer in &state.queued_buffers {
let iter = AmrIter {
data: &buffer.buffer,
wide_band: state.wide_band,
};
for frame in iter.skip(buffer.offset) {
let (frame_type, frame_data) = frame.unwrap();
gst::trace!(
CAT,
imp = self,
"frame type {frame_type:?}, accumulated size {acc_size} duration ~{}ms",
acc_duration / 1_000_000
);
// If this frame would overflow the packet, bail out and send out what we have.
//
// Don't take into account the max_ptime for the first frame, since it could be
// lower than the frame duration in which case we would never payload anything.
//
// For the size check in bytes we know that the first frame will fit the mtu,
// because we already checked for the "audio frame bigger than mtu" scenario above.
if acc_size + frame_data.len() + 1 > max_payload_size
|| (max_ptime.is_some()
&& acc_duration > 0
&& acc_duration + 20_000_000 > max_ptime.unwrap())
{
break;
}
// ... otherwise add frame to the TOC
payload_header.toc_entries.push(TocEntry {
last: false,
frame_type,
frame_quality_indicator: false,
});
frame_payloads.push(frame_data);
end_id = buffer.id;
acc_size += frame_data.len() + 1;
acc_duration += 20_000_000;
// .. otherwise check if there are more frames we can add to the packet
}
}
assert!(!payload_header.toc_entries.is_empty());
payload_header.toc_entries.last_mut().unwrap().last = true;
let mut payload_buf = SmallVec::<[u8; 1500]>::new();
let mut packet_builder;
if state.bandwidth_efficient {
let frame_sizes = if state.wide_band {
WB_FRAME_SIZES.as_slice()
} else {
NB_FRAME_SIZES.as_slice()
};
payload_buf.reserve(1 + payload_header.toc_entries.len() + acc_size);
let mut w = BitWriter::endian(&mut payload_buf, BigEndian);
if let Err(err) = w.build_with(&payload_header, &payload_configuration) {
gst::error!(CAT, imp = self, "Failed writing payload header: {err}");
return Err(gst::FlowError::Error);
}
for (toc_entry, mut data) in
Iterator::zip(payload_header.toc_entries.iter(), frame_payloads)
{
let mut num_bits =
*frame_sizes.get(toc_entry.frame_type as usize).unwrap_or(&0);
while num_bits > 8 {
if let Err(err) = w.write_from(data[0]) {
gst::error!(CAT, imp = self, "Failed writing payload: {err}");
return Err(gst::FlowError::Error);
}
data = &data[1..];
num_bits -= 8;
}
if num_bits > 0 {
if let Err(err) = w.write(num_bits as u32, data[0] >> (8 - num_bits)) {
gst::error!(CAT, imp = self, "Failed writing payload: {err}");
return Err(gst::FlowError::Error);
}
}
}
let _ = w.byte_align();
packet_builder = rtp_types::RtpPacketBuilder::new()
.marker_bit(discont)
.payload(payload_buf.as_slice());
} else {
payload_buf.reserve(1 + payload_header.toc_entries.len());
let mut w = ByteWriter::endian(&mut payload_buf, BigEndian);
if let Err(err) = w.build_with(&payload_header, &payload_configuration) {
gst::error!(CAT, imp = self, "Failed writing payload header: {err}");
return Err(gst::FlowError::Error);
}
packet_builder = rtp_types::RtpPacketBuilder::new()
.marker_bit(discont)
.payload(payload_buf.as_slice());
for data in frame_payloads {
packet_builder = packet_builder.payload(data);
}
}
self.obj().queue_packet(
PacketToBufferRelation::IdsWithOffset {
ids: start_id..=end_id,
timestamp_offset: {
if let Some(next_out_offset) = state.audio_discont.next_output_offset() {
TimestampOffset::Rtp(next_out_offset)
} else {
TimestampOffset::Pts(
gst::ClockTime::from_mseconds(20) * start_offset as u64,
)
}
},
},
packet_builder,
)?;
let mut remaining_frames = payload_header.toc_entries.len();
while remaining_frames > 0 {
let first = state.queued_buffers.front_mut().unwrap();
if remaining_frames >= first.num_frames - first.offset {
remaining_frames -= first.num_frames - first.offset;
let _ = state.queued_buffers.pop_front();
} else {
first.offset += remaining_frames;
remaining_frames = 0;
}
}
state.queued_bytes -= acc_size;
state.queued_frames -= payload_header.toc_entries.len();
state.full_queued_frames -= payload_header.toc_entries.len();
let acc_samples =
payload_header.toc_entries.len() * if state.wide_band { 320 } else { 160 };
state.audio_discont.process_output(acc_samples);
}
gst::log!(
CAT,
imp = self,
"All done for now, {} buffer / {} frames queued",
state.queued_buffers.len(),
state.queued_frames,
);
if drain {
self.obj().finish_pending_packets()?;
}
Ok(gst::FlowSuccess::Ok)
}
fn effective_aggregate_mode(&self, settings: &Settings) -> super::AggregateMode {
match settings.aggregate_mode {
super::AggregateMode::Auto => match self.is_live() {
Some(true) => super::AggregateMode::ZeroLatency,
Some(false) => super::AggregateMode::Aggregate,
None => super::AggregateMode::ZeroLatency,
},
mode => mode,
}
}
fn is_live(&self) -> Option<bool> {
*self.is_live.lock().unwrap()
}
// Query upstream live-ness if needed, in case of aggregate-mode=auto
fn ensure_upstream_liveness(&self, settings: &Settings) {
if settings.aggregate_mode != super::AggregateMode::Auto || self.is_live().is_some() {
return;
}
let mut q = gst::query::Latency::new();
let is_live = if self.obj().sink_pad().peer_query(&mut q) {
let (is_live, _, _) = q.result();
is_live
} else {
false
};
*self.is_live.lock().unwrap() = Some(is_live);
gst::info!(CAT, imp = self, "Upstream is live: {is_live}");
}
// We can get max ptime or ptime recommendations/restrictions from multiple places, e.g. the
// "max-ptime" property, but also from "maxptime" or "ptime" values from downstream / an SDP.
//
// Here we look at the various values and decide on an effective max ptime value.
//
// We'll just return the lowest of any set values.
//
fn calc_effective_max_ptime(&self, settings: &Settings, state: &State) -> Option<u64> {
[settings.max_ptime, state.max_ptime, state.ptime]
.into_iter()
.filter(|v| v.is_some())
.min()
.map(|t| t.unwrap().nseconds())
}
}
struct AmrIter<'a> {
data: &'a [u8],
wide_band: bool,
}
impl<'a> Iterator for AmrIter<'a> {
type Item = Result<(u8, &'a [u8]), anyhow::Error>;
fn next(&mut self) -> Option<Self::Item> {
if self.data.is_empty() {
return None;
}
let frame_sizes = if self.wide_band {
WB_FRAME_SIZES_BYTES.as_slice()
} else {
NB_FRAME_SIZES_BYTES.as_slice()
};
let frame_type = (self.data[0] & 0b0111_1000) >> 3;
if !self.wide_band && (9..=14).contains(&frame_type) {
self.data = &[];
return Some(Err(anyhow!("Invalid AMR frame type {frame_type}")));
}
if self.wide_band && (10..=13).contains(&frame_type) {
self.data = &[];
return Some(Err(anyhow!("Invalid AMR-WB frame type {frame_type}")));
}
// Empty frames
if frame_type > 10 {
self.data = &self.data[1..];
return Some(Ok((frame_type, &[])));
}
let frame_size = *frame_sizes
.get(frame_type as usize)
.expect("Invalid frame type") as usize;
if self.data.len() < frame_size + 1 {
self.data = &[];
return Some(Err(anyhow!("Not enough data")));
}
let res_data = &self.data[1..][..frame_size];
self.data = &self.data[(frame_size + 1)..];
Some(Ok((frame_type, res_data)))
}
}

View file

@ -0,0 +1,55 @@
// Copyright (C) 2023 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
pub mod imp;
glib::wrapper! {
pub struct RtpAmrPay(ObjectSubclass<imp::RtpAmrPay>)
@extends crate::basepay::RtpBasePay2, gst::Element, gst::Object;
}
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(i32)]
#[enum_type(name = "GstRtpAmrPayAggregateMode")]
#[non_exhaustive]
pub(crate) enum AggregateMode {
#[enum_value(
name = "Automatic: zero-latency if upstream is live, otherwise aggregate frames until packet is full.",
nick = "auto"
)]
Auto = -1,
#[enum_value(
name = "Zero Latency: always send out frames right away, do not wait for more frames to fill a packet.",
nick = "zero-latency"
)]
ZeroLatency = 0,
#[enum_value(
name = "Aggregate: collect audio frames until we have a full packet or the max-ptime limit is hit (if set).",
nick = "aggregate"
)]
Aggregate = 1,
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
#[cfg(feature = "doc")]
{
AggregateMode::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
}
gst::Element::register(
Some(plugin),
"rtpamrpay2",
gst::Rank::MARGINAL,
RtpAmrPay::static_type(),
)
}

View file

@ -0,0 +1,348 @@
// Copyright (C) 2023 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use anyhow::{bail, Context as _};
use bitstream_io::{FromBitStreamWith, FromByteStreamWith, ToBitStreamWith, ToByteStreamWith};
use smallvec::SmallVec;
#[derive(Debug)]
pub struct PayloadConfiguration {
pub has_crc: bool,
pub wide_band: bool,
}
#[derive(Debug)]
pub struct PayloadHeader {
pub cmr: u8,
// We don't handle interleaving yet so ILL/ILP are not parsed
pub toc_entries: SmallVec<[TocEntry; 16]>,
#[allow(unused)]
pub crc: SmallVec<[u8; 16]>,
}
impl PayloadHeader {
pub fn buffer_size(&self, wide_band: bool) -> usize {
let frame_sizes = if wide_band {
WB_FRAME_SIZES_BYTES.as_slice()
} else {
NB_FRAME_SIZES_BYTES.as_slice()
};
self.toc_entries
.iter()
.map(|entry| 1 + *frame_sizes.get(entry.frame_type as usize).unwrap_or(&0) as usize)
.sum()
}
}
impl FromByteStreamWith<'_> for PayloadHeader {
type Error = anyhow::Error;
type Context = PayloadConfiguration;
fn from_reader<R: bitstream_io::ByteRead + ?Sized>(
r: &mut R,
cfg: &Self::Context,
) -> Result<Self, Self::Error>
where
Self: Sized,
{
let b = r.read::<u8>().context("cmr")?;
let cmr = (b & 0b1111_0000) >> 4;
let mut toc_entries = SmallVec::<[TocEntry; 16]>::new();
loop {
let toc_entry = r.parse_with::<TocEntry>(cfg).context("toc_entry")?;
let last = toc_entry.last;
toc_entries.push(toc_entry);
if last {
break;
}
}
let mut crc = SmallVec::<[u8; 16]>::new();
if cfg.has_crc {
for toc_entry in &toc_entries {
// Frame types without payload
if toc_entry.frame_type > 9 {
continue;
}
let c = r.read::<u8>().context("crc")?;
crc.push(c);
}
}
Ok(PayloadHeader {
cmr,
toc_entries,
crc,
})
}
}
impl FromBitStreamWith<'_> for PayloadHeader {
type Error = anyhow::Error;
type Context = PayloadConfiguration;
fn from_reader<R: bitstream_io::BitRead + ?Sized>(
r: &mut R,
cfg: &Self::Context,
) -> Result<Self, Self::Error>
where
Self: Sized,
{
if cfg.has_crc {
bail!("CRC not allowed in bandwidth-efficient mode");
}
let cmr = r.read::<u8>(4).context("cmr")?;
let mut toc_entries = SmallVec::<[TocEntry; 16]>::new();
loop {
let toc_entry = r.parse_with::<TocEntry>(cfg).context("toc_entry")?;
let last = toc_entry.last;
toc_entries.push(toc_entry);
if last {
break;
}
}
let crc = SmallVec::<[u8; 16]>::new();
Ok(PayloadHeader {
cmr,
toc_entries,
crc,
})
}
}
impl ToByteStreamWith<'_> for PayloadHeader {
type Error = anyhow::Error;
type Context = PayloadConfiguration;
fn to_writer<W: bitstream_io::ByteWrite + ?Sized>(
&self,
w: &mut W,
cfg: &Self::Context,
) -> Result<(), Self::Error>
where
Self: Sized,
{
if cfg.has_crc {
bail!("Writing CRC not supported");
}
if self.cmr < 0b1111 {
bail!("Invalid CMR value");
}
if self.toc_entries.is_empty() {
bail!("No TOC entries");
}
w.write::<u8>(self.cmr << 4).context("cmr")?;
for (i, entry) in self.toc_entries.iter().enumerate() {
let mut entry = entry.clone();
entry.last = i == self.toc_entries.len() - 1;
w.build_with::<TocEntry>(&entry, cfg).context("toc_entry")?;
}
Ok(())
}
}
impl ToBitStreamWith<'_> for PayloadHeader {
type Error = anyhow::Error;
type Context = PayloadConfiguration;
fn to_writer<W: bitstream_io::BitWrite + ?Sized>(
&self,
w: &mut W,
cfg: &Self::Context,
) -> Result<(), Self::Error>
where
Self: Sized,
{
if cfg.has_crc {
bail!("Writing CRC not supported");
}
if self.cmr < 0b1111 {
bail!("Invalid CMR value");
}
if self.toc_entries.is_empty() {
bail!("No TOC entries");
}
w.write::<u8>(4, self.cmr).context("cmr")?;
for (i, entry) in self.toc_entries.iter().enumerate() {
let mut entry = entry.clone();
entry.last = i == self.toc_entries.len() - 1;
w.build_with::<TocEntry>(&entry, cfg).context("toc_entry")?;
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct TocEntry {
pub frame_type: u8,
pub frame_quality_indicator: bool,
pub last: bool,
}
impl TocEntry {
pub fn frame_header(&self) -> u8 {
self.frame_type << 3 | (self.frame_quality_indicator as u8) << 2
}
}
impl FromByteStreamWith<'_> for TocEntry {
type Error = anyhow::Error;
type Context = PayloadConfiguration;
fn from_reader<R: bitstream_io::ByteRead + ?Sized>(
r: &mut R,
cfg: &Self::Context,
) -> Result<Self, Self::Error>
where
Self: Sized,
{
let b = r.read::<u8>().context("toc_entry")?;
let last = (b & 0b1000_0000) == 0;
let frame_type = (b & 0b0111_1000) >> 3;
let frame_quality_indicator = (b & 0b0000_0100) != 0;
if !cfg.wide_band && (9..=14).contains(&frame_type) {
bail!("Invalid AMR frame type {frame_type}");
}
if cfg.wide_band && (10..=13).contains(&frame_type) {
bail!("Invalid AMR-WB frame type {frame_type}");
}
Ok(TocEntry {
frame_type,
frame_quality_indicator,
last,
})
}
}
impl FromBitStreamWith<'_> for TocEntry {
type Error = anyhow::Error;
type Context = PayloadConfiguration;
fn from_reader<R: bitstream_io::BitRead + ?Sized>(
r: &mut R,
cfg: &Self::Context,
) -> Result<Self, Self::Error>
where
Self: Sized,
{
let last = !r.read_bit().context("last")?;
let frame_type = r.read::<u8>(4).context("frame_type")?;
let frame_quality_indicator = r.read_bit().context("q")?;
if !cfg.wide_band && (9..=14).contains(&frame_type) {
bail!("Invalid AMR frame type {frame_type}");
}
if cfg.wide_band && (10..=13).contains(&frame_type) {
bail!("Invalid AMR-WB frame type {frame_type}");
}
Ok(TocEntry {
frame_type,
frame_quality_indicator,
last,
})
}
}
impl ToByteStreamWith<'_> for TocEntry {
type Error = anyhow::Error;
type Context = PayloadConfiguration;
fn to_writer<W: bitstream_io::ByteWrite + ?Sized>(
&self,
w: &mut W,
cfg: &Self::Context,
) -> Result<(), Self::Error>
where
Self: Sized,
{
if !cfg.wide_band && (9..=14).contains(&self.frame_type) {
bail!("Invalid AMR frame type {}", self.frame_type);
}
if cfg.wide_band && (10..=13).contains(&self.frame_type) {
bail!("Invalid AMR-WB frame type {}", self.frame_type);
}
if self.frame_type > 15 {
bail!("Invalid AMR frame type {}", self.frame_type);
}
let b = ((!self.last as u8) << 7)
| (self.frame_type << 3)
| ((self.frame_quality_indicator as u8) << 2);
w.write::<u8>(b).context("toc_entry")?;
Ok(())
}
}
impl ToBitStreamWith<'_> for TocEntry {
type Error = anyhow::Error;
type Context = PayloadConfiguration;
fn to_writer<W: bitstream_io::BitWrite + ?Sized>(
&self,
w: &mut W,
cfg: &Self::Context,
) -> Result<(), Self::Error>
where
Self: Sized,
{
if !cfg.wide_band && (9..=14).contains(&self.frame_type) {
bail!("Invalid AMR frame type {}", self.frame_type);
}
if cfg.wide_band && (10..=13).contains(&self.frame_type) {
bail!("Invalid AMR-WB frame type {}", self.frame_type);
}
if self.frame_type > 15 {
bail!("Invalid AMR frame type {}", self.frame_type);
}
w.write_bit(!self.last).context("last")?;
w.write::<u8>(4, self.frame_type).context("frame_type")?;
w.write_bit(self.frame_quality_indicator)
.context("frame_quality_indicator")?;
Ok(())
}
}
// See RFC3267 Table 1
pub static NB_FRAME_SIZES: [u16; 9] = [95, 103, 118, 134, 148, 159, 204, 244, 39];
pub static NB_FRAME_SIZES_BYTES: [u8; 9] = [12, 13, 15, 17, 19, 20, 26, 31, 5];
// See ETSI TS 126 201 Table 2 and 3
pub static WB_FRAME_SIZES: [u16; 10] = [132, 177, 253, 285, 317, 365, 397, 461, 477, 40];
pub static WB_FRAME_SIZES_BYTES: [u8; 10] = [17, 23, 32, 36, 40, 46, 50, 58, 60, 5];

View file

@ -0,0 +1,544 @@
// Copyright (C) 2024 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use crate::tests::{run_test_pipeline, ExpectedBuffer, ExpectedPacket, Source};
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
crate::plugin_register_static().expect("rtpamr test");
});
}
// 6 encoded frames of 32 bytes / 20ms
static AMR_NB_DATA: &[u8] = include_bytes!("test.amrnb");
fn get_amr_nb_data() -> (gst::Caps, Vec<gst::Buffer>) {
let caps = gst::Caps::builder("audio/AMR")
.field("rate", 8_000i32)
.field("channels", 1i32)
.build();
let buffers = AMR_NB_DATA
.chunks_exact(32)
.enumerate()
.map(|(idx, c)| {
let mut buf = gst::Buffer::from_slice(c);
{
let buf = buf.get_mut().unwrap();
buf.set_pts(idx as u64 * gst::ClockTime::from_mseconds(20));
buf.set_duration(gst::ClockTime::from_mseconds(20));
if idx == 0 {
buf.set_flags(gst::BufferFlags::DISCONT);
}
}
buf
})
.collect();
(caps, buffers)
}
// 4 encoded frames of 18 bytes / 20ms
static AMR_WB_DATA: &[u8] = include_bytes!("test.amrwb");
fn get_amr_wb_data() -> (gst::Caps, Vec<gst::Buffer>) {
let caps = gst::Caps::builder("audio/AMR-WB")
.field("rate", 16_000i32)
.field("channels", 1i32)
.build();
let buffers = AMR_WB_DATA
.chunks_exact(18)
.enumerate()
.map(|(idx, c)| {
let mut buf = gst::Buffer::from_slice(c);
{
let buf = buf.get_mut().unwrap();
buf.set_pts(idx as u64 * gst::ClockTime::from_mseconds(20));
buf.set_duration(gst::ClockTime::from_mseconds(20));
if idx == 0 {
buf.set_flags(gst::BufferFlags::DISCONT);
}
}
buf
})
.collect();
(caps, buffers)
}
#[test]
fn test_amr_nb() {
init();
let (caps, buffers) = get_amr_nb_data();
let pay = "rtpamrpay2 aggregate-mode=zero-latency";
let depay = "rtpamrdepay2";
let expected_pay = vec![
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(0))
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER)
.pt(96)
.rtp_time(0)
.marker_bit(true)
.size(45)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(20))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(160)
.marker_bit(false)
.size(45)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(40))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(320)
.marker_bit(false)
.size(45)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(60))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(480)
.marker_bit(false)
.size(45)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(80))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(640)
.marker_bit(false)
.size(45)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(100))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(800)
.marker_bit(false)
.size(45)
.build()],
];
let expected_depay = vec![
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(0))
.size(32)
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC)
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(20))
.size(32)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(40))
.size(32)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(60))
.size(32)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(80))
.size(32)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(100))
.size(32)
.flags(gst::BufferFlags::empty())
.build()],
];
run_test_pipeline(
Source::Buffers(caps, buffers),
pay,
depay,
expected_pay,
expected_depay,
);
}
#[test]
fn test_amr_nb_bit_packed() {
init();
let (caps, buffers) = get_amr_nb_data();
let pay = "rtpamrpay2 aggregate-mode=zero-latency ! application/x-rtp,octet-align=(string)0";
let depay = "rtpamrdepay2";
let expected_pay = vec![
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(0))
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER)
.pt(96)
.rtp_time(0)
.marker_bit(true)
.size(45)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(20))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(160)
.marker_bit(false)
.size(45)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(40))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(320)
.marker_bit(false)
.size(45)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(60))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(480)
.marker_bit(false)
.size(45)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(80))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(640)
.marker_bit(false)
.size(45)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(100))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(800)
.marker_bit(false)
.size(45)
.build()],
];
let expected_depay = vec![
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(0))
.size(32)
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC)
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(20))
.size(32)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(40))
.size(32)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(60))
.size(32)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(80))
.size(32)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(100))
.size(32)
.flags(gst::BufferFlags::empty())
.build()],
];
run_test_pipeline(
Source::Buffers(caps, buffers),
pay,
depay,
expected_pay,
expected_depay,
);
}
#[test]
fn test_amr_nb_aggregate() {
init();
let (caps, buffers) = get_amr_nb_data();
let pay = "rtpamrpay2 max-ptime=40000000";
let depay = "rtpamrdepay2";
let expected_pay = vec![
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(0))
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER)
.pt(96)
.rtp_time(0)
.marker_bit(true)
.size(77)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(40))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(320)
.marker_bit(false)
.size(77)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(80))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(640)
.marker_bit(false)
.size(77)
.build()],
];
let expected_depay = vec![
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(0))
.size(64)
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC)
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(40))
.size(64)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(80))
.size(64)
.flags(gst::BufferFlags::empty())
.build()],
];
run_test_pipeline(
Source::Buffers(caps, buffers),
pay,
depay,
expected_pay,
expected_depay,
);
}
#[test]
fn test_amr_wb() {
init();
let (caps, buffers) = get_amr_wb_data();
let pay = "rtpamrpay2 aggregate-mode=zero-latency";
let depay = "rtpamrdepay2";
let expected_pay = vec![
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(0))
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER)
.pt(96)
.rtp_time(0)
.marker_bit(true)
.size(31)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(20))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(320)
.marker_bit(false)
.size(31)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(40))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(640)
.marker_bit(false)
.size(31)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(60))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(960)
.marker_bit(false)
.size(31)
.build()],
];
let expected_depay = vec![
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(0))
.size(18)
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC)
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(20))
.size(18)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(40))
.size(18)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(60))
.size(18)
.flags(gst::BufferFlags::empty())
.build()],
];
run_test_pipeline(
Source::Buffers(caps, buffers),
pay,
depay,
expected_pay,
expected_depay,
);
}
#[test]
fn test_amr_wb_bit_packed() {
init();
let (caps, buffers) = get_amr_wb_data();
let pay = "rtpamrpay2 aggregate-mode=zero-latency ! application/x-rtp,octet-align=(string)0";
let depay = "rtpamrdepay2";
let expected_pay = vec![
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(0))
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER)
.pt(96)
.rtp_time(0)
.marker_bit(true)
.size(30)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(20))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(320)
.marker_bit(false)
.size(30)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(40))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(640)
.marker_bit(false)
.size(30)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(60))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(960)
.marker_bit(false)
.size(30)
.build()],
];
let expected_depay = vec![
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(0))
.size(18)
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC)
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(20))
.size(18)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(40))
.size(18)
.flags(gst::BufferFlags::empty())
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(60))
.size(18)
.flags(gst::BufferFlags::empty())
.build()],
];
run_test_pipeline(
Source::Buffers(caps, buffers),
pay,
depay,
expected_pay,
expected_depay,
);
}
#[test]
fn test_amr_wb_aggregate() {
init();
let (caps, buffers) = get_amr_wb_data();
let pay = "rtpamrpay2 max-ptime=40000000";
let depay = "rtpamrdepay2";
let expected_pay = vec![
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(0))
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER)
.pt(96)
.rtp_time(0)
.marker_bit(true)
.size(49)
.build()],
vec![ExpectedPacket::builder()
.pts(gst::ClockTime::from_mseconds(40))
.flags(gst::BufferFlags::empty())
.pt(96)
.rtp_time(640)
.marker_bit(false)
.size(49)
.build()],
];
let expected_depay = vec![
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(0))
.size(36)
.flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC)
.build()],
vec![ExpectedBuffer::builder()
.pts(gst::ClockTime::from_mseconds(40))
.size(36)
.flags(gst::BufferFlags::empty())
.build()],
];
run_test_pipeline(
Source::Buffers(caps, buffers),
pay,
depay,
expected_pay,
expected_depay,
);
}

Binary file not shown.

Binary file not shown.

View file

@ -31,6 +31,7 @@ mod basedepay;
mod basepay;
mod ac3;
mod amr;
mod av1;
mod jpeg;
mod klv;
@ -64,6 +65,9 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
ac3::depay::register(plugin)?;
ac3::pay::register(plugin)?;
amr::depay::register(plugin)?;
amr::pay::register(plugin)?;
av1::depay::register(plugin)?;
av1::pay::register(plugin)?;