diff --git a/Cargo.lock b/Cargo.lock index 109d62f2..c77ae4e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2719,6 +2719,7 @@ dependencies = [ "once_cell", "rand", "rtp-types", + "slab", "smallvec", "thiserror", "time", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 1f0fe67d..a5300fe8 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -6977,6 +6977,32 @@ }, "rank": "marginal" }, + "rtpmp4gdepay2": { + "author": "François Laignel ", + "description": "Depayload MPEG-4 Generic elementary streams from RTP packets (RFC 3640)", + "hierarchy": [ + "GstRtpMpeg4GenericDepay", + "GstRtpBaseDepay2", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Codec/Depayloader/Network/RTP", + "pad-templates": { + "sink": { + "caps": "application/x-rtp:\n media: { (string)audio, (string)video }\n clock-rate: [ 1, 2147483647 ]\n encoding-name: MPEG4-GENERIC\n mode: { (string)generic, (string)AAC-lbr, (string)AAC-hbr, (string)aac-hbr }\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "video/mpeg:\n mpegversion: 4\n systemstream: false\naudio/mpeg:\n mpegversion: 4\n stream-format: raw\n", + "direction": "src", + "presence": "always" + } + }, + "rank": "marginal" + }, "rtppcmadepay2": { "author": "Sebastian Dröge ", "description": "Depayload A-law from RTP packets (RFC 3551)", diff --git a/net/rtp/Cargo.toml b/net/rtp/Cargo.toml index 4b52d705..86988f88 100644 --- a/net/rtp/Cargo.toml +++ b/net/rtp/Cargo.toml @@ -22,6 +22,7 @@ hex = "0.4.3" once_cell.workspace = true rand = { version = "0.8", default-features = false, features = ["std", "std_rng" ] } rtp-types = { version = "0.1" } +slab = "0.4.9" smallvec = { version = "1.11", features = ["union", "write", "const_generics", "const_new"] } thiserror = "1" time = { version = "0.3", default-features = false, features = ["std"] } diff --git a/net/rtp/src/lib.rs b/net/rtp/src/lib.rs index 1cc0a161..19fbc65f 100644 --- a/net/rtp/src/lib.rs +++ b/net/rtp/src/lib.rs @@ -16,6 +16,9 @@ */ use gst::glib; +#[macro_use] +mod utils; + mod gcc; mod audio_discont; @@ -26,6 +29,7 @@ mod basepay; mod av1; mod mp2t; mod mp4a; +mod mp4g; mod pcmau; mod vp8; mod vp9; @@ -33,8 +37,6 @@ mod vp9; #[cfg(test)] mod tests; -mod utils; - fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { gcc::register(plugin)?; @@ -59,6 +61,8 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { mp4a::depay::register(plugin)?; mp4a::pay::register(plugin)?; + mp4g::depay::register(plugin)?; + pcmau::depay::register(plugin)?; pcmau::pay::register(plugin)?; diff --git a/net/rtp/src/mp4g/depay/deint_buf.rs b/net/rtp/src/mp4g/depay/deint_buf.rs new file mode 100644 index 00000000..b93f1454 --- /dev/null +++ b/net/rtp/src/mp4g/depay/deint_buf.rs @@ -0,0 +1,748 @@ +//! Access Unit Deinterleaving Buffer +use slab::Slab; + +use super::{AccessUnit, AccessUnitIndex, MaybeSingleAuOrList, Mpeg4GenericDepayError}; + +#[derive(Debug)] +struct AuNode { + au: AccessUnit, + /// Index of the next AuNode in the early_aus buffer + next: Option, +} + +/// Access Unit Deinterleaving Buffer. +/// +/// In some packet modes, non-consecutive AUs might be grouped together, +/// which can limit the gap between to AUs in case of packet loss. +/// +/// The Deinterleaving Buffer collects AUs as they arrive and outputs +/// them in the expected order whenever possible. +/// +/// See [Interleaving in RFC 3640](rfc-interleaving). +/// +/// [rfc-interleaving]: https://www.rfc-editor.org/rfc/rfc3640.html#section-3.2.3.2 +#[derive(Debug, Default)] +pub struct DeinterleaveAuBuffer { + /// Linked list of the early AUs + early_aus: Slab, + /// Index of the head in early_aus buffer + head: Option, + expected_index: Option, +} + +impl DeinterleaveAuBuffer { + pub fn new(max_displacement: u32) -> Self { + DeinterleaveAuBuffer { + early_aus: Slab::with_capacity(max_displacement as usize), + ..Default::default() + } + } + + pub fn drain(&mut self) -> MaybeSingleAuOrList { + self.expected_index = None; + + let mut cur_opt = self.head.take(); + + let len = self.early_aus.len(); + match len { + 0 => return MaybeSingleAuOrList::default(), + 1 => { + let node = self.early_aus.remove(cur_opt.unwrap()); + return MaybeSingleAuOrList::from(node.au); + } + _ => (), + } + + let mut list = MaybeSingleAuOrList::new_list(len); + + while let Some(cur) = cur_opt { + let cur_node = self.early_aus.remove(cur); + list.push(cur_node.au); + + cur_opt = cur_node.next; + } + + list + } + + pub fn flush(&mut self) { + self.early_aus.clear(); + self.head = None; + self.expected_index = None; + } + + #[track_caller] + pub fn push_and_pop( + &mut self, + au: AccessUnit, + outbuf: &mut MaybeSingleAuOrList, + ) -> Result<(), Mpeg4GenericDepayError> { + use std::cmp::Ordering::*; + + let mut expected_index = match self.expected_index { + Some(expected_index) => match au.index.try_cmp(expected_index)? { + Equal => expected_index, + Greater => return self.insert_au(au), + Less => { + // Dropping too early Au + return Err(Mpeg4GenericDepayError::TooEarlyAU { + index: au.index, + expected_index, + }); + } + }, + None => au.index, // first AU + }; + + outbuf.push(au); + + expected_index += 1; + self.expected_index = Some(expected_index); + + // Pop other ready AUs if any + let mut head; + let mut head_node_ref; + let mut head_node; + while !self.early_aus.is_empty() { + head = self.head.expect("!early_aus.is_empty"); + + head_node_ref = self.early_aus.get(head).unwrap(); + if head_node_ref.au.index.try_cmp(expected_index)?.is_ne() { + break; + } + + head_node = self.early_aus.remove(head); + outbuf.push(head_node.au); + + expected_index += 1; + self.expected_index = Some(expected_index); + + self.head = head_node.next; + } + + Ok(()) + } + + fn insert_au(&mut self, au: AccessUnit) -> Result<(), Mpeg4GenericDepayError> { + use std::cmp::Ordering::*; + + if self.early_aus.is_empty() { + self.head = Some(self.early_aus.insert(AuNode { au, next: None })); + + // Nothing to pop + return Ok(()); + } + + let mut cur = self.head.expect("!early_aus.is_empty"); + let mut cur_node = self.early_aus.get(cur).unwrap(); + + // cur & cur_node refer to current head here + match au.index.try_cmp(cur_node.au.index)? { + Greater => (), + Less => { + // New head + self.head = Some(self.early_aus.insert(AuNode { + au, + next: Some(cur), + })); + + return Ok(()); + } + Equal => { + // Duplicate + // RFC, §2.3: + // > In addition, an AU MUST NOT be repeated in other RTP packets; hence + // > repetition of an AU is only possible when using a duplicate RTP packet. + // + // But: we can't received duplicates because they would have been rejected + // by the base class or the jitterbuffer. + unreachable!(); + } + } + + // Upcoming AU is not then new head + + loop { + let Some(next) = cur_node.next else { + let new = Some(self.early_aus.insert(AuNode { au, next: None })); + self.early_aus.get_mut(cur).unwrap().next = new; + + return Ok(()); + }; + + let next_node = self.early_aus.get(next).unwrap(); + + match au.index.try_cmp(next_node.au.index)? { + Greater => (), // try next node + Less => { + let new = self.early_aus.insert(AuNode { + au, + next: Some(next), + }); + self.early_aus.get_mut(cur).unwrap().next = Some(new); + + return Ok(()); + } + Equal => { + // Duplicate + // RFC, §2.3: + // > In addition, an AU MUST NOT be repeated in other RTP packets; hence + // > repetition of an AU is only possible when using a duplicate RTP packet. + // + // But: we can't received duplicates because they would have been rejected + // by the base class or the jitterbuffer. + unreachable!(); + } + } + + cur = next; + cur_node = next_node; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::mp4g::depay::SingleAuOrList; + + impl From for AccessUnit { + fn from(index: u32) -> Self { + AccessUnit { + index: index.into(), + ..Default::default() + } + } + } + + #[test] + fn simple_group_interleave() { + // Tests the pattern illustrated in: + // https://www.rfc-editor.org/rfc/rfc3640.html#appendix-A.3 + + gst::init().unwrap(); + + let mut deint_buf = DeinterleaveAuBuffer::default(); + assert!(deint_buf.early_aus.is_empty()); + assert!(deint_buf.expected_index.is_none()); + + let mut outbuf = MaybeSingleAuOrList::default(); + assert!(outbuf.0.is_none()); + + // **** + // * P0. AUs with indices: 0, 3 & 6 + + // Expected AU 0 so it is pushed to outbuf + deint_buf.push_and_pop(0.into(), &mut outbuf).unwrap(); + + assert!(deint_buf.early_aus.is_empty()); + assert_eq!(deint_buf.expected_index.unwrap(), 1); + matches!(outbuf.0, Some(SingleAuOrList::Single(_))); + + // Expected AU 1 is missing when pushing AU 3 so it is buffered + deint_buf.push_and_pop(3.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 1); + matches!(outbuf.0, Some(SingleAuOrList::Single(_))); + + // Expected AU 1 is missing when pushing AU 6 so it is buffered + deint_buf.push_and_pop(6.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 2); + assert_eq!(deint_buf.expected_index.unwrap(), 1); + + // End of the RTP packet + matches!(outbuf.take(), Some(SingleAuOrList::Single(_))); + assert!(outbuf.0.is_none()); + + // **** + // * P1. AUs with indices: 1, 4 & 7 + + // Expected AU 1 so it is pushed to outbuf + deint_buf.push_and_pop(1.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 2); + assert_eq!(deint_buf.expected_index.unwrap(), 2); + matches!(outbuf.0, Some(SingleAuOrList::Single(_))); + + // Expected AU 2 is missing when pushing AU 4 so it is buffered + deint_buf.push_and_pop(4.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 3); + assert_eq!(deint_buf.expected_index.unwrap(), 2); + matches!(outbuf.0, Some(SingleAuOrList::Single(_))); + + // Expected AU 2 is missing when pushing AU 7 so it is buffered + deint_buf.push_and_pop(7.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 4); + assert_eq!(deint_buf.expected_index.unwrap(), 2); + + // End of the RTP packet + matches!(outbuf.take(), Some(SingleAuOrList::Single(_))); + assert!(outbuf.0.is_none()); + + // **** + // * P2. AUs with indices: 2, 5 & 8 + + // Expected AU 2 so it is pushed to outbuf + // and this also pops AUs 3 & 4 + // Remaining: 6 & 7 + deint_buf.push_and_pop(2.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 2); + assert_eq!(deint_buf.expected_index.unwrap(), 5); + let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 3); + + // Expected AU 5 so it is pushed to outbuf + // and this also pops AUs 6 & 7 + deint_buf.push_and_pop(5.into(), &mut outbuf).unwrap(); + + assert!(deint_buf.early_aus.is_empty()); + assert_eq!(deint_buf.expected_index.unwrap(), 8); + let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 6); + + // Expected AU 8 so it is pushed to outbuf + deint_buf.push_and_pop(8.into(), &mut outbuf).unwrap(); + + assert!(deint_buf.early_aus.is_empty()); + assert_eq!(deint_buf.expected_index.unwrap(), 9); + + // End of the RTP packet + let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 7); + assert!(outbuf.0.is_none()); + + // **** + // * P3. AUs with indices: 9, 12 & 15 + + // Expected AU 9 so it is pushed to outbuf + deint_buf.push_and_pop(9.into(), &mut outbuf).unwrap(); + + assert!(deint_buf.early_aus.is_empty()); + assert_eq!(deint_buf.expected_index.unwrap(), 10); + matches!(outbuf.0, Some(SingleAuOrList::Single(_))); + + // Expected AU 10 is missing when pushing AU 12 so it is buffered + deint_buf.push_and_pop(12.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 10); + matches!(outbuf.0, Some(SingleAuOrList::Single(_))); + + // Expected AU 10 is missing when pushing AU 15 so it is buffered + deint_buf.push_and_pop(15.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 2); + assert_eq!(deint_buf.expected_index.unwrap(), 10); + + // End of the RTP packet + matches!(outbuf.take(), Some(SingleAuOrList::Single(_))); + assert!(outbuf.0.is_none()); + } + + #[test] + fn more_subtle_group_interleave() { + // Tests the pattern illustrated in: + // https://www.rfc-editor.org/rfc/rfc3640.html#appendix-A.4 + + gst::init().unwrap(); + + let mut deint_buf = DeinterleaveAuBuffer::default(); + let mut outbuf = MaybeSingleAuOrList::default(); + + // **** + // * P0. AUs with indices: 0 & 5 + + // Expected AU 0 so it is pushed to outbuf + deint_buf.push_and_pop(0.into(), &mut outbuf).unwrap(); + + assert!(deint_buf.early_aus.is_empty()); + assert_eq!(deint_buf.expected_index.unwrap(), 1); + matches!(outbuf.0, Some(SingleAuOrList::Single(_))); + + // Expected AU 1 is missing when pushing AU 5 so it is buffered + deint_buf.push_and_pop(5.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 1); + + // End of the RTP packet + matches!(outbuf.take(), Some(SingleAuOrList::Single(_))); + assert!(outbuf.0.is_none()); + + // **** + // * P1. AUs with indices: 2 & 7 + + // Expected AU 1 is missing when pushing AU 2 so it is buffered + deint_buf.push_and_pop(2.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 2); + assert_eq!(deint_buf.expected_index.unwrap(), 1); + assert!(outbuf.0.is_none()); + + // Expected AU 1 is missing when pushing AU 7 so it is buffered + deint_buf.push_and_pop(7.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 3); + assert_eq!(deint_buf.expected_index.unwrap(), 1); + + // End of the RTP packet + assert!(outbuf.take().is_none()); + + // **** + // * P2. AUs with indices: 4 & 9 + + // Expected AU 1 is missing when pushing AU 4 so it is buffered + deint_buf.push_and_pop(4.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 4); + assert_eq!(deint_buf.expected_index.unwrap(), 1); + assert!(outbuf.0.is_none()); + + // Expected AU 1 is missing when pushing AU 9 so it is buffered + deint_buf.push_and_pop(9.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 5); + assert_eq!(deint_buf.expected_index.unwrap(), 1); + + // End of the RTP packet + assert!(outbuf.take().is_none()); + + // **** + // * P3. AUs with indices: 1 & 6 + + // Expected AU 1 so it is pushed to outbuf + // and this also pops AU 2 + // Remaining: 4, 5, 7 & 9 + deint_buf.push_and_pop(1.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 4); + assert_eq!(deint_buf.expected_index.unwrap(), 3); + let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 2); + + // Expected AU 3 is missing when pushing AU 6 so it is buffered + deint_buf.push_and_pop(6.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 5); + assert_eq!(deint_buf.expected_index.unwrap(), 3); + + // End of the RTP packet + let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 2); + assert!(outbuf.0.is_none()); + + // **** + // * P4. AUs with indices: 3 & 8 + + // Expected AU 3 so it is pushed to outbuf + // and this also pops AU 4, 5, 6 & 7 + // Remaining: 9 + deint_buf.push_and_pop(3.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 8); + let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 5); + + // Expected AU 8 so it is pushed to outbuf + // and this also pops AU 9 + deint_buf.push_and_pop(8.into(), &mut outbuf).unwrap(); + + assert!(deint_buf.early_aus.is_empty()); + assert_eq!(deint_buf.expected_index.unwrap(), 10); + + // End of the RTP packet + let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 7); + assert!(outbuf.0.is_none()); + + // **** + // * P5. AUs with indices: 10 & 15 + + // Expected AU 10 so it is pushed to outbuf + deint_buf.push_and_pop(10.into(), &mut outbuf).unwrap(); + + assert!(deint_buf.early_aus.is_empty()); + assert_eq!(deint_buf.expected_index.unwrap(), 11); + matches!(outbuf.0, Some(SingleAuOrList::Single(_))); + + // Expected AU 11 is missing when pushing AU 15 so it is buffered + deint_buf.push_and_pop(15.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 11); + + // End of the RTP packet + matches!(outbuf.take(), Some(SingleAuOrList::Single(_))); + assert!(outbuf.0.is_none()); + } + + #[test] + fn continuous_interleave() { + // Tests the pattern illustrated in: + // https://www.rfc-editor.org/rfc/rfc3640.html#appendix-A.5 + + gst::init().unwrap(); + + let mut deint_buf = DeinterleaveAuBuffer::default(); + let mut outbuf = MaybeSingleAuOrList::default(); + + // **** + // * P0. AUs with index: 0 + + // Expected AU 0 so it is pushed to outbuf + deint_buf.push_and_pop(0.into(), &mut outbuf).unwrap(); + + assert!(deint_buf.early_aus.is_empty()); + assert_eq!(deint_buf.expected_index.unwrap(), 1); + + // End of the RTP packet + matches!(outbuf.take(), Some(SingleAuOrList::Single(_))); + assert!(outbuf.0.is_none()); + + // **** + // * P1. AUs with indices: 1 & 4 + + // Expected AU 0 so it is pushed to outbuf + deint_buf.push_and_pop(1.into(), &mut outbuf).unwrap(); + + assert!(deint_buf.early_aus.is_empty()); + assert_eq!(deint_buf.expected_index.unwrap(), 2); + matches!(outbuf.0, Some(SingleAuOrList::Single(_))); + + // Expected AU 2 is missing when pushing AU 4 so it is buffered + deint_buf.push_and_pop(4.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 2); + + // End of the RTP packet + matches!(outbuf.take(), Some(SingleAuOrList::Single(_))); + assert!(outbuf.take().is_none()); + + // **** + // * P2. AUs with indices: 2, 5 & 8 + + // Expected AU 2 so it is pushed to outbuf + deint_buf.push_and_pop(2.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 3); + matches!(outbuf.0, Some(SingleAuOrList::Single(_))); + + // Expected AU 3 is missing when pushing AU 5 so it is buffered + deint_buf.push_and_pop(5.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 2); + assert_eq!(deint_buf.expected_index.unwrap(), 3); + matches!(outbuf.0, Some(SingleAuOrList::Single(_))); + + // Expected AU 3 is missing when pushing AU 8 so it is buffered + deint_buf.push_and_pop(8.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 3); + assert_eq!(deint_buf.expected_index.unwrap(), 3); + + // End of the RTP packet + matches!(outbuf.take(), Some(SingleAuOrList::Single(_))); + assert!(outbuf.take().is_none()); + + // **** + // * P3. AUs with indices: 3, 6, 9 & 12 + + // Expected AU 3 so it is pushed to outbuf + // and this also pops AU 4 & 5 + // Remaining: 8 + deint_buf.push_and_pop(3.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 6); + let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 3); + + // Expected AU 6 so it is pushed to outbuf + deint_buf.push_and_pop(6.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 7); + let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 4); + + // Expected AU 7 is missing when pushing AU 9 so it is buffered + deint_buf.push_and_pop(9.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 2); + assert_eq!(deint_buf.expected_index.unwrap(), 7); + matches!(outbuf.0, Some(SingleAuOrList::List(_))); + + // Expected AU 7 is missing when pushing AU 12 so it is buffered + deint_buf.push_and_pop(12.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 3); + assert_eq!(deint_buf.expected_index.unwrap(), 7); + + // End of the RTP packet + let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 4); + assert!(outbuf.0.is_none()); + + // **** + // * P4. AUs with indices: 7, 10, 13 & 16 + + // Expected AU 7 so it is pushed to outbuf + // and this also pops AU 8 & 9 + // Remaining: 12 + deint_buf.push_and_pop(7.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 10); + let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 3); + + // Expected AU 10 so it is pushed to outbuf + deint_buf.push_and_pop(10.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 11); + let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 4); + + // Expected AU 11 is missing when pushing AU 13 so it is buffered + deint_buf.push_and_pop(13.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 2); + assert_eq!(deint_buf.expected_index.unwrap(), 11); + matches!(outbuf.0, Some(SingleAuOrList::List(_))); + + // Expected AU 11 is missing when pushing AU 16 so it is buffered + deint_buf.push_and_pop(16.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 3); + assert_eq!(deint_buf.expected_index.unwrap(), 11); + + // End of the RTP packet + let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 4); + assert!(outbuf.0.is_none()); + + // **** + // * P5. AUs with indices: 11, 14, 17 & 20 + + // Expected AU 11 so it is pushed to outbuf + // and this also pops AU 12 & 13 + // Remaining: 16 + deint_buf.push_and_pop(11.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 14); + let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 3); + + // Expected AU 14 so it is pushed to outbuf + deint_buf.push_and_pop(14.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 15); + let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 4); + + // Expected AU 15 is missing when pushing AU 17 so it is buffered + deint_buf.push_and_pop(17.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 2); + assert_eq!(deint_buf.expected_index.unwrap(), 15); + matches!(outbuf.0, Some(SingleAuOrList::List(_))); + + // Expected AU 15 is missing when pushing AU 20 so it is buffered + deint_buf.push_and_pop(20.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 3); + assert_eq!(deint_buf.expected_index.unwrap(), 15); + + // End of the RTP packet + let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 4); + assert!(outbuf.0.is_none()); + + // **** + // * P6. AUs with indices: 15 & 18 + + // Expected AU 15 so it is pushed to outbuf + // and this also pops AU 16 & 17 + // Remaining: 20 + deint_buf.push_and_pop(15.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 18); + let Some(SingleAuOrList::List(ref buflist)) = outbuf.0 else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 3); + + // Expected AU 18 so it is pushed to outbuf + deint_buf.push_and_pop(18.into(), &mut outbuf).unwrap(); + + assert_eq!(deint_buf.early_aus.len(), 1); + assert_eq!(deint_buf.expected_index.unwrap(), 19); + + // End of the RTP packet + let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 4); + assert!(outbuf.0.is_none()); + + // **** + // * P7. AUs with index: 19 + deint_buf.push_and_pop(19.into(), &mut outbuf).unwrap(); + + // Expected AU 19 so it is pushed to outbuf + // and this also pops AU 20 + assert!(deint_buf.early_aus.is_empty()); + assert_eq!(deint_buf.expected_index.unwrap(), 21); + + // End of the RTP packet + let Some(SingleAuOrList::List(ref buflist)) = outbuf.take() else { + panic!("Expecting a List"); + }; + assert_eq!(buflist.len(), 2); + assert!(outbuf.0.is_none()); + } +} diff --git a/net/rtp/src/mp4g/depay/imp.rs b/net/rtp/src/mp4g/depay/imp.rs new file mode 100644 index 00000000..2d1da783 --- /dev/null +++ b/net/rtp/src/mp4g/depay/imp.rs @@ -0,0 +1,641 @@ +// GStreamer RTP MPEG-4 Generic elementary streams Depayloader +// +// Copyright (C) 2023-2024 François Laignel +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * SECTION:element-rtpmp4gdepay2 + * @see_also: rtpmp4gpay2, rtpmp4gdepay, rtpmp4gpay + * + * Depayload an MPEG-4 Generic elementary stream from RTP packets as per [RFC 3640][rfc-3640]. + * + * [rfc-3640]: https://www.rfc-editor.org/rfc/rfc3640.html#section-4 + * + * ## Example pipeline + * + * |[ + * gst-launch-1.0 udpsrc caps='application/x-rtp,media=audio,clock-rate=44100,encoding-name=MPEG4-GENERIC,payload=96,encoding-params=1,streamtype=5,profile-level-id=2,mode=AAC-hbr,config=(string)1208,sizelength=13,indexlength=3,indexdeltalength=3' ! rtpjitterbuffer ! rtpmp4gdepay2 ! decodebin3 ! audioconvert ! audioresample ! autoaudiosink + * ]| This will depayload an incoming RTP MPEG-4 generic elementary stream AAC-hbr with + * 1 channel @ 44100 sampling rate (default `audiotestsrc ! fdkaacenc` negotiation). + * You can use the #rtpmp4gpay2 or #rtpmp4gpay elements to create such an RTP stream. + * + * Since: plugins-rs-0.13.0 + */ +use anyhow::Context; +use atomic_refcell::AtomicRefCell; +use once_cell::sync::Lazy; + +use gst::{glib, prelude::*, subclass::prelude::*}; + +use std::ops::{ControlFlow, RangeInclusive}; + +use crate::basedepay::{Packet, PacketToBufferRelation, RtpBaseDepay2Ext, TimestampOffset}; + +use crate::mp4g::{ModeConfig, RtpTimestamp}; + +use super::parsers::PayloadParser; +use super::{ + AccessUnit, DeinterleaveAuBuffer, MaybeSingleAuOrList, Mpeg4GenericDepayError, SingleAuOrList, +}; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "rtpmp4gdepay2", + gst::DebugColorFlags::empty(), + Some("RTP MPEG-4 generic Depayloader"), + ) +}); + +#[derive(Default)] +pub struct RtpMpeg4GenericDepay { + state: AtomicRefCell, +} + +#[glib::object_subclass] +impl ObjectSubclass for RtpMpeg4GenericDepay { + const NAME: &'static str = "GstRtpMpeg4GenericDepay"; + type Type = super::RtpMpeg4GenericDepay; + type ParentType = crate::basedepay::RtpBaseDepay2; +} + +impl ObjectImpl for RtpMpeg4GenericDepay {} + +impl GstObjectImpl for RtpMpeg4GenericDepay {} + +impl ElementImpl for RtpMpeg4GenericDepay { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP MPEG-4 Generic ES Depayloader", + "Codec/Depayloader/Network/RTP", + "Depayload MPEG-4 Generic elementary streams from RTP packets (RFC 3640)", + "François Laignel ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::builder("application/x-rtp") + // TODO "application" is also present in rtpmp4gdepay caps template + // but it doesn't handle it in gst_rtp_mp4g_depay_setcaps + .field("media", gst::List::new(["audio", "video"])) + .field("clock-rate", gst::IntRange::new(1i32, i32::MAX)) + .field("encoding-name", "MPEG4-GENERIC") + // Required string params: + // "streamtype = { \"4\", \"5\" }, " Not set by Wowza 4 = video, 5 = audio + // "profile-level-id = [1,MAX], " + // "config = (string)" + .field( + "mode", + gst::List::new(["generic", "AAC-lbr", "AAC-hbr", "aac-hbr"]), + ) + // Optional general parameters: + // "objecttype = [1,MAX], " + // "constantsize = [1,MAX], " // constant size of each AU + // "constantduration = [1,MAX], " // constant duration of each AU + // "maxdisplacement = [1,MAX], " + // "de-interleavebuffersize = [1,MAX], " + // Optional configuration parameters: + // "sizelength = [1, 32], " + // "indexlength = [1, 32], " + // "indexdeltalength = [1, 32], " + // "ctsdeltalength = [1, 32], " + // "dtsdeltalength = [1, 32], " + // "randomaccessindication = {0, 1}, " + // "streamstateindication = [0, 32], " + // "auxiliarydatasizelength = [0, 32]" ) + .build(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::builder_full() + .structure( + gst::Structure::builder("video/mpeg") + .field("mpegversion", 4i32) + .field("systemstream", false) + .build(), + ) + .structure( + gst::Structure::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .build(), + ) + .build(), + ) + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +#[derive(Debug, Default)] +struct State { + parser: PayloadParser, + deint_buf: Option, + au_acc: Option, + seqnum_base: Option, + clock_rate: u32, + can_parse: bool, + max_au_index: Option, + prev_au_index: Option, + prev_rtptime: Option, + last_au_index: Option, +} + +impl State { + fn flush(&mut self) { + self.parser.reset(); + + if let Some(deint_buf) = self.deint_buf.as_mut() { + deint_buf.flush(); + } + self.can_parse = false; + self.max_au_index = None; + self.prev_au_index = None; + self.prev_rtptime = None; + self.last_au_index = None; + } +} + +struct CodecData; +impl CodecData { + fn from_caps(s: &gst::StructureRef) -> anyhow::Result> { + let conf_str = s.get_optional::<&str>("config").context("config field")?; + let Some(conf_str) = conf_str else { + return Ok(None); + }; + let data = hex::decode(conf_str).context("decoding config")?; + + Ok(Some(gst::Buffer::from_mut_slice(data))) + } +} + +/// Accumulates packets for a fragmented AU. +/// +/// Used for packets containing fragments for a single AU. +/// +/// From https://www.rfc-editor.org/rfc/rfc3640.html#section-3.2.3: +/// +/// > The Access Unit Data Section contains an integer number of complete +/// > Access Units or a single fragment of one AU. +#[derive(Debug)] +struct AuAccumulator(AccessUnit); + +impl AuAccumulator { + #[inline] + fn new(au: AccessUnit) -> Self { + AuAccumulator(au) + } + #[inline] + fn try_append(&mut self, mut au: AccessUnit) -> Result<(), Mpeg4GenericDepayError> { + use Mpeg4GenericDepayError::*; + + // FIXME add comment about fragments having the same RTP timestamp + if self.0.cts_delta.opt_ne(au.cts_delta).unwrap_or(false) { + return Err(FragmentedAuRtpTsMismatch { + expected: self.0.cts_delta.unwrap(), + found: au.cts_delta.unwrap(), + ext_seqnum: au.ext_seqnum, + }); + } + + if self.0.dts_delta.opt_ne(au.dts_delta).unwrap_or(false) { + // § 3.2.1.1 + // > The DTS-delta field MUST have the same value + // > for all fragments of an Access Unit + return Err(FragmentedAuDtsMismatch { + expected: self.0.dts_delta.unwrap(), + found: au.dts_delta.unwrap(), + ext_seqnum: au.ext_seqnum, + }); + } + + self.0.data.append(&mut au.data); + + Ok(()) + } + + #[inline] + fn try_into_au(self) -> Result { + let au = self.0; + if let Some(expected) = au.size { + if expected as usize != au.data.len() { + return Err(Mpeg4GenericDepayError::FragmentedAuSizeMismatch { + expected, + found: au.data.len(), + ext_seqnum: au.ext_seqnum, + }); + } + } + + Ok(au) + } +} + +impl crate::basedepay::RtpBaseDepay2Impl for RtpMpeg4GenericDepay { + const ALLOWED_META_TAGS: &'static [&'static str] = &["audio", "video"]; + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + *self.state.borrow_mut() = State::default(); + + Ok(()) + } + + fn drain(&self) -> Result { + let mut state = self.state.borrow_mut(); + + if let Some(ref mut deint_buf) = state.deint_buf { + if let Some(aus) = deint_buf.drain().take() { + self.finish_buffer_or_list(&state, None, aus)?; + } + } + + Ok(gst::FlowSuccess::Ok) + } + + fn flush(&self) { + gst::debug!(CAT, imp: self, "Flushing"); + self.state.borrow_mut().flush(); + } + + fn set_sink_caps(&self, caps: &gst::Caps) -> bool { + let s = caps.structure(0).unwrap(); + + let mode = s.get::<&str>("mode").expect("Required by Caps"); + if mode.starts_with("CELP") { + gst::error!(CAT, imp: self, "{mode} not supported yet"); + return false; + } + + let mut caps_builder = match s.get::<&str>("media").expect("Required by Caps") { + "audio" => gst::Caps::builder("audio/mpeg") + .field("mpegversion", 4i32) + .field("stream-format", "raw"), + "video" => gst::Caps::builder("video/mpeg") + .field("mpegversion", 4i32) + .field("systemstream", false), + // TODO handle "application" + _ => unreachable!(), + }; + + let mode_config = match ModeConfig::from_caps(s) { + Ok(h) => h, + Err(err) => { + gst::error!(CAT, imp: self, "Error parsing Header in Caps: {err:#}"); + return false; + } + }; + + match CodecData::from_caps(s) { + Ok(codec_data) => { + caps_builder = caps_builder.field("codec_data", codec_data); + } + Err(err) => { + gst::error!(CAT, imp: self, "Error parsing Caps: {err:#}"); + return false; + } + } + + let clock_rate = s.get::("clock-rate").expect("Required by Caps"); + debug_assert!(clock_rate.is_positive()); // constrained by Caps + let clock_rate = clock_rate as u32; + + { + let mut state = self.state.borrow_mut(); + state.seqnum_base = s.get_optional::("seqnum-base").unwrap(); + if let Some(seqnum_base) = state.seqnum_base { + gst::info!(CAT, imp: self, "Got seqnum_base {seqnum_base}"); + } + state.clock_rate = clock_rate; + + if let Some(max_displacement) = mode_config.max_displacement() { + state.deint_buf = Some(DeinterleaveAuBuffer::new(max_displacement)); + } + + state.parser.set_config(mode_config); + } + + self.obj().set_src_caps(&caps_builder.build()); + + true + } + + fn handle_packet( + &self, + packet: &crate::basedepay::Packet, + ) -> Result { + let mut state = self.state.borrow_mut(); + + if self.check_initial_packet(&mut state, packet).is_break() { + self.obj().drop_packets(..=packet.ext_seqnum()); + return Ok(gst::FlowSuccess::Ok); + } + + let State { + parser, + au_acc, + deint_buf, + .. + } = &mut *state; + + let payload = packet.payload(); + let ext_seqnum = packet.ext_seqnum(); + let packet_ts = RtpTimestamp::from_ext(packet.ext_timestamp()); + let au_iter = match parser.parse(payload, ext_seqnum, packet_ts) { + Ok(au_iter) => au_iter, + Err(err) => { + gst::warning!(CAT, imp: self, "Failed to parse payload for packet {ext_seqnum}: {err:#}"); + *au_acc = None; + self.obj().drop_packets(..=packet.ext_seqnum()); + + return Ok(gst::FlowSuccess::Ok); + } + }; + + let mut aus = MaybeSingleAuOrList::default(); + for au in au_iter { + let au = match au { + Ok(au) => au, + Err(err) => { + gst::warning!(CAT, imp: self, + "Failed to parse AU from packet {}: {err:#}", packet.ext_seqnum(), + ); + + continue; + } + }; + + // § 3.1: The marker indicates that: + // > the RTP packet payload contains either the final fragment of + // > a fragmented Access Unit or one or more complete Access Units + if !packet.marker_bit() { + if !au.is_fragment { + gst::warning!(CAT, imp: self, "Dropping non fragmented AU {au} in un-marked packet"); + continue; + } + + if let Some(ref mut acc) = au_acc { + if let Err(err) = acc.try_append(au) { + gst::warning!(CAT, imp: self, "Discarding pending fragmented AU: {err}"); + *au_acc = None; + parser.reset(); + self.obj().drop_packets(..=packet.ext_seqnum()); + + return Ok(gst::FlowSuccess::Ok); + } + } else { + *au_acc = Some(AuAccumulator::new(au)); + } + + gst::trace!(CAT, imp: self, "Non-final fragment"); + + return Ok(gst::FlowSuccess::Ok); + } + + // Packet marker set + + let au = match au_acc.take() { + Some(mut acc) => { + if au.is_fragment { + if let Err(err) = acc.try_append(au) { + gst::warning!(CAT, imp: self, "Discarding pending fragmented AU: {err}"); + parser.reset(); + self.obj().drop_packets(..=packet.ext_seqnum()); + + return Ok(gst::FlowSuccess::Ok); + } + + match acc.try_into_au() { + Ok(au) => au, + Err(err) => { + gst::warning!(CAT, imp: self, "Discarding pending fragmented AU: {err}"); + let Mpeg4GenericDepayError::FragmentedAuSizeMismatch { .. } = err + else { + unreachable!(); + }; + parser.reset(); + self.obj().drop_packets(..=packet.ext_seqnum()); + + return Ok(gst::FlowSuccess::Ok); + } + } + } else { + gst::warning!(CAT, imp: self, + "Discarding pending fragmented AU {} due to incoming non fragmented AU {au}", + acc.0, + ); + self.obj().drop_packets(..au.ext_seqnum); + + au + } + } + None => au, + }; + + if let Some(ref mut deint_buf) = deint_buf { + if let Err(err) = deint_buf.push_and_pop(au, &mut aus) { + gst::warning!(CAT, imp: self, "Failed to push AU to deinterleave buffer: {err}"); + // The AU has been dropped, just keep going + // Packet will be dropped eventually + } + + continue; + } + + if au.is_interleaved { + // From gstrtpmp4gdepay.c:616: + // > some broken non-interleaved streams have AU-index jumping around + // > all over the place, apparently assuming receiver disregards + + gst::warning!(CAT, imp: self, "Interleaved AU, but no `max_displacement` was defined"); + } + + aus.push(au); + } + + if let Some(aus) = aus.take() { + self.finish_buffer_or_list(&state, Some(packet.ext_seqnum()), aus)?; + } + + Ok(gst::FlowSuccess::Ok) + } +} + +impl RtpMpeg4GenericDepay { + #[inline] + fn check_initial_packet(&self, state: &mut State, packet: &Packet) -> ControlFlow<()> { + if state.can_parse { + return ControlFlow::Continue(()); + } + + let seqnum = (packet.ext_seqnum() & 0xffff) as u16; + + if let Some(seqnum_base) = state.seqnum_base { + let seqnum_base = (seqnum_base & 0xffff) as u16; + + // Assume seqnum_base and the initial ext_seqnum are in the same cycle + // This should be guaranteed by the JitterBuffer + let delta = crate::utils::seqnum_distance(seqnum, seqnum_base); + + if delta == 0 { + gst::debug!(CAT, imp: self, + "Got initial packet {seqnum_base} @ ext seqnum {}", packet.ext_seqnum(), + ); + state.can_parse = true; + + return ControlFlow::Continue(()); + } + + if delta < 0 { + gst::log!(CAT, imp: self, + "Waiting for initial packet {seqnum_base}, got {seqnum} (ext seqnum {})", + packet.ext_seqnum(), + ); + + return ControlFlow::Break(()); + } + + gst::debug!(CAT, imp: self, + "Packet {seqnum} (ext seqnum {}) passed expected initial packet {seqnum_base}, will sync on next marker", + packet.ext_seqnum(), + ); + + state.seqnum_base = None; + } + + // Wait until a marked packet is found and start parsing from the next packet + if packet.marker_bit() { + gst::debug!(CAT, imp: self, + "Found first marked packet {seqnum} (ext seqnum {}). Will start parsing from next packet", + packet.ext_seqnum(), + ); + + assert!(state.au_acc.is_none()); + state.can_parse = true; + } else { + gst::log!(CAT, imp: self, + "First marked packet not found yet, skipping packet {seqnum} (ext seqnum {})", + packet.ext_seqnum(), + ); + } + + ControlFlow::Break(()) + } + + fn finish_buffer_or_list( + &self, + state: &State, + packet_ext_seqnum: Option, + aus: SingleAuOrList, + ) -> Result { + use SingleAuOrList::*; + + fn get_packet_to_buffer_relation( + au: &AccessUnit, + clock_rate: u32, + range: RangeInclusive, + ) -> PacketToBufferRelation { + if let Some((cts_delta, dts_delta)) = Option::zip(au.cts_delta, au.dts_delta) { + let pts_offset = gst::Signed::::from(cts_delta as i64) + .mul_div_floor(*gst::ClockTime::SECOND, clock_rate as u64) + .unwrap(); + let dts_offset = gst::Signed::::from(dts_delta as i64) + .mul_div_floor(*gst::ClockTime::SECOND, clock_rate as u64) + .unwrap(); + PacketToBufferRelation::SeqnumsWithOffset { + seqnums: range, + timestamp_offset: TimestampOffset::PtsAndDts(pts_offset, dts_offset), + } + } else if let Some(cts_delta) = au.cts_delta { + let pts_offset = gst::Signed::::from(cts_delta as i64) + .mul_div_floor(*gst::ClockTime::SECOND, clock_rate as u64) + .unwrap(); + PacketToBufferRelation::SeqnumsWithOffset { + seqnums: range, + timestamp_offset: TimestampOffset::Pts(pts_offset), + } + } else { + PacketToBufferRelation::Seqnums(range) + } + } + + match aus { + Single(au) => { + let range = if let Some(packet_ext_seqnum) = packet_ext_seqnum { + au.ext_seqnum..=packet_ext_seqnum + } else { + au.ext_seqnum..=au.ext_seqnum + }; + + let packet_to_buffer_relation = + get_packet_to_buffer_relation(&au, state.clock_rate, range); + + gst::trace!(CAT, imp: self, "Finishing AU buffer {packet_to_buffer_relation:?}"); + + let buffer = Self::new_buffer(au, state); + + self.obj().queue_buffer(packet_to_buffer_relation, buffer)?; + } + List(au_list) => { + for au in au_list { + let range = if let Some(packet_ext_seqnum) = packet_ext_seqnum { + au.ext_seqnum..=packet_ext_seqnum + } else { + au.ext_seqnum..=au.ext_seqnum + }; + + let packet_to_buffer_relation = + get_packet_to_buffer_relation(&au, state.clock_rate, range); + + gst::trace!(CAT, imp: self, "Finishing AU buffer {packet_to_buffer_relation:?}"); + + let buffer = Self::new_buffer(au, state); + + self.obj().queue_buffer(packet_to_buffer_relation, buffer)?; + } + } + } + + Ok(gst::FlowSuccess::Ok) + } + + #[inline] + fn new_buffer(au: AccessUnit, state: &State) -> gst::Buffer { + let mut buf = gst::Buffer::from_mut_slice(au.data); + let buf_mut = buf.get_mut().unwrap(); + + if au.maybe_random_access == Some(false) { + buf_mut.set_flags(gst::BufferFlags::DELTA_UNIT) + } + + if let Some(duration) = au.duration { + let duration = (duration as u64) + .mul_div_floor(*gst::ClockTime::SECOND, state.clock_rate as u64) + .map(gst::ClockTime::from_nseconds); + + buf_mut.set_duration(duration); + } + + buf + } +} diff --git a/net/rtp/src/mp4g/depay/mod.rs b/net/rtp/src/mp4g/depay/mod.rs new file mode 100644 index 00000000..f26528f6 --- /dev/null +++ b/net/rtp/src/mp4g/depay/mod.rs @@ -0,0 +1,185 @@ +// GStreamer RTP MPEG-4 generic elementary streams Depayloader +// +// Copyright (C) 2023-2024 François Laignel +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +pub mod imp; +pub(crate) mod parsers; + +mod deint_buf; +pub(crate) use deint_buf::DeinterleaveAuBuffer; + +use gst::glib; +use gst::prelude::*; +use smallvec::SmallVec; + +use std::fmt; + +use crate::mp4g::{AccessUnitIndex, Mpeg4GenericError}; + +glib::wrapper! { + pub struct RtpMpeg4GenericDepay(ObjectSubclass) + @extends crate::basedepay::RtpBaseDepay2, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "rtpmp4gdepay2", + gst::Rank::MARGINAL, + RtpMpeg4GenericDepay::static_type(), + ) +} + +#[derive(thiserror::Error, Debug, PartialEq, Eq)] +pub enum Mpeg4GenericDepayError { + #[error("{}", .0)] + Mpeg4Generic(#[from] Mpeg4GenericError), + + #[error("AU header section too large: expected end {expected_end} / {total}")] + AuHeaderSectionTooLarge { expected_end: usize, total: usize }, + + #[error("AU auxiliary section too large: expected end {expected_end} / {total}")] + AuAuxiliarySectionTooLarge { expected_end: usize, total: usize }, + + #[error("Empty AU Data section")] + EmptyAuData, + + #[error("Unknown AU size, but multiple AUs in the packet")] + MultipleAusUnknownSize, + + #[error("Multiple AUs in packet but the AU size {au_size} is > AU data size {au_data_size}")] + MultipleAusGreaterSizeThanAuData { au_size: usize, au_data_size: usize }, + + #[error("No more AU data left for AU with index {index}")] + NoMoreAuDataLeft { index: AccessUnitIndex }, + + #[error("Got AU with index {index} which is earlier than the expected index {expected_index}")] + TooEarlyAU { + index: AccessUnitIndex, + expected_index: AccessUnitIndex, + }, + + #[error("Unexpected non-zero first AU index {index} in packet {ext_seqnum} due to configured constant duration")] + ConstantDurationAuNonZeroIndex { + index: AccessUnitIndex, + ext_seqnum: u64, + }, + + #[error("Constant duration not configured and no headers in packet {ext_seqnum}")] + NonConstantDurationNoAuHeaders { ext_seqnum: u64 }, + + #[error("Constant duration not configured and no CTS delta for AU index {index} in packet {ext_seqnum}")] + NonConstantDurationAuNoCtsDelta { + index: AccessUnitIndex, + ext_seqnum: u64, + }, + + #[error( + "Fragmented AU size mismatch: expected {expected}, found {found}. Packet {ext_seqnum}" + )] + FragmentedAuSizeMismatch { + expected: u32, + found: usize, + ext_seqnum: u64, + }, + + #[error("Fragmented AU CTS mismatch: expected {expected}, found {found}. Packet {ext_seqnum}")] + FragmentedAuRtpTsMismatch { + expected: i32, + found: i32, + ext_seqnum: u64, + }, + + #[error("Fragmented AU DTS mismatch: expected {expected}, found {found}. Packet {ext_seqnum}")] + FragmentedAuDtsMismatch { + expected: i32, + found: i32, + ext_seqnum: u64, + }, +} + +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +pub enum SingleAuOrList { + Single(AccessUnit), + List(SmallVec<[AccessUnit; 5]>), +} + +impl SingleAuOrList { + pub fn new_list(capacity: usize) -> Self { + SingleAuOrList::List(SmallVec::with_capacity(capacity)) + } + + #[inline] + pub fn push(&mut self, au: AccessUnit) { + use SingleAuOrList::*; + match self { + Single(_) => { + let list = List(SmallVec::new()); + let prev = std::mem::replace(self, list); + let Single(prev) = prev else { unreachable!() }; + let List(list) = self else { unreachable!() }; + list.push(prev); + list.push(au); + } + List(list) => list.push(au), + } + } +} + +#[derive(Debug, Default)] +pub struct MaybeSingleAuOrList(Option); + +impl MaybeSingleAuOrList { + pub fn new_list(capacity: usize) -> Self { + MaybeSingleAuOrList(Some(SingleAuOrList::new_list(capacity))) + } + + pub fn push(&mut self, au: AccessUnit) { + match &mut self.0 { + Some(inner) => inner.push(au), + None => self.0 = Some(SingleAuOrList::Single(au)), + } + } + + #[inline] + pub fn take(&mut self) -> Option { + self.0.take() + } +} + +impl From for MaybeSingleAuOrList { + fn from(au: AccessUnit) -> Self { + MaybeSingleAuOrList(Some(SingleAuOrList::Single(au))) + } +} + +/// A parsed Access Unit. +/// +/// All timestamps and duration in clock rate ticks. +/// All timestamps are based on the RTP timestamp of the packet. +#[derive(Debug, Default)] +pub struct AccessUnit { + pub(crate) ext_seqnum: u64, + pub(crate) is_fragment: bool, + pub(crate) size: Option, + pub(crate) index: AccessUnitIndex, + pub(crate) cts_delta: Option, + pub(crate) dts_delta: Option, + pub(crate) duration: Option, + pub(crate) maybe_random_access: Option, + pub(crate) is_interleaved: bool, + pub(crate) data: Vec, +} + +impl fmt::Display for AccessUnit { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "index {} from Packet {}", self.index, self.ext_seqnum) + } +} diff --git a/net/rtp/src/mp4g/depay/parsers.rs b/net/rtp/src/mp4g/depay/parsers.rs new file mode 100644 index 00000000..42b74f0c --- /dev/null +++ b/net/rtp/src/mp4g/depay/parsers.rs @@ -0,0 +1,1050 @@ +use anyhow::Context; +use bitstream_io::{BigEndian, BitRead, BitReader, ByteRead, ByteReader}; + +use std::io::Cursor; + +use super::{AccessUnit, Mpeg4GenericDepayError}; +use crate::mp4g::{AccessUnitIndex, AuHeader, AuHeaderContext, ModeConfig, RtpTimestamp}; + +/// The reference Packet to compute constant duration when applicable. +#[derive(Debug, Default)] +struct ConstantDurationProbation { + ts: RtpTimestamp, + frames: u32, +} + +impl From for ConstantDurationProbation { + fn from(ts: RtpTimestamp) -> Self { + ConstantDurationProbation { ts, frames: 0 } + } +} + +/// MPEG-4 generic Payload: https://www.rfc-editor.org/rfc/rfc3640.html#section-2.11 +/// +/// +---------+-----------+-----------+---------------+ +/// | RTP | AU Header | Auxiliary | Access Unit | +/// | Header | Section | Section | Data Section | +/// +---------+-----------+-----------+---------------+ +/// +/// . <----------RTP Packet Payload-----------> +#[derive(Debug, Default)] +pub struct PayloadParser { + config: ModeConfig, + const_dur_probation: Option, + // Constant duration as provided by the config + // or determined while parsing the payloads + constant_duration: Option, +} + +impl PayloadParser { + #[allow(dead_code)] + pub fn new_for(config: ModeConfig) -> Self { + PayloadParser { + constant_duration: config.constant_duration(), + config, + ..Default::default() + } + } + + pub fn set_config(&mut self, config: ModeConfig) { + self.config = config; + self.reset(); + } + + pub fn reset(&mut self) { + self.constant_duration = self.config.constant_duration(); + self.const_dur_probation = None; + } + + pub fn parse<'a>( + &'a mut self, + payload: &'a [u8], + ext_seqnum: u64, + packet_ts: RtpTimestamp, + ) -> anyhow::Result> { + use Mpeg4GenericDepayError::*; + + let mut headers_len = 0; + let mut headers = None; + let mut data_offset = 0; + + let mut r = ByteReader::endian(payload, BigEndian); + + if self.config.has_header_section() { + // AU Header section: https://www.rfc-editor.org/rfc/rfc3640.html#section-3.2.1 + // + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+-+ + // |AU-headers-length|AU-header|AU-header| |AU-header|padding| + // | | (1) | (2) | | (n) * | bits | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+-+ + + // This is expressed in bits + headers_len = r.read::().context("AU-headers-length")?; + + // Up to 7 bits of padding + let headers_len_bytes = (headers_len as usize + 7) / 8; + + data_offset = 2 + headers_len_bytes; + if data_offset > payload.len() { + Err(AuHeaderSectionTooLarge { + expected_end: data_offset, + total: payload.len(), + })?; + } + + r.skip(headers_len_bytes as u32) + .expect("availability checked above"); + + headers = Some(&payload[2..data_offset]); + } else if self.constant_duration.is_none() { + // No headers and non-constant duration + + // § 3.2.3.2: + // > When transmitting Access Units of variable duration, then the + // > "constantDuration" parameter MUST NOT be present [...] + // > the CTS-delta MUST be coded in the AU header for each non-first AU + // > in the RTP packet + + Err(NonConstantDurationNoAuHeaders { ext_seqnum })?; + } + + if self.config.has_auxiliary_section() { + // Move the AU reader after the Auxiliary Section + // + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+ + // | auxiliary-data-size | auxiliary-data |padding bits | + // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- .. -+-+-+-+-+-+-+-+-+ + + // This is expressed in bits + let aux_len = r.read::().context("auxiliary-data-size")?; + + // Up to 7 bits of padding + let aux_len_bytes = (aux_len as usize + 7) / 8; + + data_offset += 2 + aux_len_bytes; + if data_offset > payload.len() { + Err(AuAuxiliarySectionTooLarge { + expected_end: data_offset, + total: payload.len(), + })?; + } + } + + if data_offset >= payload.len() { + Err(EmptyAuData)?; + } + + Ok(AccessUnitIter { + parser: self, + ext_seqnum, + packet_ts, + prev_index: None, + headers_r: headers.map(|h| BitReader::endian(Cursor::new(h), BigEndian)), + headers_len, + data: &payload[data_offset..], + cur: 0, + }) + } +} + +#[derive(Debug)] +pub struct AccessUnitIter<'a> { + parser: &'a mut PayloadParser, + ext_seqnum: u64, + packet_ts: RtpTimestamp, + prev_index: Option, + headers_r: Option, BigEndian>>, + headers_len: u16, + data: &'a [u8], + cur: u32, +} + +impl<'a> Iterator for AccessUnitIter<'a> { + type Item = anyhow::Result; + + fn next(&mut self) -> Option { + let res = self.next_priv(); + if let Some(Err(_)) = res.as_ref() { + self.parser.reset(); + } + + res + } +} + +impl<'a> AccessUnitIter<'a> { + fn next_priv(&mut self) -> Option> { + use Mpeg4GenericDepayError::*; + + let mut cts_delta = None; + let mut duration = None; + let header = if let Some(r) = self.headers_r.as_mut() { + let pos = r.position_in_bits().unwrap() as u16; + if pos >= self.headers_len { + // No more AUs + return None; + } + + let ctx = AuHeaderContext { + config: &self.parser.config, + prev_index: self.prev_index, + }; + + let header = match r.parse_with::(&ctx) { + Ok(header) => header, + Err(err) => { + return Some( + Err(err).with_context(|| format!("AuHeader in packet {}", self.ext_seqnum)), + ) + } + }; + + if self.prev_index.is_none() { + // First AU header of the packet + if header.index.is_zero() { + if self.parser.constant_duration.is_none() { + // > In the absence of the constantDuration parameter + // > receivers MUST conclude that the AUs have constant duration + // > if the AU-index is zero in two consecutive RTP packets. + + if let Some(cd_prob) = self.parser.const_dur_probation.take() { + let dur = *(self.packet_ts - cd_prob.ts) / cd_prob.frames; + self.parser.constant_duration = Some(dur); + } else { + // Keep first packet for constant duration probation: + self.parser.const_dur_probation = Some(self.packet_ts.into()); + } + } + } else if self.parser.constant_duration.is_some() { + // § 3.2.3.2: + // > when transmitting Access Units of constant duration, the AU-Index, + // > if present, MUST be set to the value 0 + return Some(Err(ConstantDurationAuNonZeroIndex { + index: header.index, + ext_seqnum: self.ext_seqnum, + } + .into())); + } else if self.parser.const_dur_probation.is_some() { + // Constant duration was in probation but index is not zero + // => not switching to constant duration yet + self.parser.const_dur_probation = None; + } + } + if let Some(delta) = header.cts_delta { + cts_delta = Some(delta); + } else if let Some(constant_duration) = self.parser.constant_duration { + // § 3.2.3.2: + // > If the "constantDuration" parameter is present, the receiver can + // > reconstruct the original Access Unit timing based solely on the RTP + // > timestamp and AU-Index-delta. + cts_delta = Some((*header.index * constant_duration) as i32); + duration = Some(constant_duration); + } else if self.prev_index.is_some() && self.parser.const_dur_probation.is_none() { + // Non-constant duration, no CTS-delta, not first header, + // and not constant duration probation in progress + + // § 3.2.3.2: + // > When transmitting Access Units of variable duration, then the + // > "constantDuration" parameter MUST NOT be present [...] + // > the CTS-delta MUST be coded in the AU header for each non-first AU + // > in the RTP packet + + return Some(Err(NonConstantDurationAuNoCtsDelta { + index: header.index, + ext_seqnum: self.ext_seqnum, + } + .into())); + } else if self.prev_index.is_none() { + // First AU but unknown duration + cts_delta = Some(0); + } + + if header.size.is_none() && self.parser.config.constant_size == 0 { + // § 3.2.3: + // > The absence of both AU-size in the AU-header and the constantSize + // > MIME format parameter indicates the carriage of a single AU + // > (fragment), i.e., that a single Access Unit (fragment) is transported + // > in each RTP packet for that stream + + let pos = r.position_in_bits().unwrap() as u16; + if pos < self.headers_len { + // More headers to read + return Some(Err(MultipleAusUnknownSize.into())); + } + } + + if self.data.is_empty() { + // We have exhausted the data section, but there are more headers + return Some(Err(NoMoreAuDataLeft { + index: header.index, + } + .into())); + } + + self.prev_index = Some(header.index); + + header + } else { + // No header section + + if self.data.is_empty() { + // We have exhausted the data section + return None; + } + + let constant_duration = self + .parser + .constant_duration + .expect("checked in PayloadParser::parse"); + + cts_delta = Some((self.cur * constant_duration) as i32); + duration = Some(constant_duration); + + AuHeader::new_with(self.cur) + }; + + let mut is_fragment = false; + + // § 3.2.3 + // > If the AU size is variable, then the + // > size of each AU MUST be indicated in the AU-size field of the + // > corresponding AU-header. However, if the AU size is constant for a + // > stream, this mechanism SHOULD NOT be used; instead, the fixed size + // > SHOULD be signaled by the MIME format parameter "constantSize" + + let au_size = if self.parser.config.constant_size > 0 { + self.parser.config.constant_size as usize + } else if let Some(size) = header.size { + size as usize + } else { + // Unknown size + // Note: MultipleAusUnknownSize case checked above + + self.data.len() + }; + + let data = if au_size <= self.data.len() { + let data = self.data[..au_size].to_owned(); + + // Update self.data for next AU + self.data = &self.data[au_size..]; + + data + } else { + // The len of the AU can exceed the AU data len in case of a framgment + if self.cur > 0 { + return Some(Err(MultipleAusGreaterSizeThanAuData { + au_size, + au_data_size: self.data.len(), + } + .into())); + } + + is_fragment = true; + + let data = self.data[..].to_owned(); + self.data = &[]; + + data + }; + + self.cur += 1; + + if let Some(ref mut cd_prob) = self.parser.const_dur_probation { + cd_prob.frames = self.cur; + } + + Some(Ok(AccessUnit { + ext_seqnum: self.ext_seqnum, + is_fragment, + size: header.size, + index: header.index, + cts_delta, + dts_delta: header.dts_delta, + duration, + maybe_random_access: header.maybe_random_access, + is_interleaved: header.is_interleaved, + data, + })) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use Mpeg4GenericDepayError::*; + + const TS0: RtpTimestamp = RtpTimestamp::ZERO; + const TS21: RtpTimestamp = RtpTimestamp::new(21); + + #[test] + fn no_headers_one_au() { + const CONSTANT_DURATION: u32 = 3; + let mut parser = PayloadParser::new_for(ModeConfig { + constant_duration: CONSTANT_DURATION, + ..Default::default() + }); + + let payload = &[0, 1, 2, 3]; + let mut iter = parser.parse(payload, 0, TS0).unwrap(); + assert_eq!(iter.data.len(), 4); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.index, 0); + assert!(!au.is_interleaved); + assert!(!au.is_fragment); // <== + assert_eq!(au.cts_delta, Some(0i32)); + assert!(au.dts_delta.is_none()); + assert_eq!(au.duration, Some(CONSTANT_DURATION)); + assert!(au.maybe_random_access.is_none()); + assert_eq!(au.data, payload); + + assert!(iter.next().is_none()); + } + + #[test] + fn no_headers_one_au_fragmented() { + const CONSTANT_DURATION: u32 = 3; + let mut parser = PayloadParser::new_for(ModeConfig { + constant_size: 6, + constant_duration: 3, + ..Default::default() + }); + + let payload = &[0, 1, 2, 3]; + let mut iter = parser.parse(payload, 42, TS0).unwrap(); + assert_eq!(iter.data.len(), 4); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 42); + assert_eq!(au.index, 0); + assert!(!au.is_interleaved); + assert!(au.is_fragment); // <== + assert_eq!(au.duration, Some(CONSTANT_DURATION)); + assert_eq!(au.data, payload); + + assert!(iter.next().is_none()); + } + + #[test] + fn no_headers_two_aus() { + const CONSTANT_DURATION: u32 = 5; + let mut parser = PayloadParser::new_for(ModeConfig { + constant_size: 2, + constant_duration: CONSTANT_DURATION, + ..Default::default() + }); + + let payload = &[0, 1, 2, 3]; + let mut iter = parser.parse(payload, 42, TS21).unwrap(); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 42); + assert_eq!(au.index, 0); + assert!(!au.is_interleaved); + assert_eq!(au.cts_delta, Some(0i32)); + assert!(au.dts_delta.is_none()); + assert_eq!(au.duration, Some(CONSTANT_DURATION)); + assert!(!au.is_fragment); + assert_eq!(au.data, payload[..2]); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 42); + assert_eq!(au.index, 1); + assert!(!au.is_interleaved); + assert_eq!(au.cts_delta, Some(CONSTANT_DURATION as i32)); + assert!(au.dts_delta.is_none()); + assert_eq!(au.duration, Some(CONSTANT_DURATION)); + assert!(!au.is_fragment); + assert_eq!(au.data, payload[2..][..2]); + + assert!(iter.next().is_none()); + } + + #[test] + fn no_headers_empty_au_data() { + let mut parser = PayloadParser::new_for(ModeConfig { + constant_size: 2, + constant_duration: 2, + ..Default::default() + }); + + let payload = &[]; + let err = parser + .parse(payload, 0, TS0) + .unwrap_err() + .downcast::() + .unwrap(); + assert_eq!(err, EmptyAuData); + } + + #[test] + fn no_headers_two_aus_one_fragment() { + let mut parser = PayloadParser::new_for(ModeConfig { + constant_size: 3, + constant_duration: 2, + ..Default::default() + }); + + let payload = &[0, 1, 2, 3]; + let mut iter = parser.parse(payload, 0, TS0).unwrap(); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.index, 0); + assert!(!au.is_fragment); + assert_eq!(au.data, payload[..3]); + + let err = iter + .next() + .unwrap() + .unwrap_err() + .downcast::() + .unwrap(); + assert_eq!( + err, + MultipleAusGreaterSizeThanAuData { + au_size: 3, + au_data_size: 1 + } + ); + } + + #[test] + fn header_one_au() { + let mut parser = PayloadParser::new_for(ModeConfig { + size_len: 2, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Header: + // * 2 bits: data size (3 here => b11.._....) + let payload = &[0x00, 0x02, 0xc0, 0x01, 0x02, 0x03]; + let mut iter = parser.parse(payload, 42, TS0).unwrap(); + assert_eq!(iter.data.len(), 3); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 42); + assert_eq!(au.index, 0); + assert!(!au.is_interleaved); + assert!(!au.is_fragment); // <== + assert_eq!(au.cts_delta, Some(0i32)); + assert!(au.dts_delta.is_none()); + assert!(au.duration.is_none()); + assert!(au.maybe_random_access.is_none()); + assert_eq!(au.data, payload[3..][..3]); + + assert!(iter.next().is_none()); + } + + #[test] + fn header_one_au_cts() { + let mut parser = PayloadParser::new_for(ModeConfig { + cts_delta_len: 2, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Header: + // * 1 bits: CTS flag not set because 1st header (0 here => b0...) + // * following 2 bits for CTS delta not present + let payload = &[0x00, 0x01, 0x00, 0x01, 0x02]; + let mut iter = parser.parse(payload, 0, TS21).unwrap(); + assert_eq!(iter.data.len(), 2); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 0); + assert!(!au.is_interleaved); + assert_eq!(au.cts_delta, Some(0i32)); + assert!(au.dts_delta.is_none()); + assert!(au.duration.is_none()); + assert!(au.maybe_random_access.is_none()); + assert_eq!(au.data, payload[3..][..2]); + + assert!(iter.next().is_none()); + } + + #[test] + fn header_one_au_cts_set() { + use crate::mp4g::header::AuHeaderError::{self, *}; + + let mut parser = PayloadParser::new_for(ModeConfig { + cts_delta_len: 2, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Header: + // * 3 bits: CTS flag + CTS delta (-1 here => b111.) + let payload = &[0x00, 0x03, 0xe0, 0x01, 0x02]; + let mut iter = parser.parse(payload, 42, TS21).unwrap(); + assert_eq!(iter.data.len(), 2); + + let err = iter + .next() + .unwrap() + .unwrap_err() + .downcast::() + .unwrap(); + assert_eq!(err, CtsFlagSetInFirstAuHeader(AccessUnitIndex::ZERO)); + } + + #[test] + fn header_one_au_random_access() { + let mut parser = PayloadParser::new_for(ModeConfig { + random_access_indication: true, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Header: + // * 1 bit: random access flag (1 here => b1...) + let payload = &[0x00, 0x01, 0x80, 0x01, 0x02]; + let mut iter = parser.parse(payload, 0, TS21).unwrap(); + assert_eq!(iter.data.len(), 2); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 0); + assert!(!au.is_interleaved); + assert_eq!(au.maybe_random_access, Some(true)); + assert_eq!(au.data, payload[3..][..2]); + + assert!(iter.next().is_none()); + } + + #[test] + fn header_one_au_random_access_stream_state() { + let mut parser = PayloadParser::new_for(ModeConfig { + random_access_indication: true, + stream_state_indication: 2, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Header: + // * 1 bit: random access flag (1 here => b1...) + // * 2 bits: stream state (3 here => b.11._....) + let payload = &[0x00, 0x03, 0xe0, 0x01, 0x02]; + let mut iter = parser.parse(payload, 0, TS0).unwrap(); + assert_eq!(iter.data.len(), 2); + + let au = iter.next().unwrap().unwrap(); + assert!(!au.is_interleaved); + assert_eq!(au.maybe_random_access, Some(true)); + assert_eq!(au.data, payload[3..][..2]); + + assert!(iter.next().is_none()); + } + + #[test] + fn header_one_au_fragemented() { + let mut parser = PayloadParser::new_for(ModeConfig { + size_len: 2, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Header: + // * 2 bits: data size (3 here => b11.._....) + let payload = &[0x00, 0x02, 0xc0, 0x01, 0x02]; + let mut iter = parser.parse(payload, 0, TS0).unwrap(); + assert_eq!(iter.data.len(), 2); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.index, 0); + assert!(!au.is_interleaved); + assert!(au.is_fragment); // <== + assert_eq!(au.data, payload[3..][..2]); + + assert!(iter.next().is_none()); + } + + #[test] + fn header_two_aus() { + const CONSTANT_DURATION: u32 = 5; + let mut parser = PayloadParser::new_for(ModeConfig { + size_len: 2, + constant_duration: CONSTANT_DURATION, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Each AU-header: + // * 2 bits: data size (AU1: 2 => b10.._...., AU2: 3 => b..11_....) + let payload = &[0x00, 0x04, 0xb0, 0x01, 0x02, 0x03, 0x04, 0x05]; + let mut iter = parser.parse(payload, 42, TS0).unwrap(); + assert_eq!(iter.data.len(), 5); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 42); + assert_eq!(au.index, 0); + assert!(!au.is_interleaved); + assert_eq!(au.cts_delta, Some(0i32)); + assert!(au.dts_delta.is_none()); + assert_eq!(au.duration, Some(CONSTANT_DURATION)); + assert!(!au.is_fragment); + assert_eq!(au.data, payload[3..][..2]); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 42); + assert_eq!(au.index, 1); + assert_eq!(au.cts_delta, Some(CONSTANT_DURATION as i32)); + assert!(au.dts_delta.is_none()); + assert_eq!(au.duration, Some(CONSTANT_DURATION)); + assert!(!au.is_interleaved); + assert!(!au.is_fragment); + assert_eq!(au.data, payload[5..][..3]); + + assert!(iter.next().is_none()); + } + + #[test] + fn header_two_aus_const_duration_probation() { + const CONSTANT_DURATION: u32 = 2; + + let mut parser = PayloadParser::new_for(ModeConfig { + size_len: 2, + ..Default::default() + }); + + assert!(parser.constant_duration.is_none()); + assert!(parser.const_dur_probation.is_none()); + + // Header section size: 2 bytes. + // Each AU-header: + // * 2 bits: data size (AU1: 2 => b10.._...., AU2: 3 => b..11_....) + let payload = &[0x00, 0x04, 0xb0, 0x01, 0x02, 0x03, 0x04, 0x05]; + let mut iter = parser.parse(payload, 42, TS0).unwrap(); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.index, 0); + assert_eq!(au.cts_delta, Some(0i32)); + assert!(au.dts_delta.is_none()); + assert!(au.duration.is_none()); + assert_eq!(au.data, payload[3..][..2]); + + let au = iter.next().unwrap().unwrap(); + assert!(au.cts_delta.is_none()); + assert!(au.dts_delta.is_none()); + assert!(au.duration.is_none()); + assert_eq!(au.data, payload[5..][..3]); + + assert!(iter.next().is_none()); + + assert!(parser.constant_duration.is_none()); + assert!(parser.const_dur_probation.is_some()); + + let ts4 = TS0 + 2 * CONSTANT_DURATION; + + // Header section size: 2 bytes. + // Each AU-header: + // * 2 bits: data size (AU1: 2 => b10.._...., AU2: 3 => b..11_....) + let payload = &[0x00, 0x04, 0xb0, 0x01, 0x02, 0x03, 0x04, 0x05]; + let mut iter = parser.parse(payload, 42, ts4).unwrap(); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.index, 0); + assert_eq!(au.cts_delta, Some(0i32)); + assert!(au.dts_delta.is_none()); + assert_eq!(au.duration, Some(CONSTANT_DURATION)); + assert_eq!(au.data, payload[3..][..2]); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.cts_delta, Some(CONSTANT_DURATION as i32)); + assert!(au.dts_delta.is_none()); + assert_eq!(au.duration, Some(CONSTANT_DURATION)); + assert_eq!(au.data, payload[5..][..3]); + + assert!(iter.next().is_none()); + + assert_eq!(parser.constant_duration, Some(CONSTANT_DURATION)); + assert!(parser.const_dur_probation.is_none()); + } + + #[test] + fn header_two_au_cts() { + let mut parser = PayloadParser::new_for(ModeConfig { + size_len: 2, + cts_delta_len: 2, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Header: + // * Header 1 + // - 2 bits: len 2 => b10.. + // - 1 bit: CTS flag not set because 1st header => b..0. + // * Header 2 + // - 2 bits: len 3 => b...1_1... + // - 3 bits: CTS flag + CTS delta +1 in 2's comp => b...._.101) + let payload = &[0x00, 0x08, 0x9d, 0x01, 0x02, 0x03, 0x04, 0x05]; + let mut iter = parser.parse(payload, 42, TS21).unwrap(); + assert_eq!(iter.data.len(), 5); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 42); + assert_eq!(au.index, 0); + assert!(!au.is_interleaved); + assert_eq!(au.cts_delta, Some(0i32)); + assert!(au.dts_delta.is_none()); + assert!(au.duration.is_none()); + assert!(!au.is_fragment); + assert_eq!(au.data, payload[3..][..2]); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 42); + assert_eq!(au.index, 1); + assert!(!au.is_interleaved); + assert!(!au.is_fragment); + assert_eq!(au.cts_delta, Some(1i32)); + assert!(au.dts_delta.is_none()); + assert!(au.duration.is_none()); + assert_eq!(au.data, payload[5..][..3]); + + assert!(iter.next().is_none()); + } + + #[test] + fn header_three_aus_delta_index() { + const CONSTANT_DURATION: u32 = 5; + let mut parser = PayloadParser::new_for(ModeConfig { + size_len: 2, + constant_duration: CONSTANT_DURATION, + index_delta_len: 1, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Each AU-header: 2 bits: data size + // For headers 2 & 3: 1 bits: AU-index-delta + // + // - AU1: size 2 => b10.._.... + // - AU2: size 3, delta 0 => b..11_0... + // - AU3: size 1, delta 1 => b...._.011 + let payload = &[0x00, 0x07, 0xb3, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06]; + let mut iter = parser.parse(payload, 0, TS0).unwrap(); + assert_eq!(iter.data.len(), 6); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.index, 0); + assert!(!au.is_interleaved); + assert!(!au.is_fragment); + assert_eq!(au.cts_delta, Some(0i32)); + assert!(au.dts_delta.is_none()); + assert_eq!(au.duration, Some(CONSTANT_DURATION)); + assert_eq!(au.data, payload[3..][..2]); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.index, 1); + assert!(!au.is_interleaved); + assert!(!au.is_fragment); + assert_eq!(au.cts_delta, Some((*au.index * CONSTANT_DURATION) as i32)); + assert!(au.dts_delta.is_none()); + assert_eq!(au.duration, Some(CONSTANT_DURATION)); + assert_eq!(au.data, payload[5..][..3]); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.index, 3); + assert!(au.is_interleaved); + assert!(!au.is_fragment); + assert_eq!(au.cts_delta, Some((*au.index * CONSTANT_DURATION) as i32)); + assert!(au.dts_delta.is_none()); + assert_eq!(au.duration, Some(CONSTANT_DURATION)); + assert_eq!(au.data, payload[8..][..1]); + + assert!(iter.next().is_none()); + } + + #[test] + fn header_three_au_cts() { + let mut parser = PayloadParser::new_for(ModeConfig { + size_len: 2, + cts_delta_len: 3, + dts_delta_len: 3, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Header: + // * Header 1 + // - 2 bits: len 2 => b10.. + // - 1 bit: CTS flag not set because 1st header => b..0. + // - 4 bits: DTS flag + DTS delta -2 in 2's comp => b...1_110.) + // * Header 2 + // - 2 bits: len 3 => b...._...1 1..._.... + // - 4 bits: CTS flag + CTS delta +1 in 2's comp => b.100_1...) + // - 4 bit: DTS flag + DTS delta 0 (i.e. same as CTS) => b...._.100 0..._....) + // * Header 3 + // - 2 bits: len 1 => b.01._.... + // - 4 bits: CTS flag + CTS delta +2 in 2's comp => b...1_010.) + // - 4 bits: DTS flag + DTS delta -2 in 2's comp => b...._...1 110._....) + let payload = &[ + 0x00, 0x1b, 0x9d, 0xcc, 0x35, 0xc0, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, + ]; + let mut iter = parser.parse(payload, 0, TS21).unwrap(); + assert_eq!(iter.data.len(), 6); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 0); + assert_eq!(au.index, 0); + assert!(!au.is_interleaved); + assert!(!au.is_fragment); + assert_eq!(au.cts_delta, Some(0i32)); + assert_eq!(au.dts_delta, Some(-2i32)); + assert_eq!(au.data, payload[6..][..2]); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 0); + assert_eq!(au.index, 1); + assert!(!au.is_interleaved); + assert!(!au.is_fragment); + assert_eq!(au.cts_delta, Some(1i32)); + assert_eq!(au.dts_delta, Some(0i32)); + assert_eq!(au.data, payload[8..][..3]); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.ext_seqnum, 0); + assert_eq!(au.index, 2); + assert!(!au.is_interleaved); + assert_eq!(au.cts_delta, Some(2i32)); + assert_eq!(au.dts_delta, Some(-2i32)); + assert!(!au.is_fragment); + assert_eq!(au.data, payload[11..][..1]); + + assert!(iter.next().is_none()); + } + + #[test] + fn header_one_au_unknown_size() { + let mut parser = PayloadParser::new_for(ModeConfig { + index_len: 2, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Header: + // * 2 bits: AU-index. (set to 0) + let payload = &[0x00, 0x02, 0x00, 0x01, 0x02]; + let mut iter = parser.parse(payload, 0, TS0).unwrap(); + assert_eq!(iter.data.len(), 2); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.index, 0); + assert!(!au.is_fragment); // <== + assert_eq!(au.data, payload[3..][..2]); + + assert!(iter.next().is_none()); + } + + #[test] + fn header_two_aus_unknown_size() { + let mut parser = PayloadParser::new_for(ModeConfig { + index_len: 1, + index_delta_len: 2, + ..Default::default() + }); + + // Header section size: 2 bytes. + // First AU-header: + // * 1 bit: AU-index. (set to 0) + // Second AU-header: + // * 2 bits: AU-index-delta. (set to 2) + let payload = &[0x00, 0x03, 0x40, 0x01, 0x02, 0x03, 0x04]; + let mut iter = parser.parse(payload, 0, TS0).unwrap(); + assert_eq!(iter.data.len(), 4); + + let err = iter + .next() + .unwrap() + .unwrap_err() + .downcast::() + .unwrap(); + assert_eq!(err, MultipleAusUnknownSize); + } + + #[test] + fn header_two_aus_one_fragment() { + let mut parser = PayloadParser::new_for(ModeConfig { + size_len: 2, + constant_duration: 2, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Each AU-header: + // * 2 bits: data size (AU1: 2 => b10.._...., AU2: 3 => b..11_....) + let payload = &[0x00, 0x04, 0xb0, 0x01, 0x02, 0x03]; + let mut iter = parser.parse(payload, 0, TS0).unwrap(); + assert_eq!(iter.data.len(), 3); + + let au = iter.next().unwrap().unwrap(); + assert_eq!(au.index, 0); + assert!(!au.is_interleaved); + assert!(!au.is_fragment); + assert_eq!(au.data, payload[3..][..2]); + + let err = iter + .next() + .unwrap() + .unwrap_err() + .downcast::() + .unwrap(); + assert_eq!( + err, + MultipleAusGreaterSizeThanAuData { + au_size: 3, + au_data_size: 1 + } + ); + } + + #[test] + fn header_section_too_large() { + let mut parser = PayloadParser::new_for(ModeConfig { + size_len: 4, + ..Default::default() + }); + + // Header section size: 2 bytes. + // Each Au-Header: 4 bits, so 3 of them here. + let payload = &[0x00, 0x0c, 0x00]; + let err = parser + .parse(payload, 0, TS0) + .unwrap_err() + .downcast::() + .unwrap(); + assert_eq!( + err, + AuHeaderSectionTooLarge { + expected_end: 4, + total: 3 + } + ); + } + + #[test] + fn auxiliary_section_too_large() { + let mut parser = PayloadParser::new_for(ModeConfig { + random_access_indication: true, + auxiliary_data_size_len: 2, + ..Default::default() + }); + + // Header section size: 2 bytes. + // * 2x Au-Header: 3 bits each. 3 of them here + padding => 1 byte + // Auxiliary section size: 2 bytes. (one byte available instead of 2 advertised) + let payload = &[0x00, 0x06, 0x00, 0x00, 0x0c, 0x00]; + let err = parser + .parse(payload, 0, TS0) + .unwrap_err() + .downcast::() + .unwrap(); + assert_eq!( + err, + AuAuxiliarySectionTooLarge { + expected_end: 7, + total: 6, + } + ); + } +} diff --git a/net/rtp/src/mp4g/header.rs b/net/rtp/src/mp4g/header.rs new file mode 100644 index 00000000..113d8f2b --- /dev/null +++ b/net/rtp/src/mp4g/header.rs @@ -0,0 +1,120 @@ +//! Access Unit Header and its parser & writer. + +use bitstream_io::{BitRead, FromBitStreamWith}; + +use crate::mp4g::{AccessUnitIndex, ModeConfig}; +use crate::utils::raw_2_comp_to_i32; + +#[derive(thiserror::Error, Debug, PartialEq, Eq)] +pub enum AuHeaderError { + #[error("Unexpected zero-sized AU {}", .0)] + ZeroSizedAu(AccessUnitIndex), + + #[error("Unexpected CTS flag set for the first AU header {}", .0)] + CtsFlagSetInFirstAuHeader(AccessUnitIndex), +} + +#[derive(Debug)] +pub struct AuHeaderContext<'a> { + pub(crate) config: &'a ModeConfig, + pub(crate) prev_index: Option, +} + +#[derive(Debug, Default)] +pub struct AuHeader { + pub(crate) size: Option, + pub(crate) index: AccessUnitIndex, + pub(crate) cts_delta: Option, + pub(crate) dts_delta: Option, + pub(crate) maybe_random_access: Option, + pub(crate) is_interleaved: bool, +} + +impl AuHeader { + #[inline] + pub(crate) fn new_with(index: impl Into) -> Self { + AuHeader { + index: index.into(), + ..Default::default() + } + } +} + +impl<'a> FromBitStreamWith<'a> for AuHeader { + type Context = AuHeaderContext<'a>; + type Error = anyhow::Error; + + fn from_reader( + r: &mut R, + ctx: &AuHeaderContext, + ) -> Result { + use anyhow::Context; + use AuHeaderError::*; + + let mut this = AuHeader::default(); + + if ctx.config.size_len > 0 { + let val = r + .read::(ctx.config.size_len as u32) + .context("AU-size")?; + + // Will ensure the size is non-zero after we get the index + this.size = Some(val); + } + + this.index = match ctx.prev_index { + None => r + .read::(ctx.config.index_len as u32) + .context("AU-Index")? + .into(), + Some(prev_index) => { + let delta = r + .read::(ctx.config.index_delta_len as u32) + .context("AU-Index-delta")?; + if delta > 0 { + this.is_interleaved = true; + } + + prev_index + 1u32 + delta + } + }; + + if let Some(0) = this.size { + Err(ZeroSizedAu(this.index))?; + } + + if ctx.config.cts_delta_len > 0 && r.read_bit().context("CTS-flag")? { + if ctx.prev_index.is_none() { + // § 3.2.1.1: + // > the CTS-flag field MUST have the value 0 in the first AU-header + Err(CtsFlagSetInFirstAuHeader(this.index))?; + } + + let delta = r + .read::(ctx.config.cts_delta_len as u32) + .context("CTS-delta")?; + let delta = raw_2_comp_to_i32(delta, ctx.config.cts_delta_len); + this.cts_delta = Some(delta); + } + + if ctx.config.dts_delta_len > 0 && r.read_bit().context("DTS-flag")? { + let delta = r + .read::(ctx.config.dts_delta_len as u32) + .context("DTS-delta")?; + let delta = raw_2_comp_to_i32(delta, ctx.config.dts_delta_len); + this.dts_delta = Some(delta); + } + + if ctx.config.random_access_indication { + this.maybe_random_access = Some(r.read_bit().context("RAP-flag")?); + } + + // ignored by gstrtpmp4gdepay + if ctx.config.stream_state_indication > 0 { + r.skip(ctx.config.stream_state_indication as u32) + .context("Stream-state")?; + } + + Ok(this) + } +} diff --git a/net/rtp/src/mp4g/mod.rs b/net/rtp/src/mp4g/mod.rs new file mode 100644 index 00000000..7a9c8309 --- /dev/null +++ b/net/rtp/src/mp4g/mod.rs @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: MPL-2.0 + +pub mod depay; +mod header; +pub use header::{AuHeader, AuHeaderContext}; +mod mode; +pub use mode::ModeConfig; + +#[derive(thiserror::Error, Debug, PartialEq, Eq)] +pub enum Mpeg4GenericError { + #[error("Can't compare AU index 0x8000_0000 to 0")] + AuIndexComparisonLimit, + + #[error("Can't compare RTP timestamps 0x8000_0000 to 0")] + RTPTimestampComparisonLimit, +} + +/// An Access Unit Index implemented as a comparable new type on a `[std::num::Wrapping]::`. +define_wrapping_comparable_u32_with_display!( + AccessUnitIndex, + Mpeg4GenericError, + AuIndexComparisonLimit, +); + +/// An RTP timestamp implemented as a comparable new type on a `[std::num::Wrapping]::`. +define_wrapping_comparable_u32_with_display!( + RtpTimestamp, + Mpeg4GenericError, + RTPTimestampComparisonLimit, +); diff --git a/net/rtp/src/mp4g/mode.rs b/net/rtp/src/mp4g/mode.rs new file mode 100644 index 00000000..f2b1c571 --- /dev/null +++ b/net/rtp/src/mp4g/mode.rs @@ -0,0 +1,141 @@ +//! MPEG-4 Generic mode. + +use std::str::FromStr; + +#[derive(thiserror::Error, Debug, PartialEq, Eq)] +pub enum ModeError { + #[error("sizelength & constantsize can't be both defined")] + BothAuSizeLenAndConstantSize, + + #[error("Neither sizelength nor constantsize are defined, need at least one of them")] + NeitherAuSizeLenNorConstantSize, + + #[error("indexlength > 0 but indexdeltalength not defined")] + MandatoryIndexDeltaLength, +} + +#[derive(Debug, Default)] +pub struct ModeConfig { + pub(crate) size_len: u8, + pub(crate) index_len: u8, + pub(crate) index_delta_len: u8, + pub(crate) cts_delta_len: u8, + pub(crate) dts_delta_len: u8, + pub(crate) random_access_indication: bool, + pub(crate) stream_state_indication: u8, + pub(crate) auxiliary_data_size_len: u8, + pub(crate) constant_size: u32, + pub(crate) constant_duration: u32, + pub(crate) max_displacement: u32, +} + +impl ModeConfig { + #[inline] + pub fn has_header_section(&self) -> bool { + self.size_len > 0 + || self.index_len > 0 + || self.index_delta_len > 0 + || self.cts_delta_len > 0 + || self.dts_delta_len > 0 + || self.random_access_indication + || self.stream_state_indication > 0 + } + + #[inline] + pub fn has_auxiliary_section(&self) -> bool { + self.auxiliary_data_size_len > 0 + } + + #[inline] + pub fn constant_duration(&self) -> Option { + if self.constant_duration == 0 { + return None; + } + + Some(self.constant_duration) + } + + #[inline] + pub fn max_displacement(&self) -> Option { + if self.max_displacement == 0 { + return None; + } + + Some(self.max_displacement) + } + + pub fn from_caps(s: &gst::StructureRef) -> anyhow::Result { + use ModeError::*; + + // These values are optional and have a default value of 0 (no header) + + let size_len = Self::parse_int::(s, "sizelength")?; + let constant_size = Self::parse_int::(s, "constantsize")?; + + if size_len != 0 && constant_size != 0 { + Err(BothAuSizeLenAndConstantSize)?; + } + + if size_len == 0 && constant_size == 0 { + Err(NeitherAuSizeLenNorConstantSize)?; + } + + // § 3.2.1 + // > If the AU-Index field is present in the first AU-header in the AU + // > Header Section, then the AU-Index-delta field MUST be present in + // > any subsequent (non-first) AU-header. + + let index_len = Self::parse_int::(s, "indexlength")?; + let index_delta_len = Self::parse_int::(s, "indexdeltalength")?; + + if index_len > 0 && index_delta_len == 0 { + Err(MandatoryIndexDeltaLength)?; + } + + // TODO check mode & mode_config conformity + + Ok(ModeConfig { + size_len, + index_len, + index_delta_len, + cts_delta_len: Self::parse_int::(s, "ctsdeltalength")?, + dts_delta_len: Self::parse_int::(s, "dtsdeltalength")?, + random_access_indication: Self::parse_int::(s, "randomaccessindication")? > 0, + stream_state_indication: Self::parse_int::(s, "streamstateindication")?, + auxiliary_data_size_len: Self::parse_int::(s, "auxiliarydatasizelength")?, + constant_size, + constant_duration: Self::parse_int::(s, "constantduration")?, + max_displacement: Self::parse_int::(s, "maxdisplacement")?, + }) + } + + /// Tries to read the `field` from the provided structure as an integer of type `T`. + /// + /// Returns: + /// + /// * `Ok(val)` if the field is present and its value could be parsed. + /// * `Ok(0)` if the field is not present. + /// * `Err(_)` otherwise. + fn parse_int<'a, T>(s: &'a gst::StructureRef, field: &'static str) -> anyhow::Result + where + T: TryFrom + FromStr + gst::glib::value::FromValue<'a>, + >::Error: std::error::Error + Send + Sync + 'static, + ::Err: std::error::Error + Send + Sync + 'static, + { + use anyhow::Context; + use gst::structure::GetError::*; + + match s.get::(field) { + Ok(val) => Ok(val), + Err(FieldNotFound { .. }) => Ok(T::try_from(0i32).unwrap()), + Err(ValueGetError { .. }) => match s.get::(field) { + Ok(val) => Ok(T::try_from(val).context(field)?), + Err(_) => Ok(s + .get::<&str>(field) + .context(field)? + .parse::() + .context(field)?), + }, + } + } +} diff --git a/net/rtp/src/utils.rs b/net/rtp/src/utils.rs index 5215f829..c6edda95 100644 --- a/net/rtp/src/utils.rs +++ b/net/rtp/src/utils.rs @@ -1,7 +1,7 @@ /// Computes the seqnum distance /// /// This makes sense if both seqnums are in the same cycle. -pub(crate) fn seqnum_distance(seqnum1: u16, seqnum2: u16) -> i16 { +pub fn seqnum_distance(seqnum1: u16, seqnum2: u16) -> i16 { // See http://en.wikipedia.org/wiki/Serial_number_arithmetic let seqnum1 = i16::from_ne_bytes(seqnum1.to_ne_bytes()); @@ -10,9 +10,274 @@ pub(crate) fn seqnum_distance(seqnum1: u16, seqnum2: u16) -> i16 { seqnum1.wrapping_sub(seqnum2) } +/// Converts a raw two's complement value of len `bit_len` into an i32. +/// +/// # Panic +/// +/// Panics if `bit_len` > 32. +pub fn raw_2_comp_to_i32(val: u32, bit_len: u8) -> i32 { + assert!(bit_len <= 32); + + if val < 1u32 << (bit_len - 1) as u32 { + // val is positive + val as i32 + } else { + ((0x1_0000_0000 - (1u64 << bit_len)) as u32 + val) as i32 + } +} + +/// Defines a comparable new type `$typ` on a `[std::num::Wrapping]::`. +/// +/// The new type will wrap-around on additions and substractions and it comparison +/// operators take the wrapping in consideration. +/// +/// The comparison algorithm uses [serial number arithmetic](serial-number-arithmetic). +/// The limit being that it can't tell whether 0x8000_0000 is greater or less than 0. +/// +/// # Examples +/// +/// ```rust +/// # use gstrsrtp::define_wrapping_comparable_u32; +/// +/// /// Error type to return when comparing 0x8000_0000 to 0. +/// struct RTPTimestampComparisonLimit; +/// +/// /// Define the new type comparable and wrapping `u32` `RTPTimestamp`: +/// define_wrapping_comparable_u32!(RTPTimestamp, RTPTimestampComparisonLimit); +/// +/// let ts0 = RTPTimestamp::ZERO; +/// assert!(ts0.is_zero()); +/// +/// let mut ts = ts0; +/// ts += 1; +/// assert_eq!(*ts, 1); +/// assert_eq!(RTPTimestamp::MAX + ts, ts0); +/// +/// let ts2: RTPTimestamp = 2.into(); +/// assert_eq!(*ts2, 2); +/// assert_eq!(ts - ts2, RTPTimestamp::MAX); +/// ``` +/// +/// [serial-number-arithmetic]: http://en.wikipedia.org/wiki/Serial_number_arithmetic +#[macro_export] +macro_rules! define_wrapping_comparable_u32 { + ($typ:ident) => { + #[derive(Clone, Copy, Debug, Default)] + pub struct $typ(std::num::Wrapping); + + impl $typ { + pub const ZERO: $typ = $typ(std::num::Wrapping(0)); + pub const MIN: $typ = $typ(std::num::Wrapping(u32::MIN)); + pub const MAX: $typ = $typ(std::num::Wrapping(u32::MAX)); + pub const NONE: Option<$typ> = None; + + #[inline] + pub const fn new(val: u32) -> Self { + Self(std::num::Wrapping((val))) + } + + #[inline] + pub fn from_ext(ext_val: u64) -> Self { + Self(std::num::Wrapping((ext_val & 0xffff_ffff) as u32)) + } + + #[inline] + pub fn is_zero(self) -> bool { + self.0 .0 == 0 + } + + #[inline] + pub fn distance(self, other: Self) -> Option { + self.distance_u32(other.0 .0) + } + + #[inline] + pub fn distance_u32(self, other: u32) -> Option { + // See http://en.wikipedia.org/wiki/Serial_number_arithmetic + + let this = i32::from_ne_bytes(self.0 .0.to_ne_bytes()); + let other = i32::from_ne_bytes(other.to_ne_bytes()); + + match this.wrapping_sub(other) { + -0x8000_0000 => { + // This is the limit of the algorithm: + // arguments are too far away to determine the result sign, + // i.e. which one is greater than the other + None + } + delta => Some(delta), + } + } + } + + impl From for $typ { + fn from(value: u32) -> Self { + Self(std::num::Wrapping(value)) + } + } + + impl From<$typ> for u32 { + fn from(value: $typ) -> Self { + value.0 .0 + } + } + + impl std::ops::Deref for $typ { + type Target = u32; + + fn deref(&self) -> &u32 { + &self.0 .0 + } + } + + impl std::ops::Add for $typ { + type Output = Self; + fn add(self, rhs: Self) -> Self { + Self(self.0.add(rhs.0)) + } + } + + impl std::ops::Add for $typ { + type Output = Self; + fn add(self, rhs: u32) -> Self { + Self(self.0.add(std::num::Wrapping(rhs))) + } + } + + impl std::ops::Add for $typ { + type Output = Self; + fn add(self, rhs: i32) -> Self { + // See http://en.wikipedia.org/wiki/Serial_number_arithmetic + + let this = i32::from_ne_bytes(self.0 .0.to_ne_bytes()); + let res = this.wrapping_add(rhs); + + let res = u32::from_ne_bytes(res.to_ne_bytes()); + Self(std::num::Wrapping(res)) + } + } + + impl std::ops::AddAssign for $typ { + fn add_assign(&mut self, rhs: Self) { + self.0.add_assign(rhs.0); + } + } + + impl std::ops::AddAssign for $typ { + fn add_assign(&mut self, rhs: u32) { + self.0.add_assign(std::num::Wrapping(rhs)); + } + } + + impl std::ops::AddAssign for $typ { + fn add_assign(&mut self, rhs: i32) { + *self = *self + rhs; + } + } + + impl std::ops::Sub for $typ { + type Output = Self; + fn sub(self, rhs: Self) -> Self { + self.sub(rhs.0 .0) + } + } + + impl std::ops::Sub for $typ { + type Output = Self; + fn sub(self, rhs: u32) -> Self { + Self(self.0.sub(std::num::Wrapping(rhs))) + } + } + + impl std::ops::SubAssign for $typ { + fn sub_assign(&mut self, rhs: Self) { + self.sub_assign(rhs.0 .0); + } + } + + impl std::ops::SubAssign for $typ { + fn sub_assign(&mut self, rhs: u32) { + self.0.sub_assign(std::num::Wrapping(rhs)); + } + } + + impl std::cmp::PartialEq for $typ { + fn eq(&self, other: &Self) -> bool { + self.0 .0 == other.0 .0 + } + } + + impl std::cmp::PartialEq for $typ { + fn eq(&self, other: &u32) -> bool { + self.0 .0 == *other + } + } + + impl std::cmp::Eq for $typ {} + + impl std::cmp::PartialOrd for $typ { + fn partial_cmp(&self, other: &Self) -> Option { + self.distance(*other).map(|d| d.cmp(&0)) + } + } + + impl gst::prelude::OptionOperations for $typ {} + }; + + ($typ:ident, $comp_err_type:ident) => { + define_wrapping_comparable_u32!($typ); + + impl $typ { + #[inline] + pub fn try_cmp(&self, other: $typ) -> Result { + self.partial_cmp(&other).ok_or($comp_err_type) + } + } + }; + + ($typ:ident, $err_enum:ty, $comp_err_variant:ident) => { + define_wrapping_comparable_u32!($typ); + + impl $typ { + #[inline] + pub fn try_cmp(&self, other: $typ) -> Result { + self.partial_cmp(&other) + .ok_or(<$err_enum>::$comp_err_variant) + } + } + }; +} + +#[macro_export] +macro_rules! define_wrapping_comparable_u32_with_display { + ($typ:ident, impl) => { + impl std::fmt::Display for $typ { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{}", self.0 .0)) + } + } + }; + + ($typ:ident) => { + define_wrapping_comparable_u32!($typ); + define_wrapping_comparable_u32_with_display!($typ, impl); + }; + + ($typ:ident, $comp_err_type:ty) => { + define_wrapping_comparable_u32!($typ, $comp_err_type); + define_wrapping_comparable_u32_with_display!($typ, impl); + }; + + ($typ:ident, $err_enum:ty, $comp_err_variant:ident,) => { + define_wrapping_comparable_u32!($typ, $err_enum, $comp_err_variant); + define_wrapping_comparable_u32_with_display!($typ, impl); + }; +} + #[cfg(test)] mod tests { use super::*; + define_wrapping_comparable_u32!(MyWrapper); #[test] fn compare_seqnums() { @@ -30,4 +295,185 @@ mod tests { assert_eq!(seqnum_distance(0x8000, 0), -0x8000); assert_eq!(seqnum_distance(0, 0x8000), -0x8000); } + + #[test] + fn raw_2_comp_12bits_to_i32() { + const BITS: u8 = 12; + assert_eq!(raw_2_comp_to_i32(0, BITS), 0); + assert_eq!(raw_2_comp_to_i32(1, BITS), 1); + assert_eq!(raw_2_comp_to_i32(2, BITS), 2); + assert_eq!(raw_2_comp_to_i32(0xfff, BITS), -1i16 as i32); + assert_eq!(raw_2_comp_to_i32(0xffe, BITS), -2i16 as i32); + assert_eq!(raw_2_comp_to_i32(0x7ff, BITS), (1 << (BITS - 1)) - 1); + assert_eq!(raw_2_comp_to_i32(0x800, BITS), -(1 << (BITS - 1))); + } + + #[test] + fn raw_2_comp_16bits_to_i32() { + const BITS: u8 = i16::BITS as u8; + assert_eq!(raw_2_comp_to_i32(0, BITS), 0); + assert_eq!(raw_2_comp_to_i32(1, BITS), 1); + assert_eq!(raw_2_comp_to_i32(2, BITS), 2); + assert_eq!(raw_2_comp_to_i32(0xffff, BITS), -1i16 as i32); + assert_eq!(raw_2_comp_to_i32(0xfffe, BITS), -2i16 as i32); + assert_eq!(raw_2_comp_to_i32(0x7fff, BITS), i16::MAX as i32); + assert_eq!(raw_2_comp_to_i32(0x8000, BITS), i16::MIN as i32); + } + + #[test] + fn raw_2_comp_32bits_to_i32() { + const BITS: u8 = i32::BITS as u8; + assert_eq!(raw_2_comp_to_i32(0, BITS), 0); + assert_eq!(raw_2_comp_to_i32(1, BITS), 1); + assert_eq!(raw_2_comp_to_i32(2, BITS), 2); + assert_eq!(raw_2_comp_to_i32(0xffff_ffff, BITS), -1i16 as i32); + assert_eq!(raw_2_comp_to_i32(0xffff_fffe, BITS), -2i16 as i32); + assert_eq!(raw_2_comp_to_i32(0x7fff_ffff, BITS), i32::MAX); + assert_eq!(raw_2_comp_to_i32(0x8000_0000, BITS), i32::MIN); + } + + #[test] + fn wrapping_u32_basics() { + let zero = MyWrapper::ZERO; + let one = MyWrapper::from(1); + let two = MyWrapper::from(2); + + assert_eq!(u32::from(zero), 0); + assert!(zero.is_zero()); + assert_eq!(u32::from(one), 1); + assert_eq!(u32::from(two), 2); + + let max_plus_1_u64 = MyWrapper::from_ext((u32::MAX as u64) + 1); + assert_eq!(max_plus_1_u64, MyWrapper::ZERO); + } + + #[test] + fn add_wrapping_u32() { + let one = MyWrapper::from(1); + let two = MyWrapper::from(2); + + assert_eq!(MyWrapper::ZERO + one, one); + assert_eq!(MyWrapper::ZERO + 1u32, one); + assert_eq!(one + one, two); + assert_eq!(one + 1u32, two); + + assert_eq!(MyWrapper::MAX + MyWrapper::ZERO, MyWrapper::MAX); + assert_eq!(MyWrapper::MAX + one, MyWrapper::ZERO); + assert_eq!(MyWrapper::MAX + two, one); + + let mut var = MyWrapper::ZERO; + assert!(var.is_zero()); + var += 1; + assert_eq!(var, one); + var += one; + assert_eq!(var, two); + + let mut var = MyWrapper::MAX; + var += 1; + assert!(var.is_zero()); + var += one; + assert_eq!(var, one); + } + + #[test] + fn add_wrapping_u32_i32() { + let one = MyWrapper::from(1); + + assert_eq!(MyWrapper::ZERO + 1i32, one); + assert_eq!(MyWrapper::ZERO + -1i32, MyWrapper::MAX); + assert_eq!(MyWrapper::MAX + 1i32, MyWrapper::ZERO); + assert_eq!(MyWrapper::MAX + 2i32, one); + + assert_eq!( + MyWrapper::from(0x8000_0000) + -0i32, + MyWrapper::from(0x8000_0000) + ); + assert_eq!( + MyWrapper::from(0x8000_0000) + 1i32, + MyWrapper::from(0x8000_0001) + ); + assert_eq!( + MyWrapper::from(0x8000_0000) + -1i32, + MyWrapper::from(0x7fff_ffff) + ); + assert_eq!( + MyWrapper::from(0x7fff_ffff) + 1i32, + MyWrapper::from(0x8000_0000) + ); + assert_eq!(MyWrapper::ZERO + i32::MIN, MyWrapper::from(0x8000_0000)); + + let mut var = MyWrapper::ZERO; + var += 1i32; + assert_eq!(var, one); + + let mut var = MyWrapper::ZERO; + var += -1i32; + assert_eq!(var, MyWrapper::MAX); + + let mut var = MyWrapper::MAX; + var += 1; + assert_eq!(var, MyWrapper::ZERO); + } + + #[test] + fn sub_wrapping_u32() { + let one = MyWrapper::from(1); + + assert_eq!(MyWrapper::ZERO - MyWrapper::ZERO, MyWrapper::ZERO); + assert_eq!(MyWrapper::MAX - MyWrapper::MAX, MyWrapper::ZERO); + assert_eq!(MyWrapper::ZERO - one, MyWrapper::MAX); + assert_eq!(MyWrapper::ZERO - MyWrapper::MAX, one); + assert_eq!( + MyWrapper::ZERO - MyWrapper::from(0x8000_0000), + MyWrapper::from(0x8000_0000) + ); + assert_eq!( + MyWrapper::from(0x8000_0000) - MyWrapper::ZERO, + MyWrapper::from(0x8000_0000) + ); + + let mut var = MyWrapper::ZERO; + assert!(var.is_zero()); + var -= 1; + assert_eq!(var, MyWrapper::MAX); + + let mut var = MyWrapper::MAX; + var -= MyWrapper::MAX; + assert!(var.is_zero()); + } + + #[test] + fn compare_wrapping_u32() { + use std::cmp::Ordering::*; + + #[derive(Debug, PartialEq)] + pub struct ComparisonLimit; + define_wrapping_comparable_u32!(MyWrapper, ComparisonLimit); + + let cmp = |a: u32, b: u32| MyWrapper::from(a).partial_cmp(&MyWrapper::from(b)); + let try_cmp = |a: u32, b: u32| MyWrapper::from(a).try_cmp(MyWrapper::from(b)); + + assert_eq!(cmp(0, 1).unwrap(), Less); + assert_eq!(try_cmp(0, 1), Ok(Less)); + assert_eq!(cmp(1, 1).unwrap(), Equal); + assert_eq!(try_cmp(1, 1), Ok(Equal)); + assert_eq!(cmp(1, 0).unwrap(), Greater); + assert_eq!(try_cmp(1, 0), Ok(Greater)); + + assert_eq!(cmp(0x7fff_ffff, 0).unwrap(), Greater); + assert_eq!(try_cmp(0x7fff_ffff, 0), Ok(Greater)); + assert_eq!(cmp(0xffff_ffff, 0).unwrap(), Less); + assert_eq!(try_cmp(0xffff_ffff, 0), Ok(Less)); + + assert_eq!(cmp(0, 0x7fff_ffff).unwrap(), Less); + assert_eq!(try_cmp(0, 0x7fff_ffff), Ok(Less)); + assert_eq!(cmp(0, 0xffff_ffff).unwrap(), Greater); + assert_eq!(try_cmp(0, 0xffff_ffff), Ok(Greater)); + + // This is the limit of the algorithm: + assert!(cmp(0x8000_0000, 0).is_none()); + assert!(cmp(0, 0x8000_0000).is_none()); + assert_eq!(try_cmp(0x8000_0000, 0), Err(ComparisonLimit)); + assert_eq!(try_cmp(0, 0x8000_0000), Err(ComparisonLimit)); + } }