From 81ff664666cfaf889a0be9deb02ae51b8318230a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 25 Dec 2023 13:14:34 +0200 Subject: [PATCH] rtp: Add AMR NB/WB RTP payloader/depayloader Part-of: --- docs/plugins/gst_plugins_cache.json | 128 ++++ net/rtp/src/amr/depay/imp.rs | 371 ++++++++++++ net/rtp/src/amr/depay/mod.rs | 26 + net/rtp/src/amr/mod.rs | 13 + net/rtp/src/amr/pay/imp.rs | 866 ++++++++++++++++++++++++++++ net/rtp/src/amr/pay/mod.rs | 55 ++ net/rtp/src/amr/payload_header.rs | 348 +++++++++++ net/rtp/src/amr/tests/mod.rs | 544 +++++++++++++++++ net/rtp/src/amr/tests/test.amrnb | Bin 0 -> 192 bytes net/rtp/src/amr/tests/test.amrwb | Bin 0 -> 72 bytes net/rtp/src/lib.rs | 4 + 11 files changed, 2355 insertions(+) create mode 100644 net/rtp/src/amr/depay/imp.rs create mode 100644 net/rtp/src/amr/depay/mod.rs create mode 100644 net/rtp/src/amr/mod.rs create mode 100644 net/rtp/src/amr/pay/imp.rs create mode 100644 net/rtp/src/amr/pay/mod.rs create mode 100644 net/rtp/src/amr/payload_header.rs create mode 100644 net/rtp/src/amr/tests/mod.rs create mode 100644 net/rtp/src/amr/tests/test.amrnb create mode 100644 net/rtp/src/amr/tests/test.amrwb diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 7f4864b4e..4ae10c687 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -9738,6 +9738,114 @@ }, "rank": "marginal" }, + "rtpamrdepay2": { + "author": "Sebastian Dröge ", + "description": "Depayload an AMR audio stream from RTP packets (RFC 3267)", + "hierarchy": [ + "GstRtpAmrDepay2", + "GstRtpBaseDepay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Depayloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "application/x-rtp:\n media: audio\n encoding-name: AMR\n clock-rate: 8000\napplication/x-rtp:\n media: audio\n encoding-name: AMR-WB\n clock-rate: 16000\n", + "direction": "sink", + "presence": "always" + 
}, + "src": { + "caps": "audio/AMR:\n channels: 1\n rate: 8000\naudio/AMR-WB:\n channels: 1\n rate: 16000\n", + "direction": "src", + "presence": "always" + } + }, + "rank": "marginal" + }, + "rtpamrpay2": { + "author": "Sebastian Dröge ", + "description": "Payload an AMR audio stream into RTP packets (RFC 3267)", + "hierarchy": [ + "GstRtpAmrPay2", + "GstRtpBasePay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Payloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "audio/AMR:\n channels: 1\n rate: 8000\naudio/AMR-WB:\n channels: 1\n rate: 16000\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "application/x-rtp:\n media: audio\n encoding-name: AMR\n clock-rate: 8000\nencoding-params: 1\n octet-align: { (string)0, (string)1 }\n crc: 0\n robust-sorting: 0\n interleaving: 0\napplication/x-rtp:\n media: audio\n encoding-name: AMR-WB\n clock-rate: 16000\nencoding-params: 1\n octet-align: { (string)0, (string)1 }\n crc: 0\n robust-sorting: 0\n interleaving: 0\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "aggregate-mode": { + "blurb": "Whether to send out audio frames immediately or aggregate them until a packet is full.", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "auto (-1)", + "mutable": "null", + "readable": true, + "type": "GstRtpAmrPayAggregateMode", + "writable": true + }, + "alignment-threshold": { + "blurb": "Timestamp alignment threshold in nanoseconds", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "40000000", + "max": "18446744073709551615", + "min": "0", + "mutable": "playing", + "readable": true, + "type": "guint64", + "writable": true + }, + "discont-wait": { + "blurb": "Window of time in nanoseconds to wait before creating a discontinuity", + "conditionally-available": false, + 
"construct": false, + "construct-only": false, + "controllable": false, + "default": "1000000000", + "max": "18446744073709551615", + "min": "0", + "mutable": "playing", + "readable": true, + "type": "guint64", + "writable": true + }, + "max-ptime": { + "blurb": "Maximum duration of the packet data in ns (-1 = unlimited up to MTU)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "18446744073709551615", + "max": "9223372036854775807", + "min": "-1", + "mutable": "playing", + "readable": true, + "type": "gint64", + "writable": true + } + }, + "rank": "marginal" + }, "rtpav1depay": { "author": "Vivienne Watermeier ", "description": "Depayload AV1 from RTP packets", @@ -10899,6 +11007,26 @@ } ] }, + "GstRtpAmrPayAggregateMode": { + "kind": "enum", + "values": [ + { + "desc": "Automatic: zero-latency if upstream is live, otherwise aggregate frames until packet is full.", + "name": "auto", + "value": "-1" + }, + { + "desc": "Zero Latency: always send out frames right away, do not wait for more frames to fill a packet.", + "name": "zero-latency", + "value": "0" + }, + { + "desc": "Aggregate: collect audio frames until we have a full packet or the max-ptime limit is hit (if set).", + "name": "aggregate", + "value": "1" + } + ] + }, "GstRtpBaseAudioPay2": { "hierarchy": [ "GstRtpBaseAudioPay2", diff --git a/net/rtp/src/amr/depay/imp.rs b/net/rtp/src/amr/depay/imp.rs new file mode 100644 index 000000000..75e9a8493 --- /dev/null +++ b/net/rtp/src/amr/depay/imp.rs @@ -0,0 +1,371 @@ +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . 
+// +// SPDX-License-Identifier: MPL-2.0 + +use std::io; + +use atomic_refcell::AtomicRefCell; +use bitstream_io::{BigEndian, BitRead as _, BitReader, ByteRead as _, ByteReader}; +/** + * SECTION:element-rtpamrdepay2 + * @see_also: rtpamrpay2, rtpamrpay, rtpamrdepay, amrnbdec, amrnbenc, amrwbdec, voamrwbenc + * + * Extracts an AMR audio stream from RTP packets as per [RFC 3267][rfc-3267]. + * + * [rfc-3267]: https://datatracker.ietf.org/doc/html/rfc3267 + * + * ## Example pipeline + * + * |[ + * gst-launch-1.0 udpsrc caps='application/x-rtp, media=audio, clock-rate=8000, encoding-name=AMR, octet-align=(string)1' ! rtpjitterbuffer latency=50 ! rtpamrdepay2 ! amrnbdec ! audioconvert ! audioresample ! autoaudiosink + * ]| This will depayload an incoming RTP AMR NB audio stream. You can use the #amrnbenc and + * #rtpamrpay2 elements to create such an RTP stream. + * + * Since: 0.14 + */ +use gst::{glib, subclass::prelude::*}; + +use std::sync::LazyLock; + +use crate::{ + amr::payload_header::{ + PayloadConfiguration, PayloadHeader, NB_FRAME_SIZES, NB_FRAME_SIZES_BYTES, WB_FRAME_SIZES, + WB_FRAME_SIZES_BYTES, + }, + basedepay::{RtpBaseDepay2Ext, RtpBaseDepay2Impl}, +}; + +#[derive(Default)] +struct State { + wide_band: bool, + has_crc: bool, + bandwidth_efficient: bool, +} + +#[derive(Default)] +pub struct RtpAmrDepay { + state: AtomicRefCell, +} + +static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "rtpamrdepay2", + gst::DebugColorFlags::empty(), + Some("RTP AMR Depayloader"), + ) +}); + +#[glib::object_subclass] +impl ObjectSubclass for RtpAmrDepay { + const NAME: &'static str = "GstRtpAmrDepay2"; + type Type = super::RtpAmrDepay; + type ParentType = crate::basedepay::RtpBaseDepay2; +} + +impl ObjectImpl for RtpAmrDepay {} + +impl GstObjectImpl for RtpAmrDepay {} + +impl ElementImpl for RtpAmrDepay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + 
gst::subclass::ElementMetadata::new( + "RTP AMR Depayloader", + "Codec/Depayloader/Network/RTP", + "Depayload an AMR audio stream from RTP packets (RFC 3267)", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder_full() + .structure( + gst::Structure::builder("application/x-rtp") + .field("media", "audio") + .field("encoding-name", "AMR") + .field("clock-rate", 8_000i32) + .build(), + ) + .structure( + gst::Structure::builder("application/x-rtp") + .field("media", "audio") + .field("encoding-name", "AMR-WB") + .field("clock-rate", 16_000i32) + .build(), + ) + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder_full() + .structure( + gst::Structure::builder("audio/AMR") + .field("channels", 1i32) + .field("rate", 8_000i32) + .build(), + ) + .structure( + gst::Structure::builder("audio/AMR-WB") + .field("channels", 1i32) + .field("rate", 16_000i32) + .build(), + ) + .build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} +impl RtpBaseDepay2Impl for RtpAmrDepay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["audio"]; + + fn start(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.borrow_mut(); + *state = State::default(); + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.borrow_mut(); + *state = State::default(); + + Ok(()) + } + + fn set_sink_caps(&self, caps: &gst::Caps) -> bool { + let s = caps.structure(0).unwrap(); + + let encoding_name = s.get::<&str>("encoding-name").unwrap(); + + // We currently only support + // + // octet-align={"0", "1" }, default is "0" + // 
robust-sorting="0", default + // interleaving="0", default + // encoding-params="1", (channels), default + // crc={"0", "1"}, default "0" + + if s.get::<&str>("robust-sorting") + .ok() + .map_or(false, |s| s != "0") + { + gst::error!(CAT, imp = self, "Only robust-sorting=0 supported"); + return false; + } + + if s.get::<&str>("interleaving") + .ok() + .map_or(false, |s| s != "0") + { + gst::error!(CAT, imp = self, "Only interleaving=0 supported"); + return false; + } + + if s.get::<&str>("encoding-params") + .ok() + .map_or(false, |s| s != "1") + { + gst::error!(CAT, imp = self, "Only encoding-params=1 supported"); + return false; + } + + let mut state = self.state.borrow_mut(); + + let has_crc = s.get::<&str>("crc").ok().map_or(false, |s| s != "0"); + let bandwidth_efficient = s.get::<&str>("octet-align").ok().map_or(true, |s| s != "1"); + + if bandwidth_efficient && has_crc { + gst::error!( + CAT, + imp = self, + "CRC not supported in bandwidth-efficient mode" + ); + return false; + } + + let wide_band = match encoding_name { + "AMR" => false, + "AMR-WB" => true, + _ => unreachable!(), + }; + + state.has_crc = has_crc; + state.wide_band = wide_band; + state.bandwidth_efficient = bandwidth_efficient; + + let src_caps = gst::Caps::builder(if wide_band { + "audio/AMR-WB" + } else { + "audio/AMR" + }) + .field("channels", 1i32) + .field("rate", if wide_band { 16_000i32 } else { 8_000i32 }) + .build(); + + self.obj().set_src_caps(&src_caps); + + true + } + + fn handle_packet( + &self, + packet: &crate::basedepay::Packet, + ) -> Result { + let payload = packet.payload(); + + let state = self.state.borrow(); + + let payload_configuration = PayloadConfiguration { + has_crc: state.has_crc, + wide_band: state.wide_band, + }; + + let mut out_data; + let mut num_packets = 0; + let mut cursor = io::Cursor::new(payload); + if state.bandwidth_efficient { + let frame_sizes = if state.wide_band { + WB_FRAME_SIZES.as_slice() + } else { + NB_FRAME_SIZES.as_slice() + }; + + let mut 
r = BitReader::endian(&mut cursor, BigEndian); + let payload_header = match r.parse_with::(&payload_configuration) { + Ok(payload_header) => payload_header, + Err(err) => { + gst::error!(CAT, imp = self, "Failed parsing payload header: {err}"); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + }; + + gst::trace!(CAT, imp = self, "Parsed payload header {payload_header:?}"); + + out_data = Vec::with_capacity(payload_header.buffer_size(state.wide_band)); + + 'entries: for toc_entry in &payload_header.toc_entries { + let prev_len = out_data.len(); + + out_data.push(toc_entry.frame_header()); + + if let Some(&frame_size) = frame_sizes.get(toc_entry.frame_type as usize) { + let mut frame_size = frame_size as u32; + + while frame_size > 8 { + match r.read_to::() { + Ok(b) => { + out_data.push(b); + } + Err(_) => { + gst::warning!(CAT, imp = self, "Short packet"); + out_data.truncate(prev_len); + break 'entries; + } + } + frame_size -= 8; + } + + if frame_size > 0 { + match r.read::(frame_size) { + Ok(b) => { + out_data.push(b << (8 - frame_size)); + } + Err(_) => { + gst::warning!(CAT, imp = self, "Short packet"); + out_data.truncate(prev_len); + break 'entries; + } + } + } + } + + num_packets += 1; + } + } else { + let frame_sizes = if state.wide_band { + WB_FRAME_SIZES_BYTES.as_slice() + } else { + NB_FRAME_SIZES_BYTES.as_slice() + }; + + let mut r = ByteReader::endian(&mut cursor, BigEndian); + let payload_header = match r.parse_with::(&payload_configuration) { + Ok(payload_header) => payload_header, + Err(err) => { + gst::error!(CAT, imp = self, "Failed parsing payload header: {err}"); + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + }; + + gst::trace!(CAT, imp = self, "Parsed payload header {payload_header:?}"); + + out_data = Vec::with_capacity(payload_header.buffer_size(state.wide_band)); + + let payload_start = cursor.position() as usize; + let mut data = &payload[payload_start..]; + + for toc_entry in 
&payload_header.toc_entries { + if let Some(&frame_size) = frame_sizes.get(toc_entry.frame_type as usize) { + let frame_size = frame_size as usize; + + if data.len() < frame_size { + gst::warning!(CAT, imp = self, "Short packet"); + break; + } + out_data.push(toc_entry.frame_header()); + out_data.extend_from_slice(&data[..frame_size]); + data = &data[frame_size..]; + } + + num_packets += 1; + } + } + + gst::trace!( + CAT, + imp = self, + "Finishing buffer of {} bytes with {num_packets} packets", + out_data.len() + ); + + if !out_data.is_empty() { + let mut outbuf = gst::Buffer::from_mut_slice(out_data); + { + let outbuf = outbuf.get_mut().unwrap(); + + outbuf.set_duration(gst::ClockTime::from_mseconds(20) * num_packets); + + if packet.marker_bit() { + outbuf.set_flags(gst::BufferFlags::RESYNC); + } + } + + self.obj().queue_buffer(packet.into(), outbuf)?; + } + + Ok(gst::FlowSuccess::Ok) + } +} diff --git a/net/rtp/src/amr/depay/mod.rs b/net/rtp/src/amr/depay/mod.rs new file mode 100644 index 000000000..1c2ed1d3c --- /dev/null +++ b/net/rtp/src/amr/depay/mod.rs @@ -0,0 +1,26 @@ +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! 
{ + pub struct RtpAmrDepay(ObjectSubclass) + @extends crate::basedepay::RtpBaseDepay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtpamrdepay2", + gst::Rank::MARGINAL, + RtpAmrDepay::static_type(), + ) +} diff --git a/net/rtp/src/amr/mod.rs b/net/rtp/src/amr/mod.rs new file mode 100644 index 000000000..737aba939 --- /dev/null +++ b/net/rtp/src/amr/mod.rs @@ -0,0 +1,13 @@ +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +pub mod depay; +pub mod pay; +mod payload_header; +#[cfg(test)] +mod tests; diff --git a/net/rtp/src/amr/pay/imp.rs b/net/rtp/src/amr/pay/imp.rs new file mode 100644 index 000000000..d9a94b2c0 --- /dev/null +++ b/net/rtp/src/amr/pay/imp.rs @@ -0,0 +1,866 @@ +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::{collections::VecDeque, sync::Mutex}; + +use anyhow::anyhow; +/** + * SECTION:element-rtpamrpay2 + * @see_also: rtpamrdepay2, rtpamrpay, rtpamrdepay, amrnbdec, amrnbenc, amrwbdec, voamrwbenc + * + * Payloads an AMR audio stream into RTP packets as per [RFC 3267][rfc-3267]. + * + * [rfc-3267]: https://datatracker.ietf.org/doc/html/rfc3267 + * + * ## Example pipeline + * + * |[ + * gst-launch-1.0 audiotestsrc wave=ticks ! amrnbenc ! rtpamrpay2 ! udpsink host=127.0.0.1 port=5004 + * ]| This will encode an audio test signal as AMR NB audio and payload it as RTP and send it out + * over UDP to localhost port 5004. 
+ * + * Since: 0.14 + */ +use atomic_refcell::AtomicRefCell; + +use bitstream_io::{BigEndian, BitWrite as _, BitWriter, ByteWrite, ByteWriter}; +use gst::{glib, prelude::*, subclass::prelude::*}; +use smallvec::SmallVec; +use std::sync::LazyLock; + +use crate::{ + amr::payload_header::{ + PayloadConfiguration, PayloadHeader, TocEntry, NB_FRAME_SIZES, NB_FRAME_SIZES_BYTES, + WB_FRAME_SIZES, WB_FRAME_SIZES_BYTES, + }, + audio_discont::{AudioDiscont, AudioDiscontConfiguration}, + basepay::{ + PacketToBufferRelation, RtpBasePay2Ext, RtpBasePay2Impl, RtpBasePay2ImplExt, + TimestampOffset, + }, +}; + +struct QueuedBuffer { + /// ID of the buffer. + id: u64, + /// The mapped buffer itself. + buffer: gst::MappedBuffer, + /// Number of frames in this buffer. + num_frames: usize, + /// Offset (in frames) into the buffer if some frames were consumed already. + offset: usize, +} + +#[derive(Default)] +struct State { + /// AMR NB or WB? + wide_band: bool, + /// Whether octet-align is set or not + bandwidth_efficient: bool, + + /// Currently queued buffers + queued_buffers: VecDeque, + /// Queued bytes + queued_bytes: usize, + /// Queued frames + queued_frames: usize, + /// Full queued frames, including already forwarded frames. + full_queued_frames: usize, + + /// Desired "packet time", i.e. packet duration, from the caps, if set. 
+ ptime: Option<gst::ClockTime>, + max_ptime: Option<gst::ClockTime>, + + audio_discont: AudioDiscont, +} + +#[derive(Clone)] +struct Settings { + max_ptime: Option<gst::ClockTime>, + aggregate_mode: super::AggregateMode, + audio_discont: AudioDiscontConfiguration, +} + +#[derive(Default)] +pub struct RtpAmrPay { + state: AtomicRefCell<State>, + settings: Mutex<Settings>, + is_live: Mutex<Option<bool>>, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + max_ptime: None, + aggregate_mode: super::AggregateMode::Auto, + audio_discont: AudioDiscontConfiguration::default(), + } + } +} + +static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| { + gst::DebugCategory::new( + "rtpamrpay2", + gst::DebugColorFlags::empty(), + Some("RTP AMR Payloader"), + ) +}); + +#[glib::object_subclass] +impl ObjectSubclass for RtpAmrPay { + const NAME: &'static str = "GstRtpAmrPay2"; + type Type = super::RtpAmrPay; + type ParentType = crate::basepay::RtpBasePay2; +} + +impl ObjectImpl for RtpAmrPay { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| { + let mut properties = vec![ + glib::ParamSpecEnum::builder_with_default("aggregate-mode", Settings::default().aggregate_mode) + .nick("Aggregate Mode") + .blurb("Whether to send out audio frames immediately or aggregate them until a packet is full.") + .build(), + // Using same type/semantics as C payloaders + glib::ParamSpecInt64::builder("max-ptime") + .nick("Maximum Packet Time") + .blurb("Maximum duration of the packet data in ns (-1 = unlimited up to MTU)") + .default_value( + Settings::default() + .max_ptime + .map(gst::ClockTime::nseconds) + .map(|x| x as i64) + .unwrap_or(-1), + ) + .minimum(-1) + .maximum(i64::MAX) + .mutable_playing() + .build(), + ]; + + properties.extend_from_slice(&AudioDiscontConfiguration::create_pspecs()); + + properties + }); + + PROPERTIES.as_ref() + } + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + if self + .settings + .lock() + .unwrap() + .audio_discont + .set_property(value, pspec) + 
{ + return; + } + + match pspec.name() { + "aggregate-mode" => { + self.settings.lock().unwrap().aggregate_mode = value.get().unwrap(); + } + "max-ptime" => { + let v = value.get::<i64>().unwrap(); // -1 means "unlimited", stored as None + self.settings.lock().unwrap().max_ptime = + (v != -1).then(|| gst::ClockTime::from_nseconds(v as u64)); // lazy: never build a ClockTime from -1 + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + if let Some(value) = self.settings.lock().unwrap().audio_discont.property(pspec) { + return value; + } + + match pspec.name() { + "aggregate-mode" => self.settings.lock().unwrap().aggregate_mode.to_value(), + "max-ptime" => (self + .settings + .lock() + .unwrap() + .max_ptime + .map(gst::ClockTime::nseconds) + .map(|x| x as i64) + .unwrap_or(-1)) + .to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for RtpAmrPay {} + +impl ElementImpl for RtpAmrPay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> = LazyLock::new(|| { + gst::subclass::ElementMetadata::new( + "RTP AMR Payloader", + "Codec/Payloader/Network/RTP", + "Payload an AMR audio stream into RTP packets (RFC 3267)", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder_full() + .structure( + gst::Structure::builder("audio/AMR") + .field("channels", 1i32) + .field("rate", 8_000i32) + .build(), + ) + .structure( + gst::Structure::builder("audio/AMR-WB") + .field("channels", 1i32) + .field("rate", 16_000i32) + .build(), + ) + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder_full() + .structure( + gst::Structure::builder("application/x-rtp") + .field("media", 
"audio") + .field("encoding-name", "AMR") + .field("clock-rate", 8_000i32) + .field("encoding-params", "1") + .field("octet-align", gst::List::new(["0", "1"])) + .field("crc", "0") + .field("robust-sorting", "0") + .field("interleaving", "0") + .build(), + ) + .structure( + gst::Structure::builder("application/x-rtp") + .field("media", "audio") + .field("encoding-name", "AMR-WB") + .field("clock-rate", 16_000i32) + .field("encoding-params", "1") + .field("octet-align", gst::List::new(["0", "1"])) + .field("crc", "0") + .field("robust-sorting", "0") + .field("interleaving", "0") + .build(), + ) + .build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} +impl RtpBasePay2Impl for RtpAmrPay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["audio"]; + + fn start(&self) -> Result<(), gst::ErrorMessage> { + *self.state.borrow_mut() = State::default(); + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + *self.state.borrow_mut() = State::default(); + + Ok(()) + } + + fn set_sink_caps(&self, caps: &gst::Caps) -> bool { + let s = caps.structure(0).unwrap(); + let wide_band = s.name() == "audio/AMR-WB"; + + let src_templ_caps = self.obj().src_pad().pad_template_caps(); + + let src_caps = src_templ_caps + .iter() + .find(|s| { + (s.get::<&str>("encoding-name") == Ok("AMR") && !wide_band) + || (s.get::<&str>("encoding-name") == Ok("AMR-WB") && wide_band) + }) + .map(|s| gst::Caps::from(s.to_owned())) + .unwrap(); + + gst::debug!(CAT, imp = self, "Setting caps {src_caps:?}"); + + self.obj().set_src_caps(&src_caps); + + let mut state = self.state.borrow_mut(); + state.wide_band = wide_band; + + true + } + + fn negotiate(&self, mut src_caps: gst::Caps) { + src_caps.truncate(); + + // Prefer octet-aligned streams. 
+ { + let src_caps = src_caps.make_mut(); + let s = src_caps.structure_mut(0).unwrap(); + s.fixate_field_str("octet-align", "1"); + } + + // Fixate as the first step + src_caps.fixate(); + + let s = src_caps.structure(0).unwrap(); + let bandwidth_efficient = s.get::<&str>("octet-align") != Ok("1"); + + let ptime = s + .get::("ptime") + .ok() + .map(u64::from) + .map(gst::ClockTime::from_mseconds); + + let max_ptime = s + .get::("maxptime") + .ok() + .map(u64::from) + .map(gst::ClockTime::from_mseconds); + + self.parent_negotiate(src_caps); + + let mut state = self.state.borrow_mut(); + state.bandwidth_efficient = bandwidth_efficient; + state.ptime = ptime; + state.max_ptime = max_ptime; + drop(state); + } + + fn drain(&self) -> Result { + let settings = self.settings.lock().unwrap().clone(); + let mut state = self.state.borrow_mut(); + + self.drain_packets(&settings, &mut state, true) + } + + fn flush(&self) { + let mut state = self.state.borrow_mut(); + + state.queued_buffers.clear(); + state.queued_bytes = 0; + state.queued_frames = 0; + state.full_queued_frames = 0; + + state.audio_discont.reset(); + } + + fn handle_buffer( + &self, + buffer: &gst::Buffer, + id: u64, + ) -> Result { + let settings = self.settings.lock().unwrap().clone(); + let mut state = self.state.borrow_mut(); + + let buffer = buffer.clone().into_mapped_buffer_readable().map_err(|_| { + gst::error!(CAT, imp = self, "Can't map buffer readable"); + gst::FlowError::Error + })?; + + let pts = buffer.buffer().pts().unwrap(); + + let mut num_frames = 0; + let iter = AmrIter { + data: &buffer, + wide_band: state.wide_band, + }; + for item in iter { + if let Err(err) = item { + gst::error!(CAT, imp = self, "Invalid AMR buffer: {err}"); + return Err(gst::FlowError::Error); + } + num_frames += 1; + } + + let rate = if state.wide_band { 16_000 } else { 8_000 }; + let num_samples = num_frames * if state.wide_band { 320 } else { 160 }; + + let discont = state.audio_discont.process_input( + 
&settings.audio_discont, + buffer.buffer().flags().contains(gst::BufferFlags::DISCONT), + rate, + pts, + num_samples, + ); + if discont { + if state.audio_discont.base_pts().is_some() { + gst::debug!(CAT, imp = self, "Draining because of discontinuity"); + self.drain_packets(&settings, &mut state, true)?; + } + + state.audio_discont.resync(pts, num_samples); + } + + state.queued_bytes += buffer.buffer().size(); + state.queued_frames += num_frames; + state.full_queued_frames += num_frames; + state.queued_buffers.push_back(QueuedBuffer { + id, + buffer, + num_frames, + offset: 0, + }); + + // Make sure we have queried upstream liveness if needed + if settings.aggregate_mode == super::AggregateMode::Auto { + self.ensure_upstream_liveness(&settings); + } + + self.drain_packets(&settings, &mut state, false) + } + + #[allow(clippy::single_match)] + fn sink_query(&self, query: &mut gst::QueryRef) -> bool { + match query.view_mut() { + gst::QueryViewMut::Caps(query) => { + let src_tmpl_caps = self.obj().src_pad().pad_template_caps(); + + let peer_caps = self.obj().src_pad().peer_query_caps(Some(&src_tmpl_caps)); + + if peer_caps.is_empty() { + query.set_result(&peer_caps); + return true; + } + + let rtp_amr_nb_caps = gst::Caps::builder("application/x-rtp") + .field("encoding-name", "AMR") + .build(); + let rtp_amr_wb_caps = gst::Caps::builder("application/x-rtp") + .field("encoding-name", "AMR-WB") + .build(); + + let sink_templ_caps = self.obj().sink_pad().pad_template_caps(); + let amr_nb_supported = peer_caps.can_intersect(&rtp_amr_nb_caps); + let amr_wb_supported = peer_caps.can_intersect(&rtp_amr_wb_caps); + + let mut ret_caps_builder = gst::Caps::builder_full(); + for s in sink_templ_caps.iter() { + if (s.name() == "audio/AMR" && amr_nb_supported) + || (s.name() == "audio/AMR-WB" && amr_wb_supported) + { + ret_caps_builder = ret_caps_builder.structure(s.to_owned()); + } + } + + let mut ret_caps = ret_caps_builder.build(); + if let Some(filter) = query.filter() { + 
ret_caps = ret_caps.intersect_with_mode(filter, gst::CapsIntersectMode::First); + } + + query.set_result(&ret_caps); + + return true; + } + _ => (), + } + + self.parent_sink_query(query) + } + + #[allow(clippy::single_match)] + fn src_query(&self, query: &mut gst::QueryRef) -> bool { + let res = self.parent_src_query(query); + if !res { + return false; + } + + match query.view_mut() { + gst::QueryViewMut::Latency(query) => { + let settings = self.settings.lock().unwrap(); + + let (is_live, mut min, mut max) = query.result(); + + { + let mut live_guard = self.is_live.lock().unwrap(); + + if Some(is_live) != *live_guard { + gst::info!(CAT, imp = self, "Upstream is live: {is_live}"); + *live_guard = Some(is_live); + } + } + + if self.effective_aggregate_mode(&settings) == super::AggregateMode::Aggregate { + if let Some(max_ptime) = settings.max_ptime { + min += max_ptime; + max.opt_add_assign(max_ptime); + } else if is_live { + gst::warning!( + CAT, + imp = self, + "Aggregating packets in live mode, but no max_ptime configured. \ + Configured latency may be too low!" + ); + } + query.set(is_live, min, max); + } + } + _ => (), + } + + true + } +} + +impl RtpAmrPay { + fn drain_packets( + &self, + settings: &Settings, + state: &mut State, + drain: bool, + ) -> Result { + let agg_mode = self.effective_aggregate_mode(settings); + + let max_payload_size = self.obj().max_payload_size() as usize - 1; + let max_ptime = self.calc_effective_max_ptime(settings, state); + + let payload_configuration = PayloadConfiguration { + has_crc: false, + wide_band: state.wide_band, + }; + + // Send out packets if there's enough data for one (or more), or if draining. 
+ while let Some(first) = state.queued_buffers.front() { + let num_buffers = state.queued_buffers.len(); + let queued_bytes = state.queued_bytes; + let queued_frames = state.queued_frames; + let queued_duration = + (gst::ClockTime::from_mseconds(20) * queued_frames as u64).nseconds(); + + // We optimistically add average size/duration to send out packets as early as possible + // if we estimate that the next buffer would likely overflow our accumulation limits. + // + // The duration is based on the full buffer content as we'd have to wait not just 20ms until + // the next buffer but the average number of frames per buffer times 20ms. + let full_queued_frames = state.full_queued_frames; + let full_queued_duration = + (gst::ClockTime::from_mseconds(20) * full_queued_frames as u64).nseconds(); + let avg_bytes = queued_bytes / queued_frames; // NOTE(review): divides by zero if only zero-frame buffers are queued; confirm that cannot happen + let avg_duration = full_queued_duration / num_buffers as u64; + + let is_ready = drain + || agg_mode != super::AggregateMode::Aggregate + || queued_bytes + avg_bytes > max_payload_size + || (max_ptime.map_or(false, |max_ptime| { + queued_duration + avg_duration > max_ptime + })); + + gst::log!( + CAT, + imp = self, + "Queued: bytes {}, duration ~{}ms, mode: {:?} + drain {} => ready: {}", + queued_bytes, + queued_duration / 1_000_000, + agg_mode, + drain, + is_ready + ); + + if !is_ready { + gst::log!(CAT, imp = self, "Not ready yet, waiting for more data"); + break; + } + + gst::trace!(CAT, imp = self, "Creating packet.."); + + let mut payload_header = PayloadHeader { + cmr: 15, + toc_entries: SmallVec::new(), + crc: SmallVec::new(), + }; + + let mut frame_payloads = SmallVec::<[&[u8]; 16]>::new(); + + let start_offset = first.offset; + let start_id = first.id; + let mut end_id = start_id; + let mut acc_duration = 0; + let mut acc_size = 0; + let discont = state.audio_discont.next_output_offset().is_none(); + + for buffer in &state.queued_buffers { + let iter = AmrIter { + data: &buffer.buffer, + wide_band: state.wide_band, + }; 
+ + for frame in iter.skip(buffer.offset) { + let (frame_type, frame_data) = frame.unwrap(); + gst::trace!( + CAT, + imp = self, + "frame type {frame_type:?}, accumulated size {acc_size} duration ~{}ms", + acc_duration / 1_000_000 + ); + + // If this frame would overflow the packet, bail out and send out what we have. + // + // Don't take into account the max_ptime for the first frame, since it could be + // lower than the frame duration in which case we would never payload anything. + // + // For the size check in bytes we know that the first frame will fit the mtu, + // because we already checked for the "audio frame bigger than mtu" scenario above. + if acc_size + frame_data.len() + 1 > max_payload_size + || (max_ptime.is_some() + && acc_duration > 0 + && acc_duration + 20_000_000 > max_ptime.unwrap()) + { + break; + } + + // ... otherwise add frame to the TOC + payload_header.toc_entries.push(TocEntry { + last: false, + frame_type, + frame_quality_indicator: false, + }); + frame_payloads.push(frame_data); + end_id = buffer.id; + + acc_size += frame_data.len() + 1; + acc_duration += 20_000_000; + + // .. 
otherwise check if there are more frames we can add to the packet + } + } + + assert!(!payload_header.toc_entries.is_empty()); + payload_header.toc_entries.last_mut().unwrap().last = true; + + let mut payload_buf = SmallVec::<[u8; 1500]>::new(); + let mut packet_builder; + if state.bandwidth_efficient { + let frame_sizes = if state.wide_band { + WB_FRAME_SIZES.as_slice() + } else { + NB_FRAME_SIZES.as_slice() + }; + + payload_buf.reserve(1 + payload_header.toc_entries.len() + acc_size); + + let mut w = BitWriter::endian(&mut payload_buf, BigEndian); + if let Err(err) = w.build_with(&payload_header, &payload_configuration) { + gst::error!(CAT, imp = self, "Failed writing payload header: {err}"); + return Err(gst::FlowError::Error); + } + + for (toc_entry, mut data) in + Iterator::zip(payload_header.toc_entries.iter(), frame_payloads) + { + let mut num_bits = + *frame_sizes.get(toc_entry.frame_type as usize).unwrap_or(&0); + + while num_bits > 8 { + if let Err(err) = w.write_from(data[0]) { + gst::error!(CAT, imp = self, "Failed writing payload: {err}"); + return Err(gst::FlowError::Error); + } + data = &data[1..]; + num_bits -= 8; + } + + if num_bits > 0 { + if let Err(err) = w.write(num_bits as u32, data[0] >> (8 - num_bits)) { + gst::error!(CAT, imp = self, "Failed writing payload: {err}"); + return Err(gst::FlowError::Error); + } + } + } + + let _ = w.byte_align(); + + packet_builder = rtp_types::RtpPacketBuilder::new() + .marker_bit(discont) + .payload(payload_buf.as_slice()); + } else { + payload_buf.reserve(1 + payload_header.toc_entries.len()); + + let mut w = ByteWriter::endian(&mut payload_buf, BigEndian); + if let Err(err) = w.build_with(&payload_header, &payload_configuration) { + gst::error!(CAT, imp = self, "Failed writing payload header: {err}"); + return Err(gst::FlowError::Error); + } + + packet_builder = rtp_types::RtpPacketBuilder::new() + .marker_bit(discont) + .payload(payload_buf.as_slice()); + + for data in frame_payloads { + packet_builder = 
packet_builder.payload(data); + } + } + + self.obj().queue_packet( + PacketToBufferRelation::IdsWithOffset { + ids: start_id..=end_id, + timestamp_offset: { + if let Some(next_out_offset) = state.audio_discont.next_output_offset() { + TimestampOffset::Rtp(next_out_offset) + } else { + TimestampOffset::Pts( + gst::ClockTime::from_mseconds(20) * start_offset as u64, + ) + } + }, + }, + packet_builder, + )?; + + let mut remaining_frames = payload_header.toc_entries.len(); + while remaining_frames > 0 { + let first = state.queued_buffers.front_mut().unwrap(); + + if remaining_frames >= first.num_frames - first.offset { + remaining_frames -= first.num_frames - first.offset; + let _ = state.queued_buffers.pop_front(); + } else { + first.offset += remaining_frames; + remaining_frames = 0; + } + } + + state.queued_bytes -= acc_size; + state.queued_frames -= payload_header.toc_entries.len(); + state.full_queued_frames -= payload_header.toc_entries.len(); + let acc_samples = + payload_header.toc_entries.len() * if state.wide_band { 320 } else { 160 }; + state.audio_discont.process_output(acc_samples); + } + + gst::log!( + CAT, + imp = self, + "All done for now, {} buffer / {} frames queued", + state.queued_buffers.len(), + state.queued_frames, + ); + + if drain { + self.obj().finish_pending_packets()?; + } + + Ok(gst::FlowSuccess::Ok) + } + + fn effective_aggregate_mode(&self, settings: &Settings) -> super::AggregateMode { + match settings.aggregate_mode { + super::AggregateMode::Auto => match self.is_live() { + Some(true) => super::AggregateMode::ZeroLatency, + Some(false) => super::AggregateMode::Aggregate, + None => super::AggregateMode::ZeroLatency, + }, + mode => mode, + } + } + + fn is_live(&self) -> Option { + *self.is_live.lock().unwrap() + } + + // Query upstream live-ness if needed, in case of aggregate-mode=auto + fn ensure_upstream_liveness(&self, settings: &Settings) { + if settings.aggregate_mode != super::AggregateMode::Auto || self.is_live().is_some() { + 
return; + } + + let mut q = gst::query::Latency::new(); + let is_live = if self.obj().sink_pad().peer_query(&mut q) { + let (is_live, _, _) = q.result(); + is_live + } else { + false + }; + + *self.is_live.lock().unwrap() = Some(is_live); + + gst::info!(CAT, imp = self, "Upstream is live: {is_live}"); + } + + // We can get max ptime or ptime recommendations/restrictions from multiple places, e.g. the + // "max-ptime" property, but also from "maxptime" or "ptime" values from downstream / an SDP. + // + // Here we look at the various values and decide on an effective max ptime value. + // + // We'll just return the lowest of any set values. + // + fn calc_effective_max_ptime(&self, settings: &Settings, state: &State) -> Option { + [settings.max_ptime, state.max_ptime, state.ptime] + .into_iter() + .filter(|v| v.is_some()) + .min() + .map(|t| t.unwrap().nseconds()) + } +} + +struct AmrIter<'a> { + data: &'a [u8], + wide_band: bool, +} + +impl<'a> Iterator for AmrIter<'a> { + type Item = Result<(u8, &'a [u8]), anyhow::Error>; + + fn next(&mut self) -> Option { + if self.data.is_empty() { + return None; + } + + let frame_sizes = if self.wide_band { + WB_FRAME_SIZES_BYTES.as_slice() + } else { + NB_FRAME_SIZES_BYTES.as_slice() + }; + + let frame_type = (self.data[0] & 0b0111_1000) >> 3; + if !self.wide_band && (9..=14).contains(&frame_type) { + self.data = &[]; + return Some(Err(anyhow!("Invalid AMR frame type {frame_type}"))); + } + if self.wide_band && (10..=13).contains(&frame_type) { + self.data = &[]; + return Some(Err(anyhow!("Invalid AMR-WB frame type {frame_type}"))); + } + + // Empty frames + if frame_type > 10 { + self.data = &self.data[1..]; + return Some(Ok((frame_type, &[]))); + } + + let frame_size = *frame_sizes + .get(frame_type as usize) + .expect("Invalid frame type") as usize; + if self.data.len() < frame_size + 1 { + self.data = &[]; + return Some(Err(anyhow!("Not enough data"))); + } + + let res_data = &self.data[1..][..frame_size]; + self.data = 
&self.data[(frame_size + 1)..]; + + Some(Ok((frame_type, res_data))) + } +} diff --git a/net/rtp/src/amr/pay/mod.rs b/net/rtp/src/amr/pay/mod.rs new file mode 100644 index 000000000..66774a54f --- /dev/null +++ b/net/rtp/src/amr/pay/mod.rs @@ -0,0 +1,55 @@ +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct RtpAmrPay(ObjectSubclass) + @extends crate::basepay::RtpBasePay2, gst::Element, gst::Object; +} + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] +#[repr(i32)] +#[enum_type(name = "GstRtpAmrPayAggregateMode")] +#[non_exhaustive] +pub(crate) enum AggregateMode { + #[enum_value( + name = "Automatic: zero-latency if upstream is live, otherwise aggregate frames until packet is full.", + nick = "auto" + )] + Auto = -1, + + #[enum_value( + name = "Zero Latency: always send out frames right away, do not wait for more frames to fill a packet.", + nick = "zero-latency" + )] + ZeroLatency = 0, + + #[enum_value( + name = "Aggregate: collect audio frames until we have a full packet or the max-ptime limit is hit (if set).", + nick = "aggregate" + )] + Aggregate = 1, +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + { + AggregateMode::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + } + + gst::Element::register( + Some(plugin), + "rtpamrpay2", + gst::Rank::MARGINAL, + RtpAmrPay::static_type(), + ) +} diff --git a/net/rtp/src/amr/payload_header.rs b/net/rtp/src/amr/payload_header.rs new file mode 100644 index 000000000..04491f35b --- /dev/null +++ b/net/rtp/src/amr/payload_header.rs @@ -0,0 +1,348 @@ +// Copyright (C) 2023 Sebastian Dröge +// +// This Source Code Form is subject to 
the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. +// +// SPDX-License-Identifier: MPL-2.0 + +use anyhow::{bail, Context as _}; +use bitstream_io::{FromBitStreamWith, FromByteStreamWith, ToBitStreamWith, ToByteStreamWith}; +use smallvec::SmallVec; + +#[derive(Debug)] +pub struct PayloadConfiguration { + pub has_crc: bool, + pub wide_band: bool, +} + +#[derive(Debug)] +pub struct PayloadHeader { + pub cmr: u8, + // We don't handle interleaving yet so ILL/ILP are not parsed + pub toc_entries: SmallVec<[TocEntry; 16]>, + #[allow(unused)] + pub crc: SmallVec<[u8; 16]>, +} + +impl PayloadHeader { + pub fn buffer_size(&self, wide_band: bool) -> usize { + let frame_sizes = if wide_band { + WB_FRAME_SIZES_BYTES.as_slice() + } else { + NB_FRAME_SIZES_BYTES.as_slice() + }; + + self.toc_entries + .iter() + .map(|entry| 1 + *frame_sizes.get(entry.frame_type as usize).unwrap_or(&0) as usize) + .sum() + } +} + +impl FromByteStreamWith<'_> for PayloadHeader { + type Error = anyhow::Error; + + type Context = PayloadConfiguration; + + fn from_reader( + r: &mut R, + cfg: &Self::Context, + ) -> Result + where + Self: Sized, + { + let b = r.read::().context("cmr")?; + let cmr = (b & 0b1111_0000) >> 4; + + let mut toc_entries = SmallVec::<[TocEntry; 16]>::new(); + loop { + let toc_entry = r.parse_with::(cfg).context("toc_entry")?; + let last = toc_entry.last; + toc_entries.push(toc_entry); + + if last { + break; + } + } + + let mut crc = SmallVec::<[u8; 16]>::new(); + if cfg.has_crc { + for toc_entry in &toc_entries { + // Frame types without payload + if toc_entry.frame_type > 9 { + continue; + } + let c = r.read::().context("crc")?; + crc.push(c); + } + } + + Ok(PayloadHeader { + cmr, + toc_entries, + crc, + }) + } +} + +impl FromBitStreamWith<'_> for PayloadHeader { + type Error = anyhow::Error; + + type Context = PayloadConfiguration; + + fn from_reader( + r: &mut R, + cfg: &Self::Context, + ) -> 
Result + where + Self: Sized, + { + if cfg.has_crc { + bail!("CRC not allowed in bandwidth-efficient mode"); + } + + let cmr = r.read::(4).context("cmr")?; + + let mut toc_entries = SmallVec::<[TocEntry; 16]>::new(); + loop { + let toc_entry = r.parse_with::(cfg).context("toc_entry")?; + let last = toc_entry.last; + toc_entries.push(toc_entry); + + if last { + break; + } + } + + let crc = SmallVec::<[u8; 16]>::new(); + + Ok(PayloadHeader { + cmr, + toc_entries, + crc, + }) + } +} + +impl ToByteStreamWith<'_> for PayloadHeader { + type Error = anyhow::Error; + + type Context = PayloadConfiguration; + + fn to_writer( + &self, + w: &mut W, + cfg: &Self::Context, + ) -> Result<(), Self::Error> + where + Self: Sized, + { + if cfg.has_crc { + bail!("Writing CRC not supported"); + } + if self.cmr > 0b1111 { // CMR is a 4-bit field, larger values would be truncated by the shift below + bail!("Invalid CMR value"); + } + if self.toc_entries.is_empty() { + bail!("No TOC entries"); + } + + w.write::(self.cmr << 4).context("cmr")?; + + for (i, entry) in self.toc_entries.iter().enumerate() { + let mut entry = entry.clone(); + entry.last = i == self.toc_entries.len() - 1; + + w.build_with::(&entry, cfg).context("toc_entry")?; + } + + Ok(()) + } +} + +impl ToBitStreamWith<'_> for PayloadHeader { + type Error = anyhow::Error; + + type Context = PayloadConfiguration; + + fn to_writer( + &self, + w: &mut W, + cfg: &Self::Context, + ) -> Result<(), Self::Error> + where + Self: Sized, + { + if cfg.has_crc { + bail!("Writing CRC not supported"); + } + if self.cmr > 0b1111 { // CMR is a 4-bit field, only 4 bits are written below + bail!("Invalid CMR value"); + } + if self.toc_entries.is_empty() { + bail!("No TOC entries"); + } + + w.write::(4, self.cmr).context("cmr")?; + + for (i, entry) in self.toc_entries.iter().enumerate() { + let mut entry = entry.clone(); + entry.last = i == self.toc_entries.len() - 1; + + w.build_with::(&entry, cfg).context("toc_entry")?; + } + + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct TocEntry { + pub frame_type: u8, + pub frame_quality_indicator: bool, + pub last: bool, +} + 
+impl TocEntry { + pub fn frame_header(&self) -> u8 { + self.frame_type << 3 | (self.frame_quality_indicator as u8) << 2 + } +} + +impl FromByteStreamWith<'_> for TocEntry { + type Error = anyhow::Error; + + type Context = PayloadConfiguration; + + fn from_reader( + r: &mut R, + cfg: &Self::Context, + ) -> Result + where + Self: Sized, + { + let b = r.read::().context("toc_entry")?; + + let last = (b & 0b1000_0000) == 0; + let frame_type = (b & 0b0111_1000) >> 3; + let frame_quality_indicator = (b & 0b0000_0100) != 0; + + if !cfg.wide_band && (9..=14).contains(&frame_type) { + bail!("Invalid AMR frame type {frame_type}"); + } + if cfg.wide_band && (10..=13).contains(&frame_type) { + bail!("Invalid AMR-WB frame type {frame_type}"); + } + + Ok(TocEntry { + frame_type, + frame_quality_indicator, + last, + }) + } +} + +impl FromBitStreamWith<'_> for TocEntry { + type Error = anyhow::Error; + + type Context = PayloadConfiguration; + + fn from_reader( + r: &mut R, + cfg: &Self::Context, + ) -> Result + where + Self: Sized, + { + let last = !r.read_bit().context("last")?; + let frame_type = r.read::(4).context("frame_type")?; + let frame_quality_indicator = r.read_bit().context("q")?; + + if !cfg.wide_band && (9..=14).contains(&frame_type) { + bail!("Invalid AMR frame type {frame_type}"); + } + if cfg.wide_band && (10..=13).contains(&frame_type) { + bail!("Invalid AMR-WB frame type {frame_type}"); + } + + Ok(TocEntry { + frame_type, + frame_quality_indicator, + last, + }) + } +} + +impl ToByteStreamWith<'_> for TocEntry { + type Error = anyhow::Error; + + type Context = PayloadConfiguration; + + fn to_writer( + &self, + w: &mut W, + cfg: &Self::Context, + ) -> Result<(), Self::Error> + where + Self: Sized, + { + if !cfg.wide_band && (9..=14).contains(&self.frame_type) { + bail!("Invalid AMR frame type {}", self.frame_type); + } + if cfg.wide_band && (10..=13).contains(&self.frame_type) { + bail!("Invalid AMR-WB frame type {}", self.frame_type); + } + if self.frame_type > 
15 { + bail!("Invalid AMR frame type {}", self.frame_type); + } + + let b = ((!self.last as u8) << 7) + | (self.frame_type << 3) + | ((self.frame_quality_indicator as u8) << 2); + + w.write::(b).context("toc_entry")?; + + Ok(()) + } +} + +impl ToBitStreamWith<'_> for TocEntry { + type Error = anyhow::Error; + + type Context = PayloadConfiguration; + + fn to_writer( + &self, + w: &mut W, + cfg: &Self::Context, + ) -> Result<(), Self::Error> + where + Self: Sized, + { + if !cfg.wide_band && (9..=14).contains(&self.frame_type) { + bail!("Invalid AMR frame type {}", self.frame_type); + } + if cfg.wide_band && (10..=13).contains(&self.frame_type) { + bail!("Invalid AMR-WB frame type {}", self.frame_type); + } + if self.frame_type > 15 { + bail!("Invalid AMR frame type {}", self.frame_type); + } + + w.write_bit(!self.last).context("last")?; + w.write::(4, self.frame_type).context("frame_type")?; + w.write_bit(self.frame_quality_indicator) + .context("frame_quality_indicator")?; + + Ok(()) + } +} + +// AMR-NB frame payload sizes, indexed by frame type: in bits, and rounded up to whole bytes. See RFC3267 Table 1 +pub static NB_FRAME_SIZES: [u16; 9] = [95, 103, 118, 134, 148, 159, 204, 244, 39]; +pub static NB_FRAME_SIZES_BYTES: [u8; 9] = [12, 13, 15, 17, 19, 20, 26, 31, 5]; + +// AMR-WB frame payload sizes, indexed by frame type: in bits, and rounded up to whole bytes. See ETSI TS 126 201 Table 2 and 3 +pub static WB_FRAME_SIZES: [u16; 10] = [132, 177, 253, 285, 317, 365, 397, 461, 477, 40]; +pub static WB_FRAME_SIZES_BYTES: [u8; 10] = [17, 23, 32, 36, 40, 46, 50, 58, 60, 5]; diff --git a/net/rtp/src/amr/tests/mod.rs b/net/rtp/src/amr/tests/mod.rs new file mode 100644 index 000000000..517368ed4 --- /dev/null +++ b/net/rtp/src/amr/tests/mod.rs @@ -0,0 +1,544 @@ +// Copyright (C) 2024 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// <https://mozilla.org/MPL/2.0/>. 
+// +// SPDX-License-Identifier: MPL-2.0 + +use crate::tests::{run_test_pipeline, ExpectedBuffer, ExpectedPacket, Source}; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + crate::plugin_register_static().expect("rtpamr test"); + }); +} + +// 6 encoded frames of 32 bytes / 20ms +static AMR_NB_DATA: &[u8] = include_bytes!("test.amrnb"); + +fn get_amr_nb_data() -> (gst::Caps, Vec) { + let caps = gst::Caps::builder("audio/AMR") + .field("rate", 8_000i32) + .field("channels", 1i32) + .build(); + + let buffers = AMR_NB_DATA + .chunks_exact(32) + .enumerate() + .map(|(idx, c)| { + let mut buf = gst::Buffer::from_slice(c); + + { + let buf = buf.get_mut().unwrap(); + buf.set_pts(idx as u64 * gst::ClockTime::from_mseconds(20)); + buf.set_duration(gst::ClockTime::from_mseconds(20)); + if idx == 0 { + buf.set_flags(gst::BufferFlags::DISCONT); + } + } + + buf + }) + .collect(); + + (caps, buffers) +} + +// 4 encoded frames of 18 bytes / 20ms +static AMR_WB_DATA: &[u8] = include_bytes!("test.amrwb"); + +fn get_amr_wb_data() -> (gst::Caps, Vec) { + let caps = gst::Caps::builder("audio/AMR-WB") + .field("rate", 16_000i32) + .field("channels", 1i32) + .build(); + + let buffers = AMR_WB_DATA + .chunks_exact(18) + .enumerate() + .map(|(idx, c)| { + let mut buf = gst::Buffer::from_slice(c); + + { + let buf = buf.get_mut().unwrap(); + buf.set_pts(idx as u64 * gst::ClockTime::from_mseconds(20)); + buf.set_duration(gst::ClockTime::from_mseconds(20)); + if idx == 0 { + buf.set_flags(gst::BufferFlags::DISCONT); + } + } + + buf + }) + .collect(); + + (caps, buffers) +} + +#[test] +fn test_amr_nb() { + init(); + + let (caps, buffers) = get_amr_nb_data(); + let pay = "rtpamrpay2 aggregate-mode=zero-latency"; + let depay = "rtpamrdepay2"; + + let expected_pay = vec![ + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER) + .pt(96) + 
.rtp_time(0) + .marker_bit(true) + .size(45) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(20)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(160) + .marker_bit(false) + .size(45) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(320) + .marker_bit(false) + .size(45) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(60)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(480) + .marker_bit(false) + .size(45) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(640) + .marker_bit(false) + .size(45) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(100)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(800) + .marker_bit(false) + .size(45) + .build()], + ]; + + let expected_depay = vec![ + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .size(32) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(20)) + .size(32) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .size(32) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(60)) + .size(32) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .size(32) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(100)) + .size(32) + .flags(gst::BufferFlags::empty()) + .build()], + ]; + + run_test_pipeline( + Source::Buffers(caps, buffers), + pay, + depay, + expected_pay, + expected_depay, + ); +} + +#[test] +fn 
test_amr_nb_bit_packed() { + init(); + + let (caps, buffers) = get_amr_nb_data(); + let pay = "rtpamrpay2 aggregate-mode=zero-latency ! application/x-rtp,octet-align=(string)0"; + let depay = "rtpamrdepay2"; + + let expected_pay = vec![ + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(0) + .marker_bit(true) + .size(45) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(20)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(160) + .marker_bit(false) + .size(45) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(320) + .marker_bit(false) + .size(45) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(60)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(480) + .marker_bit(false) + .size(45) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(640) + .marker_bit(false) + .size(45) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(100)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(800) + .marker_bit(false) + .size(45) + .build()], + ]; + + let expected_depay = vec![ + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .size(32) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(20)) + .size(32) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .size(32) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(60)) + .size(32) + .flags(gst::BufferFlags::empty()) + .build()], + 
vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .size(32) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(100)) + .size(32) + .flags(gst::BufferFlags::empty()) + .build()], + ]; + + run_test_pipeline( + Source::Buffers(caps, buffers), + pay, + depay, + expected_pay, + expected_depay, + ); +} + +#[test] +fn test_amr_nb_aggregate() { + init(); + + let (caps, buffers) = get_amr_nb_data(); + let pay = "rtpamrpay2 max-ptime=40000000"; + let depay = "rtpamrdepay2"; + + let expected_pay = vec![ + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(0) + .marker_bit(true) + .size(77) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(320) + .marker_bit(false) + .size(77) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(640) + .marker_bit(false) + .size(77) + .build()], + ]; + + let expected_depay = vec![ + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .size(64) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .size(64) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(80)) + .size(64) + .flags(gst::BufferFlags::empty()) + .build()], + ]; + + run_test_pipeline( + Source::Buffers(caps, buffers), + pay, + depay, + expected_pay, + expected_depay, + ); +} + +#[test] +fn test_amr_wb() { + init(); + + let (caps, buffers) = get_amr_wb_data(); + let pay = "rtpamrpay2 aggregate-mode=zero-latency"; + let depay = "rtpamrdepay2"; + + let expected_pay = vec![ + vec![ExpectedPacket::builder() + 
.pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(0) + .marker_bit(true) + .size(31) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(20)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(320) + .marker_bit(false) + .size(31) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(640) + .marker_bit(false) + .size(31) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(60)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(960) + .marker_bit(false) + .size(31) + .build()], + ]; + + let expected_depay = vec![ + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .size(18) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(20)) + .size(18) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .size(18) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(60)) + .size(18) + .flags(gst::BufferFlags::empty()) + .build()], + ]; + + run_test_pipeline( + Source::Buffers(caps, buffers), + pay, + depay, + expected_pay, + expected_depay, + ); +} + +#[test] +fn test_amr_wb_bit_packed() { + init(); + + let (caps, buffers) = get_amr_wb_data(); + let pay = "rtpamrpay2 aggregate-mode=zero-latency ! 
application/x-rtp,octet-align=(string)0"; + let depay = "rtpamrdepay2"; + + let expected_pay = vec![ + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(0) + .marker_bit(true) + .size(30) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(20)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(320) + .marker_bit(false) + .size(30) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(640) + .marker_bit(false) + .size(30) + .build()], + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(60)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(960) + .marker_bit(false) + .size(30) + .build()], + ]; + + let expected_depay = vec![ + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .size(18) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(20)) + .size(18) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .size(18) + .flags(gst::BufferFlags::empty()) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(60)) + .size(18) + .flags(gst::BufferFlags::empty()) + .build()], + ]; + + run_test_pipeline( + Source::Buffers(caps, buffers), + pay, + depay, + expected_pay, + expected_depay, + ); +} + +#[test] +fn test_amr_wb_aggregate() { + init(); + + let (caps, buffers) = get_amr_wb_data(); + let pay = "rtpamrpay2 max-ptime=40000000"; + let depay = "rtpamrdepay2"; + + let expected_pay = vec![ + vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::MARKER) + .pt(96) + .rtp_time(0) + .marker_bit(true) + .size(49) + .build()], + 
vec![ExpectedPacket::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .flags(gst::BufferFlags::empty()) + .pt(96) + .rtp_time(640) + .marker_bit(false) + .size(49) + .build()], + ]; + + let expected_depay = vec![ + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(0)) + .size(36) + .flags(gst::BufferFlags::DISCONT | gst::BufferFlags::RESYNC) + .build()], + vec![ExpectedBuffer::builder() + .pts(gst::ClockTime::from_mseconds(40)) + .size(36) + .flags(gst::BufferFlags::empty()) + .build()], + ]; + + run_test_pipeline( + Source::Buffers(caps, buffers), + pay, + depay, + expected_pay, + expected_depay, + ); +} diff --git a/net/rtp/src/amr/tests/test.amrnb b/net/rtp/src/amr/tests/test.amrnb new file mode 100644 index 0000000000000000000000000000000000000000..39cb1ffc6e47e68afa9102c6889013570efacadb GIT binary patch literal 192 zcmcDSC@!`yt@7am#^>ulFfcGQ05LMK@hDfB29e(dl0Sef31r)ZGIVT%$e+iq|ED}u Re-~E$A%ErZ>;H;ZKLE8jE~)?k literal 0 HcmV?d00001 diff --git a/net/rtp/src/amr/tests/test.amrwb b/net/rtp/src/amr/tests/test.amrwb new file mode 100644 index 0000000000000000000000000000000000000000..27b2ea4c4f8171f0ef626ed2eb65f2ef5c153116 GIT binary patch literal 72 zcmZP(P-L)_HNP^kq~=h?iw7N#9T2NwfsKp-ECL1$3Nq=(oje2><{9 literal 0 HcmV?d00001 diff --git a/net/rtp/src/lib.rs b/net/rtp/src/lib.rs index 4c6a2b81c..11979bf5f 100644 --- a/net/rtp/src/lib.rs +++ b/net/rtp/src/lib.rs @@ -31,6 +31,7 @@ mod basedepay; mod basepay; mod ac3; +mod amr; mod av1; mod jpeg; mod klv; @@ -64,6 +65,9 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { ac3::depay::register(plugin)?; ac3::pay::register(plugin)?; + amr::depay::register(plugin)?; + amr::pay::register(plugin)?; + av1::depay::register(plugin)?; av1::pay::register(plugin)?;