From 6b628485c5e0d4749f0c0dbae4c8b635c6265728 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim-Philipp=20M=C3=BCller?= Date: Tue, 19 Dec 2023 13:16:29 +0000 Subject: [PATCH] rtp: Add AC-3 RTP payloader/depayloader Part-of: --- docs/plugins/gst_plugins_cache.json | 100 ++++ net/rtp/src/ac3/ac3_audio_utils.rs | 122 +++++ net/rtp/src/ac3/depay/imp.rs | 406 ++++++++++++++++ net/rtp/src/ac3/depay/mod.rs | 28 ++ net/rtp/src/ac3/mod.rs | 6 + net/rtp/src/ac3/pay/imp.rs | 711 ++++++++++++++++++++++++++++ net/rtp/src/ac3/pay/mod.rs | 57 +++ net/rtp/src/lib.rs | 4 + typos.toml | 3 + 9 files changed, 1437 insertions(+) create mode 100644 net/rtp/src/ac3/ac3_audio_utils.rs create mode 100644 net/rtp/src/ac3/depay/imp.rs create mode 100644 net/rtp/src/ac3/depay/mod.rs create mode 100644 net/rtp/src/ac3/mod.rs create mode 100644 net/rtp/src/ac3/pay/imp.rs create mode 100644 net/rtp/src/ac3/pay/mod.rs diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 359cc51c..f72f0657 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -7295,6 +7295,86 @@ "rsrtp": { "description": "GStreamer Rust RTP Plugin", "elements": { + "rtpac3depay2": { + "author": "Tim-Philipp Müller ", + "description": "Depayload an AC-3 Audio Stream from RTP packets (RFC 4184)", + "hierarchy": [ + "GstRtpAc3Depay", + "GstRtpBaseDepay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Depayloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "application/x-rtp:\n media: audio\n encoding-name: AC3\n clock-rate: { (int)48000, (int)44100, (int)32000 }\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "audio/x-ac3:\n channels: [ 1, 6 ]\n rate: { (int)48000, (int)44100, (int)32000 }\n framed: true\n alignment: frame\n", + "direction": "src", + "presence": "always" + } + }, + "rank": "marginal" + }, + "rtpac3pay2": { + "author": "Tim-Philipp Müller ", + "description": 
"Payload an AC-3 Audio Elementary Stream into RTP packets (RFC 4184)", + "hierarchy": [ + "GstRtpAc3Pay", + "GstRtpBasePay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Payloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "audio/x-ac3:\n rate: { (int)48000, (int)44100, (int)32000 }\n channels: [ 1, 6 ]\n framed: true\n alignment: frame\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "application/x-rtp:\n media: audio\n encoding-name: AC3\n clock-rate: { (int)48000, (int)44100, (int)32000 }\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "aggregate-mode": { + "blurb": "Whether to send out audio frames immediately or aggregate them until a packet is full.", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "auto (-1)", + "mutable": "null", + "readable": true, + "type": "GstRtpAc3PayAggregateMode", + "writable": true + }, + "max-ptime": { + "blurb": "Maximum duration of the packet data in ns (-1 = unlimited up to MTU)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "18446744073709551615", + "max": "9223372036854775807", + "min": "-1", + "mutable": "playing", + "readable": true, + "type": "gint64", + "writable": true + } + }, + "rank": "marginal" + }, "rtpav1depay": { "author": "Vivienne Watermeier ", "description": "Depayload AV1 from RTP packets", @@ -8410,6 +8490,26 @@ } } }, + "GstRtpAc3PayAggregateMode": { + "kind": "enum", + "values": [ + { + "desc": "Automatic: zero-latency if upstream is live, otherwise aggregate frames until packet is full.", + "name": "auto", + "value": "-1" + }, + { + "desc": "Zero Latency: always send out frames right away, do not wait for more frames to fill a packet.", + "name": "zero-latency", + "value": "0" + }, + { + "desc": "Aggregate: collect audio frames until we have a 
full packet or the max-ptime limit is hit (if set).", + "name": "aggregate", + "value": "1" + } + ] + }, "GstRtpBaseAudioPay2": { "hierarchy": [ "GstRtpBaseAudioPay2", diff --git a/net/rtp/src/ac3/ac3_audio_utils.rs b/net/rtp/src/ac3/ac3_audio_utils.rs new file mode 100644 index 00000000..f38b7143 --- /dev/null +++ b/net/rtp/src/ac3/ac3_audio_utils.rs @@ -0,0 +1,122 @@ +// GStreamer RTP AC-3 Audio Utility Functions +// +// Copyright (C) 2023 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::prelude::MulDiv; + +const AC3_SAMPLES_PER_FRAME: u16 = 1536; + +#[derive(Debug, Clone)] +pub(crate) struct FrameHeader { + pub channels: u16, + pub sample_rate: u16, + pub frame_len: usize, +} + +impl PartialEq for FrameHeader { + fn eq(&self, other: &Self) -> bool { + self.sample_rate == other.sample_rate && self.channels == other.channels + } +} + +impl FrameHeader { + pub(crate) fn duration(&self) -> u64 { + let samples = AC3_SAMPLES_PER_FRAME as u64; + let sample_rate = self.sample_rate as u64; + + samples + .mul_div_ceil(*gst::ClockTime::SECOND, sample_rate) + .unwrap() + } +} + +const FRAME_LENS_32000: [u16; 38] = [ + 96u16, 96, 120, 120, 144, 144, 168, 168, 192, 192, 240, 240, 288, 288, 336, 336, 384, 384, 480, + 480, 576, 576, 672, 672, 768, 768, 960, 960, 1152, 1152, 1344, 1344, 1536, 1536, 1728, 1728, + 1920, 1920, +]; + +const FRAME_LENS_44100: [u16; 38] = [ + 69u16, 70, 87, 88, 104, 105, 121, 122, 139, 140, 174, 175, 208, 209, 243, 244, 278, 279, 348, + 349, 417, 418, 487, 488, 557, 558, 696, 697, 835, 836, 975, 976, 1114, 1115, 1253, 1254, 1393, + 1394, +]; + +const FRAME_LENS_48000: [u16; 38] = [ + 64u16, 64, 80, 80, 96, 96, 112, 112, 128, 128, 160, 160, 192, 192, 224, 224, 256, 256, 320, + 320, 384, 384, 448, 448, 512, 512, 640, 640, 768, 768, 896, 896, 1024, 1024, 
1152, 1152, 1280, + 1280, +]; + +pub(crate) fn peek_frame_header(data: &[u8]) -> Result { + // Need sync info and start of bit stream info (bsi) + if data.len() < 5 + 3 { + return Err(()); + } + + let sync_hdr = u16::from_be_bytes([data[0], data[1]]); + + if sync_hdr != 0x0b77 { + return Err(()); + } + + // skipping 2 bytes of CRC + + let (sample_rate, len_table) = { + let fscod = (data[4] >> 6) & 0b11; + + match fscod { + 0b00 => (48000, &FRAME_LENS_48000), + 0b01 => (44100, &FRAME_LENS_44100), + 0b10 => (32000, &FRAME_LENS_32000), + _ => return Err(()), + } + }; + + let frame_len = { + let frmsizcod = data[4] & 0b00111111; + + let len_words = len_table.get(frmsizcod as usize).ok_or(())?; + + len_words * 2 + }; + + let bsi = &data[5..]; + + let _bsid = bsi[0] >> 3; + let _bsmod = bsi[0] & 0b00000111; + + let channels = { + let bits = u16::from_be_bytes([bsi[1], bsi[2]]); + + let acmod = (bits >> 13) & 0b111; + + let (nfchans, skip_bits) = match acmod { + 0b000 => (2, 0), // 1+1, dual mono + 0b001 => (1, 0), // 1/0, center/mono + 0b010 => (2, 2), // 2/0, stereo + 0b011 => (3, 2), // 3/0, L C R + 0b100 => (3, 2), // 2/1, L R S + 0b101 => (4, 4), // 3/1, L C R S + 0b110 => (4, 2), // 2/2, L R Sl Sr + 0b111 => (5, 4), // 3/2, L C R Sl Sr + _ => unreachable!(), + }; + + let lfe_on = ((bits << (3 + skip_bits)) & 0x8000) >> 15; + + nfchans + lfe_on + }; + + Ok(FrameHeader { + channels, + sample_rate: sample_rate as u16, + frame_len: frame_len as usize, + }) +} diff --git a/net/rtp/src/ac3/depay/imp.rs b/net/rtp/src/ac3/depay/imp.rs new file mode 100644 index 00000000..1ba496fa --- /dev/null +++ b/net/rtp/src/ac3/depay/imp.rs @@ -0,0 +1,406 @@ +// GStreamer RTP AC-3 Audio Depayloader +// +// Copyright (C) 2023 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . 
+// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-rtpac3depay2 + * @see_also: rtpac3pay2, rtpac3depay, rtpac3pay, avdec_ac3, avenc_ac3 + * + * Depayload an AC-3 Audio Stream from RTP packets as per [RFC 4184][rfc-4184]. + * + * [rfc-4184]: https://www.rfc-editor.org/rfc/rfc4184.html + * + * ## Example pipeline + * + * |[ + * gst-launch-1.0 udpsrc caps='application/x-rtp,media=audio,clock-rate=48000,encoding-name=AC3,payload=96' ! rtpjitterbuffer latency=250 ! rtpac3depay2 ! decodebin3 ! audioconvert ! audioresample ! autoaudiosink + * ]| This will depayload an incoming RTP AC-3 audio stream and decode it and play it. + * You can use the `rtpac3pay2` or `rtpac3pay` elements with `avenc_ac3` to create such an RTP stream. + * + * Since: plugins-rs-0.13.0 + */ +use atomic_refcell::AtomicRefCell; + +use gst::{glib, subclass::prelude::*}; + +use once_cell::sync::Lazy; + +use crate::basedepay::{ + Packet, PacketToBufferRelation, RtpBaseDepay2Ext, RtpBaseDepay2Impl, TimestampOffset, +}; + +use crate::ac3::ac3_audio_utils; + +#[derive(Default)] +pub struct RtpAc3Depay { + state: AtomicRefCell, +} + +#[derive(Debug, PartialEq)] +enum FragType { + NotFragmented, + Start, + Continuation, + End, +} + +#[derive(Debug)] +struct FragmentedFrame { + data: Vec, + ext_seqnum: u64, + ext_timestamp: u64, +} + +#[derive(Default)] +struct State { + last_frame_header: Option, + partial_frame: Option, + clock_rate: Option, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpac3depay2", + gst::DebugColorFlags::empty(), + Some("RTP AC-3 Audio Depayloader"), + ) +}); + +#[glib::object_subclass] +impl ObjectSubclass for RtpAc3Depay { + const NAME: &'static str = "GstRtpAc3Depay"; + type Type = super::RtpAc3Depay; + type ParentType = crate::basedepay::RtpBaseDepay2; +} + +impl ObjectImpl for RtpAc3Depay {} + +impl GstObjectImpl for RtpAc3Depay {} + +impl ElementImpl for RtpAc3Depay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + 
static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP AC-3 Audio Depayloader", + "Codec/Depayloader/Network/RTP", + "Depayload an AC-3 Audio Stream from RTP packets (RFC 4184)", + "Tim-Philipp Müller ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder("application/x-rtp") + .field("media", "audio") + .field("encoding-name", "AC3") + .field("clock-rate", gst::List::new([48000i32, 44100, 32000])) + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("audio/x-ac3") + .field("channels", gst::IntRange::new(1, 6)) + .field("rate", gst::List::new([48000i32, 44100, 32000])) + .field("framed", true) + .field("alignment", "frame") + .build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl RtpBaseDepay2Impl for RtpAc3Depay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["audio"]; + + fn set_sink_caps(&self, caps: &gst::Caps) -> bool { + let s = caps.structure(0).unwrap(); + + let mut state = self.state.borrow_mut(); + + state.clock_rate = s.get::("clock-rate").ok(); + + // We'll set output caps later based on the frame header + true + } + + // Encapsulation of AC-3 Audio Streams: + // https://www.rfc-editor.org/rfc/rfc4184.html#section-4 + // + // We either get 1-N whole AC-3 audio frames in an RTP packet, + // or a single AC-3 audio frame split over multiple RTP packets. 
+ fn handle_packet(&self, packet: &Packet) -> Result { + let mut state = self.state.borrow_mut(); + + if packet.discont() { + state.partial_frame = None; + } + + let payload = packet.payload(); + + if payload.len() < 2 + 6 { + gst::warning!( + CAT, + imp: self, + "Payload too small: {} bytes, but need at least 8 bytes", + payload.len(), + ); + state.partial_frame = None; + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + + gst::log!( + CAT, + imp: self, + "Have payload of {} bytes, header {:02x?} {:02x?}", + payload.len(), + payload[0], + payload[1], + ); + + // AC-3 specific header + + let frag_type = match payload[0] & 0x03 { + 0 => FragType::NotFragmented, + 1 | 2 => FragType::Start, + 3 => { + if packet.marker_bit() { + FragType::End + } else { + FragType::Continuation + } + } + _ => unreachable!(), + }; + + let num_frames_or_frags = payload[1] as usize; + + // Clear out unfinished pending partial frame if needed + + if frag_type == FragType::Start || frag_type == FragType::NotFragmented { + if let Some(partial_frame) = state.partial_frame.as_ref() { + gst::warning!(CAT, imp: self, "Dropping unfinished partial frame"); + + self.obj() + .drop_packets(partial_frame.ext_seqnum..=packet.ext_seqnum() - 1); + + state.partial_frame = None; + } + } + + // Skip to AC-3 payload data + + let payload = &payload[2..]; + + match frag_type { + FragType::Start => { + let mut data = Vec::with_capacity(num_frames_or_frags * payload.len()); + data.extend_from_slice(payload); + + state.partial_frame = Some(FragmentedFrame { + data, + ext_seqnum: packet.ext_seqnum(), + ext_timestamp: packet.ext_timestamp(), + }); + + gst::trace!(CAT, imp: self, "Partial frame {:?}", state.partial_frame); + + return Ok(gst::FlowSuccess::Ok); + } + + FragType::Continuation | FragType::End => { + let Some(partial_frame) = state.partial_frame.as_mut() else { + gst::debug!( + CAT, + imp: self, + "{frag_type:?} packet but no partial frame (most likely indicates packet loss)", + ); 
+ self.obj().drop_packet(packet); + state.partial_frame = None; + return Ok(gst::FlowSuccess::Ok); + }; + + if partial_frame.ext_timestamp != packet.ext_timestamp() { + gst::warning!( + CAT, + imp: self, + "{frag_type:?} packet timestamp {} doesn't match existing partial fragment timestamp {}", + packet.ext_timestamp(), + partial_frame.ext_timestamp, + ); + state.partial_frame = None; + self.obj().drop_packet(packet); + return Ok(gst::FlowSuccess::Ok); + } + + partial_frame.data.extend_from_slice(payload); + + gst::log!( + CAT, + imp: self, + "Added {frag_type:?} packet payload, assembled {} bytes now", + partial_frame.data.len() + ); + + if frag_type == FragType::End { + let partial_frame = state.partial_frame.take().unwrap(); + + let Ok(hdr) = ac3_audio_utils::peek_frame_header(&partial_frame.data) else { + gst::warning!(CAT, imp: self, "Could not parse frame header, dropping frame"); + self.obj() + .drop_packets(partial_frame.ext_seqnum..=packet.ext_seqnum()); + return Ok(gst::FlowSuccess::Ok); + }; + + gst::trace!(CAT, imp: self, "{hdr:?}"); + + if partial_frame.data.len() != hdr.frame_len { + gst::warning!( + CAT, + imp: self, + "Partial frame finished, but have {} bytes, and expected {} bytes!", + partial_frame.data.len(), + hdr.frame_len, + ); + } + + self.ensure_output_caps(&mut state, &hdr); + + let mut outbuf = gst::Buffer::from_mut_slice(partial_frame.data); + + let outbuf_ref = outbuf.get_mut().unwrap(); + + outbuf_ref.set_duration(gst::ClockTime::from_nseconds(hdr.duration())); + + gst::trace!(CAT, imp: self, "Finishing buffer {outbuf:?}"); + + return self.obj().queue_buffer( + PacketToBufferRelation::Seqnums( + partial_frame.ext_seqnum..=packet.ext_seqnum(), + ), + outbuf, + ); + } + + // Wait for more frame fragments + return Ok(gst::FlowSuccess::Ok); + } + + FragType::NotFragmented => { + let mut offset = 0; + let mut ts_offset = 0; + + while offset < payload.len() { + let Ok(hdr) = ac3_audio_utils::peek_frame_header(&payload[offset..]) else { + 
gst::warning!(CAT, imp: self, "Could not parse frame header at offset {offset}"); + break; + }; + + gst::trace!(CAT, imp: self, "{hdr:?} at offset {offset}"); + + let frame_len = if offset + hdr.frame_len <= payload.len() { + hdr.frame_len + } else { + gst::warning!( + CAT, + imp: self, + "Frame at offset {offset} is {} bytes, but we have only {} bytes left!", + hdr.frame_len, + payload.len() - offset, + ); + // We'll still push out what we have, there might be decodable blocks + payload.len() - offset + }; + + self.ensure_output_caps(&mut state, &hdr); + + gst::trace!(CAT, imp: self, "Getting frame @ {offset}+{frame_len}"); + + let mut outbuf = + packet.payload_subbuffer_from_offset_with_length(offset, frame_len); + + let outbuf_ref = outbuf.get_mut().unwrap(); + + outbuf_ref.set_duration(gst::ClockTime::from_nseconds(hdr.duration())); + + gst::trace!(CAT, imp: self, "Finishing frame @ {offset}, buffer {outbuf:?}"); + + self.obj().queue_buffer( + PacketToBufferRelation::SeqnumsWithOffset { + seqnums: packet.ext_seqnum()..=packet.ext_seqnum(), + timestamp_offset: TimestampOffset::Pts( + gst::Signed::::from(ts_offset as i64), + ), + }, + outbuf, + )?; + + offset += frame_len; + ts_offset += hdr.duration(); + } + } + } + + Ok(gst::FlowSuccess::Ok) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + *self.state.borrow_mut() = State::default(); + + Ok(()) + } +} + +impl RtpAc3Depay { + fn ensure_output_caps(&self, state: &mut State, frame_header: &ac3_audio_utils::FrameHeader) { + let update_caps = state.last_frame_header.as_ref() != Some(frame_header); + + if update_caps { + if state.clock_rate != Some(frame_header.sample_rate as i32) { + gst::warning!( + CAT, + imp: self, + "clock-rate {} does not match sample rate {}!", + state.clock_rate.unwrap(), + frame_header.sample_rate, + ); + } + + let src_caps = gst::Caps::builder("audio/x-ac3") + .field("rate", frame_header.sample_rate as i32) + .field("channels", frame_header.channels as i32) + .field("framed", 
true) + .field("alignment", "frame") + .build(); + + gst::info!(CAT, imp: self, "Setting output caps {src_caps}.."); + + // Ignore failure here and let the next buffer push yield an appropriate flow return + self.obj().set_src_caps(&src_caps); + + state.last_frame_header = Some(frame_header.clone()); + } + } +} diff --git a/net/rtp/src/ac3/depay/mod.rs b/net/rtp/src/ac3/depay/mod.rs new file mode 100644 index 00000000..a8c81554 --- /dev/null +++ b/net/rtp/src/ac3/depay/mod.rs @@ -0,0 +1,28 @@ +// GStreamer RTP AC-3 Audio Depayloader +// +// Copyright (C) 2023 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct RtpAc3Depay(ObjectSubclass) + @extends crate::basedepay::RtpBaseDepay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtpac3depay2", + gst::Rank::MARGINAL, + RtpAc3Depay::static_type(), + ) +} diff --git a/net/rtp/src/ac3/mod.rs b/net/rtp/src/ac3/mod.rs new file mode 100644 index 00000000..7a0a2936 --- /dev/null +++ b/net/rtp/src/ac3/mod.rs @@ -0,0 +1,6 @@ +// SPDX-License-Identifier: MPL-2.0 + +pub mod depay; +pub mod pay; + +mod ac3_audio_utils; diff --git a/net/rtp/src/ac3/pay/imp.rs b/net/rtp/src/ac3/pay/imp.rs new file mode 100644 index 00000000..d8888916 --- /dev/null +++ b/net/rtp/src/ac3/pay/imp.rs @@ -0,0 +1,711 @@ +// GStreamer RTP AC-3 Audio Payloader +// +// Copyright (C) 2023 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . 
+// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-rtpac3pay2 + * @see_also: rtpac3depay2, rtpac3pay, rtpac3depay, avdec_ac3, a52dec, avenc_ac3 + * + * Payload an AC-3 Audio Elementary Stream into RTP packets as per [RFC 4184][rfc-4184]. + * Also see the [IANA media-type page for AC-3][iana-ac3]. + * + * [rfc-4184]: https://www.rfc-editor.org/rfc/rfc4184.html#section-3.5 + * [iana-ac3]: https://www.iana.org/assignments/media-types/audio/ac3 + * + * ## Aggregation Modes + * + * The default aggregation mode is `auto`: If upstream is live, the payloader will send out all + * audio frames immediately, even if they don't completely fill a packet, in order to minimise + * latency. If upstream is not live, the payloader will by default aggregate audio frames until + * it has completely filled an RTP packet as per the configured MTU size or the `max-ptime` + * property if it is set (it is not set by default). + * + * The aggregation mode can be controlled via the `aggregate-mode` property. + * + * ## Example pipeline + * + * |[ + * gst-launch audiotestsrc wave=ticks ! avenc_ac3 ! ac3parse ! rtpac3pay2 ! udpsink host=127.0.0.1 port=5004 + * ]| This will encode an audio test signal to AC-3 and then payload the encoded audio + * into RTP packets and send them out via UDP to localhost (IPv4) port 5004. + * You can use the `rtpac3depay2` or `rtpac3depay` elements to depayload such a stream, and + * the `avdec_ac3` or `a52dec` elements to decode the depayloaded stream. 
+ * + * Since: plugins-rs-0.13.0 + */ +use atomic_refcell::AtomicRefCell; + +use std::collections::VecDeque; + +use std::sync::{Arc, Mutex}; + +use gst::{glib, prelude::*, subclass::prelude::*}; + +use once_cell::sync::Lazy; + +use crate::basepay::{ + PacketToBufferRelation, RtpBasePay2Ext, RtpBasePay2Impl, RtpBasePay2ImplExt, TimestampOffset, +}; + +use crate::ac3::ac3_audio_utils; + +use super::RtpAc3PayAggregateMode; + +// https://www.rfc-editor.org/rfc/rfc4184.html#section-4.1 +const AC3_SPECIFIC_HEADER_LEN: usize = 2; + +#[derive(Clone)] +struct Settings { + max_ptime: Option, + aggregate_mode: RtpAc3PayAggregateMode, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + aggregate_mode: RtpAc3PayAggregateMode::Auto, + max_ptime: None, + } + } +} + +#[derive(Default)] +pub struct RtpAc3Pay { + state: AtomicRefCell, + settings: Mutex, + is_live: Mutex>, +} + +#[derive(Debug)] +struct QueuedFrame { + // Id of the input buffer this frame came from + id: u64, + + // Time offset to the timestamp of the buffer this input buffer came from + // (will be non-zero if the input buffer contained multiple audio frames) + pts_offset: u64, + + // Mapped buffer data and offset into the buffer data + buffer: Arc>, + offset: usize, + + // Audio frame header + header: ac3_audio_utils::FrameHeader, +} + +impl QueuedFrame { + fn duration(&self) -> u64 { + self.header.duration() + } + + fn len(&self) -> usize { + self.header.frame_len + } + + fn data(&self) -> &[u8] { + let end_offset = self.offset + self.len(); + &self.buffer[self.offset..end_offset] + } +} + +#[derive(Default)] +struct State { + // Queued audio frames (we collect until min-ptime/max-ptime is hit or the packet is full) + queued_frames: VecDeque, + + // Desired "packet time", i.e. 
packet duration, from the downstream caps, if set + ptime: Option, + max_ptime: Option, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpac3pay2", + gst::DebugColorFlags::empty(), + Some("RTP AC-3 Audio Payloader"), + ) +}); + +#[glib::object_subclass] +impl ObjectSubclass for RtpAc3Pay { + const NAME: &'static str = "GstRtpAc3Pay"; + type Type = super::RtpAc3Pay; + type ParentType = crate::basepay::RtpBasePay2; +} + +impl ObjectImpl for RtpAc3Pay { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecEnum::builder_with_default("aggregate-mode", Settings::default().aggregate_mode) + .nick("Aggregate Mode") + .blurb("Whether to send out audio frames immediately or aggregate them until a packet is full.") + .build(), + // Using same type/semantics as C payloaders + glib::ParamSpecInt64::builder("max-ptime") + .nick("Maximum Packet Time") + .blurb("Maximum duration of the packet data in ns (-1 = unlimited up to MTU)") + .default_value( + Settings::default() + .max_ptime + .map(gst::ClockTime::nseconds) + .map(|x| x as i64) + .unwrap_or(-1), + ) + .minimum(-1) + .maximum(i64::MAX) + .mutable_playing() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + + match pspec.name() { + "aggregate-mode" => { + settings.aggregate_mode = value + .get::() + .expect("type checked upstream"); + } + "max-ptime" => { + let new_max_ptime = match value.get::().unwrap() { + -1 => None, + v @ 0.. 
=> Some(gst::ClockTime::from_nseconds(v as u64)), + _ => unreachable!(), + }; + let changed = settings.max_ptime != new_max_ptime; + settings.max_ptime = new_max_ptime; + drop(settings); + + if changed { + let _ = self + .obj() + .post_message(gst::message::Latency::builder().src(&*self.obj()).build()); + } + } + _ => unimplemented!(), + }; + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + + match pspec.name() { + "aggregate-mode" => settings.aggregate_mode.to_value(), + "max-ptime" => (settings + .max_ptime + .map(gst::ClockTime::nseconds) + .map(|x| x as i64) + .unwrap_or(-1)) + .to_value(), + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for RtpAc3Pay {} + +impl ElementImpl for RtpAc3Pay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP AC-3 Audio Payloader", + "Codec/Payloader/Network/RTP", + "Payload an AC-3 Audio Elementary Stream into RTP packets (RFC 4184)", + "Tim-Philipp Müller ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder("audio/x-ac3") + .field("rate", gst::List::new([48000i32, 44100, 32000])) + .field("channels", gst::IntRange::new(1, 6)) + .field("framed", true) + .field("alignment", "frame") + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder("application/x-rtp") + .field("media", "audio") + .field("encoding-name", "AC3") + .field("clock-rate", gst::List::new([48000i32, 44100, 32000])) + .build(), + ) + .unwrap(); + + vec![sink_pad_template, src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + 
+impl RtpBasePay2Impl for RtpAc3Pay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["audio"]; + + fn set_sink_caps(&self, caps: &gst::Caps) -> bool { + let s = caps.structure(0).unwrap(); + + let channels = s.get::("channels").unwrap(); + let rate = s.get::("rate").unwrap(); + + let src_caps = gst::Caps::builder("application/x-rtp") + .field("media", "audio") + .field("encoding-name", "AC3") + .field("clock-rate", rate) + .field("channels", channels.to_string()); + + self.obj().set_src_caps(&src_caps.build()); + + true + } + + fn negotiate(&self, mut src_caps: gst::Caps) { + // Fixate as a first step + src_caps.fixate(); + + let s = src_caps.structure(0).unwrap(); + + // Negotiate ptime/maxptime with downstream and use them in combination with the + // properties. See https://www.iana.org/assignments/media-types/audio/ac3 + let ptime = s + .get::("ptime") + .ok() + .map(u64::from) + .map(gst::ClockTime::from_mseconds); + + let max_ptime = s + .get::("maxptime") + .ok() + .map(u64::from) + .map(gst::ClockTime::from_mseconds); + + self.parent_negotiate(src_caps); + + let mut state = self.state.borrow_mut(); + state.ptime = ptime; + state.max_ptime = max_ptime; + drop(state); + } + + // Encapsulation of AC-3 Audio Elementary Streams: + // https://www.rfc-editor.org/rfc/rfc4184.html#section-4.2 + // + // We either put 1-N whole AC-3 audio frames into a single RTP packet, + // or split a single AC-3 audio frame over multiple RTP packets. 
+ // + fn handle_buffer( + &self, + buffer: &gst::Buffer, + id: u64, + ) -> Result { + let mut state = self.state.borrow_mut(); + let mut settings = self.settings.lock().unwrap(); + + if buffer.flags().contains(gst::BufferFlags::DISCONT) { + gst::debug!(CAT, imp: self, "Discont on {buffer:?}, pushing out any pending frames"); + self.send_packets(&settings, &mut state, SendPacketMode::ForcePending)?; + } + + let map = buffer.clone().into_mapped_buffer_readable().map_err(|_| { + gst::error!(CAT, imp: self, "Can't map buffer readable"); + gst::FlowError::Error + })?; + + // Arc so we can share the MappedBuffer amongst multiple frames. + // Todo: could probably do something more clever to avoid the heap + // allocation in the case where the input buffer contains a single + // audio frame only (which is probably the normal case). + let map = Arc::new(map); + + let data = map.as_slice(); + + let mut pts_offset = 0; + let mut map_offset = 0; + + loop { + let Ok(frame_hdr) = ac3_audio_utils::peek_frame_header(&data[map_offset..]) else { + gst::warning!( + CAT, + imp: self, + "Failed to parse AC-3 audio frame header for {buffer:?} at offset {map_offset}", + ); + + if map_offset > 0 { + break; + } + + self.send_packets(&settings, &mut state, SendPacketMode::ForcePending)?; + + self.obj().drop_buffers(..=id); + return Ok(gst::FlowSuccess::Ok); + }; + + let queued_frame = QueuedFrame { + id, + pts_offset, + buffer: map.clone(), + offset: map_offset, + header: frame_hdr, + }; + + let frame_len = queued_frame.len(); + let frame_dur = queued_frame.duration(); + + if map_offset + frame_len > data.len() { + gst::warning!(CAT, imp: self, "Short audio frame for {buffer:?} at offset {map_offset}"); + } + + pts_offset += frame_dur; + map_offset += frame_len; + + state.queued_frames.push_back(queued_frame); + + if map_offset >= data.len() { + break; + } + } + + // Make sure we have queried upstream liveness if needed + if settings.aggregate_mode == RtpAc3PayAggregateMode::Auto { + 
self.ensure_upstream_liveness(&mut settings);
+        }
+
+        self.send_packets(&settings, &mut state, SendPacketMode::WhenReady)
+    }
+
+    // Sends out everything that is still queued, regardless of the aggregation limits.
+    fn drain(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
+        // Work on a snapshot of the settings so the settings lock is not held while sending.
+        let settings = self.settings.lock().unwrap().clone();
+        let mut state = self.state.borrow_mut();
+
+        self.send_packets(&settings, &mut state, SendPacketMode::ForcePending)
+    }
+
+    // Drops all queued frames without sending them.
+    fn flush(&self) {
+        let mut state = self.state.borrow_mut();
+        state.queued_frames.clear();
+    }
+
+    // Source pad query handler.
+    //
+    // The parent class answers the query first. For latency queries we then remember the
+    // reported upstream liveness and, if we are aggregating, add our own max_ptime on top
+    // of the min/max latency of the result.
+    #[allow(clippy::single_match)]
+    fn src_query(&self, query: &mut gst::QueryRef) -> bool {
+        let res = self.parent_src_query(query);
+        if !res {
+            return false;
+        }
+
+        match query.view_mut() {
+            gst::QueryViewMut::Latency(query) => {
+                let settings = self.settings.lock().unwrap();
+
+                let (is_live, mut min, mut max) = query.result();
+
+                // Update the cached liveness; the guard is scoped so it is released again
+                // before effective_aggregate_mode() takes the same lock below.
+                {
+                    let mut live_guard = self.is_live.lock().unwrap();
+
+                    if Some(is_live) != *live_guard {
+                        gst::info!(CAT, imp: self, "Upstream is live: {is_live}");
+                        *live_guard = Some(is_live);
+                    }
+                }
+
+                if self.effective_aggregate_mode(&settings) == RtpAc3PayAggregateMode::Aggregate {
+                    if let Some(max_ptime) = settings.max_ptime {
+                        min += max_ptime;
+                        max.opt_add_assign(max_ptime);
+                    } else if is_live {
+                        gst::warning!(
+                            CAT,
+                            imp: self,
+                            "Aggregating packets in live mode, but no max_ptime configured. \
+                            Configured latency may be too low!",
+                        );
+                    }
+                    query.set(is_live, min, max);
+                }
+            }
+            _ => (),
+        }
+
+        true
+    }
+
+    // Resets the payloader state and the cached upstream liveness before starting.
+    fn start(&self) -> Result<(), gst::ErrorMessage> {
+        *self.state.borrow_mut() = State::default();
+        *self.is_live.lock().unwrap() = None;
+
+        self.parent_start()
+    }
+
+    // Resets the payloader state and the cached upstream liveness when stopping.
+    fn stop(&self) -> Result<(), gst::ErrorMessage> {
+        *self.state.borrow_mut() = State::default();
+        *self.is_live.lock().unwrap() = None;
+
+        self.parent_stop()
+    }
+}
+
+// How eagerly send_packets() should turn queued frames into packets.
+#[derive(Debug, PartialEq)]
+enum SendPacketMode {
+    // Only send a packet once the aggregation rules consider it ready.
+    WhenReady,
+    // Send everything that is currently queued (used when draining).
+    ForcePending,
+}
+
+impl RtpAc3Pay {
+    // Turns queued audio frames into RTP packets and queues those for sending.
+    //
+    // Frames larger than the maximum payload size are always fragmented and sent right
+    // away. All other frames are packed together into a packet once the effective
+    // aggregate mode, the size/duration limits or `send_mode` say the packet is ready;
+    // otherwise they stay queued until more data arrives.
+    //
+    // With SendPacketMode::ForcePending all pending packets are finished before returning.
+    // Errors from queueing or finishing packets are propagated to the caller.
+    fn send_packets(
+        &self,
+        settings: &Settings,
+        state: &mut State,
+        send_mode: SendPacketMode,
+    ) -> Result<gst::FlowSuccess, gst::FlowError> {
+        let agg_mode = self.effective_aggregate_mode(settings);
+
+        // NOTE(review): assumes the maximum payload size is always larger than the AC-3
+        // specific header, otherwise this subtraction underflows - TODO confirm.
+        let max_payload_size = self.obj().max_payload_size() as usize - AC3_SPECIFIC_HEADER_LEN;
+
+        // Send out packets if there's enough data for one (or more), or if forced.
+        while let Some(first) = state.queued_frames.front() {
+            // Big audio frame that needs to be split across multiple packets?
+            if first.header.frame_len > max_payload_size {
+                let first = state.queued_frames.pop_front().unwrap();
+                let mut data = first.buffer.as_slice();
+                let mut frag_offset = 0;
+                let id = first.id;
+
+                // The number of fragments (and therefore packets) that make up the current
+                // frame. This is the same for every fragment, so only calculate it once.
+                //
+                // NOTE(review): NF is an 8-bit field, this assumes a frame never needs more
+                // than 255 fragments (i.e. no absurdly small MTU) - TODO confirm.
+                let n_fragments =
+                    (first.header.frame_len + max_payload_size - 1) / max_payload_size;
+
+                while frag_offset < first.header.frame_len {
+                    let left = first.header.frame_len - frag_offset;
+                    let is_last = left <= max_payload_size;
+                    let bytes_in_this_packet = std::cmp::min(left, max_payload_size);
+
+                    //  0                   1
+                    //  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
+                    // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+                    // |    MBZ    | FT|       NF      |
+                    // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+                    //
+                    // https://www.rfc-editor.org/rfc/rfc4184.html#section-4.1.1
+                    //
+                    let frame_type: u16 = {
+                        if frag_offset == 0 {
+                            if bytes_in_this_packet >= (first.header.frame_len * 5 / 8) {
+                                // Initial fragment of frame which includes the first 5/8ths of the frame
+                                1
+                            } else {
+                                // Initial fragment of frame, which does not include the first 5/8ths of the frame
+                                2
+                            }
+                        } else {
+                            // Fragment of frame other than initial fragment
+                            3
+                        }
+                    };
+
+                    let ac3_specific_header =
+                        ((frame_type << 8) | (n_fragments as u16)).to_be_bytes();
+
+                    // The marker bit is only set on the packet carrying the final fragment.
+                    //
+                    // https://www.rfc-editor.org/rfc/rfc4184.html#section-3
+                    self.obj().queue_packet(
+                        id.into(),
+                        rtp_types::RtpPacketBuilder::new()
+                            .payload(ac3_specific_header.as_slice())
+                            .payload(&data[0..bytes_in_this_packet])
+                            .marker_bit(is_last),
+                    )?;
+
+                    data = &data[bytes_in_this_packet..];
+                    frag_offset += bytes_in_this_packet;
+                }
+                continue;
+            }
+
+            let n_frames = state.queued_frames.len();
+
+            let queue_size = state.queued_frames.iter().map(|f| f.len()).sum::<usize>();
+
+            let queue_duration = state
+                .queued_frames
+                .iter()
+                .map(|f| f.duration())
+                .sum::<u64>();
+
+            // We optimistically add average size/duration to send out packets as early as possible
+            // if we estimate that the next frame would likely overflow our accumulation limits.
+            //
+            // (n_frames can't be zero here, the queue has at least the `first` frame in it.)
+            let avg_size = queue_size / n_frames;
+            let avg_duration = queue_duration / n_frames as u64;
+
+            let max_ptime = self.calc_effective_max_ptime(settings, state);
+
+            let is_ready = send_mode == SendPacketMode::ForcePending
+                || agg_mode != RtpAc3PayAggregateMode::Aggregate
+                || queue_size + avg_size > max_payload_size
+                || max_ptime.map_or(false, |limit| queue_duration + avg_duration > limit);
+
+            gst::log!(
+                CAT,
+                imp: self,
+                "Queued: size {queue_size}, duration ~{}ms, mode: {:?} + {:?} => ready: {}",
+                queue_duration / 1_000_000,
+                agg_mode,
+                send_mode,
+                is_ready);
+
+            if !is_ready {
+                gst::log!(CAT, imp: self, "Not ready yet, waiting for more data");
+                break;
+            }
+
+            gst::trace!(CAT, imp: self, "Creating packet..");
+
+            let pts_offset = gst::ClockTime::from_nseconds(first.pts_offset);
+
+            let id = first.id;
+            let mut end_id = first.id;
+
+            let mut acc_duration = 0;
+            let mut acc_size = 0;
+
+            let mut n = 0; // Number of frames in packet
+
+            // Figure out how many frames we're going to put into the packet, needed for the header
+            for frame in &state.queued_frames {
+                gst::trace!(
+                    CAT,
+                    imp: self,
+                    "{frame:?}, accumulated size {acc_size} duration ~{}ms",
+                    acc_duration / 1_000_000);
+
+                // If this frame would overflow the packet, bail out and send out what we have.
+                //
+                // Don't take into account the max_ptime for the first frame, since it could be
+                // lower than the frame duration in which case we would never payload anything.
+                //
+                // For the size check in bytes we know that the first frame will fit the mtu,
+                // because we already checked for the "audio frame bigger than mtu" scenario above.
+                if acc_size + frame.len() > max_payload_size
+                    || max_ptime.map_or(false, |limit| {
+                        acc_duration > 0 && acc_duration + frame.duration() > limit
+                    })
+                {
+                    break;
+                }
+
+                // ... otherwise add frame to the packet (we'll do the actual payloading later)
+                n += 1;
+
+                acc_size += frame.len();
+                acc_duration += frame.duration();
+                end_id = frame.id;
+
+                // .. and check if there are more frames we can add to the packet
+            }
+
+            // Packet creation and payloading
+
+            let frame_type: u16 = 0; // One or more complete frames
+            let ac3_specific_header = ((frame_type << 8) | (n as u16)).to_be_bytes();
+
+            let mut packet = rtp_types::RtpPacketBuilder::new()
+                .marker_bit(true)
+                .payload(ac3_specific_header.as_slice());
+
+            // Add frames to the packet
+
+            for frame in state.queued_frames.iter().take(n) {
+                packet = packet.payload(frame.data());
+            }
+
+            self.obj().queue_packet(
+                PacketToBufferRelation::IdsWithOffset {
+                    ids: (id..=end_id),
+                    timestamp_offset: TimestampOffset::Pts(pts_offset),
+                },
+                packet,
+            )?;
+
+            // Now pop off all the frames we used (now that the packet has been written out)
+            for _ in 0..n {
+                let _ = state.queued_frames.pop_front();
+            }
+        }
+
+        gst::log!(CAT, imp: self, "All done for now, {} frames queued", state.queued_frames.len());
+
+        if send_mode == SendPacketMode::ForcePending {
+            self.obj().finish_pending_packets()?;
+        }
+
+        Ok(gst::FlowSuccess::Ok)
+    }
+
+    // Resolves aggregate-mode=auto into a concrete mode based on the cached upstream
+    // liveness: live (or not yet known) means zero-latency, non-live means aggregate.
+    // Any explicitly configured mode is returned as it is.
+    fn effective_aggregate_mode(&self, settings: &Settings) -> RtpAc3PayAggregateMode {
+        match settings.aggregate_mode {
+            RtpAc3PayAggregateMode::Auto => match self.is_live() {
+                Some(true) => RtpAc3PayAggregateMode::ZeroLatency,
+                Some(false) => RtpAc3PayAggregateMode::Aggregate,
+                None => RtpAc3PayAggregateMode::ZeroLatency,
+            },
+            mode => mode,
+        }
+    }
+
+    // Cached upstream liveness, None if it has not been determined yet.
+    fn is_live(&self) -> Option<bool> {
+        *self.is_live.lock().unwrap()
+    }
+
+    // Query upstream live-ness if needed, in case of aggregate-mode=auto
+    //
+    // If the latency query fails upstream is assumed to be not live.
+    fn ensure_upstream_liveness(&self, settings: &mut Settings) {
+        if settings.aggregate_mode != RtpAc3PayAggregateMode::Auto || self.is_live().is_some() {
+            return;
+        }
+
+        let mut q = gst::query::Latency::new();
+        let is_live = if self.obj().sink_pad().peer_query(&mut q) {
+            let (is_live, _, _) = q.result();
+            is_live
+        } else {
+            false
+        };
+
+        *self.is_live.lock().unwrap() = Some(is_live);
+
+        gst::info!(CAT, imp: self, "Upstream is live: {is_live}");
+    }
+
+    // We can get max ptime or ptime recommendations/restrictions from multiple places, e.g. the
+    // "max-ptime" property, but also from "maxptime" or "ptime" values from downstream / an SDP.
+    //
+    // Here we look at the various values and decide on an effective max ptime value.
+    //
+    // We'll just return the lowest of any set values (in nanoseconds), or None if none is set.
+    //
+    fn calc_effective_max_ptime(&self, settings: &Settings, state: &State) -> Option<u64> {
+        [settings.max_ptime, state.max_ptime, state.ptime]
+            .into_iter()
+            .flatten()
+            .min()
+            .map(|t| t.nseconds())
+    }
+}
diff --git a/net/rtp/src/ac3/pay/mod.rs b/net/rtp/src/ac3/pay/mod.rs
new file mode 100644
index 00000000..416feedb
--- /dev/null
+++ b/net/rtp/src/ac3/pay/mod.rs
@@ -0,0 +1,57 @@
+// GStreamer RTP AC-3 Audio Payloader
+//
+// Copyright (C) 2023 Tim-Philipp Müller
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at
+// <https://mozilla.org/MPL/2.0/>.
+//
+// SPDX-License-Identifier: MPL-2.0
+
+use gst::glib;
+use gst::prelude::*;
+
+pub mod imp;
+
+/// Controls whether the payloader sends out audio frames right away or collects
+/// several of them into one packet first (exposed as `GstRtpAc3PayAggregateMode`).
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
+#[repr(i32)]
+#[enum_type(name = "GstRtpAc3PayAggregateMode")]
+#[non_exhaustive]
+pub(crate) enum RtpAc3PayAggregateMode {
+    /// Pick zero-latency or aggregate depending on whether upstream is live.
+    #[enum_value(
+        name = "Automatic: zero-latency if upstream is live, otherwise aggregate frames until packet is full.",
+        nick = "auto"
+    )]
+    Auto = -1,
+
+    /// Never wait for more frames before sending a packet.
+    #[enum_value(
+        name = "Zero Latency: always send out frames right away, do not wait for more frames to fill a packet.",
+        nick = "zero-latency"
+    )]
+    ZeroLatency = 0,
+
+    /// Collect frames until the packet is full or the max-ptime limit is reached.
+    #[enum_value(
+        name = "Aggregate: collect audio frames until we have a full packet or the max-ptime limit is hit (if set).",
+        nick = "aggregate"
+    )]
+    Aggregate = 1,
+}
+
+// Public wrapper type for the payloader element; the implementation lives in `imp`.
+glib::wrapper! {
+    pub struct RtpAc3Pay(ObjectSubclass<imp::RtpAc3Pay>)
+        @extends crate::basepay::RtpBasePay2, gst::Element, gst::Object;
+}
+
+/// Registers the `rtpac3pay2` element with the given plugin at marginal rank.
+///
+/// With the "doc" feature enabled the aggregate mode enum is also marked as plugin API.
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+    #[cfg(feature = "doc")]
+    {
+        RtpAc3PayAggregateMode::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
+    }
+
+    gst::Element::register(
+        Some(plugin),
+        "rtpac3pay2",
+        gst::Rank::MARGINAL,
+        RtpAc3Pay::static_type(),
+    )
+}
diff --git a/net/rtp/src/lib.rs b/net/rtp/src/lib.rs
index d7224cfc..d3b0fb30 100644
--- a/net/rtp/src/lib.rs
+++ b/net/rtp/src/lib.rs
@@ -31,6 +31,7 @@ mod baseaudiopay;
 mod basedepay;
 mod basepay;
 
+mod ac3;
 mod av1;
 mod jpeg;
 mod klv;
@@ -61,6 +62,9 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
             .mark_as_plugin_api(gst::PluginAPIFlags::empty());
     }
 
+    ac3::depay::register(plugin)?;
+    ac3::pay::register(plugin)?;
+
     av1::depay::register(plugin)?;
     av1::pay::register(plugin)?;
 
diff --git a/typos.toml b/typos.toml
index 8631dd31..399ffef7 100644
--- a/typos.toml
+++ b/typos.toml
@@ -9,6 +9,9 @@ seeked = "seeked"
 fiel = "fiel"
 trun = "trun"
 
+# net/rtp/src/ac3 - "5/8ths" - not sure how to allow this without also letting through all typos of 'this'
+ths = "ths"
+
 [files]
 extend-exclude = [
     "*.mcc",