diff --git a/src/lib.rs b/src/lib.rs
index 6c41b2ee..fee28102 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,3 @@
-#![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)]
-
 #[macro_use]
 extern crate glib;
 #[macro_use]
@@ -18,57 +16,59 @@ pub mod ndi;
 mod ndiaudiosrc;
 mod ndivideosrc;
 
-// use gst_plugin::base_src::*;
 use ndisys::*;
-use std::ffi::{CStr, CString};
+use ndi::*;
 
 use std::collections::HashMap;
 use std::sync::Mutex;
-
-use gst::GstObjectExt;
+use std::sync::atomic::{AtomicUsize, Ordering};
 
 fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+    if !ndi::initialize() {
+        return Err(glib_bool_error!("Cannot initialize NDI"));
+    }
+
     ndivideosrc::register(plugin)?;
     ndiaudiosrc::register(plugin)?;
     Ok(())
 }
 
-struct ndi_receiver_info {
+struct ReceiverInfo {
+    id: usize,
     stream_name: String,
     ip: String,
     video: bool,
    audio: bool,
-    ndi_instance: NdiInstance,
+    ndi_instance: RecvInstance,
     initial_timestamp: u64,
-    id: i8,
 }
 
 struct Ndi {
     start_pts: gst::ClockTime,
 }
 
-static mut ndi_struct: Ndi = Ndi {
+static mut NDI_STRUCT: Ndi = Ndi {
     start_pts: gst::ClockTime(Some(0)),
 };
 
 lazy_static! {
-    static ref hashmap_receivers: Mutex<HashMap<i8, ndi_receiver_info>> = {
+    static ref HASHMAP_RECEIVERS: Mutex<HashMap<usize, ReceiverInfo>> = {
         let m = HashMap::new();
         Mutex::new(m)
     };
 }
 
-static mut id_receiver: i8 = 0;
+static ID_RECEIVER: AtomicUsize = AtomicUsize::new(0);
 
 fn connect_ndi(
     cat: gst::DebugCategory,
     element: &gst_base::BaseSrc,
     ip: &str,
     stream_name: &str,
-) -> i8 {
+) -> Option<usize> {
     gst_debug!(cat, obj: element, "Starting NDI connection...");
 
-    let mut receivers = hashmap_receivers.lock().unwrap();
+    let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
     let mut audio = false;
     let mut video = false;
@@ -92,143 +92,102 @@ fn connect_ndi(
                 } else {
                     val.audio = audio;
                 }
-                return val.id;
+                return Some(val.id);
             }
         }
     }
 
-    unsafe {
-        if !NDIlib_initialize() {
-            gst_element_error!(
-                element,
-                gst::CoreError::Negotiation,
-                ["Cannot run NDI: NDIlib_initialize error"]
-            );
-            return 0;
-        }
-        let NDI_find_create_desc: NDIlib_find_create_t = Default::default();
-        let pNDI_find = NDIlib_find_create_v2(&NDI_find_create_desc);
-        if pNDI_find.is_null() {
+    let mut find = match FindInstance::builder().build() {
+        None => {
             gst_element_error!(
                 element,
                 gst::CoreError::Negotiation,
                 ["Cannot run NDI: NDIlib_find_create_v2 error"]
             );
-            return 0;
-        }
+            return None;
+        },
+        Some(find) => find,
+    };
 
-        let mut total_sources: u32 = 0;
-        let p_sources;
+    // TODO Sleep 1s to wait for all sources
+    find.wait_for_sources(2000);
 
-        // TODO Sleep 1s to wait for all sources
-        NDIlib_find_wait_for_sources(pNDI_find, 2000);
-        p_sources = NDIlib_find_get_current_sources(pNDI_find, &mut total_sources as *mut u32);
+    let sources = find.get_current_sources();
 
-        // We need at least one source
-        if p_sources.is_null() {
-            gst_element_error!(
-                element,
-                gst::CoreError::Negotiation,
-                ["Error getting NDIlib_find_get_current_sources"]
-            );
-            return 0;
-        }
-
-        let mut no_source: isize = -1;
-        for i in 0..total_sources as isize {
-            if CStr::from_ptr((*p_sources.offset(i)).p_ndi_name)
-                .to_string_lossy()
-                .into_owned()
-                == stream_name
-                || CStr::from_ptr((*p_sources.offset(i)).p_ip_address)
-                    .to_string_lossy()
-                    .into_owned()
-                    == ip
-            {
-                no_source = i;
-                break;
-            }
-        }
-        if no_source == -1 {
-            gst_element_error!(element, gst::ResourceError::OpenRead, ["Stream not found"]);
-            return 0;
-        }
-
-        gst_debug!(
-            cat,
-            obj: element,
-            "Total sources in network {}: Connecting to NDI source with name '{}' and address '{}'",
-            total_sources,
CStr::from_ptr((*p_sources.offset(no_source)).p_ndi_name) - .to_string_lossy() - .into_owned(), - CStr::from_ptr((*p_sources.offset(no_source)).p_ip_address) - .to_string_lossy() - .into_owned() + // We need at least one source + if sources.is_empty() { + gst_element_error!( + element, + gst::CoreError::Negotiation, + ["Error getting NDIlib_find_get_current_sources"] ); + return None; + } - let source = *p_sources.offset(no_source); + let source = sources.iter().find(|s| { + s.ndi_name() == stream_name || s.ip_address() == ip + }); - let source_ip = CStr::from_ptr(source.p_ip_address) - .to_string_lossy() - .into_owned(); - let source_name = CStr::from_ptr(source.p_ndi_name) - .to_string_lossy() - .into_owned(); + let source = match source { + None => { + gst_element_error!(element, gst::ResourceError::OpenRead, ["Stream not found"]); + return None; + }, + Some(source) => source, + }; - let p_ndi_name = CString::new("Galicaster NDI Receiver").unwrap(); - let NDI_recv_create_desc = NDIlib_recv_create_v3_t { - source_to_connect_to: source, - p_ndi_name: p_ndi_name.as_ptr(), - ..Default::default() - }; + gst_debug!( + cat, + obj: element, + "Total sources in network {}: Connecting to NDI source with name '{}' and address '{}'", + sources.len(), + source.ndi_name(), + source.ip_address(), + ); - let pNDI_recv = NDIlib_recv_create_v3(&NDI_recv_create_desc); - if pNDI_recv.is_null() { + let recv = RecvInstance::builder(&source, "Galicaster NDI Receiver") + .bandwidth(NDIlib_recv_bandwidth_e::NDIlib_recv_bandwidth_highest) + .color_format(NDIlib_recv_color_format_e::NDIlib_recv_color_format_UYVY_BGRA) + .allow_video_fields(true) + .build(); + let recv = match recv { + None => { gst_element_error!( element, gst::CoreError::Negotiation, ["Cannot run NDI: NDIlib_recv_create_v3 error"] ); - return 0; - } + return None; + }, + Some(recv) => recv, + }; - NDIlib_find_destroy(pNDI_find); + recv.set_tally(&Tally::default()); - let tally_state: NDIlib_tally_t = Default::default(); - NDIlib_recv_set_tally(pNDI_recv, &tally_state); + let enable_hw_accel = MetadataFrame::new(0, Some("")); + recv.send_metadata(&enable_hw_accel); - let data = CString::new("").unwrap(); - let enable_hw_accel = NDIlib_metadata_frame_t { - length: data.to_bytes().len() as i32, - timecode: 0, - p_data: data.as_ptr(), - }; + let id_receiver = ID_RECEIVER.fetch_add(1, Ordering::SeqCst); + receivers.insert( + id_receiver, + ReceiverInfo { + stream_name: source.ndi_name().to_owned(), + ip: source.ip_address().to_owned(), + video, + audio, + ndi_instance: recv, + initial_timestamp: 0, + id: id_receiver, + }, + ); - NDIlib_recv_send_metadata(pNDI_recv, &enable_hw_accel); - - id_receiver += 1; - receivers.insert( - id_receiver, - ndi_receiver_info { - stream_name: source_name.clone(), - ip: source_ip.clone(), - video, - audio, - ndi_instance: NdiInstance { recv: pNDI_recv }, - initial_timestamp: 0, - id: id_receiver, - }, - ); - - gst_debug!(cat, obj: element, "Started NDI connection"); - id_receiver - } + gst_debug!(cat, obj: element, "Started NDI connection"); + Some(id_receiver) } -fn stop_ndi(cat: gst::DebugCategory, element: &gst_base::BaseSrc, id: i8) -> bool { +fn stop_ndi(cat: gst::DebugCategory, element: &gst_base::BaseSrc, id: usize) -> bool { gst_debug!(cat, obj: element, "Closing NDI connection..."); - let mut receivers = hashmap_receivers.lock().unwrap(); + let mut receivers = HASHMAP_RECEIVERS.lock().unwrap(); { let val = receivers.get_mut(&id).unwrap(); if val.video && val.audio { @@ -239,14 +198,6 @@ fn stop_ndi(cat: 
gst::DebugCategory, element: &gst_base::BaseSrc, id: i8) -> boo
             }
             return true;
         }
-
-        let recv = &val.ndi_instance;
-        let pNDI_recv = recv.recv;
-        unsafe {
-            NDIlib_recv_destroy(pNDI_recv);
-            // ndi_struct.recv = None;
-            NDIlib_destroy();
-        }
     }
     receivers.remove(&id);
     gst_debug!(cat, obj: element, "Closed NDI connection");
diff --git a/src/ndi.rs b/src/ndi.rs
index 08acef36..355b0362 100644
--- a/src/ndi.rs
+++ b/src/ndi.rs
@@ -7,6 +7,7 @@ pub fn initialize() -> bool {
     unsafe { NDIlib_initialize() }
 }
 
+#[derive(Debug)]
 pub struct FindBuilder<'a> {
     show_local_sources: bool,
     groups: Option<&'a str>,
@@ -67,6 +68,7 @@ impl<'a> FindBuilder<'a> {
     }
 }
 
+#[derive(Debug)]
 pub struct FindInstance(ptr::NonNull<::std::os::raw::c_void>);
 unsafe impl Send for FindInstance {}
 
@@ -111,6 +113,7 @@ impl Drop for FindInstance {
     }
 }
 
+#[derive(Debug)]
 pub struct Source<'a>(ptr::NonNull<NDIlib_source_t>, &'a FindInstance);
 unsafe impl<'a> Send for Source<'a> {}
 
@@ -135,6 +138,7 @@ impl<'a> Source<'a> {
     }
 }
 
+#[derive(Debug)]
 pub struct RecvBuilder<'a> {
     source_to_connect_to: &'a Source<'a>,
     allow_video_fields: bool,
@@ -185,6 +189,7 @@ impl<'a> RecvBuilder<'a> {
     }
 }
 
+#[derive(Debug)]
 pub struct RecvInstance(ptr::NonNull<::std::os::raw::c_void>);
 unsafe impl Send for RecvInstance {}
 
@@ -207,7 +212,7 @@ impl RecvInstance {
         unsafe { NDIlib_recv_send_metadata(self.0.as_ptr(), metadata.as_ptr()) }
     }
 
-    pub fn recv_capture(
+    pub fn capture(
         &self,
         video: bool,
         audio: bool,
@@ -268,6 +273,7 @@ impl Drop for RecvInstance {
     }
 }
 
+#[derive(Debug)]
 pub struct Tally(NDIlib_tally_t);
 unsafe impl Send for Tally {}
 
@@ -297,12 +303,14 @@ impl Tally {
     }
 }
 
+#[derive(Debug)]
 pub enum Frame<'a> {
     Video(VideoFrame<'a>),
     Audio(AudioFrame<'a>),
     Metadata(MetadataFrame<'a>),
 }
 
+#[derive(Debug)]
 pub enum VideoFrame<'a> {
     //Owned(NDIlib_video_frame_v2_t, Option, Option>),
     Borrowed(NDIlib_video_frame_v2_t, &'a RecvInstance),
@@ -407,6 +415,7 @@ impl<'a> Drop for VideoFrame<'a> {
     }
 }
 
+#[derive(Debug)]
 pub enum AudioFrame<'a> {
     //Owned(NDIlib_audio_frame_v2_t, Option, Option>),
     Borrowed(NDIlib_audio_frame_v2_t, &'a RecvInstance),
@@ -484,7 +493,7 @@ impl<'a> AudioFrame<'a> {
     pub fn copy_to_interleaved_16s(&self, data: &mut [i16]) {
         assert_eq!(
             data.len(),
-            (self.no_samples() * self.no_channels() * 2) as usize
+            (self.no_samples() * self.no_channels()) as usize
         );
 
         let mut dst = NDIlib_audio_frame_interleaved_16s_t {
@@ -513,6 +522,7 @@ impl<'a> Drop for AudioFrame<'a> {
     }
 }
 
+#[derive(Debug)]
 pub enum MetadataFrame<'a> {
     Owned(NDIlib_metadata_frame_t, Option),
     Borrowed(NDIlib_metadata_frame_t, &'a RecvInstance),
diff --git a/src/ndiaudiosrc.rs b/src/ndiaudiosrc.rs
index a669b206..310dda07 100644
--- a/src/ndiaudiosrc.rs
+++ b/src/ndiaudiosrc.rs
@@ -1,5 +1,3 @@
-#![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)]
-
 use glib;
 use glib::subclass;
 use glib::subclass::prelude::*;
@@ -14,22 +12,22 @@ use gst_base::subclass::prelude::*;
 use std::sync::Mutex;
 use std::{i32, u32};
 
-use std::ptr;
-
 use connect_ndi;
-use ndi_struct;
-use ndisys::*;
 use stop_ndi;
+use ndi::*;
+
+use NDI_STRUCT;
+use HASHMAP_RECEIVERS;
 
 use byte_slice_cast::AsMutSliceOf;
-use hashmap_receivers;
 
 #[derive(Debug, Clone)]
 struct Settings {
     stream_name: String,
     ip: String,
     loss_threshold: u32,
-    id_receiver: i8,
+    // FIXME: Should be in State
+    id_receiver: Option<usize>,
     latency: Option<gst::ClockTime>,
 }
 
@@ -39,7 +37,7 @@ impl Default for Settings {
             stream_name: String::from("Fixed ndi stream name"),
             ip: String::from(""),
             loss_threshold: 5,
-            id_receiver: 0,
+            id_receiver: None,
             latency: None,
         }
     }
@@ -248,25 +246,23 @@ impl ElementImpl for NdiAudioSrc {
         transition: gst::StateChange,
     ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
         if transition == gst::StateChange::PausedToPlaying {
-            let mut receivers = hashmap_receivers.lock().unwrap();
+            let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
             let settings = self.settings.lock().unwrap();
 
-            let receiver = receivers.get_mut(&settings.id_receiver).unwrap();
+            let receiver = receivers.get_mut(&settings.id_receiver.unwrap()).unwrap();
             let recv = &receiver.ndi_instance;
-            let pNDI_recv = recv.recv;
 
-            let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
+            // FIXME error handling, make interruptable
+            let audio_frame =
+                loop {
+                    match recv.capture(false, true, false, 1000) {
+                        Err(_) => unimplemented!(),
+                        Ok(None) => continue,
+                        Ok(Some(Frame::Audio(frame))) => break frame,
+                        _ => unreachable!(),
+                    }
+                };
 
-            unsafe {
-                while NDIlib_recv_capture_v2(
-                    pNDI_recv,
-                    ptr::null(),
-                    &audio_frame,
-                    ptr::null(),
-                    1000,
-                ) != NDIlib_frame_type_e::NDIlib_frame_type_audio
-                {}
-            }
             gst_debug!(
                 self.cat,
                 obj: element,
@@ -274,14 +270,13 @@ impl ElementImpl for NdiAudioSrc {
                 audio_frame
             );
 
-            if receiver.initial_timestamp <= audio_frame.timestamp as u64
+            // FIXME handle unset timestamp
+            if receiver.initial_timestamp <= audio_frame.timestamp() as u64
                 || receiver.initial_timestamp == 0
             {
-                receiver.initial_timestamp = audio_frame.timestamp as u64;
-            }
-            unsafe {
-                NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame);
+                receiver.initial_timestamp = audio_frame.timestamp() as u64;
             }
+
             gst_debug!(
                 self.cat,
                 obj: element,
@@ -330,7 +325,7 @@ impl BaseSrcImpl for NdiAudioSrc {
         );
 
         match settings.id_receiver {
-            0 => Err(gst_error_msg!(
+            None => Err(gst_error_msg!(
                 gst::ResourceError::NotFound,
                 ["Could not connect to this source"]
             )),
@@ -342,7 +337,7 @@ impl BaseSrcImpl for NdiAudioSrc {
         *self.state.lock().unwrap() = Default::default();
 
         let settings = self.settings.lock().unwrap();
-        stop_ndi(self.cat, element, settings.id_receiver);
+        stop_ndi(self.cat, element, settings.id_receiver.unwrap());
         // Commented because when adding ndi destroy stopped in this line
         //*self.state.lock().unwrap() = Default::default();
         Ok(())
@@ -372,24 +367,28 @@ impl BaseSrcImpl for NdiAudioSrc {
     }
 
     fn fixate(&self, element: &gst_base::BaseSrc, caps: gst::Caps) -> gst::Caps {
-        let receivers = hashmap_receivers.lock().unwrap();
+        let receivers = HASHMAP_RECEIVERS.lock().unwrap();
 
         let mut settings = self.settings.lock().unwrap();
 
-        let receiver = receivers.get(&settings.id_receiver).unwrap();
+        let receiver = receivers.get(&settings.id_receiver.unwrap()).unwrap();
 
         let recv = &receiver.ndi_instance;
-        let pNDI_recv = recv.recv;
 
-        let audio_frame: NDIlib_audio_frame_v2_t = Default::default();
+        // FIXME: Should be done in create() and caps be updated as needed
 
-        unsafe {
-            while NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000)
-                != NDIlib_frame_type_e::NDIlib_frame_type_audio
-            {}
-        }
+        let audio_frame =
+            loop {
+                match recv.capture(false, true, false, 1000) {
+                    Err(_) => unimplemented!(),
+                    Ok(None) => continue,
+                    Ok(Some(Frame::Audio(frame))) => break frame,
+                    _ => unreachable!(),
+                }
+            };
 
-        let no_samples = audio_frame.no_samples as u64;
-        let audio_rate = audio_frame.sample_rate;
+        // FIXME: Why?
+ let no_samples = audio_frame.no_samples() as u64; + let audio_rate = audio_frame.sample_rate(); settings.latency = gst::SECOND.mul_div_floor(no_samples, audio_rate as u64); let mut caps = gst::Caps::truncate(caps); @@ -397,21 +396,18 @@ impl BaseSrcImpl for NdiAudioSrc { let caps = caps.make_mut(); let s = caps.get_mut_structure(0).unwrap(); s.fixate_field_nearest_int("rate", audio_rate); - s.fixate_field_nearest_int("channels", audio_frame.no_channels); + s.fixate_field_nearest_int("channels", audio_frame.no_channels()); s.fixate_field_str("layout", "interleaved"); s.set_value( "channel-mask", gst::Bitmask::new(gst_audio::AudioChannelPosition::get_fallback_mask( - audio_frame.no_channels as u32, + audio_frame.no_channels() as u32, )) .to_send_value(), ); } let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); - unsafe { - NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame); - } self.parent_fixate(element, caps) } @@ -434,112 +430,113 @@ impl BaseSrcImpl for NdiAudioSrc { } Some(ref info) => info.clone(), }; - let receivers = hashmap_receivers.lock().unwrap(); + let receivers = HASHMAP_RECEIVERS.lock().unwrap(); - let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance; - let pNDI_recv = recv.recv; + let receiver = &receivers.get(&_settings.id_receiver.unwrap()).unwrap(); + let recv = &receiver.ndi_instance; - let pts: u64; - let audio_frame: NDIlib_audio_frame_v2_t = Default::default(); + let time = receiver.initial_timestamp; - unsafe { - let time = receivers - .get(&_settings.id_receiver) - .unwrap() - .initial_timestamp; + let mut count_frame_none = 0; + let audio_frame = + loop { + // FIXME: make interruptable + let res = + loop { + match recv.capture(false, true, false, 1000) { + Err(_) => break Err(()), + Ok(None) => break Ok(None), + Ok(Some(Frame::Audio(frame))) => break Ok(Some(frame)), + _ => unreachable!(), + } + }; - let mut skip_frame = true; - let mut count_frame_none = 0; - while skip_frame { - let frame_type = - NDIlib_recv_capture_v2(pNDI_recv, ptr::null(), &audio_frame, ptr::null(), 1000); - if (frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none - && _settings.loss_threshold != 0) - || frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error - { + let audio_frame = match res { + Err(_) => { + gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type error received, assuming that the source closed the stream...."]); + return Err(gst::FlowError::Error); + }, + Ok(None) if _settings.loss_threshold != 0 => { if count_frame_none < _settings.loss_threshold { count_frame_none += 1; continue; } - gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none or error received, assuming that the source closed the stream...."]); + gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]); return Err(gst::FlowError::Error); - } else if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none - && _settings.loss_threshold == 0 - { + }, + Ok(None) => { gst_debug!( self.cat, obj: element, - "No audio frame received, sending empty buffer" + "No audio frame received, retry" ); - let buffer = gst::Buffer::with_size(0).unwrap(); - return Ok(buffer); - } + count_frame_none += 1; + continue; + }, + Ok(Some(frame)) => frame, + }; - if time >= (audio_frame.timestamp as u64) { - NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame); - gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame 
from NDI ({:?}), so skiping...", (audio_frame.timestamp as u64), time);
-            } else {
-                skip_frame = false;
-            }
+                if time >= (audio_frame.timestamp() as u64) {
+                    gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (audio_frame.timestamp() as u64), time);
+                } else {
+                    break audio_frame;
                 }
+            };
 
-            gst_log!(
-                self.cat,
-                obj: element,
-                "NDI audio frame received: {:?}",
-                (audio_frame)
-            );
+        gst_log!(
+            self.cat,
+            obj: element,
+            "NDI audio frame received: {:?}",
+            audio_frame
+        );
 
-            pts = audio_frame.timestamp as u64 - time;
+        let pts = audio_frame.timestamp() as u64 - time;
 
-            gst_log!(
-                self.cat,
-                obj: element,
-                "Calculated pts for audio frame: {:?}",
-                (pts)
-            );
+        gst_log!(
+            self.cat,
+            obj: element,
+            "Calculated pts for audio frame: {:?}",
+            pts
+        );
 
-            // We multiply by 2 because is the size in bytes of an i16 variable
-            let buff_size = (audio_frame.no_samples * 2 * audio_frame.no_channels) as usize;
-            let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
-            {
-                if ndi_struct.start_pts == gst::ClockTime(Some(0)) {
-                    ndi_struct.start_pts =
+        // We multiply by 2 because is the size in bytes of an i16 variable
+        let buff_size = (audio_frame.no_samples() * 2 * audio_frame.no_channels()) as usize;
+        let mut buffer = gst::Buffer::with_size(buff_size).unwrap();
+        {
+            // FIXME don't use static mut, also this calculation is wrong
+            unsafe {
+                if NDI_STRUCT.start_pts == gst::ClockTime(Some(0)) {
+                    NDI_STRUCT.start_pts =
                         element.get_clock().unwrap().get_time() - element.get_base_time();
                 }
-
-                let buffer = buffer.get_mut().unwrap();
-
-                // Newtek NDI yields times in 100ns intervals since the Unix Time
-                let pts: gst::ClockTime = (pts * 100).into();
-                buffer.set_pts(pts + ndi_struct.start_pts);
-
-                let duration: gst::ClockTime = (((f64::from(audio_frame.no_samples)
-                    / f64::from(audio_frame.sample_rate))
-                    * 1_000_000_000.0) as u64)
-                    .into();
-                buffer.set_duration(duration);
-
-                buffer.set_offset(timestamp_data.offset);
-                timestamp_data.offset += audio_frame.no_samples as u64;
-                buffer.set_offset_end(timestamp_data.offset);
-
-                let mut dst: NDIlib_audio_frame_interleaved_16s_t = Default::default();
-                dst.reference_level = 0;
-                dst.p_data = buffer
-                    .map_writable()
-                    .unwrap()
-                    .as_mut_slice_of::<i16>()
-                    .unwrap()
-                    .as_mut_ptr();
-                NDIlib_util_audio_to_interleaved_16s_v2(&audio_frame, &mut dst);
-                NDIlib_recv_free_audio_v2(pNDI_recv, &audio_frame);
             }
 
-            gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
+            let buffer = buffer.get_mut().unwrap();
 
-            Ok(buffer)
+            // Newtek NDI yields times in 100ns intervals since the Unix Time
+            let pts: gst::ClockTime = (pts * 100).into();
+            buffer.set_pts(pts + unsafe { NDI_STRUCT.start_pts });
+
+            let duration: gst::ClockTime = (((f64::from(audio_frame.no_samples())
+                / f64::from(audio_frame.sample_rate()))
+                * 1_000_000_000.0) as u64)
+                .into();
+            buffer.set_duration(duration);
+
+            buffer.set_offset(timestamp_data.offset);
+            timestamp_data.offset += audio_frame.no_samples() as u64;
+            buffer.set_offset_end(timestamp_data.offset);
+
+            audio_frame.copy_to_interleaved_16s(buffer
+                .map_writable()
+                .unwrap()
+                .as_mut_slice_of::<i16>()
+                .unwrap());
         }
+
+        gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer);
+
+        Ok(buffer)
     }
 }
diff --git a/src/ndivideosrc.rs b/src/ndivideosrc.rs
index 912939d4..67538fb7 100644
--- a/src/ndivideosrc.rs
+++ b/src/ndivideosrc.rs
@@ -1,5 +1,3 @@
-#![allow(non_camel_case_types, non_upper_case_globals, non_snake_case)]
-
 use glib;
 use glib::subclass;
 use glib::subclass::prelude::*;
@@ -16,21 +14,21 @@ use gst_video;
 use std::sync::Mutex;
 use std::{i32, u32};
 
-use std::{slice, ptr};
+use ndi::*;
 
 use connect_ndi;
-use ndi_struct;
-use ndisys::*;
 use stop_ndi;
 
-use hashmap_receivers;
+use NDI_STRUCT;
+use HASHMAP_RECEIVERS;
 
 #[derive(Debug, Clone)]
 struct Settings {
     stream_name: String,
     ip: String,
     loss_threshold: u32,
-    id_receiver: i8,
+    // FIXME: should be in state
+    id_receiver: Option<usize>,
     latency: Option<gst::ClockTime>,
 }
 
@@ -40,7 +38,7 @@ impl Default for Settings {
             stream_name: String::from("Fixed ndi stream name"),
             ip: String::from(""),
             loss_threshold: 5,
-            id_receiver: 0,
+            id_receiver: None,
             latency: None,
         }
     }
@@ -256,25 +254,23 @@ impl ElementImpl for NdiVideoSrc {
         transition: gst::StateChange,
     ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
         if transition == gst::StateChange::PausedToPlaying {
-            let mut receivers = hashmap_receivers.lock().unwrap();
+            let mut receivers = HASHMAP_RECEIVERS.lock().unwrap();
             let settings = self.settings.lock().unwrap();
 
-            let receiver = receivers.get_mut(&settings.id_receiver).unwrap();
+            let receiver = receivers.get_mut(&settings.id_receiver.unwrap()).unwrap();
             let recv = &receiver.ndi_instance;
-            let pNDI_recv = recv.recv;
 
-            let video_frame: NDIlib_video_frame_v2_t = Default::default();
+            // FIXME error handling, make interruptable
+            let video_frame =
+                loop {
+                    match recv.capture(true, false, false, 1000) {
+                        Err(_) => unimplemented!(),
+                        Ok(None) => continue,
+                        Ok(Some(Frame::Video(frame))) => break frame,
+                        _ => unreachable!(),
+                    }
+                };
 
-            unsafe {
-                while NDIlib_recv_capture_v2(
-                    pNDI_recv,
-                    &video_frame,
-                    ptr::null(),
-                    ptr::null(),
-                    1000,
-                ) != NDIlib_frame_type_e::NDIlib_frame_type_video
-                {}
-            }
             gst_debug!(
                 self.cat,
                 obj: element,
@@ -282,13 +278,12 @@ impl ElementImpl for NdiVideoSrc {
                 video_frame
             );
 
-            if receiver.initial_timestamp <= video_frame.timestamp as u64
+            // FIXME handle unset timestamp
+
+            if receiver.initial_timestamp <= video_frame.timestamp() as u64
                 || receiver.initial_timestamp == 0
             {
-                receiver.initial_timestamp = video_frame.timestamp as u64;
-            }
-            unsafe {
-                NDIlib_recv_free_video_v2(pNDI_recv, &video_frame);
+                receiver.initial_timestamp = video_frame.timestamp() as u64;
             }
             gst_debug!(
                 self.cat,
@@ -331,13 +326,13 @@ impl BaseSrcImpl for NdiVideoSrc {
         settings.id_receiver = connect_ndi(
             self.cat,
             element,
-            &settings.ip.clone(),
-            &settings.stream_name.clone(),
+            &settings.ip,
+            &settings.stream_name,
         );
 
-        // settings.id_receiver != 0
+        // settings.id_receiver exists
         match settings.id_receiver {
-            0 => Err(gst_error_msg!(
+            None => Err(gst_error_msg!(
                 gst::ResourceError::NotFound,
                 ["Could not connect to this source"]
             )),
@@ -349,7 +344,7 @@ impl BaseSrcImpl for NdiVideoSrc {
         *self.state.lock().unwrap() = Default::default();
 
         let settings = self.settings.lock().unwrap();
-        stop_ndi(self.cat, element, settings.id_receiver);
+        stop_ndi(self.cat, element, settings.id_receiver.unwrap());
         // Commented because when adding ndi destroy stopped in this line
         //*self.state.lock().unwrap() = Default::default();
         Ok(())
@@ -379,39 +374,40 @@ impl BaseSrcImpl for NdiVideoSrc {
     }
 
     fn fixate(&self, element: &gst_base::BaseSrc, caps: gst::Caps) -> gst::Caps {
-        let receivers = hashmap_receivers.lock().unwrap();
+        let receivers = HASHMAP_RECEIVERS.lock().unwrap();
 
         let mut settings = self.settings.lock().unwrap();
 
-        let receiver = receivers.get(&settings.id_receiver).unwrap();
+        let receiver = receivers.get(&settings.id_receiver.unwrap()).unwrap();
 
         let recv = &receiver.ndi_instance;
-        let pNDI_recv = recv.recv;
-        let
video_frame: NDIlib_video_frame_v2_t = Default::default(); + // FIXME: Should be done in create() and caps be updated as needed + let video_frame = + loop { + match recv.capture(true, false, false, 1000) { + Err(_) => unimplemented!(), + Ok(None) => continue, + Ok(Some(Frame::Video(frame))) => break frame, + _ => unreachable!(), + } + }; - unsafe { - while NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000) - != NDIlib_frame_type_e::NDIlib_frame_type_video - {} - } + // FIXME: Why? settings.latency = gst::SECOND.mul_div_floor( - video_frame.frame_rate_D as u64, - video_frame.frame_rate_N as u64, + video_frame.frame_rate().1 as u64, + video_frame.frame_rate().0 as u64, ); let mut caps = gst::Caps::truncate(caps); { let caps = caps.make_mut(); let s = caps.get_mut_structure(0).unwrap(); - s.fixate_field_nearest_int("width", video_frame.xres); - s.fixate_field_nearest_int("height", video_frame.yres); + s.fixate_field_nearest_int("width", video_frame.xres()); + s.fixate_field_nearest_int("height", video_frame.yres()); s.fixate_field_nearest_fraction( "framerate", - Fraction::new(video_frame.frame_rate_N, video_frame.frame_rate_D), + Fraction::new(video_frame.frame_rate().0, video_frame.frame_rate().1), ); } - unsafe { - NDIlib_recv_free_video_v2(pNDI_recv, &video_frame); - } let _ = element.post_message(&gst::Message::new_latency().src(Some(element)).build()); self.parent_fixate(element, caps) } @@ -434,38 +430,41 @@ impl BaseSrcImpl for NdiVideoSrc { } Some(ref info) => info.clone(), }; - let receivers = hashmap_receivers.lock().unwrap(); + let receivers = HASHMAP_RECEIVERS.lock().unwrap(); - let recv = &receivers.get(&_settings.id_receiver).unwrap().ndi_instance; - let pNDI_recv = recv.recv; + let receiver = &receivers.get(&_settings.id_receiver.unwrap()).unwrap(); + let recv = &receiver.ndi_instance; - let pts: u64; - let video_frame: NDIlib_video_frame_v2_t = Default::default(); + let time = receiver.initial_timestamp; - unsafe { - let time = receivers - .get(&_settings.id_receiver) - .unwrap() - .initial_timestamp; + let mut count_frame_none = 0; + let video_frame = + loop { + // FIXME: make interruptable + let res = + loop { + match recv.capture(true, false, false, 1000) { + Err(_) => break Err(()), + Ok(None) => break Ok(None), + Ok(Some(Frame::Video(frame))) => break Ok(Some(frame)), + _ => unreachable!(), + } + }; - let mut skip_frame = true; - let mut count_frame_none = 0; - while skip_frame { - let frame_type = - NDIlib_recv_capture_v2(pNDI_recv, &video_frame, ptr::null(), ptr::null(), 1000); - if (frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none - && _settings.loss_threshold != 0) - || frame_type == NDIlib_frame_type_e::NDIlib_frame_type_error - { + let video_frame = match res { + Err(_) => { + gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type error received, assuming that the source closed the stream...."]); + return Err(gst::FlowError::Error); + }, + Ok(None) if _settings.loss_threshold != 0 => { if count_frame_none < _settings.loss_threshold { count_frame_none += 1; continue; } - gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none or error received, assuming that the source closed the stream...."]); + gst_element_error!(element, gst::ResourceError::Read, ["NDI frame type none received, assuming that the source closed the stream...."]); return Err(gst::FlowError::Error); - } else if frame_type == NDIlib_frame_type_e::NDIlib_frame_type_none - && _settings.loss_threshold == 0 - { + }, + Ok(None) => { 
gst_debug!( self.cat, obj: element, @@ -473,62 +472,64 @@ impl BaseSrcImpl for NdiVideoSrc { ); count_frame_none += 1; continue; - } + }, + Ok(Some(frame)) => frame, + }; - if time >= (video_frame.timestamp as u64) { - gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (video_frame.timestamp as u64), time); - } else { - skip_frame = false; - } + if time >= (video_frame.timestamp() as u64) { + gst_debug!(self.cat, obj: element, "Frame timestamp ({:?}) is lower than received in the first frame from NDI ({:?}), so skiping...", (video_frame.timestamp() as u64), time); + } else { + break video_frame; } + }; - gst_log!( - self.cat, - obj: element, - "NDI video frame received: {:?}", - (video_frame) - ); + gst_log!( + self.cat, + obj: element, + "NDI video frame received: {:?}", + video_frame + ); - pts = video_frame.timestamp as u64 - time; + let pts = video_frame.timestamp() as u64 - time; - gst_log!( - self.cat, - obj: element, - "Calculated pts for video frame: {:?}", - (pts) - ); + gst_log!( + self.cat, + obj: element, + "Calculated pts for video frame: {:?}", + pts + ); - let buff_size = (video_frame.yres * video_frame.line_stride_in_bytes) as usize; - let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); - { - let data = slice::from_raw_parts(video_frame.p_data as *mut u8, buff_size); - // Newtek NDI yields times in 100ns intervals since the Unix Time - let pts: gst::ClockTime = (pts * 100).into(); + let buff_size = (video_frame.yres() * video_frame.line_stride_in_bytes()) as usize; + let mut buffer = gst::Buffer::with_size(buff_size).unwrap(); + { + // Newtek NDI yields times in 100ns intervals since the Unix Time + let pts: gst::ClockTime = (pts * 100).into(); - let duration: gst::ClockTime = (((f64::from(video_frame.frame_rate_D) - / f64::from(video_frame.frame_rate_N)) - * 1_000_000_000.0) as u64) - .into(); - let buffer = buffer.get_mut().unwrap(); + let duration: gst::ClockTime = (((f64::from(video_frame.frame_rate().1) + / f64::from(video_frame.frame_rate().0)) + * 1_000_000_000.0) as u64) + .into(); + let buffer = buffer.get_mut().unwrap(); - if ndi_struct.start_pts == gst::ClockTime(Some(0)) { - ndi_struct.start_pts = + // FIXME don't use static mut, also this calculation is wrong + unsafe { + if NDI_STRUCT.start_pts == gst::ClockTime(Some(0)) { + NDI_STRUCT.start_pts = element.get_clock().unwrap().get_time() - element.get_base_time(); } - buffer.set_pts(pts + ndi_struct.start_pts); - buffer.set_duration(duration); - buffer.set_offset(timestamp_data.offset); - timestamp_data.offset += 1; - buffer.set_offset_end(timestamp_data.offset); - buffer.copy_from_slice(0, data).unwrap(); + buffer.set_pts(pts + NDI_STRUCT.start_pts); } - - gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer); - NDIlib_recv_free_video_v2(pNDI_recv, &video_frame); - - Ok(buffer) + buffer.set_duration(duration); + buffer.set_offset(timestamp_data.offset); + timestamp_data.offset += 1; + buffer.set_offset_end(timestamp_data.offset); + buffer.copy_from_slice(0, video_frame.data()).unwrap(); } + + gst_log!(self.cat, obj: element, "Produced buffer {:?}", buffer); + + Ok(buffer) } }