diff --git a/src/lib.rs b/src/lib.rs index 739cda41..272d2fb1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,7 +60,12 @@ lazy_static! { static mut id_receiver: i8 = 0; -fn connect_ndi(cat: gst::DebugCategory, element: &gst_base::BaseSrc, ip: &str, stream_name: &str) -> i8 { +fn connect_ndi( + cat: gst::DebugCategory, + element: &gst_base::BaseSrc, + ip: &str, + stream_name: &str, +) -> i8 { gst_debug!(cat, obj: element, "Starting NDI connection..."); let mut receivers = hashmap_receivers.lock().unwrap(); diff --git a/src/ndiaudiosrc.rs b/src/ndiaudiosrc.rs index 67b897af..9c2ebcfa 100644 --- a/src/ndiaudiosrc.rs +++ b/src/ndiaudiosrc.rs @@ -21,8 +21,8 @@ use ndi_struct; use ndisys::*; use stop_ndi; -use hashmap_receivers; use byte_slice_cast::AsMutSliceOf; +use hashmap_receivers; #[derive(Debug, Clone)] struct Settings { @@ -46,35 +46,35 @@ impl Default for Settings { } static PROPERTIES: [subclass::Property; 3] = [ -subclass::Property("stream-name", |_| { - glib::ParamSpec::string( - "stream-name", - "Sream Name", - "Name of the streaming device", - None, - glib::ParamFlags::READWRITE, - ) -}), -subclass::Property("ip", |_| { - glib::ParamSpec::string( - "ip", - "Stream IP", - "IP of the streaming device. Ex: 127.0.0.1:5961", - None, - glib::ParamFlags::READWRITE, - ) -}), -subclass::Property("loss-threshold", |_| { - glib::ParamSpec::uint( - "loss-threshold", - "Loss threshold", - "Loss threshold", - 0, - 60, - 5, - glib::ParamFlags::READWRITE, - ) -}), + subclass::Property("stream-name", |_| { + glib::ParamSpec::string( + "stream-name", + "Sream Name", + "Name of the streaming device", + None, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("ip", |_| { + glib::ParamSpec::string( + "ip", + "Stream IP", + "IP of the streaming device. Ex: 127.0.0.1:5961", + None, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("loss-threshold", |_| { + glib::ParamSpec::uint( + "loss-threshold", + "Loss threshold", + "Loss threshold", + 0, + 60, + 5, + glib::ParamFlags::READWRITE, + ) + }), ]; struct State { @@ -99,7 +99,6 @@ struct NdiAudioSrc { } impl ObjectSubclass for NdiAudioSrc { - const NAME: &'static str = "NdiAudioSrc"; type ParentType = gst_base::BaseSrc; type Instance = gst::subclass::ElementInstanceStruct; @@ -116,7 +115,7 @@ impl ObjectSubclass for NdiAudioSrc { ), settings: Mutex::new(Default::default()), state: Mutex::new(Default::default()), - timestamp_data: Mutex::new(TimestampData { offset: 0}), + timestamp_data: Mutex::new(TimestampData { offset: 0 }), } } @@ -131,361 +130,419 @@ impl ObjectSubclass for NdiAudioSrc { let caps = gst::Caps::new_simple( "audio/x-raw", &[ - ( - "format", - &gst::List::new(&[ - //TODO add more formats? - //&gst_audio::AUDIO_FORMAT_F32.to_string(), - //&gst_audio::AUDIO_FORMAT_F64.to_string(), - &gst_audio::AUDIO_FORMAT_S16.to_string(), + ( + "format", + &gst::List::new(&[ + //TODO add more formats? + //&gst_audio::AUDIO_FORMAT_F32.to_string(), + //&gst_audio::AUDIO_FORMAT_F64.to_string(), + &gst_audio::AUDIO_FORMAT_S16.to_string(), ]), ), ("rate", &gst::IntRange::::new(1, i32::MAX)), ("channels", &gst::IntRange::::new(1, i32::MAX)), ("layout", &"interleaved"), ("channel-mask", &gst::Bitmask::new(0)), - ], - ); + ], + ); - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - ) - .unwrap(); - klass.add_pad_template(src_pad_template); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(src_pad_template); - klass.install_properties(&PROPERTIES); + klass.install_properties(&PROPERTIES); + } +} + +impl ObjectImpl for NdiAudioSrc { + glib_object_impl!(); + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let basesrc = obj.downcast_ref::().unwrap(); + // Initialize live-ness and notify the base class that + // we'd like to operate in Time format + basesrc.set_live(true); + basesrc.set_format(gst::Format::Time); + } + + fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; + let basesrc = obj.downcast_ref::().unwrap(); + + match *prop { + subclass::Property("stream-name", ..) => { + let mut settings = self.settings.lock().unwrap(); + let stream_name = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing stream-name from {} to {}", + settings.stream_name, + stream_name + ); + settings.stream_name = stream_name; + drop(settings); + } + subclass::Property("ip", ..) => { + let mut settings = self.settings.lock().unwrap(); + let ip = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing ip from {} to {}", + settings.ip, + ip + ); + settings.ip = ip; + drop(settings); + } + subclass::Property("loss-threshold", ..) => { + let mut settings = self.settings.lock().unwrap(); + let loss_threshold = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing loss threshold from {} to {}", + settings.loss_threshold, + loss_threshold + ); + settings.loss_threshold = loss_threshold; + drop(settings); + } + _ => unimplemented!(), } } - impl ObjectImpl for NdiAudioSrc { - glib_object_impl!(); + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + let prop = &PROPERTIES[id]; - fn constructed(&self, obj: &glib::Object) { - self.parent_constructed(obj); - - let basesrc = obj.downcast_ref::().unwrap(); - // Initialize live-ness and notify the base class that - // we'd like to operate in Time format - basesrc.set_live(true); - basesrc.set_format(gst::Format::Time); - } - - fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { - let prop = &PROPERTIES[id]; - let basesrc = obj.downcast_ref::().unwrap(); - - match *prop { - subclass::Property("stream-name", ..) => { - let mut settings = self.settings.lock().unwrap(); - let stream_name = value.get().unwrap(); - gst_debug!( - self.cat, - obj: basesrc, - "Changing stream-name from {} to {}", - settings.stream_name, - stream_name - ); - settings.stream_name = stream_name; - drop(settings); - } - subclass::Property("ip", ..) => { - let mut settings = self.settings.lock().unwrap(); - let ip = value.get().unwrap(); - gst_debug!( - self.cat, - obj: basesrc, - "Changing ip from {} to {}", - settings.ip, - ip - ); - settings.ip = ip; - drop(settings); - } - subclass::Property("loss-threshold", ..) => { - let mut settings = self.settings.lock().unwrap(); - let loss_threshold = value.get().unwrap(); - gst_debug!( - self.cat, - obj: basesrc, - "Changing loss threshold from {} to {}", - settings.loss_threshold, - loss_threshold - ); - settings.loss_threshold = loss_threshold; - drop(settings); - } - _ => unimplemented!(), - } - } - - fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { - let prop = &PROPERTIES[id]; - - match *prop { - subclass::Property("stream-name", ..) => { - let settings = self.settings.lock().unwrap(); - Ok(settings.stream_name.to_value()) - } - subclass::Property("ip", ..) => { - let settings = self.settings.lock().unwrap(); - Ok(settings.ip.to_value()) - } - subclass::Property("loss-threshold", ..) => { - let settings = self.settings.lock().unwrap(); - Ok(settings.loss_threshold.to_value()) - } - _ => unimplemented!(), - } - } - } - - impl ElementImpl for NdiAudioSrc { - fn change_state( - &self, - element: &gst::Element, - transition: gst::StateChange, - ) -> Result { - if transition == gst::StateChange::PausedToPlaying { - let mut receivers = hashmap_receivers.lock().unwrap(); + match *prop { + subclass::Property("stream-name", ..) => { let settings = self.settings.lock().unwrap(); - - let receiver = receivers.get_mut(&settings.id_receiver).unwrap(); - let recv = &receiver.ndi_instance; - let pNDI_recv = recv.recv; - - let audio_frame: NDIlib_audio_frame_v2_t = Default::default(); - - unsafe { - while NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000) - != NDIlib_frame_type_e::NDIlib_frame_type_audio {} - } - gst_debug!(self.cat, obj: element, "NDI audio frame received: {:?}", audio_frame); - - if receiver.initial_timestamp <= audio_frame.timestamp as u64 - || receiver.initial_timestamp == 0 { - receiver.initial_timestamp = audio_frame.timestamp as u64; - } - unsafe { - NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame); - } - gst_debug!(self.cat, obj: element, "Setting initial timestamp to {}", receiver.initial_timestamp); - } - self.parent_change_state(element, transition) + Ok(settings.stream_name.to_value()) + } + subclass::Property("ip", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.ip.to_value()) + } + subclass::Property("loss-threshold", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.loss_threshold.to_value()) + } + _ => unimplemented!(), } } +} - impl BaseSrcImpl for NdiAudioSrc { - fn set_caps(&self, element: &gst_base::BaseSrc, caps: &gst::CapsRef) -> Result<(), gst::LoggableError> { - let info = match gst_audio::AudioInfo::from_caps(caps) { - None => return Err(gst_loggable_error!(self.cat, "Failed to build `AudioInfo` from caps {}", caps)), - Some(info) => info, - }; - - gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); - - let mut state = self.state.lock().unwrap(); - state.info = Some(info); - - Ok(()) - } - - fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { - *self.state.lock().unwrap() = Default::default(); - - let mut settings = self.settings.lock().unwrap(); - settings.id_receiver = connect_ndi( - self.cat, - element, - &settings.ip.clone(), - &settings.stream_name.clone(), - ); - - match settings.id_receiver { - 0 => Err(gst_error_msg!( - gst::ResourceError::NotFound, - ["Could not connect to this source"] - )), - _ => Ok(()) - } - } - - fn stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { - *self.state.lock().unwrap() = Default::default(); - +impl ElementImpl for NdiAudioSrc { + fn change_state( + &self, + element: &gst::Element, + transition: gst::StateChange, + ) -> Result { + if transition == gst::StateChange::PausedToPlaying { + let mut receivers = hashmap_receivers.lock().unwrap(); let settings = self.settings.lock().unwrap(); - stop_ndi(self.cat, element, settings.id_receiver); - // Commented because when adding ndi destroy stopped in this line - //*self.state.lock().unwrap() = Default::default(); - Ok(()) - } - - fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool { - use gst::QueryView; - if let QueryView::Scheduling(ref mut q) = query.view_mut() { - q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0); - q.add_scheduling_modes(&[gst::PadMode::Push]); - return true; - } - if let QueryView::Latency(ref mut q) = query.view_mut() { - let settings = &*self.settings.lock().unwrap(); - let state = self.state.lock().unwrap(); - - if let Some(ref _info) = state.info { - let latency = settings.latency.unwrap(); - gst_debug!(self.cat, obj: element, "Returning latency {}", latency); - q.set(true, latency, gst::CLOCK_TIME_NONE); - return true; - } else { - return false; - } - } - BaseSrcImplExt::parent_query(self, element, query) - } - - fn fixate(&self, element: &gst_base::BaseSrc, caps: gst::Caps) -> gst::Caps { - let receivers = hashmap_receivers.lock().unwrap(); - let mut settings = self.settings.lock().unwrap(); - - let receiver = receivers.get(&settings.id_receiver).unwrap(); + let receiver = receivers.get_mut(&settings.id_receiver).unwrap(); let recv = &receiver.ndi_instance; let pNDI_recv = recv.recv; let audio_frame: NDIlib_audio_frame_v2_t = Default::default(); unsafe { - while NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000) - != NDIlib_frame_type_e::NDIlib_frame_type_audio {} + while NDIlib_recv_capture_v2( + pNDI_recv, + ptr::null(), + &audio_frame, + ptr::null(), + 1000, + ) != NDIlib_frame_type_e::NDIlib_frame_type_audio + {} } + gst_debug!( + self.cat, + obj: element, + "NDI audio frame received: {:?}", + audio_frame + ); - let no_samples = audio_frame.no_samples as u64; - let audio_rate = audio_frame.sample_rate; - settings.latency = gst::SECOND.mul_div_floor(no_samples, audio_rate as u64); - - let mut caps = gst::Caps::truncate(caps); + if receiver.initial_timestamp <= audio_frame.timestamp as u64 + || receiver.initial_timestamp == 0 { - let caps = caps.make_mut(); - let s = caps.get_mut_structure(0).unwrap(); - s.fixate_field_nearest_int("rate", audio_rate); - s.fixate_field_nearest_int("channels", audio_frame.no_channels); - s.fixate_field_str("layout", "interleaved"); - s.set_value("channel-mask", gst::Bitmask::new(gst_audio::AudioChannelPosition::get_fallback_mask(audio_frame.no_channels as u32)).to_send_value()); + receiver.initial_timestamp = audio_frame.timestamp as u64; } - - let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); unsafe { NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame); } + gst_debug!( + self.cat, + obj: element, + "Setting initial timestamp to {}", + receiver.initial_timestamp + ); + } + self.parent_change_state(element, transition) + } +} - self.parent_fixate(element, caps) +impl BaseSrcImpl for NdiAudioSrc { + fn set_caps( + &self, + element: &gst_base::BaseSrc, + caps: &gst::CapsRef, + ) -> Result<(), gst::LoggableError> { + let info = match gst_audio::AudioInfo::from_caps(caps) { + None => { + return Err(gst_loggable_error!( + self.cat, + "Failed to build `AudioInfo` from caps {}", + caps + )); + } + Some(info) => info, + }; + + gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); + + let mut state = self.state.lock().unwrap(); + state.info = Some(info); + + Ok(()) + } + + fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + *self.state.lock().unwrap() = Default::default(); + + let mut settings = self.settings.lock().unwrap(); + settings.id_receiver = connect_ndi( + self.cat, + element, + &settings.ip.clone(), + &settings.stream_name.clone(), + ); + + match settings.id_receiver { + 0 => Err(gst_error_msg!( + gst::ResourceError::NotFound, + ["Could not connect to this source"] + )), + _ => Ok(()), + } + } + + fn stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + *self.state.lock().unwrap() = Default::default(); + + let settings = self.settings.lock().unwrap(); + stop_ndi(self.cat, element, settings.id_receiver); + // Commented because when adding ndi destroy stopped in this line + //*self.state.lock().unwrap() = Default::default(); + Ok(()) + } + + fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool { + use gst::QueryView; + if let QueryView::Scheduling(ref mut q) = query.view_mut() { + q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0); + q.add_scheduling_modes(&[gst::PadMode::Push]); + return true; + } + if let QueryView::Latency(ref mut q) = query.view_mut() { + let settings = &*self.settings.lock().unwrap(); + let state = self.state.lock().unwrap(); + + if let Some(ref _info) = state.info { + let latency = settings.latency.unwrap(); + gst_debug!(self.cat, obj: element, "Returning latency {}", latency); + q.set(true, latency, gst::CLOCK_TIME_NONE); + return true; + } else { + return false; + } + } + BaseSrcImplExt::parent_query(self, element, query) + } + + fn fixate(&self, element: &gst_base::BaseSrc, caps: gst::Caps) -> gst::Caps { + let receivers = hashmap_receivers.lock().unwrap(); + let mut settings = self.settings.lock().unwrap(); + + let receiver = receivers.get(&settings.id_receiver).unwrap(); + + let recv = &receiver.ndi_instance; + let pNDI_recv = recv.recv; + + let audio_frame: NDIlib_audio_frame_v2_t = Default::default(); + + unsafe { + while NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000) + != NDIlib_frame_type_e::NDIlib_frame_type_audio + {} } - fn create( - &self, - element: &gst_base::BaseSrc, - _offset: u64, - _length: u32, - ) -> Result { - let _settings = &*self.settings.lock().unwrap(); + let no_samples = audio_frame.no_samples as u64; + let audio_rate = audio_frame.sample_rate; + settings.latency = gst::SECOND.mul_div_floor(no_samples, audio_rate as u64); - let mut timestamp_data = self.timestamp_data.lock().unwrap(); + let mut caps = gst::Caps::truncate(caps); + { + let caps = caps.make_mut(); + let s = caps.get_mut_structure(0).unwrap(); + s.fixate_field_nearest_int("rate", audio_rate); + s.fixate_field_nearest_int("channels", audio_frame.no_channels); + s.fixate_field_str("layout", "interleaved"); + s.set_value( + "channel-mask", + gst::Bitmask::new(gst_audio::AudioChannelPosition::get_fallback_mask( + audio_frame.no_channels as u32, + )) + .to_send_value(), + ); + } - let state = self.state.lock().unwrap(); - let _info = match state.info { - None => { - gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]); - return Err(gst::FlowError::NotNegotiated); - } - Some(ref info) => info.clone(), - }; - let receivers = hashmap_receivers.lock().unwrap(); + let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); + unsafe { + NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame); + } - let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance; - let pNDI_recv = recv.recv; + self.parent_fixate(element, caps) + } - let pts: u64; - let audio_frame: NDIlib_audio_frame_v2_t = Default::default(); + fn create( + &self, + element: &gst_base::BaseSrc, + _offset: u64, + _length: u32, + ) -> Result { + let _settings = &*self.settings.lock().unwrap(); - unsafe { - let time = receivers.get(&_settings.id_receiver).unwrap().initial_timestamp; + let mut timestamp_data = self.timestamp_data.lock().unwrap(); - let mut skip_frame = true; - let mut count_frame_none = 0; - while skip_frame { - let frame_type = + let state = self.state.lock().unwrap(); + let _info = match state.info { + None => { + gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]); + return Err(gst::FlowError::NotNegotiated); + } + Some(ref info) => info.clone(), + }; + let receivers = hashmap_receivers.lock().unwrap(); + + let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance; + let pNDI_recv = recv.recv; + + let pts: u64; + let audio_frame: NDIlib_audio_frame_v2_t = Default::default(); + + unsafe { + let time = receivers + .get(&_settings.id_receiver) + .unwrap() + .initial_timestamp; + + let mut skip_frame = true; + let mut count_frame_none = 0; + while skip_frame { + let frame_type = NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000); - if (frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none && _settings.loss_threshold != 0) + if (frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none + && _settings.loss_threshold != 0) || frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error - { - if count_frame_none < _settings.loss_threshold{ - count_frame_none += 1; - continue; - } - gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none or error received, assuming that the source closed the stream...."]); - return Err(gst::FlowError::CustomError); - } - else if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none && _settings.loss_threshold == 0{ - gst_debug!(self.cat, obj: element, "No audio frame received, sending empty buffer"); - let buffer = gst::Buffer::with_size(0).unwrap(); - return Ok(buffer) - } - - if time >= (audio_frame.timestamp as u64) { - NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame); - gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (audio_frame.timestamp as u64), time); - } else { - skip_frame = false; + { + if count_frame_none < _settings.loss_threshold { + count_frame_none += 1; + continue; } + gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none or error received, assuming that the source closed the stream...."]); + return Err(gst::FlowError::CustomError); + } else if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none + && _settings.loss_threshold == 0 + { + gst_debug!( + self.cat, + obj: element, + "No audio frame received, sending empty buffer" + ); + let buffer = gst::Buffer::with_size(0).unwrap(); + return Ok(buffer); } - gst_log!(self.cat, obj: element, "NDI audio frame received: {:?}", (audio_frame)); + if time >= (audio_frame.timestamp as u64) { + NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame); + gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (audio_frame.timestamp as u64), time); + } else { + skip_frame = false; + } + } - pts = audio_frame.timestamp as u64 - time; + gst_log!( + self.cat, + obj: element, + "NDI audio frame received: {:?}", + (audio_frame) + ); - gst_log!(self.cat, obj: element, "Calculated pts for audio frame: {:?}", (pts)); + pts = audio_frame.timestamp as u64 - time; - // We multiply by 2 because is the size in bytes of an i16 variable - let buff_size = (audio_frame.no_samples * 2 * audio_frame.no_channels) as usize; - let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); - { - if ndi_struct.start_pts == gst::ClockTime(Some(0)) { - ndi_struct.start_pts = + gst_log!( + self.cat, + obj: element, + "Calculated pts for audio frame: {:?}", + (pts) + ); + + // We multiply by 2 because is the size in bytes of an i16 variable + let buff_size = (audio_frame.no_samples * 2 * audio_frame.no_channels) as usize; + let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); + { + if ndi_struct.start_pts == gst::ClockTime(Some(0)) { + ndi_struct.start_pts = element.get_clock().unwrap().get_time() - element.get_base_time(); - } + } - let buffer = buffer.get_mut().unwrap(); + let buffer = buffer.get_mut().unwrap(); - // Newtek NDI yields times in 100ns intervals since the Unix Time - let pts: gst::ClockTime = (pts * 100).into(); - buffer.set_pts(pts + ndi_struct.start_pts); + // Newtek NDI yields times in 100ns intervals since the Unix Time + let pts: gst::ClockTime = (pts * 100).into(); + buffer.set_pts(pts + ndi_struct.start_pts); - let duration: gst::ClockTime = (((f64::from(audio_frame.no_samples) + let duration: gst::ClockTime = (((f64::from(audio_frame.no_samples) / f64::from(audio_frame.sample_rate)) * 1_000_000_000.0) as u64) .into(); - buffer.set_duration(duration); + buffer.set_duration(duration); - buffer.set_offset(timestamp_data.offset); - timestamp_data.offset += audio_frame.no_samples as u64; - buffer.set_offset_end(timestamp_data.offset); + buffer.set_offset(timestamp_data.offset); + timestamp_data.offset += audio_frame.no_samples as u64; + buffer.set_offset_end(timestamp_data.offset); - let mut dst: NDIlib_audio_frame_interleaved_16s_t = Default::default(); - dst.reference_level = 0; - dst.p_data = buffer.map_writable().unwrap().as_mut_slice_of::().unwrap().as_mut_ptr(); - NDIlib_util_audio_to_interleaved_16s_v2(&audio_frame, &mut dst); - NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame); - } - - gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer); - - Ok(buffer) + let mut dst: NDIlib_audio_frame_interleaved_16s_t = Default::default(); + dst.reference_level = 0; + dst.p_data = buffer + .map_writable() + .unwrap() + .as_mut_slice_of::() + .unwrap() + .as_mut_ptr(); + NDIlib_util_audio_to_interleaved_16s_v2(&audio_frame, &mut dst); + NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame); } + + gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer); + + Ok(buffer) } } +} - pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register(plugin, "ndiaudiosrc", 0, NdiAudioSrc::get_type()) - } +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register(plugin, "ndiaudiosrc", 0, NdiAudioSrc::get_type()) +} diff --git a/src/ndisys.rs b/src/ndisys.rs index 2c39777f..30e5ebf4 100644 --- a/src/ndisys.rs +++ b/src/ndisys.rs @@ -35,11 +35,11 @@ extern "C" { ) -> NDIlib_frame_type_e; pub fn NDIlib_recv_free_video_v2( p_instance: NDIlib_recv_instance_t, - p_video_data: *const NDIlib_video_frame_v2_t + p_video_data: *const NDIlib_video_frame_v2_t, ); pub fn NDIlib_recv_free_audio_v2( p_instance: NDIlib_recv_instance_t, - p_audio_data: *const NDIlib_audio_frame_v2_t + p_audio_data: *const NDIlib_audio_frame_v2_t, ); } diff --git a/src/ndivideosrc.rs b/src/ndivideosrc.rs index 4bf2fe51..50192083 100644 --- a/src/ndivideosrc.rs +++ b/src/ndivideosrc.rs @@ -47,35 +47,35 @@ impl Default for Settings { } static PROPERTIES: [subclass::Property; 3] = [ -subclass::Property("stream-name", |_| { - glib::ParamSpec::string( - "stream-name", - "Stream Name", - "Name of the streaming device", - None, - glib::ParamFlags::READWRITE, - ) -}), -subclass::Property("ip", |_| { - glib::ParamSpec::string( - "ip", - "Stream IP", - "IP of the streaming device. Ex: 127.0.0.1:5961", - None, - glib::ParamFlags::READWRITE, - ) -}), -subclass::Property("loss-threshold", |_| { - glib::ParamSpec::uint( - "loss-threshold", - "Loss threshold", - "Loss threshold", - 0, - 60, - 5, - glib::ParamFlags::READWRITE, - ) -}), + subclass::Property("stream-name", |_| { + glib::ParamSpec::string( + "stream-name", + "Stream Name", + "Name of the streaming device", + None, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("ip", |_| { + glib::ParamSpec::string( + "ip", + "Stream IP", + "IP of the streaming device. Ex: 127.0.0.1:5961", + None, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("loss-threshold", |_| { + glib::ParamSpec::uint( + "loss-threshold", + "Loss threshold", + "Loss threshold", + 0, + 60, + 5, + glib::ParamFlags::READWRITE, + ) + }), ]; struct State { @@ -100,7 +100,6 @@ struct NdiVideoSrc { } impl ObjectSubclass for NdiVideoSrc { - const NAME: &'static str = "NdiVideoSrc"; type ParentType = gst_base::BaseSrc; type Instance = gst::subclass::ElementInstanceStruct; @@ -134,13 +133,13 @@ impl ObjectSubclass for NdiVideoSrc { let caps = gst::Caps::new_simple( "video/x-raw", &[ - ( - "format", - &gst::List::new(&[ - //TODO add all formats - &gst_video::VideoFormat::Uyvy.to_string(), - //&gst_video::VideoFormat::Rgb.to_string(), - //&gst_video::VideoFormat::Gray8.to_string(), + ( + "format", + &gst::List::new(&[ + //TODO add all formats + &gst_video::VideoFormat::Uyvy.to_string(), + //&gst_video::VideoFormat::Rgb.to_string(), + //&gst_video::VideoFormat::Gray8.to_string(), ]), ), ("width", &gst::IntRange::::new(0, i32::MAX)), @@ -152,342 +151,387 @@ impl ObjectSubclass for NdiVideoSrc { gst::Fraction::new(i32::MAX, 1), ), ), - ], - ); + ], + ); - let src_pad_template = gst::PadTemplate::new( - "src", - gst::PadDirection::Src, - gst::PadPresence::Always, - &caps, - ).unwrap(); - klass.add_pad_template(src_pad_template); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(src_pad_template); - klass.install_properties(&PROPERTIES); + klass.install_properties(&PROPERTIES); + } +} + +impl ObjectImpl for NdiVideoSrc { + glib_object_impl!(); + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let basesrc = obj.downcast_ref::().unwrap(); + // Initialize live-ness and notify the base class that + // we'd like to operate in Time format + basesrc.set_live(true); + basesrc.set_format(gst::Format::Time); + } + + fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; + let basesrc = obj.downcast_ref::().unwrap(); + + match *prop { + subclass::Property("stream-name", ..) => { + let mut settings = self.settings.lock().unwrap(); + let stream_name = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing stream-name from {} to {}", + settings.stream_name, + stream_name + ); + settings.stream_name = stream_name; + drop(settings); + } + subclass::Property("ip", ..) => { + let mut settings = self.settings.lock().unwrap(); + let ip = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing ip from {} to {}", + settings.ip, + ip + ); + settings.ip = ip; + drop(settings); + } + subclass::Property("loss-threshold", ..) => { + let mut settings = self.settings.lock().unwrap(); + let loss_threshold = value.get().unwrap(); + gst_debug!( + self.cat, + obj: basesrc, + "Changing loss threshold from {} to {}", + settings.loss_threshold, + loss_threshold + ); + settings.loss_threshold = loss_threshold; + drop(settings); + } + _ => unimplemented!(), } } + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { + let prop = &PROPERTIES[id]; - impl ObjectImpl for NdiVideoSrc { - glib_object_impl!(); - - fn constructed(&self, obj: &glib::Object) { - self.parent_constructed(obj); - - let basesrc = obj.downcast_ref::().unwrap(); - // Initialize live-ness and notify the base class that - // we'd like to operate in Time format - basesrc.set_live(true); - basesrc.set_format(gst::Format::Time); - } - - - fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { - let prop = &PROPERTIES[id]; - let basesrc = obj.downcast_ref::().unwrap(); - - match *prop { - subclass::Property("stream-name", ..) => { - let mut settings = self.settings.lock().unwrap(); - let stream_name = value.get().unwrap(); - gst_debug!( - self.cat, - obj: basesrc, - "Changing stream-name from {} to {}", - settings.stream_name, - stream_name - ); - settings.stream_name = stream_name; - drop(settings); - } - subclass::Property("ip", ..) => { - let mut settings = self.settings.lock().unwrap(); - let ip = value.get().unwrap(); - gst_debug!( - self.cat, - obj: basesrc, - "Changing ip from {} to {}", - settings.ip, - ip - ); - settings.ip = ip; - drop(settings); - } - subclass::Property("loss-threshold", ..) => { - let mut settings = self.settings.lock().unwrap(); - let loss_threshold = value.get().unwrap(); - gst_debug!( - self.cat, - obj: basesrc, - "Changing loss threshold from {} to {}", - settings.loss_threshold, - loss_threshold - ); - settings.loss_threshold = loss_threshold; - drop(settings); - } - _ => unimplemented!(), - } - } - - fn get_property(&self, _obj: &glib::Object, id: usize) -> Result { - let prop = &PROPERTIES[id]; - - match *prop { - subclass::Property("stream-name", ..) => { - let settings = self.settings.lock().unwrap(); - Ok(settings.stream_name.to_value()) - } - subclass::Property("ip", ..) => { - let settings = self.settings.lock().unwrap(); - Ok(settings.ip.to_value()) - } - subclass::Property("loss-threshold", ..) => { - let settings = self.settings.lock().unwrap(); - Ok(settings.loss_threshold.to_value()) - } - _ => unimplemented!(), - } - } - } - - impl ElementImpl for NdiVideoSrc { - fn change_state( - &self, - element: &gst::Element, - transition: gst::StateChange, - ) -> Result { - if transition == gst::StateChange::PausedToPlaying { - let mut receivers = hashmap_receivers.lock().unwrap(); + match *prop { + subclass::Property("stream-name", ..) => { let settings = self.settings.lock().unwrap(); - - let receiver = receivers.get_mut(&settings.id_receiver).unwrap(); - let recv = &receiver.ndi_instance; - let pNDI_recv = recv.recv; - - let video_frame: NDIlib_video_frame_v2_t = Default::default(); - - unsafe { - while NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000) - != NDIlib_frame_type_e::NDIlib_frame_type_video {} - } - gst_debug!(self.cat, obj: element, "NDI video frame received: {:?}", video_frame); - - if receiver.initial_timestamp <= video_frame.timestamp as u64 - || receiver.initial_timestamp == 0 - { - receiver.initial_timestamp = video_frame.timestamp as u64; - } - unsafe { - NDIlib_recv_free_video_v2(pNDI_recv, &video_frame); - } - gst_debug!(self.cat, obj: element, "Setting initial timestamp to {}", receiver.initial_timestamp); - } - self.parent_change_state(element, transition) + Ok(settings.stream_name.to_value()) + } + subclass::Property("ip", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.ip.to_value()) + } + subclass::Property("loss-threshold", ..) => { + let settings = self.settings.lock().unwrap(); + Ok(settings.loss_threshold.to_value()) + } + _ => unimplemented!(), } } +} - impl BaseSrcImpl for NdiVideoSrc { - fn set_caps(&self, element: &gst_base::BaseSrc, caps: &gst::CapsRef) -> Result<(), gst::LoggableError> { - let info = match gst_video::VideoInfo::from_caps(caps) { - None => return Err(gst_loggable_error!(self.cat, "Failed to build `VideoInfo` from caps {}", caps)), - Some(info) => info, - }; - gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); - - let mut state = self.state.lock().unwrap(); - state.info = Some(info); - let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); - Ok(()) - } - - fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { - *self.state.lock().unwrap() = Default::default(); - let mut settings = self.settings.lock().unwrap(); - settings.id_receiver = connect_ndi( - self.cat, - element, - &settings.ip.clone(), - &settings.stream_name.clone(), - ); - - // settings.id_receiver != 0 - match settings.id_receiver { - 0 => Err(gst_error_msg!( - gst::ResourceError::NotFound, - ["Could not connect to this source"] - )), - _ => Ok(()) - } - } - - fn stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { - *self.state.lock().unwrap() = Default::default(); - +impl ElementImpl for NdiVideoSrc { + fn change_state( + &self, + element: &gst::Element, + transition: gst::StateChange, + ) -> Result { + if transition == gst::StateChange::PausedToPlaying { + let mut receivers = hashmap_receivers.lock().unwrap(); let settings = self.settings.lock().unwrap(); - stop_ndi(self.cat, element, settings.id_receiver); - // Commented because when adding ndi destroy stopped in this line - //*self.state.lock().unwrap() = Default::default(); - Ok(()) - } - fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool { - use gst::QueryView; - if let QueryView::Scheduling(ref mut q) = query.view_mut() { - q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0); - q.add_scheduling_modes(&[gst::PadMode::Push]); - return true; - } - if let QueryView::Latency(ref mut q) = query.view_mut() { - let settings = &*self.settings.lock().unwrap(); - let state = self.state.lock().unwrap(); - - if let Some(ref _info) = state.info { - let latency = settings.latency.unwrap(); - gst_debug!(self.cat, obj: element, "Returning latency {}", latency); - q.set(true, latency, gst::CLOCK_TIME_NONE); - return true; - } else { - return false; - } - } - BaseSrcImplExt::parent_query(self, element, query) - } - - fn fixate(&self, element: &gst_base::BaseSrc, caps: gst::Caps) -> gst::Caps { - let receivers = hashmap_receivers.lock().unwrap(); - let mut settings = self.settings.lock().unwrap(); - - let receiver = receivers.get(&settings.id_receiver).unwrap(); + let receiver = receivers.get_mut(&settings.id_receiver).unwrap(); let recv = &receiver.ndi_instance; let pNDI_recv = recv.recv; let video_frame: NDIlib_video_frame_v2_t = Default::default(); unsafe { - while NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000) - != NDIlib_frame_type_e::NDIlib_frame_type_video {} + while NDIlib_recv_capture_v2( + pNDI_recv, + &video_frame, + ptr::null(), + ptr::null(), + 1000, + ) != NDIlib_frame_type_e::NDIlib_frame_type_video + {} } - settings.latency = gst::SECOND.mul_div_floor( - video_frame.frame_rate_D as u64, - video_frame.frame_rate_N as u64, + gst_debug!( + self.cat, + obj: element, + "NDI video frame received: {:?}", + video_frame ); - let mut caps = gst::Caps::truncate(caps); + if receiver.initial_timestamp <= video_frame.timestamp as u64 + || receiver.initial_timestamp == 0 { - let caps = caps.make_mut(); - let s = caps.get_mut_structure(0).unwrap(); - s.fixate_field_nearest_int("width", video_frame.xres); - s.fixate_field_nearest_int("height", video_frame.yres); - s.fixate_field_nearest_fraction( - "framerate", - Fraction::new(video_frame.frame_rate_N, video_frame.frame_rate_D), - ); + receiver.initial_timestamp = video_frame.timestamp as u64; } unsafe { NDIlib_recv_free_video_v2(pNDI_recv, &video_frame); } - let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); - self.parent_fixate(element, caps) + gst_debug!( + self.cat, + obj: element, + "Setting initial timestamp to {}", + receiver.initial_timestamp + ); } + self.parent_change_state(element, transition) + } +} - //Creates the video buffers - fn create( - &self, - element: &gst_base::BaseSrc, - _offset: u64, - _length: u32, - ) -> Result { - let _settings = &*self.settings.lock().unwrap(); +impl BaseSrcImpl for NdiVideoSrc { + fn set_caps( + &self, + element: &gst_base::BaseSrc, + caps: &gst::CapsRef, + ) -> Result<(), gst::LoggableError> { + let info = match gst_video::VideoInfo::from_caps(caps) { + None => { + return Err(gst_loggable_error!( + self.cat, + "Failed to build `VideoInfo` from caps {}", + caps + )); + } + Some(info) => info, + }; + gst_debug!(self.cat, obj: element, "Configuring for caps {}", caps); - let mut timestamp_data = self.timestamp_data.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + state.info = Some(info); + let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); + Ok(()) + } + + fn start(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + *self.state.lock().unwrap() = Default::default(); + let mut settings = self.settings.lock().unwrap(); + settings.id_receiver = connect_ndi( + self.cat, + element, + &settings.ip.clone(), + &settings.stream_name.clone(), + ); + + // settings.id_receiver != 0 + match settings.id_receiver { + 0 => Err(gst_error_msg!( + gst::ResourceError::NotFound, + ["Could not connect to this source"] + )), + _ => Ok(()), + } + } + + fn stop(&self, element: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> { + *self.state.lock().unwrap() = Default::default(); + + let settings = self.settings.lock().unwrap(); + stop_ndi(self.cat, element, settings.id_receiver); + // Commented because when adding ndi destroy stopped in this line + //*self.state.lock().unwrap() = Default::default(); + Ok(()) + } + + fn query(&self, element: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool { + use gst::QueryView; + if let QueryView::Scheduling(ref mut q) = query.view_mut() { + q.set(gst::SchedulingFlags::SEQUENTIAL, 1, -1, 0); + q.add_scheduling_modes(&[gst::PadMode::Push]); + return true; + } + if let QueryView::Latency(ref mut q) = query.view_mut() { + let settings = &*self.settings.lock().unwrap(); let state = self.state.lock().unwrap(); - let _info = match state.info { - None => { - gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]); - return Err(gst::FlowError::NotNegotiated); - } - Some(ref info) => info.clone(), - }; - let receivers = hashmap_receivers.lock().unwrap(); - let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance; - let pNDI_recv = recv.recv; + if let Some(ref _info) = state.info { + let latency = settings.latency.unwrap(); + gst_debug!(self.cat, obj: element, "Returning latency {}", latency); + q.set(true, latency, gst::CLOCK_TIME_NONE); + return true; + } else { + return false; + } + } + BaseSrcImplExt::parent_query(self, element, query) + } - let pts: u64; - let video_frame: NDIlib_video_frame_v2_t = Default::default(); + fn fixate(&self, element: &gst_base::BaseSrc, caps: gst::Caps) -> gst::Caps { + let receivers = hashmap_receivers.lock().unwrap(); + let mut settings = self.settings.lock().unwrap(); - unsafe { - let time = receivers.get(&_settings.id_receiver).unwrap().initial_timestamp; + let receiver = receivers.get(&settings.id_receiver).unwrap(); + let recv = &receiver.ndi_instance; + let pNDI_recv = recv.recv; - let mut skip_frame = true; - let mut count_frame_none = 0; - while skip_frame { - let frame_type = + let video_frame: NDIlib_video_frame_v2_t = Default::default(); + + unsafe { + while NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000) + != NDIlib_frame_type_e::NDIlib_frame_type_video + {} + } + settings.latency = gst::SECOND.mul_div_floor( + video_frame.frame_rate_D as u64, + video_frame.frame_rate_N as u64, + ); + + let mut caps = gst::Caps::truncate(caps); + { + let caps = caps.make_mut(); + let s = caps.get_mut_structure(0).unwrap(); + s.fixate_field_nearest_int("width", video_frame.xres); + s.fixate_field_nearest_int("height", video_frame.yres); + s.fixate_field_nearest_fraction( + "framerate", + Fraction::new(video_frame.frame_rate_N, video_frame.frame_rate_D), + ); + } + unsafe { + NDIlib_recv_free_video_v2(pNDI_recv, &video_frame); + } + let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); + self.parent_fixate(element, caps) + } + + //Creates the video buffers + fn create( + &self, + element: &gst_base::BaseSrc, + _offset: u64, + _length: u32, + ) -> Result { + let _settings = &*self.settings.lock().unwrap(); + + let mut timestamp_data = self.timestamp_data.lock().unwrap(); + let state = self.state.lock().unwrap(); + let _info = match state.info { + None => { + gst_element_error!(element, gst::CoreError::Negotiation, ["Have no caps yet"]); + return Err(gst::FlowError::NotNegotiated); + } + Some(ref info) => info.clone(), + }; + let receivers = hashmap_receivers.lock().unwrap(); + + let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance; + let pNDI_recv = recv.recv; + + let pts: u64; + let video_frame: NDIlib_video_frame_v2_t = Default::default(); + + unsafe { + let time = receivers + .get(&_settings.id_receiver) + .unwrap() + .initial_timestamp; + + let mut skip_frame = true; + let mut count_frame_none = 0; + while skip_frame { + let frame_type = NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000); - if (frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none && _settings.loss_threshold != 0) + if (frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none + && _settings.loss_threshold != 0) || frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error - { - if count_frame_none < _settings.loss_threshold{ - count_frame_none += 1; - continue; - } - gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none or error received, assuming that the source closed the stream...."]); - return Err(gst::FlowError::CustomError); - } - else if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none && _settings.loss_threshold == 0{ - gst_debug!(self.cat, obj: element, "No video frame received, sending empty buffer"); - let buffer = gst::Buffer::with_size(0).unwrap(); - return Ok(buffer) - } - - if time >= (video_frame.timestamp as u64) { - NDIlib_recv_free_video_v2(pNDI_recv, &video_frame); - gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (video_frame.timestamp as u64), time); - } else { - skip_frame = false; + { + if count_frame_none < _settings.loss_threshold { + count_frame_none += 1; + continue; } + gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none or error received, assuming that the source closed the stream...."]); + return Err(gst::FlowError::CustomError); + } else if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none + && _settings.loss_threshold == 0 + { + gst_debug!( + self.cat, + obj: element, + "No video frame received, sending empty buffer" + ); + let buffer = gst::Buffer::with_size(0).unwrap(); + return Ok(buffer); } - gst_log!(self.cat, obj: element, "NDI video frame received: {:?}", (video_frame)); + if time >= (video_frame.timestamp as u64) { + NDIlib_recv_free_video_v2(pNDI_recv, &video_frame); + gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (video_frame.timestamp as u64), time); + } else { + skip_frame = false; + } + } - pts = video_frame.timestamp as u64 - time; + gst_log!( + self.cat, + obj: element, + "NDI video frame received: {:?}", + (video_frame) + ); - gst_log!(self.cat, obj: element, "Calculated pts for video frame: {:?}", (pts)); + pts = video_frame.timestamp as u64 - time; - let buff_size = (video_frame.yres * video_frame.line_stride_in_bytes) as usize; - let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); - { - let vec = Vec::from_raw_parts(video_frame.p_data as *mut u8, buff_size, buff_size); - // Newtek NDI yields times in 100ns intervals since the Unix Time - let pts: gst::ClockTime = (pts * 100).into(); + gst_log!( + self.cat, + obj: element, + "Calculated pts for video frame: {:?}", + (pts) + ); - let duration: gst::ClockTime = (((f64::from(video_frame.frame_rate_D) + let buff_size = (video_frame.yres * video_frame.line_stride_in_bytes) as usize; + let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); + { + let vec = Vec::from_raw_parts(video_frame.p_data as *mut u8, buff_size, buff_size); + // Newtek NDI yields times in 100ns intervals since the Unix Time + let pts: gst::ClockTime = (pts * 100).into(); + + let duration: gst::ClockTime = (((f64::from(video_frame.frame_rate_D) / f64::from(video_frame.frame_rate_N)) * 1_000_000_000.0) as u64) .into(); - let buffer = buffer.get_mut().unwrap(); + let buffer = buffer.get_mut().unwrap(); - if ndi_struct.start_pts == gst::ClockTime(Some(0)) { - ndi_struct.start_pts = + if ndi_struct.start_pts == gst::ClockTime(Some(0)) { + ndi_struct.start_pts = element.get_clock().unwrap().get_time() - element.get_base_time(); - } - - buffer.set_pts(pts + ndi_struct.start_pts); - buffer.set_duration(duration); - buffer.set_offset(timestamp_data.offset); - timestamp_data.offset += 1; - buffer.set_offset_end(timestamp_data.offset); - buffer.copy_from_slice(0, &vec).unwrap(); } - gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer); - - Ok(buffer) + buffer.set_pts(pts + ndi_struct.start_pts); + buffer.set_duration(duration); + buffer.set_offset(timestamp_data.offset); + timestamp_data.offset += 1; + buffer.set_offset_end(timestamp_data.offset); + buffer.copy_from_slice(0, &vec).unwrap(); } + + gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer); + + Ok(buffer) } } +} - pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { - gst::Element::register(plugin, "ndivideosrc", 0, NdiVideoSrc::get_type()) - } +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register(plugin, "ndivideosrc", 0, NdiVideoSrc::get_type()) +}