diff --git a/net/ndi/Cargo.toml b/net/ndi/Cargo.toml
index 5fee6b15..b35c994a 100644
--- a/net/ndi/Cargo.toml
+++ b/net/ndi/Cargo.toml
@@ -9,11 +9,11 @@ edition = "2021"
 rust-version = "1.70"
 
 [dependencies]
-glib = { git = "https://github.com/gtk-rs/gtk-rs-core"}
-gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
-gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
-gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
-gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+glib = { git = "https://github.com/gtk-rs/gtk-rs-core" }
+gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] }
+gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] }
+gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] }
+gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] }
 anyhow = "1.0"
 byte-slice-cast = "1"
 byteorder = "1.0"
@@ -28,8 +28,7 @@ thiserror = "1.0"
 gst-plugin-version-helper = { path = "../../version-helper" }
 
 [features]
-default = ["interlaced-fields", "sink"]
-interlaced-fields = ["gst/v1_16", "gst-video/v1_16"]
+default = ["sink"]
 sink = ["gst/v1_18", "gst-base/v1_18"]
 advanced-sdk = []
 static = []
diff --git a/net/ndi/src/lib.rs b/net/ndi/src/lib.rs
index abed3688..4def1f96 100644
--- a/net/ndi/src/lib.rs
+++ b/net/ndi/src/lib.rs
@@ -32,10 +32,11 @@ use gst::prelude::*;
 
 use gst::glib::once_cell::sync::Lazy;
 
-#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum, Default)]
 #[repr(u32)]
 #[enum_type(name = "GstNdiTimestampMode")]
 pub enum TimestampMode {
+    #[default]
     #[enum_value(name = "Auto", nick = "auto")]
     Auto = 0,
     #[enum_value(name = "Receive Time / Timecode", nick = "receive-time-vs-timecode")]
diff --git a/net/ndi/src/ndi.rs b/net/ndi/src/ndi.rs
index fa0131fa..0ea4d026 100644
--- a/net/ndi/src/ndi.rs
+++ b/net/ndi/src/ndi.rs
@@ -257,7 +257,7 @@ impl<'a> RecvBuilder<'a> {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 struct RecvInstancePtr(ptr::NonNull<::std::os::raw::c_void>);
 
 impl Drop for RecvInstancePtr {
@@ -836,13 +836,11 @@ impl VideoFrame {
                 NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
             }
             // FIXME: Is this correct?
-            #[cfg(feature = "interlaced-fields")]
             gst_video::VideoInterlaceMode::Alternate
                 if frame.flags().contains(gst_video::VideoFrameFlags::TFF) =>
             {
                 NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_0
             }
-            #[cfg(feature = "interlaced-fields")]
             gst_video::VideoInterlaceMode::Alternate
                 if !frame.flags().contains(gst_video::VideoFrameFlags::TFF) =>
             {
diff --git a/net/ndi/src/ndi_cc_meta.rs b/net/ndi/src/ndi_cc_meta.rs
index 4c83a325..120c183b 100644
--- a/net/ndi/src/ndi_cc_meta.rs
+++ b/net/ndi/src/ndi_cc_meta.rs
@@ -268,12 +268,11 @@ impl NDICCMetaDecoder {
     /// Decodes the provided NDI metadata string, searching for NDI closed captions
     /// and add them as `VideoCaptionMeta` to the provided `gst::Buffer`.
-    pub fn decode(&mut self, input: &str, buffer: &mut gst::Buffer) -> Result<()> {
+    pub fn decode(&mut self, input: &str) -> Result<Vec<gst_video::VideoAncillary>> {
         use quick_xml::events::Event;
         use quick_xml::reader::Reader;
 
-        let buffer = buffer.get_mut().unwrap();
-
+        let mut captions = Vec::new();
         let mut reader = Reader::from_str(input);
 
         self.xml_buf.clear();
@@ -293,11 +292,7 @@ impl NDICCMetaDecoder {
                         Ok(v210_buf) => match self.parse_for_cea608(&v210_buf) {
                             Ok(None) => (),
                             Ok(Some(anc)) => {
-                                gst_video::VideoCaptionMeta::add(
-                                    buffer,
-                                    gst_video::VideoCaptionType::Cea608S3341a,
-                                    anc.data(),
-                                );
+                                captions.push(anc);
                             }
                             Err(err) => {
                                 gst::error!(CAT, "Failed to parse NDI C608 metadata: {err}");
@@ -311,11 +306,7 @@ impl NDICCMetaDecoder {
                         Ok(v210_buf) => match self.parse_for_cea708(&v210_buf) {
                             Ok(None) => (),
                             Ok(Some(anc)) => {
-                                gst_video::VideoCaptionMeta::add(
-                                    buffer,
-                                    gst_video::VideoCaptionType::Cea708Cdp,
-                                    anc.data(),
-                                );
+                                captions.push(anc);
                             }
                             Err(err) => {
                                 gst::error!(CAT, "Failed to parse NDI C708 metadata: {err}");
@@ -333,7 +324,7 @@ impl NDICCMetaDecoder {
             self.xml_buf.clear();
         }
 
-        Ok(())
+        Ok(captions)
     }
 
     fn parse_for_cea608(&mut self, input: &[u8]) -> Result<Option<gst_video::VideoAncillary>> {
@@ -510,39 +501,36 @@ mod tests {
     fn decode_ndi_meta_c608() {
         gst::init().unwrap();
 
-        let mut buf = gst::Buffer::new();
         let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
-        ndi_cc_decoder
-            .decode(
-                "<C608>AAAAAP8D8D8AhAUAAgEwIAAABgCUAcASAJgKAAAAAAA=</C608>",
-                &mut buf,
-            )
+        let captions = ndi_cc_decoder
+            .decode("<C608>AAAAAP8D8D8AhAUAAgEwIAAABgCUAcASAJgKAAAAAAA=</C608>")
             .unwrap();
 
-        let mut cc_meta_iter = buf.iter_meta::<VideoCaptionMeta>();
-        let cc_meta = cc_meta_iter.next().unwrap();
-        assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea608S3341a);
-        assert_eq!(cc_meta.data(), [0x80, 0x94, 0x2c]);
-        assert!(cc_meta_iter.next().is_none());
+        assert_eq!(captions.len(), 1);
+        assert_eq!(
+            captions[0].did16(),
+            gst_video::VideoAncillaryDID16::S334Eia608
+        );
+        assert_eq!(captions[0].data(), [0x80, 0x94, 0x2c]);
     }
 
     #[test]
     fn decode_ndi_meta_c708() {
         gst::init().unwrap();
 
-        let mut buf = gst::Buffer::new();
         let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
-        ndi_cc_decoder.decode(
+        let captions = ndi_cc_decoder.decode(
            "<C708>AAAAAP8D8D8AhAUAAQFQJQBYCgBpAlAlAPwIAEMBACAAAAgAcgKAHwDwCwCUAcASAOQLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADQCQAAAgAgAGwIALcCAAAAAAAAAAAAAA==</C708>",
-            &mut buf,
         )
         .unwrap();
 
-        let mut cc_meta_iter = buf.iter_meta::<VideoCaptionMeta>();
-        let cc_meta = cc_meta_iter.next().unwrap();
-        assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea708Cdp);
+        assert_eq!(captions.len(), 1);
         assert_eq!(
-            cc_meta.data(),
+            captions[0].did16(),
+            gst_video::VideoAncillaryDID16::S334Eia708
+        );
+        assert_eq!(
+            captions[0].data(),
             [
                 0x96, 0x69, 0x55, 0x3f, 0x43, 0x00, 0x00, 0x72, 0xf8, 0xfc, 0x94, 0x2c, 0xf9, 0x00,
                 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa,
@@ -553,16 +541,14 @@ mod tests {
                 0x1b,
             ]
         );
-        assert!(cc_meta_iter.next().is_none());
     }
 
     #[test]
     fn decode_ndi_meta_c708_newlines_and_indent() {
         gst::init().unwrap();
 
-        let mut buf = gst::Buffer::new();
         let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
-        ndi_cc_decoder
+        let captions = ndi_cc_decoder
             .decode(
                 r#"<C708>
 AAAAAP8D8D8AhAUAAQFQJQBYCgBpAlAlAPwIAEMBACAAAAgAcgKAHwDwCwCUAcASAOQ
 7wsAlAHAEgDkCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA
 6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA
 6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADQCQAAAgAgAGwIALcCAAAAAAA
 AAAAAAA==</C708>
 "#,
-                &mut buf,
             )
             .unwrap();
 
-        let mut cc_meta_iter = buf.iter_meta::<VideoCaptionMeta>();
-        let cc_meta = cc_meta_iter.next().unwrap();
-        assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea708Cdp);
+        assert_eq!(captions.len(), 1);
         assert_eq!(
-            cc_meta.data(),
+            captions[0].did16(),
+            gst_video::VideoAncillaryDID16::S334Eia708
+        );
+        assert_eq!(
+            captions[0].data(),
             [
                 0x96, 0x69, 0x55, 0x3f, 0x43, 0x00, 0x00, 0x72, 0xf8, 0xfc, 0x94, 0x2c, 0xf9, 0x00,
                 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa,
@@ -591,51 +578,49 @@ mod tests {
                 0x1b,
             ]
         );
-        assert!(cc_meta_iter.next().is_none());
     }
 
     #[test]
     fn decode_ndi_meta_c608_newlines_spaces_inline() {
         gst::init().unwrap();
 
-        let mut buf = gst::Buffer::new();
         let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
-        ndi_cc_decoder.decode(
+        let captions = ndi_cc_decoder.decode(
             "<C608>\n\tAAAAAP8D8\n\n\r D8AhAUA\r\n\tAgEwIAAABgCUAcASAJgKAAAAAAA= </C608>\n",
-            &mut buf,
         )
         .unwrap();
 
-        let mut cc_meta_iter = buf.iter_meta::<VideoCaptionMeta>();
-        let cc_meta = cc_meta_iter.next().unwrap();
-        assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea608S3341a);
-        assert_eq!(cc_meta.data(), [0x80, 0x94, 0x2c]);
-
-        assert!(cc_meta_iter.next().is_none());
+        assert_eq!(captions.len(), 1);
+        assert_eq!(
+            captions[0].did16(),
+            gst_video::VideoAncillaryDID16::S334Eia608
+        );
+        assert_eq!(captions[0].data(), [0x80, 0x94, 0x2c]);
     }
 
     #[test]
     fn decode_ndi_meta_c608_and_c708() {
         gst::init().unwrap();
 
-        let mut buf = gst::Buffer::new();
         let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
-        ndi_cc_decoder.decode(
+        let captions = ndi_cc_decoder.decode(
            "<C608>AAAAAP8D8D8AhAUAAgEwIAAABgCUAcASAJgKAAAAAAA=</C608><C708>AAAAAP8D8D8AhAUAAQFQJQBYCgBpAlAlAPwIAEMBACAAAAgAcgKAHwDwCwCUAcASAOQLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADoCwAAAgAgAOgLAAACACAA6AsAAAIAIADQCQAAAgAgAGwIALcCAAAAAAAAAAAAAA==</C708>",
-            &mut buf,
         )
         .unwrap();
 
-        let mut cc_meta_iter = buf.iter_meta::<VideoCaptionMeta>();
-
-        let cc_meta = cc_meta_iter.next().unwrap();
-        assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea608S3341a);
-        assert_eq!(cc_meta.data(), [0x80, 0x94, 0x2c]);
-
-        let cc_meta = cc_meta_iter.next().unwrap();
-        assert_eq!(cc_meta.caption_type(), VideoCaptionType::Cea708Cdp);
+        assert_eq!(captions.len(), 2);
         assert_eq!(
-            cc_meta.data(),
+            captions[0].did16(),
+            gst_video::VideoAncillaryDID16::S334Eia608
+        );
+        assert_eq!(captions[0].data(), [0x80, 0x94, 0x2c]);
+
+        assert_eq!(
+            captions[1].did16(),
+            gst_video::VideoAncillaryDID16::S334Eia708
+        );
+        assert_eq!(
+            captions[1].data(),
             [
                 0x96, 0x69, 0x55, 0x3f, 0x43, 0x00, 0x00, 0x72, 0xf8, 0xfc, 0x94, 0x2c, 0xf9, 0x00,
                 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa, 0x00, 0x00, 0xfa,
@@ -646,8 +631,6 @@ mod tests {
                 0x1b,
             ]
         );
-
-        assert!(cc_meta_iter.next().is_none());
     }
 
     #[test]
@@ -655,13 +638,9 @@ mod tests {
         gst::init().unwrap();
 
         // Expecting </C608> found </C708>
-        let mut buf = gst::Buffer::new();
         let mut ndi_cc_decoder = NDICCMetaDecoder::new(1920);
         ndi_cc_decoder
-            .decode(
-                "<C608>AAAAAP8D8D8AhAUAAgEwIAAABgCUAcASAJgKAAAAAAA=</C708>",
-                &mut buf,
-            )
+            .decode("<C608>AAAAAP8D8D8AhAUAAgEwIAAABgCUAcASAJgKAAAAAAA=</C708>")
             .unwrap_err();
     }
 }
diff --git a/net/ndi/src/ndisrc/imp.rs b/net/ndi/src/ndisrc/imp.rs
index 04c06e3e..b2bdea7a 100644
--- a/net/ndi/src/ndisrc/imp.rs
+++ b/net/ndi/src/ndisrc/imp.rs
@@ -11,12 +11,13 @@
 use std::u32;
 
 use gst::glib::once_cell::sync::Lazy;
 
+use crate::ndisrcmeta::NdiSrcMeta;
 use crate::ndisys;
 use crate::RecvColorFormat;
 use crate::TimestampMode;
 
-use super::receiver::{self, Buffer, Receiver, ReceiverControlHandle, ReceiverItem};
-use crate::ndisrcmeta;
+use super::receiver::{Receiver, ReceiverControlHandle, ReceiverItem};
+use crate::ndisrcmeta::Buffer;
 
 static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
     gst::DebugCategory::new(
         "ndisrc",
@@ -63,26 +64,11 @@ impl Default for Settings {
     }
 }
 
+#[derive(Default)]
 struct State {
-    video_info: Option<receiver::VideoInfo>,
-    video_caps: Option<gst::Caps>,
-    audio_info: Option<receiver::AudioInfo>,
-    audio_caps: Option<gst::Caps>,
-    current_latency: Option<gst::ClockTime>,
     receiver: Option<Receiver>,
-}
-
-impl Default for State {
-    fn default() -> State {
-        State {
-            video_info: None,
-            video_caps: None,
-            audio_info: None,
-            audio_caps: None,
-            current_latency: gst::ClockTime::NONE,
-            receiver: None,
-        }
-    }
+    timestamp_mode: TimestampMode,
+    current_latency: Option<gst::ClockTime>,
 }
 
 pub struct NdiSrc {
@@ -447,7 +433,6 @@ impl BaseSrcImpl for NdiSrc {
                 settings.connect_timeout,
                 settings.bandwidth,
                 settings.color_format.into(),
-                settings.timestamp_mode,
                 settings.timeout,
                 settings.max_queue_length as usize,
             );
@@ -462,6 +447,7 @@ impl BaseSrcImpl for NdiSrc {
                 Some(receiver.receiver_control_handle());
             let mut state = self.state.lock().unwrap();
             state.receiver = Some(receiver);
+            state.timestamp_mode = settings.timestamp_mode;
 
             Ok(())
         }
@@ -537,72 +523,32 @@ impl BaseSrcImpl for NdiSrc {
         state.receiver = Some(recv);
 
         match res {
-            ReceiverItem::Buffer(buffer) => {
-                let buffer = match buffer {
-                    Buffer::Audio(mut buffer, info) => {
-                        if state.audio_info.as_ref() != Some(&info) {
-                            let caps = info.to_caps().map_err(|_| {
-                                gst::element_imp_error!(
-                                    self,
-                                    gst::ResourceError::Settings,
-                                    ["Invalid audio info received: {:?}", info]
-                                );
-                                gst::FlowError::NotNegotiated
-                            })?;
-                            state.audio_info = Some(info);
-                            state.audio_caps = Some(caps);
-                        }
+            ReceiverItem::Buffer(ndi_buffer) => {
+                let mut latency_changed = false;
 
-                        {
-                            let buffer = buffer.get_mut().unwrap();
-                            ndisrcmeta::NdiSrcMeta::add(
-                                buffer,
-                                ndisrcmeta::StreamType::Audio,
-                                state.audio_caps.as_ref().unwrap(),
-                            );
-                        }
+                if let Buffer::Video { ref frame, .. } = ndi_buffer {
+                    let duration = gst::ClockTime::SECOND
+                        .mul_div_floor(frame.frame_rate().1 as u64, frame.frame_rate().0 as u64);
 
-                        buffer
-                    }
-                    Buffer::Video(mut buffer, info) => {
-                        let mut latency_changed = false;
+                    latency_changed = state.current_latency != duration;
+                    state.current_latency = duration;
+                }
 
-                        if state.video_info.as_ref() != Some(&info) {
-                            let caps = info.to_caps().map_err(|_| {
-                                gst::element_imp_error!(
-                                    self,
-                                    gst::ResourceError::Settings,
-                                    ["Invalid video info received: {:?}", info]
-                                );
-                                gst::FlowError::NotNegotiated
-                            })?;
-                            state.video_info = Some(info);
-                            state.video_caps = Some(caps);
-                            latency_changed = state.current_latency != buffer.duration();
-                            state.current_latency = buffer.duration();
-                        }
+                let mut gst_buffer = gst::Buffer::new();
+                {
+                    let buffer_ref = gst_buffer.get_mut().unwrap();
+                    NdiSrcMeta::add(buffer_ref, ndi_buffer, state.timestamp_mode);
+                }
 
-                        {
-                            let buffer = buffer.get_mut().unwrap();
-                            ndisrcmeta::NdiSrcMeta::add(
-                                buffer,
-                                ndisrcmeta::StreamType::Video,
-                                state.video_caps.as_ref().unwrap(),
-                            );
-                        }
+                drop(state);
 
-                        drop(state);
-                        if latency_changed {
-                            let _ = self.obj().post_message(
-                                gst::message::Latency::builder().src(&*self.obj()).build(),
-                            );
-                        }
+                if latency_changed {
+                    let _ = self
+                        .obj()
+                        .post_message(gst::message::Latency::builder().src(&*self.obj()).build());
+                }
 
-                        buffer
-                    }
-                };
-
-                Ok(CreateSuccess::NewBuffer(buffer))
+                Ok(CreateSuccess::NewBuffer(gst_buffer))
             }
             ReceiverItem::Timeout => Err(gst::FlowError::Eos),
             ReceiverItem::Flushing => Err(gst::FlowError::Flushing),
diff --git a/net/ndi/src/ndisrc/receiver.rs b/net/ndi/src/ndisrc/receiver.rs
index 2dd9b1c8..0e4f93d1 100644
--- a/net/ndi/src/ndisrc/receiver.rs
+++ b/net/ndi/src/ndisrc/receiver.rs
@@ -2,25 +2,17 @@
 use glib::prelude::*;
 use gst::prelude::*;
-use gst_video::prelude::*;
 
-use byte_slice_cast::*;
-
-use std::cmp;
 use std::collections::VecDeque;
 use std::sync::{Arc, Condvar, Mutex, Weak};
 use std::thread;
 use std::time;
 
-use atomic_refcell::AtomicRefCell;
-
 use gst::glib::once_cell::sync::Lazy;
 
 use crate::ndi::*;
-use crate::ndi_cc_meta::NDICCMetaDecoder;
-use crate::ndisys;
+use crate::ndisrcmeta::Buffer;
 use crate::ndisys::*;
-use crate::TimestampMode;
 
 static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
     gst::DebugCategory::new(
@@ -32,170 +24,6 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
 
 pub struct Receiver(Arc<ReceiverInner>);
 
-#[derive(Debug, PartialEq, Eq)]
-#[allow(clippy::large_enum_variant)]
-pub enum AudioInfo {
-    Audio(gst_audio::AudioInfo),
-    #[cfg(feature = "advanced-sdk")]
-    #[allow(dead_code)]
-    Opus {
-        sample_rate: i32,
-        no_channels: i32,
-    },
-    #[cfg(feature = "advanced-sdk")]
-    Aac {
-        sample_rate: i32,
-        no_channels: i32,
-        codec_data: [u8; 2],
-    },
-}
-
-impl AudioInfo {
-    pub fn to_caps(&self) -> Result<gst::Caps, glib::BoolError> {
-        match self {
-            AudioInfo::Audio(ref info) => info.to_caps(),
-            #[cfg(feature = "advanced-sdk")]
-            AudioInfo::Opus {
-                sample_rate,
-                no_channels,
-            } => Ok(gst::Caps::builder("audio/x-opus")
-                .field("channels", *no_channels)
-                .field("rate", *sample_rate)
-                .field("channel-mapping-family", 0i32)
-                .build()),
-            #[cfg(feature = "advanced-sdk")]
-            AudioInfo::Aac {
-                sample_rate,
-                no_channels,
-                codec_data,
-            } => Ok(gst::Caps::builder("audio/mpeg")
-                .field("channels", *no_channels)
-                .field("rate", *sample_rate)
-                .field("mpegversion", 4i32)
-                .field("stream-format", "raw")
-                .field("codec_data", gst::Buffer::from_mut_slice(*codec_data))
-                .build()),
-        }
-    }
-}
-
-#[derive(Debug, PartialEq, Eq)]
-pub enum VideoInfo {
-    Video(gst_video::VideoInfo),
-    #[cfg(feature = "advanced-sdk")]
= "advanced-sdk")] - SpeedHQInfo { - variant: String, - xres: i32, - yres: i32, - fps_n: i32, - fps_d: i32, - par_n: i32, - par_d: i32, - interlace_mode: gst_video::VideoInterlaceMode, - }, - #[cfg(feature = "advanced-sdk")] - H264 { - xres: i32, - yres: i32, - fps_n: i32, - fps_d: i32, - par_n: i32, - par_d: i32, - interlace_mode: gst_video::VideoInterlaceMode, - }, - #[cfg(feature = "advanced-sdk")] - H265 { - xres: i32, - yres: i32, - fps_n: i32, - fps_d: i32, - par_n: i32, - par_d: i32, - interlace_mode: gst_video::VideoInterlaceMode, - }, -} - -impl VideoInfo { - pub fn to_caps(&self) -> Result { - match self { - VideoInfo::Video(ref info) => info.to_caps(), - #[cfg(feature = "advanced-sdk")] - VideoInfo::SpeedHQInfo { - ref variant, - xres, - yres, - fps_n, - fps_d, - par_n, - par_d, - interlace_mode, - } => Ok(gst::Caps::builder("video/x-speedhq") - .field("width", *xres) - .field("height", *yres) - .field("framerate", gst::Fraction::new(*fps_n, *fps_d)) - .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d)) - .field("interlace-mode", interlace_mode.to_str()) - .field("variant", variant) - .build()), - #[cfg(feature = "advanced-sdk")] - VideoInfo::H264 { - xres, - yres, - fps_n, - fps_d, - par_n, - par_d, - interlace_mode, - .. - } => Ok(gst::Caps::builder("video/x-h264") - .field("width", *xres) - .field("height", *yres) - .field("framerate", gst::Fraction::new(*fps_n, *fps_d)) - .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d)) - .field("interlace-mode", interlace_mode.to_str()) - .field("stream-format", "byte-stream") - .field("alignment", "au") - .build()), - #[cfg(feature = "advanced-sdk")] - VideoInfo::H265 { - xres, - yres, - fps_n, - fps_d, - par_n, - par_d, - interlace_mode, - .. - } => Ok(gst::Caps::builder("video/x-h265") - .field("width", *xres) - .field("height", *yres) - .field("framerate", gst::Fraction::new(*fps_n, *fps_d)) - .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d)) - .field("interlace-mode", interlace_mode.to_str()) - .field("stream-format", "byte-stream") - .field("alignment", "au") - .build()), - } - } - - pub fn width(&self) -> u32 { - match self { - VideoInfo::Video(ref info) => info.width(), - #[cfg(feature = "advanced-sdk")] - VideoInfo::SpeedHQInfo { xres, .. } - | VideoInfo::H264 { xres, .. } - | VideoInfo::H265 { xres, .. } => *xres as u32, - } - } -} - -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -pub enum Buffer { - Audio(gst::Buffer, AudioInfo), - Video(gst::Buffer, VideoInfo), -} - #[derive(Debug)] #[allow(clippy::large_enum_variant)] pub enum ReceiverItem { @@ -209,17 +37,11 @@ struct ReceiverInner { queue: ReceiverQueue, max_queue_length: usize, - // Audio/video time observations - observations_timestamp: [Observations; 2], - observations_timecode: [Observations; 2], - element: glib::WeakRef, - timestamp_mode: TimestampMode, timeout: u32, connect_timeout: u32, - ndi_cc_decoder: AtomicRefCell>, thread: Mutex>>, } @@ -245,305 +67,6 @@ struct ReceiverQueueInner { timeout: bool, } -const PREFILL_WINDOW_LENGTH: usize = 12; -const WINDOW_LENGTH: u64 = 512; -const WINDOW_DURATION: u64 = 2_000_000_000; - -#[derive(Default)] -struct Observations(AtomicRefCell); - -struct ObservationsInner { - base_remote_time: Option, - base_local_time: Option, - deltas: VecDeque, - min_delta: i64, - skew: i64, - filling: bool, - window_size: usize, - - // Remote/local times for workaround around fundamentally wrong slopes - // This is not reset below and has a bigger window. 
-    times: VecDeque<(u64, u64)>,
-    slope_correction: (u64, u64),
-}
-
-impl Default for ObservationsInner {
-    fn default() -> ObservationsInner {
-        ObservationsInner {
-            base_local_time: None,
-            base_remote_time: None,
-            deltas: VecDeque::new(),
-            min_delta: 0,
-            skew: 0,
-            filling: true,
-            window_size: 0,
-            times: VecDeque::new(),
-            slope_correction: (1, 1),
-        }
-    }
-}
-
-impl ObservationsInner {
-    fn reset(&mut self) {
-        self.base_local_time = None;
-        self.base_remote_time = None;
-        self.deltas = VecDeque::new();
-        self.min_delta = 0;
-        self.skew = 0;
-        self.filling = true;
-        self.window_size = 0;
-    }
-}
-
-impl Observations {
-    // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from
-    // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays":
-    // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
-    fn process(
-        &self,
-        element: &gst::Element,
-        remote_time: Option<gst::ClockTime>,
-        local_time: gst::ClockTime,
-        duration: Option<gst::ClockTime>,
-    ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
-        let remote_time = remote_time?.nseconds();
-        let local_time = local_time.nseconds();
-
-        let mut inner = self.0.borrow_mut();
-
-        gst::trace!(
-            CAT,
-            obj: element,
-            "Local time {}, remote time {}, slope correct {}/{}",
-            local_time.nseconds(),
-            remote_time.nseconds(),
-            inner.slope_correction.0,
-            inner.slope_correction.1,
-        );
-
-        inner.times.push_back((remote_time, local_time));
-        while inner
-            .times
-            .back()
-            .unwrap()
-            .1
-            .saturating_sub(inner.times.front().unwrap().1)
-            > WINDOW_DURATION
-        {
-            let _ = inner.times.pop_front();
-        }
-
-        // Static remote times
-        if inner.slope_correction.1 == 0 {
-            return None;
-        }
-
-        let remote_time =
-            remote_time.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
-
-        let (base_remote_time, base_local_time) =
-            match (inner.base_remote_time, inner.base_local_time) {
-                (Some(remote), Some(local)) => (remote, local),
-                _ => {
-                    gst::debug!(
-                        CAT,
-                        obj: element,
-                        "Initializing base time: local {}, remote {}",
-                        local_time.nseconds(),
-                        remote_time.nseconds(),
-                    );
-                    inner.base_remote_time = Some(remote_time);
-                    inner.base_local_time = Some(local_time);
-
-                    return Some((local_time.nseconds(), duration, true));
-                }
-            };
-
-        if inner.times.len() < PREFILL_WINDOW_LENGTH {
-            return Some((local_time.nseconds(), duration, false));
-        }
-
-        // Check if the slope is simply wrong and try correcting
-        {
-            let local_diff = inner
-                .times
-                .back()
-                .unwrap()
-                .1
-                .saturating_sub(inner.times.front().unwrap().1);
-            let remote_diff = inner
-                .times
-                .back()
-                .unwrap()
-                .0
-                .saturating_sub(inner.times.front().unwrap().0);
-
-            if remote_diff == 0 {
-                inner.reset();
-                inner.base_remote_time = Some(remote_time);
-                inner.base_local_time = Some(local_time);
-
-                // Static remote times
-                inner.slope_correction = (0, 0);
-                return None;
-            } else {
-                let slope = local_diff as f64 / remote_diff as f64;
-                let scaled_slope =
-                    slope * (inner.slope_correction.1 as f64) / (inner.slope_correction.0 as f64);
-
-                // Check for some obviously wrong slopes and try to correct for that
-                if !(0.5..1.5).contains(&scaled_slope) {
-                    gst::warning!(
-                        CAT,
-                        obj: element,
-                        "Too small/big slope {}, resetting",
-                        scaled_slope
-                    );
-
-                    let discont = !inner.deltas.is_empty();
-                    inner.reset();
-
-                    if (0.0005..0.0015).contains(&slope) {
-                        // Remote unit was actually 0.1ns
-                        inner.slope_correction = (1, 1000);
-                    } else if (0.005..0.015).contains(&slope) {
-                        // Remote unit was actually 1ns
-                        inner.slope_correction = (1, 100);
-                    } else if (0.05..0.15).contains(&slope) {
-                        // Remote unit was actually 10ns
-                        inner.slope_correction = (1, 10);
-                    } else if (5.0..15.0).contains(&slope) {
-                        // Remote unit was actually 1us
-                        inner.slope_correction = (10, 1);
-                    } else if (50.0..150.0).contains(&slope) {
-                        // Remote unit was actually 10us
-                        inner.slope_correction = (100, 1);
-                    } else if (500.0..1500.0).contains(&slope) {
-                        // Remote unit was actually 100us
-                        inner.slope_correction = (1000, 1);
-                    } else if (5000.0..15000.0).contains(&slope) {
-                        // Remote unit was actually 1ms
-                        inner.slope_correction = (10000, 1);
-                    } else {
-                        inner.slope_correction = (1, 1);
-                    }
-
-                    let remote_time = inner
-                        .times
-                        .back()
-                        .unwrap()
-                        .0
-                        .mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
-                    gst::debug!(
-                        CAT,
-                        obj: element,
-                        "Initializing base time: local {}, remote {}, slope correction {}/{}",
-                        local_time.nseconds(),
-                        remote_time.nseconds(),
-                        inner.slope_correction.0,
-                        inner.slope_correction.1,
-                    );
-                    inner.base_remote_time = Some(remote_time);
-                    inner.base_local_time = Some(local_time);
-
-                    return Some((local_time.nseconds(), duration, discont));
-                }
-            }
-        }
-
-        let remote_diff = remote_time.saturating_sub(base_remote_time);
-        let local_diff = local_time.saturating_sub(base_local_time);
-        let delta = (local_diff as i64) - (remote_diff as i64);
-
-        gst::trace!(
-            CAT,
-            obj: element,
-            "Local diff {}, remote diff {}, delta {}",
-            local_diff.nseconds(),
-            remote_diff.nseconds(),
-            delta,
-        );
-
-        if (delta > inner.skew && delta - inner.skew > 1_000_000_000)
-            || (delta < inner.skew && inner.skew - delta > 1_000_000_000)
-        {
-            gst::warning!(
-                CAT,
-                obj: element,
-                "Delta {} too far from skew {}, resetting",
-                delta,
-                inner.skew
-            );
-
-            let discont = !inner.deltas.is_empty();
-
-            gst::debug!(
-                CAT,
-                obj: element,
-                "Initializing base time: local {}, remote {}",
-                local_time.nseconds(),
-                remote_time.nseconds(),
-            );
-
-            inner.reset();
-            inner.base_remote_time = Some(remote_time);
-            inner.base_local_time = Some(local_time);
-
-            return Some((local_time.nseconds(), duration, discont));
-        }
-
-        if inner.filling {
-            if inner.deltas.is_empty() || delta < inner.min_delta {
-                inner.min_delta = delta;
-            }
-            inner.deltas.push_back(delta);
-
-            if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH {
-                inner.window_size = inner.deltas.len();
-                inner.skew = inner.min_delta;
-                inner.filling = false;
-            } else {
-                let perc_time = remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64;
-                let perc_window = (inner.deltas.len() as u64)
-                    .mul_div_floor(100, WINDOW_LENGTH)
-                    .unwrap() as i64;
-                let perc = cmp::max(perc_time, perc_window);
-
-                inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000;
-            }
-        } else {
-            let old = inner.deltas.pop_front().unwrap();
-            inner.deltas.push_back(delta);
-
-            if delta <= inner.min_delta {
-                inner.min_delta = delta;
-            } else if old == inner.min_delta {
-                inner.min_delta = inner.deltas.iter().copied().min().unwrap();
-            }
-
-            inner.skew = (inner.min_delta + (124 * inner.skew)) / 125;
-        }
-
-        let out_time = base_local_time + remote_diff;
-        let out_time = if inner.skew < 0 {
-            out_time.saturating_sub((-inner.skew) as u64)
-        } else {
-            out_time + (inner.skew as u64)
-        };
-
-        gst::trace!(
-            CAT,
-            obj: element,
-            "Skew {}, min delta {}",
-            inner.skew,
-            inner.min_delta
-        );
-        gst::trace!(CAT, obj: element, "Outputting {}", out_time.nseconds());
-
-        Some((out_time.nseconds(), duration, false))
-    }
-}
-
 #[derive(Clone)]
 pub struct ReceiverControlHandle {
     queue: ReceiverQueue,
 }
 
@@ -586,7 +109,6 @@ impl Drop for ReceiverInner {
 impl Receiver {
     fn new(
         recv: RecvInstance,
-        timestamp_mode: TimestampMode,
         timeout: u32,
         connect_timeout: u32,
         max_queue_length: usize,
@@ -605,13 +127,9 @@ impl Receiver {
                 Condvar::new(),
             ))),
             max_queue_length,
-            observations_timestamp: Default::default(),
-            observations_timecode: Default::default(),
             element: element.downgrade(),
-            timestamp_mode,
             timeout,
             connect_timeout,
-            ndi_cc_decoder: AtomicRefCell::new(None),
             thread: Mutex::new(None),
         }));
 
@@ -699,7 +217,6 @@ impl Receiver {
         connect_timeout: u32,
         bandwidth: NDIlib_recv_bandwidth_e,
         color_format: NDIlib_recv_color_format_e,
-        timestamp_mode: TimestampMode,
         timeout: u32,
         max_queue_length: usize,
     ) -> Option<Receiver> {
@@ -740,14 +257,7 @@ impl Receiver {
         recv.send_metadata(&enable_hw_accel);
 
         // This will set info.audio/video accordingly
-        let receiver = Receiver::new(
-            recv,
-            timestamp_mode,
-            timeout,
-            connect_timeout,
-            max_queue_length,
-            element,
-        );
+        let receiver = Receiver::new(recv, timeout, connect_timeout, max_queue_length, element);
 
         Some(receiver)
     }
@@ -757,7 +267,6 @@ impl Receiver {
         let mut first_audio_frame = true;
         let mut first_frame = true;
         let mut timer = time::Instant::now();
-        let mut pending_metas = VecDeque::<String>::new();
 
         // Capture until error or shutdown
         loop {
@@ -813,60 +322,74 @@ impl Receiver {
                     continue;
                 }
                 Ok(Some(Frame::Video(frame))) => {
-                    first_frame = false;
-                    let mut buffer = receiver.create_video_buffer_and_info(&element, frame);
-                    if first_video_frame {
-                        if let Ok(Buffer::Video(ref mut buffer, _)) = buffer {
-                            buffer
-                                .get_mut()
-                                .unwrap()
-                                .set_flags(gst::BufferFlags::DISCONT);
-                            first_video_frame = false;
-                        }
-                    }
+                    if let Some(receive_time_gst) = element.current_running_time() {
+                        let receive_time_real = (glib::real_time() as u64 * 1000).nseconds();
 
-                    if !pending_metas.is_empty() {
-                        if let Ok(Buffer::Video(ref mut buffer, _)) = buffer {
-                            let mut ndi_cc_decoder = receiver.0.ndi_cc_decoder.borrow_mut();
-                            for meta in pending_metas.drain(..) {
-                                let res = ndi_cc_decoder.as_mut().unwrap().decode(&meta, buffer);
-                                if let Err(err) = res {
-                                    gst::debug!(CAT, obj: element, "Failed to parse NDI metadata: {err}");
-                                }
-                            }
-                        }
-                    }
+                        first_frame = false;
+                        let discont = first_video_frame;
+                        first_video_frame = false;
 
-                    buffer
-                }
-                Ok(Some(Frame::Audio(frame))) => {
-                    first_frame = false;
-                    let mut buffer = receiver.create_audio_buffer_and_info(&element, frame);
-                    if first_audio_frame {
-                        if let Ok(Buffer::Audio(ref mut buffer, _)) = buffer {
-                            buffer
-                                .get_mut()
-                                .unwrap()
-                                .set_flags(gst::BufferFlags::DISCONT);
-                            first_audio_frame = false;
-                        }
-                    }
-                    buffer
-                }
-                Ok(Some(Frame::Metadata(frame))) => {
-                    if let Some(metadata) = frame.metadata() {
                         gst::debug!(
                             CAT,
                             obj: element,
-                            "Received metadata at timecode {}: {}",
+                            "Received video frame at timecode {}: {:?}",
                             (frame.timecode() as u64 * 100).nseconds(),
-                            metadata,
+                            frame,
                         );
 
-                        pending_metas.push_back(metadata.to_string());
+                        Ok(Buffer::Video {
+                            frame,
+                            discont,
+                            receive_time_gst,
+                            receive_time_real,
+                        })
+                    } else {
+                        Err(gst::FlowError::Flushing)
                     }
+                }
+                Ok(Some(Frame::Audio(frame))) => {
+                    if let Some(receive_time_gst) = element.current_running_time() {
+                        let receive_time_real = (glib::real_time() as u64 * 1000).nseconds();
+                        first_frame = false;
+                        let discont = first_audio_frame;
+                        first_audio_frame = false;
 
-                    continue;
+                        gst::debug!(
+                            CAT,
+                            obj: element,
+                            "Received audio frame at timecode {}: {:?}",
+                            (frame.timecode() as u64 * 100).nseconds(),
+                            frame,
+                        );
+
+                        Ok(Buffer::Audio {
+                            frame,
+                            discont,
+                            receive_time_gst,
+                            receive_time_real,
+                        })
+                    } else {
+                        Err(gst::FlowError::Flushing)
+                    }
+                }
+                Ok(Some(Frame::Metadata(frame))) => {
+                    if let Some(receive_time_gst) = element.current_running_time() {
+                        let receive_time_real = (glib::real_time() as u64 * 1000).nseconds();
+                        gst::debug!(
+                            CAT,
+                            obj: element,
+                            "Received metadata frame at timecode {}: {:?}",
+                            (frame.timecode() as u64 * 100).nseconds(),
+                            frame,
+                        );
+                        Ok(Buffer::Metadata {
+                            frame,
+                            receive_time_gst,
+                            receive_time_real,
+                        })
+                    } else {
+                        Err(gst::FlowError::Flushing)
+                    }
                 }
             };
 
@@ -899,6 +422,9 @@ impl Receiver {
                     queue.buffer_queue.clear();
                     (receiver.0.queue.0).1.notify_one();
                     timer = time::Instant::now();
+                    first_frame = true;
+                    first_audio_frame = true;
+                    first_video_frame = true;
                 }
                 Err(err) => {
                     gst::error!(CAT, obj: element, "Signalling error");
@@ -912,924 +438,4 @@ impl Receiver {
             }
         }
     }
-
-    fn calculate_timestamp(
-        &self,
-        element: &gst::Element,
-        is_audio: bool,
-        timestamp: i64,
-        timecode: i64,
-        duration: Option<gst::ClockTime>,
-    ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
-        let receive_time = element.current_running_time()?;
-
-        let real_time_now = (glib::real_time() as u64 * 1000).nseconds();
-        let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined {
-            gst::ClockTime::NONE
-        } else {
-            Some((timestamp as u64 * 100).nseconds())
-        };
-        let timecode = (timecode as u64 * 100).nseconds();
-
-        gst::log!(
-            CAT,
-            obj: element,
-            "Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}",
-            timecode,
-            timestamp.display(),
-            duration.display(),
-            receive_time.display(),
-            real_time_now,
-        );
-
-        let res_timestamp = self.0.observations_timestamp[usize::from(!is_audio)].process(
-            element,
-            timestamp,
-            receive_time,
-            duration,
-        );
-
-        let res_timecode = self.0.observations_timecode[usize::from(!is_audio)].process(
-            element,
-            Some(timecode),
-            receive_time,
-            duration,
-        );
-
-        let (pts, duration, discont) = match self.0.timestamp_mode {
-            TimestampMode::ReceiveTimeTimecode => match res_timecode {
-                Some((pts, duration, discont)) => (pts, duration, discont),
-                None => {
-                    gst::warning!(CAT, obj: element, "Can't calculate timestamp");
-                    (receive_time, duration, false)
-                }
-            },
-            TimestampMode::ReceiveTimeTimestamp => match res_timestamp {
-                Some((pts, duration, discont)) => (pts, duration, discont),
-                None => {
-                    if timestamp.is_some() {
-                        gst::warning!(CAT, obj: element, "Can't calculate timestamp");
-                    }
-
-                    (receive_time, duration, false)
-                }
-            },
-            TimestampMode::Timecode => (timecode, duration, false),
-            TimestampMode::Timestamp if timestamp.is_none() => (receive_time, duration, false),
-            TimestampMode::Timestamp => {
-                // Timestamps are relative to the UNIX epoch
-                let timestamp = timestamp?;
-                if real_time_now > timestamp {
-                    let diff = real_time_now - timestamp;
-                    if diff > receive_time {
-                        (gst::ClockTime::ZERO, duration, false)
-                    } else {
-                        (receive_time - diff, duration, false)
-                    }
-                } else {
-                    let diff = timestamp - real_time_now;
-                    (receive_time + diff, duration, false)
-                }
-            }
-            TimestampMode::ReceiveTime => (receive_time, duration, false),
-            TimestampMode::Auto => {
-                res_timecode
-                    .or(res_timestamp)
-                    .unwrap_or((receive_time, duration, false))
-            }
-        };
-
-        gst::log!(
-            CAT,
-            obj: element,
-            "Calculated PTS {}, duration {}",
-            pts.display(),
-            duration.display(),
-        );
-
-        Some((pts, duration, discont))
-    }
-
-    fn create_video_buffer_and_info(
-        &self,
-        element: &gst::Element,
-        video_frame: VideoFrame,
-    ) -> Result<Buffer, gst::FlowError> {
-        gst::debug!(CAT, obj: element, "Received video frame {:?}", video_frame);
-
-        let (pts, duration, discont) = self
-            .calculate_video_timestamp(element, &video_frame)
-            .ok_or_else(|| {
-                gst::debug!(CAT, obj: element, "Flushing, dropping buffer");
-                gst::FlowError::Flushing
-            })?;
-
-        let info = self.create_video_info(element, &video_frame)?;
-
-        let mut buffer = self.create_video_buffer(element, pts, duration, &info, &video_frame)?;
-        if discont {
-            buffer
-                .get_mut()
-                .unwrap()
-                .set_flags(gst::BufferFlags::RESYNC);
-        }
-
-        let mut ndi_cc_decoder = self.0.ndi_cc_decoder.borrow_mut();
-        if ndi_cc_decoder.is_none() {
-            *ndi_cc_decoder = Some(NDICCMetaDecoder::new(info.width()));
-        }
-
-        {
-            let ndi_cc_decoder = ndi_cc_decoder.as_mut().unwrap();
-            // handle potential width change (also needed for standalone metadata)
-            ndi_cc_decoder.set_width(info.width());
-
-            if let Some(metadata) = video_frame.metadata() {
-                let res = ndi_cc_decoder.decode(metadata, &mut buffer);
-                if let Err(err) = res {
-                    gst::debug!(CAT, obj: element, "Failed to parse NDI video frame metadata: {err}");
-                }
-            }
-        }
-
-        gst::log!(CAT, obj: element, "Produced video buffer {:?}", buffer);
-
-        Ok(Buffer::Video(buffer, info))
-    }
-
-    fn calculate_video_timestamp(
-        &self,
-        element: &gst::Element,
-        video_frame: &VideoFrame,
-    ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
-        let duration = gst::ClockTime::SECOND.mul_div_floor(
-            video_frame.frame_rate().1 as u64,
-            video_frame.frame_rate().0 as u64,
-        );
-
-        self.calculate_timestamp(
-            element,
-            false,
-            video_frame.timestamp(),
-            video_frame.timecode(),
-            duration,
-        )
-    }
-
-    fn create_video_info(
-        &self,
-        element: &gst::Element,
-        video_frame: &VideoFrame,
-    ) -> Result<VideoInfo, gst::FlowError> {
-        let fourcc = video_frame.fourcc();
-
-        let par = gst::Fraction::approximate_f32(video_frame.picture_aspect_ratio())
-            .unwrap_or_else(|| gst::Fraction::new(1, 1))
-            * gst::Fraction::new(video_frame.yres(), video_frame.xres());
-        let interlace_mode = match video_frame.frame_format_type() {
-            ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive => {
-                gst_video::VideoInterlaceMode::Progressive
-            }
-            ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved => {
-                gst_video::VideoInterlaceMode::Interleaved
-            }
-            #[cfg(feature = "interlaced-fields")]
-            _ => gst_video::VideoInterlaceMode::Alternate,
-            #[cfg(not(feature = "interlaced-fields"))]
-            _ => {
-                gst::element_error!(
-                    element,
-                    gst::StreamError::Format,
-                    ["Separate field interlacing not supported"]
-                );
-                return Err(gst::FlowError::NotNegotiated);
-            }
-        };
-
-        if [
-            ndisys::NDIlib_FourCC_video_type_UYVY,
-            ndisys::NDIlib_FourCC_video_type_UYVA,
-            ndisys::NDIlib_FourCC_video_type_YV12,
-            ndisys::NDIlib_FourCC_video_type_NV12,
-            ndisys::NDIlib_FourCC_video_type_I420,
-            ndisys::NDIlib_FourCC_video_type_BGRA,
-            ndisys::NDIlib_FourCC_video_type_BGRX,
-            ndisys::NDIlib_FourCC_video_type_RGBA,
-            ndisys::NDIlib_FourCC_video_type_RGBX,
-        ]
-        .contains(&fourcc)
-        {
-            // YV12 and I420 are swapped in the NDI SDK compared to GStreamer
-            let format = match video_frame.fourcc() {
-                ndisys::NDIlib_FourCC_video_type_UYVY => gst_video::VideoFormat::Uyvy,
-                // FIXME: This drops the alpha plane!
-                ndisys::NDIlib_FourCC_video_type_UYVA => gst_video::VideoFormat::Uyvy,
-                ndisys::NDIlib_FourCC_video_type_YV12 => gst_video::VideoFormat::I420,
-                ndisys::NDIlib_FourCC_video_type_NV12 => gst_video::VideoFormat::Nv12,
-                ndisys::NDIlib_FourCC_video_type_I420 => gst_video::VideoFormat::Yv12,
-                ndisys::NDIlib_FourCC_video_type_BGRA => gst_video::VideoFormat::Bgra,
-                ndisys::NDIlib_FourCC_video_type_BGRX => gst_video::VideoFormat::Bgrx,
-                ndisys::NDIlib_FourCC_video_type_RGBA => gst_video::VideoFormat::Rgba,
-                ndisys::NDIlib_FourCC_video_type_RGBX => gst_video::VideoFormat::Rgbx,
-                _ => {
-                    gst::element_error!(
-                        element,
-                        gst::StreamError::Format,
-                        ["Unsupported video fourcc {:08x}", video_frame.fourcc()]
-                    );
-
-                    return Err(gst::FlowError::NotNegotiated);
-                } // TODO: NDIlib_FourCC_video_type_P216 and NDIlib_FourCC_video_type_PA16 not
-                  // supported by GStreamer
-            };
-
-            #[cfg(feature = "interlaced-fields")]
-            {
-                let mut builder = gst_video::VideoInfo::builder(
-                    format,
-                    video_frame.xres() as u32,
-                    video_frame.yres() as u32,
-                )
-                .fps(gst::Fraction::from(video_frame.frame_rate()))
-                .par(par)
-                .interlace_mode(interlace_mode);
-
-                if video_frame.frame_format_type()
-                    == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
-                {
-                    builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst);
-                }
-
-                return Ok(VideoInfo::Video(builder.build().map_err(|_| {
-                    gst::element_error!(
-                        element,
-                        gst::StreamError::Format,
-                        ["Invalid video format configuration"]
-                    );
-
-                    gst::FlowError::NotNegotiated
-                })?));
-            }
-
-            #[cfg(not(feature = "interlaced-fields"))]
-            {
-                let mut builder = gst_video::VideoInfo::builder(
-                    format,
-                    video_frame.xres() as u32,
-                    video_frame.yres() as u32,
-                )
-                .fps(gst::Fraction::from(video_frame.frame_rate()))
-                .par(par)
-                .interlace_mode(interlace_mode);
-
-                if video_frame.frame_format_type()
-                    == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
-                {
-                    builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst);
-                }
-
-                return Ok(VideoInfo::Video(builder.build().map_err(|_| {
-                    gst::element_error!(
-                        element,
-                        gst::StreamError::Format,
-                        ["Invalid video format configuration"]
-                    );
-
-                    gst::FlowError::NotNegotiated
-                })?));
-            }
-        }
-
-        #[cfg(feature = "advanced-sdk")]
-        if [
-            ndisys::NDIlib_FourCC_video_type_ex_SHQ0_highest_bandwidth,
-            ndisys::NDIlib_FourCC_video_type_ex_SHQ2_highest_bandwidth,
-            ndisys::NDIlib_FourCC_video_type_ex_SHQ7_highest_bandwidth,
-            ndisys::NDIlib_FourCC_video_type_ex_SHQ0_lowest_bandwidth,
-            ndisys::NDIlib_FourCC_video_type_ex_SHQ2_lowest_bandwidth,
-            ndisys::NDIlib_FourCC_video_type_ex_SHQ7_lowest_bandwidth,
-        ]
-        .contains(&fourcc)
-        {
-            let variant = match fourcc {
-                ndisys::NDIlib_FourCC_video_type_ex_SHQ0_highest_bandwidth
-                | ndisys::NDIlib_FourCC_video_type_ex_SHQ0_lowest_bandwidth => String::from("SHQ0"),
-                ndisys::NDIlib_FourCC_video_type_ex_SHQ2_highest_bandwidth
-                | ndisys::NDIlib_FourCC_video_type_ex_SHQ2_lowest_bandwidth => String::from("SHQ2"),
-                ndisys::NDIlib_FourCC_video_type_ex_SHQ7_highest_bandwidth
-                | ndisys::NDIlib_FourCC_video_type_ex_SHQ7_lowest_bandwidth => String::from("SHQ7"),
-                _ => {
-                    gst::element_error!(
-                        element,
-                        gst::StreamError::Format,
-                        [
-                            "Unsupported SpeedHQ video fourcc {:08x}",
-                            video_frame.fourcc()
-                        ]
-                    );
-
-                    return Err(gst::FlowError::NotNegotiated);
-                }
-            };
-
-            return Ok(VideoInfo::SpeedHQInfo {
-                variant,
-                xres: video_frame.xres(),
-                yres: video_frame.yres(),
-                fps_n: video_frame.frame_rate().0,
-                fps_d: video_frame.frame_rate().1,
-                par_n: par.numer(),
-                par_d: par.denom(),
-                interlace_mode,
-            });
-        }
-
-        #[cfg(feature = "advanced-sdk")]
-        if [
-            ndisys::NDIlib_FourCC_video_type_ex_H264_highest_bandwidth,
-            ndisys::NDIlib_FourCC_video_type_ex_H264_lowest_bandwidth,
-            ndisys::NDIlib_FourCC_video_type_ex_H264_alpha_highest_bandwidth,
-            ndisys::NDIlib_FourCC_video_type_ex_H264_alpha_lowest_bandwidth,
-        ]
-        .contains(&fourcc)
-        {
-            let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
-                gst::error!(
-                    CAT,
-                    obj: element,
-                    "Video packet doesn't have compressed packet start"
-                );
-                gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]);
-
-                gst::FlowError::Error
-            })?;
-
-            if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_H264 {
-                gst::error!(CAT, obj: element, "Non-H264 video packet");
-                gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]);
-
-                return Err(gst::FlowError::Error);
-            }
-
-            return Ok(VideoInfo::H264 {
-                xres: video_frame.xres(),
-                yres: video_frame.yres(),
-                fps_n: video_frame.frame_rate().0,
-                fps_d: video_frame.frame_rate().1,
-                par_n: par.numer(),
-                par_d: par.denom(),
-                interlace_mode,
-            });
-        }
-
-        #[cfg(feature = "advanced-sdk")]
-        if [
-            ndisys::NDIlib_FourCC_video_type_ex_HEVC_highest_bandwidth,
-            ndisys::NDIlib_FourCC_video_type_ex_HEVC_lowest_bandwidth,
-            ndisys::NDIlib_FourCC_video_type_ex_HEVC_alpha_highest_bandwidth,
-            ndisys::NDIlib_FourCC_video_type_ex_HEVC_alpha_lowest_bandwidth,
-        ]
-        .contains(&fourcc)
-        {
-            let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
-                gst::error!(
-                    CAT,
-                    obj: element,
-                    "Video packet doesn't have compressed packet start"
-                );
-                gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]);
-
-                gst::FlowError::Error
-            })?;
-
-            if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_HEVC {
-                gst::error!(CAT, obj: element, "Non-H265 video packet");
-                gst::element_error!(element, gst::StreamError::Format, ["Invalid video packet"]);
-
-                return Err(gst::FlowError::Error);
-            }
-
-            return Ok(VideoInfo::H265 {
-                xres: video_frame.xres(),
-                yres: video_frame.yres(),
-                fps_n: video_frame.frame_rate().0,
-                fps_d: video_frame.frame_rate().1,
-                par_n: par.numer(),
-                par_d: par.denom(),
-                interlace_mode,
-            });
-        }
-
-        gst::element_error!(
-            element,
-            gst::StreamError::Format,
-            ["Unsupported video fourcc {:08x}", video_frame.fourcc()]
-        );
-        Err(gst::FlowError::NotNegotiated)
-    }
-
-    fn create_video_buffer(
-        &self,
-        element: &gst::Element,
-        pts: gst::ClockTime,
-        duration: Option<gst::ClockTime>,
-        info: &VideoInfo,
-        video_frame: &VideoFrame,
-    ) -> Result<gst::Buffer, gst::FlowError> {
-        let mut buffer = self.copy_video_frame(element, info, video_frame)?;
-        {
-            let buffer = buffer.get_mut().unwrap();
-            buffer.set_pts(pts);
-            buffer.set_duration(duration);
-
-            gst::ReferenceTimestampMeta::add(
-                buffer,
-                &crate::TIMECODE_CAPS,
-                (video_frame.timecode() as u64 * 100).nseconds(),
-                gst::ClockTime::NONE,
-            );
-            if video_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined {
-                gst::ReferenceTimestampMeta::add(
-                    buffer,
-                    &crate::TIMESTAMP_CAPS,
-                    (video_frame.timestamp() as u64 * 100).nseconds(),
-                    gst::ClockTime::NONE,
-                );
-            }
-
-            #[cfg(feature = "interlaced-fields")]
-            {
-                match video_frame.frame_format_type() {
-                    ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved => {
-                        buffer.set_video_flags(
-                            gst_video::VideoBufferFlags::INTERLACED
-                                | gst_video::VideoBufferFlags::TFF,
-                        );
-                    }
-                    ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_0 => {
-                        buffer.set_video_flags(
-                            gst_video::VideoBufferFlags::INTERLACED
-                                | gst_video::VideoBufferFlags::TOP_FIELD,
-                        );
-                    }
-                    ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_1 => {
-                        buffer.set_video_flags(
-                            gst_video::VideoBufferFlags::INTERLACED
-                                | gst_video::VideoBufferFlags::BOTTOM_FIELD,
-                        );
-                    }
-                    _ => (),
-                };
-            }
-
-            #[cfg(not(feature = "interlaced-fields"))]
-            {
-                if video_frame.frame_format_type()
-                    == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved
-                {
-                    buffer.set_video_flags(
-                        gst_video::VideoBufferFlags::INTERLACED | gst_video::VideoBufferFlags::TFF,
-                    );
-                }
-            }
-        }
-
-        Ok(buffer)
-    }
-
-    fn copy_video_frame(
-        &self,
-        #[allow(unused_variables)] element: &gst::Element,
-        info: &VideoInfo,
-        video_frame: &VideoFrame,
-    ) -> Result<gst::Buffer, gst::FlowError> {
-        match info {
-            VideoInfo::Video(ref info) => {
-                let src = video_frame.data().ok_or(gst::FlowError::Error)?;
-
-                let buffer = gst::Buffer::with_size(info.size()).unwrap();
-                let mut vframe = gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap();
-
-                match info.format() {
-                    gst_video::VideoFormat::Uyvy
-                    | gst_video::VideoFormat::Bgra
-                    | gst_video::VideoFormat::Bgrx
-                    | gst_video::VideoFormat::Rgba
-                    | gst_video::VideoFormat::Rgbx => {
-                        let line_bytes = if info.format() == gst_video::VideoFormat::Uyvy {
-                            2 * vframe.width() as usize
-                        } else {
-                            4 * vframe.width() as usize
-                        };
-
-                        let dest_stride = vframe.plane_stride()[0] as usize;
-                        let dest = vframe.plane_data_mut(0).unwrap();
-                        let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize;
-                        let plane_size = video_frame.yres() as usize * src_stride;
-
-                        if src.len() < plane_size || src_stride < line_bytes {
-                            gst::error!(CAT, obj: element, "Video packet has wrong stride or size");
-                            gst::element_error!(
-                                element,
-                                gst::StreamError::Format,
-                                ["Video packet has wrong stride or size"]
-                            );
-                            return Err(gst::FlowError::Error);
-                        }
-
-                        for (dest, src) in dest
-                            .chunks_exact_mut(dest_stride)
-                            .zip(src.chunks_exact(src_stride))
-                        {
-                            dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
-                        }
-                    }
-                    gst_video::VideoFormat::Nv12 => {
-                        let line_bytes = vframe.width() as usize;
-                        let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize;
-                        let plane_size = video_frame.yres() as usize * src_stride;
-
-                        if src.len() < 2 * plane_size || src_stride < line_bytes {
-                            gst::error!(CAT, obj: element, "Video packet has wrong stride or size");
-                            gst::element_error!(
-                                element,
-                                gst::StreamError::Format,
-                                ["Video packet has wrong stride or size"]
-                            );
-                            return Err(gst::FlowError::Error);
-                        }
-
-                        // First plane
-                        {
-                            let dest_stride = vframe.plane_stride()[0] as usize;
-                            let dest = vframe.plane_data_mut(0).unwrap();
-                            let src = &src[..plane_size];
-
-                            for (dest, src) in dest
-                                .chunks_exact_mut(dest_stride)
-                                .zip(src.chunks_exact(src_stride))
-                            {
-                                dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
-                            }
-                        }
-
-                        // Second plane
-                        {
-                            let dest_stride = vframe.plane_stride()[1] as usize;
-                            let dest = vframe.plane_data_mut(1).unwrap();
-                            let src = &src[plane_size..];
-
-                            for (dest, src) in dest
-                                .chunks_exact_mut(dest_stride)
-                                .zip(src.chunks_exact(src_stride))
-                            {
-                                dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
-                            }
-                        }
-                    }
-                    gst_video::VideoFormat::Yv12 | gst_video::VideoFormat::I420 => {
-                        let line_bytes = vframe.width() as usize;
-                        let line_bytes1 = (line_bytes + 1) / 2;
-
-                        let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize;
-                        let src_stride1 = (src_stride + 1) / 2;
-
-                        let plane_size = video_frame.yres() as usize * src_stride;
-                        let plane_size1 = ((video_frame.yres() as usize + 1) / 2) * src_stride1;
-
-                        if src.len() < plane_size + 2 * plane_size1 || src_stride < line_bytes {
-                            gst::error!(CAT, obj: element, "Video packet has wrong stride or size");
-                            gst::element_error!(
-                                element,
-                                gst::StreamError::Format,
-                                ["Video packet has wrong stride or size"]
-                            );
-                            return Err(gst::FlowError::Error);
-                        }
-
-                        // First plane
-                        {
-                            let dest_stride = vframe.plane_stride()[0] as usize;
-                            let dest = vframe.plane_data_mut(0).unwrap();
-                            let src = &src[..plane_size];
-
-                            for (dest, src) in dest
-                                .chunks_exact_mut(dest_stride)
-                                .zip(src.chunks_exact(src_stride))
-                            {
-                                dest[..line_bytes].copy_from_slice(&src[..line_bytes]);
-                            }
-                        }
-
-                        // Second plane
-                        {
-                            let dest_stride = vframe.plane_stride()[1] as usize;
-                            let dest = vframe.plane_data_mut(1).unwrap();
-                            let src = &src[plane_size..][..plane_size1];
-
-                            for (dest, src) in dest
-                                .chunks_exact_mut(dest_stride)
-                                .zip(src.chunks_exact(src_stride1))
-                            {
-                                dest[..line_bytes1].copy_from_slice(&src[..line_bytes1]);
-                            }
-                        }
-
-                        // Third plane
-                        {
-                            let dest_stride = vframe.plane_stride()[2] as usize;
-                            let dest = vframe.plane_data_mut(2).unwrap();
-                            let src = &src[plane_size + plane_size1..][..plane_size1];
-
-                            for (dest, src) in dest
-                                .chunks_exact_mut(dest_stride)
-                                .zip(src.chunks_exact(src_stride1))
-                            {
-                                dest[..line_bytes1].copy_from_slice(&src[..line_bytes1]);
-                            }
-                        }
-                    }
-                    _ => unreachable!(),
-                }
-
-                Ok(vframe.into_buffer())
-            }
-            #[cfg(feature = "advanced-sdk")]
-            VideoInfo::SpeedHQInfo { .. } => {
-                let data = video_frame.data().ok_or_else(|| {
-                    gst::error!(CAT, obj: element, "Video packet has no data");
-                    gst::element_error!(
-                        element,
-                        gst::StreamError::Format,
-                        ["Invalid video packet"]
-                    );
-
-                    gst::FlowError::Error
-                })?;
-
-                Ok(gst::Buffer::from_mut_slice(Vec::from(data)))
-            }
-            #[cfg(feature = "advanced-sdk")]
-            VideoInfo::H264 { .. } | VideoInfo::H265 { .. } => {
-                let compressed_packet = video_frame.compressed_packet().ok_or_else(|| {
-                    gst::error!(
-                        CAT,
-                        obj: element,
-                        "Video packet doesn't have compressed packet start"
-                    );
-                    gst::element_error!(
-                        element,
-                        gst::StreamError::Format,
-                        ["Invalid video packet"]
-                    );
-
-                    gst::FlowError::Error
-                })?;
-
-                let mut buffer = Vec::new();
-                if let Some(extra_data) = compressed_packet.extra_data {
-                    buffer.extend_from_slice(extra_data);
-                }
-                buffer.extend_from_slice(compressed_packet.data);
-                let mut buffer = gst::Buffer::from_mut_slice(buffer);
-                if !compressed_packet.key_frame {
-                    let buffer = buffer.get_mut().unwrap();
-                    buffer.set_flags(gst::BufferFlags::DELTA_UNIT);
-                }
-
-                Ok(buffer)
-            }
-        }
-    }
-
-    fn create_audio_buffer_and_info(
-        &self,
-        element: &gst::Element,
-        audio_frame: AudioFrame,
-    ) -> Result<Buffer, gst::FlowError> {
-        gst::debug!(CAT, obj: element, "Received audio frame {:?}", audio_frame);
-
-        let (pts, duration, discont) = self
-            .calculate_audio_timestamp(element, &audio_frame)
-            .ok_or_else(|| {
-                gst::debug!(CAT, obj: element, "Flushing, dropping buffer");
-                gst::FlowError::Flushing
-            })?;
-
-        let info = self.create_audio_info(element, &audio_frame)?;
-
-        let mut buffer = self.create_audio_buffer(element, pts, duration, &info, &audio_frame)?;
-        if discont {
-            buffer
-                .get_mut()
-                .unwrap()
-                .set_flags(gst::BufferFlags::RESYNC);
-        }
-
-        gst::log!(CAT, obj: element, "Produced audio buffer {:?}", buffer);
-
-        Ok(Buffer::Audio(buffer, info))
-    }
-
-    fn calculate_audio_timestamp(
-        &self,
-        element: &gst::Element,
-        audio_frame: &AudioFrame,
-    ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
-        let duration = gst::ClockTime::SECOND.mul_div_floor(
-            audio_frame.no_samples() as u64,
-            audio_frame.sample_rate() as u64,
-        );
-
-        self.calculate_timestamp(
-            element,
-            true,
-            audio_frame.timestamp(),
-            audio_frame.timecode(),
-            duration,
-        )
-    }
-
-    fn create_audio_info(
-        &self,
-        element: &gst::Element,
-        audio_frame: &AudioFrame,
-    ) -> Result<AudioInfo, gst::FlowError> {
-        let fourcc = audio_frame.fourcc();
-
-        if [NDIlib_FourCC_audio_type_FLTp].contains(&fourcc) {
-            let channels = audio_frame.no_channels() as u32;
-            let mut positions = [gst_audio::AudioChannelPosition::None; 64];
-            if channels <= 8 {
-                let _ = gst_audio::AudioChannelPosition::positions_from_mask(
-                    gst_audio::AudioChannelPosition::fallback_mask(channels),
-                    &mut positions[..channels as usize],
-                );
-            }
-
-            let builder = gst_audio::AudioInfo::builder(
-                gst_audio::AUDIO_FORMAT_F32,
-                audio_frame.sample_rate() as u32,
-                channels,
-            )
-            .positions(&positions[..channels as usize]);
-
-            let info = builder.build().map_err(|_| {
-                gst::element_error!(
-                    element,
-                    gst::StreamError::Format,
-                    ["Invalid audio format configuration"]
-                );
-
-                gst::FlowError::NotNegotiated
-            })?;
-
-            return Ok(AudioInfo::Audio(info));
-        }
-
-        #[cfg(feature = "advanced-sdk")]
-        if [NDIlib_FourCC_audio_type_AAC].contains(&fourcc) {
-            let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| {
-                gst::error!(
-                    CAT,
-                    obj: element,
-                    "Audio packet doesn't have compressed packet start"
-                );
-                gst::element_error!(element, gst::StreamError::Format, ["Invalid audio packet"]);
-
-                gst::FlowError::Error
-            })?;
-
-            if compressed_packet.fourcc != NDIlib_compressed_FourCC_type_AAC {
-                gst::error!(CAT, obj: element, "Non-AAC audio packet");
-                gst::element_error!(element, gst::StreamError::Format, ["Invalid audio packet"]);
-
-                return Err(gst::FlowError::Error);
-            }
-
-            return Ok(AudioInfo::Aac {
-                sample_rate: audio_frame.sample_rate(),
-                no_channels: audio_frame.no_channels(),
-                codec_data: compressed_packet
codec_data: compressed_packet - .extra_data - .ok_or(gst::FlowError::NotNegotiated)? - .try_into() - .map_err(|_| gst::FlowError::NotNegotiated)?, - }); - } - - // FIXME: Needs testing with an actual stream to understand how it works - // #[cfg(feature = "advanced-sdk")] - // if [NDIlib_FourCC_audio_type_Opus].contains(&fourcc) {} - - gst::element_error!( - element, - gst::StreamError::Format, - ["Unsupported audio fourcc {:08x}", audio_frame.fourcc()] - ); - Err(gst::FlowError::NotNegotiated) - } - - fn create_audio_buffer( - &self, - #[allow(unused_variables)] element: &gst::Element, - pts: gst::ClockTime, - duration: Option, - info: &AudioInfo, - audio_frame: &AudioFrame, - ) -> Result { - match info { - AudioInfo::Audio(ref info) => { - let src = audio_frame.data().ok_or(gst::FlowError::Error)?; - let buff_size = (audio_frame.no_samples() as u32 * info.bpf()) as usize; - - let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); - { - let buffer = buffer.get_mut().unwrap(); - - buffer.set_pts(pts); - buffer.set_duration(duration); - - gst::ReferenceTimestampMeta::add( - buffer, - &crate::TIMECODE_CAPS, - (audio_frame.timecode() as u64 * 100).nseconds(), - gst::ClockTime::NONE, - ); - if audio_frame.timestamp() != ndisys::NDIlib_recv_timestamp_undefined { - gst::ReferenceTimestampMeta::add( - buffer, - &crate::TIMESTAMP_CAPS, - (audio_frame.timestamp() as u64 * 100).nseconds(), - gst::ClockTime::NONE, - ); - } - - let mut dest = buffer.map_writable().unwrap(); - let dest = dest - .as_mut_slice_of::() - .map_err(|_| gst::FlowError::NotNegotiated)?; - assert!( - dest.len() - == audio_frame.no_samples() as usize - * audio_frame.no_channels() as usize - ); - - for (channel, samples) in src - .chunks_exact(audio_frame.channel_stride_or_data_size_in_bytes() as usize) - .enumerate() - { - let samples = samples - .as_slice_of::() - .map_err(|_| gst::FlowError::NotNegotiated)?; - - for (i, sample) in samples[..audio_frame.no_samples() as usize] - .iter() - .enumerate() - { - dest[i * (audio_frame.no_channels() as usize) + channel] = *sample; - } - } - } - - Ok(buffer) - } - #[cfg(feature = "advanced-sdk")] - AudioInfo::Opus { .. } => { - let data = audio_frame.data().ok_or_else(|| { - gst::error!(CAT, obj: element, "Audio packet has no data"); - gst::element_error!( - element, - gst::StreamError::Format, - ["Invalid audio packet"] - ); - - gst::FlowError::Error - })?; - - Ok(gst::Buffer::from_mut_slice(Vec::from(data))) - } - #[cfg(feature = "advanced-sdk")] - AudioInfo::Aac { .. 
-                let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| {
-                    gst::error!(
-                        CAT,
-                        obj: element,
-                        "Audio packet doesn't have compressed packet start"
-                    );
-                    gst::element_error!(
-                        element,
-                        gst::StreamError::Format,
-                        ["Invalid audio packet"]
-                    );
-
-                    gst::FlowError::Error
-                })?;
-
-                Ok(gst::Buffer::from_mut_slice(Vec::from(
-                    compressed_packet.data,
-                )))
-            }
-        }
-    }
 }
diff --git a/net/ndi/src/ndisrcdemux/imp.rs b/net/ndi/src/ndisrcdemux/imp.rs
index 225431a6..be93e4c8 100644
--- a/net/ndi/src/ndisrcdemux/imp.rs
+++ b/net/ndi/src/ndisrcdemux/imp.rs
@@ -1,13 +1,20 @@
 // SPDX-License-Identifier: MPL-2.0
 
+use atomic_refcell::AtomicRefCell;
+use gst::glib::once_cell::sync::Lazy;
 use gst::prelude::*;
 use gst::subclass::prelude::*;
+use gst_video::prelude::*;
 
-use std::sync::Mutex;
+use std::{cmp, collections::VecDeque, sync::Mutex};
 
-use gst::glib::once_cell::sync::Lazy;
+use byte_slice_cast::*;
 
-use crate::ndisrcmeta;
+use crate::{
+    ndi_cc_meta::NDICCMetaDecoder,
+    ndisrcmeta::{self, Buffer},
+    ndisys, TimestampMode,
+};
 
 static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
     gst::DebugCategory::new(
@@ -17,14 +24,55 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
     )
 });
 
-#[derive(Default)]
 struct State {
     combiner: gst_base::UniqueFlowCombiner,
     video_pad: Option<gst::Pad>,
+    video_info: Option<VideoInfo>,
     video_caps: Option<gst::Caps>,
+    video_buffer_pool: Option<gst::BufferPool>,
     audio_pad: Option<gst::Pad>,
+    audio_info: Option<AudioInfo>,
     audio_caps: Option<gst::Caps>,
+
+    // Only set for raw audio
+    audio_info_non_interleaved: Option<gst_audio::AudioInfo>,
+    audio_caps_non_interleaved: Option<gst::Caps>,
+    audio_non_interleaved: bool,
+
+    ndi_cc_decoder: Option<NDICCMetaDecoder>,
+    pending_metadata: Vec<crate::ndi::MetadataFrame>,
+
+    // Audio/video time observations
+    timestamp_mode: TimestampMode,
+    observations_timestamp: [Observations; 2],
+    observations_timecode: [Observations; 2],
+}
+
+impl Default for State {
+    fn default() -> State {
+        State {
+            combiner: gst_base::UniqueFlowCombiner::new(),
+
+            video_pad: None,
+            video_info: None,
+            video_caps: None,
+            video_buffer_pool: None,
+
+            audio_pad: None,
+            audio_info: None,
+            audio_caps: None,
+            audio_info_non_interleaved: None,
+            audio_caps_non_interleaved: None,
+            audio_non_interleaved: false,
+
+            ndi_cc_decoder: None,
+            pending_metadata: Vec::new(),
+
+            timestamp_mode: TimestampMode::Auto,
+            observations_timestamp: [Observations::default(), Observations::default()],
+            observations_timecode: [Observations::default(), Observations::default()],
+        }
+    }
 }
 
 pub struct NdiSrcDemux {
@@ -135,12 +183,18 @@ impl ElementImpl for NdiSrcDemux {
         match transition {
             gst::StateChange::PausedToReady => {
                 let mut state = self.state.lock().unwrap();
+
                 for pad in [state.audio_pad.take(), state.video_pad.take()]
                     .iter()
                    .flatten()
                 {
                     self.obj().remove_pad(pad).unwrap();
                 }
+
+                if let Some(pool) = state.video_buffer_pool.take() {
+                    let _ = pool.set_active(false);
+                }
+
                 *state = State::default();
             }
             _ => (),
@@ -158,7 +212,7 @@ impl NdiSrcDemux {
     ) -> Result<gst::FlowSuccess, gst::FlowError> {
         gst::log!(CAT, imp: self, "Handling buffer {:?}", buffer);
 
-        let meta = buffer
+        let mut meta = buffer
             .make_mut()
             .meta_mut::<ndisrcmeta::NdiSrcMeta>()
             .ok_or_else(|| {
                 gst::error!(CAT, imp: self, "Buffer without NDI source meta");
                 gst::FlowError::Error
             })?;
 
@@ -166,125 +220,325 @@ impl NdiSrcDemux {
-        let mut events = vec![];
-        let srcpad;
-        let mut add_pad = false;
-
         let mut state = self.state.lock().unwrap();
-        let caps = meta.caps();
-        match meta.stream_type() {
-            ndisrcmeta::StreamType::Audio => {
+        let ndi_buffer = meta.take_ndi_buffer();
+
+        match ndi_buffer {
+            Buffer::Audio { ref frame, .. } => {
} => { + gst::debug!(CAT, imp: self, "Received audio frame {:?}", frame); + + let mut reconfigure = false; + let info = self.create_audio_info(frame)?; + if Some(&info) != state.audio_info.as_ref() { + let caps = info.to_caps().map_err(|_| { + gst::element_imp_error!( + self, + gst::ResourceError::Settings, + ["Invalid audio info received: {:?}", info] + ); + gst::FlowError::NotNegotiated + })?; + + gst::debug!(CAT, imp: self, "Audio caps changed to {}", caps); + + #[allow(irrefutable_let_patterns)] + if let AudioInfo::Audio(ref info) = info { + let mut builder = gst_audio::AudioInfo::builder( + info.format(), + info.rate(), + info.channels(), + ) + .layout(gst_audio::AudioLayout::NonInterleaved); + + if let Some(positions) = info.positions() { + builder = builder.positions(positions); + } + + let non_interleaved_info = builder.build().unwrap(); + state.audio_caps_non_interleaved = + Some(non_interleaved_info.to_caps().unwrap()); + state.audio_info_non_interleaved = Some(non_interleaved_info); + } else { + state.audio_non_interleaved = false; + state.audio_caps_non_interleaved = None; + state.audio_info_non_interleaved = None; + } + + state.audio_info = Some(info); + state.audio_caps = Some(caps); + reconfigure = true; + } + + let srcpad; if let Some(ref pad) = state.audio_pad { srcpad = pad.clone(); + reconfigure |= pad.check_reconfigure(); } else { - gst::debug!(CAT, imp: self, "Adding audio pad with caps {}", caps); + gst::debug!(CAT, imp: self, "Adding audio pad"); let templ = self.obj().element_class().pad_template("audio").unwrap(); let pad = gst::Pad::builder_from_template(&templ) .flags(gst::PadFlags::FIXED_CAPS) .build(); - let mut caps_event = Some(gst::event::Caps::new(&caps)); + state.audio_pad = Some(pad.clone()); + let _ = pad.set_active(true); + state.combiner.add_pad(&pad); + + let mut stored_caps = false; self.sinkpad.sticky_events_foreach(|ev| { - if ev.type_() < gst::EventType::Caps { - events.push(ev.clone()); - } else { - if let Some(ev) = caps_event.take() { - events.push(ev); - } + if let gst::EventView::StreamStart(ev) = ev.view() { + let stream_start = gst::event::StreamStart::builder(&format!( + "{}/audio", + ev.stream_id() + )) + .seqnum(ev.seqnum()) + .flags(ev.stream_flags()) + .group_id(ev.group_id().unwrap_or_else(|| { + // This can't really happen as ndisrc would provide one! + gst::error!(CAT, imp: self, "Upstream provided no group id"); + gst::GroupId::next() + })) + .build(); - if ev.type_() != gst::EventType::Caps { - events.push(ev.clone()); - } + let _ = pad.store_sticky_event(&stream_start); + } else if ev.type_() < gst::EventType::Caps { + let _ = pad.store_sticky_event(ev); + } else if ev.type_() > gst::EventType::Caps { + // We store the interleaved caps for starters + let caps = + gst::event::Caps::builder(state.audio_caps.as_ref().unwrap()) + .build(); + let _ = pad.store_sticky_event(&caps); + stored_caps = true; + let _ = pad.store_sticky_event(ev); } std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) }); - state.audio_caps = Some(caps.clone()); - state.audio_pad = Some(pad.clone()); - - let _ = pad.set_active(true); - for ev in events.drain(..) 
{
-                        let _ = pad.store_sticky_event(&ev);
+                    if !stored_caps {
+                        // We store the interleaved caps for starters
+                        let caps =
+                            gst::event::Caps::builder(state.audio_caps.as_ref().unwrap()).build();
+                        let _ = pad.store_sticky_event(&caps);
                     }
 
-                    state.combiner.add_pad(&pad);
+                    drop(state);
+
+                    self.obj().add_pad(&pad).unwrap();
+                    if self.obj().num_src_pads() == 2 {
+                        self.obj().no_more_pads();
+                    }
+
+                    state = self.state.lock().unwrap();
 
-                    add_pad = true;
                     srcpad = pad;
+
+                    // No need to check for non-interleaved caps support below or update the caps
+                    // because the same caps were already set above
+                    reconfigure = state.audio_caps_non_interleaved.is_some();
                 }
 
-                if state.audio_caps.as_ref() != Some(&caps) {
-                    gst::debug!(CAT, imp: self, "Audio caps changed to {}", caps);
-                    events.push(gst::event::Caps::new(&caps));
-                    state.audio_caps = Some(caps);
+                if reconfigure {
+                    // FIXME: As this is a demuxer we can't unfortunately do an allocation query
+                    // downstream without risking deadlocks.
+
+                    // Check if there's a peer downstream and if it supports the non-interleaved
+                    // caps, otherwise fall back to the normal caps.
+                    if let Some(caps) = state.audio_caps_non_interleaved.clone() {
+                        drop(state);
+                        let allowed_caps = srcpad.peer().map(|peer| peer.query_caps(Some(&caps)));
+                        state = self.state.lock().unwrap();
+
+                        gst::info!(CAT, imp: self, "Allowed audio caps {allowed_caps:?}");
+
+                        state.audio_non_interleaved = allowed_caps
+                            .map_or(false, |allowed_caps| allowed_caps.can_intersect(&caps));
+
+                        gst::info!(
+                            CAT,
+                            imp: self,
+                            "Non-interleaved caps{} supported",
+                            if state.audio_non_interleaved { "" } else { " not" },
+                        );
+                    }
+
+                    let caps = gst::event::Caps::builder(if state.audio_non_interleaved {
+                        state.audio_caps_non_interleaved.as_ref().unwrap()
+                    } else {
+                        state.audio_caps.as_ref().unwrap()
+                    })
+                    .build();
+
+                    let _ = srcpad.store_sticky_event(&caps);
+                }
             }
-            ndisrcmeta::StreamType::Video => {
+            Buffer::Video { ref frame, ..
} => { + gst::debug!(CAT, imp: self, "Received video frame {:?}", frame); + + let mut reconfigure = false; + let info = self.create_video_info(frame)?; + if Some(&info) != state.video_info.as_ref() { + let caps = info.to_caps().map_err(|_| { + gst::element_imp_error!( + self, + gst::ResourceError::Settings, + ["Invalid video info received: {:?}", info] + ); + gst::FlowError::NotNegotiated + })?; + + if state.ndi_cc_decoder.is_none() { + state.ndi_cc_decoder = Some(NDICCMetaDecoder::new(info.width())); + } + + gst::debug!(CAT, imp: self, "Video caps changed to {}", caps); + state.video_info = Some(info); + state.video_caps = Some(caps); + state.video_buffer_pool = None; + reconfigure = true; + } + + let srcpad; if let Some(ref pad) = state.video_pad { srcpad = pad.clone(); + reconfigure |= pad.check_reconfigure(); } else { - gst::debug!(CAT, imp: self, "Adding video pad with caps {}", caps); + gst::debug!(CAT, imp: self, "Adding video pad"); let templ = self.obj().element_class().pad_template("video").unwrap(); let pad = gst::Pad::builder_from_template(&templ) .flags(gst::PadFlags::FIXED_CAPS) .build(); - let mut caps_event = Some(gst::event::Caps::new(&caps)); + state.video_pad = Some(pad.clone()); + let _ = pad.set_active(true); + state.combiner.add_pad(&pad); + + let mut stored_caps = false; self.sinkpad.sticky_events_foreach(|ev| { - if ev.type_() < gst::EventType::Caps { - events.push(ev.clone()); - } else { - if let Some(ev) = caps_event.take() { - events.push(ev); - } + if let gst::EventView::StreamStart(ev) = ev.view() { + let stream_start = gst::event::StreamStart::builder(&format!( + "{}/video", + ev.stream_id() + )) + .seqnum(ev.seqnum()) + .flags(ev.stream_flags()) + .group_id(ev.group_id().unwrap_or_else(|| { + // This can't really happen as ndisrc would provide one! + gst::error!(CAT, imp: self, "Upstream provided no group id"); + gst::GroupId::next() + })) + .build(); - if ev.type_() != gst::EventType::Caps { - events.push(ev.clone()); - } + let _ = pad.store_sticky_event(&stream_start); + } else if ev.type_() < gst::EventType::Caps { + let _ = pad.store_sticky_event(ev); + } else if ev.type_() > gst::EventType::Caps { + let caps = + gst::event::Caps::builder(state.video_caps.as_ref().unwrap()) + .build(); + let _ = pad.store_sticky_event(&caps); + stored_caps = true; + let _ = pad.store_sticky_event(ev); } std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) }); - state.video_caps = Some(caps.clone()); - state.video_pad = Some(pad.clone()); - - let _ = pad.set_active(true); - for ev in events.drain(..) { - let _ = pad.store_sticky_event(&ev); + if !stored_caps { + let caps = + gst::event::Caps::builder(state.video_caps.as_ref().unwrap()).build(); + let _ = pad.store_sticky_event(&caps); } - state.combiner.add_pad(&pad); + drop(state); + + self.obj().add_pad(&pad).unwrap(); + if self.obj().num_src_pads() == 2 { + self.obj().no_more_pads(); + } + + state = self.state.lock().unwrap(); - add_pad = true; srcpad = pad; + + // New caps were already stored above + reconfigure = false; } - if state.video_caps.as_ref() != Some(&caps) { - gst::debug!(CAT, imp: self, "Video caps changed to {}", caps); - events.push(gst::event::Caps::new(&caps)); - state.video_caps = Some(caps); + if reconfigure { + // FIXME: As this is a demuxer we can't unfortunately do an allocation query + // downstream without risking deadlocks. + let caps = + gst::event::Caps::builder(state.video_caps.as_ref().unwrap()).build(); + + let _ = srcpad.store_sticky_event(&caps); } } + Buffer::Metadata { .. 
} => { + // Nothing to be done here + } } + + let srcpad; + let buffer; + match ndi_buffer { + Buffer::Audio { + frame, + discont, + receive_time_gst, + receive_time_real, + } => { + srcpad = state.audio_pad.clone().unwrap(); + let (pts, duration, resync) = self + .calculate_audio_timestamp( + &mut state, + receive_time_gst, + receive_time_real, + &frame, + ) + .ok_or_else(|| { + gst::debug!(CAT, imp: self, "Flushing, dropping buffer"); + gst::FlowError::Flushing + })?; + + buffer = self.create_audio_buffer(&state, pts, duration, discont, resync, frame)?; + + gst::log!(CAT, imp: self, "Produced audio buffer {:?}", buffer); + } + Buffer::Video { + frame, + discont, + receive_time_gst, + receive_time_real, + } => { + srcpad = state.video_pad.clone().unwrap(); + let (pts, duration, resync) = self + .calculate_video_timestamp( + &mut state, + receive_time_gst, + receive_time_real, + &frame, + ) + .ok_or_else(|| { + gst::debug!(CAT, imp: self, "Flushing, dropping buffer"); + gst::FlowError::Flushing + })?; + + buffer = + self.create_video_buffer(&mut state, pts, duration, discont, resync, frame)?; + + gst::log!(CAT, imp: self, "Produced video buffer {:?}", buffer); + } + Buffer::Metadata { frame, .. } => { + state.pending_metadata.push(frame); + return Ok(gst::FlowSuccess::Ok); + } + }; drop(state); - meta.remove().unwrap(); - - if add_pad { - self.obj().add_pad(&srcpad).unwrap(); - if self.obj().num_src_pads() == 2 { - self.obj().no_more_pads(); - } - } - - for ev in events { - srcpad.push_event(ev); - } let res = srcpad.push(buffer); @@ -296,16 +550,1477 @@ impl NdiSrcDemux { use gst::EventView; gst::log!(CAT, imp: self, "Handling event {:?}", event); - if let EventView::Eos(_) = event.view() { - if self.obj().num_src_pads() == 0 { - // error out on EOS if no src pad are available - gst::element_imp_error!( - self, - gst::StreamError::Demux, - ["EOS without available srcpad(s)"] - ); + match event.view() { + EventView::StreamStart(ev) => { + let state = self.state.lock().unwrap(); + let pads = [ + ("audio", state.audio_pad.clone()), + ("video", state.video_pad.clone()), + ]; + drop(state); + + for (stream_name, srcpad) in pads { + let Some(srcpad) = srcpad else { + continue; + }; + + let stream_start = gst::event::StreamStart::builder(&format!( + "{}/{stream_name}", + ev.stream_id() + )) + .seqnum(ev.seqnum()) + .flags(ev.stream_flags()) + .group_id(ev.group_id().unwrap_or_else(|| { + // This can't really happen as ndisrc would provide one! 
+ gst::error!(CAT, imp: self, "Upstream provided no group id"); + gst::GroupId::next() + })) + .build(); + + let _ = srcpad.push_event(stream_start); + } + + return true; } + EventView::Caps(_) => { + return true; + } + EventView::Eos(_) => { + if self.obj().num_src_pads() == 0 { + // error out on EOS if no src pad are available + gst::element_imp_error!( + self, + gst::StreamError::Demux, + ["EOS without available srcpad(s)"] + ); + } + } + _ => (), } gst::Pad::event_default(pad, Some(&*self.obj()), event) } } + +impl NdiSrcDemux { + #[allow(clippy::too_many_arguments)] + fn calculate_timestamp( + &self, + state: &mut State, + is_audio: bool, + receive_time_gst: gst::ClockTime, + receive_time_real: gst::ClockTime, + timestamp: i64, + timecode: i64, + duration: Option, + ) -> Option<(gst::ClockTime, Option, bool)> { + let timestamp = if timestamp == ndisys::NDIlib_recv_timestamp_undefined { + gst::ClockTime::NONE + } else { + Some((timestamp as u64 * 100).nseconds()) + }; + let timecode = (timecode as u64 * 100).nseconds(); + + gst::log!( + CAT, + imp: self, + "Received frame with timecode {}, timestamp {}, duration {}, receive time {}, local time now {}", + timecode, + timestamp.display(), + duration.display(), + receive_time_gst.display(), + receive_time_real, + ); + + let res_timestamp = state.observations_timestamp[usize::from(!is_audio)].process( + self.obj().upcast_ref(), + timestamp, + receive_time_gst, + duration, + ); + + let res_timecode = state.observations_timecode[usize::from(!is_audio)].process( + self.obj().upcast_ref(), + Some(timecode), + receive_time_gst, + duration, + ); + + let (pts, duration, discont) = match state.timestamp_mode { + TimestampMode::ReceiveTimeTimecode => match res_timecode { + Some((pts, duration, discont)) => (pts, duration, discont), + None => { + gst::warning!(CAT, imp: self, "Can't calculate timestamp"); + (receive_time_gst, duration, false) + } + }, + TimestampMode::ReceiveTimeTimestamp => match res_timestamp { + Some((pts, duration, discont)) => (pts, duration, discont), + None => { + if timestamp.is_some() { + gst::warning!(CAT, imp: self, "Can't calculate timestamp"); + } + + (receive_time_gst, duration, false) + } + }, + TimestampMode::Timecode => (timecode, duration, false), + TimestampMode::Timestamp if timestamp.is_none() => (receive_time_gst, duration, false), + TimestampMode::Timestamp => { + // Timestamps are relative to the UNIX epoch + let timestamp = timestamp?; + if receive_time_real > timestamp { + let diff = receive_time_real - timestamp; + if diff > receive_time_gst { + (gst::ClockTime::ZERO, duration, false) + } else { + (receive_time_gst - diff, duration, false) + } + } else { + let diff = timestamp - receive_time_real; + (receive_time_gst + diff, duration, false) + } + } + TimestampMode::ReceiveTime => (receive_time_gst, duration, false), + TimestampMode::Auto => { + res_timecode + .or(res_timestamp) + .unwrap_or((receive_time_gst, duration, false)) + } + }; + + gst::log!( + CAT, + imp: self, + "Calculated PTS {}, duration {}", + pts.display(), + duration.display(), + ); + + Some((pts, duration, discont)) + } + + fn calculate_video_timestamp( + &self, + state: &mut State, + receive_time_gst: gst::ClockTime, + receive_time_real: gst::ClockTime, + video_frame: &crate::ndi::VideoFrame, + ) -> Option<(gst::ClockTime, Option, bool)> { + let duration = gst::ClockTime::SECOND.mul_div_floor( + video_frame.frame_rate().1 as u64, + video_frame.frame_rate().0 as u64, + ); + + self.calculate_timestamp( + state, + false, + receive_time_gst, 
+            receive_time_real,
+            video_frame.timestamp(),
+            video_frame.timecode(),
+            duration,
+        )
+    }
+
+    fn create_video_buffer_pool(&self, video_info: &gst_video::VideoInfo) -> gst::BufferPool {
+        let pool = gst_video::VideoBufferPool::new();
+        let mut config = pool.config();
+        config.set_params(
+            Some(&video_info.to_caps().unwrap()),
+            video_info.size() as u32,
+            0,
+            0,
+        );
+        pool.set_config(config).unwrap();
+        pool.set_active(true).unwrap();
+
+        pool.upcast()
+    }
+
+    fn create_video_info(
+        &self,
+        video_frame: &crate::ndi::VideoFrame,
+    ) -> Result<VideoInfo, gst::FlowError> {
+        let fourcc = video_frame.fourcc();
+
+        let par = gst::Fraction::approximate_f32(video_frame.picture_aspect_ratio())
+            .unwrap_or_else(|| gst::Fraction::new(1, 1))
+            * gst::Fraction::new(video_frame.yres(), video_frame.xres());
+        let interlace_mode = match video_frame.frame_format_type() {
+            ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_progressive => {
+                gst_video::VideoInterlaceMode::Progressive
+            }
+            ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved => {
+                gst_video::VideoInterlaceMode::Interleaved
+            }
+            _ => gst_video::VideoInterlaceMode::Alternate,
+        };
+
+        if [
+            ndisys::NDIlib_FourCC_video_type_UYVY,
+            ndisys::NDIlib_FourCC_video_type_UYVA,
+            ndisys::NDIlib_FourCC_video_type_YV12,
+            ndisys::NDIlib_FourCC_video_type_NV12,
+            ndisys::NDIlib_FourCC_video_type_I420,
+            ndisys::NDIlib_FourCC_video_type_BGRA,
+            ndisys::NDIlib_FourCC_video_type_BGRX,
+            ndisys::NDIlib_FourCC_video_type_RGBA,
+            ndisys::NDIlib_FourCC_video_type_RGBX,
+        ]
+        .contains(&fourcc)
+        {
+            // YV12 and I420 are swapped in the NDI SDK compared to GStreamer
+            let format = match video_frame.fourcc() {
+                ndisys::NDIlib_FourCC_video_type_UYVY => gst_video::VideoFormat::Uyvy,
+                // FIXME: This drops the alpha plane!
+ ndisys::NDIlib_FourCC_video_type_UYVA => gst_video::VideoFormat::Uyvy, + ndisys::NDIlib_FourCC_video_type_YV12 => gst_video::VideoFormat::I420, + ndisys::NDIlib_FourCC_video_type_NV12 => gst_video::VideoFormat::Nv12, + ndisys::NDIlib_FourCC_video_type_I420 => gst_video::VideoFormat::Yv12, + ndisys::NDIlib_FourCC_video_type_BGRA => gst_video::VideoFormat::Bgra, + ndisys::NDIlib_FourCC_video_type_BGRX => gst_video::VideoFormat::Bgrx, + ndisys::NDIlib_FourCC_video_type_RGBA => gst_video::VideoFormat::Rgba, + ndisys::NDIlib_FourCC_video_type_RGBX => gst_video::VideoFormat::Rgbx, + _ => { + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Unsupported video fourcc {:08x}", video_frame.fourcc()] + ); + + return Err(gst::FlowError::NotNegotiated); + } // TODO: NDIlib_FourCC_video_type_P216 and NDIlib_FourCC_video_type_PA16 not + // supported by GStreamer + }; + + let mut builder = gst_video::VideoInfo::builder( + format, + video_frame.xres() as u32, + video_frame.yres() as u32, + ) + .fps(gst::Fraction::from(video_frame.frame_rate())) + .par(par) + .interlace_mode(interlace_mode); + + if video_frame.frame_format_type() + == ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved + { + builder = builder.field_order(gst_video::VideoFieldOrder::TopFieldFirst); + } + + return Ok(VideoInfo::Video(builder.build().map_err(|_| { + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Invalid video format configuration"] + ); + + gst::FlowError::NotNegotiated + })?)); + } + + #[cfg(feature = "advanced-sdk")] + if [ + ndisys::NDIlib_FourCC_video_type_ex_SHQ0_highest_bandwidth, + ndisys::NDIlib_FourCC_video_type_ex_SHQ2_highest_bandwidth, + ndisys::NDIlib_FourCC_video_type_ex_SHQ7_highest_bandwidth, + ndisys::NDIlib_FourCC_video_type_ex_SHQ0_lowest_bandwidth, + ndisys::NDIlib_FourCC_video_type_ex_SHQ2_lowest_bandwidth, + ndisys::NDIlib_FourCC_video_type_ex_SHQ7_lowest_bandwidth, + ] + .contains(&fourcc) + { + let variant = match fourcc { + ndisys::NDIlib_FourCC_video_type_ex_SHQ0_highest_bandwidth + | ndisys::NDIlib_FourCC_video_type_ex_SHQ0_lowest_bandwidth => String::from("SHQ0"), + ndisys::NDIlib_FourCC_video_type_ex_SHQ2_highest_bandwidth + | ndisys::NDIlib_FourCC_video_type_ex_SHQ2_lowest_bandwidth => String::from("SHQ2"), + ndisys::NDIlib_FourCC_video_type_ex_SHQ7_highest_bandwidth + | ndisys::NDIlib_FourCC_video_type_ex_SHQ7_lowest_bandwidth => String::from("SHQ7"), + _ => { + gst::element_imp_error!( + self, + gst::StreamError::Format, + [ + "Unsupported SpeedHQ video fourcc {:08x}", + video_frame.fourcc() + ] + ); + + return Err(gst::FlowError::NotNegotiated); + } + }; + + return Ok(VideoInfo::SpeedHQInfo { + variant, + xres: video_frame.xres(), + yres: video_frame.yres(), + fps_n: video_frame.frame_rate().0, + fps_d: video_frame.frame_rate().1, + par_n: par.numer(), + par_d: par.denom(), + interlace_mode, + }); + } + + #[cfg(feature = "advanced-sdk")] + if [ + ndisys::NDIlib_FourCC_video_type_ex_H264_highest_bandwidth, + ndisys::NDIlib_FourCC_video_type_ex_H264_lowest_bandwidth, + ndisys::NDIlib_FourCC_video_type_ex_H264_alpha_highest_bandwidth, + ndisys::NDIlib_FourCC_video_type_ex_H264_alpha_lowest_bandwidth, + ] + .contains(&fourcc) + { + let compressed_packet = video_frame.compressed_packet().ok_or_else(|| { + gst::error!( + CAT, + imp: self, + "Video packet doesn't have compressed packet start" + ); + gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid video packet"]); + + gst::FlowError::Error + })?; + + if compressed_packet.fourcc 
!= ndisys::NDIlib_compressed_FourCC_type_H264 { + gst::error!(CAT, imp: self, "Non-H264 video packet"); + gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid video packet"]); + + return Err(gst::FlowError::Error); + } + + return Ok(VideoInfo::H264 { + xres: video_frame.xres(), + yres: video_frame.yres(), + fps_n: video_frame.frame_rate().0, + fps_d: video_frame.frame_rate().1, + par_n: par.numer(), + par_d: par.denom(), + interlace_mode, + }); + } + + #[cfg(feature = "advanced-sdk")] + if [ + ndisys::NDIlib_FourCC_video_type_ex_HEVC_highest_bandwidth, + ndisys::NDIlib_FourCC_video_type_ex_HEVC_lowest_bandwidth, + ndisys::NDIlib_FourCC_video_type_ex_HEVC_alpha_highest_bandwidth, + ndisys::NDIlib_FourCC_video_type_ex_HEVC_alpha_lowest_bandwidth, + ] + .contains(&fourcc) + { + let compressed_packet = video_frame.compressed_packet().ok_or_else(|| { + gst::error!( + CAT, + imp: self, + "Video packet doesn't have compressed packet start" + ); + gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid video packet"]); + + gst::FlowError::Error + })?; + + if compressed_packet.fourcc != ndisys::NDIlib_compressed_FourCC_type_HEVC { + gst::error!(CAT, imp: self, "Non-H265 video packet"); + gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid video packet"]); + + return Err(gst::FlowError::Error); + } + + return Ok(VideoInfo::H265 { + xres: video_frame.xres(), + yres: video_frame.yres(), + fps_n: video_frame.frame_rate().0, + fps_d: video_frame.frame_rate().1, + par_n: par.numer(), + par_d: par.denom(), + interlace_mode, + }); + } + + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Unsupported video fourcc {:08x}", video_frame.fourcc()] + ); + Err(gst::FlowError::NotNegotiated) + } + + fn create_video_buffer( + &self, + state: &mut State, + pts: gst::ClockTime, + duration: Option, + discont: bool, + resync: bool, + video_frame: crate::ndi::VideoFrame, + ) -> Result { + let timecode = video_frame.timecode(); + let timestamp = video_frame.timestamp(); + let frame_format_type = video_frame.frame_format_type(); + + let mut captions = Vec::new(); + + { + let ndi_cc_decoder = state.ndi_cc_decoder.as_mut().unwrap(); + // handle potential width change (also needed for standalone metadata) + ndi_cc_decoder.set_width(state.video_info.as_ref().unwrap().width()); + + for metadata in state.pending_metadata.drain(..) 
{ + if let Some(meta) = metadata.metadata() { + let res = ndi_cc_decoder.decode(meta); + if let Err(err) = res { + gst::debug!(CAT, imp: self, "Failed to parse NDI metadata: {err}"); + } + } + } + + if let Some(metadata) = video_frame.metadata() { + let res = ndi_cc_decoder.decode(metadata); + match res { + Ok(c) => { + captions.extend_from_slice(&c); + } + Err(err) => { + gst::debug!(CAT, imp: self, "Failed to parse NDI video frame metadata: {err}"); + } + } + } + } + + let mut buffer = self.wrap_or_copy_video_frame(state, video_frame)?; + { + let buffer = buffer.get_mut().unwrap(); + buffer.set_pts(pts); + buffer.set_duration(duration); + + if resync { + buffer.set_flags(gst::BufferFlags::RESYNC); + } + + if discont { + buffer.set_flags(gst::BufferFlags::DISCONT); + } + + gst::ReferenceTimestampMeta::add( + buffer, + &crate::TIMECODE_CAPS, + (timecode as u64 * 100).nseconds(), + gst::ClockTime::NONE, + ); + if timestamp != ndisys::NDIlib_recv_timestamp_undefined { + gst::ReferenceTimestampMeta::add( + buffer, + &crate::TIMESTAMP_CAPS, + (timestamp as u64 * 100).nseconds(), + gst::ClockTime::NONE, + ); + } + + for caption in captions { + match caption.did16() { + gst_video::VideoAncillaryDID16::S334Eia608 => { + gst_video::VideoCaptionMeta::add( + buffer, + gst_video::VideoCaptionType::Cea608S3341a, + caption.data(), + ); + } + gst_video::VideoAncillaryDID16::S334Eia708 => { + gst_video::VideoCaptionMeta::add( + buffer, + gst_video::VideoCaptionType::Cea708Cdp, + caption.data(), + ); + } + _ => (), + } + } + + match frame_format_type { + ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_interleaved => { + buffer.set_video_flags( + gst_video::VideoBufferFlags::INTERLACED | gst_video::VideoBufferFlags::TFF, + ); + } + ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_0 => { + buffer.set_video_flags( + gst_video::VideoBufferFlags::INTERLACED + | gst_video::VideoBufferFlags::TOP_FIELD, + ); + } + ndisys::NDIlib_frame_format_type_e::NDIlib_frame_format_type_field_1 => { + buffer.set_video_flags( + gst_video::VideoBufferFlags::INTERLACED + | gst_video::VideoBufferFlags::BOTTOM_FIELD, + ); + } + _ => (), + } + } + + Ok(buffer) + } + + fn wrap_or_copy_video_frame( + &self, + state: &mut State, + video_frame: crate::ndi::VideoFrame, + ) -> Result { + struct WrappedVideoFrame(crate::ndi::VideoFrame); + + impl AsRef<[u8]> for WrappedVideoFrame { + fn as_ref(&self) -> &[u8] { + self.0.data().unwrap_or(&[]) + } + } + + match state.video_info.as_ref().unwrap() { + VideoInfo::Video(ref info) => { + match info.format() { + gst_video::VideoFormat::Uyvy + | gst_video::VideoFormat::Bgra + | gst_video::VideoFormat::Bgrx + | gst_video::VideoFormat::Rgba + | gst_video::VideoFormat::Rgbx => { + let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize; + + if src_stride == info.stride()[0] as usize { + Ok(gst::Buffer::from_slice(WrappedVideoFrame(video_frame))) + } else { + gst::debug!(gst::CAT_PERFORMANCE, imp: self, "Copying raw video frame"); + + let src = video_frame.data().ok_or(gst::FlowError::Error)?; + + if state.video_buffer_pool.is_none() { + state.video_buffer_pool = Some(self.create_video_buffer_pool(info)); + }; + let pool = state.video_buffer_pool.as_ref().unwrap(); + let buffer = pool.acquire_buffer(None)?; + + let mut vframe = + gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap(); + + let line_bytes = if info.format() == gst_video::VideoFormat::Uyvy { + 2 * vframe.width() as usize + } else { + 4 * vframe.width() as usize + }; + + let 
dest_stride = vframe.plane_stride()[0] as usize; + let dest = vframe.plane_data_mut(0).unwrap(); + let plane_size = video_frame.yres() as usize * src_stride; + + if src.len() < plane_size || src_stride < line_bytes { + gst::error!(CAT, imp: self, "Video packet has wrong stride or size"); + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Video packet has wrong stride or size"] + ); + return Err(gst::FlowError::Error); + } + + for (dest, src) in dest + .chunks_exact_mut(dest_stride) + .zip(src.chunks_exact(src_stride)) + { + dest[..line_bytes].copy_from_slice(&src[..line_bytes]); + } + + Ok(vframe.into_buffer()) + } + } + gst_video::VideoFormat::Nv12 => { + let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize; + + if src_stride == info.stride()[0] as usize { + Ok(gst::Buffer::from_slice(WrappedVideoFrame(video_frame))) + } else { + gst::debug!(gst::CAT_PERFORMANCE, imp: self, "Copying raw video frame"); + + let src = video_frame.data().ok_or(gst::FlowError::Error)?; + + if state.video_buffer_pool.is_none() { + state.video_buffer_pool = Some(self.create_video_buffer_pool(info)); + }; + let pool = state.video_buffer_pool.as_ref().unwrap(); + let buffer = pool.acquire_buffer(None)?; + + let mut vframe = + gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap(); + + let line_bytes = vframe.width() as usize; + let plane_size = video_frame.yres() as usize * src_stride; + + if src.len() < 2 * plane_size || src_stride < line_bytes { + gst::error!(CAT, imp: self, "Video packet has wrong stride or size"); + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Video packet has wrong stride or size"] + ); + return Err(gst::FlowError::Error); + } + + // First plane + { + let dest_stride = vframe.plane_stride()[0] as usize; + let dest = vframe.plane_data_mut(0).unwrap(); + let src = &src[..plane_size]; + + for (dest, src) in dest + .chunks_exact_mut(dest_stride) + .zip(src.chunks_exact(src_stride)) + { + dest[..line_bytes].copy_from_slice(&src[..line_bytes]); + } + } + + // Second plane + { + let dest_stride = vframe.plane_stride()[1] as usize; + let dest = vframe.plane_data_mut(1).unwrap(); + let src = &src[plane_size..]; + + for (dest, src) in dest + .chunks_exact_mut(dest_stride) + .zip(src.chunks_exact(src_stride)) + { + dest[..line_bytes].copy_from_slice(&src[..line_bytes]); + } + } + + Ok(vframe.into_buffer()) + } + } + gst_video::VideoFormat::Yv12 | gst_video::VideoFormat::I420 => { + let src_stride = video_frame.line_stride_or_data_size_in_bytes() as usize; + let src_stride1 = (src_stride + 1) / 2; + + if src_stride == info.stride()[0] as usize + && src_stride1 == info.stride()[1] as usize + { + Ok(gst::Buffer::from_slice(WrappedVideoFrame(video_frame))) + } else { + gst::debug!(gst::CAT_PERFORMANCE, imp: self, "Copying raw video frame"); + + let src = video_frame.data().ok_or(gst::FlowError::Error)?; + + if state.video_buffer_pool.is_none() { + state.video_buffer_pool = Some(self.create_video_buffer_pool(info)); + }; + let pool = state.video_buffer_pool.as_ref().unwrap(); + let buffer = pool.acquire_buffer(None)?; + + let mut vframe = + gst_video::VideoFrame::from_buffer_writable(buffer, info).unwrap(); + + let line_bytes = vframe.width() as usize; + let line_bytes1 = (line_bytes + 1) / 2; + + let plane_size = video_frame.yres() as usize * src_stride; + let plane_size1 = ((video_frame.yres() as usize + 1) / 2) * src_stride1; + + if src.len() < plane_size + 2 * plane_size1 || src_stride < line_bytes { + gst::error!(CAT, imp: self, "Video 
packet has wrong stride or size"); + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Video packet has wrong stride or size"] + ); + return Err(gst::FlowError::Error); + } + + // First plane + { + let dest_stride = vframe.plane_stride()[0] as usize; + let dest = vframe.plane_data_mut(0).unwrap(); + let src = &src[..plane_size]; + + for (dest, src) in dest + .chunks_exact_mut(dest_stride) + .zip(src.chunks_exact(src_stride)) + { + dest[..line_bytes].copy_from_slice(&src[..line_bytes]); + } + } + + // Second plane + { + let dest_stride = vframe.plane_stride()[1] as usize; + let dest = vframe.plane_data_mut(1).unwrap(); + let src = &src[plane_size..][..plane_size1]; + + for (dest, src) in dest + .chunks_exact_mut(dest_stride) + .zip(src.chunks_exact(src_stride1)) + { + dest[..line_bytes1].copy_from_slice(&src[..line_bytes1]); + } + } + + // Third plane + { + let dest_stride = vframe.plane_stride()[2] as usize; + let dest = vframe.plane_data_mut(2).unwrap(); + let src = &src[plane_size + plane_size1..][..plane_size1]; + + for (dest, src) in dest + .chunks_exact_mut(dest_stride) + .zip(src.chunks_exact(src_stride1)) + { + dest[..line_bytes1].copy_from_slice(&src[..line_bytes1]); + } + } + + Ok(vframe.into_buffer()) + } + } + _ => unreachable!(), + } + } + #[cfg(feature = "advanced-sdk")] + VideoInfo::SpeedHQInfo { .. } => { + Ok(gst::Buffer::from_slice(WrappedVideoFrame(video_frame))) + } + #[cfg(feature = "advanced-sdk")] + VideoInfo::H264 { .. } | VideoInfo::H265 { .. } => { + let compressed_packet = video_frame.compressed_packet().ok_or_else(|| { + gst::error!( + CAT, + imp: self, + "Video packet doesn't have compressed packet start" + ); + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Invalid video packet"] + ); + + gst::FlowError::Error + })?; + + // FIXME: Copy to a new vec for now. 
This can be optimized, especially if there is + // no extra data attached to the frame + let mut buffer = Vec::new(); + if let Some(extra_data) = compressed_packet.extra_data { + buffer.extend_from_slice(extra_data); + } + buffer.extend_from_slice(compressed_packet.data); + let mut buffer = gst::Buffer::from_mut_slice(buffer); + if !compressed_packet.key_frame { + let buffer = buffer.get_mut().unwrap(); + buffer.set_flags(gst::BufferFlags::DELTA_UNIT); + } + + Ok(buffer) + } + } + } + + fn calculate_audio_timestamp( + &self, + state: &mut State, + receive_time_gst: gst::ClockTime, + receive_time_real: gst::ClockTime, + audio_frame: &crate::ndi::AudioFrame, + ) -> Option<(gst::ClockTime, Option, bool)> { + let duration = gst::ClockTime::SECOND.mul_div_floor( + audio_frame.no_samples() as u64, + audio_frame.sample_rate() as u64, + ); + + self.calculate_timestamp( + state, + true, + receive_time_gst, + receive_time_real, + audio_frame.timestamp(), + audio_frame.timecode(), + duration, + ) + } + + fn create_audio_info( + &self, + audio_frame: &crate::ndi::AudioFrame, + ) -> Result { + let fourcc = audio_frame.fourcc(); + + if [ndisys::NDIlib_FourCC_audio_type_FLTp].contains(&fourcc) { + let channels = audio_frame.no_channels() as u32; + let mut positions = [gst_audio::AudioChannelPosition::None; 64]; + if channels <= 8 { + let _ = gst_audio::AudioChannelPosition::positions_from_mask( + gst_audio::AudioChannelPosition::fallback_mask(channels), + &mut positions[..channels as usize], + ); + } + + let builder = gst_audio::AudioInfo::builder( + gst_audio::AUDIO_FORMAT_F32, + audio_frame.sample_rate() as u32, + channels, + ) + .positions(&positions[..channels as usize]); + + let info = builder.build().map_err(|_| { + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Invalid audio format configuration"] + ); + + gst::FlowError::NotNegotiated + })?; + + return Ok(AudioInfo::Audio(info)); + } + + #[cfg(feature = "advanced-sdk")] + if [ndisys::NDIlib_FourCC_audio_type_AAC].contains(&fourcc) { + let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| { + gst::error!( + CAT, + imp: self, + "Audio packet doesn't have compressed packet start" + ); + gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid audio packet"]); + + gst::FlowError::Error + })?; + + if compressed_packet.fourcc != ndisys::NDIlib_compressed_FourCC_type_AAC { + gst::error!(CAT, imp: self, "Non-AAC audio packet"); + gst::element_imp_error!(self, gst::StreamError::Format, ["Invalid audio packet"]); + + return Err(gst::FlowError::Error); + } + + return Ok(AudioInfo::Aac { + sample_rate: audio_frame.sample_rate(), + no_channels: audio_frame.no_channels(), + codec_data: compressed_packet + .extra_data + .ok_or(gst::FlowError::NotNegotiated)? 
+ .try_into() + .map_err(|_| gst::FlowError::NotNegotiated)?, + }); + } + + // FIXME: Needs testing with an actual stream to understand how it works + // #[cfg(feature = "advanced-sdk")] + // if [NDIlib_FourCC_audio_type_Opus].contains(&fourcc) {} + + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Unsupported audio fourcc {:08x}", audio_frame.fourcc()] + ); + Err(gst::FlowError::NotNegotiated) + } + + fn create_audio_buffer( + &self, + state: &State, + pts: gst::ClockTime, + duration: Option, + discont: bool, + resync: bool, + audio_frame: crate::ndi::AudioFrame, + ) -> Result { + struct WrappedAudioFrame(crate::ndi::AudioFrame); + + impl AsRef<[u8]> for WrappedAudioFrame { + fn as_ref(&self) -> &[u8] { + self.0.data().unwrap_or(&[]) + } + } + + match state.audio_info.as_ref().unwrap() { + AudioInfo::Audio(ref info) => { + let no_samples = audio_frame.no_samples(); + let timecode = audio_frame.timecode(); + let timestamp = audio_frame.timestamp(); + let buff_size = (no_samples as u32 * info.bpf()) as usize; + + let mut buffer = if state.audio_non_interleaved { + let info = state.audio_info_non_interleaved.as_ref().unwrap(); + let mut buffer = gst::Buffer::from_slice(WrappedAudioFrame(audio_frame)); + + { + let buffer = buffer.get_mut().unwrap(); + + gst_audio::AudioMeta::add(buffer, info, no_samples as usize, &[]).unwrap(); + } + + buffer + } else { + gst::debug!(gst::CAT_PERFORMANCE, imp: self, "Copying raw audio frame"); + + let src = audio_frame.data().ok_or(gst::FlowError::Error)?; + let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); + + { + let buffer = buffer.get_mut().unwrap(); + let mut dest = buffer.map_writable().unwrap(); + let dest = dest + .as_mut_slice_of::() + .map_err(|_| gst::FlowError::NotNegotiated)?; + assert!( + dest.len() + == audio_frame.no_samples() as usize + * audio_frame.no_channels() as usize + ); + + for (channel, samples) in src + .chunks_exact( + audio_frame.channel_stride_or_data_size_in_bytes() as usize + ) + .enumerate() + { + let samples = samples + .as_slice_of::() + .map_err(|_| gst::FlowError::NotNegotiated)?; + + for (i, sample) in samples[..audio_frame.no_samples() as usize] + .iter() + .enumerate() + { + dest[i * (audio_frame.no_channels() as usize) + channel] = *sample; + } + } + } + + buffer + }; + + { + let buffer = buffer.get_mut().unwrap(); + + buffer.set_pts(pts); + buffer.set_duration(duration); + + if resync { + buffer.set_flags(gst::BufferFlags::RESYNC); + } + + if discont { + buffer.set_flags(gst::BufferFlags::DISCONT); + } + + gst::ReferenceTimestampMeta::add( + buffer, + &crate::TIMECODE_CAPS, + (timecode as u64 * 100).nseconds(), + gst::ClockTime::NONE, + ); + if timestamp != ndisys::NDIlib_recv_timestamp_undefined { + gst::ReferenceTimestampMeta::add( + buffer, + &crate::TIMESTAMP_CAPS, + (timestamp as u64 * 100).nseconds(), + gst::ClockTime::NONE, + ); + } + } + + Ok(buffer) + } + #[cfg(feature = "advanced-sdk")] + AudioInfo::Opus { .. } => { + let data = audio_frame.data().ok_or_else(|| { + gst::error!(CAT, imp: self, "Audio packet has no data"); + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Invalid audio packet"] + ); + + gst::FlowError::Error + })?; + + Ok(gst::Buffer::from_mut_slice(Vec::from(data))) + } + #[cfg(feature = "advanced-sdk")] + AudioInfo::Aac { .. 
} => { + let compressed_packet = audio_frame.compressed_packet().ok_or_else(|| { + gst::error!( + CAT, + imp: self, + "Audio packet doesn't have compressed packet start" + ); + gst::element_imp_error!( + self, + gst::StreamError::Format, + ["Invalid audio packet"] + ); + + gst::FlowError::Error + })?; + + Ok(gst::Buffer::from_mut_slice(Vec::from( + compressed_packet.data, + ))) + } + } + } +} + +#[derive(Debug, PartialEq, Eq)] +#[allow(clippy::large_enum_variant)] +pub enum AudioInfo { + Audio(gst_audio::AudioInfo), + #[cfg(feature = "advanced-sdk")] + #[allow(dead_code)] + Opus { + sample_rate: i32, + no_channels: i32, + }, + #[cfg(feature = "advanced-sdk")] + Aac { + sample_rate: i32, + no_channels: i32, + codec_data: [u8; 2], + }, +} + +impl AudioInfo { + pub fn to_caps(&self) -> Result { + match self { + AudioInfo::Audio(ref info) => info.to_caps(), + #[cfg(feature = "advanced-sdk")] + AudioInfo::Opus { + sample_rate, + no_channels, + } => Ok(gst::Caps::builder("audio/x-opus") + .field("channels", *no_channels) + .field("rate", *sample_rate) + .field("channel-mapping-family", 0i32) + .build()), + #[cfg(feature = "advanced-sdk")] + AudioInfo::Aac { + sample_rate, + no_channels, + codec_data, + } => Ok(gst::Caps::builder("audio/mpeg") + .field("channels", *no_channels) + .field("rate", *sample_rate) + .field("mpegversion", 4i32) + .field("stream-format", "raw") + .field("codec_data", gst::Buffer::from_mut_slice(*codec_data)) + .build()), + } + } +} + +#[derive(Debug, PartialEq, Eq)] +pub enum VideoInfo { + Video(gst_video::VideoInfo), + #[cfg(feature = "advanced-sdk")] + SpeedHQInfo { + variant: String, + xres: i32, + yres: i32, + fps_n: i32, + fps_d: i32, + par_n: i32, + par_d: i32, + interlace_mode: gst_video::VideoInterlaceMode, + }, + #[cfg(feature = "advanced-sdk")] + H264 { + xres: i32, + yres: i32, + fps_n: i32, + fps_d: i32, + par_n: i32, + par_d: i32, + interlace_mode: gst_video::VideoInterlaceMode, + }, + #[cfg(feature = "advanced-sdk")] + H265 { + xres: i32, + yres: i32, + fps_n: i32, + fps_d: i32, + par_n: i32, + par_d: i32, + interlace_mode: gst_video::VideoInterlaceMode, + }, +} + +impl VideoInfo { + pub fn to_caps(&self) -> Result { + match self { + VideoInfo::Video(ref info) => info.to_caps(), + #[cfg(feature = "advanced-sdk")] + VideoInfo::SpeedHQInfo { + ref variant, + xres, + yres, + fps_n, + fps_d, + par_n, + par_d, + interlace_mode, + } => Ok(gst::Caps::builder("video/x-speedhq") + .field("width", *xres) + .field("height", *yres) + .field("framerate", gst::Fraction::new(*fps_n, *fps_d)) + .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d)) + .field("interlace-mode", interlace_mode.to_str()) + .field("variant", variant) + .build()), + #[cfg(feature = "advanced-sdk")] + VideoInfo::H264 { + xres, + yres, + fps_n, + fps_d, + par_n, + par_d, + interlace_mode, + .. + } => Ok(gst::Caps::builder("video/x-h264") + .field("width", *xres) + .field("height", *yres) + .field("framerate", gst::Fraction::new(*fps_n, *fps_d)) + .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d)) + .field("interlace-mode", interlace_mode.to_str()) + .field("stream-format", "byte-stream") + .field("alignment", "au") + .build()), + #[cfg(feature = "advanced-sdk")] + VideoInfo::H265 { + xres, + yres, + fps_n, + fps_d, + par_n, + par_d, + interlace_mode, + .. 
+            } => Ok(gst::Caps::builder("video/x-h265")
+                .field("width", *xres)
+                .field("height", *yres)
+                .field("framerate", gst::Fraction::new(*fps_n, *fps_d))
+                .field("pixel-aspect-ratio", gst::Fraction::new(*par_n, *par_d))
+                .field("interlace-mode", interlace_mode.to_str())
+                .field("stream-format", "byte-stream")
+                .field("alignment", "au")
+                .build()),
+        }
+    }
+
+    pub fn width(&self) -> u32 {
+        match self {
+            VideoInfo::Video(ref info) => info.width(),
+            #[cfg(feature = "advanced-sdk")]
+            VideoInfo::SpeedHQInfo { xres, .. }
+            | VideoInfo::H264 { xres, .. }
+            | VideoInfo::H265 { xres, .. } => *xres as u32,
+        }
+    }
+}
+
+const PREFILL_WINDOW_LENGTH: usize = 12;
+const WINDOW_LENGTH: u64 = 512;
+const WINDOW_DURATION: u64 = 2_000_000_000;
+
+#[derive(Default)]
+struct Observations(AtomicRefCell<ObservationsInner>);
+
+struct ObservationsInner {
+    base_remote_time: Option<u64>,
+    base_local_time: Option<u64>,
+    deltas: VecDeque<i64>,
+    min_delta: i64,
+    skew: i64,
+    filling: bool,
+    window_size: usize,
+
+    // Remote/local times for workaround around fundamentally wrong slopes
+    // This is not reset below and has a bigger window.
+    times: VecDeque<(u64, u64)>,
+    slope_correction: (u64, u64),
+}
+
+impl Default for ObservationsInner {
+    fn default() -> ObservationsInner {
+        ObservationsInner {
+            base_local_time: None,
+            base_remote_time: None,
+            deltas: VecDeque::new(),
+            min_delta: 0,
+            skew: 0,
+            filling: true,
+            window_size: 0,
+            times: VecDeque::new(),
+            slope_correction: (1, 1),
+        }
+    }
+}
+
+impl ObservationsInner {
+    fn reset(&mut self) {
+        self.base_local_time = None;
+        self.base_remote_time = None;
+        self.deltas = VecDeque::new();
+        self.min_delta = 0;
+        self.skew = 0;
+        self.filling = true;
+        self.window_size = 0;
+    }
+}
+
+impl Observations {
+    // Based on the algorithm used in GStreamer's rtpjitterbuffer, which comes from
+    // Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation over Network Delays":
+    // http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
+    fn process(
+        &self,
+        element: &gst::Element,
+        remote_time: Option<gst::ClockTime>,
+        local_time: gst::ClockTime,
+        duration: Option<gst::ClockTime>,
+    ) -> Option<(gst::ClockTime, Option<gst::ClockTime>, bool)> {
+        let remote_time = remote_time?.nseconds();
+        let local_time = local_time.nseconds();
+
+        let mut inner = self.0.borrow_mut();
+
+        gst::trace!(
+            CAT,
+            obj: element,
+            "Local time {}, remote time {}, slope correct {}/{}",
+            local_time.nseconds(),
+            remote_time.nseconds(),
+            inner.slope_correction.0,
+            inner.slope_correction.1,
+        );
+
+        inner.times.push_back((remote_time, local_time));
+        while inner
+            .times
+            .back()
+            .unwrap()
+            .1
+            .saturating_sub(inner.times.front().unwrap().1)
+            > WINDOW_DURATION
+        {
+            let _ = inner.times.pop_front();
+        }
+
+        // Static remote times
+        if inner.slope_correction.1 == 0 {
+            return None;
+        }
+
+        let remote_time =
+            remote_time.mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
+
+        let (base_remote_time, base_local_time) =
+            match (inner.base_remote_time, inner.base_local_time) {
+                (Some(remote), Some(local)) => (remote, local),
+                _ => {
+                    gst::debug!(
+                        CAT,
+                        obj: element,
+                        "Initializing base time: local {}, remote {}",
+                        local_time.nseconds(),
+                        remote_time.nseconds(),
+                    );
+                    inner.base_remote_time = Some(remote_time);
+                    inner.base_local_time = Some(local_time);
+
+                    return Some((local_time.nseconds(), duration, true));
+                }
+            };
+
+        if inner.times.len() < PREFILL_WINDOW_LENGTH {
+            return Some((local_time.nseconds(), duration, false));
+        }
+
+        // Check if the slope is simply wrong and try correcting
+        {
+            let local_diff = inner
+                .times
+                .back()
+                .unwrap()
+                .1
+                .saturating_sub(inner.times.front().unwrap().1);
+            let remote_diff = inner
+                .times
+                .back()
+                .unwrap()
+                .0
+                .saturating_sub(inner.times.front().unwrap().0);
+
+            if remote_diff == 0 {
+                inner.reset();
+                inner.base_remote_time = Some(remote_time);
+                inner.base_local_time = Some(local_time);
+
+                // Static remote times
+                inner.slope_correction = (0, 0);
+                return None;
+            } else {
+                let slope = local_diff as f64 / remote_diff as f64;
+                let scaled_slope =
+                    slope * (inner.slope_correction.1 as f64) / (inner.slope_correction.0 as f64);
+
+                // Check for some obviously wrong slopes and try to correct for that
+                if !(0.5..1.5).contains(&scaled_slope) {
+                    gst::warning!(
+                        CAT,
+                        obj: element,
+                        "Too small/big slope {}, resetting",
+                        scaled_slope
+                    );
+
+                    let discont = !inner.deltas.is_empty();
+                    inner.reset();
+
+                    if (0.0005..0.0015).contains(&slope) {
+                        // Remote unit was actually 0.1ns
+                        inner.slope_correction = (1, 1000);
+                    } else if (0.005..0.015).contains(&slope) {
+                        // Remote unit was actually 1ns
+                        inner.slope_correction = (1, 100);
+                    } else if (0.05..0.15).contains(&slope) {
+                        // Remote unit was actually 10ns
+                        inner.slope_correction = (1, 10);
+                    } else if (5.0..15.0).contains(&slope) {
+                        // Remote unit was actually 1us
+                        inner.slope_correction = (10, 1);
+                    } else if (50.0..150.0).contains(&slope) {
+                        // Remote unit was actually 10us
+                        inner.slope_correction = (100, 1);
+                    } else if (500.0..1500.0).contains(&slope) {
+                        // Remote unit was actually 100us
+                        inner.slope_correction = (1000, 1);
+                    } else if (5000.0..15000.0).contains(&slope) {
+                        // Remote unit was actually 1ms
+                        inner.slope_correction = (10000, 1);
+                    } else {
+                        inner.slope_correction = (1, 1);
+                    }
+
+                    let remote_time = inner
+                        .times
+                        .back()
+                        .unwrap()
+                        .0
+                        .mul_div_round(inner.slope_correction.0, inner.slope_correction.1)?;
+                    gst::debug!(
+                        CAT,
+                        obj: element,
+                        "Initializing base time: local {}, remote {}, slope correction {}/{}",
+                        local_time.nseconds(),
+                        remote_time.nseconds(),
+                        inner.slope_correction.0,
+                        inner.slope_correction.1,
+                    );
+                    inner.base_remote_time = Some(remote_time);
+                    inner.base_local_time = Some(local_time);
+
+                    return Some((local_time.nseconds(), duration, discont));
+                }
+            }
+        }
+
+        let remote_diff = remote_time.saturating_sub(base_remote_time);
+        let local_diff = local_time.saturating_sub(base_local_time);
+        let delta = (local_diff as i64) - (remote_diff as i64);
+
+        gst::trace!(
+            CAT,
+            obj: element,
+            "Local diff {}, remote diff {}, delta {}",
+            local_diff.nseconds(),
+            remote_diff.nseconds(),
+            delta,
+        );
+
+        if (delta > inner.skew && delta - inner.skew > 1_000_000_000)
+            || (delta < inner.skew && inner.skew - delta > 1_000_000_000)
+        {
+            gst::warning!(
+                CAT,
+                obj: element,
+                "Delta {} too far from skew {}, resetting",
+                delta,
+                inner.skew
+            );
+
+            let discont = !inner.deltas.is_empty();
+
+            gst::debug!(
+                CAT,
+                obj: element,
+                "Initializing base time: local {}, remote {}",
+                local_time.nseconds(),
+                remote_time.nseconds(),
+            );
+
+            inner.reset();
+            inner.base_remote_time = Some(remote_time);
+            inner.base_local_time = Some(local_time);
+
+            return Some((local_time.nseconds(), duration, discont));
+        }
+
+        if inner.filling {
+            if inner.deltas.is_empty() || delta < inner.min_delta {
+                inner.min_delta = delta;
+            }
+            inner.deltas.push_back(delta);
+
+            if remote_diff > WINDOW_DURATION || inner.deltas.len() as u64 == WINDOW_LENGTH {
+                inner.window_size = inner.deltas.len();
+                inner.skew = inner.min_delta;
+                inner.filling = false;
+            } else {
+                let perc_time =
remote_diff.mul_div_floor(100, WINDOW_DURATION).unwrap() as i64; + let perc_window = (inner.deltas.len() as u64) + .mul_div_floor(100, WINDOW_LENGTH) + .unwrap() as i64; + let perc = cmp::max(perc_time, perc_window); + + inner.skew = (perc * inner.min_delta + ((10_000 - perc) * inner.skew)) / 10_000; + } + } else { + let old = inner.deltas.pop_front().unwrap(); + inner.deltas.push_back(delta); + + if delta <= inner.min_delta { + inner.min_delta = delta; + } else if old == inner.min_delta { + inner.min_delta = inner.deltas.iter().copied().min().unwrap(); + } + + inner.skew = (inner.min_delta + (124 * inner.skew)) / 125; + } + + let out_time = base_local_time + remote_diff; + let out_time = if inner.skew < 0 { + out_time.saturating_sub((-inner.skew) as u64) + } else { + out_time + (inner.skew as u64) + }; + + gst::trace!( + CAT, + obj: element, + "Skew {}, min delta {}", + inner.skew, + inner.min_delta + ); + gst::trace!(CAT, obj: element, "Outputting {}", out_time.nseconds()); + + Some((out_time.nseconds(), duration, false)) + } +} diff --git a/net/ndi/src/ndisrcmeta.rs b/net/ndi/src/ndisrcmeta.rs index 6f4e053d..c6d3a7ac 100644 --- a/net/ndi/src/ndisrcmeta.rs +++ b/net/ndi/src/ndisrcmeta.rs @@ -4,30 +4,49 @@ use gst::prelude::*; use std::fmt; use std::mem; +use crate::ndi::{AudioFrame, MetadataFrame, VideoFrame}; +use crate::TimestampMode; + #[repr(transparent)] pub struct NdiSrcMeta(imp::NdiSrcMeta); -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum StreamType { - Audio, - Video, +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +pub enum Buffer { + Audio { + frame: AudioFrame, + discont: bool, + receive_time_gst: gst::ClockTime, + receive_time_real: gst::ClockTime, + }, + Video { + frame: VideoFrame, + discont: bool, + receive_time_gst: gst::ClockTime, + receive_time_real: gst::ClockTime, + }, + Metadata { + frame: MetadataFrame, + receive_time_gst: gst::ClockTime, + receive_time_real: gst::ClockTime, + }, } unsafe impl Send for NdiSrcMeta {} unsafe impl Sync for NdiSrcMeta {} impl NdiSrcMeta { - pub fn add<'a>( - buffer: &'a mut gst::BufferRef, - stream_type: StreamType, - caps: &gst::Caps, - ) -> gst::MetaRefMut<'a, Self, gst::meta::Standalone> { + pub fn add( + buffer: &mut gst::BufferRef, + ndi_buffer: Buffer, + timestamp_mode: TimestampMode, + ) -> gst::MetaRefMut { unsafe { // Manually dropping because gst_buffer_add_meta() takes ownership of the // content of the struct let mut params = mem::ManuallyDrop::new(imp::NdiSrcMetaParams { - caps: caps.clone(), - stream_type, + ndi_buffer, + timestamp_mode, }); let meta = gst::ffi::gst_buffer_add_meta( @@ -40,12 +59,8 @@ impl NdiSrcMeta { } } - pub fn stream_type(&self) -> StreamType { - self.0.stream_type - } - - pub fn caps(&self) -> gst::Caps { - self.0.caps.clone() + pub fn take_ndi_buffer(&mut self) -> Buffer { + self.0.ndi_buffer.take().expect("can only take buffer once") } } @@ -60,29 +75,30 @@ unsafe impl MetaAPI for NdiSrcMeta { impl fmt::Debug for NdiSrcMeta { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("NdiSrcMeta") - .field("stream_type", &self.stream_type()) - .field("caps", &self.caps()) + .field("ndi_buffer", &self.0.ndi_buffer) .finish() } } mod imp { - use super::StreamType; + use crate::TimestampMode; + + use super::Buffer; use glib::translate::*; use gst::glib::once_cell::sync::Lazy; use std::mem; use std::ptr; pub(super) struct NdiSrcMetaParams { - pub caps: gst::Caps, - pub stream_type: StreamType, + pub ndi_buffer: Buffer, + pub timestamp_mode: TimestampMode, } #[repr(C)] pub 
struct NdiSrcMeta {
     parent: gst::ffi::GstMeta,
-    pub(super) caps: gst::Caps,
-    pub(super) stream_type: StreamType,
+    pub(super) ndi_buffer: Option<Buffer>,
+    pub(super) timestamp_mode: TimestampMode,
 }
 
 pub(super) fn ndi_src_meta_api_get_type() -> glib::Type {
@@ -110,8 +126,8 @@
         let meta = &mut *(meta as *mut NdiSrcMeta);
         let params = ptr::read(params as *const NdiSrcMetaParams);
 
-        ptr::write(&mut meta.stream_type, params.stream_type);
-        ptr::write(&mut meta.caps, params.caps);
+        ptr::write(&mut meta.ndi_buffer, Some(params.ndi_buffer));
+        ptr::write(&mut meta.timestamp_mode, params.timestamp_mode);
 
         true.into_glib()
     }
 
@@ -122,8 +138,7 @@
     ) {
         let meta = &mut *(meta as *mut NdiSrcMeta);
 
-        ptr::drop_in_place(&mut meta.stream_type);
-        ptr::drop_in_place(&mut meta.caps);
+        ptr::drop_in_place(&mut meta.ndi_buffer);
     }
 
     unsafe extern "C" fn ndi_src_meta_transform(
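The meta rework above replaces the caps/stream-type fields of `NdiSrcMeta` with the full NDI buffer, which the demuxer moves out exactly once via `take_ndi_buffer()`. A minimal, dependency-free sketch of that take-once ownership pattern (the `Meta` type and names here are illustrative, not the plugin's actual types):

```rust
// The payload lives in an Option so a consumer can move it out exactly
// once; a second take panics, mirroring the
// `expect("can only take buffer once")` in the diff above.
struct Meta<T> {
    payload: Option<T>,
}

impl<T> Meta<T> {
    fn new(value: T) -> Self {
        Meta { payload: Some(value) }
    }

    fn take(&mut self) -> T {
        self.payload.take().expect("can only take buffer once")
    }
}

fn main() {
    let mut meta = Meta::new(vec![1u8, 2, 3]);
    let frame = meta.take(); // moves the data out, no copy
    assert_eq!(frame, [1, 2, 3]);
    // meta.take(); // would panic: payload already taken
}
```

Keeping the payload in an `Option` is also what lets the unsafe `init`/`free` callbacks above use plain `ptr::write`/`ptr::drop_in_place` while still allowing the frame to be moved out without copying.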
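In `calculate_timestamp()`, the `TimestampMode::Timestamp` arm treats NDI timestamps as UNIX-epoch times and rebases them onto the pipeline clock using the two receive times. A plain-integer sketch of that rebasing, with nanosecond `u64` values standing in for `gst::ClockTime` and a hypothetical helper name:

```rust
// If the frame was captured before we received it, shift the pipeline-clock
// receive time backwards by the difference, clamping at zero (the
// saturating_sub mirrors the explicit ZERO clamp in the diff); otherwise
// shift it forwards.
fn map_unix_timestamp(timestamp: u64, receive_time_real: u64, receive_time_gst: u64) -> u64 {
    if receive_time_real > timestamp {
        let diff = receive_time_real - timestamp;
        receive_time_gst.saturating_sub(diff)
    } else {
        receive_time_gst + (timestamp - receive_time_real)
    }
}

fn main() {
    // Received 5 ms after capture, pipeline clock at 1 s -> PTS 995 ms.
    assert_eq!(
        map_unix_timestamp(2_000_000_000, 2_005_000_000, 1_000_000_000),
        995_000_000
    );
}
```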
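`Observations::process()` implements the rtpjitterbuffer-style clock skew estimation from Fober, Orlarey and Letz. The toy model below keeps only the core idea: track the minimum local-minus-remote delta over a sliding window and low-pass the skew estimate towards it. The filling phase, resets and slope correction of the real code are deliberately omitted, and the type is a made-up stand-in:

```rust
use std::collections::VecDeque;

struct Skew {
    deltas: VecDeque<i64>,
    min_delta: i64,
    skew: i64,
}

impl Skew {
    fn new() -> Self {
        Skew { deltas: VecDeque::new(), min_delta: 0, skew: 0 }
    }

    /// Feed one (remote, local) observation, both in nanoseconds relative
    /// to their base times; returns the smoothed output time.
    fn process(&mut self, remote_diff: u64, local_diff: u64) -> u64 {
        const WINDOW: usize = 512;

        let delta = local_diff as i64 - remote_diff as i64;
        self.deltas.push_back(delta);
        if self.deltas.len() > WINDOW {
            let old = self.deltas.pop_front().unwrap();
            if old == self.min_delta {
                // The minimum left the window: rescan.
                self.min_delta = self.deltas.iter().copied().min().unwrap();
            }
        }
        if delta < self.min_delta || self.deltas.len() == 1 {
            self.min_delta = delta;
        }

        // IIR low-pass towards the window minimum, as in the plugin:
        // skew = (min_delta + 124 * skew) / 125
        self.skew = (self.min_delta + 124 * self.skew) / 125;

        // Output time is the remote progression shifted by the skew.
        if self.skew < 0 {
            remote_diff.saturating_sub((-self.skew) as u64)
        } else {
            remote_diff + self.skew as u64
        }
    }
}

fn main() {
    let mut skew = Skew::new();
    let mut out = 0;
    for i in 0..32u64 {
        // Local clock consistently 1 ms ahead of the remote one.
        out = skew.process(i * 33_000_000, i * 33_000_000 + 1_000_000);
    }
    assert!(out >= 31 * 33_000_000);
}
```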
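The decade ladder in the slope check rescales remote times when a sender evidently used the wrong clock unit: a slope near a power of ten means local time advances that many times faster than the misinterpreted remote time. A standalone sketch with a hypothetical function name, assuming the ranges step by decades as in the corrected ladder above:

```rust
// Returns (numerator, denominator) to multiply remote times by.
fn guess_slope_correction(slope: f64) -> (u64, u64) {
    match slope {
        s if (0.0005..0.0015).contains(&s) => (1, 1000),   // remote unit was 0.1ns
        s if (0.005..0.015).contains(&s) => (1, 100),      // 1ns
        s if (0.05..0.15).contains(&s) => (1, 10),         // 10ns
        s if (5.0..15.0).contains(&s) => (10, 1),          // 1us
        s if (50.0..150.0).contains(&s) => (100, 1),       // 10us
        s if (500.0..1500.0).contains(&s) => (1000, 1),    // 100us
        s if (5000.0..15000.0).contains(&s) => (10000, 1), // 1ms
        _ => (1, 1),
    }
}

fn main() {
    // A slope of ~10_000 means the remote clock advanced 10_000x slower
    // than local time: the sender was counting in 1 ms units.
    assert_eq!(guess_slope_correction(10_000.0), (10000, 1));
}
```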
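`wrap_or_copy_video_frame()` wraps the NDI frame zero-copy when the strides match and otherwise copies row by row into a pooled buffer. The helper below shows that row-wise copy in isolation, with plain slices standing in for NDI frames and GStreamer video frames (a sketch, not the plugin's API):

```rust
// Only `line_bytes` of each row carry pixels; the padding that makes up
// the rest of the stride is skipped on read and left untouched on write.
fn copy_plane(
    dest: &mut [u8],
    dest_stride: usize,
    src: &[u8],
    src_stride: usize,
    line_bytes: usize,
) {
    assert!(line_bytes <= src_stride && line_bytes <= dest_stride);
    for (dest_row, src_row) in dest
        .chunks_exact_mut(dest_stride)
        .zip(src.chunks_exact(src_stride))
    {
        dest_row[..line_bytes].copy_from_slice(&src_row[..line_bytes]);
    }
}

fn main() {
    // 2 rows of 4 meaningful bytes, source padded to stride 6, dest to 8.
    let src = [1, 2, 3, 4, 0, 0, 5, 6, 7, 8, 0, 0];
    let mut dest = [0u8; 16];
    copy_plane(&mut dest, 8, &src, 6, 4);
    assert_eq!(&dest[..4], &[1, 2, 3, 4]);
    assert_eq!(&dest[8..12], &[5, 6, 7, 8]);
}
```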
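For raw audio the demuxer either keeps NDI's planar layout and adds `AudioMeta` (when downstream accepts the non-interleaved caps) or interleaves the planar f32 samples into GStreamer's default layout, as in `create_audio_buffer()`. A simplified version of that interleaving loop, with channel stride handling omitted and plain slices instead of NDI frames:

```rust
// NDI delivers one contiguous block of samples per channel; interleaved
// layout puts sample i of channel c at dest[i * channels + c].
fn interleave(planar: &[f32], channels: usize, samples: usize) -> Vec<f32> {
    assert_eq!(planar.len(), channels * samples);
    let mut out = vec![0.0f32; channels * samples];
    for (channel, plane) in planar.chunks_exact(samples).enumerate() {
        for (i, sample) in plane.iter().enumerate() {
            out[i * channels + channel] = *sample;
        }
    }
    out
}

fn main() {
    // Two channels, three samples each: L = [1, 2, 3], R = [10, 20, 30].
    let planar = [1.0, 2.0, 3.0, 10.0, 20.0, 30.0];
    assert_eq!(
        interleave(&planar, 2, 3),
        [1.0, 10.0, 2.0, 20.0, 3.0, 30.0]
    );
}
```

Skipping this copy entirely is the point of the non-interleaved path: when downstream supports it, the NDI frame is wrapped as-is and only an `AudioMeta` describing the planar layout is attached.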