rtp: add mp4gdepay

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1551>
This commit is contained in:
François Laignel 2024-04-24 15:49:49 +02:00 committed by GStreamer Marge Bot
parent 5466cafc24
commit b588ee59bc
12 changed files with 3396 additions and 3 deletions

1
Cargo.lock generated
View file

@ -2719,6 +2719,7 @@ dependencies = [
"once_cell", "once_cell",
"rand", "rand",
"rtp-types", "rtp-types",
"slab",
"smallvec", "smallvec",
"thiserror", "thiserror",
"time", "time",

View file

@ -6977,6 +6977,32 @@
}, },
"rank": "marginal" "rank": "marginal"
}, },
"rtpmp4gdepay2": {
"author": "François Laignel <francois centricular com>",
"description": "Depayload MPEG-4 Generic elementary streams from RTP packets (RFC 3640)",
"hierarchy": [
"GstRtpMpeg4GenericDepay",
"GstRtpBaseDepay2",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Codec/Depayloader/Network/RTP",
"pad-templates": {
"sink": {
"caps": "application/x-rtp:\n media: { (string)audio, (string)video }\n clock-rate: [ 1, 2147483647 ]\n encoding-name: MPEG4-GENERIC\n mode: { (string)generic, (string)AAC-lbr, (string)AAC-hbr, (string)aac-hbr }\n",
"direction": "sink",
"presence": "always"
},
"src": {
"caps": "video/mpeg:\n mpegversion: 4\n systemstream: false\naudio/mpeg:\n mpegversion: 4\n stream-format: raw\n",
"direction": "src",
"presence": "always"
}
},
"rank": "marginal"
},
"rtppcmadepay2": { "rtppcmadepay2": {
"author": "Sebastian Dröge <sebastian@centricular.com>", "author": "Sebastian Dröge <sebastian@centricular.com>",
"description": "Depayload A-law from RTP packets (RFC 3551)", "description": "Depayload A-law from RTP packets (RFC 3551)",

View file

@ -22,6 +22,7 @@ hex = "0.4.3"
once_cell.workspace = true once_cell.workspace = true
rand = { version = "0.8", default-features = false, features = ["std", "std_rng" ] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng" ] }
rtp-types = { version = "0.1" } rtp-types = { version = "0.1" }
slab = "0.4.9"
smallvec = { version = "1.11", features = ["union", "write", "const_generics", "const_new"] } smallvec = { version = "1.11", features = ["union", "write", "const_generics", "const_new"] }
thiserror = "1" thiserror = "1"
time = { version = "0.3", default-features = false, features = ["std"] } time = { version = "0.3", default-features = false, features = ["std"] }

View file

@ -16,6 +16,9 @@
*/ */
use gst::glib; use gst::glib;
#[macro_use]
mod utils;
mod gcc; mod gcc;
mod audio_discont; mod audio_discont;
@ -26,6 +29,7 @@ mod basepay;
mod av1; mod av1;
mod mp2t; mod mp2t;
mod mp4a; mod mp4a;
mod mp4g;
mod pcmau; mod pcmau;
mod vp8; mod vp8;
mod vp9; mod vp9;
@ -33,8 +37,6 @@ mod vp9;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
mod utils;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gcc::register(plugin)?; gcc::register(plugin)?;
@ -59,6 +61,8 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
mp4a::depay::register(plugin)?; mp4a::depay::register(plugin)?;
mp4a::pay::register(plugin)?; mp4a::pay::register(plugin)?;
mp4g::depay::register(plugin)?;
pcmau::depay::register(plugin)?; pcmau::depay::register(plugin)?;
pcmau::pay::register(plugin)?; pcmau::pay::register(plugin)?;

View file

@ -0,0 +1,748 @@
//! Access Unit Deinterleaving Buffer
use slab::Slab;
use super::{AccessUnit, AccessUnitIndex, MaybeSingleAuOrList, Mpeg4GenericDepayError};
#[derive(Debug)]
struct AuNode {
au: AccessUnit,
/// Index of the next AuNode in the early_aus buffer
next: Option<usize>,
}
/// Access Unit Deinterleaving Buffer.
///
/// In some packet modes, non-consecutive AUs might be grouped together,
/// which can limit the gap between to AUs in case of packet loss.
///
/// The Deinterleaving Buffer collects AUs as they arrive and outputs
/// them in the expected order whenever possible.
///
/// See [Interleaving in RFC 3640](rfc-interleaving).
///
/// [rfc-interleaving]: https://www.rfc-editor.org/rfc/rfc3640.html#section-3.2.3.2
#[derive(Debug, Default)]
pub struct DeinterleaveAuBuffer {
/// Linked list of the early AUs
early_aus: Slab<AuNode>,
/// Index of the head in early_aus buffer
head: Option<usize>,
expected_index: Option<AccessUnitIndex>,
}
impl DeinterleaveAuBuffer {
pub fn new(max_displacement: u32) -> Self {
DeinterleaveAuBuffer {
early_aus: Slab::with_capacity(max_displacement as usize),
..Default::default()
}
}
pub fn drain(&mut self) -> MaybeSingleAuOrList {
self.expected_index = None;
let mut cur_opt = self.head.take();
let len = self.early_aus.len();
match len {
0 => return MaybeSingleAuOrList::default(),
1 => {
let node = self.early_aus.remove(cur_opt.unwrap());
return MaybeSingleAuOrList::from(node.au);
}
_ => (),
}
let mut list = MaybeSingleAuOrList::new_list(len);
while let Some(cur) = cur_opt {
let cur_node = self.early_aus.remove(cur);
list.push(cur_node.au);
cur_opt = cur_node.next;
}
list
}
pub fn flush(&mut self) {
self.early_aus.clear();
self.head = None;
self.expected_index = None;
}
#[track_caller]
pub fn push_and_pop(
&mut self,
au: AccessUnit,
outbuf: &mut MaybeSingleAuOrList,
) -> Result<(), Mpeg4GenericDepayError> {
use std::cmp::Ordering::*;
let mut expected_index = match self.expected_index {
Some(expected_index) => match au.index.try_cmp(expected_index)? {
Equal => expected_index,
Greater => return self.insert_au(au),
Less => {
// Dropping too early Au
return Err(Mpeg4GenericDepayError::TooEarlyAU {
index: au.index,
expected_index,
});
}
},
None => au.index, // first AU
};
outbuf.push(au);
expected_index += 1;
self.expected_index = Some(expected_index);
// Pop other ready AUs if any
let mut head;
let mut head_node_ref;
let mut head_node;
while !self.early_aus.is_empty() {
head = self.head.expect("!early_aus.is_empty");
head_node_ref = self.early_aus.get(head).unwrap();
if head_node_ref.au.index.try_cmp(expected_index)?.is_ne() {
break;
}
head_node = self.early_aus.remove(head);
outbuf.push(head_node.au);
expected_index += 1;
self.expected_index = Some(expected_index);
self.head = head_node.next;
}
Ok(())
}
fn insert_au(&mut self, au: AccessUnit) -> Result<(), Mpeg4GenericDepayError> {
use std::cmp::Ordering::*;
if self.early_aus.is_empty() {
self.head = Some(self.early_aus.insert(AuNode { au, next: None }));
// Nothing to pop
return Ok(());
}
let mut cur = self.head.expect("!early_aus.is_empty");
let mut cur_node = self.early_aus.get(cur).unwrap();
// cur & cur_node refer to current head here
match au.index.try_cmp(cur_node.au.index)? {
Greater => (),
Less => {
// New head
self.head = Some(self.early_aus.insert(AuNode {
au,
next: Some(cur),
}));
return Ok(());
}
Equal => {
// Duplicate
// RFC, §2.3:
// > In addition, an AU MUST NOT be repeated in other RTP packets; hence
// > repetition of an AU is only possible when using a duplicate RTP packet.
//
// But: we can't received duplicates because they would have been rejected
// by the base class or the jitterbuffer.
unreachable!();
}
}
// Upcoming AU is not then new head
loop {
let Some(next) = cur_node.next else {
let new = Some(self.early_aus.insert(AuNode { au, next: None }));
self.early_aus.get_mut(cur).unwrap().next = new;
return Ok(());
};
let next_node = self.early_aus.get(next).unwrap();
match au.index.try_cmp(next_node.au.index)? {
Greater => (), // try next node
Less => {
let new = self.early_aus.insert(AuNode {
au,
next: Some(next),
});
self.early_aus.get_mut(cur).unwrap().next = Some(new);
return Ok(());
}
Equal => {
// Duplicate
// RFC, §2.3:
// > In addition, an AU MUST NOT be repeated in other RTP packets; hence
// > repetition of an AU is only possible when using a duplicate RTP packet.
//
// But: we can't received duplicates because they would have been rejected
// by the base class or the jitterbuffer.
unreachable!();
}
}
cur = next;
cur_node = next_node;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mp4g::depay::SingleAuOrList;
impl From<u32> for AccessUnit {
fn from(index: u32) -> Self {
AccessUnit {
index: index.into(),
..Default::default()
}
}
}
#[test]
fn simple_group_interleave() {
// Tests the pattern illustrated in:
// https://www.rfc-editor.org/rfc/rfc3640.html#appendix-A.3
gst::init().unwrap();
let mut deint_buf = DeinterleaveAuBuffer::default();
assert!(deint_buf.early_aus.is_empty());
assert!(deint_buf.expected_index.is_none());
let mut outbuf = MaybeSingleAuOrList::default();
assert!(outbuf.0.is_none());
// ****
// * P0. AUs with indices: 0, 3 & 6
// Expected AU 0 so it is pushed to outbuf
deint_buf.push_and_pop(0.into(), &mut outbuf).unwrap();
assert!(deint_buf.early_aus.is_empty());
assert_eq!(deint_buf.expected_index.unwrap(), 1);
matches!(outbuf.0, Some(SingleAuOrList::Single(_)));
// Expected AU 1 is missing when pushing AU 3 so it is buffered
deint_buf.push_and_pop(3.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 1);
matches!(outbuf.0, Some(SingleAuOrList::Single(_)));
// Expected AU 1 is missing when pushing AU 6 so it is buffered
deint_buf.push_and_pop(6.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 2);
assert_eq!(deint_buf.expected_index.unwrap(), 1);
// End of the RTP packet
matches!(outbuf.take(), Some(SingleAuOrList::Single(_)));
assert!(outbuf.0.is_none());
// ****
// * P1. AUs with indices: 1, 4 & 7
// Expected AU 1 so it is pushed to outbuf
deint_buf.push_and_pop(1.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 2);
assert_eq!(deint_buf.expected_index.unwrap(), 2);
matches!(outbuf.0, Some(SingleAuOrList::Single(_)));
// Expected AU 2 is missing when pushing AU 4 so it is buffered
deint_buf.push_and_pop(4.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 3);
assert_eq!(deint_buf.expected_index.unwrap(), 2);
matches!(outbuf.0, Some(SingleAuOrList::Single(_)));
// Expected AU 2 is missing when pushing AU 7 so it is buffered
deint_buf.push_and_pop(7.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 4);
assert_eq!(deint_buf.expected_index.unwrap(), 2);
// End of the RTP packet
matches!(outbuf.take(), Some(SingleAuOrList::Single(_)));
assert!(outbuf.0.is_none());
// ****
// * P2. AUs with indices: 2, 5 & 8
// Expected AU 2 so it is pushed to outbuf
// and this also pops AUs 3 & 4
// Remaining: 6 & 7
deint_buf.push_and_pop(2.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 2);
assert_eq!(deint_buf.expected_index.unwrap(), 5);
let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 3);
// Expected AU 5 so it is pushed to outbuf
// and this also pops AUs 6 & 7
deint_buf.push_and_pop(5.into(), &mut outbuf).unwrap();
assert!(deint_buf.early_aus.is_empty());
assert_eq!(deint_buf.expected_index.unwrap(), 8);
let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 6);
// Expected AU 8 so it is pushed to outbuf
deint_buf.push_and_pop(8.into(), &mut outbuf).unwrap();
assert!(deint_buf.early_aus.is_empty());
assert_eq!(deint_buf.expected_index.unwrap(), 9);
// End of the RTP packet
let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 7);
assert!(outbuf.0.is_none());
// ****
// * P3. AUs with indices: 9, 12 & 15
// Expected AU 9 so it is pushed to outbuf
deint_buf.push_and_pop(9.into(), &mut outbuf).unwrap();
assert!(deint_buf.early_aus.is_empty());
assert_eq!(deint_buf.expected_index.unwrap(), 10);
matches!(outbuf.0, Some(SingleAuOrList::Single(_)));
// Expected AU 10 is missing when pushing AU 12 so it is buffered
deint_buf.push_and_pop(12.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 10);
matches!(outbuf.0, Some(SingleAuOrList::Single(_)));
// Expected AU 10 is missing when pushing AU 15 so it is buffered
deint_buf.push_and_pop(15.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 2);
assert_eq!(deint_buf.expected_index.unwrap(), 10);
// End of the RTP packet
matches!(outbuf.take(), Some(SingleAuOrList::Single(_)));
assert!(outbuf.0.is_none());
}
#[test]
fn more_subtle_group_interleave() {
// Tests the pattern illustrated in:
// https://www.rfc-editor.org/rfc/rfc3640.html#appendix-A.4
gst::init().unwrap();
let mut deint_buf = DeinterleaveAuBuffer::default();
let mut outbuf = MaybeSingleAuOrList::default();
// ****
// * P0. AUs with indices: 0 & 5
// Expected AU 0 so it is pushed to outbuf
deint_buf.push_and_pop(0.into(), &mut outbuf).unwrap();
assert!(deint_buf.early_aus.is_empty());
assert_eq!(deint_buf.expected_index.unwrap(), 1);
matches!(outbuf.0, Some(SingleAuOrList::Single(_)));
// Expected AU 1 is missing when pushing AU 5 so it is buffered
deint_buf.push_and_pop(5.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 1);
// End of the RTP packet
matches!(outbuf.take(), Some(SingleAuOrList::Single(_)));
assert!(outbuf.0.is_none());
// ****
// * P1. AUs with indices: 2 & 7
// Expected AU 1 is missing when pushing AU 2 so it is buffered
deint_buf.push_and_pop(2.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 2);
assert_eq!(deint_buf.expected_index.unwrap(), 1);
assert!(outbuf.0.is_none());
// Expected AU 1 is missing when pushing AU 7 so it is buffered
deint_buf.push_and_pop(7.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 3);
assert_eq!(deint_buf.expected_index.unwrap(), 1);
// End of the RTP packet
assert!(outbuf.take().is_none());
// ****
// * P2. AUs with indices: 4 & 9
// Expected AU 1 is missing when pushing AU 4 so it is buffered
deint_buf.push_and_pop(4.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 4);
assert_eq!(deint_buf.expected_index.unwrap(), 1);
assert!(outbuf.0.is_none());
// Expected AU 1 is missing when pushing AU 9 so it is buffered
deint_buf.push_and_pop(9.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 5);
assert_eq!(deint_buf.expected_index.unwrap(), 1);
// End of the RTP packet
assert!(outbuf.take().is_none());
// ****
// * P3. AUs with indices: 1 & 6
// Expected AU 1 so it is pushed to outbuf
// and this also pops AU 2
// Remaining: 4, 5, 7 & 9
deint_buf.push_and_pop(1.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 4);
assert_eq!(deint_buf.expected_index.unwrap(), 3);
let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 2);
// Expected AU 3 is missing when pushing AU 6 so it is buffered
deint_buf.push_and_pop(6.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 5);
assert_eq!(deint_buf.expected_index.unwrap(), 3);
// End of the RTP packet
let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 2);
assert!(outbuf.0.is_none());
// ****
// * P4. AUs with indices: 3 & 8
// Expected AU 3 so it is pushed to outbuf
// and this also pops AU 4, 5, 6 & 7
// Remaining: 9
deint_buf.push_and_pop(3.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 8);
let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 5);
// Expected AU 8 so it is pushed to outbuf
// and this also pops AU 9
deint_buf.push_and_pop(8.into(), &mut outbuf).unwrap();
assert!(deint_buf.early_aus.is_empty());
assert_eq!(deint_buf.expected_index.unwrap(), 10);
// End of the RTP packet
let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 7);
assert!(outbuf.0.is_none());
// ****
// * P5. AUs with indices: 10 & 15
// Expected AU 10 so it is pushed to outbuf
deint_buf.push_and_pop(10.into(), &mut outbuf).unwrap();
assert!(deint_buf.early_aus.is_empty());
assert_eq!(deint_buf.expected_index.unwrap(), 11);
matches!(outbuf.0, Some(SingleAuOrList::Single(_)));
// Expected AU 11 is missing when pushing AU 15 so it is buffered
deint_buf.push_and_pop(15.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 11);
// End of the RTP packet
matches!(outbuf.take(), Some(SingleAuOrList::Single(_)));
assert!(outbuf.0.is_none());
}
#[test]
fn continuous_interleave() {
// Tests the pattern illustrated in:
// https://www.rfc-editor.org/rfc/rfc3640.html#appendix-A.5
gst::init().unwrap();
let mut deint_buf = DeinterleaveAuBuffer::default();
let mut outbuf = MaybeSingleAuOrList::default();
// ****
// * P0. AUs with index: 0
// Expected AU 0 so it is pushed to outbuf
deint_buf.push_and_pop(0.into(), &mut outbuf).unwrap();
assert!(deint_buf.early_aus.is_empty());
assert_eq!(deint_buf.expected_index.unwrap(), 1);
// End of the RTP packet
matches!(outbuf.take(), Some(SingleAuOrList::Single(_)));
assert!(outbuf.0.is_none());
// ****
// * P1. AUs with indices: 1 & 4
// Expected AU 0 so it is pushed to outbuf
deint_buf.push_and_pop(1.into(), &mut outbuf).unwrap();
assert!(deint_buf.early_aus.is_empty());
assert_eq!(deint_buf.expected_index.unwrap(), 2);
matches!(outbuf.0, Some(SingleAuOrList::Single(_)));
// Expected AU 2 is missing when pushing AU 4 so it is buffered
deint_buf.push_and_pop(4.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 2);
// End of the RTP packet
matches!(outbuf.take(), Some(SingleAuOrList::Single(_)));
assert!(outbuf.take().is_none());
// ****
// * P2. AUs with indices: 2, 5 & 8
// Expected AU 2 so it is pushed to outbuf
deint_buf.push_and_pop(2.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 3);
matches!(outbuf.0, Some(SingleAuOrList::Single(_)));
// Expected AU 3 is missing when pushing AU 5 so it is buffered
deint_buf.push_and_pop(5.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 2);
assert_eq!(deint_buf.expected_index.unwrap(), 3);
matches!(outbuf.0, Some(SingleAuOrList::Single(_)));
// Expected AU 3 is missing when pushing AU 8 so it is buffered
deint_buf.push_and_pop(8.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 3);
assert_eq!(deint_buf.expected_index.unwrap(), 3);
// End of the RTP packet
matches!(outbuf.take(), Some(SingleAuOrList::Single(_)));
assert!(outbuf.take().is_none());
// ****
// * P3. AUs with indices: 3, 6, 9 & 12
// Expected AU 3 so it is pushed to outbuf
// and this also pops AU 4 & 5
// Remaining: 8
deint_buf.push_and_pop(3.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 6);
let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 3);
// Expected AU 6 so it is pushed to outbuf
deint_buf.push_and_pop(6.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 7);
let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 4);
// Expected AU 7 is missing when pushing AU 9 so it is buffered
deint_buf.push_and_pop(9.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 2);
assert_eq!(deint_buf.expected_index.unwrap(), 7);
matches!(outbuf.0, Some(SingleAuOrList::List(_)));
// Expected AU 7 is missing when pushing AU 12 so it is buffered
deint_buf.push_and_pop(12.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 3);
assert_eq!(deint_buf.expected_index.unwrap(), 7);
// End of the RTP packet
let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 4);
assert!(outbuf.0.is_none());
// ****
// * P4. AUs with indices: 7, 10, 13 & 16
// Expected AU 7 so it is pushed to outbuf
// and this also pops AU 8 & 9
// Remaining: 12
deint_buf.push_and_pop(7.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 10);
let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 3);
// Expected AU 10 so it is pushed to outbuf
deint_buf.push_and_pop(10.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 11);
let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 4);
// Expected AU 11 is missing when pushing AU 13 so it is buffered
deint_buf.push_and_pop(13.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 2);
assert_eq!(deint_buf.expected_index.unwrap(), 11);
matches!(outbuf.0, Some(SingleAuOrList::List(_)));
// Expected AU 11 is missing when pushing AU 16 so it is buffered
deint_buf.push_and_pop(16.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 3);
assert_eq!(deint_buf.expected_index.unwrap(), 11);
// End of the RTP packet
let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 4);
assert!(outbuf.0.is_none());
// ****
// * P5. AUs with indices: 11, 14, 17 & 20
// Expected AU 11 so it is pushed to outbuf
// and this also pops AU 12 & 13
// Remaining: 16
deint_buf.push_and_pop(11.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 14);
let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 3);
// Expected AU 14 so it is pushed to outbuf
deint_buf.push_and_pop(14.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 15);
let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 4);
// Expected AU 15 is missing when pushing AU 17 so it is buffered
deint_buf.push_and_pop(17.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 2);
assert_eq!(deint_buf.expected_index.unwrap(), 15);
matches!(outbuf.0, Some(SingleAuOrList::List(_)));
// Expected AU 15 is missing when pushing AU 20 so it is buffered
deint_buf.push_and_pop(20.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 3);
assert_eq!(deint_buf.expected_index.unwrap(), 15);
// End of the RTP packet
let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 4);
assert!(outbuf.0.is_none());
// ****
// * P6. AUs with indices: 15 & 18
// Expected AU 15 so it is pushed to outbuf
// and this also pops AU 16 & 17
// Remaining: 20
deint_buf.push_and_pop(15.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 18);
let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 3);
// Expected AU 18 so it is pushed to outbuf
deint_buf.push_and_pop(18.into(), &mut outbuf).unwrap();
assert_eq!(deint_buf.early_aus.len(), 1);
assert_eq!(deint_buf.expected_index.unwrap(), 19);
// End of the RTP packet
let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 4);
assert!(outbuf.0.is_none());
// ****
// * P7. AUs with index: 19
deint_buf.push_and_pop(19.into(), &mut outbuf).unwrap();
// Expected AU 19 so it is pushed to outbuf
// and this also pops AU 20
assert!(deint_buf.early_aus.is_empty());
assert_eq!(deint_buf.expected_index.unwrap(), 21);
// End of the RTP packet
let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else {
panic!("Expecting a List");
};
assert_eq!(buflist.len(), 2);
assert!(outbuf.0.is_none());
}
}

View file

@ -0,0 +1,641 @@
// GStreamer RTP MPEG-4 Generic elementary streams Depayloader
//
// Copyright (C) 2023-2024 François Laignel <francois centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/**
* SECTION:element-rtpmp4gdepay2
* @see_also: rtpmp4gpay2, rtpmp4gdepay, rtpmp4gpay
*
* Depayload an MPEG-4 Generic elementary stream from RTP packets as per [RFC 3640][rfc-3640].
*
* [rfc-3640]: https://www.rfc-editor.org/rfc/rfc3640.html#section-4
*
* ## Example pipeline
*
* |[
* gst-launch-1.0 udpsrc caps='application/x-rtp,media=audio,clock-rate=44100,encoding-name=MPEG4-GENERIC,payload=96,encoding-params=1,streamtype=5,profile-level-id=2,mode=AAC-hbr,config=(string)1208,sizelength=13,indexlength=3,indexdeltalength=3' ! rtpjitterbuffer ! rtpmp4gdepay2 ! decodebin3 ! audioconvert ! audioresample ! autoaudiosink
* ]| This will depayload an incoming RTP MPEG-4 generic elementary stream AAC-hbr with
* 1 channel @ 44100 sampling rate (default `audiotestsrc ! fdkaacenc` negotiation).
* You can use the #rtpmp4gpay2 or #rtpmp4gpay elements to create such an RTP stream.
*
* Since: plugins-rs-0.13.0
*/
use anyhow::Context;
use atomic_refcell::AtomicRefCell;
use once_cell::sync::Lazy;
use gst::{glib, prelude::*, subclass::prelude::*};
use std::ops::{ControlFlow, RangeInclusive};
use crate::basedepay::{Packet, PacketToBufferRelation, RtpBaseDepay2Ext, TimestampOffset};
use crate::mp4g::{ModeConfig, RtpTimestamp};
use super::parsers::PayloadParser;
use super::{
AccessUnit, DeinterleaveAuBuffer, MaybeSingleAuOrList, Mpeg4GenericDepayError, SingleAuOrList,
};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rtpmp4gdepay2",
gst::DebugColorFlags::empty(),
Some("RTP MPEG-4 generic Depayloader"),
)
});
#[derive(Default)]
pub struct RtpMpeg4GenericDepay {
state: AtomicRefCell<State>,
}
#[glib::object_subclass]
impl ObjectSubclass for RtpMpeg4GenericDepay {
const NAME: &'static str = "GstRtpMpeg4GenericDepay";
type Type = super::RtpMpeg4GenericDepay;
type ParentType = crate::basedepay::RtpBaseDepay2;
}
impl ObjectImpl for RtpMpeg4GenericDepay {}
impl GstObjectImpl for RtpMpeg4GenericDepay {}
impl ElementImpl for RtpMpeg4GenericDepay {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"RTP MPEG-4 Generic ES Depayloader",
"Codec/Depayloader/Network/RTP",
"Depayload MPEG-4 Generic elementary streams from RTP packets (RFC 3640)",
"François Laignel <francois centricular com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&gst::Caps::builder("application/x-rtp")
// TODO "application" is also present in rtpmp4gdepay caps template
// but it doesn't handle it in gst_rtp_mp4g_depay_setcaps
.field("media", gst::List::new(["audio", "video"]))
.field("clock-rate", gst::IntRange::new(1i32, i32::MAX))
.field("encoding-name", "MPEG4-GENERIC")
// Required string params:
// "streamtype = { \"4\", \"5\" }, " Not set by Wowza 4 = video, 5 = audio
// "profile-level-id = [1,MAX], "
// "config = (string)"
.field(
"mode",
gst::List::new(["generic", "AAC-lbr", "AAC-hbr", "aac-hbr"]),
)
// Optional general parameters:
// "objecttype = [1,MAX], "
// "constantsize = [1,MAX], " // constant size of each AU
// "constantduration = [1,MAX], " // constant duration of each AU
// "maxdisplacement = [1,MAX], "
// "de-interleavebuffersize = [1,MAX], "
// Optional configuration parameters:
// "sizelength = [1, 32], "
// "indexlength = [1, 32], "
// "indexdeltalength = [1, 32], "
// "ctsdeltalength = [1, 32], "
// "dtsdeltalength = [1, 32], "
// "randomaccessindication = {0, 1}, "
// "streamstateindication = [0, 32], "
// "auxiliarydatasizelength = [0, 32]" )
.build(),
)
.unwrap();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&gst::Caps::builder_full()
.structure(
gst::Structure::builder("video/mpeg")
.field("mpegversion", 4i32)
.field("systemstream", false)
.build(),
)
.structure(
gst::Structure::builder("audio/mpeg")
.field("mpegversion", 4i32)
.field("stream-format", "raw")
.build(),
)
.build(),
)
.unwrap();
vec![src_pad_template, sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}
#[derive(Debug, Default)]
struct State {
parser: PayloadParser,
deint_buf: Option<DeinterleaveAuBuffer>,
au_acc: Option<AuAccumulator>,
seqnum_base: Option<u32>,
clock_rate: u32,
can_parse: bool,
max_au_index: Option<usize>,
prev_au_index: Option<usize>,
prev_rtptime: Option<u64>,
last_au_index: Option<usize>,
}
impl State {
fn flush(&mut self) {
self.parser.reset();
if let Some(deint_buf) = self.deint_buf.as_mut() {
deint_buf.flush();
}
self.can_parse = false;
self.max_au_index = None;
self.prev_au_index = None;
self.prev_rtptime = None;
self.last_au_index = None;
}
}
struct CodecData;
impl CodecData {
fn from_caps(s: &gst::StructureRef) -> anyhow::Result<Option<gst::Buffer>> {
let conf_str = s.get_optional::<&str>("config").context("config field")?;
let Some(conf_str) = conf_str else {
return Ok(None);
};
let data = hex::decode(conf_str).context("decoding config")?;
Ok(Some(gst::Buffer::from_mut_slice(data)))
}
}
/// Accumulates packets for a fragmented AU.
///
/// Used for packets containing fragments for a single AU.
///
/// From https://www.rfc-editor.org/rfc/rfc3640.html#section-3.2.3:
///
/// > The Access Unit Data Section contains an integer number of complete
/// > Access Units or a single fragment of one AU.
#[derive(Debug)]
struct AuAccumulator(AccessUnit);
impl AuAccumulator {
#[inline]
fn new(au: AccessUnit) -> Self {
AuAccumulator(au)
}
#[inline]
fn try_append(&mut self, mut au: AccessUnit) -> Result<(), Mpeg4GenericDepayError> {
use Mpeg4GenericDepayError::*;
// FIXME add comment about fragments having the same RTP timestamp
if self.0.cts_delta.opt_ne(au.cts_delta).unwrap_or(false) {
return Err(FragmentedAuRtpTsMismatch {
expected: self.0.cts_delta.unwrap(),
found: au.cts_delta.unwrap(),
ext_seqnum: au.ext_seqnum,
});
}
if self.0.dts_delta.opt_ne(au.dts_delta).unwrap_or(false) {
// § 3.2.1.1
// > The DTS-delta field MUST have the same value
// > for all fragments of an Access Unit
return Err(FragmentedAuDtsMismatch {
expected: self.0.dts_delta.unwrap(),
found: au.dts_delta.unwrap(),
ext_seqnum: au.ext_seqnum,
});
}
self.0.data.append(&mut au.data);
Ok(())
}
#[inline]
fn try_into_au(self) -> Result<AccessUnit, Mpeg4GenericDepayError> {
let au = self.0;
if let Some(expected) = au.size {
if expected as usize != au.data.len() {
return Err(Mpeg4GenericDepayError::FragmentedAuSizeMismatch {
expected,
found: au.data.len(),
ext_seqnum: au.ext_seqnum,
});
}
}
Ok(au)
}
}
impl crate::basedepay::RtpBaseDepay2Impl for RtpMpeg4GenericDepay {
const ALLOWED_META_TAGS: &'static [&'static str] = &["audio", "video"];
fn stop(&self) -> Result<(), gst::ErrorMessage> {
*self.state.borrow_mut() = State::default();
Ok(())
}
fn drain(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.borrow_mut();
if let Some(ref mut deint_buf) = state.deint_buf {
if let Some(aus) = deint_buf.drain().take() {
self.finish_buffer_or_list(&state, None, aus)?;
}
}
Ok(gst::FlowSuccess::Ok)
}
fn flush(&self) {
gst::debug!(CAT, imp: self, "Flushing");
self.state.borrow_mut().flush();
}
fn set_sink_caps(&self, caps: &gst::Caps) -> bool {
let s = caps.structure(0).unwrap();
let mode = s.get::<&str>("mode").expect("Required by Caps");
if mode.starts_with("CELP") {
gst::error!(CAT, imp: self, "{mode} not supported yet");
return false;
}
let mut caps_builder = match s.get::<&str>("media").expect("Required by Caps") {
"audio" => gst::Caps::builder("audio/mpeg")
.field("mpegversion", 4i32)
.field("stream-format", "raw"),
"video" => gst::Caps::builder("video/mpeg")
.field("mpegversion", 4i32)
.field("systemstream", false),
// TODO handle "application"
_ => unreachable!(),
};
let mode_config = match ModeConfig::from_caps(s) {
Ok(h) => h,
Err(err) => {
gst::error!(CAT, imp: self, "Error parsing Header in Caps: {err:#}");
return false;
}
};
match CodecData::from_caps(s) {
Ok(codec_data) => {
caps_builder = caps_builder.field("codec_data", codec_data);
}
Err(err) => {
gst::error!(CAT, imp: self, "Error parsing Caps: {err:#}");
return false;
}
}
let clock_rate = s.get::<i32>("clock-rate").expect("Required by Caps");
debug_assert!(clock_rate.is_positive()); // constrained by Caps
let clock_rate = clock_rate as u32;
{
let mut state = self.state.borrow_mut();
state.seqnum_base = s.get_optional::<u32>("seqnum-base").unwrap();
if let Some(seqnum_base) = state.seqnum_base {
gst::info!(CAT, imp: self, "Got seqnum_base {seqnum_base}");
}
state.clock_rate = clock_rate;
if let Some(max_displacement) = mode_config.max_displacement() {
state.deint_buf = Some(DeinterleaveAuBuffer::new(max_displacement));
}
state.parser.set_config(mode_config);
}
self.obj().set_src_caps(&caps_builder.build());
true
}
fn handle_packet(
&self,
packet: &crate::basedepay::Packet,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.borrow_mut();
if self.check_initial_packet(&mut state, packet).is_break() {
self.obj().drop_packets(..=packet.ext_seqnum());
return Ok(gst::FlowSuccess::Ok);
}
let State {
parser,
au_acc,
deint_buf,
..
} = &mut *state;
let payload = packet.payload();
let ext_seqnum = packet.ext_seqnum();
let packet_ts = RtpTimestamp::from_ext(packet.ext_timestamp());
let au_iter = match parser.parse(payload, ext_seqnum, packet_ts) {
Ok(au_iter) => au_iter,
Err(err) => {
gst::warning!(CAT, imp: self, "Failed to parse payload for packet {ext_seqnum}: {err:#}");
*au_acc = None;
self.obj().drop_packets(..=packet.ext_seqnum());
return Ok(gst::FlowSuccess::Ok);
}
};
let mut aus = MaybeSingleAuOrList::default();
for au in au_iter {
let au = match au {
Ok(au) => au,
Err(err) => {
gst::warning!(CAT, imp: self,
"Failed to parse AU from packet {}: {err:#}", packet.ext_seqnum(),
);
continue;
}
};
// § 3.1: The marker indicates that:
// > the RTP packet payload contains either the final fragment of
// > a fragmented Access Unit or one or more complete Access Units
if !packet.marker_bit() {
if !au.is_fragment {
gst::warning!(CAT, imp: self, "Dropping non fragmented AU {au} in un-marked packet");
continue;
}
if let Some(ref mut acc) = au_acc {
if let Err(err) = acc.try_append(au) {
gst::warning!(CAT, imp: self, "Discarding pending fragmented AU: {err}");
*au_acc = None;
parser.reset();
self.obj().drop_packets(..=packet.ext_seqnum());
return Ok(gst::FlowSuccess::Ok);
}
} else {
*au_acc = Some(AuAccumulator::new(au));
}
gst::trace!(CAT, imp: self, "Non-final fragment");
return Ok(gst::FlowSuccess::Ok);
}
// Packet marker set
let au = match au_acc.take() {
Some(mut acc) => {
if au.is_fragment {
if let Err(err) = acc.try_append(au) {
gst::warning!(CAT, imp: self, "Discarding pending fragmented AU: {err}");
parser.reset();
self.obj().drop_packets(..=packet.ext_seqnum());
return Ok(gst::FlowSuccess::Ok);
}
match acc.try_into_au() {
Ok(au) => au,
Err(err) => {
gst::warning!(CAT, imp: self, "Discarding pending fragmented AU: {err}");
let Mpeg4GenericDepayError::FragmentedAuSizeMismatch { .. } = err
else {
unreachable!();
};
parser.reset();
self.obj().drop_packets(..=packet.ext_seqnum());
return Ok(gst::FlowSuccess::Ok);
}
}
} else {
gst::warning!(CAT, imp: self,
"Discarding pending fragmented AU {} due to incoming non fragmented AU {au}",
acc.0,
);
self.obj().drop_packets(..au.ext_seqnum);
au
}
}
None => au,
};
if let Some(ref mut deint_buf) = deint_buf {
if let Err(err) = deint_buf.push_and_pop(au, &mut aus) {
gst::warning!(CAT, imp: self, "Failed to push AU to deinterleave buffer: {err}");
// The AU has been dropped, just keep going
// Packet will be dropped eventually
}
continue;
}
if au.is_interleaved {
// From gstrtpmp4gdepay.c:616:
// > some broken non-interleaved streams have AU-index jumping around
// > all over the place, apparently assuming receiver disregards
gst::warning!(CAT, imp: self, "Interleaved AU, but no `max_displacement` was defined");
}
aus.push(au);
}
if let Some(aus) = aus.take() {
self.finish_buffer_or_list(&state, Some(packet.ext_seqnum()), aus)?;
}
Ok(gst::FlowSuccess::Ok)
}
}
impl RtpMpeg4GenericDepay {
#[inline]
fn check_initial_packet(&self, state: &mut State, packet: &Packet) -> ControlFlow<()> {
if state.can_parse {
return ControlFlow::Continue(());
}
let seqnum = (packet.ext_seqnum() & 0xffff) as u16;
if let Some(seqnum_base) = state.seqnum_base {
let seqnum_base = (seqnum_base & 0xffff) as u16;
// Assume seqnum_base and the initial ext_seqnum are in the same cycle
// This should be guaranteed by the JitterBuffer
let delta = crate::utils::seqnum_distance(seqnum, seqnum_base);
if delta == 0 {
gst::debug!(CAT, imp: self,
"Got initial packet {seqnum_base} @ ext seqnum {}", packet.ext_seqnum(),
);
state.can_parse = true;
return ControlFlow::Continue(());
}
if delta < 0 {
gst::log!(CAT, imp: self,
"Waiting for initial packet {seqnum_base}, got {seqnum} (ext seqnum {})",
packet.ext_seqnum(),
);
return ControlFlow::Break(());
}
gst::debug!(CAT, imp: self,
"Packet {seqnum} (ext seqnum {}) passed expected initial packet {seqnum_base}, will sync on next marker",
packet.ext_seqnum(),
);
state.seqnum_base = None;
}
// Wait until a marked packet is found and start parsing from the next packet
if packet.marker_bit() {
gst::debug!(CAT, imp: self,
"Found first marked packet {seqnum} (ext seqnum {}). Will start parsing from next packet",
packet.ext_seqnum(),
);
assert!(state.au_acc.is_none());
state.can_parse = true;
} else {
gst::log!(CAT, imp: self,
"First marked packet not found yet, skipping packet {seqnum} (ext seqnum {})",
packet.ext_seqnum(),
);
}
ControlFlow::Break(())
}
fn finish_buffer_or_list(
&self,
state: &State,
packet_ext_seqnum: Option<u64>,
aus: SingleAuOrList,
) -> Result<gst::FlowSuccess, gst::FlowError> {
use SingleAuOrList::*;
fn get_packet_to_buffer_relation(
au: &AccessUnit,
clock_rate: u32,
range: RangeInclusive<u64>,
) -> PacketToBufferRelation {
if let Some((cts_delta, dts_delta)) = Option::zip(au.cts_delta, au.dts_delta) {
let pts_offset = gst::Signed::<gst::ClockTime>::from(cts_delta as i64)
.mul_div_floor(*gst::ClockTime::SECOND, clock_rate as u64)
.unwrap();
let dts_offset = gst::Signed::<gst::ClockTime>::from(dts_delta as i64)
.mul_div_floor(*gst::ClockTime::SECOND, clock_rate as u64)
.unwrap();
PacketToBufferRelation::SeqnumsWithOffset {
seqnums: range,
timestamp_offset: TimestampOffset::PtsAndDts(pts_offset, dts_offset),
}
} else if let Some(cts_delta) = au.cts_delta {
let pts_offset = gst::Signed::<gst::ClockTime>::from(cts_delta as i64)
.mul_div_floor(*gst::ClockTime::SECOND, clock_rate as u64)
.unwrap();
PacketToBufferRelation::SeqnumsWithOffset {
seqnums: range,
timestamp_offset: TimestampOffset::Pts(pts_offset),
}
} else {
PacketToBufferRelation::Seqnums(range)
}
}
match aus {
Single(au) => {
let range = if let Some(packet_ext_seqnum) = packet_ext_seqnum {
au.ext_seqnum..=packet_ext_seqnum
} else {
au.ext_seqnum..=au.ext_seqnum
};
let packet_to_buffer_relation =
get_packet_to_buffer_relation(&au, state.clock_rate, range);
gst::trace!(CAT, imp: self, "Finishing AU buffer {packet_to_buffer_relation:?}");
let buffer = Self::new_buffer(au, state);
self.obj().queue_buffer(packet_to_buffer_relation, buffer)?;
}
List(au_list) => {
for au in au_list {
let range = if let Some(packet_ext_seqnum) = packet_ext_seqnum {
au.ext_seqnum..=packet_ext_seqnum
} else {
au.ext_seqnum..=au.ext_seqnum
};
let packet_to_buffer_relation =
get_packet_to_buffer_relation(&au, state.clock_rate, range);
gst::trace!(CAT, imp: self, "Finishing AU buffer {packet_to_buffer_relation:?}");
let buffer = Self::new_buffer(au, state);
self.obj().queue_buffer(packet_to_buffer_relation, buffer)?;
}
}
}
Ok(gst::FlowSuccess::Ok)
}
#[inline]
fn new_buffer(au: AccessUnit, state: &State) -> gst::Buffer {
let mut buf = gst::Buffer::from_mut_slice(au.data);
let buf_mut = buf.get_mut().unwrap();
if au.maybe_random_access == Some(false) {
buf_mut.set_flags(gst::BufferFlags::DELTA_UNIT)
}
if let Some(duration) = au.duration {
let duration = (duration as u64)
.mul_div_floor(*gst::ClockTime::SECOND, state.clock_rate as u64)
.map(gst::ClockTime::from_nseconds);
buf_mut.set_duration(duration);
}
buf
}
}

View file

@ -0,0 +1,185 @@
// GStreamer RTP MPEG-4 generic elementary streams Depayloader
//
// Copyright (C) 2023-2024 François Laignel <francois centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
pub mod imp;
pub(crate) mod parsers;
mod deint_buf;
pub(crate) use deint_buf::DeinterleaveAuBuffer;
use gst::glib;
use gst::prelude::*;
use smallvec::SmallVec;
use std::fmt;
use crate::mp4g::{AccessUnitIndex, Mpeg4GenericError};
glib::wrapper! {
pub struct RtpMpeg4GenericDepay(ObjectSubclass<imp::RtpMpeg4GenericDepay>)
@extends crate::basedepay::RtpBaseDepay2, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rtpmp4gdepay2",
gst::Rank::MARGINAL,
RtpMpeg4GenericDepay::static_type(),
)
}
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum Mpeg4GenericDepayError {
#[error("{}", .0)]
Mpeg4Generic(#[from] Mpeg4GenericError),
#[error("AU header section too large: expected end {expected_end} / {total}")]
AuHeaderSectionTooLarge { expected_end: usize, total: usize },
#[error("AU auxiliary section too large: expected end {expected_end} / {total}")]
AuAuxiliarySectionTooLarge { expected_end: usize, total: usize },
#[error("Empty AU Data section")]
EmptyAuData,
#[error("Unknown AU size, but multiple AUs in the packet")]
MultipleAusUnknownSize,
#[error("Multiple AUs in packet but the AU size {au_size} is > AU data size {au_data_size}")]
MultipleAusGreaterSizeThanAuData { au_size: usize, au_data_size: usize },
#[error("No more AU data left for AU with index {index}")]
NoMoreAuDataLeft { index: AccessUnitIndex },
#[error("Got AU with index {index} which is earlier than the expected index {expected_index}")]
TooEarlyAU {
index: AccessUnitIndex,
expected_index: AccessUnitIndex,
},
#[error("Unexpected non-zero first AU index {index} in packet {ext_seqnum} due to configured constant duration")]
ConstantDurationAuNonZeroIndex {
index: AccessUnitIndex,
ext_seqnum: u64,
},
#[error("Constant duration not configured and no headers in packet {ext_seqnum}")]
NonConstantDurationNoAuHeaders { ext_seqnum: u64 },
#[error("Constant duration not configured and no CTS delta for AU index {index} in packet {ext_seqnum}")]
NonConstantDurationAuNoCtsDelta {
index: AccessUnitIndex,
ext_seqnum: u64,
},
#[error(
"Fragmented AU size mismatch: expected {expected}, found {found}. Packet {ext_seqnum}"
)]
FragmentedAuSizeMismatch {
expected: u32,
found: usize,
ext_seqnum: u64,
},
#[error("Fragmented AU CTS mismatch: expected {expected}, found {found}. Packet {ext_seqnum}")]
FragmentedAuRtpTsMismatch {
expected: i32,
found: i32,
ext_seqnum: u64,
},
#[error("Fragmented AU DTS mismatch: expected {expected}, found {found}. Packet {ext_seqnum}")]
FragmentedAuDtsMismatch {
expected: i32,
found: i32,
ext_seqnum: u64,
},
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum SingleAuOrList {
Single(AccessUnit),
List(SmallVec<[AccessUnit; 5]>),
}
impl SingleAuOrList {
pub fn new_list(capacity: usize) -> Self {
SingleAuOrList::List(SmallVec::with_capacity(capacity))
}
#[inline]
pub fn push(&mut self, au: AccessUnit) {
use SingleAuOrList::*;
match self {
Single(_) => {
let list = List(SmallVec::new());
let prev = std::mem::replace(self, list);
let Single(prev) = prev else { unreachable!() };
let List(list) = self else { unreachable!() };
list.push(prev);
list.push(au);
}
List(list) => list.push(au),
}
}
}
#[derive(Debug, Default)]
pub struct MaybeSingleAuOrList(Option<SingleAuOrList>);
impl MaybeSingleAuOrList {
pub fn new_list(capacity: usize) -> Self {
MaybeSingleAuOrList(Some(SingleAuOrList::new_list(capacity)))
}
pub fn push(&mut self, au: AccessUnit) {
match &mut self.0 {
Some(inner) => inner.push(au),
None => self.0 = Some(SingleAuOrList::Single(au)),
}
}
#[inline]
pub fn take(&mut self) -> Option<SingleAuOrList> {
self.0.take()
}
}
impl From<AccessUnit> for MaybeSingleAuOrList {
fn from(au: AccessUnit) -> Self {
MaybeSingleAuOrList(Some(SingleAuOrList::Single(au)))
}
}
/// A parsed Access Unit.
///
/// All timestamps and duration in clock rate ticks.
/// All timestamps are based on the RTP timestamp of the packet.
#[derive(Debug, Default)]
pub struct AccessUnit {
pub(crate) ext_seqnum: u64,
pub(crate) is_fragment: bool,
pub(crate) size: Option<u32>,
pub(crate) index: AccessUnitIndex,
pub(crate) cts_delta: Option<i32>,
pub(crate) dts_delta: Option<i32>,
pub(crate) duration: Option<u32>,
pub(crate) maybe_random_access: Option<bool>,
pub(crate) is_interleaved: bool,
pub(crate) data: Vec<u8>,
}
impl fmt::Display for AccessUnit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "index {} from Packet {}", self.index, self.ext_seqnum)
}
}

File diff suppressed because it is too large Load diff

120
net/rtp/src/mp4g/header.rs Normal file
View file

@ -0,0 +1,120 @@
//! Access Unit Header and its parser & writer.
use bitstream_io::{BitRead, FromBitStreamWith};
use crate::mp4g::{AccessUnitIndex, ModeConfig};
use crate::utils::raw_2_comp_to_i32;
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum AuHeaderError {
#[error("Unexpected zero-sized AU {}", .0)]
ZeroSizedAu(AccessUnitIndex),
#[error("Unexpected CTS flag set for the first AU header {}", .0)]
CtsFlagSetInFirstAuHeader(AccessUnitIndex),
}
#[derive(Debug)]
pub struct AuHeaderContext<'a> {
pub(crate) config: &'a ModeConfig,
pub(crate) prev_index: Option<AccessUnitIndex>,
}
#[derive(Debug, Default)]
pub struct AuHeader {
pub(crate) size: Option<u32>,
pub(crate) index: AccessUnitIndex,
pub(crate) cts_delta: Option<i32>,
pub(crate) dts_delta: Option<i32>,
pub(crate) maybe_random_access: Option<bool>,
pub(crate) is_interleaved: bool,
}
impl AuHeader {
#[inline]
pub(crate) fn new_with(index: impl Into<AccessUnitIndex>) -> Self {
AuHeader {
index: index.into(),
..Default::default()
}
}
}
impl<'a> FromBitStreamWith<'a> for AuHeader {
type Context = AuHeaderContext<'a>;
type Error = anyhow::Error;
fn from_reader<R: BitRead + ?Sized>(
r: &mut R,
ctx: &AuHeaderContext,
) -> Result<Self, Self::Error> {
use anyhow::Context;
use AuHeaderError::*;
let mut this = AuHeader::default();
if ctx.config.size_len > 0 {
let val = r
.read::<u32>(ctx.config.size_len as u32)
.context("AU-size")?;
// Will ensure the size is non-zero after we get the index
this.size = Some(val);
}
this.index = match ctx.prev_index {
None => r
.read::<u32>(ctx.config.index_len as u32)
.context("AU-Index")?
.into(),
Some(prev_index) => {
let delta = r
.read::<u32>(ctx.config.index_delta_len as u32)
.context("AU-Index-delta")?;
if delta > 0 {
this.is_interleaved = true;
}
prev_index + 1u32 + delta
}
};
if let Some(0) = this.size {
Err(ZeroSizedAu(this.index))?;
}
if ctx.config.cts_delta_len > 0 && r.read_bit().context("CTS-flag")? {
if ctx.prev_index.is_none() {
// § 3.2.1.1:
// > the CTS-flag field MUST have the value 0 in the first AU-header
Err(CtsFlagSetInFirstAuHeader(this.index))?;
}
let delta = r
.read::<u32>(ctx.config.cts_delta_len as u32)
.context("CTS-delta")?;
let delta = raw_2_comp_to_i32(delta, ctx.config.cts_delta_len);
this.cts_delta = Some(delta);
}
if ctx.config.dts_delta_len > 0 && r.read_bit().context("DTS-flag")? {
let delta = r
.read::<u32>(ctx.config.dts_delta_len as u32)
.context("DTS-delta")?;
let delta = raw_2_comp_to_i32(delta, ctx.config.dts_delta_len);
this.dts_delta = Some(delta);
}
if ctx.config.random_access_indication {
this.maybe_random_access = Some(r.read_bit().context("RAP-flag")?);
}
// ignored by gstrtpmp4gdepay
if ctx.config.stream_state_indication > 0 {
r.skip(ctx.config.stream_state_indication as u32)
.context("Stream-state")?;
}
Ok(this)
}
}

30
net/rtp/src/mp4g/mod.rs Normal file
View file

@ -0,0 +1,30 @@
// SPDX-License-Identifier: MPL-2.0
pub mod depay;
mod header;
pub use header::{AuHeader, AuHeaderContext};
mod mode;
pub use mode::ModeConfig;
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum Mpeg4GenericError {
#[error("Can't compare AU index 0x8000_0000 to 0")]
AuIndexComparisonLimit,
#[error("Can't compare RTP timestamps 0x8000_0000 to 0")]
RTPTimestampComparisonLimit,
}
/// An Access Unit Index implemented as a comparable new type on a `[std::num::Wrapping]::<u32>`.
define_wrapping_comparable_u32_with_display!(
AccessUnitIndex,
Mpeg4GenericError,
AuIndexComparisonLimit,
);
/// An RTP timestamp implemented as a comparable new type on a `[std::num::Wrapping]::<u32>`.
define_wrapping_comparable_u32_with_display!(
RtpTimestamp,
Mpeg4GenericError,
RTPTimestampComparisonLimit,
);

141
net/rtp/src/mp4g/mode.rs Normal file
View file

@ -0,0 +1,141 @@
//! MPEG-4 Generic mode.
use std::str::FromStr;
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum ModeError {
#[error("sizelength & constantsize can't be both defined")]
BothAuSizeLenAndConstantSize,
#[error("Neither sizelength nor constantsize are defined, need at least one of them")]
NeitherAuSizeLenNorConstantSize,
#[error("indexlength > 0 but indexdeltalength not defined")]
MandatoryIndexDeltaLength,
}
#[derive(Debug, Default)]
pub struct ModeConfig {
pub(crate) size_len: u8,
pub(crate) index_len: u8,
pub(crate) index_delta_len: u8,
pub(crate) cts_delta_len: u8,
pub(crate) dts_delta_len: u8,
pub(crate) random_access_indication: bool,
pub(crate) stream_state_indication: u8,
pub(crate) auxiliary_data_size_len: u8,
pub(crate) constant_size: u32,
pub(crate) constant_duration: u32,
pub(crate) max_displacement: u32,
}
impl ModeConfig {
#[inline]
pub fn has_header_section(&self) -> bool {
self.size_len > 0
|| self.index_len > 0
|| self.index_delta_len > 0
|| self.cts_delta_len > 0
|| self.dts_delta_len > 0
|| self.random_access_indication
|| self.stream_state_indication > 0
}
#[inline]
pub fn has_auxiliary_section(&self) -> bool {
self.auxiliary_data_size_len > 0
}
#[inline]
pub fn constant_duration(&self) -> Option<u32> {
if self.constant_duration == 0 {
return None;
}
Some(self.constant_duration)
}
#[inline]
pub fn max_displacement(&self) -> Option<u32> {
if self.max_displacement == 0 {
return None;
}
Some(self.max_displacement)
}
pub fn from_caps(s: &gst::StructureRef) -> anyhow::Result<Self> {
use ModeError::*;
// These values are optional and have a default value of 0 (no header)
let size_len = Self::parse_int::<u8>(s, "sizelength")?;
let constant_size = Self::parse_int::<u32>(s, "constantsize")?;
if size_len != 0 && constant_size != 0 {
Err(BothAuSizeLenAndConstantSize)?;
}
if size_len == 0 && constant_size == 0 {
Err(NeitherAuSizeLenNorConstantSize)?;
}
// § 3.2.1
// > If the AU-Index field is present in the first AU-header in the AU
// > Header Section, then the AU-Index-delta field MUST be present in
// > any subsequent (non-first) AU-header.
let index_len = Self::parse_int::<u8>(s, "indexlength")?;
let index_delta_len = Self::parse_int::<u8>(s, "indexdeltalength")?;
if index_len > 0 && index_delta_len == 0 {
Err(MandatoryIndexDeltaLength)?;
}
// TODO check mode & mode_config conformity
Ok(ModeConfig {
size_len,
index_len,
index_delta_len,
cts_delta_len: Self::parse_int::<u8>(s, "ctsdeltalength")?,
dts_delta_len: Self::parse_int::<u8>(s, "dtsdeltalength")?,
random_access_indication: Self::parse_int::<u8>(s, "randomaccessindication")? > 0,
stream_state_indication: Self::parse_int::<u8>(s, "streamstateindication")?,
auxiliary_data_size_len: Self::parse_int::<u8>(s, "auxiliarydatasizelength")?,
constant_size,
constant_duration: Self::parse_int::<u32>(s, "constantduration")?,
max_displacement: Self::parse_int::<u32>(s, "maxdisplacement")?,
})
}
/// Tries to read the `field` from the provided structure as an integer of type `T`.
///
/// Returns:
///
/// * `Ok(val)` if the field is present and its value could be parsed.
/// * `Ok(0)` if the field is not present.
/// * `Err(_)` otherwise.
fn parse_int<'a, T>(s: &'a gst::StructureRef, field: &'static str) -> anyhow::Result<T>
where
T: TryFrom<i32> + FromStr + gst::glib::value::FromValue<'a>,
<T as TryFrom<i32>>::Error: std::error::Error + Send + Sync + 'static,
<T as FromStr>::Err: std::error::Error + Send + Sync + 'static,
{
use anyhow::Context;
use gst::structure::GetError::*;
match s.get::<T>(field) {
Ok(val) => Ok(val),
Err(FieldNotFound { .. }) => Ok(T::try_from(0i32).unwrap()),
Err(ValueGetError { .. }) => match s.get::<i32>(field) {
Ok(val) => Ok(T::try_from(val).context(field)?),
Err(_) => Ok(s
.get::<&str>(field)
.context(field)?
.parse::<T>()
.context(field)?),
},
}
}
}

View file

@ -1,7 +1,7 @@
/// Computes the seqnum distance /// Computes the seqnum distance
/// ///
/// This makes sense if both seqnums are in the same cycle. /// This makes sense if both seqnums are in the same cycle.
pub(crate) fn seqnum_distance(seqnum1: u16, seqnum2: u16) -> i16 { pub fn seqnum_distance(seqnum1: u16, seqnum2: u16) -> i16 {
// See http://en.wikipedia.org/wiki/Serial_number_arithmetic // See http://en.wikipedia.org/wiki/Serial_number_arithmetic
let seqnum1 = i16::from_ne_bytes(seqnum1.to_ne_bytes()); let seqnum1 = i16::from_ne_bytes(seqnum1.to_ne_bytes());
@ -10,9 +10,274 @@ pub(crate) fn seqnum_distance(seqnum1: u16, seqnum2: u16) -> i16 {
seqnum1.wrapping_sub(seqnum2) seqnum1.wrapping_sub(seqnum2)
} }
/// Converts a raw two's complement value of len `bit_len` into an i32.
///
/// # Panic
///
/// Panics if `bit_len` > 32.
pub fn raw_2_comp_to_i32(val: u32, bit_len: u8) -> i32 {
assert!(bit_len <= 32);
if val < 1u32 << (bit_len - 1) as u32 {
// val is positive
val as i32
} else {
((0x1_0000_0000 - (1u64 << bit_len)) as u32 + val) as i32
}
}
/// Defines a comparable new type `$typ` on a `[std::num::Wrapping]::<u32>`.
///
/// The new type will wrap-around on additions and substractions and it comparison
/// operators take the wrapping in consideration.
///
/// The comparison algorithm uses [serial number arithmetic](serial-number-arithmetic).
/// The limit being that it can't tell whether 0x8000_0000 is greater or less than 0.
///
/// # Examples
///
/// ```rust
/// # use gstrsrtp::define_wrapping_comparable_u32;
///
/// /// Error type to return when comparing 0x8000_0000 to 0.
/// struct RTPTimestampComparisonLimit;
///
/// /// Define the new type comparable and wrapping `u32` `RTPTimestamp`:
/// define_wrapping_comparable_u32!(RTPTimestamp, RTPTimestampComparisonLimit);
///
/// let ts0 = RTPTimestamp::ZERO;
/// assert!(ts0.is_zero());
///
/// let mut ts = ts0;
/// ts += 1;
/// assert_eq!(*ts, 1);
/// assert_eq!(RTPTimestamp::MAX + ts, ts0);
///
/// let ts2: RTPTimestamp = 2.into();
/// assert_eq!(*ts2, 2);
/// assert_eq!(ts - ts2, RTPTimestamp::MAX);
/// ```
///
/// [serial-number-arithmetic]: http://en.wikipedia.org/wiki/Serial_number_arithmetic
#[macro_export]
macro_rules! define_wrapping_comparable_u32 {
($typ:ident) => {
#[derive(Clone, Copy, Debug, Default)]
pub struct $typ(std::num::Wrapping<u32>);
impl $typ {
pub const ZERO: $typ = $typ(std::num::Wrapping(0));
pub const MIN: $typ = $typ(std::num::Wrapping(u32::MIN));
pub const MAX: $typ = $typ(std::num::Wrapping(u32::MAX));
pub const NONE: Option<$typ> = None;
#[inline]
pub const fn new(val: u32) -> Self {
Self(std::num::Wrapping((val)))
}
#[inline]
pub fn from_ext(ext_val: u64) -> Self {
Self(std::num::Wrapping((ext_val & 0xffff_ffff) as u32))
}
#[inline]
pub fn is_zero(self) -> bool {
self.0 .0 == 0
}
#[inline]
pub fn distance(self, other: Self) -> Option<i32> {
self.distance_u32(other.0 .0)
}
#[inline]
pub fn distance_u32(self, other: u32) -> Option<i32> {
// See http://en.wikipedia.org/wiki/Serial_number_arithmetic
let this = i32::from_ne_bytes(self.0 .0.to_ne_bytes());
let other = i32::from_ne_bytes(other.to_ne_bytes());
match this.wrapping_sub(other) {
-0x8000_0000 => {
// This is the limit of the algorithm:
// arguments are too far away to determine the result sign,
// i.e. which one is greater than the other
None
}
delta => Some(delta),
}
}
}
impl From<u32> for $typ {
fn from(value: u32) -> Self {
Self(std::num::Wrapping(value))
}
}
impl From<$typ> for u32 {
fn from(value: $typ) -> Self {
value.0 .0
}
}
impl std::ops::Deref for $typ {
type Target = u32;
fn deref(&self) -> &u32 {
&self.0 .0
}
}
impl std::ops::Add for $typ {
type Output = Self;
fn add(self, rhs: Self) -> Self {
Self(self.0.add(rhs.0))
}
}
impl std::ops::Add<u32> for $typ {
type Output = Self;
fn add(self, rhs: u32) -> Self {
Self(self.0.add(std::num::Wrapping(rhs)))
}
}
impl std::ops::Add<i32> for $typ {
type Output = Self;
fn add(self, rhs: i32) -> Self {
// See http://en.wikipedia.org/wiki/Serial_number_arithmetic
let this = i32::from_ne_bytes(self.0 .0.to_ne_bytes());
let res = this.wrapping_add(rhs);
let res = u32::from_ne_bytes(res.to_ne_bytes());
Self(std::num::Wrapping(res))
}
}
impl std::ops::AddAssign for $typ {
fn add_assign(&mut self, rhs: Self) {
self.0.add_assign(rhs.0);
}
}
impl std::ops::AddAssign<u32> for $typ {
fn add_assign(&mut self, rhs: u32) {
self.0.add_assign(std::num::Wrapping(rhs));
}
}
impl std::ops::AddAssign<i32> for $typ {
fn add_assign(&mut self, rhs: i32) {
*self = *self + rhs;
}
}
impl std::ops::Sub for $typ {
type Output = Self;
fn sub(self, rhs: Self) -> Self {
self.sub(rhs.0 .0)
}
}
impl std::ops::Sub<u32> for $typ {
type Output = Self;
fn sub(self, rhs: u32) -> Self {
Self(self.0.sub(std::num::Wrapping(rhs)))
}
}
impl std::ops::SubAssign for $typ {
fn sub_assign(&mut self, rhs: Self) {
self.sub_assign(rhs.0 .0);
}
}
impl std::ops::SubAssign<u32> for $typ {
fn sub_assign(&mut self, rhs: u32) {
self.0.sub_assign(std::num::Wrapping(rhs));
}
}
impl std::cmp::PartialEq for $typ {
fn eq(&self, other: &Self) -> bool {
self.0 .0 == other.0 .0
}
}
impl std::cmp::PartialEq<u32> for $typ {
fn eq(&self, other: &u32) -> bool {
self.0 .0 == *other
}
}
impl std::cmp::Eq for $typ {}
impl std::cmp::PartialOrd for $typ {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.distance(*other).map(|d| d.cmp(&0))
}
}
impl gst::prelude::OptionOperations for $typ {}
};
($typ:ident, $comp_err_type:ident) => {
define_wrapping_comparable_u32!($typ);
impl $typ {
#[inline]
pub fn try_cmp(&self, other: $typ) -> Result<std::cmp::Ordering, $comp_err_type> {
self.partial_cmp(&other).ok_or($comp_err_type)
}
}
};
($typ:ident, $err_enum:ty, $comp_err_variant:ident) => {
define_wrapping_comparable_u32!($typ);
impl $typ {
#[inline]
pub fn try_cmp(&self, other: $typ) -> Result<std::cmp::Ordering, $err_enum> {
self.partial_cmp(&other)
.ok_or(<$err_enum>::$comp_err_variant)
}
}
};
}
#[macro_export]
macro_rules! define_wrapping_comparable_u32_with_display {
($typ:ident, impl) => {
impl std::fmt::Display for $typ {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("{}", self.0 .0))
}
}
};
($typ:ident) => {
define_wrapping_comparable_u32!($typ);
define_wrapping_comparable_u32_with_display!($typ, impl);
};
($typ:ident, $comp_err_type:ty) => {
define_wrapping_comparable_u32!($typ, $comp_err_type);
define_wrapping_comparable_u32_with_display!($typ, impl);
};
($typ:ident, $err_enum:ty, $comp_err_variant:ident,) => {
define_wrapping_comparable_u32!($typ, $err_enum, $comp_err_variant);
define_wrapping_comparable_u32_with_display!($typ, impl);
};
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
define_wrapping_comparable_u32!(MyWrapper);
#[test] #[test]
fn compare_seqnums() { fn compare_seqnums() {
@ -30,4 +295,185 @@ mod tests {
assert_eq!(seqnum_distance(0x8000, 0), -0x8000); assert_eq!(seqnum_distance(0x8000, 0), -0x8000);
assert_eq!(seqnum_distance(0, 0x8000), -0x8000); assert_eq!(seqnum_distance(0, 0x8000), -0x8000);
} }
#[test]
fn raw_2_comp_12bits_to_i32() {
const BITS: u8 = 12;
assert_eq!(raw_2_comp_to_i32(0, BITS), 0);
assert_eq!(raw_2_comp_to_i32(1, BITS), 1);
assert_eq!(raw_2_comp_to_i32(2, BITS), 2);
assert_eq!(raw_2_comp_to_i32(0xfff, BITS), -1i16 as i32);
assert_eq!(raw_2_comp_to_i32(0xffe, BITS), -2i16 as i32);
assert_eq!(raw_2_comp_to_i32(0x7ff, BITS), (1 << (BITS - 1)) - 1);
assert_eq!(raw_2_comp_to_i32(0x800, BITS), -(1 << (BITS - 1)));
}
#[test]
fn raw_2_comp_16bits_to_i32() {
const BITS: u8 = i16::BITS as u8;
assert_eq!(raw_2_comp_to_i32(0, BITS), 0);
assert_eq!(raw_2_comp_to_i32(1, BITS), 1);
assert_eq!(raw_2_comp_to_i32(2, BITS), 2);
assert_eq!(raw_2_comp_to_i32(0xffff, BITS), -1i16 as i32);
assert_eq!(raw_2_comp_to_i32(0xfffe, BITS), -2i16 as i32);
assert_eq!(raw_2_comp_to_i32(0x7fff, BITS), i16::MAX as i32);
assert_eq!(raw_2_comp_to_i32(0x8000, BITS), i16::MIN as i32);
}
#[test]
fn raw_2_comp_32bits_to_i32() {
const BITS: u8 = i32::BITS as u8;
assert_eq!(raw_2_comp_to_i32(0, BITS), 0);
assert_eq!(raw_2_comp_to_i32(1, BITS), 1);
assert_eq!(raw_2_comp_to_i32(2, BITS), 2);
assert_eq!(raw_2_comp_to_i32(0xffff_ffff, BITS), -1i16 as i32);
assert_eq!(raw_2_comp_to_i32(0xffff_fffe, BITS), -2i16 as i32);
assert_eq!(raw_2_comp_to_i32(0x7fff_ffff, BITS), i32::MAX);
assert_eq!(raw_2_comp_to_i32(0x8000_0000, BITS), i32::MIN);
}
#[test]
fn wrapping_u32_basics() {
let zero = MyWrapper::ZERO;
let one = MyWrapper::from(1);
let two = MyWrapper::from(2);
assert_eq!(u32::from(zero), 0);
assert!(zero.is_zero());
assert_eq!(u32::from(one), 1);
assert_eq!(u32::from(two), 2);
let max_plus_1_u64 = MyWrapper::from_ext((u32::MAX as u64) + 1);
assert_eq!(max_plus_1_u64, MyWrapper::ZERO);
}
#[test]
fn add_wrapping_u32() {
let one = MyWrapper::from(1);
let two = MyWrapper::from(2);
assert_eq!(MyWrapper::ZERO + one, one);
assert_eq!(MyWrapper::ZERO + 1u32, one);
assert_eq!(one + one, two);
assert_eq!(one + 1u32, two);
assert_eq!(MyWrapper::MAX + MyWrapper::ZERO, MyWrapper::MAX);
assert_eq!(MyWrapper::MAX + one, MyWrapper::ZERO);
assert_eq!(MyWrapper::MAX + two, one);
let mut var = MyWrapper::ZERO;
assert!(var.is_zero());
var += 1;
assert_eq!(var, one);
var += one;
assert_eq!(var, two);
let mut var = MyWrapper::MAX;
var += 1;
assert!(var.is_zero());
var += one;
assert_eq!(var, one);
}
#[test]
fn add_wrapping_u32_i32() {
let one = MyWrapper::from(1);
assert_eq!(MyWrapper::ZERO + 1i32, one);
assert_eq!(MyWrapper::ZERO + -1i32, MyWrapper::MAX);
assert_eq!(MyWrapper::MAX + 1i32, MyWrapper::ZERO);
assert_eq!(MyWrapper::MAX + 2i32, one);
assert_eq!(
MyWrapper::from(0x8000_0000) + -0i32,
MyWrapper::from(0x8000_0000)
);
assert_eq!(
MyWrapper::from(0x8000_0000) + 1i32,
MyWrapper::from(0x8000_0001)
);
assert_eq!(
MyWrapper::from(0x8000_0000) + -1i32,
MyWrapper::from(0x7fff_ffff)
);
assert_eq!(
MyWrapper::from(0x7fff_ffff) + 1i32,
MyWrapper::from(0x8000_0000)
);
assert_eq!(MyWrapper::ZERO + i32::MIN, MyWrapper::from(0x8000_0000));
let mut var = MyWrapper::ZERO;
var += 1i32;
assert_eq!(var, one);
let mut var = MyWrapper::ZERO;
var += -1i32;
assert_eq!(var, MyWrapper::MAX);
let mut var = MyWrapper::MAX;
var += 1;
assert_eq!(var, MyWrapper::ZERO);
}
#[test]
fn sub_wrapping_u32() {
let one = MyWrapper::from(1);
assert_eq!(MyWrapper::ZERO - MyWrapper::ZERO, MyWrapper::ZERO);
assert_eq!(MyWrapper::MAX - MyWrapper::MAX, MyWrapper::ZERO);
assert_eq!(MyWrapper::ZERO - one, MyWrapper::MAX);
assert_eq!(MyWrapper::ZERO - MyWrapper::MAX, one);
assert_eq!(
MyWrapper::ZERO - MyWrapper::from(0x8000_0000),
MyWrapper::from(0x8000_0000)
);
assert_eq!(
MyWrapper::from(0x8000_0000) - MyWrapper::ZERO,
MyWrapper::from(0x8000_0000)
);
let mut var = MyWrapper::ZERO;
assert!(var.is_zero());
var -= 1;
assert_eq!(var, MyWrapper::MAX);
let mut var = MyWrapper::MAX;
var -= MyWrapper::MAX;
assert!(var.is_zero());
}
#[test]
fn compare_wrapping_u32() {
use std::cmp::Ordering::*;
#[derive(Debug, PartialEq)]
pub struct ComparisonLimit;
define_wrapping_comparable_u32!(MyWrapper, ComparisonLimit);
let cmp = |a: u32, b: u32| MyWrapper::from(a).partial_cmp(&MyWrapper::from(b));
let try_cmp = |a: u32, b: u32| MyWrapper::from(a).try_cmp(MyWrapper::from(b));
assert_eq!(cmp(0, 1).unwrap(), Less);
assert_eq!(try_cmp(0, 1), Ok(Less));
assert_eq!(cmp(1, 1).unwrap(), Equal);
assert_eq!(try_cmp(1, 1), Ok(Equal));
assert_eq!(cmp(1, 0).unwrap(), Greater);
assert_eq!(try_cmp(1, 0), Ok(Greater));
assert_eq!(cmp(0x7fff_ffff, 0).unwrap(), Greater);
assert_eq!(try_cmp(0x7fff_ffff, 0), Ok(Greater));
assert_eq!(cmp(0xffff_ffff, 0).unwrap(), Less);
assert_eq!(try_cmp(0xffff_ffff, 0), Ok(Less));
assert_eq!(cmp(0, 0x7fff_ffff).unwrap(), Less);
assert_eq!(try_cmp(0, 0x7fff_ffff), Ok(Less));
assert_eq!(cmp(0, 0xffff_ffff).unwrap(), Greater);
assert_eq!(try_cmp(0, 0xffff_ffff), Ok(Greater));
// This is the limit of the algorithm:
assert!(cmp(0x8000_0000, 0).is_none());
assert!(cmp(0, 0x8000_0000).is_none());
assert_eq!(try_cmp(0x8000_0000, 0), Err(ComparisonLimit));
assert_eq!(try_cmp(0, 0x8000_0000), Err(ComparisonLimit));
}
} }